FEAT: Export compressed GTFS schedule to SQLITE db (#388)
Adds the ability to export compressed GTFS schedule data to an SQLite db file, along with schedule S3 sync/upload logic.

For each year partition folder in the compressed GTFS archive, one gzipped SQLite db file is produced, containing a table for each GTFS schedule file that has been compressed.
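As a rough consumer-side sketch of what this produces (local file names are assumed; the GTFS_ARCHIVE.db.gz name, the one-table-per-schedule-file layout, and the lamp/gtfs_archive S3 prefix come from this change):

import gzip
import shutil
import sqlite3

# assumption: the yearly archive was already downloaded locally, e.g. from
# s3://<PUBLIC_ARCHIVE_BUCKET>/lamp/gtfs_archive/2024/GTFS_ARCHIVE.db.gz
with gzip.open("GTFS_ARCHIVE.db.gz", "rb") as f_in:
    with open("GTFS_ARCHIVE.db", "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)

conn = sqlite3.connect("GTFS_ARCHIVE.db")
# one table per compressed GTFS schedule file (trips, stop_times, feed_info, ...)
tables = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
print(tables)
conn.close()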

Asana Task: https://app.asana.com/0/1205827492903547/1207450430015372
rymarczy authored Jul 3, 2024
1 parent edb3665 commit a51c2a4
Showing 8 changed files with 206 additions and 39 deletions.
60 changes: 34 additions & 26 deletions src/lamp_py/ingestion/compress_gtfs/gtfs_to_parquet.py
@@ -10,10 +10,17 @@

from lamp_py.runtime_utils.process_logger import ProcessLogger

from .gtfs_schema_map import gtfs_schema_list
from .gtfs_schema_map import gtfs_schema
from .schedule_details import ScheduleDetails
from .schedule_details import schedules_to_compress
from lamp_py.ingestion.compress_gtfs.gtfs_schema_map import (
gtfs_schema_list,
gtfs_schema,
)
from lamp_py.ingestion.compress_gtfs.schedule_details import (
ScheduleDetails,
schedules_to_compress,
GTFS_PATH,
)
from lamp_py.ingestion.compress_gtfs.pq_to_sqlite import pq_folder_to_sqlite
from lamp_py.aws.s3 import upload_file


def frame_parquet_diffs(
@@ -57,6 +64,7 @@ def frame_parquet_diffs(
how="anti",
on=join_columns,
join_nulls=True,
coalesce=True,
).drop("from_zip")

# left join to create frame of old and same records
@@ -65,6 +73,7 @@
how="left",
on=join_columns,
join_nulls=True,
coalesce=True,
)
same_records = pq_frame.filter(pl.col("from_zip").eq(True)).drop("from_zip")
old_records = pq_frame.filter(pl.col("from_zip").is_null()).drop("from_zip")
@@ -73,51 +82,46 @@


def merge_frame_with_parquet(
merge_frame: pl.DataFrame, export_path: str, filter_date: int
merge_df: pl.DataFrame, export_path: str, filter_date: int
) -> None:
"""
merge merge_frame with existing parquet file (export_path) and over-write with results
merge merge_df with existing parquet file (export_path) and over-write with results
all parquet read/write operations are done in batches to constrain memory usage
:param merge_frame: records to merge into export_path parquet file
:param export_path: existing parquet file to merge with merge_frame
:param merge_df: records to merge into export_path parquet file
:param export_path: existing parquet file to merge with merge_df
:param filter_date: value for exclusive filter on parquet files as YYYYMMDD (ie. service_date)
"""
batch_size = 1024 * 256
if merge_frame.shape[0] == 0:
if merge_df.shape[0] == 0:
# No records to merge with parquet file
return

# sort stop_times and trips frames to reduce file size
if "/stop_times.parquet" in export_path:
merge_frame = merge_frame.sort(by=["stop_id", "trip_id"])
merge_df = merge_df.sort(by=["stop_id", "trip_id"])
if "/trips.parquet" in export_path:
merge_frame = merge_frame.sort(by=["route_id", "service_id"])
merge_df = merge_df.sort(by=["route_id", "service_id"])

pq_schema = merge_frame.to_arrow().schema
merge_df = merge_df.to_arrow()

with tempfile.TemporaryDirectory() as temp_dir:
new_pq_path = os.path.join(temp_dir, "new.parquet")
filter_pq_path = os.path.join(temp_dir, "filter.parquet")

merge_frame.write_parquet(
new_pq_path, use_pyarrow=True, statistics=True
)
tmp_path = os.path.join(temp_dir, "filter.parquet")

# create filtered parquet file, excluding records from merge_frame
pq_filter = (pc.field("gtfs_active_date") > filter_date) | (
pc.field("gtfs_end_date") < filter_date
)
filter_ds = pd.dataset(export_path).filter(pq_filter)
with pq.ParquetWriter(filter_pq_path, schema=pq_schema) as writer:
with pq.ParquetWriter(tmp_path, schema=merge_df.schema) as writer:
for batch in filter_ds.to_batches(batch_size=batch_size):
writer.write_batch(batch)

# over-write export_path file with merged dataset
combined_ds = pd.dataset((filter_pq_path, new_pq_path))
with pq.ParquetWriter(export_path, schema=pq_schema) as writer:
for batch in combined_ds.to_batches(batch_size=batch_size):
export_ds = pd.dataset((pd.dataset(tmp_path), pd.dataset(merge_df)))
with pq.ParquetWriter(export_path, schema=merge_df.schema) as writer:
for batch in export_ds.to_batches(batch_size=batch_size):
writer.write_batch(batch)


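A minimal standalone sketch of the batched dataset-to-ParquetWriter pattern used above, making the "read/write in batches to constrain memory" note concrete (file names and toy data are assumptions for illustration):

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# toy input file standing in for an existing export_path parquet file (assumption)
pq.write_table(pa.table({"x": list(range(1_000_000))}), "in.parquet")

dataset = ds.dataset("in.parquet")
# stream record batches through a ParquetWriter so the full table is never
# materialized in memory at once
with pq.ParquetWriter("out.parquet", schema=dataset.schema) as writer:
    for batch in dataset.to_batches(batch_size=1024 * 256):
        writer.write_batch(batch)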
@@ -290,7 +294,9 @@ def gtfs_to_parquet() -> None:
maximum process memory usage for this operation peaked at 5440MB
while processing Feb-2018 to April-2024
"""
gtfs_tmp_folder = "/tmp/compress-gtfs"
gtfs_tmp_folder = GTFS_PATH.replace(
os.getenv("PUBLIC_ARCHIVE_BUCKET"), "/tmp"
)
logger = ProcessLogger(
"compress_gtfs_schedules", gtfs_tmp_folder=gtfs_tmp_folder
)
@@ -313,8 +319,10 @@ def gtfs_to_parquet() -> None:
# send updates to S3 bucket...
for year in set(feed["published_dt"].dt.strftime("%Y").unique()):
year_path = os.path.join(gtfs_tmp_folder, year)
for _ in os.listdir(year_path):
# upload file to S3
continue
pq_folder_to_sqlite(year_path)
for file in os.listdir(year_path):
local_path = os.path.join(year_path, file)
upload_path = os.path.join(GTFS_PATH, year, file)
upload_file(local_path, upload_path)

logger.log_complete()
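For reference, after this loop each year folder in the public archive should look roughly like the layout below (bucket name and exact file set are assumptions; GTFS_PATH is defined in schedule_details.py as <PUBLIC_ARCHIVE_BUCKET>/lamp/gtfs_archive):

s3://<PUBLIC_ARCHIVE_BUCKET>/lamp/gtfs_archive/2024/
    feed_info.parquet
    stop_times.parquet
    trips.parquet
    ... one parquet file per compressed GTFS schedule file ...
    GTFS_ARCHIVE.db.gz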
4 changes: 4 additions & 0 deletions src/lamp_py/ingestion/compress_gtfs/pipe.py
@@ -0,0 +1,4 @@
from lamp_py.ingestion.compress_gtfs.gtfs_to_parquet import gtfs_to_parquet

if __name__ == "__main__":
gtfs_to_parquet()
81 changes: 81 additions & 0 deletions src/lamp_py/ingestion/compress_gtfs/pq_to_sqlite.py
@@ -0,0 +1,81 @@
import os
import sqlite3

import pyarrow
import pyarrow.dataset as pd

from lamp_py.runtime_utils.process_logger import ProcessLogger
from lamp_py.ingestion.utils import gzip_file


def sqlite_type(pq_type: str) -> str:
"""
return SQLITE type from pyarrow Field type
"""
if "int" in pq_type:
return "INTEGER"
if "bool" in pq_type:
return "INTEGER"
if "float" in pq_type:
return "REAL"
if "double" in pq_type:
return "REAL"
return "TEXT"


def sqlite_table_query(table_name: str, schema: pyarrow.Schema) -> str:
"""
return CREATE TABLE query for sqlite table from pyarrow schema
"""
logger = ProcessLogger("sqlite_create_table")
logger.log_start()
field_list = [
f"{field.name} {sqlite_type(str(field.type))}" for field in schema
]
query = f"""
CREATE TABLE
IF NOT EXISTS
{table_name}
(
{','.join(field_list)}
);
"""
logger.log_complete()
return query


def pq_folder_to_sqlite(year_path: str) -> None:
"""
load all files from year_path folder into SQLITE3 db file
"""
logger = ProcessLogger("pq_to_sqlite", year_path=year_path)
logger.log_start()

db_path = os.path.join(year_path, "GTFS_ARCHIVE.db")
if os.path.exists(db_path):
os.remove(db_path)
try:
for file in os.listdir(year_path):
if ".parquet" not in file:
continue
logger.add_metadata(current_file=file)

ds = pd.dataset(os.path.join(year_path, file))

table = file.replace(".parquet", "")
columns = [f":{col}" for col in ds.schema.names]
insert_query = f"INSERT INTO {table} VALUES({','.join(columns)});"

conn = sqlite3.connect(db_path)
with conn:
conn.execute(sqlite_table_query(table, ds.schema))
with conn:
for batch in ds.to_batches(batch_size=250_000):
conn.executemany(insert_query, batch.to_pylist())
conn.close()

gzip_file(db_path)

logger.log_complete()
except Exception as exception:
logger.log_failure(exception)
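The insert path above leans on sqlite3's named-placeholder binding against the dicts produced by RecordBatch.to_pylist(); a tiny standalone sketch of that pattern (the toy table and columns are assumptions):

import sqlite3

# toy table standing in for a GTFS schedule table (assumption for illustration)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trips (trip_id TEXT, route_id TEXT)")

rows = [
    {"trip_id": "t1", "route_id": "Red"},
    {"trip_id": "t2", "route_id": "Blue"},
]
# :name placeholders are bound by key from each dict, mirroring the
# ":{col}" placeholders built in pq_folder_to_sqlite
with conn:
    conn.executemany("INSERT INTO trips VALUES(:trip_id, :route_id)", rows)
print(conn.execute("SELECT COUNT(*) FROM trips").fetchone()[0])  # 2
conn.close()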
48 changes: 40 additions & 8 deletions src/lamp_py/ingestion/compress_gtfs/schedule_details.py
@@ -16,9 +16,15 @@
ordered_schedule_frame,
file_as_bytes_buf,
)
from lamp_py.ingestion.compress_gtfs.gtfs_schema_map import gtfs_schema
from lamp_py.aws.s3 import (
file_list_from_s3,
download_file,
)


from .gtfs_schema_map import gtfs_schema
GTFS_PATH = os.path.join(
str(os.getenv("PUBLIC_ARCHIVE_BUCKET")), "lamp/gtfs_archive"
)


# pylint: disable=R0902
@@ -218,15 +224,41 @@ def schedules_to_compress(tmp_folder: str) -> pl.DataFrame:

pq_fi_path = os.path.join(tmp_folder, year, "feed_info.parquet")
if not os.path.exists(pq_fi_path):
# check for file in s3_path...
continue
bucket, prefix = GTFS_PATH.split("/", 1)
prefix = os.path.join(prefix, year)
s3_files = file_list_from_s3(bucket, prefix)
if len(s3_files) > 1:
for obj_path in s3_files:
if not obj_path.endswith(".parquet"):
continue
local_path = obj_path.replace(f"s3://{bucket}", "/tmp")
download_file(obj_path, local_path)
else:
continue

pq_fi_frame = pl.read_parquet(pq_fi_path)

# anti join against records for 'year' to find records not already in feed_info.parquet
feed = feed.filter(pl.col("published_date") > int(f"{year}0000")).join(
pq_fi_frame.select("feed_version"), on="feed_version", how="anti"
)
if int(year) <= 2018:
# different filter operation used for year 2018 and earlier because some
# schedules in this date range do not have matching "feed_version"
# values between `feed_info` file in schedule and "archived_feeds.txt" file
feed = feed.filter(
(pl.col("published_date") > int(f"{year}0000"))
& (
pl.col("feed_start_date")
> pq_fi_frame.get_column("feed_start_date").max()
)
)
else:
# anti join against records for 'year' to find records not already in feed_info.parquet
feed = feed.filter(
pl.col("published_date") > int(f"{year}0000")
).join(
pq_fi_frame.select("feed_version"),
on="feed_version",
how="anti",
coalesce=True,
)

break

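As a standalone illustration of the anti join in the else branch, only feed_version values not already present in feed_info.parquet survive (toy frames are assumptions):

import polars as pl

# toy stand-ins for the incoming feed and the archived feed_info records
feed = pl.DataFrame(
    {"feed_version": ["A", "B", "C"], "published_date": [20240105, 20240210, 20240315]}
)
archived = pl.DataFrame({"feed_version": ["A", "B"]})

# anti join keeps only rows whose feed_version has not been archived yet
new_schedules = feed.join(archived, on="feed_version", how="anti")
print(new_schedules)  # only feed_version "C" remains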
2 changes: 2 additions & 0 deletions src/lamp_py/ingestion/ingest_gtfs.py
@@ -27,6 +27,7 @@
DEFAULT_S3_PREFIX,
group_sort_file_list,
)
from lamp_py.ingestion.compress_gtfs.gtfs_to_parquet import gtfs_to_parquet


class NoImplConverter(Converter):
@@ -138,5 +139,6 @@ def ingest_gtfs(metadata_queue: Queue[Optional[str]]) -> None:
static schedule files should be ingested first
"""
gtfs_to_parquet()
ingest_gtfs_archive(metadata_queue)
ingest_s3_files(metadata_queue)
1 change: 1 addition & 0 deletions src/lamp_py/ingestion/pipeline.py
@@ -65,6 +65,7 @@ def start() -> None:
"ARCHIVE_BUCKET",
"ERROR_BUCKET",
"INCOMING_BUCKET",
"PUBLIC_ARCHIVE_BUCKET",
"SPRINGBOARD_BUCKET",
"ALEMBIC_MD_DB_NAME",
],
28 changes: 25 additions & 3 deletions src/lamp_py/ingestion/utils.py
@@ -1,5 +1,7 @@
import os
import re
import gzip
import shutil
import pathlib
import datetime
import zoneinfo
@@ -16,6 +18,8 @@
import pyarrow.compute as pc
import polars as pl

from lamp_py.runtime_utils.process_logger import ProcessLogger

DEFAULT_S3_PREFIX = "lamp"
GTFS_RT_HASH_COL = "lamp_record_hash"

@@ -168,9 +172,6 @@ def ordered_schedule_frame() -> pl.DataFrame:
)
)

# fix_me: filter for malformed archive schedules
feed = feed.filter(feed["feed_start_date"] > 20180200)

return feed


@@ -287,3 +288,24 @@ def hash_gtfs_rt_parquet(path: str) -> None:
writer.write_table(batch)

os.replace(tmp_pq, path)


def gzip_file(path: str, keep_original: bool = False) -> None:
"""
gzip local file
:param path: local file path
:param keep_original: keep original non-gzip file = False
"""
logger = ProcessLogger(
"gzip_file", path=path, remove_original=keep_original
)
logger.log_start()
with open(path, "rb") as f_in:
with gzip.open(f"{path}.gz", "wb") as f_out:
shutil.copyfileobj(f_in, f_out)

if not keep_original:
os.remove(path)

logger.log_complete()