Skip to content

Commit

Permalink
django: implement asynchronous requests
Browse files Browse the repository at this point in the history
The asynchronous function process_pending_operationss checks the
database for queued endpointOperations and sends them to the related
endpoint. The function is called e.g. during data insertion via the
Admin dashboard or during an registration update of an endpoing.

Signed-off-by: Jonas Remmert <jremmert@gmx.net>
  • Loading branch information
jonas-rem committed Jul 4, 2024
1 parent 0cb12c5 commit 17f712b
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 60 deletions.
62 changes: 4 additions & 58 deletions server/django/sensordata/admin.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import requests
import logging
import os
from django.utils import timezone
from django.contrib import admin
from .tasks import process_pending_operations
from .models import (
Endpoint,
ResourceType,
Expand All @@ -15,9 +14,6 @@

log = logging.getLogger('sensordata')

# Check if we run in a container or locally
LESHAN_URI = os.getenv('LESHAN_URI', 'http://0.0.0.0:8080') + '/api'


@admin.register(Endpoint)
class EndpointAdmin(admin.ModelAdmin):
Expand Down Expand Up @@ -79,63 +75,13 @@ class EndpointOperationAdmin(admin.ModelAdmin):
'last_attempt', 'operation_type')

def save_model(self, request, obj, form, change):
# Update both timestamps, as we handle a manual entry. Automatic
# retries would only update the last_attempt
obj.last_attempt = timezone.now()
# Update created timestamp, as we handle a manual entry.
obj.timestamp_created = timezone.now()
super().save_model(request, obj, form, change)

# Get the resource associated with the endpoint operation
resource = obj.resource
endpoint = resource.endpoint
resource_type = resource.resource_type

# Determine the value based on the type of resource value
value = None
if resource_type.data_type == 'integer':
value = resource.int_value
elif resource_type.data_type == 'float':
value = resource.float_value
elif resource_type.data_type == 'string':
value = resource.str_value
elif resource_type.data_type == 'bool':
value = resource.int_value
elif resource_type.data_type == 'time':
value = resource.int_value
# Execute operation
elif resource_type.data_type == '':
pass
else:
log.error('Resource value not found')
return

# Construct the URL based on the endpoint, object_id, and resource_id
url = (
f'{LESHAN_URI}/clients/{endpoint.endpoint}/'
f'{resource_type.object_id}/0/{resource_type.resource_id}'
)
params = {'timeout': 5, 'format': 'CBOR'}
headers = {'Content-Type': 'application/json'}
data = {
"id": resource_type.resource_id,
"kind": "singleResource",
"value": value,
"type": resource_type.data_type
}

# Send the request
response = requests.put(url, params=params, headers=headers, json=data)

if response.status_code == 200:
log.debug(f'Data sent to endpoint {endpoint.endpoint} successfully')
log.debug(f'Response: {response.status_code} - {response.json()}')
obj.status = 'completed'
else:
log.error(f'Failed to send data: {response.status_code}')
obj.status = 'pending'
obj.transmit_counter += 1
# Trigger the async task to process the operation
process_pending_operations.delay(obj.resource.endpoint.endpoint)

obj.save()

@admin.register(Firmware)
class FirmwareAdmin(admin.ModelAdmin):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 5.0.6 on 2024-06-10 12:05

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('sensordata', '0005_rename_timestamp_sent_endpointoperation_timestamp_created_and_more'),
]

operations = [
migrations.AlterField(
model_name='endpointoperation',
name='last_attempt',
field=models.DateTimeField(null=True),
),
migrations.AlterField(
model_name='endpointoperation',
name='timestamp_created',
field=models.DateTimeField(auto_now_add=True),
),
]
14 changes: 12 additions & 2 deletions server/django/sensordata/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class Meta:
def __str__(self):
return f"{self.object_id}/{self.resource_id} - {self.name}"

def get_value_field(self):
return dict(ResourceType.TYPE_CHOICES).get(self.data_type)

