Commit

Fix linter errors
Signed-off-by: Yury Tokpanov <yury@zyphra.com>
yury-tokpanov committed Oct 15, 2024
1 parent bd85b71 commit 31c8cc0
Showing 23 changed files with 492 additions and 297 deletions.
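
The changes below follow one pattern throughout: double-quoted strings, sorted imports, and reflowed call arguments. This matches the output of common Python formatters; a hedged sketch of reproducing a similar pass, assuming the repository lints with isort and black (the actual tool configuration is not shown in this commit):

import subprocess

# Sort imports first, then apply code formatting; both tools rewrite files in place.
subprocess.run(["isort", "tutorials/zyda2-tutorial"], check=True)
subprocess.run(["black", "tutorials/zyda2-tutorial"], check=True)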
12 changes: 7 additions & 5 deletions tutorials/zyda2-tutorial/0_processing/helper.py
@@ -1,19 +1,21 @@
import os

from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.datasets import DocumentDataset
from nemo_curator import AddId
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.file_utils import get_all_files_paths_under


def ensure_directory_exists(filename: str):
os.makedirs(os.path.dirname(filename), exist_ok = True)
os.makedirs(os.path.dirname(filename), exist_ok=True)


def process_data(input_folder, output_folder, prefix, partition_size="512MB"):
raw_files = get_all_files_paths_under(input_folder)
raw_data = DocumentDataset.read_parquet(raw_files)
raw_data_rep = DocumentDataset(raw_data.df.repartition(partition_size=partition_size))
add_id = AddId(id_field='nemo_id', id_prefix=prefix)
raw_data_rep = DocumentDataset(
raw_data.df.repartition(partition_size=partition_size)
)
add_id = AddId(id_field="nemo_id", id_prefix=prefix)
data_with_id = add_id(raw_data_rep)
ensure_directory_exists(output_folder)
data_with_id.to_parquet(output_folder)
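
Since helper.py is shown in full above, a minimal usage sketch of process_data may help orient the rest of the tutorial scripts; the paths and prefix here are hypothetical placeholders, not values from the tutorial:

from helper import process_data

# Reads all parquet files under input_folder, repartitions them to ~512MB,
# adds a "nemo_id" column prefixed with "example", and writes parquet output.
process_data(
    input_folder="/data/raw/example",
    output_folder="/data/processed/example",
    prefix="example",
    partition_size="512MB",
)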
17 changes: 9 additions & 8 deletions tutorials/zyda2-tutorial/0_processing/process_dclm.py
@@ -1,27 +1,29 @@
import os

os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False"

import logging

from dask.distributed import Client, LocalCluster
from helper import process_data

import logging
logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO)
logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO)

DATA_BASE = os.environ.get("DATA_BASE")
INPUT_BASE = os.path.join(DATA_BASE, "raw/dclm-baseline-1.0-parquet/filtered")
OUTPUT_BASE = os.path.join(DATA_BASE, "processed/dclm-baseline-1.0-parquet")
CPU_WORKERS = os.environ.get("CPU_WORKERS")


if __name__ == '__main__':
if __name__ == "__main__":
logging.info("Starting Dask cluster")
cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit='48GB')
cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit="48GB")
client = Client(cluster)
logging.info(client)

components = [
"global-shard_01_of_10",
"global-shard_02_of_10",
"global-shard_02_of_10",
"global-shard_03_of_10",
"global-shard_04_of_10",
"global-shard_05_of_10",
@@ -44,7 +46,6 @@
prefix=f"dclm-gs{i}",
)
logging.info("Done!")

client.cluster.close()
client.shutdown()

client.shutdown()
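
Each processing script in this commit reads DATA_BASE and CPU_WORKERS from the environment before starting its local Dask cluster. A minimal hedged launch sketch (the values below are placeholders for the machine running the tutorial):

import os
import subprocess

# Placeholder values; adjust to the local data layout and core count.
env = {**os.environ, "DATA_BASE": "/data/zyda2", "CPU_WORKERS": "32"}
subprocess.run(["python", "process_dclm.py"], env=env, check=True)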
16 changes: 7 additions & 9 deletions tutorials/zyda2-tutorial/0_processing/process_dolma_cc.py
@@ -1,30 +1,28 @@
import os

os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False"

import logging

from dask.distributed import Client, LocalCluster
from helper import process_data

import logging
logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO)
logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO)

DATA_BASE = os.environ.get("DATA_BASE")
INPUT_BASE = os.path.join(DATA_BASE, "raw/dolma-v1_7-cc-parquet")
OUTPUT_BASE = os.path.join(DATA_BASE, "processed/dolma-v1_7-cc-parquet")
CPU_WORKERS = os.environ.get("CPU_WORKERS")


if __name__ == '__main__':
if __name__ == "__main__":
logging.info("Starting Dask cluster")
cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit='48GB')
cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit="48GB")
client = Client(cluster)
logging.info(client)

logging.info(f"Processing Dolma-CC")
process_data(
input_folder=INPUT_BASE,
output_folder=OUTPUT_BASE,
prefix="dolma-cc"
)
process_data(input_folder=INPUT_BASE, output_folder=OUTPUT_BASE, prefix="dolma-cc")
logging.info("Done!")

client.cluster.close()
42 changes: 27 additions & 15 deletions tutorials/zyda2-tutorial/0_processing/process_fwe2.py
@@ -1,15 +1,16 @@
import os
os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False"

from dask.distributed import Client, LocalCluster
from helper import process_data
os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False"

import gc
import ctypes
import gc
import logging
from pathlib import Path

import logging
logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO)
from dask.distributed import Client, LocalCluster
from helper import process_data

logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO)

DATA_BASE = os.environ.get("DATA_BASE")
INPUT_BASE = os.path.join(DATA_BASE, "raw/fineweb-edu-score-2/data")
@@ -23,17 +24,26 @@ def trim_memory() -> int:


def get_folder_size(folder_path):
return sum(file.stat().st_size for file in Path(folder_path).rglob('*') if file.is_file())
return sum(
file.stat().st_size for file in Path(folder_path).rglob("*") if file.is_file()
)


def sort_folders_by_size(parent_directory):
folders = [f for f in os.listdir(parent_directory) if os.path.isdir(os.path.join(parent_directory, f))]
folder_sizes = [(folder, get_folder_size(os.path.join(parent_directory, folder))) for folder in folders]
folders = [
f
for f in os.listdir(parent_directory)
if os.path.isdir(os.path.join(parent_directory, f))
]
folder_sizes = [
(folder, get_folder_size(os.path.join(parent_directory, folder)))
for folder in folders
]
return sorted(folder_sizes, key=lambda x: x[1])


def bytes_to_human_readable(size_in_bytes):
suffixes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
suffixes = ["B", "KB", "MB", "GB", "TB", "PB"]
suffix_index = 0
size = float(size_in_bytes)
while size >= 1024 and suffix_index < len(suffixes) - 1:
@@ -42,9 +52,9 @@ def bytes_to_human_readable(size_in_bytes):
return f"{size:.2f} {suffixes[suffix_index]}"


if __name__ == '__main__':
if __name__ == "__main__":
logging.info("Starting Dask cluster")
cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit='240GB')
cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit="240GB")
client = Client(cluster)
logging.info(client)

@@ -55,16 +65,18 @@ def bytes_to_human_readable(size_in_bytes):
if not os.path.exists(input_path) or not os.path.isdir(input_path):
continue
output_path = os.path.join(OUTPUT_BASE, component)
logging.info(f"Processing {component}, size = {bytes_to_human_readable(component_size)}")
logging.info(
f"Processing {component}, size = {bytes_to_human_readable(component_size)}"
)
process_data(
input_folder=input_path,
output_folder=output_path,
prefix=f"fwe2-{component}"
prefix=f"fwe2-{component}",
)
logging.info("Trimming memory")
gc.collect()
client.run(trim_memory)
logging.info("Done!")

client.cluster.close()
client.shutdown()
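
As a quick check of the reformatted size helpers in process_fwe2.py, a small hedged example; it assumes DATA_BASE and CPU_WORKERS are set before import (the module reads them at import time), and the directory path is a placeholder:

import os

os.environ.setdefault("DATA_BASE", "/data/zyda2")
os.environ.setdefault("CPU_WORKERS", "32")

from process_fwe2 import bytes_to_human_readable, sort_folders_by_size

print(bytes_to_human_readable(1536))          # "1.50 KB"
print(bytes_to_human_readable(3 * 1024**3))   # "3.00 GB"

# (folder, size) pairs sorted smallest-first.
for folder, size in sort_folders_by_size("/data/zyda2/raw/fineweb-edu-score-2/data"):
    print(folder, bytes_to_human_readable(size))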
18 changes: 9 additions & 9 deletions tutorials/zyda2-tutorial/0_processing/process_zyda.py
@@ -1,26 +1,28 @@
import os

os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False"

import logging

from dask.distributed import Client, LocalCluster
from helper import process_data

import logging
logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO)
logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO)

DATA_BASE = os.environ.get("DATA_BASE")
INPUT_BASE = os.path.join(DATA_BASE, "raw/data/zyda_no_starcoder")
OUTPUT_BASE = os.path.join(DATA_BASE, "processed/zyda-parquet")
CPU_WORKERS = os.environ.get("CPU_WORKERS")


if __name__ == '__main__':
if __name__ == "__main__":
logging.info("Starting Dask cluster")
cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit='48GB')
cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit="48GB")
client = Client(cluster)
logging.info(client)

components = [
"zyda_arxiv",
"zyda_arxiv",
"zyda_peS2o",
"zyda_pile-uncopyrighted",
"zyda_slimpajama",
@@ -35,11 +37,9 @@
output_path = os.path.join(OUTPUT_BASE, component)
logging.info(f"Processing {component}")
process_data(
input_folder=input_path,
output_folder=output_path,
prefix=component
input_folder=input_path, output_folder=output_path, prefix=component
)
logging.info("Done!")

client.cluster.close()
client.shutdown()
25 changes: 13 additions & 12 deletions tutorials/zyda2-tutorial/1_fuzzy_dedup/0_minhash.py
@@ -1,17 +1,18 @@
import os

os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False"

import logging
import time

import dask_cudf

from nemo_curator import MinHash
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import get_client, get_num_workers
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.datasets import DocumentDataset
from nemo_curator import MinHash

import time

import logging
logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO)
logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO)


def read_folder(input_folder, columns=["nemo_id", "text"]):
@@ -30,16 +31,16 @@ def read_folder(input_folder, columns=["nemo_id", "text"]):
SCHEDULER_FILE = os.environ.get("SCHEDULER_FILE")


if __name__ == '__main__':
if __name__ == "__main__":
client = get_client(scheduler_file=SCHEDULER_FILE)
logging.info(f"Number of dask workers: {get_num_workers(client)}")

minhash_base_output_path = os.path.join(DATA_BASE, "fuzzy/minhash")
minhash_output_dir = os.path.join(minhash_base_output_path, 'data')
minhash_output_dir = os.path.join(minhash_base_output_path, "data")

# Relevant parameters
minhash_id_field = 'nemo_id'
minhash_text_field = 'text'
minhash_id_field = "nemo_id"
minhash_text_field = "text"
seed = 10
minhash_length = 128
char_ngram = 25
@@ -48,7 +49,7 @@ def read_folder(input_folder, columns=["nemo_id", "text"]):
# Reading all the data
text_ddf = read_folder(
input_folder=os.path.join(DATA_BASE, "processed"),
columns=[minhash_id_field, minhash_text_field]
columns=[minhash_id_field, minhash_text_field],
)

# Computing minhashes
@@ -60,7 +61,7 @@ def read_folder(input_folder, columns=["nemo_id", "text"]):
use_64bit_hash=use_64bit_hash,
id_field=minhash_id_field,
text_field=minhash_text_field,
cache_dir=minhash_output_dir
cache_dir=minhash_output_dir,
)
res = minhasher(DocumentDataset(text_ddf)).df
logging.info(f"Time taken for MinHash: {time.time()-t0:.2f}sec.")
30 changes: 15 additions & 15 deletions tutorials/zyda2-tutorial/1_fuzzy_dedup/1_lsh.py
@@ -1,43 +1,43 @@
import os
os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False"

from nemo_curator.utils.distributed_utils import get_client,get_num_workers
from nemo_curator.datasets import DocumentDataset

from nemo_curator import LSH
from nemo_curator.utils.fuzzy_dedup_utils.id_mapping import \
convert_str_id_to_int
os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False"

import logging
import time

import cudf
import dask_cudf
import numpy as np

import logging
logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.INFO)
from nemo_curator import LSH
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import get_client, get_num_workers
from nemo_curator.utils.fuzzy_dedup_utils.id_mapping import convert_str_id_to_int

logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO)


DATA_BASE = os.environ.get("DATA_BASE")
SCHEDULER_FILE = os.environ.get("SCHEDULER_FILE")


if __name__ == '__main__':
if __name__ == "__main__":
client = get_client(scheduler_file=SCHEDULER_FILE)
logging.info(f"Number of dask workers: {get_num_workers(client)}")

minhash_base_output_path = os.path.join(DATA_BASE, "fuzzy/minhash")
minhash_output_dir = os.path.join(minhash_base_output_path, 'data')
minhash_output_dir = os.path.join(minhash_base_output_path, "data")

# Input
lsh_input_data_path = minhash_output_dir

# Output
lsh_base_output_path = os.path.join(DATA_BASE, "fuzzy/lsh")
lsh_output_dir = os.path.join(lsh_base_output_path, 'data')
lsh_output_dir = os.path.join(lsh_base_output_path, "data")

# Relevant parameters
lsh_id_field = 'nemo_id'
minhash_field = '_minhash_signature'
lsh_id_field = "nemo_id"
minhash_field = "_minhash_signature"
minhash_length = 128
num_bands = 8
buckets_per_shuffle = 8
@@ -46,7 +46,7 @@

# Load MinHash output
logging.info("Converting ids")
df = dask_cudf.read_parquet(lsh_input_data_path, backend = "cudf")
df = dask_cudf.read_parquet(lsh_input_data_path, backend="cudf")
df = df.map_partitions(
convert_str_id_to_int,
id_column=lsh_id_field,
