Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable querying entirely cold datasources #16676

Merged
merged 32 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
ca33d8b
Fix build
findingrish Mar 14, 2024
1abba25
Merge branch 'master' of github.com:findingrish/druid
findingrish Jun 10, 2024
033c420
temp changes
findingrish Jun 20, 2024
96e77c1
Coordinator changes to process schema for cold segments
findingrish Jul 1, 2024
1739dce
Changes in the broker metadata cache to fetch used datasources schema
findingrish Jul 1, 2024
6ed0b8b
Refactor DruidCoordinator, broker metadata cache changes to execute r…
findingrish Jul 2, 2024
966f5ac
Fix tests
findingrish Jul 2, 2024
d586a8e
Merge remote-tracking branch 'upstream/master' into cold_ds_schema
findingrish Jul 2, 2024
b72b822
Update MetadataResourceTest
findingrish Jul 2, 2024
ad58f9b
minor changes
findingrish Jul 2, 2024
2944959
Fix refresh condition on Broker, add UTs
findingrish Jul 2, 2024
b09df54
Fix test
findingrish Jul 3, 2024
254cea2
Merge remote-tracking branch 'upstream/master' into cold_ds_schema
findingrish Jul 3, 2024
25d23c6
minor code changes, add test
findingrish Jul 3, 2024
7169b6d
revert changes
findingrish Jul 7, 2024
89e2d64
Fix test
findingrish Jul 7, 2024
ba380ec
checkstyle
findingrish Jul 8, 2024
d3c112c
Update docs
findingrish Jul 8, 2024
debcb42
review comments
findingrish Jul 8, 2024
479382c
Doc changes
findingrish Jul 8, 2024
87113ca
Update threshold for logging cold schema processing stats
findingrish Jul 8, 2024
2a66ae1
Minor changes
findingrish Jul 9, 2024
695ba92
add debug logging
findingrish Jul 9, 2024
a265cf0
Merge hot and cold schema only while querying datasource schema
findingrish Jul 10, 2024
8182f39
Update tests
findingrish Jul 10, 2024
766082e
Update docs
findingrish Jul 10, 2024
2bea1fb
Merge remote-tracking branch 'upstream/master' into cold_ds_schema
findingrish Jul 10, 2024
8582f5d
Update test
findingrish Jul 10, 2024
7ed2d96
Update tests with SegmentReplicationStatus mock
findingrish Jul 10, 2024
47bf4ab
Merge remote-tracking branch 'upstream/master' into cold_ds_schema
findingrish Jul 11, 2024
8fb0a04
Minor change
findingrish Jul 11, 2024
87a0926
Minor change
findingrish Jul 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix refresh condition on Broker, add UTs
  • Loading branch information
findingrish committed Jul 2, 2024
commit 2944959647bdb91e6f4336feb4e3e3a827e23099
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,5 @@ public interface CoordinatorClient
/**
* Retrieves list of used datasources.
*/
ListenableFuture<List<String>> fetchDataSources();
ListenableFuture<Set<String>> fetchUsedDataSources();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the definition of used data sources here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated the method name to fetchDatasourcesWithUsedSegments to make it more understandable.
I don't think we need to document about what used segments means, since it is widely referred in the code and docs. For example, here https://druid.apache.org/docs/latest/api-reference/data-management-api/#mark-a-single-segment-as-used.

}
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,15 @@ public CoordinatorClientImpl withRetryPolicy(ServiceRetryPolicy retryPolicy)
}

@Override
public ListenableFuture<List<String>> fetchDataSources()
public ListenableFuture<Set<String>> fetchUsedDataSources()
{
final String path = "/druid/coordinator/v1/metadata/datasources";
return FutureUtils.transform(
client.asyncRequest(
new RequestBuilder(HttpMethod.GET, path),
new BytesFullResponseHandler()
),
holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), new TypeReference<List<String>>() {})
holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), new TypeReference<Set<String>>() {})
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ protected void cacheExecLoop()

if (isServerViewInitialized &&
!wasRecentFailure &&
refreshCondition() &&
shouldRefresh() &&
(refreshImmediately || nextRefresh < System.currentTimeMillis())) {
// We need to do a refresh. Break out of the waiting loop.
break;
Expand Down Expand Up @@ -364,11 +364,10 @@ public void refreshWaitCondition() throws InterruptedException
// noop
}

public boolean refreshCondition()
@SuppressWarnings("GuardedBy")
public boolean shouldRefresh()
{
synchronized (lock) {
return (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty());
}
return (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty());
}

public void awaitInitialization() throws InterruptedException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public CoordinatorClient withRetryPolicy(ServiceRetryPolicy retryPolicy)
}

@Override
public ListenableFuture<List<String>> fetchDataSources()
public ListenableFuture<Set<String>> fetchUsedDataSources()
{
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,13 @@ public void stop()
}

/**
* Execute refresh on the broker in each cycle if CentralizedDatasourceSchema is enabled.
* Execute a refresh on the broker on every cycle when CentralizedDatasourceSchema is enabled;
* otherwise refresh only when there are segments or datasources needing a refresh.
*/
@Override
public boolean refreshCondition()
public boolean shouldRefresh()
{
return centralizedDatasourceSchemaConfig.isEnabled();
return centralizedDatasourceSchemaConfig.isEnabled() || super.shouldRefresh();
}

