FIX: Ingestion RT table save for yield (#214)
Ingestion is running out of memory processing very large trip update tables.

This fix changes the way RT table data is stored during ingestion loops.

Previously, json.gz table data was stored as a list of individual pyarrow tables, which were concatenated before writing to parquet.

The new method concatenates each json.gz pyarrow table into a main pyarrow table as it is extracted.
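In outline, the change looks like the sketch below (standalone Python, not the repository code: transform_for_write stands in for self.detail.transform_for_write in the diff, and the two driver functions are illustrative). The old path keeps every raw pyarrow table alive until a single concat at yield time; the new path transforms each table as it arrives and folds it into one running table.

from typing import List, Optional

import pyarrow


def transform_for_write(table: pyarrow.Table) -> pyarrow.Table:
    """Stand-in for the per-feed transform applied before writing."""
    return table


def accumulate_then_concat(extracted: List[pyarrow.Table]) -> pyarrow.Table:
    """Old approach: keep every raw table in a list, concat once at yield time."""
    tables: List[pyarrow.Table] = []
    for result_table in extracted:
        tables.append(result_table)  # every raw table stays resident until the yield
    return transform_for_write(pyarrow.concat_tables(tables))


def concat_as_extracted(extracted: List[pyarrow.Table]) -> pyarrow.Table:
    """New approach: transform each table as it is extracted and fold it into one table."""
    table: Optional[pyarrow.Table] = None
    for result_table in extracted:
        transformed = transform_for_write(result_table)
        if table is None:
            table = transformed
        else:
            table = pyarrow.concat_tables([table, transformed])
    assert table is not None  # mirrors the guard added in yield_check
    return table

Both produce the same combined table; the difference is when transform_for_write runs and how many intermediate tables stay resident during the ingestion loop.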
rymarczy authored Jan 3, 2024
1 parent 7cc8846 commit 2d6d79d
Showing 1 changed file with 24 additions and 5 deletions.
29 changes: 24 additions & 5 deletions python_src/src/lamp_py/ingestion/convert_gtfs_rt.py
@@ -37,7 +37,7 @@ class TableData:
     can be yielded
     """
 
-    tables: List[pyarrow.table] = field(default_factory=list)
+    table: Optional[pyarrow.table] = None
     files: List[str] = field(default_factory=list)
     next_hr_cnt: int = 0
 
@@ -177,7 +177,19 @@ def process_files(self) -> Iterable[pyarrow.table]:
             # add result to matching timestamp_hr key
             if iter_ts == timestamp_hr:
                 table_group.files.append(result_filename)
-                table_group.tables.append(result_table)
+                if table_group.table is None:
+                    table_group.table = self.detail.transform_for_write(
+                        result_table
+                    )
+                else:
+                    table_group.table = pyarrow.concat_tables(
+                        [
+                            table_group.table,
+                            self.detail.transform_for_write(
+                                result_table
+                            ),
+                        ]
+                    )
                 table_group.next_hr_cnt = 0
             # increment next_hr_cnt if key is before timestamp_hr
             elif timestamp_hr > iter_ts:
@@ -235,7 +247,11 @@ def yield_check(
                 and iter_ts < self.start_of_hour
             ):
                 self.archive_files += self.table_groups[iter_ts].files
-                table = pyarrow.concat_tables(self.table_groups[iter_ts].tables)
+
+                table = self.table_groups[iter_ts].table
+
+                assert table is not None
+
                 process_logger.add_metadata(
                     file_count=len(self.table_groups[iter_ts].files),
                     number_of_rows=table.num_rows,
@@ -344,10 +360,13 @@ def write_table(self, table: pyarrow.table) -> None:
         try:
             s3_prefix = str(self.config_type)
 
-            table = self.detail.transform_for_write(table)
-
+            sort_log = ProcessLogger(
+                "pyarrow_sort_by", table_rows=table.num_rows
+            )
+            sort_log.log_start()
             if self.detail.table_sort_order is not None:
                 table = table.sort_by(self.detail.table_sort_order)
+            sort_log.log_complete()
 
             write_parquet_file(
                 table=table,
