Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update windborne ingestor to fetch per mission and add it to API #194

Merged
merged 7 commits into from
Oct 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion django_project/gap/admin/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class MeasurementAdmin(admin.ModelAdmin):
"""Measurement admin."""

list_display = (
'station', 'dataset_attribute', 'date_time', 'value'
'station', 'dataset_attribute', 'date_time', 'value', 'station_history'
)
list_filter = ('station',)
search_fields = ('name',)
Expand Down
10 changes: 10 additions & 0 deletions django_project/gap/admin/station.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from gap.models import (
Station, StationType, StationHistory
)
from gap.tasks.station import assign_history_of_stations_to_measurement


@admin.register(StationType)
Expand All @@ -19,6 +20,14 @@
pass


@admin.action(description='Assign history to measurement')
def assign_history_to_measurement(modeladmin, request, queryset):
    """Dispatch the history-assignment task for the selected stations.

    The ids of every station in ``queryset`` are collected into a plain
    list and handed to ``assign_history_of_stations_to_measurement``
    through its ``.delay()`` entry point.
    """
    station_ids = list(queryset.values_list('id', flat=True))
    assign_history_of_stations_to_measurement.delay(station_ids)


@admin.register(Station)
class StationAdmin(admin.ModelAdmin):
"""Station admin."""
Expand All @@ -28,6 +37,7 @@
)
list_filter = ('provider', 'station_type', 'country')
search_fields = ('code', 'name')
actions = (assign_history_to_measurement,)


@admin.register(StationHistory)
Expand Down
4 changes: 2 additions & 2 deletions django_project/gap/fixtures/4.dataset_type.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@
"model": "gap.datasettype",
"pk": 5,
"fields": {
"name": "Airborne Observational",
"name": "WindBorne Observational",
"description": "",
"type": "historical",
"variable_name": "default"
"variable_name": "windborne_observational"
}
},
{
Expand Down
168 changes: 104 additions & 64 deletions django_project/gap/ingestor/wind_borne_systems.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

PROVIDER = 'WindBorne Systems'
STATION_TYPE = 'Balloon'
DATASET_TYPE = 'Airborne Observational'
DATASET_TYPE = 'WindBorne Observational'
DATASET_NAME = 'WindBorne Balloons Observations'
USERNAME_ENV_NAME = 'WIND_BORNE_SYSTEMS_USERNAME'
PASSWORD_ENV_NAME = 'WIND_BORNE_SYSTEMS_PASSWORD'
Expand All @@ -44,11 +44,12 @@
if not self.password:
raise EnvIsNotSetException(PASSWORD_ENV_NAME)

def measurements(self, since=None) -> (list, int, bool):
def measurements(self, mission_id, since=None) -> (list, int, bool):
"""Return measurements, since and has_next_page."""
params = {
'include_ids': True,
'include_mission_name': True
'include_mission_name': True,
'mission_id': mission_id
}
if since:
params['since'] = since
Expand All @@ -67,6 +68,18 @@
f'{response.status_code}: {response.text} : {response.url}'
)

def missions(self) -> list:
    """Return the ``missions`` entry of the ``missions.json`` endpoint.

    :return: Value stored under the ``missions`` key of the JSON response.
    :raises Exception: When the API answers with a status other than 200;
        the message carries the status code, response body and URL.
    """
    endpoint = f'{self.base_url}/missions.json'
    credentials = HTTPBasicAuth(self.username, self.password)
    response = requests.get(endpoint, auth=credentials)

    # Anything other than a plain 200 is treated as a failure.
    if response.status_code != 200:
        raise Exception(
            f'{response.status_code}: {response.text} : {response.url}'
        )
    return response.json()['missions']


class WindBorneSystemsIngestor(BaseIngestor):
"""Ingestor for WindBorneSystems."""
Expand Down Expand Up @@ -96,68 +109,95 @@
for dataset_attr in self.dataset.datasetattribute_set.all():
self.attributes[dataset_attr.source] = dataset_attr

def mission_ids(self, api: WindBorneSystemsAPI):
    """Return the de-duplicated mission ids to ingest.

    Combines the codes of stations already stored for this provider and
    station type with the ids of the missions reported by ``api``.

    :param api: API client used to list the missions.
    :return: List of unique mission ids (order is not guaranteed).
    """
    stored_codes = Station.objects.filter(
        provider=self.provider,
        station_type=self.station_type
    ).values_list('code', flat=True)

    # Evaluate the database query first, then add the ids from the API.
    unique_ids = set(stored_codes)
    unique_ids.update(mission['id'] for mission in api.missions())
    return list(unique_ids)

def run(self):
"""Run the ingestor."""
api = WindBorneSystemsAPI()
additional_config = self.session.additional_config
global_since = additional_config.get('since', None)

# Run for every mission id
missions = self.mission_ids(api)
for mission_id in missions:
print(f'Checking mission {mission_id}')

has_next_page = True
since = self.session.additional_config.get('since', None)
while has_next_page:
observations, since, has_next_page = api.measurements(since)

# Process if it has observations
if len(observations):
for observation in observations:
# Get date time
date_time = datetime.fromtimestamp(
observation['timestamp']
)
date_time = timezone.make_aware(
date_time, timezone.get_default_timezone()
)

