Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enhance binary array encoding, make it the default #2521

Merged
merged 6 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions protos/encodings.proto
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ message SimpleStruct {}
message Binary {
ArrayEncoding indices = 1;
ArrayEncoding bytes = 2;
uint64 null_adjustment = 3;
}

// Encodings that decode into an Arrow array
Expand Down
7 changes: 3 additions & 4 deletions protos/file2.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ import "google/protobuf/empty.proto";
//
// * Each Lance file contains between 0 and 4Gi columns
// * Each column contains between 0 and 4Gi pages
// * Each page contains between 0 and 4Gi items
// * Each page contains between 0 and 2^64 items
// * Different pages within a column can have different item counts
// * Columns may have more than 4Gi items, though this will require more than
// one page
// * Columns may have up to 2^64 items
// * Different columns within a file can have different item counts
//
// The Lance file format does not have any notion of a type system or schemas.
Expand Down Expand Up @@ -178,7 +177,7 @@ message ColumnMetadata {
// may be empty.
repeated uint64 buffer_sizes = 2;
// Logical length (e.g. # rows) of the page
uint32 length = 3;
uint64 length = 3;
// The encoding used to encode the page
Encoding encoding = 4;
}
Expand Down
4 changes: 2 additions & 2 deletions protos/table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ message DataFile {
// - dimension: packed-struct (0):
// - x: u32 (1)
// - y: u32 (2)
// - path: string (3)
// - path: list<u32> (3)
// - embedding: fsl<768> (4)
// - fp64
// - borders: fsl<4> (5)
Expand All @@ -249,7 +249,7 @@ message DataFile {
// This reflects quite a few phenomena:
// - The packed struct is encoded into a single column and there is no top-level column
// for the x or y fields
// - The string is encoded into two columns
// - The variable sized list is encoded into two columns
// - The embedding is encoded into a single column (common for FSL of primitive) and there
// is no "FSL column"
// - The borders field actually does have an "FSL column"
Expand Down
96 changes: 31 additions & 65 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use futures::stream::{BoxStream, FuturesOrdered};
use futures::{FutureExt, StreamExt, TryStreamExt};
use lance_arrow::DataTypeExt;
use lance_core::datatypes::{Field, Schema};
use log::trace;
use snafu::{location, Location};
Expand All @@ -230,9 +231,7 @@ use tokio::sync::mpsc::{self, unbounded_channel};
use lance_core::{Error, Result};
use tracing::instrument;

use crate::encoder::get_str_encoding_type;
use crate::encoder::{values_column_encoding, EncodedBatch};
use crate::encodings::logical::binary::BinaryFieldScheduler;
use crate::encodings::logical::list::{ListFieldScheduler, OffsetPageInfo};
use crate::encodings::logical::primitive::PrimitiveFieldScheduler;
use crate::encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler};
Expand All @@ -246,7 +245,7 @@ use crate::{BufferScheduler, EncodingsIo};
#[derive(Debug)]
pub struct PageInfo {
/// The number of rows in the page
pub num_rows: u32,
pub num_rows: u64,
/// The encoding that explains the buffers in the page
pub encoding: pb::ArrayEncoding,
/// The offsets and sizes of the buffers in the file
Expand Down Expand Up @@ -550,26 +549,12 @@ impl CoreFieldDecoderStrategy {
}

fn is_primitive(data_type: &DataType) -> bool {
if data_type.is_primitive() {
if data_type.is_primitive() | data_type.is_binary_like() {
true
} else if get_str_encoding_type() {
match data_type {
// DataType::is_primitive doesn't consider these primitive but we do
DataType::Boolean
| DataType::Null
| DataType::FixedSizeBinary(_)
| DataType::Utf8 => true,
DataType::FixedSizeList(inner, _) => Self::is_primitive(inner.data_type()),
_ => false,
}
} else {
match data_type {
// DataType::is_primitive doesn't consider these primitive but we do
DataType::Boolean
| DataType::Null
| DataType::FixedSizeBinary(_)
// | DataType::Utf8
=> true,
DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
DataType::FixedSizeList(inner, _) => Self::is_primitive(inner.data_type()),
_ => false,
}
Expand Down Expand Up @@ -711,28 +696,6 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy {
.boxed();
Ok((chain, list_scheduler_fut))
}
DataType::Utf8 | DataType::Binary | DataType::LargeBinary | DataType::LargeUtf8 => {
let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, true)))
} else {
DataType::LargeList(Arc::new(ArrowField::new("item", DataType::UInt8, true)))
};
let list_field = ArrowField::new(&field.name, list_type, true);
let list_field = Field::try_from(&list_field).unwrap();
// We've changed the data type but are still decoding the same "field"
let (chain, list_decoder) =
chain.restart_at_current(&list_field, column_infos, buffers)?;
let data_type = data_type.clone();
let binary_scheduler_fut = async move {
let list_decoder = list_decoder.await?;
Ok(
Arc::new(BinaryFieldScheduler::new(list_decoder, data_type.clone()))
as Arc<dyn FieldScheduler>,
)
}
.boxed();
Ok((chain, binary_scheduler_fut))
}
DataType::Struct(fields) => {
let column_info = column_infos.pop_front().unwrap();
Self::check_simple_struct(&column_info, chain.current_path()).unwrap();
Expand Down Expand Up @@ -775,9 +738,9 @@ fn root_column(num_rows: u64) -> ColumnInfo {
let root_pages = (0..num_root_pages)
.map(|i| PageInfo {
num_rows: if i == num_root_pages - 1 {
final_page_num_rows as u32
final_page_num_rows
} else {
u32::MAX
u64::MAX
},
encoding: pb::ArrayEncoding {
array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
Expand Down Expand Up @@ -861,8 +824,8 @@ impl DecodeBatchScheduler {
return;
}
let next_scan_line = maybe_next_scan_line.unwrap();
num_rows_scheduled += next_scan_line.rows_scheduled as u64;
rows_to_schedule -= next_scan_line.rows_scheduled as u64;
num_rows_scheduled += next_scan_line.rows_scheduled;
rows_to_schedule -= next_scan_line.rows_scheduled;
trace!(
"Scheduled scan line of {} rows and {} decoders",
next_scan_line.rows_scheduled,
Expand Down Expand Up @@ -1060,11 +1023,10 @@ impl BatchDecodeStream {
return Ok(None);
}

let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64) as u32;
self.rows_remaining -= to_take as u64;
let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
self.rows_remaining -= to_take;

let scheduled_need =
(self.rows_drained + to_take as u64).saturating_sub(self.rows_scheduled);
let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
if scheduled_need > 0 {
let desired_scheduled = scheduled_need + self.rows_scheduled;
Expand All @@ -1075,25 +1037,25 @@ impl BatchDecodeStream {
let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
if actually_scheduled < desired_scheduled {
let under_scheduled = desired_scheduled - actually_scheduled;
to_take -= under_scheduled as u32;
to_take -= under_scheduled;
}
}

if to_take == 0 {
return Ok(None);
}

let avail = self.root_decoder.avail_u64();
let avail = self.root_decoder.avail();
trace!("Top level page has {} rows already available", avail);
if avail < to_take as u64 {
if avail < to_take {
trace!(
"Top level page waiting for an additional {} rows",
to_take as u64 - avail
to_take - avail
);
self.root_decoder.wait(to_take).await?;
}
let next_task = self.root_decoder.drain(to_take)?;
self.rows_drained += to_take as u64;
self.rows_drained += to_take;
Ok(Some(next_task))
}

Expand Down Expand Up @@ -1125,7 +1087,12 @@ impl BatchDecodeStream {
});
next_task.map(|(task, num_rows)| {
let task = task.map(|join_wrapper| join_wrapper.unwrap()).boxed();
let next_task = ReadBatchTask { task, num_rows };
// This should be true since batch size is u32
debug_assert!(num_rows <= u32::MAX as u64);
let next_task = ReadBatchTask {
task,
num_rows: num_rows as u32,
};
(next_task, slf)
})
});
Expand Down Expand Up @@ -1178,8 +1145,8 @@ pub trait PrimitivePageDecoder: Send + Sync {
/// * `all_null` - A mutable bool, set to true if a decoder determines all values are null
fn decode(
&self,
rows_to_skip: u32,
num_rows: u32,
rows_to_skip: u64,
num_rows: u64,
all_null: &mut bool,
) -> Result<Vec<BytesMut>>;
fn num_buffers(&self) -> u32;
Expand All @@ -1193,7 +1160,6 @@ pub trait PrimitivePageDecoder: Send + Sync {
/// be shared in follow-up I/O tasks.
///
/// See [`crate::decoder`] for more information

pub trait PageScheduler: Send + Sync + std::fmt::Debug {
/// Schedules a batch of I/O to load the data needed for the requested ranges
///
Expand All @@ -1208,7 +1174,7 @@ pub trait PageScheduler: Send + Sync + std::fmt::Debug {
/// scheduled. This can be used to assign priority to I/O requests
fn schedule_ranges(
&self,
ranges: &[Range<u32>],
ranges: &[Range<u64>],
scheduler: &Arc<dyn EncodingsIo>,
top_level_row: u64,
) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
Expand Down Expand Up @@ -1284,7 +1250,7 @@ impl SchedulerContext {

#[derive(Debug)]
pub struct ScheduledScanLine {
pub rows_scheduled: u32,
pub rows_scheduled: u64,
pub decoders: Vec<DecoderReady>,
}

Expand Down Expand Up @@ -1366,7 +1332,7 @@ pub struct NextDecodeTask {
/// The decode task itself
pub task: Box<dyn DecodeArrayTask>,
/// The number of rows that will be created
pub num_rows: u32,
pub num_rows: u64,
/// Whether or not the decoder that created this still has more rows to decode
pub has_more: bool,
}
Expand Down Expand Up @@ -1434,13 +1400,13 @@ pub trait LogicalPageDecoder: std::fmt::Debug + Send {
})
}
/// Waits for enough data to be loaded to decode `num_rows` of data
fn wait(&mut self, num_rows: u32) -> BoxFuture<Result<()>>;
fn wait(&mut self, num_rows: u64) -> BoxFuture<Result<()>>;
/// Creates a task to decode `num_rows` of data into an array
fn drain(&mut self, num_rows: u32) -> Result<NextDecodeTask>;
fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
/// The number of rows that are in the page but haven't yet been "waited"
fn unawaited(&self) -> u32;
fn unawaited(&self) -> u64;
/// The number of rows that have been "waited" but not yet decoded
fn avail(&self) -> u32;
fn avail(&self) -> u64;
/// The data type of the decoded data
fn data_type(&self) -> &DataType;
}
Expand Down
65 changes: 15 additions & 50 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ use crate::{
decoder::{ColumnInfo, PageInfo},
encodings::{
logical::{
binary::BinaryFieldEncoder, list::ListFieldEncoder, primitive::PrimitiveFieldEncoder,
r#struct::StructFieldEncoder,
list::ListFieldEncoder, primitive::PrimitiveFieldEncoder, r#struct::StructFieldEncoder,
},
physical::{
basic::BasicEncoder, binary::BinaryEncoder, fixed_size_list::FslEncoder,
Expand Down Expand Up @@ -102,7 +101,7 @@ pub struct EncodedPage {
// The encoded array data
pub array: EncodedArray,
/// The number of rows in the encoded page
pub num_rows: u32,
pub num_rows: u64,
/// The index of the column
pub column_idx: u32,
}
Expand Down Expand Up @@ -228,11 +227,6 @@ fn get_compression_scheme() -> CompressionScheme {
parse_compression_scheme(&compression_scheme).unwrap_or(CompressionScheme::None)
}

pub fn get_str_encoding_type() -> bool {
let str_encoding = std::env::var("LANCE_STR_ARRAY_ENCODING").unwrap_or("none".to_string());
matches!(str_encoding.as_str(), "binary")
}

impl CoreArrayEncodingStrategy {
fn array_encoder_from_type(data_type: &DataType) -> Result<Box<dyn ArrayEncoder>> {
match data_type {
Expand All @@ -242,20 +236,14 @@ impl CoreArrayEncodingStrategy {
*dimension as u32,
)))))
}
DataType::Utf8 => {
if get_str_encoding_type() {
let bin_indices_encoder = Self::array_encoder_from_type(&DataType::UInt64)?;
let bin_bytes_encoder = Self::array_encoder_from_type(&DataType::UInt8)?;

Ok(Box::new(BinaryEncoder::new(
bin_indices_encoder,
bin_bytes_encoder,
)))
} else {
Ok(Box::new(BasicEncoder::new(Box::new(
ValueEncoder::try_new(data_type, get_compression_scheme())?,
))))
}
DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
let bin_indices_encoder = Self::array_encoder_from_type(&DataType::UInt64)?;
let bin_bytes_encoder = Self::array_encoder_from_type(&DataType::UInt8)?;

Ok(Box::new(BinaryEncoder::new(
bin_indices_encoder,
bin_bytes_encoder,
)))
}
_ => Ok(Box::new(BasicEncoder::new(Box::new(
ValueEncoder::try_new(data_type, get_compression_scheme())?,
Expand Down Expand Up @@ -369,30 +357,16 @@ impl FieldEncodingStrategy for CoreFieldEncodingStrategy {
| DataType::UInt64
| DataType::UInt8
| DataType::FixedSizeBinary(_)
| DataType::FixedSizeList(_, _) => Ok(Box::new(PrimitiveFieldEncoder::try_new(
| DataType::FixedSizeList(_, _)
| DataType::Binary
| DataType::LargeBinary
| DataType::Utf8
| DataType::LargeUtf8 => Ok(Box::new(PrimitiveFieldEncoder::try_new(
cache_bytes_per_column,
keep_original_array,
self.array_encoding_strategy.clone(),
column_index.next_column_index(field.id),
)?)),
DataType::Utf8 => {
if get_str_encoding_type() {
Ok(Box::new(PrimitiveFieldEncoder::try_new(
cache_bytes_per_column,
keep_original_array,
self.array_encoding_strategy.clone(),
column_index.next_column_index(field.id),
)?))
} else {
let list_idx = column_index.next_column_index(field.id);
column_index.skip();
Ok(Box::new(BinaryFieldEncoder::new(
cache_bytes_per_column,
keep_original_array,
list_idx,
)))
}
}
DataType::List(child) => {
let list_idx = column_index.next_column_index(field.id);
let inner_encoding = encoding_strategy_root.create_field_encoder(
Expand Down Expand Up @@ -431,15 +405,6 @@ impl FieldEncodingStrategy for CoreFieldEncodingStrategy {
header_idx,
)))
}
DataType::Binary | DataType::LargeUtf8 | DataType::LargeBinary => {
let list_idx = column_index.next_column_index(field.id);
column_index.skip();
Ok(Box::new(BinaryFieldEncoder::new(
cache_bytes_per_column,
keep_original_array,
list_idx,
)))
}
_ => todo!("Implement encoding for field {}", field),
}
}
Expand Down
1 change: 0 additions & 1 deletion rust/lance-encoding/src/encodings/logical.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

pub mod binary;
pub mod list;
pub mod primitive;
pub mod r#struct;
Loading
Loading