feat: Update pipeline to use trip_id as unique key #160

Merged 4 commits on Aug 28, 2023
@@ -271,15 +271,15 @@ def upgrade() -> None:
sa.Column("service_date", sa.Integer(), nullable=False),
sa.Column("direction_id", sa.Boolean(), nullable=False),
sa.Column("route_id", sa.String(length=60), nullable=False),
sa.Column("start_time", sa.Integer(), nullable=False),
sa.Column("start_time", sa.Integer(), nullable=True),
sa.Column("vehicle_id", sa.String(length=60), nullable=False),
sa.Column("stop_sequence", sa.SmallInteger(), nullable=True),
sa.Column("stop_id", sa.String(length=60), nullable=False),
sa.Column("parent_station", sa.String(length=60), nullable=False),
sa.Column("vp_move_timestamp", sa.Integer(), nullable=True),
sa.Column("vp_stop_timestamp", sa.Integer(), nullable=True),
sa.Column("tu_stop_timestamp", sa.Integer(), nullable=True),
sa.Column("trip_id", sa.String(length=128), nullable=True),
sa.Column("trip_id", sa.String(length=128), nullable=False),
sa.Column("vehicle_label", sa.String(length=128), nullable=True),
sa.Column("vehicle_consist", sa.String(), nullable=True),
sa.Column("static_version_key", sa.Integer(), nullable=False),
@@ -342,12 +342,12 @@ def upgrade() -> None:
sa.Column("service_date", sa.Integer(), nullable=False),
sa.Column("route_id", sa.String(length=60), nullable=False),
sa.Column("direction_id", sa.Boolean(), nullable=False),
sa.Column("start_time", sa.Integer(), nullable=False),
sa.Column("start_time", sa.Integer(), nullable=True),
sa.Column("vehicle_id", sa.String(length=60), nullable=False),
sa.Column("branch_route_id", sa.String(length=60), nullable=True),
sa.Column("trunk_route_id", sa.String(length=60), nullable=True),
sa.Column("stop_count", sa.SmallInteger(), nullable=True),
sa.Column("trip_id", sa.String(length=128), nullable=True),
sa.Column("trip_id", sa.String(length=128), nullable=False),
sa.Column("vehicle_label", sa.String(length=128), nullable=True),
sa.Column("vehicle_consist", sa.String(), nullable=True),
sa.Column("direction", sa.String(length=30), nullable=True),
@@ -371,9 +371,7 @@ def upgrade() -> None:
sa.UniqueConstraint(
"service_date",
"route_id",
"direction_id",
"start_time",
"vehicle_id",
"trip_id",
Contributor

do we need to keep route_id in here?

Collaborator Author

We have examples in our test set where the same trip_id continues across more than one Green Line route_id for the same start_date.

I'm not sure how common this is, but it does occur, and I believe the unique trip designations we are looking for should include route_id as a guarantee.

Member

suggestion (non-blocking): not sure it matters for you all, but a route is an attribute of a trip, so that's already unique; you don't need both items.

Collaborator Author

We have examples in our test set where the same trip_id continues across more than one Green Line route_id for the same start_date.

I'm not sure how common this is, but it does occur, and I believe the unique trip designations we are looking for should include route_id as a guarantee.

Collaborator Author

Comment has been added to the function call to describe this behavior

Member

If you have details about when you've seen that, it would be helpful: I don't think that should be happening (at least not within a single start_date).

Collaborator Author (@rymarczy) Aug 24, 2023

Our flat-file test data is a random sample from May 8th of this year. Two trip IDs exhibit this behavior (ADDED-1581518542, ADDED-1581518549).

A CSV file with the GTFS-RT Vehicle Positions data is attached.

non_unique_trip_id.csv
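
For reference, the sketch below is one way such trips could be surfaced from the attached CSV. It is illustrative only, and the `start_date`, `route_id`, and `trip_id` column names are assumed from the GTFS-RT Vehicle Positions feed rather than taken from this PR.

```python
# Hypothetical check, not part of this PR: flag trip_ids that touch more than
# one route_id within a single service date. Column names are assumed from the
# GTFS-RT Vehicle Positions feed and may differ in the attached CSV.
import pandas

vp = pandas.read_csv("non_unique_trip_id.csv")

routes_per_trip = (
    vp.groupby(["start_date", "trip_id"])["route_id"]
    .nunique()
    .reset_index(name="route_count")
)

# These rows are the trips that would collide under a (service_date, trip_id)
# unique constraint that does not include route_id.
print(routes_per_trip[routes_per_trip["route_count"] > 1])
```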

Member

Ah, something like this makes sense for ADDED trips (although probably that reflects a bug in RTR). I'm still not sure why route_id needs to be included in the unique constraint though (given that it's possible for some of the other items in there to be non-unique as well), but I'll defer to the Lamp team.

name="vehicle_trips_unique_trip",
),
)
4 changes: 1 addition & 3 deletions python_src/src/lamp_py/performance_manager/README.md
@@ -208,11 +208,9 @@ Performance Manager compresses GTFS-RT Vehicle Positions event records to store

Initially, Vehicle Positions events are grouped by unique trip-stop columns:
* parent_station
* direction_id
* route_id
* service_date
* start_time
* vehicle_id
* trip_id

For each trip_stop event, the earliest `vehicle_timestamp` for a `current_status` indicating the vehicle is stopped is saved as the `vp_stop_timestamp` in the [vehicle_events](#vehicle_events) table. The earliest `vehicle_timestamp` for a `current_status` indicating the vehicle is moving is saved as the `vp_move_timestamp`.
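
As a rough illustration of this compression (a sketch only, not the pipeline code), assuming a raw events frame with the trip-stop columns above plus a GTFS-RT `current_status` column in which `STOPPED_AT` marks a stopped vehicle:

```python
# Sketch of the trip-stop compression described above; not the pipeline code.
# Assumes a raw events frame with the trip-stop columns plus `current_status`
# and `vehicle_timestamp`.
import pandas

TRIP_STOP_COLUMNS = ["service_date", "route_id", "trip_id", "parent_station"]

def compress_trip_stops(events: pandas.DataFrame) -> pandas.DataFrame:
    stopped = events["current_status"] == "STOPPED_AT"

    # earliest timestamp while stopped at the parent station
    vp_stop = (
        events[stopped]
        .groupby(TRIP_STOP_COLUMNS)["vehicle_timestamp"]
        .min()
        .rename("vp_stop_timestamp")
    )
    # earliest timestamp while moving toward the parent station
    vp_move = (
        events[~stopped]
        .groupby(TRIP_STOP_COLUMNS)["vehicle_timestamp"]
        .min()
        .rename("vp_move_timestamp")
    )
    return pandas.concat([vp_stop, vp_move], axis=1).reset_index()
```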

4 changes: 1 addition & 3 deletions python_src/src/lamp_py/performance_manager/gtfs_utils.py
@@ -32,10 +32,8 @@ def unique_trip_stop_columns() -> List[str]:
"""
return [
"service_date",
"start_time",
"route_id",
"direction_id",
"vehicle_id",
"trip_id",
"parent_station",
]

@@ -145,9 +145,7 @@ def combine_events(
# we want to drop these null value records whenever possible
# to prioritize records from vehicle_positions
event_details["na_sort"] = (
event_details[
["trip_id", "stop_sequence", "vehicle_label", "vehicle_consist"]
]
event_details[["stop_sequence", "vehicle_label", "vehicle_consist"]]
.isna()
.sum(axis=1)
)
@@ -305,9 +303,7 @@ def build_temp_events(
.where(
TempEventCompare.service_date == VehicleTrips.service_date,
TempEventCompare.route_id == VehicleTrips.route_id,
TempEventCompare.direction_id == VehicleTrips.direction_id,
TempEventCompare.start_time == VehicleTrips.start_time,
TempEventCompare.vehicle_id == VehicleTrips.vehicle_id,
TempEventCompare.trip_id == VehicleTrips.trip_id,
)
)
db_manager.execute(update_temp_trip_id)
16 changes: 9 additions & 7 deletions python_src/src/lamp_py/performance_manager/l0_rt_trip_updates.py
@@ -32,13 +32,13 @@ def get_tu_dataframe_chunks(
"start_date",
"start_time",
"vehicle_id",
"trip_id",
]
trip_update_filters = [
("direction_id", "in", (0, 1)),
("timestamp", ">", 0),
("route_id", "!=", "None"),
("start_date", "!=", "None"),
("start_time", "!=", "None"),
("trip_id", "!=", "None"),
("vehicle_id", "!=", "None"),
("route_id", "in", route_ids),
]
@@ -58,9 +58,10 @@ def explode_stop_time_update(
timestamp: int,
direction_id: bool,
route_id: Any,
service_date: int,
start_time: int,
service_date: Optional[int],
start_time: Optional[int],
vehicle_id: Any,
trip_id: Any,
) -> Optional[List[dict]]:
"""
explode nested list of dicts in stop_time_update column
@@ -74,6 +75,7 @@
"service_date": service_date,
"start_time": start_time,
"vehicle_id": vehicle_id,
"trip_id": trip_id,
}
return_list: List[Dict[str, Any]] = []

@@ -134,7 +136,7 @@ def get_and_unwrap_tu_dataframe(
)
batch_events["service_date"] = pandas.to_numeric(
batch_events["service_date"]
).astype("int64")
).astype("Int64")

# store direction_id as bool
batch_events["direction_id"] = pandas.to_numeric(
@@ -145,7 +147,7 @@ def get_and_unwrap_tu_dataframe(
batch_events["start_time"] = (
batch_events["start_time"]
.apply(start_time_to_seconds)
.astype("int64")
.astype("Int64")
)

# expand and filter stop_time_update column using numpy vectorize
@@ -161,6 +163,7 @@ def get_and_unwrap_tu_dataframe(
batch_events.service_date,
batch_events.start_time,
batch_events.vehicle_id,
batch_events.trip_id,
)
).dropna()
events = pandas.concat([events, batch_events])
@@ -203,7 +206,6 @@ def reduce_trip_updates(trip_updates: pandas.DataFrame) -> pandas.DataFrame:
# add selected columns
# trip_updates and vehicle_positions dataframes must all have the same columns
# available to be correctly joined in combine_events function of l0_gtfs_rt_events.py
trip_updates["trip_id"] = None
trip_updates["stop_sequence"] = None
trip_updates["vehicle_label"] = None
trip_updates["vehicle_consist"] = None
@@ -49,10 +49,9 @@ def get_vp_dataframe(
("vehicle_timestamp", ">", 0),
("direction_id", "in", (0, 1)),
("route_id", "!=", "None"),
("start_date", "!=", "None"),
("start_time", "!=", "None"),
("vehicle_id", "!=", "None"),
("route_id", "in", route_ids),
("trip_id", "!=", "None"),
]

result = read_parquet(
@@ -89,7 +88,7 @@ def transform_vp_datatypes(
)
vehicle_positions["service_date"] = pandas.to_numeric(
vehicle_positions["service_date"]
).astype("int64")
).astype("Int64")

# rename current_stop_sequence to stop_sequence
# and convert to int64
@@ -109,7 +108,7 @@
vehicle_positions["start_time"] = (
vehicle_positions["start_time"]
.apply(start_time_to_seconds)
.astype("int64")
.astype("Int64")
)

process_logger.log_complete()
@@ -134,8 +133,6 @@ def transform_vp_timestamps(
process_logger.log_start()

trip_stop_columns = unique_trip_stop_columns()
# TODO: review trip_id processing # pylint: disable=fixme
# add more intelligent trip_id processing, this approach will randomly select trip_id record to keep

# create a pivot table on unique trip-stop events, finding the earliest time
# that each vehicle/stop pair is and is not moving. rename the vehicle
@@ -237,11 +237,7 @@ def trips_for_metrics_subquery(
sa.func.lead(rt_trips_sub.c.vp_move_timestamp)
.over(
partition_by=rt_trips_sub.c.vehicle_id,
order_by=sa.func.coalesce(
rt_trips_sub.c.vp_move_timestamp,
rt_trips_sub.c.vp_stop_timestamp,
rt_trips_sub.c.tu_stop_timestamp,
),
order_by=rt_trips_sub.c.vp_move_timestamp,
Contributor

was this change necessary as well if we're running the VACUUM?

Collaborator Author

This change wasn't necessary, but looking at the field that's being calculated and how it's used, I don't believe the coalesce is needed.

I think this business logic was a bit of a holdover from before Ops Analytics decided they wanted all headways as departure-to-departure calculations.

)
.label("next_station_move"),
)
31 changes: 15 additions & 16 deletions python_src/src/lamp_py/performance_manager/l1_rt_trips.py
@@ -74,6 +74,13 @@ def load_new_trip_data(db_manager: DatabaseManager) -> None:

This guarantees that all events will have
matching trips data in the "vehicle_trips" table

This INSERT/UPDATE logic will load distinct trip information from the last
recorded trip-stop event of a trip. The information in the last trip-stop
event is assumed to be more accurate than information from the first
trip-stop event because some values can carry over from the last trip of the
vehicle. The `TempEventCompare.stop_sequence.desc()` `order_by` call is
responsible for this behavior.
"""
process_logger = ProcessLogger("l1_trips.load_new_trips")
process_logger.log_start()
@@ -93,15 +100,13 @@ def load_new_trip_data(db_manager: DatabaseManager) -> None:
.distinct(
TempEventCompare.service_date,
TempEventCompare.route_id,
TempEventCompare.direction_id,
TempEventCompare.start_time,
TempEventCompare.vehicle_id,
TempEventCompare.trip_id,
)
.order_by(
TempEventCompare.service_date,
TempEventCompare.route_id,
TempEventCompare.direction_id,
TempEventCompare.start_time,
TempEventCompare.trip_id,
TempEventCompare.stop_sequence.desc(),
Contributor

why add this one into the ordering?

Collaborator Author

The purpose of this query is to collect additional information fields (vehicle_label, vehicle_consist) for unique trips that will be used for UPDATE/INSERT operations into our vehicle_trips table.

For the first stop-event in a trip, this information is frequently carried over from the vehicle's previous trip, so this query collects the latest stop-event values from these fields to UPDATE/INSERT into our vehicle_trips table.

That is why the desc() ORDER is used; it would probably be helpful to add a comment to this effect above these calls.
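
(Illustrative sketch of the DISTINCT ON / ORDER BY pattern being described; the selected column list is trimmed for brevity and is an assumption, not the exact query in this PR.)

```python
# Sketch of the pattern described above: keep one row per unique trip, taken
# from its highest stop_sequence so that vehicle_label / vehicle_consist come
# from the latest trip-stop event. Column list trimmed for illustration.
import sqlalchemy as sa

from lamp_py.postgres.postgres_schema import TempEventCompare

latest_trip_info = (
    sa.select(
        TempEventCompare.service_date,
        TempEventCompare.route_id,
        TempEventCompare.trip_id,
        TempEventCompare.vehicle_label,
        TempEventCompare.vehicle_consist,
    )
    .distinct(
        TempEventCompare.service_date,
        TempEventCompare.route_id,
        TempEventCompare.trip_id,
    )
    .order_by(
        TempEventCompare.service_date,
        TempEventCompare.route_id,
        TempEventCompare.trip_id,
        # DISTINCT ON keeps the first row of each group, so sorting by
        # stop_sequence descending selects the latest trip-stop event.
        TempEventCompare.stop_sequence.desc(),
    )
)
```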

Collaborator Author

Comment has been added to the function call to describe this behavior

Member

question: is this backwards? stop sequences start with low numbers and go up.

Collaborator Author

The purpose of this query is to collect additional information fields (vehicle_label, vehicle_consist) for unique trips that will be used for UPDATE/INSERT operations into our vehicle_trips table.

For the first stop-event in a trip, this information is frequently carried over from the vehicle's previous trip, so this query collects the latest stop-event values from these fields to UPDATE/INSERT into our vehicle_trips table.

That is why the desc() ORDER is used; it would probably be helpful to add a comment to this effect above these calls.

)
)

@@ -124,9 +129,7 @@ def load_new_trip_data(db_manager: DatabaseManager) -> None:
index_elements=[
VehicleTrips.service_date,
VehicleTrips.route_id,
VehicleTrips.direction_id,
VehicleTrips.start_time,
VehicleTrips.vehicle_id,
VehicleTrips.trip_id,
],
)
)
@@ -146,15 +149,13 @@ def load_new_trip_data(db_manager: DatabaseManager) -> None:
.distinct(
TempEventCompare.service_date,
TempEventCompare.route_id,
TempEventCompare.direction_id,
TempEventCompare.start_time,
TempEventCompare.vehicle_id,
TempEventCompare.trip_id,
)
.order_by(
TempEventCompare.service_date,
TempEventCompare.route_id,
TempEventCompare.direction_id,
TempEventCompare.start_time,
TempEventCompare.trip_id,
TempEventCompare.stop_sequence.desc(),
)
.where(
sa.or_(
@@ -175,9 +176,7 @@ def load_new_trip_data(db_manager: DatabaseManager) -> None:
.where(
VehicleTrips.service_date == distinct_update_query.c.service_date,
VehicleTrips.route_id == distinct_update_query.c.route_id,
VehicleTrips.direction_id == distinct_update_query.c.direction_id,
VehicleTrips.start_time == distinct_update_query.c.start_time,
VehicleTrips.vehicle_id == distinct_update_query.c.vehicle_id,
VehicleTrips.trip_id == distinct_update_query.c.trip_id,
)
)
db_manager.execute(trip_update_query)
23 changes: 10 additions & 13 deletions python_src/src/lamp_py/postgres/postgres_schema.py
@@ -85,16 +85,16 @@ class VehicleTrips(SqlBase): # pylint: disable=too-few-public-methods

# trip identifiers
service_date = sa.Column(sa.Integer, nullable=False)
trip_id = sa.Column(sa.String(128), nullable=False)

# additional trip information
route_id = sa.Column(sa.String(60), nullable=False)
direction_id = sa.Column(sa.Boolean, nullable=False)
start_time = sa.Column(sa.Integer, nullable=False)
start_time = sa.Column(sa.Integer, nullable=True)
vehicle_id = sa.Column(sa.String(60), nullable=False)

# additional trip information
branch_route_id = sa.Column(sa.String(60), nullable=True)
trunk_route_id = sa.Column(sa.String(60), nullable=True)
stop_count = sa.Column(sa.SmallInteger, nullable=True)
trip_id = sa.Column(sa.String(128), nullable=True)
vehicle_label = sa.Column(sa.String(128), nullable=True)
vehicle_consist = sa.Column(sa.String(), nullable=True)
direction = sa.Column(sa.String(30), nullable=True)
@@ -120,10 +120,7 @@ class VehicleTrips(SqlBase): # pylint: disable=too-few-public-methods
__table_args__ = (
sa.UniqueConstraint(
service_date,
route_id,
direction_id,
start_time,
vehicle_id,
trip_id,
name="vehicle_trips_unique_trip",
),
)
@@ -151,10 +148,7 @@ class TempEventCompare(SqlBase): # pylint: disable=too-few-public-methods

# trip identifiers
service_date = sa.Column(sa.Integer, nullable=False)
direction_id = sa.Column(sa.Boolean, nullable=False)
route_id = sa.Column(sa.String(60), nullable=False)
start_time = sa.Column(sa.Integer, nullable=False)
vehicle_id = sa.Column(sa.String(60), nullable=False)
trip_id = sa.Column(sa.String(128), nullable=False)

# stop identifiers
stop_sequence = sa.Column(sa.SmallInteger, nullable=True)
@@ -167,7 +161,10 @@ class TempEventCompare(SqlBase): # pylint: disable=too-few-public-methods
tu_stop_timestamp = sa.Column(sa.Integer, nullable=True)

# extra trip information
trip_id = sa.Column(sa.String(128), nullable=True)
direction_id = sa.Column(sa.Boolean, nullable=False)
route_id = sa.Column(sa.String(60), nullable=False)
start_time = sa.Column(sa.Integer, nullable=False)
vehicle_id = sa.Column(sa.String(60), nullable=False)
vehicle_label = sa.Column(sa.String(128), nullable=True)
vehicle_consist = sa.Column(sa.String(), nullable=True)

6 changes: 5 additions & 1 deletion python_src/src/lamp_py/postgres/postgres_utils.py
@@ -289,7 +289,11 @@ def truncate_table(
truncate_query = f"{truncate_query} CASCADE"

self.execute(sa.text(f"{truncate_query};"))
self.execute(sa.text(f"ANALYZE {truncat_as};"))

# Execute VACUUM to avoid non-deterministic behavior during testing
with self.session.begin() as cursor:
cursor.execute(sa.text("END TRANSACTION;"))
cursor.execute(sa.text(f"VACUUM (ANALYZE) {truncat_as};"))

def add_metadata_paths(self, paths: List[str]) -> None:
"""
@@ -417,15 +417,15 @@ def test_gtfs_rt_processing(
files["tu_paths"], db_manager
)
trip_update_size = trip_updates.shape[0]
assert trip_updates.shape[1] == 8
assert trip_updates.shape[1] == 9

# check that it can be combined with the static schedule
trip_updates = add_static_version_key_column(trip_updates, db_manager)
assert trip_updates.shape[1] == 9
assert trip_updates.shape[1] == 10
assert trip_update_size == trip_updates.shape[0]

trip_updates = add_parent_station_column(trip_updates, db_manager)
assert trip_updates.shape[1] == 10
assert trip_updates.shape[1] == 11
assert trip_update_size == trip_updates.shape[0]

trip_updates = reduce_trip_updates(trip_updates)
@@ -642,6 +642,7 @@ def test_whole_table(
"headway_branch_seconds": "Int64",
"headway_trunk_seconds": "Int64",
"static_trip_id_guess": "Int64",
"start_time": "Int64",
}
sort_by = [
"route_id",