Skip to content

Commit

Permalink
test pipline with input and output csv files
Browse files Browse the repository at this point in the history
  • Loading branch information
rymarczy committed Jul 18, 2023
1 parent 594d076 commit c195dc4
Show file tree
Hide file tree
Showing 33 changed files with 1,800 additions and 18 deletions.
180 changes: 162 additions & 18 deletions python_src/tests/performance_manager/test_performance_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
from functools import lru_cache
from typing import Dict, Iterator, List, Optional, Sequence, Tuple, Union

import pyarrow
import pandas
import pytest
import sqlalchemy as sa
from _pytest.monkeypatch import MonkeyPatch
from pyarrow import fs, parquet
import pyarrow
from pyarrow import fs, parquet, csv

from lamp_py.performance_manager.l0_gtfs_static_load import (
process_static_tables,
Expand Down Expand Up @@ -36,6 +37,8 @@
StaticTrips,
StaticCalendarDates,
VehicleEvents,
VehicleEventMetrics,
VehicleTrips,
StaticDirections,
)
from lamp_py.postgres.postgres_utils import DatabaseManager
Expand All @@ -50,7 +53,7 @@
add_parent_station_column,
)

from ..test_resources import springboard_dir
from ..test_resources import springboard_dir, test_files_dir


@lru_cache
Expand All @@ -75,7 +78,7 @@ def set_env_vars() -> None:
boostrap .env file for local testing
"""
if int(os.environ.get("BOOTSTRAPPED", 0)) == 1:
logging.warning("allready bootstrapped")
logging.warning("already bootstrapped")
else:
env_file = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "..", "..", "..", ".env"
Expand Down Expand Up @@ -227,6 +230,14 @@ def test_static_tables(
"""
caplog.set_level(logging.INFO)

db_manager.truncate_table(StaticTrips, restart_identity=True)
db_manager.truncate_table(StaticRoutes, restart_identity=True)
db_manager.truncate_table(StaticStops, restart_identity=True)
db_manager.truncate_table(StaticStopTimes, restart_identity=True)
db_manager.truncate_table(StaticCalendar, restart_identity=True)
db_manager.truncate_table(StaticCalendarDates, restart_identity=True)
db_manager.truncate_table(StaticDirections, restart_identity=True)

paths = [file for file in test_files() if "FEED_INFO" in file]
db_manager.add_metadata_paths(paths)

