CHORE: Clean Up Parquet File Writing #189

Merged · 1 commit · Nov 15, 2023
2 changes: 1 addition & 1 deletion python_src/pyproject.toml
@@ -44,7 +44,7 @@ build-backend = "poetry.core.masonry.api"

[tool.black]
line-length = 80
-target-version = ['py39']
+target-version = ['py310']
Collaborator Author: missed this on the change from python 3.9 to 3.10

[tool.mypy]
disallow_untyped_defs = true
4 changes: 2 additions & 2 deletions python_src/src/lamp_py/tableau/hyper.py
@@ -199,7 +199,7 @@ def run_hyper(self) -> None:
)
process_log.log_start()

-for retry_count in range(max_retries):
+for retry_count in range(max_retries + 1):
Collaborator Author: allows for the actual number of max_retries

try:
process_log.add_metadata(retry_count=retry_count)
# get datasource from Tableau to check "updated_at" datetime
Expand Down Expand Up @@ -256,7 +256,7 @@ def run_hyper(self) -> None:
break

except Exception as exception:
-if retry_count == max_retries - 1:
+if retry_count == max_retries:
process_log.log_failure(exception=exception)

def run_parquet(self, db_manager: DatabaseManager) -> None:
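The off-by-one being fixed here is easy to see in isolation. A minimal runnable sketch of the corrected semantics (do_work is a hypothetical stand-in for the Tableau calls, not code from this PR): with max_retries = 3, range(max_retries + 1) gives one initial attempt plus three actual retries, and the failure is only logged once the final attempt raises.

```python
import random

max_retries = 3

def do_work() -> None:
    # hypothetical flaky unit of work, standing in for the Tableau calls
    if random.random() < 0.5:
        raise RuntimeError("transient failure")

for retry_count in range(max_retries + 1):
    try:
        do_work()  # attempt 0 is the initial try; 1..max_retries are retries
        break      # success: stop retrying
    except Exception as exception:
        if retry_count == max_retries:
            print(f"giving up after {max_retries} retries: {exception}")
```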
3 changes: 3 additions & 0 deletions python_src/src/lamp_py/tableau/jobs/gtfs_rail.py
@@ -40,10 +40,13 @@ def create_parquet(self, db_manager: DatabaseManager) -> None:
if os.path.exists(self.local_parquet_path):
os.remove(self.local_parquet_path)

+db_batch_size = 1024 * 1024 / 2
+
db_manager.write_to_parquet(
select_query=sa.text(self.create_query),
write_path=self.local_parquet_path,
schema=self.parquet_schema,
+batch_size=db_batch_size,
Collaborator Author, on lines +43 to +49: Based on the ECS Health dashboard, memory utilization appeared to peak somewhere between 70-90% on our ECS with 16GB of memory. A reduced batch_size should cut that in half.

)

def update_parquet(self, db_manager: DatabaseManager) -> bool:
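For context on how batch_size bounds memory here, a sketch of the shape a method like DatabaseManager.write_to_parquet presumably has (an assumption for illustration, not the actual implementation): the query result is streamed, so only ~batch_size rows are materialized at a time, and halving the batch size roughly halves the per-batch footprint at the cost of more row groups in the output file.

```python
import pyarrow
import pyarrow.parquet as pq
import sqlalchemy as sa

def write_query_to_parquet(engine, query, write_path, schema, batch_size):
    """Hypothetical sketch of a batch-bounded DB-to-Parquet write."""
    with engine.connect().execution_options(stream_results=True) as conn:
        result = conn.execute(sa.text(query))
        with pq.ParquetWriter(write_path, schema=schema) as writer:
            while True:
                # fetchmany expects an int; note 1024 * 1024 / 2 is a float
                rows = result.fetchmany(int(batch_size))
                if not rows:
                    break
                # column-wise conversion; each batch becomes one row group
                columns = list(zip(*rows))
                arrays = [
                    pyarrow.array(col, type=field.type)
                    for col, field in zip(columns, schema)
                ]
                writer.write_batch(pyarrow.record_batch(arrays, schema=schema))
```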
25 changes: 11 additions & 14 deletions python_src/src/lamp_py/tableau/jobs/rt_rail.py
@@ -26,7 +26,6 @@ def __init__(self) -> None:
" date(vt.service_date::text) as service_date"
" , TIMEZONE('UTC', TO_TIMESTAMP(extract(epoch FROM date(vt.service_date::text)) + vt.start_time)) as start_datetime"
" , TIMEZONE('UTC', TO_TIMESTAMP(extract(epoch FROM date(vt.service_date::text)) + vt.static_start_time)) as static_start_datetime"
" , ve.pm_trip_id"
" , ve.stop_sequence"
Collaborator Author: I don't think the pm_trip_id or updated_on fields have any use to OPMI, so drop them from the parquet file.

" , ve.canonical_stop_sequence"
" , prev_ve.canonical_stop_sequence as previous_canonical_stop_sequence"
@@ -36,11 +35,8 @@ def __init__(self) -> None:
" , prev_ve.stop_id as previous_stop_id"
" , ve.parent_station"
" , prev_ve.parent_station as previous_parent_station"
# " , ve.vp_move_timestamp as previous_stop_departure_timestamp"
" , TIMEZONE('America/New_York', TO_TIMESTAMP(ve.vp_move_timestamp)) as previous_stop_departure_datetime"
# " , COALESCE(ve.vp_stop_timestamp, ve.tu_stop_timestamp) as stop_arrival_timestamp"
" , TIMEZONE('America/New_York', TO_TIMESTAMP(COALESCE(ve.vp_stop_timestamp, ve.tu_stop_timestamp))) as stop_arrival_datetime"
# " , COALESCE(ve.vp_stop_timestamp, ve.tu_stop_timestamp) + ve.dwell_time_seconds as stop_departure_timestamp"
" , TIMEZONE('America/New_York', TO_TIMESTAMP(COALESCE(ve.vp_stop_timestamp, ve.tu_stop_timestamp) + ve.dwell_time_seconds)) as stop_departure_datetime"
" , (ve.vp_move_timestamp - extract(epoch FROM date(vt.service_date::text)))::int as previous_stop_departure_sec"
" , (ve.vp_move_timestamp - extract(epoch FROM date(vt.service_date::text)) + ve.travel_time_seconds)::int as stop_arrival_sec"
@@ -59,13 +55,12 @@ def __init__(self) -> None:
" , vt.static_trip_id_guess"
" , vt.static_start_time"
" , vt.static_stop_count"
" , vt.first_last_station_match"
" , vt.first_last_station_match as exact_static_trip_match"
Collaborator Author, on lines -62 to +58: exact_static_trip_match is a much more accurate name for this field.

" , vt.static_version_key"
" , ve.travel_time_seconds"
" , ve.dwell_time_seconds"
" , ve.headway_trunk_seconds"
" , ve.headway_branch_seconds"
" , ve.updated_on "
"FROM "
" vehicle_events ve "
"LEFT JOIN "
@@ -95,7 +90,6 @@ def parquet_schema(self) -> pyarrow.schema:
("service_date", pyarrow.date32()),
("start_datetime", pyarrow.timestamp("us")),
("static_start_datetime", pyarrow.timestamp("us")),
("pm_trip_id", pyarrow.int64()),
("stop_sequence", pyarrow.int16()),
("canonical_stop_sequence", pyarrow.int16()),
("previous_canonical_stop_sequence", pyarrow.int16()),
@@ -105,11 +99,8 @@ def parquet_schema(self) -> pyarrow.schema:
("previous_stop_id", pyarrow.string()),
("parent_station", pyarrow.string()),
("previous_parent_station", pyarrow.string()),
# ("previous_stop_departure_timestamp", pyarrow.int64()),
("previous_stop_departure_datetime", pyarrow.timestamp("us")),
# ("stop_arrival_timestamp", pyarrow.int64()),
("stop_arrival_datetime", pyarrow.timestamp("us")),
# ("stop_departure_timestamp", pyarrow.int64()),
("stop_departure_datetime", pyarrow.timestamp("us")),
("previous_stop_departure_sec", pyarrow.int64()),
("stop_arrival_sec", pyarrow.int64()),
@@ -128,29 +119,35 @@ def parquet_schema(self) -> pyarrow.schema:
("static_trip_id_guess", pyarrow.string()),
("static_start_time", pyarrow.int64()),
("static_stop_count", pyarrow.int64()),
("first_last_station_match", pyarrow.bool_()),
("exact_static_trip_match", pyarrow.bool_()),
("static_version_key", pyarrow.int64()),
("travel_time_seconds", pyarrow.int32()),
("dwell_time_seconds", pyarrow.int32()),
("headway_trunk_seconds", pyarrow.int32()),
("headway_branch_seconds", pyarrow.int32()),
("updated_on", pyarrow.timestamp("us")),
]
)

def create_parquet(self, db_manager: DatabaseManager) -> None:
create_query = self.table_query % ""

+# this is a fairly wide dataset, so dial back the batch size
+# to limit memory usage
+db_batch_size = 1024 * 1024 / 2
+
Collaborator Author, on lines +134 to +137: Should limit memory utilization during initial file creation events.

if os.path.exists(self.local_parquet_path):
os.remove(self.local_parquet_path)

db_manager.write_to_parquet(
select_query=sa.text(create_query),
write_path=self.local_parquet_path,
schema=self.parquet_schema,
+batch_size=db_batch_size,
)

def update_parquet(self, db_manager: DatabaseManager) -> bool:
+dataset_batch_size = 1024 * 1024
Contributor: why is the update double the create size?

Collaborator Author (@rymarczy, Nov 15, 2023): In practice, we shouldn't see very large update query sizes unless the parquet process itself is turned off for a while, but I'll update this one as well, just to cover our bases.

Sorry, actually this is the "dataset" batch size. My intuition is that these batch sizes are much less memory intensive than the large DB query batches. And I believe there is a file size reduction from selecting the largest possible batch_size for these "dataset" batches.
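The file-size intuition in that last comment follows from parquet structure: each batch written through a ParquetWriter becomes a row group, so larger dataset batches mean fewer, larger row groups, which usually encode and compress better. A toy sketch (an assumed helper for illustration, not code from this PR; assumes the source file already matches the target schema):

```python
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

def rewrite_with_batch_size(src_path: str, dst_path: str,
                            schema: pa.Schema, batch_size: int) -> None:
    # Re-batch an existing parquet file; each write_batch call emits a
    # row group, so a larger batch_size yields fewer, larger row groups.
    batches = ds.dataset(src_path).to_batches(batch_size=batch_size)
    with pq.ParquetWriter(dst_path, schema=schema) as writer:
        for batch in batches:
            writer.write_batch(batch)
```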


download_file(
object_path=self.remote_parquet_path,
file_name=self.local_parquet_path,
@@ -174,7 +171,7 @@ def update_parquet(self, db_manager: DatabaseManager) -> bool:
# update downloaded parquet file with filtered service_date
old_filter = pc.field("service_date") < max_start_date
old_batches = pd.dataset(self.local_parquet_path).to_batches(
-filter=old_filter, batch_size=1024 * 1024
+filter=old_filter, batch_size=dataset_batch_size
)
filter_path = "/tmp/filter_local.parquet"
with pq.ParquetWriter(
@@ -193,7 +190,7 @@ def update_parquet(self, db_manager: DatabaseManager) -> bool:
combine_batches = pd.dataset(
joined_dataset,
schema=self.parquet_schema,
-).to_batches(batch_size=1024 * 1024)
+).to_batches(batch_size=dataset_batch_size)

with pq.ParquetWriter(
combine_parquet_path, schema=self.parquet_schema
5 changes: 5 additions & 0 deletions python_src/src/lamp_py/tableau/pipeline.py
@@ -1,4 +1,5 @@
import os
+import gc
from typing import List

from lamp_py.runtime_utils.env_validation import validate_environment
@@ -56,3 +57,7 @@ def start_parquet_updates(db_manager: DatabaseManager) -> None:
"""Run all Parquet Update jobs"""
for job in create_hyper_jobs():
job.run_parquet(db_manager)

+# ECS Memory Utilization appeared to stay elevated after initial calling
+# of `run_parquet` to create all parquet files, this may resolve that...
+gc.collect()
2 changes: 1 addition & 1 deletion python_src/src/lamp_py/tableau/server.py
@@ -23,7 +23,7 @@ def tableau_server(
def tableau_authentication(
tableau_user: Optional[str] = os.getenv("TABLEAU_USER"),
tableau_password: Optional[str] = os.getenv("TABLEAU_PASSWORD"),
tableau_site: str = "", # empty string corresponds to Default site
tableau_site: str = "", # empty string corresponds to 'Default' site
) -> TSC.models.tableau_auth.TableauAuth:
"""
Get Tableau Authentication object