From f0a4652a766a9398be5cdb1a2c2302afb79697cf Mon Sep 17 00:00:00 2001 From: Ryan Rymarczyk Date: Wed, 2 Oct 2024 09:50:49 -0400 Subject: [PATCH] FEAT: Add 'revenue' field to Rail data pipeline (#446) This change adds the GTFS-RT "revenue" field to the LAMP Rail Performance Data pipeline. - Create db migrations to add revenue column to vehicle_trips table - Read vehicle.trip.revenue column from VehiclePositions parquet files - Load revenue data from parquet files into DB table - Add is_revenue column to LAMP_ALL_RT_fields OPMI export - Add where clause for only revenue trips to Subway Performance Data export The use of the revenue field differs between the OPMI export and Subway Performance Data export. For the OPMI export an additional field is added so that revenue trips can be filtered out at their discretion. The Subway Performance Data export was never meant to include non-revenue event data, so it is filtered out from the export altogether. Asana Task: https://app.asana.com/0/1205827492903547/1208216161546522 --- Data_Dictionary.md | 32 ++++---- src/lamp_py/aws/s3.py | 35 +++++++-- src/lamp_py/ingestion/light_rail_gps.py | 16 ++-- .../001_07903947aabe_initial_changes.py | 45 ----------- .../005_96187da84955_remove_metadata.py | 53 ------------- .../008_32ba735d080c_add_revenue_columns.py | 74 +++++++++++++++++++ .../005_32ba735d080c_add_revenue_columns.py | 74 +++++++++++++++++++ .../009_32ba735d080c_add_revenue_columns.py | 74 +++++++++++++++++++ src/lamp_py/performance_manager/flat_file.py | 1 + .../performance_manager/l0_gtfs_rt_events.py | 1 + .../l0_gtfs_static_load.py | 2 +- .../performance_manager/l0_rt_trip_updates.py | 1 + .../l0_rt_vehicle_positions.py | 7 ++ .../performance_manager/l1_cte_statements.py | 5 +- .../performance_manager/l1_rt_trips.py | 4 + .../rail_performance_manager_schema.py | 2 + src/lamp_py/postgres/seed_metadata.py | 6 +- src/lamp_py/tableau/jobs/rt_rail.py | 2 + .../test_performance_manager.py | 13 ++-- 19 files changed, 306 
insertions(+), 141 deletions(-) create mode 100644 src/lamp_py/migrations/versions/performance_manager_dev/008_32ba735d080c_add_revenue_columns.py create mode 100644 src/lamp_py/migrations/versions/performance_manager_prod/005_32ba735d080c_add_revenue_columns.py create mode 100644 src/lamp_py/migrations/versions/performance_manager_staging/009_32ba735d080c_add_revenue_columns.py diff --git a/Data_Dictionary.md b/Data_Dictionary.md index 7ce8422b..2b432f97 100644 --- a/Data_Dictionary.md +++ b/Data_Dictionary.md @@ -8,17 +8,17 @@ LAMP currently produces the following sets of public data exports: # Subway Performance Data -Each row represents a unique `trip_id`-`stop_id` pair for each `service_date` of rail service. +Each row represents a unique `trip_id`-`stop_id` pair for each `service_date` of revenue rail service. | field name | type | description | source | | ----------- | --------- | ----------- | ------------ | -| service_date | int64 | equivalent to GTFS-RT `start_date` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) as `int` instead of `string` | GTFS-RT | -| start_time | int64 | equivalent to GTFS-RT `start_time` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) converted to seconds after midnight | GTFS-RT | -| route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) | GTFS-RT | -| branch_route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) for lines with multiple routes, `NULL` if line has single route, e.g. `Green-B` for `Green-B` route, `NULL` for `Blue` route. EXCEPTION: LAMP Inserts `Red-A` or `Red-B` to indicate `Red`-line Ashmont or Braintree branch if trip stops at station south of JFK/UMass. 
| GTFS-RT | +| service_date | int64 | equivalent to GTFS-RT `start_date` value in [Trip Descriptor][gtfs-tripdescriptor] as `int` instead of `string` | GTFS-RT | +| start_time | int64 | equivalent to GTFS-RT `start_time` value in [Trip Descriptor][gtfs-tripdescriptor] converted to seconds after midnight | GTFS-RT | +| route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor][gtfs-tripdescriptor] | GTFS-RT | +| branch_route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor][gtfs-tripdescriptor] for lines with multiple routes, `NULL` if line has single route, e.g. `Green-B` for `Green-B` route, `NULL` for `Blue` route. EXCEPTION: LAMP Inserts `Red-A` or `Red-B` to indicate `Red`-line Ashmont or Braintree branch if trip stops at station south of JFK/UMass. | GTFS-RT | | trunk_route_id | string | line if multiple routes exist on line, otherwise `route_id`, e.g. `Green` for `Green-B` route, `Blue` for `Blue` route | GTFS-RT | -| trip_id | string | equivalent to GTFS-RT `trip_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) | GTFS-RT | -| direction_id | bool | equivalent to GTFS-RT `direction_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) as `bool` instead of `int` | GTFS-RT | +| trip_id | string | equivalent to GTFS-RT `trip_id` value in [Trip Descriptor][gtfs-tripdescriptor] | GTFS-RT | +| direction_id | bool | equivalent to GTFS-RT `direction_id` value in [Trip Descriptor][gtfs-tripdescriptor] as `bool` instead of `int` | GTFS-RT | | direction | string | equivalent to GTFS `direction` value from [directions.txt](https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs.md#directionstxt) for `route_id`-`direction_id` pair | GTFS | | direction_destination | string | equivalent to GTFS `direction_destination` value from [directions.txt](https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs.md#directionstxt) for 
`route_id`-`direction_id` pair | GTFS | | stop_count | int16 | number of stops recorded on trip | LAMP Calculated | @@ -61,8 +61,8 @@ Each row represents a unique `trip_id`-`stop_id` pair for each `service_date` of | field name | type | description | source | | ----------- | --------- | ----------- | ------------ | -| service_date | date | equivalent to GTFS-RT `start_date` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) as `date` instead of `string` | GTFS-RT | -| start_datetime | datetime | equivalent to GTFS-RT `start_time` added to `start_date` from [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) | LAMP Calculated | +| service_date | date | equivalent to GTFS-RT `start_date` value in [Trip Descriptor][gtfs-tripdescriptor] as `date` instead of `string` | GTFS-RT | +| start_datetime | datetime | equivalent to GTFS-RT `start_time` added to `start_date` from [Trip Descriptor][gtfs-tripdescriptor] | LAMP Calculated | | static_start_datetime | datetime | equivalent to `start_datetime` if planned trip, otherwise GTFS-RT `start_date` added to `static_start_time` | LAMP Calculated | | stop_sequence | int16 | equivalent to GTFS-RT `current_stop_sequence` value in [VehiclePosition](https://gtfs.org/realtime/reference/#message-vehicleposition) | GTFS-RT | | canonical_stop_sequence | int16 | stop sequence based on "canonical" route trip as defined in [route_patterns.txt](https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs.md#route_patternstxt) table | LAMP Calculated | @@ -81,14 +81,15 @@ Each row represents a unique `trip_id`-`stop_id` pair for each `service_date` of | previous_stop_departure_sec | int64 | `previous_stop_departure_datetime` as seconds after midnight | LAMP Calculated | stop_arrival_sec | int64 | `stop_arrival_datetime` as seconds after midnight | LAMP Calculated | stop_departure_sec | int64 | `stop_departure_datetime` as seconds after midnight | LAMP Calculated -| 
direction_id | int8 | equivalent to GTFS-RT `direction_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) | GTFS-RT | -| route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) | GTFS-RT | -| branch_route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) for lines with multiple routes, `NULL` if line has single route, e.g. `Green-B` for `Green-B` route, `NULL` for `Blue` route_id. EXCEPTION: LAMP Inserts `Red-A` or `Red-B` to indicate `Red`-line Ashmont or Braintree branch if trip stops at station south of JFK/UMass. | GTFS-RT | +| is_revenue | boolean | equivalent to MBTA GTFS-RT `revenue` value in [Trip Descriptor](https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs-realtime.md#non-revenue-trips) as only bool | GTFS-RT | +| direction_id | int8 | equivalent to GTFS-RT `direction_id` value in [Trip Descriptor][gtfs-tripdescriptor] | GTFS-RT | +| route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor][gtfs-tripdescriptor] | GTFS-RT | +| branch_route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor][gtfs-tripdescriptor] for lines with multiple routes, `NULL` if line has single route, e.g. `Green-B` for `Green-B` route, `NULL` for `Blue` route_id. EXCEPTION: LAMP Inserts `Red-A` or `Red-B` to indicate `Red`-line Ashmont or Braintree branch if trip stops at station south of JFK/UMass. | GTFS-RT | +| trunk_route_id | string | line if multiple routes exist on line, otherwise `route_id`, e.g. 
`Green` for `Green-B` route, `Blue` for `Blue` route | GTFS-RT | -| start_time | int64 | equivalent to GTFS-RT `start_time` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) converted to seconds after midnight | GTFS-RT | +| start_time | int64 | equivalent to GTFS-RT `start_time` value in [Trip Descriptor][gtfs-tripdescriptor] converted to seconds after midnight | GTFS-RT | | vehicle_id | string | equivalent to GTFS-RT `id` value in [VehicleDescriptor](https://gtfs.org/realtime/reference/#message-vehicledescriptor) | GTFS-RT | stop_count | int16 | number of stops recorded on trip | LAMP Calculated | -| trip_id | string | equivalent to GTFS-RT `trip_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) | GTFS-RT | +| trip_id | string | equivalent to GTFS-RT `trip_id` value in [Trip Descriptor][gtfs-tripdescriptor] | GTFS-RT | | vehicle_label | string | equivalent to GTFS-RT `label` value in [VehicleDescriptor](https://gtfs.org/realtime/reference/#message-vehicledescriptor). | GTFS-RT | vehicle_consist | string | Pipe separated concatenation of `multi_carriage_details` labels in [CarriageDetails](https://gtfs.org/realtime/reference/#message-CarriageDetails) | GTFS-RT | direction | string | equivalent to GTFS `direction` value from [directions.txt](https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs.md#directionstxt) for `route_id`-`direction_id` pair | GTFS | @@ -259,4 +260,5 @@ In generating this dataset, translation string fields contain only the English t | informed_entity.activities | string | Equivalent to `informed_entity[n][activities]` as a `\|` delimitated string. All potential values are defined in the [Activity](https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs-realtime.md#enum-activity) enum. 
[gtfs-rt-alert]: https://gtfs.org/realtime/reference/#message-alert -[mbta-enhanced]: https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs-realtime.md#enhanced-fields \ No newline at end of file +[mbta-enhanced]: https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs-realtime.md#enhanced-fields +[gtfs-tripdescriptor]: https://gtfs.org/realtime/reference/#message-tripdescriptor \ No newline at end of file diff --git a/src/lamp_py/aws/s3.py b/src/lamp_py/aws/s3.py index f451a809..5368ef26 100644 --- a/src/lamp_py/aws/s3.py +++ b/src/lamp_py/aws/s3.py @@ -19,11 +19,12 @@ import boto3 import botocore import botocore.exceptions +from botocore.exceptions import ClientError import pandas +import pyarrow as pa import pyarrow.compute as pc import pyarrow.parquet as pq import pyarrow.dataset as pd -from botocore.exceptions import ClientError from pyarrow import Table, fs from pyarrow.util import guid @@ -254,7 +255,10 @@ def get_zip_buffer(filename: str) -> IO[bytes]: def file_list_from_s3( - bucket_name: str, file_prefix: str, max_list_size: int = 250_000 + bucket_name: str, + file_prefix: str, + max_list_size: int = 250_000, + in_filter: Optional[str] = None, ) -> List[str]: """ get a list of s3 objects @@ -283,7 +287,10 @@ def file_list_from_s3( for obj in page["Contents"]: if obj["Size"] == 0: continue - filepaths.append(os.path.join("s3://", bucket_name, obj["Key"])) + if in_filter is None or in_filter in obj["Key"]: + filepaths.append( + os.path.join("s3://", bucket_name, obj["Key"]) + ) if len(filepaths) > max_list_size: break @@ -673,15 +680,27 @@ def read_parquet( ) -> pandas.core.frame.DataFrame: """ read parquet file or files from s3 and return it as a pandas dataframe + + if requested column from "columns" does not exist in parquet file then + the column will be added as all nulls, this was added to capture + vehicle.trip.revenue field from VehiclePosition files starting december 2023 """ retry_attempts = 2 for retry_attempt in 
range(retry_attempts + 1): try: - df = ( - _get_pyarrow_dataset(filename, filters) - .to_table(columns=columns) - .to_pandas(self_destruct=True) - ) + ds = _get_pyarrow_dataset(filename, filters) + if columns is None: + table = ds.to_table(columns=columns) + + else: + read_columns = list(set(ds.schema.names) & set(columns)) + table = ds.to_table(columns=read_columns) + for null_column in set(columns).difference(ds.schema.names): + table = table.append_column( + null_column, pa.nulls(table.num_rows) + ) + + df = table.to_pandas(self_destruct=True) break except Exception as exception: if retry_attempt == retry_attempts: diff --git a/src/lamp_py/ingestion/light_rail_gps.py b/src/lamp_py/ingestion/light_rail_gps.py index f64561f2..b9b157e9 100644 --- a/src/lamp_py/ingestion/light_rail_gps.py +++ b/src/lamp_py/ingestion/light_rail_gps.py @@ -209,16 +209,18 @@ def ingest_light_rail_gps() -> None: s3_files = [file for file in s3_files if "LightRailRawGPS" in file] - dataframe, archive_files, error_files = dataframe_from_gz(s3_files) + if len(s3_files) > 0: - write_parquet(dataframe) + dataframe, archive_files, error_files = dataframe_from_gz(s3_files) + + write_parquet(dataframe) + + if len(archive_files) > 0: + move_s3_objects(archive_files, os.environ["ARCHIVE_BUCKET"]) + if len(error_files) > 0: + move_s3_objects(error_files, os.environ["ERROR_BUCKET"]) logger.log_complete() except Exception as exception: logger.log_failure(exception) - - if len(archive_files) > 0: - move_s3_objects(archive_files, os.environ["ARCHIVE_BUCKET"]) - if len(error_files) > 0: - move_s3_objects(error_files, os.environ["ERROR_BUCKET"]) diff --git a/src/lamp_py/migrations/versions/metadata_dev/001_07903947aabe_initial_changes.py b/src/lamp_py/migrations/versions/metadata_dev/001_07903947aabe_initial_changes.py index 05c345b5..4ba12cb6 100644 --- a/src/lamp_py/migrations/versions/metadata_dev/001_07903947aabe_initial_changes.py +++ 
b/src/lamp_py/migrations/versions/metadata_dev/001_07903947aabe_initial_changes.py @@ -47,51 +47,6 @@ def upgrade() -> None: postgresql_where=sa.text("rail_pm_processed = false"), ) - # pull metadata from the rail performance manager database into the - # metadata database. the table may or may not exist, so wrap this in a try - # except - try: - rpm_db_manager = DatabaseManager( - db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER - ) - - insert_data = [] - # pull metadata from the rail performance manager database via direct - # sql query. the metadata_log table may or may not exist. - with rpm_db_manager.session.begin() as session: - result = session.execute( - text("SELECT path, processed, process_fail FROM metadata_log") - ) - for row in result: - (path, processed, process_fail) = row - insert_data.append( - { - "path": path, - "rail_pm_processed": processed, - "rail_pm_process_fail": process_fail, - } - ) - - except ProgrammingError as error: - # Error 42P01 is an 'Undefined Table' error. 
This occurs when there is - # no metadata_log table in the rail performance manager database - # - # Raise all other sql errors - insert_data = [] - original_error = error.orig - if ( - original_error is not None - and hasattr(original_error, "pgcode") - and original_error.pgcode == "42P01" - ): - logging.info("No Metadata Table in Rail Performance Manager") - else: - raise - - # insert data into the metadata database - if insert_data: - op.bulk_insert(MetadataLog.__table__, insert_data) - # ### end Alembic commands ### diff --git a/src/lamp_py/migrations/versions/performance_manager_dev/005_96187da84955_remove_metadata.py b/src/lamp_py/migrations/versions/performance_manager_dev/005_96187da84955_remove_metadata.py index 7840e2bc..84587e1d 100644 --- a/src/lamp_py/migrations/versions/performance_manager_dev/005_96187da84955_remove_metadata.py +++ b/src/lamp_py/migrations/versions/performance_manager_dev/005_96187da84955_remove_metadata.py @@ -29,59 +29,6 @@ def upgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### - while True: - try: - rpm_db_manager = DatabaseManager( - db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER - ) - md_db_manager = DatabaseManager(db_index=DatabaseIndex.METADATA) - - with rpm_db_manager.session.begin() as session: - legacy_result = session.execute( - text("SELECT path FROM metadata_log") - ) - legacy_paths = set( - [record[0] for record in legacy_result.fetchall()] - ) - - modern_result = md_db_manager.select_as_list( - sa.select(MetadataLog.path) - ) - modern_paths = set([record["path"] for record in modern_result]) - - missing_paths = legacy_paths - modern_paths - if len(missing_paths) == 0: - break - else: - logging.error( - "Detected %s paths in Legacy Metadata Table not found in Metadata Database", - len(missing_paths), - ) - except ProgrammingError as error: - # Error 42P01 is an 'Undefined Table' error. 
This occurs when there is - # no metadata_log table in the rail performance manager database - # - # Raise all other sql errors - original_error = error.orig - if ( - original_error is not None - and hasattr(original_error, "pgcode") - and original_error.pgcode == "42P01" - ): - logging.info("No Metadata Table in Rail Performance Manager") - legacy_paths = set() - else: - logging.exception( - "Programming Error when checking Metadata Log" - ) - time.sleep(15) - continue - - except Exception as error: - logging.exception("Programming Error when checking Metadata Log") - time.sleep(15) - continue - op.drop_index("ix_metadata_log_not_processed", table_name="metadata_log") op.drop_table("metadata_log") # ### end Alembic commands ### diff --git a/src/lamp_py/migrations/versions/performance_manager_dev/008_32ba735d080c_add_revenue_columns.py b/src/lamp_py/migrations/versions/performance_manager_dev/008_32ba735d080c_add_revenue_columns.py new file mode 100644 index 00000000..d6d7ebb1 --- /dev/null +++ b/src/lamp_py/migrations/versions/performance_manager_dev/008_32ba735d080c_add_revenue_columns.py @@ -0,0 +1,74 @@ +"""add revenue columns + +Revision ID: 32ba735d080c +Revises: 896dedd8a4db +Create Date: 2024-09-20 08:47:52.784591 + +This change adds a boolean revenue column to the vehicle_trips table. +Initially this will be filled with True and back-filled by a separate operation + +Details +* upgrade -> drop triggers and indexes from table and add revenue column + +* downgrade -> drop revenue column + +""" + +from alembic import op +import sqlalchemy as sa + +from lamp_py.postgres.rail_performance_manager_schema import ( + TempEventCompare, + VehicleTrips, +) + +# revision identifiers, used by Alembic. 
+revision = "32ba735d080c" +down_revision = "896dedd8a4db" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.execute( + f"ALTER TABLE public.vehicle_trips DISABLE TRIGGER rt_trips_update_branch_trunk;" + ) + op.execute( + f"ALTER TABLE public.vehicle_trips DISABLE TRIGGER update_vehicle_trips_modified;" + ) + op.drop_index("ix_vehicle_trips_composite_1", table_name="vehicle_trips") + op.drop_constraint("vehicle_trips_unique_trip", table_name="vehicle_trips") + + op.add_column( + "temp_event_compare", sa.Column("revenue", sa.Boolean(), nullable=True) + ) + op.add_column( + "vehicle_trips", sa.Column("revenue", sa.Boolean(), nullable=True) + ) + op.execute(sa.update(TempEventCompare).values(revenue=True)) + op.execute(sa.update(VehicleTrips).values(revenue=True)) + op.alter_column("temp_event_compare", "revenue", nullable=False) + op.alter_column("vehicle_trips", "revenue", nullable=False) + + op.create_unique_constraint( + "vehicle_trips_unique_trip", + "vehicle_trips", + ["service_date", "route_id", "trip_id"], + ) + op.create_index( + "ix_vehicle_trips_composite_1", + "vehicle_trips", + ["route_id", "direction_id", "vehicle_id"], + unique=False, + ) + op.execute( + f"ALTER TABLE public.vehicle_trips ENABLE TRIGGER rt_trips_update_branch_trunk;" + ) + op.execute( + f"ALTER TABLE public.vehicle_trips ENABLE TRIGGER update_vehicle_trips_modified;" + ) + + +def downgrade() -> None: + op.drop_column("vehicle_trips", "revenue") + op.drop_column("temp_event_compare", "revenue") diff --git a/src/lamp_py/migrations/versions/performance_manager_prod/005_32ba735d080c_add_revenue_columns.py b/src/lamp_py/migrations/versions/performance_manager_prod/005_32ba735d080c_add_revenue_columns.py new file mode 100644 index 00000000..d6d7ebb1 --- /dev/null +++ b/src/lamp_py/migrations/versions/performance_manager_prod/005_32ba735d080c_add_revenue_columns.py @@ -0,0 +1,74 @@ +"""add revenue columns + +Revision ID: 32ba735d080c +Revises: 896dedd8a4db +Create Date: 
2024-09-20 08:47:52.784591 + +This change adds a boolean revenue column to the vehicle_trips table. +Initially this will be filled with True and back-filled by a separate operation + +Details +* upgrade -> drop triggers and indexes from table and add revenue column + +* downgrade -> drop revenue column + +""" + +from alembic import op +import sqlalchemy as sa + +from lamp_py.postgres.rail_performance_manager_schema import ( + TempEventCompare, + VehicleTrips, +) + +# revision identifiers, used by Alembic. +revision = "32ba735d080c" +down_revision = "896dedd8a4db" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.execute( + f"ALTER TABLE public.vehicle_trips DISABLE TRIGGER rt_trips_update_branch_trunk;" + ) + op.execute( + f"ALTER TABLE public.vehicle_trips DISABLE TRIGGER update_vehicle_trips_modified;" + ) + op.drop_index("ix_vehicle_trips_composite_1", table_name="vehicle_trips") + op.drop_constraint("vehicle_trips_unique_trip", table_name="vehicle_trips") + + op.add_column( + "temp_event_compare", sa.Column("revenue", sa.Boolean(), nullable=True) + ) + op.add_column( + "vehicle_trips", sa.Column("revenue", sa.Boolean(), nullable=True) + ) + op.execute(sa.update(TempEventCompare).values(revenue=True)) + op.execute(sa.update(VehicleTrips).values(revenue=True)) + op.alter_column("temp_event_compare", "revenue", nullable=False) + op.alter_column("vehicle_trips", "revenue", nullable=False) + + op.create_unique_constraint( + "vehicle_trips_unique_trip", + "vehicle_trips", + ["service_date", "route_id", "trip_id"], + ) + op.create_index( + "ix_vehicle_trips_composite_1", + "vehicle_trips", + ["route_id", "direction_id", "vehicle_id"], + unique=False, + ) + op.execute( + f"ALTER TABLE public.vehicle_trips ENABLE TRIGGER rt_trips_update_branch_trunk;" + ) + op.execute( + f"ALTER TABLE public.vehicle_trips ENABLE TRIGGER update_vehicle_trips_modified;" + ) + + +def downgrade() -> None: + op.drop_column("vehicle_trips", "revenue") + 
op.drop_column("temp_event_compare", "revenue") diff --git a/src/lamp_py/migrations/versions/performance_manager_staging/009_32ba735d080c_add_revenue_columns.py b/src/lamp_py/migrations/versions/performance_manager_staging/009_32ba735d080c_add_revenue_columns.py new file mode 100644 index 00000000..d6d7ebb1 --- /dev/null +++ b/src/lamp_py/migrations/versions/performance_manager_staging/009_32ba735d080c_add_revenue_columns.py @@ -0,0 +1,74 @@ +"""add revenue columns + +Revision ID: 32ba735d080c +Revises: 896dedd8a4db +Create Date: 2024-09-20 08:47:52.784591 + +This change adds a boolean revenue column to the vehicle_trips table. +Initially this will be filled with True and back-filled by a separate operation + +Details +* upgrade -> drop triggers and indexes from table and add revenue column + +* downgrade -> drop revenue column + +""" + +from alembic import op +import sqlalchemy as sa + +from lamp_py.postgres.rail_performance_manager_schema import ( + TempEventCompare, + VehicleTrips, +) + +# revision identifiers, used by Alembic. 
+revision = "32ba735d080c" +down_revision = "896dedd8a4db" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.execute( + f"ALTER TABLE public.vehicle_trips DISABLE TRIGGER rt_trips_update_branch_trunk;" + ) + op.execute( + f"ALTER TABLE public.vehicle_trips DISABLE TRIGGER update_vehicle_trips_modified;" + ) + op.drop_index("ix_vehicle_trips_composite_1", table_name="vehicle_trips") + op.drop_constraint("vehicle_trips_unique_trip", table_name="vehicle_trips") + + op.add_column( + "temp_event_compare", sa.Column("revenue", sa.Boolean(), nullable=True) + ) + op.add_column( + "vehicle_trips", sa.Column("revenue", sa.Boolean(), nullable=True) + ) + op.execute(sa.update(TempEventCompare).values(revenue=True)) + op.execute(sa.update(VehicleTrips).values(revenue=True)) + op.alter_column("temp_event_compare", "revenue", nullable=False) + op.alter_column("vehicle_trips", "revenue", nullable=False) + + op.create_unique_constraint( + "vehicle_trips_unique_trip", + "vehicle_trips", + ["service_date", "route_id", "trip_id"], + ) + op.create_index( + "ix_vehicle_trips_composite_1", + "vehicle_trips", + ["route_id", "direction_id", "vehicle_id"], + unique=False, + ) + op.execute( + f"ALTER TABLE public.vehicle_trips ENABLE TRIGGER rt_trips_update_branch_trunk;" + ) + op.execute( + f"ALTER TABLE public.vehicle_trips ENABLE TRIGGER update_vehicle_trips_modified;" + ) + + +def downgrade() -> None: + op.drop_column("vehicle_trips", "revenue") + op.drop_column("temp_event_compare", "revenue") diff --git a/src/lamp_py/performance_manager/flat_file.py b/src/lamp_py/performance_manager/flat_file.py index 21ebe820..80bf122b 100644 --- a/src/lamp_py/performance_manager/flat_file.py +++ b/src/lamp_py/performance_manager/flat_file.py @@ -348,6 +348,7 @@ def write_daily_table( VehicleEvents.service_date == service_date_int, VehicleTrips.static_version_key == static_version_key, StaticRoutes.route_type < 2, + VehicleTrips.revenue == sa.true(), sa.or_( 
VehicleEvents.vp_move_timestamp.is_not(None), VehicleEvents.vp_stop_timestamp.is_not(None), diff --git a/src/lamp_py/performance_manager/l0_gtfs_rt_events.py b/src/lamp_py/performance_manager/l0_gtfs_rt_events.py index 728b972e..3689fb06 100644 --- a/src/lamp_py/performance_manager/l0_gtfs_rt_events.py +++ b/src/lamp_py/performance_manager/l0_gtfs_rt_events.py @@ -152,6 +152,7 @@ def combine_events( "direction_id", "parent_station", "vehicle_id", + "revenue", "stop_id", "stop_sequence", "trip_id", diff --git a/src/lamp_py/performance_manager/l0_gtfs_static_load.py b/src/lamp_py/performance_manager/l0_gtfs_static_load.py index 0b0676ed..438a3479 100644 --- a/src/lamp_py/performance_manager/l0_gtfs_static_load.py +++ b/src/lamp_py/performance_manager/l0_gtfs_static_load.py @@ -312,7 +312,7 @@ def load_parquet_files( paths_to_load[:1], columns=table.column_info.columns_to_pull ) assert table.data_table.shape[0] > 0 - except pyarrow.ArrowInvalid as exception: + except (pyarrow.ArrowInvalid, AssertionError) as exception: if table.allow_empty_dataframe is False: raise exception diff --git a/src/lamp_py/performance_manager/l0_rt_trip_updates.py b/src/lamp_py/performance_manager/l0_rt_trip_updates.py index dddd4cb9..5df7d1e7 100644 --- a/src/lamp_py/performance_manager/l0_rt_trip_updates.py +++ b/src/lamp_py/performance_manager/l0_rt_trip_updates.py @@ -179,6 +179,7 @@ def reduce_trip_updates(trip_updates: pandas.DataFrame) -> pandas.DataFrame: trip_updates["stop_sequence"] = None trip_updates["vehicle_label"] = None trip_updates["vehicle_consist"] = None + trip_updates["revenue"] = True process_logger.add_metadata(after_row_count=trip_updates.shape[0]) process_logger.log_complete() diff --git a/src/lamp_py/performance_manager/l0_rt_vehicle_positions.py b/src/lamp_py/performance_manager/l0_rt_vehicle_positions.py index eb10c7ed..40ab0606 100644 --- a/src/lamp_py/performance_manager/l0_rt_vehicle_positions.py +++ b/src/lamp_py/performance_manager/l0_rt_vehicle_positions.py 
@@ -36,6 +36,7 @@ def get_vp_dataframe( "vehicle.trip.route_id", "vehicle.trip.start_date", "vehicle.trip.start_time", + "vehicle.trip.revenue", "vehicle.vehicle.id", "vehicle.trip.trip_id", "vehicle.vehicle.label", @@ -64,6 +65,7 @@ def get_vp_dataframe( "vehicle.trip.route_id": "route_id", "vehicle.trip.start_date": "start_date", "vehicle.trip.start_time": "start_time", + "vehicle.trip.revenue": "revenue", "vehicle.vehicle.id": "vehicle_id", "vehicle.trip.trip_id": "trip_id", "vehicle.vehicle.label": "vehicle_label", @@ -125,6 +127,11 @@ def transform_vp_datatypes( vehicle_positions["direction_id"] ).astype(numpy.bool_) + # fix revenue field, NULL is True + vehicle_positions["revenue"] = numpy.where( + vehicle_positions["revenue"].eq(False), False, True + ).astype(numpy.bool_) + # store start_time as seconds from start of day as int64 vehicle_positions["start_time"] = ( vehicle_positions["start_time"] diff --git a/src/lamp_py/performance_manager/l1_cte_statements.py b/src/lamp_py/performance_manager/l1_cte_statements.py index 58537ba2..491e9ef1 100644 --- a/src/lamp_py/performance_manager/l1_cte_statements.py +++ b/src/lamp_py/performance_manager/l1_cte_statements.py @@ -137,6 +137,7 @@ def rt_trips_subquery(service_date: int) -> sa.sql.selectable.Subquery: VehicleTrips.vehicle_id, VehicleTrips.stop_count, VehicleTrips.static_trip_id_guess, + VehicleTrips.revenue, VehicleEvents.pm_trip_id, VehicleEvents.stop_sequence, VehicleEvents.parent_station, @@ -301,8 +302,8 @@ def trips_for_headways_subquery( .select_from(rt_trips_sub) .where( # drop trips with one stop count, probably not valid - rt_trips_sub.c.stop_count - > 1, + rt_trips_sub.c.stop_count > 1, + rt_trips_sub.c.revenue == sa.true(), ) .order_by( rt_trips_sub.c.pm_trip_id, diff --git a/src/lamp_py/performance_manager/l1_rt_trips.py b/src/lamp_py/performance_manager/l1_rt_trips.py index 20d5db36..14e1fb96 100644 --- a/src/lamp_py/performance_manager/l1_rt_trips.py +++ 
b/src/lamp_py/performance_manager/l1_rt_trips.py @@ -103,6 +103,7 @@ def load_new_trip_data(db_manager: DatabaseManager) -> None: TempEventCompare.vehicle_label, TempEventCompare.vehicle_consist, TempEventCompare.static_version_key, + TempEventCompare.revenue, ) .distinct( TempEventCompare.service_date, @@ -127,6 +128,7 @@ def load_new_trip_data(db_manager: DatabaseManager) -> None: "vehicle_label", "vehicle_consist", "static_version_key", + "revenue", ] trip_insert_query = ( @@ -152,6 +154,7 @@ def load_new_trip_data(db_manager: DatabaseManager) -> None: TempEventCompare.trip_id, TempEventCompare.vehicle_label, TempEventCompare.vehicle_consist, + TempEventCompare.revenue, ) .distinct( TempEventCompare.service_date, @@ -179,6 +182,7 @@ def load_new_trip_data(db_manager: DatabaseManager) -> None: trip_id=distinct_update_query.c.trip_id, vehicle_label=distinct_update_query.c.vehicle_label, vehicle_consist=distinct_update_query.c.vehicle_consist, + revenue=distinct_update_query.c.revenue, ) .where( VehicleTrips.service_date == distinct_update_query.c.service_date, diff --git a/src/lamp_py/postgres/rail_performance_manager_schema.py b/src/lamp_py/postgres/rail_performance_manager_schema.py index df5a7bbb..71e27caa 100644 --- a/src/lamp_py/postgres/rail_performance_manager_schema.py +++ b/src/lamp_py/postgres/rail_performance_manager_schema.py @@ -102,6 +102,7 @@ class VehicleTrips(RpmSqlBase): # pylint: disable=too-few-public-methods vehicle_consist = sa.Column(sa.String(), nullable=True) direction = sa.Column(sa.String(30), nullable=True) direction_destination = sa.Column(sa.String(60), nullable=True) + revenue = sa.Column(sa.Boolean, nullable=False) # static trip matching static_trip_id_guess = sa.Column(sa.String(512), nullable=True) @@ -171,6 +172,7 @@ class TempEventCompare(RpmSqlBase): # pylint: disable=too-few-public-methods vehicle_id = sa.Column(sa.String(60), nullable=False) vehicle_label = sa.Column(sa.String(128), nullable=True) vehicle_consist = 
sa.Column(sa.String(), nullable=True) + revenue = sa.Column(sa.Boolean, nullable=False) # forign key to static schedule expected values static_version_key = sa.Column( diff --git a/src/lamp_py/postgres/seed_metadata.py b/src/lamp_py/postgres/seed_metadata.py index b90cf3ca..7f17a5dd 100644 --- a/src/lamp_py/postgres/seed_metadata.py +++ b/src/lamp_py/postgres/seed_metadata.py @@ -73,9 +73,7 @@ def reset_rpm(parsed_args: argparse.Namespace) -> None: ) if parsed_args.clear_static: - rpm_db_name = os.getenv( - "ALEMBIC_RPM_DB_NAME", "performance_manager_prod" - ) + rpm_db_name = os.getenv("ALEMBIC_RPM_DB_NAME", "") alembic_downgrade_to_base(rpm_db_name) alembic_upgrade_to_head(rpm_db_name) elif parsed_args.clear_rt: @@ -99,7 +97,7 @@ def run() -> None: ) if parsed_args.clear_static: - md_db_name = os.getenv("ALEMBIC_MD_DB_NAME", "metadata_prod") + md_db_name = os.getenv("ALEMBIC_MD_DB_NAME", "") alembic_downgrade_to_base(md_db_name) alembic_upgrade_to_head(md_db_name) elif parsed_args.clear_rt: diff --git a/src/lamp_py/tableau/jobs/rt_rail.py b/src/lamp_py/tableau/jobs/rt_rail.py index bef10daf..3d933070 100644 --- a/src/lamp_py/tableau/jobs/rt_rail.py +++ b/src/lamp_py/tableau/jobs/rt_rail.py @@ -46,6 +46,7 @@ def __init__(self) -> None: " , extract(epoch FROM (TIMEZONE('America/New_York', TO_TIMESTAMP(ve.vp_move_timestamp)) - vt.service_date::text::timestamp))::int as previous_stop_departure_sec" " , extract(epoch FROM (TIMEZONE('America/New_York', TO_TIMESTAMP(COALESCE(ve.vp_stop_timestamp, ve.tu_stop_timestamp))) - vt.service_date::text::timestamp))::int as stop_arrival_sec" " , extract(epoch FROM (TIMEZONE('America/New_York', TO_TIMESTAMP(next_ve.vp_move_timestamp)) - vt.service_date::text::timestamp))::int as stop_departure_sec" + " , vt.revenue as is_revenue" " , vt.direction_id::int" " , vt.route_id" " , vt.branch_route_id" @@ -146,6 +147,7 @@ def parquet_schema(self) -> pyarrow.schema: ("previous_stop_departure_sec", pyarrow.int64()), ("stop_arrival_sec", 
pyarrow.int64()), ("stop_departure_sec", pyarrow.int64()), + ("is_revenue", pyarrow.bool_()), ("direction_id", pyarrow.int8()), ("route_id", pyarrow.string()), ("branch_route_id", pyarrow.string()), diff --git a/tests/performance_manager/test_performance_manager.py b/tests/performance_manager/test_performance_manager.py index ad7bc230..ceb11e99 100644 --- a/tests/performance_manager/test_performance_manager.py +++ b/tests/performance_manager/test_performance_manager.py @@ -515,7 +515,7 @@ def test_bad_empty_static_table() -> None: static_tables = get_table_objects() test_table = {"stop_times": static_tables["stop_times"]} - with pytest.raises(pyarrow.ArrowInvalid): + with pytest.raises((pyarrow.ArrowInvalid, AssertionError)): load_parquet_files(test_table, "/tmp/FEED_INFO/timestamp=0000000000") @@ -557,24 +557,24 @@ def test_gtfs_rt_processing( route_ids = rail_routes_from_filepath(files["vp_paths"], rpm_db_manager) positions = get_vp_dataframe(files["vp_paths"], route_ids) position_size = positions.shape[0] - assert positions.shape[1] == 13 + assert positions.shape[1] == 14 # check that the types can be set correctly positions = transform_vp_datatypes(positions) - assert positions.shape[1] == 13 + assert positions.shape[1] == 14 assert position_size == positions.shape[0] # check that it can be combined with the static schedule positions = add_static_version_key_column(positions, rpm_db_manager) - assert positions.shape[1] == 14 + assert positions.shape[1] == 15 assert position_size == positions.shape[0] positions = add_parent_station_column(positions, rpm_db_manager) - assert positions.shape[1] == 15 + assert positions.shape[1] == 16 assert position_size == positions.shape[0] positions = transform_vp_timestamps(positions) - assert positions.shape[1] == 14 + assert positions.shape[1] == 15 assert position_size > positions.shape[0] trip_updates = get_and_unwrap_tu_dataframe(files["tu_paths"], route_ids) @@ -619,6 +619,7 @@ def test_gtfs_rt_processing( 
expected_columns.add("start_time") expected_columns.add("vehicle_id") expected_columns.add("static_version_key") + expected_columns.add("revenue") assert len(expected_columns) == len(events.columns) missing_columns = set(events.columns) - expected_columns