From f8bfb5684deca069b366682151ee91696dd901c6 Mon Sep 17 00:00:00 2001 From: Gagan Deep Date: Mon, 29 Jul 2024 17:35:01 +0530 Subject: [PATCH] [fix] Fixed timseries structure for storing signal metric #586 Updated timeseries migration script to also handle signal metrics. Fixes #586 Signed-off-by: Gagan Deep --- openwisp_monitoring/device/writer.py | 16 +-- .../migrations/0005_migrate_metrics.py | 63 ++--------- .../migrations/0012_migrate_signal_metrics.py | 38 +++++++ .../influxdb/influxdb_alter_structure_0006.py | 102 +++++++++++++++++- 4 files changed, 156 insertions(+), 63 deletions(-) create mode 100644 openwisp_monitoring/monitoring/migrations/0012_migrate_signal_metrics.py diff --git a/openwisp_monitoring/device/writer.py b/openwisp_monitoring/device/writer.py index 8b0fdd7a2..aeb124618 100644 --- a/openwisp_monitoring/device/writer.py +++ b/openwisp_monitoring/device/writer.py @@ -73,7 +73,6 @@ def write(self, data, time=None, current=False): self.write_device_metrics = [] for interface in data.get('interfaces', []): ifname = interface['name'] - extra_tags = Metric._sort_dict(device_extra_tags) if 'mobile' in interface: self._write_mobile_signal( interface, ifname, ct, self.device_data.pk, current, time=time @@ -100,7 +99,7 @@ def write(self, data, time=None, current=False): name=name, key='traffic', main_tags={'ifname': Metric._makekey(ifname)}, - extra_tags=extra_tags, + extra_tags=device_extra_tags, ) self._append_metric_data( metric, field_value, current, time=time, extra_values=extra_values @@ -121,7 +120,7 @@ def write(self, data, time=None, current=False): name=name, key='wifi_clients', main_tags={'ifname': Metric._makekey(ifname)}, - extra_tags=extra_tags, + extra_tags=device_extra_tags, ) # avoid tsdb overwrite clients client_time = time @@ -174,7 +173,7 @@ def _get_extra_tags(self, device): tags['location_id'] = str(device_location.location_id) if device_location.floorplan_id: tags['floorplan_id'] = str(device_location.floorplan_id) - return tags + return Metric._sort_dict(tags) def _get_mobile_signal_type(self, signal): if not signal: @@ -214,7 +213,8 @@ def _write_mobile_signal(self, interface, ifname, ct, pk, current=False, time=No content_type_id=ct.id, configuration='signal_strength', name='signal strength', - key=ifname, + key='signal', + main_tags={'ifname': Metric._makekey(ifname)}, ) self._append_metric_data( metric, signal_strength, current, time=time, extra_values=extra_values @@ -242,7 +242,8 @@ def _write_mobile_signal(self, interface, ifname, ct, pk, current=False, time=No content_type_id=ct.id, configuration='signal_quality', name='signal quality', - key=ifname, + key='signal', + main_tags={'ifname': Metric._makekey(ifname)}, ) self._append_metric_data( metric, signal_quality, current, time=time, extra_values=extra_values @@ -255,7 +256,8 @@ def _write_mobile_signal(self, interface, ifname, ct, pk, current=False, time=No content_type_id=ct.id, configuration='access_tech', name='access technology', - key=ifname, + key='signal', + main_tags={'ifname': Metric._makekey(ifname)}, ) self._append_metric_data( metric, diff --git a/openwisp_monitoring/monitoring/migrations/0005_migrate_metrics.py b/openwisp_monitoring/monitoring/migrations/0005_migrate_metrics.py index 96e9d4b89..ba5324234 100644 --- a/openwisp_monitoring/monitoring/migrations/0005_migrate_metrics.py +++ b/openwisp_monitoring/monitoring/migrations/0005_migrate_metrics.py @@ -2,71 +2,24 @@ import logging -from django.core.exceptions import ObjectDoesNotExist from django.db import migrations -from swapper import load_model -from .influxdb.influxdb_alter_structure_0006 import EXCLUDED_MEASUREMENTS +from .influxdb.influxdb_alter_structure_0006 import ( + update_metric_timeseries_structure_forward_migration, + update_metric_timeseries_structure_reverse_migration, +) CHUNK_SIZE = 1000 logger = logging.getLogger(__name__) -def forward_migrate_metric(metric_model, configuration, new_key): - metric_qs = metric_model.objects.filter(configuration=configuration).exclude( - key__in=EXCLUDED_MEASUREMENTS - ) - updated_metrics = [] - for metric in metric_qs.iterator(chunk_size=CHUNK_SIZE): - try: - extra_tags = {'organization_id': str(metric.content_object.organization_id)} - except ObjectDoesNotExist: - extra_tags = {} - metric.main_tags = { - 'ifname': metric.key, - } - metric.extra_tags.update(extra_tags) - metric.key = new_key - updated_metrics.append(metric) - if len(updated_metrics) > CHUNK_SIZE: - metric_model.objects.bulk_update( - updated_metrics, fields=['main_tags', 'extra_tags', 'key'] - ) - updated_metrics = [] - if updated_metrics: - metric_model.objects.bulk_update( - updated_metrics, fields=['main_tags', 'extra_tags', 'key'] - ) - - -def forward_migration(apps, schema_editor): - Metric = load_model('monitoring', 'Metric') - forward_migrate_metric(Metric, configuration='clients', new_key='wifi_clients') - forward_migrate_metric(Metric, configuration='traffic', new_key='traffic') - - -def reverse_migration(apps, schema_editor): - # Reverse migration is required because of the - # the unique together condition implemented in - # Metric model. - Metric = load_model('monitoring', 'Metric') - updated_metrics = [] - for metric in Metric.objects.filter(key__in=['traffic', 'wifi_clients']).iterator( - chunk_size=CHUNK_SIZE - ): - metric.key = metric.main_tags['ifname'] - updated_metrics.append(metric) - if len(updated_metrics) > CHUNK_SIZE: - Metric.objects.bulk_update(updated_metrics, fields=['key']) - updated_metrics = [] - if updated_metrics: - Metric.objects.bulk_update(updated_metrics, fields=['key']) - - class Migration(migrations.Migration): dependencies = [('monitoring', '0004_metric_main_and_extra_tags')] operations = [ - migrations.RunPython(forward_migration, reverse_code=reverse_migration) + migrations.RunPython( + update_metric_timeseries_structure_forward_migration, + reverse_code=update_metric_timeseries_structure_reverse_migration, + ) ] diff --git a/openwisp_monitoring/monitoring/migrations/0012_migrate_signal_metrics.py b/openwisp_monitoring/monitoring/migrations/0012_migrate_signal_metrics.py new file mode 100644 index 000000000..8311b803c --- /dev/null +++ b/openwisp_monitoring/monitoring/migrations/0012_migrate_signal_metrics.py @@ -0,0 +1,38 @@ +# Manually created + +import logging + +from django.db import migrations + +from .influxdb.influxdb_alter_structure_0006 import ( + update_metric_timeseries_structure_forward_migration, + update_metric_timeseries_structure_reverse_migration, +) + +CHUNK_SIZE = 1000 + +logger = logging.getLogger(__name__) + + +def forward_migration(apps, schema_editor): + update_metric_timeseries_structure_forward_migration(apps, schema_editor) + from ..tasks import migrate_timeseries_database + + migrate_timeseries_database.delay() + + +def reverse_migration(apps, schema_editor): + update_metric_timeseries_structure_reverse_migration( + apps, schema_editor, metric_keys=['signal'] + ) + + +class Migration(migrations.Migration): + dependencies = [('monitoring', '0011_alter_metric_field_name')] + + operations = [ + migrations.RunPython( + forward_migration, + reverse_code=reverse_migration, + ) + ] diff --git a/openwisp_monitoring/monitoring/migrations/influxdb/influxdb_alter_structure_0006.py b/openwisp_monitoring/monitoring/migrations/influxdb/influxdb_alter_structure_0006.py index 406bc4ea2..cbc122825 100644 --- a/openwisp_monitoring/monitoring/migrations/influxdb/influxdb_alter_structure_0006.py +++ b/openwisp_monitoring/monitoring/migrations/influxdb/influxdb_alter_structure_0006.py @@ -2,6 +2,7 @@ import time import requests +from django.core.exceptions import ObjectDoesNotExist from influxdb.exceptions import InfluxDBServerError from swapper import load_model @@ -166,10 +167,36 @@ def migrate_wifi_clients(): def migrate_traffic_data(): - migrate_influxdb_data(new_measurement='traffic', configuration='traffic') + migrate_influxdb_data( + new_measurement='traffic', + configuration='traffic', + delete_query=f"{DELETE_QUERY} AND access_tech != ''", + ) logger.info('"traffic" measurements successfully migrated.') +def migrate_signal_strength_data(): + migrate_influxdb_data( + new_measurement='signal', configuration='signal_strength', delete_query=None + ) + logger.info('"signal_strength" measurements successfully migrated.') + + +def migrate_signal_quality_data(): + migrate_influxdb_data( + new_measurement='signal', configuration='signal_quality', delete_query=None + ) + logger.info('"signal_quality" measurements successfully migrated.') + + +def migrate_access_tech_data(): + migrate_influxdb_data( + new_measurement='signal', + configuration='access_tech', + ) + logger.info('"access_tech" measurements successfully migrated.') + + def requires_migration(): """ Returns "False" if all measurements presents in InfluxDB @@ -192,4 +219,77 @@ def migrate_influxdb_structure(): return migrate_wifi_clients() migrate_traffic_data() + migrate_signal_strength_data() + migrate_signal_quality_data() + migrate_access_tech_data() logger.info('Timeseries data migration completed.') + + +def _forward_migrate_metric(metric_model, configuration, new_key, add_org_tag=True): + metric_qs = metric_model.objects.filter(configuration=configuration).exclude( + key__in=EXCLUDED_MEASUREMENTS + ) + updated_metrics = [] + for metric in metric_qs.iterator(chunk_size=CHUNK_SIZE): + try: + assert add_org_tag is True + extra_tags = {'organization_id': str(metric.content_object.organization_id)} + except (AssertionError, ObjectDoesNotExist): + extra_tags = {} + metric.main_tags = { + 'ifname': metric.key, + } + metric.extra_tags.update(extra_tags) + metric.key = new_key + updated_metrics.append(metric) + if len(updated_metrics) > CHUNK_SIZE: + metric_model.objects.bulk_update( + updated_metrics, fields=['main_tags', 'extra_tags', 'key'] + ) + updated_metrics = [] + if updated_metrics: + metric_model.objects.bulk_update( + updated_metrics, fields=['main_tags', 'extra_tags', 'key'] + ) + + +def update_metric_timeseries_structure_forward_migration(apps, schema_editor): + """ + Updates metric objects to use static value for key and set + interface name in the main tags + """ + Metric = load_model('monitoring', 'Metric') + _forward_migrate_metric(Metric, configuration='clients', new_key='wifi_clients') + _forward_migrate_metric(Metric, configuration='traffic', new_key='traffic') + _forward_migrate_metric( + Metric, configuration='signal_strength', new_key='signal', add_org_tag=False + ) + _forward_migrate_metric( + Metric, configuration='signal_quality', new_key='signal', add_org_tag=False + ) + _forward_migrate_metric( + Metric, configuration='access_tech', new_key='signal', add_org_tag=False + ) + + +def update_metric_timeseries_structure_reverse_migration( + apps, schema_editor, metric_keys=None +): + """ + Reverse migration is required because of the + the unique together condition implemented in + Metric model. + """ + Metric = load_model('monitoring', 'Metric') + metric_keys = metric_keys or ['traffic', 'wifi_clients', 'signal'] + updated_metrics = [] + for metric in Metric.objects.filter( + key__in=metric_keys, + ).iterator(chunk_size=CHUNK_SIZE): + metric.key = metric.main_tags['ifname'] + updated_metrics.append(metric) + if len(updated_metrics) > CHUNK_SIZE: + Metric.objects.bulk_update(updated_metrics, fields=['key']) + updated_metrics = [] + if updated_metrics: + Metric.objects.bulk_update(updated_metrics, fields=['key'])