Skip to content

Commit

Permalink
FEAT: Get Piece of Work Information from TM Files
Browse files Browse the repository at this point in the history
Process Daily Work Piece logs into a dataframe of operator / vehicle
records tied to block ids, run ids, and trip ids that can be joined
against bus vehicle events.
  • Loading branch information
mzappitello committed Aug 20, 2024
1 parent 14682a5 commit 40c2007
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 4 deletions.
204 changes: 202 additions & 2 deletions src/lamp_py/bus_performance_manager/tm_ingestion.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from typing import List
from datetime import date

import pytz
import polars as pl
Expand Down Expand Up @@ -27,7 +26,25 @@ def create_dt_from_sam(

def generate_tm_events(tm_files: List[str]) -> pl.DataFrame:
"""
build out events from transit master stop crossing data
Build out events from transit master stop crossing data after joining it
with static Transit Master data describing stops, routes, trips, and
vehicles.
:param tm_files: transit master parquet files from the StopCrossings table.
:return dataframe:
service_date -> Date
begin_wp_sam -> Int64
end_wp_skm -> Int64
block_id -> String
run_id -> String
trip_id -> String
operator_badge_number -> String
vehicle_label -> String
logon_sam -> Int64
logoff_sam -> Int64
logon_time -> Datetime(time_unit='us', time_zone=None)
logoff_time -> Datetime(time_unit='us', time_zone=None))
"""
# the geo node id is the transit master key and the geo node abbr is the
# gtfs stop id
Expand Down Expand Up @@ -100,3 +117,186 @@ def generate_tm_events(tm_files: List[str]) -> pl.DataFrame:
)

return tm_stop_crossings


