Update GTFS_RT ingestion pipeline for older files #156

Merged 1 commit on Aug 1, 2023
15 changes: 13 additions & 2 deletions python_src/src/lamp_py/ingestion/convert_gtfs_rt.py
@@ -297,8 +297,19 @@ def gz_to_pyarrow(
        file_system = current_thread().__dict__["file_system"]
        filename = filename.replace("s3://", "")

        with file_system.open_input_stream(filename) as file:
            json_data = json.load(file)
        # some of our older files are named incorrectly, with a simple
        # .json suffix rather than a .json.gz suffix. in those cases, the
        # s3 open_input_stream is unable to deduce the correct compression
        # algo and fails with a UnicodeDecodeError. catch this failure and
        # retry using a gzip compression algo. (EAFP Style)
        try:
            with file_system.open_input_stream(filename) as file:
                json_data = json.load(file)
        except UnicodeDecodeError as _:
            with file_system.open_input_stream(
                filename, compression="gzip"
            ) as file:
                json_data = json.load(file)

        # Create empty 'table' as dict of lists for export schema
        table = self.detail.empty_table()
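For reference, the same fallback pattern can be exercised outside of S3 with pyarrow's LocalFileSystem, whose open_input_stream likewise defaults to extension-based compression detection. A minimal sketch, assuming a hypothetical local file old_feed.json that actually holds gzip-compressed JSON:

import json

from pyarrow import fs

# local stand-in for the S3 file system used by gz_to_pyarrow; the path is
# a hypothetical gzip-compressed file saved with a bare .json suffix
file_system = fs.LocalFileSystem()
filename = "old_feed.json"

try:
    # compression defaults to "detect", which keys off the ".json" suffix,
    # so the raw gzip bytes reach json.load and raise a UnicodeDecodeError
    with file_system.open_input_stream(filename) as file:
        json_data = json.load(file)
except UnicodeDecodeError:
    # retry with the compression algorithm forced to gzip
    with file_system.open_input_stream(filename, compression="gzip") as file:
        json_data = json.load(file)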
11 changes: 11 additions & 0 deletions python_src/src/lamp_py/ingestion/converter.py
@@ -48,18 +48,29 @@ def from_filename(cls, filename: str) -> ConfigType:
        # disable too many returns error message
        if "mbta.com_realtime_Alerts_enhanced" in filename:
            return cls.RT_ALERTS

Contributor Author: I added spacing between blocks to make it read a little easier.
if "mbta.com_realtime_TripUpdates_enhanced" in filename:
return cls.RT_TRIP_UPDATES
if "concentrate_TripUpdates_enhanced.json" in filename:
return cls.RT_TRIP_UPDATES

if "mbta.com_realtime_VehiclePositions_enhanced" in filename:
return cls.RT_VEHICLE_POSITIONS
if "concentrate_VehiclePositions_enhanced.json" in filename:
return cls.RT_VEHICLE_POSITIONS

if "com_prod_TripUpdates_enhanced" in filename:
return cls.BUS_TRIP_UPDATES

if "com_prod_VehiclePositions_enhanced" in filename:
return cls.BUS_VEHICLE_POSITIONS

if "net_vehicleCount" in filename:
return cls.VEHICLE_COUNT

if "MBTA_GTFS.zip" in filename:
return cls.SCHEDULE

if "LightRailRawGPS" in filename:
return cls.LIGHT_RAIL

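As a quick sanity check of the new dispatch, a minimal sketch using the old-style fixture names that appear in the tests below (the import path is inferred from the file location; the enum members come from the diff above):

from lamp_py.ingestion.converter import ConfigType

old_vp = (
    "2019-12-12T00_00_10_https___mbta_gtfs_s3_dev.s3.amazonaws.com_"
    "concentrate_VehiclePositions_enhanced.json"
)
# the "concentrate_VehiclePositions_enhanced.json" substring now maps to the
# realtime vehicle positions config
assert ConfigType.from_filename(old_vp) == ConfigType.RT_VEHICLE_POSITIONS

old_tu = (
    "2019-12-12T00_00_57_https___mbta_gtfs_s3_dev.s3.amazonaws.com_"
    "concentrate_TripUpdates_enhanced.json"
)
assert ConfigType.from_filename(old_tu) == ConfigType.RT_TRIP_UPDATES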
29 changes: 27 additions & 2 deletions python_src/tests/ingestion/test_gtfs_rt_converter.py
@@ -22,8 +22,6 @@ def test_bad_conversion_local() -> None:
    )
    converter.add_files(["badfile"])

    print("YOLO")

    # process the bad file and get the table out
    for _ in converter.process_files():
        assert False, "Generated Table for s3 badfile"
@@ -156,6 +154,20 @@ def test_vehicle_positions_file_conversion() -> None:
if lower != "nan":
assert lower == str(np_df[col].min())

# check that we are able to handle older vp files from 16 sept 2019 - 4 march 2020
old_gtfs_rt_file = os.path.join(
incoming_dir,
"2019-12-12T00_00_10_https___mbta_gtfs_s3_dev.s3.amazonaws.com_concentrate_VehiclePositions_enhanced.json",
)
timestamp, filename, table = converter.gz_to_pyarrow(old_gtfs_rt_file)

assert timestamp.month == 12
assert timestamp.year == 2019
assert timestamp.day == 12

assert table.shape[0] > 0
assert table.shape[1] > 0


def test_rt_alert_file_conversion() -> None:
"""
Expand Down Expand Up @@ -300,6 +312,19 @@ def test_rt_trip_file_conversion() -> None:
if lower != "nan":
assert lower == str(np_df[col].min())

# check that we are able to handle older vp files from 16 sept 2019 - 4 march 2020
old_gtfs_rt_file = os.path.join(
incoming_dir,
"2019-12-12T00_00_57_https___mbta_gtfs_s3_dev.s3.amazonaws.com_concentrate_TripUpdates_enhanced.json",
)
timestamp, filename, table = converter.gz_to_pyarrow(old_gtfs_rt_file)
assert timestamp.month == 12
assert timestamp.year == 2019
assert timestamp.day == 12

assert table.shape[0] > 0
assert table.shape[1] > 0


def test_bus_vehicle_positions_file_conversion() -> None:
"""
Expand Down
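The new test cases read 2019-era fixture files from incoming_dir. If a similar fixture ever needs to be produced by hand, gzip-compressed JSON saved under a bare .json name reproduces the mis-named-suffix situation; a minimal sketch with a hypothetical path and a stubbed GTFS-RT-style payload:

import gzip
import json
import os
import tempfile

# hypothetical fixture: gzip content behind a plain .json suffix, mimicking
# the 16 sept 2019 - 4 march 2020 files covered by the tests above
fixture_dir = tempfile.mkdtemp()
fixture_path = os.path.join(
    fixture_dir,
    "2019-12-12T00_00_10_concentrate_VehiclePositions_enhanced.json",
)
with gzip.open(fixture_path, "wt") as out_file:
    json.dump({"header": {"timestamp": 1576108810}, "entity": []}, out_file)

# extension-based compression detection will not decompress this file,
# which is exactly the case the gz_to_pyarrow fallback handles
with open(fixture_path, "rb") as raw:
    assert raw.read(2) == b"\x1f\x8b"  # gzip magic bytes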

Large diffs are not rendered by default.