FIX (LAMP_ALL_RT_fields): Use Static Schedule to filter Initial Station of Trip #398

Open
wants to merge 2 commits into
base: main
66 changes: 60 additions & 6 deletions src/lamp_py/tableau/jobs/rt_rail.py
@@ -73,6 +73,63 @@ def __init__(self) -> None:
" ON "
" ve.pm_trip_id = vt.pm_trip_id"
" LEFT JOIN "
" ("
" SELECT "
" DISTINCT "
" direction_id "
" , route_id "
" , parent_station "
" , static_version_key "
" , true as drop_flag "
" FROM "
" ( "
" SELECT "
" canon_trips.direction_id "
" , CASE WHEN canon_trips.trunk_route_id = 'Green' THEN canon_trips.route_id ELSE canon_trips.trunk_route_id END as route_id "
" , static_stops.parent_station "
" , ROW_NUMBER() OVER (PARTITION BY canon_trips.static_version_key,canon_trips.direction_id,canon_trips.route_id ORDER BY static_stop_times.stop_sequence) as stop_sequence "
" , canon_trips.static_version_key "
" FROM "
" ( "
" SELECT "
" DISTINCT ON (coalesce(static_trips.branch_route_id,static_trips.trunk_route_id),static_route_patterns.direction_id,static_route_patterns.static_version_key) "
" static_route_patterns.direction_id as direction_id "
" , static_route_patterns.representative_trip_id as representative_trip_id "
" , static_trips.trunk_route_id as trunk_route_id "
" , coalesce(static_trips.branch_route_id, static_trips.trunk_route_id) as route_id "
" , static_route_patterns.static_version_key as static_version_key "
" FROM "
" static_route_patterns "
" JOIN static_trips on "
" static_route_patterns.representative_trip_id = static_trips.trip_id "
" AND static_route_patterns.static_version_key = static_trips.static_version_key "
" JOIN static_routes on "
" static_routes.route_id = static_trips.route_id "
" AND static_routes.static_version_key = static_trips.static_version_key "
" WHERE "
" static_routes.route_type < 2 "
" AND (static_route_patterns.route_pattern_typicality = 1 "
" or static_route_patterns.route_pattern_typicality = 5) "
" order by "
" coalesce(static_trips.branch_route_id, static_trips.trunk_route_id), "
" static_route_patterns.direction_id, "
" static_route_patterns.static_version_key, "
" static_route_patterns.route_pattern_typicality desc) as canon_trips "
" JOIN static_stop_times on "
" canon_trips.representative_trip_id = static_stop_times.trip_id "
" AND canon_trips.static_version_key = static_stop_times.static_version_key "
" JOIN static_stops on "
" static_stop_times.stop_id = static_stops.stop_id "
" AND static_stop_times.static_version_key = static_stops.static_version_key ) as drop_station "
Comment on lines +93 to +123
Contributor

Sanity check for my sake:

This gets every station on every trip, then filters it down to only the first station of each trip.

We then use that for the filtering later, rather than ve.canonical_stop_sequence.

Collaborator Author

Yes, this subquery is the same one used to create canonical_stop_sequence, but we are limiting it to the first parent_station in each direction and then doing an anti-join with the drop_flag field.
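
The pattern described in this thread (rank the stops, keep only the first one per route and direction, then anti-join on `drop_flag`) can be sketched in isolation. The following is a minimal illustration using Python's built-in `sqlite3` rather than the project's database; the `stop_times` and `events` tables, their rows, and the station names are hypothetical stand-ins, not the real LAMP schema.

```python
import sqlite3

# In-memory database with hypothetical, simplified tables (not the LAMP schema).
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE stop_times (
        route_id TEXT, direction_id INTEGER, parent_station TEXT, stop_sequence INTEGER
    );
    CREATE TABLE events (
        event_id INTEGER, route_id TEXT, direction_id INTEGER, parent_station TEXT
    );
    INSERT INTO stop_times VALUES
        ('Red', 0, 'station-A', 1),
        ('Red', 0, 'station-B', 10),
        ('Red', 0, 'station-C', 20),
        ('Red', 1, 'station-C', 1),
        ('Red', 1, 'station-B', 10),
        ('Red', 1, 'station-A', 20);
    INSERT INTO events VALUES
        (1, 'Red', 0, 'station-A'),  -- first station in direction 0: dropped
        (2, 'Red', 0, 'station-B'),
        (3, 'Red', 1, 'station-C'),  -- first station in direction 1: dropped
        (4, 'Red', 1, 'station-A'),
        (5, 'Red', 0, 'station-X');  -- station missing from the schedule: kept
    """
)

query = """
    SELECT e.event_id
    FROM events e
    LEFT JOIN (
        -- Rank stops per route and direction, then keep only the first one.
        SELECT route_id, direction_id, parent_station, 1 AS drop_flag
        FROM (
            SELECT route_id, direction_id, parent_station,
                   ROW_NUMBER() OVER (
                       PARTITION BY route_id, direction_id
                       ORDER BY stop_sequence
                   ) AS rn
            FROM stop_times
        ) AS ranked
        WHERE rn = 1
    ) AS drop_join
      ON drop_join.route_id = e.route_id
     AND drop_join.direction_id = e.direction_id
     AND drop_join.parent_station = e.parent_station
    -- Anti-join: keep only events that did NOT match a first station.
    WHERE drop_join.drop_flag IS NULL
    ORDER BY e.event_id
"""

kept_event_ids = [row[0] for row in conn.execute(query)]
print(kept_event_ids)
```

In this sketch, event 5 survives because its station has no match in the schedule, so `drop_flag` stays NULL. That mirrors how the previous filter kept rows whose canonical_stop_sequence was NULL.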

" WHERE "
" drop_station.stop_sequence = 1 "
" ) drop_join"
" ON "
" drop_join.direction_id = vt.direction_id "
" AND drop_join.route_id = vt.route_id "
" AND drop_join.parent_station = ve.parent_station "
" AND drop_join.static_version_key = vt.static_version_key "
" LEFT JOIN "
" vehicle_events prev_ve"
" ON "
" ve.pm_event_id = prev_ve.next_trip_stop_pm_event_id"
@@ -97,17 +154,14 @@ def __init__(self) -> None:
" AND vt.static_version_key = sr.static_version_key"
" WHERE "
" sr.route_type < 2"
" AND ("
" ve.canonical_stop_sequence > 1"
" OR ve.canonical_stop_sequence IS NULL"
" )"
" AND drop_join.drop_flag IS NULL "
" AND ("
" ve.vp_stop_timestamp IS NOT null"
" OR ve.vp_move_timestamp IS NOT null"
" )"
" %s"
" ORDER BY "
" ve.service_date, vt.route_id, vt.direction_id, vt.vehicle_id, vt.start_time"
" ve.service_date, vt.vehicle_id, vt.start_time"
Comment on lines -110 to +164
Contributor

Was the order change requested?

Collaborator Author

It wasn't requested, but we only sort to reduce the parquet file size, and dropping the two sort fields gave us a speedup in query time with almost no change in file size.
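
As a rough illustration of why row order can affect the size of a compressed file at all, the sketch below compares the compressed size of a sorted and a shuffled copy of the same column. It uses `zlib` from the standard library as a stand-in for parquet's own encodings and compression, and the vehicle ids are made up, so it says nothing about the actual sizes seen in this PR.

```python
import random
import zlib

# Hypothetical column: 200 made-up vehicle ids, each repeated 500 times.
values = [f"vehicle-{i:03d}" for i in range(200) for _ in range(500)]

# Shuffle a copy with a fixed seed so the comparison is reproducible.
shuffled = values[:]
random.Random(0).shuffle(shuffled)


def compressed_size(column: list[str]) -> int:
    """Return the zlib-compressed size in bytes of a newline-joined column."""
    return len(zlib.compress("\n".join(column).encode("utf-8")))


sorted_size = compressed_size(sorted(values))
shuffled_size = compressed_size(shuffled)

# Long runs of identical values compress better than the same values in random order.
print(sorted_size, shuffled_size)
```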

";"
)
# based on testing, batch_size of 1024 * 256 should result in a maximum
@@ -201,7 +255,7 @@ def update_parquet(self, db_manager: DatabaseManager) -> bool:
max_start_date -= datetime.timedelta(days=1)

update_query = self.table_query % (
-f" AND vt.service_date >= {max_start_date.strftime('%Y%m%d')} ",
+f" AND ve.service_date >= {max_start_date.strftime('%Y%m%d')} ",
Comment on lines -204 to +258
Contributor

Is this faster because the base of the query is on vehicle events?
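
For readers following the hunk above, this is a minimal sketch of how the `%s` placeholder in the query string receives the date filter. The `table_query` here is a shortened, hypothetical stand-in for the real one, and `build_update_query` is an illustrative helper that does not exist in the PR. It shows only the string substitution, not whether filtering on `ve` is actually faster.

```python
import datetime

# Hypothetical, shortened stand-in for the real table_query string.
table_query = (
    "SELECT ve.service_date FROM vehicle_events ve"
    " WHERE ve.vp_stop_timestamp IS NOT null"
    " %s"
    " ORDER BY ve.service_date"
    ";"
)


def build_update_query(max_start_date: datetime.date) -> str:
    """Fill the %s placeholder with a service_date filter in YYYYMMDD form."""
    date_filter = f" AND ve.service_date >= {max_start_date.strftime('%Y%m%d')} "
    return table_query % (date_filter,)


update_query = build_update_query(datetime.date(2024, 5, 2))
print(update_query)
```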

)

db_manager.write_to_parquet(