diff --git a/transforms/universal/ededup/ray/src/ededup_pipeline_local_ray.py b/transforms/universal/ededup/ray/src/ededup_pipeline_local_ray.py new file mode 100644 index 000000000..d8c3b09c8 --- /dev/null +++ b/transforms/universal/ededup/ray/src/ededup_pipeline_local_ray.py @@ -0,0 +1,55 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import os +import sys + +from data_processing.utils import ParamsUtils +from data_processing_ray.runtime.ray import RayTransformLauncher +from ededup_pipeline_transform_ray import EdedupPypelineRayTransformConfiguration +from ededup_transform_base import doc_column_name_cli_param, int_column_name_cli_param +from ededup_transform_ray import hash_cpu_cli_params, num_hashes_cli_params + + +# create launcher +launcher = RayTransformLauncher(EdedupPypelineRayTransformConfiguration()) +# create parameters +input_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "../test-data/input")) +output_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "../output")) +local_conf = { + "input_folder": input_folder, + "output_folder": output_folder, +} +worker_options = {"num_cpus": 0.5} +code_location = {"github": "github", "commit_hash": "12345", "path": "path"} +params = { + # where to run + "run_locally": True, + # Data access. Only required parameters are specified + "data_local_config": ParamsUtils.convert_to_ast(local_conf), + # orchestrator + "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options), + "runtime_num_workers": 2, + "runtime_pipeline_id": "pipeline_id", + "runtime_job_id": "job_id", + "runtime_creation_delay": 0, + "runtime_code_location": ParamsUtils.convert_to_ast(code_location), + # ededup parameters + hash_cpu_cli_params: 0.5, + num_hashes_cli_params: 2, + doc_column_name_cli_param: "contents", + int_column_name_cli_param: "document_id", +} +sys.argv = ParamsUtils.dict_to_req(d=params) + +# launch +launcher.launch() diff --git a/transforms/universal/ededup/ray/src/ededup_pipeline_transform_ray.py b/transforms/universal/ededup/ray/src/ededup_pipeline_transform_ray.py new file mode 100644 index 000000000..5b2e1216d --- /dev/null +++ b/transforms/universal/ededup/ray/src/ededup_pipeline_transform_ray.py @@ -0,0 +1,42 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +from data_processing_ray.runtime.ray import RayTransformLauncher, RayTransformRuntimeConfiguration +from data_processing.transform import PipelineTransformConfiguration +from data_processing_ray.transform.ray import RayPipelineTransform +from data_processing.utils import get_logger +from ededup_transform_ray import EdedupRayTransformRuntimeConfiguration + +logger = get_logger(__name__) + + +class EdedupPypelineRayTransformConfiguration(RayTransformRuntimeConfiguration): + """ + Implements the PythonTransformConfiguration for NOOP as required by the PythonTransformLauncher. + NOOP does not use a RayRuntime class so the superclass only needs the base + python-only configuration. + """ + + def __init__(self): + """ + Initialization + """ + super().__init__(transform_config=PipelineTransformConfiguration( + config={"transforms": [EdedupRayTransformRuntimeConfiguration()]}, + transform_class=RayPipelineTransform)) + + +if __name__ == "__main__": + # launcher = NOOPRayLauncher() + launcher = RayTransformLauncher(EdedupPypelineRayTransformConfiguration()) + logger.info("Launching resize/noop transform") + launcher.launch() diff --git a/transforms/universal/ededup/ray/test/test_ededup_pipeline_ray.py b/transforms/universal/ededup/ray/test/test_ededup_pipeline_ray.py new file mode 100644 index 000000000..d9460ef93 --- /dev/null +++ b/transforms/universal/ededup/ray/test/test_ededup_pipeline_ray.py @@ -0,0 +1,43 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import os + +from data_processing.test_support.launch.transform_test import ( + AbstractTransformLauncherTest, +) +from data_processing_ray.runtime.ray import RayTransformLauncher +from ededup_pipeline_transform_ray import EdedupPypelineRayTransformConfiguration +from ededup_transform_base import doc_column_name_cli_param, int_column_name_cli_param +from ededup_transform_ray import hash_cpu_cli_params, num_hashes_cli_params + + +class TestRayEdedupTransform(AbstractTransformLauncherTest): + """ + Extends the super-class to define the test data for the tests defined there. + The name of this class MUST begin with the word Test so that pytest recognizes it as a test class. + """ + + def get_test_transform_fixtures(self) -> list[tuple]: + basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../test-data")) + config = { + "run_locally": True, + # When running in ray, our Runtime's get_transform_config() method will load the domains using + # the orchestrator's DataAccess/Factory. So we don't need to provide the bl_local_config configuration. + hash_cpu_cli_params: 0.5, + num_hashes_cli_params: 2, + doc_column_name_cli_param: "contents", + int_column_name_cli_param: "document_id", + } + launcher = RayTransformLauncher(EdedupPypelineRayTransformConfiguration()) + fixtures = [(launcher, config, basedir + "/input", basedir + "/expected")] + return fixtures