Use with_columns addition to a lazy frame with filter to prevent physically reading files #10980
Comments
We need to always read at least the header of the parquet file to get the schema. I don't know whether we support efficient HTTP range request readers yet, but we should eventually be able to support that so we only read the portion of the file we would as if it were on disk.
For http files, like the one above, it does seem to download the whole file when I do scan_parquet, so I guess that wasn't a good example. However, for fsspec files, it seems it can read just the header of the file. For example, I have a big file on the cloud where if I do
Well, I don't think you'd be wrong to expect it. If the server supports HTTP Range Requests (and most do), we should be able to read just the part that we need, even through http. That said, that's a different and more specific feature request than this, so I'll close this one.
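As a sketch of the idea (not polars' actual reader): a range-request-capable reader only needs the tail of a parquet file to locate the footer, because the file ends with a 4-byte little-endian footer length followed by the `PAR1` magic. A minimal in-memory simulation, with a made-up file body:

```python
import struct

def footer_range(file_size: int, tail_bytes: int = 8) -> tuple[int, int]:
    # Byte range for the last 8 bytes: 4-byte footer length + b"PAR1" magic.
    return (file_size - tail_bytes, file_size - 1)

def parse_footer_length(tail: bytes) -> int:
    # `tail` is the final 8 bytes of a parquet file.
    assert tail[-4:] == b"PAR1", "not a parquet file"
    return struct.unpack("<I", tail[:4])[0]

# Simulate with an in-memory "file": body + footer + footer length + magic.
fake_file = b"\x00" * 100 + b"FOOTERDATA" + struct.pack("<I", 10) + b"PAR1"
start, end = footer_range(len(fake_file))
tail = fake_file[start:end + 1]        # what one HTTP Range request would return
length = parse_footer_length(tail)
footer = fake_file[-(8 + length):-8]   # second range request: just the footer
print(footer)  # b'FOOTERDATA'
```

Two small range requests recover the schema-bearing footer without downloading the data pages, which is the behaviour being asked about.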
The discussion on HTTP range requests was orthogonal to the feature request which might be called partition pruning. I'm not sure if you're closing because the partition pruning part isn't planned or if it seemed I was asking for something to do with HTTP range requests. Here's a benchmark that is really the impetus of my request. Unfortunately this is based on my company's files so others can't run this themselves, at least not on the same data.
output is
The important takeaway, I think, is that I can scan a file and it clearly doesn't read the whole file, as we can see by comparing the times for the read and the scan of a single file. The next part of the test is to see how long it takes to read the whole dataset and apply filters that would return just a single file. I did that 5 ways
The benchmark time of loading the file directly, without going through any dataset, is 29.8 sec. Using a pyarrow dataset, with or without polars, took significantly longer than that benchmark. Scanning all of the files in the dataset manually took just 19.1 seconds, so, seemingly, it should take about 19.1 + 29.8 = 48.9 seconds to return the file in question.

For the purpose of my feature request, I'm going to set aside what happened with pyarrow and focus just on the polars-only tests (i.e. 3 and 4). In those cases, it should have taken somewhere close to 48.9 seconds to gather one file's worth of data. Based on how long it actually took (around 900 seconds) to collect all the concatted lazyframes, it seems to have read all the files in their entirety before the new columns were applied to them, and only then did they get filtered out. If we had partition pruning, it would recognize before reading the underlying file that it could disregard that whole plan.

Lastly, just to corroborate my intuition that it should (or at least could) take just 48.9 seconds, please note that the duckdb query took just 42.1 seconds.
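To make "partition pruning" concrete: the idea is to inspect the key=value segments of each hive-style path and drop files whose partition values cannot satisfy the filter, before any file is opened. A rough pure-Python sketch (the paths are made up):

```python
def hive_values(path: str) -> dict:
    # Parse key=value segments from a hive-style path.
    out = {}
    for seg in path.split("/"):
        if "=" in seg:
            k, v = seg.split("=", 1)
            out[k] = v
    return out

def prune(paths, predicate):
    # Keep only files whose partition values could satisfy the predicate;
    # everything else is discarded without touching the file.
    return [p for p in paths if predicate(hive_values(p))]

paths = [
    "data/date_id=20230901/part-0.parquet",
    "data/date_id=20230902/part-0.parquet",
    "data/date_id=20230903/part-0.parquet",
]
kept = prune(paths, lambda vals: int(vals["date_id"]) == 20230902)
print(kept)  # ['data/date_id=20230902/part-0.parquet']
```

With pruning in place, only the one surviving file would be scanned, which is why the expected time is roughly "directory walk + one file read" rather than "read everything".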
@deanm0000 I see, this is more about hive partitioning then? I've reopened this, sorry for misunderstanding.
I think the title is misleading; what we really can do here is optimizing
Is this related to #10685? I've been looking for a solution for a long time. |
I'm happy to edit the title if you have a suggestion. I wasn't really sure of the best, most concise way to articulate what I was getting at. @mkleinbort-ic
In this case, when it collects, it still reads the whole file, as evidenced by it taking about 24s, even though the selected column is already defined independently of the contents of the file itself. It would still need to read the metadata of the file to know how many rows are needed to conform to the schema, but it doesn't need any of the actual data. In contrast, if I do:
then it will return a table with 9.6M rows of one column called
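The point about only needing metadata can be sketched as follows: a column defined by pl.lit must only conform to the file's height, and the row count is stored in the parquet footer, so no data pages have to be fetched. A toy illustration with mocked metadata (the numbers are hypothetical):

```python
# Mocked parquet footer metadata -- in reality this would come from reading
# only the file footer, never the data pages.
metadata = {"num_rows": 9_600_000}

def literal_column_height(meta: dict) -> int:
    # A lit(...) column's height is known from metadata alone.
    return meta["num_rows"]

print(literal_column_height(metadata))  # 9600000
```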
Most of our data is on azure blob and we use fsspec as well. I am surprised that reading a pyarrow dataset and filtering with polars using scan_pyarrow_dataset is so slow. That is the approach I use and I assumed it would be the fastest.
Yeah, hopefully the rust optimizer experts can tweak things to address this issue, coupled with the forthcoming cloud read functionality, so polars can catch up with duckdb in terms of hive partitions.
To me this seems like the x-y problem. I believe what we really want is hive partitioning support in our optimizer and parquet readers. I am planning to add that this week. |
The pyarrow dataset defaults are slow. Try turning on pre-buffering; it can be an order of magnitude faster: apache/arrow#36765
Regarding this. Is it common that hive partitioning adds the columns as their primitive types? E.g. is And are the columns appended or prepended to the schema? |
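For what it's worth, a common convention (and, I believe, roughly what pyarrow's hive partitioning does when no explicit schema is given — an assumption worth verifying against the docs) is to try to parse each partition value as a number and fall back to a string:

```python
def infer_hive_value(raw: str):
    # Try int, then float; otherwise keep the raw string. This mirrors the
    # "primitive types" behaviour asked about above, as an assumption.
    for cast in (int, float):
        try:
            return cast(raw)
        except ValueError:
            pass
    return raw

print(infer_hive_value("20230925"))  # 20230925 (int)
print(infer_hive_value("2023-09"))   # '2023-09' (str)
```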
Before, with format="parquet"
After turning on pre_buffering:
From 249 seconds down to under 3 seconds. That has to be the biggest performance boost from a boolean toggle I have ever run into. It seems the number of columns does not matter: 169 columns and 1.83 GB of data versus 5 columns and 108 MB of data both took roughly the same time with pre_buffer enabled.
I use date_id (20230925) and month_id (202309) for partitions most often because pyarrow reads them as integers, which lets me use relational operators easily without defining a

```python
dataset = ds.dataset(
    table_uri,
    filesystem=filesystem,
    format=ds.ParquetFileFormat(
        default_fragment_scan_options=ds.ParquetFragmentScanOptions(pre_buffer=True)
    ),
    partitioning="hive",
).filter((ds.field("month_id") >= 202309) & (ds.field("date_id") < 20230904))
dataset.to_table()
```

Columns are appended to the schema. In my case, the month_id column becomes the final column in the table, and it is returned as pa.int32().
Problem description
Say I have a big parquet file
If I do:
I would think that it could "see" that someflag is always 5 without actually looking at the file, and then, given that the filter only keeps rows where someflag == 6, that it wouldn't download the file. However, it does download the file.
Of course, this begs the question: why would I ever do that?
I'm trying to think of a way to make scanning a cloud dataset (in particular a hive partition) faster. It seems that using scan_pyarrow_dataset ends up opening every file, even when a hive partition and filter should tell it to open only one file. Maybe this is just something that happens with Azure, but that's my frame of reference.
I wanted to try to essentially walk my hive partitions and then concat all the parquet files as a list of scan_parquets. However, on each of the scan_parquets I "attach" a with_columns that would add the columns for the hive partition, so I'd have something like
From that I was hoping I could do
Here's a snippet of an explain:
In looking at this, I'm guessing it performs tasks from the bottom up. Could it recognize that the combination of with_columns and filter allows it to exclude the Python scan and skip it, like partition pruning in postgres?
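As a sketch of what such an optimization could do (a hypothetical plan representation, not polars' actual IR): walk the plan from the leaves up and, when a filter tests a column that with_columns defined as a literal which can never match, replace the whole branch with an empty relation:

```python
def prune_branch(plan):
    # Plan nodes are tuples, e.g. ("filter", child, column, value) wrapping
    # ("with_columns", child, {col: literal}) wrapping ("scan", path).
    kind = plan[0]
    if kind == "filter":
        child, col, val = plan[1], plan[2], plan[3]
        # If the filtered column is a literal that can never equal `val`,
        # the scan underneath can be skipped entirely.
        if child[0] == "with_columns" and col in child[2] and child[2][col] != val:
            return ("empty",)
    return plan

plan = ("filter", ("with_columns", ("scan", "file.parquet"), {"someflag": 5}), "someflag", 6)
print(prune_branch(plan))  # ('empty',)
```

This is the same constant-folding idea the issue title describes: the with_columns literal is known before the file is read, so the filter can be decided without any I/O.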