/**
Expand All @@ -206,7 +207,7 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
final Set<String> dataSourcesToQuery = new HashSet<>(segmentMetadataInfo.keySet());

// this is the complete set of datasources polled from the Coordinator
final List<String> polledDatasources = queryDataSources();
final Set<String> polledDatasources = queryDataSources();

dataSourcesToQuery.addAll(polledDatasources);

Expand Down Expand Up @@ -274,12 +275,15 @@ protected void removeSegmentAction(SegmentId segmentId)
// noop, no additional action needed when segment is removed.
}

private List<String> queryDataSources()
private Set<String> queryDataSources()
{
List<String> dataSources = null;
Set<String> dataSources = new HashSet<>();

try {
dataSources = FutureUtils.getUnchecked(coordinatorClient.fetchDataSources(), true);
Set<String> polled = FutureUtils.getUnchecked(coordinatorClient.fetchUsedDataSources(), true);
if (polled != null) {
dataSources.addAll(polled);
}
}
catch (Exception e) {
log.debug(e, "Failed to query datasources from the Coordinator.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,13 @@
import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.TestTimelineServerView;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -332,6 +334,9 @@ public void testBrokerPollsAllDSSchema() throws InterruptedException
ArgumentCaptor<Set<String>> argumentCaptor = ArgumentCaptor.forClass(Set.class);
CoordinatorClient coordinatorClient = Mockito.mock(CoordinatorClient.class);
Mockito.when(coordinatorClient.fetchDataSourceInformation(argumentCaptor.capture())).thenReturn(Futures.immediateFuture(null));

Set<String> datsources = Sets.newHashSet(DATASOURCE1, DATASOURCE2, DATASOURCE3, SOME_DATASOURCE, "xyz", "coldDS");
Mockito.when(coordinatorClient.fetchUsedDataSources()).thenReturn(Futures.immediateFuture(datsources));
BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
serverView,
Expand All @@ -347,15 +352,95 @@ public void testBrokerPollsAllDSSchema() throws InterruptedException
schema.start();
schema.awaitInitialization();

Assert.assertEquals(Sets.newHashSet(DATASOURCE1, DATASOURCE2, DATASOURCE3, SOME_DATASOURCE), argumentCaptor.getValue());
Assert.assertEquals(datsources, argumentCaptor.getValue());

refreshLatch = new CountDownLatch(1);
serverView.addSegment(newSegment("xyz", 0), ServerType.HISTORICAL);

refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS);

// verify that previously refreshed datasources are included in the last coordinator poll
Assert.assertEquals(Sets.newHashSet(DATASOURCE1, DATASOURCE2, DATASOURCE3, SOME_DATASOURCE, "xyz"), argumentCaptor.getValue());
Assert.assertEquals(datsources, argumentCaptor.getValue());
}

@Test
public void testRefreshOnEachCycleCentralizedDatasourceSchemaEnabled() throws InterruptedException
{
CentralizedDatasourceSchemaConfig config = CentralizedDatasourceSchemaConfig.create();
config.setEnabled(true);

serverView = new TestTimelineServerView(walker.getSegments(), Collections.emptyList());
druidServers = serverView.getDruidServers();

BrokerSegmentMetadataCacheConfig metadataCacheConfig = BrokerSegmentMetadataCacheConfig.create("PT1S");
metadataCacheConfig.setMetadataRefreshPeriod(Period.parse("PT0.001S"));
BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
serverView,
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
new NoopServiceEmitter(),
new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
new NoopCoordinatorClient(),
config
) {
@Override
public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> dataSourcesToRebuild)
throws IOException
{
super.refresh(segmentsToRefresh, dataSourcesToRebuild);
refreshLatch.countDown();
}
};

// refresh should be executed more than once, with the feature disabled refresh should be executed only once
refreshLatch = new CountDownLatch(3);
schema.start();
schema.awaitInitialization();

refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS);

Assert.assertEquals(0, refreshLatch.getCount());
}

@Test
public void testRefreshOnEachCycleCentralizedDatasourceSchemaDisabled() throws InterruptedException
{
  // Use a very short metadata refresh period so that, were per-cycle refresh
  // active, multiple refresh cycles would complete within the wait timeout.
  BrokerSegmentMetadataCacheConfig metadataCacheConfig = BrokerSegmentMetadataCacheConfig.create("PT1S");
  metadataCacheConfig.setMetadataRefreshPeriod(Period.parse("PT0.001S"));

  serverView = new TestTimelineServerView(walker.getSegments(), Collections.emptyList());
  druidServers = serverView.getDruidServers();

  BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache(
      CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
      serverView,
      // Fix: pass the config built above; previously SEGMENT_CACHE_CONFIG_DEFAULT
      // was passed here, leaving metadataCacheConfig (and its short refresh
      // period, which this test relies on) unused.
      metadataCacheConfig,
      new NoopEscalator(),
      new InternalQueryConfig(),
      new NoopServiceEmitter(),
      new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
      new NoopCoordinatorClient(),
      // Default config: centralized datasource schema is disabled.
      CentralizedDatasourceSchemaConfig.create()
  ) {
    @Override
    public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> dataSourcesToRebuild)
        throws IOException
    {
      // Count each refresh so the test can observe how many cycles ran.
      super.refresh(segmentsToRefresh, dataSourcesToRebuild);
      refreshLatch.countDown();
    }
  };

  // With the feature disabled, refresh should be executed only once
  // (the initial refresh), so the latch should count down exactly once.
  refreshLatch = new CountDownLatch(3);
  schema.start();
  schema.awaitInitialization();

  refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS);

  // Only the initial refresh ran; two of the three latch counts remain.
  Assert.assertEquals(2, refreshLatch.getCount());
}

@Test
Expand Down
Loading