From 70360958a9164abc7f64d936934aa81ceb760f42 Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Fri, 18 Oct 2024 12:30:03 -0700 Subject: [PATCH 1/4] add column param Signed-off-by: Sarah Yurick --- nemo_curator/datasets/doc_dataset.py | 16 +++++++++++ nemo_curator/utils/distributed_utils.py | 38 +++++++++++++++++++------ 2 files changed, 45 insertions(+), 9 deletions(-) diff --git a/nemo_curator/datasets/doc_dataset.py b/nemo_curator/datasets/doc_dataset.py index b3c595cf..5a39bf50 100644 --- a/nemo_curator/datasets/doc_dataset.py +++ b/nemo_curator/datasets/doc_dataset.py @@ -43,6 +43,8 @@ def read_json( files_per_partition: int = 1, add_filename: bool = False, input_meta: Union[str, dict] = None, + columns: List[str] = None, + **kwargs, ): return cls( _read_json_or_parquet( @@ -52,6 +54,8 @@ def read_json( files_per_partition=files_per_partition, add_filename=add_filename, input_meta=input_meta, + columns=columns, + **kwargs, ) ) @@ -62,6 +66,8 @@ def read_parquet( backend="pandas", files_per_partition=1, add_filename=False, + columns: List[str] = None, + **kwargs, ): return cls( _read_json_or_parquet( @@ -70,6 +76,8 @@ def read_parquet( backend=backend, files_per_partition=files_per_partition, add_filename=add_filename, + columns=columns, + **kwargs, ) ) @@ -175,6 +183,8 @@ def _read_json_or_parquet( files_per_partition: int, add_filename: bool, input_meta: Union[str, dict] = None, + columns: List[str] = None, + **kwargs, ): """ `input_files` may be a list or a string type. @@ -205,6 +215,8 @@ def _read_json_or_parquet( files_per_partition=files_per_partition, add_filename=add_filename, input_meta=input_meta, + columns=columns, + **kwargs, ) # List of directories @@ -222,6 +234,8 @@ def _read_json_or_parquet( files_per_partition=files_per_partition, add_filename=add_filename, input_meta=input_meta, + columns=columns, + **kwargs, ) dfs.append(df) @@ -245,6 +259,8 @@ def _read_json_or_parquet( files_per_partition=files_per_partition, add_filename=add_filename, input_meta=input_meta, + columns=columns, + **kwargs, ) else: diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index c41b5ea9..60987419 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -260,6 +260,8 @@ def read_single_partition( filetype="jsonl", add_filename=False, input_meta: Union[str, dict] = None, + columns: List[str] = None, + **kwargs, ) -> Union[cudf.DataFrame, pd.DataFrame]: """ This function reads a file with cuDF, sorts the columns of the DataFrame @@ -271,6 +273,7 @@ def read_single_partition( add_filename: Whether to add a "filename" column to the DataFrame. input_meta: A dictionary or a string formatted as a dictionary, which outlines the field names and their respective data types within the JSONL input file. + columns: If not None, only these columns will be read from the file. Returns: A cudf DataFrame or a pandas DataFrame. 
@@ -296,12 +299,14 @@ def read_single_partition( read_kwargs["dtype"] = ( ast.literal_eval(input_meta) if type(input_meta) == str else input_meta ) + elif filetype == "parquet": - read_kwargs = {} + read_kwargs = {"columns": columns} if backend == "cudf": read_f = cudf.read_parquet else: read_f = pd.read_parquet + else: raise RuntimeError("Could not read data, please check file type") @@ -312,7 +317,7 @@ def read_single_partition( # cuDF supports reading multiple files at once read_files_one_at_a_time = False else: - # pandas does not support reading multiple files at once + # Pandas does not support reading multiple files at once read_files_one_at_a_time = True if read_files_one_at_a_time: @@ -322,31 +327,41 @@ def read_single_partition( concat_f = pd.concat df_ls = [] for file in files: - df = read_f(file, **read_kwargs) + df = read_f(file, **read_kwargs, **kwargs) if add_filename: df["filename"] = os.path.basename(file) df_ls.append(df) df = concat_f(df_ls, ignore_index=True) else: - df = read_f(files, **read_kwargs) + df = read_f(files, **read_kwargs, **kwargs) + + if filetype in ["jsonl", "json"] and columns is not None: + if add_filename and "filename" not in columns: + columns.append("filename") + df = df[columns] + df = df[sorted(df.columns)] return df -def read_pandas_pickle(file, add_filename=False) -> pd.DataFrame: +def read_pandas_pickle(file, add_filename=False, columns=None, **kwargs) -> pd.DataFrame: """ - This function reads a pickle file with pandas and adds a "filename" column. + This function reads a pickle file with Pandas. Args: file: The path to the pickle file to read. add_filename: Whether to add a "filename" column to the DataFrame. Returns: - A pandas DataFrame. + A Pandas DataFrame. """ if add_filename: warnings.warn("add_filename is not supported for pickle files") - return pd.read_pickle(file) + + if columns is not None: + return pd.read_pickle(file, **kwargs)[columns] + else: + return pd.read_pickle(file, **kwargs) def read_data( @@ -356,6 +371,8 @@ def read_data( files_per_partition: int = 1, add_filename: bool = False, input_meta: Union[str, dict] = None, + columns: List[str] = None, + **kwargs, ) -> Union[dd.DataFrame, dask_cudf.DataFrame]: """ This function can read multiple data formats and returns a Dask-cuDF DataFrame. @@ -368,6 +385,7 @@ def read_data( add_filename: Whether to add a "filename" column to the DataFrame. input_meta: A dictionary or a string formatted as a dictionary, which outlines the field names and their respective data types within the JSONL input file. + columns: If not None, only these columns will be read from the file. Returns: A Dask-cuDF or a Dask-pandas DataFrame. 
@@ -378,7 +396,7 @@ def read_data( test_obj = cudf.Series if file_type == "pickle": - df = read_pandas_pickle(input_files[0], add_filename=add_filename) + df = read_pandas_pickle(input_files[0], add_filename=add_filename, columns=columns, **kwargs) df = dd.from_pandas(df, npartitions=16) if backend == "cudf": df = df.to_backend("cudf") @@ -401,6 +419,8 @@ def read_data( add_filename=add_filename, input_meta=input_meta, enforce_metadata=False, + columns=columns, + **kwargs, ) else: raise RuntimeError("Could not read data, please check file type") From 8b9fbc3cae323e1f453a3f0b1b61061ecb3a39bb Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Fri, 18 Oct 2024 12:47:23 -0700 Subject: [PATCH 2/4] read_pickle and black Signed-off-by: Sarah Yurick --- nemo_curator/datasets/doc_dataset.py | 4 ++++ nemo_curator/utils/distributed_utils.py | 8 ++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/nemo_curator/datasets/doc_dataset.py b/nemo_curator/datasets/doc_dataset.py index 5a39bf50..ce2f6a6b 100644 --- a/nemo_curator/datasets/doc_dataset.py +++ b/nemo_curator/datasets/doc_dataset.py @@ -88,6 +88,8 @@ def read_pickle( backend="pandas", files_per_partition=1, add_filename=False, + columns: List[str] = None, + **kwargs, ): return cls( read_data( @@ -96,6 +98,8 @@ def read_pickle( backend=backend, files_per_partition=files_per_partition, add_filename=add_filename, + columns=columns, + **kwargs, ) ) diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index 60987419..ef87a72b 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -344,7 +344,9 @@ def read_single_partition( return df -def read_pandas_pickle(file, add_filename=False, columns=None, **kwargs) -> pd.DataFrame: +def read_pandas_pickle( + file, add_filename=False, columns=None, **kwargs +) -> pd.DataFrame: """ This function reads a pickle file with Pandas. 
@@ -396,7 +398,9 @@ def read_data( test_obj = cudf.Series if file_type == "pickle": - df = read_pandas_pickle(input_files[0], add_filename=add_filename, columns=columns, **kwargs) + df = read_pandas_pickle( + input_files[0], add_filename=add_filename, columns=columns, **kwargs + ) df = dd.from_pandas(df, npartitions=16) if backend == "cudf": df = df.to_backend("cudf") From f1ee67560fee6eec71a5224955d482847ab934e7 Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Fri, 18 Oct 2024 14:14:32 -0700 Subject: [PATCH 3/4] optional param Signed-off-by: Sarah Yurick --- nemo_curator/datasets/doc_dataset.py | 8 ++++---- nemo_curator/utils/distributed_utils.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/nemo_curator/datasets/doc_dataset.py b/nemo_curator/datasets/doc_dataset.py index ce2f6a6b..89f1e761 100644 --- a/nemo_curator/datasets/doc_dataset.py +++ b/nemo_curator/datasets/doc_dataset.py @@ -43,7 +43,7 @@ def read_json( files_per_partition: int = 1, add_filename: bool = False, input_meta: Union[str, dict] = None, - columns: List[str] = None, + columns: Optional[List[str]] = None, **kwargs, ): return cls( @@ -66,7 +66,7 @@ def read_parquet( backend="pandas", files_per_partition=1, add_filename=False, - columns: List[str] = None, + columns: Optional[List[str]] = None, **kwargs, ): return cls( @@ -88,7 +88,7 @@ def read_pickle( backend="pandas", files_per_partition=1, add_filename=False, - columns: List[str] = None, + columns: Optional[List[str]] = None, **kwargs, ): return cls( @@ -187,7 +187,7 @@ def _read_json_or_parquet( files_per_partition: int, add_filename: bool, input_meta: Union[str, dict] = None, - columns: List[str] = None, + columns: Optional[List[str]] = None, **kwargs, ): """ diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index ef87a72b..8f36139b 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -260,7 +260,7 @@ def read_single_partition( filetype="jsonl", add_filename=False, input_meta: Union[str, dict] = None, - columns: List[str] = None, + columns: Optional[List[str]] = None, **kwargs, ) -> Union[cudf.DataFrame, pd.DataFrame]: """ @@ -373,7 +373,7 @@ def read_data( files_per_partition: int = 1, add_filename: bool = False, input_meta: Union[str, dict] = None, - columns: List[str] = None, + columns: Optional[List[str]] = None, **kwargs, ) -> Union[dd.DataFrame, dask_cudf.DataFrame]: """ From 71aedc0fbb92a2f09f1c50d3a61a88d277cb70bd Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Mon, 21 Oct 2024 14:52:39 -0700 Subject: [PATCH 4/4] add parquet comment Signed-off-by: Sarah Yurick --- nemo_curator/utils/distributed_utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index 8f36139b..b1aabc5e 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -274,6 +274,7 @@ def read_single_partition( input_meta: A dictionary or a string formatted as a dictionary, which outlines the field names and their respective data types within the JSONL input file. columns: If not None, only these columns will be read from the file. + There is a significant performance gain when specifying columns for Parquet files. Returns: A cudf DataFrame or a pandas DataFrame. 
@@ -388,6 +389,7 @@ def read_data( input_meta: A dictionary or a string formatted as a dictionary, which outlines the field names and their respective data types within the JSONL input file. columns: If not None, only these columns will be read from the file. + There is a significant performance gain when specifying columns for Parquet files. Returns: A Dask-cuDF or a Dask-pandas DataFrame.
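A minimal usage sketch of the `columns` parameter added in this series (the
input paths and column names below are hypothetical, not part of the patches):

    from nemo_curator.datasets import DocumentDataset

    # Parquet: `columns` is passed straight to cudf/pandas read_parquet via
    # read_kwargs, so only the requested columns are loaded from disk; this
    # is the performance gain called out in PATCH 4/4.
    books = DocumentDataset.read_parquet(
        "books_dataset/",        # hypothetical input directory
        backend="pandas",
        columns=["text", "id"],  # hypothetical column names
    )

    # JSONL: the files are read in full first and then subset to `columns`.
    # With add_filename=True, the generated "filename" column is kept even
    # when it is not listed in `columns`.
    books = DocumentDataset.read_json(
        "books_dataset/",
        backend="pandas",
        add_filename=True,
        columns=["text", "id"],
    )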
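For pickle inputs the subset likewise happens after the full read
(pd.read_pickle(file, **kwargs)[columns]), and the new **kwargs plumbing
forwards any extra keyword arguments to the underlying reader. A sketch under
the same hypothetical names:

    # Pickle: the whole DataFrame is unpickled and then subset to `columns`.
    # `compression` is a standard pd.read_pickle argument, forwarded here
    # through the new **kwargs.
    books = DocumentDataset.read_pickle(
        "books_dataset.pkl",     # hypothetical input file
        backend="pandas",
        columns=["text", "id"],
        compression="gzip",      # hypothetical; forwarded to pd.read_pickle
    )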