Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle message decoding more safely #20

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/smpp_gateway/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
@admin.register(MOMessage)
class MOMessageAdmin(admin.ModelAdmin):
list_display = (
"decoded_short_message",
"safe_decoded_short_message",
"backend",
"status",
"create_time",
Expand Down
6 changes: 3 additions & 3 deletions src/smpp_gateway/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ def error_pdu_handler(self, pdu: Command):

# ############### Listen for and send MT Messages ################

def receive_pg_notifies(self):
def receive_pg_notify(self):
self._pg_conn.poll()
while self._pg_conn.notifies:
if self._pg_conn.notifies:
notify = self._pg_conn.notifies.pop()
logger.info(f"Got NOTIFY:{notify}")
self.send_mt_messages()
Expand Down Expand Up @@ -245,7 +245,7 @@ def listen(self, ignore_error_codes=None, auto_send_enquire_link=True):
if ready_socket is self._socket:
self.read_once(ignore_error_codes, auto_send_enquire_link)
else:
self.receive_pg_notifies()
self.receive_pg_notify()
if self.hc_worker:
self.hc_worker.success_ping()
if self.exit_signal_received():
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Generated by Django 4.2.13 on 2024-06-22 17:50

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("smpp_gateway", "0006_message_status_indexes"),
]

operations = [
migrations.AddField(
model_name="momessage",
name="error",
field=models.TextField(blank=True, verbose_name="error"),
),
migrations.AlterField(
model_name="momessage",
name="status",
field=models.CharField(
choices=[
("new", "New"),
("processing", "Processing"),
("error", "Error"),
("done", "Done"),
],
max_length=32,
verbose_name="status",
),
),
]
24 changes: 21 additions & 3 deletions src/smpp_gateway/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class MOMessage(AbstractTimestampModel, models.Model):
class Status(models.TextChoices):
NEW = "new", _("New")
PROCESSING = "processing", _("Processing")
ERROR = "error", _("Error")
DONE = "done", _("Done")

backend = models.ForeignKey(
Expand All @@ -28,19 +29,36 @@ class Status(models.TextChoices):
short_message = models.BinaryField(_("short message"))
params = models.JSONField(_("params"))
status = models.CharField(_("status"), max_length=32, choices=Status.choices)
error = models.TextField(_("error"), blank=True)

@cached_property
def decoded_short_message(self) -> str:
def get_decoded_short_message(self) -> str:
data_coding = self.params.get("data_coding", 0)
short_message = self.short_message.tobytes() # type: bytes
# Support the 3 most common data_coding values
# https://stackoverflow.com/a/11986844/166053
if data_coding == 0:
return short_message.decode("ascii")
# data_coding 0 means the message is encoded in ASCII, but sometimes
# we get values values outside the ASCII range, such as b'\xa4' ('¤').
# Fall back to iso-8859-1 in this case.
try:
return short_message.decode("ascii")
except UnicodeDecodeError:
return short_message.decode("iso-8859-1")
if data_coding == 3:
return short_message.decode("iso-8859-1")
if data_coding == 8:
return short_message.decode("utf-16-be")
raise ValueError(
f"Unsupported data_coding {data_coding}. Short message: {short_message}"
)

@cached_property
def safe_decoded_short_message(self) -> str:
try:
return self.get_decoded_short_message()
except Exception as err:
return str(err)

def __str__(self):
return f"{self.params['short_message']} ({self.id})"

Expand Down
39 changes: 21 additions & 18 deletions src/smpp_gateway/subscribers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,38 +13,41 @@


def handle_mo_messages(smses: QuerySet[MOMessage]):
received_smses = []
try:
for sms in smses:
connection = lookup_connections(
backend=sms.backend, identities=[sms.params["source_addr"]]
)[0]
fields = {
"to_addr": sms.params["destination_addr"],
"from_addr": sms.params["source_addr"],
}
receive(sms.decoded_short_message, connection, fields=fields)
received_smses.append(sms)
finally:
if received_smses:
MOMessage.objects.filter(pk__in=[sms.pk for sms in received_smses]).update(
status=MOMessage.Status.DONE
for sms in smses:
connection = lookup_connections(
backend=sms.backend, identities=[sms.params["source_addr"]]
)[0]
fields = {
"to_addr": sms.params["destination_addr"],
"from_addr": sms.params["source_addr"],
}
try:
decoded_short_message = sms.get_decoded_short_message()
except (ValueError, UnicodeDecodeError) as err:
logger.exception("Failed to decode short message")
MOMessage.objects.filter(pk=sms.pk).update(
status=MOMessage.Status.ERROR,
error=str(err),
)
else:
receive(decoded_short_message, connection, fields=fields)
MOMessage.objects.filter(pk=sms.pk).update(status=MOMessage.Status.DONE)


def listen_mo_messages(channel: str):
"""Batch process any queued incoming messages, then listen to be notified
of new arrivals.
"""
exit_signal_received = set_exit_signals()
smses = get_mo_messages_to_process(limit=100)
# FIXME: Allow this limit to be set from an environment variable
smses = get_mo_messages_to_process(limit=1)
while smses:
handle_mo_messages(smses)
# If an exit was triggered, do so before retrieving more messages to process...
if exit_signal_received():
logger.info("Received exit signal, leaving processing loop...")
return
smses = get_mo_messages_to_process(limit=100)
smses = get_mo_messages_to_process(limit=1)

pg_conn = pg_listen(channel)

Expand Down
27 changes: 20 additions & 7 deletions tests/test_subscribers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def receive_was_called(self, mock_receive: mock.Mock, message: MOMessage):
[
call
for call in mock_receive.call_args_list
if call.args[0] == message.decoded_short_message
if call.args[0] == message.safe_decoded_short_message
]
)

Expand All @@ -39,28 +39,41 @@ def test_error_in_middle_of_patch(self):

Unreceived messages should be left untouched to be retried later.
"""
MOMessageFactory.create_batch(5)
messages = MOMessage.objects.all()
MOMessageFactory.create_batch(6)
bad_short_msg = MOMessage.objects.order_by("pk").first()
# Invalid data_coding value will cause a decoding failure
bad_short_msg.params["data_coding"] = 123
bad_short_msg.save()
messages = MOMessage.objects.order_by("pk").all()

with mock.patch("smpp_gateway.subscribers.receive") as mock_receive:
mock_receive.side_effect = [None, None, Exception("boom"), None, None]
mock_receive.side_effect = [None, None, Exception("boom"), None, None, None]
with pytest.raises(Exception, match=r"boom"):
handle_mo_messages(messages)

for received_message in messages[:2]:
# Message 0 raised a message decoding exception, it should be marked
# with an error since there is no sense in retrying decoding errors
# before a code change
failed_message = messages[0]
failed_message.refresh_from_db()
assert failed_message.status == MOMessage.Status.ERROR
# receive() should never be called for a message with a failed decoding
assert self.receive_was_called(mock_receive, failed_message) == 0

for received_message in messages[1:3]:
# Messages 1, 2 should be received and marked as done
received_message.refresh_from_db()
assert received_message.status == MOMessage.Status.DONE
assert self.receive_was_called(mock_receive, received_message)

# Message 3 raised an exception, should not be marked done so it
# will be retried later
failed_message = messages[2]
failed_message = messages[3]
failed_message.refresh_from_db()
assert failed_message.status == MOMessage.Status.NEW
assert self.receive_was_called(mock_receive, failed_message)

for other_message in messages[3:]:
for other_message in messages[4:]:
# Messages 4, 5 should remain new to be received later
other_message.refresh_from_db()
assert other_message.status == MOMessage.Status.NEW
Expand Down
Loading