Expand All @@ -245,13 +256,13 @@ def test_static_tables(
# processing the static tables our db tables should have these many record
# counts.
row_counts = {
StaticTrips: 9731,
StaticTrips: 11709,
StaticRoutes: 24,
StaticStops: 9706,
StaticStopTimes: 160977,
StaticCalendar: 76,
StaticCalendarDates: 70,
StaticDirections: 408,
StaticStops: 9743,
StaticStopTimes: 186618,
StaticCalendar: 102,
StaticCalendarDates: 85,
StaticDirections: 378,
}

with db_manager.session.begin() as session:
Expand Down Expand Up @@ -284,6 +295,8 @@ def test_gtfs_rt_processing(
"""
caplog.set_level(logging.INFO)
db_manager.truncate_table(VehicleEvents, restart_identity=True)
db_manager.truncate_table(VehicleTrips, restart_identity=True)
db_manager.truncate_table(VehicleEventMetrics, restart_identity=True)

db_manager.execute(
sa.delete(MetadataLog.__table__).where(
Expand All @@ -295,7 +308,7 @@ def test_gtfs_rt_processing(
file
for file in test_files()
if ("RT_VEHICLE_POSITIONS" in file or "RT_TRIP_UPDATES" in file)
and ("hour=10" in file or "hour=11" in file)
and ("hour=12" in file or "hour=13" in file)
]
db_manager.add_metadata_paths(paths)

Expand Down Expand Up @@ -323,12 +336,12 @@ def test_gtfs_rt_processing(
# remove bus records from dataframe
positions = remove_bus_records(positions, db_manager)
assert positions.shape[1] == 13
assert position_size > positions.shape[0]
assert position_size >= positions.shape[0]

# remove bus records from dataframe
positions = add_parent_station_column(positions, db_manager)
assert positions.shape[1] == 14
assert position_size > positions.shape[0]
assert position_size >= positions.shape[0]

positions = transform_vp_timestamps(positions)
assert positions.shape[1] == 15
Expand All @@ -346,12 +359,12 @@ def test_gtfs_rt_processing(
# remove bus records from dataframe
trip_updates = remove_bus_records(trip_updates, db_manager)
assert trip_updates.shape[1] == 11
assert trip_update_size > trip_updates.shape[0]
assert trip_update_size >= trip_updates.shape[0]

# remove bus records from dataframe
trip_updates = add_parent_station_column(trip_updates, db_manager)
assert trip_updates.shape[1] == 12
assert trip_update_size > trip_updates.shape[0]
assert trip_update_size >= trip_updates.shape[0]

trip_updates = reduce_trip_updates(trip_updates)
assert trip_update_size > trip_updates.shape[0]
Expand Down Expand Up @@ -398,6 +411,8 @@ def test_vp_only(
caplog.set_level(logging.INFO)

db_manager.truncate_table(VehicleEvents, restart_identity=True)
db_manager.truncate_table(VehicleTrips, restart_identity=True)
db_manager.truncate_table(VehicleEventMetrics, restart_identity=True)
db_manager.execute(
sa.delete(MetadataLog.__table__).where(
~MetadataLog.path.contains("FEED_INFO")
Expand All @@ -407,7 +422,7 @@ def test_vp_only(
paths = [
p
for p in test_files()
if "RT_VEHICLE_POSITIONS" in p and ("hourt=10" in p or "hour=11" in p)
if "RT_VEHICLE_POSITIONS" in p and ("hourt=12" in p or "hour=13" in p)
]
db_manager.add_metadata_paths(paths)

Expand All @@ -425,6 +440,8 @@ def test_tu_only(
caplog.set_level(logging.INFO)

db_manager.truncate_table(VehicleEvents, restart_identity=True)
db_manager.truncate_table(VehicleTrips, restart_identity=True)
db_manager.truncate_table(VehicleEventMetrics, restart_identity=True)
db_manager.execute(
sa.delete(MetadataLog.__table__).where(
~MetadataLog.path.contains("FEED_INFO")
Expand All @@ -434,7 +451,7 @@ def test_tu_only(
paths = [
p
for p in test_files()
if "RT_TRIP_UPDATES" in p and ("hourt=10" in p or "hour=11" in p)
if "RT_TRIP_UPDATES" in p and ("hourt=12" in p or "hour=13" in p)
]

db_manager.add_metadata_paths(paths)
Expand All @@ -453,15 +470,142 @@ def test_vp_and_tu(
caplog.set_level(logging.INFO)

db_manager.truncate_table(VehicleEvents, restart_identity=True)
db_manager.truncate_table(VehicleTrips, restart_identity=True)
db_manager.truncate_table(VehicleEventMetrics, restart_identity=True)
db_manager.execute(
sa.delete(MetadataLog.__table__).where(
~MetadataLog.path.contains("FEED_INFO")
)
)

paths = [p for p in test_files() if "hourt=10" in p or "hour=11" in p]
paths = [p for p in test_files() if "hourt=12" in p or "hour=13" in p]
db_manager.add_metadata_paths(paths)

process_gtfs_rt_files(db_manager)

check_logs(caplog)


def test_whole_table(
db_manager: DatabaseManager, caplog: pytest.LogCaptureFixture
) -> None:
"""
check whole flat file
"""
caplog.set_level(logging.INFO)

db_manager.truncate_table(VehicleEvents, restart_identity=True)
db_manager.truncate_table(VehicleTrips, restart_identity=True)
db_manager.truncate_table(VehicleEventMetrics, restart_identity=True)
db_manager.execute(
sa.delete(MetadataLog.__table__).where(
~MetadataLog.path.contains("FEED_INFO")
)
)

csv_file = os.path.join(test_files_dir, "vehicle_positions_flat_input.csv")
parquet_folder = "RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=11"
parquet_file = os.path.join(
springboard_dir, parquet_folder, "flat_file.parquet"
)
os.makedirs(os.path.join(springboard_dir, parquet_folder), exist_ok=True)

options = csv.ConvertOptions(
column_types={
"current_status": pyarrow.string(),
"current_stop_sequence": pyarrow.int64(),
"stop_id": pyarrow.string(),
"vehicle_timestamp": pyarrow.int64(),
"direction_id": pyarrow.int64(),
"route_id": pyarrow.string(),
"start_date": pyarrow.string(),
"start_time": pyarrow.string(),
"vehicle_id": pyarrow.string(),
}
)
table = csv.read_csv(csv_file, convert_options=options)
parquet.write_table(table, parquet_file)
db_manager.add_metadata_paths(
[
parquet_file,
]
)

process_gtfs_rt_files(db_manager)

os.remove(parquet_file)

result_select = (
sa.select(
VehicleEvents.stop_sequence,
VehicleEvents.stop_id,
VehicleEvents.parent_station,
VehicleEvents.vp_move_timestamp,
VehicleEvents.vp_stop_timestamp,
VehicleTrips.direction_id,
VehicleTrips.direction,
VehicleTrips.direction_destination,
VehicleTrips.route_id,
VehicleTrips.branch_route_id,
VehicleTrips.trunk_route_id,
VehicleTrips.service_date,
VehicleTrips.start_time,
VehicleTrips.vehicle_id,
VehicleTrips.stop_count,
VehicleTrips.trip_id,
VehicleTrips.vehicle_label,
VehicleTrips.static_trip_id_guess,
VehicleEventMetrics.dwell_time_seconds,
VehicleEventMetrics.travel_time_seconds,
VehicleEventMetrics.headway_branch_seconds,
VehicleEventMetrics.headway_trunk_seconds,
)
.select_from(VehicleEvents)
.join(
VehicleTrips,
VehicleTrips.trip_hash == VehicleEvents.trip_hash,
isouter=True,
)
.join(
VehicleEventMetrics,
VehicleEvents.trip_stop_hash == VehicleEventMetrics.trip_stop_hash,
isouter=True,
)
)
result_dtypes = {
"vp_move_timestamp": "Int64",
"vp_stop_timestamp": "Int64",
"dwell_time_seconds": "Int64",
"travel_time_seconds": "Int64",
"headway_branch_seconds": "Int64",
"headway_trunk_seconds": "Int64",
"static_trip_id_guess": "Int64",
}
sort_by = [
"route_id",
"direction_id",
"stop_sequence",
"stop_id",
"service_date",
"start_time",
]
db_result_df = db_manager.select_as_dataframe(result_select)
db_result_df = db_result_df.astype(dtype=result_dtypes).sort_values(
by=sort_by,
ignore_index=True,
)

csv_result_df = pandas.read_csv(
os.path.join(test_files_dir, "pipeline_flat_out.csv"),
dtype=result_dtypes,
)
csv_result_df = csv_result_df.sort_values(
by=sort_by,
ignore_index=True,
)

compare_result = db_result_df.compare(csv_result_df, align_axis=0)
print(compare_result, flush=True)
assert compare_result.shape[0] == 0

check_logs(caplog)
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
51 changes: 51 additions & 0 deletions python_src/tests/test_files/may_8.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
[
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=0/988b37f0d2c64959947b8b1f14b7bcb1-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=1/41f1ba3787f241ebac396f837697319d-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=10/c531508d7d75447ebd11ad84f1acb8f5-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=11/6e2e5418927f4e409817dd6d35fe006c-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=12/04fae375300c45e2a0f70b8fbefa8f57-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=13/5f9b9d4a1fcc417eae0543ce3fe35c8f-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=14/d7c86c2680c348dc89ff5ee48fa9aa1a-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=15/497ca6c8e1fd4336b292828cd75dab71-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=16/5f2d4152f68c4d8084d2f61f725747a2-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=17/3737947bca714ac996256ea8d6584f8f-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=18/a174d41b894f487abcf1985358fa864d-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=19/9b7bb1ca3a2c4d23bce4752f140d69d6-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=2/585445dfb6af459ba13e6e30ed334fb8-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=20/d962ab976269463fada01d74c34b68fa-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=21/04776345eee74ddca2820756751e9af4-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=22/e2f39365b5f841c2add18b99e1823cf9-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=23/5be9d5b3f23b4802b1812294c9a17906-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=3/ceb16d39bf39484ba8988f3a63444102-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=4/12da6efd05bd4e4593ee8f8def696d54-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=5/53fe3719ef9e43529f2a3dba77559e28-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=6/ffa6d50067c1450fa0b7c46d6968189e-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=7/5c180d5ba5bb478986c4c14eb85f203e-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=8/322a69c1c3834a3f87be9f86af11ee3c-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=9/a24caf40824c4fee89ce2b3cf723bd81-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=0/a5966d94cbb84e598ef441c33d0a40bd-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=1/82f34664d02342e1a6ab5610c522eec3-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=10/7860005f30e24367b08d38f95733c3dc-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=11/79da1157fc9a4ed1815d5259552aa61c-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=12/506974e326b64407aa2e72cfe3ed3c9a-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=13/b323d1f74c024e1c9367a71dec50dcec-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=14/6db7726bc6e741238ed8657b0cda67cd-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=15/8bcbfa22395d4ff0b6d08a8690ed2996-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=16/9e3d723320474c9483b75dbe5022635a-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=17/9aa1effad25e4e18ae14f6e7e6ac22ff-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=18/ca3d60ad6f5848bdb3ccf286f3c2f88c-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=19/6ab6a2c511384017804b994773c47f57-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=2/0ab51554c49b43919da1878a5c0d5b5c-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=20/193f5bb01d724ec9bc84875a5188cb05-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=21/dc1febbf0d66410a8ee38f147f8a8e6a-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=22/3ef77104e0394fcaa015ca313088bac9-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=23/8799e7ad66b84723bae4cee2a406e2fa-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=3/fe98952000c9429ead729d71ca50c8ab-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=4/ca6c098488c243b7b3e3daac61cd7e2f-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=5/2a57a8a37ab848a2808ce6904540cdb0-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=6/5e5045c41fb84087b7e539a72373289a-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=7/23a22439028f47be96ec3d894ee91200-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=8/193a495b375641679cdf6caefba89956-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=9/12c748fcd4214650a646a47042dae3c3-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/FEED_INFO/timestamp=1682375024/e84307ae774a4d8c8968c5e38e7affdc-0.parquet"
]
Loading

0 comments on commit c195dc4

Please sign in to comment.