class Resource(models.Model):
"""Stores individual resource data, such as sensor readings, from an endpoint."""
endpoint = models.ForeignKey(Endpoint, on_delete=models.PROTECT)
Expand All @@ -50,6 +53,13 @@ class Meta:
def __str__(self):
return f"{self.endpoint} - {self.resource_type} - {self.timestamp}"

# Gets the correct value field, based on the linked ResourceType
def get_value(self):
value_field = self.resource_type.get_value_field()
if value_field:
return getattr(self, value_field)
return None

class Event(models.Model):
"""
Represents a significant event in the system that is associated with a
Expand Down Expand Up @@ -88,8 +98,8 @@ class Status(models.TextChoices):
default=Status.QUEUED,
)
transmit_counter = models.IntegerField(default=0)
timestamp_created = models.DateTimeField()
last_attempt = models.DateTimeField(auto_now=True)
timestamp_created = models.DateTimeField(auto_now_add=True)
last_attempt = models.DateTimeField(auto_now_add=False, null=True)

class Firmware(models.Model):
"""Represents a firmware update file that can be downloaded by an endpoint."""
Expand Down
85 changes: 85 additions & 0 deletions server/django/sensordata/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from celery import shared_task
import requests
from .models import Endpoint, EndpointOperation
import logging
import os
from django.utils import timezone

# Check if we run in a container or locally
LESHAN_URI = os.getenv('LESHAN_URI', 'http://0.0.0.0:8080') + '/api'


logger = logging.getLogger(__name__)


@shared_task
def send_operation(endpointOperation_id: int) -> None:

try:
e_ops = EndpointOperation.objects.get(id=endpointOperation_id)
except EndpointOperation.DoesNotExist:
logger.error(f"Operation {endpointOperation_id} does not exist.")
return

resource = e_ops.resource
resource_type = resource.resource_type
ep = resource.endpoint.endpoint
logger.debug(f"Sending data to endpoint {ep}")

# Construct the URL based on the endpoint, object_id, and resource_id
url = (
f'{LESHAN_URI}/clients/{ep}/'
f'{resource_type.object_id}/0/{resource_type.resource_id}'
)
params = {'timeout': 5, 'format': 'CBOR'}
headers = {'Content-Type': 'application/json'}
data = {
"id": resource_type.resource_id,
"kind": "singleResource",
"value": resource.get_value(),
"type": resource_type.data_type
}

# Store the current attempt in the database as the request may take a while
e_ops.last_attempt = timezone.now()
e_ops.status = e_ops.Status.SENDING
e_ops.save()

response = requests.put(url, params=params, headers=headers, json=data)
if response.status_code == 200:
logger.debug(f'Data sent to endpoint {ep} successfully')
logger.debug(f'Response: {response.status_code} - {response.json()}')
e_ops.status = e_ops.Status.CONFIRMED
else:
logger.error(f'Failed to send data: {response.status_code}')
e_ops.transmit_counter += 1

if e_ops.transmit_counter >= 3:
e_ops.status = e_ops.Status.FAILED
else:
e_ops.status = e_ops.Status.QUEUED

e_ops.save()


@shared_task
def process_pending_operations(endpoint_id):
try:
endpoint = Endpoint.objects.get(endpoint=endpoint_id)
except Endpoint.DoesNotExist:
logger.error(f"Endpoint {endpoint_id} does not exist.")
return

# Get all pending operations for the endpoint
pending_operations = EndpointOperation.objects.filter(
resource__endpoint=endpoint,
status=EndpointOperation.Status.QUEUED
)
logger.info(f"Found {pending_operations.count()} pending operations for {endpoint.endpoint}")
logger.info(f"Pending operations: {pending_operations}")

for operation in pending_operations:
logger.debug(f"Processing operation {operation.id}")

# Call the send_operation task asynchronously
send_operation.delay(operation.id)

0 comments on commit 17f712b

Please sign in to comment.