Файл: concrete5.7.5.6/concrete/vendor/zendframework/zend-queue/library/ZendQueue/Stomp/Connection.php
Строк: 298
<?php
/**
* Zend Framework (http://framework.zend.com/)
*
* @link http://github.com/zendframework/zf2 for the canonical source repository
* @copyright Copyright (c) 2005-2012 Zend Technologies USA Inc. (http://www.zend.com)
* @license http://framework.zend.com/license/new-bsd New BSD License
* @package Zend_Queue
*/
namespace ZendQueueStomp;
use ZendQueueException;
/**
* The Stomp client interacts with a Stomp server.
*
* @category Zend
* @package Zend_Queue
* @subpackage Stomp
*/
class Connection implements StompConnection
{
const READ_TIMEOUT_DEFAULT_USEC = 0; // 0 microseconds
const READ_TIMEOUT_DEFAULT_SEC = 5; // 5 seconds
/**
* Connection options
* @var array
*/
protected $_options;
/**
* tcp/udp socket
*
* @var resource
*/
protected $_socket = false;
/**
* open() opens a socket to the Stomp server
*
* @param array $options ('scheme', 'host', 'port')
* @param string $scheme
* @param string $host
* @param int $port
* @param array $options Accepts "timeout_sec" and "timeout_usec" keys
* @return true;
* @throws ZendQueueException
*/
public function open($scheme, $host, $port, array $options = array())
{
$str = $scheme . '://' . $host;
$this->_socket = fsockopen($str, $port, $errno, $errstr);
if ($this->_socket === false) {
// aparently there is some reason that fsockopen will return false
// but it normally throws an error.
throw new ExceptionConnectionException("Unable to connect to $str; error = $errstr ( errno = $errno )");
}
if (!isset($options['timeout_sec'])) {
$options['timeout_sec'] = self::READ_TIMEOUT_DEFAULT_SEC;
}
if (! isset($options['timeout_usec'])) {
$options['timeout_usec'] = self::READ_TIMEOUT_DEFAULT_USEC;
}
stream_set_timeout($this->_socket, $options['timeout_sec'], $options['timeout_usec']);
$this->_options = $options;
return true;
}
/**
* Close the socket explicitly when destructed
*
* @return void
*/
public function __destruct()
{
}
/**
* Close connection
*
* @param boolean $destructor
* @return void
*/
public function close($destructor = false)
{
// Gracefully disconnect
if (!$destructor) {
$frame = $this->createFrame();
$frame->setCommand('DISCONNECT');
$this->write($frame);
}
// @todo: Should be fixed.
// When the socket is "closed", it will trigger the below error when php exits
// Fatal error: Exception thrown without a stack frame in Unknown on line 0
// Danlo: I suspect this is because this has already been claimed by the interpeter
// thus trying to shutdown this resources, which is already shutdown is a problem.
if (is_resource($this->_socket)) {
// fclose($this->_socket);
}
// $this->_socket = null;
}
/**
* Check whether we are connected to the server
*
* @return true
* @throws ZendQueueException
*/
public function ping()
{
if (!is_resource($this->_socket)) {
throw new ExceptionConnectionException('Not connected to Stomp server');
}
return true;
}
/**
* Write a frame to the stomp server
*
* example: $response = $client->write($frame)->read();
*
* @param ZendQueueStomStompFrame $frame
* @return $this
*/
public function write(StompFrame $frame)
{
$this->ping();
$output = $frame->toFrame();
$bytes = fwrite($this->_socket, $output, strlen($output));
if ($bytes === false || $bytes == 0) {
throw new ExceptionRangeException('No bytes written');
}
return $this;
}
/**
* Tests the socket to see if there is data for us
*
* @return boolean
*/
public function canRead()
{
$read = array($this->_socket);
$write = null;
$except = null;
return stream_select(
$read,
$write,
$except,
0,
100000
) == 1;
// see http://us.php.net/manual/en/function.stream-select.php
}
/**
* Reads in a frame from the socket or returns false.
*
* @return ZendQueueStompStompFrame|false
* @throws ZendQueueException
*/
public function read()
{
$this->ping();
$response = '';
// as per protocol COMMAND is 1st n terminated then headers also n terminated
// COMMAND and header block are seperated by a blank line.
// read command and headers
while (($line = @fgets($this->_socket)) !== false) {
$response .= $line;
if (rtrim($line) === '') break;
}
$this->_checkSocketReadTimeout();
// to differenciate between a byte message and
// non-byte message, check content-length header
$headers = Frame::extractHeaders($response);
if (!isset($headers[Frame::CONTENT_LENGTH])) {
// read till we hit the end of frame marker
do {
$chunk = @fgets($this->_socket);
if ( $chunk === false || strlen($chunk) === 0) {
$this->_checkSocketReadTimeout();
break;
}
if (substr($chunk, -2) === Frame::END_OF_FRAME) {
// add the chunk above to the result before returning
$response .= $chunk;
break;
}
$response .= $chunk;
} while (feof($this->_socket) === false);
} else {
// we have a content-length header set
$contentLength = $headers[Frame::CONTENT_LENGTH] + 2;
$current_pos = ftell($this->_socket);
$chunk = '';
for ($read_to = $current_pos + $contentLength;
$read_to > $current_pos;
$current_pos = ftell($this->_socket)
) {
$chunk = fread($this->_socket, $read_to - $current_pos);
if ($chunk === false || strlen($chunk) === 0) {
$this->_checkSocketReadTimeout();
break;
}
$response .= $chunk;
// Break if the connection ended prematurely
if (feof($this->_socket)) {
break;
}
}
}
if ($response === '') {
return false;
}
// we already have headers, prevent extracting the headers again
$frame = $this->createFrame();
$frame->setCommand(Frame::extractCommand($response))
->setHeaders($headers)
->setBody(Frame::extractBody($response));
return $frame;
}
/**
* Set the frameClass to be used
*
* This must be a ZendQueueStompStompFrame.
*
* @param string $classname - class is an instance of ZendQueueStompStompFrame
* @return $this;
*/
public function setFrameClass($classname)
{
$this->_options['frameClass'] = $classname;
return $this;
}
/**
* Get the frameClass
*
* @return string
*/
public function getFrameClass()
{
return isset($this->_options['frameClass'])
? $this->_options['frameClass']
: 'ZendQueueStompFrame';
}
/**
* Create an empty frame
*
* @return ZendQueueStompStompFrame
*/
public function createFrame()
{
$class = $this->getFrameClass();
$frame = new $class();
if (!$frame instanceof StompFrame) {
throw new ExceptionLogicException('Invalid Frame class provided; must implement ZendQueueStompStompFrame');
}
return $frame;
}
/**
* Check if the connection has timed out
*
* @throws ZendQueueException if the connection has timed out
*/
protected function _checkSocketReadTimeout()
{
if (!is_resource($this->_socket)) {
return;
}
$info = stream_get_meta_data($this->_socket);
$timedout = $info['timed_out'];
if ($timedout) {
$this->close();
throw new ExceptionConnectionException(
"Read timed out after {$this->_options['timeout_sec']} seconds"
);
}
}
}