Add testing for Missing Service Dates
Service dates are added shortly after reading gtfs-rt parquet files,
both for trip updates and vehicle positions. Create a new test suite
covering that backfill, named after the l0_gtfs_rt_events file that
calls it.

Adding these tests and a new test suite required a bit of a shuffle to
our testing directory.
* a new csv file was added for vp gtfs-rt data that is missing some
  service dates
* a new parquet file was added for tu gtfs-rt data that is missing some
  service dates (I couldn't figure out how to get the csv strategy to
  work here)
* the pytest fixture for reading local parquet files is used by both
  tests, so it had to be moved into a new, top level `conftest.py` file.
* similarly, the csv -> vp parquet functionality is needed by both and
  was moved into `test_resources.py`
* lastly, a failing test in performance manager tests was corrected.
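
Conceptually, the backfill under test derives a service date from each record's epoch timestamp whenever the gtfs-rt feed omits start_date. A minimal sketch of that idea (an illustration only, not the actual gtfs_utils.add_missing_service_dates implementation, which may also account for service-day boundaries around midnight):

import pandas


def backfill_service_dates(
    events: pandas.DataFrame, timestamp_key: str
) -> pandas.DataFrame:
    """sketch: fill NaN service_date values from each row's epoch timestamp"""
    missing = events["service_date"].isna()
    # assumes every row missing a service date has a usable epoch timestamp
    derived = (
        pandas.to_datetime(events.loc[missing, timestamp_key], unit="s", utc=True)
        .dt.tz_convert("US/Eastern")
        .dt.strftime("%Y%m%d")
        .astype("int64")
    )
    events.loc[missing, "service_date"] = derived
    return events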
mzappitello committed Aug 22, 2023
1 parent f69dea3 commit 9d0a3b1
Showing 6 changed files with 176 additions and 57 deletions.
51 changes: 51 additions & 0 deletions python_src/tests/conftest.py
@@ -0,0 +1,51 @@
"""
this file contains fixtures that are intended to be used across multiple test
files
"""

from typing import (
Iterator,
List,
Optional,
Sequence,
Tuple,
Union,
)

import pytest
from _pytest.monkeypatch import MonkeyPatch
from pyarrow import fs, parquet, Table


@pytest.fixture(autouse=True, name="get_pyarrow_table_patch")
def fixture_get_pyarrow_table_patch(monkeypatch: MonkeyPatch) -> Iterator[None]:
"""
the aws.s3 function `_get_pyarrow_table` function reads parquet files from
s3 and returns a pyarrow table. when testing on our github machines, we
don't have access to s3, so all tests must be run against local files.
monkeypatch the function to read from a local filepath.
"""

def mock__get_pyarrow_table(
filename: Union[str, List[str]],
filters: Optional[Union[Sequence[Tuple], Sequence[List[Tuple]]]] = None,
) -> Table:
active_fs = fs.LocalFileSystem()

if isinstance(filename, list):
to_load = filename
else:
to_load = [filename]

if len(to_load) == 0:
return Table.from_pydict({})

return parquet.ParquetDataset(
to_load, filesystem=active_fs, filters=filters
).read_pandas()

monkeypatch.setattr(
"lamp_py.aws.s3._get_pyarrow_table", mock__get_pyarrow_table
)

yield
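
Because the fixture above is autouse, any test under python_src/tests that ends up calling lamp_py.aws.s3._get_pyarrow_table reads from the local filesystem without extra setup. A hypothetical example of the effect (the test name and asserted row count are illustrative only):

import os

from lamp_py.aws import s3


def test_local_read_example() -> None:
    # hypothetical test: the autouse fixture patches s3._get_pyarrow_table,
    # so this call loads a local parquet fixture instead of hitting s3
    parquet_file = os.path.join(
        os.path.dirname(__file__), "test_files", "tu_missing_start_date.parquet"
    )
    table = s3._get_pyarrow_table([parquet_file])
    assert table.num_rows > 0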
66 changes: 66 additions & 0 deletions python_src/tests/performance_manager/test_l0_gtfs_rt_events.py
@@ -0,0 +1,66 @@
import os
import pathlib

from lamp_py.performance_manager.l0_rt_vehicle_positions import (
    get_vp_dataframe,
    transform_vp_datatypes,
)
from lamp_py.performance_manager.l0_rt_trip_updates import (
    get_and_unwrap_tu_dataframe,
)
from lamp_py.performance_manager.gtfs_utils import (
    add_missing_service_dates,
)

from ..test_resources import test_files_dir, csv_to_vp_parquet


def test_vp_missing_service_date(tmp_path: pathlib.Path) -> None:
    """
    test that missing service dates in gtfs-rt vehicle position files can be
    correctly backfilled.
    """
    csv_file = os.path.join(test_files_dir, "vp_missing_start_date.csv")

    parquet_folder = tmp_path.joinpath(
        "RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=11"
    )
    parquet_folder.mkdir(parents=True)
    parquet_file = str(parquet_folder.joinpath("flat_file.parquet"))

    csv_to_vp_parquet(csv_file, parquet_file)

    events = get_vp_dataframe(to_load=[parquet_file], route_ids=["Blue"])
    events = transform_vp_datatypes(events)

    # ensure that there are NaN service dates
    assert events["service_date"].hasnans

    # add the service dates that are missing
    events = add_missing_service_dates(
        events, timestamp_key="vehicle_timestamp"
    )

    # check that new service dates match existing and are numbers
    assert len(events["service_date"].unique()) == 1
    assert not events["service_date"].hasnans


def test_tu_missing_service_date() -> None:
    """
    test that trip update gtfs data with missing service dates can be processed
    correctly.
    """
    parquet_file = os.path.join(test_files_dir, "tu_missing_start_date.parquet")
    events = get_and_unwrap_tu_dataframe([parquet_file], route_ids=["Blue"])

    # check that NaN service dates exist from reading the file
    assert events["service_date"].hasnans

    events = add_missing_service_dates(
        events_dataframe=events, timestamp_key="timestamp"
    )

    # check that all service dates exist and are the same
    assert not events["service_date"].hasnans
    assert len(events["service_date"].unique()) == 1
71 changes: 14 additions & 57 deletions python_src/tests/performance_manager/test_performance_manager.py
@@ -7,18 +7,14 @@
Iterator,
List,
Optional,
Sequence,
Tuple,
Union,
Callable,
)

import pandas
import pytest
import sqlalchemy as sa
from _pytest.monkeypatch import MonkeyPatch
import pyarrow
from pyarrow import fs, parquet, csv, Table
from pyarrow import Table

from lamp_py.performance_manager.flat_file import write_flat_files
from lamp_py.performance_manager.l0_gtfs_static_load import (
@@ -62,15 +58,16 @@
from lamp_py.performance_manager.gtfs_utils import (
add_static_version_key_column,
add_parent_station_column,
rail_routes_from_filepath,
)

from ..test_resources import springboard_dir, test_files_dir
from ..test_resources import springboard_dir, test_files_dir, csv_to_vp_parquet


@lru_cache
def test_files() -> List[str]:
"""
collaps all of the files in the lamp test files dir into a list
collapse all of the files in the lamp test files dir into a list
"""
paths = []

@@ -150,31 +147,6 @@ def mock__get_static_parquet_paths(
mock__get_static_parquet_paths,
)

def mock__get_pyarrow_table(
filename: Union[str, List[str]],
filters: Optional[Union[Sequence[Tuple], Sequence[List[Tuple]]]] = None,
) -> Table:
logging.debug(
"Mock Get Pyarrow Table filename=%s, filters=%s", filename, filters
)
active_fs = fs.LocalFileSystem()

if isinstance(filename, list):
to_load = filename
else:
to_load = [filename]

if len(to_load) == 0:
return Table.from_pydict({})

return parquet.ParquetDataset(
to_load, filesystem=active_fs, filters=filters
).read_pandas()

monkeypatch.setattr(
"lamp_py.aws.s3._get_pyarrow_table", mock__get_pyarrow_table
)

# pylint: disable=R0913
# pylint too many arguments (more than 5)
def mock__write_parquet_file(
@@ -391,7 +363,8 @@ def test_gtfs_rt_processing(
assert "RT_VEHICLE_POSITIONS" in path

# check that we can load the parquet file into a dataframe correctly
positions = get_vp_dataframe(files["vp_paths"], db_manager)
route_ids = rail_routes_from_filepath(files["vp_paths"], db_manager)
positions = get_vp_dataframe(files["vp_paths"], route_ids)
position_size = positions.shape[0]
assert positions.shape[1] == 12

@@ -413,9 +386,7 @@
assert positions.shape[1] == 14
assert position_size > positions.shape[0]

trip_updates = get_and_unwrap_tu_dataframe(
files["tu_paths"], db_manager
)
trip_updates = get_and_unwrap_tu_dataframe(files["tu_paths"], route_ids)
trip_update_size = trip_updates.shape[0]
assert trip_updates.shape[1] == 8

@@ -572,31 +543,17 @@ def test_whole_table(
)

csv_file = os.path.join(test_files_dir, "vehicle_positions_flat_input.csv")

vp_folder = "RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=11"
parquet_folder = tmp_path.joinpath(vp_folder)
parquet_folder = tmp_path.joinpath(
"RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=11"
)
parquet_folder.mkdir(parents=True)
parquet_file = str(parquet_folder.joinpath("flat_file.parquet"))

csv_to_vp_parquet(csv_file, parquet_file)

parquet_file = parquet_folder.joinpath("flat_file.parquet")

options = csv.ConvertOptions(
column_types={
"current_status": pyarrow.string(),
"current_stop_sequence": pyarrow.int64(),
"stop_id": pyarrow.string(),
"vehicle_timestamp": pyarrow.int64(),
"direction_id": pyarrow.int64(),
"route_id": pyarrow.string(),
"start_date": pyarrow.string(),
"start_time": pyarrow.string(),
"vehicle_id": pyarrow.string(),
}
)
table = csv.read_csv(csv_file, convert_options=options)
parquet.write_table(table, parquet_file)
db_manager.add_metadata_paths(
[
str(parquet_file),
parquet_file,
]
)

Binary file python_src/tests/test_files/tu_missing_start_date.parquet not shown.
20 changes: 20 additions & 0 deletions python_src/tests/test_files/vp_missing_start_date.csv
@@ -0,0 +1,20 @@
current_status,current_stop_sequence,stop_id,vehicle_timestamp,direction_id,route_id,start_date,start_time,vehicle_id,trip_id,vehicle_label,vehicle_consist
STOPPED_AT,1,70059,1683547153,0,Blue,,07:38:00,B-54768A0A,55458882,0713,
INCOMING_AT,10,70057,,0,Blue,20230508,07:38:00,B-54768A0A,55458882,0713,
STOPPED_AT,10,70057,1683547246,0,Blue,,07:38:00,B-54768A0A,55458882,0713,
INCOMING_AT,20,70055,,0,Blue,20230508,07:38:00,B-54768A0A,55458882,0713,
IN_TRANSIT_TO,20,70055,1683547299,0,Blue,,07:38:00,B-54768A0A,55458882,0713,
STOPPED_AT,20,70055,,0,Blue,20230508,07:38:00,B-54768A0A,55458882,0713,
INCOMING_AT,30,70053,1683547429,0,Blue,,07:38:00,B-54768A0A,55458882,0713,
STOPPED_AT,30,70053,,0,Blue,20230508,07:38:00,B-54768A0A,55458882,0713,
INCOMING_AT,40,70051,1683547531,0,Blue,,07:38:00,B-54768A0A,55458882,0713,
STOPPED_AT,40,70051,,0,Blue,20230508,07:38:00,B-54768A0A,55458882,0713,
IN_TRANSIT_TO,50,70049,1683547652,0,Blue,,07:38:00,B-54768A0A,55458882,0713,
INCOMING_AT,50,70049,,0,Blue,20230508,07:38:00,B-54768A0A,55458882,0713,
STOPPED_AT,50,70049,1683547786,0,Blue,,07:38:00,B-54768A0A,55458882,0713,
IN_TRANSIT_TO,60,70047,,0,Blue,20230508,07:38:00,B-54768A0A,55458882,0713,
INCOMING_AT,60,70047,1683547846,0,Blue,,07:38:00,B-54768A0A,55458882,0713,
STOPPED_AT,60,70047,,0,Blue,20230508,07:38:00,B-54768A0A,55458882,0713,
IN_TRANSIT_TO,70,70045,1683547970,0,Blue,,07:38:00,B-54768A0A,55458882,0713,
INCOMING_AT,70,70045,,0,Blue,20230508,07:38:00,B-54768A0A,55458882,0713,
STOPPED_AT,70,70045,1683548133,0,Blue,,07:38:00,B-54768A0A,55458882,0713,
25 changes: 25 additions & 0 deletions python_src/tests/test_resources.py
@@ -1,6 +1,31 @@
import os
import pyarrow
from pyarrow import csv, parquet

test_files_dir = os.path.join(os.path.dirname(__file__), "test_files")

incoming_dir = os.path.join(test_files_dir, "INCOMING")
springboard_dir = os.path.join(test_files_dir, "SPRINGBOARD")


def csv_to_vp_parquet(csv_filepath: str, parquet_filepath: str) -> None:
    """
    read vehicle position data in csv format and write it to a parquet file
    """
    vp_csv_options = csv.ConvertOptions(
        column_types={
            "current_status": pyarrow.string(),
            "current_stop_sequence": pyarrow.int64(),
            "stop_id": pyarrow.string(),
            "vehicle_timestamp": pyarrow.int64(),
            "direction_id": pyarrow.int64(),
            "route_id": pyarrow.string(),
            "start_date": pyarrow.string(),
            "start_time": pyarrow.string(),
            "vehicle_id": pyarrow.string(),
            "vehicle_consist": pyarrow.string(),
        }
    )

    table = csv.read_csv(csv_filepath, convert_options=vp_csv_options)
    parquet.write_table(table, parquet_filepath)
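
Usage mirrors the new vehicle position test: write a csv fixture out as a hive-partitioned parquet file under a pytest tmp_path, then hand that file to the code under test. A small sketch, assuming it lives alongside the tests so the relative import resolves:

import os
import pathlib

from ..test_resources import test_files_dir, csv_to_vp_parquet


def make_vp_parquet(tmp_path: pathlib.Path) -> str:
    """sketch: convert the vp csv fixture into a partitioned parquet file"""
    folder = tmp_path.joinpath("RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=11")
    folder.mkdir(parents=True)
    parquet_file = str(folder.joinpath("flat_file.parquet"))
    csv_to_vp_parquet(
        os.path.join(test_files_dir, "vp_missing_start_date.csv"), parquet_file
    )
    return parquet_file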
