
Remove dumps_task #8067

Merged · 5 commits merged into dask:main on Aug 11, 2023
Conversation

@fjetter (Member) commented on Aug 3, 2023

This is a tangent to #8049.

I noticed that dumps_task is a surprisingly expensive operation (about 12% of runtime in #7998).

It is also a significant driver of complexity, and I believe it is no longer necessary now that pickle is used on the scheduler.

This PR explores what actually relies on this behavior and how much complexity we can remove by removing dumps_task.
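For readers without context, the shape of the change is roughly the following. This is an illustrative sketch, not the PR's code; dumps_task_like and to_wire are hypothetical stand-ins:

```python
import pickle

# Before (sketch): the scheduler eagerly serialized each task spec into
# per-field bytes via dumps_task before shipping it to workers.
def dumps_task_like(func, args, kwargs):
    return {
        "function": pickle.dumps(func),
        "args": pickle.dumps(args),
        "kwargs": pickle.dumps(kwargs),
    }

# After (sketch): the raw spec is handed to the comm layer, which pickles
# the whole message once on write, so no separate dumps_task pass is needed.
def to_wire(func, args, kwargs):
    return pickle.dumps({"function": func, "args": args, "kwargs": kwargs})
```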

github-actions bot (Contributor) commented on Aug 3, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

20 files ±0 · 20 suites ±0 · 11h 35m 47s ⏱️ +39m 57s
3 757 tests +5: 3 645 ✔️ ±0 · 106 💤 +2 · 5 failed +3 · 1 🔥 ±0
36 343 runs −134: 34 585 ✔️ −223 · 1 750 💤 +85 · 7 failed +4 · 1 🔥 ±0

For more details on these failures and errors, see this check.

Results for commit 43b261f. ± Comparison against base commit ef6d4bf.

This pull request removes 12 and adds 17 tests. Note that renamed tests count towards both.
distributed.tests.test_cancelled_state ‑ test_execute_preamble_early_cancel[executing-False-deserialize_task]
distributed.tests.test_cancelled_state ‑ test_execute_preamble_early_cancel[executing-False-execute]
distributed.tests.test_cancelled_state ‑ test_execute_preamble_early_cancel[executing-True-deserialize_task]
distributed.tests.test_cancelled_state ‑ test_execute_preamble_early_cancel[executing-True-execute]
distributed.tests.test_cancelled_state ‑ test_execute_preamble_early_cancel[resumed-False-deserialize_task]
distributed.tests.test_cancelled_state ‑ test_execute_preamble_early_cancel[resumed-False-execute]
distributed.tests.test_cancelled_state ‑ test_execute_preamble_early_cancel[resumed-True-deserialize_task]
distributed.tests.test_cancelled_state ‑ test_execute_preamble_early_cancel[resumed-True-execute]
distributed.tests.test_scheduler ‑ test_dumps_task
distributed.tests.test_worker ‑ test_gather_missing_workers_replicated[True]
…
distributed.protocol.tests.test_numpy
distributed.shuffle.tests.test_rechunk
distributed.shuffle.tests.test_shuffle ‑ test_restarting_does_not_deadlock
distributed.tests.test_cancelled_state ‑ test_execute_preamble_early_cancel[executing-False]
distributed.tests.test_cancelled_state ‑ test_execute_preamble_early_cancel[executing-True]
distributed.tests.test_cancelled_state ‑ test_execute_preamble_early_cancel[resumed-False]
distributed.tests.test_cancelled_state ‑ test_execute_preamble_early_cancel[resumed-True]
distributed.tests.test_client ‑ test_gather_race_vs_AMM[False]
distributed.tests.test_client ‑ test_gather_race_vs_AMM[True]
distributed.tests.test_utils_comm ‑ test_gather_from_workers_busy
…

♻️ This comment has been updated with the latest results.

Comment on lines -362 to -377:

```python
class Refcount:
    "Track how many instances of this class exist; logs the count at creation and deletion"

    count = 0
    lock = dask.utils.SerializableLock()
    log = []

    def __init__(self):
        with self.lock:
            type(self).count += 1
            self.log.append(self.count)

    def __del__(self):
        with self.lock:
            self.log.append(self.count)
            type(self).count -= 1
```
@fjetter (Member, Author) commented:

This test is interesting. This PR does not change anything in terms of scheduling, ordering, etc., but this test still fails quite reliably. It seems as if Refcount relies on explicit garbage collection. This is something I want to look into a little more, since we've been seeing a lot of GC warnings recently. However, for the sake of this PR I rewrote it to count keys in data instead of relying on GC. Eventually, I think both tests would make sense.
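A minimal sketch of the GC-independent style of assertion described above, assuming the usual distributed test utilities; the test name and body are hypothetical, not the PR's rewritten test:

```python
from distributed.utils_test import gen_cluster


@gen_cluster(client=True)
async def test_count_keys_in_data(c, s, a, b):
    # Count what the workers actually hold instead of inferring liveness
    # from __del__, which depends on garbage-collection timing.
    futures = c.map(lambda x: x + 1, range(10))
    await c.gather(futures)
    assert len(a.data) + len(b.data) == len(futures)
```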

@fjetter (Member, Author) commented:

This is a really weird case and is somehow connected to how this object is defined in a local context.
I looked pretty closely but I cannot find any cyclic references. In fact, I see fewer objects tracked by the GC than this counter suggests. I know that CPython guarantees `__del__` is called at most once, but I believe there are caveats about when (and whether) it is called.

@fjetter fjetter marked this pull request as ready for review August 9, 2023 15:06
@fjetter fjetter changed the title from "WIP Remove dumps_task" to "Remove dumps_task" on Aug 9, 2023
@fjetter (Member, Author) commented on Aug 9, 2023

Tests look good, and considering the large reduction in complexity, I suggest moving forward unless benchmarking raises a red flag (A/B tests are currently running; manual tests haven't shown any anomalies).

@fjetter (Member, Author) commented on Aug 10, 2023

Well, benchmarks are happy, mostly

https://github.com/coiled/benchmarks/suites/14979719727/artifacts/854911855

Some rather common operations are 20-30% faster! Some tests (primarily the parquet tests) are slightly negatively impacted. I suspect this is because we're no longer caching parts of the deserialization, but I haven't verified this.

Wall clock:
[image: wall-clock benchmark comparison]

Average memory also looks good (this is interesting...)

[image: average memory benchmark comparison]

We do see a couple of jumps in peak memory usage

[image: peak memory benchmark comparison]

I suspect that the memory changes are more or less an artifact of subtle timing changes but I haven't verified.

The very large outlier in wall time is test_single_future, which I strongly suspect suffers from the removed cache. The absolute change is minimal, but the relative one is large.

@fjetter (Member, Author) commented on Aug 10, 2023

Thinking about these results for a moment, I suspect the improved runtime is mostly from the removal of the deserialization step.

```python
try:
    function, args, kwargs = await self._maybe_deserialize_task(ts)
except Exception as exc:
    logger.error("Could not deserialize task %s", key, exc_info=True)
    return ExecuteFailureEvent.from_exception(
        exc,
        key=key,
        run_id=run_id,
        stimulus_id=f"run-spec-deserialize-failed-{time()}",
    )
```

While deserializing tasks, we were basically already blocking a task slot on the state machine even though the thread pool was idling. (I was recently also thinking about "oversubscribing" the state machine, i.e. state_machine.nthreads > TPE.max_workers, to create some pressure and keep the TPE busy; a very different ticket, of course.)
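The structure being described can be sketched as follows; this is illustrative only, and the names and signatures are assumptions rather than distributed's actual API:

```python
import asyncio
import pickle
from concurrent.futures import ThreadPoolExecutor

tpe = ThreadPoolExecutor(max_workers=2)


async def execute(spec_bytes: bytes):
    # From here on, this coroutine occupies one of the state machine's task
    # slots, yet the thread pool receives no work until deserialization is
    # done. Admitting more such coroutines than max_workers (the
    # "oversubscription" idea above) would let one task's deserialization
    # overlap another task's execution.
    func, args = pickle.loads(spec_bytes)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(tpe, func, *args)
```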

@hendrikmakait hendrikmakait self-requested a review August 10, 2023 12:53
@fjetter (Member, Author) commented on Aug 10, 2023

cc @madsbk this may also interest you? Not directly related to #8083 but kind of

@hendrikmakait (Member) left a comment:

Thanks, @fjetter! I <3 the reduction in complexity.

```python
assert not function and not args and not kwargs
function = execute_task
args = (task,)


def _normalize_task(task: Any) -> T_runspec:
```
@hendrikmakait (Member) commented:

nit: It feels like this should live in utils (and maybe be public?) instead of worker given that we use it on the scheduler as well.

@fjetter (Member, Author) replied:

I don't want this to be public. This is merely a translation layer between the garbage outside and the clean representation within.

Eventually this should become dask/dask#9969
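A rough sketch of such a translation layer, under the assumption that it unpacks dask's tuple tasks into a single (function, args, kwargs) form and otherwise wraps the task for execute_task as in the snippet above; this is not the PR's actual implementation:

```python
from dask.core import istask


def execute_task(task):
    # Evaluate nested (func, arg, ...) task tuples recursively.
    if istask(task):
        func, args = task[0], task[1:]
        return func(*map(execute_task, args))
    return task


def normalize_task(task):
    # Plain (func, arg, ...) tuple with only simple arguments: unpack it.
    if istask(task) and not any(istask(a) for a in task[1:]):
        return task[0], task[1:], {}
    # Anything else (nested tasks, raw data): wrap for recursive evaluation.
    return execute_task, (task,), {}
```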

@hendrikmakait (Member) replied:

Makes sense. I guess it being public in a private module might be preferable, but that's nit-picking.

```python
elif not isinstance(self.run_spec, SerializedTask):
    self.run_spec = SerializedTask(task=self.run_spec)
if isinstance(self.run_spec, ToPickle):
    # FIXME Sometimes the protocol is not unpacking this
```
@hendrikmakait (Member) commented:

Should we create an issue for this?

@fjetter (Member, Author) replied:

We can, but this is very low priority.
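For context, my reading of the FIXME branch above (an assumption, not something confirmed in the thread): the comm protocol is supposed to unwrap ToPickle on receipt, so the state machine unwraps it defensively when the wrapper leaks through:

```python
from distributed.protocol.serialize import ToPickle

run_spec = ToPickle(("inc", 1))  # wrapper the comm layer should have stripped
if isinstance(run_spec, ToPickle):
    run_spec = run_spec.data  # defensive manual unwrap
```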

distributed/worker_state_machine.py (review thread resolved)
Co-authored-by: Hendrik Makait <hendrik.makait@gmail.com>
@hendrikmakait hendrikmakait merged commit 4f30abc into dask:main Aug 11, 2023
19 of 25 checks passed
@fjetter fjetter deleted the remove_dumps_task branch August 12, 2023 13:04
```python
cache_loads: LRU[bytes, Callable[..., Any]] = LRU(maxsize=100)


def loads_function(bytes_object):
```
A project member commented:

cc @madsbk - It looks like we were using this function in dask-cuda (rapidsai/dask-cuda#1219)

A contributor replied:

Yes, we use it for its caching feature, but I don't think it is needed.
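The caching feature in question, sketched from the snippet above; the body is reconstructed from memory of the pattern and may differ from the removed code (a plain dict stands in for the LRU):

```python
import pickle

cache_loads: dict = {}  # stand-in for the LRU cache shown above


def loads_function(bytes_object):
    """Deserialize a function, memoizing on the pickled bytes."""
    try:
        return cache_loads[bytes_object]
    except KeyError:
        result = pickle.loads(bytes_object)
        cache_loads[bytes_object] = result
        return result
```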

wence- added a commit to wence-/dask-cuda that referenced this pull request Sep 27, 2023
In versions of distributed after dask/distributed#8067 but before
dask/distributed#8216, we must patch protocol.loads to include the
same decompression fix.
rapids-bot bot pushed a commit to rapidsai/dask-cuda that referenced this pull request Sep 27, 2023
In versions of distributed after dask/distributed#8067 but before dask/distributed#8216, we must patch protocol.loads to include the same decompression fix.

Authors:
  - Lawrence Mitchell (https://github.com/wence-)

Approvers:
  - Peter Andreas Entschev (https://github.com/pentschev)

URL: #1247
Labels: none yet · Projects: none yet · 4 participants