
Use with_columns addition to a lazy frame with filter to prevent physically reading files #10980

Closed
deanm0000 opened this issue Sep 7, 2023 · 15 comments · Fixed by #11284
Labels
enhancement New feature or an improvement of an existing feature

Comments

@deanm0000
Collaborator

Problem description

Say I have a big parquet file

If I do:

(
    pl.scan_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet")
    .with_columns(someflag=5)
    .filter(pl.col('someflag')==6)
    .collect()
)

I would think that it could "see" that someflag is always 5 without actually looking at the file and then, given that the filter only keeps instances where someflag==6, that it wouldn't download the file. However it does download the file.

Of course, that raises the question: why would I ever do that?

I'm trying to think of a way to make scanning a cloud dataset (in particular a hive-partitioned one) faster. It seems that using scan_pyarrow_dataset ends up opening every file even when the hive partitioning and filter should tell it to open only one file. Maybe this is just something that happens with Azure, but that's my frame of reference.

I wanted to try essentially walking my hive partitions myself and then concatenating a list of scan_parquet lazy frames, one per file. On each scan_parquet I "attach" a with_columns that adds the hive columns, so I'd have something like:

df=pl.concat([
    pl.scan_parquet("my/path/year=2020/month=5/part-0.parquet").with_columns(year=pl.lit(2020), month=pl.lit(5)),
    pl.scan_parquet("my/path/year=2020/month=6/part-0.parquet").with_columns(year=pl.lit(2020), month=pl.lit(6)),
    pl.scan_parquet("my/path/year=2020/month=7/part-0.parquet").with_columns(year=pl.lit(2020), month=pl.lit(7)),
    pl.scan_parquet("my/path/year=2020/month=8/part-0.parquet").with_columns(year=pl.lit(2020), month=pl.lit(8)),
])

From that I was hoping I could do

df.filter((pl.col("year")==2020) & (pl.col("month")==5)).collect()

Here's a snippet of an explain:

UNION
  PLAN 0:
    FILTER [([(col("year")) == (2022)]) & ([(col("month")) == (11)])] FROM

     WITH_COLUMNS:
     [2023.alias("year"), 4.alias("month")]

        PYTHON SCAN 
        PROJECT */11 COLUMNS
  PLAN 1:
    FILTER [([(col("year")) == (2022)]) & ([(col("month")) == (11)])] FROM

     WITH_COLUMNS:
     [2019.alias("year"), 5.alias("month")]

        PYTHON SCAN 
        PROJECT */11 COLUMNS

Looking at this, I'm guessing the plan executes from the bottom up. Could the optimizer see that the combination of with_columns and filter excludes every row, and skip the python scan entirely, like partition pruning in Postgres?

@deanm0000 deanm0000 added the enhancement label Sep 7, 2023
@orlp
Collaborator

orlp commented Sep 7, 2023

I would think that it could "see" that someflag is always 5 without actually looking at the file and then, given that the filter only keeps instances where someflag==6, that it wouldn't download the file. However it does download the file.

Of course, that raises the question: why would I ever do that?

We need to always read at least the header of the parquet file to get the schema.

I don't know whether we support efficient HTTP range request readers yet, but we should eventually be able to, so that we only read the portion of the file that we would read if it were on disk.
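
As a hedged illustration (my own sketch, not Polars internals), a plain HTTP range request against the URL from the original example is enough to pull just the parquet footer instead of the whole file:

import struct
import requests

url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet"

# A parquet file ends with a 4-byte little-endian footer length followed by the magic b"PAR1".
tail = requests.get(url, headers={"Range": "bytes=-8"}).content
footer_len = struct.unpack("<I", tail[:4])[0]
assert tail[4:] == b"PAR1"

# Fetch only the footer (schema and row-group metadata), not the data pages.
footer = requests.get(url, headers={"Range": f"bytes=-{footer_len + 8}"}).content
print(f"fetched {len(footer)} metadata bytes instead of the whole file")

This relies on the server honoring Range headers; if it ignores them, the full file comes back.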

@deanm0000
Collaborator Author

We need to always read at least the header of the parquet file to get the schema.

For http files, like the one above, it does seem to download the whole file when I do scan_parquet so I guess that wasn't a good example.

However, for fsspec files, it seems it can read just the header of the file. For example, I have a big file in the cloud where read_parquet takes 25 seconds but scan_parquet takes only 0.5s.
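
As a hedged sketch of what I mean (my own illustration with placeholder paths and credentials, not what Polars does internally), pyarrow can pull just the schema from a remote file's footer through fsspec:

import fsspec
import pyarrow.parquet as pq

# Placeholder connection string and path; mirrors the abfss setup used in the benchmark below.
abfs = fsspec.filesystem("abfss", connection_string="<my connection string>")
with abfs.open("container/path/to/big_file.parquet") as f:
    schema = pq.read_schema(f)  # reads only the footer bytes, not the data pages
print(schema)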

@orlp
Collaborator

orlp commented Sep 8, 2023

For http files, like the one above, it does seem to download the whole file when I do scan_parquet so I guess that wasn't a good example.

Well, I don't think you'd be wrong to expect it. If the server supports HTTP Range Requests (and most do), we should be able to read just the part that we need, even through http.

That said, that's a different and more specific feature request than this, so I'll close this one.

@orlp orlp closed this as not planned Sep 8, 2023
@deanm0000
Collaborator Author

The discussion on HTTP range requests was orthogonal to the feature request, which might be called partition pruning. I'm not sure if you're closing it because the partition pruning part isn't planned, or because it seemed I was asking for something to do with HTTP range requests.

Here's a benchmark that is really the impetus of my request. Unfortunately this is based on my company's files so others can't run this themselves, at least not on the same data.

import duckdb
import pyarrow.dataset as ds
import polars as pl
import fsspec
import os
import time
import asyncio
abfs = fsspec.filesystem('abfss', connection_string=os.environ['Synblob'])
pathroot="pjm/prices/darts/yearmorows/"

strt=time.time()
dfdirect=pl.read_parquet(f"abfs://{pathroot}year=2022/month=11/0000.parquet", storage_options=abfs.storage_options)
print(f"direct file read took {round(time.time()-strt,1)} returned {dfdirect.shape[0]} rows")

strt=time.time()
dfdirect=pl.scan_parquet(f"abfs://{pathroot}year=2022/month=11/0000.parquet", storage_options=abfs.storage_options)
print(f"scan one file no collect took {round(time.time()-strt,1)}")

strt=time.time()
myds=ds.dataset(pathroot, filesystem=abfs, partitioning='hive')
dfpyds=pl.from_arrow(myds.to_table(filter=((ds.field('year')==2022) & (ds.field('month')==11))))
print(f"ds only filter took {round(time.time()-strt,1)} returned {dfpyds.shape[0]} rows")

strt=time.time()
myds=ds.dataset(pathroot, filesystem=abfs, partitioning='hive')
dfplds=pl.scan_pyarrow_dataset(myds).filter((pl.col('year')==2022) & (pl.col('month')==11)).collect()
print(f"pld filter took {round(time.time()-strt,1)} returned {dfplds.shape[0]} rows")

strt=time.time()
con = duckdb.connect()
con.register_filesystem(abfs)
dfduck=con.execute(f"""select * 
           from read_parquet('abfs://{pathroot}*/*/*.parquet', hive_partitioning=true) 
           where year='2022' and month='11'
           """).pl()
print(f"duckdb took {round(time.time()-strt,1)} returned {dfduck.shape[0]} rows")

async def mywalk(path, conn_string=os.environ['Synblob']):
    abfs = fsspec.filesystem('abfss', connection_string=conn_string, asynchronous=True)
    next_get = await abfs._ls(path)
    allreturn = []
    pending = [asyncio.create_task(abfs._ls(x)) for x in next_get]
    for iter in range(5):
        new_dones, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        pending = list(pending)
        for new_done in new_dones:
            new_result=new_done.result()
            for sub_result in new_result:
                if await abfs._isdir(sub_result):
                    pending.append(asyncio.create_task(abfs._ls(sub_result)))
                else:
                    allreturn.append(sub_result)
        if len(pending)==0:
            break
    return allreturn


strt=time.time()
allfiles = await mywalk(pathroot)
print(f"async walk took {round(time.time()-strt,1)}")
strt=time.time()
plscan=[]
for fl in allfiles:
    hivecols={x.split('=')[0]:int(x.split('=')[1]) for x in [x for x in fl.split('/') if "=" in x]}
    plscan.append(
        pl.scan_parquet(f"abfs://{fl}", storage_options=abfs.storage_options)
        .with_columns(**hivecols)
        .filter((pl.col('year')==2022) & (pl.col('month')==11))
    )
print(f"all scans w/filter pre collect took {round(time.time()-strt,1)}")
plscan=pl.collect_all(plscan)
print(f"scan, filter, collect_all {round(time.time()-strt,1)}")

strt=time.time()
plscan=[]
for fl in allfiles:
    hivecols={x.split('=')[0]:int(x.split('=')[1]) for x in [folder for folder in fl.split('/') if "=" in folder]}
    plscan.append(
        pl.scan_parquet(f"abfs://{fl}", storage_options=abfs.storage_options)
        .with_columns(**hivecols)
    )
plscanlf=pl.concat(plscan)
print(f"all scans and concat, no filter yet, pre collect took {round(time.time()-strt,1)}")
plscandf=plscanlf.filter((pl.col('year')==2022) & (pl.col('month')==11)).collect()
print(f"scan, concat, filter, collect took {round(time.time()-strt,1)}")

The output is:

direct file read took 29.8 returned 9683751 rows
scan one file no collect took 0.3
ds only filter took 244.1 returned 9683751 rows
pld filter took 238.1 returned 9683751 rows
duckdb took 42.1 returned 9683751 rows
async walk took 4.7
all scans w/filter pre collect took 19.1
scan, filter, collect_all 903.9
all scans and concat, no filter yet, pre collect took 23.8
scan, concat, filter, collect took 880.1

The important takeaway, I think, is that scanning a file clearly doesn't read the whole thing, as the difference in times between the read and the scan of a single file shows.

The next part of the test is to see how long it takes to read the whole dataset and apply filters that would return just a single file's worth of data. I did that 5 ways:

  1. using pyarrow to filter and only converting to polars at the end
  2. using scan_pyarrow_dataset
  3. manually scanning every file in the dataset and putting a filter on each lazyframe before collect_all
  4. manually scanning every file in the dataset, concating them, then filtering that before collect
  5. using duckdb

The benchmark time for loading the file directly, without going through any dataset, is 29.8 sec. Using a pyarrow dataset, with or without polars, took significantly longer than that benchmark. Scanning all of the files in the dataset manually took just 19.1 seconds, so, seemingly, it should take about 19.1 + 29.8 = 48.9 seconds to return the file in question.

For the purpose of my feature request, I'm going to set aside what happened with pyarrow and focus just on the polars only tests (ie 3 and 4).

In those cases, it should have taken somewhere close to 48.9 seconds to gather one file's worth of data. Based on how long it actually took (around 900 seconds) to collect all the concatenated lazy frames, it seems to have read every file in its entirety, applied the new columns, and only then filtered the rows out. If we had partition pruning, it would recognize, before reading the underlying file, that it could disregard that whole plan.
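
A rough sketch of the user-space workaround I'd settle for in the meantime (reusing allfiles and abfs from the benchmark above): because each file's hive values are known from its path, any frame whose literal columns can't satisfy the filter can be dropped before anything is read.

import polars as pl

wanted = {"year": 2022, "month": 11}

pruned = []
for fl in allfiles:
    hivecols = {x.split("=")[0]: int(x.split("=")[1]) for x in fl.split("/") if "=" in x}
    # manual "partition pruning": skip files whose hive values contradict the filter
    if any(hivecols.get(k) != v for k, v in wanted.items()):
        continue
    pruned.append(
        pl.scan_parquet(f"abfs://{fl}", storage_options=abfs.storage_options)
        .with_columns(**hivecols)
    )

plscandf = pl.concat(pruned).collect()  # now only the matching file is ever read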

Lastly, just to corroborate my intuition that it should (or at least could) take just 48.9 seconds, please note that the duckdb query took just 42.1 seconds.

@orlp orlp reopened this Sep 8, 2023
@orlp
Collaborator

orlp commented Sep 8, 2023

@deanm0000 I see, this is more about hive partitioning then? I've reopened this, sorry for the misunderstanding.

@cjackal
Contributor

cjackal commented Sep 10, 2023

I think the title is misleading; what we can really do here is optimize filters on literal expressions, not on with_columns with general expressions (which may depend on previous columns). And that sounds doable, I think.
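
A rough user-space sketch of that optimization (pseudologic, not the Polars optimizer): when the filter only touches columns that with_columns defines as literals, the predicate can be evaluated against a one-row frame of those literals, and the scan skipped whenever the result is False.

import polars as pl

literal_row = pl.DataFrame({"year": [2020], "month": [5]})   # from with_columns(year=pl.lit(2020), month=pl.lit(5))
predicate = (pl.col("year") == 2020) & (pl.col("month") == 6)

survives = literal_row.filter(predicate).height > 0
print(survives)  # False -> the whole plan, including its file read, could be pruned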

@mkleinbort-ic

Is this related to #10685? I've been looking for a solution for a long time.

@deanm0000
Collaborator Author

deanm0000 commented Sep 11, 2023

@cjackal

I think the title is misleading; what we can really do here is optimize filters on literal expressions, not on with_columns with general expressions (which may depend on previous columns). And that sounds doable, I think.

I'm happy to edit the title if you have a suggestion. I wasn't really sure of the best, most concise way to articulate what I was after.

@mkleinbort-ic
Maybe. My first inclination was to say no, but I think they overlap in the following setup.

dfdirect=pl.scan_parquet(f"abfs://{pathroot}year=2022/month=11/0000.parquet", storage_options=abfs.storage_options)
dfdirect.with_columns(xyz=5).select('xyz').collect()

In this case, when it collects, it still reads the whole file (as evidenced by it taking about 24s), even though the selected column is defined independently of the file's contents. It would still need to read the file's metadata to know how many rows to produce to conform to the schema, but it doesn't need any of the actual data.

In contrast if I do:

import pyarrow.parquet as pq
import pyarrow.compute as pc

pq.read_table(f"{pathroot}year=2022/month=11/0000.parquet", filesystem=abfs, columns={
    'a': pc.scalar(20)
})

then it will return a table with 9.6M rows of one column called a with a value of 20 in just 1s.

@ldacey

ldacey commented Sep 23, 2023

Most of our data is on azure blob and we use fsspec as well. I am surprised that reading a pyarrow dataset and filtering with polars using scan_pyarrow_dataset is so slow. That is the approach I use and I assumed it would be the fastest.

@deanm0000
Collaborator Author

Yeah, hopefully the Rust optimizer experts can tweak things to implement this, coupled with the forthcoming cloud read functionality, so polars can catch up with duckdb in terms of hive partitions.

@ritchie46
Member

To me this seems like an X-Y problem. I believe what we really want is hive partitioning support in our optimizer and parquet readers. I am planning to add that this week.

@ritchie46
Member

Most of our data is on azure blob and we use fsspec as well. I am surprised that reading a pyarrow dataset and filtering with polars using scan_pyarrow_dataset is so slow. That is the approach I use and I assumed it would be the fastest.

The pyarrow dataset defaults are slow. Try turning on pre-buffering; it can be an order of magnitude faster: apache/arrow#36765
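
For reference, a hedged sketch of wiring pre_buffer into a dataset before handing it to scan_pyarrow_dataset (the path, filter, and connection string are placeholders for your own storage):

import fsspec
import pyarrow.dataset as ds
import polars as pl

# Placeholder filesystem and path; pre_buffer mainly helps for remote object stores.
abfs = fsspec.filesystem("abfss", connection_string="<my connection string>")
fmt = ds.ParquetFileFormat(
    default_fragment_scan_options=ds.ParquetFragmentScanOptions(pre_buffer=True)
)
dataset = ds.dataset("container/my/path", filesystem=abfs, format=fmt, partitioning="hive")
lf = pl.scan_pyarrow_dataset(dataset).filter(pl.col("month") == 11)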

@ritchie46
Member

Regarding this. Is it common that hive partitioning adds the columns as their primitive types? E.g. is month=1 added as an integer, or is it kept as a string?

And are the columns appended or prepended to the schema?

@ldacey

ldacey commented Sep 24, 2023

Most of our data is on azure blob and we use fsspec as well. I am surprised that reading a pyarrow dataset and filtering with polars using scan_pyarrow_dataset is so slow. That is the approach I use and I assumed it would be the fastest.

The pyarrow dataset defaults are slow. Try turning on pre-buffering; it can be an order of magnitude faster: apache/arrow#36765

Before, with format="parquet"

[2023-09-24T23:46:26.388+0000] {pyarrow_readers.py:175} INFO - Filtering on: (month_id == 202302)
[2023-09-24T23:50:34.993+0000] {pyarrow_readers.py:123} INFO - Read 2,006,595 rows and 169 columns (1.83 GB)
248.9 seconds with prebuffer disabled

After turning on pre_buffering:

[2023-09-24T23:51:53.823+0000] {pyarrow_readers.py:175} INFO - Filtering on: (month_id == 202302)
[2023-09-24T23:51:56.571+0000] {pyarrow_readers.py:123} INFO - Read 2,006,595 rows and 169 columns (1.83 GB)
2.9 seconds with prebuffer enabled

From 249 seconds down to under 3 seconds. That has to be the biggest performance boost from a boolean toggle I have ever run into. It seems like the number of columns does not matter: 169 columns and 1.83 GB of data versus 5 columns and 108 MB of data both took roughly the same time with pre_buffer enabled.

@ldacey

ldacey commented Sep 25, 2023

Regarding this. Is it common that hive partitioning adds the columns as their primitive types? E.g. is month=1 added as an integer, or is it kept as a string?

And are the columns appended or prepended to the schema?

I use date_id (20230925) and month_id (202309) for partitions most often because pyarrow reads them as integers, which lets me use relational operators easily without defining a ds.partitioning object for the schema of the hive partitions. That way, 99% of the time I can just define "partitioning": "hive":

dataset = ds.dataset(table_uri, 
                     filesystem=filesystem, 
                     format=ds.ParquetFileFormat(default_fragment_scan_options=ds.ParquetFragmentScanOptions(pre_buffer=True)), 
                     partitioning="hive",
                    ).filter((ds.field("month_id") >= 202309) & (ds.field("date_id") < 20230904))
dataset.to_table()

Columns are appended to the schema. In my case, the month_id column becomes the final column in the table, and it is returned as pa.int32().
