From 63d87c32daed6ed2fb10790b7e6f7ac416f3ecf7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miguel=20Mart=C3=ADnez?= <26169771+miguelusque@users.noreply.github.com> Date: Tue, 1 Oct 2024 20:43:55 +0200 Subject: [PATCH 1/3] Fix 255 - Improve separate_by_metadata performance for jsonl files (#256) * Improve performance in jsonl files Signed-off-by: miguelusque * Improve performance in jsonl files Signed-off-by: miguelusque * Shutdown Dask cluster at exit Signed-off-by: miguelusque * Remove unneeded persist() and wait() operations Signed-off-by: miguelusque * Display only Dask error messages or above Signed-off-by: miguelusque * Cancel any remaining futures Signed-off-by: miguelusque * Remove Dask warning message Signed-off-by: miguelusque * Rename new arguments Signed-off-by: miguelusque * Refactor separate_by_metadata Signed-off-by: miguelusque --------- Signed-off-by: miguelusque Co-authored-by: miguelusque --- nemo_curator/scripts/separate_by_metadata.py | 80 ++++++---- nemo_curator/utils/file_utils.py | 141 ++++++++++++++++-- ...tadata.py => test_separate_by_metadata.py} | 2 +- 3 files changed, 180 insertions(+), 43 deletions(-) rename tests/{test_seperate_by_metadata.py => test_separate_by_metadata.py} (98%) diff --git a/nemo_curator/scripts/separate_by_metadata.py b/nemo_curator/scripts/separate_by_metadata.py index e8ff4f92..c7be49cf 100644 --- a/nemo_curator/scripts/separate_by_metadata.py +++ b/nemo_curator/scripts/separate_by_metadata.py @@ -14,45 +14,53 @@ import argparse import json +import logging import shutil -from nemo_curator.utils.distributed_utils import get_client, read_data -from nemo_curator.utils.file_utils import ( - expand_outdir_and_mkdir, - get_all_files_paths_under, - separate_by_metadata, -) +from dask.distributed.utils import silence_logging_cmgr + +from nemo_curator.utils.distributed_utils import get_client +from nemo_curator.utils.file_utils import separate_by_metadata from nemo_curator.utils.script_utils import ArgumentHelper def main(args): - client = get_client(**ArgumentHelper.parse_client_args(args)) + print(f"Beginning metadata separation for {args.input_metadata_field}") - files = get_all_files_paths_under(args.input_data_dir) - input_data = read_data( - files, file_type=args.input_file_type, backend="pandas", add_filename=True - ) + with silence_logging_cmgr(logging.ERROR): + # Initializes a Dask cluster. 
+        client = get_client(**ArgumentHelper.parse_client_args(args))
+
+        # Separate corpus by metadata
+        metadata_distribution = separate_by_metadata(
+            input_data=args.input_data_dir,
+            output_dir=args.output_data_dir,
+            metadata_field=args.input_metadata_field,
+            remove_metadata=args.remove_metadata_field,
+            output_type=args.output_file_type,
+            input_type=args.input_file_type,
+            include_values=args.include_values,
+            exclude_values=args.exclude_values,
+        )
 
-    output_dir = expand_outdir_and_mkdir(args.output_data_dir)
+        # Save metadata distribution to disk
+        with open(args.output_metadata_distribution, "w") as fp:
+            json.dump(metadata_distribution.compute(), fp)
 
-    metadata_field = args.input_metadata_field
-    print(f"Beginning metadata separation for {metadata_field}")
-    metadata_distribution = separate_by_metadata(
-        input_data,
-        output_dir,
-        metadata_field,
-        remove_metadata=args.remove_metadata_field,
-        output_type=args.output_file_type,
-    ).compute()
-    print(f"Finished metadata separation for {metadata_field}")
+        # Optionally, remove input directory
+        if args.remove_input_dir:
+            print(f"Removing all files in {args.input_data_dir}")
+            shutil.rmtree(args.input_data_dir)
+            print(f"Finished removing all files in {args.input_data_dir}")
 
-    with open(args.output_metadata_distribution, "w") as fp:
-        json.dump(metadata_distribution, fp)
+        # Cancel any remaining futures
+        client.cancel(metadata_distribution)
 
-    if args.remove_input_dir:
-        print(f"Removing all files in {args.input_data_dir}")
-        shutil.rmtree(args.input_data_dir)
-        print(f"Finished removing all files in {args.input_data_dir}")
+        # Shut down the cluster
+        client.shutdown()
+
+        # Close the client
+        client.close()
 
 
 def attach_args(
@@ -103,6 +111,22 @@ def attach_args(
         "is desired to be separated from the others",
     )
 
+    exclusive_filters_group = parser.add_mutually_exclusive_group(required=False)
+    exclusive_filters_group.add_argument(
+        "--include-values",
+        nargs="+",
+        type=str,
+        help="A list of strings representing specific values to be selected or included. "
+        "If provided, only the items matching these values should be kept.",
+    )
+    exclusive_filters_group.add_argument(
+        "--exclude-values",
+        nargs="+",
+        type=str,
+        help="A list of strings representing specific values to be excluded or ignored. "
+        "If provided, any items matching these values should be skipped.",
+    )
+
     return parser
 
 
diff --git a/nemo_curator/utils/file_utils.py b/nemo_curator/utils/file_utils.py
index 5793b235..6bb45d2c 100644
--- a/nemo_curator/utils/file_utils.py
+++ b/nemo_curator/utils/file_utils.py
@@ -12,10 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 import json
 import os
 import pathlib
 from functools import partial, reduce
+from typing import List, Union
 
 import dask.bag as db
 import dask.dataframe as dd
@@ -23,7 +25,10 @@ import pandas as pd
 from dask import delayed
 
-from nemo_curator.utils.distributed_utils import single_partition_write_with_filename
+from nemo_curator.utils.distributed_utils import (
+    read_data,
+    single_partition_write_with_filename,
+)
 
 NEMO_CURATOR_HOME = os.environ.get(
     "NEMO_CURATOR_HOME", os.path.join(os.path.expanduser("~"), ".nemo_curator")
 )
@@ -96,7 +101,7 @@ def get_remaining_files(
         for entry in os.scandir(input_file_path)
         if os.path.basename(entry.path) not in completed_files
     ]
-    # Gaurd against non extension files if present in the input directory
+    # Guard against non extension files if present in the input directory
    input_files = [f for f in input_files if f.endswith(input_file_type)]
    input_files.sort()
 
@@ -131,13 +136,26 @@ def get_batched_files(
 
 
 def write_dataframe_by_meta(
-    df: pd.DataFrame, output_dir, metadata_field, remove_metadata, output_type
+    df: pd.DataFrame,
+    output_dir: str,
+    metadata_field: str,
+    remove_metadata: bool = False,
+    output_type: str = "jsonl",
+    include_values: List[str] = None,
+    exclude_values: List[str] = None,
 ):
     counts = df[metadata_field].value_counts().to_dict()
 
+    # Apply the include_values or exclude_values filter if provided
+    if include_values is not None and include_values:
+        counts = {k: v for k, v in counts.items() if k in include_values}
+    elif exclude_values is not None and exclude_values:
+        counts = {k: v for k, v in counts.items() if k not in exclude_values}
+
     for meta_value in counts:
         meta_output_dir = expand_outdir_and_mkdir(os.path.join(output_dir, meta_value))
         meta_slice = df[df[metadata_field] == meta_value]
+
         if remove_metadata:
             meta_slice = meta_slice.drop(columns=[metadata_field])
         single_partition_write_with_filename(
@@ -154,35 +172,130 @@ def merge_counts(first: dict, second: dict):
     return first
 
 
+def write_record(
+    input_dir: str,
+    file_name: str,
+    line: str,
+    field: str,
+    output_dir: str,
+    include_values: List[str] = None,
+    exclude_values: List[str] = None,
+):
+    try:
+        # Parse the JSON-encoded string 'line' into a Python dictionary
+        line = json.loads(line)
+
+        # Select category value
+        category = line[field]
+
+        if (exclude_values and category in exclude_values) or (
+            include_values and category not in include_values
+        ):
+            return None
+
+        # Obtain the relative path
+        rel_path, file_name = os.path.split(
+            os.path.relpath(file_name, start=os.path.abspath(input_dir))
+        )
+
+        output_dir = os.path.join(output_dir, category, rel_path)
+        os.makedirs(output_dir, exist_ok=True)
+        with open(f"{output_dir}/{file_name}", "a") as f:
+            f.write(json.dumps(line) + "\n")
+
+        return category
+    except (KeyError, ValueError, json.JSONDecodeError):
+        return None
+
+
 def separate_by_metadata(
-    df: dd.DataFrame,
-    output_dir,
-    metadata_field,
-    remove_metadata=False,
-    output_type="jsonl",
+    input_data: Union[dd.DataFrame, str],
+    output_dir: str,
+    metadata_field: str,
+    remove_metadata: bool = False,
+    output_type: str = "jsonl",
+    input_type: str = "jsonl",
+    include_values: List[str] = None,
+    exclude_values: List[str] = None,
 ) -> dict:
     """
     Saves the dataframe to subfolders named after a metadata
 
     Args:
-        df: The dataframe to write. Must have a filename column for the shard.
+        input_data: Either a DataFrame or a string representing the path to the input directory.
+            If a DataFrame is provided, it must have a 'filename' column for the shard.
output_dir: The base directory for which all metadata based subdirs will be created under metadata_field: The metadata field to split on remove_metadata: Whether to remove the metadata from the dataframe when saving it + output_type: File type the dataset will be written to. Supported file formats include 'jsonl' (default), + 'pickle', or 'parquet'. (default: jsonl) + include_values: A list of strings representing specific values to be selected or included. + If provided, only the items matching these values should be kept. + exclude_values: A list of strings representing specific values to be excluded or ignored. + If provided, any items matching these values should be skipped. + Returns: A delayed dictionary mapping each metadata to the count of entries with that metadata value. """ - delayed_data = df.to_delayed() + + if include_values is not None and exclude_values is not None: + print("Error: 'include_values' and 'exclude_values' are mutually exclusive.") + + return + + # Create output_dir if needed + if output_dir: + output_dir = expand_outdir_and_mkdir(output_dir) + + if isinstance(input_data, str): + print(f"Reading {input_type} files from {input_data}", flush=True) + + if input_type in ["json", "jsonl"] and output_type in ["json", "jsonl"]: + # Read JSONL files with streaming (line-by-line), and include file path + bag = db.read_text( + os.path.join(input_data, "**", f"*.{input_type}"), + include_path=True, + ) + + # Parse JSON lines and retain the file path + bag = bag.map( + lambda x: write_record( + input_dir=input_data, + file_name=x[1], + line=x[0], + field=metadata_field, + output_dir=output_dir, + include_values=include_values, + exclude_values=exclude_values, + ) + ) + + frequencies = dict(bag.frequencies().compute()) + frequencies.pop(None, None) # Remove None when applying filters + + return delayed(reduce)(merge_counts, [frequencies]) + else: + input_data = read_data( + get_all_files_paths_under(input_data), + file_type=input_type, + backend="pandas", + add_filename=True, + ) delayed_counts = [ delayed(write_dataframe_by_meta)( - partition, output_dir, metadata_field, remove_metadata, output_type + partition, + output_dir, + metadata_field, + remove_metadata, + output_type, + include_values, + exclude_values, ) - for partition in delayed_data + for partition in input_data.to_delayed() ] - merged_counts = delayed(reduce)(merge_counts, delayed_counts) - return merged_counts + return delayed(reduce)(merge_counts, delayed_counts) def parse_str_of_num_bytes(s, return_str=False): diff --git a/tests/test_seperate_by_metadata.py b/tests/test_separate_by_metadata.py similarity index 98% rename from tests/test_seperate_by_metadata.py rename to tests/test_separate_by_metadata.py index b692f4b2..68b57497 100644 --- a/tests/test_seperate_by_metadata.py +++ b/tests/test_separate_by_metadata.py @@ -64,7 +64,7 @@ def test_metadatasep( add_filename=True, ).df separate_by_metadata( - df=df, + input_data=df, output_dir=str(output_dir), metadata_field="metadata", output_type=file_ext, From b61e2c2c743fa0f09f8e4c794fcfa580dd1562d7 Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Wed, 2 Oct 2024 13:07:11 -0700 Subject: [PATCH 2/3] Pin to spacy<3.8 temporarily to unblock CI (#276) * Pin to spacy<3.8 temporarily to unblock CI Signed-off-by: Ayush Dattagupta * Update pin in rapids nightly dep as well Signed-off-by: Ayush Dattagupta --------- Signed-off-by: Ayush Dattagupta --- requirements/requirements.txt | 2 +- requirements/requirements_cuda12x.txt | 2 +- 
 requirements/requirements_rapids_nightly.txt | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/requirements/requirements.txt b/requirements/requirements.txt
index 6d7ac2ac..f6fd7d19 100644
--- a/requirements/requirements.txt
+++ b/requirements/requirements.txt
@@ -21,7 +21,7 @@ presidio-analyzer==2.2.351
 presidio-anonymizer==2.2.351
 pycld2
 resiliparse
-spacy>=3.6.0, <4.0.0
+spacy>=3.6.0, <3.8.0
 unidic-lite==1.0.8
 usaddress==0.5.10
 warcio==1.7.4
diff --git a/requirements/requirements_cuda12x.txt b/requirements/requirements_cuda12x.txt
index 2b1e4cd2..99e2ede7 100644
--- a/requirements/requirements_cuda12x.txt
+++ b/requirements/requirements_cuda12x.txt
@@ -3,4 +3,4 @@ cugraph-cu12>=24.8
 cuml-cu12>=24.8
 dask-cuda>=24.8
 dask-cudf-cu12>=24.8
-spacy[cuda12x]>=3.6.0, <4.0.0
+spacy[cuda12x]>=3.6.0, <3.8.0
diff --git a/requirements/requirements_rapids_nightly.txt b/requirements/requirements_rapids_nightly.txt
index ef4e0f79..6d7e9785 100644
--- a/requirements/requirements_rapids_nightly.txt
+++ b/requirements/requirements_rapids_nightly.txt
@@ -3,4 +3,4 @@ cugraph-cu12>=24.10.0a0,<=24.10
 cuml-cu12>=24.10.0a0,<=24.10
 dask-cuda>=24.10.0a0,<=24.10
 dask-cudf-cu12>=24.10.0a0,<=24.10
-spacy[cuda12x]>=3.6.0, <4.0.0
+spacy[cuda12x]>=3.6.0, <3.8.0

From d9c414b37a7a84fb5b7d346ee5d59012e4866af8 Mon Sep 17 00:00:00 2001
From: Vibhu Jawa
Date: Wed, 2 Oct 2024 15:33:06 -0600
Subject: [PATCH 3/3] Fix enabling spilling by enabling it on client process (#275)

* Add comment about fix to dask-cuda

Signed-off-by: Vibhu Jawa

* Remove client.run

Signed-off-by: Vibhu Jawa

---------

Signed-off-by: Vibhu Jawa
---
 nemo_curator/utils/distributed_utils.py | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py
index a847d13e..1c54e84d 100644
--- a/nemo_curator/utils/distributed_utils.py
+++ b/nemo_curator/utils/distributed_utils.py
@@ -43,6 +43,18 @@ class NoWorkerError(Exception):
     pass
 
 
+def _enable_spilling():
+    """
+    Enables automatic spilling (and "unspilling") of buffers from device to host
+    to enable out-of-memory computation, i.e., computing on objects that occupy
+    more memory than is available on the GPU.
+    """
+    # Workaround for below (which is missing in 24.08, but fixed in 24.10)
+    # Remove this when we update to 24.10 or later dask-cuda
+    # https://github.com/rapidsai/dask-cuda/pull/1369/files
+    cudf.set_option("spill", True)
+
+
 def start_dask_gpu_local_cluster(
     nvlink_only=False,
     protocol="tcp",
@@ -79,6 +91,11 @@
         _set_torch_to_use_rmm()
         client.run(_set_torch_to_use_rmm)
         print("Torch is using RMM memory pool", flush=True)
+
+        if enable_spilling:
+            _enable_spilling()
+            print("cuDF Spilling is enabled", flush=True)
+
     return client
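
For reference, the sketch below shows how the refactored separate_by_metadata from PATCH 1/3 might be called directly from Python with the new include/exclude filters. It is illustrative only and not part of the patch series: the input and output paths and the "language" metadata field are hypothetical, and it assumes get_client() can be invoked with its default arguments.

# Illustrative usage sketch (not part of the patches above). Paths and the
# "language" metadata field are hypothetical; adjust get_client() to your setup.
import json

from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.file_utils import separate_by_metadata

# Start a Dask client with default settings (assumed to be sufficient here).
client = get_client()

# Split a directory of JSONL shards into one subdirectory per metadata value,
# keeping only the listed values (include_values and exclude_values are
# mutually exclusive; passing both makes the helper return early).
metadata_distribution = separate_by_metadata(
    input_data="/data/corpus_jsonl",        # hypothetical input directory of *.jsonl files
    output_dir="/data/corpus_by_language",  # one subdirectory is created per value
    metadata_field="language",              # hypothetical field present in every record
    input_type="jsonl",
    output_type="jsonl",
    include_values=["EN", "ES"],
)

# The helper returns a delayed dict mapping each metadata value to its record count.
counts = metadata_distribution.compute()
with open("metadata_distribution.json", "w") as fp:
    json.dump(counts, fp)

client.close()

The same filters are exposed on the command line by PATCH 1/3 via the mutually exclusive --include-values and --exclude-values arguments of separate_by_metadata.py.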