diff --git a/src/FuseSource/Stomp/Stomp.php b/src/FuseSource/Stomp/Stomp.php index c4f32fb..4c18176 100755 --- a/src/FuseSource/Stomp/Stomp.php +++ b/src/FuseSource/Stomp/Stomp.php @@ -73,6 +73,7 @@ class Stomp protected $_username = ''; protected $_password = ''; protected $_sessionId; + protected $_version; protected $_read_timeout_seconds = 60; protected $_read_timeout_milliseconds = 0; protected $_connect_timeout_seconds = 60; @@ -188,7 +189,10 @@ protected function _makeConnection () * The Server response back with CONNECTED frame, that has sessionI : A session identifier that uniquely identifies the session. * @param string $username * @param string $password - * @param string optionalParams + * @param string $optionalParams, possible keys are: + * accept-version => 1.0,1.1,2.0 - From STOMP 1.1 and onwards, the CONNECT frame MUST include the accept-version header. It SHOULD be set to a comma separated list that the client supports. + * host => stomp.github.org - The name of a virtual host that the client wishes to connect to. + * heart-beat => , - Heart-beating can optionally be used to test the healthiness of the underlying TCP connection. * @return boolean * @throws StompException */ @@ -215,6 +219,9 @@ public function connect ($username = '', $password = '', $optionalParams = array if (isset($frame->headers['server']) && false !== stristr(trim($frame->headers['server']), 'rabbitmq')) { $this->brokerVendor = 'RMQ'; } + if(isset($frame->headers['version'])){//Server returns stomp version that it will be using. + $this->_version = $frame->headers['version']; + } return true; } else { if ($frame instanceof Frame) { @@ -333,7 +340,9 @@ protected function _waitForReceipt (Frame $frame, $sync) * Any messages received on the subscribed destination will henceforth be delivered as MESSAGE frames from the server to the client. * The ack header controls the message acknowledgment mode. * @param string $destination Destination queue - * @param array $properties + * @param array $properties, possible values + * ack => auto,client,client-individual - defaults to auto + * id => string - required from 1.1 * @param boolean $sync Perform request synchronously * @return boolean * @throws StompException @@ -375,7 +384,8 @@ public function subscribe ($destination, $properties = null, $sync = null) * Remove an existing subscription * A standard UNSUBSCRIBE frame does not destroy the durable subscription, it only disconnects the client from the durable subscription. * @param string $destination - * @param array $properties + * @param array $properties , possible values: + * id - required from 1.1 * @param boolean $sync Perform request synchronously * @return boolean * @throws StompException @@ -465,10 +475,13 @@ public function abort ($transactionId = null, $sync = null) * * @param string|Frame $messageMessage ID * @param string $transactionId + * @param array $optionalHeaders, possible values + * id - ack ID from message for client or client-individual type of subscriptions.. From stomp v1.2 + * subscription - set to match the value of the subscription's id header. For v1.1 * @return boolean * @throws StompException */ - public function ack ($message, $transactionId = null) + public function ack ($message, $transactionId = null, $optionalHeaders = array()) { if ($message instanceof Frame) { $headers = $message->headers; @@ -488,6 +501,10 @@ public function ack ($message, $transactionId = null) $headers['transaction'] = $transactionId; } $headers['message-id'] = $message; + //Add/update optional header values + foreach($optionalHeaders as $name => $value){ + $headers[$name] = $value; + } $frame = new Frame('ACK', $headers); $this->_writeFrame($frame); return true; @@ -500,10 +517,13 @@ public function ack ($message, $transactionId = null) * * @param $messageMessage string|Frame ID * @param $transactionId string + * @param array $optionalHeaders, possible values + * id - ack ID from message for client or client-individual type of subscriptions.. From stomp v1.2 + * subscription - set to match the value of the subscription's id header. For v1.1 * @return boolean * @throws StompException */ - public function nack($message, $transactionId = null){ + public function nack($message, $transactionId = null, $optionalHeaders = array()){ if($message instanceof Frame){ $headers = $message->headers; if(isset($transactionId)){ @@ -517,7 +537,12 @@ public function nack($message, $transactionId = null){ if(isset($transactionId)){ $headers['transaction'] = $transactionId; } + $headers['message-id'] = $message; + //Add/update optional header values + foreach($optionalHeaders as $name => $value){ + $headers[$name] = $value; + } $frame = new Frame('NACK', $headers); $this->_writeFrame($frame); return true;