Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: CSV Flatfile Pipeline Testing #154

Merged
merged 2 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 147 additions & 14 deletions python_src/tests/performance_manager/test_performance_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
from functools import lru_cache
from typing import Dict, Iterator, List, Optional, Sequence, Tuple, Union

import pyarrow
import pandas
import pytest
import sqlalchemy as sa
from _pytest.monkeypatch import MonkeyPatch
from pyarrow import fs, parquet
import pyarrow
from pyarrow import fs, parquet, csv

from lamp_py.performance_manager.l0_gtfs_static_load import (
process_static_tables,
Expand Down Expand Up @@ -37,6 +38,7 @@
StaticTrips,
StaticCalendarDates,
VehicleEvents,
VehicleTrips,
StaticDirections,
)
from lamp_py.postgres.postgres_utils import DatabaseManager
Expand All @@ -50,7 +52,7 @@
add_parent_station_column,
)

from ..test_resources import springboard_dir
from ..test_resources import springboard_dir, test_files_dir


@lru_cache
Expand All @@ -75,7 +77,7 @@ def set_env_vars() -> None:
boostrap .env file for local testing
"""
if int(os.environ.get("BOOTSTRAPPED", 0)) == 1:
logging.warning("allready bootstrapped")
logging.warning("already bootstrapped")
else:
env_file = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "..", "..", "..", ".env"
Expand Down Expand Up @@ -227,6 +229,14 @@ def test_static_tables(
"""
caplog.set_level(logging.INFO)

db_manager.truncate_table(StaticTrips, restart_identity=True)
db_manager.truncate_table(StaticRoutes, restart_identity=True)
db_manager.truncate_table(StaticStops, restart_identity=True)
db_manager.truncate_table(StaticStopTimes, restart_identity=True)
db_manager.truncate_table(StaticCalendar, restart_identity=True)
db_manager.truncate_table(StaticCalendarDates, restart_identity=True)
db_manager.truncate_table(StaticDirections, restart_identity=True)

paths = [file for file in test_files() if "FEED_INFO" in file]
db_manager.add_metadata_paths(paths)

