
Read partition columns of Hive dataset #404

Closed
grantmcdermott opened this issue Sep 28, 2023 · 12 comments · Fixed by #442
Labels
enhancement New feature or request reader Anything related to reading data upstream
Milestone

Comments

@grantmcdermott
Collaborator

grantmcdermott commented Sep 28, 2023

To import multiple files within the same directory, we can use the pattern-globbing capabilities of scan_parquet and co. However, as documented in the "Data import" section of the intro vignette, this globbing strategy unfortunately doesn't recognize the partition columns (directories) of a Hive-style dataset, i.e. paths of the form parentdir/subdir1=value1/subsubdir2=value2/data.parquet. This is particularly limiting for larger datasets, which are almost certainly going to be Hive-partitioned for efficient storage.

Example:

library(polars)

dir.create("mtcars-ds")
arrow::write_dataset(
    mtcars, "mtcars-ds",
    partitioning = c("cyl", "am")
)

# show files
dir("mtcars-ds", recursive = TRUE)
#> [1] "cyl=4/am=0/part-0.parquet" "cyl=4/am=1/part-0.parquet"
#> [3] "cyl=6/am=0/part-0.parquet" "cyl=6/am=1/part-0.parquet"
#> [5] "cyl=8/am=0/part-0.parquet" "cyl=8/am=1/part-0.parquet"

# We can use polars' pattern globbing to scan all parquet files in the folder...
# ... but note that the partition columns are missing
pl$scan_parquet("mtcars-ds/**/*.parquet")$limit(2)$collect()
#> shape: (2, 9)
#> ┌──────┬───────┬──────┬──────┬───┬──────┬─────┬──────┬──────┐
#> │ mpg  ┆ disp  ┆ hp   ┆ drat ┆ … ┆ qsec ┆ vs  ┆ gear ┆ carb │
#> │ ---  ┆ ---   ┆ ---  ┆ ---  ┆   ┆ ---  ┆ --- ┆ ---  ┆ ---  │
#> │ f64  ┆ f64   ┆ f64  ┆ f64  ┆   ┆ f64  ┆ f64 ┆ f64  ┆ f64  │
#> ╞══════╪═══════╪══════╪══════╪═══╪══════╪═════╪══════╪══════╡
#> │ 24.4 ┆ 146.7 ┆ 62.0 ┆ 3.69 ┆ … ┆ 20.0 ┆ 1.0 ┆ 4.0  ┆ 2.0  │
#> │ 22.8 ┆ 140.8 ┆ 95.0 ┆ 3.92 ┆ … ┆ 22.9 ┆ 1.0 ┆ 4.0  ┆ 2.0  │
#> └──────┴───────┴──────┴──────┴───┴──────┴─────┴──────┴──────┘

Note that this is an upstream issue affecting Polars main, although py-polars does have pl.scan_pyarrow_dataset as a workaround. See pola-rs/polars#4347 and pola-rs/polars#10276.
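For completeness, an analogous workaround exists on the R side via the arrow package, whose dataset reader is partition-aware. This is just a hedged sketch, not part of the proposal below: it recreates the example dataset for self-containment, and the big caveat is that `dplyr::collect()` materialises everything in R memory before handing off to polars, unlike a true lazy scan.

```r
library(polars)

# arrow::open_dataset() understands Hive directory layouts, so the partition
# columns (cyl, am) come back automatically as regular columns.
dir.create("mtcars-ds", showWarnings = FALSE)
arrow::write_dataset(mtcars, "mtcars-ds", partitioning = c("cyl", "am"))

ds = arrow::open_dataset("mtcars-ds")   # partition dirs recognised as columns
df = dplyr::collect(ds)                 # caveat: materialises eagerly in R
pldf = pl$DataFrame(as.data.frame(df))  # continue in polars from here
```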

As a workaround in r-polars, I've written a little read_hive function that seems to do a pretty good job of recognizing and appending the partition column types for Hive-partitioned datasets.

read_hive = function(
    path,
    filter = NULL,
    select = NULL,
    with_columns = NULL,
    lazy = FALSE
    ) {

    paths = dir(path, recursive = TRUE, full.names = TRUE)
    paths_split = strsplit(paths, "/")
    paths_split = lapply(paths_split, \(x) head(tail(x, -1), -1))
    paths_split = do.call(Map, c(c, paths_split))
    paths_split_names = gsub("=.*$", "", names(paths_split))
    paths_split = setNames(paths_split, paths_split_names)

    aliases_values = lapply(
        paths_split,
        function(x) {
            values = strsplit(x, split = "=")
            values = sapply(values, `[[`, 2)
            # catch for numeric (and integer) columns
            values_num = suppressWarnings(as.numeric(values))
            if (!anyNA(values_num)) {
                integers = abs(values_num - round(values_num)) < .Machine$double.eps^0.5
                # cat("\n Integers:", integers, "\n")
                if (!anyNA(integers) && all(integers)) {
                    values_num = as.integer(values_num)
                }
                values = values_num
            } else {
                # ditto for date columns
                # note: only attempt this on non-numeric columns to avoid accidental coercion
                values_date = tryCatch(suppressWarnings(as.Date(values)), error = \(e) NA)
                if (!anyNA(values_date)) {
                    values = values_date
                }
            }

            return(values)
        }
    )

    ret = lapply(
        seq_along(paths),
        function(p) {
            
            file_to_read = paths[p]
            fscan = pl$scan_parquet(file_to_read)
            values = sapply(aliases_values, \(a) a[p])
            for (i in seq_along(values)) {
                fscan = fscan$with_columns(
                    pl$lit(values[[i]])$alias(names(values)[[i]])
                )
            }

            if (!is.null(filter)) {
                fscan = fscan$filter(filter)
            }

            if (!is.null(select)) {
                fscan = fscan$select(select)
            }

            if (isFALSE(lazy)) fscan = fscan$collect()
            return(fscan)
        }
    )

    ## Remove once https://github.com/pola-rs/r-polars/issues/386 is fixed
    if (isFALSE(lazy)) {
        ret = pl$concat(ret)
        # Only perform with_columns operations on concatenated dataset, else
        # might make mistakes when applying aggregating functions on subsets
        # of the data (e.g., max)
        if (!is.null(with_columns)) {
            ret = ret$with_columns(with_columns)
        }
    }

    return(ret)

}

One nice feature of this function is that it correctly parses integer versus float types where possible, avoiding unnecessary memory overhead. It also does so upfront to avoid type mismatches when the sub-datasets are finally concatenated. Quick examples:

read_hive("mtcars-ds")
#> shape: (32, 11)
#> ┌──────┬───────┬───────┬──────┬───┬──────┬──────┬─────┬─────┐
#> │ mpg  ┆ disp  ┆ hp    ┆ drat ┆ … ┆ gear ┆ carb ┆ cyl ┆ am  │
#> │ ---  ┆ ---   ┆ ---   ┆ ---  ┆   ┆ ---  ┆ ---  ┆ --- ┆ --- │
#> │ f64  ┆ f64   ┆ f64   ┆ f64  ┆   ┆ f64  ┆ f64  ┆ i32 ┆ i32 │
#> ╞══════╪═══════╪═══════╪══════╪═══╪══════╪══════╪═════╪═════╡
#> │ 24.4 ┆ 146.7 ┆ 62.0  ┆ 3.69 ┆ … ┆ 4.0  ┆ 2.0  ┆ 4   ┆ 0   │
#> │ 22.8 ┆ 140.8 ┆ 95.0  ┆ 3.92 ┆ … ┆ 4.0  ┆ 2.0  ┆ 4   ┆ 0   │
#> │ 21.5 ┆ 120.1 ┆ 97.0  ┆ 3.7  ┆ … ┆ 3.0  ┆ 1.0  ┆ 4   ┆ 0   │
#> │ 22.8 ┆ 108.0 ┆ 93.0  ┆ 3.85 ┆ … ┆ 4.0  ┆ 1.0  ┆ 4   ┆ 1   │
#> │ …    ┆ …     ┆ …     ┆ …    ┆ … ┆ …    ┆ …    ┆ …   ┆ …   │
#> │ 13.3 ┆ 350.0 ┆ 245.0 ┆ 3.73 ┆ … ┆ 3.0  ┆ 4.0  ┆ 8   ┆ 0   │
#> │ 19.2 ┆ 400.0 ┆ 175.0 ┆ 3.08 ┆ … ┆ 3.0  ┆ 2.0  ┆ 8   ┆ 0   │
#> │ 15.8 ┆ 351.0 ┆ 264.0 ┆ 4.22 ┆ … ┆ 5.0  ┆ 4.0  ┆ 8   ┆ 1   │
#> │ 15.0 ┆ 301.0 ┆ 335.0 ┆ 3.54 ┆ … ┆ 5.0  ┆ 8.0  ┆ 8   ┆ 1   │
#> └──────┴───────┴───────┴──────┴───┴──────┴──────┴─────┴─────┘
read_hive("mtcars-ds", filter = pl$col("cyl") <= 6)
#> shape: (18, 11)
#> ┌──────┬───────┬───────┬──────┬───┬──────┬──────┬─────┬─────┐
#> │ mpg  ┆ disp  ┆ hp    ┆ drat ┆ … ┆ gear ┆ carb ┆ cyl ┆ am  │
#> │ ---  ┆ ---   ┆ ---   ┆ ---  ┆   ┆ ---  ┆ ---  ┆ --- ┆ --- │
#> │ f64  ┆ f64   ┆ f64   ┆ f64  ┆   ┆ f64  ┆ f64  ┆ i32 ┆ i32 │
#> ╞══════╪═══════╪═══════╪══════╪═══╪══════╪══════╪═════╪═════╡
#> │ 24.4 ┆ 146.7 ┆ 62.0  ┆ 3.69 ┆ … ┆ 4.0  ┆ 2.0  ┆ 4   ┆ 0   │
#> │ 22.8 ┆ 140.8 ┆ 95.0  ┆ 3.92 ┆ … ┆ 4.0  ┆ 2.0  ┆ 4   ┆ 0   │
#> │ 21.5 ┆ 120.1 ┆ 97.0  ┆ 3.7  ┆ … ┆ 3.0  ┆ 1.0  ┆ 4   ┆ 0   │
#> │ 22.8 ┆ 108.0 ┆ 93.0  ┆ 3.85 ┆ … ┆ 4.0  ┆ 1.0  ┆ 4   ┆ 1   │
#> │ …    ┆ …     ┆ …     ┆ …    ┆ … ┆ …    ┆ …    ┆ …   ┆ …   │
#> │ 17.8 ┆ 167.6 ┆ 123.0 ┆ 3.92 ┆ … ┆ 4.0  ┆ 4.0  ┆ 6   ┆ 0   │
#> │ 21.0 ┆ 160.0 ┆ 110.0 ┆ 3.9  ┆ … ┆ 4.0  ┆ 4.0  ┆ 6   ┆ 1   │
#> │ 21.0 ┆ 160.0 ┆ 110.0 ┆ 3.9  ┆ … ┆ 4.0  ┆ 4.0  ┆ 6   ┆ 1   │
#> │ 19.7 ┆ 145.0 ┆ 175.0 ┆ 3.62 ┆ … ┆ 5.0  ┆ 6.0  ┆ 6   ┆ 1   │
#> └──────┴───────┴───────┴──────┴───┴──────┴──────┴─────┴─────┘
read_hive("mtcars-ds", filter = (pl$col("cyl") <= 6 & pl$col("mpg") > 30))
#> shape: (4, 11)
#> ┌──────┬──────┬───────┬──────┬───┬──────┬──────┬─────┬─────┐
#> │ mpg  ┆ disp ┆ hp    ┆ drat ┆ … ┆ gear ┆ carb ┆ cyl ┆ am  │
#> │ ---  ┆ ---  ┆ ---   ┆ ---  ┆   ┆ ---  ┆ ---  ┆ --- ┆ --- │
#> │ f64  ┆ f64  ┆ f64   ┆ f64  ┆   ┆ f64  ┆ f64  ┆ i32 ┆ i32 │
#> ╞══════╪══════╪═══════╪══════╪═══╪══════╪══════╪═════╪═════╡
#> │ 32.4 ┆ 78.7 ┆ 66.0  ┆ 4.08 ┆ … ┆ 4.0  ┆ 1.0  ┆ 4   ┆ 1   │
#> │ 30.4 ┆ 75.7 ┆ 52.0  ┆ 4.93 ┆ … ┆ 4.0  ┆ 2.0  ┆ 4   ┆ 1   │
#> │ 33.9 ┆ 71.1 ┆ 65.0  ┆ 4.22 ┆ … ┆ 4.0  ┆ 1.0  ┆ 4   ┆ 1   │
#> │ 30.4 ┆ 95.1 ┆ 113.0 ┆ 3.77 ┆ … ┆ 5.0  ┆ 2.0  ┆ 4   ┆ 1   │
#> └──────┴──────┴───────┴──────┴───┴──────┴──────┴─────┴─────┘
read_hive(
    "mtcars-ds",
    filter = pl$col("cyl") <= 6,
    select = pl$col("drat", "hp")
)
#> shape: (18, 2)
#> ┌──────┬───────┐
#> │ drat ┆ hp    │
#> │ ---  ┆ ---   │
#> │ f64  ┆ f64   │
#> ╞══════╪═══════╡
#> │ 3.69 ┆ 62.0  │
#> │ 3.92 ┆ 95.0  │
#> │ 3.7  ┆ 97.0  │
#> │ 3.85 ┆ 93.0  │
#> │ …    ┆ …     │
#> │ 3.92 ┆ 123.0 │
#> │ 3.9  ┆ 110.0 │
#> │ 3.9  ┆ 110.0 │
#> │ 3.62 ┆ 175.0 │
#> └──────┴───────┘
read_hive(
    "mtcars-ds",
    filter = pl$col("cyl") <= 6,
    with_columns = pl$col("mpg")$max()$over("cyl")$alias("max_mpg")
)
#> shape: (18, 12)
#> ┌──────┬───────┬───────┬──────┬───┬──────┬─────┬─────┬─────────┐
#> │ mpg  ┆ disp  ┆ hp    ┆ drat ┆ … ┆ carb ┆ cyl ┆ am  ┆ max_mpg │
#> │ ---  ┆ ---   ┆ ---   ┆ ---  ┆   ┆ ---  ┆ --- ┆ --- ┆ ---     │
#> │ f64  ┆ f64   ┆ f64   ┆ f64  ┆   ┆ f64  ┆ i32 ┆ i32 ┆ f64     │
#> ╞══════╪═══════╪═══════╪══════╪═══╪══════╪═════╪═════╪═════════╡
#> │ 24.4 ┆ 146.7 ┆ 62.0  ┆ 3.69 ┆ … ┆ 2.0  ┆ 4   ┆ 0   ┆ 33.9    │
#> │ 22.8 ┆ 140.8 ┆ 95.0  ┆ 3.92 ┆ … ┆ 2.0  ┆ 4   ┆ 0   ┆ 33.9    │
#> │ 21.5 ┆ 120.1 ┆ 97.0  ┆ 3.7  ┆ … ┆ 1.0  ┆ 4   ┆ 0   ┆ 33.9    │
#> │ 22.8 ┆ 108.0 ┆ 93.0  ┆ 3.85 ┆ … ┆ 1.0  ┆ 4   ┆ 1   ┆ 33.9    │
#> │ …    ┆ …     ┆ …     ┆ …    ┆ … ┆ …    ┆ …   ┆ …   ┆ …       │
#> │ 17.8 ┆ 167.6 ┆ 123.0 ┆ 3.92 ┆ … ┆ 4.0  ┆ 6   ┆ 0   ┆ 21.4    │
#> │ 21.0 ┆ 160.0 ┆ 110.0 ┆ 3.9  ┆ … ┆ 4.0  ┆ 6   ┆ 1   ┆ 21.4    │
#> │ 21.0 ┆ 160.0 ┆ 110.0 ┆ 3.9  ┆ … ┆ 4.0  ┆ 6   ┆ 1   ┆ 21.4    │
#> │ 19.7 ┆ 145.0 ┆ 175.0 ┆ 3.62 ┆ … ┆ 6.0  ┆ 6   ┆ 1   ┆ 21.4    │
#> └──────┴───────┴───────┴──────┴───┴──────┴─────┴─────┴─────────┘
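The integer-versus-float inference used inside `read_hive` can be sketched in isolation (the helper name `infer_partition_type` is hypothetical, introduced here only for illustration):

```r
# Strings that all parse as whole numbers become integers; other numeric
# strings stay doubles; anything else is left as character.
infer_partition_type = function(values) {
  nums = suppressWarnings(as.numeric(values))
  if (anyNA(nums)) return(values)  # non-numeric: keep as character
  whole = abs(nums - round(nums)) < .Machine$double.eps^0.5
  if (all(whole)) as.integer(nums) else nums
}

infer_partition_type(c("4", "6", "8"))  # integer: 4L 6L 8L
infer_partition_type(c("3.5", "4.25"))  # double
infer_partition_type(c("a", "b"))       # character, unchanged
```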

Is there interest in adding this function (or some variant thereof) to the r-polars codebase? If so, let me know and I can put in a PR.

P.S. As noted in the function comments, ideally we can get rid of some scan versus read control-flow once a concat method for LazyFrames is enabled (#386).

@eitsupi
Collaborator

eitsupi commented Sep 28, 2023

Thanks, but I think this is a problem that should be solved on the Rust side, and am against adding workarounds to this package that are only possible on the R side.
I think this should be in a separate R package.

@grantmcdermott
Collaborator Author

Ha, I've just seen that this functionality has now been added on the Rust side! pola-rs/polars#11284

This will solve a major headache that I, personally, have had so far with using Polars in real-life projects.

@sorhawell
Collaborator

we have had a rough time trying to translate some deltalake, arrow, and other fancy connections which were implemented with python via third-party python packages.

We are very happy when rust-polars implements such connections from the ground up, as it makes them much easier for us to support.

just a thought
Otherwise, for something like read_hive() which may be difficult to maintain long-term, we could perhaps host such functions but flag them as experimental, e.g. pl$experimental$read_hive(). It would just be a pity if someone gave up on reading Hive datasets when there is a fair solution out there.

@grantmcdermott
Collaborator Author

Yeah, totally makes sense. However, since the Rust implementation has now been added, what's the best way to go about incorporating these updates on the r-polars side? There are a number of .rs files that have been changed. Do you want me to take a stab at pulling these in? I must admit that I'm not entirely sure about the mapping of upstream Rust changes to the current r-polars file structure, so I'll probably be quite inefficient. Any guidance etc. would be much appreciated!

@etiennebacher
Collaborator

etiennebacher commented Sep 29, 2023

I think these changes will be incorporated when the rust-polars dependency in Cargo.lock is updated. For now, the new functions on the Rust side are only in the development version, so I guess we need to wait for rust-polars 0.34 to be released before getting them in r-polars. If this is like 0.32, then it's probably gonna be a big effort (cf #334) but I didn't do it so I might lack some info on how hard it was 😅


@grantmcdermott in the meantime, I'm interested in having your custom function in tidypolars if it doesn't get implemented here.

@sorhawell
Collaborator

sorhawell commented Sep 29, 2023

#334 was more labour intense than average :)

@grantmcdermott
Collaborator Author

@grantmcdermott in the meantime, I'm interested in having your custom function in tidypolars if it doesn't get implemented here.

Thanks @etiennebacher. Will do when I get a sec.

@eitsupi
Collaborator

eitsupi commented Oct 9, 2023

The breaking changes in (Rust) polars are not sufficiently documented in the changelog, so we have to read a lot of commits and source code to understand the changes.
This is really hard work...

@eitsupi eitsupi added enhancement New feature or request upstream reader Anything related to reading data labels Oct 9, 2023
@sorhawell
Collaborator

sorhawell commented Oct 9, 2023

I can try to take a stab at the -> 0.33 upgrade also. I don't use the changelog too much as there is too much to read ^^ . I mostly look at the git blame view on the corresponding py-polars implementation of something where we now have a compiler error or unit test error. Compiler errors and unit tests are all we have to automatically monitor changes.

@etiennebacher
Collaborator

P.S. As noted in the function comments, ideally we can get rid of some scan versus read control-flow once a concat method for LazyFrames is enabled (#386).

@grantmcdermott it is now possible to concat a list of LazyFrames (#407) in case you want to update your function

@grantmcdermott
Collaborator Author

Super, thanks for the heads-up @etiennebacher. I haven't forgotten about that potential PR. A few too many plates spinning at the moment...

@eitsupi eitsupi added this to the 0.10 milestone Oct 31, 2023
@grantmcdermott
Collaborator Author

Great to see this update on the Rust side @etiennebacher! FWIW the vignette will need to be updated to reflect this change:

Finally, we can read/scan multiple files in the same directory through pattern
globbing. However, please note that partition-aware scanning is not yet
supported out of the box (e.g., Hive-style partitioned datasets). Follow
[this issue](https://github.com/pola-rs/polars/issues/4347) for more details
about when this will be resolved.
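For reference, once the upstream change is pulled into r-polars, the interface will presumably look something like the sketch below. This is an anticipated API only: the `hive_partitioning` argument name is assumed from py-polars and may well differ in the final R binding.

```r
# Anticipated interface only -- argument name assumed from py-polars.
lf = pl$scan_parquet("mtcars-ds/**/*.parquet", hive_partitioning = TRUE)
lf$limit(2)$collect()  # cyl and am should now appear as columns
```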
