Skip to content

Commit

Permalink
Merge pull request #10 from marcrubiocmp/master
Browse files Browse the repository at this point in the history
Issues with a process with more than once consumer
  • Loading branch information
marcrubiocmp authored Apr 19, 2017
2 parents a4fafb7 + 4edeb8d commit 1fc0ef2
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 85 deletions.
14 changes: 5 additions & 9 deletions src/Cmp/Queues/Domain/Event/Subscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,19 @@ public function subscribe(EventSubscriptor $eventSubscriptor)

public function start($timeout=0)
{
if(!isset($this->subscriptors[0])) {
throw new DomainEventException('You must add at least 1 EventSubscriptor in order to publish start reading from queue.');
}

while(true) {
try {
$this->processOne($timeout);
$this->queueReader->read(array($this, 'notify'), $timeout);
} catch(TimeoutReaderException $e) {
break;
}
}
}

public function processOne($timeout)
{
if(!isset($this->subscriptors[0])) {
throw new DomainEventException('You must add at least 1 EventSubscriptor in order to publish start reading from queue.');
}
$this->queueReader->read(array($this, 'notify'), $timeout);
}

/**
* @param DomainEvent $domainEvent
*/
Expand Down
12 changes: 1 addition & 11 deletions src/Cmp/Queues/Domain/Task/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,13 @@ public function consume(callable $callback, $timeout=0)
{
while(true) {
try {
$this->consumeOnce($callback, $timeout);
$this->queueReader->read($callback, $timeout);
} catch(TimeoutReaderException $e) {
break;
}
}
}

/**
* Consumes a single task in a blocking manner
* @param callable $callback Callable that'll be invoked when a message is received
* @param int $timeout The process will block a max of $timeout seconds, or indefinitely if 0
*/
public function consumeOnce(callable $callback, $timeout)
{
$this->queueReader->read($callback, $timeout);
}

/**
* Purges all messages from the queue
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\QueueConfig;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPLazyConnection;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use Psr\Log\LoggerInterface;

Expand Down Expand Up @@ -55,6 +56,11 @@ class QueueReader implements DomainQueueReader
*/
protected $channel;

/**
* @var string
*/
protected $consumerTag = '';

/**
* QueueReader constructor.
* @param AMQPLazyConnection $connection
Expand Down Expand Up @@ -98,11 +104,11 @@ public function read(callable $callback, $timeout=0)
try {
$this->consume($timeout);
} catch(AMQPTimeoutException $e) {
$this->stopConsuming();
throw new TimeoutReaderException("Timed out at $timeout seconds while reading.", 0, $e);
} catch(\Exception $e) {
$this->stopConsuming();
throw new ReaderException("Error occurred while reading", 0, $e);
} finally {
$this->channel->basic_cancel('');
}
}

Expand Down Expand Up @@ -158,16 +164,18 @@ protected function queueDeclareAndBind()
*/
protected function consume($timeout)
{
$this->logger->debug('Waiting for messages on queue:' . $this->queueConfig->getName());
$this->channel->basic_consume(
$this->queueConfig->getName(),
'',
$this->consumeConfig->getNoLocal(),
$this->consumeConfig->getNoAck(),
$this->consumeConfig->getExclusive(),
$this->consumeConfig->getNoWait(),
array($this->messageHandler, 'handleMessage')
);
if ($this->consumerTag === '') {
$this->logger->debug('Waiting for messages on queue:'.$this->queueConfig->getName());
$this->consumerTag = $this->channel->basic_consume(
$this->queueConfig->getName(),
'',
$this->consumeConfig->getNoLocal(),
$this->consumeConfig->getNoAck(),
$this->consumeConfig->getExclusive(),
$this->consumeConfig->getNoWait(),
array($this->messageHandler, 'handleMessage')
);
}
$this->channel->wait(null, false, $timeout);
}

Expand All @@ -190,6 +198,21 @@ protected function initialize()
}
}

/**
* Stops the consuming of messages
*/
private function stopConsuming()
{
if ($this->consumerTag) {
try {
$this->channel->basic_cancel($this->consumerTag);
} catch(\Exception $e) {
}

$this->consumerTag = '';
}
}

/**
* Destructor
*/
Expand Down
2 changes: 1 addition & 1 deletion tests/behat/features/bootstrap/Context/DomainContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public function iShouldNotConsumeAnyTask()
$taskCallback = new TestTaskCallback();

try {
$this->consumer->consumeOnce(array($taskCallback, 'setTask'), 2);
$this->consumer->consume(array($taskCallback, 'setTask'), 2);
} catch(TimeoutReaderException $e) {
}

Expand Down
12 changes: 1 addition & 11 deletions tests/spec/Cmp/Queues/Domain/Event/SubscriberSpec.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,8 @@ function it_notifies_Subscriptor(
$this->notify($domainEvent);
}

function it_reads_from_queue(
QueueReader $queueReader,
EventSubscriptor $eventSubscriptor
)
{
$queueReader->read(Argument::cetera())->shouldBeCalled();
$this->subscribe($eventSubscriptor);
$this->processOne(function(){}, 1);
}

function it_should_not_read_from_queue_if_no_EventSubscriptor_added()
{
$this->shouldThrow('Cmp\Queues\Domain\Event\Exception\DomainEventException')->duringProcessOne(function(){});
$this->shouldThrow('Cmp\Queues\Domain\Event\Exception\DomainEventException')->duringStart(1);
}
}
41 changes: 0 additions & 41 deletions tests/spec/Cmp/Queues/Domain/Task/ConsumerSpec.php

This file was deleted.

0 comments on commit 1fc0ef2

Please sign in to comment.