Enable querying entirely cold datasources #16676

Open · wants to merge 23 commits into base: master · Changes from 9 commits
@@ -69,4 +69,9 @@ public interface CoordinatorClient
* Returns a new instance backed by a ServiceClient which follows the provided retryPolicy
*/
CoordinatorClient withRetryPolicy(ServiceRetryPolicy retryPolicy);

/**
* Retrieves list of used datasources.
*/
ListenableFuture<List<String>> fetchDataSources();
}
@@ -188,4 +188,17 @@ public CoordinatorClientImpl withRetryPolicy(ServiceRetryPolicy retryPolicy)
{
return new CoordinatorClientImpl(client.withRetryPolicy(retryPolicy), jsonMapper);
}

@Override
public ListenableFuture<List<String>> fetchDataSources()
{
final String path = "/druid/coordinator/v1/metadata/datasources";
return FutureUtils.transform(
client.asyncRequest(
new RequestBuilder(HttpMethod.GET, path),
new BytesFullResponseHandler()
),
holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), new TypeReference<List<String>>() {})
);
}
}
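
For illustration, a minimal caller sketch of the new API (not part of this diff; FetchDataSourcesExample and fetchUsedDataSources are invented names, and the blocking get() is used only to keep the example short):

import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.coordinator.CoordinatorClient;
import java.util.List;
import java.util.concurrent.ExecutionException;

public class FetchDataSourcesExample
{
  /**
   * Fetches the list of used datasources from the Coordinator through the new
   * CoordinatorClient#fetchDataSources() API and blocks until the response arrives.
   */
  public static List<String> fetchUsedDataSources(CoordinatorClient coordinatorClient)
      throws InterruptedException, ExecutionException
  {
    final ListenableFuture<List<String>> future = coordinatorClient.fetchDataSources();
    return future.get();
  }
}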
@@ -199,8 +199,9 @@
/**
* Map of datasource and generic object extending DataSourceInformation.
* This structure can be accessed by {@link #cacheExec} and {@link #callbackExec} threads.
* It contains schema for datasources with atleast 1 available segment.
Review comment (Member), suggested change:
- * It contains schema for datasources with atleast 1 available segment.
+ * It contains schema for datasources with at least 1 available segment.

*/
protected final ConcurrentMap<String, T> tables = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, T> tables = new ConcurrentHashMap<>();
Review comment (Contributor): Nit: just wondering which specific HashMap methods you are using that required this change.

Reply (Contributor Author): I started using the computeIfAbsent method. The explanation is captured here: https://github.com/code-review-checklists/java-concurrency/blob/master/README.md#chm-type.

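For context, a minimal sketch of the pattern described in the linked checklist item (TableRegistry and getOrCreate are invented names, not part of this PR): declaring the field as ConcurrentHashMap rather than ConcurrentMap makes it explicit that the code relies on ConcurrentHashMap's atomic computeIfAbsent.

import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class TableRegistry<T>
{
  // Declared as ConcurrentHashMap (not ConcurrentMap): the code depends on
  // ConcurrentHashMap's guarantee that computeIfAbsent invokes the mapping
  // function at most once per key, atomically with the insertion.
  private final ConcurrentHashMap<String, T> tables = new ConcurrentHashMap<>();

  public T getOrCreate(String dataSource, Function<String, T> factory)
  {
    return tables.computeIfAbsent(dataSource, factory);
  }
}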

/**
* This lock coordinates the access from multiple threads to those variables guarded by this lock.
@@ -269,10 +270,14 @@
final boolean wasRecentFailure = DateTimes.utc(lastFailure)
.plus(config.getMetadataRefreshPeriod())
.isAfterNow();
if (isServerViewInitialized &&

boolean refresh = refreshCondition();
boolean shouldRefresh =
isServerViewInitialized &&
!wasRecentFailure &&
(!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) &&
(refreshImmediately || nextRefresh < System.currentTimeMillis())) {
refreshCondition() &&
(refreshImmediately || nextRefresh < System.currentTimeMillis());
if (shouldRefresh) {
// We need to do a refresh. Break out of the waiting loop.
break;
}
@@ -334,6 +339,7 @@
}
}


/**
* Lifecycle start method.
*/
@@ -361,6 +367,13 @@
// noop
}

public boolean refreshCondition()
{
synchronized (lock) {
return (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty());
}
}
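
Extracting the check into refreshCondition() also gives subclasses a hook to extend the refresh trigger; a hypothetical override could look like this (coldSchemaNeedsRefresh is an invented AtomicBoolean field, not part of this PR):

// Hypothetical subclass hook: refresh when the base condition holds or when an
// additional, subclass-specific flag has been set.
@Override
public boolean refreshCondition()
{
  return super.refreshCondition() || coldSchemaNeedsRefresh.get();
}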

public void awaitInitialization() throws InterruptedException
{
initialized.await();
@@ -373,6 +386,7 @@
*
* @return schema information for the given datasource
*/
@Nullable
public T getDatasource(String name)
{
return tables.get(name);
@@ -20,10 +20,14 @@
package org.apache.druid.segment.metadata;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import org.apache.druid.client.CoordinatorServerView;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.InternalQueryConfig;
import org.apache.druid.client.ServerView;
import org.apache.druid.client.TimelineServerView;
@@ -33,6 +37,8 @@
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.segment.SchemaPayloadPlus;
@@ -41,21 +47,30 @@
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordinator.SegmentReplicationStatusManager;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
@@ -81,7 +96,15 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
private final ColumnTypeMergePolicy columnTypeMergePolicy;
private final SegmentSchemaCache segmentSchemaCache;
private final SegmentSchemaBackFillQueue segmentSchemaBackfillQueue;
private final SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
private final SegmentReplicationStatusManager segmentReplicationStatusManager;

// Schema for datasources built only from cold segments, i.e. segments with no replica loaded on any historical
private final ConcurrentMap<String, DataSourceInformation> coldDatasourceSchema = new ConcurrentHashMap<>();
private final long coldDatasourceSchemaExecDurationMillis;
private final ScheduledExecutorService coldDSScehmaExec;
private @Nullable Future<?> cacheExecFuture = null;
private @Nullable Future<?> coldDSSchemaExecFuture = null;

@Inject
public CoordinatorSegmentMetadataCache(
@@ -92,14 +115,27 @@ public CoordinatorSegmentMetadataCache(
InternalQueryConfig internalQueryConfig,
ServiceEmitter emitter,
SegmentSchemaCache segmentSchemaCache,
SegmentSchemaBackFillQueue segmentSchemaBackfillQueue
SegmentSchemaBackFillQueue segmentSchemaBackfillQueue,
SqlSegmentsMetadataManager sqlSegmentsMetadataManager,
SegmentReplicationStatusManager segmentReplicationStatusManager,
Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier
)
{
super(queryLifecycleFactory, config, escalator, internalQueryConfig, emitter);
this.config = config;
this.columnTypeMergePolicy = config.getMetadataColumnTypeMergePolicy();
this.segmentSchemaCache = segmentSchemaCache;
this.segmentSchemaBackfillQueue = segmentSchemaBackfillQueue;
this.sqlSegmentsMetadataManager = sqlSegmentsMetadataManager;
this.segmentReplicationStatusManager = segmentReplicationStatusManager;
this.coldDatasourceSchemaExecDurationMillis =
segmentsMetadataManagerConfigSupplier.get().getPollDuration().getMillis();
coldDSScehmaExec = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("DruidColdDSSchema-ScheduledExecutor-%d")
.setDaemon(true)
Review comment (Contributor): Why is this a daemon thread?

Reply (Contributor Author): I will update; we don't need a daemon thread here.

.build()
);

initServerViewTimelineCallback(serverView);
}
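
Following the exchange above, a sketch of the same executor construction without the daemon flag (everything else unchanged):

// Same single-threaded scheduled executor, but the worker thread is no longer
// marked as a daemon, per the review comment.
coldDSScehmaExec = Executors.newSingleThreadScheduledExecutor(
    new ThreadFactoryBuilder()
        .setNameFormat("DruidColdDSSchema-ScheduledExecutor-%d")
        .build()
);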
@@ -168,6 +204,7 @@ public void stop()
{
callbackExec.shutdownNow();
cacheExec.shutdownNow();
coldDSScehmaExec.shutdownNow();
segmentSchemaCache.onLeaderStop();
segmentSchemaBackfillQueue.onLeaderStop();
if (cacheExecFuture != null) {
@@ -181,6 +218,12 @@ public void onLeaderStart()
try {
segmentSchemaBackfillQueue.onLeaderStart();
cacheExecFuture = cacheExec.submit(this::cacheExecLoop);
coldDSSchemaExecFuture = coldDSScehmaExec.schedule(
this::coldDatasourceSchemaExec,
coldDatasourceSchemaExecDurationMillis,
TimeUnit.MILLISECONDS
);

if (config.isAwaitInitializationOnStart()) {
awaitInitialization();
}
@@ -196,6 +239,9 @@ public void onLeaderStop()
if (cacheExecFuture != null) {
cacheExecFuture.cancel(true);
}
if (coldDSSchemaExecFuture != null) {
coldDSSchemaExecFuture.cancel(true);
}
segmentSchemaCache.onLeaderStop();
segmentSchemaBackfillQueue.onLeaderStop();
}
@@ -336,6 +382,27 @@ public AvailableSegmentMetadata getAvailableSegmentMetadata(String datasource, S
return availableSegmentMetadata;
}

@Override
public DataSourceInformation getDatasource(String name)
{
DataSourceInformation dataSourceInformation = tables.get(name);
if (dataSourceInformation != null) {
return dataSourceInformation;
}
return coldDatasourceSchema.get(name);
}

@Override
public Map<String, DataSourceInformation> getDataSourceInformationMap()
{
Map<String, DataSourceInformation> copy = new HashMap<>(tables);

for (Map.Entry<String, DataSourceInformation> entry : coldDatasourceSchema.entrySet()) {
copy.computeIfAbsent(entry.getKey(), value -> entry.getValue());
}
return ImmutableMap.copyOf(copy);
}
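
Because the computeIfAbsent call above ignores its argument, entries from tables (hot schemas) always take precedence over cold ones; an equivalent sketch using putIfAbsent makes that explicit:

// Equivalent sketch: copy the hot schemas first, then add cold schemas only for
// datasources that have no hot entry, and return an immutable view.
Map<String, DataSourceInformation> copy = new HashMap<>(tables);
for (Map.Entry<String, DataSourceInformation> entry : coldDatasourceSchema.entrySet()) {
  copy.putIfAbsent(entry.getKey(), entry.getValue());
}
return ImmutableMap.copyOf(copy);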

/**
* Executes SegmentMetadataQuery to fetch schema information for each segment in the refresh list.
* The schema information for individual segments is combined to construct a table schema, which is then cached.
@@ -382,13 +449,25 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
// Rebuild the datasources.
for (String dataSource : dataSourcesToRebuild) {
final RowSignature rowSignature = buildDataSourceRowSignature(dataSource);
if (rowSignature == null) {

RowSignature mergedSignature = rowSignature;

DataSourceInformation coldDatasourceInformation = coldDatasourceSchema.get(dataSource);
if (coldDatasourceInformation != null) {
if (rowSignature == null) {
mergedSignature = coldDatasourceInformation.getRowSignature();
} else {
mergedSignature = mergeHotAndColdSchema(rowSignature, coldDatasourceInformation.getRowSignature());
}
}

if (mergedSignature == null) {
log.info("RowSignature null for dataSource [%s], implying that it no longer exists. All metadata removed.", dataSource);
tables.remove(dataSource);
continue;
}

DataSourceInformation druidTable = new DataSourceInformation(dataSource, rowSignature);
DataSourceInformation druidTable = new DataSourceInformation(dataSource, mergedSignature);
final DataSourceInformation oldTable = tables.put(dataSource, druidTable);

if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
@@ -419,6 +498,85 @@ private Set<SegmentId> filterSegmentWithCachedSchema(Set<SegmentId> segmentIds)
return cachedSegments;
}

protected void coldDatasourceSchemaExec()
{
log.info("ColdDatasourceSchemaExec");
Collection<ImmutableDruidDataSource> immutableDataSources =
sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments();

final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();

Set<String> dataSources = new HashSet<>();

for (ImmutableDruidDataSource dataSource : immutableDataSources) {
String dataSourceName = dataSource.getName();
dataSources.add(dataSourceName);
Collection<DataSegment> dataSegments = dataSource.getSegments();

for (DataSegment segment : dataSegments) {
if (segmentReplicationStatusManager.getReplicationFactor(segment.getId()) != null
&& segmentReplicationStatusManager.getReplicationFactor(segment.getId()) != 0) {
continue;
}
Optional<SchemaPayloadPlus> optionalSchema = segmentSchemaCache.getSchemaForSegment(segment.getId());
if (optionalSchema.isPresent()) {
RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature();
for (String column : rowSignature.getColumnNames()) {
final ColumnType columnType =
rowSignature.getColumnType(column)
.orElseThrow(() -> new ISE("Encountered null type for column [%s]", column));

columnTypes.compute(column, (c, existingType) -> columnTypeMergePolicy.merge(existingType, columnType));
}
}
}

final RowSignature.Builder builder = RowSignature.builder();
columnTypes.forEach(builder::add);

RowSignature coldDSSignature = builder.build();
log.info("Schema for cold ds is %s %s", dataSourceName, coldDSSignature);
coldDatasourceSchema.put(dataSourceName, new DataSourceInformation(dataSourceName, coldDSSignature));

// update tables map with merged schema
tables.computeIfPresent(
dataSourceName,
(ds, info) ->
new DataSourceInformation(
dataSourceName,
mergeHotAndColdSchema(info.getRowSignature(), coldDSSignature)
)
);
}

// remove any stale datasource from the map
coldDatasourceSchema.keySet().retainAll(dataSources);
}

private RowSignature mergeHotAndColdSchema(RowSignature hot, RowSignature cold)
Review comment (Contributor): I am very surprised you need a new method here. There should be existing logic which does this, no?

Reply (Contributor Author): I can refactor this a bit to have a single method for merging the RowSignature.

{
final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();

List<RowSignature> signatures = new ArrayList<>();
signatures.add(hot);
signatures.add(cold);

for (RowSignature signature : signatures) {
for (String column : signature.getColumnNames()) {
final ColumnType columnType =
signature.getColumnType(column)
.orElseThrow(() -> new ISE("Encountered null type for column [%s]", column));

columnTypes.compute(column, (c, existingType) -> columnTypeMergePolicy.merge(existingType, columnType));
}
}

final RowSignature.Builder builder = RowSignature.builder();
columnTypes.forEach(builder::add);

return builder.build();
}
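
An illustrative use of the merge (column names and types are invented; a column present in both signatures is reconciled by the configured ColumnTypeMergePolicy):

// Illustrative only: the hot schema has __time and dim1, the cold schema has
// dim1 and dim2; the merged signature carries all three columns.
RowSignature hot = RowSignature.builder()
                               .add("__time", ColumnType.LONG)
                               .add("dim1", ColumnType.STRING)
                               .build();
RowSignature cold = RowSignature.builder()
                                .add("dim1", ColumnType.STRING)
                                .add("dim2", ColumnType.DOUBLE)
                                .build();
RowSignature merged = mergeHotAndColdSchema(hot, cold);
// merged contains __time (LONG), dim1 (STRING), dim2 (DOUBLE)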

@VisibleForTesting
@Nullable
@Override