Ballista should be able to load Partitioned data frame using ListingOptions #747

Closed
andreclaudino opened this issue Apr 18, 2023 · 0 comments · Fixed by apache/datafusion#9126
Labels
enhancement New feature or request

Comments

@andreclaudino

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
In DataFusion, which is described as one of Ballista's components, I can load a hive-style partitioned dataset using the ListingOptions struct:

// file_format, listing_table_url and resolved_schema are built as in register_parquet below.
let partition_columns: Vec<(String, DataType)> = vec![("uf".to_owned(), DataType::Utf8), ("municipio".to_owned(), DataType::Utf8)];

let listing_options =
        ListingOptions::new(Arc::new(file_format))
            .with_table_partition_cols(partition_columns);

let config = ListingTableConfig::new(listing_table_url)
        .with_listing_options(listing_options)
        .with_schema(resolved_schema);

// Build the table from the config before wrapping it as a provider.
let listing_table = ListingTable::try_new(config)?;
let table_provider = Arc::new(listing_table);

session_context.register_table(table_name, table_provider)?;

I can pass the same table_provider object to register_table on a Ballista context. The call recognizes the file fields and returns success, but it ignores the hive-style partition columns.

ballista_context.register_table(table_name, table_provider.clone())?;

Describe the solution you'd like
Given a hive-style partitioned dataset, stored locally or in object storage, as in the following example:

$> tree resources/age_gender_draw/
resources/age_gender_draw/
├── uf=CE
│   └── municipio=Fortaleza
│       └── fortaleza.parquet
├── uf=DF
│   └── municipio=Brasilia
│       └── brasilia.parquet
├── uf=RJ
│   └── municipio=Rio De Janeiro
│       └── rio_de_janeiro.parquet
└── uf=SP
    └── municipio=Sao Paulo
        └── sao_paulo.parquet
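
To make the layout above concrete, here is a minimal sketch of how partition values can be read back from a file path in this scheme. It uses only the standard library; the function name partition_values is hypothetical and this is not how DataFusion or Ballista implement it, only an illustration of the key=value directory convention.

```rust
/// Collects the `key=value` directory segments of a hive-style path, in order.
/// The last segment is treated as the file name and ignored, as are
/// directories that do not contain `=`.
/// (Hypothetical helper for illustration; not part of DataFusion or Ballista.)
fn partition_values(path: &str) -> Vec<(String, String)> {
    let mut segments: Vec<&str> = path.split('/').collect();
    // Drop the trailing file name so a file called `a=b.parquet` is not misread.
    segments.pop();
    segments
        .into_iter()
        .filter_map(|segment| segment.split_once('='))
        .map(|(key, value)| (key.to_owned(), value.to_owned()))
        .collect()
}
```

For the path resources/age_gender_draw/uf=CE/municipio=Fortaleza/fortaleza.parquet, this yields the pairs (uf, CE) and (municipio, Fortaleza), which are the two columns declared in partition_columns.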

Ballista should honor the listing_options configuration when loading a dataset, whether registering it as a table or loading it as a data frame. In short, the following code should work.

Given the registering function:

pub async fn register_parquet(table_name: &str, source_path: &str, session_context: &SessionContext, ballista_context: &BallistaContext) -> anyhow::Result<()> {
    let session_state = session_context.state();

    let partition_columns: Vec<(String, DataType)> = vec![("uf".to_owned(), DataType::Utf8), ("municipio".to_owned(), DataType::Utf8)];

    let listing_table_url = ListingTableUrl::parse(source_path)?;
    
    let file_format =
        ParquetFormat::new()
            .with_enable_pruning(Some(true))
            .with_skip_metadata(Some(true));

    let listing_options =
        ListingOptions::new(Arc::new(file_format))
            .with_table_partition_cols(partition_columns);

    let resolved_schema = listing_options
        .infer_schema(&session_state, &listing_table_url)
        .await?;

    let config = ListingTableConfig::new(listing_table_url)
        .with_listing_options(listing_options)
        .with_schema(resolved_schema);

    let listing_table =
        ListingTable::try_new(config)?;
    let table_provider = Arc::new(listing_table);

    ballista_context.register_table(table_name, table_provider.clone())?;
    session_context.register_table(table_name, table_provider)?;
    
    Ok(())
}

and its usage:

let query = "SELECT distinct uf, age_group from age_gender_draw LIMIT 10";
register_parquet(table_name, source_path, &session_context, &ballista_context).await?;

let ballista_processed_data_frame = run_query_in_ballista(&query, &ballista_context).await?;
ballista_processed_data_frame.show().await?;

The following error, which occurs when generating ballista_processed_data_frame, should not happen:

Error: Schema error: No field named 'uf'. Valid fields are 'age_gender_draw'.'maid', 'age_gender_draw'.'per_capita_income', 'age_gender_draw'.'gender', 'age_gender_draw'.'age_group'.

Caused by:
    No field named 'uf'. Valid fields are 'age_gender_draw'.'maid', 'age_gender_draw'.'per_capita_income', 'age_gender_draw'.'gender', 'age_gender_draw'.'age_group'.
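
The valid fields listed in the error are only the columns stored inside the Parquet files, so the partition columns appear to have been dropped on the Ballista side. The toy model below sketches that difference, assuming the table schema is the file fields followed by the partition columns. The function resolve_column and its error text are hypothetical and only mimic the behaviour; they are not DataFusion or Ballista code.

```rust
/// Resolves `name` against a table schema modelled as the file fields
/// followed by the partition columns, returning the column index.
/// (Toy model with hypothetical names; not the planner's real lookup.)
fn resolve_column(
    name: &str,
    file_fields: &[&str],
    partition_cols: &[&str],
) -> Result<usize, String> {
    file_fields
        .iter()
        .chain(partition_cols.iter())
        .position(|field| *field == name)
        .ok_or_else(|| format!("No field named '{}'", name))
}
```

With the file fields maid, per_capita_income, gender and age_group, looking up uf succeeds when the partition columns uf and municipio are passed, and fails with a "No field named" error when the partition column list is empty, which matches the symptom reported here.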

The result should be the same as the one obtained when generating fusion_processed_data_frame:

let fusion_processed_data_frame = run_query_in_fusion(&query, &session_context).await?;
fusion_processed_data_frame.show().await?;

which results in:

+----+-----------+
| uf | age_group |
+----+-----------+
| CE | 00-14     |
| DF | 45-59     |
| CE | 30-44     |
| DF | 15-29     |
| DF | 00-14     |
| RJ | 15-29     |
| CE | 45-59     |
| RJ | 45-59     |
| DF | 30-44     |
| DF | 60-pl     |
+----+-----------+

Describe alternatives you've considered
Using DataFusion alone is a possible alternative for small datasets, but in my case, where huge datasets are common, Ballista is needed to scale processing across distributed nodes.

Additional context
There is nothing more I can share now, sorry.

With some guidance I could implement this and open a pull request, but I would need help knowing where to start.
