Skip to content

Commit

Permalink
Tahmo ingestor (#17)
Browse files Browse the repository at this point in the history
* Add country ingestor

* Add code to the station

* Fix flake

* Add Ingestor Session

* Add Tahmo ingestor

* Add ingestor to worker

* Add missing docstring

* Disable celery on tests

* Rename field

* Fix tests
  • Loading branch information
meomancer authored Jul 2, 2024
1 parent 80ddd22 commit a548ab9
Show file tree
Hide file tree
Showing 25 changed files with 687 additions and 140 deletions.
1 change: 1 addition & 0 deletions deployment/docker-compose.override.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,6 @@ services:
links:
- db
- redis
- worker
- celery_beat
entrypoint: [ ]
2 changes: 1 addition & 1 deletion deployment/docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ services:
- DATABASE_PASSWORD=docker
- DATABASE_HOST=db
- RABBITMQ_HOST=rabbitmq
- DJANGO_SETTINGS_MODULE=core.settings.dev
- DJANGO_SETTINGS_MODULE=core.settings.test
- SECRET_KEY=SECRET_KEY

# Redis config
Expand Down
15 changes: 4 additions & 11 deletions deployment/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ volumes:
nginx-cache:
backups-data:
data-volume:
redis-data:

x-common-django:
&default-common-django
Expand Down Expand Up @@ -40,6 +41,8 @@ services:
image: bitnami/redis:7.0.2
environment:
- REDIS_PASSWORD=${REDIS_PASSWORD:-redis_password}
volumes:
- redis-data:/bitnami/redis/data

db:
image: kartoza/postgis:14-3.3
Expand Down Expand Up @@ -94,14 +97,4 @@ services:
- media-data:/home/web/media
- nginx-cache:/home/web/nginx_cache
links:
- django

dev:
<<: *default-common-django
entrypoint: [ ]
volumes:
- static-data:/home/web/static
- media-data:/home/web/media
links:
- db
- redis
- django
2 changes: 1 addition & 1 deletion django_project/core/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
f'@{os.environ.get("REDIS_HOST", "")}',
)

app = Celery('georepo')
app = Celery('GAP')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
Expand Down
18 changes: 18 additions & 0 deletions django_project/core/settings/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# coding=utf-8
"""
Tomorrow Now GAP.
.. note:: Project level settings.
"""

from .prod import * # noqa

TEST_RUNNER = 'core.tests.runner.CustomTestRunner'
DEBUG = True

# Disable caching while in development
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.dummy.DummyCache',
}
}
33 changes: 33 additions & 0 deletions django_project/core/tests/runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# coding=utf-8
"""
Tomorrow Now GAP.
.. note:: Test runner.
"""

from core.celery import app as celery_app
from django.conf import settings
from django.test.runner import DiscoverRunner


class CustomTestRunner(DiscoverRunner):
"""Postgres schema test runner."""

@staticmethod
def __disable_celery():
"""Disabling celery."""
settings.CELERY_BROKER_URL = \
celery_app.conf.BROKER_URL = 'filesystem:///dev/null/'
celery_app.conf.task_always_eager = True
data = {
'data_folder_in': '/tmp',
'data_folder_out': '/tmp',
'data_folder_processed': '/tmp',
}
settings.BROKER_TRANSPORT_OPTIONS = \
celery_app.conf.BROKER_TRANSPORT_OPTIONS = data

def setup_test_environment(self, **kwargs):
"""Prepare test env."""
CustomTestRunner.__disable_celery()
super(CustomTestRunner, self).setup_test_environment(**kwargs)
14 changes: 12 additions & 2 deletions django_project/gap/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from django.contrib import admin

from .models import (
Attribute, Country, Provider, Measurement, Station
Attribute, Country, Provider, Measurement, Station, IngestorSession
)


Expand Down Expand Up @@ -46,7 +46,7 @@ class MeasurementAdmin(admin.ModelAdmin):
"""Measurement admin."""

list_display = (
'station', 'attribute', 'date', 'value'
'station', 'attribute', 'date_time', 'value'
)
list_filter = ('station', 'attribute')
search_fields = ('name',)
Expand All @@ -61,3 +61,13 @@ class StationAdmin(admin.ModelAdmin):
)
list_filter = ('provider', 'country')
search_fields = ('code', 'name')


@admin.register(IngestorSession)
class IngestorSessionAdmin(admin.ModelAdmin):
"""IngestorSession admin."""

list_display = (
'run_at', 'status', 'end_at', 'ingestor_type'
)
list_filter = ('ingestor_type', 'status')
5 changes: 4 additions & 1 deletion django_project/gap/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ class Meta: # noqa
name = factory.Sequence(
lambda n: f'station-{n}'
)
code = factory.Sequence(
lambda n: f'code-{n}'
)
country = factory.SubFactory(CountryFactory)
geometry = factory.LazyFunction(lambda: Point(0, 0))
provider = factory.SubFactory(ProviderFactory)
Expand All @@ -104,5 +107,5 @@ class Meta: # noqa

