
Enable querying entirely cold datasources #16676

Open
wants to merge 14 commits into base: master

Conversation

findingrish
Contributor

@findingrish findingrish commented Jul 1, 2024

Problem

Currently, the datasource schema doesn’t include columns from cold segments. This makes it impossible to query an entirely cold datasource.

Approach

  • Add a mechanism to backfill schema for cold segments in the metadata database. Note that this is required only for segments created prior to enabling the CentralizedDatasourceSchema feature.
  • Update the datasource schema building logic on the Coordinator to include schema from cold segments.
  • Make Brokers aware of entirely cold datasources.

Backfill schema for cold segments

Leverage the existing schema backfill flow added as part of the CentralizedDatasourceSchema feature. Users are expected to manually load the cold segments by setting their replication factor to 1; once the schema is backfilled (which can be verified in the metadata database), they can unload the segments.

Handling entirely cold datasource

The problem with a cold datasource is that the Broker simply doesn’t know about the datasource if none of its segments are available, so the datasource wouldn’t even appear on the console for querying.
We need a way for Brokers to be aware of cold datasources, so that they can fetch their schema from the Coordinator.

Currently, Brokers request the schema for available datasources from the Coordinator in each refresh cycle.
With this change, Brokers first poll the set of used datasources from the Coordinator and then request their schema.

Once the Broker has the schema for cold datasources, they show up in the console and become available for querying.
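
A rough sketch of this ordering is shown below. It is illustrative only: fetchUsedDataSources() mirrors the new Coordinator API quoted further down in the review, while every other name and type is a simplified placeholder rather than the actual BrokerSegmentMetadataCache code.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the new Broker refresh ordering.
class BrokerRefreshSketch
{
  // Placeholder client; only fetchUsedDataSources() corresponds to an API in this PR.
  interface CoordinatorClientSketch
  {
    Set<String> fetchUsedDataSources();
    Map<String, Map<String, String>> fetchDataSourceSchemas(Set<String> dataSources);
  }

  private final CoordinatorClientSketch coordinatorClient;

  // datasource -> column name -> column type
  private final ConcurrentHashMap<String, Map<String, String>> tables = new ConcurrentHashMap<>();

  BrokerRefreshSketch(CoordinatorClientSketch coordinatorClient)
  {
    this.coordinatorClient = coordinatorClient;
  }

  void refresh()
  {
    // 1. Poll every used datasource, including entirely cold ones with no available segments.
    Set<String> usedDataSources = coordinatorClient.fetchUsedDataSources();

    // 2. Request their schemas from the Coordinator.
    Map<String, Map<String, String>> schemas = coordinatorClient.fetchDataSourceSchemas(usedDataSources);

    // 3. Rebuild the local table map so cold datasources show up in the console and are queryable.
    tables.putAll(schemas);

    // 4. Drop datasources that are no longer used.
    tables.keySet().retainAll(usedDataSources);
  }
}
```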

Key changes

  • CoordinatorSegmentMetadataCache
    • Runs a scheduled thread to fetch used segments and build datasource schema from cold segments, then merges it with the datasource schema built from hot segments (a minimal sketch follows this list).
    • The refresh logic is also updated to merge the hot datasource schema with the cold schema.
  • BrokerSegmentMetadataCache
    • The refresh condition is slightly changed: a refresh is executed in each cycle if the feature is enabled.
    • The refresh logic is also updated to poll used datasources from the Coordinator, so that the Broker can fetch cold datasource schema.
  • DruidCoordinator
    • Added a new class SegmentReplicationStatusManager which manages the segmentReplicationStatus and broadcastSegments state. This was needed to avoid a cyclic dependency between DruidCoordinator and CoordinatorSegmentMetadataCache.
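
A minimal sketch of the cold-schema pass mentioned in the CoordinatorSegmentMetadataCache item above, assuming cold segments are identified by a replication factor of 0 and that their schemas have already been backfilled into the metadata store; all names and types here are simplified placeholders, not the actual implementation:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of one iteration of the scheduled cold-schema pass.
class ColdSchemaPassSketch
{
  // datasource -> column name -> column type, built purely from cold segments
  private final ConcurrentHashMap<String, Map<String, String>> coldSchemaTable = new ConcurrentHashMap<>();

  void coldDatasourceSchemaExec(
      Map<String, Set<String>> usedSegmentsByDataSource,  // datasource -> used segment ids
      Map<String, Integer> replicationFactorBySegment,    // segment id -> replication factor
      Map<String, Map<String, String>> backfilledSchemas  // segment id -> backfilled columns
  )
  {
    for (Map.Entry<String, Set<String>> entry : usedSegmentsByDataSource.entrySet()) {
      final Map<String, String> coldColumns = new HashMap<>();
      for (String segmentId : entry.getValue()) {
        // A used segment with replication factor 0 lives only in deep storage, i.e. it is cold.
        if (replicationFactorBySegment.getOrDefault(segmentId, 0) == 0) {
          coldColumns.putAll(backfilledSchemas.getOrDefault(segmentId, Map.of()));
        }
      }
      coldSchemaTable.put(entry.getKey(), coldColumns);
    }

    // Remove stale datasources; the refresh path later merges coldSchemaTable
    // with the schema built from hot (available) segments.
    coldSchemaTable.keySet().retainAll(usedSegmentsByDataSource.keySet());
  }
}
```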

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

Contributor

@kfaraz kfaraz left a comment


This PR has some refactors which should be tackled separately, in order to facilitate a smoother review of the core changes here.

return datasourceToUnavailableSegments;
}

public Object2IntMap<String> getDatasourceToDeepStorageQueryOnlySegmentCount()
Contributor

@findingrish , this seems like the only new method that has been added here.
Please remove this new class SegmentReplicationStatusManager and move the code back to DruidCoordinator.

If a refactor is required, please do it in a separate PR.
This PR should focus only on the required changes.

Contributor

Correction: It seems that this method had already existed too.
@findingrish , is there any new code in SegmentReplicationStatusManager?

Contributor Author

@kfaraz There is no new code in SegmentReplicationStatusManager. The reason for refactoring was a cyclic dependency between CoordinatorSegmentMetadataCache and DruidCoordinator while trying to use DruidCoordinator#getSegmentReplicationFactor.

I will raise a separate PR for the refactor.

Contributor

Okay. Can you share some more details on how the cyclic dependency comes into the picture?

Contributor Author

@findingrish findingrish Jul 4, 2024


Currently, DruidCoordinator has a dependency on CoordinatorSegmentMetadataCache. For this patch, I need to use DruidCoordinator#getSegmentReplicationFactor in CoordinatorSegmentMetadataCache, which results in a cyclic dependency.

As a solution, I have refactored DruidCoordinator to separate out the code which updates segmentReplicationStatus and broadcastSegments.

Let me know if this solution makes sense.

Contributor

@findingrish , you could just expose a method updateSegmentReplicationStatus() on CoordinatorSegmentMetadataCache. Call this method from DruidCoordinator.UpdateReplicationStatus.run() where we update broadcastSegments and segmentReplicationStatus.

Let me know if this works for you.
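
For illustration, a rough sketch of this suggestion with heavily simplified placeholder types (not the actual DruidCoordinator code):

```java
// Illustrative sketch: the duty pushes the freshly computed replication status into the
// metadata cache, instead of the cache reaching back into DruidCoordinator.
class SegmentMetadataCacheSketch
{
  private volatile Object segmentReplicationStatus;

  // The suggested new method, called by the UpdateReplicationStatus duty.
  public void updateSegmentReplicationStatus(Object replicationStatus)
  {
    this.segmentReplicationStatus = replicationStatus;
  }
}

class UpdateReplicationStatusSketch
{
  private final SegmentMetadataCacheSketch metadataCache;

  UpdateReplicationStatusSketch(SegmentMetadataCacheSketch metadataCache)
  {
    this.metadataCache = metadataCache;
  }

  void run(Object newReplicationStatus)
  {
    // ... existing logic that updates broadcastSegments and segmentReplicationStatus ...
    metadataCache.updateSegmentReplicationStatus(newReplicationStatus);
  }
}
```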

Contributor Author

Yeah, this approach would work for me.
However, it seems a bit odd that DruidCoordinator.UpdateReplicationStatus has to additionally update state in some other class; ideally, the consumer CoordinatorSegmentMetadataCache should be pulling this information?

Is there a reason to avoid the refactor work?

Contributor

@kfaraz kfaraz Jul 5, 2024


Is there a reason to avoid the refactor work?

Yes, the dependencies are already all over the place which makes the code less readable and also complicates testing. A refactor is needed here but it would have to be thought through a little.

However, it seems a bit odd that DruidCoordinator.UpdateReplicationStatus has to additionally update state in some other class;

Not really, you can think of the DruidCoordinator (or rather the UpdateReplicationStatus duty in this case) as sending a notification to the CoordinatorSegmentMetadataCache saying that the segment replication status has been updated. The DruidCoordinator already sends a notification to the metadata cache about leadership status; this is another notification in the same vein.

Contributor Author

Yes, the dependencies are already all over the place which makes the code less readable and also complicates testing. A refactor is needed here but it would have to be thought through a little.

Yes, this makes sense. DruidCoordinator refactoring would need more thought.

Thanks for the suggestion, I will update the patch.

Contributor

@cryptoe cryptoe left a comment


Left some comments.

/**
* Retrieves list of used datasources.
*/
ListenableFuture<Set<String>> fetchUsedDataSources();
Contributor

Please add the definition of used data sources here.

*/
-  protected final ConcurrentMap<String, T> tables = new ConcurrentHashMap<>();
+  protected final ConcurrentHashMap<String, T> tables = new ConcurrentHashMap<>();
Contributor

Nit: Just wondering which specific hash map methods you are using that required this change.

Contributor Author

I started using the computeIfAbsent method. The explanation is captured here: https://github.com/code-review-checklists/java-concurrency/blob/master/README.md#chm-type.
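
For context, a small self-contained example of the pattern (not the PR code): declaring the field with the concrete ConcurrentHashMap type documents that callers rely on ConcurrentHashMap's computeIfAbsent guarantee, namely that the whole invocation is atomic and the mapping function is applied at most once per key.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

class ComputeIfAbsentExample
{
  // Concrete ConcurrentHashMap type, as discussed in the linked checklist item.
  private final ConcurrentHashMap<String, List<String>> columnsByDataSource = new ConcurrentHashMap<>();

  void addColumn(String dataSource, String column)
  {
    // Two threads registering columns for the same datasource never create two lists:
    // the mapping function runs at most once per key and the call is atomic.
    columnsByDataSource
        .computeIfAbsent(dataSource, ds -> new CopyOnWriteArrayList<>())
        .add(column);
  }
}
```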

coldScehmaExec = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("DruidColdSchema-ScheduledExecutor-%d")
.setDaemon(true)
Contributor

Why is this a daemon thread?

Contributor Author

I will update this; we don't need a daemon thread here.

@@ -181,6 +220,12 @@ public void onLeaderStart()
try {
segmentSchemaBackfillQueue.onLeaderStart();
cacheExecFuture = cacheExec.submit(this::cacheExecLoop);
coldSchemaExecFuture = coldScehmaExec.schedule(
this::coldDatasourceSchemaExec,
coldSchemaExecPeriodMillis,
Contributor

Is there a specific reason to leave these properties undocumented?
Do we have any metrics which tell us the performance of this executor service in terms of the number of cold segments backfilled?

Contributor Author

Do we have any metrics which tell us the performance of this executor service in terms of the number of cold segments backfilled

We are not backfilling segments here. It just loops over the segments, identifies cold segments, and builds their schema.
If the datasource schema is updated, it is logged.

coldSchemaTable.keySet().retainAll(dataSources);
}

private RowSignature mergeHotAndColdSchema(RowSignature hot, RowSignature cold)
Contributor

I am very surprised you need a new method here. There should be existing logic which does this, no?

Contributor Author

I can refactor this a bit to have a single method for merging the RowSignature.
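
For reference, a minimal sketch of what such a single merge helper could look like, assuming RowSignature's builder/getColumnNames/getColumnType API and a "first signature wins on type conflicts" policy; the actual precedence between hot and cold columns is up to the implementation:

```java
import java.util.LinkedHashSet;
import java.util.Set;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;

public class RowSignatureMergeSketch
{
  public static RowSignature merge(RowSignature first, RowSignature second)
  {
    // Union of column names, preserving the order of the first signature.
    final Set<String> columnNames = new LinkedHashSet<>();
    columnNames.addAll(first.getColumnNames());
    columnNames.addAll(second.getColumnNames());

    final RowSignature.Builder builder = RowSignature.builder();
    for (String column : columnNames) {
      final ColumnType type =
          first.getColumnType(column)
               .orElseGet(() -> second.getColumnType(column).orElse(null));
      builder.add(column, type);
    }
    return builder.build();
  }
}
```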

}

// remove any stale datasource from the map
coldSchemaTable.keySet().retainAll(dataSources);
Contributor

Do we have a test case for this?

Contributor Author

Yes, in CoordinatorSegmentMetadataCacheTest#testColdDatasourceSchema_verifyStaleDatasourceRemoved.
