From 1c99fbfc58782426042653418b6b542ae2e3eae0 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Fri, 6 Sep 2024 15:51:06 +0300 Subject: [PATCH] initial kfp implementation --- .../fdedup_multi_step/kfp_ray/Makefile_ori | 51 ++++ .../fdedup_multi_step/kfp_ray/README.md | 28 +++ .../kfp_ray/fdedup_bucket_processor_wf.py | 209 ++++++++++++++++ .../kfp_ray/fdedup_filter_wf.py | 194 +++++++++++++++ .../kfp_ray/fdedup_preprocessor_wf.py | 218 +++++++++++++++++ ...cket_processor_compute_execution_params.py | 131 ++++++++++ .../fdedup_filter_compute_execution_params.py | 102 ++++++++ ...p_preprocessor_compute_execution_params.py | 224 ++++++++++++++++++ .../fdedup_preprocessor_local_python.py | 8 +- .../src/fdedup/transforms/base/__init__.py | 4 +- .../base/fdedup_filter_transform_base.py | 4 +- .../fdedup_preprocessor_transform_base.py | 8 +- .../python/src/fdedup/utils/fdedupsupport.py | 4 +- .../test_fdedup_preprocessor_python.py | 8 +- .../ray/src/fdedup_ray/clusters_estimator.py | 8 +- .../examples/fdedup_preprocessor_local_ray.py | 8 +- .../ray/test/test_fdedup_preprocessor_ray.py | 8 +- 17 files changed, 1187 insertions(+), 30 deletions(-) create mode 100644 transforms/universal/fdedup_multi_step/kfp_ray/Makefile_ori create mode 100644 transforms/universal/fdedup_multi_step/kfp_ray/README.md create mode 100644 transforms/universal/fdedup_multi_step/kfp_ray/fdedup_bucket_processor_wf.py create mode 100644 transforms/universal/fdedup_multi_step/kfp_ray/fdedup_filter_wf.py create mode 100644 transforms/universal/fdedup_multi_step/kfp_ray/fdedup_preprocessor_wf.py create mode 100644 transforms/universal/fdedup_multi_step/kfp_ray/src/fdedup_bucket_processor_compute_execution_params.py create mode 100644 transforms/universal/fdedup_multi_step/kfp_ray/src/fdedup_filter_compute_execution_params.py create mode 100644 transforms/universal/fdedup_multi_step/kfp_ray/src/fdedup_preprocessor_compute_execution_params.py diff --git a/transforms/universal/fdedup_multi_step/kfp_ray/Makefile_ori b/transforms/universal/fdedup_multi_step/kfp_ray/Makefile_ori new file mode 100644 index 000000000..f6b215984 --- /dev/null +++ b/transforms/universal/fdedup_multi_step/kfp_ray/Makefile_ori @@ -0,0 +1,51 @@ +REPOROOT=${CURDIR}/../../../../ +WORKFLOW_VENV_ACTIVATE=${REPOROOT}/transforms/venv/bin/activate +include $(REPOROOT)/transforms/.make.workflows + +SRC_DIR=${CURDIR}/../ray/ + +PYTHON_WF := $(shell find ./ -name '*_wf.py') +YAML_WF := $(patsubst %.py, %.yaml, ${PYTHON_WF}) + +workflow-venv: .check_python_version ${WORKFLOW_VENV_ACTIVATE} + +.PHONY: clean +clean: + @# Help: Clean up the virtual environment. 
+	rm -rf ${REPOROOT}/transforms/venv
+
+venv::
+
+build::
+
+setup::
+
+test::
+
+test-src::
+
+publish::
+
+image::
+
+test-image::
+
+kind-load-image::
+
+docker-load-image::
+
+docker-save-image::
+
+.PHONY: workflow-build
+workflow-build: workflow-venv
+	$(MAKE) $(YAML_WF)
+
+.PHONY: workflow-test
+workflow-test: workflow-build
+	$(MAKE) .workflows.test-pipeline TRANSFORM_SRC=${SRC_DIR} PIPELINE_FILE=fdedup_wf.yaml
+
+.PHONY: workflow-upload
+workflow-upload: workflow-build
+	@for file in $(YAML_WF); do \
+		$(MAKE) .workflows.upload-pipeline PIPELINE_FILE=$$file; \
+	done
\ No newline at end of file
diff --git a/transforms/universal/fdedup_multi_step/kfp_ray/README.md b/transforms/universal/fdedup_multi_step/kfp_ray/README.md
new file mode 100644
index 000000000..aff816b78
--- /dev/null
+++ b/transforms/universal/fdedup_multi_step/kfp_ray/README.md
@@ -0,0 +1,28 @@
+# Fuzzy Deduplication Ray-based KubeFlow Pipeline Transformation
+
+
+## Summary
+This project allows execution of the [multi-step fuzzy dedup](../ray) as a
+[KubeFlow Pipeline](https://www.kubeflow.org/docs/components/pipelines/overview/).
+
+The detailed pipeline is presented in the [Simplest Transform pipeline tutorial](../../../../kfp/doc/simple_transform_pipeline.md).
+
+## Compilation
+
+In order to compile the pipeline definitions, run
+```shell
+make workflow-build
+```
+from this directory. It creates a virtual environment (make workflow-venv) and then compiles the pipeline
+definitions in the folder. The virtual environment is created once for all transforms.
+
+Note: the pipeline definitions can be compiled and executed on both KFPv1 and KFPv2. For now, KFPv1 is our default. If you
+prefer KFPv2, please do the following:
+```shell
+make clean
+export KFPv2=1
+make workflow-build
+```
+
+The next steps are described in [Deploying a pipeline](../../../../kfp/doc/simple_transform_pipeline.md#deploying-a-pipeline-)
+and [Executing pipeline and watching execution results](../../../../kfp/doc/simple_transform_pipeline.md#executing-pipeline-and-watching-execution-results-)
\ No newline at end of file
diff --git a/transforms/universal/fdedup_multi_step/kfp_ray/fdedup_bucket_processor_wf.py b/transforms/universal/fdedup_multi_step/kfp_ray/fdedup_bucket_processor_wf.py
new file mode 100644
index 000000000..fabf32dff
--- /dev/null
+++ b/transforms/universal/fdedup_multi_step/kfp_ray/fdedup_bucket_processor_wf.py
@@ -0,0 +1,209 @@
+# (C) Copyright IBM Corp. 2024.
+# Licensed under the Apache License, Version 2.0 (the “License”);
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an “AS IS” BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
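For reference, the compilation step described in the README above essentially amounts to running each workflow script in this directory, which compiles its `@dsl.pipeline` function into a YAML package. A minimal sketch (file names are taken from this patch; it assumes the workflow virtual environment is active so the scripts' imports resolve, and the `.make.workflows` tooling details are elided):

```python
import os
import subprocess

# Optional: compile for KFPv2 instead of the default KFPv1; the workflow scripts
# check this environment variable at import time.
os.environ["KFPv2"] = "1"

# Each script ends with compiler.Compiler().compile(<pipeline>, <script>.yaml),
# so running it produces the corresponding pipeline package next to the source.
for wf in (
    "fdedup_preprocessor_wf.py",
    "fdedup_bucket_processor_wf.py",
    "fdedup_filter_wf.py",
):
    subprocess.run(["python", wf], check=True)
```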
+################################################################################ +import os + +import kfp.compiler as compiler +import kfp.components as comp +import kfp.dsl as dsl +from src.fdedup_bucket_processor_compute_execution_params import fdedup_bucket_processor_compute_execution_params +from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils + + +task_image = "quay.io/dataprep1/data-prep-kit/fdedup-ray:latest" + +# the name of the job script +EXEC_SCRIPT_NAME: str = "fdedup_bucket_processor_transform_ray.py" + +# components +base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest" + +# path to kfp component specifications files +component_spec_path = "../../../../kfp/kfp_ray_components/" + +# KFPv1 and KFP2 uses different methods to create a component from a function. KFPv1 uses the +# `create_component_from_func` function, but it is deprecated by KFPv2 and so has a different import path. +# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use +# this if/else statement and explicitly call the decorator. +if os.getenv("KFPv2", "0") == "1": + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at + # compilation time. + import uuid + + compute_exec_params_op = dsl.component_decorator.component( + func=fdedup_bucket_processor_compute_execution_params, base_image=base_kfp_image + ) + print( + "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + + "same version of the same pipeline !!!" + ) + run_id = uuid.uuid4().hex +else: + compute_exec_params_op = comp.create_component_from_func( + func=fdedup_bucket_processor_compute_execution_params, base_image=base_kfp_image + ) + run_id = dsl.RUN_ID_PLACEHOLDER + +# create Ray cluster +create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") +# execute job +execute_ray_jobs_op = comp.load_component_from_file(component_spec_path + "executeRayJobComponent.yaml") +# clean up Ray +cleanup_ray_op = comp.load_component_from_file(component_spec_path + "deleteRayClusterComponent.yaml") + +# Task name is part of the pipeline name, the ray cluster name and the job name in DMF. +TASK_NAME: str = "fdedup_bucket_processor" + + +@dsl.pipeline( + name=TASK_NAME + "-ray-pipeline", + description="Pipeline for fdedup bucket_processor", +) +def fdedup_bucket_processor( + # Ray cluster + ray_name: str = "fdedup-bucket_processor_kfp-ray", # name of Ray cluster + # Add image_pull_secret and image_pull_policy to ray workers if needed + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, + server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", + # data access. 
checkpointing is not supported by dedup + data_s3_config: str = "{'input_folder': 'test/fdedup/input/', 'output_folder': 'test/fdedup/output/'}", + data_s3_access_secret: str = "s3-secret", + data_max_files: int = -1, + data_num_samples: int = -1, + # orchestrator + runtime_actor_options: dict = {"num_cpus": 0.8}, + runtime_pipeline_id: str = "pipeline_id", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, + # infrastructure + fdedup_bucket_processor_bucket_cpu: float = 0.5, + fdedup_bucket_processor_doc_cpu: float = 0.5, + fdedup_bucket_processor_mhash_cpu: float = 0.5, + fdedup_bucket_processor_processor_cpu: float = 0.8, + fdedup_bucket_processor_num_buckets: int = 1, + fdedup_bucket_processor_num_doc_id: int = 1, + fdedup_bucket_processor_num_minhashes: int = 1, + # fuzzy parameters + fdedup_bucket_processor_num_permutations: int = 64, + fdedup_bucket_processor_threshold: float = 0.8, + # snapshotting + fdedup_bucket_processor_minhash_snapshot_directory: str = None, + fdedup_bucket_processor_buckets_snapshot_directory: str = None, + fdedup_bucket_processor_doc_id_snapshot_directory: str = None, + # additional parameters + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', +): + """ + Pipeline to execute FDEDUP transform + :param ray_name: name of the Ray cluster + :param ray_head_options: head node options, containing the following: + cpu - number of cpus + memory - memory + image - image to use + image_pull_secret - image pull secret + :param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following: + replicas - number of replicas to create + max_replicas - max number of replicas + min_replicas - min number of replicas + cpu - number of cpus + memory - memory + image - image to use + image_pull_secret - image pull secret + :param server_url - server url + :param additional_params: additional (support) parameters, containing the following: + wait_interval - wait interval for API server, sec + wait_cluster_ready_tmout - time to wait for cluster ready, sec + wait_cluster_up_tmout - time to wait for cluster up, sec + wait_job_ready_tmout - time to wait for job ready, sec + wait_print_tmout - time between prints, sec + http_retries - http retries for API server calls + :param data_s3_access_secret - s3 access secret + :param data_s3_config - s3 configuration + :param data_max_files - max files to process + :param data_num_samples - num samples to process + :param runtime_actor_options - actor options + :param runtime_pipeline_id - pipeline id + :param runtime_code_location - code location + :param fdedup_bucket_processor_bucket_cpu - number of CPUs per bucket hash + :param fdedup_bucket_processor_doc_cpu - number of CPUs per doc hash + :param fdedup_bucket_processor_mhash_cpu - number of CPUs per minhash hash + :param fdedup_bucket_processor_processor_cpu - number of cpus for bucket processor + :param fdedup_bucket_processor_num_buckets - number of bucket cache actors (same as preprocessor) + :param fdedup_bucket_processor_num_doc_id - number of doc cache actors (same as preprocessor) + :param fdedup_bucket_processor_num_minhashes - number of minhash cache actors (same as preprocessor) + :param fdedup_bucket_processor_num_permutations - number of permutations + :param fdedup_bucket_processor_threshold - threshold + :param 
fdedup_bucket_processor_minhash_snapshot_directory - snapshot directory for minhashes + :param fdedup_bucket_processor_buckets_snapshot_directory - snapshot directory for buckets + :param fdedup_bucket_processor_doc_id_snapshot_directory - snapshot directory for docs + :return: None + """ + # create clean_up task + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) + ComponentUtils.add_settings_to_component(clean_up_task, 60) + # pipeline definition + with dsl.ExitHandler(clean_up_task): + # compute execution params + compute_exec_params = compute_exec_params_op( + ray_worker_options=ray_worker_options, + runtime_actor_options=runtime_actor_options, + data_s3_config=data_s3_config, + data_max_files=data_max_files, + data_num_samples=data_num_samples, + runtime_pipeline_id=runtime_pipeline_id, + runtime_job_id=run_id, + runtime_code_location=runtime_code_location, + fdedup_bucket_processor_num_permutations=fdedup_bucket_processor_num_permutations, + fdedup_bucket_processor_threshold=fdedup_bucket_processor_threshold, + fdedup_bucket_processor_bucket_cpu=fdedup_bucket_processor_bucket_cpu, + fdedup_bucket_processor_doc_cpu=fdedup_bucket_processor_doc_cpu, + fdedup_bucket_processor_mhash_cpu=fdedup_bucket_processor_mhash_cpu, + fdedup_bucket_processor_processor_cpu=fdedup_bucket_processor_processor_cpu, + fdedup_bucket_processor_num_buckets=fdedup_bucket_processor_num_buckets, + fdedup_bucket_processor_num_doc_id=fdedup_bucket_processor_num_doc_id, + fdedup_bucket_processor_num_minhashes=fdedup_bucket_processor_num_minhashes, + fdedup_bucket_processor_minhash_snapshot_directory=fdedup_bucket_processor_minhash_snapshot_directory, + fdedup_bucket_processor_buckets_snapshot_directory=fdedup_bucket_processor_buckets_snapshot_directory, + fdedup_bucket_processor_doc_id_snapshot_directory=fdedup_bucket_processor_doc_id_snapshot_directory, + ) + ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2) + ComponentUtils.set_s3_env_vars_to_component(compute_exec_params, data_s3_access_secret) + + # start Ray cluster + ray_cluster = create_ray_op( + ray_name=ray_name, + run_id=run_id, + ray_head_options=ray_head_options, + ray_worker_options=ray_worker_options, + server_url=server_url, + additional_params=additional_params, + ) + ComponentUtils.add_settings_to_component(ray_cluster, ONE_HOUR_SEC * 2) + ray_cluster.after(compute_exec_params) + # Execute job + execute_job = execute_ray_jobs_op( + ray_name=ray_name, + run_id=run_id, + additional_params=additional_params, + exec_params=compute_exec_params.output, + exec_script_name=EXEC_SCRIPT_NAME, + server_url=server_url, + ) + ComponentUtils.add_settings_to_component(execute_job, ONE_WEEK_SEC) + ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret) + execute_job.after(ray_cluster) + + +if __name__ == "__main__": + # Compiling the pipeline + compiler.Compiler().compile(fdedup_bucket_processor, __file__.replace(".py", ".yaml")) diff --git a/transforms/universal/fdedup_multi_step/kfp_ray/fdedup_filter_wf.py b/transforms/universal/fdedup_multi_step/kfp_ray/fdedup_filter_wf.py new file mode 100644 index 000000000..487a1ac97 --- /dev/null +++ b/transforms/universal/fdedup_multi_step/kfp_ray/fdedup_filter_wf.py @@ -0,0 +1,194 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import os + +import kfp.compiler as compiler +import kfp.components as comp +import kfp.dsl as dsl +from src.fdedup_filter_compute_execution_params import fdedup_filter_compute_execution_params +from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils + + +task_image = "quay.io/dataprep1/data-prep-kit/fdedup-ray:latest" + +# the name of the job script +EXEC_SCRIPT_NAME: str = "fdedup_filter_transform_ray.py" + +# components +base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest" + +# path to kfp component specifications files +component_spec_path = "../../../../kfp/kfp_ray_components/" + +# KFPv1 and KFP2 uses different methods to create a component from a function. KFPv1 uses the +# `create_component_from_func` function, but it is deprecated by KFPv2 and so has a different import path. +# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use +# this if/else statement and explicitly call the decorator. +if os.getenv("KFPv2", "0") == "1": + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at + # compilation time. + import uuid + + compute_exec_params_op = dsl.component_decorator.component( + func=fdedup_filter_compute_execution_params, base_image=base_kfp_image + ) + print( + "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + + "same version of the same pipeline !!!" + ) + run_id = uuid.uuid4().hex +else: + compute_exec_params_op = comp.create_component_from_func( + func=fdedup_filter_compute_execution_params, base_image=base_kfp_image + ) + run_id = dsl.RUN_ID_PLACEHOLDER + +# create Ray cluster +create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") +# execute job +execute_ray_jobs_op = comp.load_component_from_file(component_spec_path + "executeRayJobComponent.yaml") +# clean up Ray +cleanup_ray_op = comp.load_component_from_file(component_spec_path + "deleteRayClusterComponent.yaml") + +# Task name is part of the pipeline name, the ray cluster name and the job name in DMF. +TASK_NAME: str = "fdedup_filter" + + +@dsl.pipeline( + name=TASK_NAME + "-ray-pipeline", + description="Pipeline for fdedup bucket_processor", +) +def fdedup_filter( + # Ray cluster + ray_name: str = "fdedup-bucket_processor_kfp-ray", # name of Ray cluster + # Add image_pull_secret and image_pull_policy to ray workers if needed + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, + server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", + # data access. 
checkpointing is not supported by dedup + data_s3_config: str = "{'input_folder': 'test/fdedup/input/', 'output_folder': 'test/fdedup/output/'}", + data_s3_access_secret: str = "s3-secret", + data_max_files: int = -1, + data_num_samples: int = -1, + # orchestrator + runtime_actor_options: dict = {"num_cpus": 0.8}, + runtime_pipeline_id: str = "pipeline_id", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, + # columns used + fdedup_filter_doc_column: str = "contents", + fdedup_filter_doc_id_column: str = "int_id_column", + fdedup_filter_cluster_column: str = "cluster", + fdedup_filter_cluster_removed_docs_column: str = "removed_docs", + # infrastructure + fdedup_filter_doc_cpu: float = 0.5, + fdedup_filter_num_doc_id: int = 1, + # snapshotting + fdedup_filter_doc_id_snapshot_directory: str = None, + # additional parameters + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', +): + """ + Pipeline to execute FDEDUP transform + :param ray_name: name of the Ray cluster + :param ray_head_options: head node options, containing the following: + cpu - number of cpus + memory - memory + image - image to use + image_pull_secret - image pull secret + :param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following: + replicas - number of replicas to create + max_replicas - max number of replicas + min_replicas - min number of replicas + cpu - number of cpus + memory - memory + image - image to use + image_pull_secret - image pull secret + :param server_url - server url + :param additional_params: additional (support) parameters, containing the following: + wait_interval - wait interval for API server, sec + wait_cluster_ready_tmout - time to wait for cluster ready, sec + wait_cluster_up_tmout - time to wait for cluster up, sec + wait_job_ready_tmout - time to wait for job ready, sec + wait_print_tmout - time between prints, sec + http_retries - http retries for API server calls + :param data_s3_access_secret - s3 access secret + :param data_s3_config - s3 configuration + :param data_max_files - max files to process + :param data_num_samples - num samples to process + :param runtime_actor_options - actor options + :param runtime_pipeline_id - pipeline id + :param runtime_code_location - code location + :param fdedup_filter_doc_column - doc column name + :param fdedup_filter_doc_id_column - doc id column name + :param fdedup_filter_cluster_column - cluster column name + :param fdedup_filter_cluster_removed_docs_column - removed docs column name + :param fdedup_filter_doc_cpu - number of CPUs per doc hash + :param fdedup_filter_num_doc_id - number of doc cache actors (same as preprocessor) + :param fdedup_filter_doc_id_snapshot_directory - snapshot directory for docs + :return: None + """ + # create clean_up task + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) + ComponentUtils.add_settings_to_component(clean_up_task, 60) + # pipeline definition + with dsl.ExitHandler(clean_up_task): + # compute execution params + compute_exec_params = compute_exec_params_op( + ray_worker_options=ray_worker_options, + runtime_actor_options=runtime_actor_options, + data_s3_config=data_s3_config, + data_max_files=data_max_files, + data_num_samples=data_num_samples, + runtime_pipeline_id=runtime_pipeline_id, + runtime_job_id=run_id, + 
runtime_code_location=runtime_code_location, + fdedup_filter_doc_column=fdedup_filter_doc_column, + fdedup_filter_doc_id_column=fdedup_filter_doc_id_column, + fdedup_filter_cluster_column=fdedup_filter_cluster_column, + fdedup_filter_cluster_removed_docs_column=fdedup_filter_cluster_removed_docs_column, + fdedup_filter_doc_cpu=fdedup_filter_doc_cpu, + fdedup_filter_num_doc_id=fdedup_filter_num_doc_id, + fdedup_filter_doc_id_snapshot_directory=fdedup_filter_doc_id_snapshot_directory, + ) + ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2) + ComponentUtils.set_s3_env_vars_to_component(compute_exec_params, data_s3_access_secret) + + # start Ray cluster + ray_cluster = create_ray_op( + ray_name=ray_name, + run_id=run_id, + ray_head_options=ray_head_options, + ray_worker_options=ray_worker_options, + server_url=server_url, + additional_params=additional_params, + ) + ComponentUtils.add_settings_to_component(ray_cluster, ONE_HOUR_SEC * 2) + ray_cluster.after(compute_exec_params) + # Execute job + execute_job = execute_ray_jobs_op( + ray_name=ray_name, + run_id=run_id, + additional_params=additional_params, + exec_params=compute_exec_params.output, + exec_script_name=EXEC_SCRIPT_NAME, + server_url=server_url, + ) + ComponentUtils.add_settings_to_component(execute_job, ONE_WEEK_SEC) + ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret) + execute_job.after(ray_cluster) + + +if __name__ == "__main__": + # Compiling the pipeline + compiler.Compiler().compile(fdedup_filter, __file__.replace(".py", ".yaml")) diff --git a/transforms/universal/fdedup_multi_step/kfp_ray/fdedup_preprocessor_wf.py b/transforms/universal/fdedup_multi_step/kfp_ray/fdedup_preprocessor_wf.py new file mode 100644 index 000000000..9486a0145 --- /dev/null +++ b/transforms/universal/fdedup_multi_step/kfp_ray/fdedup_preprocessor_wf.py @@ -0,0 +1,218 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import os + +import kfp.compiler as compiler +import kfp.components as comp +import kfp.dsl as dsl +from src.fdedup_preprocessor_compute_execution_params import fdedup_preprocessor_compute_execution_params +from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils + + +task_image = "quay.io/dataprep1/data-prep-kit/fdedup-ray:latest" + +# the name of the job script +EXEC_SCRIPT_NAME: str = "fdedup_preprocessor_transform_ray.py" + +# components +base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest" + +# path to kfp component specifications files +component_spec_path = "../../../../kfp/kfp_ray_components/" + +# KFPv1 and KFP2 uses different methods to create a component from a function. KFPv1 uses the +# `create_component_from_func` function, but it is deprecated by KFPv2 and so has a different import path. +# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. 
Therefore, here we use +# this if/else statement and explicitly call the decorator. +if os.getenv("KFPv2", "0") == "1": + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at + # compilation time. + import uuid + + compute_exec_params_op = dsl.component_decorator.component( + func=fdedup_preprocessor_compute_execution_params, base_image=base_kfp_image + ) + print( + "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + + "same version of the same pipeline !!!" + ) + run_id = uuid.uuid4().hex +else: + compute_exec_params_op = comp.create_component_from_func( + func=fdedup_preprocessor_compute_execution_params, base_image=base_kfp_image + ) + run_id = dsl.RUN_ID_PLACEHOLDER + +# create Ray cluster +create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") +# execute job +execute_ray_jobs_op = comp.load_component_from_file(component_spec_path + "executeRayJobComponent.yaml") +# clean up Ray +cleanup_ray_op = comp.load_component_from_file(component_spec_path + "deleteRayClusterComponent.yaml") + +# Task name is part of the pipeline name, the ray cluster name and the job name in DMF. +TASK_NAME: str = "fdedup_preprocessor" + + +@dsl.pipeline( + name=TASK_NAME + "-ray-pipeline", + description="Pipeline for fdedup preprocessor", +) +def fdedup_preprocessor( + # Ray cluster + ray_name: str = "fdedup-preprocessor_kfp-ray", # name of Ray cluster + # Add image_pull_secret and image_pull_policy to ray workers if needed + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, + server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", + # data access. 
checkpointing is not supported by dedup + data_s3_config: str = "{'input_folder': 'test/fdedup/input/', 'output_folder': 'test/fdedup/output/'}", + data_s3_access_secret: str = "s3-secret", + data_max_files: int = -1, + data_num_samples: int = -1, + # orchestrator + runtime_actor_options: dict = {"num_cpus": 0.8}, + runtime_pipeline_id: str = "pipeline_id", + runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, + # columns used + fdedup_preprocessor_doc_column: str = "contents", + fdedup_preprocessor_doc_id_column: str = "int_id_column", + # infrastructure + fdedup_preprocessor_bucket_cpu: float = 0.5, + fdedup_preprocessor_doc_cpu: float = 0.5, + fdedup_preprocessor_mhash_cpu: float = 0.5, + # fuzzy parameters + fdedup_preprocessor_num_permutations: int = 64, + fdedup_preprocessor_threshold: float = 0.8, + fdedup_preprocessor_shingles_size: int = 5, + fdedup_preprocessor_delimiters: str = " ", + # snapshotting + fdedup_preprocessor_minhash_snapshot_directory: str = None, + fdedup_preprocessor_buckets_snapshot_directory: str = None, + fdedup_preprocessor_doc_id_snapshot_directory: str = None, + # data sampling + fdedup_preprocessor_n_samples: int = 10, + # number of docs to process + fdedup_preprocessor_n_docs: int = -1, + # additional parameters + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', +): + """ + Pipeline to execute FDEDUP transform + :param ray_name: name of the Ray cluster + :param ray_head_options: head node options, containing the following: + cpu - number of cpus + memory - memory + image - image to use + image_pull_secret - image pull secret + :param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following: + replicas - number of replicas to create + max_replicas - max number of replicas + min_replicas - min number of replicas + cpu - number of cpus + memory - memory + image - image to use + image_pull_secret - image pull secret + :param server_url - server url + :param additional_params: additional (support) parameters, containing the following: + wait_interval - wait interval for API server, sec + wait_cluster_ready_tmout - time to wait for cluster ready, sec + wait_cluster_up_tmout - time to wait for cluster up, sec + wait_job_ready_tmout - time to wait for job ready, sec + wait_print_tmout - time between prints, sec + http_retries - http retries for API server calls + :param data_s3_access_secret - s3 access secret + :param data_s3_config - s3 configuration + :param data_max_files - max files to process + :param data_num_samples - num samples to process + :param runtime_actor_options - actor options + :param runtime_pipeline_id - pipeline id + :param runtime_code_location - code location + :param fdedup_preprocessor_doc_column - document column name + :param fdedup_preprocessor_doc_id_column - integer document id column name + :param fdedup_preprocessor_bucket_cpu - number of CPUs per bucket hash + :param fdedup_preprocessor_doc_cpu - number of CPUs per doc hash + :param fdedup_preprocessor_mhash_cpu - number of CPUs per minhash hash + :param fdedup_preprocessor_num_permutations - number of permutations + :param fdedup_preprocessor_threshold - threshold + :param fdedup_preprocessor_shingles_size - number of words in shingle + :param fdedup_preprocessor_delimiters - delimiter for splitting document + :param 
fdedup_preprocessor_minhash_snapshot_directory - snapshot directory for minhashes + :param fdedup_preprocessor_buckets_snapshot_directory - snapshot directory for buckets + :param fdedup_preprocessor_doc_id_snapshot_directory - snapshot directory for docs + :param fdedup_preprocessor_n_samples - number of samples for parameters computation + :param fdedup_preprocessor_n_docs - number of documents in the source data + :return: None + """ + # create clean_up task + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) + ComponentUtils.add_settings_to_component(clean_up_task, 60) + # pipeline definition + with dsl.ExitHandler(clean_up_task): + # compute execution params + compute_exec_params = compute_exec_params_op( + ray_worker_options=ray_worker_options, + runtime_actor_options=runtime_actor_options, + data_s3_config=data_s3_config, + data_max_files=data_max_files, + data_num_samples=data_num_samples, + runtime_pipeline_id=runtime_pipeline_id, + runtime_job_id=run_id, + runtime_code_location=runtime_code_location, + fdedup_preprocessor_doc_column=fdedup_preprocessor_doc_column, + fdedup_preprocessor_doc_id_column=fdedup_preprocessor_doc_id_column, + fdedup_preprocessor_num_permutations=fdedup_preprocessor_num_permutations, + fdedup_preprocessor_threshold=fdedup_preprocessor_threshold, + fdedup_preprocessor_shingles_size=fdedup_preprocessor_shingles_size, + fdedup_preprocessor_delimiters=fdedup_preprocessor_delimiters, + fdedup_preprocessor_bucket_cpu=fdedup_preprocessor_bucket_cpu, + fdedup_preprocessor_doc_cpu=fdedup_preprocessor_doc_cpu, + fdedup_preprocessor_mhash_cpu=fdedup_preprocessor_mhash_cpu, + fdedup_preprocessor_minhash_snapshot_directory=fdedup_preprocessor_minhash_snapshot_directory, + fdedup_preprocessor_buckets_snapshot_directory=fdedup_preprocessor_buckets_snapshot_directory, + fdedup_preprocessor_doc_id_snapshot_directory=fdedup_preprocessor_doc_id_snapshot_directory, + fdedup_preprocessor_n_samples=fdedup_preprocessor_n_samples, + fdedup_preprocessor_n_docs=fdedup_preprocessor_n_docs, + ) + ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2) + ComponentUtils.set_s3_env_vars_to_component(compute_exec_params, data_s3_access_secret) + + # start Ray cluster + ray_cluster = create_ray_op( + ray_name=ray_name, + run_id=run_id, + ray_head_options=ray_head_options, + ray_worker_options=ray_worker_options, + server_url=server_url, + additional_params=additional_params, + ) + ComponentUtils.add_settings_to_component(ray_cluster, ONE_HOUR_SEC * 2) + ray_cluster.after(compute_exec_params) + # Execute job + execute_job = execute_ray_jobs_op( + ray_name=ray_name, + run_id=run_id, + additional_params=additional_params, + exec_params=compute_exec_params.output, + exec_script_name=EXEC_SCRIPT_NAME, + server_url=server_url, + ) + ComponentUtils.add_settings_to_component(execute_job, ONE_WEEK_SEC) + ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret) + execute_job.after(ray_cluster) + + +if __name__ == "__main__": + # Compiling the pipeline + compiler.Compiler().compile(fdedup_preprocessor, __file__.replace(".py", ".yaml")) diff --git a/transforms/universal/fdedup_multi_step/kfp_ray/src/fdedup_bucket_processor_compute_execution_params.py b/transforms/universal/fdedup_multi_step/kfp_ray/src/fdedup_bucket_processor_compute_execution_params.py new file mode 100644 index 000000000..445a63ad8 --- /dev/null +++ 
b/transforms/universal/fdedup_multi_step/kfp_ray/src/fdedup_bucket_processor_compute_execution_params.py @@ -0,0 +1,131 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from typing import Any + + +def fdedup_bucket_processor_compute_execution_params( + worker_options: dict, # ray worker configuration + runtime_actor_options: dict, # actor's resource requirements + data_s3_config: str, # s3 configuration + data_max_files: int, # max files to process + data_num_samples: int, # num samples to process + runtime_pipeline_id: str, # pipeline id + runtime_job_id: str, # job id + runtime_code_location: dict, # code location + fdedup_bucket_processor_num_permutations: int, # number of permutations + fdedup_bucket_processor_threshold: float, # threshold, + fdedup_bucket_processor_bucket_cpu: float, # number of CPUs per bucket hash + fdedup_bucket_processor_doc_cpu: float, # number of CPUs per doc hash + fdedup_bucket_processor_mhash_cpu: float, # number of CPUs per minhash hash + fdedup_bucket_processor_processor_cpu: float, # number of CPUs per bucket processor + fdedup_bucket_processor_num_buckets: int, # number of actors for bucket hash + fdedup_bucket_processor_num_doc_id: int, # number of actors fo doc hash + fdedup_bucket_processor_num_minhashes: int, # number of actors for minhash hash + fdedup_bucket_processor_minhash_snapshot_directory: str, # minhash_snapshot_directory + fdedup_bucket_processor_buckets_snapshot_directory: str, # buckets_snapshot_directory + fdedup_bucket_processor_doc_id_snapshot_directory: str, # doc id snapshot directory +) -> dict[str, Any]: + """ + Compute fuzzy dedup bucket processor execution parameters + :param worker_options: cluster parameters + :param runtime_actor_options: actor request requirements + :param data_s3_config: s3 configuration + :param data_max_files: max files to process + :param data_num_samples: num samples to process + :param runtime_pipeline_id: pipeline id + :param runtime_job_id: job id + :param runtime_code_location: code location + :param fdedup_bucket_processor_bucket_cpu: number of CPUs per bucket hash + :param fdedup_bucket_processor_doc_cpu: number of CPUs per doc hash + :param fdedup_bucket_processor_mhash_cpu: number of CPUs per minhash hash + :param fdedup_bucket_processor_processor_cpu: number of CPUs per bucket processor + :param fdedup_bucket_processor_num_buckets: number of actors for bucket hash + :param fdedup_bucket_processor_num_doc_id: number of actors fo doc hash + :param fdedup_bucket_processor_num_minhashes: number of actors for minhash hash + :param fdedup_bucket_processor_num_permutations: number of permutations + :param fdedup_bucket_processor_threshold: threshold, + :param fdedup_bucket_processor_minhash_snapshot_directory: minhash snapshot directory + :param fdedup_bucket_processor_doc_id_snapshot_directory: doc id snapshot directory + :param fdedup_bucket_processor_buckets_snapshot_directory: buckets snapshot 
directory + :return: a dictionary with a Ray Job execution parameters + """ + import math + import sys + + cluster_cpu = worker_options["replicas"] * worker_options["cpu"] + cluster_memory = worker_options["replicas"] * worker_options["memory"] + print(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_memory}") + cluster_cpu *= 0.85 + cluster_memory *= 0.85 + # get actor requirements + worker_cpu = runtime_actor_options["num_cpus"] + print(f"worker required cpu {worker_cpu}") + # Define number of workers. + n_workers = math.ceil(fdedup_bucket_processor_num_buckets / 50) + b_processors = int( + (0.85 * cluster_cpu - fdedup_bucket_processor_num_buckets * fdedup_bucket_processor_bucket_cpu + - fdedup_bucket_processor_num_minhashes * fdedup_bucket_processor_mhash_cpu + - fdedup_bucket_processor_num_doc_id * fdedup_bucket_processor_doc_cpu + - n_workers * worker_cpu) / fdedup_bucket_processor_processor_cpu + ) + if b_processors < 0: + print(f"Not enough CPUs to run fuzzy dedup bucket processor, computed number of bucket_processors " + f"is {b_processors}") + print(f"Required bucket actors {fdedup_bucket_processor_num_buckets}, minhash actors " + f"{fdedup_bucket_processor_num_minhashes}, document actors {fdedup_bucket_processor_num_doc_id}, " + f"workers {n_workers}") + print("Try to increase the size of the cluster") + sys.exit(1) + # cap the number of workers to ensure that we do not overwhelm COS + print( + f"Number of workers: {n_workers}, bucket actors {fdedup_bucket_processor_num_buckets}, " + f"minhash actors {fdedup_bucket_processor_num_minhashes}, " + f"document actors {fdedup_bucket_processor_num_doc_id}, " + f"bucket processors {b_processors}" + ) + + # Make sure that we have enough memory. We assume that each actor uses 3 GB memory + r_mem = 3 * (n_workers + b_processors + fdedup_bucket_processor_num_buckets + + fdedup_bucket_processor_num_minhashes + fdedup_bucket_processor_num_doc_id) + if r_mem > cluster_memory: + print(f"Not enough memory to run de duping, required {r_mem}, available {cluster_memory}") + print(f"Try to increase the size of the cluster or increase size of the cpu per worker (current {worker_cpu})") + sys.exit(1) + required_cpu = (fdedup_bucket_processor_num_buckets * fdedup_bucket_processor_bucket_cpu + + fdedup_bucket_processor_num_minhashes * fdedup_bucket_processor_mhash_cpu + + fdedup_bucket_processor_num_doc_id * fdedup_bucket_processor_doc_cpu + + n_workers * worker_cpu + b_processors * fdedup_bucket_processor_processor_cpu) + print(f"Required execution cpu : {required_cpu}, Required execution memory {r_mem} GB") + # return results + return { + "data_s3_config": data_s3_config, + "data_max_files": data_max_files, + "data_num_samples": data_num_samples, + "runtime_num_workers": n_workers, + "runtime_worker_options": str(runtime_actor_options), + "runtime_pipeline_id": runtime_pipeline_id, + "runtime_job_id": runtime_job_id, + "runtime_code_location": str(runtime_code_location), + "fdedup_bucket_processor_bucket_cpu": fdedup_bucket_processor_bucket_cpu, + "fdedup_bucket_processor_doc_cpu": fdedup_bucket_processor_doc_cpu, + "fdedup_bucket_processor_mhash_cpu": fdedup_bucket_processor_mhash_cpu, + "fdedup_bucket_processor_processor_cpu": fdedup_bucket_processor_processor_cpu, + "fdedup_bucket_processor_num_doc_id": fdedup_bucket_processor_num_doc_id, + "fdedup_bucket_processor_num_buckets": fdedup_bucket_processor_num_buckets, + "fdedup_bucket_processor_num_minhashes": fdedup_bucket_processor_num_minhashes, + "fdedup_bucket_processor_num_processors": 
b_processors, + "fdedup_num_permutations": fdedup_bucket_processor_num_permutations, + "fdedup_threshold": fdedup_bucket_processor_threshold, + "fdedup_bucket_processor_minhash_snapshot_directory": fdedup_bucket_processor_minhash_snapshot_directory, + "fdedup_bucket_processor_buckets_snapshot_directory": fdedup_bucket_processor_buckets_snapshot_directory, + "fdedup_bucket_processor_doc_id_snapshot_directory": fdedup_bucket_processor_doc_id_snapshot_directory, + } diff --git a/transforms/universal/fdedup_multi_step/kfp_ray/src/fdedup_filter_compute_execution_params.py b/transforms/universal/fdedup_multi_step/kfp_ray/src/fdedup_filter_compute_execution_params.py new file mode 100644 index 000000000..09606d7a5 --- /dev/null +++ b/transforms/universal/fdedup_multi_step/kfp_ray/src/fdedup_filter_compute_execution_params.py @@ -0,0 +1,102 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from typing import Any + + +def fdedup_filter_compute_execution_params( + worker_options: dict, # ray worker configuration + actor_options: dict, # actor's resource requirements + data_s3_config: str, # s3 configuration + data_max_files: int, # max files to process + data_num_samples: int, # num samples to process + runtime_pipeline_id: str, # pipeline id + runtime_job_id: str, # job id + runtime_code_location: dict, # code location + fdedup_filter_doc_column: str, # doc column name + fdedup_filter_doc_id_column: str, # doc id column name + fdedup_filter_cluster_column: str, # cluster column name + fdedup_filter_cluster_removed_docs_column: str, # removed docs column name + fdedup_filter_doc_cpu: float, # number of CPUs per doc hash + fdedup_filter_num_doc_id: int, # number of actors fo doc hash + fdedup_filter_doc_id_snapshot_directory: str, # doc id snapshot directory +) -> dict[str, Any]: + + """ + Compute fuzzy dedup filter execution parameters + :param worker_options: cluster parameters + :param actor_options: actor request requirements + :param data_s3_config: s3 configuration + :param data_max_files: max files to process + :param data_num_samples: num samples to process + :param runtime_pipeline_id: pipeline id + :param runtime_job_id: job id + :param runtime_code_location: code location + :param fdedup_filter_doc_column: doc column name + :param fdedup_filter_doc_id_column: doc id column name + :param fdedup_filter_cluster_column: cluster column name + :param fdedup_filter_cluster_removed_docs_column: removed docs column name + :param fdedup_filter_doc_cpu: number of CPUs per doc hash + :param fdedup_filter_num_doc_id: number of actors fo doc hash + :param fdedup_filter_doc_id_snapshot_directory: doc id snapshot directory + :return: a dictionary with a Ray Job execution parameters + """ + import sys + + cluster_cpu = worker_options["replicas"] * worker_options["cpu"] + cluster_memory = worker_options["replicas"] * worker_options["memory"] + print(f"Cluster available CPUs {cluster_cpu}, Memory 
{cluster_memory}") + cluster_cpu *= 0.85 + cluster_memory *= 0.85 + # get actor requirements + worker_cpu = actor_options["num_cpus"] + print(f"worker required cpu {worker_cpu}") + # Define number of workers. + n_workers = int( + (0.85 * cluster_cpu - fdedup_filter_num_doc_id * fdedup_filter_doc_cpu) / worker_cpu + ) + if n_workers < 0: + print(f"Not enough CPUs to run fuzzy dedup filer, computed number of workers " + f"is {n_workers}") + print(f"Required document actors {fdedup_filter_num_doc_id}") + print("Try to increase the size of the cluster") + sys.exit(1) + # cap the number of workers to ensure that we do not overwhelm COS + if n_workers > 1000: + n_workers = 1000 + print(f"Number of workers: {n_workers}, document actors {fdedup_filter_num_doc_id}") + + # Make sure that we have enough memory. We assume that each actor uses 3 GB memory + r_mem = 3 * (n_workers + fdedup_filter_num_doc_id) + if r_mem > cluster_memory: + print(f"Not enough memory to run de duping, required {r_mem}, available {cluster_memory}") + print(f"Try to increase the size of the cluster or increase size of the cpu per worker (current {worker_cpu})") + sys.exit(1) + required_cpu = (fdedup_filter_num_doc_id * fdedup_filter_doc_cpu + n_workers * worker_cpu) + print(f"Required execution cpu : {required_cpu}, Required execution memory {r_mem} GB") + # return results + return { + "data_s3_config": data_s3_config, + "data_max_files": data_max_files, + "data_num_samples": data_num_samples, + "runtime_num_workers": n_workers, + "runtime_worker_options": str(actor_options), + "runtime_pipeline_id": runtime_pipeline_id, + "runtime_job_id": runtime_job_id, + "runtime_code_location": str(runtime_code_location), + "fdedup_filter_doc_column": fdedup_filter_doc_column, + "fdedup_filter_doc_id_column": fdedup_filter_doc_id_column, + "fdedup_filter_cluster_column": fdedup_filter_cluster_column, + "fdedup_filter_cluster_removed_docs_column": fdedup_filter_cluster_removed_docs_column, + "fdedup_filter_doc_cpu": fdedup_filter_doc_cpu, + "fdedup_filter_num_doc_id": fdedup_filter_num_doc_id, + "fdedup_filter_doc_id_snapshot_directory": fdedup_filter_doc_id_snapshot_directory, + } diff --git a/transforms/universal/fdedup_multi_step/kfp_ray/src/fdedup_preprocessor_compute_execution_params.py b/transforms/universal/fdedup_multi_step/kfp_ray/src/fdedup_preprocessor_compute_execution_params.py new file mode 100644 index 000000000..f8a44b907 --- /dev/null +++ b/transforms/universal/fdedup_multi_step/kfp_ray/src/fdedup_preprocessor_compute_execution_params.py @@ -0,0 +1,224 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
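To make the sizing arithmetic in `fdedup_filter_compute_execution_params` above concrete, here is a small worked example. The cluster and actor numbers are assumed for illustration only (they are not the pipeline defaults); the same discount-then-divide pattern is used by the bucket-processor and preprocessor parameter functions as well.

```python
# Assumed inputs: 5 Ray workers with 8 CPUs / 32 GB each, 0.8 CPU per transform actor,
# and 2 doc-id cache actors at 0.5 CPU each.
worker_options = {"replicas": 5, "cpu": 8, "memory": 32}
actor_options = {"num_cpus": 0.8}
fdedup_filter_num_doc_id = 2
fdedup_filter_doc_cpu = 0.5

cluster_cpu = worker_options["replicas"] * worker_options["cpu"] * 0.85        # 34.0
cluster_memory = worker_options["replicas"] * worker_options["memory"] * 0.85  # 136.0
n_workers = int(
    (0.85 * cluster_cpu - fdedup_filter_num_doc_id * fdedup_filter_doc_cpu)
    / actor_options["num_cpus"]
)                                             # int(27.9 / 0.8) = 34
n_workers = min(n_workers, 1000)              # the function caps workers so COS is not overwhelmed
required_memory = 3 * (n_workers + fdedup_filter_num_doc_id)  # 108 GB, assuming ~3 GB per actor
assert required_memory <= cluster_memory      # otherwise the component prints an error and exits
```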
+################################################################################ +from typing import Any + + +def fdedup_preprocessor_compute_execution_params( + ray_worker_options: dict, # ray worker configuration + runtime_actor_options: dict, # actor's resource requirements + data_s3_config: str, # s3 configuration + data_max_files: int, # max files to process + data_num_samples: int, # num samples to process + runtime_pipeline_id: str, # pipeline id + runtime_job_id: str, # job id + runtime_code_location: dict, # code location + fdedup_preprocessor_doc_column: str, # document column name + fdedup_preprocessor_doc_id_column: str, # integer document id column name + fdedup_preprocessor_num_permutations: int, # number of permutations + fdedup_preprocessor_threshold: float, # threshold, + fdedup_preprocessor_shingles_size: int, # number of words in shingle + fdedup_preprocessor_delimiters: str, # delimiter for splitting document + fdedup_preprocessor_bucket_cpu: float, # number of CPUs per bucket hash + fdedup_preprocessor_doc_cpu: float, # number of CPUs per doc hash + fdedup_preprocessor_mhash_cpu: float, # number of CPUs per minhash hash + fdedup_preprocessor_minhash_snapshot_directory: str, # minhash_snapshot_directory + fdedup_preprocessor_buckets_snapshot_directory: str, # buckets_snapshot_directory + fdedup_preprocessor_doc_id_snapshot_directory: str, # doc id snapshot directory + fdedup_preprocessor_n_samples: int, # number of samples to use for data evaluation + fdedup_preprocessor_n_docs: int, # number of source documents +) -> dict[str, Any]: + + """ + Compute fuzzy dedup preprocessor execution parameters + :param ray_worker_options: cluster parameters + :param runtime_actor_options: actor request requirements + :param data_s3_config: s3 configuration + :param data_max_files: max files to process + :param data_num_samples: num samples to process + :param runtime_pipeline_id: pipeline id + :param runtime_job_id: job id + :param runtime_code_location: code location + :param fdedup_preprocessor_doc_column: document column name + :param fdedup_preprocessor_doc_id_column: integer document id column name + :param fdedup_preprocessor_bucket_cpu: number of CPUs per bucket hash + :param fdedup_preprocessor_doc_cpu: number of CPUs per doc hash + :param fdedup_preprocessor_mhash_cpu: number of CPUs per minhash hash + :param fdedup_preprocessor_num_permutations: number of permutations + :param fdedup_preprocessor_threshold: threshold, + :param fdedup_preprocessor_shingles_size: number of words in shingle + :param fdedup_preprocessor_delimiters: delimiter for splitting document + :param fdedup_preprocessor_minhash_snapshot_directory: minhash snapshot directory + :param fdedup_preprocessor_doc_id_snapshot_directory: doc id snapshot directory + :param fdedup_preprocessor_buckets_snapshot_directory: buckets snapshot directory + :param fdedup_preprocessor_n_samples: number of samples to use for data evaluation + :param fdedup_preprocessor_n_docs: number of source documents (if negative use sampling) + :return: a dictionary with a Ray Job execution parameters + """ + import math + import sys + + from data_processing.data_access import DataAccessFactory + from data_processing.utils import GB, KB + from runtime_utils import KFPUtils + from scipy.integrate import quad as integrate + + def fuzzy_optimal_param( + threshold: float, + num_perm: int, + false_positive_weight: float, + false_negative_weight: float, + ) -> tuple[int, int]: + """ + Computes parameters for fuzzy dedup + :param threshold: 
filtering threshold + :param num_perm: number of permutations + :param false_positive_weight: false positive weight + :param false_negative_weight: false negative weight + :return: number of buckets and bucket length + """ + + def _false_positive_probability(ths: float, b: int, r: int) -> float: + """ + Compute false positive probability + :param ths: filtering threshold + :param b: permutation + :param r: rel permutation + :return: probability + """ + _probability = lambda s: 1 - (1 - s ** float(r)) ** float(b) + a, err = integrate(_probability, 0.0, ths) + return a + + def _false_negative_probability(ths: float, b: int, r: int) -> float: + """ + Compute false negative probability + :param ths: filtering threshold + :param b: permutation + :param r: rel permutation + :return: probability + """ + _probability = lambda s: 1 - (1 - (1 - s ** float(r)) ** float(b)) + a, err = integrate(_probability, ths, 1.0) + return a + + min_error = float("inf") + opt = (0, 0) + for perm in range(1, num_perm + 1): + max_r = int(num_perm / perm) + for rel in range(1, max_r + 1): + fp = _false_positive_probability(threshold, perm, rel) + fn = _false_negative_probability(threshold, perm, rel) + error = fp * false_positive_weight + fn * false_negative_weight + if error < min_error: + min_error = error + opt = (perm, rel) + return opt + + # fuzzy parameters + num_buckets, length_bucket = fuzzy_optimal_param( + threshold=fdedup_preprocessor_threshold, + num_perm=fdedup_preprocessor_num_permutations, + false_positive_weight=0.5, + false_negative_weight=0.5, + ) + print(f"Fuzzy parameters: num buckets {num_buckets}, bucket length {length_bucket}") + # Get cluster parameters + cluster_cpu = ray_worker_options["replicas"] * ray_worker_options["cpu"] + cluster_memory = ray_worker_options["replicas"] * ray_worker_options["memory"] + print(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_memory}") + cluster_cpu *= 0.85 + cluster_memory *= 0.85 + # get actor requirements + worker_cpu = runtime_actor_options["num_cpus"] + print(f"worker required cpu {worker_cpu}") + if fdedup_preprocessor_n_docs > 0: + # number of docs specified - use it + number_of_docs = fdedup_preprocessor_n_docs + print(f"Specified number of docs {number_of_docs}") + else: + # get credentials + s3_key, s3_secret, s3_endpoint = KFPUtils.credentials() + s3_creds = {"access_key": s3_key, "secret_key": s3_secret, "url": s3_endpoint} + # create data access + s3_config = KFPUtils.load_from_json(data_s3_config.replace("'", '"')) + if type(s3_config) is list: + # S3 config is list. take the first element + s3_config = s3_config[0] + data_access_factory = DataAccessFactory() + data_access_factory.apply_input_params({"data_s3_config": s3_config, "data_s3_cred": s3_creds}) + data_access = data_access_factory.create_data_access() + # sample input data + sampling, _ = data_access.sample_input_data(n_samples=fdedup_preprocessor_n_samples) + avg_doc_size = sampling.get("average doc size KB") + number_of_docs = sampling.get("estimated number of docs") + if number_of_docs == 0: + print(f"Estimated number of documents and documents size is zero. 
Please verify the input path.") + sys.exit(1) + print(f"Based on sampling, avg doc size: {avg_doc_size}, number of docs {number_of_docs}") + + # we are creating more buckets actors, so that we get better parallelization for bucket processing + b_actors = math.ceil(num_buckets * number_of_docs * 64 * 1.1 / GB) + d_actors = math.ceil(number_of_docs * 48 * 1.1 / GB) + m_actors = math.ceil(number_of_docs * 128 * 1.1 / GB) + # Define number of workers. + n_workers = int( + (0.85 * cluster_cpu - b_actors * fdedup_preprocessor_bucket_cpu - m_actors * fdedup_preprocessor_mhash_cpu + - d_actors * fdedup_preprocessor_doc_cpu) / worker_cpu + ) + if n_workers < 0: + print(f"Not enough CPUs to run fuzzy dedup preprocessor, computed number of workers is {n_workers}") + print(f"Required bucket actors {b_actors}, minhash actors {m_actors}, document actors {d_actors}") + print("Try to increase the size of the cluster") + sys.exit(1) + # cap the number of workers to ensure that we do not overwhelm COS + if n_workers > 1000: + n_workers = 1000 + print( + f"Number of workers: {n_workers}, bucket actors {b_actors}, " + f"minhash actors {m_actors}, document actors {d_actors}" + ) + + # Make sure that we have enough memory. We assume that each actor uses 3 GB memory + r_mem = 3 * (n_workers + b_actors + m_actors + d_actors) + if r_mem > cluster_memory: + print(f"Not enough memory to run de duping, required {r_mem}, available {cluster_memory}") + print(f"Try to increase the size of the cluster or increase size of the cpu per worker (current {worker_cpu})") + sys.exit(1) + required_cpu = (b_actors * fdedup_preprocessor_bucket_cpu + m_actors * fdedup_preprocessor_mhash_cpu + + d_actors * fdedup_preprocessor_doc_cpu + n_workers * worker_cpu) + print(f"Required execution cpu : {required_cpu}, Required execution memory {r_mem} GB") + # return results + return { + "data_s3_config": data_s3_config, + "data_max_files": data_max_files, + "data_num_samples": data_num_samples, + "runtime_num_workers": n_workers, + "runtime_worker_options": str(runtime_actor_options), + "runtime_pipeline_id": runtime_pipeline_id, + "runtime_job_id": runtime_job_id, + "runtime_code_location": str(runtime_code_location), + "fdedup_preprocessor_doc_column": fdedup_preprocessor_doc_column, + "fdedup_preprocessor_doc_id_column": fdedup_preprocessor_doc_id_column, + "fdedup_preprocessor_bucket_cpu": fdedup_preprocessor_bucket_cpu, + "fdedup_preprocessor_doc_cpu": fdedup_preprocessor_doc_cpu, + "fdedup_preprocessor_mhash_cpu": fdedup_preprocessor_mhash_cpu, + "fdedup_preprocessor__num_doc_id": d_actors, + "fdedup_preprocessor_num_buckets": b_actors, + "fdedup_preprocessor_num_minhashs": m_actors, + "fdedup_num_permutations": fdedup_preprocessor_num_permutations, + "fdedup_threshold": fdedup_preprocessor_threshold, + "fdedup_shingles_size": fdedup_preprocessor_shingles_size, + "fdedup_delimiters": fdedup_preprocessor_delimiters, + "fdedup_preprocessor_minhash_snapshot_directory": fdedup_preprocessor_minhash_snapshot_directory, + "fdedup_preprocessor_buckets_snapshot_directory": fdedup_preprocessor_buckets_snapshot_directory, + "fdedup_preprocessor_doc_id_snapshot_directory": fdedup_preprocessor_doc_id_snapshot_directory, + } diff --git a/transforms/universal/fdedup_multi_step/python/src/fdedup/examples/fdedup_preprocessor_local_python.py b/transforms/universal/fdedup_multi_step/python/src/fdedup/examples/fdedup_preprocessor_local_python.py index 3de1529b6..501f68c38 100644 --- 
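The `fuzzy_optimal_param` helper above searches every split of the permutations into `b` buckets of length `r` (with `b * r <= num_permutations`) and keeps the split that minimizes the weighted false-positive/false-negative areas. A minimal sketch of the S-curve it integrates, evaluated for one assumed candidate split (b=16, r=4 of 64 permutations):

```python
from scipy.integrate import quad

def collision_probability(s: float, b: int, r: int) -> float:
    # Probability that two documents with Jaccard similarity s share at least one
    # of the b buckets when the minhash signature is split into buckets of length r.
    return 1.0 - (1.0 - s ** float(r)) ** float(b)

threshold, b, r = 0.8, 16, 4  # assumed candidate; the helper tries all feasible (b, r)
false_positive, _ = quad(lambda s: collision_probability(s, b, r), 0.0, threshold)
false_negative, _ = quad(lambda s: 1.0 - collision_probability(s, b, r), threshold, 1.0)
print(f"b={b}, r={r}: false-positive area {false_positive:.4f}, false-negative area {false_negative:.4f}")
```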
a/transforms/universal/fdedup_multi_step/python/src/fdedup/examples/fdedup_preprocessor_local_python.py +++ b/transforms/universal/fdedup_multi_step/python/src/fdedup/examples/fdedup_preprocessor_local_python.py @@ -17,10 +17,10 @@ from data_processing.utils import ParamsUtils from fdedup.transforms.base import (preprocessor_doc_column_name_cli_param, preprocessor_int_column_name_cli_param, - delimiters_cli_param, + preprocessor_delimiters_cli_param, preprocessor_num_permutations_cli_param, preprocessor_threshold_cli_param, - shingles_size_cli_param, + preprocessor_shingles_size_cli_param, preprocessor_minhash_snapshot_directory_cli_param) from fdedup.transforms.python import FdedupPreprocessorPythonTransformRuntimeConfiguration @@ -44,10 +44,10 @@ # fdedup parameters preprocessor_doc_column_name_cli_param: "contents", preprocessor_int_column_name_cli_param: "Unnamed: 0", - delimiters_cli_param: " ", + preprocessor_delimiters_cli_param: " ", preprocessor_num_permutations_cli_param: 64, preprocessor_threshold_cli_param: .8, - shingles_size_cli_param: 5, + preprocessor_shingles_size_cli_param: 5, } sys.argv = ParamsUtils.dict_to_req(d=params) diff --git a/transforms/universal/fdedup_multi_step/python/src/fdedup/transforms/base/__init__.py b/transforms/universal/fdedup_multi_step/python/src/fdedup/transforms/base/__init__.py index c9b2b66bc..0138b002d 100644 --- a/transforms/universal/fdedup_multi_step/python/src/fdedup/transforms/base/__init__.py +++ b/transforms/universal/fdedup_multi_step/python/src/fdedup/transforms/base/__init__.py @@ -18,10 +18,10 @@ preprocessor_cli_prefix, preprocessor_doc_column_name_cli_param, preprocessor_int_column_name_cli_param, - delimiters_cli_param, + preprocessor_delimiters_cli_param, preprocessor_num_permutations_cli_param, preprocessor_threshold_cli_param, - shingles_size_cli_param, + preprocessor_shingles_size_cli_param, preprocessor_minhash_snapshot_directory_cli_param, preprocessor_docid_snapshot_directory_cli_param, preprocessor_buckets_snapshot_directory_cli_param, diff --git a/transforms/universal/fdedup_multi_step/python/src/fdedup/transforms/base/fdedup_filter_transform_base.py b/transforms/universal/fdedup_multi_step/python/src/fdedup/transforms/base/fdedup_filter_transform_base.py index 0b500c48f..47124db4a 100644 --- a/transforms/universal/fdedup_multi_step/python/src/fdedup/transforms/base/fdedup_filter_transform_base.py +++ b/transforms/universal/fdedup_multi_step/python/src/fdedup/transforms/base/fdedup_filter_transform_base.py @@ -25,8 +25,8 @@ # configuration parameters short_name = "fdedup_filter" filter_cli_prefix = f"{short_name}_" -cluster_column_name_key = "cluster_column" -removed_docs_column_name_key = "removed_docs_column" +cluster_column_name_key = "cluster" +removed_docs_column_name_key = "removed_docs" doc_id_cache_key = "doc_id_cache" filter_doc_column_name_cli_param = f"{filter_cli_prefix}{doc_column_name_key}" diff --git a/transforms/universal/fdedup_multi_step/python/src/fdedup/transforms/base/fdedup_preprocessor_transform_base.py b/transforms/universal/fdedup_multi_step/python/src/fdedup/transforms/base/fdedup_preprocessor_transform_base.py index 1f5755830..ad128c498 100644 --- a/transforms/universal/fdedup_multi_step/python/src/fdedup/transforms/base/fdedup_preprocessor_transform_base.py +++ b/transforms/universal/fdedup_multi_step/python/src/fdedup/transforms/base/fdedup_preprocessor_transform_base.py @@ -46,10 +46,10 @@ preprocessor_doc_column_name_cli_param = f"{preprocessor_cli_prefix}{doc_column_name_key}" 
preprocessor_int_column_name_cli_param = f"{preprocessor_cli_prefix}{int_column_name_key}" -delimiters_cli_param = f"{preprocessor_cli_prefix}{delimiters_key}" +preprocessor_delimiters_cli_param = f"{preprocessor_cli_prefix}{delimiters_key}" preprocessor_num_permutations_cli_param = f"{preprocessor_cli_prefix}{num_permutations_key}" preprocessor_threshold_cli_param = f"{preprocessor_cli_prefix}{threshold_key}" -shingles_size_cli_param = f"{preprocessor_cli_prefix}{shingles_size_key}" +preprocessor_shingles_size_cli_param = f"{preprocessor_cli_prefix}{shingles_size_key}" preprocessor_minhash_snapshot_directory_cli_param = f"{preprocessor_cli_prefix}{minhash_snapshot_directory_key}" preprocessor_buckets_snapshot_directory_cli_param = f"{preprocessor_cli_prefix}{buckets_snapshot_directory_key}" preprocessor_docid_snapshot_directory_cli_param = f"{preprocessor_cli_prefix}{doc_id_snapshot_directory_key}" @@ -257,12 +257,12 @@ def add_input_params(self, parser: ArgumentParser) -> None: help="threshold" ) parser.add_argument( - f"--{shingles_size_cli_param}", + f"--{preprocessor_shingles_size_cli_param}", type=int, default=5, help="number of words in shingle") parser.add_argument( - f"--{delimiters_cli_param}", + f"--{preprocessor_delimiters_cli_param}", type=str, default=" ", help="delimiter for splitting document" diff --git a/transforms/universal/fdedup_multi_step/python/src/fdedup/utils/fdedupsupport.py b/transforms/universal/fdedup_multi_step/python/src/fdedup/utils/fdedupsupport.py index af60c51a1..c2c156b1e 100644 --- a/transforms/universal/fdedup_multi_step/python/src/fdedup/utils/fdedupsupport.py +++ b/transforms/universal/fdedup_multi_step/python/src/fdedup/utils/fdedupsupport.py @@ -136,8 +136,8 @@ def process_buckets_locally(b_hash: int, bucket: list[int], minhashes: dict[int, min_len = c_len cluster_id = doc_id current_set.discard(remaining) - docs, removed = FdedupSupport.merge_doc_ids(current_ids=docs, current_removed=removed, - new_ids={remaining: (cluster_id, b_hash)}, new_removed=current_set) + docs[remaining] = (cluster_id, b_hash) + removed |= current_set return docs, removed @staticmethod diff --git a/transforms/universal/fdedup_multi_step/python/test/preprocessor/test_fdedup_preprocessor_python.py b/transforms/universal/fdedup_multi_step/python/test/preprocessor/test_fdedup_preprocessor_python.py index cd8e0a0db..223356635 100644 --- a/transforms/universal/fdedup_multi_step/python/test/preprocessor/test_fdedup_preprocessor_python.py +++ b/transforms/universal/fdedup_multi_step/python/test/preprocessor/test_fdedup_preprocessor_python.py @@ -18,10 +18,10 @@ ) from fdedup.transforms.base import (preprocessor_doc_column_name_cli_param, preprocessor_int_column_name_cli_param, - delimiters_cli_param, + preprocessor_delimiters_cli_param, preprocessor_num_permutations_cli_param, preprocessor_threshold_cli_param, - shingles_size_cli_param, + preprocessor_shingles_size_cli_param, ) from fdedup.transforms.python import FdedupPreprocessorPythonTransformRuntimeConfiguration @@ -38,10 +38,10 @@ def get_test_transform_fixtures(self) -> list[tuple]: launcher = PythonTransformLauncher(FdedupPreprocessorPythonTransformRuntimeConfiguration()) config = {preprocessor_doc_column_name_cli_param: "contents", preprocessor_int_column_name_cli_param: "Unnamed: 0", - delimiters_cli_param: " ", + preprocessor_delimiters_cli_param: " ", preprocessor_num_permutations_cli_param: 64, preprocessor_threshold_cli_param: .8, - shingles_size_cli_param: 5, + preprocessor_shingles_size_cli_param: 5, } return 
[(launcher, config, basedir + "/input", basedir + "/preprocessor")] diff --git a/transforms/universal/fdedup_multi_step/ray/src/fdedup_ray/clusters_estimator.py b/transforms/universal/fdedup_multi_step/ray/src/fdedup_ray/clusters_estimator.py index 965de996a..3d8df458d 100644 --- a/transforms/universal/fdedup_multi_step/ray/src/fdedup_ray/clusters_estimator.py +++ b/transforms/universal/fdedup_multi_step/ray/src/fdedup_ray/clusters_estimator.py @@ -24,7 +24,7 @@ - preprocessor - bucket processor - filter -For preprocessor and filter we cap number of workers to 800 to ensure that we are not starving S3. +For preprocessor and filter we cap number of workers to 1000 to ensure that we are not starving S3. Number of bucket processors that are used for the actual deduping do not impact S3 and can be made as large as desired to improve the throughput """ @@ -38,9 +38,9 @@ false_positive = 0.5 false_negative = 0.5 # Number of workers for initial run -n_workers = 800 -if n_workers > 800: - n_workers = 800 +n_workers = 1000 +if n_workers > 1000: + n_workers = 1000 n_bucket_processors = 1000 # pod cpus ray_node_cpu = 50 diff --git a/transforms/universal/fdedup_multi_step/ray/src/fdedup_ray/examples/fdedup_preprocessor_local_ray.py b/transforms/universal/fdedup_multi_step/ray/src/fdedup_ray/examples/fdedup_preprocessor_local_ray.py index f64d76e43..3371a5341 100644 --- a/transforms/universal/fdedup_multi_step/ray/src/fdedup_ray/examples/fdedup_preprocessor_local_ray.py +++ b/transforms/universal/fdedup_multi_step/ray/src/fdedup_ray/examples/fdedup_preprocessor_local_ray.py @@ -17,10 +17,10 @@ from data_processing_ray.runtime.ray import RayTransformLauncher from fdedup.transforms.base import (preprocessor_doc_column_name_cli_param, preprocessor_int_column_name_cli_param, - delimiters_cli_param, + preprocessor_delimiters_cli_param, preprocessor_num_permutations_cli_param, preprocessor_threshold_cli_param, - shingles_size_cli_param, + preprocessor_shingles_size_cli_param, ) from fdedup_ray.transforms import (FdedupPreprocessorRayTransformRuntimeConfiguration, preprocessor_bucket_cpu_cli_param, @@ -53,9 +53,9 @@ # fdedup parameters preprocessor_doc_column_name_cli_param: "contents", preprocessor_int_column_name_cli_param: "Unnamed: 0", - delimiters_cli_param: " ", + preprocessor_delimiters_cli_param: " ", preprocessor_num_permutations_cli_param: 64, - shingles_size_cli_param: 5, + preprocessor_shingles_size_cli_param: 5, preprocessor_threshold_cli_param: .8, preprocessor_num_buckets_cli_param: 1, preprocessor_bucket_cpu_cli_param: .5, diff --git a/transforms/universal/fdedup_multi_step/ray/test/test_fdedup_preprocessor_ray.py b/transforms/universal/fdedup_multi_step/ray/test/test_fdedup_preprocessor_ray.py index 29af1e8df..9a7c3b20b 100644 --- a/transforms/universal/fdedup_multi_step/ray/test/test_fdedup_preprocessor_ray.py +++ b/transforms/universal/fdedup_multi_step/ray/test/test_fdedup_preprocessor_ray.py @@ -18,10 +18,10 @@ from data_processing_ray.runtime.ray import RayTransformLauncher from fdedup.transforms.base import (preprocessor_doc_column_name_cli_param, preprocessor_int_column_name_cli_param, - delimiters_cli_param, + preprocessor_delimiters_cli_param, preprocessor_num_permutations_cli_param, preprocessor_threshold_cli_param, - shingles_size_cli_param, + preprocessor_shingles_size_cli_param, ) from fdedup_ray.transforms import (FdedupPreprocessorRayTransformRuntimeConfiguration, preprocessor_bucket_cpu_cli_param, @@ -44,10 +44,10 @@ def get_test_transform_fixtures(self) -> list[tuple]: config 
= {"run_locally": True, preprocessor_doc_column_name_cli_param: "contents", preprocessor_int_column_name_cli_param: "Unnamed: 0", - delimiters_cli_param: " ", + preprocessor_delimiters_cli_param: " ", preprocessor_num_permutations_cli_param: 64, preprocessor_threshold_cli_param: .8, - shingles_size_cli_param: 5, + preprocessor_shingles_size_cli_param: 5, preprocessor_num_buckets_cli_param: 1, preprocessor_bucket_cpu_cli_param: .5, preprocessor_num_doc_id_cli_param: 1,