Skip to content

Latest commit

 

History

History

examples

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 
 
 
 
 
 
 

Ballista Examples

This directory contains examples for executing distributed queries with Ballista.

Standalone Examples

The standalone example is the easiest to get started with. Ballista supports a standalone mode where a scheduler and executor are started in-process.

cargo run --example standalone_sql --features="ballista/standalone"

Source code for standalone SQL example

#[tokio::main]
async fn main() -> Result<()> {
    let config = BallistaConfig::builder()
        .set("ballista.shuffle.partitions", "1")
        .build()?;

    let ctx = BallistaContext::standalone(&config, 2).await?;

    ctx.register_csv(
        "test",
        "testdata/aggregate_test_100.csv",
        CsvReadOptions::new(),
    )
    .await?;

    let df = ctx.sql("select count(1) from test").await?;

    df.show().await?;
    Ok(())
}

Distributed Examples

For background information on the Ballista architecture, refer to the Ballista README.

Start a standalone cluster

From the root of the project, build release binaries.

cargo build --release

Start a Ballista scheduler process in a new terminal session.

RUST_LOG=info ./target/release/ballista-scheduler

Start one or more Ballista executor processes in new terminal sessions. When starting more than one executor, a unique port number must be specified for each executor.

RUST_LOG=info ./target/release/ballista-executor -c 2 -p 50051
RUST_LOG=info ./target/release/ballista-executor -c 2 -p 50052

Running the examples

The examples can be run using the cargo run --bin syntax.

Distributed SQL Example

cargo run --release --bin sql

Source code for distributed SQL example

use ballista::prelude::*;
use datafusion::prelude::CsvReadOptions;

/// This example demonstrates executing a simple query against an Arrow data source (CSV) and
/// fetching results, using SQL
#[tokio::main]
async fn main() -> Result<()> {
    let config = BallistaConfig::builder()
        .set("ballista.shuffle.partitions", "4")
        .build()?;
    let ctx = BallistaContext::remote("localhost", 50050, &config).await?;

    // register csv file with the execution context
    ctx.register_csv(
        "test",
        "testdata/aggregate_test_100.csv",
        CsvReadOptions::new(),
    )
    .await?;

    // execute the query
    let df = ctx
        .sql(
            "SELECT c1, MIN(c12), MAX(c12) \
        FROM test \
        WHERE c11 > 0.1 AND c11 < 0.9 \
        GROUP BY c1",
        )
        .await?;

    // print the results
    df.show().await?;

    Ok(())
}

Distributed DataFrame Example

cargo run --release --bin dataframe

Source code for distributed DataFrame example

#[tokio::main]
async fn main() -> Result<()> {
    let config = BallistaConfig::builder()
        .set("ballista.shuffle.partitions", "4")
        .build()?;
    let ctx = BallistaContext::remote("localhost", 50050, &config).await?;

    let filename = "testdata/alltypes_plain.parquet";

    // define the query using the DataFrame trait
    let df = ctx
        .read_parquet(filename, ParquetReadOptions::default())
        .await?
        .select_columns(&["id", "bool_col", "timestamp_col"])?
        .filter(col("id").gt(lit(1)))?;

    // print the results
    df.show().await?;

    Ok(())
}