From 026d983f480af7aee5267ce2197b3f5ffda50587 Mon Sep 17 00:00:00 2001 From: Ryan Rymarczyk Date: Wed, 10 Jul 2024 14:50:37 -0400 Subject: [PATCH 1/2] use static schedule filter --- src/lamp_py/tableau/jobs/rt_rail.py | 64 ++++++++++++++++++++++++++--- 1 file changed, 59 insertions(+), 5 deletions(-) diff --git a/src/lamp_py/tableau/jobs/rt_rail.py b/src/lamp_py/tableau/jobs/rt_rail.py index 794d621c..23ad60a4 100644 --- a/src/lamp_py/tableau/jobs/rt_rail.py +++ b/src/lamp_py/tableau/jobs/rt_rail.py @@ -73,6 +73,63 @@ def __init__(self) -> None: " ON " " ve.pm_trip_id = vt.pm_trip_id" " LEFT JOIN " + " (" + " SELECT " + " DISTINCT " + " direction_id " + " , route_id " + " , parent_station " + " , static_version_key " + " , true as drop_flag " + " FROM " + " ( " + " SELECT " + " canon_trips.direction_id " + " , CASE WHEN canon_trips.trunk_route_id = 'Green' THEN canon_trips.route_id ELSE canon_trips.trunk_route_id END as route_id " + " , static_stops.parent_station " + " , ROW_NUMBER() OVER (PARTITION BY canon_trips.static_version_key,canon_trips.direction_id,canon_trips.route_id ORDER BY static_stop_times.stop_sequence) as stop_sequence " + " , canon_trips.static_version_key " + " FROM " + " ( " + " SELECT " + " DISTINCT ON (coalesce(static_trips.branch_route_id,static_trips.trunk_route_id),static_route_patterns.direction_id,static_route_patterns.static_version_key) " + " static_route_patterns.direction_id as direction_id " + " , static_route_patterns.representative_trip_id as representative_trip_id " + " , static_trips.trunk_route_id as trunk_route_id " + " , coalesce(static_trips.branch_route_id, static_trips.trunk_route_id) as route_id " + " , static_route_patterns.static_version_key as static_version_key " + " FROM " + " static_route_patterns " + " JOIN static_trips on " + " static_route_patterns.representative_trip_id = static_trips.trip_id " + " AND static_route_patterns.static_version_key = static_trips.static_version_key " + " JOIN static_routes on " + " static_routes.route_id = static_trips.route_id " + " AND static_routes.static_version_key = static_trips.static_version_key " + " WHERE " + " static_routes.route_type < 2 " + " AND (static_route_patterns.route_pattern_typicality = 1 " + " or static_route_patterns.route_pattern_typicality = 5) " + " order by " + " coalesce(static_trips.branch_route_id, static_trips.trunk_route_id), " + " static_route_patterns.direction_id, " + " static_route_patterns.static_version_key, " + " static_route_patterns.route_pattern_typicality desc) as canon_trips " + " JOIN static_stop_times on " + " canon_trips.representative_trip_id = static_stop_times.trip_id " + " AND canon_trips.static_version_key = static_stop_times.static_version_key " + " JOIN static_stops on " + " static_stop_times.stop_id = static_stops.stop_id " + " AND static_stop_times.static_version_key = static_stops.static_version_key ) as drop_station " + " WHERE " + " drop_station.stop_sequence = 1 " + " ) drop_join" + " ON " + " drop_join.direction_id = vt.direction_id " + " AND drop_join.route_id = vt.route_id " + " AND drop_join.parent_station = ve.parent_station " + " AND drop_join.static_version_key = vt.static_version_key " + " LEFT JOIN " " vehicle_events prev_ve" " ON " " ve.pm_event_id = prev_ve.next_trip_stop_pm_event_id" @@ -97,17 +154,14 @@ def __init__(self) -> None: " AND vt.static_version_key = sr.static_version_key" " WHERE " " sr.route_type < 2" - " AND (" - " ve.canonical_stop_sequence > 1" - " OR ve.canonical_stop_sequence IS NULL" - " )" + " AND drop_join.drop_flag IS NULL " " AND (" " ve.vp_stop_timestamp IS NOT null" " OR ve.vp_move_timestamp IS NOT null" " )" " %s" " ORDER BY " - " ve.service_date, vt.route_id, vt.direction_id, vt.vehicle_id, vt.start_time" + " ve.service_date, vt.vehicle_id, vt.start_time" ";" ) # based on testing, batch_size of 1024 * 256 should result in a maximum From 8a23fa9448af8b351c38c804bb8f63c109e6606e Mon Sep 17 00:00:00 2001 From: Ryan Rymarczyk Date: Tue, 16 Jul 2024 06:26:08 -0400 Subject: [PATCH 2/2] vt where faster --- src/lamp_py/tableau/jobs/rt_rail.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lamp_py/tableau/jobs/rt_rail.py b/src/lamp_py/tableau/jobs/rt_rail.py index 23ad60a4..8c8f4e5e 100644 --- a/src/lamp_py/tableau/jobs/rt_rail.py +++ b/src/lamp_py/tableau/jobs/rt_rail.py @@ -255,7 +255,7 @@ def update_parquet(self, db_manager: DatabaseManager) -> bool: max_start_date -= datetime.timedelta(days=1) update_query = self.table_query % ( - f" AND vt.service_date >= {max_start_date.strftime('%Y%m%d')} ", + f" AND ve.service_date >= {max_start_date.strftime('%Y%m%d')} ", ) db_manager.write_to_parquet(