def get_daily_work_pieces(daily_work_piece_files: List[str]) -> pl.DataFrame:
    """
    Create dataframe describing who drove what piece of work, run, and block.

    This dataframe can be joined against bus vehicle events by both trip id
    and vehicle label. Note that a single trip may map to multiple pieces of
    work, and a single piece of work may have multiple operator / vehicle
    pairs; consumers should use the logon / logoff times to decide who was
    driving during a given vehicle event.

    :param daily_work_piece_files: transit master parquet files from the
        DailyWorkPiece table.

    :return dataframe:
        service_date -> Date
        block_id -> String
        run_id -> String
        trip_id -> String
        operator_badge_number -> String
        vehicle_label -> String
        logon_sam -> Int64
        logoff_sam -> Int64
        logon_time -> Datetime(time_unit='us', time_zone=None)
        logoff_time -> Datetime(time_unit='us', time_zone=None)
    """
    # collect all the tables with static data on pieces of work, blocks, runs,
    # and trips. these will all be joined into a static work pieces dataframe
    # that will be joined against realtime data.

    # Work Piece Id is the TM Work Piece Table Key
    # Block Id is the TM Block Table Key
    # Run Id is the TM Run Table Key
    # Begin and End Time are in Seconds after Midnight. It will be used to
    # filter a join with the Trips objects.
    # Time Table Version Id is similar to our Static Schedule Version keys in
    # the Rail Performance Manager DB
    #
    # NOTE: RUN_IDs and BLOCK_IDs will be repeated, as multiple pieces of work
    # can have the same run or block. I think its because a Piece of Work can
    # be scheduled for a single day of the week but we reuse Runs and Blocks
    # across different scheduled days.
    tm_work_pieces = pl.scan_parquet(
        RemoteFileLocations.tm_work_piece_file.get_s3_path()
    ).select(
        "WORK_PIECE_ID",
        "BLOCK_ID",
        "RUN_ID",
        "BEGIN_TIME",
        "END_TIME",
        "TIME_TABLE_VERSION_ID",
    )

    # Block Id is the TM Block Table Key
    # Block Abbr is the ID the rest of the MBTA uses for this Block
    # Time Table Version Id is similar to our Static Schedule Version keys in
    # the Rail Performance Manager DB
    tm_blocks = pl.scan_parquet(
        RemoteFileLocations.tm_block_file.get_s3_path()
    ).select("BLOCK_ID", "BLOCK_ABBR", "TIME_TABLE_VERSION_ID")

    # Run Id is the TM Run Table Key
    # Run Designator is the ID the rest of the MBTA uses for this Run
    # Time Table Version Id is similar to our Static Schedule Version keys in
    # the Rail Performance Manager DB
    tm_runs = pl.scan_parquet(
        RemoteFileLocations.tm_run_file.get_s3_path()
    ).select("RUN_ID", "RUN_DESIGNATOR", "TIME_TABLE_VERSION_ID")

    # Trip Id is the TM Trip Table Key
    # Block Id is the TM Block Table Key
    # Trip Serial Number is the ID the rest of the MBTA uses for this Trip
    # Trip End Time is in Seconds after Midnight. It will be used to filter a
    # join with the Work Pieces objects.
    # Time Table Version Id is similar to our Static Schedule Version keys in
    # the Rail Performance Manager DB
    tm_trips = pl.scan_parquet(
        RemoteFileLocations.tm_trip_file.get_s3_path()
    ).select(
        "TRIP_ID",
        "BLOCK_ID",
        "TRIP_SERIAL_NUMBER",
        "TRIP_END_TIME",
        "TIME_TABLE_VERSION_ID",
    )

    # Join all of the Static Data together to map a Trip to a Block, Run, and
    # Piece of Work.
    #
    # As multiple Pieces of Work will have the same Block and Run Ids, an
    # individual Trip, which is joined on Block Ids will map to multiple Pieces
    # of Work, we can filter out a lot of these based on the trip end time and
    # piece of work begin and end time. There may still be multiple pieces of
    # work per trip id though. I haven't found a good way to filter out
    static_work_pieces = (
        tm_work_pieces.join(tm_blocks, on=["BLOCK_ID", "TIME_TABLE_VERSION_ID"])
        .join(tm_runs, on=["RUN_ID", "TIME_TABLE_VERSION_ID"])
        .join(tm_trips, on=["BLOCK_ID", "TIME_TABLE_VERSION_ID"])
        # keep only pieces of work whose [BEGIN_TIME, END_TIME] window
        # contains the trip's end time (all in seconds after midnight)
        .filter(
            (pl.col("BEGIN_TIME") < pl.col("TRIP_END_TIME"))
            & (pl.col("END_TIME") >= pl.col("TRIP_END_TIME"))
        )
    )

    # Collect the Realtime Details of who operated what vehicle for which piece
    # of work on a given day. Join the realtime data with static operator and
    # vehicle datasets.

    # Work Piece Id is the TM Work Piece Table Key
    # Calendar Id is the service date formatted "1YYYYMMDD"
    # Current Operator Id is a TM Operator Table Key
    # Run Id is the TM Run Table Key
    # Current Vehicle Id is a TM Vehicle Table Key
    # Actual Logon and Logoff Times are in Seconds after Midnight and describe
    # when an operator logged on or off for this piece of work.
    #
    # NOTE: A Piece of Work can have multiple operator / vehicle pairs. The log
    # on and log off times can be used to figure out who is driving during a
    # vehicle event.
    daily_work_piece = (
        pl.scan_parquet(daily_work_piece_files)
        # records without a work piece id cannot be joined to the static
        # work piece data, so drop them up front
        .filter(pl.col("WORK_PIECE_ID").is_not_null())
        .select(
            "WORK_PIECE_ID",
            "CALENDAR_ID",
            "CURRENT_OPERATOR_ID",
            "RUN_ID",
            "CURRENT_VEHICLE_ID",
            "ACTUAL_LOGON_TIME",
            "ACTUAL_LOGOFF_TIME",
        )
    )

    # Operator Id is the TM Operator Table Key
    # Operator Logon Id is the Badge Number
    tm_operators = pl.scan_parquet(
        RemoteFileLocations.tm_operator_file.get_s3_path()
    ).select("OPERATOR_ID", "ONBOARD_LOGON_ID")

    # Vehicle Id is the TM Vehicle Table Key
    # Property Tag is Vehicle Label used by the MBTA
    tm_vehicles = pl.scan_parquet(
        RemoteFileLocations.tm_vehicle_file.get_s3_path()
    ).select("VEHICLE_ID", "PROPERTY_TAG")

    # Join Operator and Vehicle information to the Daily Work Pieces
    realtime_work_pieces = daily_work_piece.join(
        tm_operators, left_on="CURRENT_OPERATOR_ID", right_on="OPERATOR_ID"
    ).join(tm_vehicles, left_on="CURRENT_VEHICLE_ID", right_on="VEHICLE_ID")

    # Join the static and realtime workpiece dataframes on the Work Piece ID
    # and Run Id. This will give us a dataframe of potential operator / vehicle
    # pairs for a given trip, along with the block and run ids for a service
    # date. Since multiple operators can be associated with a single piece of
    # work, the logon and logoff times will need to be used to figure out who
    # was driving at a given time.
    #
    # NOTE(review): the left join keeps realtime records even when no static
    # work piece matches, leaving null block / run / trip columns — presumably
    # intentional so no operator activity is dropped; confirm with consumers.
    return (
        realtime_work_pieces.join(
            static_work_pieces, on=["WORK_PIECE_ID", "RUN_ID"], how="left"
        )
        .select(
            # CALENDAR_ID is formatted "1YYYYMMDD"; slice off the leading "1"
            # before parsing the remainder as a date
            pl.col("CALENDAR_ID")
            .cast(pl.Utf8)
            .str.slice(1)
            .str.strptime(pl.Date, format="%Y%m%d")
            .alias("service_date"),
            pl.col("BLOCK_ABBR").cast(pl.String).alias("block_id"),
            pl.col("RUN_DESIGNATOR").cast(pl.String).alias("run_id"),
            pl.col("TRIP_SERIAL_NUMBER").cast(pl.String).alias("trip_id"),
            pl.col("ONBOARD_LOGON_ID")
            .cast(pl.String)
            .alias("operator_badge_number"),
            pl.col("PROPERTY_TAG").cast(pl.String).alias("vehicle_label"),
            # "sam" columns are raw Seconds After Midnight
            pl.col("ACTUAL_LOGON_TIME").alias("logon_sam"),
            pl.col("ACTUAL_LOGOFF_TIME").alias("logoff_sam"),
        )
        # convert seconds-after-midnight to datetimes on the service date
        .with_columns(
            create_dt_from_sam(
                pl.col("service_date"), pl.col("logon_sam")
            ).alias("logon_time"),
            create_dt_from_sam(
                pl.col("service_date"), pl.col("logoff_sam")
            ).alias("logoff_time"),
        )
        .collect()
    )
