Enable querying entirely cold datasources #16676
base: master
Changes from 9 commits
@@ -199,8 +199,9 @@
   /**
    * Map of datasource and generic object extending DataSourceInformation.
    * This structure can be accessed by {@link #cacheExec} and {@link #callbackExec} threads.
+   * It contains schema for datasources with at least 1 available segment.
    */
-  protected final ConcurrentMap<String, T> tables = new ConcurrentHashMap<>();
+  protected final ConcurrentHashMap<String, T> tables = new ConcurrentHashMap<>();

Review comment: Nit: Just wondering what specific HashMap methods you are using which required this change.
Author reply: I started using …
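A note on the exchange above: since Java 8, ConcurrentMap also declares compute/computeIfPresent, but as defaults that may retry and so may invoke the remapping function more than once; ConcurrentHashMap overrides them to run atomically per key, invoking the function at most once. A minimal sketch of the computeIfPresent pattern this PR later applies to `tables` (the datasource name and string "schemas" are illustrative stand-ins):

```java
// Sketch only: ConcurrentHashMap.computeIfPresent, the kind of per-key atomic
// update used on the `tables` map later in this PR. Values are made up.
import java.util.concurrent.ConcurrentHashMap;

public class ComputeIfPresentSketch
{
  public static void main(String[] args)
  {
    ConcurrentHashMap<String, String> tables = new ConcurrentHashMap<>();
    tables.put("wiki", "hot");

    // Runs the remapping function at most once, atomically for the key.
    tables.computeIfPresent("wiki", (ds, schema) -> schema + "+cold");
    // Absent keys are left untouched.
    tables.computeIfPresent("missing", (ds, schema) -> schema + "+cold");

    System.out.println(tables); // {wiki=hot+cold}
  }
}
```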

  /**
   * This lock coordinates the access from multiple threads to those variables guarded by this lock.
@@ -269,10 +270,14 @@
       final boolean wasRecentFailure = DateTimes.utc(lastFailure)
                                                 .plus(config.getMetadataRefreshPeriod())
                                                 .isAfterNow();
-      if (isServerViewInitialized &&
-          !wasRecentFailure &&
-          (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) &&
-          (refreshImmediately || nextRefresh < System.currentTimeMillis())) {
+      boolean refresh = refreshCondition();
+
+      boolean shouldRefresh =
+          isServerViewInitialized &&
+          !wasRecentFailure &&
+          refreshCondition() &&
+          (refreshImmediately || nextRefresh < System.currentTimeMillis());
+      if (shouldRefresh) {
         // We need to do a refresh. Break out of the waiting loop.
         break;
       }
@@ -334,6 +339,7 @@ | |
} | ||
} | ||
|
||
|
||
/** | ||
* Lifecycle start method. | ||
*/ | ||
|
@@ -361,6 +367,13 @@
     // noop
   }

+  public boolean refreshCondition()
+  {
+    synchronized (lock) {
+      return (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty());
+    }
+  }
+
   public void awaitInitialization() throws InterruptedException
   {
     initialized.await();
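The new refreshCondition() reads two collections that are mutated under the same `lock`, so taking the lock gives a consistent compound check. A minimal sketch of the pattern, with hypothetical string sets standing in for the real segment/datasource collections:

```java
// Sketch, assuming writers add to these sets while holding the same `lock`.
import java.util.HashSet;
import java.util.Set;

class RefreshState
{
  private final Object lock = new Object();
  private final Set<String> segmentsNeedingRefresh = new HashSet<>();
  private final Set<String> dataSourcesNeedingRebuild = new HashSet<>();

  boolean refreshCondition()
  {
    synchronized (lock) {
      // Both reads happen under the lock, so the compound condition is
      // evaluated against a consistent snapshot of the two sets.
      return !segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty();
    }
  }
}
```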
@@ -373,6 +386,7 @@
    *
    * @return schema information for the given datasource
    */
+  @Nullable
   public T getDatasource(String name)
   {
     return tables.get(name);
@@ -20,10 +20,14 @@
 package org.apache.druid.segment.metadata;

 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Inject;
 import org.apache.druid.client.CoordinatorServerView;
+import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.client.InternalQueryConfig;
 import org.apache.druid.client.ServerView;
 import org.apache.druid.client.TimelineServerView;
@@ -33,6 +37,8 @@
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.SqlSegmentsMetadataManager;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
 import org.apache.druid.segment.SchemaPayloadPlus;
@@ -41,21 +47,30 @@
 import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
 import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.server.coordinator.SegmentReplicationStatusManager;
 import org.apache.druid.server.security.Escalator;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;

 import javax.annotation.Nullable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;

 /**
@@ -81,7 +96,15 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
   private final ColumnTypeMergePolicy columnTypeMergePolicy;
   private final SegmentSchemaCache segmentSchemaCache;
   private final SegmentSchemaBackFillQueue segmentSchemaBackfillQueue;
+  private final SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
+  private final SegmentReplicationStatusManager segmentReplicationStatusManager;
+
+  // Datasource schema for
+  private final ConcurrentMap<String, DataSourceInformation> coldDatasourceSchema = new ConcurrentHashMap<>();
+  private final long coldDatasourceSchemaExecDurationMillis;
+  private final ScheduledExecutorService coldDSScehmaExec;
   private @Nullable Future<?> cacheExecFuture = null;
+  private @Nullable Future<?> coldDSSchemaExecFuture = null;

   @Inject
   public CoordinatorSegmentMetadataCache(
@@ -92,14 +115,27 @@ public CoordinatorSegmentMetadataCache(
       InternalQueryConfig internalQueryConfig,
       ServiceEmitter emitter,
       SegmentSchemaCache segmentSchemaCache,
-      SegmentSchemaBackFillQueue segmentSchemaBackfillQueue
+      SegmentSchemaBackFillQueue segmentSchemaBackfillQueue,
+      SqlSegmentsMetadataManager sqlSegmentsMetadataManager,
+      SegmentReplicationStatusManager segmentReplicationStatusManager,
+      Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier
   )
   {
     super(queryLifecycleFactory, config, escalator, internalQueryConfig, emitter);
     this.config = config;
     this.columnTypeMergePolicy = config.getMetadataColumnTypeMergePolicy();
     this.segmentSchemaCache = segmentSchemaCache;
     this.segmentSchemaBackfillQueue = segmentSchemaBackfillQueue;
+    this.sqlSegmentsMetadataManager = sqlSegmentsMetadataManager;
+    this.segmentReplicationStatusManager = segmentReplicationStatusManager;
+    this.coldDatasourceSchemaExecDurationMillis =
+        segmentsMetadataManagerConfigSupplier.get().getPollDuration().getMillis();
+    coldDSScehmaExec = Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactoryBuilder()
+            .setNameFormat("DruidColdDSSchema-ScheduledExecutor-%d")
+            .setDaemon(true)
+            .build()
+    );
+
     initServerViewTimelineCallback(serverView);
   }

Review comment (on setDaemon(true)): Why is this a daemon thread?
Author reply: I will update, we don't need a daemon thread here.
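For reference, a sketch of the non-daemon variant the reply above points toward; with Guava's ThreadFactoryBuilder, omitting setDaemon(true) leaves the threads non-daemon, since the default backing factory (Executors.defaultThreadFactory()) creates non-daemon threads:

```java
// Sketch: the same executor without setDaemon(true); resulting threads are
// non-daemon by default (inherited from Executors.defaultThreadFactory()).
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

class ColdSchemaExecutors
{
  static ScheduledExecutorService createNonDaemon()
  {
    return Executors.newSingleThreadScheduledExecutor(
        new ThreadFactoryBuilder()
            .setNameFormat("DruidColdDSSchema-ScheduledExecutor-%d")
            .build()
    );
  }
}
```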
@@ -168,6 +204,7 @@ public void stop()
   {
     callbackExec.shutdownNow();
     cacheExec.shutdownNow();
+    coldDSScehmaExec.shutdownNow();
     segmentSchemaCache.onLeaderStop();
     segmentSchemaBackfillQueue.onLeaderStop();
     if (cacheExecFuture != null) {
@@ -181,6 +218,12 @@ public void onLeaderStart()
     try {
       segmentSchemaBackfillQueue.onLeaderStart();
       cacheExecFuture = cacheExec.submit(this::cacheExecLoop);
+      coldDSSchemaExecFuture = coldDSScehmaExec.schedule(
+          this::coldDatasourceSchemaExec,
+          coldDatasourceSchemaExecDurationMillis,
+          TimeUnit.MILLISECONDS
+      );
+
       if (config.isAwaitInitializationOnStart()) {
         awaitInitialization();
       }
@@ -196,6 +239,9 @@ public void onLeaderStop()
     if (cacheExecFuture != null) {
       cacheExecFuture.cancel(true);
     }
+    if (coldDSSchemaExecFuture != null) {
+      coldDSSchemaExecFuture.cancel(true);
+    }
     segmentSchemaCache.onLeaderStop();
     segmentSchemaBackfillQueue.onLeaderStop();
   }
@@ -336,6 +382,27 @@ public AvailableSegmentMetadata getAvailableSegmentMetadata(String datasource, S
     return availableSegmentMetadata;
   }

+  @Override
+  public DataSourceInformation getDatasource(String name)
+  {
+    DataSourceInformation dataSourceInformation = tables.get(name);
+    if (dataSourceInformation != null) {
+      return dataSourceInformation;
+    }
+    return coldDatasourceSchema.get(name);
+  }
+
+  @Override
+  public Map<String, DataSourceInformation> getDataSourceInformationMap()
+  {
+    Map<String, DataSourceInformation> copy = new HashMap<>(tables);
+
+    for (Map.Entry<String, DataSourceInformation> entry : coldDatasourceSchema.entrySet()) {
+      copy.computeIfAbsent(entry.getKey(), value -> entry.getValue());
+    }
+    return ImmutableMap.copyOf(copy);
+  }
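Taken together, the two overrides above give hot schemas precedence: getDatasource consults `tables` first, and getDataSourceInformationMap seeds the result with `tables` before filling only the missing keys from the cold map. A toy illustration with plain maps (datasource names and values are stand-ins):

```java
// Toy illustration of the lookup precedence above: an entry in the hot map
// shadows an entry in the cold map. Plain HashMaps stand in for the
// concurrent maps; names and values are made up.
import java.util.HashMap;
import java.util.Map;

public class PrecedenceDemo
{
  public static void main(String[] args)
  {
    Map<String, String> tables = new HashMap<>();
    Map<String, String> coldDatasourceSchema = new HashMap<>();
    tables.put("wiki", "hot");
    coldDatasourceSchema.put("wiki", "cold");
    coldDatasourceSchema.put("archive", "cold");

    Map<String, String> copy = new HashMap<>(tables);
    for (Map.Entry<String, String> e : coldDatasourceSchema.entrySet()) {
      // computeIfAbsent skips keys already present, so "wiki" stays "hot";
      // "archive" is filled from the cold map.
      copy.computeIfAbsent(e.getKey(), k -> e.getValue());
    }
    System.out.println(copy);
  }
}
```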
+
   /**
    * Executes SegmentMetadataQuery to fetch schema information for each segment in the refresh list.
    * The schema information for individual segments is combined to construct a table schema, which is then cached.
@@ -382,13 +449,25 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
     // Rebuild the datasources.
     for (String dataSource : dataSourcesToRebuild) {
       final RowSignature rowSignature = buildDataSourceRowSignature(dataSource);
-      if (rowSignature == null) {
+
+      RowSignature mergedSignature = rowSignature;
+
+      DataSourceInformation coldDatasourceInformation = coldDatasourceSchema.get(dataSource);
+      if (coldDatasourceInformation != null) {
+        if (rowSignature == null) {
+          mergedSignature = coldDatasourceInformation.getRowSignature();
+        } else {
+          mergedSignature = mergeHotAndColdSchema(rowSignature, coldDatasourceInformation.getRowSignature());
+        }
+      }
+
+      if (mergedSignature == null) {
         log.info("RowSignature null for dataSource [%s], implying that it no longer exists. All metadata removed.", dataSource);
         tables.remove(dataSource);
         continue;
       }

-      DataSourceInformation druidTable = new DataSourceInformation(dataSource, rowSignature);
+      DataSourceInformation druidTable = new DataSourceInformation(dataSource, mergedSignature);
       final DataSourceInformation oldTable = tables.put(dataSource, druidTable);

       if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
@@ -419,6 +498,85 @@ private Set<SegmentId> filterSegmentWithCachedSchema(Set<SegmentId> segmentIds)
     return cachedSegments;
   }

+  protected void coldDatasourceSchemaExec()
+  {
+    log.info("ColdDatasourceSchemaExec");
+    Collection<ImmutableDruidDataSource> immutableDataSources =
+        sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments();
+
+    final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
+
+    Set<String> dataSources = new HashSet<>();
+
+    for (ImmutableDruidDataSource dataSource : immutableDataSources) {
+      String dataSourceName = dataSource.getName();
+      dataSources.add(dataSourceName);
+      Collection<DataSegment> dataSegments = dataSource.getSegments();
+
+      for (DataSegment segment : dataSegments) {
+        if (segmentReplicationStatusManager.getReplicationFactor(segment.getId()) != null
+            && segmentReplicationStatusManager.getReplicationFactor(segment.getId()) != 0) {
+          continue;
+        }
+        Optional<SchemaPayloadPlus> optionalSchema = segmentSchemaCache.getSchemaForSegment(segment.getId());
+        if (optionalSchema.isPresent()) {
+          RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature();
+          for (String column : rowSignature.getColumnNames()) {
+            final ColumnType columnType =
+                rowSignature.getColumnType(column)
+                            .orElseThrow(() -> new ISE("Encountered null type for column [%s]", column));
+
+            columnTypes.compute(column, (c, existingType) -> columnTypeMergePolicy.merge(existingType, columnType));
+          }
+        }
+      }
+
+      final RowSignature.Builder builder = RowSignature.builder();
+      columnTypes.forEach(builder::add);
+
+      RowSignature coldDSSignature = builder.build();
+      log.info("Schema for cold ds is %s %s", dataSourceName, coldDSSignature);
+      coldDatasourceSchema.put(dataSourceName, new DataSourceInformation(dataSourceName, coldDSSignature));
+
+      // update tables map with merged schema
+      tables.computeIfPresent(
+          dataSourceName,
+          (ds, info) ->
+              new DataSourceInformation(
+                  dataSourceName,
+                  mergeHotAndColdSchema(info.getRowSignature(), coldDSSignature)
+              )
+      );
+    }
+
+    // remove any stale datasource from the map
+    coldDatasourceSchema.keySet().retainAll(dataSources);
+  }
+
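Notes on the pass above: a segment is treated as cold when its reported replication factor is null or 0; its schema is read from segmentSchemaCache and folded into a per-datasource RowSignature, which is then also merged into any hot entry in `tables`. As written in this revision, `columnTypes` is created once outside the datasource loop, so accumulated column types carry over between datasources within a single pass. A tiny sketch of the cold test (the replication values and helper are hypothetical):

```java
// Hypothetical sketch of the cold check used above: a segment is cold when
// its replication factor is unknown (null) or zero. Values are made up.
import java.util.HashMap;
import java.util.Map;

class ColdCheckSketch
{
  static boolean isCold(Integer replicationFactor)
  {
    return replicationFactor == null || replicationFactor == 0;
  }

  public static void main(String[] args)
  {
    Map<String, Integer> replication = new HashMap<>();
    replication.put("seg-loaded", 2);   // served by historicals -> hot
    replication.put("seg-unloaded", 0); // used but not loaded -> cold
    replication.put("seg-unknown", null);

    replication.forEach((seg, rf) -> System.out.println(seg + " cold=" + isCold(rf)));
  }
}
```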
+  private RowSignature mergeHotAndColdSchema(RowSignature hot, RowSignature cold)

Review comment: I am very surprised you need a new method here. There should be existing logic which does this, no?
Author reply: I can refactor this a bit to have a single method for merging the RowSignature.
+  {
+    final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
+
+    List<RowSignature> signatures = new ArrayList<>();
+    signatures.add(hot);
+    signatures.add(cold);
+
+    for (RowSignature signature : signatures) {
+      for (String column : signature.getColumnNames()) {
+        final ColumnType columnType =
+            signature.getColumnType(column)
+                     .orElseThrow(() -> new ISE("Encountered null type for column [%s]", column));
+
+        columnTypes.compute(column, (c, existingType) -> columnTypeMergePolicy.merge(existingType, columnType));
+      }
+    }
+
+    final RowSignature.Builder builder = RowSignature.builder();
+    columnTypes.forEach(builder::add);
+
+    return builder.build();
+  }
+
   @VisibleForTesting
   @Nullable
   @Override
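The method above is a column-wise union of the two signatures, delegating type conflicts to the configured ColumnTypeMergePolicy. A toy version of that shape, with strings standing in for ColumnType and a keep-existing lambda standing in for the real merge policy (both are assumptions, not Druid's actual policy):

```java
// Toy union-with-merge-policy, mirroring mergeHotAndColdSchema's shape.
// String "types" and the keep-existing policy are illustrative stand-ins.
import java.util.LinkedHashMap;
import java.util.Map;

public class MergeSketch
{
  public static void main(String[] args)
  {
    Map<String, String> hot = new LinkedHashMap<>();
    hot.put("__time", "LONG");
    hot.put("page", "STRING");

    Map<String, String> cold = new LinkedHashMap<>();
    cold.put("page", "STRING");
    cold.put("views", "LONG");

    Map<String, String> merged = new LinkedHashMap<>(hot);
    // merge() applies the policy only when both sides define the column.
    cold.forEach((col, type) -> merged.merge(col, type, (existing, incoming) -> existing));

    System.out.println(merged); // {__time=LONG, page=STRING, views=LONG}
  }
}
```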