diff --git a/Data_Dictionary.md b/Data_Dictionary.md index 7ce8422b..2b432f97 100644 --- a/Data_Dictionary.md +++ b/Data_Dictionary.md @@ -8,17 +8,17 @@ LAMP currently produces the following sets of public data exports: # Subway Performance Data -Each row represents a unique `trip_id`-`stop_id` pair for each `service_date` of rail service. +Each row represents a unique `trip_id`-`stop_id` pair for each `service_date` of revenue rail service. | field name | type | description | source | | ----------- | --------- | ----------- | ------------ | -| service_date | int64 | equivalent to GTFS-RT `start_date` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) as `int` instead of `string` | GTFS-RT | -| start_time | int64 | equivalent to GTFS-RT `start_time` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) converted to seconds after midnight | GTFS-RT | -| route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) | GTFS-RT | -| branch_route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) for lines with multiple routes, `NULL` if line has single route, e.g. `Green-B` for `Green-B` route, `NULL` for `Blue` route. EXCEPTION: LAMP Inserts `Red-A` or `Red-B` to indicate `Red`-line Ashmont or Braintree branch if trip stops at station south of JFK/UMass. 
| GTFS-RT | +| service_date | int64 | equivalent to GTFS-RT `start_date` value in [Trip Descriptor][gtfs-tripdescriptor] as `int` instead of `string` | GTFS-RT | +| start_time | int64 | equivalent to GTFS-RT `start_time` value in [Trip Descriptor][gtfs-tripdescriptor] converted to seconds after midnight | GTFS-RT | +| route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor][gtfs-tripdescriptor] | GTFS-RT | +| branch_route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor][gtfs-tripdescriptor] for lines with multiple routes, `NULL` if line has single route, e.g. `Green-B` for `Green-B` route, `NULL` for `Blue` route. EXCEPTION: LAMP Inserts `Red-A` or `Red-B` to indicate `Red`-line Ashmont or Braintree branch if trip stops at station south of JFK/UMass. | GTFS-RT | | trunk_route_id | string | line if multiple routes exist on line, otherwise `route_id`, e.g. `Green` for `Green-B` route, `Blue` for `Blue` route | GTFS-RT | -| trip_id | string | equivalent to GTFS-RT `trip_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) | GTFS-RT | -| direction_id | bool | equivalent to GTFS-RT `direction_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) as `bool` instead of `int` | GTFS-RT | +| trip_id | string | equivalent to GTFS-RT `trip_id` value in [Trip Descriptor][gtfs-tripdescriptor] | GTFS-RT | +| direction_id | bool | equivalent to GTFS-RT `direction_id` value in [Trip Descriptor][gtfs-tripdescriptor] as `bool` instead of `int` | GTFS-RT | | direction | string | equivalent to GTFS `direction` value from [directions.txt](https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs.md#directionstxt) for `route_id`-`direction_id` pair | GTFS | | direction_destination | string | equivalent to GTFS `direction_destination` value from [directions.txt](https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs.md#directionstxt) for 
`route_id`-`direction_id` pair | GTFS | | stop_count | int16 | number of stops recorded on trip | LAMP Calculated | @@ -61,8 +61,8 @@ Each row represents a unique `trip_id`-`stop_id` pair for each `service_date` of | field name | type | description | source | | ----------- | --------- | ----------- | ------------ | -| service_date | date | equivalent to GTFS-RT `start_date` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) as `date` instead of `string` | GTFS-RT | -| start_datetime | datetime | equivalent to GTFS-RT `start_time` added to `start_date` from [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) | LAMP Calculated | +| service_date | date | equivalent to GTFS-RT `start_date` value in [Trip Descriptor][gtfs-tripdescriptor] as `date` instead of `string` | GTFS-RT | +| start_datetime | datetime | equivalent to GTFS-RT `start_time` added to `start_date` from [Trip Descriptor][gtfs-tripdescriptor] | LAMP Calculated | | static_start_datetime | datetime | equivalent to `start_datetime` if planned trip, otherwise GTFS-RT `start_date` added to `static_start_time` | LAMP Calculated | | stop_sequence | int16 | equivalent to GTFS-RT `current_stop_sequence` value in [VehiclePosition](https://gtfs.org/realtime/reference/#message-vehicleposition) | GTFS-RT | | canonical_stop_sequence | int16 | stop sequence based on "canonical" route trip as defined in [route_patterns.txt](https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs.md#route_patternstxt) table | LAMP Calculated | @@ -81,14 +81,15 @@ Each row represents a unique `trip_id`-`stop_id` pair for each `service_date` of | previous_stop_departure_sec | int64 | `previous_stop_departure_datetime` as seconds after midnight | LAMP Calculated | stop_arrival_sec | int64 | `stop_arrival_datetime` as seconds after midnight | LAMP Calculated | stop_departure_sec | int64 | `stop_departure_datetime` as seconds after midnight | LAMP Calculated -| 
direction_id | int8 | equivalent to GTFS-RT `direction_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) | GTFS-RT | -| route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) | GTFS-RT | -| branch_route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) for lines with multiple routes, `NULL` if line has single route, e.g. `Green-B` for `Green-B` route, `NULL` for `Blue` route_id. EXCEPTION: LAMP Inserts `Red-A` or `Red-B` to indicate `Red`-line Ashmont or Braintree branch if trip stops at station south of JFK/UMass. | GTFS-RT | +| is_revenue | bool | equivalent to MBTA GTFS-RT `revenue` value in [Trip Descriptor](https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs-realtime.md#non-revenue-trips) as a non-nullable `bool` | GTFS-RT | +| direction_id | int8 | equivalent to GTFS-RT `direction_id` value in [Trip Descriptor][gtfs-tripdescriptor] | GTFS-RT | +| route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor][gtfs-tripdescriptor] | GTFS-RT | +| branch_route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor][gtfs-tripdescriptor] for lines with multiple routes, `NULL` if line has single route, e.g. `Green-B` for `Green-B` route, `NULL` for `Blue` route_id. EXCEPTION: LAMP Inserts `Red-A` or `Red-B` to indicate `Red`-line Ashmont or Braintree branch if trip stops at station south of JFK/UMass. | GTFS-RT | | trunk_route_id | string | line if multiple routes exist on line, otherwise `route_id`, e.g. 
`Green` for `Green-B` route, `Blue` for `Blue` route | GTFS-RT | -| start_time | int64 | equivalent to GTFS-RT `start_time` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) converted to seconds after midnight | GTFS-RT | +| start_time | int64 | equivalent to GTFS-RT `start_time` value in [Trip Descriptor][gtfs-tripdescriptor] converted to seconds after midnight | GTFS-RT | | vehicle_id | string | equivalent to GTFS-RT `id` value in [VehicleDescriptor](https://gtfs.org/realtime/reference/#message-vehicledescriptor) | GTFS-RT | stop_count | int16 | number of stops recorded on trip | LAMP Calculated | -| trip_id | string | equivalent to GTFS-RT `trip_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) | GTFS-RT | +| trip_id | string | equivalent to GTFS-RT `trip_id` value in [Trip Descriptor][gtfs-tripdescriptor] | GTFS-RT | | vehicle_label | string | equivalent to GTFS-RT `label` value in [VehicleDescriptor](https://gtfs.org/realtime/reference/#message-vehicledescriptor). | GTFS-RT | vehicle_consist | string | Pipe separated concatenation of `multi_carriage_details` labels in [CarriageDetails](https://gtfs.org/realtime/reference/#message-CarriageDetails) | GTFS-RT | direction | string | equivalent to GTFS `direction` value from [directions.txt](https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs.md#directionstxt) for `route_id`-`direction_id` pair | GTFS | @@ -259,4 +260,5 @@ In generating this dataset, translation string fields contain only the English t | informed_entity.activities | string | Equivalent to `informed_entity[n][activities]` as a `\|` delimitated string. All potential values are defined in the [Activity](https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs-realtime.md#enum-activity) enum. 
[gtfs-rt-alert]: https://gtfs.org/realtime/reference/#message-alert -[mbta-enhanced]: https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs-realtime.md#enhanced-fields \ No newline at end of file +[mbta-enhanced]: https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs-realtime.md#enhanced-fields +[gtfs-tripdescriptor]: https://gtfs.org/realtime/reference/#message-tripdescriptor \ No newline at end of file diff --git a/src/lamp_py/aws/s3.py b/src/lamp_py/aws/s3.py index f451a809..5368ef26 100644 --- a/src/lamp_py/aws/s3.py +++ b/src/lamp_py/aws/s3.py @@ -19,11 +19,12 @@ import boto3 import botocore import botocore.exceptions +from botocore.exceptions import ClientError import pandas +import pyarrow as pa import pyarrow.compute as pc import pyarrow.parquet as pq import pyarrow.dataset as pd -from botocore.exceptions import ClientError from pyarrow import Table, fs from pyarrow.util import guid @@ -254,7 +255,10 @@ def get_zip_buffer(filename: str) -> IO[bytes]: def file_list_from_s3( - bucket_name: str, file_prefix: str, max_list_size: int = 250_000 + bucket_name: str, + file_prefix: str, + max_list_size: int = 250_000, + in_filter: Optional[str] = None, ) -> List[str]: """ get a list of s3 objects @@ -283,7 +287,10 @@ def file_list_from_s3( for obj in page["Contents"]: if obj["Size"] == 0: continue - filepaths.append(os.path.join("s3://", bucket_name, obj["Key"])) + if in_filter is None or in_filter in obj["Key"]: + filepaths.append( + os.path.join("s3://", bucket_name, obj["Key"]) + ) if len(filepaths) > max_list_size: break @@ -673,15 +680,27 @@ def read_parquet( ) -> pandas.core.frame.DataFrame: """ read parquet file or files from s3 and return it as a pandas dataframe + + if requested column from "columns" does not exist in parquet file then + the column will be added as all nulls, this was added to capture + vehicle.trip.revenue field from VehiclePosition files starting december 2023 """ retry_attempts = 2 for retry_attempt in 
range(retry_attempts + 1): try: - df = ( - _get_pyarrow_dataset(filename, filters) - .to_table(columns=columns) - .to_pandas(self_destruct=True) - ) + ds = _get_pyarrow_dataset(filename, filters) + if columns is None: + table = ds.to_table(columns=columns) + + else: + read_columns = list(set(ds.schema.names) & set(columns)) + table = ds.to_table(columns=read_columns) + for null_column in set(columns).difference(ds.schema.names): + table = table.append_column( + null_column, pa.nulls(table.num_rows) + ) + + df = table.to_pandas(self_destruct=True) break except Exception as exception: if retry_attempt == retry_attempts: diff --git a/src/lamp_py/ingestion/light_rail_gps.py b/src/lamp_py/ingestion/light_rail_gps.py index f64561f2..b9b157e9 100644 --- a/src/lamp_py/ingestion/light_rail_gps.py +++ b/src/lamp_py/ingestion/light_rail_gps.py @@ -209,16 +209,18 @@ def ingest_light_rail_gps() -> None: s3_files = [file for file in s3_files if "LightRailRawGPS" in file] - dataframe, archive_files, error_files = dataframe_from_gz(s3_files) + if len(s3_files) > 0: - write_parquet(dataframe) + dataframe, archive_files, error_files = dataframe_from_gz(s3_files) + + write_parquet(dataframe) + + if len(archive_files) > 0: + move_s3_objects(archive_files, os.environ["ARCHIVE_BUCKET"]) + if len(error_files) > 0: + move_s3_objects(error_files, os.environ["ERROR_BUCKET"]) logger.log_complete() except Exception as exception: logger.log_failure(exception) - - if len(archive_files) > 0: - move_s3_objects(archive_files, os.environ["ARCHIVE_BUCKET"]) - if len(error_files) > 0: - move_s3_objects(error_files, os.environ["ERROR_BUCKET"]) diff --git a/src/lamp_py/migrations/versions/metadata_dev/001_07903947aabe_initial_changes.py b/src/lamp_py/migrations/versions/metadata_dev/001_07903947aabe_initial_changes.py index 05c345b5..4ba12cb6 100644 --- a/src/lamp_py/migrations/versions/metadata_dev/001_07903947aabe_initial_changes.py +++ 
b/src/lamp_py/migrations/versions/metadata_dev/001_07903947aabe_initial_changes.py @@ -47,51 +47,6 @@ def upgrade() -> None: postgresql_where=sa.text("rail_pm_processed = false"), ) - # pull metadata from the rail performance manager database into the - # metadata database. the table may or may not exist, so wrap this in a try - # except - try: - rpm_db_manager = DatabaseManager( - db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER - ) - - insert_data = [] - # pull metadata from the rail performance manager database via direct - # sql query. the metadata_log table may or may not exist. - with rpm_db_manager.session.begin() as session: - result = session.execute( - text("SELECT path, processed, process_fail FROM metadata_log") - ) - for row in result: - (path, processed, process_fail) = row - insert_data.append( - { - "path": path, - "rail_pm_processed": processed, - "rail_pm_process_fail": process_fail, - } - ) - - except ProgrammingError as error: - # Error 42P01 is an 'Undefined Table' error. 
This occurs when there is - # no metadata_log table in the rail performance manager database - # - # Raise all other sql errors - insert_data = [] - original_error = error.orig - if ( - original_error is not None - and hasattr(original_error, "pgcode") - and original_error.pgcode == "42P01" - ): - logging.info("No Metadata Table in Rail Performance Manager") - else: - raise - - # insert data into the metadata database - if insert_data: - op.bulk_insert(MetadataLog.__table__, insert_data) - # ### end Alembic commands ### diff --git a/src/lamp_py/migrations/versions/performance_manager_dev/005_96187da84955_remove_metadata.py b/src/lamp_py/migrations/versions/performance_manager_dev/005_96187da84955_remove_metadata.py index 7840e2bc..84587e1d 100644 --- a/src/lamp_py/migrations/versions/performance_manager_dev/005_96187da84955_remove_metadata.py +++ b/src/lamp_py/migrations/versions/performance_manager_dev/005_96187da84955_remove_metadata.py @@ -29,59 +29,6 @@ def upgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### - while True: - try: - rpm_db_manager = DatabaseManager( - db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER - ) - md_db_manager = DatabaseManager(db_index=DatabaseIndex.METADATA) - - with rpm_db_manager.session.begin() as session: - legacy_result = session.execute( - text("SELECT path FROM metadata_log") - ) - legacy_paths = set( - [record[0] for record in legacy_result.fetchall()] - ) - - modern_result = md_db_manager.select_as_list( - sa.select(MetadataLog.path) - ) - modern_paths = set([record["path"] for record in modern_result]) - - missing_paths = legacy_paths - modern_paths - if len(missing_paths) == 0: - break - else: - logging.error( - "Detected %s paths in Legacy Metadata Table not found in Metadata Database", - len(missing_paths), - ) - except ProgrammingError as error: - # Error 42P01 is an 'Undefined Table' error. 
This occurs when there is - # no metadata_log table in the rail performance manager database - # - # Raise all other sql errors - original_error = error.orig - if ( - original_error is not None - and hasattr(original_error, "pgcode") - and original_error.pgcode == "42P01" - ): - logging.info("No Metadata Table in Rail Performance Manager") - legacy_paths = set() - else: - logging.exception( - "Programming Error when checking Metadata Log" - ) - time.sleep(15) - continue - - except Exception as error: - logging.exception("Programming Error when checking Metadata Log") - time.sleep(15) - continue - op.drop_index("ix_metadata_log_not_processed", table_name="metadata_log") op.drop_table("metadata_log") # ### end Alembic commands ### diff --git a/src/lamp_py/migrations/versions/performance_manager_dev/008_32ba735d080c_add_revenue_columns.py b/src/lamp_py/migrations/versions/performance_manager_dev/008_32ba735d080c_add_revenue_columns.py new file mode 100644 index 00000000..d6d7ebb1 --- /dev/null +++ b/src/lamp_py/migrations/versions/performance_manager_dev/008_32ba735d080c_add_revenue_columns.py @@ -0,0 +1,74 @@ +"""add revenue columns + +Revision ID: 32ba735d080c +Revises: 896dedd8a4db +Create Date: 2024-09-20 08:47:52.784591 + +This change adds a boolean revenue column to the vehicle_trips table. +Initially this will be filled with True and back-filled by a separate operation + +Details +* upgrade -> drop triggers and indexes from table and add revenue column + +* downgrade -> drop revenue column + +""" + +from alembic import op +import sqlalchemy as sa + +from lamp_py.postgres.rail_performance_manager_schema import ( + TempEventCompare, + VehicleTrips, +) + +# revision identifiers, used by Alembic. 
+revision = "32ba735d080c" +down_revision = "896dedd8a4db" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.execute( + f"ALTER TABLE public.vehicle_trips DISABLE TRIGGER rt_trips_update_branch_trunk;" + ) + op.execute( + f"ALTER TABLE public.vehicle_trips DISABLE TRIGGER update_vehicle_trips_modified;" + ) + op.drop_index("ix_vehicle_trips_composite_1", table_name="vehicle_trips") + op.drop_constraint("vehicle_trips_unique_trip", table_name="vehicle_trips") + + op.add_column( + "temp_event_compare", sa.Column("revenue", sa.Boolean(), nullable=True) + ) + op.add_column( + "vehicle_trips", sa.Column("revenue", sa.Boolean(), nullable=True) + ) + op.execute(sa.update(TempEventCompare).values(revenue=True)) + op.execute(sa.update(VehicleTrips).values(revenue=True)) + op.alter_column("temp_event_compare", "revenue", nullable=False) + op.alter_column("vehicle_trips", "revenue", nullable=False) + + op.create_unique_constraint( + "vehicle_trips_unique_trip", + "vehicle_trips", + ["service_date", "route_id", "trip_id"], + ) + op.create_index( + "ix_vehicle_trips_composite_1", + "vehicle_trips", + ["route_id", "direction_id", "vehicle_id"], + unique=False, + ) + op.execute( + f"ALTER TABLE public.vehicle_trips ENABLE TRIGGER rt_trips_update_branch_trunk;" + ) + op.execute( + f"ALTER TABLE public.vehicle_trips ENABLE TRIGGER update_vehicle_trips_modified;" + ) + + +def downgrade() -> None: + op.drop_column("vehicle_trips", "revenue") + op.drop_column("temp_event_compare", "revenue") diff --git a/src/lamp_py/migrations/versions/performance_manager_prod/005_32ba735d080c_add_revenue_columns.py b/src/lamp_py/migrations/versions/performance_manager_prod/005_32ba735d080c_add_revenue_columns.py new file mode 100644 index 00000000..d6d7ebb1 --- /dev/null +++ b/src/lamp_py/migrations/versions/performance_manager_prod/005_32ba735d080c_add_revenue_columns.py @@ -0,0 +1,74 @@ +"""add revenue columns + +Revision ID: 32ba735d080c +Revises: 896dedd8a4db +Create Date: 
2024-09-20 08:47:52.784591 + +This change adds a boolean revenue column to the vehicle_trips table. +Initially this will be filled with True and back-filled by a separate operation + +Details +* upgrade -> drop triggers and indexes from table and add revenue column + +* downgrade -> drop revenue column + +""" + +from alembic import op +import sqlalchemy as sa + +from lamp_py.postgres.rail_performance_manager_schema import ( + TempEventCompare, + VehicleTrips, +) + +# revision identifiers, used by Alembic. +revision = "32ba735d080c" +down_revision = "896dedd8a4db" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.execute( + f"ALTER TABLE public.vehicle_trips DISABLE TRIGGER rt_trips_update_branch_trunk;" + ) + op.execute( + f"ALTER TABLE public.vehicle_trips DISABLE TRIGGER update_vehicle_trips_modified;" + ) + op.drop_index("ix_vehicle_trips_composite_1", table_name="vehicle_trips") + op.drop_constraint("vehicle_trips_unique_trip", table_name="vehicle_trips") + + op.add_column( + "temp_event_compare", sa.Column("revenue", sa.Boolean(), nullable=True) + ) + op.add_column( + "vehicle_trips", sa.Column("revenue", sa.Boolean(), nullable=True) + ) + op.execute(sa.update(TempEventCompare).values(revenue=True)) + op.execute(sa.update(VehicleTrips).values(revenue=True)) + op.alter_column("temp_event_compare", "revenue", nullable=False) + op.alter_column("vehicle_trips", "revenue", nullable=False) + + op.create_unique_constraint( + "vehicle_trips_unique_trip", + "vehicle_trips", + ["service_date", "route_id", "trip_id"], + ) + op.create_index( + "ix_vehicle_trips_composite_1", + "vehicle_trips", + ["route_id", "direction_id", "vehicle_id"], + unique=False, + ) + op.execute( + f"ALTER TABLE public.vehicle_trips ENABLE TRIGGER rt_trips_update_branch_trunk;" + ) + op.execute( + f"ALTER TABLE public.vehicle_trips ENABLE TRIGGER update_vehicle_trips_modified;" + ) + + +def downgrade() -> None: + op.drop_column("vehicle_trips", "revenue") + 
op.drop_column("temp_event_compare", "revenue") diff --git a/src/lamp_py/migrations/versions/performance_manager_staging/009_32ba735d080c_add_revenue_columns.py b/src/lamp_py/migrations/versions/performance_manager_staging/009_32ba735d080c_add_revenue_columns.py new file mode 100644 index 00000000..d6d7ebb1 --- /dev/null +++ b/src/lamp_py/migrations/versions/performance_manager_staging/009_32ba735d080c_add_revenue_columns.py @@ -0,0 +1,74 @@ +"""add revenue columns + +Revision ID: 32ba735d080c +Revises: 896dedd8a4db +Create Date: 2024-09-20 08:47:52.784591 + +This change adds a boolean revenue column to the vehicle_trips table. +Initially this will be filled with True and back-filled by a separate operation + +Details +* upgrade -> drop triggers and indexes from table and add revenue column + +* downgrade -> drop revenue column + +""" + +from alembic import op +import sqlalchemy as sa + +from lamp_py.postgres.rail_performance_manager_schema import ( + TempEventCompare, + VehicleTrips, +) + +# revision identifiers, used by Alembic. 
+revision = "32ba735d080c" +down_revision = "896dedd8a4db" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.execute( + f"ALTER TABLE public.vehicle_trips DISABLE TRIGGER rt_trips_update_branch_trunk;" + ) + op.execute( + f"ALTER TABLE public.vehicle_trips DISABLE TRIGGER update_vehicle_trips_modified;" + ) + op.drop_index("ix_vehicle_trips_composite_1", table_name="vehicle_trips") + op.drop_constraint("vehicle_trips_unique_trip", table_name="vehicle_trips") + + op.add_column( + "temp_event_compare", sa.Column("revenue", sa.Boolean(), nullable=True) + ) + op.add_column( + "vehicle_trips", sa.Column("revenue", sa.Boolean(), nullable=True) + ) + op.execute(sa.update(TempEventCompare).values(revenue=True)) + op.execute(sa.update(VehicleTrips).values(revenue=True)) + op.alter_column("temp_event_compare", "revenue", nullable=False) + op.alter_column("vehicle_trips", "revenue", nullable=False) + + op.create_unique_constraint( + "vehicle_trips_unique_trip", + "vehicle_trips", + ["service_date", "route_id", "trip_id"], + ) + op.create_index( + "ix_vehicle_trips_composite_1", + "vehicle_trips", + ["route_id", "direction_id", "vehicle_id"], + unique=False, + ) + op.execute( + f"ALTER TABLE public.vehicle_trips ENABLE TRIGGER rt_trips_update_branch_trunk;" + ) + op.execute( + f"ALTER TABLE public.vehicle_trips ENABLE TRIGGER update_vehicle_trips_modified;" + ) + + +def downgrade() -> None: + op.drop_column("vehicle_trips", "revenue") + op.drop_column("temp_event_compare", "revenue") diff --git a/src/lamp_py/performance_manager/flat_file.py b/src/lamp_py/performance_manager/flat_file.py index 21ebe820..80bf122b 100644 --- a/src/lamp_py/performance_manager/flat_file.py +++ b/src/lamp_py/performance_manager/flat_file.py @@ -348,6 +348,7 @@ def write_daily_table( VehicleEvents.service_date == service_date_int, VehicleTrips.static_version_key == static_version_key, StaticRoutes.route_type < 2, + VehicleTrips.revenue == sa.true(), sa.or_( 
VehicleEvents.vp_move_timestamp.is_not(None), VehicleEvents.vp_stop_timestamp.is_not(None), diff --git a/src/lamp_py/performance_manager/l0_gtfs_rt_events.py b/src/lamp_py/performance_manager/l0_gtfs_rt_events.py index 728b972e..3689fb06 100644 --- a/src/lamp_py/performance_manager/l0_gtfs_rt_events.py +++ b/src/lamp_py/performance_manager/l0_gtfs_rt_events.py @@ -152,6 +152,7 @@ def combine_events( "direction_id", "parent_station", "vehicle_id", + "revenue", "stop_id", "stop_sequence", "trip_id", diff --git a/src/lamp_py/performance_manager/l0_gtfs_static_load.py b/src/lamp_py/performance_manager/l0_gtfs_static_load.py index 0b0676ed..438a3479 100644 --- a/src/lamp_py/performance_manager/l0_gtfs_static_load.py +++ b/src/lamp_py/performance_manager/l0_gtfs_static_load.py @@ -312,7 +312,7 @@ def load_parquet_files( paths_to_load[:1], columns=table.column_info.columns_to_pull ) assert table.data_table.shape[0] > 0 - except pyarrow.ArrowInvalid as exception: + except (pyarrow.ArrowInvalid, AssertionError) as exception: if table.allow_empty_dataframe is False: raise exception diff --git a/src/lamp_py/performance_manager/l0_rt_trip_updates.py b/src/lamp_py/performance_manager/l0_rt_trip_updates.py index dddd4cb9..5df7d1e7 100644 --- a/src/lamp_py/performance_manager/l0_rt_trip_updates.py +++ b/src/lamp_py/performance_manager/l0_rt_trip_updates.py @@ -179,6 +179,7 @@ def reduce_trip_updates(trip_updates: pandas.DataFrame) -> pandas.DataFrame: trip_updates["stop_sequence"] = None trip_updates["vehicle_label"] = None trip_updates["vehicle_consist"] = None + trip_updates["revenue"] = True process_logger.add_metadata(after_row_count=trip_updates.shape[0]) process_logger.log_complete() diff --git a/src/lamp_py/performance_manager/l0_rt_vehicle_positions.py b/src/lamp_py/performance_manager/l0_rt_vehicle_positions.py index eb10c7ed..40ab0606 100644 --- a/src/lamp_py/performance_manager/l0_rt_vehicle_positions.py +++ b/src/lamp_py/performance_manager/l0_rt_vehicle_positions.py 
@@ -36,6 +36,7 @@ def get_vp_dataframe( "vehicle.trip.route_id", "vehicle.trip.start_date", "vehicle.trip.start_time", + "vehicle.trip.revenue", "vehicle.vehicle.id", "vehicle.trip.trip_id", "vehicle.vehicle.label", @@ -64,6 +65,7 @@ def get_vp_dataframe( "vehicle.trip.route_id": "route_id", "vehicle.trip.start_date": "start_date", "vehicle.trip.start_time": "start_time", + "vehicle.trip.revenue": "revenue", "vehicle.vehicle.id": "vehicle_id", "vehicle.trip.trip_id": "trip_id", "vehicle.vehicle.label": "vehicle_label", @@ -125,6 +127,11 @@ def transform_vp_datatypes( vehicle_positions["direction_id"] ).astype(numpy.bool_) + # fix revenue field, NULL is True + vehicle_positions["revenue"] = numpy.where( + vehicle_positions["revenue"].eq(False), False, True + ).astype(numpy.bool_) + # store start_time as seconds from start of day as int64 vehicle_positions["start_time"] = ( vehicle_positions["start_time"] diff --git a/src/lamp_py/performance_manager/l1_cte_statements.py b/src/lamp_py/performance_manager/l1_cte_statements.py index 58537ba2..491e9ef1 100644 --- a/src/lamp_py/performance_manager/l1_cte_statements.py +++ b/src/lamp_py/performance_manager/l1_cte_statements.py @@ -137,6 +137,7 @@ def rt_trips_subquery(service_date: int) -> sa.sql.selectable.Subquery: VehicleTrips.vehicle_id, VehicleTrips.stop_count, VehicleTrips.static_trip_id_guess, + VehicleTrips.revenue, VehicleEvents.pm_trip_id, VehicleEvents.stop_sequence, VehicleEvents.parent_station, @@ -301,8 +302,8 @@ def trips_for_headways_subquery( .select_from(rt_trips_sub) .where( # drop trips with one stop count, probably not valid - rt_trips_sub.c.stop_count - > 1, + rt_trips_sub.c.stop_count > 1, + rt_trips_sub.c.revenue == sa.true(), ) .order_by( rt_trips_sub.c.pm_trip_id, diff --git a/src/lamp_py/performance_manager/l1_rt_trips.py b/src/lamp_py/performance_manager/l1_rt_trips.py index 20d5db36..14e1fb96 100644 --- a/src/lamp_py/performance_manager/l1_rt_trips.py +++ 
b/src/lamp_py/performance_manager/l1_rt_trips.py @@ -103,6 +103,7 @@ def load_new_trip_data(db_manager: DatabaseManager) -> None: TempEventCompare.vehicle_label, TempEventCompare.vehicle_consist, TempEventCompare.static_version_key, + TempEventCompare.revenue, ) .distinct( TempEventCompare.service_date, @@ -127,6 +128,7 @@ def load_new_trip_data(db_manager: DatabaseManager) -> None: "vehicle_label", "vehicle_consist", "static_version_key", + "revenue", ] trip_insert_query = ( @@ -152,6 +154,7 @@ def load_new_trip_data(db_manager: DatabaseManager) -> None: TempEventCompare.trip_id, TempEventCompare.vehicle_label, TempEventCompare.vehicle_consist, + TempEventCompare.revenue, ) .distinct( TempEventCompare.service_date, @@ -179,6 +182,7 @@ def load_new_trip_data(db_manager: DatabaseManager) -> None: trip_id=distinct_update_query.c.trip_id, vehicle_label=distinct_update_query.c.vehicle_label, vehicle_consist=distinct_update_query.c.vehicle_consist, + revenue=distinct_update_query.c.revenue, ) .where( VehicleTrips.service_date == distinct_update_query.c.service_date, diff --git a/src/lamp_py/postgres/rail_performance_manager_schema.py b/src/lamp_py/postgres/rail_performance_manager_schema.py index df5a7bbb..71e27caa 100644 --- a/src/lamp_py/postgres/rail_performance_manager_schema.py +++ b/src/lamp_py/postgres/rail_performance_manager_schema.py @@ -102,6 +102,7 @@ class VehicleTrips(RpmSqlBase): # pylint: disable=too-few-public-methods vehicle_consist = sa.Column(sa.String(), nullable=True) direction = sa.Column(sa.String(30), nullable=True) direction_destination = sa.Column(sa.String(60), nullable=True) + revenue = sa.Column(sa.Boolean, nullable=False) # static trip matching static_trip_id_guess = sa.Column(sa.String(512), nullable=True) @@ -171,6 +172,7 @@ class TempEventCompare(RpmSqlBase): # pylint: disable=too-few-public-methods vehicle_id = sa.Column(sa.String(60), nullable=False) vehicle_label = sa.Column(sa.String(128), nullable=True) vehicle_consist = 
sa.Column(sa.String(), nullable=True) + revenue = sa.Column(sa.Boolean, nullable=False) # forign key to static schedule expected values static_version_key = sa.Column( diff --git a/src/lamp_py/postgres/seed_metadata.py b/src/lamp_py/postgres/seed_metadata.py index b90cf3ca..7f17a5dd 100644 --- a/src/lamp_py/postgres/seed_metadata.py +++ b/src/lamp_py/postgres/seed_metadata.py @@ -73,9 +73,7 @@ def reset_rpm(parsed_args: argparse.Namespace) -> None: ) if parsed_args.clear_static: - rpm_db_name = os.getenv( - "ALEMBIC_RPM_DB_NAME", "performance_manager_prod" - ) + rpm_db_name = os.getenv("ALEMBIC_RPM_DB_NAME", "") alembic_downgrade_to_base(rpm_db_name) alembic_upgrade_to_head(rpm_db_name) elif parsed_args.clear_rt: @@ -99,7 +97,7 @@ def run() -> None: ) if parsed_args.clear_static: - md_db_name = os.getenv("ALEMBIC_MD_DB_NAME", "metadata_prod") + md_db_name = os.getenv("ALEMBIC_MD_DB_NAME", "") alembic_downgrade_to_base(md_db_name) alembic_upgrade_to_head(md_db_name) elif parsed_args.clear_rt: diff --git a/src/lamp_py/tableau/jobs/rt_rail.py b/src/lamp_py/tableau/jobs/rt_rail.py index bef10daf..3d933070 100644 --- a/src/lamp_py/tableau/jobs/rt_rail.py +++ b/src/lamp_py/tableau/jobs/rt_rail.py @@ -46,6 +46,7 @@ def __init__(self) -> None: " , extract(epoch FROM (TIMEZONE('America/New_York', TO_TIMESTAMP(ve.vp_move_timestamp)) - vt.service_date::text::timestamp))::int as previous_stop_departure_sec" " , extract(epoch FROM (TIMEZONE('America/New_York', TO_TIMESTAMP(COALESCE(ve.vp_stop_timestamp, ve.tu_stop_timestamp))) - vt.service_date::text::timestamp))::int as stop_arrival_sec" " , extract(epoch FROM (TIMEZONE('America/New_York', TO_TIMESTAMP(next_ve.vp_move_timestamp)) - vt.service_date::text::timestamp))::int as stop_departure_sec" + " , vt.revenue as is_revenue" " , vt.direction_id::int" " , vt.route_id" " , vt.branch_route_id" @@ -146,6 +147,7 @@ def parquet_schema(self) -> pyarrow.schema: ("previous_stop_departure_sec", pyarrow.int64()), ("stop_arrival_sec", 
pyarrow.int64()), ("stop_departure_sec", pyarrow.int64()), + ("is_revenue", pyarrow.bool_()), ("direction_id", pyarrow.int8()), ("route_id", pyarrow.string()), ("branch_route_id", pyarrow.string()), diff --git a/tests/performance_manager/test_performance_manager.py b/tests/performance_manager/test_performance_manager.py index ad7bc230..ceb11e99 100644 --- a/tests/performance_manager/test_performance_manager.py +++ b/tests/performance_manager/test_performance_manager.py @@ -515,7 +515,7 @@ def test_bad_empty_static_table() -> None: static_tables = get_table_objects() test_table = {"stop_times": static_tables["stop_times"]} - with pytest.raises(pyarrow.ArrowInvalid): + with pytest.raises((pyarrow.ArrowInvalid, AssertionError)): load_parquet_files(test_table, "/tmp/FEED_INFO/timestamp=0000000000") @@ -557,24 +557,24 @@ def test_gtfs_rt_processing( route_ids = rail_routes_from_filepath(files["vp_paths"], rpm_db_manager) positions = get_vp_dataframe(files["vp_paths"], route_ids) position_size = positions.shape[0] - assert positions.shape[1] == 13 + assert positions.shape[1] == 14 # check that the types can be set correctly positions = transform_vp_datatypes(positions) - assert positions.shape[1] == 13 + assert positions.shape[1] == 14 assert position_size == positions.shape[0] # check that it can be combined with the static schedule positions = add_static_version_key_column(positions, rpm_db_manager) - assert positions.shape[1] == 14 + assert positions.shape[1] == 15 assert position_size == positions.shape[0] positions = add_parent_station_column(positions, rpm_db_manager) - assert positions.shape[1] == 15 + assert positions.shape[1] == 16 assert position_size == positions.shape[0] positions = transform_vp_timestamps(positions) - assert positions.shape[1] == 14 + assert positions.shape[1] == 15 assert position_size > positions.shape[0] trip_updates = get_and_unwrap_tu_dataframe(files["tu_paths"], route_ids) @@ -619,6 +619,7 @@ def test_gtfs_rt_processing( 
expected_columns.add("start_time") expected_columns.add("vehicle_id") expected_columns.add("static_version_key") + expected_columns.add("revenue") assert len(expected_columns) == len(events.columns) missing_columns = set(events.columns) - expected_columns