Skip to content

Commit

Permalink
Merge branch 'SSM-192'
Browse files Browse the repository at this point in the history
  • Loading branch information
mmartinezf committed Mar 7, 2019
2 parents 47120dd + 20c9352 commit a9d7888
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public function __construct(JSONMessageFactory $jsonMessageFactory)
/**
* @param array $message
*
* @return mixed
* @throws ParseMessageException
* @throws ReaderException
*/
Expand All @@ -53,7 +54,7 @@ public function handleMessage(array $message)
throw new InvalidJSONMessageException('Undefined index key Message: ' . print_r($body, true));
}

call_user_func($this->callback, $this->jsonMessageFactory->create($body['Message']));
return call_user_func($this->callback, $this->jsonMessageFactory->create($body['Message']));

} catch(InvalidJSONMessageException $e) {
throw new ParseMessageException(json_encode($message),0, $e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,10 @@ protected function consume($timeout)

$messages = isset($result['Messages']) ? $result['Messages'] : [];
foreach ($messages as $message) {
$this->messageHandler->handleMessage($message);
$this->sqs->deleteMessage(['QueueUrl' => $this->queueUrl, 'ReceiptHandle' => $message['ReceiptHandle']]);
$response = $this->messageHandler->handleMessage($message);
if(is_null($response) || !is_bool($response) || $response) {
$this->sqs->deleteMessage(['QueueUrl' => $this->queueUrl, 'ReceiptHandle' => $message['ReceiptHandle']]);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ public function handleMessage(AMQPMessage $message)

try {
$task = $this->jsonMessageFactory->create($message->body);
call_user_func($this->callback, $task);
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
$response = call_user_func($this->callback, $task);
if(is_null($response) || !is_bool($response) || $response) {
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
}
} catch(InvalidJSONMessageException $e) {
throw new ParseMessageException(json_encode($message->getBody()), 0, $e);
}
Expand Down

0 comments on commit a9d7888

Please sign in to comment.