# Points
point = Point(
x=observation['longitude'],
y=observation['latitude'],
srid=4326
)
station, _ = Station.objects.update_or_create(
provider=self.provider,
station_type=self.station_type,
code=observation['mission_id'],
defaults={
'name': observation['mission_name'],
'geometry': point,
'altitude': observation['altitude'],
}
)
StationHistory.objects.update_or_create(
station=station,
date_time=date_time,
defaults={
'geometry': point,
'altitude': observation['altitude'],
}
)

# Save the measurements
for variable, dataset_attribute in self.attributes.items():
try:
value = observation[variable]
if value is not None:
Measurement.objects.update_or_create(
station=station,
dataset_attribute=dataset_attribute,
date_time=date_time,
defaults={
'value': observation[variable]
}
)
except KeyError:
pass
# Save last since
self.session.additional_config = {
'since': since,
}
self.session.save()
mission_since_key = f'since-{mission_id}'
mission_since = additional_config.get(
mission_since_key, None
)
since = mission_since if mission_since else global_since

has_next_page = True
while has_next_page:
observations, since, has_next_page = api.measurements(
mission_id, since
)

# Process if it has observations
if len(observations):
for observation in observations:
# Get date time
date_time = datetime.fromtimestamp(
observation['timestamp']
)
date_time = timezone.make_aware(
date_time, timezone.get_default_timezone()
)

# Points
point = Point(
x=observation['longitude'],
y=observation['latitude'],
srid=4326
)
station, _ = Station.objects.update_or_create(
provider=self.provider,
station_type=self.station_type,
code=observation['mission_id'],
defaults={
'name': observation['mission_name'],
'geometry': point,
'altitude': observation['altitude'],
}
)
history, _ = StationHistory.objects.update_or_create(
station=station,
date_time=date_time,
defaults={
'geometry': point,
'altitude': observation['altitude'],
}
)

# Save the measurements
for variable, attribute in self.attributes.items():
try:
value = observation[variable]
if value is not None:
Measurement.objects.update_or_create(
station=station,
dataset_attribute=attribute,
date_time=date_time,
defaults={
'value': observation[variable],
'station_history': history
}
)
except KeyError:
pass

Check warning on line 198 in django_project/gap/ingestor/wind_borne_systems.py

View check run for this annotation

Codecov / codecov/patch

django_project/gap/ingestor/wind_borne_systems.py#L197-L198

Added lines #L197 - L198 were not covered by tests

# Save last since for mission
additional_config[mission_since_key] = since
self.session.additional_config = additional_config
self.session.save()
19 changes: 19 additions & 0 deletions django_project/gap/migrations/0031_measurement_station_history.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Generated by Django 4.2.7 on 2024-10-17 09:52

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):
    """Add the ``station_history`` foreign key to ``measurement``."""

    dependencies = [('gap', '0030_preferences_ingestor_config')]

    operations = [
        migrations.AddField(
            model_name='measurement',
            name='station_history',
            # Optional link (null/blank allowed) to a station history row.
            field=models.ForeignKey(
                to='gap.stationhistory',
                on_delete=django.db.models.deletion.CASCADE,
                null=True,
                blank=True,
                help_text='Station history of the measurement.',
            ),
        ),
    ]
11 changes: 10 additions & 1 deletion django_project/gap/models/measurement.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from core.models.common import Definition
from gap.models.common import Unit
from gap.models.dataset import Dataset
from gap.models.station import Station
from gap.models.station import Station, StationHistory


class Attribute(Definition):
Expand Down Expand Up @@ -77,6 +77,15 @@ class Measurement(models.Model):
date_time = models.DateTimeField()
value = models.FloatField()

# Specifically measurement is linked to a station history
station_history = models.ForeignKey(
StationHistory, on_delete=models.CASCADE,
help_text=(
'Station history of the measurement.'
),
null=True, blank=True
)

def __str__(self):
return f'{self.date_time} - {self.value}'

Expand Down
10 changes: 8 additions & 2 deletions django_project/gap/providers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@
.. note:: Helper for reading NetCDF File
"""

from gap.ingestor.wind_borne_systems import PROVIDER
from gap.models import Dataset, DatasetStore
from gap.utils.netcdf import NetCDFProvider
from gap.providers.airborne_observation import ObservationAirborneDatasetReader
from gap.providers.cbam import CBAMZarrReader, CBAMNetCDFReader # noqa
from gap.providers.salient import SalientNetCDFReader, SalientZarrReader # noqa
from gap.providers.observation import ObservationDatasetReader
from gap.providers.salient import (
SalientNetCDFReader, SalientZarrReader
) # noqa
from gap.providers.tio import (
TomorrowIODatasetReader,
PROVIDER_NAME as TIO_PROVIDER,
TioZarrReader
)
from gap.utils.netcdf import NetCDFProvider


def get_reader_from_dataset(dataset: Dataset):
Expand All @@ -32,6 +36,8 @@ def get_reader_from_dataset(dataset: Dataset):
return SalientZarrReader
elif dataset.provider.name in ['Tahmo', 'Arable']:
return ObservationDatasetReader
elif dataset.provider.name in [PROVIDER]:
return ObservationAirborneDatasetReader
elif (
dataset.provider.name == TIO_PROVIDER and
dataset.store_type == DatasetStore.EXT_API
Expand Down
Loading
Loading