ScanQuery supports multi column orderBy queries #13168
Conversation
…sorting task according to the Druid design pattern
import java.util.Iterator;
import java.util.List;

public interface MultiColumnSorter<T>
@paul-rogers I have added a wrapper interface to the sorter
import java.util.Set;
import java.util.stream.Collectors;

class OrderByQueryRunner implements QueryRunner<ScanResultValue>
@paul-rogers By default, priority-queue-based sorting is used within a segment. The sort columns are materialized in advance: all sort columns are traversed first, and the top-K row offsets are cached in the priority queue. The rows to return are then extracted with a delayed-materialization strategy, so only the rows corresponding to the top-K offsets need to be fetched.
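A simplified sketch of that strategy (hypothetical class and method names, plain Java, with a single long sort column and an in-memory row table standing in for Druid's column selectors):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class TopKOffsets
{
  /**
   * Scan only the sort column, keep the offsets of the K smallest values in a
   * bounded max-heap, then materialize just those K rows from the row table.
   */
  public static List<List<Object>> topK(List<Long> sortColumn, List<List<Object>> rows, int k)
  {
    // Max-heap on the sort value: the worst of the current top-K sits on top.
    PriorityQueue<Integer> heap = new PriorityQueue<>(
        (a, b) -> Long.compare(sortColumn.get(b), sortColumn.get(a)));
    for (int offset = 0; offset < sortColumn.size(); offset++) {
      if (heap.size() < k) {
        heap.offer(offset);
      } else if (sortColumn.get(offset) < sortColumn.get(heap.peek())) {
        heap.poll();        // evict the current worst offset
        heap.offer(offset);
      }
    }
    // Delayed materialization: fetch only the rows behind the winning offsets.
    List<Integer> offsets = new ArrayList<>(heap);
    offsets.sort((a, b) -> Long.compare(sortColumn.get(a), sortColumn.get(b)));
    List<List<Object>> result = new ArrayList<>();
    for (int offset : offsets) {
      result.add(rows.get(offset));
    }
    return result;
  }
}
```

Only offsets live in the heap, so the memory held during the scan is proportional to K, not to the row width.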
When using these QueryRunner/Sequence pairs (ListBasedOrderByQueryRunner/ListBasedSorterSequence, TreeMultisetOrderByQueryRunner/TreeMultisetBasedSorterSequence), the sort -> merge -> limit stages are separated.
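The separation might look roughly like this (a simplified sketch, not the PR's actual classes): each segment's values are assumed already sorted, an n-way merge is driven by a priority queue, and the limit is applied last.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class SortMergeLimit
{
  /** Merge already-sorted per-segment lists, applying the limit at the end. */
  public static List<Long> mergeWithLimit(List<List<Long>> sortedSegments, int limit)
  {
    // Heap entry: {segmentIndex, positionWithinSegment}, ordered by current value.
    PriorityQueue<int[]> heap = new PriorityQueue<>(
        Comparator.comparingLong((int[] e) -> sortedSegments.get(e[0]).get(e[1])));
    for (int i = 0; i < sortedSegments.size(); i++) {
      if (!sortedSegments.get(i).isEmpty()) {
        heap.offer(new int[]{i, 0});
      }
    }
    List<Long> out = new ArrayList<>();
    while (!heap.isEmpty() && out.size() < limit) {
      int[] e = heap.poll();
      List<Long> seg = sortedSegments.get(e[0]);
      out.add(seg.get(e[1]));
      if (e[1] + 1 < seg.size()) {
        heap.offer(new int[]{e[0], e[1] + 1});  // advance within that segment
      }
    }
    return out;
  }
}
```

Because the merge stops as soon as the limit is reached, segments are only consumed as far as the limit requires.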
This pull request introduces 1 alert and fixes 1 when merging 5efc2d1 into ebfe1c0 - view on LGTM.com
This pull request fixes 1 alert when merging 858a0b6 into ebfe1c0 - view on LGTM.com
@599166320, thanks for the update! I'll take another look soon. I noticed that this is a new PR that replaces the existing, closed one. As it turns out, you can just reuse the same PR by pushing (or force-pushing) new commits. Reusing the same PR allows all comments to be in one place and preserves tags and other status. If the PR is closed, you can reopen it and be back in business. Let's use this PR from now on for this project. I've copied tags from your prior PR to this one, and included a link to the prior PR so we can see our earlier comments. Please reach out if you need help: doing this PR stuff can be tricky the first time you do it.
Note to reviewers: please see prior history in PR #13031.
@599166320, thanks for your explanations. Is it fair to state these as the requirements you want to satisfy?
These are all great goals! The goal of minimizing the memory footprint is a worthy one. The above approach is similar to how an RDBMS uses a B-Tree index to read off index keys, then uses the rid within the key to fetch matching rows. (RDBMS systems usually add another layer of optimization: if the query requests only the sort keys, skip the step of reading the underlying rows. Some of us call this an "index-only query"; other names also exist.)
I previously commented on the use of a priority queue to do sorting. I was concerned about the cost of sorting n rows using a priority queue. A quicksort runs in O(n log n) time. According to the docs, the insert time for a PriorityQueue is O(log n) per element. These questions can only be worked out by doing performance testing. In the interests of moving ahead, the priority queue implementation is fine for now. According to the docs, when used with a limit, the cost of removing the last element (to allow a new, lower-value element) is constant time, so that should be fine.
@paul-rogers Thanks for your tips and help.
This pull request introduces 1 alert and fixes 1 when merging c5b33d0 into a3edda3 - view on LGTM.com
Heads-up: LGTM.com's PR analysis will be disabled on the 5th of December, and LGTM.com will be shut down completely on the 16th of December 2022. Please enable GitHub code scanning, which uses the same CodeQL engine that powers LGTM.com. For more information, please check out our post on the GitHub blog.
This pull request introduces 1 alert and fixes 1 when merging e0c7983 into 309cae7 - view on LGTM.com
This pull request introduces 1 alert and fixes 1 when merging 138f7fd into 78d0b0a - view on LGTM.com
This pull request introduces 1 alert and fixes 1 when merging d5406c0 into 78d0b0a - view on LGTM.com
Let's ignore the problem reported by Travis CI for now, because it has nothing to do with this PR. I ran both versions of the sort on the test cluster against the same table with 300 million rows of data, and found that the improved version (dictionary-based sorting plus rowId random access) is 10 times faster than the original.
This pull request introduces 1 alert and fixes 1 when merging a8f86b3 into 16385c7 - view on LGTM.com
@599166320, great progress! This area is complex. See the next round of comments.
columns = columns.isEmpty() ? srv.getColumns() : columns;
List<List<Object>> events = (List<List<Object>>) srv.getEvents();
for (Object event : events) {
  if (event instanceof LinkedHashMap) {
This is not ideal. The scan result value carries a format type: list or compact list. A list is a list of maps. A compact list is a list of lists. All incoming result values will be one or the other (depending on a setting in the query). This gives us two choices:
- Load all items in their list or map forms, sort using that form, and recreate outgoing batches, or
- Convert all items to one format (a list say), sort, then convert back (to a map, if needed).
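A rough sketch of the second option, converting between the two row formats (helper and class names here are mine, not the PR's):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RowFormats
{
  /** "list" format (one map per row) -> "compact list" format (one list per row). */
  public static List<List<Object>> toCompact(List<Map<String, Object>> mapRows, List<String> columns)
  {
    List<List<Object>> compact = new ArrayList<>();
    for (Map<String, Object> row : mapRows) {
      List<Object> values = new ArrayList<>(columns.size());
      for (String col : columns) {
        values.add(row.get(col));   // column order fixes the positional layout
      }
      compact.add(values);
    }
    return compact;
  }

  /** "compact list" format back to "list" format, preserving column order. */
  public static List<Map<String, Object>> toMaps(List<List<Object>> compactRows, List<String> columns)
  {
    List<Map<String, Object>> mapRows = new ArrayList<>();
    for (List<Object> row : compactRows) {
      Map<String, Object> m = new LinkedHashMap<>();
      for (int i = 0; i < columns.size(); i++) {
        m.put(columns.get(i), row.get(i));
      }
      mapRows.add(m);
    }
    return mapRows;
  }
}
```

Sorting would then operate on the compact form only, and the map form would be rebuilt on the way out if the query asked for it.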
/**
 * Scan the segments in parallel, complete the sorting of each batch within each segment, and then complete the sorting at the segment level
 */
Let's think about this.
Druid already handles parallel execution of segment scans within a historical. Are we redoing that here? Or, did the previous code not actually run in parallel?
We can't run cursors in parallel because they scan the same segment: in a low-memory condition, we might thrash the OS cache as we try to load all the segment slices in memory at the same time.
Druid's concurrency model is to run a limited number of threads, with those threads picking up tasks ready to run. If we have, say, 20 cursors for a segment, we'll have 20 tasks competing for a small number of threads. Will the extra complexity actually be faster than running the cursors in sequence?
It would be great to explain the parallelism model we're implementing to answer those questions for future readers.
I checked it by printing the log:
2022-12-14T07:34:57,759 INFO [qtp1231864343-87] org.apache.druid.server.QueryResource - ThreadId:87,ThreadName:qtp1231864343-87
...
2022-12-14T07:35:03,791 INFO [scan_wikipedia_[2016-06-27T00:00:00.000Z/2016-06-27T01:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T00:00:00.000Z/2016-06-27T01:00:00.000Z]
2022-12-14T07:35:03,807 INFO [scan_wikipedia_[2016-06-27T01:00:00.000Z/2016-06-27T02:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T01:00:00.000Z/2016-06-27T02:00:00.000Z]
2022-12-14T07:35:03,849 INFO [scan_wikipedia_[2016-06-27T02:00:00.000Z/2016-06-27T03:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T02:00:00.000Z/2016-06-27T03:00:00.000Z]
2022-12-14T07:35:03,854 INFO [scan_wikipedia_[2016-06-27T03:00:00.000Z/2016-06-27T04:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T03:00:00.000Z/2016-06-27T04:00:00.000Z]
2022-12-14T07:35:03,861 INFO [scan_wikipedia_[2016-06-27T04:00:00.000Z/2016-06-27T05:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T04:00:00.000Z/2016-06-27T05:00:00.000Z]
2022-12-14T07:35:03,881 INFO [scan_wikipedia_[2016-06-27T05:00:00.000Z/2016-06-27T06:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T05:00:00.000Z/2016-06-27T06:00:00.000Z]
2022-12-14T07:35:03,888 INFO [scan_wikipedia_[2016-06-27T06:00:00.000Z/2016-06-27T07:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T06:00:00.000Z/2016-06-27T07:00:00.000Z]
2022-12-14T07:35:03,896 INFO [scan_wikipedia_[2016-06-27T07:00:00.000Z/2016-06-27T08:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T07:00:00.000Z/2016-06-27T08:00:00.000Z]
2022-12-14T07:35:03,909 INFO [scan_wikipedia_[2016-06-27T08:00:00.000Z/2016-06-27T09:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T08:00:00.000Z/2016-06-27T09:00:00.000Z]
2022-12-14T07:35:03,919 INFO [scan_wikipedia_[2016-06-27T09:00:00.000Z/2016-06-27T10:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T09:00:00.000Z/2016-06-27T10:00:00.000Z]
2022-12-14T07:35:03,927 INFO [scan_wikipedia_[2016-06-27T10:00:00.000Z/2016-06-27T11:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T10:00:00.000Z/2016-06-27T11:00:00.000Z]
2022-12-14T07:35:03,932 INFO [scan_wikipedia_[2016-06-27T11:00:00.000Z/2016-06-27T12:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T11:00:00.000Z/2016-06-27T12:00:00.000Z]
2022-12-14T07:35:03,936 INFO [scan_wikipedia_[2016-06-27T12:00:00.000Z/2016-06-27T13:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T12:00:00.000Z/2016-06-27T13:00:00.000Z]
2022-12-14T07:35:03,939 INFO [scan_wikipedia_[2016-06-27T13:00:00.000Z/2016-06-27T14:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T13:00:00.000Z/2016-06-27T14:00:00.000Z]
2022-12-14T07:35:03,951 INFO [scan_wikipedia_[2016-06-27T14:00:00.000Z/2016-06-27T15:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T14:00:00.000Z/2016-06-27T15:00:00.000Z]
2022-12-14T07:35:03,955 INFO [scan_wikipedia_[2016-06-27T15:00:00.000Z/2016-06-27T16:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T15:00:00.000Z/2016-06-27T16:00:00.000Z]
2022-12-14T07:35:03,975 INFO [scan_wikipedia_[2016-06-27T16:00:00.000Z/2016-06-27T17:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T16:00:00.000Z/2016-06-27T17:00:00.000Z]
2022-12-14T07:35:03,984 INFO [scan_wikipedia_[2016-06-27T17:00:00.000Z/2016-06-27T18:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T17:00:00.000Z/2016-06-27T18:00:00.000Z]
2022-12-14T07:35:03,994 INFO [scan_wikipedia_[2016-06-27T18:00:00.000Z/2016-06-27T19:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T18:00:00.000Z/2016-06-27T19:00:00.000Z]
2022-12-14T07:35:03,998 INFO [scan_wikipedia_[2016-06-27T19:00:00.000Z/2016-06-27T20:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine
From the above logs, the ThreadId of the scan query is always 87, which shows that even when multiple segments are queried, the scan query is executed by a single thread.
For this improved version, I referred to the parallelism model of TopNQuery. Briefly: when a client submits a query, Druid divides it into multiple sub-query tasks according to segment granularity, with each sub-query task corresponding to one segment. When the query actually executes, the sub-query tasks are placed in the queue of the queryProcessingPool, whose threads compute result sets in parallel. After each sub-query finishes, it pushes its results to a priority queue, which merges the result sets of the sub-queries. Resource usage during the whole computation is controlled by the queryProcessingPool and the limit parameter. As long as druid.processing.numThreads is configured properly, I think this avoids excessive memory usage, frequent thread creation, and thread contention.
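As a rough illustration of that model (a plain ExecutorService standing in for the queryProcessingPool; the names and the final merge-by-full-sort are simplifications, not the PR's code):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SegmentParallelSort
{
  /**
   * One task per segment runs on a bounded pool; each task sorts its segment
   * and keeps at most `limit` rows, and the partial results are merged at the end.
   */
  public static List<Long> query(List<List<Long>> segments, int limit, int numThreads)
      throws Exception
  {
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    try {
      List<Future<List<Long>>> futures = new ArrayList<>();
      for (List<Long> segment : segments) {
        futures.add(pool.submit(() -> {
          // Per-segment work: sort, then truncate to the limit.
          List<Long> sorted = new ArrayList<>(segment);
          Collections.sort(sorted);
          return sorted.subList(0, Math.min(limit, sorted.size()));
        }));
      }
      // Merge the per-segment top-`limit` results and re-apply the limit.
      List<Long> merged = new ArrayList<>();
      for (Future<List<Long>> f : futures) {
        merged.addAll(f.get());
      }
      Collections.sort(merged);
      return merged.subList(0, Math.min(limit, merged.size()));
    }
    finally {
      pool.shutdown();
    }
  }
}
```

The pool size caps concurrency (like druid.processing.numThreads), and the per-segment limit caps how much each task can hand to the merge step.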
List<List<Object>> eventList = new ArrayList<>();
List<String> columns = new ArrayList<>();
String segmentId = null;
while (it.hasNext()) {
Maybe add a comment to explain what we're doing? It looks like we read a set of scan result values and combine their rows into one big scan result value. Again, the idea with scan query is to limit batch size. Does something elsewhere break the results back into batches? Or does this copy the values into a sorter? If we copy into a sorter, should that be done here, with the results merged later?
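For illustration, breaking the combined rows back into size-limited batches could look like this (a hypothetical helper, not part of the PR):

```java
import java.util.ArrayList;
import java.util.List;

public class Rebatcher
{
  /** Split a combined row list back into batches of at most batchSize rows. */
  public static List<List<List<Object>>> rebatch(List<List<Object>> rows, int batchSize)
  {
    List<List<List<Object>>> batches = new ArrayList<>();
    for (int start = 0; start < rows.size(); start += batchSize) {
      int end = Math.min(start + batchSize, rows.size());
      // Copy the slice so each batch owns its rows.
      batches.add(new ArrayList<>(rows.subList(start, end)));
    }
    return batches;
  }
}
```

Each output batch would then become its own scan result value, restoring the bounded batch size the scan query contract expects.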
# Conflicts: # sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
return queryRunnerFactory.mergeRunners(
    Execs.directExecutor(),
    runners.stream().map(p -> p.rhs).collect(Collectors.toList())
).run(
Check notice (Code scanning / CodeQL): Deprecated method or constructor invocation
This pull request has been marked as stale due to 60 days of inactivity.
This pull request/issue has been closed due to lack of activity. If you think that
Fixes #12958.
Description
QueueBasedMultiColumnSorter is used by default; other sorters can be selected through context parameters. Sorting on columns other than __time is supported, adding a priority queue strategy and an n-way merge strategy.
Key changed/added classes in this PR
OrderByQueryRunner
OrderBySequence
QueueBasedMultiColumnSorter
TreeMultisetBasedOrderByQueryRunner
TreeMultisetBasedSorterSequence
TreeMultisetBasedMulticolumnSorter
ScanQueryRunnerFactory
MultiColumnSorter
QueueBasedMultiColumnSorter
ScanQueryQueryToolChest
This PR has: