feat: HyperFile API Publishing and Parquet Flat File Creation #176
Conversation
TABLEAU_USER=DOUPDATE
TABLEAU_PASSWORD=DOUPDATE
Should be updated with a personal username/password for development.
row_groups = pq.read_metadata(self.local_parquet_path).to_dict()[
    "row_groups"
]
Pulling max column values from parquet metadata should be a much more efficient operation compared to loading a whole parquet file into memory. row_groups can be pulled out of read_metadata; it is a list of lists of dictionary objects. This function produces a dictionary where the key is the column name and the value is the max value from the statistics object.
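For illustration, a minimal sketch of how such a helper could pull per-column max values from row-group statistics (the function name is hypothetical; assumes pyarrow):

```python
import pyarrow.parquet as pq

def max_column_values(parquet_path: str) -> dict:
    """Sketch: read only parquet metadata and return {column_name: max_value}
    from the row group statistics, without loading the file into memory."""
    row_groups = pq.read_metadata(parquet_path).to_dict()["row_groups"]
    max_values: dict = {}
    for row_group in row_groups:
        for column in row_group["columns"]:
            stats = column.get("statistics")
            if not stats or not stats.get("has_min_max"):
                continue
            name = column["path_in_schema"]
            current = max_values.get(name)
            if current is None or stats["max"] > current:
                max_values[name] = stats["max"]
    return max_values
```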
hyper_table_schema = TableDefinition(
    table_name=self.hyper_table_name,
    columns=[
        TableDefinition.Column(
            col.name, self.convert_parquet_dtype(col.type)
        )
        for col in self.parquet_schema
    ],
)
Construct a HyperFile TableDefinition; two parameters are required, the table_name and columns. columns is a list of TableDefinition.Column objects.
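The snippet above leans on a convert_parquet_dtype helper to map pyarrow types to Hyper SqlTypes; a rough sketch of such a mapping (not necessarily the PR's actual implementation) could look like:

```python
import pyarrow
from tableauhyperapi import SqlType

def convert_parquet_dtype(dtype: pyarrow.DataType) -> SqlType:
    """Sketch: map a pyarrow data type to a Hyper SqlType."""
    if pyarrow.types.is_int64(dtype):
        return SqlType.big_int()
    if pyarrow.types.is_integer(dtype):
        return SqlType.int()
    if pyarrow.types.is_floating(dtype):
        return SqlType.double()
    if pyarrow.types.is_boolean(dtype):
        return SqlType.bool()
    if pyarrow.types.is_date(dtype):
        return SqlType.date()
    if pyarrow.types.is_timestamp(dtype):
        return SqlType.timestamp()
    # fall back to text for anything unhandled
    return SqlType.text()
```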
copy_command = (
    f"COPY {hyper_table_schema.table_name} "
    f"FROM {escape_string_literal(self.local_parquet_path)} "
    "WITH (FORMAT PARQUET)"
)
Copy parquet file into HyperFile with SQL command.
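For context, a self-contained sketch of running that COPY against a Hyper connection (the table, column, and file names here are placeholders, not the PR's actual values):

```python
from tableauhyperapi import (
    Connection,
    CreateMode,
    HyperProcess,
    SqlType,
    TableDefinition,
    Telemetry,
    escape_string_literal,
)

# placeholder schema standing in for the TableDefinition built above
hyper_table_schema = TableDefinition(
    table_name="example_table",
    columns=[TableDefinition.Column("example_col", SqlType.big_int())],
)

with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
    with Connection(
        endpoint=hyper.endpoint,
        database="example.hyper",
        create_mode=CreateMode.CREATE_AND_REPLACE,
    ) as connection:
        connection.catalog.create_table(table_definition=hyper_table_schema)
        copy_command = (
            f"COPY {hyper_table_schema.table_name} "
            f"FROM {escape_string_literal('example.parquet')} "
            "WITH (FORMAT PARQUET)"
        )
        # execute_command returns the number of rows copied
        row_count = connection.execute_command(copy_command)
```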
for retry_count in range(max_retries):
    try:
Retry logic: a max_retries of 5 might be a little high.
pq_file_info = self.remote_fs.get_file_info(
    self.remote_parquet_path
)
Gather parquet file information from the remote parquet file.
try:
    process_log.add_metadata(retry_count=retry_count)
    # get datasource from Tableau to check "updated_at" datetime
    datasource = datasource_from_name(self.hyper_table_name)
Gather datasource information from the Tableau server, if the datasource exists.
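datasource_from_name is a project helper; a hedged sketch of what such a lookup could look like with tableauserverclient (the env var names match those used elsewhere in this PR, but the implementation is illustrative):

```python
import os
import tableauserverclient as TSC

def datasource_from_name(datasource_name: str):
    """Sketch: find a published datasource by name, or return None."""
    auth = TSC.TableauAuth(
        os.getenv("TABLEAU_USER"), os.getenv("TABLEAU_PASSWORD")
    )
    server = TSC.Server(os.getenv("TABLEAU_SERVER"), use_server_version=True)
    with server.auth.sign_in(auth):
        for datasource in TSC.Pager(server.datasources):
            if datasource.name == datasource_name:
                return datasource
    return None
```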
if (
    datasource is not None
    and pq_file_info.mtime < datasource.updated_at
):
If the datasource exists and was modified after the parquet file, then a HyperFile update is not required and is skipped.
if retry_count == max_retries - 1:
    process_log.log_failure(exception=e)
    return
If the maximum number of retries is reached, log_failure and return.
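Pulling the quoted fragments together, the retry pattern is roughly the following (a sketch; the `job` callable stands in for the actual loop body, and `process_log` for the existing logger):

```python
from typing import Callable

def run_with_retries(job: Callable[[], None], process_log, max_retries: int = 5) -> None:
    """Sketch of the retry loop: record the attempt, run the job,
    and log a failure only after the final retry."""
    for retry_count in range(max_retries):
        try:
            process_log.add_metadata(retry_count=retry_count)
            job()
            break
        except Exception as e:
            if retry_count == max_retries - 1:
                process_log.log_failure(exception=e)
                return
```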
hyper_file_name=f"LAMP_GTFS_Rail_{gtfs_table_name}.hyper", | ||
remote_parquet_path=f"s3://{os.getenv('PUBLIC_BUCKET')}/lamp/tableau/rail/LAMP_GTFS_Rail_{gtfs_table_name}.parquet", |
Open to modifying these default file name and path values; this was just a first attempt.
I think it's worth sorting out all our s3 prefix conventions in the next week or so.
def __init__(
    self,
    gtfs_table_name: str,
    table_query: str,
table_query should be constructed as a SELECT statement with a %s placeholder for a WHERE clause related to static_version_key.
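A minimal example of the pattern (table and column names are illustrative, not from the actual schema):

```python
# table_query leaves a %s slot for an optional WHERE clause
table_query = (
    "SELECT static_version_key, route_id "
    "FROM static_routes "
    "%s"
    "ORDER BY static_version_key;"
)

# full rebuild: no filter
create_query = table_query % ""

# incremental update: only rows newer than the parquet file's max key
update_query = table_query % "WHERE static_version_key > 1234 "
```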
pq.write_table(
    pyarrow.concat_tables(
        [
            pq.read_table(self.local_parquet_path),
            pyarrow.Table.from_pylist(
                mapping=self.db_manager.select_as_list(
                    sa.text(update_query)
                ),
                schema=self.parquet_schema,
            ),
        ]
    ),
    self.local_parquet_path,
)
If static_version_key values exist in the RDS that are larger than the values in the parquet file, pull all records with larger static_version_key values from the RDS and append them to the existing parquet file.
Will have to keep an eye on resource usage of the performance manager instance during these operations.
self.table_query = (
    "SELECT"
    " date(vt.service_date::text) as service_date"
    " , ve.pm_trip_id"
    " , ve.stop_sequence"
    " , ve.canonical_stop_sequence"
    " , prev_ve.canonical_stop_sequence as previous_canonical_stop_sequence"
    " , ve.sync_stop_sequence"
    " , prev_ve.sync_stop_sequence as previous_sync_stop_sequence"
    " , ve.stop_id"
    " , prev_ve.stop_id as previous_stop_id"
    " , ve.parent_station"
    " , prev_ve.parent_station as previous_parent_station"
    " , ve.vp_move_timestamp as previous_stop_departure_timestamp"
    " , COALESCE(ve.vp_stop_timestamp, ve.tu_stop_timestamp) as stop_arrival_timestamp"
    " , COALESCE(ve.vp_stop_timestamp, ve.tu_stop_timestamp) + ve.dwell_time_seconds as stop_departure_timestamp"
    " , vt.direction_id::int"
    " , vt.route_id"
    " , vt.branch_route_id"
    " , vt.trunk_route_id"
    " , vt.start_time"
    " , vt.vehicle_id"
    " , vt.stop_count"
    " , vt.trip_id"
    " , vt.vehicle_label"
    " , vt.vehicle_consist"
    " , vt.direction"
    " , vt.direction_destination"
    " , vt.static_trip_id_guess"
    " , vt.static_start_time"
    " , vt.static_stop_count"
    " , vt.first_last_station_match"
    " , vt.static_version_key"
    " , ve.travel_time_seconds"
    " , ve.dwell_time_seconds"
    " , ve.headway_trunk_seconds"
    " , ve.headway_branch_seconds"
    " , ve.updated_on "
    "FROM "
    " vehicle_events ve "
    "LEFT JOIN "
    " vehicle_trips vt "
    "ON "
    " ve.pm_trip_id = vt.pm_trip_id "
    "LEFT JOIN "
    " vehicle_events prev_ve "
    "ON "
    " ve.pm_event_id = prev_ve.next_trip_stop_pm_event_id "
    "WHERE "
    " ve.previous_trip_stop_pm_event_id is not NULL "
    " AND ( "
    " ve.vp_stop_timestamp IS NOT null "
    " OR ve.vp_move_timestamp IS NOT null "
    " ) "
    " %s"
    "ORDER BY "
    " ve.service_date, vt.route_id, vt.direction_id, vt.vehicle_id"
    ";"
)
Still need updates to this query to match the expected output for Ops Analytics.
I have a few different types of comments. Most are suggestions or questions.
copy_command = (
    f"COPY {hyper_table_schema.table_name} "
    f"FROM {escape_string_literal(self.local_parquet_path)} "
    "WITH (FORMAT PARQUET)"
)
what a command.
should we have some logging on this?
The function call is contained within the hyper_job_run_hyper log.
We are already capturing the row count of the file that's produced as well as the size of the HyperFile on disk. Is there anything else you specifically think we should capture?
# get remote parquet schema to compare to expected local schema
if file_info.type == fs.FileType.File:
    remote_schema = pq.read_schema(
        self.remote_parquet_path, filesystem=self.remote_fs
    )
    process_log.add_metadata(
        remote_schema_match=self.parquet_schema.equals(
            remote_schema
        )
    )

# create parquet if no remote parquet found or remote schema
# does not match expected Job schema
if (
    file_info.type == fs.FileType.NotFound
    or not self.parquet_schema.equals(remote_schema)
):
    process_log.add_metadata(created=True, updated=False)
    self.create_parquet()
# else attempt parquet update
else:
    was_updated = self.update_parquet()
    process_log.add_metadata(created=False, updated=was_updated)
This if block is a bit muddled to me. I think the part where update_parquet is also using some logic to decide if it needs to update isn't helping.
Tried to clean this up, consolidate the logging and rename some variables to make them more intuitive.
looks much better. thanks.
"""Base Class for GTFS Hyper Jobs""" | ||
|
||
def __init__( | ||
self, | ||
gtfs_table_name: str, | ||
table_query: str, |
If we're going to be creating more children of the HyperGTFS class, we probably want to add better docs on the table_query parameter. Right now it's a string that has a space for a filtering WHERE clause to be inserted, but it's not clear that you would create a new subclass with that in mind.
Also worth considering a dictionary-style format here, where the filter clause slot in the table_query is named.
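Sketch of the named-placeholder idea, so subclass authors can see what the slot is for (table and column names are illustrative):

```python
table_query = (
    "SELECT static_version_key, route_id "
    "FROM static_routes "
    "%(static_version_filter)s"
    "ORDER BY static_version_key;"
)

create_query = table_query % {"static_version_filter": ""}
update_query = table_query % {
    "static_version_filter": "WHERE static_version_key > 1234 "
}
```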
Updated the Base Class docstring with more detail
def create_parquet(self) -> None:
    create_query = self.table_query % ""

    if os.path.exists(self.local_parquet_path):
        os.remove(self.local_parquet_path)

    pq.write_table(
        pyarrow.Table.from_pylist(
            mapping=self.db_manager.select_as_list(sa.text(create_query)),
            schema=self.parquet_schema,
        ),
        self.local_parquet_path,
    )
I think this is the same as the GTFS create_parquet file?
hyper_file_name=f"LAMP_GTFS_Rail_{gtfs_table_name}.hyper", | ||
remote_parquet_path=f"s3://{os.getenv('PUBLIC_BUCKET')}/lamp/tableau/rail/LAMP_GTFS_Rail_{gtfs_table_name}.parquet", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think its worth sorting out all our s3 prefix conventions in the next week ro so.
"TABLEAU_SERVER", | ||
"PUBLIC_ARCHIVE_BUCKET", | ||
], | ||
validate_db=True, |
This app doesn't need to connect to the DB, does it? It's reading from s3, converting, and publishing.
Ah yes, run_hyper functions don't require DB access, only run_parquet. Will fix that.
LGTM 🍰 I think this can be squashed and merged.
# log_config = "" disables creation of local logfile
with HyperProcess(
    telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU,
    parameters={"log_config": ""},
) as hyper:
this was spitting out .log files onto the local machine without this parameter getting passed.
hyper_file_name=f"LAMP_GTFS_Rail_{gtfs_table_name}.hyper", | ||
remote_parquet_path=f"s3://{os.getenv('PUBLIC_ARCHIVE_BUCKET')}/lamp/tableau/rail/LAMP_GTFS_Rail_{gtfs_table_name}.parquet", | ||
hyper_file_name=f"LAMP_{gtfs_table_name}.hyper", | ||
remote_parquet_path=f"s3://{os.getenv('PUBLIC_ARCHIVE_BUCKET')}/lamp/tableau/rail/LAMP_{gtfs_table_name}.parquet", |
Matching existing Tableau datasets
" ,date(service_date::text) as service_date" | ||
" ,service_date" | ||
" ,date(service_date::text) as service_date_calc" |
Matching existing Tableau datasets
("service_date", pyarrow.date32()), | ||
("service_date", pyarrow.int64()), | ||
("service_date_calc", pyarrow.date32()), |
Matching existing Tableau datasets
" ,date(date::text) as date" | ||
" ,date" | ||
" ,date(date::text) as calendar_date" |
Matching existing Tableau datasets
("date", pyarrow.date32()), | ||
("date", pyarrow.int64()), | ||
("calendar_date", pyarrow.date32()), |
Matching existing Tableau datasets
hyper_file_name="LAMP_RT_Rail.hyper", | ||
remote_parquet_path=f"s3://{os.getenv('PUBLIC_ARCHIVE_BUCKET')}/lamp/tableau/rail/LAMP_RT_Rail.parquet", | ||
hyper_file_name="LAMP_ALL_RT_fields.hyper", | ||
remote_parquet_path=f"s3://{os.getenv('PUBLIC_ARCHIVE_BUCKET')}/lamp/tableau/rail/LAMP_ALL_RT_fields.parquet", |
Matching existing Tableau datasets
This change updates the LAMP project to allow for the direct publishing of Tableau HyperFiles to Tableau Server. Tableau HyperFiles are created from flat parquet files saved on a public S3 bucket.

Each HyperJob contains run_hyper and run_parquet functionality. run_hyper is responsible for creating and updating Tableau HyperFiles, using the Tableau Server API, from parquet files saved on a public S3 bucket. run_parquet is responsible for creating and updating parquet files saved on a public S3 bucket.

Use of the tableauhyperapi and tableauserverclient libraries required updating the Python version to 3.10. This avoids conflicts with the botocore library, which relies on an older version of the Python requests library.

Asana Task: https://app.asana.com/0/1205634094440864/1205550802213386