From 31c8cc0bee38dda8b0b64fa836c66760b3b7e212 Mon Sep 17 00:00:00 2001 From: Yury Tokpanov Date: Tue, 15 Oct 2024 17:23:26 +0000 Subject: [PATCH] Fix linter errors Signed-off-by: Yury Tokpanov --- .../zyda2-tutorial/0_processing/helper.py | 12 +- .../0_processing/process_dclm.py | 17 +-- .../0_processing/process_dolma_cc.py | 16 ++- .../0_processing/process_fwe2.py | 42 ++++--- .../0_processing/process_zyda.py | 18 +-- .../zyda2-tutorial/1_fuzzy_dedup/0_minhash.py | 25 ++-- .../zyda2-tutorial/1_fuzzy_dedup/1_lsh.py | 30 ++--- .../1_fuzzy_dedup/2_buckets_to_edges.py | 21 ++-- .../1_fuzzy_dedup/3_connected_components.py | 31 +++-- .../2_dupes_removal/0_id_mapping.py | 32 +++-- .../2_dupes_removal/1_id_conversion.py | 32 +++-- .../2_dupes_removal/2_compute_counts.py | 55 ++++---- .../2_dupes_removal/3_prep_dupes.py | 119 ++++++++++++------ .../2_dupes_removal/4_get_dupes_dclm.py | 69 ++++++---- .../2_dupes_removal/4_get_dupes_dolma-cc.py | 63 ++++++---- .../2_dupes_removal/5_get_dupes_zyda.py | 71 +++++++---- .../2_dupes_removal/remove_dupes.py | 41 ++++-- .../2_dupes_removal/run_remove_dupes_dclm.sh | 2 +- .../3_quality_model/run_quality_classifier.py | 22 ++-- .../3_quality_model/run_quality_classifier.sh | 2 +- .../zyda2-tutorial/4_filtering/filter_fwe.py | 14 ++- .../4_filtering/filter_quality.py | 49 ++++++-- tutorials/zyda2-tutorial/README.md | 6 +- 23 files changed, 492 insertions(+), 297 deletions(-) diff --git a/tutorials/zyda2-tutorial/0_processing/helper.py b/tutorials/zyda2-tutorial/0_processing/helper.py index cf448095..21b14805 100644 --- a/tutorials/zyda2-tutorial/0_processing/helper.py +++ b/tutorials/zyda2-tutorial/0_processing/helper.py @@ -1,19 +1,21 @@ import os -from nemo_curator.utils.file_utils import get_all_files_paths_under -from nemo_curator.datasets import DocumentDataset from nemo_curator import AddId +from nemo_curator.datasets import DocumentDataset +from nemo_curator.utils.file_utils import get_all_files_paths_under def ensure_directory_exists(filename: str): - os.makedirs(os.path.dirname(filename), exist_ok = True) + os.makedirs(os.path.dirname(filename), exist_ok=True) def process_data(input_folder, output_folder, prefix, partition_size="512MB"): raw_files = get_all_files_paths_under(input_folder) raw_data = DocumentDataset.read_parquet(raw_files) - raw_data_rep = DocumentDataset(raw_data.df.repartition(partition_size=partition_size)) - add_id = AddId(id_field='nemo_id', id_prefix=prefix) + raw_data_rep = DocumentDataset( + raw_data.df.repartition(partition_size=partition_size) + ) + add_id = AddId(id_field="nemo_id", id_prefix=prefix) data_with_id = add_id(raw_data_rep) ensure_directory_exists(output_folder) data_with_id.to_parquet(output_folder) diff --git a/tutorials/zyda2-tutorial/0_processing/process_dclm.py b/tutorials/zyda2-tutorial/0_processing/process_dclm.py index c7566b11..bcfe6e06 100644 --- a/tutorials/zyda2-tutorial/0_processing/process_dclm.py +++ b/tutorials/zyda2-tutorial/0_processing/process_dclm.py @@ -1,11 +1,13 @@ import os + os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" +import logging + from dask.distributed import Client, LocalCluster from helper import process_data -import logging -logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO) +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) DATA_BASE = os.environ.get("DATA_BASE") INPUT_BASE = os.path.join(DATA_BASE, "raw/dclm-baseline-1.0-parquet/filtered") @@ -13,15 +15,15 @@ CPU_WORKERS = os.environ.get("CPU_WORKERS") -if __name__ == 
'__main__': +if __name__ == "__main__": logging.info("Starting Dask cluster") - cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit='48GB') + cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit="48GB") client = Client(cluster) logging.info(client) components = [ "global-shard_01_of_10", - "global-shard_02_of_10", + "global-shard_02_of_10", "global-shard_03_of_10", "global-shard_04_of_10", "global-shard_05_of_10", @@ -44,7 +46,6 @@ prefix=f"dclm-gs{i}", ) logging.info("Done!") - + client.cluster.close() - client.shutdown() - \ No newline at end of file + client.shutdown() diff --git a/tutorials/zyda2-tutorial/0_processing/process_dolma_cc.py b/tutorials/zyda2-tutorial/0_processing/process_dolma_cc.py index 5416c25f..c4f9a287 100644 --- a/tutorials/zyda2-tutorial/0_processing/process_dolma_cc.py +++ b/tutorials/zyda2-tutorial/0_processing/process_dolma_cc.py @@ -1,11 +1,13 @@ import os + os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" +import logging + from dask.distributed import Client, LocalCluster from helper import process_data -import logging -logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO) +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) DATA_BASE = os.environ.get("DATA_BASE") INPUT_BASE = os.path.join(DATA_BASE, "raw/dolma-v1_7-cc-parquet") @@ -13,18 +15,14 @@ CPU_WORKERS = os.environ.get("CPU_WORKERS") -if __name__ == '__main__': +if __name__ == "__main__": logging.info("Starting Dask cluster") - cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit='48GB') + cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit="48GB") client = Client(cluster) logging.info(client) logging.info(f"Processing Dolma-CC") - process_data( - input_folder=INPUT_BASE, - output_folder=OUTPUT_BASE, - prefix="dolma-cc" - ) + process_data(input_folder=INPUT_BASE, output_folder=OUTPUT_BASE, prefix="dolma-cc") logging.info("Done!") client.cluster.close() diff --git a/tutorials/zyda2-tutorial/0_processing/process_fwe2.py b/tutorials/zyda2-tutorial/0_processing/process_fwe2.py index cb14c6b4..a425a18e 100644 --- a/tutorials/zyda2-tutorial/0_processing/process_fwe2.py +++ b/tutorials/zyda2-tutorial/0_processing/process_fwe2.py @@ -1,15 +1,16 @@ import os -os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" -from dask.distributed import Client, LocalCluster -from helper import process_data +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" -import gc import ctypes +import gc +import logging from pathlib import Path -import logging -logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO) +from dask.distributed import Client, LocalCluster +from helper import process_data + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) DATA_BASE = os.environ.get("DATA_BASE") INPUT_BASE = os.path.join(DATA_BASE, "raw/fineweb-edu-score-2/data") @@ -23,17 +24,26 @@ def trim_memory() -> int: def get_folder_size(folder_path): - return sum(file.stat().st_size for file in Path(folder_path).rglob('*') if file.is_file()) + return sum( + file.stat().st_size for file in Path(folder_path).rglob("*") if file.is_file() + ) def sort_folders_by_size(parent_directory): - folders = [f for f in os.listdir(parent_directory) if os.path.isdir(os.path.join(parent_directory, f))] - folder_sizes = [(folder, get_folder_size(os.path.join(parent_directory, folder))) for folder in folders] + folders = [ + f + for f in os.listdir(parent_directory) + if 
os.path.isdir(os.path.join(parent_directory, f)) + ] + folder_sizes = [ + (folder, get_folder_size(os.path.join(parent_directory, folder))) + for folder in folders + ] return sorted(folder_sizes, key=lambda x: x[1]) def bytes_to_human_readable(size_in_bytes): - suffixes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB'] + suffixes = ["B", "KB", "MB", "GB", "TB", "PB"] suffix_index = 0 size = float(size_in_bytes) while size >= 1024 and suffix_index < len(suffixes) - 1: @@ -42,9 +52,9 @@ def bytes_to_human_readable(size_in_bytes): return f"{size:.2f} {suffixes[suffix_index]}" -if __name__ == '__main__': +if __name__ == "__main__": logging.info("Starting Dask cluster") - cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit='240GB') + cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit="240GB") client = Client(cluster) logging.info(client) @@ -55,16 +65,18 @@ def bytes_to_human_readable(size_in_bytes): if not os.path.exists(input_path) or not os.path.isdir(input_path): continue output_path = os.path.join(OUTPUT_BASE, component) - logging.info(f"Processing {component}, size = {bytes_to_human_readable(component_size)}") + logging.info( + f"Processing {component}, size = {bytes_to_human_readable(component_size)}" + ) process_data( input_folder=input_path, output_folder=output_path, - prefix=f"fwe2-{component}" + prefix=f"fwe2-{component}", ) logging.info("Trimming memory") gc.collect() client.run(trim_memory) logging.info("Done!") - + client.cluster.close() client.shutdown() diff --git a/tutorials/zyda2-tutorial/0_processing/process_zyda.py b/tutorials/zyda2-tutorial/0_processing/process_zyda.py index 8584a0d1..6cc951cd 100644 --- a/tutorials/zyda2-tutorial/0_processing/process_zyda.py +++ b/tutorials/zyda2-tutorial/0_processing/process_zyda.py @@ -1,11 +1,13 @@ import os + os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" +import logging + from dask.distributed import Client, LocalCluster from helper import process_data -import logging -logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO) +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) DATA_BASE = os.environ.get("DATA_BASE") INPUT_BASE = os.path.join(DATA_BASE, "raw/data/zyda_no_starcoder") @@ -13,14 +15,14 @@ CPU_WORKERS = os.environ.get("CPU_WORKERS") -if __name__ == '__main__': +if __name__ == "__main__": logging.info("Starting Dask cluster") - cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit='48GB') + cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit="48GB") client = Client(cluster) logging.info(client) components = [ - "zyda_arxiv", + "zyda_arxiv", "zyda_peS2o", "zyda_pile-uncopyrighted", "zyda_slimpajama", @@ -35,11 +37,9 @@ output_path = os.path.join(OUTPUT_BASE, component) logging.info(f"Processing {component}") process_data( - input_folder=input_path, - output_folder=output_path, - prefix=component + input_folder=input_path, output_folder=output_path, prefix=component ) logging.info("Done!") - + client.cluster.close() client.shutdown() diff --git a/tutorials/zyda2-tutorial/1_fuzzy_dedup/0_minhash.py b/tutorials/zyda2-tutorial/1_fuzzy_dedup/0_minhash.py index 81d5c89d..d3c73252 100644 --- a/tutorials/zyda2-tutorial/1_fuzzy_dedup/0_minhash.py +++ b/tutorials/zyda2-tutorial/1_fuzzy_dedup/0_minhash.py @@ -1,17 +1,18 @@ import os + os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" +import logging +import time + import dask_cudf +from nemo_curator import MinHash +from nemo_curator.datasets import 
DocumentDataset from nemo_curator.utils.distributed_utils import get_client, get_num_workers from nemo_curator.utils.file_utils import get_all_files_paths_under -from nemo_curator.datasets import DocumentDataset -from nemo_curator import MinHash -import time - -import logging -logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO) +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) def read_folder(input_folder, columns=["nemo_id", "text"]): @@ -30,16 +31,16 @@ def read_folder(input_folder, columns=["nemo_id", "text"]): SCHEDULER_FILE = os.environ.get("SCHEDULER_FILE") -if __name__ == '__main__': +if __name__ == "__main__": client = get_client(scheduler_file=SCHEDULER_FILE) logging.info(f"Number of dask workers: {get_num_workers(client)}") minhash_base_output_path = os.path.join(DATA_BASE, "fuzzy/minhash") - minhash_output_dir = os.path.join(minhash_base_output_path, 'data') + minhash_output_dir = os.path.join(minhash_base_output_path, "data") # Relevant parameters - minhash_id_field = 'nemo_id' - minhash_text_field = 'text' + minhash_id_field = "nemo_id" + minhash_text_field = "text" seed = 10 minhash_length = 128 char_ngram = 25 @@ -48,7 +49,7 @@ def read_folder(input_folder, columns=["nemo_id", "text"]): # Reading all the data text_ddf = read_folder( input_folder=os.path.join(DATA_BASE, "processed"), - columns=[minhash_id_field, minhash_text_field] + columns=[minhash_id_field, minhash_text_field], ) # Computing minhashes @@ -60,7 +61,7 @@ def read_folder(input_folder, columns=["nemo_id", "text"]): use_64bit_hash=use_64bit_hash, id_field=minhash_id_field, text_field=minhash_text_field, - cache_dir=minhash_output_dir + cache_dir=minhash_output_dir, ) res = minhasher(DocumentDataset(text_ddf)).df logging.info(f"Time taken for MinHash: {time.time()-t0:.2f}sec.") diff --git a/tutorials/zyda2-tutorial/1_fuzzy_dedup/1_lsh.py b/tutorials/zyda2-tutorial/1_fuzzy_dedup/1_lsh.py index 603365c1..b574c1e8 100644 --- a/tutorials/zyda2-tutorial/1_fuzzy_dedup/1_lsh.py +++ b/tutorials/zyda2-tutorial/1_fuzzy_dedup/1_lsh.py @@ -1,43 +1,43 @@ import os -os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" - -from nemo_curator.utils.distributed_utils import get_client,get_num_workers -from nemo_curator.datasets import DocumentDataset -from nemo_curator import LSH -from nemo_curator.utils.fuzzy_dedup_utils.id_mapping import \ - convert_str_id_to_int +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" +import logging import time + import cudf import dask_cudf import numpy as np -import logging -logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO) +from nemo_curator import LSH +from nemo_curator.datasets import DocumentDataset +from nemo_curator.utils.distributed_utils import get_client, get_num_workers +from nemo_curator.utils.fuzzy_dedup_utils.id_mapping import convert_str_id_to_int + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) DATA_BASE = os.environ.get("DATA_BASE") SCHEDULER_FILE = os.environ.get("SCHEDULER_FILE") -if __name__ == '__main__': +if __name__ == "__main__": client = get_client(scheduler_file=SCHEDULER_FILE) logging.info(f"Number of dask workers: {get_num_workers(client)}") minhash_base_output_path = os.path.join(DATA_BASE, "fuzzy/minhash") - minhash_output_dir = os.path.join(minhash_base_output_path, 'data') + minhash_output_dir = os.path.join(minhash_base_output_path, "data") # Input lsh_input_data_path = minhash_output_dir # Output lsh_base_output_path = os.path.join(DATA_BASE, "fuzzy/lsh") - 
lsh_output_dir = os.path.join(lsh_base_output_path, 'data') + lsh_output_dir = os.path.join(lsh_base_output_path, "data") # Relevant parameters - lsh_id_field = 'nemo_id' - minhash_field = '_minhash_signature' + lsh_id_field = "nemo_id" + minhash_field = "_minhash_signature" minhash_length = 128 num_bands = 8 buckets_per_shuffle = 8 @@ -46,7 +46,7 @@ # Load MinHash output logging.info("Converting ids") - df = dask_cudf.read_parquet(lsh_input_data_path, backend = "cudf") + df = dask_cudf.read_parquet(lsh_input_data_path, backend="cudf") df = df.map_partitions( convert_str_id_to_int, id_column=lsh_id_field, diff --git a/tutorials/zyda2-tutorial/1_fuzzy_dedup/2_buckets_to_edges.py b/tutorials/zyda2-tutorial/1_fuzzy_dedup/2_buckets_to_edges.py index 68e904b8..2ae7f408 100644 --- a/tutorials/zyda2-tutorial/1_fuzzy_dedup/2_buckets_to_edges.py +++ b/tutorials/zyda2-tutorial/1_fuzzy_dedup/2_buckets_to_edges.py @@ -1,29 +1,32 @@ import os -os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" - -from nemo_curator.utils.distributed_utils import get_client, get_num_workers -from nemo_curator.datasets import DocumentDataset -from nemo_curator.modules.fuzzy_dedup import BucketsToEdges +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" +import logging import time + import dask_cudf -import logging -logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO) +from nemo_curator.datasets import DocumentDataset +from nemo_curator.modules.fuzzy_dedup import BucketsToEdges +from nemo_curator.utils.distributed_utils import get_client, get_num_workers + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) DATA_BASE = os.environ.get("DATA_BASE") SCHEDULER_FILE = os.environ.get("SCHEDULER_FILE") -if __name__ == '__main__': +if __name__ == "__main__": client = get_client(scheduler_file=SCHEDULER_FILE) logging.info(f"Number of dask workers: {get_num_workers(client)}") # Input lsh_base_output_path = os.path.join(DATA_BASE, "fuzzy/lsh") - lsh_buckets_output_path = os.path.join(lsh_base_output_path, "data/_buckets.parquet") + lsh_buckets_output_path = os.path.join( + lsh_base_output_path, "data/_buckets.parquet" + ) # Output buckets_to_edges_out = os.path.join(DATA_BASE, "fuzzy/buckets_to_edges/data") diff --git a/tutorials/zyda2-tutorial/1_fuzzy_dedup/3_connected_components.py b/tutorials/zyda2-tutorial/1_fuzzy_dedup/3_connected_components.py index caa6c1e6..67796ec4 100644 --- a/tutorials/zyda2-tutorial/1_fuzzy_dedup/3_connected_components.py +++ b/tutorials/zyda2-tutorial/1_fuzzy_dedup/3_connected_components.py @@ -1,35 +1,42 @@ import os -os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" -from nemo_curator.utils.distributed_utils import get_client, get_num_workers -from nemo_curator.modules.fuzzy_dedup import ConnectedComponents +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" +import logging import time -import logging -logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO) +from nemo_curator.modules.fuzzy_dedup import ConnectedComponents +from nemo_curator.utils.distributed_utils import get_client, get_num_workers + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) DATA_BASE = os.environ.get("DATA_BASE") SCHEDULER_FILE = os.environ.get("SCHEDULER_FILE") -if __name__ == '__main__': +if __name__ == "__main__": client = get_client(scheduler_file=SCHEDULER_FILE) logging.info(f"Number of dask workers: {get_num_workers(client)}") # Input - buckets_to_edges_out = os.path.join(DATA_BASE, 
"fuzzy/buckets_to_edges/data/_edges.parquet") + buckets_to_edges_out = os.path.join( + DATA_BASE, "fuzzy/buckets_to_edges/data/_edges.parquet" + ) # Output connected_component_base_output_path = os.path.join(DATA_BASE, "fuzzy/cc") - connected_component_output_path = os.path.join(connected_component_base_output_path, "connected_components.parquet") - connected_component_cache_dir = os.path.join(connected_component_base_output_path, "cache") + connected_component_output_path = os.path.join( + connected_component_base_output_path, "connected_components.parquet" + ) + connected_component_cache_dir = os.path.join( + connected_component_base_output_path, "cache" + ) # Relevant parameters - input_id_field = 'id' - + input_id_field = "id" + t0 = time.time() - + components_stage = ConnectedComponents( cache_dir=connected_component_cache_dir, jaccard_pairs_path=buckets_to_edges_out, diff --git a/tutorials/zyda2-tutorial/2_dupes_removal/0_id_mapping.py b/tutorials/zyda2-tutorial/2_dupes_removal/0_id_mapping.py index 4086d884..b2f95e25 100644 --- a/tutorials/zyda2-tutorial/2_dupes_removal/0_id_mapping.py +++ b/tutorials/zyda2-tutorial/2_dupes_removal/0_id_mapping.py @@ -1,37 +1,41 @@ import os -os.environ['DASK_DATAFRAME__QUERY_PLANNING'] = "False" -from nemo_curator.utils.distributed_utils import get_client, get_num_workers +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import json +import logging import cudf import dask_cudf import tqdm -import json -import logging -logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO) +from nemo_curator.utils.distributed_utils import get_client, get_num_workers + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) def list_deepest_folders(path): deepest_folders = [] - + # Walk through the directory structure - for root, dirs, files in os.walk(path, topdown=False): # `topdown=False` starts from the deepest folder + for root, dirs, files in os.walk( + path, topdown=False + ): # `topdown=False` starts from the deepest folder if not dirs: # Keep only folders that don't have subdirectories deepest_folders.append(root) - + return deepest_folders def read_data_subset(dir_paths: list): """ - Reads 1 row from each dataset assuming each file has a unique dataset prefix + Reads 1 row from each dataset assuming each file has a unique dataset prefix """ dfs = [] for dir_path in tqdm.tqdm(dir_paths): file_name = sorted([x for x in os.listdir(dir_path) if ".parquet" in x])[0] file_path = os.path.join(dir_path, file_name) - x = dask_cudf.read_parquet(file_path).head(1) # read 1 rows from each file + x = dask_cudf.read_parquet(file_path).head(1) # read 1 rows from each file dfs.append(x) x = cudf.concat(dfs) return x @@ -49,7 +53,9 @@ def generate_mapping(x: cudf.DataFrame): def convert_cc_ids(cc_df: dask_cudf.DataFrame, doc_id_mapping: dict, pad_width=10): - cc_df["doc_id"] = cc_df["doc_id"].astype(str).str.pad(width=pad_width, side="left", fillchar="0") + cc_df["doc_id"] = ( + cc_df["doc_id"].astype(str).str.pad(width=pad_width, side="left", fillchar="0") + ) cc_df["dataset_id"] = cc_df.dataset_id.astype(str).replace(doc_id_mapping) cc_df["original_id"] = cc_df.dataset_id + "-" + cc_df.doc_id return cc_df[["original_id", "group"]] @@ -66,7 +72,7 @@ def convert_cc_ids(cc_df: dask_cudf.DataFrame, doc_id_mapping: dict, pad_width=1 all_folders = sorted(list_deepest_folders(os.path.join(DATA_BASE, "processed"))) df_subset = read_data_subset(all_folders) - dataset_id_mapping= generate_mapping(df_subset) + dataset_id_mapping = 
generate_mapping(df_subset) with open(ID_MAPPING, "w") as f: - f.write(json.dumps(dataset_id_mapping)) + f.write(json.dumps(dataset_id_mapping)) diff --git a/tutorials/zyda2-tutorial/2_dupes_removal/1_id_conversion.py b/tutorials/zyda2-tutorial/2_dupes_removal/1_id_conversion.py index 82682dea..82b37cb3 100644 --- a/tutorials/zyda2-tutorial/2_dupes_removal/1_id_conversion.py +++ b/tutorials/zyda2-tutorial/2_dupes_removal/1_id_conversion.py @@ -1,14 +1,16 @@ -import os import json -os.environ['DASK_DATAFRAME__QUERY_PLANNING'] = "False" +import os -from nemo_curator.utils.distributed_utils import get_num_workers +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging import dask.dataframe as dd from dask.distributed import Client, LocalCluster -import logging -logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO) +from nemo_curator.utils.distributed_utils import get_num_workers + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) DATA_BASE = os.environ.get("DATA_BASE") CC_BASE = os.path.join(DATA_BASE, "fuzzy/cc/") @@ -41,13 +43,23 @@ else: print(f"Unknown value {val} for key {key}") - def convert_cc_ids(cc_df, dataset_id_mapping, global_dataset_id_mapping, doc_id_len=10): - cc_df["global_dataset_id"] = cc_df.dataset_id.astype(str).replace(global_dataset_id_mapping) + def convert_cc_ids( + cc_df, dataset_id_mapping, global_dataset_id_mapping, doc_id_len=10 + ): + cc_df["global_dataset_id"] = cc_df.dataset_id.astype(str).replace( + global_dataset_id_mapping + ) cc_df["dataset_id"] = cc_df.dataset_id.astype(str).replace(dataset_id_mapping) - cc_df["doc_id"] = cc_df["doc_id"].astype(str).str.pad(width=doc_id_len, side="left", fillchar="0") + cc_df["doc_id"] = ( + cc_df["doc_id"] + .astype(str) + .str.pad(width=doc_id_len, side="left", fillchar="0") + ) cc_df["original_id"] = cc_df.dataset_id + "-" + cc_df.doc_id return cc_df[["global_dataset_id", "dataset_id", "original_id", "group"]] - - cc_df_converted = convert_cc_ids(cc_df, dataset_id_mapping, global_dataset_id_mapping) + + cc_df_converted = convert_cc_ids( + cc_df, dataset_id_mapping, global_dataset_id_mapping + ) cc_df_converted.to_parquet(CC_CONVERTED_FOLDER, overwrite=True, write_index=False) logging.info("Done converting!") diff --git a/tutorials/zyda2-tutorial/2_dupes_removal/2_compute_counts.py b/tutorials/zyda2-tutorial/2_dupes_removal/2_compute_counts.py index e0885d8a..c59d0f89 100644 --- a/tutorials/zyda2-tutorial/2_dupes_removal/2_compute_counts.py +++ b/tutorials/zyda2-tutorial/2_dupes_removal/2_compute_counts.py @@ -1,23 +1,27 @@ +import json import os import time -import json -os.environ['DASK_DATAFRAME__QUERY_PLANNING'] = "False" -from nemo_curator.utils.distributed_utils import get_num_workers +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging import dask.dataframe as dd import pandas as pd from dask.distributed import Client, LocalCluster -import logging -logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO) +from nemo_curator.utils.distributed_utils import get_num_workers + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) DATA_BASE = os.environ.get("DATA_BASE") CC_BASE = os.path.join(DATA_BASE, "fuzzy/cc/") CC_FOLDER = os.path.join(CC_BASE, "connected_components.parquet") CC_CONVERTED_FOLDER = os.path.join(CC_BASE, "connected_components_converted.parquet") CC_GROUPED_FOLDER = os.path.join(CC_BASE, "connected_components_grouped.parquet") -CC_GROUPED_COUNTS_FOLDER = os.path.join(CC_BASE, 
"connected_components_grouped_counts.parquet") +CC_GROUPED_COUNTS_FOLDER = os.path.join( + CC_BASE, "connected_components_grouped_counts.parquet" +) CPU_WORKERS = os.environ.get("CPU_WORKERS") @@ -32,20 +36,28 @@ t0 = time.time() def group_partition(partition): - sizes = partition.groupby('group').size().reset_index() - - grouped = partition.groupby('group').agg({ - 'global_dataset_id': lambda x: json.dumps(list(x)), - 'dataset_id': lambda x: json.dumps(list(x)), - 'original_id': lambda x: json.dumps(list(x)), - }).reset_index() - - result = pd.merge(sizes, grouped, on='group') - - return result[['group', 'global_dataset_id', 'dataset_id', 'original_id', 'size']] + sizes = partition.groupby("group").size().reset_index() + + grouped = ( + partition.groupby("group") + .agg( + { + "global_dataset_id": lambda x: json.dumps(list(x)), + "dataset_id": lambda x: json.dumps(list(x)), + "original_id": lambda x: json.dumps(list(x)), + } + ) + .reset_index() + ) + + result = pd.merge(sizes, grouped, on="group") + + return result[ + ["group", "global_dataset_id", "dataset_id", "original_id", "size"] + ] meta = { - "group": int, + "group": int, "global_dataset_id": str, "dataset_id": str, "original_id": str, @@ -61,11 +73,13 @@ def group_partition(partition): def count_occurrences_in_partition(partition): for id in global_dataset_ids: - partition[id] = partition["global_dataset_id"].apply(lambda x: json.loads(x).count(id)) + partition[id] = partition["global_dataset_id"].apply( + lambda x: json.loads(x).count(id) + ) return partition meta = { - "group": "int", + "group": "int", "global_dataset_id": "str", "dataset_id": "str", "original_id": "str", @@ -79,5 +93,4 @@ def count_occurrences_in_partition(partition): ) cc_grouped_counts_df.to_parquet(CC_GROUPED_COUNTS_FOLDER, overwrite=True) - logging.info(f"Done computing counts in {time.time() - t0:.2f} sec") diff --git a/tutorials/zyda2-tutorial/2_dupes_removal/3_prep_dupes.py b/tutorials/zyda2-tutorial/2_dupes_removal/3_prep_dupes.py index 63ffebc1..d9d30f36 100644 --- a/tutorials/zyda2-tutorial/2_dupes_removal/3_prep_dupes.py +++ b/tutorials/zyda2-tutorial/2_dupes_removal/3_prep_dupes.py @@ -1,25 +1,31 @@ -import os import json -os.environ['DASK_DATAFRAME__QUERY_PLANNING'] = "False" +import os -from nemo_curator.utils.distributed_utils import get_num_workers -from nemo_curator.utils.module_utils import count_digits +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging import dask.dataframe as dd import pandas as pd from dask.distributed import Client, LocalCluster -import logging -logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO) +from nemo_curator.utils.distributed_utils import get_num_workers +from nemo_curator.utils.module_utils import count_digits + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) DATA_BASE = os.environ.get("DATA_BASE") RAW_DATA_BASE = os.path.join(DATA_BASE, "processed") CC_BASE = os.path.join(DATA_BASE, "fuzzy/cc/") CC_FOLDER = os.path.join(CC_BASE, "connected_components.parquet") -CC_GROUPED_COUNTS_FOLDER = os.path.join(CC_BASE, "connected_components_grouped_counts.parquet") +CC_GROUPED_COUNTS_FOLDER = os.path.join( + CC_BASE, "connected_components_grouped_counts.parquet" +) DUPES_BASE = os.path.join(CC_BASE, "dupes") -DUPES_IDS_GROUPED_IN_COLUMNS = os.path.join(DUPES_BASE, "dupes_ids_grouped_in_columns.parquet") +DUPES_IDS_GROUPED_IN_COLUMNS = os.path.join( + DUPES_BASE, "dupes_ids_grouped_in_columns.parquet" +) CPU_WORKERS = os.environ.get("CPU_WORKERS") @@ 
-28,14 +34,13 @@ cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True) client = Client(cluster) logging.info(f"Number of dask workers: {get_num_workers(client)}") - + paths = { "dclm": os.path.join(RAW_DATA_BASE, "dclm-baseline-1.0-parquet/filtered"), "dolma-cc": os.path.join(RAW_DATA_BASE, "dolma-v1_7-cc-parquet"), "fwe2": os.path.join(RAW_DATA_BASE, "fineweb-edu-score-2/data"), "zyda": os.path.join(RAW_DATA_BASE, "data/zyda_no_starcoder"), - } - + } dclm_id2dir = { "gs1": "global-shard_01_of_10", @@ -52,52 +57,75 @@ dclm_dir2id = {} for key, val in dclm_id2dir.items(): - dclm_dir2id[val] = key - + dclm_dir2id[val] = key # Counting digits dclm_digits = {} for dir in sorted(os.listdir(paths["dclm"])): - files = [x for x in os.listdir(os.path.join(paths["dclm"], dir)) if ".parquet" in x] + files = [ + x for x in os.listdir(os.path.join(paths["dclm"], dir)) if ".parquet" in x + ] dclm_digits[dclm_dir2id[dir]] = count_digits(len(files)) - dolma_digits = count_digits(len([x for x in os.listdir(paths["dolma-cc"]) if ".parquet" in x])) + dolma_digits = count_digits( + len([x for x in os.listdir(paths["dolma-cc"]) if ".parquet" in x]) + ) zyda_digits = {} for dir in sorted(os.listdir(paths["zyda"])): - files = [x for x in os.listdir(os.path.join(paths["zyda"], dir)) if ".parquet" in x] + files = [ + x for x in os.listdir(os.path.join(paths["zyda"], dir)) if ".parquet" in x + ] zyda_digits[dir] = count_digits(len(files)) fwe2_digits = {} for dir in sorted(os.listdir(paths["fwe2"])): - files = [x for x in os.listdir(os.path.join(paths["fwe2"], dir)) if ".parquet" in x] + files = [ + x for x in os.listdir(os.path.join(paths["fwe2"], dir)) if ".parquet" in x + ] fwe2_digits[dir] = count_digits(len(files)) - - cc_grouped_counts_df = dd.read_parquet(CC_GROUPED_COUNTS_FOLDER, split_row_groups=False) - cc_grouped_counts_filtered_df = cc_grouped_counts_df[cc_grouped_counts_df["size"] > 1] - - cc_groups_counts_inter_df = cc_grouped_counts_filtered_df[cc_grouped_counts_filtered_df["size"] != cc_grouped_counts_filtered_df["dclm"]] - cc_groups_counts_inter_df = cc_groups_counts_inter_df[cc_groups_counts_inter_df["size"] != cc_groups_counts_inter_df["fwe2"]] - cc_groups_counts_inter_df = cc_groups_counts_inter_df[cc_groups_counts_inter_df["size"] != cc_groups_counts_inter_df["dolma-cc"]] - cc_groups_counts_inter_df = cc_groups_counts_inter_df[cc_groups_counts_inter_df["size"] != cc_groups_counts_inter_df["zyda"]] - + cc_grouped_counts_df = dd.read_parquet( + CC_GROUPED_COUNTS_FOLDER, split_row_groups=False + ) + cc_grouped_counts_filtered_df = cc_grouped_counts_df[ + cc_grouped_counts_df["size"] > 1 + ] + + cc_groups_counts_inter_df = cc_grouped_counts_filtered_df[ + cc_grouped_counts_filtered_df["size"] != cc_grouped_counts_filtered_df["dclm"] + ] + cc_groups_counts_inter_df = cc_groups_counts_inter_df[ + cc_groups_counts_inter_df["size"] != cc_groups_counts_inter_df["fwe2"] + ] + cc_groups_counts_inter_df = cc_groups_counts_inter_df[ + cc_groups_counts_inter_df["size"] != cc_groups_counts_inter_df["dolma-cc"] + ] + cc_groups_counts_inter_df = cc_groups_counts_inter_df[ + cc_groups_counts_inter_df["size"] != cc_groups_counts_inter_df["zyda"] + ] def select_dupes(partition): # Removes all overlaps with fwe2 partition_fwe2 = partition[partition["fwe2"] > 0] - partition_fwe2["dupes_to_remove"] = partition_fwe2["original_id"].apply(lambda x: json.dumps([id for id in json.loads(x) if "fwe2" not in id])) + partition_fwe2["dupes_to_remove"] = partition_fwe2["original_id"].apply( + lambda x: json.dumps([id 
for id in json.loads(x) if "fwe2" not in id]) + ) # Removes all overlaps with fwe2 (after fwe2 overlaps are removed) partition_dclm = partition[partition["fwe2"] == 0] partition_dclm = partition_dclm[partition_dclm["dclm"] > 0] - partition_dclm["dupes_to_remove"] = partition_dclm["original_id"].apply(lambda x: json.dumps([id for id in json.loads(x) if "dclm" not in id])) + partition_dclm["dupes_to_remove"] = partition_dclm["original_id"].apply( + lambda x: json.dumps([id for id in json.loads(x) if "dclm" not in id]) + ) # Removes all overlaps with zyda (after dclm, fwe2 overlaps are removed) partition_zyda = partition[partition["fwe2"] == 0] partition_zyda = partition_zyda[partition_zyda["dclm"] == 0] partition_zyda = partition_zyda[partition_zyda["zyda"] > 0] - partition_zyda["dupes_to_remove"] = partition_zyda["original_id"].apply(lambda x: json.dumps([id for id in json.loads(x) if "zyda" not in id])) + partition_zyda["dupes_to_remove"] = partition_zyda["original_id"].apply( + lambda x: json.dumps([id for id in json.loads(x) if "zyda" not in id]) + ) return pd.concat([partition_dclm, partition_fwe2, partition_zyda]) @@ -115,13 +143,30 @@ def select_dupes(partition): } dupes_df = cc_groups_counts_inter_df.map_partitions(select_dupes, meta=meta) - def group_dupes(partition): - partition["dclm_dupes"] = partition["dupes_to_remove"].apply(lambda x: json.dumps([id for id in json.loads(x) if "dclm" in id])) - partition["zyda_dupes"] = partition["dupes_to_remove"].apply(lambda x: json.dumps([id for id in json.loads(x) if "zyda" in id])) - partition["dolma_dupes"] = partition["dupes_to_remove"].apply(lambda x: json.dumps([id for id in json.loads(x) if "dolma" in id])) - - return partition[["group", "size", "dclm", "fwe2", "dolma-cc", "zyda", "dclm_dupes", "zyda_dupes", "dolma_dupes"]] + partition["dclm_dupes"] = partition["dupes_to_remove"].apply( + lambda x: json.dumps([id for id in json.loads(x) if "dclm" in id]) + ) + partition["zyda_dupes"] = partition["dupes_to_remove"].apply( + lambda x: json.dumps([id for id in json.loads(x) if "zyda" in id]) + ) + partition["dolma_dupes"] = partition["dupes_to_remove"].apply( + lambda x: json.dumps([id for id in json.loads(x) if "dolma" in id]) + ) + + return partition[ + [ + "group", + "size", + "dclm", + "fwe2", + "dolma-cc", + "zyda", + "dclm_dupes", + "zyda_dupes", + "dolma_dupes", + ] + ] meta = { "group": int, @@ -136,4 +181,6 @@ def group_dupes(partition): } grouped_dupes_df = dupes_df.map_partitions(group_dupes, meta=meta) - grouped_dupes_df.to_parquet(DUPES_IDS_GROUPED_IN_COLUMNS, write_index=False, overwrite=True) + grouped_dupes_df.to_parquet( + DUPES_IDS_GROUPED_IN_COLUMNS, write_index=False, overwrite=True + ) diff --git a/tutorials/zyda2-tutorial/2_dupes_removal/4_get_dupes_dclm.py b/tutorials/zyda2-tutorial/2_dupes_removal/4_get_dupes_dclm.py index ed3dae79..6de77cd3 100644 --- a/tutorials/zyda2-tutorial/2_dupes_removal/4_get_dupes_dclm.py +++ b/tutorials/zyda2-tutorial/2_dupes_removal/4_get_dupes_dclm.py @@ -1,22 +1,26 @@ -import os import json -os.environ['DASK_DATAFRAME__QUERY_PLANNING'] = "False" +import os -from nemo_curator.utils.distributed_utils import get_num_workers -from nemo_curator.utils.module_utils import count_digits +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging import dask.dataframe as dd from dask.distributed import Client, LocalCluster -import logging -logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO) +from nemo_curator.utils.distributed_utils import get_num_workers 
+from nemo_curator.utils.module_utils import count_digits + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) DATA_BASE = os.environ.get("DATA_BASE") RAW_DATA_BASE = os.path.join(DATA_BASE, "processed") CC_BASE = os.path.join(DATA_BASE, "fuzzy/cc/") DUPES_BASE = os.path.join(CC_BASE, "dupes") -DUPES_IDS_GROUPED_IN_COLUMNS = os.path.join(DUPES_BASE, "dupes_ids_grouped_in_columns.parquet") +DUPES_IDS_GROUPED_IN_COLUMNS = os.path.join( + DUPES_BASE, "dupes_ids_grouped_in_columns.parquet" +) DCLM_EXPLODED = os.path.join(DUPES_BASE, "dupes_dclm_exploded.parquet") DUPES_DCLM_TO_REMOVE = os.path.join(DUPES_BASE, "dupes_dclm_to_remove.jsonl") @@ -28,14 +32,13 @@ cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True) client = Client(cluster) logging.info(f"Number of dask workers: {get_num_workers(client)}") - + paths = { "dclm": os.path.join(RAW_DATA_BASE, "dclm-baseline-1.0-parquet/filtered"), "dolma-cc": os.path.join(RAW_DATA_BASE, "dolma-v1_7-cc-parquet"), "fwe2": os.path.join(RAW_DATA_BASE, "fineweb-edu-score-2/data"), "zyda": os.path.join(RAW_DATA_BASE, "data/zyda_no_starcoder"), - } - + } dclm_id2dir = { "gs1": "global-shard_01_of_10", @@ -52,31 +55,36 @@ dclm_dir2id = {} for key, val in dclm_id2dir.items(): - dclm_dir2id[val] = key - + dclm_dir2id[val] = key # Counting digits dclm_digits = {} for dir in sorted(os.listdir(paths["dclm"])): - files = [x for x in os.listdir(os.path.join(paths["dclm"], dir)) if ".parquet" in x] + files = [ + x for x in os.listdir(os.path.join(paths["dclm"], dir)) if ".parquet" in x + ] dclm_digits[dclm_dir2id[dir]] = count_digits(len(files)) # Processing DCLM dupes - grouped_dupes_df = dd.read_parquet(DUPES_IDS_GROUPED_IN_COLUMNS, split_row_groups=False) + grouped_dupes_df = dd.read_parquet( + DUPES_IDS_GROUPED_IN_COLUMNS, split_row_groups=False + ) dclm_df = grouped_dupes_df[grouped_dupes_df["dclm_dupes"] != "[]"] def decode_and_explode(partition, column): # Decode JSON strings to lists - partition['id_list'] = partition[column].apply(json.loads) + partition["id_list"] = partition[column].apply(json.loads) # Explode the lists - return partition.explode('id_list')[["group", "id_list"]] + return partition.explode("id_list")[["group", "id_list"]] meta = { "group": int, "id_list": str, } - dclm_exploded_df = dclm_df.map_partitions(decode_and_explode, "dclm_dupes", meta=meta).reset_index(drop=True) - dclm_exploded_df = dclm_exploded_df.rename(columns={'id_list': 'id'}) + dclm_exploded_df = dclm_df.map_partitions( + decode_and_explode, "dclm_dupes", meta=meta + ).reset_index(drop=True) + dclm_exploded_df = dclm_exploded_df.rename(columns={"id_list": "id"}) def split_id(df, id_column="id"): dx = df[id_column].str.rsplit("-", n=1, expand=True) @@ -96,8 +104,12 @@ def split_id(df, id_column="id"): dclm_exploded_df = dclm_exploded_df.map_partitions(split_id, meta=meta) def split_doc_id(df): - df["row"] = df.apply(lambda x: int(x["doc_id"][:-dclm_digits[x["folder"]]]), axis=1) - df["partition"] = df.apply(lambda x: int(x["doc_id"][-dclm_digits[x["folder"]]:]), axis=1) + df["row"] = df.apply( + lambda x: int(x["doc_id"][: -dclm_digits[x["folder"]]]), axis=1 + ) + df["partition"] = df.apply( + lambda x: int(x["doc_id"][-dclm_digits[x["folder"]] :]), axis=1 + ) return df meta = { @@ -113,9 +125,10 @@ def split_doc_id(df): dclm_exploded_df.to_parquet(DCLM_EXPLODED, write_index=False, overwrite=True) dclm_exploded_df = dd.read_parquet(DCLM_EXPLODED, split_row_groups=False) + def write_group_to_jsonl(group): folder_id, partition = group.name - + 
zipped = zip(list(group["row"]), list(group["group"]), list(group["id"])) zipped = sorted(zipped, key=lambda x: x[0]) @@ -129,9 +142,13 @@ def write_group_to_jsonl(group): } # Writing rows - file_path = os.path.join(DUPES_DCLM_TO_REMOVE, dclm_id2dir[folder_id], f"{partition}.jsonl") + file_path = os.path.join( + DUPES_DCLM_TO_REMOVE, dclm_id2dir[folder_id], f"{partition}.jsonl" + ) os.makedirs(os.path.dirname(file_path), exist_ok=True) - with open(file_path, 'w') as f: - f.write(json.dumps(partition_dict)) - - dclm_exploded_df.groupby(['folder', 'partition']).apply(write_group_to_jsonl, meta=()).compute() + with open(file_path, "w") as f: + f.write(json.dumps(partition_dict)) + + dclm_exploded_df.groupby(["folder", "partition"]).apply( + write_group_to_jsonl, meta=() + ).compute() diff --git a/tutorials/zyda2-tutorial/2_dupes_removal/4_get_dupes_dolma-cc.py b/tutorials/zyda2-tutorial/2_dupes_removal/4_get_dupes_dolma-cc.py index ba265cad..f6749c57 100644 --- a/tutorials/zyda2-tutorial/2_dupes_removal/4_get_dupes_dolma-cc.py +++ b/tutorials/zyda2-tutorial/2_dupes_removal/4_get_dupes_dolma-cc.py @@ -1,22 +1,26 @@ -import os import json -os.environ['DASK_DATAFRAME__QUERY_PLANNING'] = "False" +import os -from nemo_curator.utils.distributed_utils import get_num_workers -from nemo_curator.utils.module_utils import count_digits +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging import dask.dataframe as dd from dask.distributed import Client, LocalCluster -import logging -logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO) +from nemo_curator.utils.distributed_utils import get_num_workers +from nemo_curator.utils.module_utils import count_digits + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) DATA_BASE = os.environ.get("DATA_BASE") RAW_DATA_BASE = os.path.join(DATA_BASE, "processed") CC_BASE = os.path.join(DATA_BASE, "fuzzy/cc/") DUPES_BASE = os.path.join(CC_BASE, "dupes") -DUPES_IDS_GROUPED_IN_COLUMNS = os.path.join(DUPES_BASE, "dupes_ids_grouped_in_columns.parquet") +DUPES_IDS_GROUPED_IN_COLUMNS = os.path.join( + DUPES_BASE, "dupes_ids_grouped_in_columns.parquet" +) DOLMA_EXPLODED = os.path.join(DUPES_BASE, "dupes_dolma_exploded.parquet") DUPES_DOLMA_TO_REMOVE = os.path.join(DUPES_BASE, "dupes_dolma_to_remove.jsonl") @@ -28,34 +32,40 @@ cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True) client = Client(cluster) logging.info(f"Number of dask workers: {get_num_workers(client)}") - + paths = { "dclm": os.path.join(RAW_DATA_BASE, "dclm-baseline-1.0-parquet/filtered"), "dolma-cc": os.path.join(RAW_DATA_BASE, "dolma-v1_7-cc-parquet"), "fwe2": os.path.join(RAW_DATA_BASE, "fineweb-edu-score-2/data"), "zyda": os.path.join(RAW_DATA_BASE, "data/zyda_no_starcoder"), - } - + } # Counting digits - dolma_digits = count_digits(len([x for x in os.listdir(paths["dolma-cc"]) if ".parquet" in x])) - + dolma_digits = count_digits( + len([x for x in os.listdir(paths["dolma-cc"]) if ".parquet" in x]) + ) # Processing DCLM dupes - grouped_dupes_df = dd.read_parquet(DUPES_IDS_GROUPED_IN_COLUMNS, split_row_groups=False) - dolma_df = grouped_dupes_df[grouped_dupes_df["dolma_dupes"] != "[]"][["group", "size", "dclm", "fwe2", "zyda", "dolma-cc", "dolma_dupes"]] + grouped_dupes_df = dd.read_parquet( + DUPES_IDS_GROUPED_IN_COLUMNS, split_row_groups=False + ) + dolma_df = grouped_dupes_df[grouped_dupes_df["dolma_dupes"] != "[]"][ + ["group", "size", "dclm", "fwe2", "zyda", "dolma-cc", "dolma_dupes"] + ] def decode_and_explode(partition, 
column): - partition['id_list'] = partition[column].apply(json.loads) - return partition.explode('id_list')[["group", "id_list"]] + partition["id_list"] = partition[column].apply(json.loads) + return partition.explode("id_list")[["group", "id_list"]] # Step 2: Apply the function and reset the index meta = { "group": int, "id_list": str, } - dolma_exploded_df = dolma_df.map_partitions(decode_and_explode, "dolma_dupes", meta=meta).reset_index(drop=True) - dolma_exploded_df = dolma_exploded_df.rename(columns={'id_list': 'id'}) + dolma_exploded_df = dolma_df.map_partitions( + decode_and_explode, "dolma_dupes", meta=meta + ).reset_index(drop=True) + dolma_exploded_df = dolma_exploded_df.rename(columns={"id_list": "id"}) def split_id(df, id_column="id"): dx = df[id_column].str.rsplit("-", n=1, expand=True) @@ -73,7 +83,7 @@ def split_id(df, id_column="id"): def split_doc_id(df): df["row"] = df.apply(lambda x: int(x["doc_id"][:-dolma_digits]), axis=1) - df["partition"] = df.apply(lambda x: int(x["doc_id"][-dolma_digits:]), axis=1) + df["partition"] = df.apply(lambda x: int(x["doc_id"][-dolma_digits:]), axis=1) return df meta = { @@ -88,10 +98,10 @@ def split_doc_id(df): dolma_exploded_df.to_parquet(DOLMA_EXPLODED, write_index=False, overwrite=True) dolma_exploded_df = dd.read_parquet(DOLMA_EXPLODED, split_row_groups=False) - + def write_group_to_jsonl(group): partition = group.name - + zipped = zip(list(group["row"]), list(group["group"]), list(group["id"])) zipped = sorted(zipped, key=lambda x: x[0]) @@ -107,8 +117,9 @@ def write_group_to_jsonl(group): # Writing rows file_path = os.path.join(DUPES_DOLMA_TO_REMOVE, f"{partition}.jsonl") os.makedirs(os.path.dirname(file_path), exist_ok=True) - with open(file_path, 'w') as f: - f.write(json.dumps(partition_dict)) - - dolma_exploded_df.groupby(['partition']).apply(write_group_to_jsonl, meta=()).compute() - \ No newline at end of file + with open(file_path, "w") as f: + f.write(json.dumps(partition_dict)) + + dolma_exploded_df.groupby(["partition"]).apply( + write_group_to_jsonl, meta=() + ).compute() diff --git a/tutorials/zyda2-tutorial/2_dupes_removal/5_get_dupes_zyda.py b/tutorials/zyda2-tutorial/2_dupes_removal/5_get_dupes_zyda.py index 05e6056d..cabb471e 100644 --- a/tutorials/zyda2-tutorial/2_dupes_removal/5_get_dupes_zyda.py +++ b/tutorials/zyda2-tutorial/2_dupes_removal/5_get_dupes_zyda.py @@ -1,22 +1,26 @@ -import os import json -os.environ['DASK_DATAFRAME__QUERY_PLANNING'] = "False" +import os -from nemo_curator.utils.distributed_utils import get_num_workers -from nemo_curator.utils.module_utils import count_digits +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging import dask.dataframe as dd from dask.distributed import Client, LocalCluster -import logging -logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO) +from nemo_curator.utils.distributed_utils import get_num_workers +from nemo_curator.utils.module_utils import count_digits + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) DATA_BASE = os.environ.get("DATA_BASE") RAW_DATA_BASE = os.path.join(DATA_BASE, "processed") CC_BASE = os.path.join(DATA_BASE, "fuzzy/cc/") DUPES_BASE = os.path.join(CC_BASE, "dupes") -DUPES_IDS_GROUPED_IN_COLUMNS = os.path.join(DUPES_BASE, "dupes_ids_grouped_in_columns.parquet") +DUPES_IDS_GROUPED_IN_COLUMNS = os.path.join( + DUPES_BASE, "dupes_ids_grouped_in_columns.parquet" +) ZYDA_EXPLODED = os.path.join(DUPES_BASE, "dupes_zyda_exploded.parquet") DUPES_ZYDA_TO_REMOVE = 
os.path.join(DUPES_BASE, "dupes_zyda_to_remove.jsonl") @@ -28,39 +32,48 @@ cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True) client = Client(cluster) logging.info(f"Number of dask workers: {get_num_workers(client)}") - + paths = { "dclm": os.path.join(RAW_DATA_BASE, "dclm-baseline-1.0-parquet/filtered"), "dolma-cc": os.path.join(RAW_DATA_BASE, "dolma-v1_7-cc-parquet"), "fwe2": os.path.join(RAW_DATA_BASE, "fineweb-edu-score-2/data"), "zyda": os.path.join(RAW_DATA_BASE, "data/zyda_no_starcoder"), - } + } # Counting digits zyda_digits = {} for dir in sorted(os.listdir(paths["zyda"])): - files = [x for x in os.listdir(os.path.join(paths["zyda"], dir)) if ".parquet" in x] + files = [ + x for x in os.listdir(os.path.join(paths["zyda"], dir)) if ".parquet" in x + ] zyda_digits[dir] = count_digits(len(files)) - grouped_dupes_df = dd.read_parquet(DUPES_IDS_GROUPED_IN_COLUMNS, split_row_groups=False) - zyda_df = grouped_dupes_df[grouped_dupes_df["zyda_dupes"] != "[]"][["group", "size", "dclm", "fwe2", "zyda", "dolma-cc", "zyda_dupes"]] + grouped_dupes_df = dd.read_parquet( + DUPES_IDS_GROUPED_IN_COLUMNS, split_row_groups=False + ) + zyda_df = grouped_dupes_df[grouped_dupes_df["zyda_dupes"] != "[]"][ + ["group", "size", "dclm", "fwe2", "zyda", "dolma-cc", "zyda_dupes"] + ] def decode_and_explode(partition, column): - partition['id_list'] = partition[column].apply(json.loads) - return partition.explode('id_list')[["group", "id_list"]] + partition["id_list"] = partition[column].apply(json.loads) + return partition.explode("id_list")[["group", "id_list"]] meta = { "group": int, "id_list": str, } - zyda_exploded_df = zyda_df.map_partitions(decode_and_explode, "zyda_dupes", meta=meta).reset_index(drop=True) - zyda_exploded_df = zyda_exploded_df.rename(columns={'id_list': 'id'}) + zyda_exploded_df = zyda_df.map_partitions( + decode_and_explode, "zyda_dupes", meta=meta + ).reset_index(drop=True) + zyda_exploded_df = zyda_exploded_df.rename(columns={"id_list": "id"}) def split_id(df, id_column="id"): dx = df[id_column].str.rsplit("-", n=1, expand=True) df["doc_id"] = dx[1].astype("str") df["folder"] = dx[0].astype("str") return df + meta = { "group": int, "id": str, @@ -70,8 +83,12 @@ def split_id(df, id_column="id"): zyda_exploded_df = zyda_exploded_df.map_partitions(split_id, meta=meta) def split_doc_id(df): - df["row"] = df.apply(lambda x: int(x["doc_id"][:-zyda_digits[x["folder"]]]), axis=1) - df["partition"] = df.apply(lambda x: int(x["doc_id"][-zyda_digits[x["folder"]]:]), axis=1) + df["row"] = df.apply( + lambda x: int(x["doc_id"][: -zyda_digits[x["folder"]]]), axis=1 + ) + df["partition"] = df.apply( + lambda x: int(x["doc_id"][-zyda_digits[x["folder"]] :]), axis=1 + ) return df meta = { @@ -86,9 +103,10 @@ def split_doc_id(df): zyda_exploded_df.to_parquet(ZYDA_EXPLODED, write_index=False, overwrite=True) zyda_exploded_df = dd.read_parquet(ZYDA_EXPLODED, split_row_groups=False) + def write_group_to_jsonl(group): folder_name, partition = group.name - + zipped = zip(list(group["row"]), list(group["group"]), list(group["id"])) zipped = sorted(zipped, key=lambda x: x[0]) @@ -102,10 +120,13 @@ def write_group_to_jsonl(group): } # Writing rows - file_path = os.path.join(DUPES_ZYDA_TO_REMOVE, folder_name, f"{partition}.jsonl") + file_path = os.path.join( + DUPES_ZYDA_TO_REMOVE, folder_name, f"{partition}.jsonl" + ) os.makedirs(os.path.dirname(file_path), exist_ok=True) - with open(file_path, 'w') as f: - f.write(json.dumps(partition_dict)) - - zyda_exploded_df.groupby(['folder', 
'partition']).apply(write_group_to_jsonl, meta=()).compute() - \ No newline at end of file + with open(file_path, "w") as f: + f.write(json.dumps(partition_dict)) + + zyda_exploded_df.groupby(["folder", "partition"]).apply( + write_group_to_jsonl, meta=() + ).compute() diff --git a/tutorials/zyda2-tutorial/2_dupes_removal/remove_dupes.py b/tutorials/zyda2-tutorial/2_dupes_removal/remove_dupes.py index 26f41025..16238c79 100644 --- a/tutorials/zyda2-tutorial/2_dupes_removal/remove_dupes.py +++ b/tutorials/zyda2-tutorial/2_dupes_removal/remove_dupes.py @@ -1,25 +1,40 @@ -import os -import time import argparse import json -os.environ['DASK_DATAFRAME__QUERY_PLANNING'] = "False" +import os +import time -from dask.distributed import Client, LocalCluster -import dask.dataframe as dd +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" import logging -logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO) -if __name__ == '__main__': +import dask.dataframe as dd +from dask.distributed import Client, LocalCluster + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) + +if __name__ == "__main__": parser = argparse.ArgumentParser(description="Run NeMo quality classifier.") - parser.add_argument("--dupes-path", type=str, required=True, help="Path to the folder with dupes indices") - parser.add_argument("--input", type=str, required=True, help="Path to the folder with input dataset") - parser.add_argument("--output", type=str, required=True, help="Path to where write the result") - parser.add_argument("--n-workers", type=int, default=64, help="Number of CPU Dask workers") + parser.add_argument( + "--dupes-path", + type=str, + required=True, + help="Path to the folder with dupes indices", + ) + parser.add_argument( + "--input", type=str, required=True, help="Path to the folder with input dataset" + ) + parser.add_argument( + "--output", type=str, required=True, help="Path to where write the result" + ) + parser.add_argument( + "--n-workers", type=int, default=64, help="Number of CPU Dask workers" + ) args = parser.parse_args() t0 = time.time() - cluster = LocalCluster(n_workers=args.n_workers, threads_per_worker=2, processes=True) + cluster = LocalCluster( + n_workers=args.n_workers, threads_per_worker=2, processes=True + ) client = Client(cluster) logging.info(f"Dask client: {client}") logging.info(f"Dashboard link: {client.dashboard_link}") @@ -39,7 +54,7 @@ def drop_dupes(partition, partition_info=None): dupes_file_path = ind_2_fullpath.get(ind, None) if not dupes_file_path: return partition - + with open(dupes_file_path, "r") as f: dupe_inds = json.loads(f.read())["rows"] partition_deduped = partition.drop(index=dupe_inds) diff --git a/tutorials/zyda2-tutorial/2_dupes_removal/run_remove_dupes_dclm.sh b/tutorials/zyda2-tutorial/2_dupes_removal/run_remove_dupes_dclm.sh index 66a4e7e2..ba819fd8 100644 --- a/tutorials/zyda2-tutorial/2_dupes_removal/run_remove_dupes_dclm.sh +++ b/tutorials/zyda2-tutorial/2_dupes_removal/run_remove_dupes_dclm.sh @@ -16,7 +16,7 @@ if test -d $INPUT; then --input $INPUT \ --output $OUTPUT \ --n-workers $N_WORKERS -fi +fi NAME=global-shard_02_of_10 INPUT=$IN_BASE/$NAME diff --git a/tutorials/zyda2-tutorial/3_quality_model/run_quality_classifier.py b/tutorials/zyda2-tutorial/3_quality_model/run_quality_classifier.py index fd0d4451..c19720a7 100644 --- a/tutorials/zyda2-tutorial/3_quality_model/run_quality_classifier.py +++ b/tutorials/zyda2-tutorial/3_quality_model/run_quality_classifier.py @@ -1,25 +1,29 @@ +import argparse 
import os import time -import argparse -os.environ['DASK_DATAFRAME__QUERY_PLANNING'] = "False" + +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging from nemo_curator.classifiers import QualityClassifier from nemo_curator.datasets import DocumentDataset -from nemo_curator.utils.file_utils import get_remaining_files from nemo_curator.utils.distributed_utils import get_client, get_num_workers +from nemo_curator.utils.file_utils import get_remaining_files -import logging -logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO) +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) SCHEDULER_FILE = os.environ.get("SCHEDULER_FILE") -if __name__ == '__main__': +if __name__ == "__main__": parser = argparse.ArgumentParser(description="Run NeMo quality classifier.") parser.add_argument("--input", help="Path to the input folder") parser.add_argument("--output", help="Path to the output folder") - parser.add_argument("--batch-size", type=int, default=64, help="Batch size for quality model") + parser.add_argument( + "--batch-size", type=int, default=64, help="Batch size for quality model" + ) args = parser.parse_args() t0 = time.time() @@ -28,7 +32,7 @@ logging.info(client) classifier = QualityClassifier(batch_size=args.batch_size) - + raw_base_path = args.input qm_base_path = args.output files = get_remaining_files(raw_base_path, qm_base_path, "parquet") @@ -37,7 +41,7 @@ input_dataset = DocumentDataset.read_parquet( files, backend="cudf", add_filename=True ) - result_dataset = classifier(dataset=input_dataset) + result_dataset = classifier(dataset=input_dataset) result_dataset.to_parquet(qm_base_path, write_to_filename=True) else: logging.info("Nothing to be done. All files are already processed.") diff --git a/tutorials/zyda2-tutorial/3_quality_model/run_quality_classifier.sh b/tutorials/zyda2-tutorial/3_quality_model/run_quality_classifier.sh index 609f2d2d..249f7e13 100644 --- a/tutorials/zyda2-tutorial/3_quality_model/run_quality_classifier.sh +++ b/tutorials/zyda2-tutorial/3_quality_model/run_quality_classifier.sh @@ -27,4 +27,4 @@ python run_quality_classifier.py --input $DATA_BASE/deduped/zyda-parquet/$NAME - NAME=zyda_arxiv echo "Processing $NAME" -python run_quality_classifier.py --input $DATA_BASE/deduped/zyda-parquet/$NAME --output $DATA_BASE/deduped-with-quality/zyda-parquet/$NAME --batch-size 64 \ No newline at end of file +python run_quality_classifier.py --input $DATA_BASE/deduped/zyda-parquet/$NAME --output $DATA_BASE/deduped-with-quality/zyda-parquet/$NAME --batch-size 64 diff --git a/tutorials/zyda2-tutorial/4_filtering/filter_fwe.py b/tutorials/zyda2-tutorial/4_filtering/filter_fwe.py index 6b5dede1..90a1ea97 100644 --- a/tutorials/zyda2-tutorial/4_filtering/filter_fwe.py +++ b/tutorials/zyda2-tutorial/4_filtering/filter_fwe.py @@ -1,19 +1,21 @@ import os import time -os.environ['DASK_DATAFRAME__QUERY_PLANNING'] = "False" -from dask.distributed import Client, LocalCluster +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging + import dask.dataframe as dd import pyarrow as pa +from dask.distributed import Client, LocalCluster -import logging -logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO) +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) DATA_BASE = os.environ.get("DATA_BASE") CPU_WORKERS = os.environ.get("CPU_WORKERS") -if __name__ == '__main__': +if __name__ == "__main__": t0 = time.time() cluster = LocalCluster(n_workers=CPU_WORKERS, 
threads_per_worker=2, processes=True) client = Client(cluster) @@ -28,5 +30,5 @@ ddf_filtered = ddf[ddf["int_score"] >= 3].repartition(partition_size="512M") out_folder = os.path.join(OUTPUT_BASE, folder) print(f"Saving to {out_folder}") - ddf_filtered.to_parquet(out_folder, write_index=False, overwrite=True) + ddf_filtered.to_parquet(out_folder, write_index=False, overwrite=True) logging.info(f"Done in {time.time() - t0:.2f} sec") diff --git a/tutorials/zyda2-tutorial/4_filtering/filter_quality.py b/tutorials/zyda2-tutorial/4_filtering/filter_quality.py index c8899e0e..a027b9f4 100644 --- a/tutorials/zyda2-tutorial/4_filtering/filter_quality.py +++ b/tutorials/zyda2-tutorial/4_filtering/filter_quality.py @@ -1,31 +1,54 @@ +import argparse import os import time -import argparse -os.environ['DASK_DATAFRAME__QUERY_PLANNING'] = "False" -from dask.distributed import Client, LocalCluster +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging + import dask.dataframe as dd import pyarrow as pa +from dask.distributed import Client, LocalCluster -import logging -logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO) +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) -if __name__ == '__main__': +if __name__ == "__main__": parser = argparse.ArgumentParser(description="Filter by quality") - parser.add_argument("--input", type=str, required=True, help="Path to the folder with input dataset") - parser.add_argument("--output", type=str, required=True, help="Path to where write the result") - parser.add_argument("--n-workers", type=int, default=64, help="Number of CPU Dask workers") - parser.add_argument("--quality_pred", type=str, required=True, choices={"High", "Medium", "Low"}, help="Quality for filtering") + parser.add_argument( + "--input", type=str, required=True, help="Path to the folder with input dataset" + ) + parser.add_argument( + "--output", type=str, required=True, help="Path to where write the result" + ) + parser.add_argument( + "--n-workers", type=int, default=64, help="Number of CPU Dask workers" + ) + parser.add_argument( + "--quality_pred", + type=str, + required=True, + choices={"High", "Medium", "Low"}, + help="Quality for filtering", + ) args = parser.parse_args() t0 = time.time() - cluster = LocalCluster(n_workers=args.n_workers, threads_per_worker=2, processes=True) + cluster = LocalCluster( + n_workers=args.n_workers, threads_per_worker=2, processes=True + ) client = Client(cluster) ddf = dd.read_parquet(args.input, split_row_groups=False) - ddf_filtered = ddf[ddf["quality_pred"] == args.quality_pred].repartition(partition_size="512MB") - ddf_filtered.to_parquet(args.output, write_index=False, overwrite=True, schema={"quality_prob": pa.list_(pa.float32())}) + ddf_filtered = ddf[ddf["quality_pred"] == args.quality_pred].repartition( + partition_size="512MB" + ) + ddf_filtered.to_parquet( + args.output, + write_index=False, + overwrite=True, + schema={"quality_prob": pa.list_(pa.float32())}, + ) ddf_filtered = dd.read_parquet(args.output) l_after = len(ddf_filtered) logging.info(f"Done in {time.time() - t0:.2f} sec") diff --git a/tutorials/zyda2-tutorial/README.md b/tutorials/zyda2-tutorial/README.md index aff2e1e3..ba382719 100644 --- a/tutorials/zyda2-tutorial/README.md +++ b/tutorials/zyda2-tutorial/README.md @@ -25,7 +25,7 @@ export PROTOCOL=ucx 2. Set the location of the scheduler file at `SCHEDULER_FILE` 3. 
Set the network interface you want to use at `INTERFACE` (if unsure, ask your network administrator for what works with your Infiniband setup) 3. Start Dask scheduler on your head node with `DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True DASK_DISTRIBUTED__RMM__POOL_SIZE=$RMM_SCHEDULER_POOL_SIZE dask scheduler --scheduler-file $SCHEDULER_FILE --protocol $PROTOCOL --interface $INTERFACE`
-4. Start Dask workers on every compute node with `dask-cuda-worker --enable-tcp-over-ucx --enable-nvlink --enable-infiniband --enable-rdmacm --scheduler-file /shared/yury/nemo_scheduler.json --interface $INTERFACE`. [Optional] To help with potential out-of-memory memory issues due to fragmentation, one can set flags `--rmm-async --rmm-release-threshold `, which will force RMM to release cache when memory usage is higher than specified threshold (this comes with a performance hit). In addition, Dask supports spilling into CPU RAM, it should allow running workloads when there is not enough VRAM, but it comes with a big performance hit; to enable spilling specify `--enable-cudf-spill` flag.
+4. Start Dask workers on every compute node with `dask-cuda-worker --enable-tcp-over-ucx --enable-nvlink --enable-infiniband --enable-rdmacm --scheduler-file /shared/yury/nemo_scheduler.json --interface $INTERFACE`. [Optional] To help with potential out-of-memory issues due to fragmentation, one can set the flags `--rmm-async --rmm-release-threshold `, which force RMM to release cached memory when usage rises above the specified threshold (this comes with a performance hit). In addition, Dask supports spilling into CPU RAM, which allows running workloads when there is not enough VRAM, although at a significant performance cost; to enable spilling, specify the `--enable-cudf-spill` flag.

To comfortably reproduce Zyda2 in 2 days we recommend using a cluster with 8 nodes of H100s (or A100s with 80GB of VRAM, but it will take longer). It could be run with less, but it will run into memory issues and will require spilling into CPU RAM, slowing down processing. Scripts in this tutorial assume that all the data is being stored at a shared storage accessible to all the nodes. However, Dask supports cloud storage (like GCS or AWS S3), so with minor modifications to the scripts one can read and write to the cloud.
@@ -89,7 +89,7 @@ The script for computing LSH buckets is located at `1_fuzzy_dedup/1_lsh.py`. For building LSH buckets, we split minhash signatures into 8 bands (each having range 16). This gives us a theoretical 85% Jaccard similarity threshold (meaning that documents that have at least 85% similarity are deemed duplicates). This step performs the following operation:
-1. Splits ID's into dataset_id and doc_id and converts them to integers. This step is no longer necessary, since recent releases of NeMo Curator support long strings on GPUs, but when we started our project this wasn't the default.
+1. Splits IDs into dataset_id and doc_id and converts them to integers. This step is no longer necessary, since recent releases of NeMo Curator support long strings on GPUs, but when we started our project this wasn't the default. 2. Splits minhashes of all documents into bands 3. Groups documents into buckets, that correspond to identical values of bands 4.
Shuffles the resultant dataset by buckets, so that documents within the same bucket are in the same Dask partition @@ -140,7 +140,7 @@ The deduplicated datasets are saved explicitly in the `$DATA_BASE/deduped` folde We ran a quality model classifier on Zyda1 and Dolma-CC v1.7 portions of our dataset. To run the prediction, use bash script `3_quality_model/run_quality_classifier.sh`. It calls the Python script `3_quality_model/run_quality_classifier.py` for all the components. All the results are saved in `$DATA_BASE/deduped-with-quality`. This step must be run on GPUs. ### 6. Filtering -As the final step we perform filtering on some components of our dataset. +As the final step we perform filtering on some components of our dataset. We convert Fineweb-edu-score-2 into Fineweb-edu by keeping only the documents with edu score >=3. In principle, this dataset should be the same as the official version of Fineweb-edu. However, to be consistent we performed our own filtering in the script `4_filtering/filter_fwe.py`.
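A note on the numbers in the LSH hunk above: with the banding scheme the README describes (b = 8 bands, each of r = 16 MinHash rows), two documents whose true Jaccard similarity is s become a candidate pair in at least one band with probability 1 - (1 - s^r)^b, and the similarity at which that curve crosses over is commonly approximated as (1/b)^(1/r) ≈ 0.88, which is where the "theoretical 85%" figure comes from. The following is a small illustrative sketch of that estimate; it is not part of the patch or of the tutorial scripts:

```python
# Illustrative check of the MinHash-LSH banding threshold quoted in the README
# (8 bands of 16 rows each); not part of the tutorial code.


def candidate_probability(s: float, b: int = 8, r: int = 16) -> float:
    """Probability that two docs with Jaccard similarity s share at least one LSH bucket."""
    return 1.0 - (1.0 - s**r) ** b


if __name__ == "__main__":
    threshold = (1.0 / 8) ** (1.0 / 16)  # approximately 0.878
    print(f"Approximate similarity threshold: {threshold:.3f}")
    for s in (0.70, 0.80, 0.85, 0.90, 0.95):
        print(f"s = {s:.2f} -> candidate probability = {candidate_probability(s):.3f}")
```

Running it shows the candidate probability rising steeply between s ≈ 0.8 and s ≈ 0.9, which is the behavior the 85% threshold claim in the README is summarizing.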
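To make the quality-model and filtering hunks above easier to follow end to end, here is a condensed sketch of how the two stages fit together, based on the calls that appear in `run_quality_classifier.py` and `filter_quality.py` in this patch. The paths are placeholders, the Dask client/cluster setup is omitted, and in the tutorial the two stages run as separate scripts (classification on GPU workers), so treat this as an outline rather than a drop-in replacement:

```python
# Condensed sketch of steps 5-6: quality classification followed by quality-based filtering.
# Paths are placeholders; the tutorial runs these stages as separate scripts on a Dask cluster.
import dask.dataframe as dd
import pyarrow as pa

from nemo_curator.classifiers import QualityClassifier
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.file_utils import get_remaining_files

deduped_path = "/path/to/deduped/component"  # placeholder input
classified_path = "/path/to/deduped-with-quality/component"  # placeholder intermediate output
filtered_path = "/path/to/filtered-high-quality/component"  # placeholder final output

# Step 5: attach quality predictions (GPU-backed, hence backend="cudf"),
# processing only files that have not been classified yet, as in run_quality_classifier.py.
files = get_remaining_files(deduped_path, classified_path, "parquet")
input_dataset = DocumentDataset.read_parquet(files, backend="cudf", add_filename=True)
classifier = QualityClassifier(batch_size=64)
result_dataset = classifier(dataset=input_dataset)
result_dataset.to_parquet(classified_path, write_to_filename=True)

# Step 6: keep only documents predicted "High", mirroring filter_quality.py.
ddf = dd.read_parquet(classified_path, split_row_groups=False)
ddf_high = ddf[ddf["quality_pred"] == "High"].repartition(partition_size="512MB")
ddf_high.to_parquet(
    filtered_path,
    write_index=False,
    overwrite=True,
    # filter_quality.py passes an explicit schema because quality_prob is a list column.
    schema={"quality_prob": pa.list_(pa.float32())},
)
```

The explicit `schema` argument mirrors `filter_quality.py`, which passes it when writing the filtered output so that the list-valued `quality_prob` column round-trips through Parquet.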