Skip to content

Commit

Permalink
incorporate PR Comments 1
Browse files Browse the repository at this point in the history
  • Loading branch information
rymarczy committed Jul 17, 2023
1 parent 3a34af2 commit b941cd5
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,7 @@ def upgrade() -> None:
CREATE OR REPLACE VIEW opmi_all_rt_fields_joined AS
SELECT
vt.service_date
, ve.pm_trip_id as trip_hash
, NULL as trip_stop_hash
, ve.pm_trip_id
, ve.stop_sequence
, ve.stop_id
, LAG (ve.stop_id, 1) OVER (PARTITION BY ve.pm_trip_id ORDER BY COALESCE(ve.vp_stop_timestamp, ve.tu_stop_timestamp, ve.vp_move_timestamp)) as previous_stop_id
Expand Down
2 changes: 1 addition & 1 deletion python_src/src/lamp_py/performance_manager/gtfs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def start_time_to_seconds(
return int(hour) * 3600 + int(minute) * 60 + int(second)


def get_unique_trip_stop_columns() -> List[str]:
def unique_trip_stop_columns() -> List[str]:
"""
columns used to determine if a event is a unique trip stop
"""
Expand Down
128 changes: 78 additions & 50 deletions python_src/src/lamp_py/performance_manager/l0_gtfs_rt_events.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, List
from typing import Dict, List, Tuple

import numpy
import pandas
Expand All @@ -19,11 +19,11 @@
)
from lamp_py.runtime_utils.process_logger import ProcessLogger

from .gtfs_utils import get_unique_trip_stop_columns
from .gtfs_utils import unique_trip_stop_columns
from .l0_rt_trip_updates import process_tu_files
from .l0_rt_vehicle_positions import process_vp_files
from .l1_rt_trips import process_trips, load_new_trip_data
from .l1_rt_metrics import process_metrics
from .l1_rt_metrics import update_metrics_from_temp_events


def get_gtfs_rt_paths(db_manager: DatabaseManager) -> List[Dict[str, List]]:
Expand Down Expand Up @@ -85,9 +85,9 @@ def combine_events(
)
process_logger.log_start()

trip_stop_columns = get_unique_trip_stop_columns()
trip_stop_columns = unique_trip_stop_columns()

