
fix: spawn multiple encoding task inside a batch number of rows in one column when needed #2563

Closed
wants to merge 1 commit into from

Conversation

broccoliSpicy
Contributor

@broccoliSpicy broccoliSpicy commented Jul 3, 2024

#2561
list and struct remain unchanged; I want to get some discussion and advice on this approach first


github-actions bot commented Jul 3, 2024

ACTION NEEDED
Lance follows the Conventional Commits specification for release automation.

The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.

For details on the error please inspect the "PR Title Check" action.

@@ -344,7 +345,8 @@ impl AccumulationQueue {
            // Push into buffered_arrays without copy since we are about to flush anyways
            self.buffered_arrays.push(array);
            self.current_bytes = 0;
-           Some(std::mem::take(&mut self.buffered_arrays))
+           Some(self.slice_all_buffered_arrays_into_flush_task_arrays())
Contributor Author

when doing AccumulationQueue::insert, I think it's better to have the last portion of the Arrow array remain in self.buffered_arrays?

fn slice_all_buffered_arrays_into_flush_task_arrays(&mut self) -> Vec<ArrayRef> {
    let concat_array = concat(
        &self
            .buffered_arrays
Contributor Author

do a std::mem::take here?

    encoded_pages.push(encoded_page);
}
encoded_pages.sort_by_key(|p| p.page_idx);
for encoded_page in encoded_pages.into_iter() {
    self.write_page(encoded_page).await?;
Contributor Author
@broccoliSpicy broccoliSpicy Jul 3, 2024

this has performance implications: we wait for all the compute to finish and only then do the I/O. I want a better way to make this batch-size independent and to decouple compute from I/O (see the sketch below)
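One possible shape for that decoupling (a standalone sketch, not Lance's actual writer; encode_page and write_page here are hypothetical stand-ins): drive the encode futures concurrently but consume them in page order, so each page is written as soon as it and its predecessors are ready, while later pages are still encoding.

use futures::stream::{self, StreamExt};
use std::time::Duration;

// Hypothetical stand-ins for the real encode/write routines.
async fn encode_page(idx: usize) -> usize {
    tokio::time::sleep(Duration::from_millis(50)).await; // pretend CPU work
    idx
}

async fn write_page(idx: usize) {
    tokio::time::sleep(Duration::from_millis(10)).await; // pretend I/O
    println!("wrote page {idx}");
}

#[tokio::main]
async fn main() {
    // `buffered(8)` keeps up to 8 encode futures in flight but yields
    // results in submission order, so writes overlap with encoding
    // instead of starting only after every encode task has finished.
    let mut pages = stream::iter((0..100usize).map(encode_page)).buffered(8);
    while let Some(page) = pages.next().await {
        write_page(page).await;
    }
}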

if flush_task_num == 0 {
    flush_task_arrays.push(concat_array);
} else {
    let step = concat_array.len() / flush_task_num;
Contributor Author

assuming elements in the same column have similar sizes
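A minimal standalone sketch of that row-count slicing (a hypothetical helper; the real logic lives inside slice_all_buffered_arrays_into_flush_task_arrays). Splitting by row count only balances bytes per flush task when rows have similar sizes, which is exactly the assumption noted above.

use arrow_array::{Array, ArrayRef, Int32Array};
use std::sync::Arc;

// Hypothetical helper mirroring the `step` logic above: split a
// concatenated array into `flush_task_num` row-balanced slices.
fn slice_evenly(concat_array: ArrayRef, flush_task_num: usize) -> Vec<ArrayRef> {
    if flush_task_num <= 1 {
        return vec![concat_array];
    }
    let step = concat_array.len() / flush_task_num;
    let mut slices = Vec::with_capacity(flush_task_num);
    let mut offset = 0;
    for i in 0..flush_task_num {
        // the last slice absorbs any remainder rows
        let len = if i + 1 == flush_task_num {
            concat_array.len() - offset
        } else {
            step
        };
        slices.push(concat_array.slice(offset, len));
        offset += len;
    }
    slices
}

fn main() {
    let array: ArrayRef = Arc::new(Int32Array::from((0..10).collect::<Vec<i32>>()));
    let slices = slice_evenly(array, 3);
    assert_eq!(slices.iter().map(|s| s.len()).collect::<Vec<_>>(), vec![3, 3, 4]);
}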

@broccoliSpicy broccoliSpicy changed the title fix issue#2561, spawn multiple encoding task inside a batch number of rows in one column when needed fix: spawn multiple encoding task inside a batch number of rows in one column when needed Jul 4, 2024
@github-actions github-actions bot added the bug Something isn't working label Jul 4, 2024
@westonpace
Contributor

What is the motivation for multiple pages in this scenario?

I have encountered one motivation in the past: if the user provides a very large array then we want multiple pages so we can read the file in pieces and get better I/O parallelism. However, it seems like the motivation here is compute parallelism?

Would it be better to write one page but introduce chunks into the page?

For example:

User provides array with 10 million strings. We think 10 million is too many for FSST / dictionary / etc. Maybe we want to calculate compression on chunks of 100,000.

Instead of writing 100 pages can we write 1 page that has chunks?

message Fsst {
  ArrayEncoding binary = 1;
  message SymbolTable {
    uint64 binary_offset = 2;
    bytes symbol_table = 3;
  }
  repeated SymbolTable symbol_tables = 4;
}

Now we have one page (for I/O). That one page has 100 "chunks" (for compression).

I think there are three variables and, as much as possible, I would like to keep them independent:

I/O size - Needs to be large enough to justify introducing new IOP (e.g. 8 MiB)
Compression size - Smaller means more compressible but more overhead, find balance
Decode size - Aim to fit each decoded batch into CPU cache
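Purely illustrative numbers, not Lance's actual defaults, just to show the three knobs staying independent: a page sized for I/O can hold many compression chunks, each of which decodes into cache-sized batches.

// Illustrative values only; not Lance's actual configuration.
const IO_PAGE_SIZE: usize = 8 * 1024 * 1024;      // large enough to justify a new IOP
const COMPRESSION_CHUNK_SIZE: usize = 128 * 1024; // balance ratio vs. per-chunk overhead
const DECODE_BATCH_BYTES: usize = 256 * 1024;     // aim to fit a decoded batch in CPU cache

// With these numbers, one 8 MiB page holds 64 compression chunks;
// changing any one knob does not force the others to change.
fn chunks_per_page() -> usize {
    IO_PAGE_SIZE / COMPRESSION_CHUNK_SIZE
}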

@westonpace
Contributor

Also, I think many encodings will want the "compression chunk" idea.

FastLanes uses chunks of 1024 values.
Dictionary should also use chunks, though we might want a larger value; 1024 seems small for dictionary.

Block compression (e.g. zlib / gzip / etc.) has its own built-in chunk size. If we store "chunk offsets" in the binary it might make decoding easier.

One challenge is when a chunk is not "sliceable". For example, in FastLanes a chunk can be sliced: if we want values 50..300 from a chunk of size 1024, that is ok; we can independently decode those values. In block compression (e.g. gzip) a chunk is not sliceable: if we want values 50..300 we have to decompress the entire chunk and then extract the values. This makes decode scheduling tricky.
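A tiny sketch of how a decode scheduler might model that difference (hypothetical types, not Lance's actual API):

use std::ops::Range;

enum ChunkLayout {
    Sliceable, // e.g. FastLanes: any sub-range decodes independently
    Opaque,    // e.g. gzip/zlib: the whole chunk must be decompressed first
}

/// Number of values that must actually be decoded to serve `requested`
/// from a chunk holding `chunk_len` values.
fn decode_cost(layout: ChunkLayout, chunk_len: usize, requested: Range<usize>) -> usize {
    match layout {
        ChunkLayout::Sliceable => requested.len(), // decode only the slice
        ChunkLayout::Opaque => chunk_len,          // decompress everything, then extract
    }
}

fn main() {
    // rows 50..300 from a 1024-value chunk
    assert_eq!(decode_cost(ChunkLayout::Sliceable, 1024, 50..300), 250);
    assert_eq!(decode_cost(ChunkLayout::Opaque, 1024, 50..300), 1024);
}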

@broccoliSpicy
Contributor Author

broccoliSpicy commented Jul 8, 2024

What is the motivation for multiple pages in this scenario?

I have encountered one motivation in the past: if the user provides a very large array then we want multiple pages so we can read the file in pieces and get better I/O parallelism. However, it seems like motivation here is compute parallelism?

the motivation here is to decouple the batch size (in number of rows) from the page size we write. Currently, we check whether there is enough data for an encoding task whenever a batch of rows arrives for a column. However, for our multi-modal workload, sizes in different columns can vary a lot; we may want to write multiple pages for larger columns while not issuing any write for smaller columns.

@broccoliSpicy
Contributor Author

broccoliSpicy commented Jul 8, 2024

Would it be better to write one page but introduce chunks into the page?

I think there are three variables and, as much as possible, would like to keep them independent:
I/O size - Needs to be large enough to justify introducing new IOP (e.g. 8 MiB)
Compression size - Smaller means more compressible but more overhead, find balance
Decode size - Aim to fit each decoded batch into CPU cache

I think introducing chunks into the page is a wonderful idea for solving the optimal-compression-size problem; I would love to implement it if I get the chance!

but the problem here is more about writing many pages for a large column when a batch arrives

@broccoliSpicy
Contributor Author

A few thoughts about introducing chunks into the page:
1. To allow compaction, we need to let chunks flow freely between adjacent pages. We can decouple the encode_chunk phase from the write_page phase to do that: encode_chunk -> compaction -> write_page.
2. The compaction needs to be done in sequential order: we only know where to put an encoded chunk after all the chunks before it have been encoded (see the sketch below).
3. Putting the chunk information (offset, size, decode information) in column_meta_data may take too much space; if we put the chunk information in a page header instead, we may need to add a ChunkDecoder layer below PageDecoder.
4. After encoding, if we duplicate the decode metadata, we can split a chunk across two pages.
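A rough sketch of points 1 and 2 (hypothetical types, not Lance's actual writer): chunks are encoded independently, then a sequential compaction pass assigns them to pages, and full pages are handed to write_page.

struct EncodedChunk {
    bytes: Vec<u8>,
}

struct Page {
    chunks: Vec<EncodedChunk>,
    size: usize,
}

/// Pack encoded chunks into pages of roughly `target_page_size` bytes.
fn compact(chunks: Vec<EncodedChunk>, target_page_size: usize) -> Vec<Page> {
    let mut pages = Vec::new();
    let mut current = Page { chunks: Vec::new(), size: 0 };
    for chunk in chunks {
        // A chunk's destination page is only known once all earlier chunks
        // have been sized, so this pass is inherently sequential (point 2).
        if current.size + chunk.bytes.len() > target_page_size && !current.chunks.is_empty() {
            pages.push(std::mem::replace(&mut current, Page { chunks: Vec::new(), size: 0 }));
        }
        current.size += chunk.bytes.len();
        current.chunks.push(chunk);
    }
    if !current.chunks.is_empty() {
        pages.push(current); // flush the trailing partial page
    }
    pages
}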

@broccoliSpicy
Contributor Author

broccoliSpicy commented Jul 12, 2024

Block compression (e.g. zlib / gzip / etc.) has its own built in chunk size. If we store "chunk offsets" in binary it might make decoding easier.

I am actually hoping that we can get good compression without using any block compression algorithms.

For string data, I tested lz4_flex against FSST on MS MARCO a few weeks ago, and I saw that lz4_flex's compression ratio is about 1.7 on the third column, where FSST is about 1.55; I vaguely remember FSST being better on the first and second columns of MS MARCO.

We can see more results once we enable the encoding cascade.

For images and videos, the data is often already compressed, so block compression won't help much.

For integers and floats, we can see more concrete comparisons after we have more encodings.
