-
Notifications
You must be signed in to change notification settings - Fork 0
/
MessageQueue.php
50 lines (50 loc) · 1.88 KB
/
MessageQueue.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
<?php
/**
 * Write side of the queue: appends new messages to the configured table.
 */
class MessageQueueProducer extends MessageQueueBase
{
    /**
     * Inserts $message as a new row whose status is the initial (INIT) status.
     *
     * The table name comes from the value passed to the constructor and is
     * interpolated into the SQL text; the message and status are bound as
     * statement parameters.
     *
     * @param string $message Payload to enqueue.
     */
    public function Produce(string $message): void
    {
        $sql = "INSERT INTO `$this->Table`(`message`, `status`) VALUES (?,?)";
        $params = [$message, MessageQueueBase::$STATUS_INIT];

        $this->Db->prepare($sql)->execute($params);
    }
}
/**
 * Read side of the queue: claims one pending message and hands it to a callback.
 */
class MessageQueueConsumer extends MessageQueueBase
{
    /**
     * Claims the most recently inserted INIT message and passes its text to
     * $consumeMethod.
     *
     * The row is selected and marked CONSUMED inside a single transaction, and
     * the callback runs only after that transaction has been committed. If the
     * callback throws, the row is re-marked PUKED and the exception is rethrown.
     * When no INIT row exists the method returns without calling the callback.
     *
     * NOTE(review): the SELECT orders by id descending, so the newest message is
     * taken first (LIFO). Kept as-is because callers may rely on it; confirm
     * whether FIFO (ascending) was intended.
     * NOTE(review): the SELECT does not lock the row; two concurrent consumers
     * could presumably claim the same message -- confirm against deployment.
     *
     * @param callable $consumeMethod Invoked with the message text (string).
     *
     * @throws Throwable Anything raised by the database layer or the callback.
     */
    public function Consume(callable $consumeMethod): void
    {
        try {
            $this->Db->beginTransaction();

            $getAvailableQuery = "SELECT `id`,`message`,`status` FROM `$this->Table` WHERE `status` = ? order by id desc limit 1";
            $selectStmt = $this->Db->prepare($getAvailableQuery);
            $selectStmt->execute([MessageQueueBase::$STATUS_INIT]);

            // fetch() yields a single row or false; rowCount() is not dependable
            // for SELECT statements across PDO drivers.
            $row = $selectStmt->fetch(PDO::FETCH_ASSOC);
            $selectStmt->closeCursor();

            if ($row === false) {
                // Nothing to consume: close the transaction so the connection
                // is not left with one dangling.
                $this->Db->rollBack();
                return;
            }

            $messageId = (int) $row["id"];
            $this->UpdateStatus($messageId, MessageQueueBase::$STATUS_CONSUMED);
            $this->Db->commit();
        } catch (Throwable $e) {
            // beginTransaction() itself may have failed, in which case there is
            // nothing to roll back and rollBack() would mask the real error.
            if ($this->Db->inTransaction()) {
                $this->Db->rollBack();
            }
            throw $e;
        }

        try {
            $consumeMethod($row["message"]);
        } catch (Throwable $ex) {
            $this->UpdateStatus($messageId, MessageQueueBase::$STATUS_PUKED);
            throw $ex;
        }
    }
}

/**
 * Shared state and helpers for the queue producer and consumer.
 *
 * Holds the PDO connection and the queue table name. The table name is
 * interpolated directly into SQL text (identifiers cannot be bound as
 * parameters), so it must come from trusted configuration, never user input.
 */
class MessageQueueBase
{
    /** Status given to a freshly produced message. */
    public static string $STATUS_INIT = 'INIT';

    /** Status set once a message has been claimed by a consumer. */
    public static string $STATUS_CONSUMED = 'CONSUMED';

    /** Status set when the consumer callback threw while handling a message. */
    public static string $STATUS_PUKED = 'PUKED';

    protected PDO $Db;
    protected string $Table;

    /**
     * @param PDO    $db    Connection used for every queue statement.
     * @param string $table Name of the queue table.
     */
    public function __construct(PDO $db, string $table)
    {
        $this->Db = $db;
        $this->Table = $table;
    }

    /**
     * Sets the status column of the row with the given id in the queue table.
     *
     * @param int    $messageId Primary key (`id`) of the row to update.
     * @param string $status    New status value.
     *
     * @return void
     */
    public function UpdateStatus(int $messageId, string $status)
    {
        $updateStats = "UPDATE `$this->Table` SET `status`=? WHERE `id` = ?";
        $updateStmt = $this->Db->prepare($updateStats);
        $updateStmt->execute([$status, $messageId]);
    }
}
?>