-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Update Ingestion to write Flat Parquet Files (#169)
Previously, we were using custom python business logic to flatten certain fields of GTFS-RT and BUSLOC feed data into parquet tables for writing to S3. This change updates that business logic to use built-in pyarrow functions for all flattening operations. Additionally, this change adds functionality for pyarrow to explode list-type fields to create individual table records for each list entry. As a side effect, our resulting parquet file schemas have changed significantly. Because of these schema changes, the performance_manager application required updates to how it ingests our S3 parquet files. The changes also required the updating of a large amount of our test-file infrastructure. Asana Task: https://app.asana.com/0/1204931901750675/1205499714216016
- Loading branch information
Showing
35 changed files
with
747 additions
and
1,079 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
from typing import List, Tuple | ||
import pyarrow | ||
|
||
from .gtfs_rt_detail import GTFSRTDetail | ||
from .gtfs_rt_structs import ( | ||
trip_descriptor, | ||
vehicle_descriptor, | ||
stop_time_event, | ||
) | ||
|
||
|
||
class RtBusTripDetail(GTFSRTDetail):
    """
    Detail for how to convert RT GTFS Bus Trip Updates from json entries
    into parquet tables.
    """

    def transform_for_write(self, table: pyarrow.Table) -> pyarrow.Table:
        """
        Modify table schema before write to parquet.

        Applies, in order: `flatten_schema`, then `explode_table_column` on
        the "trip_update.stop_time_update" column, then `flatten_schema`
        again on the exploded result. Both helpers are reached through
        `self` and are not defined in this class.

        :param table: table built from the imported trip update records
        :return: table produced by the flatten / explode / flatten sequence
        """
        flattened = self.flatten_schema(table)
        exploded = self.explode_table_column(
            flattened, "trip_update.stop_time_update"
        )
        # second flatten pass is applied to the output of the explode step
        return self.flatten_schema(exploded)

    @property
    def import_schema(self) -> pyarrow.Schema:
        """
        Schema describing Bus Trip Update records as they are imported.

        :return: schema with a string "id" field and a "trip_update" struct
            that carries a list of "stop_time_update" structs
        """
        stop_time_update = pyarrow.struct(
            [
                ("stop_sequence", pyarrow.uint32()),
                ("stop_id", pyarrow.string()),
                ("arrival", stop_time_event),
                ("departure", stop_time_event),
                ("schedule_relationship", pyarrow.string()),
                ("cause_id", pyarrow.uint16()),
                ("cause_description", pyarrow.string()),
                ("remark", pyarrow.string()),
            ]
        )

        trip_update = pyarrow.struct(
            [
                # Not currently provided by Busloc
                ("timestamp", pyarrow.uint64()),
                # Not currently provided by Busloc
                ("delay", pyarrow.int32()),
                # Busloc currently only provides trip_id, route_id and
                # schedule_relationship
                ("trip", trip_descriptor),
                # Busloc currently only provides id and label
                ("vehicle", vehicle_descriptor),
                ("stop_time_update", pyarrow.list_(stop_time_update)),
            ]
        )

        return pyarrow.schema(
            [
                ("id", pyarrow.string()),
                ("trip_update", trip_update),
            ]
        )

    @property
    def table_sort_order(self) -> List[Tuple[str, str]]:
        """
        Sort keys for the table as (column name, order) pairs.

        NOTE(review): the names appear to be the dotted column names of the
        flattened table; "feed_timestamp" is not part of `import_schema`, so
        it is presumably added elsewhere - confirm against the base class.
        """
        return [
            ("trip_update.trip.start_date", "ascending"),
            ("trip_update.trip.route_pattern_id", "ascending"),
            ("trip_update.trip.route_id", "ascending"),
            ("trip_update.trip.direction_id", "ascending"),
            ("trip_update.vehicle.id", "ascending"),
            ("feed_timestamp", "ascending"),
        ]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
from typing import List, Tuple | ||
import pyarrow | ||
|
||
from .gtfs_rt_detail import GTFSRTDetail | ||
from .gtfs_rt_structs import ( | ||
position, | ||
vehicle_descriptor, | ||
trip_descriptor, | ||
) | ||
|
||
|
||
class RtBusVehicleDetail(GTFSRTDetail):
    """
    Detail for how to convert RT GTFS Bus Vehicle Positions from json
    entries into parquet tables.
    """

    @property
    def import_schema(self) -> pyarrow.Schema:
        """
        Schema describing Bus Vehicle Position records as they are imported.

        :return: schema with "id" and "is_deleted" fields plus a "vehicle"
            struct that nests position, trip, vehicle and operator details
        """
        operator = pyarrow.struct(
            [
                ("id", pyarrow.string()),
                ("first_name", pyarrow.string()),
                ("last_name", pyarrow.string()),
                ("name", pyarrow.string()),
                ("logon_time", pyarrow.uint64()),
            ]
        )

        vehicle = pyarrow.struct(
            [
                ("position", position),
                ("location_source", pyarrow.string()),
                ("timestamp", pyarrow.uint64()),
                ("trip", trip_descriptor),
                ("vehicle", vehicle_descriptor),
                ("operator", operator),
                ("block_id", pyarrow.string()),
                ("run_id", pyarrow.string()),
                ("stop_id", pyarrow.string()),
                ("current_stop_sequence", pyarrow.uint32()),
                ("revenue", pyarrow.bool_()),
                ("current_status", pyarrow.string()),
                ("load", pyarrow.uint16()),
                ("capacity", pyarrow.uint16()),
                ("occupancy_percentage", pyarrow.uint16()),
                ("occupancy_status", pyarrow.string()),
            ]
        )

        return pyarrow.schema(
            [
                ("id", pyarrow.string()),
                ("is_deleted", pyarrow.bool_()),
                ("vehicle", vehicle),
            ]
        )

    @property
    def table_sort_order(self) -> List[Tuple[str, str]]:
        """
        Sort keys for the table as (column name, order) pairs.

        NOTE(review): the names appear to be the dotted column names of the
        flattened table; "feed_timestamp" is not part of `import_schema`, so
        it is presumably added elsewhere - confirm against the base class.
        """
        return [
            ("vehicle.trip.start_date", "ascending"),
            ("vehicle.trip.route_id", "ascending"),
            ("vehicle.block_id", "ascending"),
            ("vehicle.vehicle.id", "ascending"),
            ("feed_timestamp", "ascending"),
        ]
Oops, something went wrong.