Simplify index merging algorithm #5526
Hello, we make extensive use of Hadoop at our company. One of our recent tasks was building a monitoring system. We considered a large number of options (OpenTSDB, Prometheus, ClickHouse, etc.), and Druid was one of our final candidates: it fits perfectly into our ecosystem. The only problematic point was insertion speed; it would be very valuable for us to increase write throughput. To do this, we would like to use much simpler indexes, sacrificing query execution speed. Would it be possible to update Druid's structure to allow pluggable implementations of the index and segment structure, like a module you can connect? That way we could use the standard indexes on some datasources and our own index implementation on others.
@RestfulBlue this was discussed in various forms before, see #2654, #2965, #106. Yes, it would be nice in theory, but it's not fully implemented at this point. However, #5402 (which allows disabling bitmap indexes) has just been merged; maybe it will satisfy your goals. Your question is unrelated to this issue; if you have further questions, please go to one of those linked threads, or ask on the druid-dev mailing list.
Hm, there is an intermediate persist-to-disk step; I don't think the IncrementalIndex is ever merged with other indexes directly. IncrementalIndexAdapter is only created as a result of a persist() call, which in turn calls merge() but with only a single IncrementalIndex; this happens when indexing tasks persist their current IncrementalIndex object with partial data to disk. When the partial results are merged later, the merge happens only on QueryableIndex objects loaded from disk.
@jon-wei note that
To make sure I'm understanding correctly, the proposed new implementation would look like this example?

Index 1 to merge — unsorted dict: … conversion buffer: … rows: …
Index 2 to merge — unsorted dictionary: … conversion buffer: … rows: …

Build global sorted dictionary: [0:aardvark, 1:dragon, 2:flamingo, 3:horse, 4:mantis, 5:monkey, 6:snake, 7:whale, 8:zebra]

index 1 unsorted-per-index -> sorted-global ID: …
index 2 unsorted-per-index -> sorted-global ID: …

Convert rows and merge:
index 1 rows: …
index 2 rows: …
merged: …

(bitmaps for each dim value are built while merging across indexes; no bitmaps are created until this step)
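To make the example above concrete, here is a minimal, hypothetical sketch in plain Java (the class and method names are illustrative, not Druid's actual classes): it merges two unsorted per-index dictionaries into one global sorted dictionary, then builds the per-index-unsorted -> global-sorted ID conversions.

```java
import java.util.*;

// Hypothetical sketch, not Druid code: merge two per-index unsorted
// dictionaries into a global sorted dictionary and build the
// per-index-unsorted -> global-sorted ID conversion arrays.
public class DictionaryMergeSketch {
    static String[] mergeDictionaries(String[] dictA, String[] dictB) {
        TreeSet<String> merged = new TreeSet<>(); // keeps values sorted and deduplicated
        merged.addAll(Arrays.asList(dictA));
        merged.addAll(Arrays.asList(dictB));
        return merged.toArray(new String[0]);
    }

    // conversion[unsortedLocalId] = globalSortedId
    static int[] conversion(String[] localDict, String[] globalSorted) {
        int[] conv = new int[localDict.length];
        for (int i = 0; i < localDict.length; i++) {
            conv[i] = Arrays.binarySearch(globalSorted, localDict[i]);
        }
        return conv;
    }

    public static void main(String[] args) {
        String[] dictA = {"zebra", "aardvark", "monkey"};  // unsorted, per-index
        String[] dictB = {"whale", "dragon", "aardvark"};
        String[] global = mergeDictionaries(dictA, dictB);
        System.out.println(Arrays.toString(global));              // [aardvark, dragon, monkey, whale, zebra]
        System.out.println(Arrays.toString(conversion(dictA, global))); // [4, 0, 2]
        System.out.println(Arrays.toString(conversion(dictB, global))); // [3, 1, 0]
    }
}
```

Row-level merging would then just rewrite each row's local IDs through the conversion arrays before the sorted merge.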
Hm, I'm not seeing how persisting partial results to disk could be avoided: even if the ingestion system were optimally memory-efficient, it's still possible to feed an indexing task more data than can be held in memory. For batch tasks, I don't see anything that would prevent the merging implementation from being changed to what's being proposed; the partial persists could have an unsorted dictionary plus the dict-encoded rows, with no bitmaps. For realtime tasks like KafkaIndexTask, the rows held by the task are exposed for querying, so that may be a reason to keep the common segment format (sorted dictionaries and bitmaps) for the partial chunks of data.
Hm, if the bitmap construction is done during the merge across indexes, I guess that would eliminate the need for the row number conversions, but the merge would need to keep the bitmaps for all dimensions in memory as it builds them, vs. now, where the bitmaps are merged using the row number conversions on a per-dimension basis. As the number of dims increases, maybe this would increase total memory usage compared to now? I suppose the bitmaps could be built using per-dimension scans after the rows are merged, though.

Also, do you have a link to the PR that allowed the row number conversions to be stored off-heap? I didn't see such an option when looking at the code; it looks like the row num conversion buffer is created using IntBuffer.wrap(), which always creates a HeapIntBuffer.
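For reference, a small self-contained illustration of that last point (plain JDK code, not Druid's): IntBuffer.wrap() is always backed by the Java heap, while a view of a direct ByteBuffer keeps the conversion data off-heap.

```java
import java.nio.ByteBuffer;
import java.nio.IntBuffer;

// Illustrates the heap vs. off-heap distinction discussed above.
public class ConversionBufferSketch {
    public static void main(String[] args) {
        int[] conversions = {2, 0, 1};

        // IntBuffer.wrap() wraps the int[] itself -> heap-backed (HeapIntBuffer).
        IntBuffer onHeap = IntBuffer.wrap(conversions);
        System.out.println(onHeap.isDirect());   // false

        // A direct ByteBuffer view stores the ints in native (off-heap) memory.
        IntBuffer offHeap = ByteBuffer
            .allocateDirect(conversions.length * Integer.BYTES)
            .asIntBuffer();
        offHeap.put(conversions);
        offHeap.flip();
        System.out.println(offHeap.isDirect());  // true
        System.out.println(offHeap.get(0));      // 2
    }
}
```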
Yes, that's the problem; now I see it. It probably isn't a problem when the dimensions for which bitmaps are constructed have reasonable cardinality, but in the worst case, when the cardinality is comparable with the total number of rows (like building bitmaps for a "requestId" dimension), it would increase the memory usage of the merging process dramatically. So the algorithm that I proposed above is not as robust as the current one. Most actions of 1)-6), however, really suffer the same problem; it explains why we always persist the incremental index before merging.

What should still be done:
- A special path to produce a segment from a single incremental index. It could avoid creating almost all temporary data structures.
- The main merging algorithm could be changed to the following:
This algorithm should be as robust as the current one, but it creates fewer structures and does fewer conversions.
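To illustrate the robustness concern raised above with a hedged sketch (illustrative names, not Druid code): building bitmaps while merging means holding one bitmap per distinct dictionary ID in memory at once, so a dimension whose cardinality approaches the row count keeps O(rows) bitmaps alive simultaneously.

```java
import java.util.*;

// Hypothetical sketch of merge-time bitmap construction for one dimension.
public class MergeTimeBitmapsSketch {
    // rows[i] = global sorted dictionary ID of this dimension's value in merged row i
    static Map<Integer, BitSet> buildBitmaps(int[] rows) {
        Map<Integer, BitSet> bitmaps = new HashMap<>();
        for (int rowNum = 0; rowNum < rows.length; rowNum++) {
            // One BitSet per distinct ID stays in memory until the merge finishes;
            // with near-row-count cardinality this map grows to O(rows) entries.
            bitmaps.computeIfAbsent(rows[rowNum], id -> new BitSet()).set(rowNum);
        }
        return bitmaps;
    }

    public static void main(String[] args) {
        int[] mergedRows = {0, 2, 0, 1, 2};
        Map<Integer, BitSet> bitmaps = buildBitmaps(mergedRows);
        System.out.println(bitmaps.get(0)); // value 0 appears in rows 0 and 2
    }
}
```

A per-dimension scan after the rows are merged, as suggested earlier in the thread, would bound this to one dimension's bitmaps at a time.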
See …
That sounds good to me
In this new proposal, when an indexing task needs to persist partial data to disk, under what conditions would it create a segment with bitmaps vs. deferring bitmap creation until merging?
To keep the algorithm as robust as the current one, initially it could always create bitmaps.
Note: currently I'm not going to implement those improvements myself, so anybody is free to pick them up.
This issue has been marked as stale due to 280 days of inactivity. It will be closed in 2 weeks if no further activity occurs. If this issue is still relevant, please simply write any comment. Even if closed, you can still revive the issue at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.
This issue has been closed due to lack of activity. If you think that is incorrect, or the issue requires additional review, you can revive the issue at any time.
Hi, I would like to pick this up; however, after reading the source code, I cannot figure out why the per-index sorted-to-unsorted encoding conversion is needed, and where such a conversion lives in a segment. I suspect the conversion is there to make dimension values contiguous so they can be encoded/compressed more compactly? Can you please elaborate so I can pick this up? Thank you!
Current process seems to be overcomplicated.

The current steps:
1. IncrementalIndexAdapter.processRows() (called from the IncrementalIndexAdapter constructor)
2. StringDimensionIndexer.SortedDimensionDictionary: it has three O(N)-sized structures, for
   a) sorted-to-unsorted encoding conversion,
   b) unsorted-to-sorted encoding conversion,
   c) the values in sorted order themselves (for iteration later; however, materializing them is surely not needed: the list from DimensionDictionary and the sorted-to-unsorted encoding conversion could be combined).
   Additionally, when preparing those three structures, another O(N) structure (a TreeMap) is temporarily created.
3. IndexMerger.DictionaryMergeIterator
4. IndexMergerV9.mergeIndexesAndWriteColumns()
5. StringDimensionMergerV9.mergeBitmaps()

I don't see why it couldn't be simplified to the following:
1. Create an int[], fill it with 0, 1, ..., and then sort it with a strategy delegating comparison to the DimensionDictionary's array of String values, i.e. one O(N) structure is created in this step.
2. … (from the DimensionDictionary's array), create the per-index-unsorted-to-global-sorted encoding conversion: an O(N) structure.

The main question is why we need to create per-index bitmaps in the very beginning, then create a lot of auxiliary structures to keep them usable until the very end, and then merge them, if we could(?) just create the bitmaps at the end.
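A hedged sketch of the first simplified step (illustrative names, not Druid's actual implementation): fill an int[] with 0..N-1 and sort it by delegating comparison to the dictionary's String values, yielding a sorted-position -> unsorted-ID mapping in a single O(N) structure. (For brevity this uses boxed Integer sorting; the real thing would sort the int[] in place with a comparison strategy to avoid boxing.)

```java
import java.util.*;

// Hypothetical sketch: derive the sorted order of dictionary IDs by sorting
// an ID array against the dictionary's String values, instead of building
// SortedDimensionDictionary's three O(N) structures.
public class SortedIdsSketch {
    // result[sortedPosition] = unsorted dictionary ID
    static int[] sortedToUnsorted(String[] dictValues) {
        Integer[] ids = new Integer[dictValues.length];
        for (int i = 0; i < ids.length; i++) ids[i] = i;
        // Comparison is delegated to the dictionary's value array.
        Arrays.sort(ids, Comparator.comparing((Integer id) -> dictValues[id]));
        int[] result = new int[ids.length];
        for (int i = 0; i < ids.length; i++) result[i] = ids[i];
        return result;
    }

    public static void main(String[] args) {
        String[] dict = {"zebra", "aardvark", "monkey"}; // unsorted insertion order
        // sorted order is aardvark(1), monkey(2), zebra(0)
        System.out.println(Arrays.toString(sortedToUnsorted(dict))); // [1, 2, 0]
    }
}
```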
@jon-wei @gianm @jihoonson