Unmanaged Memory Leak with Large Parquet Files (Dask + Distributed) #8377
If your parquet files are 150 MB on disk, chances are that they are easily 1 GB in memory, if not more, and there are two threads per worker loading this data. During deserialization, it is very possible that peak memory usage is even higher than that.
Instead of doing this yourself, I recommend using a ready-made solution. See https://docs.dask.org/en/stable/deploying.html#cloud for some suggestions for cloud deployments. There are open source solutions like Dask Cloud Provider and hosted solutions like Coiled (which has a generous free tier; disclaimer: I work for Coiled). After the data loading comes a shuffle that will also require some memory. Given all this, it is not unexpected that your workers are running out of memory. I strongly recommend using larger and/or more workers to process your data, and launching only one worker per node.
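A quick way to see this is to compare the on-disk size of one file with its in-memory footprint after loading (the path below is a placeholder for one of your files):

```python
# Rough check of on-disk vs. in-memory size for a single parquet file.
import os
import pandas as pd

path = "part-0.parquet"  # placeholder: point this at one of your files
on_disk = os.path.getsize(path)
in_memory = pd.read_parquet(path).memory_usage(deep=True).sum()
print(f"on disk:   {on_disk / 2**20:.0f} MiB")
print(f"in memory: {in_memory / 2**20:.0f} MiB")
```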
Cluster Creation
In another situation, I would love to work with Coiled (in particular, I appreciate the blog posts you put out and that you contribute back to dask, which makes me VERY much want to make use of it as a paying user. Unfortunately, that's not an option right now). I should nonetheless try this out with the free tier of Coiled (if only to verify that the bug still exists).
Workers per Instance
Sorry for the confusion: I'm only calling …

pyarrow vs csv
The other thing that makes me think it's a …

Questions
My Actions
Related Links
Here are some links where people are whispering about similar issues; none of these are proof, but they at least show me that I'm not crazy.
Reproducing the Error with …
Describe the issue:
Not sure if this requires a separate bug report from #8375, but just in case, I can get the same error with plain old dask + distributed if the parquet data doesn't fit in memory.
When I try to read in 30 parquet files (~150 MB each) into 2 worker nodes (`m5.large`, 8 GB RAM, 500 GB disk each), there is a runaway unmanaged-memory process that consumes all of the memory and causes the dask worker to be restarted. Eventually, after crashing on each of the 4 workers (2 nodes, 2 vCPUs each), dask gives up.

The process that is consuming all of the memory:
/usr/bin/python3 -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=11, pipe_handle=17) --multiprocessing-fork
I now know that that's the mark of a forked process in Python, but I'm unable to figure out where in the code this occurs.
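As far as I can tell, this is simply how multiprocessing's spawn start method launches children, and it is also how distributed's Nanny starts its worker process. A tiny standalone demonstration (nothing below comes from the actual cluster):

```python
# Standalone demo: a child started with the "spawn" start method shows up in
# `ps` with a command line like the one quoted above.
import multiprocessing as mp
import time

def child():
    time.sleep(30)  # keep the child alive long enough to inspect it

if __name__ == "__main__":
    mp.set_start_method("spawn")
    p = mp.Process(target=child)
    p.start()
    # While the child sleeps, `ps -ef | grep multiprocessing` shows roughly:
    #   python -c "from multiprocessing.spawn import spawn_main;
    #              spawn_main(tracker_fd=..., pipe_handle=...)" --multiprocessing-fork
    p.join()
```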
Minimal Complete Verifiable Example:
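A minimal sketch along the lines of the workload described above; the scheduler address, S3 path, and column name are placeholders rather than the real ones:

```python
# Sketch only: read ~30 parquet files (~150 MB each) on the distributed
# cluster and force a computation that pulls the data into memory.
from dask.distributed import Client
import dask.dataframe as dd

client = Client("tcp://scheduler-host:8786")          # placeholder address

ddf = dd.read_parquet("s3://my-bucket/data/")         # ~30 files, ~150 MB each
print(ddf.groupby("some_column").size().compute())    # any shuffle-heavy reduction
```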
Anything else we need to know?:
I'm running this on AWS. The dask cluster is handmade (via boto3), where my architecting script runs `dask worker` and `dask scheduler` on everything. The cluster seems to be correct because all other operations work, and it is able to handle a 52 GB CSV file just fine; it only fails with parquet.
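For what it's worth, connecting a client and printing what the scheduler reports for each worker is a quick way to sanity-check the shape of a hand-rolled cluster like this (the address below is a placeholder, and the exact keys may vary a little between distributed versions):

```python
# Hypothetical sanity check: confirm how many workers the scheduler sees and
# what thread count / memory limit each one was started with.
from dask.distributed import Client

client = Client("tcp://scheduler-host:8786")  # placeholder scheduler address
for addr, w in client.scheduler_info()["workers"].items():
    print(addr, w["nthreads"], w["memory_limit"])
```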
Environment:
Dask version: 2023.11.0
Python version: 3.10.12
Operating System: Ubuntu 22.04.3 LTS
Install method (conda, pip, source): pip
Logs: worker1.log, worker2.log, scheduler.log