11 changes: 9 additions & 2 deletions tests/bus_performance_manager/test_tm_ingestion.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
from _pytest.monkeypatch import MonkeyPatch
from datetime import datetime

from _pytest.monkeypatch import MonkeyPatch
import polars as pl

from lamp_py.bus_performance_manager.tm_ingestion import generate_tm_events
Expand All @@ -10,6 +10,9 @@


def test_tm_to_bus_events(monkeypatch: MonkeyPatch) -> None:
"""
run tests on each file in the test files tm stop crossings directory
"""
monkeypatch.setattr(
"lamp_py.bus_performance_manager.tm_ingestion.RemoteFileLocations",
LocalFileLocaions,
Expand All @@ -23,9 +26,13 @@ def test_tm_to_bus_events(monkeypatch: MonkeyPatch) -> None:


def check_stop_crossings(stop_crossings_filepath: str) -> None:
"""
run checks on the dataframes produced by running generate_tm_events on
transit master stop crossing files.
"""
# Remove the .parquet extension and get the date
filename = os.path.basename(stop_crossings_filepath)
date_str = filename.replace( ".parquet", "")[1:]
date_str = filename.replace(".parquet", "")[1:]
service_date = datetime.strptime(date_str, "%Y%m%d").date()

# this is the df of all useful records from the stop crossings files
Expand Down

0 comments on commit 40c2007

Please sign in to comment.