Skip to content

Commit

Permalink
Orchestrator shouldn't crash when MLMD call fails
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 641385227
  • Loading branch information
tfx-copybara committed Jun 28, 2024
1 parent 4e71a35 commit d836308
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
12 changes: 9 additions & 3 deletions tfx/orchestration/experimental/core/pipeline_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -1298,9 +1298,15 @@ def orchestrate(
if filter_fn is None:
filter_fn = lambda _: True

all_pipeline_states = pstate.PipelineState.load_all_active_and_owned(
mlmd_connection_manager.primary_mlmd_handle
)
# Try to load active pipelines. If there is a recoverable error, return False
# and then retry in the next orchestration iteration.
try:
all_pipeline_states = pstate.PipelineState.load_all_active_and_owned(
mlmd_connection_manager.primary_mlmd_handle
)
except Exception as e: # pylint: disable=broad-except
raise e

pipeline_states = [s for s in all_pipeline_states if filter_fn(s)]
if not pipeline_states:
logging.info('No active pipelines to run.')
Expand Down
3 changes: 3 additions & 0 deletions tfx/orchestration/experimental/core/pipeline_ops_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
from tfx.types import standard_artifacts
from tfx.utils import status as status_lib

from ml_metadata import errors as mlmd_errors
from ml_metadata.proto import metadata_store_pb2


Expand Down Expand Up @@ -3590,5 +3591,7 @@ def test_orchestrate_pipelines_with_specified_pipeline_uid(
self.assertTrue(task_queue.is_empty())




if __name__ == '__main__':
tf.test.main()

0 comments on commit d836308

Please sign in to comment.