-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add testing for Missing Service Dates
Service Dates are being added shortly after reading gtfs-rt parquet files, both for trip updates and vehicle positions. Create a new test suite for testing them, named after the l0_gtfs-rt_events file that calls them. Adding these tests and a new test suite required a bit of a shuffle to our testing directory. * a new csv file was added for vp gtfs-rt data that is missing some service dates * a new parquet file was added for tu gtfs-rt data that is missing some service dates (i couldn't figure out how to get the csv strategy to work here) * the pytest fixture for reading local parquet files is used by both tests, so it had to be moved into a new, top level `conftest.py` file. * similarly, the csv -> vp parquet functionality is needed by both and was moved into `test_resources.py` * lastly, a failing test in performance manager tests was corrected.
- Loading branch information
1 parent
e92148e
commit 807ff0e
Showing
7 changed files
with
218 additions
and
58 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
""" | ||
this file contains fixtures that are intended to be used across multiple test | ||
files | ||
""" | ||
|
||
from typing import ( | ||
Iterator, | ||
List, | ||
Optional, | ||
Sequence, | ||
Tuple, | ||
Union, | ||
) | ||
|
||
import pytest | ||
from _pytest.monkeypatch import MonkeyPatch | ||
from pyarrow import fs, parquet, Table | ||
|
||
|
||
@pytest.fixture(autouse=True, name="get_pyarrow_table_patch")
def fixture_get_pyarrow_table_patch(monkeypatch: MonkeyPatch) -> Iterator[None]:
    """
    Autouse fixture that redirects `lamp_py.aws.s3._get_pyarrow_table` to a
    local-filesystem reader.

    The real implementation reads parquet files from s3, which the github CI
    machines cannot reach, so every test must run against local files instead.
    """

    def local_table_reader(
        filename: Union[str, List[str]],
        filters: Optional[Union[Sequence[Tuple], Sequence[List[Tuple]]]] = None,
    ) -> Table:
        # accept a single path or a list of paths, mirroring the real api
        paths = filename if isinstance(filename, list) else [filename]

        # nothing to load -> empty table
        if not paths:
            return Table.from_pydict({})

        dataset = parquet.ParquetDataset(
            paths, filesystem=fs.LocalFileSystem(), filters=filters
        )
        return dataset.read_pandas()

    monkeypatch.setattr(
        "lamp_py.aws.s3._get_pyarrow_table", local_table_reader
    )

    yield
106 changes: 106 additions & 0 deletions
106
python_src/tests/performance_manager/test_l0_gtfs_rt_events.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
import os | ||
import pathlib | ||
|
||
from lamp_py.performance_manager.l0_rt_vehicle_positions import ( | ||
get_vp_dataframe, | ||
transform_vp_datatypes, | ||
) | ||
from lamp_py.performance_manager.l0_rt_trip_updates import ( | ||
get_and_unwrap_tu_dataframe, | ||
) | ||
from lamp_py.performance_manager.gtfs_utils import ( | ||
add_missing_service_dates, | ||
service_date_from_timestamp, | ||
) | ||
|
||
from ..test_resources import test_files_dir, csv_to_vp_parquet | ||
|
||
|
||
def test_service_date_from_timestamp() -> None:
    """
    test that the service date from timestamp function correctly handles
    timestamps around the threshold when the service date switches over.

    daylight saving transitions are the hard cases, so each expected service
    date is checked against timestamps on both sides of the switchover.
    """
    # (expected service date, unix timestamp) pairs
    cases = [
        # dst started on 8 march 2020, the clock goes from 1:59 -> 3:00
        (20200307, 1583650200),  # 1:50 am
        (20200307, 1583650740),  # 1:59 am
        (20200307, 1583650799),  # 1:59:59 am
        (20200308, 1583650800),  # 3:00 am
        (20200308, 1583651400),  # 3:10 am
        # dst ended on 1 nov 2020, the clock goes from 2:00 -> 1:00
        (20201031, 1604209800),  # 1:50 am
        (20201031, 1604210340),  # 1:59 am
        (20201031, 1604210399),  # 1:59:59 am
        (20201031, 1604210400),  # 1:00 am (second time)
        (20201031, 1604214000),  # 2:00 am
        (20201031, 1604214000),  # 2:00 am
        (20201031, 1604217000),  # 2:50 am
        (20201031, 1604217540),  # 2:59 am
        (20201031, 1604217599),  # 2:59:59 am
        (20201101, 1604217600),  # 3:00 am
        (20201101, 1604218200),  # 3:10 am
    ]

    for expected_date, timestamp in cases:
        assert service_date_from_timestamp(timestamp) == expected_date
|
||
|
||
def test_vp_missing_service_date(tmp_path: pathlib.Path) -> None:
    """
    test that missing service dates in gtfs-rt vehicle position files can be
    correctly backfilled.

    the csv fixture is converted to parquet under a springboard-style
    partitioned path inside pytest's tmp_path, then read back as a dataframe.
    """
    source_csv = os.path.join(test_files_dir, "vp_missing_start_date.csv")

    target_dir = tmp_path.joinpath(
        "RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=11"
    )
    target_dir.mkdir(parents=True)
    target_parquet = str(target_dir.joinpath("flat_file.parquet"))

    csv_to_vp_parquet(source_csv, target_parquet)

    events = transform_vp_datatypes(
        get_vp_dataframe(to_load=[target_parquet], route_ids=["Blue"])
    )

    # some service dates are missing after the initial read
    assert events["service_date"].hasnans

    # backfill them from the vehicle timestamps
    events = add_missing_service_dates(
        events, timestamp_key="vehicle_timestamp"
    )

    # every row now carries the single backfilled service date
    assert len(events["service_date"].unique()) == 1
    assert not events["service_date"].hasnans
|
||
|
||
def test_tu_missing_service_date() -> None:
    """
    test that trip update gtfs data with missing service dates can be processed
    correctly.

    uses a pre-built parquet fixture (unlike the vehicle position test, which
    builds its parquet from csv).
    """
    fixture_path = os.path.join(test_files_dir, "tu_missing_start_date.parquet")
    events = get_and_unwrap_tu_dataframe([fixture_path], route_ids=["Blue"])

    # the raw file is known to contain rows without a service date
    assert events["service_date"].hasnans

    # backfill from the trip update timestamps
    events = add_missing_service_dates(
        events_dataframe=events, timestamp_key="timestamp"
    )

    # after backfilling, every row has the same, non-null service date
    assert not events["service_date"].hasnans
    assert len(events["service_date"].unique()) == 1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
current_status,current_stop_sequence,stop_id,vehicle_timestamp,direction_id,route_id,start_date,start_time,vehicle_id,trip_id,vehicle_label,vehicle_consist | ||
STOPPED_AT,1,70059,1683547153,0,Blue,,07:38:00,B-54768A0A,55458882,0713, | ||
INCOMING_AT,10,70057,,0,Blue,20230508,07:38:00,B-54768A0A,55458882,0713, | ||
STOPPED_AT,10,70057,1683547246,0,Blue,,07:38:00,B-54768A0A,55458882,0713, | ||
INCOMING_AT,20,70055,,0,Blue,20230508,07:38:00,B-54768A0A,55458882,0713, | ||
IN_TRANSIT_TO,20,70055,1683547299,0,Blue,,07:38:00,B-54768A0A,55458882,0713, | ||
STOPPED_AT,20,70055,,0,Blue,20230508,07:38:00,B-54768A0A,55458882,0713, | ||
INCOMING_AT,30,70053,1683547429,0,Blue,,07:38:00,B-54768A0A,55458882,0713, | ||
STOPPED_AT,30,70053,,0,Blue,20230508,07:38:00,B-54768A0A,55458882,0713, | ||
INCOMING_AT,40,70051,1683547531,0,Blue,,07:38:00,B-54768A0A,55458882,0713, | ||
STOPPED_AT,40,70051,,0,Blue,20230508,07:38:00,B-54768A0A,55458882,0713, | ||
IN_TRANSIT_TO,50,70049,1683547652,0,Blue,,07:38:00,B-54768A0A,55458882,0713, | ||
INCOMING_AT,50,70049,,0,Blue,20230508,07:38:00,B-54768A0A,55458882,0713, | ||
STOPPED_AT,50,70049,1683547786,0,Blue,,07:38:00,B-54768A0A,55458882,0713, | ||
IN_TRANSIT_TO,60,70047,,0,Blue,20230508,07:38:00,B-54768A0A,55458882,0713, | ||
INCOMING_AT,60,70047,1683547846,0,Blue,,07:38:00,B-54768A0A,55458882,0713, | ||
STOPPED_AT,60,70047,,0,Blue,20230508,07:38:00,B-54768A0A,55458882,0713, | ||
IN_TRANSIT_TO,70,70045,1683547970,0,Blue,,07:38:00,B-54768A0A,55458882,0713, | ||
INCOMING_AT,70,70045,,0,Blue,20230508,07:38:00,B-54768A0A,55458882,0713, | ||
STOPPED_AT,70,70045,1683548133,0,Blue,,07:38:00,B-54768A0A,55458882,0713, |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,32 @@ | ||
import os | ||
import pyarrow | ||
from pyarrow import csv, parquet | ||
|
||
# directory of static fixture files shared across test modules
test_files_dir = os.path.join(os.path.dirname(__file__), "test_files")

# NOTE(review): presumably these mirror the pipeline's INCOMING / SPRINGBOARD
# storage stages — confirm against the modules that consume them
incoming_dir = os.path.join(test_files_dir, "INCOMING")
springboard_dir = os.path.join(test_files_dir, "SPRINGBOARD")
|
||
|
||
def csv_to_vp_parquet(csv_filepath: str, parquet_filepath: str) -> None:
    """
    read vehicle position data in csv format and write it to a parquet file

    :param csv_filepath: path of the vehicle position csv file to read
    :param parquet_filepath: path the parquet file is written to
    """
    # declare every column type explicitly so pyarrow does not infer them.
    # id-like columns must stay strings: inference would read a value such
    # as vehicle_label "0713" as int64 and strip the leading zero.
    vp_csv_options = csv.ConvertOptions(
        column_types={
            "current_status": pyarrow.string(),
            "current_stop_sequence": pyarrow.int64(),
            "stop_id": pyarrow.string(),
            "vehicle_timestamp": pyarrow.int64(),
            "direction_id": pyarrow.int64(),
            "route_id": pyarrow.string(),
            "trip_id": pyarrow.string(),
            "start_date": pyarrow.string(),
            "start_time": pyarrow.string(),
            "vehicle_id": pyarrow.string(),
            # fix: vehicle_label was missing from this mapping; inference
            # parsed labels like "0713" as integers, dropping leading zeros
            "vehicle_label": pyarrow.string(),
            "vehicle_consist": pyarrow.string(),
        }
    )

    table = csv.read_csv(csv_filepath, convert_options=vp_csv_options)
    parquet.write_table(table, parquet_filepath)