Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Job] Support DAG execution by replacing is_chain with is_dag check #4186

Merged
merged 4 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions sky/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,22 @@ def is_chain(self) -> bool:
(all(degree <= 1 for degree in out_degrees) and
sum(degree == 0 for degree in out_degrees) == 1))

def is_connected_dag(self) -> bool:
"""Check if the graph is a connected directed acyclic graph (DAG).

Returns:
True if the graph is a connected DAG (weakly connected,
directed and acyclic), False otherwise.
"""

# A graph is weakly connected if replacing all directed edges with
# undirected edges produces a connected graph, i.e., any two nodes
# can reach each other ignoring edge directions.
if not nx.is_weakly_connected(self.graph):
return False

return nx.is_directed_acyclic_graph(self.graph)


class _DagContext(threading.local):
"""A thread-local stack of Dags."""
Expand Down
8 changes: 5 additions & 3 deletions sky/jobs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,12 @@ def launch(
dag = dag_utils.convert_entrypoint_to_dag(entrypoint)
dag, mutated_user_config = admin_policy_utils.apply(
dag, use_mutated_config_in_current_request=False)
if not dag.is_chain():
if not dag.is_connected_dag():
with ux_utils.print_exception_no_traceback():
raise ValueError('Only single-task or chain DAG is '
f'allowed for job_launch. Dag: {dag}')
raise ValueError(
f'Only connected DAG is allowed for job_launch. If your dag '
f'contains multiple subgraph that is a connected dag, please '
f'separate them into multiple dag. Get: {dag}')

dag_utils.maybe_infer_and_fill_dag_and_task_names(dag)

Expand Down
Loading