Skip to content

Commit

Permalink
[fix] Fixed timseries structure for storing signal metric #586
Browse files Browse the repository at this point in the history
Updated timeseries migration script to also handle signal metrics.

Fixes #586

Signed-off-by: Gagan Deep <pandafy.dev@gmail.com>
  • Loading branch information
pandafy committed Jul 29, 2024
1 parent cc04078 commit f8bfb56
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 63 deletions.
16 changes: 9 additions & 7 deletions openwisp_monitoring/device/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ def write(self, data, time=None, current=False):
self.write_device_metrics = []
for interface in data.get('interfaces', []):
ifname = interface['name']
extra_tags = Metric._sort_dict(device_extra_tags)
if 'mobile' in interface:
self._write_mobile_signal(
interface, ifname, ct, self.device_data.pk, current, time=time
Expand All @@ -100,7 +99,7 @@ def write(self, data, time=None, current=False):
name=name,
key='traffic',
main_tags={'ifname': Metric._makekey(ifname)},
extra_tags=extra_tags,
extra_tags=device_extra_tags,
)
self._append_metric_data(
metric, field_value, current, time=time, extra_values=extra_values
Expand All @@ -121,7 +120,7 @@ def write(self, data, time=None, current=False):
name=name,
key='wifi_clients',
main_tags={'ifname': Metric._makekey(ifname)},
extra_tags=extra_tags,
extra_tags=device_extra_tags,
)
# avoid tsdb overwrite clients
client_time = time
Expand Down Expand Up @@ -174,7 +173,7 @@ def _get_extra_tags(self, device):
tags['location_id'] = str(device_location.location_id)
if device_location.floorplan_id:
tags['floorplan_id'] = str(device_location.floorplan_id)
return tags
return Metric._sort_dict(tags)

