From f4f41a6362e2817c0beaaf77a1574dbaa992005b Mon Sep 17 00:00:00 2001 From: mschulist Date: Thu, 25 Jul 2024 21:20:12 -0700 Subject: [PATCH 1/9] added write to parquet for classify --- chirp/inference/classify/classify.py | 60 ++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/chirp/inference/classify/classify.py b/chirp/inference/classify/classify.py index 0e1a1103..981352d5 100644 --- a/chirp/inference/classify/classify.py +++ b/chirp/inference/classify/classify.py @@ -25,6 +25,8 @@ import numpy as np import tensorflow as tf import tqdm +import pandas as pd +import os @dataclasses.dataclass @@ -222,3 +224,61 @@ def write_inference_csv( nondetection_count += 1 print('\n\n\n Detection count: ', detection_count) print('NonDetection count: ', nondetection_count) + +def write_inference_parquet( + embeddings_ds: tf.data.Dataset, + model: interface.LogitsOutputHead, + labels: Sequence[str], + output_filepath: str, + embedding_hop_size_s: float, + threshold: dict[str, float] | None = None, + exclude_classes: Sequence[str] = ("unknown",), + include_classes: Sequence[str] = (), + row_size: int = 1_000_000, +): + """Write a Parquet file of inference results. + + Uses Polars to write to a partitioned Parquet file. + Each partition has a maximum of `row_size` rows. + Setting `row_size` too large will result in too few partitions + and may cause memory issues. Setting `row_size` too small will + result in many partitions and may slow down the writing process. + """ + inference_ds = get_inference_dataset(embeddings_ds, model) + + if output_filepath.endswith(".csv"): + output_filepath = output_filepath[:-4] + if not output_filepath.endswith(".parquet"): + output_filepath += ".parquet" + + tmp_df = pd.DataFrame() + detection_count = 0 + nondetection_count = 0 + parquet_count = 0 + os.mkdir(output_filepath) + headers = ["filename", "timestamp_s", "label", "logit"] + for ex in tqdm.tqdm(inference_ds.as_numpy_iterator()): + for t in range(ex["logits"].shape[0]): + for i, label in enumerate(labels): + if label in exclude_classes: + continue + if include_classes and label not in include_classes: + continue + if threshold is None or ex["logits"][t, i] > threshold[label]: + offset = ex["timestamp_s"] + t * embedding_hop_size_s + logit = ex["logits"][t, i] + row = { + headers[0]: ex["filename"].decode("utf-8"), + headers[1]: offset, + headers[2]: label, + headers[3]: logit, + } + tmp_df = pd.concat([tmp_df, pd.DataFrame([row])]) + detection_count += 1 + if len(tmp_df) >= row_size: + tmp_df.to_parquet( + f"{output_filepath}/part.{parquet_count}.parquet") + parquet_count += 1 + tmp_df = pd.DataFrame() + else: + nondetection_count += 1 \ No newline at end of file From 33c3fafc6fff86dd6de283209962f7ffff83901d Mon Sep 17 00:00:00 2001 From: mschulist Date: Thu, 25 Jul 2024 21:58:28 -0700 Subject: [PATCH 2/9] whoops, pandas not polars --- chirp/inference/classify/classify.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chirp/inference/classify/classify.py b/chirp/inference/classify/classify.py index 981352d5..1ef66924 100644 --- a/chirp/inference/classify/classify.py +++ b/chirp/inference/classify/classify.py @@ -238,7 +238,7 @@ def write_inference_parquet( ): """Write a Parquet file of inference results. - Uses Polars to write to a partitioned Parquet file. + Uses Pandas to write to a partitioned Parquet file. Each partition has a maximum of `row_size` rows. Setting `row_size` too large will result in too few partitions and may cause memory issues. 
Setting `row_size` too small will From b57de58b9b0168f496736c6c3f355616449e6547 Mon Sep 17 00:00:00 2001 From: mschulist Date: Fri, 26 Jul 2024 16:21:40 -0700 Subject: [PATCH 3/9] add print --- chirp/inference/classify/classify.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/chirp/inference/classify/classify.py b/chirp/inference/classify/classify.py index 1ef66924..bfb6e69a 100644 --- a/chirp/inference/classify/classify.py +++ b/chirp/inference/classify/classify.py @@ -281,4 +281,6 @@ def write_inference_parquet( parquet_count += 1 tmp_df = pd.DataFrame() else: - nondetection_count += 1 \ No newline at end of file + nondetection_count += 1 + print('\n\n\n Detection count: ', detection_count) + print('NonDetection count: ', nondetection_count) \ No newline at end of file From e1e6b68b81b03eff91f9f5a90c3640c2963268b9 Mon Sep 17 00:00:00 2001 From: mschulist Date: Sat, 3 Aug 2024 20:23:13 -0700 Subject: [PATCH 4/9] combine into single method --- chirp/inference/classify/classify.py | 130 +++++++++++---------------- 1 file changed, 54 insertions(+), 76 deletions(-) diff --git a/chirp/inference/classify/classify.py b/chirp/inference/classify/classify.py index bfb6e69a..d57f5054 100644 --- a/chirp/inference/classify/classify.py +++ b/chirp/inference/classify/classify.py @@ -183,7 +183,7 @@ def classify_batch(batch): return inference_ds -def write_inference_csv( +def write_inference_file( embeddings_ds: tf.data.Dataset, model: interface.LogitsOutputHead, labels: Sequence[str], @@ -192,95 +192,73 @@ def write_inference_csv( threshold: dict[str, float] | None = None, exclude_classes: Sequence[str] = ('unknown',), include_classes: Sequence[str] = (), -): - """Write a CSV file of inference results.""" - inference_ds = get_inference_dataset(embeddings_ds, model) - - detection_count = 0 - nondetection_count = 0 - with open(output_filepath, 'w') as f: - # Write column headers. - headers = ['filename', 'timestamp_s', 'label', 'logit'] - f.write(', '.join(headers) + '\n') - for ex in tqdm.tqdm(inference_ds.as_numpy_iterator()): - for t in range(ex['logits'].shape[0]): - for i, label in enumerate(labels): - if label in exclude_classes: - continue - if include_classes and label not in include_classes: - continue - if threshold is None or ex['logits'][t, i] > threshold[label]: - offset = ex['timestamp_s'] + t * embedding_hop_size_s - logit = '{:.2f}'.format(ex['logits'][t, i]) - row = [ - ex['filename'].decode('utf-8'), - '{:.2f}'.format(offset), - label, - logit, - ] - f.write(','.join(row) + '\n') - detection_count += 1 - else: - nondetection_count += 1 - print('\n\n\n Detection count: ', detection_count) - print('NonDetection count: ', nondetection_count) - -def write_inference_parquet( - embeddings_ds: tf.data.Dataset, - model: interface.LogitsOutputHead, - labels: Sequence[str], - output_filepath: str, - embedding_hop_size_s: float, - threshold: dict[str, float] | None = None, - exclude_classes: Sequence[str] = ("unknown",), - include_classes: Sequence[str] = (), row_size: int = 1_000_000, + format: str = 'parquet', ): - """Write a Parquet file of inference results. - - Uses Pandas to write to a partitioned Parquet file. - Each partition has a maximum of `row_size` rows. - Setting `row_size` too large will result in too few partitions - and may cause memory issues. Setting `row_size` too small will - result in many partitions and may slow down the writing process. 
- """ + """Write inference results.""" + + if format != 'parquet' and format != 'csv': + raise ValueError('Format must be either "parquet" or "csv"') + + if format == 'parquet': + if output_filepath.endswith('.csv'): + output_filepath = output_filepath[:-4] + if not output_filepath.endswith('.parquet'): + output_filepath += '.parquet' + + tmp_df = pd.DataFrame() + parquet_count = 0 + os.mkdir(output_filepath) + rows = [] + inference_ds = get_inference_dataset(embeddings_ds, model) - if output_filepath.endswith(".csv"): - output_filepath = output_filepath[:-4] - if not output_filepath.endswith(".parquet"): - output_filepath += ".parquet" - - tmp_df = pd.DataFrame() detection_count = 0 nondetection_count = 0 - parquet_count = 0 - os.mkdir(output_filepath) - headers = ["filename", "timestamp_s", "label", "logit"] + if format == 'csv': + f = open(output_filepath, 'w') + headers = ['filename', 'timestamp_s', 'label', 'logit'] + # Write column headers if CSV format + if format == 'csv': + f.write(','.join(headers) + '\n') for ex in tqdm.tqdm(inference_ds.as_numpy_iterator()): - for t in range(ex["logits"].shape[0]): + for t in range(ex['logits'].shape[0]): for i, label in enumerate(labels): if label in exclude_classes: continue if include_classes and label not in include_classes: continue - if threshold is None or ex["logits"][t, i] > threshold[label]: - offset = ex["timestamp_s"] + t * embedding_hop_size_s - logit = ex["logits"][t, i] - row = { - headers[0]: ex["filename"].decode("utf-8"), - headers[1]: offset, - headers[2]: label, - headers[3]: logit, - } - tmp_df = pd.concat([tmp_df, pd.DataFrame([row])]) + if threshold is None or ex['logits'][t, i] > threshold[label]: + offset = ex['timestamp_s'] + t * embedding_hop_size_s + logit = '{:.2f}'.format(ex['logits'][t, i]) + if format == 'parquet': + row = { + headers[0]: ex["filename"].decode("utf-8"), + headers[1]: offset, + headers[2]: label, + headers[3]: logit, + } + rows.append(row) + if len(rows) >= row_size: + tmp_df = pd.DataFrame(rows) + tmp_df.to_parquet(f'{output_filepath}/part.{parquet_count}.parquet') + parquet_count += 1 + tmp_df = pd.DataFrame() + elif format == 'csv': + row = [ + ex['filename'].decode('utf-8'), + '{:.2f}'.format(offset), + label, + logit, + ] + f.write(','.join(row) + '\n') detection_count += 1 - if len(tmp_df) >= row_size: - tmp_df.to_parquet( - f"{output_filepath}/part.{parquet_count}.parquet") - parquet_count += 1 - tmp_df = pd.DataFrame() else: nondetection_count += 1 + if format == 'parquet' and rows: + tmp_df = pd.DataFrame(rows) + tmp_df.to_parquet(f'{output_filepath}/part.{parquet_count}.parquet') + if format == 'csv': + f.close() print('\n\n\n Detection count: ', detection_count) print('NonDetection count: ', nondetection_count) \ No newline at end of file From 32b7a1247a4bdf426f5579f325a489d9160b241a Mon Sep 17 00:00:00 2001 From: mschulist Date: Sat, 3 Aug 2024 20:54:00 -0700 Subject: [PATCH 5/9] fix rows list --- chirp/inference/classify/classify.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chirp/inference/classify/classify.py b/chirp/inference/classify/classify.py index d57f5054..fbb19171 100644 --- a/chirp/inference/classify/classify.py +++ b/chirp/inference/classify/classify.py @@ -206,7 +206,6 @@ def write_inference_file( if not output_filepath.endswith('.parquet'): output_filepath += '.parquet' - tmp_df = pd.DataFrame() parquet_count = 0 os.mkdir(output_filepath) rows = [] @@ -243,7 +242,7 @@ def write_inference_file( tmp_df = pd.DataFrame(rows) 
tmp_df.to_parquet(f'{output_filepath}/part.{parquet_count}.parquet') parquet_count += 1 - tmp_df = pd.DataFrame() + rows = [] elif format == 'csv': row = [ ex['filename'].decode('utf-8'), @@ -255,6 +254,7 @@ def write_inference_file( detection_count += 1 else: nondetection_count += 1 + # write remaining rows if parquet format if format == 'parquet' and rows: tmp_df = pd.DataFrame(rows) tmp_df.to_parquet(f'{output_filepath}/part.{parquet_count}.parquet') From da13b221816db96398bab222ac86ebb8eb7fdb64 Mon Sep 17 00:00:00 2001 From: mschulist Date: Sun, 4 Aug 2024 11:05:44 -0700 Subject: [PATCH 6/9] start test, definitely not finished... --- chirp/inference/tests/classify_test.py | 58 +++++++++++++++++++++++++- 1 file changed, 57 insertions(+), 1 deletion(-) diff --git a/chirp/inference/tests/classify_test.py b/chirp/inference/tests/classify_test.py index 3475442c..5fa06a2b 100644 --- a/chirp/inference/tests/classify_test.py +++ b/chirp/inference/tests/classify_test.py @@ -15,16 +15,20 @@ """Test small-model classification.""" +import os import tempfile +from etils import epath -from chirp.inference import interface +from chirp.inference import embed_lib, interface from chirp.inference.classify import classify from chirp.inference.classify import data_lib +from chirp.inference.search import bootstrap from chirp.taxonomy import namespace import numpy as np from absl.testing import absltest from absl.testing import parameterized +from bootstrap_test import BootstrapTest class ClassifyTest(parameterized.TestCase): @@ -99,6 +103,58 @@ def test_train_linear_model(self): error = np.abs(restored_logits - logits).sum() self.assertEqual(error, 0) + def test_write_inference_file(self): + # copy from test_train_linear_model to get the model + embedding_dim = 128 + num_classes = 4 + model = classify.get_linear_model(embedding_dim, num_classes) + + classes = ['a', 'b', 'c', 'd'] + logits_model = interface.LogitsOutputHead( + model_path='./test_model', + logits_key='some_model', + logits_model=model, + class_list=namespace.ClassList('classes', classes), + ) + + # make a fake embeddings dataset + filenames = ['file1', 'file2', 'file3'] + bt = BootstrapTest() + bt.setUp() + audio_glob = bt.make_wav_files(classes, filenames) + source_infos = embed_lib.create_source_infos([audio_glob], shard_len_s=5.0) + + embed_dir = os.path.join(bt.tempdir, 'embeddings') + labeled_dir = os.path.join(bt.tempdir, 'labeled') + epath.Path(embed_dir).mkdir(parents=True, exist_ok=True) + epath.Path(labeled_dir).mkdir(parents=True, exist_ok=True) + + print(source_infos) + print(bt.tempdir) + + bt.write_placeholder_embeddings(audio_glob, source_infos, embed_dir) + + bootstrap_config = bootstrap.BootstrapConfig.load_from_embedding_path( + embeddings_path=embed_dir, + annotated_path=labeled_dir, + ) + print('config hash : ', bootstrap_config.embedding_config_hash()) + + project_state = bootstrap.BootstrapState( + config=bootstrap_config, + ) + + embeddings_ds = project_state.create_embeddings_dataset() + + classify.write_inference_file( + embeddings_ds=embeddings_ds, + model=logits_model, + labels=classes, + output_filepath='./test_output', + embedding_hop_size_s=5.0, + row_size=1, + format='csv' + ) if __name__ == '__main__': absltest.main() From bbc8e612babaeb0c7b4541bf8b500aa3527f999c Mon Sep 17 00:00:00 2001 From: mschulist Date: Sun, 4 Aug 2024 12:30:49 -0700 Subject: [PATCH 7/9] write test and fix errors in write --- chirp/inference/classify/classify.py | 15 ++-- chirp/inference/tests/classify_test.py | 97 
+++++++++++++++++--------- 2 files changed, 74 insertions(+), 38 deletions(-) diff --git a/chirp/inference/classify/classify.py b/chirp/inference/classify/classify.py index fbb19171..86fc5aad 100644 --- a/chirp/inference/classify/classify.py +++ b/chirp/inference/classify/classify.py @@ -205,10 +205,15 @@ def write_inference_file( output_filepath = output_filepath[:-4] if not output_filepath.endswith('.parquet'): output_filepath += '.parquet' - - parquet_count = 0 os.mkdir(output_filepath) - rows = [] + if format == 'csv': + if output_filepath.endswith('.parquet'): + output_filepath = output_filepath[:-8] + if not output_filepath.endswith('.csv'): + output_filepath += '.csv' + + parquet_count = 0 + rows = [] inference_ds = get_inference_dataset(embeddings_ds, model) @@ -229,7 +234,7 @@ def write_inference_file( continue if threshold is None or ex['logits'][t, i] > threshold[label]: offset = ex['timestamp_s'] + t * embedding_hop_size_s - logit = '{:.2f}'.format(ex['logits'][t, i]) + logit = ex['logits'][t, i] if format == 'parquet': row = { headers[0]: ex["filename"].decode("utf-8"), @@ -248,7 +253,7 @@ def write_inference_file( ex['filename'].decode('utf-8'), '{:.2f}'.format(offset), label, - logit, + '{:.2f}'.format(logit), ] f.write(','.join(row) + '\n') detection_count += 1 diff --git a/chirp/inference/tests/classify_test.py b/chirp/inference/tests/classify_test.py index 5fa06a2b..d2d1105d 100644 --- a/chirp/inference/tests/classify_test.py +++ b/chirp/inference/tests/classify_test.py @@ -17,18 +17,18 @@ import os import tempfile -from etils import epath -from chirp.inference import embed_lib, interface +import pandas as pd + +from chirp.inference import interface, tf_examples from chirp.inference.classify import classify from chirp.inference.classify import data_lib -from chirp.inference.search import bootstrap from chirp.taxonomy import namespace import numpy as np from absl.testing import absltest from absl.testing import parameterized -from bootstrap_test import BootstrapTest +import shutil class ClassifyTest(parameterized.TestCase): @@ -102,8 +102,33 @@ def test_train_linear_model(self): restored_logits = restored_model(query) error = np.abs(restored_logits - logits).sum() self.assertEqual(error, 0) + + def write_random_embeddings(self, embedding_dim, filenames, tempdir): + """Write random embeddings to a temporary directory.""" + rng = np.random.default_rng(42) + with tf_examples.EmbeddingsTFRecordMultiWriter( + output_dir=tempdir, num_files=1 + ) as file_writer: + for filename in filenames: + embedding = rng.normal(size=(1, 1, embedding_dim)).astype(np.float32) + model_outputs = interface.InferenceOutputs(embedding) + example = tf_examples.model_outputs_to_tf_example( + model_outputs=model_outputs, + file_id=filename, + audio=np.array([]), + timestamp_offset_s=0, + write_raw_audio=False, + write_separated_audio=False, + write_embeddings=True, + write_logits=False, + ) + file_writer.write(example.SerializeToString()) + file_writer.flush() def test_write_inference_file(self): + """Test writing inference files.""" + tempdir = tempfile.mkdtemp() + # copy from test_train_linear_model to get the model embedding_dim = 128 num_classes = 4 @@ -111,50 +136,56 @@ def test_write_inference_file(self): classes = ['a', 'b', 'c', 'd'] logits_model = interface.LogitsOutputHead( - model_path='./test_model', + model_path=os.path.join(tempdir, 'model'), logits_key='some_model', logits_model=model, class_list=namespace.ClassList('classes', classes), ) # make a fake embeddings dataset - filenames = 
['file1', 'file2', 'file3'] - bt = BootstrapTest() - bt.setUp() - audio_glob = bt.make_wav_files(classes, filenames) - source_infos = embed_lib.create_source_infos([audio_glob], shard_len_s=5.0) - - embed_dir = os.path.join(bt.tempdir, 'embeddings') - labeled_dir = os.path.join(bt.tempdir, 'labeled') - epath.Path(embed_dir).mkdir(parents=True, exist_ok=True) - epath.Path(labeled_dir).mkdir(parents=True, exist_ok=True) + filenames = [f'file_{i}' for i in range(100)] - print(source_infos) - print(bt.tempdir) - - bt.write_placeholder_embeddings(audio_glob, source_infos, embed_dir) - - bootstrap_config = bootstrap.BootstrapConfig.load_from_embedding_path( - embeddings_path=embed_dir, - annotated_path=labeled_dir, - ) - print('config hash : ', bootstrap_config.embedding_config_hash()) - - project_state = bootstrap.BootstrapState( - config=bootstrap_config, - ) + self.write_random_embeddings(embedding_dim, filenames, tempdir) - embeddings_ds = project_state.create_embeddings_dataset() + embeddings_ds = tf_examples.create_embeddings_dataset(embeddings_dir=tempdir) + + parquet_path = os.path.join(tempdir, 'output.parquet') + csv_path = os.path.join(tempdir, 'output.csv') classify.write_inference_file( embeddings_ds=embeddings_ds, model=logits_model, labels=classes, - output_filepath='./test_output', + output_filepath=parquet_path, embedding_hop_size_s=5.0, - row_size=1, - format='csv' + row_size=10, + format='parquet', ) + + classify.write_inference_file( + embeddings_ds=embeddings_ds, + model=logits_model, + labels=classes, + output_filepath=csv_path, + embedding_hop_size_s=5.0, + format='csv', + ) + + parquet = pd.read_parquet(parquet_path) + parquet['filename_i'] = parquet['filename'].str.split('_').str[1].astype(int) + parquet = parquet.sort_values(by=['filename_i', 'timestamp_s']).reset_index(drop=True) + + csv = pd.read_csv(csv_path) + csv['filename_i'] = csv['filename'].str.split('_').str[1].astype(int) + csv = csv.sort_values(by=['filename_i', 'timestamp_s']).reset_index(drop=True) + + n_expected_rows = len(filenames) * len(classes) + self.assertTrue(np.allclose(parquet['logit'], csv['logit'], atol=1e-2)) + self.assertEqual(len(parquet), n_expected_rows) + self.assertEqual(len(csv), n_expected_rows) + + shutil.rmtree(tempdir) + if __name__ == '__main__': absltest.main() From a273206a2222716adad03bc359dcab7cd78e269c Mon Sep 17 00:00:00 2001 From: mschulist Date: Mon, 5 Aug 2024 10:10:17 -0700 Subject: [PATCH 8/9] clean up writing code, add ePath ability --- chirp/inference/classify/classify.py | 99 +++++++++++++------------- chirp/inference/tests/classify_test.py | 7 +- 2 files changed, 53 insertions(+), 53 deletions(-) diff --git a/chirp/inference/classify/classify.py b/chirp/inference/classify/classify.py index 86fc5aad..c7b917df 100644 --- a/chirp/inference/classify/classify.py +++ b/chirp/inference/classify/classify.py @@ -27,6 +27,7 @@ import tqdm import pandas as pd import os +from etils import epath @dataclasses.dataclass @@ -182,49 +183,63 @@ def classify_batch(batch): ) return inference_ds +def flush_rows( + output_path: epath.Path, + shard_num: int, + rows: list[dict[str, str]], + format: str, + headers: list[str], +): + """Helper method to write rows to disk.""" + if format == 'csv': + if shard_num == 0: + with output_path.open('w') as f: + f.write(','.join(headers) + '\n') + with output_path.open('a') as f: + for row in rows: + csv_row = [ + '{:.2f}'.format(row.get(h, '')) if isinstance(row.get(h, ''), np.float32) else row.get(h, '') + for h in row + ] + 
f.write(','.join(csv_row) + '\n') + elif format == 'parquet': + output_path.mkdir(parents=True, exist_ok=True) + parquet_path = output_path / f'part.{shard_num}.parquet' + pd.DataFrame(rows).to_parquet(parquet_path) + else: + raise ValueError('Output format must be either csv or parquet') + def write_inference_file( embeddings_ds: tf.data.Dataset, model: interface.LogitsOutputHead, labels: Sequence[str], - output_filepath: str, + output_filepath: epath.PathLike, embedding_hop_size_s: float, threshold: dict[str, float] | None = None, exclude_classes: Sequence[str] = ('unknown',), include_classes: Sequence[str] = (), - row_size: int = 1_000_000, - format: str = 'parquet', + shard_size: int = 1_000_000, ): """Write inference results.""" + output_filepath = epath.Path(output_filepath) - if format != 'parquet' and format != 'csv': - raise ValueError('Format must be either "parquet" or "csv"') - - if format == 'parquet': - if output_filepath.endswith('.csv'): - output_filepath = output_filepath[:-4] - if not output_filepath.endswith('.parquet'): - output_filepath += '.parquet' - os.mkdir(output_filepath) - if format == 'csv': - if output_filepath.endswith('.parquet'): - output_filepath = output_filepath[:-8] - if not output_filepath.endswith('.csv'): - output_filepath += '.csv' + if str(output_filepath).endswith('.csv'): + format = 'csv' + elif str(output_filepath).endswith('.parquet'): + format = 'parquet' + else: + raise ValueError('Output file must end with either .csv or .parquet') - parquet_count = 0 + shard_num = 0 rows = [] inference_ds = get_inference_dataset(embeddings_ds, model) detection_count = 0 nondetection_count = 0 - if format == 'csv': - f = open(output_filepath, 'w') headers = ['filename', 'timestamp_s', 'label', 'logit'] # Write column headers if CSV format - if format == 'csv': - f.write(','.join(headers) + '\n') for ex in tqdm.tqdm(inference_ds.as_numpy_iterator()): for t in range(ex['logits'].shape[0]): for i, label in enumerate(labels): @@ -235,35 +250,21 @@ def write_inference_file( if threshold is None or ex['logits'][t, i] > threshold[label]: offset = ex['timestamp_s'] + t * embedding_hop_size_s logit = ex['logits'][t, i] - if format == 'parquet': - row = { - headers[0]: ex["filename"].decode("utf-8"), - headers[1]: offset, - headers[2]: label, - headers[3]: logit, - } - rows.append(row) - if len(rows) >= row_size: - tmp_df = pd.DataFrame(rows) - tmp_df.to_parquet(f'{output_filepath}/part.{parquet_count}.parquet') - parquet_count += 1 - rows = [] - elif format == 'csv': - row = [ - ex['filename'].decode('utf-8'), - '{:.2f}'.format(offset), - label, - '{:.2f}'.format(logit), - ] - f.write(','.join(row) + '\n') + row = { + headers[0]: ex["filename"].decode("utf-8"), + headers[1]: np.float32(offset), + headers[2]: label, + headers[3]: np.float32(logit), + } + rows.append(row) + if len(rows) >= shard_size: + flush_rows(output_filepath, shard_num, rows, format, headers) + rows = [] + shard_num += 1 detection_count += 1 else: nondetection_count += 1 - # write remaining rows if parquet format - if format == 'parquet' and rows: - tmp_df = pd.DataFrame(rows) - tmp_df.to_parquet(f'{output_filepath}/part.{parquet_count}.parquet') - if format == 'csv': - f.close() + # write remaining rows + flush_rows(output_filepath, shard_num, rows, format, headers) print('\n\n\n Detection count: ', detection_count) print('NonDetection count: ', nondetection_count) \ No newline at end of file diff --git a/chirp/inference/tests/classify_test.py b/chirp/inference/tests/classify_test.py index 
d2d1105d..db5f5d1a 100644 --- a/chirp/inference/tests/classify_test.py +++ b/chirp/inference/tests/classify_test.py @@ -143,7 +143,7 @@ def test_write_inference_file(self): ) # make a fake embeddings dataset - filenames = [f'file_{i}' for i in range(100)] + filenames = [f'file_{i}' for i in range(101)] self.write_random_embeddings(embedding_dim, filenames, tempdir) @@ -158,8 +158,7 @@ def test_write_inference_file(self): labels=classes, output_filepath=parquet_path, embedding_hop_size_s=5.0, - row_size=10, - format='parquet', + shard_size=10, ) classify.write_inference_file( @@ -168,7 +167,7 @@ def test_write_inference_file(self): labels=classes, output_filepath=csv_path, embedding_hop_size_s=5.0, - format='csv', + shard_size=10, ) parquet = pd.read_parquet(parquet_path) From 4a66bb5c96ac232320f9753038d9c7f8e5083832 Mon Sep 17 00:00:00 2001 From: mschulist Date: Tue, 6 Aug 2024 12:07:58 -0700 Subject: [PATCH 9/9] rename flush rows and remove old comments --- chirp/inference/classify/classify.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/chirp/inference/classify/classify.py b/chirp/inference/classify/classify.py index c7b917df..88edcd79 100644 --- a/chirp/inference/classify/classify.py +++ b/chirp/inference/classify/classify.py @@ -183,7 +183,7 @@ def classify_batch(batch): ) return inference_ds -def flush_rows( +def flush_inference_rows( output_path: epath.Path, shard_num: int, rows: list[dict[str, str]], @@ -239,7 +239,6 @@ def write_inference_file( detection_count = 0 nondetection_count = 0 headers = ['filename', 'timestamp_s', 'label', 'logit'] - # Write column headers if CSV format for ex in tqdm.tqdm(inference_ds.as_numpy_iterator()): for t in range(ex['logits'].shape[0]): for i, label in enumerate(labels): @@ -258,13 +257,13 @@ def write_inference_file( } rows.append(row) if len(rows) >= shard_size: - flush_rows(output_filepath, shard_num, rows, format, headers) + flush_inference_rows(output_filepath, shard_num, rows, format, headers) rows = [] shard_num += 1 detection_count += 1 else: nondetection_count += 1 # write remaining rows - flush_rows(output_filepath, shard_num, rows, format, headers) + flush_inference_rows(output_filepath, shard_num, rows, format, headers) print('\n\n\n Detection count: ', detection_count) print('NonDetection count: ', nondetection_count) \ No newline at end of file
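
---

Editor's note: for readers following the series, a minimal usage sketch of the API as it stands after PATCH 9/9. It mirrors `test_write_inference_file`; the `/tmp` paths are placeholders and the untrained linear head stands in for a real classifier, so treat this as an illustration of the call pattern rather than a documented workflow.

import pandas as pd

from chirp.inference import interface, tf_examples
from chirp.inference.classify import classify
from chirp.taxonomy import namespace

# A linear classifier head, built the same way as in test_write_inference_file.
classes = ['a', 'b', 'c', 'd']
model = classify.get_linear_model(128, len(classes))
logits_model = interface.LogitsOutputHead(
    model_path='/tmp/model',  # placeholder path
    logits_key='some_model',
    logits_model=model,
    class_list=namespace.ClassList('classes', classes),
)

# Embeddings previously written by the embedding pipeline.
embeddings_ds = tf_examples.create_embeddings_dataset(
    embeddings_dir='/tmp/embeddings'  # placeholder path
)

# The suffix of output_filepath selects the writer: '.csv' yields a single CSV
# file, '.parquet' yields a directory of part.<N>.parquet shards holding at
# most shard_size rows each.
classify.write_inference_file(
    embeddings_ds=embeddings_ds,
    model=logits_model,
    labels=classes,
    output_filepath='/tmp/detections.parquet',  # placeholder path
    embedding_hop_size_s=5.0,
    shard_size=1_000_000,
)

# pandas (via pyarrow) can read the sharded Parquet directory back in one call,
# exactly as the new test does.
detections = pd.read_parquet('/tmp/detections.parquet')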
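
Editor's note on the shard layout the new test exercises (derived from the patches, not asserted by the test directly): with 101 placeholder files, one embedding frame per file, four labels, and no threshold, write_inference_file emits 101 * 4 = 404 rows — the test's n_expected_rows. At shard_size=10, 40 full shards are flushed inside the loop and the trailing flush_inference_rows call writes the remaining 4 rows, so the Parquet output directory ends up with 41 files, part.0.parquet through part.40.parquet. The CSV path goes through the same 41 flushes, but they all append to a single file, with the header written only on the shard_num == 0 flush.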