
ScanQuery supports multi column orderBy queries #13168

Conversation


@599166320 599166320 commented Oct 2, 2022

Fixes #12958.

Description

  1. Add ordinary (non-__time) column sorting
  2. Add inline datasource sorting
  3. Fix #13152: sorting of empty inline data
  4. Extract a Sorter interface for future improvement
  5. By default, data within a segment is sorted by QueueBasedMultiColumnSorter; other sorters can be selected via context parameters
  6. Merge results using QueueBasedMultiColumnSorter
  7. Following the existing __time sort style, add a priority-queue strategy and an n-way merge strategy

Key changed/added classes in this PR
  • OrderByQueryRunner
  • OrderBySequence
  • QueueBasedMultiColumnSorter
  • TreeMultisetBasedOrderByQueryRunner
  • TreeMultisetBasedSorterSequence
  • TreeMultisetBasedMulticolumnSorter
  • ScanQueryRunnerFactory
  • MultiColumnSorter
  • ScanQueryQueryToolChest

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

import java.util.Iterator;
import java.util.List;

public interface MultiColumnSorter<T>

@paul-rogers I have added a wrapper interface to the sorter

import java.util.Set;
import java.util.stream.Collectors;

class OrderByQueryRunner implements QueryRunner<ScanResultValue>
@599166320 599166320 Oct 2, 2022


@paul-rogers By default, priority-queue-based sorting is used within a segment. The sort columns are materialized up front: all sort columns are traversed first, and the top-K offsets are cached in the priority queue. The rows to return are then extracted with a delayed-materialization strategy, so only the rows corresponding to the top-K offsets need to be fetched.
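To make the scheme above concrete, here is a minimal, hypothetical sketch (the class and method names are mine, not from the PR, and java.util.PriorityQueue stands in for the Guava queue the PR uses): the sort-key column is scanned once while a bounded max-heap keeps the offsets of the current best k rows, and only the winning offsets are used to fetch full rows.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class TopKOffsetSort
{
  /**
   * Return the offsets (row ids) of the k smallest sort keys, in ascending
   * key order. Only the key column is materialized here.
   */
  public static List<Integer> topKOffsets(List<String> sortKeys, int k)
  {
    // Max-heap on the key, so the head is the current "worst" of the top k.
    PriorityQueue<Integer> heap = new PriorityQueue<>(
        Comparator.<Integer, String>comparing(sortKeys::get).reversed()
    );
    for (int offset = 0; offset < sortKeys.size(); offset++) {
      heap.offer(offset);
      if (heap.size() > k) {
        heap.poll(); // evict the offset of the current worst key
      }
    }
    List<Integer> offsets = new ArrayList<>();
    while (!heap.isEmpty()) {
      offsets.add(heap.poll());
    }
    Collections.reverse(offsets); // drain order is worst-first, so reverse
    return offsets;
  }

  /** Delayed materialization: fetch full rows only for the winning offsets. */
  public static List<List<Object>> fetchRows(List<List<Object>> rows, List<Integer> offsets)
  {
    List<List<Object>> out = new ArrayList<>();
    for (int offset : offsets) {
      out.add(rows.get(offset));
    }
    return out;
  }
}
```

The point of the two-phase shape is that non-key columns are never buffered during the scan; only k offsets and k keys live in memory at any time.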


@599166320 599166320 Oct 2, 2022


When using these QueryRunner/Sequence pairs (ListBasedOrderByQueryRunner/ListBasedSorterSequence, TreeMultisetOrderByQueryRunner/TreeMultisetBasedSorterSequence), the sort -> merge -> limit stages are kept separate.

@lgtm-com

lgtm-com bot commented Oct 2, 2022

This pull request introduces 1 alert and fixes 1 when merging 5efc2d1 into ebfe1c0 - view on LGTM.com

new alerts:

  • 1 for Dereferenced variable may be null

fixed alerts:

  • 1 for Dereferenced variable may be null

@lgtm-com

lgtm-com bot commented Oct 3, 2022

This pull request fixes 1 alert when merging 858a0b6 into ebfe1c0 - view on LGTM.com

fixed alerts:

  • 1 for Dereferenced variable may be null

@paul-rogers

paul-rogers commented Oct 3, 2022

@599166320, thanks for the update! I'll take another look soon.

I noticed that this is a new PR that replaces the existing, closed one. As it turns out, you can just reuse the same PR by pushing (or force-pushing) new commits. Reusing the same PR allows all comments to be in one place and preserves tags and other status. If the PR is closed, you can reopen it and be back in business. Let's use this PR from now on for this project. I've copied tags from your prior PR to this one, and included a link to the prior PR so we can see our earlier comments.

Please reach out if you need help: doing this PR stuff can be tricky the first time you do it.

@paul-rogers

Note to reviewers: please see prior history in PR #13031.

@paul-rogers

@599166320, thanks for your explanations. Is it fair to state these as the requirements you want to satisfy?

  • Allow sorting of columns other than the pre-defined sort order.
  • Sorting requires materializing the sort keys. Materialize only the sort keys: once the data is sorted, use the row IDs (rids) within the sort keys to fetch the data. This approach minimizes the memory footprint of the sort as it does not buffer the non-key columns.
  • Optimize a limit operation: sort the entire set of rows that match the filter criteria, but fetch non-key columns only for those rows within the limit.

These are all great goals! The goal of minimizing the memory footprint is a worthy one.

The above approach is similar to how an RDBMS uses a B-Tree index to read off index keys, then uses the rid within the key to fetch matching rows. (RDBMS systems usually add another layer of optimization. If the query requests only the sort keys, then skip the step to read the underlying rows. This is called an "index-only query" by some of us; other names also exist.)
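The B-Tree-style pattern can be sketched roughly like this (all names here are illustrative, not from the PR): sort only the (key, rid) entries, and when the query asks for just the key column, skip the row fetch entirely, the "index-only" case.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class RidSort
{
  /** A (sortKey, rowId) pair, analogous to a B-Tree index entry. */
  public static final class Entry
  {
    public final String key;
    public final int rid;

    public Entry(String key, int rid)
    {
      this.key = key;
      this.rid = rid;
    }
  }

  /**
   * Sort only the index entries. If the query requests just the key column,
   * return the keys directly ("index-only"); otherwise dereference the rids
   * to fetch the matching rows.
   */
  public static List<Object> query(List<Entry> index, List<List<Object>> rows, boolean keysOnly)
  {
    List<Entry> sorted = new ArrayList<>(index);
    sorted.sort(Comparator.comparing(e -> e.key));
    List<Object> result = new ArrayList<>();
    for (Entry e : sorted) {
      result.add(keysOnly ? e.key : rows.get(e.rid));
    }
    return result;
  }
}
```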

@paul-rogers

I previously commented on the use of a priority queue to do sorting. I was concerned about the cost of sorting n rows using a priority queue. A quicksort runs in O(n log n) time.

According to the docs, the insert time for MinMaxPriorityQueue is O(log n). Since we insert n elements total, the total cost is O(n log n), the same as for quicksort. Given this, the difference in performance will be in the constant overhead of the two methods, plus any memory overhead differences.

These questions can only be worked out by doing performance testing. In the interests of moving ahead, the priority queue implementation is fine for now. According to the docs, when used with a limit, the cost of removing the last element (to allow a new, lower-value element) is constant time, so that should be fine.
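The equivalence of the two approaches can be illustrated with a small sketch (names are mine; java.util.PriorityQueue serves as a stdlib stand-in for Guava's MinMaxPriorityQueue, which additionally offers a built-in maximumSize bound). Both routes yield the same top-k result in O(n log n) time, so any real difference lies in constant factors and memory behavior, which only benchmarking can settle.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

public class TopKEquivalence
{
  /** Bounded max-heap: each offer/evict is O(log k), for O(n log k) total. */
  public static List<Integer> heapTopK(List<Integer> values, int k)
  {
    PriorityQueue<Integer> heap = new PriorityQueue<>(Collections.reverseOrder());
    for (int v : values) {
      heap.offer(v);
      if (heap.size() > k) {
        heap.poll(); // drop the current largest; only k elements are kept
      }
    }
    List<Integer> out = new ArrayList<>(heap);
    Collections.sort(out);
    return out;
  }

  /** Full sort, then take the first k: O(n log n) overall. */
  public static List<Integer> sortTopK(List<Integer> values, int k)
  {
    List<Integer> sorted = new ArrayList<>(values);
    Collections.sort(sorted);
    return new ArrayList<>(sorted.subList(0, Math.min(k, sorted.size())));
  }
}
```

Note the heap route also buffers only k elements rather than all n, which matters when a limit is much smaller than the row count.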

@599166320

599166320 commented Oct 4, 2022


@paul-rogers Thanks for your tips and help

@lgtm-com

lgtm-com bot commented Nov 14, 2022

This pull request introduces 1 alert and fixes 1 when merging c5b33d0 into a3edda3 - view on LGTM.com

new alerts:

  • 1 for Dereferenced variable may be null

fixed alerts:

  • 1 for Dereferenced variable may be null

Heads-up: LGTM.com's PR analysis will be disabled on the 5th of December, and LGTM.com will be shut down ⏻ completely on the 16th of December 2022. Please enable GitHub code scanning, which uses the same CodeQL engine ⚙️ that powers LGTM.com. For more information, please check out our post on the GitHub blog.

@lgtm-com

lgtm-com bot commented Nov 15, 2022

This pull request introduces 1 alert and fixes 1 when merging e0c7983 into 309cae7 - view on LGTM.com

new alerts:

  • 1 for Dereferenced variable may be null

fixed alerts:

  • 1 for Dereferenced variable may be null


@lgtm-com

lgtm-com bot commented Nov 16, 2022

This pull request introduces 1 alert and fixes 1 when merging 138f7fd into 78d0b0a - view on LGTM.com

new alerts:

  • 1 for Dereferenced variable may be null

fixed alerts:

  • 1 for Dereferenced variable may be null


@lgtm-com

lgtm-com bot commented Nov 17, 2022

This pull request introduces 1 alert and fixes 1 when merging d5406c0 into 78d0b0a - view on LGTM.com

new alerts:

  • 1 for Dereferenced variable may be null

fixed alerts:

  • 1 for Dereferenced variable may be null


@599166320

Let's ignore the problem reported by Travis CI for now, because it has nothing to do with this PR.

I ran both versions of the sort on the test cluster against a table with 300 million rows: the improved version (dictionary-based sorting + rowId random access) is 10 times faster than the original.

@lgtm-com

lgtm-com bot commented Nov 24, 2022

This pull request introduces 1 alert and fixes 1 when merging a8f86b3 into 16385c7 - view on LGTM.com

new alerts:

  • 1 for Dereferenced variable may be null

fixed alerts:

  • 1 for Dereferenced variable may be null



@paul-rogers paul-rogers left a comment


@599166320, great progress! This area is complex. See the next round of comments.

columns = columns.isEmpty() ? srv.getColumns() : columns;
List<List<Object>> events = (List<List<Object>>) srv.getEvents();
for (Object event : events) {
  if (event instanceof LinkedHashMap) {


This is not ideal. The scan result value carries a format type: list or compact list. A list is a list of maps. A compact list is a list of lists. All incoming result values will be one or the other (depending on a setting in the query). This gives us two choices:

  • Load all items in their list or map forms, sort using that form, and recreate outgoing batches, or
  • Convert all items to one format (a list say), sort, then convert back (to a map, if needed).
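A rough sketch of the second option (convert to one format, sort in that form, convert back). The helper names are hypothetical, but the two shapes correspond to the scan query's "list" (list of maps) and "compactedList" (list of lists, ordered by the column list) result formats:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ResultFormats
{
  /** "list" format (one map per row) -> "compact list" format (one list per row). */
  public static List<List<Object>> toCompact(List<Map<String, Object>> events, List<String> columns)
  {
    List<List<Object>> rows = new ArrayList<>();
    for (Map<String, Object> event : events) {
      List<Object> row = new ArrayList<>();
      for (String column : columns) {
        row.add(event.get(column)); // column order defines the row layout
      }
      rows.add(row);
    }
    return rows;
  }

  /** "compact list" format back to "list" format, preserving column order. */
  public static List<Map<String, Object>> toList(List<List<Object>> rows, List<String> columns)
  {
    List<Map<String, Object>> events = new ArrayList<>();
    for (List<Object> row : rows) {
      Map<String, Object> event = new LinkedHashMap<>();
      for (int i = 0; i < columns.size(); i++) {
        event.put(columns.get(i), row.get(i));
      }
      events.add(event);
    }
    return events;
  }
}
```

With a pair like this, the sorter only ever sees one row shape, and the instanceof check on individual events becomes unnecessary.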


/**
* Scan the segments in parallel, sort each batch within each segment, and then complete the segment-level sort
*/


Let's think about this.

Druid already handles parallel execution of segment scans within a historical. Are we redoing that here? Or, did the previous code not actually run in parallel?

We can't run cursors in parallel because they scan the same segment: in a low-memory condition, we might thrash the OS cache as we try to load all the segment slices in memory at the same time.

Druid's concurrency model is to run a limited number of threads, with those threads picking up tasks ready to run. If we have, say, 20 cursors for a segment, we'll have 20 tasks competing for a small number of threads. Will the extra complexity actually be faster than running the cursors in sequence?

Would be great to explain the parallelism model we're implementing to answer those questions for future readers.



I checked it by printing the log:

2022-12-14T07:34:57,759 INFO [qtp1231864343-87] org.apache.druid.server.QueryResource - ThreadId:87,ThreadName:qtp1231864343-87
...
2022-12-14T07:35:03,791 INFO [scan_wikipedia_[2016-06-27T00:00:00.000Z/2016-06-27T01:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T00:00:00.000Z/2016-06-27T01:00:00.000Z]
2022-12-14T07:35:03,807 INFO [scan_wikipedia_[2016-06-27T01:00:00.000Z/2016-06-27T02:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T01:00:00.000Z/2016-06-27T02:00:00.000Z]
2022-12-14T07:35:03,849 INFO [scan_wikipedia_[2016-06-27T02:00:00.000Z/2016-06-27T03:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T02:00:00.000Z/2016-06-27T03:00:00.000Z]
... (similar ScanQueryEngine entries continue for each subsequent hourly interval up to [2016-06-27T19:00:00.000Z/2016-06-27T20:00:00.000Z], all on ThreadId:87)

As the above logs show, the ThreadId of the scan query is always 87: even when multiple segments are queried, the scan query is executed by a single thread.

For this improved version, I followed the parallelism model of TopNQuery. Briefly:

When a client submits a query, Druid splits it into multiple sub-query tasks at segment granularity, one task per segment. At execution time, each sub-query task is placed in the queryProcessingPool's queue, and the pool's threads compute the result sets in parallel. As each sub-query finishes, it pushes its results into a priority queue, which merges the result sets of the sub-queries. Resource usage throughout is bounded by the queryProcessingPool and the limit parameter; as long as druid.processing.numThreads is configured properly, I think this avoids excessive memory usage, frequent thread creation, and thread contention.

List<List<Object>> eventList = new ArrayList<>();
List<String> columns = new ArrayList<>();
String segmentId = null;
while (it.hasNext()) {


Maybe add a comment to explain what we're doing? It looks like we read a set of scan result values and combine their rows into one big scan result value. Again, the idea with scan query is to limit batch size. Does something elsewhere break the results back into batches? Or maybe copy these values into a sorter? If we copy into a sorter, should that be done here, with the results merged later?

Comment on lines 536 to 539
return queryRunnerFactory.mergeRunners(
Execs.directExecutor(),
runners.stream().map(p -> p.rhs).collect(Collectors.toList())
).run(

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation

Invoking [QueryRunnerFactory.mergeRunners](1) should be avoided because it has been deprecated.
@Override
public void add(T[] sorterElement)
{
queue.offer(sorterElement);

Check notice

Code scanning / CodeQL

Ignored error status of call

Method add ignores exceptional return value of MinMaxPriorityQueue<T[]>.offer.

This pull request has been marked as stale due to 60 days of inactivity.
It will be closed in 4 weeks if no further activity occurs. If you think
that's incorrect or this pull request should instead be reviewed, please simply
write any comment. Even if closed, you can still revive the PR at any time or
discuss it on the dev@druid.apache.org list.
Thank you for your contributions.

@github-actions github-actions bot added the stale label Jan 10, 2024

github-actions bot commented Feb 8, 2024

This pull request/issue has been closed due to lack of activity. If you think that
is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Feb 8, 2024
Successfully merging this pull request may close these issues:

  • Unsatisfiable WHERE clause leads to exception instead of empty result
  • Modify scanQuery to support orderby