From f389579e6d8f7bac0a7a0e07ce843d6fd5e7f7fb Mon Sep 17 00:00:00 2001
From: Mike Zappitello
Date: Wed, 12 Jul 2023 17:08:43 -0400
Subject: [PATCH] Changes from PR152 squash before merge

* change s3 utility for parsing timestamps to return a datetime instead of a timestamp
* get routes to prefilter from filepaths at read time rather than when compiling a list of files to process
* add gtfs utility for getting a static version key from a service date and reuse it in the new function and in the existing one
---
 python_src/src/lamp_py/aws/s3.py               |   8 +-
 .../lamp_py/performance_manager/gtfs_utils.py  | 144 ++++++++++--------
 .../performance_manager/l0_gtfs_rt_events.py   |  18 +--
 .../performance_manager/l0_rt_trip_updates.py  |  12 +-
 .../l0_rt_vehicle_positions.py                 |  12 +-
 .../src/lamp_py/postgres/postgres_utils.py     |   4 +-
 .../test_performance_manager.py                |   4 +-
 7 files changed, 110 insertions(+), 92 deletions(-)

diff --git a/python_src/src/lamp_py/aws/s3.py b/python_src/src/lamp_py/aws/s3.py
index e4c620e3..3c7479d0 100644
--- a/python_src/src/lamp_py/aws/s3.py
+++ b/python_src/src/lamp_py/aws/s3.py
@@ -279,7 +279,7 @@ def write_parquet_file(
     process_logger.log_complete()


-def get_utc_from_partition_path(path: str) -> float:
+def get_datetime_from_partition_path(path: str) -> datetime.datetime:
     """
     process datetime from partitioned s3 path return UTC timestamp
     """
@@ -289,17 +289,17 @@
         month = int(re.findall(r"month=(\d{1,2})", path)[0])
         day = int(re.findall(r"day=(\d{1,2})", path)[0])
         hour = int(re.findall(r"hour=(\d{1,2})", path)[0])
-        date = datetime.datetime(
+        return_date = datetime.datetime(
             year=year,
             month=month,
             day=day,
             hour=hour,
             tzinfo=datetime.timezone.utc,
         )
-        return_date = datetime.datetime.timestamp(date)
     except IndexError as _:
         # handle gtfs static paths
-        return_date = float(re.findall(r"timestamp=(\d{10})", path)[0])
+        timestamp = float(re.findall(r"timestamp=(\d{10})", path)[0])
+        return_date = datetime.datetime.fromtimestamp(timestamp)

     return return_date

diff --git a/python_src/src/lamp_py/performance_manager/gtfs_utils.py b/python_src/src/lamp_py/performance_manager/gtfs_utils.py
index a74f4529..0a6a22c6 100644
--- a/python_src/src/lamp_py/performance_manager/gtfs_utils.py
+++ b/python_src/src/lamp_py/performance_manager/gtfs_utils.py
@@ -1,5 +1,4 @@
-from typing import Optional, List
-import datetime
+from typing import Optional, List, Union

 import numpy
 import pandas
@@ -7,12 +6,12 @@

 from lamp_py.postgres.postgres_utils import DatabaseManager
 from lamp_py.postgres.postgres_schema import (
-    ServiceIdDates,
     StaticFeedInfo,
     StaticRoutes,
     StaticStops,
 )
 from lamp_py.runtime_utils.process_logger import ProcessLogger
+from lamp_py.aws.s3 import get_datetime_from_partition_path


 def start_time_to_seconds(
@@ -41,6 +40,66 @@ def unique_trip_stop_columns() -> List[str]:
     ]


+def static_version_key_from_service_date(
+    service_date: int, db_manager: DatabaseManager
+) -> int:
+    """
+    for a given service date, determine the correct static schedule to use
+    """
+    # the service date must:
+    # * be between "feed_start_date" and "feed_end_date" in StaticFeedInfo
+    # * be less than or equal to "feed_active_date" in StaticFeedInfo
+    #
+    # order all static version keys by feed_active_date descending and
+    # created_on date descending, then choose the first one.
this handles + # multiple static schedules being issued for the same service day + live_match_query = ( + sa.select(StaticFeedInfo.static_version_key) + .where( + StaticFeedInfo.feed_start_date <= service_date, + StaticFeedInfo.feed_end_date >= service_date, + StaticFeedInfo.feed_active_date <= service_date, + ) + .order_by( + StaticFeedInfo.feed_active_date.desc(), + StaticFeedInfo.created_on.desc(), + ) + .limit(1) + ) + + # "feed_start_date" and "feed_end_date" are modified for archived GTFS + # Schedule files. If processing archived static schedules, these alternate + # rules must be used for matching GTFS static to GTFS-RT data + archive_match_query = ( + sa.select(StaticFeedInfo.static_version_key) + .where( + StaticFeedInfo.feed_start_date <= service_date, + StaticFeedInfo.feed_end_date >= service_date, + ) + .order_by( + StaticFeedInfo.feed_start_date.desc(), + StaticFeedInfo.created_on.desc(), + ) + .limit(1) + ) + + result = db_manager.select_as_list(live_match_query) + + # if live_match_query fails, attempt to look for a match using the archive method + if len(result) == 0: + result = db_manager.select_as_list(archive_match_query) + + # if this query does not produce a result, no static schedule info + # exists for this trip update data, so the data + # should not be processed until valid static schedule data exists + if len(result) == 0: + raise IndexError( + f"StaticFeedInfo table has no matching schedule for service_date={service_date}" + ) + + return int(result[0]["static_version_key"]) + + def add_static_version_key_column( events_dataframe: pandas.DataFrame, db_manager: DatabaseManager, @@ -70,60 +129,15 @@ def add_static_version_key_column( events_dataframe["static_version_key"] = 0 for date in events_dataframe["service_date"].unique(): - date = int(date) - # "service_date" from events dataframe must be between "feed_start_date" and "feed_end_date" in StaticFeedInfo - # "service_date" must also be less than or equal to "feed_active_date" in StaticFeedInfo - # StaticFeedInfo, order by feed_active_date descending and created_on date descending - # this should deal with multiple static schedules being issued on the same day - # if this occurs we will use the latest issued schedule - live_match_query = ( - sa.select(StaticFeedInfo.static_version_key) - .where( - StaticFeedInfo.feed_start_date <= date, - StaticFeedInfo.feed_end_date >= date, - StaticFeedInfo.feed_active_date <= date, - ) - .order_by( - StaticFeedInfo.feed_active_date.desc(), - StaticFeedInfo.created_on.desc(), - ) - .limit(1) - ) - - # "feed_start_date" and "feed_end_date" are modified for archived GTFS Schedule files - # If processing archived static schedules, these alternate rules must be used for matching - # GTFS static to GTFS-RT data - archive_match_query = ( - sa.select(StaticFeedInfo.static_version_key) - .where( - StaticFeedInfo.feed_start_date <= date, - StaticFeedInfo.feed_end_date >= date, - ) - .order_by( - StaticFeedInfo.feed_start_date.desc(), - StaticFeedInfo.created_on.desc(), - ) - .limit(1) + service_date = int(date) + static_version_key = static_version_key_from_service_date( + service_date=service_date, db_manager=db_manager ) - result = db_manager.select_as_list(live_match_query) - - # if live_match_query fails, attempt to look for a match using the archive method - if len(result) == 0: - result = db_manager.select_as_list(archive_match_query) - - # if this query does not produce a result, no static schedule info - # exists for this trip update data, so the data - # should not be 
processed until valid static schedule data exists - if len(result) == 0: - raise IndexError( - f"StaticFeedInfo table has no matching schedule for service_date={date}" - ) - - service_date_mask = events_dataframe["service_date"] == date - events_dataframe.loc[service_date_mask, "static_version_key"] = int( - result[0]["static_version_key"] - ) + service_date_mask = events_dataframe["service_date"] == service_date + events_dataframe.loc[ + service_date_mask, "static_version_key" + ] = static_version_key process_logger.log_complete() @@ -185,8 +199,8 @@ def add_parent_station_column( return events_dataframe -def rail_routes_from_timestamp( - timestamp: float, db_manager: DatabaseManager +def rail_routes_from_filepath( + filepath: Union[List[str], str], db_manager: DatabaseManager ) -> List[str]: """ get a list of rail route_ids that were in effect on a given service date @@ -196,18 +210,20 @@ def rail_routes_from_timestamp( key for a given service date, using the key with the max value (the keys are also timestamps). then pull all the static routes with type """ - date = datetime.datetime.utcfromtimestamp(timestamp) - service_date = f"{date.year:04}{date.month:02}{date.day:02}" + if isinstance(filepath, list): + filepath = filepath[0] + + date = get_datetime_from_partition_path(filepath) + service_date = int(f"{date.year:04}{date.month:02}{date.day:02}") - svk_subquery = ( - sa.select(sa.func.max(ServiceIdDates.static_version_key)) - .where(ServiceIdDates.service_date == service_date) - .scalar_subquery() + static_version_key = static_version_key_from_service_date( + service_date=service_date, db_manager=db_manager ) + result = db_manager.execute( sa.select(StaticRoutes.route_id).where( StaticRoutes.route_type.in_([0, 1, 2]), - StaticRoutes.static_version_key == svk_subquery, + StaticRoutes.static_version_key == static_version_key, ) ) diff --git a/python_src/src/lamp_py/performance_manager/l0_gtfs_rt_events.py b/python_src/src/lamp_py/performance_manager/l0_gtfs_rt_events.py index d2869224..b726eb96 100644 --- a/python_src/src/lamp_py/performance_manager/l0_gtfs_rt_events.py +++ b/python_src/src/lamp_py/performance_manager/l0_gtfs_rt_events.py @@ -6,7 +6,7 @@ from sqlalchemy.dialects import postgresql from lamp_py.aws.ecs import check_for_sigterm -from lamp_py.aws.s3 import get_utc_from_partition_path +from lamp_py.aws.s3 import get_datetime_from_partition_path from lamp_py.postgres.postgres_schema import ( MetadataLog, TempEventCompare, @@ -19,7 +19,7 @@ ) from lamp_py.runtime_utils.process_logger import ProcessLogger -from .gtfs_utils import unique_trip_stop_columns, rail_routes_from_timestamp +from .gtfs_utils import unique_trip_stop_columns from .l0_rt_trip_updates import process_tu_files from .l0_rt_vehicle_positions import process_vp_files from .l1_rt_trips import process_trips, load_new_trip_data @@ -38,18 +38,21 @@ def get_gtfs_rt_paths(db_manager: DatabaseManager) -> List[Dict[str, List]]: vp_files = get_unprocessed_files("RT_VEHICLE_POSITIONS", db_manager) for record in vp_files: - timestamp = get_utc_from_partition_path(record["paths"][0]) + timestamp = get_datetime_from_partition_path( + record["paths"][0] + ).timestamp() grouped_files[timestamp] = { "ids": record["ids"], "vp_paths": record["paths"], "tu_paths": [], - "route_ids": rail_routes_from_timestamp(timestamp, db_manager), } tu_files = get_unprocessed_files("RT_TRIP_UPDATES", db_manager) for record in tu_files: - timestamp = get_utc_from_partition_path(record["paths"][0]) + timestamp = get_datetime_from_partition_path( + 
record["paths"][0] + ).timestamp() if timestamp in grouped_files: grouped_files[timestamp]["ids"] += record["ids"] grouped_files[timestamp]["tu_paths"] += record["paths"] @@ -58,7 +61,6 @@ def get_gtfs_rt_paths(db_manager: DatabaseManager) -> List[Dict[str, List]]: "ids": record["ids"], "tu_paths": record["paths"], "vp_paths": [], - "route_ids": rail_routes_from_timestamp(timestamp, db_manager), } process_logger.add_metadata(hours_found=len(grouped_files)) @@ -472,7 +474,6 @@ def process_gtfs_rt_files(db_manager: DatabaseManager) -> None: # all events come from vp files. add tu key afterwards. events = process_vp_files( paths=files["vp_paths"], - route_ids=files["route_ids"], db_manager=db_manager, ) events["tu_stop_timestamp"] = None @@ -480,7 +481,6 @@ def process_gtfs_rt_files(db_manager: DatabaseManager) -> None: # all events come from tu files. add vp keys afterwards. events = process_tu_files( paths=files["tu_paths"], - route_ids=files["route_ids"], db_manager=db_manager, ) events["vp_move_timestamp"] = None @@ -489,12 +489,10 @@ def process_gtfs_rt_files(db_manager: DatabaseManager) -> None: # events come from tu and vp files. join them together. vp_events = process_vp_files( paths=files["vp_paths"], - route_ids=files["route_ids"], db_manager=db_manager, ) tu_events = process_tu_files( paths=files["tu_paths"], - route_ids=files["route_ids"], db_manager=db_manager, ) events = combine_events(vp_events, tu_events) diff --git a/python_src/src/lamp_py/performance_manager/l0_rt_trip_updates.py b/python_src/src/lamp_py/performance_manager/l0_rt_trip_updates.py index 00223aca..5dadf507 100644 --- a/python_src/src/lamp_py/performance_manager/l0_rt_trip_updates.py +++ b/python_src/src/lamp_py/performance_manager/l0_rt_trip_updates.py @@ -11,16 +11,19 @@ add_static_version_key_column, add_parent_station_column, unique_trip_stop_columns, + rail_routes_from_filepath, ) def get_tu_dataframe_chunks( - to_load: Union[str, List[str]], route_ids: List[str] + to_load: Union[str, List[str]], db_manager: DatabaseManager ) -> Iterator[pandas.DataFrame]: """ return interator of dataframe chunks from a trip updates parquet file (or list of files) """ + route_ids = rail_routes_from_filepath(to_load, db_manager) + trip_update_columns = [ "timestamp", "stop_time_update", @@ -108,7 +111,7 @@ def explode_stop_time_update( def get_and_unwrap_tu_dataframe( - paths: Union[str, List[str]], route_ids: List[str] + paths: Union[str, List[str]], db_manager: DatabaseManager ) -> pandas.DataFrame: """ unwrap and explode trip updates records from parquet files @@ -124,7 +127,7 @@ def get_and_unwrap_tu_dataframe( # per batch, this should result in ~5-6 GB of memory use per batch # after batch goes through explod_stop_time_update vectorize operation, # resulting Series has negligible memory use - for batch_events in get_tu_dataframe_chunks(paths, route_ids): + for batch_events in get_tu_dataframe_chunks(paths, db_manager): # store start_date as int64 and rename to service_date batch_events.rename( columns={"start_date": "service_date"}, inplace=True @@ -213,7 +216,6 @@ def reduce_trip_updates(trip_updates: pandas.DataFrame) -> pandas.DataFrame: def process_tu_files( paths: Union[str, List[str]], - route_ids: List[str], db_manager: DatabaseManager, ) -> pandas.DataFrame: """ @@ -224,7 +226,7 @@ def process_tu_files( ) process_logger.log_start() - trip_updates = get_and_unwrap_tu_dataframe(paths, route_ids) + trip_updates = get_and_unwrap_tu_dataframe(paths, db_manager) if trip_updates.shape[0] > 0: trip_updates = 
add_static_version_key_column(trip_updates, db_manager) trip_updates = add_parent_station_column(trip_updates, db_manager) diff --git a/python_src/src/lamp_py/performance_manager/l0_rt_vehicle_positions.py b/python_src/src/lamp_py/performance_manager/l0_rt_vehicle_positions.py index 58fb5e71..9a6cfced 100644 --- a/python_src/src/lamp_py/performance_manager/l0_rt_vehicle_positions.py +++ b/python_src/src/lamp_py/performance_manager/l0_rt_vehicle_positions.py @@ -11,11 +11,12 @@ add_static_version_key_column, add_parent_station_column, unique_trip_stop_columns, + rail_routes_from_filepath, ) def get_vp_dataframe( - to_load: Union[str, List[str]], route_ids: List[str] + to_load: Union[str, List[str]], db_manager: DatabaseManager ) -> pandas.DataFrame: """ return a dataframe from a vehicle position parquet file (or list of files) @@ -24,6 +25,8 @@ def get_vp_dataframe( process_logger = ProcessLogger("vp.get_dataframe") process_logger.log_start() + route_ids = rail_routes_from_filepath(to_load, db_manager) + vehicle_position_cols = [ "current_status", "current_stop_sequence", @@ -74,7 +77,7 @@ def transform_vp_datatypes( ) process_logger.log_start() - # current_staus: 1 = MOVING, 0 = STOPPED_AT + # current_status: 1 = MOVING, 0 = STOPPED_AT vehicle_positions["is_moving"] = numpy.where( vehicle_positions["current_status"] != "STOPPED_AT", True, False ).astype(numpy.bool_) @@ -196,18 +199,17 @@ def transform_vp_timestamps( def process_vp_files( paths: Union[str, List[str]], - route_ids: List[str], db_manager: DatabaseManager, ) -> pandas.DataFrame: """ - Generate a dataframe of Vehicle Events froom gtfs_rt vehicle position parquet files. + Generate a dataframe of Vehicle Events from gtfs_rt vehicle position parquet files. """ process_logger = ProcessLogger( "process_vehicle_positions", file_count=len(paths) ) process_logger.log_start() - vehicle_positions = get_vp_dataframe(paths, route_ids) + vehicle_positions = get_vp_dataframe(paths, db_manager) if vehicle_positions.shape[0] > 0: vehicle_positions = transform_vp_datatypes(vehicle_positions) vehicle_positions = add_static_version_key_column( diff --git a/python_src/src/lamp_py/postgres/postgres_utils.py b/python_src/src/lamp_py/postgres/postgres_utils.py index 5239f777..510c5ded 100644 --- a/python_src/src/lamp_py/postgres/postgres_utils.py +++ b/python_src/src/lamp_py/postgres/postgres_utils.py @@ -10,7 +10,7 @@ import sqlalchemy as sa from sqlalchemy.orm import sessionmaker -from lamp_py.aws.s3 import get_utc_from_partition_path +from lamp_py.aws.s3 import get_datetime_from_partition_path from lamp_py.runtime_utils.process_logger import ProcessLogger from .postgres_schema import MetadataLog @@ -332,7 +332,7 @@ def get_unprocessed_files( for path_record in db_manager.select_as_list(read_md_log): path_id = path_record.get("pk_id") path = str(path_record.get("path")) - path_timestamp = get_utc_from_partition_path(path) + path_timestamp = get_datetime_from_partition_path(path).timestamp() if path_timestamp not in paths_to_load: paths_to_load[path_timestamp] = {"ids": [], "paths": []} diff --git a/python_src/tests/performance_manager/test_performance_manager.py b/python_src/tests/performance_manager/test_performance_manager.py index 78a5030a..74a02f0e 100644 --- a/python_src/tests/performance_manager/test_performance_manager.py +++ b/python_src/tests/performance_manager/test_performance_manager.py @@ -306,7 +306,7 @@ def test_gtfs_rt_processing( assert "RT_VEHICLE_POSITIONS" in path # check that we can load the parquet file into a dataframe 
correctly - positions = get_vp_dataframe(files["vp_paths"], files["route_ids"]) + positions = get_vp_dataframe(files["vp_paths"], db_manager) position_size = positions.shape[0] assert positions.shape[1] == 12 @@ -329,7 +329,7 @@ def test_gtfs_rt_processing( assert position_size > positions.shape[0] trip_updates = get_and_unwrap_tu_dataframe( - files["tu_paths"], files["route_ids"] + files["tu_paths"], db_manager ) trip_update_size = trip_updates.shape[0] assert trip_updates.shape[1] == 8
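A minimal, standalone sketch of how the renamed s3 helper resolves the two partition path styles after this change. The function body mirrors the patched get_datetime_from_partition_path; the example paths are hypothetical, shaped like the partitions the regexes expect.

import datetime
import re

def get_datetime_from_partition_path(path: str) -> datetime.datetime:
    # hourly GTFS-RT partitions parse to a timezone-aware UTC datetime;
    # GTFS static partitions fall back to the epoch timestamp in the path
    try:
        year = int(re.findall(r"year=(\d{4})", path)[0])
        month = int(re.findall(r"month=(\d{1,2})", path)[0])
        day = int(re.findall(r"day=(\d{1,2})", path)[0])
        hour = int(re.findall(r"hour=(\d{1,2})", path)[0])
        return datetime.datetime(
            year=year, month=month, day=day, hour=hour,
            tzinfo=datetime.timezone.utc,
        )
    except IndexError:
        # note: fromtimestamp() without a tz argument returns a naive, local-time datetime
        timestamp = float(re.findall(r"timestamp=(\d{10})", path)[0])
        return datetime.datetime.fromtimestamp(timestamp)

# hypothetical paths for illustration only
rt_path = "RT_VEHICLE_POSITIONS/year=2023/month=7/day=12/hour=17/file.parquet"
static_path = "FEED_INFO/timestamp=1689181200/file.parquet"

print(get_datetime_from_partition_path(rt_path))              # 2023-07-12 17:00:00+00:00
print(get_datetime_from_partition_path(rt_path).timestamp())  # callers that still need a float key call .timestamp()
print(get_datetime_from_partition_path(static_path))          # naive datetime derived from the static feed timestamp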