FEAT: Add 'revenue' field to Rail data pipeline (#446)
This change adds the GTFS-RT "revenue" field to the LAMP Rail Performance Data pipeline.

- Create db migrations to add revenue column to vehicle_trips table
- Read vehicle.trip.revenue column from VehiclePositions parquet files
- Load revenue data from parquet files into DB table
- Add is_revenue column to LAMP_ALL_RT_fields OPMI export
- Add where clause for only revenue trips to Subway Performance Data export

The use of the revenue field differs between the OPMI export and the Subway Performance Data export. The OPMI export gains an additional field so that non-revenue trips can be filtered out at the consumer's discretion. The Subway Performance Data export was never meant to include non-revenue event data, so those events are filtered out of the export altogether.

Asana Task: https://app.asana.com/0/1205827492903547/1208216161546522
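The split between the two exports can be illustrated with a tiny pure-Python sketch (dict rows stand in for the real database tables; names are illustrative, not LAMP's actual code):

```python
# Illustrative sketch of how the revenue flag is surfaced differently
# in the two exports, using plain dicts as rows.
rows = [
    {"trip_id": "t1", "revenue": True},
    {"trip_id": "t2", "revenue": False},
]

# OPMI export: keep every trip, but expose the flag as is_revenue so
# consumers can filter at their own discretion.
opmi_export = [
    {"trip_id": r["trip_id"], "is_revenue": r["revenue"]} for r in rows
]

# Subway Performance Data export: non-revenue events are dropped
# from the export altogether.
performance_export = [r for r in rows if r["revenue"]]

print(len(opmi_export))         # 2 - all trips, flag included
print(len(performance_export))  # 1 - revenue trips only
```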
rymarczy authored Oct 2, 2024
1 parent 9d37551 commit f0a4652
Showing 19 changed files with 306 additions and 141 deletions.
32 changes: 17 additions & 15 deletions Data_Dictionary.md
@@ -8,17 +8,17 @@ LAMP currently produces the following sets of public data exports:

# Subway Performance Data

Each row represents a unique `trip_id`-`stop_id` pair for each `service_date` of rail service.
Each row represents a unique `trip_id`-`stop_id` pair for each `service_date` of revenue rail service.

| field name | type | description | source |
| ----------- | --------- | ----------- | ------------ |
| service_date | int64 | equivalent to GTFS-RT `start_date` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) as `int` instead of `string` | GTFS-RT |
| start_time | int64 | equivalent to GTFS-RT `start_time` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) converted to seconds after midnight | GTFS-RT |
| route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) | GTFS-RT |
| branch_route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) for lines with multiple routes, `NULL` if line has single route, e.g. `Green-B` for `Green-B` route, `NULL` for `Blue` route. EXCEPTION: LAMP Inserts `Red-A` or `Red-B` to indicate `Red`-line Ashmont or Braintree branch if trip stops at station south of JFK/UMass. | GTFS-RT |
| service_date | int64 | equivalent to GTFS-RT `start_date` value in [Trip Descriptor][gtfs-tripdescriptor] as `int` instead of `string` | GTFS-RT |
| start_time | int64 | equivalent to GTFS-RT `start_time` value in [Trip Descriptor][gtfs-tripdescriptor] converted to seconds after midnight | GTFS-RT |
| route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor][gtfs-tripdescriptor] | GTFS-RT |
| branch_route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor][gtfs-tripdescriptor] for lines with multiple routes, `NULL` if line has single route, e.g. `Green-B` for `Green-B` route, `NULL` for `Blue` route. EXCEPTION: LAMP Inserts `Red-A` or `Red-B` to indicate `Red`-line Ashmont or Braintree branch if trip stops at station south of JFK/UMass. | GTFS-RT |
| trunk_route_id | string | line if multiple routes exist on line, otherwise `route_id`, e.g. `Green` for `Green-B` route, `Blue` for `Blue` route | GTFS-RT |
| trip_id | string | equivalent to GTFS-RT `trip_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) | GTFS-RT |
| direction_id | bool | equivalent to GTFS-RT `direction_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) as `bool` instead of `int` | GTFS-RT |
| trip_id | string | equivalent to GTFS-RT `trip_id` value in [Trip Descriptor][gtfs-tripdescriptor] | GTFS-RT |
| direction_id | bool | equivalent to GTFS-RT `direction_id` value in [Trip Descriptor][gtfs-tripdescriptor] as `bool` instead of `int` | GTFS-RT |
| direction | string | equivalent to GTFS `direction` value from [directions.txt](https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs.md#directionstxt) for `route_id`-`direction_id` pair | GTFS |
| direction_destination | string | equivalent to GTFS `direction_destination` value from [directions.txt](https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs.md#directionstxt) for `route_id`-`direction_id` pair | GTFS |
| stop_count | int16 | number of stops recorded on trip | LAMP Calculated |
@@ -61,8 +61,8 @@ Each row represents a unique `trip_id`-`stop_id` pair for each `service_date` of

| field name | type | description | source |
| ----------- | --------- | ----------- | ------------ |
| service_date | date | equivalent to GTFS-RT `start_date` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) as `date` instead of `string` | GTFS-RT |
| start_datetime | datetime | equivalent to GTFS-RT `start_time` added to `start_date` from [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) | LAMP Calculated |
| service_date | date | equivalent to GTFS-RT `start_date` value in [Trip Descriptor][gtfs-tripdescriptor] as `date` instead of `string` | GTFS-RT |
| start_datetime | datetime | equivalent to GTFS-RT `start_time` added to `start_date` from [Trip Descriptor][gtfs-tripdescriptor] | LAMP Calculated |
| static_start_datetime | datetime | equivalent to `start_datetime` if planned trip, otherwise GTFS-RT `start_date` added to `static_start_time` | LAMP Calculated |
| stop_sequence | int16 | equivalent to GTFS-RT `current_stop_sequence` value in [VehiclePosition](https://gtfs.org/realtime/reference/#message-vehicleposition) | GTFS-RT |
| canonical_stop_sequence | int16 | stop sequence based on "canonical" route trip as defined in [route_patterns.txt](https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs.md#route_patternstxt) table | LAMP Calculated |
@@ -81,14 +81,15 @@ Each row represents a unique `trip_id`-`stop_id` pair for each `service_date` of
| previous_stop_departure_sec | int64 | `previous_stop_departure_datetime` as seconds after midnight | LAMP Calculated
| stop_arrival_sec | int64 | `stop_arrival_datetime` as seconds after midnight | LAMP Calculated
| stop_departure_sec | int64 | `stop_departure_datetime` as seconds after midnight | LAMP Calculated
| direction_id | int8 | equivalent to GTFS-RT `direction_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) | GTFS-RT |
| route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) | GTFS-RT |
| branch_route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) for lines with multiple routes, `NULL` if line has single route, e.g. `Green-B` for `Green-B` route, `NULL` for `Blue` route_id. EXCEPTION: LAMP Inserts `Red-A` or `Red-B` to indicate `Red`-line Ashmont or Braintree branch if trip stops at station south of JFK/UMass. | GTFS-RT |
| is_revenue | boolean | equivalent to MBTA GTFS-RT `revenue` value in [Trip Descriptor](https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs-realtime.md#non-revenue-trips) as `bool` | GTFS-RT |
| direction_id | int8 | equivalent to GTFS-RT `direction_id` value in [Trip Descriptor][gtfs-tripdescriptor] | GTFS-RT |
| route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor][gtfs-tripdescriptor] | GTFS-RT |
| branch_route_id | string | equivalent to GTFS-RT `route_id` value in [Trip Descriptor][gtfs-tripdescriptor] for lines with multiple routes, `NULL` if line has single route, e.g. `Green-B` for `Green-B` route, `NULL` for `Blue` route_id. EXCEPTION: LAMP Inserts `Red-A` or `Red-B` to indicate `Red`-line Ashmont or Braintree branch if trip stops at station south of JFK/UMass. | GTFS-RT |
| trunk_route_id | string | line if multiple routes exist on line, otherwise `route_id`, e.g. `Green` for `Green-B` route, `Blue` for `Blue` route | GTFS-RT |
| start_time | int64 | equivalent to GTFS-RT `start_time` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) converted to seconds after midnight | GTFS-RT |
| start_time | int64 | equivalent to GTFS-RT `start_time` value in [Trip Descriptor][gtfs-tripdescriptor] converted to seconds after midnight | GTFS-RT |
| vehicle_id | string | equivalent to GTFS-RT `id` value in [VehicleDescriptor](https://gtfs.org/realtime/reference/#message-vehicledescriptor) | GTFS-RT
| stop_count | int16 | number of stops recorded on trip | LAMP Calculated |
| trip_id | string | equivalent to GTFS-RT `trip_id` value in [Trip Descriptor](https://gtfs.org/realtime/reference/#message-tripdescriptor) | GTFS-RT |
| trip_id | string | equivalent to GTFS-RT `trip_id` value in [Trip Descriptor][gtfs-tripdescriptor] | GTFS-RT |
| vehicle_label | string | equivalent to GTFS-RT `label` value in [VehicleDescriptor](https://gtfs.org/realtime/reference/#message-vehicledescriptor). | GTFS-RT
| vehicle_consist | string | Pipe separated concatenation of `multi_carriage_details` labels in [CarriageDetails](https://gtfs.org/realtime/reference/#message-CarriageDetails) | GTFS-RT
| direction | string | equivalent to GTFS `direction` value from [directions.txt](https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs.md#directionstxt) for `route_id`-`direction_id` pair | GTFS |
@@ -259,4 +260,5 @@ In generating this dataset, translation string fields contain only the English t
| informed_entity.activities | string | Equivalent to `informed_entity[n][activities]` as a `\|` delimited string. All potential values are defined in the [Activity](https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs-realtime.md#enum-activity) enum.

[gtfs-rt-alert]: https://gtfs.org/realtime/reference/#message-alert
[mbta-enhanced]: https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs-realtime.md#enhanced-fields
[mbta-enhanced]: https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs-realtime.md#enhanced-fields
[gtfs-tripdescriptor]: https://gtfs.org/realtime/reference/#message-tripdescriptor
35 changes: 27 additions & 8 deletions src/lamp_py/aws/s3.py
@@ -19,11 +19,12 @@
import boto3
import botocore
import botocore.exceptions
from botocore.exceptions import ClientError
import pandas
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
import pyarrow.dataset as pd
from botocore.exceptions import ClientError
from pyarrow import Table, fs
from pyarrow.util import guid

@@ -254,7 +255,10 @@ def get_zip_buffer(filename: str) -> IO[bytes]:


def file_list_from_s3(
bucket_name: str, file_prefix: str, max_list_size: int = 250_000
bucket_name: str,
file_prefix: str,
max_list_size: int = 250_000,
in_filter: Optional[str] = None,
) -> List[str]:
"""
get a list of s3 objects
@@ -283,7 +287,10 @@ def file_list_from_s3(
for obj in page["Contents"]:
if obj["Size"] == 0:
continue
filepaths.append(os.path.join("s3://", bucket_name, obj["Key"]))
if in_filter is None or in_filter in obj["Key"]:
filepaths.append(
os.path.join("s3://", bucket_name, obj["Key"])
)

if len(filepaths) > max_list_size:
break
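The new `in_filter` argument keeps an object key only when the filter string appears in it, or when no filter is given. That substring check, separated from the S3 pagination for clarity (function name is illustrative):

```python
# Standalone sketch of the in_filter check added to file_list_from_s3.
from typing import List, Optional


def filter_keys(keys: List[str], in_filter: Optional[str] = None) -> List[str]:
    """Keep a key when in_filter is None or appears as a substring of it."""
    return [k for k in keys if in_filter is None or in_filter in k]


keys = [
    "lamp/RT_VEHICLE_POSITIONS/year=2024/file.parquet",
    "lamp/RT_TRIP_UPDATES/year=2024/file.parquet",
]
print(filter_keys(keys))                       # no filter: both keys kept
print(filter_keys(keys, "VEHICLE_POSITIONS"))  # only the matching key
```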
@@ -673,15 +680,27 @@ def read_parquet(
) -> pandas.core.frame.DataFrame:
"""
read parquet file or files from s3 and return it as a pandas dataframe

if a requested column from `columns` does not exist in the parquet file,
that column will be added as all nulls; this was added to capture the
vehicle.trip.revenue field from VehiclePositions files starting December 2023
"""
retry_attempts = 2
for retry_attempt in range(retry_attempts + 1):
try:
df = (
_get_pyarrow_dataset(filename, filters)
.to_table(columns=columns)
.to_pandas(self_destruct=True)
)
ds = _get_pyarrow_dataset(filename, filters)
if columns is None:
table = ds.to_table(columns=columns)

else:
read_columns = list(set(ds.schema.names) & set(columns))
table = ds.to_table(columns=read_columns)
for null_column in set(columns).difference(ds.schema.names):
table = table.append_column(
null_column, pa.nulls(table.num_rows)
)

df = table.to_pandas(self_destruct=True)
break
except Exception as exception:
if retry_attempt == retry_attempts:
16 changes: 9 additions & 7 deletions src/lamp_py/ingestion/light_rail_gps.py
@@ -209,16 +209,18 @@ def ingest_light_rail_gps() -> None:

s3_files = [file for file in s3_files if "LightRailRawGPS" in file]

dataframe, archive_files, error_files = dataframe_from_gz(s3_files)
if len(s3_files) > 0:

write_parquet(dataframe)
dataframe, archive_files, error_files = dataframe_from_gz(s3_files)

write_parquet(dataframe)

if len(archive_files) > 0:
move_s3_objects(archive_files, os.environ["ARCHIVE_BUCKET"])
if len(error_files) > 0:
move_s3_objects(error_files, os.environ["ERROR_BUCKET"])

logger.log_complete()

except Exception as exception:
logger.log_failure(exception)

if len(archive_files) > 0:
move_s3_objects(archive_files, os.environ["ARCHIVE_BUCKET"])
if len(error_files) > 0:
move_s3_objects(error_files, os.environ["ERROR_BUCKET"])
@@ -47,51 +47,6 @@ def upgrade() -> None:
postgresql_where=sa.text("rail_pm_processed = false"),
)

# pull metadata from the rail performance manager database into the
# metadata database. the table may or may not exist, so wrap this in a try
# except
try:
rpm_db_manager = DatabaseManager(
db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER
)

insert_data = []
# pull metadata from the rail performance manager database via direct
# sql query. the metadata_log table may or may not exist.
with rpm_db_manager.session.begin() as session:
result = session.execute(
text("SELECT path, processed, process_fail FROM metadata_log")
)
for row in result:
(path, processed, process_fail) = row
insert_data.append(
{
"path": path,
"rail_pm_processed": processed,
"rail_pm_process_fail": process_fail,
}
)

except ProgrammingError as error:
# Error 42P01 is an 'Undefined Table' error. This occurs when there is
# no metadata_log table in the rail performance manager database
#
# Raise all other sql errors
insert_data = []
original_error = error.orig
if (
original_error is not None
and hasattr(original_error, "pgcode")
and original_error.pgcode == "42P01"
):
logging.info("No Metadata Table in Rail Performance Manager")
else:
raise

# insert data into the metadata database
if insert_data:
op.bulk_insert(MetadataLog.__table__, insert_data)

# ### end Alembic commands ###


@@ -29,59 +29,6 @@

def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
while True:
try:
rpm_db_manager = DatabaseManager(
db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER
)
md_db_manager = DatabaseManager(db_index=DatabaseIndex.METADATA)

with rpm_db_manager.session.begin() as session:
legacy_result = session.execute(
text("SELECT path FROM metadata_log")
)
legacy_paths = set(
[record[0] for record in legacy_result.fetchall()]
)

modern_result = md_db_manager.select_as_list(
sa.select(MetadataLog.path)
)
modern_paths = set([record["path"] for record in modern_result])

missing_paths = legacy_paths - modern_paths
if len(missing_paths) == 0:
break
else:
logging.error(
"Detected %s paths in Legacy Metadata Table not found in Metadata Database",
len(missing_paths),
)
except ProgrammingError as error:
# Error 42P01 is an 'Undefined Table' error. This occurs when there is
# no metadata_log table in the rail performance manager database
#
# Raise all other sql errors
original_error = error.orig
if (
original_error is not None
and hasattr(original_error, "pgcode")
and original_error.pgcode == "42P01"
):
logging.info("No Metadata Table in Rail Performance Manager")
legacy_paths = set()
else:
logging.exception(
"Programming Error when checking Metadata Log"
)
time.sleep(15)
continue

except Exception as error:
logging.exception("Programming Error when checking Metadata Log")
time.sleep(15)
continue

op.drop_index("ix_metadata_log_not_processed", table_name="metadata_log")
op.drop_table("metadata_log")
# ### end Alembic commands ###
@@ -0,0 +1,74 @@
"""add revenue columns
Revision ID: 32ba735d080c
Revises: 896dedd8a4db
Create Date: 2024-09-20 08:47:52.784591
This change adds a boolean revenue column to the vehicle_trips table.
Initially this will be filled with True and back-filled by a separate operation.
Details
* upgrade -> drop triggers and indexes from table and add revenue column
* downgrade -> drop revenue column
"""

from alembic import op
import sqlalchemy as sa

from lamp_py.postgres.rail_performance_manager_schema import (
TempEventCompare,
VehicleTrips,
)

# revision identifiers, used by Alembic.
revision = "32ba735d080c"
down_revision = "896dedd8a4db"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.execute(
"ALTER TABLE public.vehicle_trips DISABLE TRIGGER rt_trips_update_branch_trunk;"
)
op.execute(
"ALTER TABLE public.vehicle_trips DISABLE TRIGGER update_vehicle_trips_modified;"
)
op.drop_index("ix_vehicle_trips_composite_1", table_name="vehicle_trips")
op.drop_constraint("vehicle_trips_unique_trip", table_name="vehicle_trips")

op.add_column(
"temp_event_compare", sa.Column("revenue", sa.Boolean(), nullable=True)
)
op.add_column(
"vehicle_trips", sa.Column("revenue", sa.Boolean(), nullable=True)
)
op.execute(sa.update(TempEventCompare).values(revenue=True))
op.execute(sa.update(VehicleTrips).values(revenue=True))
op.alter_column("temp_event_compare", "revenue", nullable=False)
op.alter_column("vehicle_trips", "revenue", nullable=False)

op.create_unique_constraint(
"vehicle_trips_unique_trip",
"vehicle_trips",
["service_date", "route_id", "trip_id"],
)
op.create_index(
"ix_vehicle_trips_composite_1",
"vehicle_trips",
["route_id", "direction_id", "vehicle_id"],
unique=False,
)
op.execute(
"ALTER TABLE public.vehicle_trips ENABLE TRIGGER rt_trips_update_branch_trunk;"
)
op.execute(
"ALTER TABLE public.vehicle_trips ENABLE TRIGGER update_vehicle_trips_modified;"
)


def downgrade() -> None:
op.drop_column("vehicle_trips", "revenue")
op.drop_column("temp_event_compare", "revenue")
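Stripped of the index and trigger handling, the upgrade follows the standard pattern for introducing a NOT NULL column on a populated table: add it nullable, backfill every existing row, then tighten the constraint. The essential ordering, as illustrative SQL (not the migration itself):

```python
# The three-step DDL ordering used by the migration above; a direct
# NOT NULL add without a default would fail on a populated table.
steps = [
    "ALTER TABLE vehicle_trips ADD COLUMN revenue BOOLEAN",         # 1. nullable add
    "UPDATE vehicle_trips SET revenue = TRUE",                      # 2. backfill
    "ALTER TABLE vehicle_trips ALTER COLUMN revenue SET NOT NULL",  # 3. tighten
]
for statement in steps:
    print(statement)
```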