station = factory.SubFactory(StationFactory)
attribute = factory.SubFactory(AttributeFactory)
date = factory.Faker('date')
date_time = factory.Faker('date_time')
value = factory.Faker('pyfloat')
Empty file.
14 changes: 14 additions & 0 deletions django_project/gap/ingestor/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# coding=utf-8
"""
Tomorrow Now GAP.
.. note:: Exceptions for ingestor.
"""


class FileNotFoundException(Exception):
"""File not found."""

def __init__(self): # noqa
self.message = 'File not found.'
super().__init__(self.message)
178 changes: 178 additions & 0 deletions django_project/gap/ingestor/tahmo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
# coding=utf-8
"""
Tomorrow Now GAP.
.. note:: Tahmo ingestor.
"""

import csv
import json
import os
import shutil
from datetime import datetime, timezone
from zipfile import ZipFile

from django.contrib.gis.geos import Point

from gap.ingestor.exceptions import FileNotFoundException
from gap.models import (
Provider, Station, ObservationType, Country, IngestorSession,
Attribute, Measurement
)


class TahmoVariable:
"""Contains Tahmo Variable."""

def __init__(self, name, unit=None):
"""Initialize the Tahmo variable."""
self.name = name
self.unit = unit


TAHMO_VARIABLES = {
'ap': TahmoVariable('Atmospheric pressure'),
'pr': TahmoVariable('Precipitation', 'mm'),
'rh': TahmoVariable('Relative humidity'),
'ra': TahmoVariable('Shortwave radiation', 'W/m2'),
'te': TahmoVariable('Surface air temperature', '°C'),
'wd': TahmoVariable('Wind direction', 'Degrees from North'),
'wg': TahmoVariable('Wind gust', 'm/s'),
'ws': TahmoVariable('Wind speed', 'm/s')
}


class TahmoIngestor:
"""Ingestor for tahmo data."""

def __init__(self, session: IngestorSession):
"""Initialize the ingestor."""
self.session = session

self.provider, _ = Provider.objects.get_or_create(
name='Tahmo'
)
self.obs_type, _ = ObservationType.objects.get_or_create(
name='Ground Observations'
)

def _run(self, dir_path):
"""Run the ingestor."""
# Data is coming from CSV.
# CSV headers:
# - longitude
# - latitude
# - station code
# - name

# INGEST STATIONS
for (dirpath, dirnames, filenames) in os.walk(dir_path):
for filename in filenames:
try:
reader = csv.DictReader(
open(os.path.join(dirpath, filename), 'r')
)
if 'station' in filename:
for data in reader:
try:
point = Point(
x=float(data['longitude']),
y=float(data['latitude']),
srid=4326
)
try:
country = Country.get_countries_by_point(
point
)[0]
except IndexError:
country = None
Station.objects.get_or_create(
code=data['station code'],
provider=self.provider,
defaults={
'name': data['name'],
'geometry': point,
'country': country,
'observation_type': self.obs_type,
}
)
except KeyError as e:
raise Exception(
json.dumps({
'filename': filename,
'data': data,
'error': f'{e}'
})
)
except UnicodeDecodeError:
continue

# INGEST MEASUREMENTS
for (dirpath, dirnames, filenames) in os.walk(dir_path):
for filename in filenames:
code = filename.split('_')[0]
try:
station = Station.objects.get(
code=code, provider=self.provider
)
reader = csv.DictReader(
open(os.path.join(dirpath, filename), 'r')
)
for data in reader:
date = data[''] # noqa
if not date:
continue
date_time = datetime.strptime(
date, '%Y-%m-%d %H:%M'
).replace(tzinfo=timezone.utc)
date_time.replace(second=0)
for key, value in data.items(): # noqa
try:
attr_var = TAHMO_VARIABLES[key]
except KeyError:
continue
try:
attribute, _ = Attribute.objects.get_or_create(
name=attr_var.name
)
try:
unit = attr_var.unit
except KeyError:
unit = None
measure, _ = Measurement.objects.get_or_create(
station=station,
attribute=attribute,
date_time=date_time,
defaults={
'unit': unit,
'value': float(value)
}
)
except (KeyError, ValueError) as e:
raise Exception(
json.dumps({
'filename': filename,
'data': data,
'error': f'{e}'
})
)
except Station.DoesNotExist:
pass

def run(self):
"""Run the ingestor."""
if not self.session.file:
raise FileNotFoundException()

# Extract file
dir_path = os.path.splitext(self.session.file.path)[0]
with ZipFile(self.session.file.path, 'r') as zip_ref:
zip_ref.extractall(dir_path)

# Run the ingestion
try:
self._run(dir_path)
shutil.rmtree(dir_path)
except Exception as e:
shutil.rmtree(dir_path)
raise Exception(e)
5 changes: 4 additions & 1 deletion django_project/gap/management/commands/load_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from django.core.management import call_command
from django.core.management.base import BaseCommand

from gap.models import Country


class Command(BaseCommand):
"""Command to load fixtures."""
Expand All @@ -16,4 +18,5 @@ class Command(BaseCommand):

def handle(self, *args, **options):
"""Handle load fixtures."""
call_command('loaddata', 'gap/fixtures/1.country.json')
if Country.objects.count() == 0:
call_command('loaddata', 'gap/fixtures/1.country.json')
Loading

0 comments on commit a548ab9

Please sign in to comment.