diff --git a/python_src/src/lamp_py/migrations/versions/performance_manager/018_a4870e30ec68_remove_hash_columns.py b/python_src/src/lamp_py/migrations/versions/performance_manager/018_a4870e30ec68_remove_hash_columns.py index d6812ebb..173527db 100644 --- a/python_src/src/lamp_py/migrations/versions/performance_manager/018_a4870e30ec68_remove_hash_columns.py +++ b/python_src/src/lamp_py/migrations/versions/performance_manager/018_a4870e30ec68_remove_hash_columns.py @@ -177,8 +177,7 @@ def upgrade() -> None: CREATE OR REPLACE VIEW opmi_all_rt_fields_joined AS SELECT vt.service_date - , ve.pm_trip_id as trip_hash - , NULL as trip_stop_hash + , ve.pm_trip_id , ve.stop_sequence , ve.stop_id , LAG (ve.stop_id, 1) OVER (PARTITION BY ve.pm_trip_id ORDER BY COALESCE(ve.vp_stop_timestamp, ve.tu_stop_timestamp, ve.vp_move_timestamp)) as previous_stop_id diff --git a/python_src/src/lamp_py/performance_manager/gtfs_utils.py b/python_src/src/lamp_py/performance_manager/gtfs_utils.py index 8bce9a99..2d2f9ef3 100644 --- a/python_src/src/lamp_py/performance_manager/gtfs_utils.py +++ b/python_src/src/lamp_py/performance_manager/gtfs_utils.py @@ -25,7 +25,7 @@ def start_time_to_seconds( return int(hour) * 3600 + int(minute) * 60 + int(second) -def get_unique_trip_stop_columns() -> List[str]: +def unique_trip_stop_columns() -> List[str]: """ columns used to determine if a event is a unique trip stop """ diff --git a/python_src/src/lamp_py/performance_manager/l0_gtfs_rt_events.py b/python_src/src/lamp_py/performance_manager/l0_gtfs_rt_events.py index 4800fd0f..c6b3fcd5 100644 --- a/python_src/src/lamp_py/performance_manager/l0_gtfs_rt_events.py +++ b/python_src/src/lamp_py/performance_manager/l0_gtfs_rt_events.py @@ -1,4 +1,4 @@ -from typing import Dict, List +from typing import Dict, List, Tuple import numpy import pandas @@ -19,11 +19,11 @@ ) from lamp_py.runtime_utils.process_logger import ProcessLogger -from .gtfs_utils import get_unique_trip_stop_columns +from .gtfs_utils import 
unique_trip_stop_columns from .l0_rt_trip_updates import process_tu_files from .l0_rt_vehicle_positions import process_vp_files from .l1_rt_trips import process_trips, load_new_trip_data -from .l1_rt_metrics import process_metrics +from .l1_rt_metrics import update_metrics_from_temp_events def get_gtfs_rt_paths(db_manager: DatabaseManager) -> List[Dict[str, List]]: @@ -85,9 +85,9 @@ def combine_events( ) process_logger.log_start() - trip_stop_columns = get_unique_trip_stop_columns() + trip_stop_columns = unique_trip_stop_columns() - # merge together the trip stop hashes and the timestamps + # merge together the trip_stop_columns and the timestamps events = pandas.merge( vp_events[ trip_stop_columns + ["vp_stop_timestamp", "vp_move_timestamp"] ], tu_events[trip_stop_columns + ["tu_stop_timestamp"]], how="outer", on=trip_stop_columns, validate="one_to_one", ) @@ -104,10 +104,18 @@ # concat all of the details that went into each trip event, dropping - # duplicates. this is now a dataframe mapping hashes back to the things - # that went into them. - # TODO: review trip_id processing # pylint: disable=fixme - # add more intelligent trip_id processing, this approach will randomly select trip_id record to keep + # duplicates. + # DRAGONS + # selection of unique `details_columns` records could have non-deterministic behavior + # + # many vehicle position and trip update events have to be aggregated together + # for each unique trip-stop event record. `details_columns` columns that are + # not a part of `trip_stop_columns` are semi-randomly dropped, with the first + # column being kept based on a sort-order. + # + # currently, the sort-order only takes into account NA values in a few + # columns to prioritize the selection of records from vehicle positions over + # trip updates. 
beyond that, records are essentially dropped at random details_columns = [ "service_date", "start_time", @@ -128,7 +136,8 @@ ) # create sort column to indicate which records have null values for select columns - # we want to drop these null value records whenever possible to prioritize details from vehicle_positions + # we want to drop these null value records whenever possible + # to prioritize records from vehicle_positions event_details["na_sort"] = ( event_details[ ["trip_id", "stop_sequence", "vehicle_label", "vehicle_consist"] ] @@ -149,7 +158,7 @@ .drop(columns="na_sort") ) - # pull the details and add them to the events table + # join `details_columns` to df with timestamps events = events.merge( event_details, how="left", on=trip_stop_columns, validate="one_to_one" ) @@ -160,30 +169,7 @@ return events -def do_trips_insert_update(db_manager: DatabaseManager) -> None: - """ - perform INSERT/UPDATE operations related to event trips - """ - # INSERT/UPDATE vehicle_trips table from temp_event_compare - # makes sure that every event has a pm_trip_id for insertion into vehicle_events table - load_new_trip_data(db_manager=db_manager) - - # load pm_trip_id values into temp_event_compare - update_temp_trip_id = ( - sa.update(TempEventCompare.__table__) - .values(pm_trip_id=VehicleTrips.pm_trip_id) - .where( - TempEventCompare.service_date == VehicleTrips.service_date, - TempEventCompare.route_id == VehicleTrips.route_id, - TempEventCompare.direction_id == VehicleTrips.direction_id, - TempEventCompare.start_time == VehicleTrips.start_time, - TempEventCompare.vehicle_id == VehicleTrips.vehicle_id, - ) - ) - db_manager.execute(update_temp_trip_id) - - -def flag_insert_update_events(db_manager: DatabaseManager) -> List[int]: +def flag_insert_update_events(db_manager: DatabaseManager) -> Tuple[int, int]: """ update do_update and do_insert flag columns in temp_event_compare table @@ -193,7 +179,7 @@ def 
flag_insert_update_events(db_manager: DatabaseManager) -> List[int]: @db_manager DatabaseManager object for database interaction - @return List[int, int] - do_update count, do_insert count + @return Tuple[int, int] - do_update count, do_insert count """ # populate do_update column of temp_event_compare @@ -233,7 +219,9 @@ def flag_insert_update_events(db_manager: DatabaseManager) -> List[int]: update_count_query = sa.select( sa.func.count(TempEventCompare.do_update) ).where(TempEventCompare.do_update == sa.true()) - update_count = db_manager.select_as_list(update_count_query)[0]["count"] + update_count = int( + db_manager.select_as_list(update_count_query)[0]["count"] + ) # populate do_insert column of temp_event_compare do_insert_pre_select = ( @@ -258,7 +246,9 @@ def flag_insert_update_events(db_manager: DatabaseManager) -> List[int]: insert_count_query = sa.select( sa.func.count(TempEventCompare.do_insert) ).where(TempEventCompare.do_insert == sa.true()) - insert_count = db_manager.select_as_list(insert_count_query)[0]["count"] + insert_count = int( + db_manager.select_as_list(insert_count_query)[0]["count"] + ) # remove records from temp_event_compare that are not related to updates or inserts delete_temp = sa.delete(TempEventCompare.__table__).where( @@ -267,10 +257,10 @@ def flag_insert_update_events(db_manager: DatabaseManager) -> List[int]: ) db_manager.execute(delete_temp) - return [update_count, insert_count] + return (update_count, insert_count) -def upload_to_database( +def build_temp_events( events: pandas.DataFrame, db_manager: DatabaseManager ) -> pandas.DataFrame: """ @@ -282,7 +272,7 @@ def upload_to_database( events where appropriate and insert the new ones. 
""" process_logger = ProcessLogger( - "gtfs_rt.insert_update_events", + "gtfs_rt.build_temp_events", event_count=events.shape[0], ) process_logger.log_start() @@ -299,12 +289,45 @@ events, ) - # perform RDS INSERT/UPDATE related to trips records - do_trips_insert_update(db_manager) + # make sure vehicle_trips has trips for all events in temp_event_compare + load_new_trip_data(db_manager=db_manager) + + # load pm_trip_id values into temp_event_compare from vehicle_trips + update_temp_trip_id = ( + sa.update(TempEventCompare.__table__) + .values(pm_trip_id=VehicleTrips.pm_trip_id) + .where( + TempEventCompare.service_date == VehicleTrips.service_date, + TempEventCompare.route_id == VehicleTrips.route_id, + TempEventCompare.direction_id == VehicleTrips.direction_id, + TempEventCompare.start_time == VehicleTrips.start_time, + TempEventCompare.vehicle_id == VehicleTrips.vehicle_id, + ) + ) + db_manager.execute(update_temp_trip_id) # populate do_insert and do_update columns of temp_event_compare # get counts of records belonging to each flag update_count, insert_count = flag_insert_update_events(db_manager) + process_logger.add_metadata( + db_update_rowcount=update_count, + db_insert_rowcount=insert_count, + ) + + process_logger.log_complete() + + return update_count + insert_count + + +def update_events_from_temp(db_manager: DatabaseManager) -> None: + """ + Insert and Update vehicle_events table from records in temp_event_compare + + """ + process_logger = ProcessLogger( + "gtfs_rt.insert_update_events", + ) + process_logger.log_start() # INSERT/UPDATE vehicle_events records with vp_move_timestamp # this will insert new event records into vehicle_events if they DO NOT exist @@ -377,6 +400,12 @@ db_manager.execute(update_vp_stop_events) # update vehicle_events records with tu_stop_timestamp + # + # vehicle_events > temp_event_compare comparison is not used for trip updates + we don't just want the earliest timestamp 
value for trip updates, we want the + last value before the trip arrives at the parent_station + so, we assume trip updates are processed chronologically and always + insert the last trip update timestamp value that is received update_tu_stop_events = ( sa.update(VehicleEvents.__table__) .values( @@ -393,8 +422,6 @@ process_logger.log_complete() - return update_count + insert_count def process_gtfs_rt_files(db_manager: DatabaseManager) -> None: """ @@ -454,13 +481,14 @@ def process_gtfs_rt_files(db_manager: DatabaseManager) -> None: # continue events processing if records exist if events.shape[0] > 0: - change_count = upload_to_database(events, db_manager) - # add new trips to VehicleTrips table + change_count = build_temp_events(events, db_manager) + if change_count > 0: + update_events_from_temp(db_manager) + # update trips data in vehicle_trips table process_trips(db_manager) - # add new metrics to VehicleMetrics table - if change_count > 0: - process_metrics(db_manager) + # update event metrics columns + update_metrics_from_temp_events(db_manager) db_manager.execute( sa.update(MetadataLog.__table__) diff --git a/python_src/src/lamp_py/performance_manager/l0_rt_trip_updates.py b/python_src/src/lamp_py/performance_manager/l0_rt_trip_updates.py index 83d9e094..266aa081 100644 --- a/python_src/src/lamp_py/performance_manager/l0_rt_trip_updates.py +++ b/python_src/src/lamp_py/performance_manager/l0_rt_trip_updates.py @@ -11,7 +11,7 @@ add_static_version_key_column, add_parent_station_column, remove_bus_records, - get_unique_trip_stop_columns, + unique_trip_stop_columns, ) @@ -180,7 +180,7 @@ def reduce_trip_updates(trip_updates: pandas.DataFrame) -> pandas.DataFrame: ) process_logger.log_start() - trip_stop_columns = get_unique_trip_stop_columns() + trip_stop_columns = unique_trip_stop_columns() # sort all trip updates by reverse timestamp, then drop all of the updates # for the same trip and same station but the first one. 
the first update will diff --git a/python_src/src/lamp_py/performance_manager/l0_rt_vehicle_positions.py b/python_src/src/lamp_py/performance_manager/l0_rt_vehicle_positions.py index 2687a903..3e2d43cd 100644 --- a/python_src/src/lamp_py/performance_manager/l0_rt_vehicle_positions.py +++ b/python_src/src/lamp_py/performance_manager/l0_rt_vehicle_positions.py @@ -11,7 +11,7 @@ add_static_version_key_column, add_parent_station_column, remove_bus_records, - get_unique_trip_stop_columns, + unique_trip_stop_columns, ) @@ -118,8 +118,8 @@ def transform_vp_timestamps( convert raw vp data into a timestamped event data for each stop on a trip. this method will add - * "vp_move_timestamp" - when the vehicle begins moving towards the hashed stop - * "vp_stop_timestamp" - when the vehicle arrives at the hashed stop + * "vp_move_timestamp" - when the vehicle begins moving towards the event parent_station + * "vp_stop_timestamp" - when the vehicle arrives at the event parent_station this method will remove "is_moving" and "vehicle_timestamp" """ @@ -128,7 +128,7 @@ ) process_logger.log_start() - trip_stop_columns = get_unique_trip_stop_columns() + trip_stop_columns = unique_trip_stop_columns() # TODO: review trip_id processing # pylint: disable=fixme # add more intelligent trip_id processing, this approach will randomly select trip_id record to keep diff --git a/python_src/src/lamp_py/performance_manager/l1_cte_statements.py b/python_src/src/lamp_py/performance_manager/l1_cte_statements.py index b74c3e04..b530002f 100644 --- a/python_src/src/lamp_py/performance_manager/l1_cte_statements.py +++ b/python_src/src/lamp_py/performance_manager/l1_cte_statements.py @@ -10,13 +10,12 @@ ) -def get_static_trips_cte( +def static_trips_subquery( static_version_key: int, service_date: int ) -> sa.sql.selectable.Subquery: """ - return CTE named "static_trip_cte" representing all static trips on given service date - - a "set" of static trips will be returned for 
every "static_version_key" key value. + return Selectable representing all static trips on + given service_date and static_version_key value combo created fields to be returned: - static_trip_first_stop (bool indicating first stop of trip) @@ -111,13 +110,13 @@ def get_static_trips_cte( StaticRoutes.route_type != 3, ServiceIdDates.service_date == int(service_date), ) - .subquery(name="static_trips_cte") + .subquery(name="static_trips_sub") ) -def get_rt_trips_cte(service_date: int) -> sa.sql.selectable.Subquery: +def rt_trips_subquery(service_date: int) -> sa.sql.selectable.Subquery: """ - return CTE named "rt_trips_cte" representing all RT trips on a given service date + return Selectable representing all RT trips on a given service date created fields to be returned: - rt_trip_first_stop_flag (bool indicating first stop of trip by trip_hash) @@ -179,99 +178,97 @@ def get_rt_trips_cte(service_date: int) -> sa.sql.selectable.Subquery: VehicleEvents.vp_stop_timestamp.is_not(None), ), ) - ).subquery(name="rt_trips_cte") + ).subquery(name="rt_trips_sub") -def get_trips_for_metrics( +def trips_for_metrics_subquery( static_version_key: int, service_date: int ) -> sa.sql.selectable.Subquery: """ - return CTE named "trips_for_metrics" with fields needed to develop metrics tables - - will return one record for every trip_stop_hash on 'service_date' + return Selectable named "trips_for_metrics" with fields needed to develop metrics tables - joins rt_trips_cte to VehicleTrips on trip_hash field + will return one record for every unique trip-stop on 'service_date' - then joins static_trips_cte on static_trip_id_guess, timestamp, parent_station and static_stop_rank, + joins rt_trips_sub to static_trips_sub on static_trip_id_guess, static_version_key, parent_station and static_stop_rank, the join with static_stop_rank is required for routes that may visit the same parent station more than once on the same route, I think this only occurs on bus routes, so we may be able to drop 
this for performance_manager """ - static_trips_cte = get_static_trips_cte(static_version_key, service_date) - rt_trips_cte = get_rt_trips_cte(service_date) + static_trips_sub = static_trips_subquery(static_version_key, service_date) + rt_trips_sub = rt_trips_subquery(service_date) return ( sa.select( - rt_trips_cte.c.static_version_key, - rt_trips_cte.c.pm_trip_id, - rt_trips_cte.c.service_date, - rt_trips_cte.c.direction_id, - rt_trips_cte.c.route_id, - rt_trips_cte.c.branch_route_id, - rt_trips_cte.c.trunk_route_id, - rt_trips_cte.c.stop_count, - rt_trips_cte.c.start_time, - rt_trips_cte.c.vehicle_id, - rt_trips_cte.c.parent_station, - rt_trips_cte.c.vp_move_timestamp.label("move_timestamp"), + rt_trips_sub.c.static_version_key, + rt_trips_sub.c.pm_trip_id, + rt_trips_sub.c.service_date, + rt_trips_sub.c.direction_id, + rt_trips_sub.c.route_id, + rt_trips_sub.c.branch_route_id, + rt_trips_sub.c.trunk_route_id, + rt_trips_sub.c.stop_count, + rt_trips_sub.c.start_time, + rt_trips_sub.c.vehicle_id, + rt_trips_sub.c.parent_station, + rt_trips_sub.c.vp_move_timestamp.label("move_timestamp"), sa.func.coalesce( - rt_trips_cte.c.vp_stop_timestamp, - rt_trips_cte.c.tu_stop_timestamp, + rt_trips_sub.c.vp_stop_timestamp, + rt_trips_sub.c.tu_stop_timestamp, ).label("stop_timestamp"), sa.func.coalesce( - rt_trips_cte.c.vp_move_timestamp, - rt_trips_cte.c.vp_stop_timestamp, - rt_trips_cte.c.tu_stop_timestamp, + rt_trips_sub.c.vp_move_timestamp, + rt_trips_sub.c.vp_stop_timestamp, + rt_trips_sub.c.tu_stop_timestamp, ).label("sort_timestamp"), sa.func.coalesce( - static_trips_cte.c.static_trip_first_stop, - rt_trips_cte.c.rt_trip_first_stop_flag, + static_trips_sub.c.static_trip_first_stop, + rt_trips_sub.c.rt_trip_first_stop_flag, ).label("first_stop_flag"), sa.func.coalesce( - static_trips_cte.c.static_trip_last_stop, - rt_trips_cte.c.rt_trip_last_stop_flag, + static_trips_sub.c.static_trip_last_stop, + rt_trips_sub.c.rt_trip_last_stop_flag, ).label("last_stop_flag"), 
sa.func.coalesce( - static_trips_cte.c.static_trip_stop_rank, - rt_trips_cte.c.rt_trip_stop_rank, + static_trips_sub.c.static_trip_stop_rank, + rt_trips_sub.c.rt_trip_stop_rank, ).label("stop_rank"), - sa.func.lead(rt_trips_cte.c.vp_move_timestamp) + sa.func.lead(rt_trips_sub.c.vp_move_timestamp) .over( - partition_by=rt_trips_cte.c.vehicle_id, + partition_by=rt_trips_sub.c.vehicle_id, order_by=sa.func.coalesce( - rt_trips_cte.c.vp_move_timestamp, - rt_trips_cte.c.vp_stop_timestamp, - rt_trips_cte.c.tu_stop_timestamp, + rt_trips_sub.c.vp_move_timestamp, + rt_trips_sub.c.vp_stop_timestamp, + rt_trips_sub.c.tu_stop_timestamp, ), ) .label("next_station_move"), ) .distinct( - rt_trips_cte.c.service_date, - rt_trips_cte.c.pm_trip_id, - rt_trips_cte.c.parent_station, + rt_trips_sub.c.service_date, + rt_trips_sub.c.pm_trip_id, + rt_trips_sub.c.parent_station, ) - .select_from(rt_trips_cte) + .select_from(rt_trips_sub) .join( - static_trips_cte, + static_trips_sub, sa.and_( - rt_trips_cte.c.static_trip_id_guess - == static_trips_cte.c.static_trip_id, - rt_trips_cte.c.static_version_key - == static_trips_cte.c.static_version_key, - rt_trips_cte.c.parent_station - == static_trips_cte.c.parent_station, - rt_trips_cte.c.rt_trip_stop_rank - >= static_trips_cte.c.static_trip_stop_rank, + rt_trips_sub.c.static_trip_id_guess + == static_trips_sub.c.static_trip_id, + rt_trips_sub.c.static_version_key + == static_trips_sub.c.static_version_key, + rt_trips_sub.c.parent_station + == static_trips_sub.c.parent_station, + rt_trips_sub.c.rt_trip_stop_rank + >= static_trips_sub.c.static_trip_stop_rank, ), isouter=True, ) .order_by( - rt_trips_cte.c.service_date, - rt_trips_cte.c.pm_trip_id, - rt_trips_cte.c.parent_station, - static_trips_cte.c.static_trip_stop_rank, + rt_trips_sub.c.service_date, + rt_trips_sub.c.pm_trip_id, + rt_trips_sub.c.parent_station, + static_trips_sub.c.static_trip_stop_rank, ) ).subquery(name="trip_for_metrics") diff --git 
a/python_src/src/lamp_py/performance_manager/l1_rt_metrics.py b/python_src/src/lamp_py/performance_manager/l1_rt_metrics.py index 6c1a2d4d..d26dfc96 100644 --- a/python_src/src/lamp_py/performance_manager/l1_rt_metrics.py +++ b/python_src/src/lamp_py/performance_manager/l1_rt_metrics.py @@ -6,25 +6,25 @@ TempEventCompare, ) from lamp_py.runtime_utils.process_logger import ProcessLogger -from .l1_cte_statements import get_trips_for_metrics +from .l1_cte_statements import trips_for_metrics_subquery # pylint: disable=R0914 # pylint too many local variables (more than 15) -def process_metrics_table( +def update_metrics_columns( db_manager: DatabaseManager, seed_service_date: int, static_version_key: int, ) -> None: """ - process updates to metrics table + update metrics columns in vehicle_events table for seed_service_date, static_version_key combination """ process_logger = ProcessLogger("l1_rt_metrics_table_loader") process_logger.log_start() - trips_for_metrics = get_trips_for_metrics( + trips_for_metrics = trips_for_metrics_subquery( static_version_key, seed_service_date ) @@ -239,9 +239,10 @@ def process_metrics_table( # pylint: enable=R0914 -def process_metrics(db_manager: DatabaseManager) -> None: +def update_metrics_from_temp_events(db_manager: DatabaseManager) -> None: """ - insert and update metrics table + update daily metrics values for service_date, static_version_key combos in + temp_event_compare table """ service_date_query = sa.select( TempEventCompare.service_date, @@ -251,7 +252,7 @@ def process_metrics(db_manager: DatabaseManager) -> None: for result in db_manager.select_as_list(service_date_query): service_date = int(result["service_date"]) static_version_key = int(result["static_version_key"]) - process_metrics_table( + update_metrics_columns( db_manager=db_manager, seed_service_date=service_date, static_version_key=static_version_key, diff --git a/python_src/src/lamp_py/performance_manager/l1_rt_trips.py 
b/python_src/src/lamp_py/performance_manager/l1_rt_trips.py index 984580c7..76ada1d5 100644 --- a/python_src/src/lamp_py/performance_manager/l1_rt_trips.py +++ b/python_src/src/lamp_py/performance_manager/l1_rt_trips.py @@ -11,7 +11,7 @@ ) from lamp_py.runtime_utils.process_logger import ProcessLogger from .l1_cte_statements import ( - get_static_trips_cte, + static_trips_subquery, ) @@ -365,7 +365,7 @@ def backup_rt_static_trip_match( this matches an RT trip to a static trip with the same branch_route_id or trunk_route_id if branch is null and direction with the closest start_time """ - static_trips_sub = get_static_trips_cte( + static_trips_sub = static_trips_subquery( static_version_key, seed_service_date ) diff --git a/python_src/tests/performance_manager/test_performance_manager.py b/python_src/tests/performance_manager/test_performance_manager.py index d800a669..3f090509 100644 --- a/python_src/tests/performance_manager/test_performance_manager.py +++ b/python_src/tests/performance_manager/test_performance_manager.py @@ -16,7 +16,8 @@ combine_events, get_gtfs_rt_paths, process_gtfs_rt_files, - upload_to_database, + build_temp_events, + update_events_from_temp, ) from lamp_py.performance_manager.l0_rt_trip_updates import ( get_and_unwrap_tu_dataframe, @@ -382,7 +383,8 @@ def test_gtfs_rt_processing( missing_columns = set(events.columns) - expected_columns assert len(missing_columns) == 0 - upload_to_database(events, db_manager) + build_temp_events(events, db_manager) + update_events_from_temp(db_manager) check_logs(caplog)