Upgrade to DataFusion 34.0.0-rc1 (#927)
* Use latest DF

* update deps

* Set max encoding size

* tests compile now

* fix some todo comments

* address another todo item

* specify message sizes in gRPC clients as well

* fix typo and use 34 rc1
andygrove authored Dec 12, 2023
1 parent d0921c4 commit 934b32f
Showing 9 changed files with 60 additions and 30 deletions.
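Two of the commits above ("Set max encoding size" and "specify message sizes in gRPC clients as well") concern gRPC message-size limits. As a hedged sketch only: on a tonic 0.10 generated client these limits are typically raised with the builder methods shown below. The client module path and the 16 MiB figure are illustrative assumptions, not values taken from this commit.

// Sketch: raising tonic 0.10 message-size limits on a generated gRPC client.
// `SchedulerGrpcClient` stands in for Ballista's generated scheduler client;
// the exact path and the 16 MiB limit are assumptions for illustration.
use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
use tonic::transport::Channel;

async fn connect_scheduler(
    url: String,
) -> Result<SchedulerGrpcClient<Channel>, tonic::transport::Error> {
    let client = SchedulerGrpcClient::connect(url)
        .await?
        .max_decoding_message_size(16 * 1024 * 1024) // accept responses up to 16 MiB
        .max_encoding_message_size(16 * 1024 * 1024); // send requests up to 16 MiB
    Ok(client)
}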
16 changes: 8 additions & 8 deletions Cargo.toml
@@ -29,16 +29,16 @@ members = [
resolver = "2"

[workspace.dependencies]
arrow = { version = "48.0.0", features=["ipc_compression"] }
arrow-flight = { version = "48.0.0", features = ["flight-sql-experimental"] }
arrow-schema = { version = "48.0.0", default-features = false }
arrow = { version = "49.0.0", features=["ipc_compression"] }
arrow-flight = { version = "49.0.0", features = ["flight-sql-experimental"] }
arrow-schema = { version = "49.0.0", default-features = false }
configure_me = { version = "0.4.0" }
configure_me_codegen = { version = "0.4.4" }
datafusion = "33.0.0"
datafusion-cli = "33.0.0"
datafusion-proto = "33.0.0"
object_store = "0.7.0"
sqlparser = "0.39.0"
datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "34.0.0-rc1" }
datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "34.0.0-rc1" }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "34.0.0-rc1" }
object_store = "0.8.0"
sqlparser = "0.40.0"
tonic = { version = "0.10" }
tonic-build = { version = "0.10", default-features = false, features = [
"transport",
2 changes: 0 additions & 2 deletions ballista/client/src/context.rs
@@ -616,8 +616,6 @@ mod tests {
target_partitions: x.target_partitions,
file_sort_order: vec![],
infinite_source: false,
- insert_mode:
-     datafusion::datasource::listing::ListingTableInsertMode::Error,
file_type_write_options: None,
single_file: false,
};
35 changes: 29 additions & 6 deletions ballista/core/src/cache_layer/object_store/file.rs
@@ -22,11 +22,12 @@ use crate::error::BallistaError;
use async_trait::async_trait;
use ballista_cache::loading_cache::LoadingCache;
use bytes::Bytes;
- use futures::stream::BoxStream;
+ use futures::stream::{self, BoxStream, StreamExt};
use log::info;
use object_store::path::Path;
use object_store::{
Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
+ PutOptions, PutResult,
};
use std::fmt::{Debug, Display, Formatter};
use std::ops::Range;
@@ -73,7 +74,24 @@ impl<M> ObjectStore for FileCacheObjectStore<M>
where
M: CacheMedium,
{
- async fn put(&self, _location: &Path, _bytes: Bytes) -> object_store::Result<()> {
+ async fn put(
+ &self,
+ _location: &Path,
+ _bytes: Bytes,
+ ) -> object_store::Result<PutResult> {
+ Err(Error::NotSupported {
+ source: Box::new(BallistaError::General(
+ "Write path is not supported".to_string(),
+ )),
+ })
+ }
+
+ async fn put_opts(
+ &self,
+ _location: &Path,
+ _bytes: Bytes,
+ _opts: PutOptions,
+ ) -> object_store::Result<PutResult> {
Err(Error::NotSupported {
source: Box::new(BallistaError::General(
"Write path is not supported".to_string(),
@@ -209,13 +227,18 @@ where
})
}

- async fn list(
+ fn list(
&self,
_prefix: Option<&Path>,
- ) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>> {
- Err(Error::NotSupported {
- source: Box::new(BallistaError::General("List is not supported".to_string())),
+ ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+ stream::once(async {
+ Err(Error::NotSupported {
+ source: Box::new(BallistaError::General(
+ "List is not supported".to_string(),
+ )),
+ })
})
+ .boxed()
}

async fn list_with_delimiter(
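The new signatures in this file track object_store 0.8, where `ObjectStore::put` returns a `PutResult` (carrying the written object's `e_tag` and `version`) instead of `()`, and a `put_opts` variant taking `PutOptions` is part of the trait. A minimal caller-side sketch of that contract, assuming the in-memory store and a tokio runtime purely for illustration:

// Sketch of the object_store 0.8 write API: `put` now returns `PutResult`.
// The in-memory store, path, and payload below are illustrative only.
use bytes::Bytes;
use object_store::{memory::InMemory, path::Path, ObjectStore};

#[tokio::main]
async fn main() -> object_store::Result<()> {
    let store = InMemory::new();
    let location = Path::from("jobs/query-1/shuffle-0.arrow");
    // In object_store 0.7 this returned `()`; in 0.8 it describes what was written.
    let result = store.put(&location, Bytes::from_static(b"some bytes")).await?;
    println!("e_tag = {:?}, version = {:?}", result.e_tag, result.version);
    Ok(())
}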
24 changes: 19 additions & 5 deletions ballista/core/src/cache_layer/object_store/mod.rs
@@ -22,7 +22,8 @@ use bytes::Bytes;
use futures::stream::BoxStream;
use object_store::path::Path;
use object_store::{
- GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
+ GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions,
+ PutResult,
};
use std::fmt::{Debug, Display, Formatter};
use std::ops::Range;
@@ -60,10 +61,23 @@ impl Display for ObjectStoreWithKey {

#[async_trait]
impl ObjectStore for ObjectStoreWithKey {
- async fn put(&self, location: &Path, bytes: Bytes) -> object_store::Result<()> {
+ async fn put(
+ &self,
+ location: &Path,
+ bytes: Bytes,
+ ) -> object_store::Result<PutResult> {
self.inner.put(location, bytes).await
}

+ async fn put_opts(
+ &self,
+ location: &Path,
+ bytes: Bytes,
+ opts: PutOptions,
+ ) -> object_store::Result<PutResult> {
+ self.inner.put_opts(location, bytes, opts).await
+ }

async fn put_multipart(
&self,
location: &Path,
@@ -115,11 +129,11 @@ impl ObjectStore for ObjectStoreWithKey {
self.inner.delete(location).await
}

- async fn list(
+ fn list(
&self,
prefix: Option<&Path>,
- ) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>> {
- self.inner.list(prefix).await
+ ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+ self.inner.list(prefix)
}

async fn list_with_delimiter(
1 change: 1 addition & 0 deletions ballista/core/src/cache_layer/policy/file.rs
@@ -202,6 +202,7 @@ where
last_modified: source_meta.last_modified,
size: cache_meta.size,
e_tag: source_meta.e_tag,
+ version: None,
})
}

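The added `version: None` fields in this commit exist because object_store 0.8 added a `version: Option<String>` member to `ObjectMeta`, so every struct literal has to initialize it. A small sketch with placeholder values:

// Sketch: constructing an object_store 0.8 `ObjectMeta`, including the new
// `version` field. All values are placeholders.
use chrono::Utc;
use object_store::{path::Path, ObjectMeta};

fn placeholder_meta() -> ObjectMeta {
    ObjectMeta {
        location: Path::from("cache/data.parquet"),
        last_modified: Utc::now(),
        size: 1024,
        e_tag: None,
        version: None, // new in object_store 0.8
    }
}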
2 changes: 1 addition & 1 deletion ballista/core/src/utils.rs
@@ -319,7 +319,7 @@ impl<T: 'static + AsLogicalPlan> QueryPlanner for BallistaQueryPlanner<T> {
match logical_plan {
LogicalPlan::Ddl(DdlStatement::CreateExternalTable(_)) => {
// table state is managed locally in the BallistaContext, not in the scheduler
- Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))))
+ Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))))
}
_ => Ok(Arc::new(DistributedQueryExec::with_repr(
self.scheduler_url.clone(),
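The `EmptyExec` edit above follows a DataFusion 34 API change: `EmptyExec::new` no longer takes a `produce_one_row` flag, and a plan that should yield a single placeholder row uses the new `PlaceholderRowExec` instead. A hedged sketch of the two cases (module paths as I understand the DataFusion 34 API):

// Sketch of the DataFusion 34 replacement for `EmptyExec::new(bool, schema)`.
use std::sync::Arc;
use datafusion::arrow::datatypes::Schema;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;

fn make_plans() {
    let schema = Arc::new(Schema::empty());
    // Previously EmptyExec::new(false, schema): produces zero rows.
    let _zero_rows = EmptyExec::new(schema.clone());
    // Previously EmptyExec::new(true, schema): produces one placeholder row.
    let _one_row = PlaceholderRowExec::new(schema);
}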
1 change: 1 addition & 0 deletions ballista/scheduler/src/cluster/mod.rs
@@ -1092,6 +1092,7 @@ mod test {
last_modified: Default::default(),
size: 1,
e_tag: None,
+ version: None,
},
partition_values: vec![],
range: None,
6 changes: 0 additions & 6 deletions ballista/scheduler/src/scheduler_server/grpc.rs
@@ -308,12 +308,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
let path = Path::from(path.as_str());
let file_metas: Vec<_> = obj_store
.list(Some(&path))
- .await
- .map_err(|e| {
- let msg = format!("Error listing files: {e}");
- error!("{}", msg);
- tonic::Status::internal(msg)
- })?
.try_collect()
.await
.map_err(|e| {
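The deleted `.await` and error-mapping block follow from the object_store 0.8 `list` signature: it is now a synchronous method that returns a stream directly, so there is no fallible future to await before collecting, and any failure arrives as an `Err` item of the stream. A caller-side sketch of that pattern (the helper name is illustrative):

// Sketch: consuming the object_store 0.8 `list` stream; no `.await?` on `list`
// itself, and errors surface while collecting.
use futures::TryStreamExt;
use object_store::{path::Path, ObjectMeta, ObjectStore};

async fn list_all(
    store: &dyn ObjectStore,
    prefix: &Path,
) -> object_store::Result<Vec<ObjectMeta>> {
    store.list(Some(prefix)).try_collect().await
}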
3 changes: 1 addition & 2 deletions benchmarks/src/bin/tpch.rs
@@ -26,7 +26,7 @@ use ballista::prelude::{
use datafusion::arrow::array::*;
use datafusion::arrow::util::display::array_value_to_string;
use datafusion::common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};
- use datafusion::datasource::listing::{ListingTableInsertMode, ListingTableUrl};
+ use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionState;
@@ -845,7 +845,6 @@ async fn get_table(
table_partition_cols: vec![],
file_sort_order: vec![],
infinite_source: false,
- insert_mode: ListingTableInsertMode::Error,
file_type_write_options: None,
single_file: false,
};
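With `insert_mode` removed from `ListingOptions` in DataFusion 34, the struct literals above simply drop the field. One way to insulate call sites from such field changes, shown here only as a sketch and not what this commit does, is the builder-style constructor; the Parquet format and extension are illustrative:

// Sketch: building ListingOptions through its constructor and with_* methods
// rather than a struct literal, so added or removed fields don't break callers.
use std::sync::Arc;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;

fn parquet_listing_options(target_partitions: usize) -> ListingOptions {
    ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_file_extension(".parquet")
        .with_target_partitions(target_partitions)
        .with_collect_stat(true)
}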