def _get_mobile_signal_type(self, signal):
if not signal:
Expand Down Expand Up @@ -214,7 +213,8 @@ def _write_mobile_signal(self, interface, ifname, ct, pk, current=False, time=No
content_type_id=ct.id,
configuration='signal_strength',
name='signal strength',
key=ifname,
key='signal',
main_tags={'ifname': Metric._makekey(ifname)},
)
self._append_metric_data(
metric, signal_strength, current, time=time, extra_values=extra_values
Expand Down Expand Up @@ -242,7 +242,8 @@ def _write_mobile_signal(self, interface, ifname, ct, pk, current=False, time=No
content_type_id=ct.id,
configuration='signal_quality',
name='signal quality',
key=ifname,
key='signal',
main_tags={'ifname': Metric._makekey(ifname)},
)
self._append_metric_data(
metric, signal_quality, current, time=time, extra_values=extra_values
Expand All @@ -255,7 +256,8 @@ def _write_mobile_signal(self, interface, ifname, ct, pk, current=False, time=No
content_type_id=ct.id,
configuration='access_tech',
name='access technology',
key=ifname,
key='signal',
main_tags={'ifname': Metric._makekey(ifname)},
)
self._append_metric_data(
metric,
Expand Down
63 changes: 8 additions & 55 deletions openwisp_monitoring/monitoring/migrations/0005_migrate_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,71 +2,24 @@

import logging

from django.core.exceptions import ObjectDoesNotExist
from django.db import migrations
from swapper import load_model

from .influxdb.influxdb_alter_structure_0006 import EXCLUDED_MEASUREMENTS
from .influxdb.influxdb_alter_structure_0006 import (
update_metric_timeseries_structure_forward_migration,
update_metric_timeseries_structure_reverse_migration,
)

CHUNK_SIZE = 1000

logger = logging.getLogger(__name__)


def forward_migrate_metric(metric_model, configuration, new_key):
metric_qs = metric_model.objects.filter(configuration=configuration).exclude(
key__in=EXCLUDED_MEASUREMENTS
)
updated_metrics = []
for metric in metric_qs.iterator(chunk_size=CHUNK_SIZE):
try:
extra_tags = {'organization_id': str(metric.content_object.organization_id)}
except ObjectDoesNotExist:
extra_tags = {}
metric.main_tags = {
'ifname': metric.key,
}
metric.extra_tags.update(extra_tags)
metric.key = new_key
updated_metrics.append(metric)
if len(updated_metrics) > CHUNK_SIZE:
metric_model.objects.bulk_update(
updated_metrics, fields=['main_tags', 'extra_tags', 'key']
)
updated_metrics = []
if updated_metrics:
metric_model.objects.bulk_update(
updated_metrics, fields=['main_tags', 'extra_tags', 'key']
)


def forward_migration(apps, schema_editor):
Metric = load_model('monitoring', 'Metric')
forward_migrate_metric(Metric, configuration='clients', new_key='wifi_clients')
forward_migrate_metric(Metric, configuration='traffic', new_key='traffic')


def reverse_migration(apps, schema_editor):
# Reverse migration is required because of the
# the unique together condition implemented in
# Metric model.
Metric = load_model('monitoring', 'Metric')
updated_metrics = []
for metric in Metric.objects.filter(key__in=['traffic', 'wifi_clients']).iterator(
chunk_size=CHUNK_SIZE
):
metric.key = metric.main_tags['ifname']
updated_metrics.append(metric)
if len(updated_metrics) > CHUNK_SIZE:
Metric.objects.bulk_update(updated_metrics, fields=['key'])
updated_metrics = []
if updated_metrics:
Metric.objects.bulk_update(updated_metrics, fields=['key'])


class Migration(migrations.Migration):
dependencies = [('monitoring', '0004_metric_main_and_extra_tags')]

operations = [
migrations.RunPython(forward_migration, reverse_code=reverse_migration)
migrations.RunPython(
update_metric_timeseries_structure_forward_migration,
reverse_code=update_metric_timeseries_structure_reverse_migration,
)
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Manually created

import logging

from django.db import migrations

from .influxdb.influxdb_alter_structure_0006 import (
update_metric_timeseries_structure_forward_migration,
update_metric_timeseries_structure_reverse_migration,
)

CHUNK_SIZE = 1000

logger = logging.getLogger(__name__)


def forward_migration(apps, schema_editor):
update_metric_timeseries_structure_forward_migration(apps, schema_editor)
from ..tasks import migrate_timeseries_database

migrate_timeseries_database.delay()


def reverse_migration(apps, schema_editor):
update_metric_timeseries_structure_reverse_migration(
apps, schema_editor, metric_keys=['signal']
)


class Migration(migrations.Migration):
dependencies = [('monitoring', '0011_alter_metric_field_name')]

operations = [
migrations.RunPython(
forward_migration,
reverse_code=reverse_migration,
)
]
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import time

import requests
from django.core.exceptions import ObjectDoesNotExist
from influxdb.exceptions import InfluxDBServerError
from swapper import load_model

Expand Down Expand Up @@ -166,10 +167,36 @@ def migrate_wifi_clients():


def migrate_traffic_data():
migrate_influxdb_data(new_measurement='traffic', configuration='traffic')
migrate_influxdb_data(
new_measurement='traffic',
configuration='traffic',
delete_query=f"{DELETE_QUERY} AND access_tech != ''",
)
logger.info('"traffic" measurements successfully migrated.')


def migrate_signal_strength_data():
migrate_influxdb_data(
new_measurement='signal', configuration='signal_strength', delete_query=None
)
logger.info('"signal_strength" measurements successfully migrated.')


def migrate_signal_quality_data():
migrate_influxdb_data(
new_measurement='signal', configuration='signal_quality', delete_query=None
)
logger.info('"signal_quality" measurements successfully migrated.')


def migrate_access_tech_data():
migrate_influxdb_data(
new_measurement='signal',
configuration='access_tech',
)
logger.info('"access_tech" measurements successfully migrated.')


def requires_migration():
"""
Returns "False" if all measurements presents in InfluxDB
Expand All @@ -192,4 +219,77 @@ def migrate_influxdb_structure():
return
migrate_wifi_clients()
migrate_traffic_data()
migrate_signal_strength_data()
migrate_signal_quality_data()
migrate_access_tech_data()
logger.info('Timeseries data migration completed.')


def _forward_migrate_metric(metric_model, configuration, new_key, add_org_tag=True):
metric_qs = metric_model.objects.filter(configuration=configuration).exclude(
key__in=EXCLUDED_MEASUREMENTS
)
updated_metrics = []
for metric in metric_qs.iterator(chunk_size=CHUNK_SIZE):
try:
assert add_org_tag is True
extra_tags = {'organization_id': str(metric.content_object.organization_id)}
except (AssertionError, ObjectDoesNotExist):
extra_tags = {}
metric.main_tags = {
'ifname': metric.key,
}
metric.extra_tags.update(extra_tags)
metric.key = new_key
updated_metrics.append(metric)
if len(updated_metrics) > CHUNK_SIZE:
metric_model.objects.bulk_update(
updated_metrics, fields=['main_tags', 'extra_tags', 'key']
)
updated_metrics = []
if updated_metrics:
metric_model.objects.bulk_update(
updated_metrics, fields=['main_tags', 'extra_tags', 'key']
)


def update_metric_timeseries_structure_forward_migration(apps, schema_editor):
"""
Updates metric objects to use static value for key and set
interface name in the main tags
"""
Metric = load_model('monitoring', 'Metric')
_forward_migrate_metric(Metric, configuration='clients', new_key='wifi_clients')
_forward_migrate_metric(Metric, configuration='traffic', new_key='traffic')
_forward_migrate_metric(
Metric, configuration='signal_strength', new_key='signal', add_org_tag=False
)
_forward_migrate_metric(
Metric, configuration='signal_quality', new_key='signal', add_org_tag=False
)
_forward_migrate_metric(
Metric, configuration='access_tech', new_key='signal', add_org_tag=False
)


def update_metric_timeseries_structure_reverse_migration(
apps, schema_editor, metric_keys=None
):
"""
Reverse migration is required because of the
the unique together condition implemented in
Metric model.
"""
Metric = load_model('monitoring', 'Metric')
metric_keys = metric_keys or ['traffic', 'wifi_clients', 'signal']
updated_metrics = []
for metric in Metric.objects.filter(
key__in=metric_keys,
).iterator(chunk_size=CHUNK_SIZE):
metric.key = metric.main_tags['ifname']
updated_metrics.append(metric)
if len(updated_metrics) > CHUNK_SIZE:
Metric.objects.bulk_update(updated_metrics, fields=['key'])
updated_metrics = []
if updated_metrics:
Metric.objects.bulk_update(updated_metrics, fields=['key'])

0 comments on commit f8bfb56

Please sign in to comment.