8.1.x #5327

Merged: 16 commits, Jan 27, 2023

Changes from 2 commits
284 changes: 168 additions & 116 deletions cylc/flow/data_store_mgr.py
@@ -90,7 +90,7 @@
TASK_STATUSES_ORDERED
)
from cylc.flow.task_state_prop import extract_group_state
from cylc.flow.taskdef import generate_graph_parents
from cylc.flow.taskdef import generate_graph_parents, generate_graph_children
from cylc.flow.task_state import TASK_STATUSES_FINAL
from cylc.flow.util import (
serialise,
@@ -651,34 +651,54 @@ def generate_definition_elements(self):
self.parents = parents

def increment_graph_window(
self, itask, edge_distance=0, active_id=None,
descendant=False, is_parent=False):
"""Generate graph window about given origin to n-edge-distance.
self,
source_tokens,
point,
flow_nums,
edge_distance=0,
active_id=None,
descendant=False,
is_parent=False,
is_manual_submit=False,
itask=None
):
"""Generate graph window about active task proxy to n-edge-distance.

A recursive function that creates the node, then repeats the process for
its children and parents, out to one edge beyond the maximum window size
(in edges). Going one edge further lets pruning be triggered as new
active tasks appear beyond that boundary.

Args:
itask (cylc.flow.task_proxy.TaskProxy):
Update task-node from corresponding task proxy object.
source_tokens (cylc.flow.id.Tokens)
point (PointBase)
flow_nums (set)
edge_distance (int):
Graph distance from active/origin node.
active_id (str):
Active/origin node id.
descendant (bool):
Is the current node a direct descendant of the active/origin node.
is_parent (bool)
is_manual_submit (bool)
itask (cylc.flow.task_proxy.TaskProxy):
Active/Other task proxy, passed in with pool invocation.

Returns:

None

"""
# Create this source node
s_tokens = self.id_.duplicate(itask.tokens)
# ID passed through recursion as reference to original/active node.
if active_id is None:
active_id = s_tokens.id
source_tokens = self.id_.duplicate(source_tokens)
active_id = source_tokens.id

# flag manual triggers for pruning on deletion.
if itask.is_manual_submit:
if is_manual_submit:
self.prune_trigger_nodes.setdefault(active_id, set()).add(
s_tokens.id
source_tokens.id
)

# Setup and check if active node is another's boundary node
@@ -698,52 +718,96 @@ def increment_graph_window(
if descendant and self.n_edge_distance > 0:
self.n_window_boundary_nodes[
active_id
].setdefault(edge_distance, set()).add(s_tokens.id)
].setdefault(edge_distance, set()).add(source_tokens.id)
return
if (
(not any(itask.graph_children.values()) and descendant)
or self.n_edge_distance == 0
):
self.n_window_boundary_nodes[
active_id
].setdefault(edge_distance, set()).add(s_tokens.id)

self.n_window_nodes[active_id].add(s_tokens.id)

# Generate task proxy node
is_orphan = self.generate_ghost_task(s_tokens.id, itask, is_parent)
is_orphan, graph_children = self.generate_ghost_task(
source_tokens,
point,
flow_nums,
is_parent,
itask
)

self.n_window_nodes[active_id].add(source_tokens.id)

edge_distance += 1

# Don't expand window about orphan task.
if not is_orphan:
tdef = self.schd.config.taskdefs[source_tokens['task']]
if graph_children is None:
graph_children = generate_graph_children(tdef, point)
if (
(not any(graph_children.values()) and descendant)
or self.n_edge_distance == 0
):
self.n_window_boundary_nodes[
active_id
].setdefault(edge_distance - 1, set()).add(source_tokens.id)

# TODO: xtrigger is workflow_state edges too
# Reference set for workflow relations
for items in itask.graph_children.values():
if edge_distance == 1:
descendant = True
self._expand_graph_window(
s_tokens,
items,
active_id,
itask.flow_nums,
edge_distance,
descendant,
False,
)
final_point = self.schd.config.final_point
if edge_distance == 1:
descendant = True
# Children/downstream nodes
for items in graph_children.values():
for child_name, child_point, _ in items:
if child_point > final_point:
continue
child_tokens = self.id_.duplicate(
cycle=str(child_point),
task=child_name,
)
# We still increment the graph one further to find
# boundary nodes, but don't create elements.
if edge_distance <= self.n_edge_distance:
self.generate_edge(
source_tokens,
child_tokens,
active_id
)
if child_tokens.id in self.n_window_nodes[active_id]:
continue
self.increment_graph_window(
child_tokens,
child_point,
flow_nums,
edge_distance,
active_id,
descendant,
False
)

for items in generate_graph_parents(
itask.tdef, itask.point
).values():
self._expand_graph_window(
s_tokens,
items,
active_id,
itask.flow_nums,
edge_distance,
False,
True,
)
# Parents/upstream nodes
for items in generate_graph_parents(tdef, point).values():
for parent_name, parent_point, _ in items:
if parent_point > final_point:
continue
parent_tokens = self.id_.duplicate(
cycle=str(parent_point),
task=parent_name,
)
if edge_distance <= self.n_edge_distance:
# reverse for parent
self.generate_edge(
parent_tokens,
source_tokens,
active_id
)
if parent_tokens.id in self.n_window_nodes[active_id]:
continue
self.increment_graph_window(
parent_tokens,
parent_point,
flow_nums,
edge_distance,
active_id,
False,
True
)

# If this is the active task (edge_distance has been incremented),
# then add the most distant child as a trigger to prune it.
@@ -762,65 +826,29 @@
getattr(self.updated[WORKFLOW], EDGES).edges.extend(
self.n_window_edges[active_id])
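As an aside for readers new to the n-window idea described in the docstring above, here is a minimal, self-contained sketch of the recursion: expand about an active node out to n edges, going one edge further only to record boundary nodes for later pruning. The graph structure, IDs and helper names below are hypothetical and far simpler than the real data-store code.

```python
# Illustrative sketch only: a plain adjacency mapping stands in for the
# taskdef/graph machinery, and sets stand in for the data-store nodes.

def increment_window(graph, node, n_edges, distance=0, window=None, boundary=None):
    """Collect nodes within n_edges of the starting node.

    Nodes one edge beyond the window are recorded as boundary nodes
    (used to trigger pruning) but are not added to the window itself.
    """
    if window is None:
        window, boundary = set(), set()
    if distance > n_edges:
        boundary.add(node)
        return window, boundary
    window.add(node)
    distance += 1
    for neighbour in graph[node]["children"] + graph[node]["parents"]:
        if neighbour in window:
            continue
        increment_window(graph, neighbour, n_edges, distance, window, boundary)
    return window, boundary


# Hypothetical linear graph: a -> b -> c -> d, window of 1 edge about "b".
demo = {
    "a": {"children": ["b"], "parents": []},
    "b": {"children": ["c"], "parents": ["a"]},
    "c": {"children": ["d"], "parents": ["b"]},
    "d": {"children": [], "parents": ["c"]},
}
window, boundary = increment_window(demo, "b", n_edges=1)
print(sorted(window))    # ['a', 'b', 'c']
print(sorted(boundary))  # ['d']
```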

def _expand_graph_window(
self,
s_tokens,
items,
active_id,
flow_nums,
edge_distance,
descendant=False,
is_parent=False
):
"""Construct nodes/edges for children/parents of source node."""
final_point = self.schd.config.final_point
for t_name, t_point, _ in items:
if t_point > final_point:
continue
t_tokens = self.id_.duplicate(
cycle=str(t_point),
task=t_name,
def generate_edge(self, parent_tokens, child_tokens, active_id):
"""Construct edge of child and parent task proxy node."""
# Initiate edge element.
e_id = self.edge_id(parent_tokens, child_tokens)
if e_id in self.n_window_edges[active_id]:
return
if (
e_id not in self.data[self.workflow_id][EDGES]
and e_id not in self.added[EDGES]
):
self.added[EDGES][e_id] = PbEdge(
id=e_id,
source=parent_tokens.id,
target=child_tokens.id
)
# Initiate edge element.
if is_parent:
e_id = self.edge_id(t_tokens, s_tokens)
else:
e_id = self.edge_id(s_tokens, t_tokens)
if e_id in self.n_window_edges[active_id]:
continue
if (
e_id not in self.data[self.workflow_id][EDGES]
and e_id not in self.added[EDGES]
and edge_distance <= self.n_edge_distance
):
if is_parent:
self.added[EDGES][e_id] = PbEdge(
id=e_id,
source=t_tokens.id,
target=s_tokens.id
)
else:
self.added[EDGES][e_id] = PbEdge(
id=e_id,
source=s_tokens.id,
target=t_tokens.id
)
# Add edge id to node field for resolver reference
self.updated[TASK_PROXIES].setdefault(
t_tokens.id,
PbTaskProxy(id=t_tokens.id)).edges.append(e_id)
self.updated[TASK_PROXIES].setdefault(
s_tokens.id,
PbTaskProxy(id=s_tokens.id)).edges.append(e_id)
self.n_window_edges[active_id].add(e_id)
if t_tokens.id in self.n_window_nodes[active_id]:
continue
self.increment_graph_window(
TaskProxy(
self.schd.config.get_taskdef(t_name),
t_point, flow_nums, submit_num=0
),
edge_distance, active_id, descendant, is_parent)
# Add edge id to node field for resolver reference
self.updated[TASK_PROXIES].setdefault(
child_tokens.id,
PbTaskProxy(id=child_tokens.id)).edges.append(e_id)
self.updated[TASK_PROXIES].setdefault(
parent_tokens.id,
PbTaskProxy(id=parent_tokens.id)).edges.append(e_id)
self.n_window_edges[active_id].add(e_id)
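The pattern generate_edge follows boils down to: build a deterministic edge ID from the two endpoints, skip it if it already exists, otherwise create the edge and register its ID on both endpoint nodes for resolver lookups. A rough sketch with plain dicts standing in for the protobuf store (the edge-ID format here is invented for illustration):

```python
def add_edge(edges, node_edges, source_id, target_id):
    """Create an edge once and register it on both endpoint nodes."""
    e_id = f"{source_id}/{target_id}"  # hypothetical edge-ID scheme
    if e_id in edges:
        return e_id  # already created, nothing to do
    edges[e_id] = {"source": source_id, "target": target_id}
    # Record the edge ID against both nodes so it can be resolved later.
    node_edges.setdefault(source_id, []).append(e_id)
    node_edges.setdefault(target_id, []).append(e_id)
    return e_id


edges, node_edges = {}, {}
add_edge(edges, node_edges, "1/foo", "1/bar")
add_edge(edges, node_edges, "1/foo", "1/bar")  # deduplicated
print(len(edges))           # 1
print(node_edges["1/foo"])  # ['1/foo/1/bar']
```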

def remove_pool_node(self, name, point):
"""Remove ID reference and flag isolate node/branch for pruning."""
@@ -854,33 +882,57 @@ def add_pool_node(self, name, point):
).id
self.all_task_pool.add(tp_id)

def generate_ghost_task(self, tp_id, itask, is_parent=False):
def generate_ghost_task(
self,
tokens,
point,
flow_nums,
is_parent=False,
itask=None
):
"""Create task-point element populated with static data.

Args:
tp_id (str):
data-store task proxy ID.
itask (cylc.flow.task_proxy.TaskProxy):
Update task-node from corresponding task proxy object.
tokens (cylc.flow.id.Tokens)
point (PointBase)
flow_nums (set)
is_parent (bool):
Used to determine whether to load DB state.
itask (cylc.flow.task_proxy.TaskProxy):
Update task-node from corresponding task proxy object.

Returns:

True/False
(is_orphan, graph_children) as (bool, dict or None)

Orphan tasks with no children return (True, None).

"""
name = itask.tdef.name
name = tokens['task']
point_string = tokens['cycle']
t_id = self.definition_id(name)
point_string = f'{itask.point}'
tp_id = tokens.id
task_proxies = self.data[self.workflow_id][TASK_PROXIES]

is_orphan = False
if name not in self.schd.config.taskdefs:
is_orphan = True

if itask is None:
itask = self.schd.pool.get_task(point_string, name)
if tp_id in task_proxies or tp_id in self.added[TASK_PROXIES]:
return is_orphan
if itask is None:
return is_orphan, None
return is_orphan, itask.graph_children

if itask is None:
itask = TaskProxy(
self.schd.config.get_taskdef(name),
point,
flow_nums,
submit_num=0,
data_mode=True
)

if is_orphan:
self.generate_orphan_task(itask)
@@ -893,7 +945,7 @@ def generate_ghost_task(self, tp_id, itask, is_parent=False):
task_def = self.added[TASKS][t_id]
except KeyError:
# Task removed from workflow definition.
return False
return False, itask.graph_children

update_time = time()
tp_stamp = f'{tp_id}@{update_time}'
@@ -903,7 +955,7 @@ def generate_ghost_task(self, tp_id, itask, is_parent=False):
task=t_id,
cycle_point=point_string,
is_held=(
(name, itask.point)
(name, point)
in self.schd.pool.tasks_to_hold
),
depth=task_def.depth,
@@ -961,7 +1013,7 @@ def generate_ghost_task(self, tp_id, itask, is_parent=False):

self.updates_pending = True

return is_orphan
return is_orphan, itask.graph_children
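To summarise the flow of generate_ghost_task in plainer terms: if the proxy already exists in the store, return early with whatever children information is available; otherwise, when no live task proxy was passed in, construct a lightweight placeholder purely to populate the static fields. A simplified sketch under those assumptions (the store layout and Placeholder class are invented for illustration):

```python
from dataclasses import dataclass, field


@dataclass
class Placeholder:
    """Stand-in for a TaskProxy created in data mode (no live job state)."""
    name: str
    point: str
    graph_children: dict = field(default_factory=dict)


def ghost_task(store, name, point, live_task=None):
    """Return (is_orphan, graph_children), adding a node only if needed."""
    tp_id = f"{point}/{name}"
    is_orphan = name not in store["taskdefs"]
    if tp_id in store["task_proxies"]:
        # Already in the store: nothing to create.
        return is_orphan, None if live_task is None else live_task.graph_children
    if live_task is None:
        # No live task in the pool: build a data-mode placeholder.
        live_task = Placeholder(name, point)
    store["task_proxies"][tp_id] = {"task": name, "cycle_point": point}
    return is_orphan, live_task.graph_children


store = {"taskdefs": {"foo"}, "task_proxies": {}}
print(ghost_task(store, "foo", "1"))  # (False, {})
print(ghost_task(store, "bar", "1"))  # (True, {})  orphan placeholder
```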

def generate_orphan_task(self, itask):
"""Generate orphan task definition."""