Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous communication with Endoints via celery tasks #37

Merged
merged 5 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions doc/source/documentation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -298,16 +298,21 @@ Asynchronous Communication
...........................

Given that endpoints are comparably slow to respond, handling communication
asynchronously is essention for efficient operation. This can be effectively
asynchronously is essential for efficient operation. This can be effectively
managed using Celery, a distributed task queue. When Leshan notifies the
backend of an endpoint status update, Celery can be used to handle the
long-running API calls, ensuring that the backend remains responsive and
scalable. Once the API call is complete the database will be updated with the
result (e.g. ``completed``, ``pending``, ``failed``). Retransmissions can be
implemented based on the result and the requirements of the application. As the
backend communicates with many endpoints simultaneously, an efficient queing
mechanism is essential to ensure that the system remains responsive and
scalable.
scalable. As the backend communicates with many endpoints simultaneously, an
efficient queing mechanism is essential to ensure that the system remains
responsive and scalable.

Before the backend executes the API call, it updates the endpointOperation
status to ``SENDING``, indicating an ongoing operation.

Once the API call is complete the database will be updated with the result
(e.g. ``CONFIRMED``, ``FAILED``, ``QUEUED``) depending on the result of the
request. The ``FAILED`` status is assigned after 3 attempts. Retransmissions are
triggered when the endpoint updates it's registration the next time.

