A lightweight message queue for PHP that requires no dedicated queue server. Just a Redis server. See smrchy/rsmq for more information.
This is a fork of eislambey/php-rsmq with the following changes:
- Uses predis instead of the Redis extension
- Has some OO wrappers for QueueAttributes and Message
- Provides a simple QueueWorker
composer require andrewbreksa/rsmq
Creates a new instance of RSMQ.
Parameters:
$predis
(\Predis\ClientInterface): *required The Predis instance$ns
(string): optional (Default: "rsmq") The namespace prefix used for all keys created by RSMQ$realtime
(Boolean): optional (Default: false) Enable realtime PUBLISH of new messages
Example:
<?php
use Predis\Client;
use AndrewBreksa\RSMQ\RSMQClient;
$predis = new Client(
[
'host' => '127.0.0.1',
'port' => 6379
]
);
$this->rsmq = new RSMQClient($predis);
Create a new queue.
Parameters:
$name
(string): The Queue name. Maximum 160 characters; alphanumeric characters, hyphens (-), and underscores (_) are allowed.$vt
(int): optional (Default: 30) The length of time, in seconds, that a message received from a queue will be invisible to other receiving components when they ask to receive messages. Allowed values: 0-9999999 (around 115 days)$delay
(int): optional (Default: 0) The time in seconds that the delivery of all new messages in the queue will be delayed. Allowed values: 0-9999999 (around 115 days)$maxsize
(int): optional (Default: 65536) The maximum message size in bytes. Allowed values: 1024-65536 and -1 (for unlimited size)
Returns:
true
(Bool)
Throws:
\AndrewBreksa\RSMQ\Exceptions\QueueAlreadyExistsException
Example:
<?php
/**
* @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
*/
$rsmq->createQueue('myqueue');
List all queues
Returns an array:
["qname1", "qname2"]
Example:
<?php
/**
* @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
*/
$queues = $rsmq->listQueues();
Deletes a queue and all messages.
Parameters:
$name
(string): The Queue name.
Returns:
true
(Bool)
Throws:
\AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
Example:
<?php
/**
* @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
*/
$rsmq->deleteQueue('myqueue');
Get queue attributes, counter and stats
Parameters:
$queue
(string): The Queue name.
Returns a \AndrewBreksa\RSMQ\QueueAttributes
object with the following properties:
vt
(int): The visibility timeout for the queue in secondsdelay
(int): The delay for new messages in secondsmaxSize
(int): The maximum size of a message in bytestotalReceived
(int): Total number of messages received from the queuetotalSent
(int): Total number of messages sent to the queuecreated
(float): Timestamp (epoch in seconds) when the queue was createdmodified
(float): Timestamp (epoch in seconds) when the queue was last modified withsetQueueAttributes
messageCount
(int): Current number of messages in the queuehiddenMessageCount
(int): Current number of hidden / not visible messages. A message can be hidden while "in flight" due to avt
parameter or when sent with adelay
Example:
<?php
/**
* @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
*/
$attributes = $rsmq->getQueueAttributes('myqueue');
echo "visibility timeout: ", $attributes->getVt(), "\n";
echo "delay for new messages: ", $attributes->getDelay(), "\n";
echo "max size in bytes: ", $attributes->getMaxSize(), "\n";
echo "total received messages: ", $attributes->getTotalReceived(), "\n";
echo "total sent messages: ", $attributes->getTotalSent(), "\n";
echo "created: ", $attributes->getCreated(), "\n";
echo "last modified: ", $attributes->getModified(), "\n";
echo "current n of messages: ", $attributes->getMessageCount(), "\n";
echo "hidden messages: ", $attributes->getHiddenMessageCount(), "\n";
Sets queue parameters.
Parameters:
$queue
(string): The Queue name.$vt
(int): optional * The length of time, in seconds, that a message received from a queue will be invisible to other receiving components when they ask to receive messages. Allowed values: 0-9999999 (around 115 days)$delay
(int): optional The time in seconds that the delivery of all new messages in the queue will be delayed. Allowed values: 0-9999999 (around 115 days)$maxsize
(int): optional The maximum message size in bytes. Allowed values: 1024-65536 and -1 (for unlimited size)
Note: At least one attribute (vt, delay, maxsize) must be supplied. Only attributes that are supplied will be modified.
Returns a \AndrewBreksa\RSMQ\QueueAttributes
object with the following properties:
vt
(int): The visibility timeout for the queue in secondsdelay
(int): The delay for new messages in secondsmaxSize
(int): The maximum size of a message in bytestotalReceived
(int): Total number of messages received from the queuetotalSent
(int): Total number of messages sent to the queuecreated
(float): Timestamp (epoch in seconds) when the queue was createdmodified
(float): Timestamp (epoch in seconds) when the queue was last modified withsetQueueAttributes
messageCount
(int): Current number of messages in the queuehiddenMessageCount
(int): Current number of hidden / not visible messages. A message can be hidden while "in flight" due to avt
parameter or when sent with adelay
Throws:
\AndrewBreksa\RSMQ\QueueAttributes
\AndrewBreksa\RSMQ\QueueParametersValidationException
\AndrewBreksa\RSMQ\QueueNotFoundException
Example:
<?php
/**
* @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
*/
$queue = 'myqueue';
$vt = 50;
$delay = 10;
$maxsize = 2048;
$rsmq->setQueueAttributes($queue, $vt, $delay, $maxsize);
Sends a new message.
Parameters:
$queue
(string)$message
(string)$delay
(int): optional (Default: queue settings) The time in seconds that the delivery of the message will be delayed. Allowed values: 0-9999999 (around 115 days)
Returns:
$id
(string): The internal message id.
Throws:
\AndrewBreksa\RSMQ\Exceptions\MessageToLongException
\AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
\AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
Example:
<?php
/**
* @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
*/
$id = $rsmq->sendMessage('myqueue', 'a message');
echo "Message Sent. ID: ", $id;
Receive the next message from the queue.
Parameters:
$queue
(string): The Queue name.$vt
(int): optional (Default: queue settings) The length of time, in seconds, that the received message will be invisible to others. Allowed values: 0-9999999 (around 115 days)
Returns a \AndrewBreksa\RSMQ\Message
object with the following properties:
message
(string): The message's contents.id
(string): The internal message id.sent
(int): Timestamp of when this message was sent / created.firstReceived
(int): Timestamp of when this message was first received.receiveCount
(int): Number of times this message was received.
Note: Will return an empty array if no message is there
Throws:
\AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
\AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
Example:
<?php
/**
* @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
*/
$message = $rsmq->receiveMessage('myqueue');
echo "Message ID: ", $message->getId();
echo "Message: ", $message->getMessage();
Parameters:
$queue
(string): The Queue name.$id
(string): message id to delete.
Returns:
true
if successful,false
if the message was not found (bool).
Throws:
\AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
Example:
<?php
/**
* @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
*/
$id = $rsmq->sendMessage('queue', 'a message');
$rsmq->deleteMessage('queue', $id);
Receive the next message from the queue and delete it.
Important: This method deletes the message it receives right away. There is no way to receive the message again if something goes wrong while working on the message.
Parameters:
$queue
(string): The Queue name.
Returns a \AndrewBreksa\RSMQ\Message
object with the following properties:
message
(string): The message's contents.id
(string): The internal message id.sent
(int): Timestamp of when this message was sent / created.firstReceived
(int): Timestamp of when this message was first received.receiveCount
(int): Number of times this message was received.
Note: Will return an empty object if no message is there
Throws:
\AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
\AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
Example:
<?php
/**
* @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
*/
$message = $rsmq->popMessage('myqueue');
echo "Message ID: ", $message->getId();
echo "Message: ", $message->getMessage();
Change the visibility timer of a single message. The time when the message will be visible again is calculated from the
current time (now) + vt
.
Parameters:
qname
(string): The Queue name.id
(string): The message id.vt
(int): The length of time, in seconds, that this message will not be visible. Allowed values: 0-9999999 (around 115 days)
Returns:
true
if successful,false
if the message was not found (bool).
Throws:
\AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
\AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
Example:
<?php
/**
* @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
*/
$queue = 'myqueue';
$id = $rsmq->sendMessage($queue, 'a message');
if($rsmq->changeMessageVisibility($queue, $id, 60)) {
echo "Message hidden for 60 secs";
}
When creating an instance of AndrewBreksa\RSMQ\RSMQClient
, you can enable the realtime PUBLISH
for new messages by
passing true
for the $realtime
argument of \AndrewBreksa\RSMQ\RSMQClient::__construct
. On every new message that
is sent via sendMessage
, a Redis PUBLISH
will be issued to {rsmq.ns}:rt:{qname}
.
Example for RSMQ with default settings:
- The queue
testQueue
already contains 5 messages. - A new message is being sent to the queue
testQueue
. - The following Redis command will be issued:
PUBLISH rsmq:rt:testQueue 6
The realtime option enables sending a PUBLISH
when a new message is sent to RSMQ, however no further functionality is
built on this feature. Your app could use the Redis SUBSCRIBE
command to be notified of new messages and then attempt
to poll from the queue, however due to how the Redis pub/sub system works,
all listeners will be notified of the new message, this method doesn't lend
itself to driving message handling in environments with more than one subscribed process.
The QueueWorker class provides an easy way to consume RSMQ messages, to use it:
<?php
/**
* @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
*/
use AndrewBreksa\RSMQ\ExecutorInterface;
use AndrewBreksa\RSMQ\Message;
use AndrewBreksa\RSMQ\QueueWorker;
use AndrewBreksa\RSMQ\WorkerSleepProvider;
$executor = new class() implements ExecutorInterface{
public function __invoke(Message $message) : bool {
//@todo: do some work, true will ack/delete the message, false will allow the queue's config to "re-publish"
return true;
}
};
$sleepProvider = new class() implements WorkerSleepProvider{
public function getSleep() : ?int {
/**
* This allows you to return null to stop the worker, which can be used with something like redis to mark.
*
* Note that this method is called _before_ we poll for a message, and therefore if it returns null we'll eject
* before we process a message.
*/
return 1;
}
};
$worker = new QueueWorker($rsmq, $executor, $sleepProvider, 'test_queue');
$worker->work(); // here we can optionally pass true to only process one message
The MIT LICENSE. See LICENSE