fix: spawn multiple encoding tasks inside a batch when one column has many rows #2563
Conversation
ACTION NEEDED: The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification. For details on the error, inspect the "PR Title Check" action.
@@ -344,7 +345,8 @@ impl AccumulationQueue {
             // Push into buffered_arrays without copy since we are about to flush anyways
             self.buffered_arrays.push(array);
             self.current_bytes = 0;
-            Some(std::mem::take(&mut self.buffered_arrays))
+            Some(self.slice_all_buffered_arrays_into_flush_task_arrays())
When doing AccumulationQueue::insert, I think it's better to have the last portion of the Arrow array remain in self.buffered_arrays?
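The suggestion above can be sketched in isolation. This is a hypothetical illustration (not the PR's actual code) using a plain Vec<i32> as a stand-in for Arrow's ArrayRef: flush only complete batches and leave the trailing remainder buffered for the next insert.

```rust
// Hypothetical sketch: drain full-sized batches out of the accumulation
// buffer for flushing, while the tail (fewer than batch_size rows) stays
// in `buffered` for the next call. Vec<i32> stands in for ArrayRef.
fn split_for_flush(buffered: &mut Vec<i32>, batch_size: usize) -> Vec<Vec<i32>> {
    let full_batches = buffered.len() / batch_size; // complete batches only
    let mut flushed = Vec::with_capacity(full_batches);
    for _ in 0..full_batches {
        // remove the first batch_size rows and hand them to a flush task
        flushed.push(buffered.drain(..batch_size).collect());
    }
    // whatever remains (len < batch_size) is still buffered
    flushed
}
```

With 10 buffered rows and a batch size of 4, this flushes two batches and keeps rows 8 and 9 buffered, which is the behavior the comment is asking about.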
fn slice_all_buffered_arrays_into_flush_task_arrays(&mut self) -> Vec<ArrayRef> {
    let concat_array = concat(
        &self
            .buffered_arrays
do a std::mem::take here?
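For context on this suggestion, std::mem::take replaces a field with its Default value and returns the old contents, so the buffered arrays can be consumed by value without cloning and without leaving `self` in an invalid state. A minimal standalone illustration (Queue and its field are stand-ins, not the PR's types):

```rust
// Illustration of the std::mem::take suggestion: take ownership of the
// buffer's contents, leaving an empty Vec behind in its place.
struct Queue {
    buffered_arrays: Vec<String>, // stand-in for Vec<ArrayRef>
}

impl Queue {
    fn take_buffered(&mut self) -> Vec<String> {
        // moves the Vec out; self.buffered_arrays becomes Vec::new()
        std::mem::take(&mut self.buffered_arrays)
    }
}
```

Calling take_buffered hands back the accumulated arrays by value while the queue remains usable and empty, which avoids a copy before concat.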
    encoded_pages.push(encoded_page);
}
encoded_pages.sort_by_key(|p| p.page_idx);
for encoded_page in encoded_pages.into_iter() {
    self.write_page(encoded_page).await?;
This has performance implications: we wait for compute to finish, then do the IO. I want to find a better way to make this independent of batch size and to decouple compute from IO.
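One common shape for the decoupling mentioned above is a producer/consumer pipeline: encoding runs on one thread and finished pages stream through a channel to a writer, so IO for page N can overlap with compute for page N+1. This is only a hedged sketch with std threads and u32 stand-ins for encoded pages, not the PR's tokio-based code:

```rust
use std::sync::mpsc;
use std::thread;

// Sketch of overlapping compute and IO: an encoder thread sends pages
// through a channel as soon as each one is ready, and the "writer" on
// this thread consumes them immediately instead of waiting for all
// encoding to finish first.
fn encode_and_write(pages: Vec<u32>) -> Vec<u32> {
    let (tx, rx) = mpsc::channel();
    let encoder = thread::spawn(move || {
        for p in pages {
            // stand-in for CPU-bound page encoding
            tx.send(p * 2).unwrap();
        }
        // tx is dropped here, closing the channel
    });
    // "writer": receives pages in order as they become available
    let written: Vec<u32> = rx.iter().collect();
    encoder.join().unwrap();
    written
}
```

Because a single mpsc sender delivers messages in FIFO order, page ordering is preserved without the sort-then-write step, at the cost of serializing the encoder; with multiple encoder tasks one would reintroduce reordering by page_idx on the writer side.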
if flush_task_num == 0 {
    flush_task_arrays.push(concat_array);
} else {
    let step = concat_array.len() / flush_task_num;
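The slicing arithmetic in this hunk can be shown on its own. A hypothetical sketch (names are illustrative) that divides `len` rows into `flush_task_num` roughly equal slices, with integer division leaving any remainder in the last slice:

```rust
// Compute (start, length) bounds for each flush-task slice. With
// flush_task_num == 0 the whole array is one slice; otherwise integer
// division sets the step and the final slice absorbs the remainder.
fn slice_bounds(len: usize, flush_task_num: usize) -> Vec<(usize, usize)> {
    if flush_task_num == 0 {
        return vec![(0, len)];
    }
    let step = len / flush_task_num;
    let mut slices = Vec::with_capacity(flush_task_num);
    for i in 0..flush_task_num {
        let start = i * step;
        // last slice runs to the end so no rows are dropped
        let slice_len = if i == flush_task_num - 1 { len - start } else { step };
        slices.push((start, slice_len));
    }
    slices
}
```

For example, 10 rows split across 3 tasks yields slices of 3, 3, and 4 rows; this uneven tail is also why the "similar sizes per column" assumption in the review comment below matters for balancing work.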
assuming elements in the same column have similar sizes
What is the motivation for multiple pages in this scenario? I have encountered one motivation in the past: if the user provides a very large array then we want multiple pages so we can read the file in pieces and get better I/O parallelism. However, it seems like the motivation here is compute parallelism? Would it be better to write one page but introduce chunks into the page?

For example: the user provides an array with 10 million strings. We think 10 million is too many for FSST / dictionary / etc. Maybe we want to calculate compression on chunks of 100,000. Instead of writing 100 pages, can we write 1 page that has chunks?
Now we have one page (for I/O). That one page has 100 "chunks" (for compression). I think there are three variables and, as much as possible, I would like to keep them independent:
- I/O size: needs to be large enough to justify introducing a new IOP (e.g. 8 MiB)
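The "one page, many chunks" idea above hinges on recording where each chunk starts so a reader can locate chunk boundaries without decoding. A hypothetical sketch of that layout computation (the chunk size and function name are illustrative, not part of Lance's format):

```rust
// Compute chunk start offsets for a page of `num_values` values split
// into chunks of `chunk_size`. A final sentinel equal to num_values
// marks the end of the last (possibly short) chunk, so chunk i spans
// offsets[i]..offsets[i + 1]. chunk_size must be nonzero.
fn chunk_offsets(num_values: usize, chunk_size: usize) -> Vec<usize> {
    let mut offsets: Vec<usize> = (0..num_values).step_by(chunk_size).collect();
    offsets.push(num_values);
    offsets
}
```

With 10 values and chunks of 4 this gives offsets [0, 4, 8, 10]: two full chunks and a short tail. Storing such offsets is what would let a decoder schedule reads of individual chunks, subject to the sliceability caveat discussed next.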
Also, I think many encodings will want the "compression chunk" idea. FastLanes uses chunks of 1024 values. Block compression (e.g. zlib / gzip / etc.) has its own built-in chunk size. If we store "chunk offsets" in binary it might make decoding easier.

One challenge is when a chunk is not "sliceable". For example, in FastLanes a chunk can be sliced: if we want values 50..300 from a chunk of size 1024 that is ok, and we can independently decode those values. In block compression (e.g. gzip) a chunk is not sliceable: if we want values 50..300 we have to decompress the entire chunk and then extract the values. This makes decode scheduling tricky.
The motivation here is to decouple
I think introducing chunks into the page is a wonderful idea for solving the optimal compression size problem; I would love to implement it if I get the chance! But the problem here is more about
A few thoughts about introducing chunks into the page:
I am actually hoping that we may be able to get good compression without using any block compression algorithms.
- For string data, I tested lz4_flex with fsst on; we can see more results when we enable encoding cascade.
- For images and videos, this data is often already compressed, so block compression won't help much.
- For integers and floats, we can see more concrete comparisons after we have more encodings.
#2561
List and struct remain unchanged; I want to get some discussion and advice on this approach first.