Skip to content

Commit

Permalink
MARKETING-1770 updated stomp to accept additional headers for ack/nack
Browse files Browse the repository at this point in the history
and keep track of current stomp version that is connected
  • Loading branch information
Namrata committed Aug 1, 2013
1 parent d8f68dc commit 1733a10
Showing 1 changed file with 30 additions and 5 deletions.
35 changes: 30 additions & 5 deletions src/FuseSource/Stomp/Stomp.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class Stomp
protected $_username = '';
protected $_password = '';
protected $_sessionId;
protected $_version;
protected $_read_timeout_seconds = 60;
protected $_read_timeout_milliseconds = 0;
protected $_connect_timeout_seconds = 60;
Expand Down Expand Up @@ -188,7 +189,10 @@ protected function _makeConnection ()
* The Server response back with CONNECTED frame, that has sessionI : A session identifier that uniquely identifies the session.
* @param string $username
* @param string $password
* @param string optionalParams
* @param string $optionalParams, possible keys are:
* accept-version => 1.0,1.1,2.0 - From STOMP 1.1 and onwards, the CONNECT frame MUST include the accept-version header. It SHOULD be set to a comma separated list that the client supports.
* host => stomp.github.org - The name of a virtual host that the client wishes to connect to.
* heart-beat => <cx>,<cy> - Heart-beating can optionally be used to test the healthiness of the underlying TCP connection.
* @return boolean
* @throws StompException
*/
Expand All @@ -215,6 +219,9 @@ public function connect ($username = '', $password = '', $optionalParams = array
if (isset($frame->headers['server']) && false !== stristr(trim($frame->headers['server']), 'rabbitmq')) {
$this->brokerVendor = 'RMQ';
}
if(isset($frame->headers['version'])){//Server returns stomp version that it will be using.
$this->_version = $frame->headers['version'];
}
return true;
} else {
if ($frame instanceof Frame) {
Expand Down Expand Up @@ -333,7 +340,9 @@ protected function _waitForReceipt (Frame $frame, $sync)
* Any messages received on the subscribed destination will henceforth be delivered as MESSAGE frames from the server to the client.
* The ack header controls the message acknowledgment mode.
* @param string $destination Destination queue
* @param array $properties
* @param array $properties, possible values
* ack => auto,client,client-individual - defaults to auto
* id => string - required from 1.1
* @param boolean $sync Perform request synchronously
* @return boolean
* @throws StompException
Expand Down Expand Up @@ -375,7 +384,8 @@ public function subscribe ($destination, $properties = null, $sync = null)
* Remove an existing subscription
* A standard UNSUBSCRIBE frame does not destroy the durable subscription, it only disconnects the client from the durable subscription.
* @param string $destination
* @param array $properties
* @param array $properties , possible values:
* id - required from 1.1
* @param boolean $sync Perform request synchronously
* @return boolean
* @throws StompException
Expand Down Expand Up @@ -465,10 +475,13 @@ public function abort ($transactionId = null, $sync = null)
*
* @param string|Frame $messageMessage ID
* @param string $transactionId
* @param array $optionalHeaders, possible values
* id - ack ID from message for client or client-individual type of subscriptions.. From stomp v1.2
* subscription - set to match the value of the subscription's id header. For v1.1
* @return boolean
* @throws StompException
*/
public function ack ($message, $transactionId = null)
public function ack ($message, $transactionId = null, $optionalHeaders = array())
{
if ($message instanceof Frame) {
$headers = $message->headers;
Expand All @@ -488,6 +501,10 @@ public function ack ($message, $transactionId = null)
$headers['transaction'] = $transactionId;
}
$headers['message-id'] = $message;
//Add/update optional header values
foreach($optionalHeaders as $name => $value){
$headers[$name] = $value;
}
$frame = new Frame('ACK', $headers);
$this->_writeFrame($frame);
return true;
Expand All @@ -500,10 +517,13 @@ public function ack ($message, $transactionId = null)
*
* @param $messageMessage string|Frame ID
* @param $transactionId string
* @param array $optionalHeaders, possible values
* id - ack ID from message for client or client-individual type of subscriptions.. From stomp v1.2
* subscription - set to match the value of the subscription's id header. For v1.1
* @return boolean
* @throws StompException
*/
public function nack($message, $transactionId = null){
public function nack($message, $transactionId = null, $optionalHeaders = array()){
if($message instanceof Frame){
$headers = $message->headers;
if(isset($transactionId)){
Expand All @@ -517,7 +537,12 @@ public function nack($message, $transactionId = null){
if(isset($transactionId)){
$headers['transaction'] = $transactionId;
}

$headers['message-id'] = $message;
//Add/update optional header values
foreach($optionalHeaders as $name => $value){
$headers[$name] = $value;
}
$frame = new Frame('NACK', $headers);
$this->_writeFrame($frame);
return true;
Expand Down

0 comments on commit 1733a10

Please sign in to comment.