Enable querying entirely cold datasources #16676

Merged on Jul 15, 2024 (32 commits)

Changes shown from 1 commit

Commits (32)
ca33d8b
Fix build
findingrish Mar 14, 2024
1abba25
Merge branch 'master' of github.com:findingrish/druid
findingrish Jun 10, 2024
033c420
temp changes
findingrish Jun 20, 2024
96e77c1
Coordinator changes to process schema for cold segments
findingrish Jul 1, 2024
1739dce
Changes in the broker metadata cache to fetch used datasources schema
findingrish Jul 1, 2024
6ed0b8b
Refactor DruidCoordinator, broker metadata cache changes to execute r…
findingrish Jul 2, 2024
966f5ac
Fix tests
findingrish Jul 2, 2024
d586a8e
Merge remote-tracking branch 'upstream/master' into cold_ds_schema
findingrish Jul 2, 2024
b72b822
Update MetadataResourceTest
findingrish Jul 2, 2024
ad58f9b
minor changes
findingrish Jul 2, 2024
2944959
Fix refresh condition on Broker, add UTs
findingrish Jul 2, 2024
b09df54
Fix test
findingrish Jul 3, 2024
254cea2
Merge remote-tracking branch 'upstream/master' into cold_ds_schema
findingrish Jul 3, 2024
25d23c6
minor code changes, add test
findingrish Jul 3, 2024
7169b6d
revert changes
findingrish Jul 7, 2024
89e2d64
Fix test
findingrish Jul 7, 2024
ba380ec
checkstyle
findingrish Jul 8, 2024
d3c112c
Update docs
findingrish Jul 8, 2024
debcb42
review comments
findingrish Jul 8, 2024
479382c
Doc changes
findingrish Jul 8, 2024
87113ca
Update threshold for logging cold schema processing stats
findingrish Jul 8, 2024
2a66ae1
Minor changes
findingrish Jul 9, 2024
695ba92
add debug logging
findingrish Jul 9, 2024
a265cf0
Merge hot and cold schema only while querying datasource schema
findingrish Jul 10, 2024
8182f39
Update tests
findingrish Jul 10, 2024
766082e
Update docs
findingrish Jul 10, 2024
2bea1fb
Merge remote-tracking branch 'upstream/master' into cold_ds_schema
findingrish Jul 10, 2024
8582f5d
Update test
findingrish Jul 10, 2024
7ed2d96
Update tests with SegmentReplicationStatus mock
findingrish Jul 10, 2024
47bf4ab
Merge remote-tracking branch 'upstream/master' into cold_ds_schema
findingrish Jul 11, 2024
8fb0a04
Minor change
findingrish Jul 11, 2024
87a0926
Minor change
findingrish Jul 12, 2024
Commit 25d23c645ceb6e6f608c4c617373aea1270b60f1: minor code changes, add test
findingrish committed Jul 3, 2024
@@ -364,8 +364,11 @@ public void refreshWaitCondition() throws InterruptedException
// noop
}

/**
* Refresh is executed only when there are segments or datasources needing refresh.
*/
@SuppressWarnings("GuardedBy")
-public boolean shouldRefresh()
+protected boolean shouldRefresh()
{
return (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty());
}
@@ -65,7 +65,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -100,11 +99,11 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
private final SegmentReplicationStatusManager segmentReplicationStatusManager;

// Schema for datasources from cold segments
-private final ConcurrentMap<String, DataSourceInformation> coldDatasourceSchema = new ConcurrentHashMap<>();
-private final long coldDatasourceSchemaExecDurationMillis;
-private final ScheduledExecutorService coldDSScehmaExec;
+private final ConcurrentHashMap<String, DataSourceInformation> coldSchemaTable = new ConcurrentHashMap<>();
+private final long coldSchemaExecPeriodMillis;
+private final ScheduledExecutorService coldScehmaExec;
private @Nullable Future<?> cacheExecFuture = null;
-private @Nullable Future<?> coldDSSchemaExecFuture = null;
+private @Nullable Future<?> coldSchemaExecFuture = null;

@Inject
public CoordinatorSegmentMetadataCache(
@@ -128,11 +127,11 @@ public CoordinatorSegmentMetadataCache(
this.segmentSchemaBackfillQueue = segmentSchemaBackfillQueue;
this.sqlSegmentsMetadataManager = sqlSegmentsMetadataManager;
this.segmentReplicationStatusManager = segmentReplicationStatusManager;
-this.coldDatasourceSchemaExecDurationMillis =
+this.coldSchemaExecPeriodMillis =
segmentsMetadataManagerConfigSupplier.get().getPollDuration().getMillis();
-coldDSScehmaExec = Executors.newSingleThreadScheduledExecutor(
+coldScehmaExec = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("DruidColdDSSchema-ScheduledExecutor-%d")
.setNameFormat("DruidColdSchema-ScheduledExecutor-%d")
.setDaemon(true)
Contributor: Why is this a daemon thread?

findingrish (Author): I will update; we don't need a daemon thread here.

.build()
);
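As a minimal sketch of the follow-up agreed in the thread above (dropping the daemon flag), the executor could be built as below. Omitting setDaemon(true) leaves an ordinary user thread, so the explicit shutdownNow() in stop() is what actually ends it. The class and field names here are hypothetical stand-ins, not the PR's code.

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class ColdSchemaExecutorSketch
{
  // Omitting setDaemon(true) yields a regular (non-daemon) worker thread.
  private final ScheduledExecutorService coldSchemaExec = Executors.newSingleThreadScheduledExecutor(
      new ThreadFactoryBuilder()
          .setNameFormat("ColdSchema-ScheduledExecutor-%d")
          .build()
  );

  public void stop()
  {
    // With a non-daemon worker, shutting the executor down explicitly is what lets the JVM exit.
    coldSchemaExec.shutdownNow();
  }
}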
@@ -204,12 +203,15 @@ public void stop()
{
callbackExec.shutdownNow();
cacheExec.shutdownNow();
-coldDSScehmaExec.shutdownNow();
+coldScehmaExec.shutdownNow();
segmentSchemaCache.onLeaderStop();
segmentSchemaBackfillQueue.onLeaderStop();
if (cacheExecFuture != null) {
cacheExecFuture.cancel(true);
}
if (coldSchemaExecFuture != null) {
coldSchemaExecFuture.cancel(true);
}
}

public void onLeaderStart()
@@ -218,9 +220,9 @@ public void onLeaderStart()
try {
segmentSchemaBackfillQueue.onLeaderStart();
cacheExecFuture = cacheExec.submit(this::cacheExecLoop);
-coldDSSchemaExecFuture = coldDSScehmaExec.schedule(
+coldSchemaExecFuture = coldScehmaExec.schedule(
this::coldDatasourceSchemaExec,
-coldDatasourceSchemaExecDurationMillis,
+coldSchemaExecPeriodMillis,
Contributor: Is there a specific reason to leave these properties undocumented? Do we have any metrics that tell us the performance of this executor service in terms of the number of cold segments backfilled?

findingrish (Author): We are not backfilling segments here. The task just loops over the segments, identifies the cold ones, and builds their datasource schema. If a datasource schema is updated, it is logged.

Contributor: Since this exec iterates over all the segments, what do we have to figure out how much time the execution took? Should we publish some summary stats to improve operability?

findingrish (Author, Jul 8, 2024): Makes sense. I am logging details if the execution duration is greater than 50 seconds.

TimeUnit.MILLISECONDS
);
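To make the loop described in the thread above concrete, here is a simplified, self-contained sketch of the same pattern: walk all segments once, skip segments that are replicated on historicals, fold the remaining cold segments into a per-datasource column map, and log a summary only when the pass exceeds a threshold (50 seconds in the discussion above). ExampleSegment and the string column types are illustrative stand-ins, not the PR's Druid classes.

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ColdSchemaPassSketch
{
  // Simplified stand-in for a segment: datasource name, nullable replication factor, column types.
  record ExampleSegment(String dataSource, Integer replicationFactor, Map<String, String> columns) {}

  // One pass over all segments: build a merged column map per datasource from its cold segments.
  static Map<String, Map<String, String>> coldSchemaPass(List<ExampleSegment> segments)
  {
    long startMillis = System.currentTimeMillis();
    Map<String, Map<String, String>> coldSchemas = new HashMap<>();

    for (ExampleSegment segment : segments) {
      Integer replicationFactor = segment.replicationFactor();
      // Mirrors the check in the diff: a segment counts as cold when its replication factor
      // is unknown (null) or explicitly zero; anything else is loaded somewhere and skipped.
      if (replicationFactor != null && replicationFactor != 0) {
        continue;
      }
      Map<String, String> signature =
          coldSchemas.computeIfAbsent(segment.dataSource(), ds -> new LinkedHashMap<>());
      // First-seen column type wins, preserving column order of first appearance.
      segment.columns().forEach(signature::putIfAbsent);
    }

    long elapsedMillis = System.currentTimeMillis() - startMillis;
    // Only emit a detailed summary when the pass is slow.
    if (elapsedMillis > 50_000) {
      System.out.printf("Cold schema pass took %d ms for %d segments%n", elapsedMillis, segments.size());
    }
    return coldSchemas;
  }

  public static void main(String[] args)
  {
    List<ExampleSegment> segments = List.of(
        new ExampleSegment("alpha", 0, Map.of("dim1", "STRING", "m1", "LONG")),
        new ExampleSegment("alpha", 2, Map.of("dim2", "STRING")),   // replicated, skipped
        new ExampleSegment("beta", null, Map.of("dim1", "STRING"))
    );
    System.out.println(coldSchemaPass(segments));
  }
}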

@@ -239,8 +241,8 @@ public void onLeaderStop()
if (cacheExecFuture != null) {
cacheExecFuture.cancel(true);
}
-if (coldDSSchemaExecFuture != null) {
-coldDSSchemaExecFuture.cancel(true);
+if (coldSchemaExecFuture != null) {
+coldSchemaExecFuture.cancel(true);
}
segmentSchemaCache.onLeaderStop();
segmentSchemaBackfillQueue.onLeaderStop();
@@ -387,17 +389,19 @@ public DataSourceInformation getDatasource(String name)
{
DataSourceInformation dataSourceInformation = tables.get(name);
if (dataSourceInformation != null) {
// implies that the datasource is entirely cold
return dataSourceInformation;
}
-return coldDatasourceSchema.get(name);
+return coldSchemaTable.get(name);
}

@Override
public Map<String, DataSourceInformation> getDataSourceInformationMap()
{
Map<String, DataSourceInformation> copy = new HashMap<>(tables);

-for (Map.Entry<String, DataSourceInformation> entry : coldDatasourceSchema.entrySet()) {
+for (Map.Entry<String, DataSourceInformation> entry : coldSchemaTable.entrySet()) {
// add entirely cold datasource schema
copy.computeIfAbsent(entry.getKey(), value -> entry.getValue());
}
return ImmutableMap.copyOf(copy);
@@ -452,7 +456,7 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da

RowSignature mergedSignature = rowSignature;

-DataSourceInformation coldDatasourceInformation = coldDatasourceSchema.get(dataSource);
+DataSourceInformation coldDatasourceInformation = coldSchemaTable.get(dataSource);
if (coldDatasourceInformation != null) {
if (rowSignature == null) {
mergedSignature = coldDatasourceInformation.getRowSignature();
@@ -498,6 +502,7 @@ private Set<SegmentId> filterSegmentWithCachedSchema(Set<SegmentId> segmentIds)
return cachedSegments;
}

@VisibleForTesting
protected void coldDatasourceSchemaExec()
{
Collection<ImmutableDruidDataSource> immutableDataSources =
@@ -513,8 +518,8 @@ protected void coldDatasourceSchemaExec()
Collection<DataSegment> dataSegments = dataSource.getSegments();

for (DataSegment segment : dataSegments) {
-if (segmentReplicationStatusManager.getReplicationFactor(segment.getId()) != null
-    && segmentReplicationStatusManager.getReplicationFactor(segment.getId()) != 0) {
+Integer replicationFactor = segmentReplicationStatusManager.getReplicationFactor(segment.getId());
+if (replicationFactor != null && replicationFactor != 0) {
continue;
}
Optional<SchemaPayloadPlus> optionalSchema = segmentSchemaCache.getSchemaForSegment(segment.getId());
@@ -537,9 +542,10 @@ protected void coldDatasourceSchemaExec()

log.debug("[%s] signature from cold segments is [%s]", dataSourceName, coldSignature);

-coldDatasourceSchema.put(dataSourceName, new DataSourceInformation(dataSourceName, coldSignature));
+coldSchemaTable.put(dataSourceName, new DataSourceInformation(dataSourceName, coldSignature));

-// update tables map with merged schema
+// update tables map with merged schema, if signature doesn't exist we do not add entry in this table
+// schema for entirely cold datasource is maintained separately
tables.computeIfPresent(
dataSourceName,
(ds, info) -> {
@@ -560,14 +566,15 @@ protected void coldDatasourceSchemaExec()
}

// remove any stale datasource from the map
-coldDatasourceSchema.keySet().retainAll(dataSources);
+coldSchemaTable.keySet().retainAll(dataSources);
Contributor: Do we have a test case for this?

findingrish (Author): Yes, in CoordinatorSegmentMetadataCacheTest#testColdDatasourceSchema_verifyStaleDatasourceRemoved.

}

private RowSignature mergeHotAndColdSchema(RowSignature hot, RowSignature cold)
Contributor: I am very surprised you need a new method here. Shouldn't existing logic already do this?

findingrish (Author): I can refactor this a bit to have a single method for merging the RowSignature.

{
final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();

List<RowSignature> signatures = new ArrayList<>();
// hot datasource schema takes precedence
signatures.add(hot);
signatures.add(cold);

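As a rough sketch of the single merge helper the author mentions in the thread above, the precedence rule can be written as an ordered fold in which earlier signatures win for conflicting columns. The Map<String, String> stand-in below models a RowSignature as column name to column type; it is illustrative only, not the PR's implementation.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SignatureMergeSketch
{
  // Merge signatures in priority order: the first signature that defines a column fixes its type,
  // and column order follows first appearance (hence the LinkedHashMap).
  static Map<String, String> mergeSignatures(List<Map<String, String>> signaturesInPriorityOrder)
  {
    Map<String, String> merged = new LinkedHashMap<>();
    for (Map<String, String> signature : signaturesInPriorityOrder) {
      signature.forEach(merged::putIfAbsent);
    }
    return merged;
  }

  public static void main(String[] args)
  {
    Map<String, String> hot = Map.of("dim1", "STRING", "m1", "LONG");
    Map<String, String> cold = Map.of("dim1", "COMPLEX", "c1", "STRING");
    // Hot schema is listed first, so its type for dim1 takes precedence over the cold one.
    System.out.println(mergeSignatures(List.of(hot, cold)));
  }
}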
@@ -1820,6 +1820,7 @@ private CoordinatorSegmentMetadataCache setupForColdDatasourceSchemaTest()
schemaPayloadMap.put(
"fp",
new SchemaPayload(RowSignature.builder()
.add("dim1", ColumnType.STRING)
.add("c1", ColumnType.STRING)
.add("c2", ColumnType.LONG)
.build())
@@ -1901,6 +1902,89 @@ public void testColdDatasourceSchema_mergeSchemaInColdSchemaExec() throws IOExce
Assert.assertEquals(ColumnType.LONG, rowSignature.getColumnType(columnNames.get(7)).get());
}

@Test
public void testColdDatasourceSchema_verifyStaleDatasourceRemoved()
{
DataSegment coldSegmentAlpha =
DataSegment.builder()
.dataSource("alpha")
.interval(Intervals.of("2000/P2Y"))
.version("1")
.shardSpec(new LinearShardSpec(0))
.size(0)
.build();

DataSegment coldSegmentBeta =
DataSegment.builder()
.dataSource("beta")
.interval(Intervals.of("2000/P2Y"))
.version("1")
.shardSpec(new LinearShardSpec(0))
.size(0)
.build();

ImmutableMap.Builder<SegmentId, SegmentMetadata> segmentStatsMap = new ImmutableMap.Builder<>();
segmentStatsMap.put(coldSegmentAlpha.getId(), new SegmentMetadata(20L, "fp"));
segmentStatsMap.put(coldSegmentBeta.getId(), new SegmentMetadata(20L, "fp"));

ImmutableMap.Builder<String, SchemaPayload> schemaPayloadMap = new ImmutableMap.Builder<>();
schemaPayloadMap.put(
"fp",
new SchemaPayload(RowSignature.builder()
.add("dim1", ColumnType.STRING)
.add("c1", ColumnType.STRING)
.add("c2", ColumnType.LONG)
.build())
);
segmentSchemaCache.updateFinalizedSegmentSchema(
new SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), schemaPayloadMap.build())
);

Mockito.when(segmentReplicationStatusManager.getReplicationFactor(ArgumentMatchers.eq(coldSegmentAlpha.getId()))).thenReturn(0);
Mockito.when(segmentReplicationStatusManager.getReplicationFactor(ArgumentMatchers.eq(coldSegmentBeta.getId()))).thenReturn(0);

ImmutableDruidDataSource druidDataSource =
new ImmutableDruidDataSource(
coldSegmentAlpha.getDataSource(),
Collections.emptyMap(),
Collections.singletonMap(coldSegmentAlpha.getId(), coldSegmentAlpha)
);

Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments())
.thenReturn(Collections.singletonList(druidDataSource));

CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
getQueryLifecycleFactory(walker),
serverView,
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
);

schema.coldDatasourceSchemaExec();
Assert.assertNotNull(schema.getDatasource("alpha"));

druidDataSource =
new ImmutableDruidDataSource(
coldSegmentBeta.getDataSource(),
Collections.emptyMap(),
Collections.singletonMap(coldSegmentBeta.getId(), coldSegmentBeta)
);

Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments())
.thenReturn(Collections.singletonList(druidDataSource));

schema.coldDatasourceSchemaExec();
Assert.assertNotNull(schema.getDatasource("beta"));
Assert.assertNull(schema.getDatasource("alpha"));
}

private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns)
{
final DataSourceInformation fooDs = schema.getDatasource("foo");
@@ -178,7 +178,7 @@ public void stop()
* else if there are segments or datasources to be refreshed.
*/
@Override
-public boolean shouldRefresh()
+protected boolean shouldRefresh()
{
return centralizedDatasourceSchemaConfig.isEnabled() || super.shouldRefresh();
}