Simplify index merging algorithm #5526

Closed
leventov opened this issue Mar 23, 2018 · 17 comments

@leventov (Member) commented Mar 23, 2018

The current process seems overcomplicated.

The current steps:

  1. Produce per-index bitmaps, using unsorted per-index encoding and per-index row numbers: IncrementalIndexAdapter.processRows() (called from IncrementalIndexAdapter constructor)
  2. Sort the per-index dictionary. See StringDimensionIndexer.SortedDimensionDictionary: it holds three O(N)-sized structures:
    a) the sorted-to-unsorted encoding conversion,
    b) the unsorted-to-sorted encoding conversion,
    c) the values in sorted order themselves (for iteration later; however, this materialization is certainly not needed: the list from DimensionDictionary and the sorted-to-unsorted conversion could be combined).
    Additionally, while preparing those three structures, another O(N) structure (a TreeMap) is temporarily created. (A toy sketch of these three structures follows this list.)
  3. When merging per-index sorted dictionaries, O(N) per-index-sorted-to-global-sorted encoding conversion structures are created: see IndexMerger.DictionaryMergeIterator
  4. Before global merge of rows, encoded row values are converted twice: firstly, per-index-unsorted-to-sorted, using the structure from 2.a), and secondly, per-index-sorted-to-globally-sorted, using the structure from 3).
  5. When merging rows of indexes into a global row stream, per-index-row-number-to-global-row-number conversion structures are created: see IndexMergerV9.mergeIndexesAndWriteColumns().
  6. After writing out the global stream of rows, per-index bitmaps are merged. This merge requires the structures from 2.b) and 5). See StringDimensionMergerV9.mergeBitmaps().
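
For concreteness, here is a toy sketch of the three structures from step 2 for a three-value dictionary. The class and variable names are hypothetical, not Druid's actual classes; it only illustrates how the structures relate to each other.

```java
import java.util.Arrays;
import java.util.List;

public class SortedDimensionDictionarySketch
{
  public static void main(String[] args)
  {
    // Values in insertion order, i.e. the unsorted encoding: horse=0, aardvark=1, whale=2.
    List<String> unsortedValues = Arrays.asList("horse", "aardvark", "whale");

    // 2.a) sorted-to-unsorted conversion: sortedToUnsorted[sortedId] == unsortedId
    int[] sortedToUnsorted = {1, 0, 2};

    // 2.b) unsorted-to-sorted conversion: unsortedToSorted[unsortedId] == sortedId
    int[] unsortedToSorted = {1, 0, 2};

    // 2.c) values in sorted order; as noted above, this need not be materialized,
    // because composing 2.a) with the unsorted value list yields the same thing:
    for (int sortedId = 0; sortedId < sortedToUnsorted.length; sortedId++) {
      System.out.println(sortedId + " -> " + unsortedValues.get(sortedToUnsorted[sortedId]));
    }
    System.out.println(Arrays.toString(unsortedToSorted));
  }
}
```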

I don't see why it couldn't be simplified to the following:

  1. Create a sorted-to-unsorted encoding conversion. It could be done by creating an int[], filling it with 0, 1, ..., and then sorting it with a strategy that delegates comparison to the DimensionDictionary's array of String values; i.e. only one O(N) structure is created in this step (see the sketch after this list).
  2. When merging per-index sorted dictionaries (as explained above, this can be done by iterating the structure created in the previous step and composing it with the DimensionDictionary's array), create a per-index-unsorted-to-global-sorted encoding conversion: an O(N) structure.
  3. Before the global merge of rows, convert encoded row values once, using the structure created in the previous step.
  4. While merging and writing out a global row stream, global bitmaps are created as we go.
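
A minimal sketch of step 1, assuming fastutil's IntArrays.quickSort is available (Druid already depends on fastutil); a plain String[] stands in for DimensionDictionary's list of values, and the method name is hypothetical.

```java
import it.unimi.dsi.fastutil.ints.IntArrays;

public class SortedToUnsortedConversion
{
  /**
   * Builds the sorted-to-unsorted conversion (result[sortedId] == unsortedId) by sorting an
   * identity permutation with a comparison strategy that looks into the dictionary values.
   * The int[] itself is the only O(N) structure created; no TreeMap, no boxed values.
   */
  static int[] sortedToUnsorted(String[] unsortedValues)
  {
    int[] ids = new int[unsortedValues.length];
    for (int i = 0; i < ids.length; i++) {
      ids[i] = i; // identity permutation: 0, 1, 2, ...
    }
    IntArrays.quickSort(ids, (a, b) -> unsortedValues[a].compareTo(unsortedValues[b]));
    return ids;
  }

  public static void main(String[] args)
  {
    // Unsorted encoding: horse=0, aardvark=1, whale=2.
    String[] dict = {"horse", "aardvark", "whale"};
    System.out.println(java.util.Arrays.toString(sortedToUnsorted(dict))); // [1, 0, 2]
  }
}
```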

The main question is why we need to create per-index bitmaps at the very beginning, then create a lot of auxiliary structures to keep them usable until the very end, and only then merge them. Could we just create the bitmaps at the end?

@jon-wei @gianm @jihoonson

@RestfulBlue commented:

Hello,

in our company we make extensive use of Hadoop. One of our recent tasks was building a monitoring system. We considered a huge number of options (OpenTSDB, Prometheus, ClickHouse, etc.), and one of our final candidates was Druid. It fits perfectly into our ecosystem; the only problematic point was the insertion speed. It is very important for us to increase the speed at which we can write. To do this, we would like to use much simpler indexes, sacrificing query execution speed.

Is it possible to update Druid's architecture to allow custom implementations of the index and segment structures?

It would work like a pluggable module that you can connect, so that standard indexes could be used on some datasources and a custom index implementation on others.

@leventov (Member, Author) commented Mar 23, 2018

@RestfulBlue it was discussed in various forms before; see #2654, #2965, #106. Yes, it would be nice in theory, but it's not fully implemented at this point. However, #5402 has just been merged (it allows disabling bitmap indexes); maybe it will satisfy your goals.

Your question is unrelated to this issue; if you have further questions, please go to one of those linked threads, or ask on the druid-dev mailing list.

@jon-wei (Contributor) commented Mar 23, 2018

Before global merge of rows, encoded row values are converted twice: firstly, per-index-unsorted-to-sorted, using the structure from 2.a), and secondly, per-index-sorted-to-globally-sorted, using the structure from 3).

After writing out the global stream of rows, per-index bitmaps are merged. This merge requires the structures from 2.b) and 5). See StringDimensionMergerV9.mergeBitmaps().

The main question is why we need to create per-index bitmaps at the very beginning, then create a lot of auxiliary structures to keep them usable until the very end, and only then merge them. Could we just create the bitmaps at the end?

Hm, there is an intermediate persist-to-disk step; I don't think the IncrementalIndex is ever merged with other indexes directly.

IncrementalIndexAdapter is only created as a result of a persist() call, which in turn calls merge() but with only a single IncrementalIndex; this happens when indexing tasks persist their current IncrementalIndex object with partial data to disk.

When the partial results are merged later, the merge happens only on QueryableIndex objects loaded from disk.

@leventov (Member, Author) commented Mar 23, 2018

@jon-wei note that

  • That "persist an individual incremental index to a segment" step pretty much doesn't optimize away anything that I described in steps 1)-6). So it still creates all those structures, which often, in fact, perform an identity conversion. It could be optimized, but that would mean adding even more complexity than we have now.

  • I don't remember exactly how the incremental index operates; it probably needs to persist segments not only for merging. But batch indexing, especially Spark, does this only because it wants to avoid the full cost and memory consumption of steps 1)-6), so it needs to write to disk. With the new approach that could be avoided.

  • Even when merging only segments, per-index-row-number-to-global-row-number conversion structures are still created. They are not needed with the new approach. An option was added some time ago to allocate those structures in off-heap memory, apparently in response to some memory problems. If those structures are simply not needed at all, that's obviously even more helpful.

@b-slim (Contributor) commented Mar 23, 2018

@leventov is this part of #4622? If so, can we add some pointers to help navigate between proposals and the actual code merging?

@jon-wei (Contributor) commented Mar 24, 2018

To make sure I'm understanding correctly, the proposed new implementation would look like this example?

Index 1 to merge

Unsorted dict:
horse -> 0
aardvark -> 1
whale -> 2
flamingo -> 3
monkey -> 4

Conversion buffer:
[1, 3, 0, 4, 2]

rows:
{ts=1, x=0}
{ts=2, x=1}
{ts=3, x=2}
{ts=4, x=3}
{ts=5, x=4}

Index 2 to merge

Unsorted dictionary:
mantis -> 0
snake -> 1
dragon -> 2
flamingo -> 3
zebra -> 4

Conversion buffer:
[2, 3, 0, 1, 4]

rows:
{ts=1, x=0}
{ts=2, x=1}
{ts=3, x=2}
{ts=4, x=3}
{ts=5, x=4}

Build global sorted dictionary:

[0:aardvark, 1:dragon, 2:flamingo, 3:horse, 4:mantis, 5:monkey, 6:snake, 7:whale, 8:zebra]

index 1 unsorted-per-index -> sorted-global ID:
[3, 0, 7, 2, 5]

index 2 unsorted-per-index -> sorted-global ID:
[4, 6, 1, 2, 8]

Convert rows and merge:

index 1 rows:
{ts=1, x=3}
{ts=2, x=0}
{ts=3, x=7}
{ts=4, x=2}
{ts=5, x=5}

index 2 rows:
{ts=1, x=4}
{ts=2, x=6}
{ts=3, x=1}
{ts=4, x=2}
{ts=5, x=8}

merged (bitmaps for each dim value are built while merging across indexes, no bitmaps are created until this step)
{ts=1, x=3}
{ts=1, x=4}
{ts=2, x=0}
{ts=2, x=6}
{ts=3, x=1}
{ts=3, x=7}
{ts=4, x=2}
{ts=5, x=5}
{ts=5, x=8}
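
For what it's worth, the mapping arrays in this walkthrough can be reproduced with a small standalone sketch. It uses plain JDK collections and hypothetical names; the real DictionaryMergeIterator merges the per-index sorted dictionaries in a streaming fashion rather than materializing the whole union like this.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class GlobalDictionarySketch
{
  /** For each per-index unsorted dictionary, builds the unsorted-per-index -> sorted-global ID array. */
  static int[][] unsortedToGlobal(List<String[]> unsortedDictionaries)
  {
    // The union of all values, in sorted order, defines the global dictionary.
    TreeSet<String> merged = new TreeSet<>();
    for (String[] dict : unsortedDictionaries) {
      merged.addAll(Arrays.asList(dict));
    }
    Map<String, Integer> globalId = new HashMap<>();
    for (String value : merged) {
      globalId.put(value, globalId.size()); // global IDs assigned in sorted order
    }
    // One O(N) conversion array per index: conversions[i][unsortedId] == globalSortedId.
    int[][] conversions = new int[unsortedDictionaries.size()][];
    for (int i = 0; i < conversions.length; i++) {
      String[] dict = unsortedDictionaries.get(i);
      conversions[i] = new int[dict.length];
      for (int unsortedId = 0; unsortedId < dict.length; unsortedId++) {
        conversions[i][unsortedId] = globalId.get(dict[unsortedId]);
      }
    }
    return conversions;
  }

  public static void main(String[] args)
  {
    String[] index1 = {"horse", "aardvark", "whale", "flamingo", "monkey"};
    String[] index2 = {"mantis", "snake", "dragon", "flamingo", "zebra"};
    int[][] conversions = unsortedToGlobal(Arrays.asList(index1, index2));
    System.out.println(Arrays.toString(conversions[0])); // [3, 0, 7, 2, 5]
    System.out.println(Arrays.toString(conversions[1])); // [4, 6, 1, 2, 8]
  }
}
```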

====================

I don't remember exactly how the incremental index operates; it probably needs to persist segments not only for merging. But batch indexing, especially Spark, does this only because it wants to avoid the full cost and memory consumption of steps 1)-6), so it needs to write to disk. With the new approach that could be avoided.

Hm, I'm not seeing how persisting partial results to disk could be avoided; even if the ingestion system were optimally memory-efficient, it's still possible to feed an indexing task more data than can be held in memory.

For batch tasks, I don't think I see anything that would prevent the merging implementation from being changed to what's being proposed. The partial persists could have an unsorted dictionary plus the dict-encoded rows, with no bitmaps.

For realtime tasks like KafkaIndexTask, the rows held by the task are exposed for querying, so that may be a reason to have sorted bitmaps -> common segment format for the partial chunks of data.

Even when merging only segments, per-index-row-number-to-global-row-number conversion structures are still created. They are not needed with the new approach. An option was added some time ago to allocate those structures in off-heap memory, apparently in response to some memory problems. If those structures are simply not needed at all, that's obviously even more helpful.

Hm, if the bitmap construction is done during the merge across indexes, I guess that would eliminate the need for the row number conversions, but the merge would need to keep the bitmaps for all dimensions in memory as it builds them, vs. now where the bitmaps are merged using the row number conversions on a per-dimension basis.

As the number of dims increases, maybe this would increase total memory usage compared to now? I suppose the bitmaps could be built using per-dimension scans after the rows are merged though.
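
A rough sketch of that per-dimension scan idea, using java.util.BitSet as a stand-in for Druid's compressed mutable bitmaps (names are hypothetical): one pass over the merged dict-encoded column builds all bitmaps for that dimension, so only a single dimension's bitmaps need to live in memory at a time.

```java
import java.util.BitSet;

public class PerDimensionBitmapScan
{
  /**
   * Builds one bitmap per dictionary value from a merged, dict-encoded column:
   * bitmaps[valueId] has bit rowNum set iff that row contains valueId.
   */
  static BitSet[] buildBitmaps(int[] mergedColumn, int cardinality)
  {
    BitSet[] bitmaps = new BitSet[cardinality];
    for (int valueId = 0; valueId < cardinality; valueId++) {
      bitmaps[valueId] = new BitSet();
    }
    for (int rowNum = 0; rowNum < mergedColumn.length; rowNum++) {
      bitmaps[mergedColumn[rowNum]].set(rowNum);
    }
    return bitmaps;
  }

  public static void main(String[] args)
  {
    // The merged "x" column from the example above, global IDs in row order.
    int[] x = {3, 4, 0, 6, 1, 7, 2, 5, 8};
    BitSet[] bitmaps = buildBitmaps(x, 9);
    System.out.println("flamingo (id 2) appears in rows " + bitmaps[2]); // {6}
  }
}
```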

Also, do you have a link to the PR that allowed the row number conversions to be stored off-heap? I didn't see such an option when looking at the code; it looks like the row number conversion buffer is created using IntBuffer.wrap(), which always creates a HeapIntBuffer.

@leventov (Member, Author) commented:

@b-slim this issue is not related to #4622.

@leventov (Member, Author) commented Mar 24, 2018

@jon-wei

Hm, if the bitmap construction is done during the merge across indexes, I guess that would eliminate the need for the row number conversions, but the merge would need to keep the bitmaps for all dimensions in memory as it builds them, vs. now where the bitmaps are merged using the row number conversions on a per-dimension basis.

Yes, that's the problem; now I see it. It probably isn't a problem when the dimensions for which bitmaps are constructed have reasonable cardinality, but in the worst case, when the cardinality is comparable to the total number of rows (like building bitmaps for a "requestId" dimension), it would increase the memory usage of the merging process dramatically. So the algorithm that I proposed above is not as robust as the current one.

Most actions in steps 1)-6), however, really suffer from the same problem. That explains why we always persist an incremental index before merging.

What should still be done

A special path to produce a segment from a single incremental index. It could avoid creating almost all temporary data structures.

The main merging algorithm could be changed to the following:

  1. Create a sorted-to-unsorted encoding conversion. It could be done by creating an int[], filling it with 0, 1, ..., and then sorting it with a strategy that delegates comparison to the DimensionDictionary's array of String values; i.e. only one O(N) structure is created in this step.
  2. When merging per-index sorted dictionaries (as explained above, this can be done by iterating the structure created in the previous step and composing it with the DimensionDictionary's array), create a per-index-unsorted-to-global-sorted encoding conversion: an O(N)-sized structure.
  3. Before the global merge of rows, convert encoded row values once, using the structure created in the previous step.
  4. While merging rows of indexes into a global row stream, produce two kinds of structures in parallel:
    a) per-index-row-number-to-global-row-number conversion structures, O(N)-sized; created only for inputs that are segments.
    b) per-index bitmaps, using the global dictionary encoding and global row numbers (before, this was step 1, with per-index encoding and row numbers); created only for inputs that are incremental indexes.
  5. After writing out the global stream of rows, the per-index bitmaps are merged. For inputs that are segments, whose bitmaps were not created in step 4.b) but taken from the segment itself because they already exist, local row numbers are converted to global row numbers using the structure from step 4.a). For inputs that are incremental indexes, whose bitmaps were created in step 4.b), no additional conversion is needed (a sketch follows below).

This algorithm should be as robust as the current one, but it creates fewer structures and does fewer conversions.
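
A rough sketch of the step 5 bitmap merge for a segment input, again with java.util.BitSet standing in for the real bitmap classes and hypothetical names: the segment's existing bitmap is remapped through the 4.a) row-number conversion and OR-ed into the global bitmap, while bitmaps produced in 4.b) need no remapping.

```java
import java.util.BitSet;

public class BitmapMergeSketch
{
  /**
   * Step 5 for a segment input: remap one value's existing bitmap from local row
   * numbers to global row numbers using the 4.a) conversion, then OR it into the
   * global bitmap for that value (which may already contain rows contributed by
   * incremental-index inputs via 4.b)).
   */
  static void mergeSegmentBitmap(BitSet localBitmap, int[] localToGlobalRowNum, BitSet globalBitmap)
  {
    for (int localRow = localBitmap.nextSetBit(0); localRow >= 0; localRow = localBitmap.nextSetBit(localRow + 1)) {
      globalBitmap.set(localToGlobalRowNum[localRow]);
    }
  }

  public static void main(String[] args)
  {
    // The segment's bitmap for some value: present in its local rows 0 and 2.
    BitSet local = new BitSet();
    local.set(0);
    local.set(2);
    // 4.a) conversion: this segment's rows landed at global positions 1, 4, 5.
    int[] localToGlobal = {1, 4, 5};
    BitSet global = new BitSet();
    mergeSegmentBitmap(local, localToGlobal, global);
    System.out.println(global); // {1, 5}
  }
}
```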

@leventov (Member, Author) commented:

I didn't see such an option when looking at the code; it looks like the row number conversion buffer is created using IntBuffer.wrap(), which always creates a HeapIntBuffer.

See DictionaryMergeIterator's useDirect parameter. I couldn't find the commit where it was introduced; maybe it has existed since the inception of this algorithm. Anyway, it doesn't matter much, because creating this structure seems to be the lesser evil and can't be avoided (unless the merged index is a non-persisted incremental index; see step 4 in the updated algorithm proposal above).
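
For reference, the two allocation paths (heap vs. direct) that such a flag would presumably choose between look like this in plain JDK code (not Druid's actual code):

```java
import java.nio.ByteBuffer;
import java.nio.IntBuffer;

public class ConversionBufferAllocation
{
  public static void main(String[] args)
  {
    int numRows = 1_000_000;

    // Heap-backed: what IntBuffer.wrap() produces (a HeapIntBuffer over an int[]).
    IntBuffer onHeap = IntBuffer.wrap(new int[numRows]);

    // Off-heap: a direct ByteBuffer viewed as an IntBuffer (4 bytes per row number).
    IntBuffer offHeap = ByteBuffer.allocateDirect(numRows * Integer.BYTES).asIntBuffer();

    System.out.println(onHeap.isDirect() + " " + offHeap.isDirect()); // false true
  }
}
```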

@jon-wei (Contributor) commented Mar 27, 2018

A special path to produce a segment from a single incremental index. It could avoid creating almost all temporary data structures.

That sounds good to me

While merging rows of indexes into a global row stream, produce two kinds of structures in parallel:
a) per-index-row-number-to-global-row-number conversion structures, O(N)-sized; created only for inputs that are segments.
b) per-index bitmaps, using the global dictionary encoding and global row numbers (before, this was step 1, with per-index encoding and row numbers); created only for inputs that are incremental indexes.

In this new proposal, when an indexing task needs to persist partial data to disk, under what conditions would it create a segment with a bitmap vs. deferring bitmap creation until merging?

@leventov (Member, Author) commented:

In this new proposal, when an indexing task needs to persist partial data to disk, under what conditions would it create a segment with a bitmap vs. deferring bitmap creation until merging?

To keep the algorithm as robust as the current one, initially it could always create bitmaps.

@leventov (Member, Author) commented:

Note: I'm not currently planning to implement these improvements myself, so anybody is free to pick them up.

stale bot commented Jun 21, 2019

This issue has been marked as stale due to 280 days of inactivity. It will be closed in 2 weeks if no further activity occurs. If this issue is still relevant, please simply write any comment. Even if closed, you can still revive the issue at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

stale bot added the stale label on Jun 21, 2019.

stale bot commented Jul 5, 2019

This issue has been closed due to lack of activity. If you think that is incorrect, or the issue requires additional review, you can revive the issue at any time.

stale bot closed this as completed on Jul 5, 2019.
leventov removed the stale label on Jul 23, 2019.
leventov reopened this on Jul 23, 2019.
@on99 commented May 3, 2022

@leventov @jon-wei

Hi, I would like to pick this up. However, after reading the source code, I cannot figure out why the per-index sorted-to-unsorted encoding conversion is needed, or where such a conversion lives in a segment. I suspect the conversion exists to make dimension values contiguous so they can be encoded/compressed more compactly? Could you please elaborate so I can pick this up? Thank you!

github-actions bot commented:

This issue has been marked as stale due to 280 days of inactivity.
It will be closed in 4 weeks if no further activity occurs. If this issue is still
relevant, please simply write any comment. Even if closed, you can still revive the
issue at any time or discuss it on the dev@druid.apache.org list.
Thank you for your contributions.

github-actions bot added the stale label on Jun 29, 2023.

github-actions bot commented:

This issue has been closed due to lack of activity. If you think that
is incorrect, or the issue requires additional review, you can revive the issue at
any time.

github-actions bot closed this as not planned (won't fix, can't repro, duplicate, stale) on Jul 28, 2023.