KFP Pipeline Spec 2.1 support
PiperOrigin-RevId: 622708396
tfx-copybara committed Apr 17, 2024
1 parent 9332479 commit caa070b
Showing 57 changed files with 712 additions and 515 deletions.
3 changes: 3 additions & 0 deletions RELEASE.md
@@ -1,5 +1,8 @@
# Current Version (Still in Development)

* Support KFP pipeline spec 2.1.0 version schema
* Depends on kfp-pipeline-spec>0.1.13,<0.2

## Major Features and Improvements

* Dropped python 3.8 support.
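Not part of the commit itself: a minimal sketch of what the 2.1.0 schema bump above means at the proto level, assuming it is the string carried in the compiled PipelineSpec's schema_version field (the exact value TFX writes is not shown in this diff).

```python
from kfp.pipeline_spec import pipeline_spec_pb2

# Hypothetical check of the schema version on a compiled pipeline spec.
spec = pipeline_spec_pb2.PipelineSpec()
spec.schema_version = '2.1.0'  # the KFP pipeline spec schema this change targets
print(spec.schema_version)
```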
4 changes: 2 additions & 2 deletions tfx/dependencies.py
@@ -145,7 +145,7 @@ def make_extra_packages_kfp():
return [
# TODO(b/304892416): Migrate from KFP SDK v1 to v2.
'kfp>=1.8.14,<2',
'kfp-pipeline-spec>=0.1.10,<0.2',
'kfp-pipeline-spec>0.1.13,<0.2',
]


@@ -163,7 +163,7 @@ def make_extra_packages_docker_image():
return [
# TODO(b/304892416): Migrate from KFP SDK v1 to v2.
'kfp>=1.8.14,<2',
'kfp-pipeline-spec>=0.1.10,<0.2',
'kfp-pipeline-spec>0.1.13,<0.2',
'mmh>=2.2,<3',
'python-snappy>=0.5,<0.6',
# Required for tfx/examples/penguin/penguin_utils_cloud_tuner.py
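Also not from the diff: a quick sketch for checking an environment against the tightened kfp-pipeline-spec bound above, using the packaging library.

```python
import importlib.metadata

from packaging.specifiers import SpecifierSet

# Verify the installed kfp-pipeline-spec satisfies the new '>0.1.13,<0.2' bound.
installed = importlib.metadata.version('kfp-pipeline-spec')
assert installed in SpecifierSet('>0.1.13,<0.2'), f'incompatible version: {installed}'
```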
64 changes: 46 additions & 18 deletions tfx/orchestration/kubeflow/v2/compiler_utils.py
@@ -108,15 +108,15 @@ def build_parameter_type_spec(
is_runtime_param = isinstance(value, data_types.RuntimeParameter)
result = pipeline_pb2.ComponentInputsSpec.ParameterSpec()
if isinstance(value, int) or (is_runtime_param and value.ptype == int):
result.type = pipeline_pb2.PrimitiveType.PrimitiveTypeEnum.INT
result.parameter_type = pipeline_pb2.ParameterType.NUMBER_INTEGER
elif isinstance(value, float) or (is_runtime_param and value.ptype == float):
result.type = pipeline_pb2.PrimitiveType.PrimitiveTypeEnum.DOUBLE
result.parameter_type = pipeline_pb2.ParameterType.NUMBER_DOUBLE
elif isinstance(value, str) or (is_runtime_param and value.ptype == str):
result.type = pipeline_pb2.PrimitiveType.PrimitiveTypeEnum.STRING
result.parameter_type = pipeline_pb2.ParameterType.STRING
else:
# By default, unrecognized object will be json dumped, hence is string type.
# For example, resolver class.
result.type = pipeline_pb2.PrimitiveType.PrimitiveTypeEnum.STRING
result.parameter_type = pipeline_pb2.ParameterType.STRING
return result
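The hunk above replaces the legacy PrimitiveType enum on the `type` field with the 2.1-schema ParameterType enum on `parameter_type`. A minimal sketch of the new usage, assuming kfp-pipeline-spec >0.1.13 protos are installed:

```python
from kfp.pipeline_spec import pipeline_spec_pb2 as pipeline_pb2

# Parameter types now live on `parameter_type` as ParameterType enum values.
spec = pipeline_pb2.ComponentInputsSpec.ParameterSpec()
spec.parameter_type = pipeline_pb2.ParameterType.NUMBER_INTEGER
print(pipeline_pb2.ParameterType.ParameterTypeEnum.Name(spec.parameter_type))
# -> NUMBER_INTEGER
```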


@@ -236,33 +236,61 @@ def value_converter(

result = pipeline_pb2.ValueOrRuntimeParameter()
if isinstance(tfx_value, (int, float, str)):
result.constant_value.CopyFrom(get_kubeflow_value(tfx_value))
result.constant.CopyFrom(get_google_value(tfx_value))
elif isinstance(tfx_value, (Dict, List)):
result.constant_value.CopyFrom(
pipeline_pb2.Value(string_value=json.dumps(tfx_value)))
result.constant.CopyFrom(
struct_pb2.Value(string_value=json.dumps(tfx_value))
)
elif isinstance(tfx_value, data_types.RuntimeParameter):
# Attach the runtime parameter to the context.
parameter_utils.attach_parameter(tfx_value)
result.runtime_parameter = tfx_value.name
elif isinstance(tfx_value, metadata_store_pb2.Value):
if tfx_value.WhichOneof('value') == 'int_value':
result.constant_value.CopyFrom(
pipeline_pb2.Value(int_value=tfx_value.int_value))
result.constant.CopyFrom(
struct_pb2.Value(number_value=tfx_value.int_value)
)
elif tfx_value.WhichOneof('value') == 'double_value':
result.constant_value.CopyFrom(
pipeline_pb2.Value(double_value=tfx_value.double_value))
result.constant.CopyFrom(
struct_pb2.Value(number_value=tfx_value.double_value)
)
elif tfx_value.WhichOneof('value') == 'string_value':
result.constant_value.CopyFrom(
pipeline_pb2.Value(string_value=tfx_value.string_value))
result.constant.CopyFrom(
struct_pb2.Value(string_value=tfx_value.string_value)
)
elif isinstance(tfx_value, message.Message):
result.constant_value.CopyFrom(
pipeline_pb2.Value(
result.constant.CopyFrom(
struct_pb2.Value(
string_value=json_format.MessageToJson(
message=tfx_value, sort_keys=True)))
message=tfx_value, sort_keys=True
)
)
)
else:
# By default will attempt to encode the object using json_utils.dumps.
result.constant_value.CopyFrom(
pipeline_pb2.Value(string_value=json_utils.dumps(tfx_value)))
result.constant.CopyFrom(
struct_pb2.Value(string_value=json_utils.dumps(tfx_value))
)
return result


def get_google_value(
tfx_value: Union[int, float, str],
) -> Optional[struct_pb2.Value]:
"""Converts TFX/MLMD values into Kubeflow pipeline Value proto message."""
if tfx_value is None:
return None

result = struct_pb2.Value()
if isinstance(tfx_value, int):
result.number_value = tfx_value
elif isinstance(tfx_value, float):
result.number_value = tfx_value
elif isinstance(tfx_value, str):
result.string_value = tfx_value
else:
raise TypeError('Got unknown type of value: {}'.format(tfx_value))

return result
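A short sketch of the constant representation change in value_converter and get_google_value above: the 2.1 schema stores constants in the `constant` field as a google.protobuf.Value, whose only numeric field is the double-typed number_value, so integer constants travel as doubles (assuming kfp-pipeline-spec >0.1.13 protos).

```python
from google.protobuf import struct_pb2
from kfp.pipeline_spec import pipeline_spec_pb2 as pipeline_pb2

result = pipeline_pb2.ValueOrRuntimeParameter()
# An int constant is stored as a Struct number_value, i.e. as a double.
result.constant.CopyFrom(struct_pb2.Value(number_value=42))
assert result.constant.number_value == 42.0
```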


19 changes: 12 additions & 7 deletions tfx/orchestration/kubeflow/v2/compiler_utils_test.py
@@ -136,19 +136,24 @@ def testCustomArtifactSchemaMismatchFails(self):
_MyArtifactWithProperty.PROPERTIES)

def testBuildParameterTypeSpec(self):
type_enum = pipeline_pb2.PrimitiveType.PrimitiveTypeEnum
type_enum = pipeline_pb2.ParameterType.ParameterTypeEnum
testdata = {
42: type_enum.INT,
42.1: type_enum.DOUBLE,
42: type_enum.NUMBER_INTEGER,
42.1: type_enum.NUMBER_DOUBLE,
'42': type_enum.STRING,
data_types.RuntimeParameter(name='_', ptype=int): type_enum.INT,
data_types.RuntimeParameter(name='_', ptype=float): type_enum.DOUBLE,
data_types.RuntimeParameter(
name='_', ptype=int
): type_enum.NUMBER_INTEGER,
data_types.RuntimeParameter(
name='_', ptype=float
): type_enum.NUMBER_DOUBLE,
data_types.RuntimeParameter(name='_', ptype=str): type_enum.STRING,
}
for value, expected_type_enum in testdata.items():
self.assertEqual(
compiler_utils.build_parameter_type_spec(value).type,
expected_type_enum)
compiler_utils.build_parameter_type_spec(value).parameter_type,
expected_type_enum,
)

def testBuildOutputParameterSpecValueArtifact(self):
param = pipeline_pb2.ParameterType
@@ -113,7 +113,9 @@ def refactor_model_blessing(model_blessing: artifact.Artifact,
name_from_id=name_from_id))


def parse_execution_properties(exec_properties: Any) -> Dict[str, Any]:
def parse_execution_properties(
exec_properties: Any, inputs_spec: pipeline_pb2.ComponentInputsSpec
) -> Dict[str, Any]:
"""Parses a map from key to Value proto as execution properties.
Parses a mapping field in a protobuf message, whose value is a Kubeflow Value
@@ -122,6 +124,8 @@ def parse_execution_properties(exec_properties: Any) -> Dict[str, Any]:
Args:
exec_properties: the mapping field in the proto message, representing the
execution properties of the component.
inputs_spec: Component input spec which has the information of parameter
types of exec_properties.
Returns:
dictionary of the parsed execution properties.
@@ -132,35 +136,49 @@ def parse_execution_properties(exec_properties: Any) -> Dict[str, Any]:
if k == _OLD_INPUT_BASE_PROPERTY_NAME:
k = standard_component_specs.INPUT_BASE_KEY
# Translate each field from Value pb to plain value.
result[k] = getattr(v, v.WhichOneof('value'))
result[k] = getattr(v, v.WhichOneof('kind'))
parameter = inputs_spec.parameters.get(k)
if (
parameter
and parameter.parameter_type
== pipeline_pb2.ParameterType.NUMBER_INTEGER
):
result[k] = int(result[k])
if result[k] is None:
raise TypeError('Unrecognized type encountered at field %s of execution'
' properties %s' % (k, exec_properties))
raise TypeError(
'Unrecognized type encountered at field %s of execution properties %s'
% (k, exec_properties)
)

return result


def translate_executor_output(
output_dict: Mapping[str, List[artifact.Artifact]],
name_from_id: Mapping[int,
str]) -> Dict[str, pipeline_pb2.ArtifactList]:
name_from_id: Mapping[int, str],
) -> Dict[str, pipeline_pb2.ArtifactList]:
"""Translates output_dict to a Kubeflow ArtifactList mapping."""
result = {}
for k, v in output_dict.items():
result[k] = pipeline_pb2.ArtifactList(artifacts=[
to_runtime_artifact(
artifact_utils.get_single_instance(v), name_from_id)
])
result[k] = pipeline_pb2.ArtifactList(
artifacts=[
to_runtime_artifact(
artifact_utils.get_single_instance(v), name_from_id
)
]
)

return result


def _get_json_value_mapping(
mlmd_value_mapping: Dict[str, metadata_store_pb2.Value]) -> Dict[str, Any]:
mlmd_value_mapping: Dict[str, metadata_store_pb2.Value],
) -> Dict[str, Any]:
"""Converts a mapping field with MLMD Value to JSON Value."""

def get_json_value(
mlmd_value: metadata_store_pb2.Value) -> artifact.JsonValueType:
mlmd_value: metadata_store_pb2.Value,
) -> artifact.JsonValueType:
if not mlmd_value.HasField('value'):
return None
elif mlmd_value.WhichOneof('value') == 'int_value':
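A hedged sketch (with a made-up parameter name) of why parse_execution_properties above now takes a ComponentInputsSpec: google.protobuf.Value exposes numbers only through the double-typed number_value under its `kind` oneof, so integer execution properties come back as floats unless the declared NUMBER_INTEGER type is used to coerce them.

```python
from google.protobuf import struct_pb2
from kfp.pipeline_spec import pipeline_spec_pb2 as pipeline_pb2

value = struct_pb2.Value(number_value=3)        # an integer property as stored
raw = getattr(value, value.WhichOneof('kind'))  # 3.0, arrives as a float

inputs_spec = pipeline_pb2.ComponentInputsSpec()
inputs_spec.parameters['num_steps'].parameter_type = (  # 'num_steps' is hypothetical
    pipeline_pb2.ParameterType.NUMBER_INTEGER
)
param = inputs_spec.parameters.get('num_steps')
if param and param.parameter_type == pipeline_pb2.ParameterType.NUMBER_INTEGER:
  raw = int(raw)  # restore the Python int
```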
@@ -94,26 +94,38 @@ def setUp(self):
# Use two protos to store the testdata.
artifacts_pb = pipeline_pb2.ExecutorInput()
io_utils.parse_json_file(
os.path.join(source_data_dir, 'artifacts.json'), artifacts_pb)
os.path.join(source_data_dir, 'artifacts.json'), artifacts_pb
)
self._artifacts = artifacts_pb.inputs.artifacts

# Test legacy properties/custom properties deserialization.
artifacts_legacy_pb = pipeline_pb2.ExecutorInput()
io_utils.parse_json_file(
os.path.join(source_data_dir, 'artifacts_legacy.json'),
artifacts_legacy_pb)
artifacts_legacy_pb,
)
self._artifacts_legacy = artifacts_legacy_pb.inputs.artifacts

properties_pb = pipeline_pb2.ExecutorInput()
inputs_spec_pb = pipeline_pb2.ComponentInputsSpec()
inputs_spec_pb.parameters['input_config'].parameter_type = (
pipeline_pb2.ParameterType.STRING
)
inputs_spec_pb.parameters['output_config'].parameter_type = (
pipeline_pb2.ParameterType.STRING
)
io_utils.parse_json_file(
os.path.join(source_data_dir, 'exec_properties.json'), properties_pb)
self._properties = properties_pb.inputs.parameters
os.path.join(source_data_dir, 'exec_properties.json'), properties_pb
)
self._properties = properties_pb.inputs.parameter_values
self._inputs_spec = inputs_spec_pb

def testParseRawArtifactDict(self):
for artifacts_dict in [self._artifacts, self._artifacts_legacy]:
name_from_id = {}
actual_result = kubeflow_v2_entrypoint_utils.parse_raw_artifact_dict(
artifacts_dict, name_from_id)
artifacts_dict, name_from_id
)
for key in self._expected_dict:
(expected_artifact,) = self._expected_dict[key]
(actual_artifact,) = actual_result[key]
@@ -137,16 +149,25 @@ def testParseExecutionProperties(self):
self.assertDictEqual(
_EXEC_PROPERTIES,
kubeflow_v2_entrypoint_utils.parse_execution_properties(
self._properties))
self._properties, self._inputs_spec
),
)

def testParseExecutionPropertiesMapsInputBaseUri(self):
properties_pb = pipeline_pb2.ExecutorInput()
properties_pb.inputs.parameters[
'input_base_uri'].string_value = 'gs://input/base'
properties_pb.inputs.parameter_values['input_base_uri'].string_value = (
'gs://input/base'
)
inputs_spec_pb = pipeline_pb2.ComponentInputsSpec()
inputs_spec_pb.parameters['input_base_uri'].parameter_type = (
pipeline_pb2.ParameterType.STRING
)
self.assertDictEqual(
{'input_base': 'gs://input/base'},
kubeflow_v2_entrypoint_utils.parse_execution_properties(
properties_pb.inputs.parameters))
properties_pb.inputs.parameter_values, inputs_spec_pb
),
)

def testCanChangePropertiesByNameIdMapping(self):
model_blessing = standard_artifacts.ModelBlessing()
@@ -43,14 +43,14 @@ def _run_executor(args: argparse.Namespace, beam_args: List[str]) -> None:
"""Selects a particular executor and run it based on name.
Args:
args:
--executor_class_path: The import path of the executor class.
args: --executor_class_path: The import path of the executor class.
--json_serialized_invocation_args: Full JSON-serialized parameters for
this execution.
this execution. --json_serialized_inputs_spec_args: Full JSON-serialized
component inputs spec for this execution.
beam_args: Optional parameter that maps to the optional_pipeline_args
parameter in the pipeline, which provides additional configuration options
for apache-beam and tensorflow.logging.
For more about the beam arguments please refer to:
for apache-beam and tensorflow.logging. For more about the beam arguments
please refer to:
https://cloud.google.com/dataflow/docs/guides/specifying-exec-params
"""
logging.set_verbosity(logging.INFO)
@@ -62,9 +62,16 @@ def _run_executor(args: argparse.Namespace, beam_args: List[str]) -> None:
executor_input,
ignore_unknown_fields=True)

inputs_spec = pipeline_spec_pb2.ComponentInputsSpec()
json_format.Parse(
args.json_serialized_inputs_spec_args,
inputs_spec,
ignore_unknown_fields=True,
)

inputs_dict = executor_input.inputs.artifacts
outputs_dict = executor_input.outputs.artifacts
inputs_parameter = executor_input.inputs.parameters
inputs_parameter = executor_input.inputs.parameter_values
outputs_parameters = executor_input.outputs.parameters

# Format {pipelineJob.runtimeConfig.gcsOutputDirectory}/{project_number}
@@ -81,7 +88,7 @@ def _run_executor(args: argparse.Namespace, beam_args: List[str]) -> None:
json_format.Parse(
output_meta_json.read(), output_metadata, ignore_unknown_fields=True)
    # Append/Overwrite exec_properties.
for k, v in output_metadata.parameters.items():
for k, v in output_metadata.parameter_values.items():
inputs_parameter[k].CopyFrom(v)

name_from_id = {}
@@ -91,7 +98,8 @@ def _run_executor(args: argparse.Namespace, beam_args: List[str]) -> None:
outputs = kubeflow_v2_entrypoint_utils.parse_raw_artifact_dict(
outputs_dict, name_from_id)
exec_properties = kubeflow_v2_entrypoint_utils.parse_execution_properties(
inputs_parameter)
inputs_parameter, inputs_spec
)
logging.info('Executor %s do: inputs: %s, outputs: %s, exec_properties: %s',
args.executor_class_path, inputs, outputs, exec_properties)
executor_cls = import_utils.import_class_by_path(args.executor_class_path)
@@ -187,6 +195,12 @@ def _parse_flags(argv: List[str]) -> Tuple[argparse.Namespace, List[str]]:
type=str,
required=True,
help='JSON-serialized metadata for this execution.')
parser.add_argument(
'--json_serialized_inputs_spec_args',
type=str,
required=True,
help='JSON-serialized component inputs spec for this execution.',
)

return parser.parse_known_args(argv)
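A minimal sketch (hypothetical payload) of what the new --json_serialized_inputs_spec_args flag added above carries and how _run_executor parses it:

```python
from google.protobuf import json_format
from kfp.pipeline_spec import pipeline_spec_pb2

# Hypothetical ComponentInputsSpec JSON as it might appear on the command line.
serialized = '{"parameters": {"input_config": {"parameterType": "STRING"}}}'

inputs_spec = pipeline_spec_pb2.ComponentInputsSpec()
json_format.Parse(serialized, inputs_spec, ignore_unknown_fields=True)
assert (
    inputs_spec.parameters['input_config'].parameter_type
    == pipeline_spec_pb2.ParameterType.STRING
)
```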

