Enable querying entirely cold datasources #16676
base: master
Conversation
server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
This PR has some refactors which should be tackled separately, in order to facilitate a smoother review of the core changes here.
  return datasourceToUnavailableSegments;
}

public Object2IntMap<String> getDatasourceToDeepStorageQueryOnlySegmentCount()
@findingrish, this seems to be the only new method that has been added here.
Please remove this new class SegmentReplicationStatusManager and move the code back to DruidCoordinator.
If a refactor is required, please do it in a separate PR. This PR should focus only on the required changes.
Correction: It seems that this method had already existed too.
@findingrish, is there any new code in SegmentReplicationStatusManager?
@kfaraz There is no new code in SegmentReplicationStatusManager. The reason for refactoring was a cyclic dependency between CoordinatorSegmentMetadataCache and DruidCoordinator while trying to use DruidCoordinator#getSegmentReplicationFactor.
I will raise a separate PR for the refactor.
Okay. Can you share some more details on how the cyclic dependency comes into the picture?
Currently, DruidCoordinator has a dependency on CoordinatorSegmentMetadataCache. For this patch I need to use DruidCoordinator#getSegmentReplicationFactor in CoordinatorSegmentMetadataCache, which results in a cyclic dependency.
As a solution, I have refactored DruidCoordinator to separate out the code which updates segmentReplicationStatus and broadcastSegments.
Let me know if this solution makes sense.
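The shape of this refactor can be sketched with a minimal, self-contained example. All names here are illustrative stand-ins, not the actual Druid classes: the cycle (coordinator depends on cache, cache needs a coordinator getter) is broken by extracting the shared replication state into a third class that both sides depend on.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for the extracted SegmentReplicationStatusManager-style holder:
// owns the replication state, depends on neither side.
class ReplicationStatusHolder
{
  private final Map<String, Integer> replicationFactors = new ConcurrentHashMap<>();

  void updateReplicationFactor(String segmentId, int factor)
  {
    replicationFactors.put(segmentId, factor);
  }

  Integer getReplicationFactor(String segmentId)
  {
    return replicationFactors.get(segmentId);
  }
}

// Stand-in for DruidCoordinator: writes to the holder instead of owning the state.
class CoordinatorSketch
{
  private final ReplicationStatusHolder holder;

  CoordinatorSketch(ReplicationStatusHolder holder)
  {
    this.holder = holder;
  }

  void runDuty()
  {
    holder.updateReplicationFactor("segment-1", 2);
  }
}

// Stand-in for CoordinatorSegmentMetadataCache: reads from the holder,
// so it no longer needs a reference to the coordinator.
class MetadataCacheSketch
{
  private final ReplicationStatusHolder holder;

  MetadataCacheSketch(ReplicationStatusHolder holder)
  {
    this.holder = holder;
  }

  Integer replicationFactorOf(String segmentId)
  {
    return holder.getReplicationFactor(segmentId);
  }
}
```

With this shape, the dependency graph is a tree (both classes point at the holder) rather than a cycle.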
@findingrish, you could just expose a method updateSegmentReplicationStatus() on CoordinatorSegmentMetadataCache. Call this method from DruidCoordinator.UpdateReplicationStatus.run() where we update broadcastSegments and segmentReplicationStatus.
Let me know if this works for you.
Yeah, this approach would work for me.
However, it seems a bit odd that DruidCoordinator.UpdateReplicationStatus has to additionally update state in some other class; ideally the consumer, CoordinatorSegmentMetadataCache, should be pulling this information?
Is there a reason to avoid the refactor work?
Is there a reason to avoid the refactor work?

Yes, the dependencies are already all over the place, which makes the code less readable and also complicates testing. A refactor is needed here, but it would have to be thought through a little.

However, it seems a bit odd that DruidCoordinator.UpdateReplicationStatus has to additionally update state in some other class

Not really: you can think of the DruidCoordinator (or rather the UpdateReplicationStatus duty in this case) as sending a notification to the CoordinatorSegmentMetadataCache saying that the segment replication status has been updated. The DruidCoordinator already sends a notification to the metadata cache about leadership status; this is another notification in the same vein.
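The suggested push-style notification can be sketched as follows. This is a hedged illustration, not the actual Druid API: the class and field names are invented, and the real updateSegmentReplicationStatus() hook would carry the replication state rather than a flag.

```java
// Stand-in for CoordinatorSegmentMetadataCache: receives a notification
// when the coordinator duty has refreshed replication state.
class SegmentMetadataCacheSketch
{
  private volatile boolean replicationStatusFresh = false;

  // analogous to the suggested updateSegmentReplicationStatus() hook
  void updateSegmentReplicationStatus()
  {
    replicationStatusFresh = true;
  }

  boolean isReplicationStatusFresh()
  {
    return replicationStatusFresh;
  }
}

// Stand-in for DruidCoordinator.UpdateReplicationStatus: after updating its
// own state, it notifies the cache, just like the existing leadership
// notifications to the metadata cache.
class UpdateReplicationStatusDutySketch
{
  private final SegmentMetadataCacheSketch cache;

  UpdateReplicationStatusDutySketch(SegmentMetadataCacheSketch cache)
  {
    this.cache = cache;
  }

  void run()
  {
    // ... update broadcastSegments and segmentReplicationStatus here ...
    cache.updateSegmentReplicationStatus(); // notify the cache
  }
}
```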
Yes, the dependencies are already all over the place, which makes the code less readable and also complicates testing. A refactor is needed here, but it would have to be thought through a little.

Yes, this makes sense. DruidCoordinator refactoring would need more thought.
Thanks for the suggestion, I will update the patch.
Left some comments.
/**
 * Retrieves list of used datasources.
 */
ListenableFuture<Set<String>> fetchUsedDataSources();
Please add the definition of used data sources here.
 */
protected final ConcurrentMap<String, T> tables = new ConcurrentHashMap<>();      // before
protected final ConcurrentHashMap<String, T> tables = new ConcurrentHashMap<>();  // after
Nit: just wondering which specific ConcurrentHashMap methods you are using that required this change.
I started using the computeIfAbsent method. The explanation is captured here: https://github.com/code-review-checklists/java-concurrency/blob/master/README.md#chm-type.
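The point of the linked checklist item can be shown with a small, self-contained example (names are illustrative, not from the PR): declaring the field as the concrete ConcurrentHashMap type documents that callers rely on the atomicity of its computeIfAbsent, which guarantees the mapping function runs at most once per absent key.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class ComputeIfAbsentDemo
{
  // Concrete ConcurrentHashMap type (not ConcurrentMap): signals that callers
  // depend on this implementation's atomic computeIfAbsent.
  static final ConcurrentHashMap<String, List<String>> tables = new ConcurrentHashMap<>();

  static void addSegment(String dataSource, String segmentId)
  {
    // Atomic: even under concurrent calls for the same dataSource,
    // exactly one list is created and all callers append to it.
    tables.computeIfAbsent(dataSource, ds -> new CopyOnWriteArrayList<>())
          .add(segmentId);
  }
}
```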
coldScehmaExec = Executors.newSingleThreadScheduledExecutor(
    new ThreadFactoryBuilder()
        .setNameFormat("DruidColdSchema-ScheduledExecutor-%d")
        .setDaemon(true)
Why is this a daemon thread?
I will update it; we don't need a daemon thread here.
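For reference, a JDK-only sketch of the agreed change (the PR itself uses Guava's ThreadFactoryBuilder; this hypothetical factory only illustrates the daemon flag): with setDaemon dropped, the JVM will not exit while the scheduled work is still pending, so tasks are not silently abandoned.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class ColdSchemaExecutorFactory
{
  // Equivalent of the ThreadFactoryBuilder usage, but producing non-daemon
  // threads: the JVM waits for in-flight tasks until shutdown() is called.
  static ScheduledExecutorService create()
  {
    final AtomicInteger counter = new AtomicInteger();
    final ThreadFactory factory = runnable -> {
      Thread t = new Thread(runnable, "DruidColdSchema-ScheduledExecutor-" + counter.getAndIncrement());
      t.setDaemon(false); // non-daemon, per the review discussion
      return t;
    };
    return Executors.newSingleThreadScheduledExecutor(factory);
  }
}
```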
@@ -181,6 +220,12 @@ public void onLeaderStart()
    try {
      segmentSchemaBackfillQueue.onLeaderStart();
      cacheExecFuture = cacheExec.submit(this::cacheExecLoop);
      coldSchemaExecFuture = coldScehmaExec.schedule(
          this::coldDatasourceSchemaExec,
          coldSchemaExecPeriodMillis,
Is there a specific reason to leave these properties undocumented?
Do we have any metrics which tell us the performance of this executor service in terms of the number of cold segments backfilled?
Do we have any metrics which tell us the performance of this executor service in terms of the number of cold segments backfilled?

We are not backfilling segments here. It is just looping over the segments, identifying cold segments, and building their schema.
If the datasource schema is updated, it is logged.
  coldSchemaTable.keySet().retainAll(dataSources);
}

private RowSignature mergeHotAndColdSchema(RowSignature hot, RowSignature cold)
I am very surprised you need a new method here. There should be existing logic which does this, no?
I can refactor this a bit to have a single method for merging the RowSignature.
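The merge being discussed operates on Druid's RowSignature (an ordered mapping of column names to types). A generic stand-in using plain maps can illustrate the likely semantics; this is an assumption about the merge policy, not the PR's actual implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaMergeSketch
{
  // Illustrative stand-in for merging hot and cold schemas: hot columns keep
  // their order and types, and cold-only columns are appended afterwards.
  static Map<String, String> mergeHotAndCold(Map<String, String> hot, Map<String, String> cold)
  {
    final Map<String, String> merged = new LinkedHashMap<>(hot);
    cold.forEach(merged::putIfAbsent); // a cold column never overrides a hot column's type
    return merged;
  }
}
```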
}

// remove any stale datasource from the map
coldSchemaTable.keySet().retainAll(dataSources);
Do we have a test case for this ?
Yes, in CoordinatorSegmentMetadataCacheTest#testColdDatasourceSchema_verifyStaleDatasourceRemoved.
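The cleanup idiom itself is worth noting: keySet() returns a live view of the map, so retainAll on it removes the corresponding entries from the map. A minimal, self-contained sketch (names are illustrative):

```java
import java.util.Map;
import java.util.Set;

public class StaleDatasourceCleanup
{
  // Drops every entry whose datasource is not in the freshly polled set of
  // used datasources, mirroring coldSchemaTable.keySet().retainAll(dataSources).
  static void removeStale(Map<String, String> coldSchemaTable, Set<String> usedDataSources)
  {
    coldSchemaTable.keySet().retainAll(usedDataSources);
  }
}
```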
Problem
Currently, the datasource schema doesn't include columns from cold segments. This makes it impossible to query an entirely cold datasource.
Approach
Backfill schema for cold segments
Leverage the existing schema backfill flow added as part of the CentralizedDatasourceSchema feature. Users are supposed to manually load the cold segments by setting their replication factor to 1; once the schema is backfilled (which can be verified from the metadata database), they can unload the segments.
Handling entirely cold datasources
The problem with a cold datasource is that the Broker simply doesn't know about the datasource if none of its segments are available, so the datasource wouldn't even appear on the console for querying.
We need a way for the Brokers to be aware of cold datasources, so that they can fetch their schema from the Coordinator.
Currently, Brokers request the schema for available datasources from the Coordinator in each refresh cycle.
Brokers now poll the set of used datasources from the Coordinator first and then request their schema from the Coordinator.
Once the Broker has the schema for cold datasources, they will show up in the console and become available for querying.
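The two-step refresh flow described above can be sketched with hypothetical interfaces (these are not the actual Druid client APIs): the Broker first polls the used datasources, then requests schemas for all of them, so entirely cold datasources are included.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical Coordinator client: the real PR exposes
// fetchUsedDataSources() returning a ListenableFuture; this sketch is synchronous.
interface CoordinatorClientSketch
{
  Set<String> fetchUsedDataSources();

  Map<String, String> fetchDataSourceSchemas(Set<String> dataSources);
}

class BrokerRefreshSketch
{
  private final CoordinatorClientSketch coordinator;
  private final Map<String, String> knownSchemas = new HashMap<>();

  BrokerRefreshSketch(CoordinatorClientSketch coordinator)
  {
    this.coordinator = coordinator;
  }

  void refresh()
  {
    // 1. poll used datasources (includes cold ones with no available segments)
    final Set<String> used = coordinator.fetchUsedDataSources();
    // 2. request their schemas from the Coordinator
    knownSchemas.putAll(coordinator.fetchDataSourceSchemas(used));
  }

  Map<String, String> getKnownSchemas()
  {
    return knownSchemas;
  }
}
```

With this flow, a datasource whose segments are all in deep storage still surfaces in the Broker's schema map after a refresh.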
Key changes
CoordinatorSegmentMetadataCache
BrokerSegmentMetadataCache
DruidCoordinator
SegmentReplicationStatusManager, which manages segmentReplicationStatus and broadcastSegments state. This was needed to avoid a cyclic dependency between DruidCoordinator and CoordinatorSegmentMetadataCache.
This PR has: