diff --git a/tfx/orchestration/experimental/core/env.py b/tfx/orchestration/experimental/core/env.py index 565322ff64..a1381ecbd7 100644 --- a/tfx/orchestration/experimental/core/env.py +++ b/tfx/orchestration/experimental/core/env.py @@ -113,6 +113,17 @@ def update_pipeline_run_status( ) -> None: """Updates orchestrator storage backends with pipeline run status.""" + @abc.abstractmethod + def create_pipeline_run_node_executions( + self, + owner: str, + pipeline_name: str, + pipeline: pipeline_pb2.Pipeline, + node_id: str, + executions: Sequence[metadata_store_pb2.Execution], + ) -> None: + """Creates (sub-)pipeline run node executions in the storage backend.""" + @abc.abstractmethod def record_orchestration_time(self, pipeline_run_id: str) -> None: """Records the orchestration time for a pipeline run.""" @@ -211,6 +222,16 @@ def update_pipeline_run_status( ) -> None: pass + def create_pipeline_run_node_executions( + self, + owner: str, + pipeline_name: str, + pipeline: pipeline_pb2.Pipeline, + node_id: str, + executions: Sequence[metadata_store_pb2.Execution], + ) -> None: + pass + def record_orchestration_time(self, pipeline_run_id: str) -> None: pass diff --git a/tfx/orchestration/experimental/core/env_test.py b/tfx/orchestration/experimental/core/env_test.py index 14971bb5a3..ec2c27c9b3 100644 --- a/tfx/orchestration/experimental/core/env_test.py +++ b/tfx/orchestration/experimental/core/env_test.py @@ -84,6 +84,16 @@ def update_pipeline_run_status( ) -> None: raise NotImplementedError() + def create_pipeline_run_node_executions( + self, + owner: str, + pipeline_name: str, + pipeline: pipeline_pb2.Pipeline, + node_id: str, + executions: Sequence[metadata_store_pb2.Execution], + ) -> None: + raise NotImplementedError() + def record_orchestration_time(self, pipeline_run_id: str) -> None: raise NotImplementedError()