Skip to content

Commit

Permalink
Replace buffer management with Arrow buffers
Browse files Browse the repository at this point in the history
This implements a second version of the Query interface using Arrow
Arrays. The core idea here is that we can share Arrow arrays freely and
then convert them to mutable buffers as long as there are no external
references to them. This is all done safely and returns an error if any
buffer is externally referenced.
  • Loading branch information
davisp committed Oct 3, 2024
1 parent c395070 commit 906d507
Show file tree
Hide file tree
Showing 25 changed files with 4,139 additions and 22 deletions.
10 changes: 7 additions & 3 deletions tiledb/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ path = "src/lib.rs"

[dependencies]
anyhow = { workspace = true }
arrow = { version = "52.0.0", features = ["prettyprint"], optional = true }
arrow = { version = "52.0.0", features = ["prettyprint"] }
itertools = "0"
num-traits = { version = "0.2", optional = true }
paste = "1.0"
Expand All @@ -35,5 +35,9 @@ tiledb-utils = { workspace = true }

[features]
default = []
proptest-strategies = ["dep:num-traits", "dep:proptest", "dep:proptest-derive", "dep:tiledb-test-utils"]
arrow = ["dep:arrow"]
proptest-strategies = [
"dep:num-traits",
"dep:proptest",
"dep:proptest-derive",
"dep:tiledb-test-utils",
]
154 changes: 154 additions & 0 deletions tiledb/api/examples/multi_range_subarray_arrow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use std::path::PathBuf;
use std::sync::Arc;

use arrow::array as aa;
use itertools::izip;

use tiledb::array::{
Array, ArrayType, AttributeData, CellOrder, DimensionData, DomainData,
SchemaData, TileOrder,
};
use tiledb::context::Context;
use tiledb::error::Error as TileDBError;
use tiledb::query_arrow::{QueryBuilder, QueryLayout, QueryStatus, QueryType};
use tiledb::Result as TileDBResult;
use tiledb::{Datatype, Factory};

const ARRAY_URI: &str = "multi_range_slicing";

/// This example creates a 4x4 dense array with the contents:
///
/// Col: 1 2 3 4
/// Row: 1 1 2 3 4
/// 2 5 6 7 8
/// 3 9 10 11 12
/// 4 13 14 15 16
///
/// The query run restricts rows to [1, 2, 4] and returns all columns which
/// should produce these rows:
///
/// Row Col Value
/// 1 1 1
/// 1 2 2
/// 1 3 3
/// 1 4 4
/// 2 1 5
/// 2 2 6
/// 2 3 7
/// 2 4 8
/// 4 1 13
/// 4 2 14
/// 4 3 15
/// 4 4 16
fn main() -> TileDBResult<()> {
if let Ok(manifest_dir) = std::env::var("CARGO_MANIFEST_DIR") {
let _ = std::env::set_current_dir(
PathBuf::from(manifest_dir).join("examples").join("output"),
);
}

let ctx = Context::new()?;
if Array::exists(&ctx, ARRAY_URI)? {
Array::delete(&ctx, ARRAY_URI)?;
}

create_array(&ctx)?;
write_array(&ctx)?;

let array = Array::open(&ctx, ARRAY_URI, tiledb::array::Mode::Read)?;
let mut query = QueryBuilder::new(array, QueryType::Read)
.with_layout(QueryLayout::RowMajor)
.start_fields()
.field("rows")
.field("cols")
.field("a")
.end_fields()
.start_subarray()
.add_range("rows", &[1, 2])
.add_range("rows", &[4, 4])
.add_range("cols", &[1, 4])
.end_subarray()
.build()
.map_err(|e| TileDBError::Other(format!("{e}")))?;

let status = query
.submit()
.map_err(|e| TileDBError::Other(format!("{e}")))?;

if !matches!(status, QueryStatus::Completed) {
return Err(TileDBError::Other("Make this better.".to_string()));
}

let buffers = query
.buffers()
.map_err(|e| TileDBError::Other(format!("{e}")))?;

let rows = buffers.get::<aa::Int32Array>("rows").unwrap();
let cols = buffers.get::<aa::Int32Array>("cols").unwrap();
let attr = buffers.get::<aa::Int32Array>("a").unwrap();

for (r, c, a) in izip!(rows.values(), cols.values(), attr.values()) {
println!("{} {} {}", r, c, a);
}

Ok(())
}

fn create_array(ctx: &Context) -> TileDBResult<()> {
let schema = SchemaData {
array_type: ArrayType::Dense,
domain: DomainData {
dimension: vec![
DimensionData {
name: "rows".to_owned(),
datatype: Datatype::Int32,
constraints: ([1i32, 4], 4i32).into(),
filters: None,
},
DimensionData {
name: "cols".to_owned(),
datatype: Datatype::Int32,
constraints: ([1i32, 4], 4i32).into(),
filters: None,
},
],
},
attributes: vec![AttributeData {
name: "a".to_owned(),
datatype: Datatype::Int32,
..Default::default()
}],
tile_order: Some(TileOrder::RowMajor),
cell_order: Some(CellOrder::RowMajor),

..Default::default()
};

let schema = schema.create(ctx)?;
Array::create(ctx, ARRAY_URI, schema)?;
Ok(())
}

fn write_array(ctx: &Context) -> TileDBResult<()> {
let data = Arc::new(aa::Int32Array::from(vec![
1i32, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
]));

let array =
tiledb::Array::open(ctx, ARRAY_URI, tiledb::array::Mode::Write)?;

let mut query = QueryBuilder::new(array, QueryType::Write)
.with_layout(QueryLayout::RowMajor)
.start_fields()
.field_with_buffer("a", data)
.end_fields()
.build()
.map_err(|e| TileDBError::Other(format!("{e}")))?;

let (_, _) = query
.submit()
.and_then(|_| query.finalize())
.map_err(|e| TileDBError::Other(format!("{e}")))?;

Ok(())
}
Loading

0 comments on commit 906d507

Please sign in to comment.