FIX: Stream DB Results to Parquet Files #183
Conversation
```python
if not running_in_docker() and not running_in_aws():
    db_host = "127.0.0.1"
```
pulled this over from dmap_import
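For reference, a minimal sketch of how such environment checks are often implemented; the actual `dmap_import` helpers may differ, and the `/.dockerenv` and `AWS_EXECUTION_ENV` checks below are assumptions, not the project's code:

```python
import os


def running_in_docker() -> bool:
    # Assumption: Docker containers conventionally expose /.dockerenv.
    return os.path.exists("/.dockerenv")


def running_in_aws() -> bool:
    # Assumption: AWS runtimes such as ECS set AWS_EXECUTION_ENV.
    return bool(os.getenv("AWS_EXECUTION_ENV"))
```

With helpers along these lines, the snippet above falls back to a local database host only when running outside both Docker and AWS.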
```diff
@@ -16,6 +17,9 @@ def validate_environment(
     process_logger = ProcessLogger("validate_env")
     process_logger.log_start()
+
+    if private_variables is None:
+        private_variables = []
```
Had to re-structure this function to allow for a `private_variables` parameter and avoid a pylint `too-many-branches` flag.
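A rough sketch of the restructured shape described here; only the `private_variables` default handling is taken from the diff above, and the validation logic is illustrative:

```python
import os
from typing import List, Optional


def validate_environment(
    required_variables: List[str],
    private_variables: Optional[List[str]] = None,
) -> None:
    # Default to an empty list inside the body to avoid a mutable
    # default argument; this is the addition shown in the diff above.
    if private_variables is None:
        private_variables = []

    # Illustrative checks: the real function's validation may differ.
    missing = [name for name in required_variables if os.getenv(name) is None]
    if missing:
        raise EnvironmentError(f"missing environment variables: {missing}")

    # Private variables are validated but their values are never echoed.
    for name in required_variables:
        if name not in private_variables:
            print(f"{name}={os.environ[name]}")
```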
```python
self.db_manager = DatabaseManager()
```
Dropped `self.db_manager` from being created for all hyper jobs. This would have led to issues with the Hyper-file-writing ECS, which won't be able to connect to our RDS. `db_manager` is now passed directly into the `create_parquet` and `update_parquet` methods, as they are the only portions of the class that require DB access.
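A minimal sketch of the dependency-injection shape described; the `HyperJob` class name and method bodies are placeholders, not the project's actual class:

```python
class HyperJob:
    # Sketch: no DatabaseManager is constructed in __init__, so ECS tasks
    # that only write Hyper files never attempt an RDS connection.
    def __init__(self, write_path: str) -> None:
        self.write_path = write_path

    def create_parquet(self, db_manager: "DatabaseManager") -> None:
        # Only the parquet-producing methods receive DB access.
        ...

    def update_parquet(self, db_manager: "DatabaseManager") -> None:
        ...
```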
```python
with self.session.begin() as cursor:
    result = cursor.execute(select_query).yield_per(batch_size)
    with pq.ParquetWriter(write_path, schema=schema) as pq_writer:
        for part in result.partitions():
            pq_writer.write_batch(
                pyarrow.RecordBatch.from_pylist(
                    [row._asdict() for row in part], schema=schema
                )
            )

return write_path
```
I don't think we need to return the write path since it was provided as part of the input.
Incorporated
nice. excited to see this in staging.
lgtm 🍰
The current Parquet -> Tableau pipeline is flawed in the amount of memory required to create parquet files from DB SELECT queries. This change is meant to result in a fixed amount of memory usage when creating a parquet file, no matter how many results a DB query returns.

This fixed memory usage is achieved by utilizing the `yield_per` method of the SQLAlchemy `Result` object, as well as the `RecordBatch` object of the pyarrow library. In testing, memory usage for the creation of a parquet file from the `static_stop_times` table maxes out at approximately 5-6 GB.

If memory usage needs to be further limited, the `write_to_parquet` function of `DatabaseManager` offers a `batch_size` parameter to limit the number of records flowing into a parquet file per partition.

Asana Task: https://app.asana.com/0/1205827492903547/1205940053804614
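A self-contained sketch of the streaming pattern described above, assuming a SQLAlchemy engine and a known pyarrow schema; the DSN, table, and column names are illustrative, not the project's:

```python
import pyarrow
import pyarrow.parquet as pq
import sqlalchemy as sa

engine = sa.create_engine("postgresql://user:pass@localhost/db")  # placeholder DSN
schema = pyarrow.schema([("id", pyarrow.int64()), ("name", pyarrow.string())])
select_query = sa.text("SELECT id, name FROM static_stop_times")  # illustrative

batch_size = 1024  # smaller values lower the memory ceiling at some throughput cost

with engine.connect() as conn:
    # yield_per streams rows from the server in fixed-size chunks instead of
    # materializing the full result set, so memory stays bounded by batch_size.
    result = conn.execute(select_query).yield_per(batch_size)
    with pq.ParquetWriter("static_stop_times.parquet", schema=schema) as writer:
        # partitions() yields lists of at most batch_size rows; each list is
        # converted to a pyarrow RecordBatch and appended to the parquet file.
        for part in result.partitions():
            writer.write_batch(
                pyarrow.RecordBatch.from_pylist(
                    [row._asdict() for row in part], schema=schema
                )
            )
```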