Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix data-store reloaded flag #5715

Merged
merged 2 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 29 additions & 14 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,21 +531,23 @@ def initiate_data_model(self, reloaded=False):
self.generate_definition_elements()

# Update workflow statuses and totals (assume needed)
self.update_workflow()
self.update_workflow(True)

# Apply current deltas
self.batch_deltas()
self.apply_delta_batch()
# Clear deltas after application
self.clear_delta_store()
self.clear_delta_batch()

if not reloaded:
# Gather this batch of deltas for publish
self.apply_delta_checksum()
self.publish_deltas = self.get_publish_deltas()
# Gather the store as batch of deltas for publishing
self.batch_deltas(True)
self.apply_delta_checksum()
self.publish_deltas = self.get_publish_deltas()

self.updates_pending = False

# Clear deltas after application and publishing
self.clear_delta_store()
# Clear second batch after publishing
self.clear_delta_batch()

def generate_definition_elements(self):
Expand All @@ -563,6 +565,8 @@ def generate_definition_elements(self):
workflow.id = self.workflow_id
workflow.last_updated = update_time
workflow.stamp = f'{workflow.id}@{workflow.last_updated}'
# Treat play/restart as hard reload of definition.
workflow.reloaded = True

graph = workflow.edges
graph.leaves[:] = config.leaves
Expand Down Expand Up @@ -1494,7 +1498,7 @@ def insert_db_job(self, row_idx, row):
tp_delta.jobs.append(j_id)
self.updates_pending = True

def update_data_structure(self, reloaded=False):
def update_data_structure(self):
"""Workflow batch updates in the data structure."""
# load database history for flagged nodes
self.apply_task_proxy_db_history()
Expand All @@ -1521,11 +1525,7 @@ def update_data_structure(self, reloaded=False):
# Apply all deltas
self.apply_delta_batch()

if reloaded:
self.clear_delta_batch()
self.batch_deltas(reloaded=True)

if self.updates_pending or reloaded:
if self.updates_pending:
self.apply_delta_checksum()
# Gather this batch of deltas for publish
self.publish_deltas = self.get_publish_deltas()
Expand All @@ -1536,6 +1536,18 @@ def update_data_structure(self, reloaded=False):
self.clear_delta_batch()
self.clear_delta_store()

def update_workflow_states(self):
"""Batch workflow state updates."""

# update the workflow state in the data store
self.update_workflow()

# push out update deltas
self.batch_deltas()
self.apply_delta_batch()
self.apply_delta_checksum()
self.publish_deltas = self.get_publish_deltas()

def prune_data_store(self):
"""Remove flagged nodes and edges not in the set of active paths."""

Expand Down Expand Up @@ -1808,7 +1820,7 @@ def set_graph_window_extent(self, n_edge_distance):
self.next_n_edge_distance = n_edge_distance
self.updates_pending = True

def update_workflow(self):
def update_workflow(self, reloaded=False):
"""Update workflow element status and state totals."""
# Create new message and copy existing message content
data = self.data[self.workflow_id]
Expand Down Expand Up @@ -1864,6 +1876,9 @@ def update_workflow(self):
w_delta.status_msg = status_msg
delta_set = True

if reloaded is not w_data.reloaded:
w_delta.reloaded = reloaded

if self.schd.pool.main_pool:
pool_points = set(self.schd.pool.main_pool)
oldest_point = str(min(pool_points))
Expand Down
3 changes: 3 additions & 0 deletions cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,9 @@
workflow_id=w_id)
delta_store[DELTA_ADDED] = (
self.data_store_mgr.data[w_id])
delta_store[DELTA_ADDED][

Check warning on line 571 in cylc/flow/network/resolvers.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/resolvers.py#L571

Added line #L571 was not covered by tests
WORKFLOW
].reloaded = True
deltas_queue.put(
(w_id, 'initial_burst', delta_store))
elif w_id in self.delta_store[sub_id]:
Expand Down
38 changes: 16 additions & 22 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,7 @@ async def command_reload_workflow(self) -> None:
self._update_workflow_state()

# Re-initialise data model on reload
self.data_store_mgr.initiate_data_model(reloaded=True)
self.data_store_mgr.initiate_data_model(self.is_reloaded)

# Reset the remote init map to trigger fresh file installation
self.task_job_mgr.task_remote_mgr.remote_init_map.clear()
Expand Down Expand Up @@ -1548,7 +1548,7 @@ async def workflow_shutdown(self):

# Is the workflow ready to shut down now?
if self.pool.can_stop(self.stop_mode):
await self.update_data_structure(self.is_reloaded)
await self.update_data_structure()
self.proc_pool.close()
if self.stop_mode != StopMode.REQUEST_NOW_NOW:
# Wait for process pool to complete,
Expand Down Expand Up @@ -1767,7 +1767,7 @@ async def main_loop(self) -> None:

if has_updated or self.data_store_mgr.updates_pending:
# Update the datastore.
await self.update_data_structure(self.is_reloaded)
await self.update_data_structure()

if has_updated:
if not self.is_reloaded:
Expand Down Expand Up @@ -1838,37 +1838,31 @@ def _update_workflow_state(self):
A cut-down version of update_data_structure which only considers
workflow state changes e.g. status, status message, state totals, etc.
"""
# Publish any existing before potentially creating more
self._publish_deltas()
# update the workflow state in the data store
self.data_store_mgr.update_workflow()

# push out update deltas
self.data_store_mgr.batch_deltas()
self.data_store_mgr.apply_delta_batch()
self.data_store_mgr.apply_delta_checksum()
self.data_store_mgr.publish_deltas = (
self.data_store_mgr.get_publish_deltas()
)
self.server.publish_queue.put(
self.data_store_mgr.publish_deltas)

# Non-async sleep - yield to other threads rather
# than event loop
sleep(0)
self.data_store_mgr.update_workflow_states()
self._publish_deltas()

async def update_data_structure(self, reloaded: bool = False):
"""Update DB, UIS, Summary data elements"""
# Publish any existing before potentially creating more
self._publish_deltas()
# Collect/apply data store updates/deltas
self.data_store_mgr.update_data_structure(reloaded=reloaded)
# Publish updates:
self.data_store_mgr.update_data_structure()
self._publish_deltas()
# Database update
self.workflow_db_mgr.put_task_pool(self.pool)

def _publish_deltas(self):
"""Publish pending deltas."""
if self.data_store_mgr.publish_pending:
self.data_store_mgr.publish_pending = False
self.server.publish_queue.put(
self.data_store_mgr.publish_deltas)
# Non-async sleep - yield to other threads rather
# than event loop
sleep(0)
# Database update
self.workflow_db_mgr.put_task_pool(self.pool)

def check_workflow_timers(self):
"""Check timers, and abort or run event handlers as configured."""
Expand Down
Loading