From a51c2a49cf4c117b1c17906113c8dfd72034aae8 Mon Sep 17 00:00:00 2001 From: Ryan Rymarczyk Date: Wed, 3 Jul 2024 06:00:26 -0400 Subject: [PATCH] FEAT: Export compressed GTFS schedule to SQLITE db (#388) Adds the ability to export compressed GTFS schedule data to an SQLITE db file, along with schedule S3 sync/upload logic. For each year partition folder, in the compressed gtfs archives, one gzipped SQLITE db file will be produced that contains a table for each GTFS schedule file that has been compressed. Asana Task: https://app.asana.com/0/1205827492903547/1207450430015372 --- .../compress_gtfs/gtfs_to_parquet.py | 60 ++++++++------ src/lamp_py/ingestion/compress_gtfs/pipe.py | 4 + .../ingestion/compress_gtfs/pq_to_sqlite.py | 81 +++++++++++++++++++ .../compress_gtfs/schedule_details.py | 48 +++++++++-- src/lamp_py/ingestion/ingest_gtfs.py | 2 + src/lamp_py/ingestion/pipeline.py | 1 + src/lamp_py/ingestion/utils.py | 28 ++++++- tests/ingestion/test_gtfs_compress.py | 21 ++++- 8 files changed, 206 insertions(+), 39 deletions(-) create mode 100644 src/lamp_py/ingestion/compress_gtfs/pipe.py create mode 100644 src/lamp_py/ingestion/compress_gtfs/pq_to_sqlite.py diff --git a/src/lamp_py/ingestion/compress_gtfs/gtfs_to_parquet.py b/src/lamp_py/ingestion/compress_gtfs/gtfs_to_parquet.py index f477953b..135fc1e9 100644 --- a/src/lamp_py/ingestion/compress_gtfs/gtfs_to_parquet.py +++ b/src/lamp_py/ingestion/compress_gtfs/gtfs_to_parquet.py @@ -10,10 +10,17 @@ from lamp_py.runtime_utils.process_logger import ProcessLogger -from .gtfs_schema_map import gtfs_schema_list -from .gtfs_schema_map import gtfs_schema -from .schedule_details import ScheduleDetails -from .schedule_details import schedules_to_compress +from lamp_py.ingestion.compress_gtfs.gtfs_schema_map import ( + gtfs_schema_list, + gtfs_schema, +) +from lamp_py.ingestion.compress_gtfs.schedule_details import ( + ScheduleDetails, + schedules_to_compress, + GTFS_PATH, +) +from lamp_py.ingestion.compress_gtfs.pq_to_sqlite import pq_folder_to_sqlite +from lamp_py.aws.s3 import upload_file def frame_parquet_diffs( @@ -57,6 +64,7 @@ def frame_parquet_diffs( how="anti", on=join_columns, join_nulls=True, + coalesce=True, ).drop("from_zip") # left join to create frame of old and same records @@ -65,6 +73,7 @@ def frame_parquet_diffs( how="left", on=join_columns, join_nulls=True, + coalesce=True, ) same_records = pq_frame.filter(pl.col("from_zip").eq(True)).drop("from_zip") old_records = pq_frame.filter(pl.col("from_zip").is_null()).drop("from_zip") @@ -73,51 +82,46 @@ def frame_parquet_diffs( def merge_frame_with_parquet( - merge_frame: pl.DataFrame, export_path: str, filter_date: int + merge_df: pl.DataFrame, export_path: str, filter_date: int ) -> None: """ - merge merge_frame with existing parqut file (export_path) and over-write with results + merge merge_df with existing parqut file (export_path) and over-write with results all parquet read/write operations are done in batches to constrain memory usage - :param merge_frame: records to merge into export_path parquet file - :param export_path: existing parquet file to merge with merge_frame + :param merge_df: records to merge into export_path parquet file + :param export_path: existing parquet file to merge with merge_df :param filter_date: value for exclusive filter on parquet files as YYYYMMDD (ie. 
service_date) """ batch_size = 1024 * 256 - if merge_frame.shape[0] == 0: + if merge_df.shape[0] == 0: # No records to merge with parquet file return # sort stop_times and trips frames to reduce file size if "/stop_times.parquet" in export_path: - merge_frame = merge_frame.sort(by=["stop_id", "trip_id"]) + merge_df = merge_df.sort(by=["stop_id", "trip_id"]) if "/trips.parquet" in export_path: - merge_frame = merge_frame.sort(by=["route_id", "service_id"]) + merge_df = merge_df.sort(by=["route_id", "service_id"]) - pq_schema = merge_frame.to_arrow().schema + merge_df = merge_df.to_arrow() with tempfile.TemporaryDirectory() as temp_dir: - new_pq_path = os.path.join(temp_dir, "new.parquet") - filter_pq_path = os.path.join(temp_dir, "filter.parquet") - - merge_frame.write_parquet( - new_pq_path, use_pyarrow=True, statistics=True - ) + tmp_path = os.path.join(temp_dir, "filter.parquet") # create filtered parquet file, excluding records from merge_frame pq_filter = (pc.field("gtfs_active_date") > filter_date) | ( pc.field("gtfs_end_date") < filter_date ) filter_ds = pd.dataset(export_path).filter(pq_filter) - with pq.ParquetWriter(filter_pq_path, schema=pq_schema) as writer: + with pq.ParquetWriter(tmp_path, schema=merge_df.schema) as writer: for batch in filter_ds.to_batches(batch_size=batch_size): writer.write_batch(batch) # over-write export_path file with merged dataset - combined_ds = pd.dataset((filter_pq_path, new_pq_path)) - with pq.ParquetWriter(export_path, schema=pq_schema) as writer: - for batch in combined_ds.to_batches(batch_size=batch_size): + export_ds = pd.dataset((pd.dataset(tmp_path), pd.dataset(merge_df))) + with pq.ParquetWriter(export_path, schema=merge_df.schema) as writer: + for batch in export_ds.to_batches(batch_size=batch_size): writer.write_batch(batch) @@ -290,7 +294,9 @@ def gtfs_to_parquet() -> None: maximum process memory usage for this operation peaked at 5440MB while processing Feb-2018 to April-2024 """ - gtfs_tmp_folder = "/tmp/compress-gtfs" + gtfs_tmp_folder = GTFS_PATH.replace( + os.getenv("PUBLIC_ARCHIVE_BUCKET"), "/tmp" + ) logger = ProcessLogger( "compress_gtfs_schedules", gtfs_tmp_folder=gtfs_tmp_folder ) @@ -313,8 +319,10 @@ def gtfs_to_parquet() -> None: # send updates to S3 bucket... 
for year in set(feed["published_dt"].dt.strftime("%Y").unique()): year_path = os.path.join(gtfs_tmp_folder, year) - for _ in os.listdir(year_path): - # upload file to S3 - continue + pq_folder_to_sqlite(year_path) + for file in os.listdir(year_path): + local_path = os.path.join(year_path, file) + upload_path = os.path.join(GTFS_PATH, year, file) + upload_file(local_path, upload_path) logger.log_complete() diff --git a/src/lamp_py/ingestion/compress_gtfs/pipe.py b/src/lamp_py/ingestion/compress_gtfs/pipe.py new file mode 100644 index 00000000..97062ddf --- /dev/null +++ b/src/lamp_py/ingestion/compress_gtfs/pipe.py @@ -0,0 +1,4 @@ +from lamp_py.ingestion.compress_gtfs.gtfs_to_parquet import gtfs_to_parquet + +if __name__ == "__main__": + gtfs_to_parquet() diff --git a/src/lamp_py/ingestion/compress_gtfs/pq_to_sqlite.py b/src/lamp_py/ingestion/compress_gtfs/pq_to_sqlite.py new file mode 100644 index 00000000..552f7321 --- /dev/null +++ b/src/lamp_py/ingestion/compress_gtfs/pq_to_sqlite.py @@ -0,0 +1,81 @@ +import os +import sqlite3 + +import pyarrow +import pyarrow.dataset as pd + +from lamp_py.runtime_utils.process_logger import ProcessLogger +from lamp_py.ingestion.utils import gzip_file + + +def sqlite_type(pq_type: str) -> str: + """ + return SQLITE type from pyarrow Field type + """ + if "int" in pq_type: + return "INTEGER" + if "bool" in pq_type: + return "INTEGER" + if "float" in pq_type: + return "REAL" + if "double" in pq_type: + return "REAL" + return "TEXT" + + +def sqlite_table_query(table_name: str, schema: pyarrow.Schema) -> str: + """ + return CREATE TABLE query for sqlite table from pyarrow schema + """ + logger = ProcessLogger("sqlite_create_table") + logger.log_start() + field_list = [ + f"{field.name} {sqlite_type(str(field.type))}" for field in schema + ] + query = f""" + CREATE TABLE + IF NOT EXISTS + {table_name} + ( + {','.join(field_list)} + ); + """ + logger.log_complete() + return query + + +def pq_folder_to_sqlite(year_path: str) -> None: + """ + load all files from year_path folder into SQLITE3 db file + """ + logger = ProcessLogger("pq_to_sqlite", year_path=year_path) + logger.log_start() + + db_path = os.path.join(year_path, "GTFS_ARCHIVE.db") + if os.path.exists(db_path): + os.remove(db_path) + try: + for file in os.listdir(year_path): + if ".parquet" not in file: + continue + logger.add_metadata(current_file=file) + + ds = pd.dataset(os.path.join(year_path, file)) + + table = file.replace(".parquet", "") + columns = [f":{col}" for col in ds.schema.names] + insert_query = f"INSERT INTO {table} VALUES({','.join(columns)});" + + conn = sqlite3.connect(db_path) + with conn: + conn.execute(sqlite_table_query(table, ds.schema)) + with conn: + for batch in ds.to_batches(batch_size=250_000): + conn.executemany(insert_query, batch.to_pylist()) + conn.close() + + gzip_file(db_path) + + logger.log_complete() + except Exception as exception: + logger.log_failure(exception) diff --git a/src/lamp_py/ingestion/compress_gtfs/schedule_details.py b/src/lamp_py/ingestion/compress_gtfs/schedule_details.py index 6f6fc1a5..c9533799 100644 --- a/src/lamp_py/ingestion/compress_gtfs/schedule_details.py +++ b/src/lamp_py/ingestion/compress_gtfs/schedule_details.py @@ -16,9 +16,15 @@ ordered_schedule_frame, file_as_bytes_buf, ) +from lamp_py.ingestion.compress_gtfs.gtfs_schema_map import gtfs_schema +from lamp_py.aws.s3 import ( + file_list_from_s3, + download_file, +) - -from .gtfs_schema_map import gtfs_schema +GTFS_PATH = os.path.join( + str(os.getenv("PUBLIC_ARCHIVE_BUCKET")), 
"lamp/gtfs_archive" +) # pylint: disable=R0902 @@ -218,15 +224,41 @@ def schedules_to_compress(tmp_folder: str) -> pl.DataFrame: pq_fi_path = os.path.join(tmp_folder, year, "feed_info.parquet") if not os.path.exists(pq_fi_path): - # check for file in s3_path... - continue + bucket, prefix = GTFS_PATH.split("/", 1) + prefix = os.path.join(prefix, year) + s3_files = file_list_from_s3(bucket, prefix) + if len(s3_files) > 1: + for obj_path in s3_files: + if not obj_path.endswith(".parquet"): + continue + local_path = obj_path.replace(f"s3://{bucket}", "/tmp") + download_file(obj_path, local_path) + else: + continue pq_fi_frame = pl.read_parquet(pq_fi_path) - # anti join against records for 'year' to find records not already in feed_info.parquet - feed = feed.filter(pl.col("published_date") > int(f"{year}0000")).join( - pq_fi_frame.select("feed_version"), on="feed_version", how="anti" - ) + if int(year) <= 2018: + # different filter operation used for less than year 2018 because some + # schedules in this date range do not have matching "feed_version" + # values between `feed_info` file in schedule and "archived_feeds.txt" file + feed = feed.filter( + (pl.col("published_date") > int(f"{year}0000")) + & ( + pl.col("feed_start_date") + > pq_fi_frame.get_column("feed_start_date").max() + ) + ) + else: + # anti join against records for 'year' to find records not already in feed_info.parquet + feed = feed.filter( + pl.col("published_date") > int(f"{year}0000") + ).join( + pq_fi_frame.select("feed_version"), + on="feed_version", + how="anti", + coalesce=True, + ) break diff --git a/src/lamp_py/ingestion/ingest_gtfs.py b/src/lamp_py/ingestion/ingest_gtfs.py index 2c0043ca..2aa7e71a 100644 --- a/src/lamp_py/ingestion/ingest_gtfs.py +++ b/src/lamp_py/ingestion/ingest_gtfs.py @@ -27,6 +27,7 @@ DEFAULT_S3_PREFIX, group_sort_file_list, ) +from lamp_py.ingestion.compress_gtfs.gtfs_to_parquet import gtfs_to_parquet class NoImplConverter(Converter): @@ -138,5 +139,6 @@ def ingest_gtfs(metadata_queue: Queue[Optional[str]]) -> None: static schedule files should be ingested first """ + gtfs_to_parquet() ingest_gtfs_archive(metadata_queue) ingest_s3_files(metadata_queue) diff --git a/src/lamp_py/ingestion/pipeline.py b/src/lamp_py/ingestion/pipeline.py index 394fbf5b..682690f8 100755 --- a/src/lamp_py/ingestion/pipeline.py +++ b/src/lamp_py/ingestion/pipeline.py @@ -65,6 +65,7 @@ def start() -> None: "ARCHIVE_BUCKET", "ERROR_BUCKET", "INCOMING_BUCKET", + "PUBLIC_ARCHIVE_BUCKET", "SPRINGBOARD_BUCKET", "ALEMBIC_MD_DB_NAME", ], diff --git a/src/lamp_py/ingestion/utils.py b/src/lamp_py/ingestion/utils.py index 44fc386e..a9778161 100644 --- a/src/lamp_py/ingestion/utils.py +++ b/src/lamp_py/ingestion/utils.py @@ -1,5 +1,7 @@ import os import re +import gzip +import shutil import pathlib import datetime import zoneinfo @@ -16,6 +18,8 @@ import pyarrow.compute as pc import polars as pl +from lamp_py.runtime_utils.process_logger import ProcessLogger + DEFAULT_S3_PREFIX = "lamp" GTFS_RT_HASH_COL = "lamp_record_hash" @@ -168,9 +172,6 @@ def ordered_schedule_frame() -> pl.DataFrame: ) ) - # fix_me: filter for malformed archive schedules - feed = feed.filter(feed["feed_start_date"] > 20180200) - return feed @@ -287,3 +288,24 @@ def hash_gtfs_rt_parquet(path: str) -> None: writer.write_table(batch) os.replace(tmp_pq, path) + + +def gzip_file(path: str, keep_original: bool = False) -> None: + """ + gzip local file + + :param path: local file path + :param keep_original: keep original non-gzip file = False + """ + logger = 
ProcessLogger(
+        "gzip_file", path=path, keep_original=keep_original
+    )
+    logger.log_start()
+    with open(path, "rb") as f_in:
+        with gzip.open(f"{path}.gz", "wb") as f_out:
+            shutil.copyfileobj(f_in, f_out)
+
+    if not keep_original:
+        os.remove(path)
+
+    logger.log_complete()
diff --git a/tests/ingestion/test_gtfs_compress.py b/tests/ingestion/test_gtfs_compress.py
index 80ed7d47..0cd32227 100644
--- a/tests/ingestion/test_gtfs_compress.py
+++ b/tests/ingestion/test_gtfs_compress.py
@@ -1,21 +1,24 @@
 import os
 import tempfile
 import datetime
+from unittest import mock
 
 import pyarrow.compute as pc
 import pyarrow.dataset as pd
 import polars as pl
 
 from lamp_py.ingestion.compress_gtfs.schedule_details import (
-    schedules_to_compress,
     ScheduleDetails,
+    schedules_to_compress,
 )
 from lamp_py.ingestion.compress_gtfs.gtfs_to_parquet import (
     compress_gtfs_schedule,
 )
 from lamp_py.ingestion.compress_gtfs.gtfs_schema_map import gtfs_schema_list
+from lamp_py.ingestion.compress_gtfs.pq_to_sqlite import pq_folder_to_sqlite
 
 
+# pylint: disable=R0914
 def test_gtfs_to_parquet_compression() -> None:
     """
     test gtfs -> parquet compression pipeline
@@ -23,7 +26,12 @@ def test_gtfs_to_parquet_compression() -> None:
     will test compression of 3 randomly selected schedules from the past year
     """
     with tempfile.TemporaryDirectory() as temp_dir:
-        feed = schedules_to_compress(temp_dir)
+        with mock.patch(
+            "lamp_py.ingestion.compress_gtfs.schedule_details.file_list_from_s3"
+        ) as patch_s3:
+            patch_s3.return_value = []
+            feed = schedules_to_compress(temp_dir)
+            patch_s3.assert_called()
 
         year_ago = datetime.datetime.now() - datetime.timedelta(weeks=52)
 
@@ -45,6 +53,12 @@ def test_gtfs_to_parquet_compression() -> None:
         )
         compress_gtfs_schedule(schedule_details)
 
+        # verify sqlite db creation and gzip for 1 year
+        year = feed["published_dt"].dt.strftime("%Y").unique()[0]
+        year_path = os.path.join(temp_dir, year)
+        pq_folder_to_sqlite(year_path)
+        assert os.path.exists(os.path.join(year_path, "GTFS_ARCHIVE.db.gz"))
+
         # check parquet file exports
         for schedule in feed.rows(named=True):
             schedule_url = schedule["archive_url"]
@@ -99,3 +113,6 @@ def test_gtfs_to_parquet_compression() -> None:
             assert (
                 pq_end_count == zip_count
             ), f"{schedule_url=} {gtfs_file=} {active_start_date=} {active_end_date=}"
+
+
+# pylint: enable=R0914
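
pq_to_sqlite.py derives each CREATE TABLE statement directly from the parquet schema of the year partition: sqlite_type() maps integer and boolean pyarrow fields to INTEGER, float/double fields to REAL, and everything else to TEXT, and sqlite_table_query() joins those column definitions into a CREATE TABLE IF NOT EXISTS statement. A minimal sketch of that mapping follows, using a stand-in schema rather than the real gtfs_schema definitions, and assuming the lamp_py package is importable outside the full pipeline environment:

    import pyarrow

    from lamp_py.ingestion.compress_gtfs.pq_to_sqlite import (
        sqlite_table_query,
        sqlite_type,
    )

    # stand-in schema; the real schemas come from the compressed parquet files
    schema = pyarrow.schema(
        [
            pyarrow.field("trip_id", pyarrow.large_string()),
            pyarrow.field("direction_id", pyarrow.int8()),
            pyarrow.field("wheelchair_accessible", pyarrow.bool_()),
            pyarrow.field("gtfs_active_date", pyarrow.int32()),
        ]
    )

    # integer and boolean pyarrow types become INTEGER, floats become REAL,
    # anything else (strings, dates, ...) becomes TEXT
    assert sqlite_type(str(pyarrow.int8())) == "INTEGER"
    assert sqlite_type(str(pyarrow.float64())) == "REAL"
    assert sqlite_type(str(pyarrow.large_string())) == "TEXT"

    # prints CREATE TABLE IF NOT EXISTS trips (trip_id TEXT, direction_id INTEGER, ...)
    print(sqlite_table_query("trips", schema))

The insert side of pq_folder_to_sqlite() pairs with this by building named placeholders (":col") from the same schema, so executemany() can consume the dicts returned by RecordBatch.to_pylist() directly.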
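
End to end, each year partition folder under lamp/gtfs_archive now carries a gzipped SQLite file (GTFS_ARCHIVE.db.gz) next to the compressed parquet files, with one table per GTFS schedule file. The following is a consumer-side sketch, not part of this patch, for opening that artifact; it assumes a local copy of one year's file and that the year's schedules included trips.txt:

    import gzip
    import shutil
    import sqlite3

    # hypothetical local copy of one year partition, e.g. downloaded from
    # s3://<PUBLIC_ARCHIVE_BUCKET>/lamp/gtfs_archive/2024/GTFS_ARCHIVE.db.gz
    gz_path = "GTFS_ARCHIVE.db.gz"
    db_path = "GTFS_ARCHIVE.db"

    # reverse of gzip_file(): inflate the gzipped SQLITE db to a plain file
    with gzip.open(gz_path, "rb") as f_in, open(db_path, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)

    conn = sqlite3.connect(db_path)
    try:
        # one table per compressed GTFS schedule file (feed_info, trips, stop_times, ...)
        tables = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table';"
        ).fetchall()
        print(sorted(name for (name,) in tables))

        # example query against one of the generated tables
        (trip_count,) = conn.execute("SELECT COUNT(*) FROM trips;").fetchone()
        print(f"trips rows: {trip_count}")
    finally:
        conn.close()

gzip keeps the uploaded artifact small, while the inflated file needs nothing beyond the standard-library sqlite3 module on the consumer side.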