Example Communication
.....................
Expand Down
58 changes: 29 additions & 29 deletions server/django/db_initial_resource_types.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"object_id": 3303,
"resource_id": 5700,
"name": "temperature",
"data_type": "float_value"
"data_type": "FLOAT"
}
},
{
Expand All @@ -14,7 +14,7 @@
"object_id": 3304,
"resource_id": 5700,
"name": "humidity",
"data_type": "float_value"
"data_type": "FLOAT"
}
},
{
Expand All @@ -23,7 +23,7 @@
"object_id": 3,
"resource_id": 0,
"name": "manufacturer",
"data_type": "str_value"
"data_type": "STRING"
}
},
{
Expand All @@ -32,7 +32,7 @@
"object_id": 3,
"resource_id": 1,
"name": "model_number",
"data_type": "str_value"
"data_type": "STRING"
}
},
{
Expand All @@ -41,7 +41,7 @@
"object_id": 3,
"resource_id": 2,
"name": "serial_number",
"data_type": "str_value"
"data_type": "STRING"
}
},
{
Expand All @@ -50,7 +50,7 @@
"object_id": 3,
"resource_id": 3,
"name": "firmware_version",
"data_type": "str_value"
"data_type": "STRING"
}
},
{
Expand All @@ -77,7 +77,7 @@
"object_id": 3,
"resource_id": 6,
"name": "power_source",
"data_type": "int_value"
"data_type": "INTEGER"
}
},
{
Expand All @@ -86,7 +86,7 @@
"object_id": 3,
"resource_id": 7,
"name": "power_source_v",
"data_type": "int_value"
"data_type": "INTEGER"
}
},
{
Expand All @@ -95,7 +95,7 @@
"object_id": 3,
"resource_id": 8,
"name": "power_source_i",
"data_type": "int_value"
"data_type": "INTEGER"
}
},
{
Expand All @@ -104,7 +104,7 @@
"object_id": 3,
"resource_id": 9,
"name": "battery_level",
"data_type": "int_value"
"data_type": "INTEGER"
}
},
{
Expand All @@ -113,7 +113,7 @@
"object_id": 3,
"resource_id": 10,
"name": "memory_free",
"data_type": "int_value"
"data_type": "INTEGER"
}
},
{
Expand All @@ -122,7 +122,7 @@
"object_id": 3,
"resource_id": 11,
"name": "error_code",
"data_type": "int_value"
"data_type": "INTEGER"
}
},
{
Expand All @@ -140,7 +140,7 @@
"object_id": 3,
"resource_id": 13,
"name": "current_time",
"data_type": "int_value"
"data_type": "TIME"
}
},
{
Expand All @@ -149,7 +149,7 @@
"object_id": 3,
"resource_id": 14,
"name": "utc_offset",
"data_type": "str_value"
"data_type": "STRING"
}
},
{
Expand All @@ -158,7 +158,7 @@
"object_id": 3,
"resource_id": 15,
"name": "timezone",
"data_type": "str_value"
"data_type": "STRING"
}
},
{
Expand All @@ -167,7 +167,7 @@
"object_id": 3,
"resource_id": 16,
"name": "binding_mode",
"data_type": "str_value"
"data_type": "STRING"
}
},
{
Expand All @@ -176,7 +176,7 @@
"object_id": 3,
"resource_id": 17,
"name": "device_type",
"data_type": "str_value"
"data_type": "STRING"
}
},
{
Expand All @@ -185,7 +185,7 @@
"object_id": 3,
"resource_id": 18,
"name": "hardware_version",
"data_type": "str_value"
"data_type": "STRING"
}
},
{
Expand All @@ -194,7 +194,7 @@
"object_id": 3,
"resource_id": 19,
"name": "software_version",
"data_type": "str_value"
"data_type": "STRING"
}
},
{
Expand All @@ -203,7 +203,7 @@
"object_id": 3,
"resource_id": 20,
"name": "battery_status",
"data_type": "int_value"
"data_type": "INTEGER"
}
},
{
Expand All @@ -212,7 +212,7 @@
"object_id": 3,
"resource_id": 21,
"name": "memory_total",
"data_type": "int_value"
"data_type": "INTEGER"
}
},
{
Expand All @@ -239,7 +239,7 @@
"object_id": 5,
"resource_id": 1,
"name": "package_uri",
"data_type": "str_value"
"data_type": "STRING"
}
},
{
Expand All @@ -257,7 +257,7 @@
"object_id": 5,
"resource_id": 3,
"name": "state",
"data_type": "int_value"
"data_type": "INTEGER"
}
},
{
Expand All @@ -266,7 +266,7 @@
"object_id": 5,
"resource_id": 4,
"name": "update_result",
"data_type": "int_value"
"data_type": "INTEGER"
}
},
{
Expand All @@ -275,7 +275,7 @@
"object_id": 5,
"resource_id": 5,
"name": "package_name",
"data_type": "str_value"
"data_type": "STRING"
}
},
{
Expand All @@ -284,7 +284,7 @@
"object_id": 5,
"resource_id": 6,
"name": "package_version",
"data_type": "str_value"
"data_type": "STRING"
}
},
{
Expand All @@ -293,7 +293,7 @@
"object_id": 10240,
"resource_id": 0,
"name": "ep_registered",
"data_type": "int_value"
"data_type": "INTEGER"
}
},
{
Expand All @@ -302,7 +302,7 @@
"object_id": 10240,
"resource_id": 1,
"name": "ep_unregistered",
"data_type": "int_value"
"data_type": "INTEGER"
}
},
{
Expand All @@ -311,7 +311,7 @@
"object_id": 10240,
"resource_id": 2,
"name": "ep_registration_update",
"data_type": "int_value"
"data_type": "INTEGER"
}
}
]
7 changes: 7 additions & 0 deletions server/django/django_start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,12 @@ else
echo "lwm2m resource types data already loaded, skipping..."
fi

# Choosing gevent as the event loop for celery for significantly lower memory
# usage. The application is IO bound as it is waiting for ReST API responses.
#
# -P gevent -c 100: 100 concurrent workers
echo "Starting the Celery worker..."
celery -A server worker --loglevel=info -P gevent -c 100 2>&1 | tee -a "logs/celery_$(date +%Y-%m-%d_%H-%M-%S).log" &

echo "Starting the server..."
python manage.py runserver 0.0.0.0:8000 2>&1 | tee -a "$logfile"
2 changes: 2 additions & 0 deletions server/django/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ django-extensions==3.2.3
requests==2.31.0
djangorestframework==3.15.1
whitenoise==6.6.0
celery[redis]==5.4.0
gevent==24.2.1

# Generation of Entity-Relationship Diagrams
graphviz==0.20.3
Expand Down
Loading