diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 5ab1af0b9b..7d4e73ea63 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4590,11 +4590,6 @@ def _create_taskstate_from_graph( lost_keys = self._match_graph_with_tasks(dsk, dependencies, keys) - if len(dsk) > 1: - self.log_event( - ["all", client], {"action": "update_graph", "count": len(dsk)} - ) - if lost_keys: self.report({"op": "cancelled-keys", "keys": lost_keys}, client=client) self.client_releases_keys( @@ -4616,13 +4611,28 @@ def _create_taskstate_from_graph( computation.annotations.update(global_annotations) del global_annotations - runnable, touched_tasks, new_tasks = self._generate_taskstates( + ( + runnable, + touched_tasks, + new_tasks, + colliding_task_count, + ) = self._generate_taskstates( keys=keys, dsk=dsk, dependencies=dependencies, computation=computation, ) + if len(dsk) > 1 or colliding_task_count: + self.log_event( + ["all", client], + { + "action": "update_graph", + "count": len(dsk), + "key-collisions": colliding_task_count, + }, + ) + keys_with_annotations = self._apply_annotations( tasks=new_tasks, annotations_by_type=annotations_by_type, @@ -4815,6 +4825,7 @@ def _generate_taskstates( touched_keys = set() touched_tasks = [] tgs_with_bad_run_spec = set() + colliding_task_count = 0 while stack: k = stack.pop() if k in touched_keys: @@ -4860,6 +4871,7 @@ def _generate_taskstates( # dask/dask#9888. dependencies[k] = deps_lhs + colliding_task_count += 1 if ts.group not in tgs_with_bad_run_spec: tgs_with_bad_run_spec.add(ts.group) logger.warning( @@ -4912,7 +4924,7 @@ def _generate_taskstates( len(touched_tasks), len(keys), ) - return runnable, touched_tasks, new_tasks + return runnable, touched_tasks, new_tasks, colliding_task_count def _apply_annotations( self, diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index e73af206d7..9e25fba6c5 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -4789,6 +4789,23 @@ async def test_resubmit_different_task_same_key_before_previous_is_done(c, s, de For a real world example where this can trigger, see https://github.com/dask/dask/issues/9888 """ + seen = False + + def _match(event): + _, msg = event + return ( + isinstance(msg, dict) + and msg.get("action", None) == "update_graph" + and msg["key-collisions"] > 0 + ) + + def handler(ev): + if _match(ev): + nonlocal seen + seen = True + + c.subscribe_topic("all", handler) + x1 = c.submit(inc, 1, key="x1") y_old = c.submit(inc, x1, key="y") @@ -4803,6 +4820,8 @@ async def test_resubmit_different_task_same_key_before_previous_is_done(c, s, de assert "Detected different `run_spec` for key 'y'" in log.getvalue() + await async_poll_for(lambda: seen, timeout=5) + async with Worker(s.address): # Used old run_spec assert await y_old == 3