diff --git a/openwisp_monitoring/device/writer.py b/openwisp_monitoring/device/writer.py index 48071a4c..f09e4824 100644 --- a/openwisp_monitoring/device/writer.py +++ b/openwisp_monitoring/device/writer.py @@ -71,7 +71,6 @@ def write(self, data, time=None, current=False): self.write_device_metrics = [] for interface in data.get('interfaces', []): ifname = interface['name'] - extra_tags = Metric._sort_dict(device_extra_tags) if 'mobile' in interface: self._write_mobile_signal( interface, ifname, ct, self.device_data.pk, current, time=time @@ -98,7 +97,7 @@ def write(self, data, time=None, current=False): name=name, key='traffic', main_tags={'ifname': Metric._makekey(ifname)}, - extra_tags=extra_tags, + extra_tags=device_extra_tags, ) self._append_metric_data( metric, field_value, current, time=time, extra_values=extra_values @@ -119,7 +118,7 @@ def write(self, data, time=None, current=False): name=name, key='wifi_clients', main_tags={'ifname': Metric._makekey(ifname)}, - extra_tags=extra_tags, + extra_tags=device_extra_tags, ) # avoid tsdb overwrite clients client_time = time @@ -172,7 +171,7 @@ def _get_extra_tags(self, device): tags['location_id'] = str(device_location.location_id) if device_location.floorplan_id: tags['floorplan_id'] = str(device_location.floorplan_id) - return tags + return Metric._sort_dict(tags) def _get_mobile_signal_type(self, signal): if not signal: @@ -212,7 +211,8 @@ def _write_mobile_signal(self, interface, ifname, ct, pk, current=False, time=No content_type_id=ct.id, configuration='signal_strength', name='signal strength', - key=ifname, + key='signal', + main_tags={'ifname': Metric._makekey(ifname)}, ) self._append_metric_data( metric, signal_strength, current, time=time, extra_values=extra_values @@ -240,7 +240,8 @@ def _write_mobile_signal(self, interface, ifname, ct, pk, current=False, time=No content_type_id=ct.id, configuration='signal_quality', name='signal quality', - key=ifname, + key='signal', + main_tags={'ifname': Metric._makekey(ifname)}, ) self._append_metric_data( metric, signal_quality, current, time=time, extra_values=extra_values @@ -253,7 +254,8 @@ def _write_mobile_signal(self, interface, ifname, ct, pk, current=False, time=No content_type_id=ct.id, configuration='access_tech', name='access technology', - key=ifname, + key='signal', + main_tags={'ifname': Metric._makekey(ifname)}, ) self._append_metric_data( metric, diff --git a/openwisp_monitoring/monitoring/migrations/0005_migrate_metrics.py b/openwisp_monitoring/monitoring/migrations/0005_migrate_metrics.py index 96e9d4b8..ba532423 100644 --- a/openwisp_monitoring/monitoring/migrations/0005_migrate_metrics.py +++ b/openwisp_monitoring/monitoring/migrations/0005_migrate_metrics.py @@ -2,71 +2,24 @@ import logging -from django.core.exceptions import ObjectDoesNotExist from django.db import migrations -from swapper import load_model -from .influxdb.influxdb_alter_structure_0006 import EXCLUDED_MEASUREMENTS +from .influxdb.influxdb_alter_structure_0006 import ( + update_metric_timeseries_structure_forward_migration, + update_metric_timeseries_structure_reverse_migration, +) CHUNK_SIZE = 1000 logger = logging.getLogger(__name__) -def forward_migrate_metric(metric_model, configuration, new_key): - metric_qs = metric_model.objects.filter(configuration=configuration).exclude( - key__in=EXCLUDED_MEASUREMENTS - ) - updated_metrics = [] - for metric in metric_qs.iterator(chunk_size=CHUNK_SIZE): - try: - extra_tags = {'organization_id': str(metric.content_object.organization_id)} - except ObjectDoesNotExist: - extra_tags = {} - metric.main_tags = { - 'ifname': metric.key, - } - metric.extra_tags.update(extra_tags) - metric.key = new_key - updated_metrics.append(metric) - if len(updated_metrics) > CHUNK_SIZE: - metric_model.objects.bulk_update( - updated_metrics, fields=['main_tags', 'extra_tags', 'key'] - ) - updated_metrics = [] - if updated_metrics: - metric_model.objects.bulk_update( - updated_metrics, fields=['main_tags', 'extra_tags', 'key'] - ) - - -def forward_migration(apps, schema_editor): - Metric = load_model('monitoring', 'Metric') - forward_migrate_metric(Metric, configuration='clients', new_key='wifi_clients') - forward_migrate_metric(Metric, configuration='traffic', new_key='traffic') - - -def reverse_migration(apps, schema_editor): - # Reverse migration is required because of the - # the unique together condition implemented in - # Metric model. - Metric = load_model('monitoring', 'Metric') - updated_metrics = [] - for metric in Metric.objects.filter(key__in=['traffic', 'wifi_clients']).iterator( - chunk_size=CHUNK_SIZE - ): - metric.key = metric.main_tags['ifname'] - updated_metrics.append(metric) - if len(updated_metrics) > CHUNK_SIZE: - Metric.objects.bulk_update(updated_metrics, fields=['key']) - updated_metrics = [] - if updated_metrics: - Metric.objects.bulk_update(updated_metrics, fields=['key']) - - class Migration(migrations.Migration): dependencies = [('monitoring', '0004_metric_main_and_extra_tags')] operations = [ - migrations.RunPython(forward_migration, reverse_code=reverse_migration) + migrations.RunPython( + update_metric_timeseries_structure_forward_migration, + reverse_code=update_metric_timeseries_structure_reverse_migration, + ) ] diff --git a/openwisp_monitoring/monitoring/migrations/0012_migrate_signal_metrics.py b/openwisp_monitoring/monitoring/migrations/0012_migrate_signal_metrics.py new file mode 100644 index 00000000..8311b803 --- /dev/null +++ b/openwisp_monitoring/monitoring/migrations/0012_migrate_signal_metrics.py @@ -0,0 +1,38 @@ +# Manually created + +import logging + +from django.db import migrations + +from .influxdb.influxdb_alter_structure_0006 import ( + update_metric_timeseries_structure_forward_migration, + update_metric_timeseries_structure_reverse_migration, +) + +CHUNK_SIZE = 1000 + +logger = logging.getLogger(__name__) + + +def forward_migration(apps, schema_editor): + update_metric_timeseries_structure_forward_migration(apps, schema_editor) + from ..tasks import migrate_timeseries_database + + migrate_timeseries_database.delay() + + +def reverse_migration(apps, schema_editor): + update_metric_timeseries_structure_reverse_migration( + apps, schema_editor, metric_keys=['signal'] + ) + + +class Migration(migrations.Migration): + dependencies = [('monitoring', '0011_alter_metric_field_name')] + + operations = [ + migrations.RunPython( + forward_migration, + reverse_code=reverse_migration, + ) + ] diff --git a/openwisp_monitoring/monitoring/migrations/influxdb/influxdb_alter_structure_0006.py b/openwisp_monitoring/monitoring/migrations/influxdb/influxdb_alter_structure_0006.py index af437262..873e1d7c 100644 --- a/openwisp_monitoring/monitoring/migrations/influxdb/influxdb_alter_structure_0006.py +++ b/openwisp_monitoring/monitoring/migrations/influxdb/influxdb_alter_structure_0006.py @@ -2,6 +2,7 @@ import time import requests +from django.core.exceptions import ObjectDoesNotExist from influxdb.exceptions import InfluxDBServerError from swapper import load_model @@ -164,10 +165,36 @@ def migrate_wifi_clients(): def migrate_traffic_data(): - migrate_influxdb_data(new_measurement='traffic', configuration='traffic') + migrate_influxdb_data( + new_measurement='traffic', + configuration='traffic', + delete_query=f"{DELETE_QUERY} AND access_tech != ''", + ) logger.info('"traffic" measurements successfully migrated.') +def migrate_signal_strength_data(): + migrate_influxdb_data( + new_measurement='signal', configuration='signal_strength', delete_query=None + ) + logger.info('"signal_strength" measurements successfully migrated.') + + +def migrate_signal_quality_data(): + migrate_influxdb_data( + new_measurement='signal', configuration='signal_quality', delete_query=None + ) + logger.info('"signal_quality" measurements successfully migrated.') + + +def migrate_access_tech_data(): + migrate_influxdb_data( + new_measurement='signal', + configuration='access_tech', + ) + logger.info('"access_tech" measurements successfully migrated.') + + def requires_migration(): """Indicates whether influxdb data migration is necessary. @@ -190,4 +217,77 @@ def migrate_influxdb_structure(): return migrate_wifi_clients() migrate_traffic_data() + migrate_signal_strength_data() + migrate_signal_quality_data() + migrate_access_tech_data() logger.info('Timeseries data migration completed.') + + +def _forward_migrate_metric(metric_model, configuration, new_key, add_org_tag=True): + metric_qs = metric_model.objects.filter(configuration=configuration).exclude( + key__in=EXCLUDED_MEASUREMENTS + ) + updated_metrics = [] + for metric in metric_qs.iterator(chunk_size=CHUNK_SIZE): + try: + assert add_org_tag is True + extra_tags = {'organization_id': str(metric.content_object.organization_id)} + except (AssertionError, ObjectDoesNotExist): + extra_tags = {} + metric.main_tags = { + 'ifname': metric.key, + } + metric.extra_tags.update(extra_tags) + metric.key = new_key + updated_metrics.append(metric) + if len(updated_metrics) > CHUNK_SIZE: + metric_model.objects.bulk_update( + updated_metrics, fields=['main_tags', 'extra_tags', 'key'] + ) + updated_metrics = [] + if updated_metrics: + metric_model.objects.bulk_update( + updated_metrics, fields=['main_tags', 'extra_tags', 'key'] + ) + + +def update_metric_timeseries_structure_forward_migration(apps, schema_editor): + """ + Updates metric objects to use static value for key and set + interface name in the main tags + """ + Metric = load_model('monitoring', 'Metric') + _forward_migrate_metric(Metric, configuration='clients', new_key='wifi_clients') + _forward_migrate_metric(Metric, configuration='traffic', new_key='traffic') + _forward_migrate_metric( + Metric, configuration='signal_strength', new_key='signal', add_org_tag=False + ) + _forward_migrate_metric( + Metric, configuration='signal_quality', new_key='signal', add_org_tag=False + ) + _forward_migrate_metric( + Metric, configuration='access_tech', new_key='signal', add_org_tag=False + ) + + +def update_metric_timeseries_structure_reverse_migration( + apps, schema_editor, metric_keys=None +): + """ + Reverse migration is required because of the + the unique together condition implemented in + Metric model. + """ + Metric = load_model('monitoring', 'Metric') + metric_keys = metric_keys or ['traffic', 'wifi_clients', 'signal'] + updated_metrics = [] + for metric in Metric.objects.filter( + key__in=metric_keys, + ).iterator(chunk_size=CHUNK_SIZE): + metric.key = metric.main_tags['ifname'] + updated_metrics.append(metric) + if len(updated_metrics) > CHUNK_SIZE: + Metric.objects.bulk_update(updated_metrics, fields=['key']) + updated_metrics = [] + if updated_metrics: + Metric.objects.bulk_update(updated_metrics, fields=['key'])