Write Performance Manager Flat Files
After processing new gtfs_rt files, update the flat files for all
service dates contained in those files.

* Write a new function `write_flat_files` that takes a db manager. The db
  manager gets all of the affected service dates from the temp event
  compare table, generates a flat table for each one, and writes it to
  s3 (a usage sketch follows this list).
* The `main` performance manager pipeline method calls
  `write_flat_files` on each event loop.
* Update `write_parquet_file` to accept the additional parameters needed
  for writing the flat files. This triggers updates to ingestion s3
  writes and associated tests.
* Add flat file generation tests to the performance manager test suite.
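
A minimal sketch of how the new pieces fit together, not part of this commit; it assumes `DatabaseManager` can be constructed without arguments, which may differ from the real pipeline setup.

# Illustrative sketch only. DatabaseManager() construction is an assumption;
# process_gtfs_rt_files and write_flat_files both take the db manager,
# as shown in the pipeline diff below.
from lamp_py.postgres.postgres_utils import DatabaseManager
from lamp_py.performance_manager.l0_gtfs_rt_events import process_gtfs_rt_files
from lamp_py.performance_manager.flat_file import write_flat_files

db_manager = DatabaseManager()

# each event loop iteration processes new gtfs_rt files, which populates the
# temp event compare table, then regenerates a flat file for every service
# date touched by those files
process_gtfs_rt_files(db_manager)
write_flat_files(db_manager)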
mzappitello committed Aug 15, 2023
1 parent 2757f74 commit fb0f391
Showing 7 changed files with 163 additions and 76 deletions.
37 changes: 30 additions & 7 deletions python_src/src/lamp_py/aws/s3.py
@@ -232,15 +232,18 @@ def move_s3_objects(files: List[str], to_bucket: str) -> List[str]:
# pylint: enable=R0914


# pylint: disable=R0913
# pylint too many arguments (more than 5)
def write_parquet_file(
table: Table,
config_type: str,
s3_path: str,
file_type: str,
s3_dir: str,
partition_cols: List[str],
visitor_func: Optional[Callable[..., None]],
visitor_func: Optional[Callable[..., None]] = None,
basename_template: Optional[str] = None,
) -> None:
"""
Helper function to write out a parquet table to an s3 path, patitioning
Helper function to write out a parquet table to an s3 path, partitioning
based on columns. As files are written, add them to the metadata table of
the performance manager database.
@@ -253,9 +256,23 @@ def write_parquet_file(
It appears that this bug isn't going to be fixed and using the
dataset.write_dataset is the preferred method for writing parquet files
going forward. https://issues.apache.org/jira/browse/ARROW-17068
@table - the table thats going to be written to parquet
@file_type - string used in logging to indicate what type of file was
written
@s3_dir - the s3 bucket plus prefix "subdirectory" path where the
parquet files should be written
@partition_cols - column names in the table to partition out into the
filepath.
@visitor_func - if set, this function will be called with a WrittenFile
instance for each file created during the call. a WrittenFile has
path and metadata attributes.
@basename_template - a template string used to generate base names of
written parquet files. The token `{i}` will be replaced with an
incremented int.
"""
process_logger = ProcessLogger(
"write_parquet", config_type=config_type, number_of_rows=table.num_rows
"write_parquet", file_type=file_type, number_of_rows=table.num_rows
)
process_logger.log_start()

@@ -265,20 +282,26 @@ def write_parquet_file(
table.select(partition_cols).schema, flavor="hive"
)

if basename_template is None:
basename_template = guid() + "-{i}.parquet"

ds.write_dataset(
data=table,
base_dir=s3_path,
base_dir=s3_dir,
filesystem=fs.S3FileSystem(),
format=ds.ParquetFileFormat(),
partitioning=partitioning,
file_visitor=visitor_func,
basename_template=guid() + "-{i}.parquet",
basename_template=basename_template,
existing_data_behavior="overwrite_or_ignore",
)

process_logger.log_complete()


# pylint: enable=R0913


def get_datetime_from_partition_path(path: str) -> datetime.datetime:
"""
process and return datetime from partitioned s3 path
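
As illustration only, a hedged example of calling the updated `write_parquet_file` with the new `file_type`, `s3_dir`, and `basename_template` parameters; the table contents and bucket path are hypothetical.

# Illustrative call only -- table, bucket, and prefix are made up.
import pyarrow as pa
from lamp_py.aws.s3 import write_parquet_file

table = pa.table({"year": [2023], "month": [8], "day": [15], "value": [1.0]})

write_parquet_file(
    table=table,
    file_type="flat_rail_performance",        # used for logging
    s3_dir="example-bucket/lamp/flat_file",   # bucket plus prefix "subdirectory"
    partition_cols=["year", "month", "day"],  # hive-style partitioning
    basename_template="2023-08-15-rail-performance-{i}.parquet",
)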
4 changes: 2 additions & 2 deletions python_src/src/lamp_py/ingestion/convert_gtfs.py
@@ -116,8 +116,8 @@ def create_table(

write_parquet_file(
table=table,
config_type=s3_prefix,
s3_path=os.path.join(
file_type=s3_prefix,
s3_dir=os.path.join(
os.environ["SPRINGBOARD_BUCKET"],
DEFAULT_S3_PREFIX,
s3_prefix,
4 changes: 2 additions & 2 deletions python_src/src/lamp_py/ingestion/convert_gtfs_rt.py
@@ -356,8 +356,8 @@ def write_table(self, table: pyarrow.table) -> None:
s3_prefix = str(self.config_type)
write_parquet_file(
table=table,
config_type=s3_prefix,
s3_path=os.path.join(
file_type=s3_prefix,
s3_dir=os.path.join(
os.environ["SPRINGBOARD_BUCKET"],
DEFAULT_S3_PREFIX,
s3_prefix,
46 changes: 46 additions & 0 deletions python_src/src/lamp_py/performance_manager/flat_file.py
@@ -1,6 +1,9 @@
import os

import sqlalchemy as sa
import pyarrow

from lamp_py.aws.s3 import write_parquet_file
from lamp_py.performance_manager.gtfs_utils import (
static_version_key_from_service_date,
)
@@ -9,8 +12,51 @@
VehicleTrips,
StaticStopTimes,
StaticStops,
TempEventCompare,
)
from lamp_py.postgres.postgres_utils import DatabaseManager
from lamp_py.runtime_utils.process_logger import ProcessLogger


def write_flat_files(db_manager: DatabaseManager) -> None:
"""write flat files to s3 for datetimes"""
date_df = db_manager.select_as_dataframe(
sa.select(TempEventCompare.service_date).distinct()
)

process_logger = ProcessLogger(
"bulk_flat_file_write", date_count=date_df.shape[0]
)
process_logger.log_start()

for date in date_df["service_date"]:
sub_process_logger = ProcessLogger("flat_file_write", service_date=date)
sub_process_logger.log_start()

try:
s3_directory = os.path.join(
os.environ["ARCHIVE_BUCKET"], "lamp", "flat_file"
)

as_str = str(date)
filename = f"{as_str[0:4]}-{as_str[4:6]}-{as_str[6:8]}-rail-performance-{{i}}.parquet"

flat_table = generate_daily_table(db_manager, date)
sub_process_logger.add_metadata(row_count=flat_table.shape[0])

write_parquet_file(
table=flat_table,
file_type="flat_rail_performance",
s3_dir=s3_directory,
partition_cols=["year", "month", "day"],
basename_template=filename,
)
except Exception as e:
sub_process_logger.log_failure(e)
else:
sub_process_logger.log_complete()

process_logger.log_complete()


def generate_daily_table(
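
Assuming `generate_daily_table` produces `year`, `month`, and `day` columns (implied by the partition columns above), the written files would land in a hive-partitioned layout that can be read back as a pyarrow dataset; the bucket name below is hypothetical.

# Hypothetical layout, for illustration only:
#   <ARCHIVE_BUCKET>/lamp/flat_file/year=2023/month=8/day=15/
#       2023-08-15-rail-performance-0.parquet
import pyarrow.dataset as ds
from pyarrow import fs

dataset = ds.dataset(
    "example-archive-bucket/lamp/flat_file",
    filesystem=fs.S3FileSystem(),
    format="parquet",
    partitioning="hive",
)
one_day = dataset.to_table(filter=(ds.field("year") == 2023))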
4 changes: 3 additions & 1 deletion python_src/src/lamp_py/performance_manager/pipeline.py
@@ -14,8 +14,9 @@
from lamp_py.runtime_utils.process_logger import ProcessLogger
from lamp_py.runtime_utils.alembic_migration import alembic_upgrade_to_head

from .l0_gtfs_static_load import process_static_tables
from .flat_file import write_flat_files
from .l0_gtfs_rt_events import process_gtfs_rt_files
from .l0_gtfs_static_load import process_static_tables

logging.getLogger().setLevel("INFO")

@@ -94,6 +95,7 @@ def iteration() -> None:
try:
process_static_tables(db_manager)
process_gtfs_rt_files(db_manager)
write_flat_files(db_manager)

process_logger.log_complete()
except Exception as exception:
10 changes: 5 additions & 5 deletions python_src/tests/ingestion/test_gtfs_converter.py
@@ -326,17 +326,17 @@ def fixture_s3_patch(monkeypatch: MonkeyPatch) -> Iterator[None]:

def mock_write_parquet_file(
table: Table,
config_type: str,
s3_path: str,
file_type: str,
s3_dir: str,
partition_cols: List[str],
visitor_func: Optional[Callable[..., None]],
visitor_func: Optional[Callable[..., None]] = None,
) -> None:
"""
instead of writing the parquet file to s3, inspect the contents of the
table. call the visitor function on a dummy s3 path.
"""
# pull the name out of the s3 path and check that we are expecting this table
table_name = config_type.lower()
table_name = file_type.lower()
tables_written.append(table_name)

table_attributes = all_table_attributes()[table_name]
@@ -352,7 +352,7 @@ def mock_write_parquet_file(

# call the visitor function, which should add the string to the queue to check later
if visitor_func is not None:
visitor_func(os.path.join(s3_path, "written.parquet"))
visitor_func(os.path.join(s3_dir, "written.parquet"))

monkeypatch.setattr(
"lamp_py.ingestion.convert_gtfs.write_parquet_file",