
write to parquet #679

Merged: 11 commits into google-research:main on Aug 7, 2024

Conversation

mschulist (Contributor):

Right now, the file sizes are giant when writing the output from the classifier to a csv. However, Parquet reduces this file size significantly (in our case, we saw files go from ~80GB to ~3GB due to the number of duplicate filenames).

By writing to a partitioned Parquet file, we can get small files and reduce the amount of memory needed.
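For concreteness, a minimal sketch of the write path being proposed, assuming a pandas DataFrame of inference rows with the columns used later in this PR ('filename', 'timestamp_s', 'label', 'logit'); the directory layout, shard naming, and example values are illustrative rather than the PR's exact code:

import os
import pandas as pd

output_dir = 'inference_output.parquet'
os.makedirs(output_dir, exist_ok=True)

rows = [
    {'filename': 'site1/rec001.wav', 'timestamp_s': 0.0, 'label': 'amecro', 'logit': 1.23},
    {'filename': 'site1/rec001.wav', 'timestamp_s': 5.0, 'label': 'amecro', 'logit': -0.41},
]
# Each buffered chunk becomes its own shard file under output_dir, so no
# single write needs the full result set in memory. Repeated string columns
# such as 'filename' dictionary-encode well in Parquet, which is where most
# of the size reduction comes from.
pd.DataFrame(rows).to_parquet(os.path.join(output_dir, 'part-00000.parquet'))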

sdenton4 (Collaborator) commented Aug 4, 2024:

Hey, Mark! Thanks for this; I actually didn't know that Pandas has parquet support.

There's a lot of repeated logic in the parquet and csv methods; could you merge them into a single method with an argument to choose the output format?

mschulist (Contributor, PR author):

Yeah, that sounds good! I'm not sure if concatenating dataframes is the most memory efficient way of buffering dataframes (it might have to copy a lot of data?), so I might look into other ways of buffering the output.
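One alternative, sketched here under the assumption of a hypothetical generate_inference_rows() generator and an illustrative shard size: buffer plain rows and build one DataFrame per shard, since each pd.concat copies its inputs.

import os
import pandas as pd

SHARD_SIZE = 1_000_000  # illustrative buffer size
output_dir = 'inference_output.parquet'
os.makedirs(output_dir, exist_ok=True)

def write_shard(rows, shard_num):
  pd.DataFrame(rows).to_parquet(
      os.path.join(output_dir, f'part-{shard_num:05d}.parquet'))

buffer, shard_num = [], 0
for row in generate_inference_rows():  # hypothetical generator of dict rows
  buffer.append(row)
  if len(buffer) >= SHARD_SIZE:
    write_shard(buffer, shard_num)
    buffer, shard_num = [], shard_num + 1
if buffer:  # flush the remainder rows
  write_shard(buffer, shard_num)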

mschulist (Contributor, PR author):

I'd like to test it before merging, but I couldn't find an easy way to test it in the existing test file. Is there a way to make a unit test for it, or do we have to just run it in a notebook on an existing dataset?

sdenton4 (Collaborator) commented Aug 4, 2024:

Certainly wouldn't say no to a test, but you're right that there's not an existing test for this function. The place to put a new test is here:
https://github.com/google-research/perch/blob/main/chirp/inference/tests/classify_test.py

There's an example of creating a test dataset here which might be helpful:
https://source.corp.google.com/piper///depot/google3/third_party/py/chirp/inference/tests/bootstrap_test.py;l=47

mschulist (Contributor, PR author):

I was able to make a test (just creating random embeddings) and check that the csv and parquet files are "equal" (or close enough, given floating-point rounding). The files are significantly smaller with parquet, even in the test with only 4 classes, which is good to see.
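Roughly the shape of that comparison, sketched with pytest's tmp_path fixture and made-up data for brevity (the repo's actual test harness and helpers may differ); since the CSV path round-trips floats through text, the check uses a tolerance:

import numpy as np
import pandas as pd

def test_csv_and_parquet_outputs_agree(tmp_path):
  rng = np.random.default_rng(42)
  df = pd.DataFrame({
      'filename': ['rec.wav'] * 8,
      'timestamp_s': np.arange(8, dtype=float),
      'label': ['class_a', 'class_b', 'class_c', 'class_d'] * 2,
      'logit': rng.normal(size=8),
  })
  df.to_csv(tmp_path / 'out.csv', index=False)
  df.to_parquet(tmp_path / 'out.parquet')
  from_csv = pd.read_csv(tmp_path / 'out.csv')
  from_parquet = pd.read_parquet(tmp_path / 'out.parquet')
  # Compare logits with a tolerance rather than exact equality.
  np.testing.assert_allclose(from_csv['logit'], from_parquet['logit'], rtol=1e-6)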

"""Write a CSV file of inference results."""
"""Write inference results."""

if format != 'parquet' and format != 'csv':
sdenton4 (Collaborator):

A bit cleaner:

if format == 'parquet':
  ...
elif format == 'csv':
  ...
else:
  raise ValueError(...)

if format == 'parquet':
  if output_filepath.endswith('.csv'):
    output_filepath = output_filepath[:-4]
  if not output_filepath.endswith('.parquet'):
sdenton4 (Collaborator):

This second-guessing of the user's intention from the extension and format args is a bit cumbersome.

Maybe we should get the extension from the output file and use that instead of an arg? (and complain if it's not one of our accepted types.)

Then we would have:

if output_filepath.endswith('.parquet'):
  format = 'parquet'
elif output_filepath.endswith('.csv'):
  format = 'csv'
else:
  raise ValueError(...)

which saves an argument and ~12 lines of code.

offset = ex['timestamp_s'] + t * embedding_hop_size_s
logit = '{:.2f}'.format(ex['logits'][t, i])
if format == 'csv':
  f = open(output_filepath, 'w')
sdenton4 (Collaborator):

It's good to use the with open(...) as f pattern because it ensures that the file will be properly flushed and closed if an exception arises, or if we return early for some reason.
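Illustratively, with the identifiers from the snippet above (the row-writing details are placeholders):

with open(output_filepath, 'w') as f:
  f.write(','.join(headers) + '\n')
  for row in rows:  # placeholder for the buffered inference rows
    f.write(','.join(str(v) for v in row) + '\n')
# On leaving the block, the file is flushed and closed even if an exception
# was raised or the function returned early.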

if threshold is None or ex['logits'][t, i] > threshold[label]:
  offset = ex['timestamp_s'] + t * embedding_hop_size_s
  logit = ex['logits'][t, i]
if format == 'parquet':
sdenton4 (Collaborator):

Maybe simpler:

Write a helper function flush_rows(output_path, shard_num, rows, format, headers) which writes everything in rows to a file. Then all of the writing logic is centralized; you can call the function here and below when you deal with the remainder rows.

This also helps with the csv file handling; you just open the file and write to it when you're flushing the data to disk.
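A possible shape for that helper, under assumptions about its inputs (each row is a dict keyed by the headers, and parquet shards land in a directory named by output_path); the real signature and behaviour are whatever the PR settles on:

import os
import pandas as pd

def flush_rows(output_path, shard_num, rows, format, headers):
  """Writes the buffered rows to disk so the caller can reset its buffer."""
  if format == 'parquet':
    os.makedirs(output_path, exist_ok=True)
    pd.DataFrame(rows, columns=headers).to_parquet(
        os.path.join(output_path, f'part-{shard_num:05d}.parquet'))
  elif format == 'csv':
    # Append rows; the caller writes the header line once when creating the file.
    with open(output_path, 'a') as f:
      for row in rows:
        f.write(','.join(str(row[h]) for h in headers) + '\n')
  else:
    raise ValueError(f'Unsupported format: {format}')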

mschulist (Contributor, PR author):

Thanks! That makes it SO much cleaner.

sdenton4 (Collaborator) left a comment:

Think we're just about there, thanks for sticking with it!

else:
  nondetection_count += 1
headers = ['filename', 'timestamp_s', 'label', 'logit']
# Write column headers if CSV format
sdenton4 (Collaborator):

Seems like this comment can be deleted now

@@ -180,45 +183,88 @@ def classify_batch(batch):
  )
  return inference_ds

def flush_rows(
sdenton4 (Collaborator):

Let's use a slightly more descriptive name, like flush_inference_rows.

sdenton4 merged commit 0402b78 into google-research:main on Aug 7, 2024
5 checks passed