Skip to content

Commit

Permalink
write flat parquet files to s3
Browse files Browse the repository at this point in the history
  • Loading branch information
rymarczy committed Sep 26, 2023
1 parent 00001fb commit 3fa0f08
Show file tree
Hide file tree
Showing 35 changed files with 712 additions and 1,077 deletions.
64 changes: 36 additions & 28 deletions python_src/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions python_src/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ snapshot = 'lamp_py.postgres.snapshot:run'
[tool.poetry.dependencies]
python = "^3.9"
SQLAlchemy = "^1.4.39"
pyarrow = "^11.0.0"
pyarrow = "^13.0.0"
boto3 = "^1.23.3"
pandas = "^1.4.3"
numpy = "^1.23.1"
Expand Down Expand Up @@ -77,6 +77,6 @@ max-line-length = 80
min-similarity-lines = 10
# ignore session maker as it gives pylint fits
# https://github.com/PyCQA/pylint/issues/7090
ignored-classes = ['sqlalchemy.orm.session.sessionmaker']
ignored-classes = ['sqlalchemy.orm.session.sessionmaker','pyarrow.compute']
# ignore the migrations directory. its going to have duplication and _that is ok_.
ignore-paths = ["^src/lamp_py/migrations/.*$"]
89 changes: 89 additions & 0 deletions python_src/src/lamp_py/ingestion/config_busloc_trip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from typing import List, Tuple
import pyarrow

from .gtfs_rt_detail import GTFSRTDetail
from .gtfs_rt_structs import (
trip_descriptor,
vehicle_descriptor,
stop_time_event,
)


class RtBusTripDetail(GTFSRTDetail):
"""
Detail for how to convert RT GTFS Trip Updates from json entries into
parquet tables.
"""

def transform_for_write(self, table: pyarrow.table) -> pyarrow.table:
"""modify table schema before write to parquet"""
return self.flatten_schema(
self.explode_table_column(
self.flatten_schema(table), "trip_update.stop_time_update"
)
)

@property
def import_schema(self) -> pyarrow.schema:
return pyarrow.schema(
[
("id", pyarrow.string()),
(
"trip_update",
pyarrow.struct(
[
(
"timestamp",
pyarrow.uint64(),
), # Not currently provided by Busloc
(
"delay",
pyarrow.int32(),
), # Not currently provided by Busloc
(
"trip",
trip_descriptor,
), # Busloc currently only provides trip_id, route_id and schedule_relationship
(
"vehicle",
vehicle_descriptor,
), # Busloc currently only provides id and label
(
"stop_time_update",
pyarrow.list_(
pyarrow.struct(
[
("stop_sequence", pyarrow.uint32()),
("stop_id", pyarrow.string()),
("arrival", stop_time_event),
("departure", stop_time_event),
(
"schedule_relationship",
pyarrow.string(),
),
("cause_id", pyarrow.uint16()),
(
"cause_description",
pyarrow.string(),
),
("remark", pyarrow.string()),
]
)
),
),
]
),
),
]
)

@property
def table_sort_order(self) -> List[Tuple[str, str]]:
return [
("trip_update.trip.start_date", "ascending"),
("trip_update.trip.route_pattern_id", "ascending"),
("trip_update.trip.route_id", "ascending"),
("trip_update.trip.direction_id", "ascending"),
("trip_update.vehicle.id", "ascending"),
("feed_timestamp", "ascending"),
]
69 changes: 69 additions & 0 deletions python_src/src/lamp_py/ingestion/config_busloc_vehicle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from typing import List, Tuple
import pyarrow

from .gtfs_rt_detail import GTFSRTDetail
from .gtfs_rt_structs import (
position,
vehicle_descriptor,
trip_descriptor,
)


class RtBusVehicleDetail(GTFSRTDetail):
"""
Detail for how to convert RT GTFS Bus Vehicle Positions from json
entries into parquet tables.
"""

@property
def import_schema(self) -> pyarrow.schema:
return pyarrow.schema(
[
("id", pyarrow.string()),
("is_deleted", pyarrow.bool_()),
(
"vehicle",
pyarrow.struct(
[
("position", position),
("location_source", pyarrow.string()),
("timestamp", pyarrow.uint64()),
("trip", trip_descriptor),
("vehicle", vehicle_descriptor),
(
"operator",
pyarrow.struct(
[
("id", pyarrow.string()),
("first_name", pyarrow.string()),
("last_name", pyarrow.string()),
("name", pyarrow.string()),
("logon_time", pyarrow.uint64()),
]
),
),
("block_id", pyarrow.string()),
("run_id", pyarrow.string()),
("stop_id", pyarrow.string()),
("current_stop_sequence", pyarrow.uint32()),
("revenue", pyarrow.bool_()),
("current_status", pyarrow.string()),
("load", pyarrow.uint16()),
("capacity", pyarrow.uint16()),
("occupancy_percentage", pyarrow.uint16()),
("occupancy_status", pyarrow.string()),
]
),
),
]
)

@property
def table_sort_order(self) -> List[Tuple[str, str]]:
return [
("vehicle.trip.start_date", "ascending"),
("vehicle.trip.route_id", "ascending"),
("vehicle.block_id", "ascending"),
("vehicle.vehicle.id", "ascending"),
("feed_timestamp", "ascending"),
]
Loading

0 comments on commit 3fa0f08

Please sign in to comment.