From fb0f391c4ce71b515833bd5848828453e6c48fb3 Mon Sep 17 00:00:00 2001
From: Mike Zappitello
Date: Mon, 14 Aug 2023 12:56:08 -0400
Subject: [PATCH] Write Performance Manager Flat Files

After processing new gtfs_rt files, update the flat files for all service
dates covered by those files.

* Write new function `write_flat_files` that takes a db manager, gets all
  of the affected service days from our temp events compare table,
  generates a flat table for each one, and writes it to s3.
* The `main` performance manager pipeline method calls `write_flat_files`
  on each iteration of its event loop.
* Update `write_parquet_file` to take in additional parameters needed for
  writing the flat files. This triggers updates to ingestion s3 writes and
  associated tests.
* Add flat file generation tests to the performance manager test suite.
---
 python_src/src/lamp_py/aws/s3.py              |  37 ++++-
 .../src/lamp_py/ingestion/convert_gtfs.py     |   4 +-
 .../src/lamp_py/ingestion/convert_gtfs_rt.py  |   4 +-
 .../lamp_py/performance_manager/flat_file.py  |  46 ++++++
 .../lamp_py/performance_manager/pipeline.py   |   4 +-
 .../tests/ingestion/test_gtfs_converter.py    |  10 +-
 .../test_performance_manager.py               | 134 ++++++++++--------
 7 files changed, 163 insertions(+), 76 deletions(-)

diff --git a/python_src/src/lamp_py/aws/s3.py b/python_src/src/lamp_py/aws/s3.py
index baa1e363..95559a3e 100644
--- a/python_src/src/lamp_py/aws/s3.py
+++ b/python_src/src/lamp_py/aws/s3.py
@@ -232,15 +232,18 @@ def move_s3_objects(files: List[str], to_bucket: str) -> List[str]:
 # pylint: enable=R0914
 
 
+# pylint: disable=R0913
+# pylint too many arguments (more than 5)
 def write_parquet_file(
     table: Table,
-    config_type: str,
-    s3_path: str,
+    file_type: str,
+    s3_dir: str,
     partition_cols: List[str],
-    visitor_func: Optional[Callable[..., None]],
+    visitor_func: Optional[Callable[..., None]] = None,
+    basename_template: Optional[str] = None,
 ) -> None:
     """
-    Helper function to write out a parquet table to an s3 path, patitioning
+    Helper function to write out a parquet table to an s3 path, partitioning
     based on columns. As files are written, add them to the metadata table
     of the performance manager database.
 
@@ -253,9 +256,23 @@ def write_parquet_file(
     It appears that this bug isn't going to be fixed and using the
     dataset.write_dataset is the preferred method for writing parquet files
     going forward. https://issues.apache.org/jira/browse/ARROW-17068
+
+    @table - the table that's going to be written to parquet
+    @file_type - string used in logging to indicate what type of file was
+        written
+    @s3_dir - the s3 bucket plus prefix "subdirectory" path where the
+        parquet files should be written
+    @partition_cols - column names in the table to partition out into the
+        filepath.
+    @visitor_func - if set, this function will be called with a WrittenFile
+        instance for each file created during the call. a WrittenFile has
+        path and metadata attributes.
+    @basename_template - a template string used to generate base names of
+        written parquet files. The token `{i}` will be replaced with an
+        incremented int.
""" process_logger = ProcessLogger( - "write_parquet", config_type=config_type, number_of_rows=table.num_rows + "write_parquet", file_type=file_type, number_of_rows=table.num_rows ) process_logger.log_start() @@ -265,20 +282,26 @@ def write_parquet_file( table.select(partition_cols).schema, flavor="hive" ) + if basename_template is None: + basename_template = guid() + "-{i}.parquet" + ds.write_dataset( data=table, - base_dir=s3_path, + base_dir=s3_dir, filesystem=fs.S3FileSystem(), format=ds.ParquetFileFormat(), partitioning=partitioning, file_visitor=visitor_func, - basename_template=guid() + "-{i}.parquet", + basename_template=basename_template, existing_data_behavior="overwrite_or_ignore", ) process_logger.log_complete() +# pylint: enable=R0913 + + def get_datetime_from_partition_path(path: str) -> datetime.datetime: """ process and return datetime from partitioned s3 path diff --git a/python_src/src/lamp_py/ingestion/convert_gtfs.py b/python_src/src/lamp_py/ingestion/convert_gtfs.py index cf356288..d0a5dd9d 100644 --- a/python_src/src/lamp_py/ingestion/convert_gtfs.py +++ b/python_src/src/lamp_py/ingestion/convert_gtfs.py @@ -116,8 +116,8 @@ def create_table( write_parquet_file( table=table, - config_type=s3_prefix, - s3_path=os.path.join( + file_type=s3_prefix, + s3_dir=os.path.join( os.environ["SPRINGBOARD_BUCKET"], DEFAULT_S3_PREFIX, s3_prefix, diff --git a/python_src/src/lamp_py/ingestion/convert_gtfs_rt.py b/python_src/src/lamp_py/ingestion/convert_gtfs_rt.py index 2afe5cfd..032714a9 100644 --- a/python_src/src/lamp_py/ingestion/convert_gtfs_rt.py +++ b/python_src/src/lamp_py/ingestion/convert_gtfs_rt.py @@ -356,8 +356,8 @@ def write_table(self, table: pyarrow.table) -> None: s3_prefix = str(self.config_type) write_parquet_file( table=table, - config_type=s3_prefix, - s3_path=os.path.join( + file_type=s3_prefix, + s3_dir=os.path.join( os.environ["SPRINGBOARD_BUCKET"], DEFAULT_S3_PREFIX, s3_prefix, diff --git a/python_src/src/lamp_py/performance_manager/flat_file.py b/python_src/src/lamp_py/performance_manager/flat_file.py index f11d0795..fd40f296 100644 --- a/python_src/src/lamp_py/performance_manager/flat_file.py +++ b/python_src/src/lamp_py/performance_manager/flat_file.py @@ -1,6 +1,9 @@ +import os + import sqlalchemy as sa import pyarrow +from lamp_py.aws.s3 import write_parquet_file from lamp_py.performance_manager.gtfs_utils import ( static_version_key_from_service_date, ) @@ -9,8 +12,51 @@ VehicleTrips, StaticStopTimes, StaticStops, + TempEventCompare, ) from lamp_py.postgres.postgres_utils import DatabaseManager +from lamp_py.runtime_utils.process_logger import ProcessLogger + + +def write_flat_files(db_manager: DatabaseManager) -> None: + """write flat files to s3 for datetimes""" + date_df = db_manager.select_as_dataframe( + sa.select(TempEventCompare.service_date).distinct() + ) + + process_logger = ProcessLogger( + "bulk_flat_file_write", date_count=date_df.shape[0] + ) + process_logger.log_start() + + for date in date_df["service_date"]: + sub_process_logger = ProcessLogger("flat_file_write", service_date=date) + sub_process_logger.log_start() + + try: + s3_directory = os.path.join( + os.environ["ARCHIVE_BUCKET"], "lamp", "flat_file" + ) + + as_str = str(date) + filename = f"{as_str[0:4]}-{as_str[4:6]}-{as_str[6:8]}-rail-performance-{{i}}.parquet" + + flat_table = generate_daily_table(db_manager, date) + sub_process_logger.add_metadata(row_count=flat_table.shape[0]) + + write_parquet_file( + table=flat_table, + file_type="flat_rail_performance", + s3_dir=s3_directory, 
+ partition_cols=["year", "month", "day"], + basename_template=filename, + ) + except Exception as e: + sub_process_logger.log_failure(e) + else: + sub_process_logger.log_complete() + + process_logger.log_complete() def generate_daily_table( diff --git a/python_src/src/lamp_py/performance_manager/pipeline.py b/python_src/src/lamp_py/performance_manager/pipeline.py index 976f8b52..cbb134c1 100755 --- a/python_src/src/lamp_py/performance_manager/pipeline.py +++ b/python_src/src/lamp_py/performance_manager/pipeline.py @@ -14,8 +14,9 @@ from lamp_py.runtime_utils.process_logger import ProcessLogger from lamp_py.runtime_utils.alembic_migration import alembic_upgrade_to_head -from .l0_gtfs_static_load import process_static_tables +from .flat_file import write_flat_files from .l0_gtfs_rt_events import process_gtfs_rt_files +from .l0_gtfs_static_load import process_static_tables logging.getLogger().setLevel("INFO") @@ -94,6 +95,7 @@ def iteration() -> None: try: process_static_tables(db_manager) process_gtfs_rt_files(db_manager) + write_flat_files(db_manager) process_logger.log_complete() except Exception as exception: diff --git a/python_src/tests/ingestion/test_gtfs_converter.py b/python_src/tests/ingestion/test_gtfs_converter.py index fbd58bc1..0d5f9ff5 100644 --- a/python_src/tests/ingestion/test_gtfs_converter.py +++ b/python_src/tests/ingestion/test_gtfs_converter.py @@ -326,17 +326,17 @@ def fixture_s3_patch(monkeypatch: MonkeyPatch) -> Iterator[None]: def mock_write_parquet_file( table: Table, - config_type: str, - s3_path: str, + file_type: str, + s3_dir: str, partition_cols: List[str], - visitor_func: Optional[Callable[..., None]], + visitor_func: Optional[Callable[..., None]] = None, ) -> None: """ instead of writing the parquet file to s3, inspect the contents of the table. call the visitor function on a dummy s3 path. 
""" # pull the name out of the s3 path and check that we are expecting this table - table_name = config_type.lower() + table_name = file_type.lower() tables_written.append(table_name) table_attributes = all_table_attributes()[table_name] @@ -352,7 +352,7 @@ def mock_write_parquet_file( # call the visitor function, which should add the string to the queue to check later if visitor_func is not None: - visitor_func(os.path.join(s3_path, "written.parquet")) + visitor_func(os.path.join(s3_dir, "written.parquet")) monkeypatch.setattr( "lamp_py.ingestion.convert_gtfs.write_parquet_file", diff --git a/python_src/tests/performance_manager/test_performance_manager.py b/python_src/tests/performance_manager/test_performance_manager.py index 7daeb979..0ea614bd 100644 --- a/python_src/tests/performance_manager/test_performance_manager.py +++ b/python_src/tests/performance_manager/test_performance_manager.py @@ -2,16 +2,25 @@ import os import pathlib from functools import lru_cache -from typing import Dict, Iterator, List, Optional, Sequence, Tuple, Union +from typing import ( + Dict, + Iterator, + List, + Optional, + Sequence, + Tuple, + Union, + Callable, +) import pandas import pytest import sqlalchemy as sa from _pytest.monkeypatch import MonkeyPatch import pyarrow -from pyarrow import fs, parquet, csv +from pyarrow import fs, parquet, csv, Table -from lamp_py.performance_manager.flat_file import generate_daily_table +from lamp_py.performance_manager.flat_file import write_flat_files from lamp_py.performance_manager.l0_gtfs_static_load import ( process_static_tables, ) @@ -119,7 +128,7 @@ def fixture_s3_patch(monkeypatch: MonkeyPatch) -> Iterator[None]: files instead of s3, so we read these files differently """ - def mock_get_static_parquet_paths( + def mock__get_static_parquet_paths( table_type: str, feed_info_path: str ) -> List[str]: """ @@ -138,13 +147,13 @@ def mock_get_static_parquet_paths( monkeypatch.setattr( "lamp_py.performance_manager.l0_gtfs_static_load.get_static_parquet_paths", - mock_get_static_parquet_paths, + mock__get_static_parquet_paths, ) def mock__get_pyarrow_table( filename: Union[str, List[str]], filters: Optional[Union[Sequence[Tuple], Sequence[List[Tuple]]]] = None, - ) -> pyarrow.Table: + ) -> Table: logging.debug( "Mock Get Pyarrow Table filename=%s, filters=%s", filename, filters ) @@ -156,7 +165,7 @@ def mock__get_pyarrow_table( to_load = [filename] if len(to_load) == 0: - return pyarrow.Table.from_pydict({}) + return Table.from_pydict({}) return parquet.ParquetDataset( to_load, filesystem=active_fs, filters=filters @@ -165,59 +174,68 @@ def mock__get_pyarrow_table( monkeypatch.setattr( "lamp_py.aws.s3._get_pyarrow_table", mock__get_pyarrow_table ) - yield - - -def flat_table_check(db_manager: DatabaseManager, service_date: int) -> None: - """checks to run on a flat table to ensure it has what would be expected""" - flat_table = generate_daily_table(db_manager, service_date) - - # check that the shape is good - rows, columns = flat_table.shape - assert columns == 30 - expected_row_count = db_manager.select_as_list( - sa.select(sa.func.count()).where( - sa.and_( - VehicleEvents.service_date == service_date, - sa.or_( - VehicleEvents.vp_move_timestamp.is_not(None), - VehicleEvents.vp_stop_timestamp.is_not(None), - ), - ) - ) - )[0]["count_1"] - assert rows == expected_row_count - - # check that partitioned columns behave appropriately - assert "year" in flat_table.column_names - assert len(flat_table["year"].unique()) == 1 + # pylint: disable=R0913 + # pylint too many 
arguments (more than 5) + def mock__write_parquet_file( + table: Table, + file_type: str, + s3_dir: str, + partition_cols: List[str], + visitor_func: Optional[Callable[..., None]] = None, + basename_template: Optional[str] = None, + ) -> None: + """ + this will be called when writing the flat file parquet to s3 + """ + # check that only that flat file is being written + assert file_type == "flat_rail_performance" + assert visitor_func is None + assert "lamp" in s3_dir + assert "flat_file" in s3_dir + + # check that the service date is right in the filename + assert basename_template == "2023-05-08-rail-performance-{i}.parquet" + + # check that the shape is good, rows count is precalculated + rows, columns = table.shape + assert columns == 30 + assert rows == 4310 + + # check that partitioned columns behave appropriately + for partition in partition_cols: + assert partition in table.column_names + assert len(table[partition].unique()) == 1 + + # check that these keys have values throughout the file + must_have_keys = [ + "stop_id", + "parent_station", + "route_id", + "direction_id", + "start_time", + "vehicle_id", + "trip_id", + "vehicle_label", + "year", + "month", + "day", + "service_date", + ] - assert "month" in flat_table.column_names - assert len(flat_table["month"].unique()) == 1 + for key in must_have_keys: + assert True not in table[key].is_null( + nan_is_null=True + ), f"{key} has null values" - assert "day" in flat_table.column_names - assert len(flat_table["day"].unique()) == 1 + # pylint: enable=R0913 - # check that these keys have values throughout the file - must_have_keys = [ - "stop_id", - "parent_station", - "route_id", - "direction_id", - "start_time", - "vehicle_id", - "trip_id", - "vehicle_label", - "year", - "month", - "day", - ] + monkeypatch.setattr( + "lamp_py.performance_manager.flat_file.write_parquet_file", + mock__write_parquet_file, + ) - for key in must_have_keys: - assert True not in flat_table[key].is_null( - nan_is_null=True - ), f"{key} has null values" + yield def check_logs(caplog: pytest.LogCaptureFixture) -> None: @@ -368,9 +386,7 @@ def test_gtfs_rt_processing( ] db_manager.add_metadata_paths(paths) - grouped_files = get_gtfs_rt_paths(db_manager) - - for files in grouped_files: + for files in get_gtfs_rt_paths(db_manager): for path in files["vp_paths"]: assert "RT_VEHICLE_POSITIONS" in path @@ -448,7 +464,7 @@ def test_gtfs_rt_processing( build_temp_events(events, db_manager) update_events_from_temp(db_manager) - flat_table_check(db_manager=db_manager, service_date=20230508) + write_flat_files(db_manager) check_logs(caplog)
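
For reviewers, a minimal sketch of how the new filename pieces fit together.
This is illustrative only and not part of the patch: the two helper functions
below are hypothetical, and `uuid4` stands in for the repo's `guid()` helper.

    # sketch: per-service-date basename used by write_flat_files, and the
    # guid-based fallback applied inside write_parquet_file when no template
    # is passed (which is what the ingestion call sites rely on)
    from typing import Optional
    from uuid import uuid4


    def flat_file_basename(service_date: int) -> str:
        # mirrors the filename construction in write_flat_files:
        # 20230508 -> "2023-05-08-rail-performance-{i}.parquet"
        as_str = str(service_date)
        return (
            f"{as_str[0:4]}-{as_str[4:6]}-{as_str[6:8]}"
            "-rail-performance-{i}.parquet"
        )


    def resolve_basename(basename_template: Optional[str] = None) -> str:
        # mirrors the fallback in write_parquet_file: callers that omit
        # basename_template still get a unique, guid-based file name
        if basename_template is None:
            basename_template = str(uuid4()) + "-{i}.parquet"
        return basename_template


    assert flat_file_basename(20230508) == "2023-05-08-rail-performance-{i}.parquet"
    assert resolve_basename().endswith("-{i}.parquet")

Because basename_template defaults to None, the existing gtfs and gtfs_rt
ingestion writers keep their current behavior while the flat file writer can
pin its own date-stamped names, as the updated tests above assert.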