Enable querying entirely cold datasources #16676

Merged: 32 commits, Jul 15, 2024

Changes from 1 commit
Commits (32)
ca33d8b  Fix build (findingrish, Mar 14, 2024)
1abba25  Merge branch 'master' of github.com:findingrish/druid (findingrish, Jun 10, 2024)
033c420  temp changes (findingrish, Jun 20, 2024)
96e77c1  Coordinator changes to process schema for cold segments (findingrish, Jul 1, 2024)
1739dce  Changes in the broker metadata cache to fetch used datasources schema (findingrish, Jul 1, 2024)
6ed0b8b  Refactor DruidCoordinator, broker metadata cache changes to execute r… (findingrish, Jul 2, 2024)
966f5ac  Fix tests (findingrish, Jul 2, 2024)
d586a8e  Merge remote-tracking branch 'upstream/master' into cold_ds_schema (findingrish, Jul 2, 2024)
b72b822  Update MetadataResourceTest (findingrish, Jul 2, 2024)
ad58f9b  minor changes (findingrish, Jul 2, 2024)
2944959  Fix refresh condition on Broker, add UTs (findingrish, Jul 2, 2024)
b09df54  Fix test (findingrish, Jul 3, 2024)
254cea2  Merge remote-tracking branch 'upstream/master' into cold_ds_schema (findingrish, Jul 3, 2024)
25d23c6  minor code changes, add test (findingrish, Jul 3, 2024)
7169b6d  revert changes (findingrish, Jul 7, 2024)
89e2d64  Fix test (findingrish, Jul 7, 2024)
ba380ec  checkstyle (findingrish, Jul 8, 2024)
d3c112c  Update docs (findingrish, Jul 8, 2024)
debcb42  review comments (findingrish, Jul 8, 2024)
479382c  Doc changes (findingrish, Jul 8, 2024)
87113ca  Update threshold for logging cold schema processing stats (findingrish, Jul 8, 2024)
2a66ae1  Minor changes (findingrish, Jul 9, 2024)
695ba92  add debug logging (findingrish, Jul 9, 2024)
a265cf0  Merge hot and cold schema only while querying datasource schema (findingrish, Jul 10, 2024)
8182f39  Update tests (findingrish, Jul 10, 2024)
766082e  Update docs (findingrish, Jul 10, 2024)
2bea1fb  Merge remote-tracking branch 'upstream/master' into cold_ds_schema (findingrish, Jul 10, 2024)
8582f5d  Update test (findingrish, Jul 10, 2024)
7ed2d96  Update tests with SegmentReplicationStatus mock (findingrish, Jul 10, 2024)
47bf4ab  Merge remote-tracking branch 'upstream/master' into cold_ds_schema (findingrish, Jul 11, 2024)
8fb0a04  Minor change (findingrish, Jul 11, 2024)
87a0926  Minor change (findingrish, Jul 12, 2024)
minor changes
findingrish committed Jul 2, 2024
commit ad58f9b7f6795b19a41e0421235b2b8a39fd08e4
@@ -271,13 +271,10 @@ protected void cacheExecLoop()
           .plus(config.getMetadataRefreshPeriod())
           .isAfterNow();

-      boolean refresh = refreshCondition();
-      boolean shouldRefresh =
-          isServerViewInitialized &&
+      if (isServerViewInitialized &&
           !wasRecentFailure &&
           refreshCondition() &&
-          (refreshImmediately || nextRefresh < System.currentTimeMillis());
-      if (shouldRefresh) {
+          (refreshImmediately || nextRefresh < System.currentTimeMillis())) {
        // We need to do a refresh. Break out of the waiting loop.
        break;
      }
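This hunk removes the eagerly computed `refresh` local, which was unused in the shown lines, and inlines the whole predicate. Because `&&` short-circuits, `refreshCondition()` is now only consulted after the cheaper flags pass. A minimal sketch of the resulting waiting-loop shape: the four condition names come from the diff above, while the class scaffolding and the 100 ms sleep are simplified stand-ins, not the actual AbstractSegmentMetadataCache code.

```java
import java.util.concurrent.TimeUnit;

// Sketch of the simplified waiting loop after this commit (illustrative only).
public abstract class RefreshLoopSketch
{
  private volatile boolean isServerViewInitialized;
  private volatile boolean wasRecentFailure;
  private volatile boolean refreshImmediately;
  private volatile long nextRefresh;

  // Hook that subclasses such as CoordinatorSegmentMetadataCache can override,
  // e.g. to refresh only under certain conditions.
  protected abstract boolean refreshCondition();

  protected void cacheExecLoop() throws InterruptedException
  {
    while (!Thread.currentThread().isInterrupted()) {
      // Short-circuit && evaluates refreshCondition() lazily, only once the
      // cheaper boolean flags have already passed.
      if (isServerViewInitialized &&
          !wasRecentFailure &&
          refreshCondition() &&
          (refreshImmediately || nextRefresh < System.currentTimeMillis())) {
        break; // we need to do a refresh; fall through to the refresh step
      }
      TimeUnit.MILLISECONDS.sleep(100);
    }
  }
}
```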
@@ -99,7 +99,7 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCache
   private final SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
   private final SegmentReplicationStatusManager segmentReplicationStatusManager;

-  // Datasource schema for
+  // Schema for datasources from cold segments
   private final ConcurrentMap<String, DataSourceInformation> coldDatasourceSchema = new ConcurrentHashMap<>();
   private final long coldDatasourceSchemaExecDurationMillis;
   private final ScheduledExecutorService coldDSScehmaExec;
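The cold-segment schema lives in its own concurrent map, rebuilt off the query path by a dedicated executor. A sketch of how that executor could be wired, assuming a fixed-delay schedule: the two field names come from the diff above (including the `coldDSScehmaExec` spelling), but the scheduling call and the one-hour period are illustrative assumptions, not the PR's actual bootstrap code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative wiring of the periodic cold-schema pass (assumed, not from the diff).
public class ColdSchemaSchedulingSketch
{
  private final long coldDatasourceSchemaExecDurationMillis =
      TimeUnit.HOURS.toMillis(1); // assumed period
  private final ScheduledExecutorService coldDSScehmaExec =
      Executors.newSingleThreadScheduledExecutor();

  public void start()
  {
    // Re-run the cold-schema pass on a fixed delay, off the query path.
    coldDSScehmaExec.scheduleWithFixedDelay(
        this::coldDatasourceSchemaExec,
        coldDatasourceSchemaExecDurationMillis,
        coldDatasourceSchemaExecDurationMillis,
        TimeUnit.MILLISECONDS
    );
  }

  private void coldDatasourceSchemaExec()
  {
    // rebuild per-datasource schemas from cold (unreplicated) segments here
  }
}
```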
@@ -500,7 +500,6 @@ private Set<SegmentId> filterSegmentWithCachedSchema(Set<SegmentId> segmentIds)

   protected void coldDatasourceSchemaExec()
   {
-    log.info("ColdDatasourceSchemaExec");
     Collection<ImmutableDruidDataSource> immutableDataSources =
         sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments();

@@ -534,18 +533,29 @@ protected void coldDatasourceSchemaExec()
       final RowSignature.Builder builder = RowSignature.builder();
       columnTypes.forEach(builder::add);

-      RowSignature coldDSSignature = builder.build();
-      log.info("Schema for cold ds is %s %s", dataSourceName, coldDSSignature);
-      coldDatasourceSchema.put(dataSourceName, new DataSourceInformation(dataSourceName, coldDSSignature));
+      RowSignature coldSignature = builder.build();
+
+      log.debug("[%s] signature from cold segments is [%s]", dataSourceName, coldSignature);
+
+      coldDatasourceSchema.put(dataSourceName, new DataSourceInformation(dataSourceName, coldSignature));
+
       // update tables map with merged schema
       tables.computeIfPresent(
           dataSourceName,
-          (ds, info) ->
-              new DataSourceInformation(
-                  dataSourceName,
-                  mergeHotAndColdSchema(info.getRowSignature(), coldDSSignature)
-              )
+          (ds, info) -> {
+            RowSignature mergedSignature = mergeHotAndColdSchema(info.getRowSignature(), coldSignature);
+
+            if (!info.getRowSignature().equals(mergedSignature)) {
+              log.info(
+                  "[%s] has new merged signature: %s. hot signature [%s], cold signature [%s].",
+                  ds, mergedSignature, info.getRowSignature(), coldSignature
+              );
+            } else {
+              log.debug("[%s] merged signature is unchanged.", ds);
+            }
+
+            return new DataSourceInformation(ds, mergedSignature);
+          }
       );
     }
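`mergeHotAndColdSchema` itself is not part of this commit's diff. Judging from the tests below, where the cold-only columns `c1` and `c2` land at indices 6 and 7, after the six hot columns, a plausible sketch is: keep the hot signature's columns in place and append any cold-only columns. This is a hypothetical reconstruction, not the PR's actual method.

```java
import org.apache.druid.segment.column.RowSignature;

// Hypothetical reconstruction of the merge; the real method is not shown here.
public class MergeSchemaSketch
{
  // Hot columns keep their positions; cold-only columns are appended at the
  // end, which is the ordering the tests below assert for c1 and c2.
  static RowSignature mergeHotAndColdSchema(RowSignature hot, RowSignature cold)
  {
    final RowSignature.Builder builder = RowSignature.builder();
    builder.addAll(hot);
    for (String column : cold.getColumnNames()) {
      if (!hot.contains(column)) {
        // getColumnType returns an Optional; cold columns read from segment
        // schema metadata are expected to always carry a type.
        builder.add(column, cold.getColumnType(column).orElse(null));
      }
    }
    return builder.build();
  }
}
```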
@@ -1803,8 +1803,7 @@ public void testSameSegmentAddedOnMultipleServer() throws InterruptedException,
     Assert.assertEquals(existingMetadata.getNumReplicas(), currentMetadata.getNumReplicas());
   }

-  @Test
-  public void testColdDatasourceSchema() throws IOException
+  private CoordinatorSegmentMetadataCache setupForColdDatasourceSchemaTest()
   {
     DataSegment coldSegment =
         DataSegment.builder()
@@ -1840,7 +1839,7 @@ public void testColdDatasourceSchema() throws IOException

     Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.singletonList(druidDataSource));

-    CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
+    return new CoordinatorSegmentMetadataCache(
         getQueryLifecycleFactory(walker),
         serverView,
         SEGMENT_CACHE_CONFIG_DEFAULT,
@@ -1853,6 +1852,12 @@ public void testColdDatasourceSchema() throws IOException
         segmentReplicationStatusManager,
         segmentsMetadataManagerConfigSupplier
     );
+  }
+
+  @Test
+  public void testColdDatasourceSchema_mergeSchemaInRefresh() throws IOException
+  {
+    CoordinatorSegmentMetadataCache schema = setupForColdDatasourceSchemaTest();

     schema.coldDatasourceSchemaExec();
@@ -1873,6 +1878,29 @@ public void testColdDatasourceSchema() throws IOException
Assert.assertEquals(ColumnType.LONG, rowSignature.getColumnType(columnNames.get(7)).get());
}

+  @Test
+  public void testColdDatasourceSchema_mergeSchemaInColdSchemaExec() throws IOException
+  {
+    CoordinatorSegmentMetadataCache schema = setupForColdDatasourceSchemaTest();
+
+    Set<SegmentId> segmentIds = new HashSet<>();
+    segmentIds.add(segment1.getId());
+    segmentIds.add(segment2.getId());
+
+    schema.refresh(segmentIds, new HashSet<>());
+    schema.coldDatasourceSchemaExec();
+
+    verifyFooDSSchema(schema, 8);
+    RowSignature rowSignature = schema.getDatasource("foo").getRowSignature();
+
+    List<String> columnNames = rowSignature.getColumnNames();
+    Assert.assertEquals("c1", columnNames.get(6));
+    Assert.assertEquals(ColumnType.STRING, rowSignature.getColumnType(columnNames.get(6)).get());
+
+    Assert.assertEquals("c2", columnNames.get(7));
+    Assert.assertEquals(ColumnType.LONG, rowSignature.getColumnType(columnNames.get(7)).get());
+  }

private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns)
{
final DataSourceInformation fooDs = schema.getDatasource("foo");
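The two tests arrive at the same eight-column `foo` schema whether `refresh` or `coldDatasourceSchemaExec` runs first, which holds because the merge into `tables` goes through `ConcurrentHashMap.computeIfPresent`, whose remapping function is applied atomically per key. A self-contained illustration of that primitive (the map values here are plain strings, standing in for `DataSourceInformation`):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustration of the computeIfPresent pattern used in the diff above: the
// remapping runs atomically per key, so a concurrent refresh and cold-schema
// pass cannot interleave partial updates for the same datasource.
public class ComputeIfPresentSketch
{
  public static void main(String[] args)
  {
    ConcurrentMap<String, String> tables = new ConcurrentHashMap<>();
    tables.put("foo", "hot-schema");

    // Merges only when the datasource already has a hot entry, mirroring
    // tables.computeIfPresent(dataSourceName, ...) in coldDatasourceSchemaExec.
    tables.computeIfPresent("foo", (ds, hot) -> hot + "+cold-schema");
    tables.computeIfPresent("bar", (ds, hot) -> hot + "+cold-schema"); // no-op: key absent

    System.out.println(tables); // {foo=hot-schema+cold-schema}
  }
}
```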
@@ -39,7 +39,6 @@
 import org.apache.druid.segment.metadata.CoordinatorSegmentMetadataCache;
 import org.apache.druid.segment.metadata.DataSourceInformation;
 import org.apache.druid.server.coordinator.CreateDataSegments;
-import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.SegmentReplicationStatusManager;
 import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthTestUtils;