Commit
Merge pull request #691 from IBM/folder_transform
added folder_transform
blublinsky authored Oct 15, 2024
2 parents 8e5ef30 + 7b7736c commit 97810af
Showing 32 changed files with 575 additions and 140 deletions.
34 changes: 16 additions & 18 deletions data-processing-lib/doc/transforms.md
@@ -1,26 +1,24 @@
# Transforms

[All transforms](../python/src/data_processing/transform/abstract_transform.py)
are generalized to operate on generically typed `DATA`.
[Ray](ray-runtime.md) and [Python](python-runtime.md) runtimes
currently support `DATA` as both byte arrays
and [pyarrow Tables](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html).
The [Spark runtime](spark-runtime.md) currently supports the native Spark `DataFrame`.
A transform is the basic integration unit of DPK and can be executed in any of the runtimes DPK supports
([Python](python-runtime.md), [Ray](ray-runtime.md), and [Spark](spark-runtime.md)). All transforms
are derived from the
[AbstractTransform class](../python/src/data_processing/transform/abstract_transform.py). This class
provides no functionality and serves only as a marker that a given class implements a transform.
There are currently two types of transforms defined in DPK:

All transforms convert their input `DATA` to a list of transformed `DATA` objects
and optional metadata about the transformation of the `DATA` instance.
The Transform itself need only be concerned with the conversion
of one `DATA` instance at a time.
Transforms, where possible, should be implemented without regard to the
runtime they will run in or where their configuration originates.
* [AbstractBinaryTransform](../python/src/data_processing/transform/binary_transform.py), the base
class for all data transforms. A data transform converts a file of data, producing zero or more data files
and metadata. A specialization of the binary transform is
[AbstractTableTransform](../python/src/data_processing/transform/table_transform.py), which consumes and produces
data files containing [pyarrow tables](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html).
* [AbstractFolderTransform](../python/src/data_processing/transform/folder_transform.py), a base
class that consumes a folder (which can contain an arbitrary set of files that need to be processed together)
and produces zero or more data files and metadata (see the sketch after this list).
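
To make the folder variant concrete, here is a minimal sketch. It is not code from this PR: the class is hypothetical, and the `transform` signature is an assumption modeled on the binary-transform convention (one folder name in, a list of `(bytes, output file name)` pairs plus metadata out); consult folder_transform.py for the authoritative interface.

```python
from typing import Any

from data_processing.transform import AbstractFolderTransform


class FolderSummaryTransform(AbstractFolderTransform):  # hypothetical example class
    """Emits a one-line summary file for each folder it is given."""

    def transform(self, folder_name: str) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
        # Assumed signature: consume one folder, produce zero or more
        # (bytes, output file name) pairs plus a metadata dictionary.
        summary = f"processed folder {folder_name}\n".encode("utf-8")
        return [(summary, "summary.txt")], {"folders": 1}
```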

In the discussion that follows, we'll focus on the transformation of pyarrow Tables
using the `AbstractTableTransform` class (see below), supported by both
the Ray and Python runtimes.
Mapping from this tutorial to a Spark runtime can be done by using
`data-prep-kit-spark`'s [AbstractSparkTransform](../spark/src/data_processing_spark/runtime/spark/spark_transform.py)
which operates on a Spark DataFrame instead of a pyarrow Table.

In the discussion that follows, we'll focus on the transformation of pyarrow Tables
using the `AbstractTableTransform` class (see below), supported by the Ray, Spark, and Python runtimes.
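
For orientation before the class description below, a minimal table transform might look like this sketch. The class is illustrative only, and the `transform` signature is an assumption based on the description above (one pyarrow Table in, a list of Tables plus metadata out).

```python
from typing import Any

import pyarrow as pa

from data_processing.transform import AbstractTableTransform


class RowCountTransform(AbstractTableTransform):  # illustrative, not part of DPK
    """Passes each table through unchanged and reports its row count."""

    def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]:
        # Assumed signature: one input table in, a list of output tables
        # and a metadata dictionary out.
        return [table], {"rows": table.num_rows}
```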

#### AbstractTableTransform class
[AbstractTableTransform](../python/src/data_processing/transform/table_transform.py)

@@ -358,7 +358,10 @@ def get_output_location(self, path: str) -> str:
:param path: input file location
:return: output file location
"""
raise NotImplementedError("Subclasses should implement this!")
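# Map an input path into the output tree by swapping the configured folder prefixes.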
if self.get_output_folder() is None:
self.logger.error("Get out put location. S3 configuration is not provided, returning None")
return None
return path.replace(self.get_input_folder(), self.get_output_folder())

def save_table(self, path: str, table: pa.Table) -> tuple[int, dict[str, Any], int]:
"""

@@ -130,17 +130,6 @@ def get_table(self, path: str) -> tuple[pa.table, int]:
logger.error(f"Error reading table from {path}: {e}")
return None, 0

def get_output_location(self, path: str) -> str:
"""
Get output location based on input
:param path: input file location
:return: output file location
"""
if self.output_folder is None:
logger.error("Get output location. local configuration is not defined, returning None")
return None
return path.replace(self.input_folder, self.output_folder)

def save_table(self, path: str, table: pa.Table) -> tuple[int, dict[str, Any], int]:
"""
Saves a pyarrow table to a file and returns information about the operation.

@@ -126,17 +126,6 @@ def get_table(self, path: str) -> tuple[pyarrow.table, int]:
self.logger.error(f"Exception reading table {path} from S3 - {e}")
return None, 0

def get_output_location(self, path: str) -> str:
"""
Get output location based on input
:param path: input file location
:return: output file location
"""
if self.output_folder is None:
self.logger.error("Get out put location. S3 configuration is not provided, returning None")
return None
return path.replace(self.input_folder, self.output_folder)

def save_table(self, path: str, table: pyarrow.Table) -> tuple[int, dict[str, Any], int]:
"""
Save table to a given location

@@ -14,7 +14,7 @@

from data_processing.data_access import DataAccessFactoryBase
from data_processing.runtime import AbstractTransformFileProcessor
from data_processing.transform import AbstractBinaryTransform, TransformStatistics
from data_processing.transform import AbstractTransform, TransformStatistics
from data_processing.utils import UnrecoverableException


@@ -28,19 +28,22 @@ def __init__(
data_access_factory: DataAccessFactoryBase,
statistics: TransformStatistics,
transform_params: dict[str, Any],
transform_class: type[AbstractBinaryTransform],
transform_class: type[AbstractTransform],
is_folder: bool,
):
"""
Init method
:param data_access_factory - data access factory
:param statistics - reference to statistics class
:param transform_params - transform parameters
:param transform_class: transform class
:param is_folder: folder transform flag
"""
# invoke superclass
super().__init__(
data_access_factory=data_access_factory,
transform_parameters=dict(transform_params),
is_folder=is_folder,
)
self.transform_params["statistics"] = statistics
# Create local processor
@@ -65,17 +68,20 @@ def __init__(
self,
data_access_factory: DataAccessFactoryBase,
transform_params: dict[str, Any],
transform_class: type[AbstractBinaryTransform],
transform_class: type[AbstractTransform],
is_folder: bool
):
"""
Init method
:param data_access_factory - data access factory
:param transform_params - transform parameters
:param transform_class: transform class
:param is_folder: folder transform flag
"""
super().__init__(
data_access_factory=data_access_factory,
transform_parameters=dict(transform_params),
is_folder=is_folder,
)
# Add data access and statistics to the processor parameters
self.transform_params["data_access"] = self.data_access
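
Both processors above now take the same flag. A hedged construction sketch follows (the import path and helper are assumptions; only the keyword parameters come from the signatures shown):

```python
from typing import Any

from data_processing.runtime.pure_python import PythonPoolTransformFileProcessor  # import path assumed
from data_processing.transform import AbstractFolderTransform, AbstractTransform


def make_processor(factory, params: dict[str, Any], transform_class: type[AbstractTransform]):
    # The folder flag is derived exactly as orchestrate() derives it: by inheritance.
    return PythonPoolTransformFileProcessor(
        data_access_factory=factory,
        transform_params=dict(params),
        transform_class=transform_class,
        is_folder=issubclass(transform_class, AbstractFolderTransform),
    )
```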
@@ -24,14 +24,13 @@
PythonTransformFileProcessor,
PythonTransformRuntimeConfiguration,
)
from data_processing.transform import AbstractBinaryTransform, TransformStatistics
from data_processing.transform import AbstractTransform, TransformStatistics, AbstractFolderTransform
from data_processing.utils import GB, get_logger


logger = get_logger(__name__)


def _execution_resources() -> dict[str, Any]:
"""
Get Execution resource
@@ -49,7 +48,6 @@ def _execution_resources() -> dict[str, Any]:
}



def orchestrate(
data_access_factory: DataAccessFactoryBase,
runtime_config: PythonTransformRuntimeConfiguration,
@@ -74,15 +72,21 @@ def orchestrate(
return 1
# create additional execution parameters
runtime = runtime_config.create_transform_runtime()
is_folder = issubclass(runtime_config.get_transform_class(), AbstractFolderTransform)
try:
# Get files to process
files, profile, retries = data_access.get_files_to_process()
if len(files) == 0:
logger.error("No input files to process - exiting")
return 0
if retries > 0:
statistics.add_stats({"data access retries": retries})
logger.info(f"Number of files is {len(files)}, source profile {profile}")
if is_folder:
# folder transform
files = runtime.get_folders(data_access=data_access)
logger.info(f"Number of folders is {len(files)}")
else:
# Get files to process
files, profile, retries = data_access.get_files_to_process()
if len(files) == 0:
logger.error("No input files to process - exiting")
return 0
if retries > 0:
statistics.add_stats({"data access retries": retries})
logger.info(f"Number of files is {len(files)}, source profile {profile}")
# Print interval
print_interval = int(len(files) / 100)
if print_interval == 0:
@@ -99,6 +103,7 @@ def orchestrate(
data_access_factory=data_access_factory, statistics=statistics, files=files
),
transform_class=runtime_config.get_transform_class(),
is_folder=is_folder,
)
else:
# using sequential execution
@@ -111,6 +116,7 @@ def orchestrate(
data_access_factory=data_access_factory, statistics=statistics, files=files
),
transform_class=runtime_config.get_transform_class(),
is_folder=is_folder,
)
status = "success"
return_code = 0
@@ -139,7 +145,8 @@ def orchestrate(
"job_input_params": input_params
| data_access_factory.get_input_params()
| execution_config.get_input_params(),
"execution_stats": _execution_resources() | {"execution time, min": round((time.time() - start_time) / 60.0, 3)},
"execution_stats": _execution_resources() |
{"execution time, min": round((time.time() - start_time) / 60.0, 3)},
"job_output_stats": stats,
}
logger.debug(f"Saving job metadata: {metadata}.")
@@ -157,7 +164,8 @@ def _process_transforms(
data_access_factory: DataAccessFactoryBase,
statistics: TransformStatistics,
transform_params: dict[str, Any],
transform_class: type[AbstractBinaryTransform],
transform_class: type[AbstractTransform],
is_folder: bool,
) -> None:
"""
Process transforms sequentially
@@ -167,16 +175,16 @@ def _process_transforms(
:param data_access_factory: data access factory
:param transform_params - transform parameters
:param transform_class: transform class
:param is_folder: folder transform flag
:return: metadata for the execution
:return: None
"""
# create executor
executor = PythonTransformFileProcessor(
data_access_factory=data_access_factory,
statistics=statistics,
transform_params=transform_params,
transform_class=transform_class,
is_folder=is_folder,
)
# process data
t_start = time.time()
@@ -202,7 +210,8 @@ def _process_transforms_multiprocessor(
print_interval: int,
data_access_factory: DataAccessFactoryBase,
transform_params: dict[str, Any],
transform_class: type[AbstractBinaryTransform],
transform_class: type[AbstractTransform],
is_folder: bool
) -> TransformStatistics:
"""
Process transforms using multiprocessing pool
@@ -212,13 +221,17 @@ def _process_transforms_multiprocessor(
:param data_access_factory: data access factory
:param transform_params - transform parameters
:param transform_class: transform class
:param is_folder: folder transform flag
:return: metadata for the execution
"""
# result statistics
statistics = TransformStatistics()
# create processor
processor = PythonPoolTransformFileProcessor(
data_access_factory=data_access_factory, transform_params=transform_params, transform_class=transform_class
data_access_factory=data_access_factory,
transform_params=transform_params,
transform_class=transform_class,
is_folder=is_folder,
)
completed = 0
t_start = time.time()
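
The net effect of the orchestrator change above: when the configured transform class derives from `AbstractFolderTransform`, the work items handed to the processors are folder names supplied by the runtime rather than file names from `get_files_to_process`. A condensed sketch of that dispatch (a simplification for illustration, not the actual `orchestrate` code):

```python
from data_processing.transform import AbstractFolderTransform


def plan_work_items(runtime, runtime_config, data_access) -> list[str]:
    # Folder transforms are detected purely by inheritance.
    if issubclass(runtime_config.get_transform_class(), AbstractFolderTransform):
        return runtime.get_folders(data_access=data_access)
    files, _profile, _retries = data_access.get_files_to_process()
    return files
```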
@@ -12,7 +12,7 @@

from typing import Any

from data_processing.data_access import DataAccessFactoryBase
from data_processing.data_access import DataAccessFactoryBase, DataAccess
from data_processing.transform import TransformStatistics


@@ -28,6 +28,14 @@ def __init__(self, params: dict[str, Any]):
"""
self.params = params

def get_folders(self, data_access: DataAccess) -> list[str]:
"""
Get folders to process
:param data_access: data access
:return: list of folders to process
"""
raise NotImplementedError()

def get_transform_config(
self, data_access_factory: DataAccessFactoryBase, statistics: TransformStatistics, files: list[str]
) -> dict[str, Any]:
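
`get_folders` is deliberately unimplemented in the base runtime above; a folder-aware runtime overrides it to decide which folders form the units of work. A minimal sketch of one plausible override (the class name is made up, and treating the whole input folder as a single unit is only one possible policy):

```python
from typing import Any

from data_processing.data_access import DataAccess


class WholeInputFolderRuntime:  # stands in for the runtime base class shown above
    def __init__(self, params: dict[str, Any]):
        self.params = params

    def get_folders(self, data_access: DataAccess) -> list[str]:
        # Hand the entire input folder to the transform as one unit of work.
        return [data_access.get_input_folder()]
```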
