diff --git a/src/Cmp/Queues/Infrastructure/AWS/v20121105/Queue/MessageHandler.php b/src/Cmp/Queues/Infrastructure/AWS/v20121105/Queue/MessageHandler.php index cff6f0a..3618c0b 100644 --- a/src/Cmp/Queues/Infrastructure/AWS/v20121105/Queue/MessageHandler.php +++ b/src/Cmp/Queues/Infrastructure/AWS/v20121105/Queue/MessageHandler.php @@ -32,6 +32,7 @@ public function __construct(JSONMessageFactory $jsonMessageFactory) /** * @param array $message * + * @return mixed * @throws ParseMessageException * @throws ReaderException */ @@ -53,7 +54,7 @@ public function handleMessage(array $message) throw new InvalidJSONMessageException('Undefined index key Message: ' . print_r($body, true)); } - call_user_func($this->callback, $this->jsonMessageFactory->create($body['Message'])); + return call_user_func($this->callback, $this->jsonMessageFactory->create($body['Message'])); } catch(InvalidJSONMessageException $e) { throw new ParseMessageException(json_encode($message),0, $e); diff --git a/src/Cmp/Queues/Infrastructure/AWS/v20121105/Queue/QueueReader.php b/src/Cmp/Queues/Infrastructure/AWS/v20121105/Queue/QueueReader.php index 4273c9b..a52562e 100644 --- a/src/Cmp/Queues/Infrastructure/AWS/v20121105/Queue/QueueReader.php +++ b/src/Cmp/Queues/Infrastructure/AWS/v20121105/Queue/QueueReader.php @@ -118,8 +118,10 @@ protected function consume($timeout) $messages = isset($result['Messages']) ? $result['Messages'] : []; foreach ($messages as $message) { - $this->messageHandler->handleMessage($message); - $this->sqs->deleteMessage(['QueueUrl' => $this->queueUrl, 'ReceiptHandle' => $message['ReceiptHandle']]); + $response = $this->messageHandler->handleMessage($message); + if(is_null($response) || !is_bool($response) || $response) { + $this->sqs->deleteMessage(['QueueUrl' => $this->queueUrl, 'ReceiptHandle' => $message['ReceiptHandle']]); + } } } } diff --git a/src/Cmp/Queues/Infrastructure/AmqpLib/v26/RabbitMQ/Queue/MessageHandler.php b/src/Cmp/Queues/Infrastructure/AmqpLib/v26/RabbitMQ/Queue/MessageHandler.php index 6b5f4a9..5b9980d 100644 --- a/src/Cmp/Queues/Infrastructure/AmqpLib/v26/RabbitMQ/Queue/MessageHandler.php +++ b/src/Cmp/Queues/Infrastructure/AmqpLib/v26/RabbitMQ/Queue/MessageHandler.php @@ -41,8 +41,10 @@ public function handleMessage(AMQPMessage $message) try { $task = $this->jsonMessageFactory->create($message->body); - call_user_func($this->callback, $task); - $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); + $response = call_user_func($this->callback, $task); + if(is_null($response) || !is_bool($response) || $response) { + $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); + } } catch(InvalidJSONMessageException $e) { throw new ParseMessageException(json_encode($message->getBody()), 0, $e); }