keep_filename_column param
Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>
sarahyurick committed Oct 22, 2024
1 parent 4ad1a4d commit f080389
Showing 2 changed files with 32 additions and 2 deletions.
7 changes: 7 additions & 0 deletions nemo_curator/datasets/doc_dataset.py
@@ -35,6 +35,9 @@ def __len__(self):
     def persist(self):
         return DocumentDataset(self.df.persist())

+    def head(self, n=5):
+        return self.df.head(n)
+
     @classmethod
     def read_json(
         cls,
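
The new head helper simply forwards to the underlying Dask DataFrame. A quick usage sketch, assuming DocumentDataset is importable from nemo_curator.datasets and using a hypothetical input path:

    from nemo_curator.datasets import DocumentDataset

    # Hypothetical JSONL input; anything readable by read_json works.
    dataset = DocumentDataset.read_json("data/docs.jsonl")

    # Pull the first three rows to the client for quick inspection,
    # mirroring dask.dataframe.DataFrame.head.
    print(dataset.head(3))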
@@ -95,6 +98,7 @@ def to_json(
         self,
         output_file_dir,
         write_to_filename=False,
+        keep_filename_column=False,
     ):
         """
         See nemo_curator.utils.distributed_utils.write_to_disk docstring for other parameters.
@@ -104,13 +108,15 @@ def to_json(
             df=self.df,
             output_file_dir=output_file_dir,
             write_to_filename=write_to_filename,
+            keep_filename_column=keep_filename_column,
             output_type="jsonl",
         )

     def to_parquet(
         self,
         output_file_dir,
         write_to_filename=False,
+        keep_filename_column=False,
     ):
         """
         See nemo_curator.utils.distributed_utils.write_to_disk docstring for other parameters.
@@ -120,6 +126,7 @@ def to_parquet(
             df=self.df,
             output_file_dir=output_file_dir,
             write_to_filename=write_to_filename,
+            keep_filename_column=keep_filename_column,
             output_type="parquet",
         )

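With both writer methods updated, the flag passes straight through to write_to_disk. A minimal sketch, assuming dataset is a DocumentDataset whose DataFrame carries a "filename" column (output paths are illustrative):

    # Write one output file per source file and keep the "filename"
    # column in the written records instead of dropping it (the default).
    dataset.to_json(
        "out/jsonl",
        write_to_filename=True,
        keep_filename_column=True,
    )

    # The same option is available for Parquet output.
    dataset.to_parquet(
        "out/parquet",
        write_to_filename=True,
        keep_filename_column=True,
    )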
27 changes: 25 additions & 2 deletions nemo_curator/utils/distributed_utils.py
@@ -468,13 +468,19 @@ def process_all_batches(
     )


-def single_partition_write_with_filename(df, output_file_dir, output_type="jsonl"):
+def single_partition_write_with_filename(
+    df,
+    output_file_dir,
+    keep_filename_column=False,
+    output_type="jsonl",
+):
     """
     This function processes a DataFrame and writes it to disk
     Args:
         df: A DataFrame.
         output_file_dir: The output file path.
+        keep_filename_column: Whether to keep or drop the "filename" column, if it exists.
         output_type="jsonl": The type of output file to write.
     Returns:
         If the DataFrame is non-empty, return a Series containing a single element, True.
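
At the single-partition level, the new default drops the bookkeeping column before writing. A hedged sketch, assuming the output directory already exists (column values are illustrative):

    import pandas as pd

    from nemo_curator.utils.distributed_utils import (
        single_partition_write_with_filename,
    )

    part = pd.DataFrame({"text": ["hello"], "filename": ["shard_00.jsonl"]})

    # Writes out/shard_00.jsonl; with keep_filename_column=False (the
    # default), the "filename" column is dropped from the written records.
    single_partition_write_with_filename(part, "out", output_type="jsonl")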
@@ -500,12 +506,18 @@ def single_partition_write_with_filename(df, output_file_dir, output_type="jsonl"):
     filenames = df.filename.unique()
     filenames = list(filenames.values_host) if is_cudf_type(df) else list(filenames)
     num_files = len(filenames)
+
     for filename in filenames:
         out_df = df[df.filename == filename] if num_files > 1 else df
+        if not keep_filename_column:
+            out_df = out_df.drop("filename", axis=1)
+
         filename = Path(filename).stem
         output_file_path = os.path.join(output_file_dir, filename)
+
         if output_type == "jsonl":
             output_file_path = output_file_path + ".jsonl"
+
             if isinstance(df, pd.DataFrame):
                 out_df.to_json(
                     output_file_path,
@@ -524,16 +536,24 @@ def single_partition_write_with_filename(df, output_file_dir, output_type="jsonl"):
                     lines=True,
                     force_ascii=False,
                 )

         elif output_type == "parquet":
             output_file_path = output_file_path + ".parquet"
             out_df.to_parquet(output_file_path)

         else:
             raise ValueError(f"Unknown output type: {output_type}")

     return success_ser


-def write_to_disk(df, output_file_dir, write_to_filename=False, output_type="jsonl"):
+def write_to_disk(
+    df,
+    output_file_dir,
+    write_to_filename=False,
+    keep_filename_column=False,
+    output_type="jsonl",
+):
     """
     This function writes a Dask DataFrame to the specified file path.
     If write_to_filename is True, then it expects the
@@ -543,6 +563,7 @@ def write_to_disk(df, output_file_dir, write_to_filename=False, output_type="jsonl"):
         df: A Dask DataFrame.
         output_file_dir: The output file path.
         write_to_filename: Whether to write the filename using the "filename" column.
+        keep_filename_column: Whether to keep or drop the "filename" column, if it exists.
         output_type="jsonl": The type of output file to write.
     """
@@ -563,11 +584,13 @@ def write_to_disk(df, output_file_dir, write_to_filename=False, output_type="jsonl"):
         output = df.map_partitions(
             single_partition_write_with_filename,
             output_file_dir,
+            keep_filename_column=keep_filename_column,
             output_type=output_type,
             meta=output_meta,
             enforce_metadata=False,
         )
         output = output.compute()
+
     else:
         if output_type == "jsonl":
             if is_cudf_type(df):

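End to end, the flag threads from write_to_disk through map_partitions into each partition writer. A self-contained sketch; dd.from_dict and the column values are illustrative, and write_to_filename=True assumes a "filename" column is present:

    import dask.dataframe as dd

    from nemo_curator.utils.distributed_utils import write_to_disk

    # Two documents attributed to the same source file.
    ddf = dd.from_dict(
        {"text": ["a", "b"], "filename": ["docs.jsonl", "docs.jsonl"]},
        npartitions=1,
    )

    # Produces out/docs.jsonl; keep_filename_column=True retains the
    # "filename" column in the output instead of dropping it.
    write_to_disk(
        ddf,
        "out",
        write_to_filename=True,
        keep_filename_column=True,
        output_type="jsonl",
    )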