Skip to content

Commit

Permalink
Ray orchestration for fuzzy dedup
Browse files Browse the repository at this point in the history
Signed-off-by: Constantin M Adam <cmadam@us.ibm.com>
  • Loading branch information
cmadam committed Oct 14, 2024
1 parent c14bdaa commit 1215ac5
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,65 @@
# limitations under the License.
################################################################################

from cluster_analysis_transform import ClusterAnalysisTransformConfiguration
import os
from typing import Any

from cluster_analysis_transform import (
ClusterAnalysisTransformConfiguration,
num_bands_key,
num_segments_key,
)
from data_processing.data_access import DataAccess
from data_processing.utils import CLIArgumentProvider, get_logger
from data_processing_ray.runtime.ray.runtime_configuration import (
from data_processing_ray.runtime.ray import (
DefaultRayTransformRuntime,
RayTransformLauncher,
RayTransformRuntimeConfiguration,
)


logger = get_logger(__name__)


class ClusterAnalysisRayRuntime(DefaultRayTransformRuntime):
"""
Cluster analysis runtime support for Ray
"""

def __init__(self, params: dict[str, Any]):
super().__init__(params=params)
self.logger = get_logger(__name__)

def get_folders(self, data_access: DataAccess) -> list[str]:
"""
Return the set of folders that will be processed by this transform
:param data_access - data access object
:return: list of folder paths
"""
bands = self.params[num_bands_key]
segments = self.params[num_segments_key]
folders = [os.path.join(f"band={b}", f"segment={s}") for b in range(bands) for s in range(segments)]
return folders


class ClusterAnalysisRayTransformConfiguration(RayTransformRuntimeConfiguration):
"""
Implements the RayTransformConfiguration for NOOP as required by the RayTransformLauncher.
NOOP does not use a RayRuntime class so the superclass only needs the base
python-only configuration.
Implements the RayTransformConfiguration for Fuzzy Dedup Cluster Analysis
as required by the RayTransformLauncher.
"""

def __init__(self):
"""
Initialization
:param base_configuration - base configuration class
"""
super().__init__(transform_config=ClusterAnalysisTransformConfiguration())
super().__init__(
transform_config=ClusterAnalysisTransformConfiguration(),
runtime_class=ClusterAnalysisRayRuntime,
)


if __name__ == "__main__":
# launcher = NOOPRayLauncher()
launcher = RayTransformLauncher(ClusterAnalysisRayTransformConfiguration())
logger.info("Launching transform")
logger.info("Launching fuzzy dedup cluster analysis ray transform")
launcher.launch()
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# limitations under the License.
################################################################################

import os
from typing import Any

import ray
Expand Down Expand Up @@ -88,8 +89,11 @@ def get_transform_config(
:param files - list of files to remove
:return: dictionary of filter init params
"""
duplicate_list_location = self.params.get(duplicate_list_location_key, duplicate_list_location_default)
data_access = data_access_factory.create_data_access()
duplicate_list_location = self.params.get(duplicate_list_location_key, duplicate_list_location_default)
duplicate_list_location = os.path.abspath(
os.path.join(data_access.output_folder, "..", duplicate_list_location)
)
if duplicate_list_location.startswith("s3://"):
_, duplicate_list_location = duplicate_list_location.split("://")
duplicate_list, retries = data_access.get_file(duplicate_list_location)
Expand Down Expand Up @@ -117,6 +121,6 @@ def __init__(self):

if __name__ == "__main__":
# launcher = NOOPRayLauncher()
launcher = RayTransformLauncher(DataCleaningRayTransformConfiguration())
logger.info("Launching transform")
launcher = RayTransformLauncher(runtime_config=DataCleaningRayTransformConfiguration())
logger.info("Launching transform")
launcher.launch()
60 changes: 60 additions & 0 deletions transforms/universal/fdedup/ray/src/fuzzy_dedup_ray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import argparse
import os
import sys

from cluster_analysis_transform_ray import ClusterAnalysisRayTransformConfiguration
from data_cleaning_transform_ray import DataCleaningRayTransformConfiguration
from data_processing.runtime.pure_python import PythonTransformLauncher
from data_processing.utils import ParamsUtils
from data_processing_ray.runtime.ray import RayTransformLauncher
from fuzzy_dedup_python import ServiceOrchestrator, parse_args
from get_duplicate_list_transform_python import (
GetDuplicateListPythonTransformConfiguration,
)
from signature_calc_transform_ray import SignatureCalculationRayTransformConfiguration


s3_creds = {
"access_key": os.getenv("AWS_ACCESS_KEY_ID"),
"secret_key": os.getenv("AWS_SECRET_ACCESS_KEY"),
"url": os.getenv("AWS_ENDPOINT_URL"),
}


ray_worker_options = {"num_cpus": 0.8}
ray_params = {
# where to run
"run_locally": True,
# orchestrator
"runtime_worker_options": ParamsUtils.convert_to_ast(ray_worker_options),
"runtime_num_workers": 3,
}

ray_params_argv = ParamsUtils.dict_to_req(ray_params)


class RayServiceOrchestrator(ServiceOrchestrator):
def __init__(self, global_params: argparse.Namespace = None):
super().__init__(global_params=global_params)

def execute_service(self, service_short_name: str, params: list) -> int:
sys.argv = params if service_short_name == "fdlist" else ray_params_argv + params[1:]
if service_short_name == "minhash":
launcher = RayTransformLauncher(runtime_config=SignatureCalculationRayTransformConfiguration())
elif service_short_name == "cluster":
launcher = RayTransformLauncher(runtime_config=ClusterAnalysisRayTransformConfiguration())
elif service_short_name == "fdlist":
launcher = PythonTransformLauncher(runtime_config=GetDuplicateListPythonTransformConfiguration())
elif service_short_name == "fdclean":
launcher = RayTransformLauncher(runtime_config=DataCleaningRayTransformConfiguration())
status = launcher.launch()
return status


if __name__ == "__main__":
# Parse command line arguments
args = parse_args()
# Initialize the orchestrator
orchestrator = RayServiceOrchestrator(global_params=args)
# Launch ray fuzzy dedup execution
orchestrator.orchestrate()

0 comments on commit 1215ac5

Please sign in to comment.