Expand All @@ -245,13 +255,13 @@ def test_static_tables(
# processing the static tables our db tables should have these many record
# counts.
row_counts = {
StaticTrips: 9731,
StaticTrips: 11709,
StaticRoutes: 24,
StaticStops: 9706,
StaticStopTimes: 160977,
StaticCalendar: 76,
StaticCalendarDates: 70,
StaticDirections: 408,
StaticStops: 9743,
StaticStopTimes: 186618,
StaticCalendar: 102,
StaticCalendarDates: 85,
StaticDirections: 378,
}

with db_manager.session.begin() as session:
Expand Down Expand Up @@ -284,6 +294,7 @@ def test_gtfs_rt_processing(
"""
caplog.set_level(logging.INFO)
db_manager.truncate_table(VehicleEvents, restart_identity=True)
db_manager.truncate_table(VehicleTrips, restart_identity=True)

db_manager.execute(
sa.delete(MetadataLog.__table__).where(
Expand All @@ -295,7 +306,7 @@ def test_gtfs_rt_processing(
file
for file in test_files()
if ("RT_VEHICLE_POSITIONS" in file or "RT_TRIP_UPDATES" in file)
and ("hour=10" in file or "hour=11" in file)
and ("hour=12" in file or "hour=13" in file)
]
db_manager.add_metadata_paths(paths)

Expand Down Expand Up @@ -390,6 +401,7 @@ def test_vp_only(
caplog.set_level(logging.INFO)

db_manager.truncate_table(VehicleEvents, restart_identity=True)
db_manager.truncate_table(VehicleTrips, restart_identity=True)
db_manager.execute(
sa.delete(MetadataLog.__table__).where(
~MetadataLog.path.contains("FEED_INFO")
Expand All @@ -399,7 +411,7 @@ def test_vp_only(
paths = [
p
for p in test_files()
if "RT_VEHICLE_POSITIONS" in p and ("hourt=10" in p or "hour=11" in p)
if "RT_VEHICLE_POSITIONS" in p and ("hourt=12" in p or "hour=13" in p)
]
db_manager.add_metadata_paths(paths)

Expand All @@ -417,6 +429,7 @@ def test_tu_only(
caplog.set_level(logging.INFO)

db_manager.truncate_table(VehicleEvents, restart_identity=True)
db_manager.truncate_table(VehicleTrips, restart_identity=True)
db_manager.execute(
sa.delete(MetadataLog.__table__).where(
~MetadataLog.path.contains("FEED_INFO")
Expand All @@ -426,7 +439,7 @@ def test_tu_only(
paths = [
p
for p in test_files()
if "RT_TRIP_UPDATES" in p and ("hourt=10" in p or "hour=11" in p)
if "RT_TRIP_UPDATES" in p and ("hourt=12" in p or "hour=13" in p)
]

db_manager.add_metadata_paths(paths)
Expand All @@ -445,15 +458,135 @@ def test_vp_and_tu(
caplog.set_level(logging.INFO)

db_manager.truncate_table(VehicleEvents, restart_identity=True)
db_manager.truncate_table(VehicleTrips, restart_identity=True)
db_manager.execute(
sa.delete(MetadataLog.__table__).where(
~MetadataLog.path.contains("FEED_INFO")
)
)

paths = [p for p in test_files() if "hourt=10" in p or "hour=11" in p]
paths = [p for p in test_files() if "hourt=12" in p or "hour=13" in p]
db_manager.add_metadata_paths(paths)

process_gtfs_rt_files(db_manager)

check_logs(caplog)


def test_whole_table(
db_manager: DatabaseManager, caplog: pytest.LogCaptureFixture
) -> None:
"""
check whole flat file
"""
caplog.set_level(logging.INFO)

db_manager.truncate_table(VehicleEvents, restart_identity=True)
db_manager.truncate_table(VehicleTrips, restart_identity=True)
db_manager.execute(
sa.delete(MetadataLog.__table__).where(
~MetadataLog.path.contains("FEED_INFO")
)
)

csv_file = os.path.join(test_files_dir, "vehicle_positions_flat_input.csv")
parquet_folder = "RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=11"
parquet_file = os.path.join(
springboard_dir, parquet_folder, "flat_file.parquet"
)
os.makedirs(os.path.join(springboard_dir, parquet_folder), exist_ok=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should use the pytest temp directory fixture instead of os mkdir.

https://docs.pytest.org/en/6.2.x/tmpdir.html

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorporated with pytest temp_path fixture.


options = csv.ConvertOptions(
column_types={
"current_status": pyarrow.string(),
"current_stop_sequence": pyarrow.int64(),
"stop_id": pyarrow.string(),
"vehicle_timestamp": pyarrow.int64(),
"direction_id": pyarrow.int64(),
"route_id": pyarrow.string(),
"start_date": pyarrow.string(),
"start_time": pyarrow.string(),
"vehicle_id": pyarrow.string(),
}
)
table = csv.read_csv(csv_file, convert_options=options)
parquet.write_table(table, parquet_file)
db_manager.add_metadata_paths(
[
parquet_file,
]
)

process_gtfs_rt_files(db_manager)

os.remove(parquet_file)

result_select = (
sa.select(
VehicleEvents.stop_sequence,
VehicleEvents.stop_id,
VehicleEvents.parent_station,
VehicleEvents.vp_move_timestamp,
VehicleEvents.vp_stop_timestamp,
VehicleTrips.direction_id,
VehicleTrips.direction,
VehicleTrips.direction_destination,
VehicleTrips.route_id,
VehicleTrips.branch_route_id,
VehicleTrips.trunk_route_id,
VehicleTrips.service_date,
VehicleTrips.start_time,
VehicleTrips.vehicle_id,
VehicleTrips.stop_count,
VehicleTrips.trip_id,
VehicleTrips.vehicle_label,
VehicleTrips.static_trip_id_guess,
VehicleEvents.dwell_time_seconds,
VehicleEvents.travel_time_seconds,
VehicleEvents.headway_branch_seconds,
VehicleEvents.headway_trunk_seconds,
)
.select_from(VehicleEvents)
.join(
VehicleTrips,
VehicleTrips.pm_trip_id == VehicleEvents.pm_trip_id,
isouter=True,
)
)
result_dtypes = {
"vp_move_timestamp": "Int64",
"vp_stop_timestamp": "Int64",
"dwell_time_seconds": "Int64",
"travel_time_seconds": "Int64",
"headway_branch_seconds": "Int64",
"headway_trunk_seconds": "Int64",
"static_trip_id_guess": "Int64",
}
sort_by = [
"route_id",
"direction_id",
"stop_sequence",
"stop_id",
"service_date",
"start_time",
]
db_result_df = db_manager.select_as_dataframe(result_select)
db_result_df = db_result_df.astype(dtype=result_dtypes).sort_values(
by=sort_by,
ignore_index=True,
)

csv_result_df = pandas.read_csv(
os.path.join(test_files_dir, "pipeline_flat_out.csv"),
dtype=result_dtypes,
)
csv_result_df = csv_result_df.sort_values(
by=sort_by,
ignore_index=True,
)

compare_result = db_result_df.compare(csv_result_df, align_axis=1)
print(compare_result, flush=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only print this if the assert fails?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorporated.

assert compare_result.shape[0] == 0

check_logs(caplog)
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
51 changes: 51 additions & 0 deletions python_src/tests/test_files/may_8.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
[
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=0/988b37f0d2c64959947b8b1f14b7bcb1-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=1/41f1ba3787f241ebac396f837697319d-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=10/c531508d7d75447ebd11ad84f1acb8f5-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=11/6e2e5418927f4e409817dd6d35fe006c-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=12/04fae375300c45e2a0f70b8fbefa8f57-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=13/5f9b9d4a1fcc417eae0543ce3fe35c8f-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=14/d7c86c2680c348dc89ff5ee48fa9aa1a-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=15/497ca6c8e1fd4336b292828cd75dab71-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=16/5f2d4152f68c4d8084d2f61f725747a2-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=17/3737947bca714ac996256ea8d6584f8f-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=18/a174d41b894f487abcf1985358fa864d-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=19/9b7bb1ca3a2c4d23bce4752f140d69d6-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=2/585445dfb6af459ba13e6e30ed334fb8-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=20/d962ab976269463fada01d74c34b68fa-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=21/04776345eee74ddca2820756751e9af4-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=22/e2f39365b5f841c2add18b99e1823cf9-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=23/5be9d5b3f23b4802b1812294c9a17906-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=3/ceb16d39bf39484ba8988f3a63444102-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=4/12da6efd05bd4e4593ee8f8def696d54-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=5/53fe3719ef9e43529f2a3dba77559e28-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=6/ffa6d50067c1450fa0b7c46d6968189e-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=7/5c180d5ba5bb478986c4c14eb85f203e-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=8/322a69c1c3834a3f87be9f86af11ee3c-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=9/a24caf40824c4fee89ce2b3cf723bd81-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=0/a5966d94cbb84e598ef441c33d0a40bd-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=1/82f34664d02342e1a6ab5610c522eec3-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=10/7860005f30e24367b08d38f95733c3dc-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=11/79da1157fc9a4ed1815d5259552aa61c-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=12/506974e326b64407aa2e72cfe3ed3c9a-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=13/b323d1f74c024e1c9367a71dec50dcec-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=14/6db7726bc6e741238ed8657b0cda67cd-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=15/8bcbfa22395d4ff0b6d08a8690ed2996-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=16/9e3d723320474c9483b75dbe5022635a-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=17/9aa1effad25e4e18ae14f6e7e6ac22ff-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=18/ca3d60ad6f5848bdb3ccf286f3c2f88c-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=19/6ab6a2c511384017804b994773c47f57-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=2/0ab51554c49b43919da1878a5c0d5b5c-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=20/193f5bb01d724ec9bc84875a5188cb05-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=21/dc1febbf0d66410a8ee38f147f8a8e6a-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=22/3ef77104e0394fcaa015ca313088bac9-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=23/8799e7ad66b84723bae4cee2a406e2fa-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=3/fe98952000c9429ead729d71ca50c8ab-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=4/ca6c098488c243b7b3e3daac61cd7e2f-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=5/2a57a8a37ab848a2808ce6904540cdb0-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=6/5e5045c41fb84087b7e539a72373289a-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=7/23a22439028f47be96ec3d894ee91200-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=8/193a495b375641679cdf6caefba89956-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=9/12c748fcd4214650a646a47042dae3c3-0.parquet",
"mbta-ctd-dataplatform-dev-springboard/lamp/FEED_INFO/timestamp=1682375024/e84307ae774a4d8c8968c5e38e7affdc-0.parquet"
]
Loading