diff --git a/src/Cmp/Queues/Domain/Event/Subscriber.php b/src/Cmp/Queues/Domain/Event/Subscriber.php index c838c1a..ef52d2d 100644 --- a/src/Cmp/Queues/Domain/Event/Subscriber.php +++ b/src/Cmp/Queues/Domain/Event/Subscriber.php @@ -45,23 +45,19 @@ public function subscribe(EventSubscriptor $eventSubscriptor) public function start($timeout=0) { + if(!isset($this->subscriptors[0])) { + throw new DomainEventException('You must add at least 1 EventSubscriptor in order to publish start reading from queue.'); + } + while(true) { try { - $this->processOne($timeout); + $this->queueReader->read(array($this, 'notify'), $timeout); } catch(TimeoutReaderException $e) { break; } } } - public function processOne($timeout) - { - if(!isset($this->subscriptors[0])) { - throw new DomainEventException('You must add at least 1 EventSubscriptor in order to publish start reading from queue.'); - } - $this->queueReader->read(array($this, 'notify'), $timeout); - } - /** * @param DomainEvent $domainEvent */ diff --git a/src/Cmp/Queues/Domain/Task/Consumer.php b/src/Cmp/Queues/Domain/Task/Consumer.php index 3a95da8..96e7271 100644 --- a/src/Cmp/Queues/Domain/Task/Consumer.php +++ b/src/Cmp/Queues/Domain/Task/Consumer.php @@ -29,23 +29,13 @@ public function consume(callable $callback, $timeout=0) { while(true) { try { - $this->consumeOnce($callback, $timeout); + $this->queueReader->read($callback, $timeout); } catch(TimeoutReaderException $e) { break; } } } - /** - * Consumes a single task in a blocking manner - * @param callable $callback Callable that'll be invoked when a message is received - * @param int $timeout The process will block a max of $timeout seconds, or indefinitely if 0 - */ - public function consumeOnce(callable $callback, $timeout) - { - $this->queueReader->read($callback, $timeout); - } - /** * Purges all messages from the queue */ diff --git a/src/Cmp/Queues/Infrastructure/AmqpLib/v26/RabbitMQ/Queue/QueueReader.php b/src/Cmp/Queues/Infrastructure/AmqpLib/v26/RabbitMQ/Queue/QueueReader.php index 4839dbd..867267e 100644 --- a/src/Cmp/Queues/Infrastructure/AmqpLib/v26/RabbitMQ/Queue/QueueReader.php +++ b/src/Cmp/Queues/Infrastructure/AmqpLib/v26/RabbitMQ/Queue/QueueReader.php @@ -10,6 +10,7 @@ use Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\QueueConfig; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AMQPLazyConnection; +use PhpAmqpLib\Exception\AMQPRuntimeException; use PhpAmqpLib\Exception\AMQPTimeoutException; use Psr\Log\LoggerInterface; @@ -55,6 +56,11 @@ class QueueReader implements DomainQueueReader */ protected $channel; + /** + * @var string + */ + protected $consumerTag = ''; + /** * QueueReader constructor. * @param AMQPLazyConnection $connection @@ -98,11 +104,11 @@ public function read(callable $callback, $timeout=0) try { $this->consume($timeout); } catch(AMQPTimeoutException $e) { + $this->stopConsuming(); throw new TimeoutReaderException("Timed out at $timeout seconds while reading.", 0, $e); } catch(\Exception $e) { + $this->stopConsuming(); throw new ReaderException("Error occurred while reading", 0, $e); - } finally { - $this->channel->basic_cancel(''); } } @@ -158,16 +164,18 @@ protected function queueDeclareAndBind() */ protected function consume($timeout) { - $this->logger->debug('Waiting for messages on queue:' . $this->queueConfig->getName()); - $this->channel->basic_consume( - $this->queueConfig->getName(), - '', - $this->consumeConfig->getNoLocal(), - $this->consumeConfig->getNoAck(), - $this->consumeConfig->getExclusive(), - $this->consumeConfig->getNoWait(), - array($this->messageHandler, 'handleMessage') - ); + if ($this->consumerTag === '') { + $this->logger->debug('Waiting for messages on queue:'.$this->queueConfig->getName()); + $this->consumerTag = $this->channel->basic_consume( + $this->queueConfig->getName(), + '', + $this->consumeConfig->getNoLocal(), + $this->consumeConfig->getNoAck(), + $this->consumeConfig->getExclusive(), + $this->consumeConfig->getNoWait(), + array($this->messageHandler, 'handleMessage') + ); + } $this->channel->wait(null, false, $timeout); } @@ -190,6 +198,21 @@ protected function initialize() } } + /** + * Stops the consuming of messages + */ + private function stopConsuming() + { + if ($this->consumerTag) { + try { + $this->channel->basic_cancel($this->consumerTag); + } catch(\Exception $e) { + } + + $this->consumerTag = ''; + } + } + /** * Destructor */ diff --git a/tests/behat/features/bootstrap/Context/DomainContext.php b/tests/behat/features/bootstrap/Context/DomainContext.php index a9441e7..fa85418 100644 --- a/tests/behat/features/bootstrap/Context/DomainContext.php +++ b/tests/behat/features/bootstrap/Context/DomainContext.php @@ -201,7 +201,7 @@ public function iShouldNotConsumeAnyTask() $taskCallback = new TestTaskCallback(); try { - $this->consumer->consumeOnce(array($taskCallback, 'setTask'), 2); + $this->consumer->consume(array($taskCallback, 'setTask'), 2); } catch(TimeoutReaderException $e) { } diff --git a/tests/spec/Cmp/Queues/Domain/Event/SubscriberSpec.php b/tests/spec/Cmp/Queues/Domain/Event/SubscriberSpec.php index 01a8103..1a45148 100644 --- a/tests/spec/Cmp/Queues/Domain/Event/SubscriberSpec.php +++ b/tests/spec/Cmp/Queues/Domain/Event/SubscriberSpec.php @@ -45,18 +45,8 @@ function it_notifies_Subscriptor( $this->notify($domainEvent); } - function it_reads_from_queue( - QueueReader $queueReader, - EventSubscriptor $eventSubscriptor - ) - { - $queueReader->read(Argument::cetera())->shouldBeCalled(); - $this->subscribe($eventSubscriptor); - $this->processOne(function(){}, 1); - } - function it_should_not_read_from_queue_if_no_EventSubscriptor_added() { - $this->shouldThrow('Cmp\Queues\Domain\Event\Exception\DomainEventException')->duringProcessOne(function(){}); + $this->shouldThrow('Cmp\Queues\Domain\Event\Exception\DomainEventException')->duringStart(1); } } diff --git a/tests/spec/Cmp/Queues/Domain/Task/ConsumerSpec.php b/tests/spec/Cmp/Queues/Domain/Task/ConsumerSpec.php deleted file mode 100644 index fa19c10..0000000 --- a/tests/spec/Cmp/Queues/Domain/Task/ConsumerSpec.php +++ /dev/null @@ -1,41 +0,0 @@ -beConstructedWith($queueReader); - } - - function it_is_initializable() - { - $this->shouldHaveType('Cmp\Queues\Domain\Task\Consumer'); - } - - function it_reads_from_queue(QueueReader $queueReader) - { - $callback = function(){}; - $queueReader->read($callback, 1)->shouldBeCalled(); - $this->consumeOnce($callback, 1); - } - - function it_purges_queue(QueueReader $queueReader) - { - $queueReader->purge()->willReturn()->shouldBeCalled(); - $this->purge(); - } -}