# merge together the trip stop hashes and the timestamps
# merge together the trip_stop_columns and the timestamps
events = pandas.merge(
vp_events[
trip_stop_columns + ["vp_stop_timestamp", "vp_move_timestamp"]
Expand All @@ -104,10 +104,18 @@ def combine_events(
)

# concat all of the details that went into each trip event, dropping
# duplicates. this is now a dataframe mapping hashes back to the things
# that went into them.
# TODO: review trip_id processing # pylint: disable=fixme
# add more intelligent trip_id processing, this approach will randomly select trip_id record to keep
# duplicates.
# DRAGONS
# selection of unique `details_columns` records could have non-deterministic behavior
#
# many vehicle position and trip update events have to be aggregated together
# for each unique trip-stop event record. `details_columns` columns that are
# not a part of `trip_stop_columns` are semi-randomly dropped, with the first
# column being kept based on a sort-order.
#
# currently, the sort-order only takes into account NA values in a few
# columns to priortize the selection of records from vehicle positions over
# trip updates. beyond that, records are essentially dropped at randmon
details_columns = [
"service_date",
"start_time",
Expand All @@ -128,7 +136,8 @@ def combine_events(
)

# create sort column to indicate which records have null values for select columns
# we want to drop these null value records whenever possible to prioritize details from vehicle_positions
# we want to drop these null value records whenever possible
# to prioritize records from vehicle_positions
event_details["na_sort"] = (
event_details[
["trip_id", "stop_sequence", "vehicle_label", "vehicle_consist"]
Expand All @@ -149,7 +158,7 @@ def combine_events(
.drop(columns="na_sort")
)

# pull the details and add them to the events table
# join `details_columns` to df with timestamps
events = events.merge(
event_details, how="left", on=trip_stop_columns, validate="one_to_one"
)
Expand All @@ -160,30 +169,7 @@ def combine_events(
return events


def do_trips_insert_update(db_manager: DatabaseManager) -> None:
"""
perform INSERT/UPDATE operations related to event trips
"""
# INSERT/UPDATE vehicle_trips table from temp_event_compare
# makes sure that every event has a pm_trip_id for insertion into vehicle_events table
load_new_trip_data(db_manager=db_manager)

# load pm_trip_id values into temp_event_compare
update_temp_trip_id = (
sa.update(TempEventCompare.__table__)
.values(pm_trip_id=VehicleTrips.pm_trip_id)
.where(
TempEventCompare.service_date == VehicleTrips.service_date,
TempEventCompare.route_id == VehicleTrips.route_id,
TempEventCompare.direction_id == VehicleTrips.direction_id,
TempEventCompare.start_time == VehicleTrips.start_time,
TempEventCompare.vehicle_id == VehicleTrips.vehicle_id,
)
)
db_manager.execute(update_temp_trip_id)


def flag_insert_update_events(db_manager: DatabaseManager) -> List[int]:
def flag_insert_update_events(db_manager: DatabaseManager) -> Tuple[int, int]:
"""
update do_update and do_insert flag columns in temp_event_compare table
Expand All @@ -193,7 +179,7 @@ def flag_insert_update_events(db_manager: DatabaseManager) -> List[int]:
@db_manager DatabaseManager object for database interaction
@return List[int, int] - do_update count, do_insert count
@return Tuple[int, int] - do_update count, do_insert count
"""

# populate do_update column of temp_event_compare
Expand Down Expand Up @@ -233,7 +219,9 @@ def flag_insert_update_events(db_manager: DatabaseManager) -> List[int]:
update_count_query = sa.select(
sa.func.count(TempEventCompare.do_update)
).where(TempEventCompare.do_update == sa.true())
update_count = db_manager.select_as_list(update_count_query)[0]["count"]
update_count = int(
db_manager.select_as_list(update_count_query)[0]["count"]
)

# populate do_insert column of temp_event_compare
do_insert_pre_select = (
Expand All @@ -258,7 +246,9 @@ def flag_insert_update_events(db_manager: DatabaseManager) -> List[int]:
insert_count_query = sa.select(
sa.func.count(TempEventCompare.do_insert)
).where(TempEventCompare.do_insert == sa.true())
insert_count = db_manager.select_as_list(insert_count_query)[0]["count"]
insert_count = int(
db_manager.select_as_list(insert_count_query)[0]["count"]
)

# remove records from temp_event_compare that are not related to updates or inserts
delete_temp = sa.delete(TempEventCompare.__table__).where(
Expand All @@ -267,10 +257,10 @@ def flag_insert_update_events(db_manager: DatabaseManager) -> List[int]:
)
db_manager.execute(delete_temp)

return [update_count, insert_count]
return (update_count, insert_count)


def upload_to_database(
def build_temp_events(
events: pandas.DataFrame, db_manager: DatabaseManager
) -> pandas.DataFrame:
"""
Expand All @@ -282,7 +272,7 @@ def upload_to_database(
events where appropriate and insert the new ones.
"""
process_logger = ProcessLogger(
"gtfs_rt.insert_update_events",
"gtfs_rt.build_temp_events",
event_count=events.shape[0],
)
process_logger.log_start()
Expand All @@ -299,12 +289,45 @@ def upload_to_database(
events,
)

# perform RDS INSERT/UPDATE related to trips records
do_trips_insert_update(db_manager)
# make sure vehicle_trips has trips for all events in temp_event_compare
load_new_trip_data(db_manager=db_manager)

# load pm_trip_id values into temp_event_compare from vehicle_trips
update_temp_trip_id = (
sa.update(TempEventCompare.__table__)
.values(pm_trip_id=VehicleTrips.pm_trip_id)
.where(
TempEventCompare.service_date == VehicleTrips.service_date,
TempEventCompare.route_id == VehicleTrips.route_id,
TempEventCompare.direction_id == VehicleTrips.direction_id,
TempEventCompare.start_time == VehicleTrips.start_time,
TempEventCompare.vehicle_id == VehicleTrips.vehicle_id,
)
)
db_manager.execute(update_temp_trip_id)

# populate do_insert and do_update columns of temp_event_compare
# get counts of records belonging to each flag
update_count, insert_count = flag_insert_update_events(db_manager)
process_logger.add_metadata(
db_update_rowcount=update_count,
db_insert_rowcount=insert_count,
)

process_logger.log_complete()

return update_count + insert_count


def update_events_from_temp(db_manager: DatabaseManager) -> None:
""" "
Insert and Update vehicle_events table from records in temp_event_compare
"""
process_logger = ProcessLogger(
"gtfs_rt.insert_update_events",
)
process_logger.log_start()

# INSERT/UPDATE vehicle_events records with vp_move_timestamp
# this will insert new event records into vehicle_events if they DO NOT exist
Expand Down Expand Up @@ -377,6 +400,12 @@ def upload_to_database(
db_manager.execute(update_vp_stop_events)

# update vehicle_events records with tu_stop_timestamp
#
# vehicle_events > temp_event_compare comparison is not used for trip updates
# we don't just want the earliest timestamp value for trip updates, we want the
# last value before the trip arrives at the parent_station
# so, we assume trip updates are processed chronologically and always
# insert the last trip update timestamp value that is recieved
update_tu_stop_events = (
sa.update(VehicleEvents.__table__)
.values(
Expand All @@ -393,8 +422,6 @@ def upload_to_database(

process_logger.log_complete()

return update_count + insert_count


def process_gtfs_rt_files(db_manager: DatabaseManager) -> None:
"""
Expand Down Expand Up @@ -454,13 +481,14 @@ def process_gtfs_rt_files(db_manager: DatabaseManager) -> None:

# continue events processing if records exist
if events.shape[0] > 0:
change_count = upload_to_database(events, db_manager)
# add new trips to VehicleTrips table
change_count = build_temp_events(events, db_manager)

if change_count > 0:
update_events_from_temp(db_manager)
# update trips data in vehicle_trips table
process_trips(db_manager)
# add new metrics to VehicleMetrics table
if change_count > 0:
process_metrics(db_manager)
# update event metrics columns
update_metrics_from_temp_events(db_manager)

db_manager.execute(
sa.update(MetadataLog.__table__)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
add_static_version_key_column,
add_parent_station_column,
remove_bus_records,
get_unique_trip_stop_columns,
unique_trip_stop_columns,
)


Expand Down Expand Up @@ -180,7 +180,7 @@ def reduce_trip_updates(trip_updates: pandas.DataFrame) -> pandas.DataFrame:
)
process_logger.log_start()

trip_stop_columns = get_unique_trip_stop_columns()
trip_stop_columns = unique_trip_stop_columns()

# sort all trip updates by reverse timestamp, then drop all of the updates
# for the same trip and same station but the first one. the first update will
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
add_static_version_key_column,
add_parent_station_column,
remove_bus_records,
get_unique_trip_stop_columns,
unique_trip_stop_columns,
)


Expand Down Expand Up @@ -118,8 +118,8 @@ def transform_vp_timestamps(
convert raw vp data into a timestamped event data for each stop on a trip.
this method will add
* "vp_move_timestamp" - when the vehicle begins moving towards the hashed stop
* "vp_stop_timestamp" - when the vehicle arrives at the hashed stop
* "vp_move_timestamp" - when the vehicle begins moving towards the event parent_staion
* "vp_stop_timestamp" - when the vehicle arrives at the event parent_station
this method will remove "is_moving" and "vehicle_timestamp"
"""
Expand All @@ -128,7 +128,7 @@ def transform_vp_timestamps(
)
process_logger.log_start()

trip_stop_columns = get_unique_trip_stop_columns()
trip_stop_columns = unique_trip_stop_columns()
# TODO: review trip_id processing # pylint: disable=fixme
# add more intelligent trip_id processing, this approach will randomly select trip_id record to keep

Expand Down
Loading

0 comments on commit b941cd5

Please sign in to comment.