Add storeCompactionState flag support to msq #15965

Merged: 29 commits into apache:master, Apr 9, 2024

Conversation

@gargvishesh (Contributor) commented on Feb 26, 2024:

Compaction in the native engine records, by default, the state of compaction for each segment in the lastCompactionState segment field. This PR adds support for doing the same in the MSQ engine, targeted at future use cases such as REPLACE and compaction run via MSQ.

Note that this PR doesn't implicitly store the compaction state for MSQ replace tasks; the state is stored only when the flag "storeCompactionState": true is set in the query context.

Release Note

The storeCompactionState context flag is now supported for MSQ REPLACE tasks.
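
For illustration, a minimal sketch of attaching the flag to an MSQ REPLACE task's query context. Only the "storeCompactionState" key comes from this PR; the class and surrounding scaffolding are made up, and how the context reaches the task depends on the client or submission API used.

```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class StoreCompactionStateContextExample
{
  public static void main(String[] args)
  {
    // Query context to submit alongside an MSQ REPLACE query. Only the
    // "storeCompactionState" key is from this PR.
    final Map<String, Object> queryContext =
        ImmutableMap.of("storeCompactionState", true);
    System.out.println(queryContext);
  }
}
```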

@github-actions bot added the labels "Area - Batch Ingestion" and "Area - MSQ" (for multi-stage queries: https://github.com/apache/druid/issues/12262) on Feb 26, 2024
@cryptoe (Contributor) left a comment:

Left some comments. We should also add an assertion for the compaction config in MSQReplaceTests.

List<String> partitionDimensions
)
{
final boolean storeCompactionState = task.getContextValue(
Contributor:

We should document this in the SQLBasedIngestion context parameter docs saying we support these context parameters in MSQ and link them to the original documentation of storeCompactionState

gargvishesh (Contributor Author):

I'm just thinking if it would be better to move it to query context instead of task context to enable setting it from the web-console. Any thoughts on that?

gargvishesh (Contributor Author):

Moved this to the query context.

IndexSpec indexSpec = task.getQuerySpec().getTuningConfig().getIndexSpec();
GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
DimensionsSpec dimensionsSpec = dataSchema.getDimensionsSpec();
Map<String, Object> transformSpec = dataSchema.getTransformSpec() == null
Contributor:

Curious to know why this is required for compaction state.

gargvishesh (Contributor Author):

All these fields are captured when setting compaction state in the native flow.
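
For concreteness, a rough sketch of assembling those fields into a CompactionState, mirroring the native flow (the helper is hypothetical, and constructor argument order varies across Druid versions; this is illustrative, not the PR's exact code):

```java
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Map;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.timeline.CompactionState;

public class CompactionStateSketch
{
  // Hypothetical helper: gathers the fields this hunk extracts and packs
  // them into the CompactionState stored on each published segment.
  static CompactionState buildCompactionState(
      ObjectMapper jsonMapper,
      PartitionsSpec partitionSpec,
      DataSchema dataSchema,
      IndexSpec indexSpec,
      List<Object> metricsSpec,
      Map<String, Object> transformSpecMap
  )
  {
    return new CompactionState(
        partitionSpec,                   // derived from the shard spec
        dataSchema.getDimensionsSpec(),  // dimensions written to segments
        metricsSpec,                     // aggregators, serialized as maps
        transformSpecMap,                // transformSpec as a map, may be null
        jsonMapper.convertValue(indexSpec, new TypeReference<Map<String, Object>>() {}),
        jsonMapper.convertValue(dataSchema.getGranularitySpec(), new TypeReference<Map<String, Object>>() {})
    );
  }
}
```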

Tasks.DEFAULT_STORE_COMPACTION_STATE
);

if (storeCompactionState) {
Contributor:

Where are we checking that the SQL statement is a REPLACE?

gargvishesh (Contributor Author):

That would be part of the logic to set this flag itself. Currently, this PR doesn't incorporate the logic to implicitly set it in REPLACE commands, so this would have to be explicitly set.

@cryptoe (Contributor), Feb 26, 2024:

If the flag is set and the user issues an insert command, it should be an error.
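
A sketch of the suggested guard (the isReplaceQuery flag stands in for MSQ's actual REPLACE detection, and the exception type is illustrative; the merged PR surfaces this through MSQ's own error handling):

```java
public class StoreCompactionStateValidation
{
  // Reject the flag for non-REPLACE (i.e. INSERT) MSQ queries.
  static void validateStoreCompactionState(boolean storeCompactionState, boolean isReplaceQuery)
  {
    if (storeCompactionState && !isReplaceQuery) {
      throw new IllegalArgumentException(
          "storeCompactionState context flag is supported only for REPLACE queries"
      );
    }
  }
}
```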

Contributor:

You can check if it's a replace query using:

gargvishesh (Contributor Author):

Done. Thanks!

@gargvishesh (Contributor Author) commented on Feb 26, 2024:

> Left some comments. We should also add an assertion for the compaction config in MSQReplaceTests.

A clarification: this PR doesn't implicitly set the compaction state for all MSQ replace tasks.

@kfaraz kfaraz self-requested a review March 5, 2024 05:39
@AmatyaAvadhanula AmatyaAvadhanula self-requested a review March 7, 2024 03:18
@AmatyaAvadhanula (Contributor) left a comment:

Would it be better to extract the dataSchema, indexSpec, granularitySpec and (compute) the partitionsSpec, and use a common utility for both native batch and MSQ?

@gargvishesh (Contributor Author):

> Would it be better to extract the dataSchema, indexSpec, granularitySpec and (compute) the partitionsSpec, and use a common utility for both native batch and MSQ?

The bulk of the code is to extract/compute these values themselves, so I think the common utility will be of little value other than just creating the CompactionState object and the transform fn.

@AmatyaAvadhanula (Contributor):

> other than just creating the CompactionState object and the transform fn.

I think we are interested in storing the compaction state to prevent additional compactions from running on an already compacted interval.
If there are two separate methods for the native batch and MSQ based compaction state computations, it's possible that they diverge, leading to unwanted compactions.
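
For concreteness, the shared-utility idea might look like the following sketch: one function used by both code paths, so the stored state cannot diverge (the signature and the withLastCompactionState call are illustrative of the approach, not the PR's exact code):

```java
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;

public class CompactionStateAnnotator
{
  // Maps the published segments to copies annotated with the same
  // CompactionState, regardless of which engine produced them.
  static Function<Set<DataSegment>, Set<DataSegment>> annotateFunction(CompactionState state)
  {
    return segments -> segments.stream()
                               .map(segment -> segment.withLastCompactionState(state))
                               .collect(Collectors.toSet());
  }
}
```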


@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceSegmentsWithQuarterSegmentGranularity(String contextName, Map<String, Object> context)

Code scanning / CodeQL notice (test): Useless parameter. The parameter 'contextName' is never used.
@kfaraz (Contributor), Apr 1, 2024:

This parameter needs to be removed.

gargvishesh (Contributor Author):

It is required and is present in every other test, though unused.

@kfaraz (Contributor), Apr 5, 2024:

I see, thanks for the clarification. The contextName parameter is used just to name the test in JUnit. There also seems to be a lot of repetition of the annotations.

Edit: Apparently this is just the way JUnit 5 works. There is no way (yet) to parameterize the constructor of the entire test class.
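
In other words, the parameter is consumed by the annotation rather than the test body; the {0} in the name template refers to contextName (this repeats the snippet under discussion, with an illustrative body):

```java
import java.util.Map;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

// JUnit 5 interpolates contextName into the display name via {0},
// even though the method body never reads it.
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceSegmentsWithQuarterSegmentGranularity(String contextName, Map<String, Object> context)
{
  // ... the test body uses only `context` ...
}
```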

@gargvishesh (Contributor Author):

> I think we are interested in storing the compaction state to prevent additional compactions from running on an already compacted interval.

I've moved the annotate-function calculation to a common place now.

website/.spelling: outdated review comment, resolved.
@cryptoe (Contributor) left a comment:

Left some comments. Overall LGTM.
@gargvishesh Please test this on a local dev cluster, confirming that compaction does not trigger if we set this flag, and does trigger if there is a change in the "compaction spec".

} else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) {
partitionSpec = new DynamicPartitionsSpec(task.getQuerySpec().getTuningConfig().getRowsPerSegment(), null);
} else {
log.error(
Contributor:

I feel this should be an MSQ error, i.e., throw an MSQ fault and fail the job: if we add new shardSpecs to MSQ, we should also add support for storing compaction state. If we do not add code here, users' jobs would pass with only this error message in the logs, and it would require a lot of debugging to figure out that we missed adding the handling here.

gargvishesh (Contributor Author):

Throwing an MSQException now.
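
A sketch of what failing fast can look like here (UnknownFault is one existing MSQ fault type; the exact fault and message the PR uses may differ):

```java
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.UnknownFault;
import org.apache.druid.timeline.partition.ShardSpec;

public class UnsupportedShardSpecFault
{
  // Fail the job for an unrecognized shard spec instead of only logging,
  // so a newly added shard spec type cannot silently skip compaction state.
  static RuntimeException unsupportedShardSpec(ShardSpec shardSpec)
  {
    return new MSQException(
        UnknownFault.forMessage(
            "Cannot store compaction state: unsupported shardSpec type ["
            + shardSpec.getType() + "]"
        )
    );
  }
}
```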

@@ -1715,9 +1726,109 @@ private void publishSegmentsIfNeeded(
//noinspection unchecked
@SuppressWarnings("unchecked")
final Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId);

Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = Function.identity();
@kfaraz (Contributor), Apr 1, 2024:

Rather than declaring this function outside the if-else and assigning identity() to it, it should just be declared where necessary. You can make the segments non-final.

gargvishesh (Contributor Author):

Applying the function in the branch itself now before sending to publish.
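
Roughly, the resolved flow at the publish site (variable names follow the hunk above; the publishAllSegments call is illustrative of the publish step, not the exact method used):

```java
// Annotate only when the flag is set, in the branch that needs it,
// immediately before publishing. No identity-function placeholder needed.
Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId);
if (storeCompactionState) {
  segments = compactionStateAnnotateFunction.apply(segments);
}
publishAllSegments(segments);
```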

@kfaraz (Contributor) left a comment:

@gargvishesh , thanks for the changes.

I have left some comments. There are also some from @cryptoe that need to be addressed before this PR can be merged.

I guess at some point we would need a test verifying that segments written by MSQ REPLACE are indeed not picked up by compaction if the desired state matches. But maybe we can do that in an integration test in a follow-up PR.

if ((Objects.equals(shardSpec.getType(), ShardSpec.Type.SINGLE)
|| Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE))) {
List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
partitionSpec = new DimensionRangePartitionsSpec(
Contributor:

Since we are using DimensionRangePartitionsSpec for single-dim segments, is it possible that segments partitioned by single-dim would get re-picked by compaction if the compaction config has "single" as the desired state?
I am not entirely sure whether we still allow users to use "single" in the compaction config.

gargvishesh (Contributor Author):

From the code, the equals method compares the class type first, before comparing the fields themselves. So you are right: a "single"-type spec should be stored in the corresponding instance. I've updated the handling now. Thanks!
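
A minimal illustration of why the class type matters, assuming the standard constructors for both specs: two specs with identical limits and dimensions still compare unequal when their concrete classes differ, which would re-trigger compaction.

```java
import com.google.common.collect.ImmutableList;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;

public class PartitionsSpecEqualityExample
{
  public static void main(String[] args)
  {
    // Same dimension, same maxRowsPerSegment, different concrete classes:
    // equals() compares the class first, so these do not match.
    PartitionsSpec range = new DimensionRangePartitionsSpec(null, 5_000_000, ImmutableList.of("dim"), false);
    PartitionsSpec single = new SingleDimensionPartitionsSpec(null, 5_000_000, "dim", false);
    System.out.println(range.equals(single)); // false
  }
}
```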

@gargvishesh (Contributor Author):

@cryptoe @kfaraz
Tested it on a local cluster. Storing compaction state, and triggering compaction only when states differ, are working as expected. However, the defaults for DynamicPartitionsSpec weren't matching those used by compaction. I've changed the spec creation from DynamicPartitionsSpec(task.getQuerySpec().getTuningConfig().getRowsPerSegment(), null) to DynamicPartitionsSpec(null, DynamicPartitionsSpec.DEFAULT_COMPACTION_MAX_TOTAL_ROWS). The original task.getQuerySpec().getTuningConfig().getRowsPerSegment() defaults to 3M and is meant to be a target number of rows, whereas the default maxRowsPerSegment used by DynamicPartitionsSpec is 5M.
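
In code, the change described above (both constructions are quoted from the comment; the surrounding explanation is in the inline comments):

```java
// Before: MSQ's rowsPerSegment (a 3M-by-default *target*) was stored in
// the maxRowsPerSegment slot, so the stored state never matched the spec
// auto-compaction writes.
partitionSpec = new DynamicPartitionsSpec(task.getQuerySpec().getTuningConfig().getRowsPerSegment(), null);

// After: leave maxRowsPerSegment null (defaulting to 5M) and use
// compaction's default maxTotalRows, matching compaction's own spec.
partitionSpec = new DynamicPartitionsSpec(null, DynamicPartitionsSpec.DEFAULT_COMPACTION_MAX_TOTAL_ROWS);
```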

@cryptoe (Contributor) commented on Apr 2, 2024:

> Tested it on a local cluster. Storing compaction state, and triggering compaction only when states differ, are working as expected. However, the defaults for DynamicPartitionsSpec weren't matching those used by compaction. I've changed the spec creation from DynamicPartitionsSpec(task.getQuerySpec().getTuningConfig().getRowsPerSegment(), null) to DynamicPartitionsSpec(null, DynamicPartitionsSpec.DEFAULT_COMPACTION_MAX_TOTAL_ROWS). The original task.getQuerySpec().getTuningConfig().getRowsPerSegment() defaults to 3M and is meant to be a target number of rows, whereas the default maxRowsPerSegment used by DynamicPartitionsSpec is 5M.

Very important catch, @gargvishesh. Thank you.
I feel the fix is not clean, though; we would have to adjust the shuffle specs in MSQ.
Let me think it through a bit more.

docs/multi-stage-query/reference.md: outdated review comment, resolved.
final MSQTuningConfig tuningConfig = task.getQuerySpec().getTuningConfig();
PartitionsSpec partitionSpec;

// There is currently no way of specifying either maxRowsPerSegment or maxTotalRows for an MSQ task.
Contributor:

Please add another line of comment to explain the implications of this fact for the code here.

gargvishesh (Contributor Author):

Added and relocated to the appropriate line.

} else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) {
// There is currently no way of specifying either maxRowsPerSegment or maxTotalRows for an MSQ task.
// Hence using null for both which ends up translating to DEFAULT_MAX_ROWS_PER_SEGMENT for maxRowsPerSegment.
partitionSpec = new DynamicPartitionsSpec(null, null);
Contributor:

maxRowsPerSegment = numRowsPerSegment, no?

Contributor:

cc @kfaraz

@cryptoe (Contributor) left a comment:

Left 2 comments. LGTM otherwise.

PartitionsSpec partitionSpec;

if (Objects.equals(shardSpec.getType(), ShardSpec.Type.SINGLE)) {
String partitionDimension = ((SingleDimensionShardSpec) shardSpec).getDimension();
Contributor:

I think the shard spec cannot be "single" in MSQ. Let's just check for the range shard spec.

@cryptoe (Contributor) left a comment:

Changes LGTM!!

@cryptoe cryptoe merged commit 3d595cf into apache:master Apr 9, 2024
85 checks passed
@cryptoe (Contributor) commented on Apr 9, 2024:

@gargvishesh Could you please add the release notes to the PR description?

@adarshsanjeev adarshsanjeev added this to the 30.0.0 milestone May 6, 2024
Labels: Area - Batch Ingestion, Area - Documentation, Area - Ingestion, Area - MSQ (for multi-stage queries: https://github.com/apache/druid/issues/12262)
5 participants