Skip to content

Commit

Permalink
add ededup Ray sample
Browse files Browse the repository at this point in the history
  • Loading branch information
blublinsky committed Sep 23, 2024
1 parent d187dba commit 26978f1
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 0 deletions.
55 changes: 55 additions & 0 deletions transforms/universal/ededup/ray/src/ededup_pipeline_local_ray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import os
import sys

from data_processing.utils import ParamsUtils
from data_processing_ray.runtime.ray import RayTransformLauncher
from ededup_pipeline_transform_ray import EdedupPypelineRayTransformConfiguration
from ededup_transform_base import doc_column_name_cli_param, int_column_name_cli_param
from ededup_transform_ray import hash_cpu_cli_params, num_hashes_cli_params


# create launcher
launcher = RayTransformLauncher(EdedupPypelineRayTransformConfiguration())
# create parameters
input_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "../test-data/input"))
output_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "../output"))
local_conf = {
"input_folder": input_folder,
"output_folder": output_folder,
}
worker_options = {"num_cpus": 0.5}
code_location = {"github": "github", "commit_hash": "12345", "path": "path"}
params = {
# where to run
"run_locally": True,
# Data access. Only required parameters are specified
"data_local_config": ParamsUtils.convert_to_ast(local_conf),
# orchestrator
"runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
"runtime_num_workers": 2,
"runtime_pipeline_id": "pipeline_id",
"runtime_job_id": "job_id",
"runtime_creation_delay": 0,
"runtime_code_location": ParamsUtils.convert_to_ast(code_location),
# ededup parameters
hash_cpu_cli_params: 0.5,
num_hashes_cli_params: 2,
doc_column_name_cli_param: "contents",
int_column_name_cli_param: "document_id",
}
sys.argv = ParamsUtils.dict_to_req(d=params)

# launch
launcher.launch()
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from data_processing_ray.runtime.ray import RayTransformLauncher, RayTransformRuntimeConfiguration
from data_processing.transform import PipelineTransformConfiguration
from data_processing_ray.transform.ray import RayPipelineTransform
from data_processing.utils import get_logger
from ededup_transform_ray import EdedupRayTransformRuntimeConfiguration

logger = get_logger(__name__)


class EdedupPypelineRayTransformConfiguration(RayTransformRuntimeConfiguration):
"""
Implements the PythonTransformConfiguration for NOOP as required by the PythonTransformLauncher.
NOOP does not use a RayRuntime class so the superclass only needs the base
python-only configuration.
"""

def __init__(self):
"""
Initialization
"""
super().__init__(transform_config=PipelineTransformConfiguration(
config={"transforms": [EdedupRayTransformRuntimeConfiguration()]},
transform_class=RayPipelineTransform))


if __name__ == "__main__":
# launcher = NOOPRayLauncher()
launcher = RayTransformLauncher(EdedupPypelineRayTransformConfiguration())
logger.info("Launching resize/noop transform")
launcher.launch()
43 changes: 43 additions & 0 deletions transforms/universal/ededup/ray/test/test_ededup_pipeline_ray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import os

from data_processing.test_support.launch.transform_test import (
AbstractTransformLauncherTest,
)
from data_processing_ray.runtime.ray import RayTransformLauncher
from ededup_pipeline_transform_ray import EdedupPypelineRayTransformConfiguration
from ededup_transform_base import doc_column_name_cli_param, int_column_name_cli_param
from ededup_transform_ray import hash_cpu_cli_params, num_hashes_cli_params


class TestRayEdedupTransform(AbstractTransformLauncherTest):
"""
Extends the super-class to define the test data for the tests defined there.
The name of this class MUST begin with the word Test so that pytest recognizes it as a test class.
"""

def get_test_transform_fixtures(self) -> list[tuple]:
basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../test-data"))
config = {
"run_locally": True,
# When running in ray, our Runtime's get_transform_config() method will load the domains using
# the orchestrator's DataAccess/Factory. So we don't need to provide the bl_local_config configuration.
hash_cpu_cli_params: 0.5,
num_hashes_cli_params: 2,
doc_column_name_cli_param: "contents",
int_column_name_cli_param: "document_id",
}
launcher = RayTransformLauncher(EdedupPypelineRayTransformConfiguration())
fixtures = [(launcher, config, basedir + "/input", basedir + "/expected")]
return fixtures

0 comments on commit 26978f1

Please sign in to comment.