diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java index fdf16b2ac505..edeb16665ba4 100644 --- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java +++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java @@ -69,4 +69,9 @@ public interface CoordinatorClient * Returns a new instance backed by a ServiceClient which follows the provided retryPolicy */ CoordinatorClient withRetryPolicy(ServiceRetryPolicy retryPolicy); + + /** + * Retrieves list of datasources with used segments. + */ + ListenableFuture> fetchDataSourcesWithUsedSegments(); } diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java index 4c795c9dbd47..fc3deee12ed3 100644 --- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java +++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java @@ -188,4 +188,17 @@ public CoordinatorClientImpl withRetryPolicy(ServiceRetryPolicy retryPolicy) { return new CoordinatorClientImpl(client.withRetryPolicy(retryPolicy), jsonMapper); } + + @Override + public ListenableFuture> fetchDataSourcesWithUsedSegments() + { + final String path = "/druid/coordinator/v1/metadata/datasources"; + return FutureUtils.transform( + client.asyncRequest( + new RequestBuilder(HttpMethod.GET, path), + new BytesFullResponseHandler() + ), + holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), new TypeReference>() {}) + ); + } } diff --git a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java index 9cb2297db828..88e6ee97b983 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java @@ -200,7 +200,7 @@ public abstract class AbstractSegmentMetadataCache tables = new ConcurrentHashMap<>(); + protected final ConcurrentHashMap tables = new ConcurrentHashMap<>(); /** * This lock coordinates the access from multiple threads to those variables guarded by this lock. @@ -269,9 +269,10 @@ protected void cacheExecLoop() final boolean wasRecentFailure = DateTimes.utc(lastFailure) .plus(config.getMetadataRefreshPeriod()) .isAfterNow(); + if (isServerViewInitialized && !wasRecentFailure && - (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) && + shouldRefresh() && (refreshImmediately || nextRefresh < System.currentTimeMillis())) { // We need to do a refresh. Break out of the waiting loop. break; @@ -334,6 +335,7 @@ protected void cacheExecLoop() } } + /** * Lifecycle start method. */ @@ -361,6 +363,15 @@ public void refreshWaitCondition() throws InterruptedException // noop } + /** + * Refresh is executed only when there are segments or datasources needing refresh. 
+ */ + @SuppressWarnings("GuardedBy") + protected boolean shouldRefresh() + { + return (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()); + } + public void awaitInitialization() throws InterruptedException { initialized.await(); @@ -373,6 +384,7 @@ public void awaitInitialization() throws InterruptedException * * @return schema information for the given datasource */ + @Nullable public T getDatasource(String name) { return tables.get(name); diff --git a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java index dad0b78ea778..3a4f548b8ba9 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java @@ -20,19 +20,27 @@ package org.apache.druid.segment.metadata; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Inject; import org.apache.druid.client.CoordinatorServerView; +import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.InternalQueryConfig; import org.apache.druid.client.ServerView; import org.apache.druid.client.TimelineServerView; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Stopwatch; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.metadata.SegmentsMetadataManagerConfig; +import org.apache.druid.metadata.SqlSegmentsMetadataManager; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.metadata.metadata.SegmentAnalysis; import org.apache.druid.segment.SchemaPayloadPlus; @@ -41,21 +49,30 @@ import org.apache.druid.segment.realtime.appenderator.SegmentSchemas; import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.coordination.DruidServerMetadata; +import org.apache.druid.server.coordinator.loading.SegmentReplicaCount; +import org.apache.druid.server.coordinator.loading.SegmentReplicationStatus; import org.apache.druid.server.security.Escalator; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import javax.annotation.Nullable; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -71,17 +88,36 @@ *
• Metadata query is executed only for those non-realtime segments for which the schema is not cached.
• Datasources marked for refresh are then rebuilt.
* + *
+ * It is important to note that the datasource schema returned by {@link #getDatasource} & {@link #getDataSourceInformationMap()} + * also includes columns from cold segments. + * Cold segments are processed in a separate thread, and the datasource schema derived from cold segments is stored separately. + *
*/ @ManageLifecycle public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCache { private static final EmittingLogger log = new EmittingLogger(CoordinatorSegmentMetadataCache.class); + private static final Long COLD_SCHEMA_PERIOD_MULTIPLIER = 3L; + private static final Long COLD_SCHEMA_SLOWNESS_THRESHOLD_MILLIS = TimeUnit.SECONDS.toMillis(50); private final SegmentMetadataCacheConfig config; private final ColumnTypeMergePolicy columnTypeMergePolicy; private final SegmentSchemaCache segmentSchemaCache; private final SegmentSchemaBackFillQueue segmentSchemaBackfillQueue; + private final SqlSegmentsMetadataManager sqlSegmentsMetadataManager; + private volatile SegmentReplicationStatus segmentReplicationStatus = null; + + // Datasource schema built from only cold segments. + private final ConcurrentHashMap coldSchemaTable = new ConcurrentHashMap<>(); + + // Period for cold schema processing thread. This is a multiple of segment polling period. + // Cold schema processing runs slower than the segment poll to save processing cost of all segments. + // The downside is a delay in columns from cold segment reflecting in the datasource schema. + private final long coldSchemaExecPeriodMillis; + private final ScheduledExecutorService coldSchemaExec; private @Nullable Future cacheExecFuture = null; + private @Nullable Future coldSchemaExecFuture = null; @Inject public CoordinatorSegmentMetadataCache( @@ -92,7 +128,9 @@ public CoordinatorSegmentMetadataCache( InternalQueryConfig internalQueryConfig, ServiceEmitter emitter, SegmentSchemaCache segmentSchemaCache, - SegmentSchemaBackFillQueue segmentSchemaBackfillQueue + SegmentSchemaBackFillQueue segmentSchemaBackfillQueue, + SqlSegmentsMetadataManager sqlSegmentsMetadataManager, + Supplier segmentsMetadataManagerConfigSupplier ) { super(queryLifecycleFactory, config, escalator, internalQueryConfig, emitter); @@ -100,6 +138,15 @@ public CoordinatorSegmentMetadataCache( this.columnTypeMergePolicy = config.getMetadataColumnTypeMergePolicy(); this.segmentSchemaCache = segmentSchemaCache; this.segmentSchemaBackfillQueue = segmentSchemaBackfillQueue; + this.sqlSegmentsMetadataManager = sqlSegmentsMetadataManager; + this.coldSchemaExecPeriodMillis = + segmentsMetadataManagerConfigSupplier.get().getPollDuration().getMillis() * COLD_SCHEMA_PERIOD_MULTIPLIER; + coldSchemaExec = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("DruidColdSchema-ScheduledExecutor-%d") + .setDaemon(false) + .build() + ); initServerViewTimelineCallback(serverView); } @@ -168,11 +215,15 @@ public void stop() { callbackExec.shutdownNow(); cacheExec.shutdownNow(); + coldSchemaExec.shutdownNow(); segmentSchemaCache.onLeaderStop(); segmentSchemaBackfillQueue.onLeaderStop(); if (cacheExecFuture != null) { cacheExecFuture.cancel(true); } + if (coldSchemaExecFuture != null) { + coldSchemaExecFuture.cancel(true); + } } public void onLeaderStart() @@ -181,6 +232,12 @@ public void onLeaderStart() try { segmentSchemaBackfillQueue.onLeaderStart(); cacheExecFuture = cacheExec.submit(this::cacheExecLoop); + coldSchemaExecFuture = coldSchemaExec.schedule( + this::coldDatasourceSchemaExec, + coldSchemaExecPeriodMillis, + TimeUnit.MILLISECONDS + ); + if (config.isAwaitInitializationOnStart()) { awaitInitialization(); } @@ -196,6 +253,9 @@ public void onLeaderStop() if (cacheExecFuture != null) { cacheExecFuture.cancel(true); } + if (coldSchemaExecFuture != null) { + coldSchemaExecFuture.cancel(true); + } segmentSchemaCache.onLeaderStop(); 
segmentSchemaBackfillQueue.onLeaderStop(); } @@ -209,6 +269,11 @@ public synchronized void refreshWaitCondition() throws InterruptedException segmentSchemaCache.awaitInitialization(); } + public void updateSegmentReplicationStatus(SegmentReplicationStatus segmentReplicationStatus) + { + this.segmentReplicationStatus = segmentReplicationStatus; + } + @Override protected void unmarkSegmentAsMutable(SegmentId segmentId) { @@ -336,6 +401,62 @@ public AvailableSegmentMetadata getAvailableSegmentMetadata(String datasource, S return availableSegmentMetadata; } + @Override + public DataSourceInformation getDatasource(String name) + { + return getMergedDatasourceInformation(tables.get(name), coldSchemaTable.get(name)).orElse(null); + } + + @Override + public Map getDataSourceInformationMap() + { + Map hot = new HashMap<>(tables); + Map cold = new HashMap<>(coldSchemaTable); + Set combinedDatasources = new HashSet<>(hot.keySet()); + combinedDatasources.addAll(cold.keySet()); + ImmutableMap.Builder combined = ImmutableMap.builder(); + + for (String dataSource : combinedDatasources) { + getMergedDatasourceInformation(hot.get(dataSource), cold.get(dataSource)) + .ifPresent(merged -> combined.put( + dataSource, + merged + )); + } + + return combined.build(); + } + + private Optional getMergedDatasourceInformation( + final DataSourceInformation hot, + final DataSourceInformation cold + ) + { + if (hot == null && cold == null) { + return Optional.empty(); + } else if (hot != null && cold == null) { + return Optional.of(hot); + } else if (hot == null && cold != null) { + return Optional.of(cold); + } else { + final Map columnTypes = new LinkedHashMap<>(); + + List signatures = new ArrayList<>(); + // hot datasource schema takes precedence + signatures.add(hot.getRowSignature()); + signatures.add(cold.getRowSignature()); + + for (RowSignature signature : signatures) { + mergeRowSignature(columnTypes, signature); + } + + final RowSignature.Builder builder = RowSignature.builder(); + columnTypes.forEach(builder::add); + + return Optional.of(new DataSourceInformation(hot.getDataSource(), builder.build())); + } + } + /** * Executes SegmentMetadataQuery to fetch schema information for each segment in the refresh list. * The schema information for individual segments is combined to construct a table schema, which is then cached. @@ -382,6 +503,7 @@ public void refresh(final Set segmentsToRefresh, final Set da // Rebuild the datasources. for (String dataSource : dataSourcesToRebuild) { final RowSignature rowSignature = buildDataSourceRowSignature(dataSource); + if (rowSignature == null) { log.info("RowSignature null for dataSource [%s], implying that it no longer exists. All metadata removed.", dataSource); tables.remove(dataSource); @@ -419,6 +541,94 @@ private Set filterSegmentWithCachedSchema(Set segmentIds) return cachedSegments; } + @Nullable + private Integer getReplicationFactor(SegmentId segmentId) + { + if (segmentReplicationStatus == null) { + return null; + } + SegmentReplicaCount replicaCountsInCluster = segmentReplicationStatus.getReplicaCountsInCluster(segmentId); + return replicaCountsInCluster == null ? 
null : replicaCountsInCluster.required(); + } + + @VisibleForTesting + protected void coldDatasourceSchemaExec() + { + Stopwatch stopwatch = Stopwatch.createStarted(); + + Set dataSourceWithColdSegmentSet = new HashSet<>(); + + int datasources = 0; + int segments = 0; + int dataSourceWithColdSegments = 0; + + Collection immutableDataSources = + sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments(); + + for (ImmutableDruidDataSource dataSource : immutableDataSources) { + datasources++; + Collection dataSegments = dataSource.getSegments(); + + final Map columnTypes = new LinkedHashMap<>(); + + for (DataSegment segment : dataSegments) { + Integer replicationFactor = getReplicationFactor(segment.getId()); + if (replicationFactor != null && replicationFactor != 0) { + continue; + } + Optional optionalSchema = segmentSchemaCache.getSchemaForSegment(segment.getId()); + if (optionalSchema.isPresent()) { + RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature(); + mergeRowSignature(columnTypes, rowSignature); + } + segments++; + } + + if (columnTypes.isEmpty()) { + // this datasource doesn't have any cold segment + continue; + } + + final RowSignature.Builder builder = RowSignature.builder(); + columnTypes.forEach(builder::add); + + RowSignature coldSignature = builder.build(); + + String dataSourceName = dataSource.getName(); + dataSourceWithColdSegmentSet.add(dataSourceName); + dataSourceWithColdSegments++; + + log.debug("[%s] signature from cold segments is [%s]", dataSourceName, coldSignature); + + coldSchemaTable.put(dataSourceName, new DataSourceInformation(dataSourceName, coldSignature)); + } + + // remove any stale datasource from the map + coldSchemaTable.keySet().retainAll(dataSourceWithColdSegmentSet); + + String executionStatsLog = StringUtils.format( + "Cold schema processing took [%d] millis. " + + "Processed total [%d] datasources, [%d] segments. 
Found [%d] datasources with cold segments.", + stopwatch.millisElapsed(), datasources, segments, dataSourceWithColdSegments + ); + if (stopwatch.millisElapsed() > COLD_SCHEMA_SLOWNESS_THRESHOLD_MILLIS) { + log.info(executionStatsLog); + } else { + log.debug(executionStatsLog); + } + } + + private void mergeRowSignature(final Map columnTypes, final RowSignature signature) + { + for (String column : signature.getColumnNames()) { + final ColumnType columnType = + signature.getColumnType(column) + .orElseThrow(() -> new ISE("Encountered null type for column [%s]", column)); + + columnTypes.compute(column, (c, existingType) -> columnTypeMergePolicy.merge(existingType, columnType)); + } + } + @VisibleForTesting @Nullable @Override @@ -434,13 +644,7 @@ public RowSignature buildDataSourceRowSignature(final String dataSource) Optional optionalSchema = segmentSchemaCache.getSchemaForSegment(segmentId); if (optionalSchema.isPresent()) { RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature(); - for (String column : rowSignature.getColumnNames()) { - final ColumnType columnType = - rowSignature.getColumnType(column) - .orElseThrow(() -> new ISE("Encountered null type for column [%s]", column)); - - columnTypes.compute(column, (c, existingType) -> columnTypeMergePolicy.merge(existingType, columnType)); - } + mergeRowSignature(columnTypes, rowSignature); } else { // mark it for refresh, however, this case shouldn't arise by design markSegmentAsNeedRefresh(segmentId); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 36cfac8089c4..9710bda79b44 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -816,6 +816,9 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { broadcastSegments = params.getBroadcastSegments(); segmentReplicationStatus = params.getSegmentReplicationStatus(); + if (coordinatorSegmentMetadataCache != null) { + coordinatorSegmentMetadataCache.updateSegmentReplicationStatus(segmentReplicationStatus); + } // Collect stats for unavailable and under-replicated segments final CoordinatorRunStats stats = params.getCoordinatorStats(); diff --git a/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java b/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java index 5aee343a851b..58f5af58a3e7 100644 --- a/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java +++ b/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java @@ -75,4 +75,10 @@ public CoordinatorClient withRetryPolicy(ServiceRetryPolicy retryPolicy) // Ignore retryPolicy for the test client. 
return this; } + + @Override + public ListenableFuture> fetchDataSourcesWithUsedSegments() + { + throw new UnsupportedOperationException(); + } } diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java index 81f65acf84ae..4cc4ac38184b 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java @@ -20,6 +20,8 @@ package org.apache.druid.segment.metadata; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import org.apache.druid.client.BrokerServerView; @@ -39,6 +41,8 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.metadata.SegmentsMetadataManagerConfig; +import org.apache.druid.metadata.SqlSegmentsMetadataManager; import org.apache.druid.metadata.TestDerbyConnector; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.TableDataSource; @@ -61,16 +65,19 @@ import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.EasyMock; +import org.joda.time.Period; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.mockito.Mockito; import javax.annotation.Nullable; import java.io.File; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -101,6 +108,8 @@ public class CoordinatorSegmentDataCacheConcurrencyTest extends SegmentMetadataC private TestSegmentMetadataQueryWalker walker; private SegmentSchemaCache segmentSchemaCache; private SegmentSchemaBackFillQueue backFillQueue; + private SqlSegmentsMetadataManager sqlSegmentsMetadataManager; + private Supplier segmentsMetadataManagerConfigSupplier; private final ObjectMapper mapper = TestHelper.makeJsonMapper(); @Before @@ -190,6 +199,12 @@ public CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) } ); + sqlSegmentsMetadataManager = Mockito.mock(SqlSegmentsMetadataManager.class); + Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.emptyList()); + SegmentsMetadataManagerConfig metadataManagerConfig = Mockito.mock(SegmentsMetadataManagerConfig.class); + Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000)); + segmentsMetadataManagerConfigSupplier = Suppliers.ofInstance(metadataManagerConfig); + inventoryView.init(); initLatch.await(); exec = Execs.multiThreaded(4, "DruidSchemaConcurrencyTest-%d"); @@ -227,7 +242,9 @@ public void testSegmentMetadataRefreshAndInventoryViewAddSegmentAndBrokerServerV new InternalQueryConfig(), new NoopServiceEmitter(), segmentSchemaCache, - backFillQueue + backFillQueue, + sqlSegmentsMetadataManager, + segmentsMetadataManagerConfigSupplier ) { @Override @@ -341,7 +358,9 @@ public void testSegmentMetadataRefreshAndDruidSchemaGetSegmentMetadata() new InternalQueryConfig(), new 
NoopServiceEmitter(), segmentSchemaCache, - backFillQueue + backFillQueue, + sqlSegmentsMetadataManager, + segmentsMetadataManagerConfigSupplier ) { @Override diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java index e5b6db1d42df..ef1fb1e8eddf 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java @@ -22,11 +22,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import org.apache.druid.client.DruidServer; +import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.InternalQueryConfig; import org.apache.druid.data.input.InputRow; import org.apache.druid.java.util.common.Intervals; @@ -37,6 +40,8 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.metadata.MetadataStorageTablesConfig; +import org.apache.druid.metadata.SegmentsMetadataManagerConfig; +import org.apache.druid.metadata.SqlSegmentsMetadataManager; import org.apache.druid.metadata.TestDerbyConnector; import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.QueryContexts; @@ -66,6 +71,8 @@ import org.apache.druid.server.QueryResponse; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordinator.loading.SegmentReplicaCount; +import org.apache.druid.server.coordinator.loading.SegmentReplicationStatus; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.AllowAllAuthenticator; @@ -74,18 +81,23 @@ import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.LinearShardSpec; import org.easymock.EasyMock; +import org.joda.time.Period; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; import org.skife.jdbi.v2.StatementContext; import java.io.File; import java.io.IOException; import java.sql.ResultSet; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -106,12 +118,19 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad private CoordinatorSegmentMetadataCache runningSchema; private CountDownLatch buildTableLatch = new CountDownLatch(1); private CountDownLatch markDataSourceLatch = new CountDownLatch(1); + private SqlSegmentsMetadataManager sqlSegmentsMetadataManager; + private Supplier segmentsMetadataManagerConfigSupplier; @Before @Override public void setUp() throws Exception { super.setUp(); + sqlSegmentsMetadataManager = Mockito.mock(SqlSegmentsMetadataManager.class); + 
Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.emptyList()); + SegmentsMetadataManagerConfig metadataManagerConfig = Mockito.mock(SegmentsMetadataManagerConfig.class); + Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000)); + segmentsMetadataManagerConfigSupplier = Suppliers.ofInstance(metadataManagerConfig); } @After @@ -132,6 +151,7 @@ public CoordinatorSegmentMetadataCache buildSchemaMarkAndTableLatch() throws Int public CoordinatorSegmentMetadataCache buildSchemaMarkAndTableLatch(SegmentMetadataCacheConfig config) throws InterruptedException { Preconditions.checkState(runningSchema == null); + runningSchema = new CoordinatorSegmentMetadataCache( getQueryLifecycleFactory(walker), serverView, @@ -140,7 +160,9 @@ public CoordinatorSegmentMetadataCache buildSchemaMarkAndTableLatch(SegmentMetad new InternalQueryConfig(), new NoopServiceEmitter(), segmentSchemaCache, - backFillQueue + backFillQueue, + sqlSegmentsMetadataManager, + segmentsMetadataManagerConfigSupplier ) { @Override @@ -178,7 +200,7 @@ public void testGetTableMap() throws InterruptedException public void testGetTableMapFoo() throws InterruptedException { CoordinatorSegmentMetadataCache schema = buildSchemaMarkAndTableLatch(); - verifyFooDSSchema(schema); + verifyFooDSSchema(schema, 6); } @Test @@ -312,7 +334,9 @@ public void testAllDatasourcesRebuiltOnDatasourceRemoval() throws IOException, I new InternalQueryConfig(), new NoopServiceEmitter(), segmentSchemaCache, - backFillQueue + backFillQueue, + sqlSegmentsMetadataManager, + segmentsMetadataManagerConfigSupplier ) { @Override @@ -523,7 +547,9 @@ public void testSegmentAddedCallbackAddNewHistoricalSegment() throws Interrupted new InternalQueryConfig(), new NoopServiceEmitter(), segmentSchemaCache, - backFillQueue + backFillQueue, + sqlSegmentsMetadataManager, + segmentsMetadataManagerConfigSupplier ) { @Override @@ -558,6 +584,11 @@ public void testSegmentAddedCallbackAddExistingSegment() throws InterruptedExcep { String datasource = "newSegmentAddTest"; CountDownLatch addSegmentLatch = new CountDownLatch(2); + SqlSegmentsMetadataManager sqlSegmentsMetadataManager = Mockito.mock(SqlSegmentsMetadataManager.class); + Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.emptyList()); + SegmentsMetadataManagerConfig metadataManagerConfig = Mockito.mock(SegmentsMetadataManagerConfig.class); + Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000)); + Supplier segmentsMetadataManagerConfigSupplier = Suppliers.ofInstance(metadataManagerConfig); CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache( getQueryLifecycleFactory(walker), serverView, @@ -566,7 +597,9 @@ public void testSegmentAddedCallbackAddExistingSegment() throws InterruptedExcep new InternalQueryConfig(), new NoopServiceEmitter(), segmentSchemaCache, - backFillQueue + backFillQueue, + sqlSegmentsMetadataManager, + segmentsMetadataManagerConfigSupplier ) { @Override @@ -605,6 +638,11 @@ public void testSegmentAddedCallbackAddNewRealtimeSegment() throws InterruptedEx { String datasource = "newSegmentAddTest"; CountDownLatch addSegmentLatch = new CountDownLatch(1); + SqlSegmentsMetadataManager sqlSegmentsMetadataManager = Mockito.mock(SqlSegmentsMetadataManager.class); + Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.emptyList()); + 
SegmentsMetadataManagerConfig metadataManagerConfig = Mockito.mock(SegmentsMetadataManagerConfig.class); + Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000)); + Supplier segmentsMetadataManagerConfigSupplier = Suppliers.ofInstance(metadataManagerConfig); CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache( getQueryLifecycleFactory(walker), serverView, @@ -613,7 +651,9 @@ public void testSegmentAddedCallbackAddNewRealtimeSegment() throws InterruptedEx new InternalQueryConfig(), new NoopServiceEmitter(), segmentSchemaCache, - backFillQueue + backFillQueue, + sqlSegmentsMetadataManager, + segmentsMetadataManagerConfigSupplier ) { @Override @@ -649,6 +689,11 @@ public void testSegmentAddedCallbackAddNewBroadcastSegment() throws InterruptedE { String datasource = "newSegmentAddTest"; CountDownLatch addSegmentLatch = new CountDownLatch(1); + SqlSegmentsMetadataManager sqlSegmentsMetadataManager = Mockito.mock(SqlSegmentsMetadataManager.class); + Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.emptyList()); + SegmentsMetadataManagerConfig metadataManagerConfig = Mockito.mock(SegmentsMetadataManagerConfig.class); + Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000)); + Supplier segmentsMetadataManagerConfigSupplier = Suppliers.ofInstance(metadataManagerConfig); CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache( getQueryLifecycleFactory(walker), serverView, @@ -657,7 +702,9 @@ public void testSegmentAddedCallbackAddNewBroadcastSegment() throws InterruptedE new InternalQueryConfig(), new NoopServiceEmitter(), segmentSchemaCache, - backFillQueue + backFillQueue, + sqlSegmentsMetadataManager, + segmentsMetadataManagerConfigSupplier ) { @Override @@ -698,7 +745,9 @@ public void testSegmentRemovedCallbackEmptyDataSourceAfterRemove() throws Interr new InternalQueryConfig(), new NoopServiceEmitter(), segmentSchemaCache, - backFillQueue + backFillQueue, + sqlSegmentsMetadataManager, + segmentsMetadataManagerConfigSupplier ) { @Override @@ -756,7 +805,9 @@ public void testSegmentRemovedCallbackNonEmptyDataSourceAfterRemove() throws Int new InternalQueryConfig(), new NoopServiceEmitter(), segmentSchemaCache, - backFillQueue + backFillQueue, + sqlSegmentsMetadataManager, + segmentsMetadataManagerConfigSupplier ) { @Override @@ -817,7 +868,9 @@ public void testServerSegmentRemovedCallbackRemoveUnknownSegment() throws Interr new InternalQueryConfig(), new NoopServiceEmitter(), segmentSchemaCache, - backFillQueue + backFillQueue, + sqlSegmentsMetadataManager, + segmentsMetadataManagerConfigSupplier ) { @Override @@ -852,7 +905,9 @@ public void testServerSegmentRemovedCallbackRemoveBrokerSegment() throws Interru new InternalQueryConfig(), new NoopServiceEmitter(), segmentSchemaCache, - backFillQueue + backFillQueue, + sqlSegmentsMetadataManager, + segmentsMetadataManagerConfigSupplier ) { @Override @@ -900,7 +955,9 @@ public void testServerSegmentRemovedCallbackRemoveHistoricalSegment() throws Int new InternalQueryConfig(), new NoopServiceEmitter(), segmentSchemaCache, - backFillQueue + backFillQueue, + sqlSegmentsMetadataManager, + segmentsMetadataManagerConfigSupplier ) { @Override @@ -972,7 +1029,9 @@ public void testRunSegmentMetadataQueryWithContext() throws Exception internalQueryConfig, new NoopServiceEmitter(), segmentSchemaCache, - backFillQueue + backFillQueue, + sqlSegmentsMetadataManager, + 
segmentsMetadataManagerConfigSupplier ); Map queryContext = ImmutableMap.of( @@ -1141,7 +1200,9 @@ public void testRefreshShouldEmitMetrics() throws InterruptedException, IOExcept new InternalQueryConfig(), emitter, segmentSchemaCache, - backFillQueue + backFillQueue, + sqlSegmentsMetadataManager, + segmentsMetadataManagerConfigSupplier ) { @Override @@ -1306,7 +1367,9 @@ public void testRealtimeSchemaAnnouncement() throws InterruptedException, IOExce new InternalQueryConfig(), new NoopServiceEmitter(), segmentSchemaCache, - backFillQueue + backFillQueue, + sqlSegmentsMetadataManager, + segmentsMetadataManagerConfigSupplier ) { @Override void updateSchemaForRealtimeSegments(SegmentSchemas segmentSchemas) @@ -1385,7 +1448,9 @@ public void testRealtimeSchemaAnnouncementDataSourceSchemaUpdated() throws Inter new InternalQueryConfig(), new NoopServiceEmitter(), segmentSchemaCache, - backFillQueue + backFillQueue, + sqlSegmentsMetadataManager, + segmentsMetadataManagerConfigSupplier ) { @Override public void refresh(Set segmentsToRefresh, Set dataSourcesToRebuild) @@ -1565,7 +1630,9 @@ public void testSchemaBackfilling() throws InterruptedException new InternalQueryConfig(), new NoopServiceEmitter(), segmentSchemaCache, - backFillQueue + backFillQueue, + sqlSegmentsMetadataManager, + segmentsMetadataManagerConfigSupplier ) { @Override public Set refreshSegmentsForDataSource(String dataSource, Set segments) @@ -1594,7 +1661,7 @@ public void refresh(Set segmentsToRefresh, Set dataSourcesToR Assert.assertEquals(0, refreshCount.get()); // verify that datasource schema is built - verifyFooDSSchema(schema); + verifyFooDSSchema(schema, 6); serverView.addSegment(segment3, ServerType.HISTORICAL); @@ -1721,12 +1788,384 @@ public void testSameSegmentAddedOnMultipleServer() throws InterruptedException, Assert.assertEquals(existingMetadata.getNumReplicas(), currentMetadata.getNumReplicas()); } - private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema) + private CoordinatorSegmentMetadataCache setupForColdDatasourceSchemaTest() + { + // foo has both hot and cold segments + DataSegment coldSegment = + DataSegment.builder() + .dataSource(DATASOURCE1) + .interval(Intervals.of("1998/P2Y")) + .version("1") + .shardSpec(new LinearShardSpec(0)) + .size(0) + .build(); + + // cold has only cold segments + DataSegment singleColdSegment = + DataSegment.builder() + .dataSource("cold") + .interval(Intervals.of("2000/P2Y")) + .version("1") + .shardSpec(new LinearShardSpec(0)) + .size(0) + .build(); + + ImmutableMap.Builder segmentStatsMap = new ImmutableMap.Builder<>(); + segmentStatsMap.put(coldSegment.getId(), new SegmentMetadata(20L, "foo-fingerprint")); + segmentStatsMap.put(singleColdSegment.getId(), new SegmentMetadata(20L, "cold-fingerprint")); + ImmutableMap.Builder schemaPayloadMap = new ImmutableMap.Builder<>(); + schemaPayloadMap.put( + "foo-fingerprint", + new SchemaPayload(RowSignature.builder() + .add("dim1", ColumnType.STRING) + .add("c1", ColumnType.STRING) + .add("c2", ColumnType.LONG) + .build()) + ); + schemaPayloadMap.put( + "cold-fingerprint", + new SchemaPayload( + RowSignature.builder() + .add("f1", ColumnType.STRING) + .add("f2", ColumnType.DOUBLE) + .build() + ) + ); + + segmentSchemaCache.updateFinalizedSegmentSchema( + new SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), schemaPayloadMap.build()) + ); + + List druidDataSources = new ArrayList<>(); + Map segmentMap = new HashMap<>(); + segmentMap.put(coldSegment.getId(), coldSegment); + 
segmentMap.put(segment1.getId(), segment1); + segmentMap.put(segment2.getId(), segment2); + druidDataSources.add(new ImmutableDruidDataSource( + coldSegment.getDataSource(), + Collections.emptyMap(), + segmentMap + )); + druidDataSources.add(new ImmutableDruidDataSource( + singleColdSegment.getDataSource(), + Collections.emptyMap(), + Collections.singletonMap(singleColdSegment.getId(), singleColdSegment) + )); + + Mockito.when( + sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()) + .thenReturn(druidDataSources); + + CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache( + getQueryLifecycleFactory(walker), + serverView, + SEGMENT_CACHE_CONFIG_DEFAULT, + new NoopEscalator(), + new InternalQueryConfig(), + new NoopServiceEmitter(), + segmentSchemaCache, + backFillQueue, + sqlSegmentsMetadataManager, + segmentsMetadataManagerConfigSupplier + ); + + SegmentReplicaCount zeroSegmentReplicaCount = Mockito.mock(SegmentReplicaCount.class); + SegmentReplicaCount nonZeroSegmentReplicaCount = Mockito.mock(SegmentReplicaCount.class); + Mockito.when(zeroSegmentReplicaCount.required()).thenReturn(0); + Mockito.when(nonZeroSegmentReplicaCount.required()).thenReturn(1); + SegmentReplicationStatus segmentReplicationStatus = Mockito.mock(SegmentReplicationStatus.class); + Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(coldSegment.getId()))) + .thenReturn(zeroSegmentReplicaCount); + Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(singleColdSegment.getId()))) + .thenReturn(zeroSegmentReplicaCount); + Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(segment1.getId()))) + .thenReturn(nonZeroSegmentReplicaCount); + + Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(segment2.getId()))) + .thenReturn(nonZeroSegmentReplicaCount); + + schema.updateSegmentReplicationStatus(segmentReplicationStatus); + schema.updateSegmentReplicationStatus(segmentReplicationStatus); + + return schema; + } + + @Test + public void testColdDatasourceSchema_refreshAfterColdSchemaExec() throws IOException + { + CoordinatorSegmentMetadataCache schema = setupForColdDatasourceSchemaTest(); + + schema.coldDatasourceSchemaExec(); + + Assert.assertEquals(new HashSet<>(Arrays.asList("foo", "cold")), schema.getDataSourceInformationMap().keySet()); + + // verify that cold schema for both foo and cold is present + RowSignature fooSignature = schema.getDatasource("foo").getRowSignature(); + List columnNames = fooSignature.getColumnNames(); + + // verify that foo schema doesn't contain columns from hot segments + Assert.assertEquals(3, columnNames.size()); + + Assert.assertEquals("dim1", columnNames.get(0)); + Assert.assertEquals(ColumnType.STRING, fooSignature.getColumnType(columnNames.get(0)).get()); + + Assert.assertEquals("c1", columnNames.get(1)); + Assert.assertEquals(ColumnType.STRING, fooSignature.getColumnType(columnNames.get(1)).get()); + + Assert.assertEquals("c2", columnNames.get(2)); + Assert.assertEquals(ColumnType.LONG, fooSignature.getColumnType(columnNames.get(2)).get()); + + RowSignature coldSignature = schema.getDatasource("cold").getRowSignature(); + columnNames = coldSignature.getColumnNames(); + Assert.assertEquals("f1", columnNames.get(0)); + Assert.assertEquals(ColumnType.STRING, coldSignature.getColumnType(columnNames.get(0)).get()); + + Assert.assertEquals("f2", columnNames.get(1)); + Assert.assertEquals(ColumnType.DOUBLE, 
coldSignature.getColumnType(columnNames.get(1)).get()); + + Set segmentIds = new HashSet<>(); + segmentIds.add(segment1.getId()); + segmentIds.add(segment2.getId()); + + schema.refresh(segmentIds, new HashSet<>()); + + Assert.assertEquals(new HashSet<>(Arrays.asList("foo", "cold")), schema.getDataSourceInformationMap().keySet()); + + coldSignature = schema.getDatasource("cold").getRowSignature(); + columnNames = coldSignature.getColumnNames(); + Assert.assertEquals("f1", columnNames.get(0)); + Assert.assertEquals(ColumnType.STRING, coldSignature.getColumnType(columnNames.get(0)).get()); + + Assert.assertEquals("f2", columnNames.get(1)); + Assert.assertEquals(ColumnType.DOUBLE, coldSignature.getColumnType(columnNames.get(1)).get()); + + // foo now contains schema from both hot and cold segments + verifyFooDSSchema(schema, 8); + RowSignature rowSignature = schema.getDatasource("foo").getRowSignature(); + + // cold columns should be present at the end + columnNames = rowSignature.getColumnNames(); + Assert.assertEquals("c1", columnNames.get(6)); + Assert.assertEquals(ColumnType.STRING, rowSignature.getColumnType(columnNames.get(6)).get()); + + Assert.assertEquals("c2", columnNames.get(7)); + Assert.assertEquals(ColumnType.LONG, rowSignature.getColumnType(columnNames.get(7)).get()); + } + + @Test + public void testColdDatasourceSchema_coldSchemaExecAfterRefresh() throws IOException + { + CoordinatorSegmentMetadataCache schema = setupForColdDatasourceSchemaTest(); + + Set segmentIds = new HashSet<>(); + segmentIds.add(segment1.getId()); + segmentIds.add(segment2.getId()); + + schema.refresh(segmentIds, new HashSet<>()); + // cold datasource shouldn't be present + Assert.assertEquals(Collections.singleton("foo"), schema.getDataSourceInformationMap().keySet()); + + // cold columns shouldn't be present + verifyFooDSSchema(schema, 6); + Assert.assertNull(schema.getDatasource("cold")); + + schema.coldDatasourceSchemaExec(); + + // could datasource should be present now + Assert.assertEquals(new HashSet<>(Arrays.asList("foo", "cold")), schema.getDataSourceInformationMap().keySet()); + + RowSignature coldSignature = schema.getDatasource("cold").getRowSignature(); + List columnNames = coldSignature.getColumnNames(); + Assert.assertEquals("f1", columnNames.get(0)); + Assert.assertEquals(ColumnType.STRING, coldSignature.getColumnType(columnNames.get(0)).get()); + + Assert.assertEquals("f2", columnNames.get(1)); + Assert.assertEquals(ColumnType.DOUBLE, coldSignature.getColumnType(columnNames.get(1)).get()); + + // columns from cold datasource should be present + verifyFooDSSchema(schema, 8); + RowSignature rowSignature = schema.getDatasource("foo").getRowSignature(); + + columnNames = rowSignature.getColumnNames(); + Assert.assertEquals("c1", columnNames.get(6)); + Assert.assertEquals(ColumnType.STRING, rowSignature.getColumnType(columnNames.get(6)).get()); + + Assert.assertEquals("c2", columnNames.get(7)); + Assert.assertEquals(ColumnType.LONG, rowSignature.getColumnType(columnNames.get(7)).get()); + } + + @Test + public void testColdDatasourceSchema_verifyStaleDatasourceRemoved() + { + DataSegment coldSegmentAlpha = + DataSegment.builder() + .dataSource("alpha") + .interval(Intervals.of("2000/P2Y")) + .version("1") + .shardSpec(new LinearShardSpec(0)) + .size(0) + .build(); + + DataSegment coldSegmentBeta = + DataSegment.builder() + .dataSource("beta") + .interval(Intervals.of("2000/P2Y")) + .version("1") + .shardSpec(new LinearShardSpec(0)) + .size(0) + .build(); + + DataSegment coldSegmentGamma = + 
DataSegment.builder() + .dataSource("gamma") + .interval(Intervals.of("2000/P2Y")) + .version("1") + .shardSpec(new LinearShardSpec(0)) + .size(0) + .build(); + + DataSegment hotSegmentGamma = + DataSegment.builder() + .dataSource("gamma") + .interval(Intervals.of("2001/P2Y")) + .version("1") + .shardSpec(new LinearShardSpec(0)) + .size(0) + .build(); + + ImmutableMap.Builder segmentStatsMap = new ImmutableMap.Builder<>(); + segmentStatsMap.put(coldSegmentAlpha.getId(), new SegmentMetadata(20L, "cold")); + segmentStatsMap.put(coldSegmentBeta.getId(), new SegmentMetadata(20L, "cold")); + segmentStatsMap.put(hotSegmentGamma.getId(), new SegmentMetadata(20L, "hot")); + segmentStatsMap.put(coldSegmentGamma.getId(), new SegmentMetadata(20L, "cold")); + + ImmutableMap.Builder schemaPayloadMap = new ImmutableMap.Builder<>(); + schemaPayloadMap.put( + "cold", + new SchemaPayload(RowSignature.builder() + .add("dim1", ColumnType.STRING) + .add("c1", ColumnType.STRING) + .add("c2", ColumnType.LONG) + .build()) + ); + schemaPayloadMap.put( + "hot", + new SchemaPayload(RowSignature.builder() + .add("c3", ColumnType.STRING) + .add("c4", ColumnType.STRING) + .build()) + ); + segmentSchemaCache.updateFinalizedSegmentSchema( + new SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), schemaPayloadMap.build()) + ); + + List druidDataSources = new ArrayList<>(); + druidDataSources.add( + new ImmutableDruidDataSource( + "alpha", + Collections.emptyMap(), + Collections.singletonMap(coldSegmentAlpha.getId(), coldSegmentAlpha) + ) + ); + + Map gammaSegments = new HashMap<>(); + gammaSegments.put(hotSegmentGamma.getId(), hotSegmentGamma); + gammaSegments.put(coldSegmentGamma.getId(), coldSegmentGamma); + + druidDataSources.add( + new ImmutableDruidDataSource( + "gamma", + Collections.emptyMap(), + gammaSegments + ) + ); + + Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()) + .thenReturn(druidDataSources); + + CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache( + getQueryLifecycleFactory(walker), + serverView, + SEGMENT_CACHE_CONFIG_DEFAULT, + new NoopEscalator(), + new InternalQueryConfig(), + new NoopServiceEmitter(), + segmentSchemaCache, + backFillQueue, + sqlSegmentsMetadataManager, + segmentsMetadataManagerConfigSupplier + ); + + SegmentReplicaCount zeroSegmentReplicaCount = Mockito.mock(SegmentReplicaCount.class); + SegmentReplicaCount nonZeroSegmentReplicaCount = Mockito.mock(SegmentReplicaCount.class); + Mockito.when(zeroSegmentReplicaCount.required()).thenReturn(0); + Mockito.when(nonZeroSegmentReplicaCount.required()).thenReturn(1); + SegmentReplicationStatus segmentReplicationStatus = Mockito.mock(SegmentReplicationStatus.class); + Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(coldSegmentAlpha.getId()))) + .thenReturn(zeroSegmentReplicaCount); + Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(coldSegmentBeta.getId()))) + .thenReturn(zeroSegmentReplicaCount); + Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(coldSegmentGamma.getId()))) + .thenReturn(zeroSegmentReplicaCount); + + Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(hotSegmentGamma.getId()))) + .thenReturn(nonZeroSegmentReplicaCount); + + schema.updateSegmentReplicationStatus(segmentReplicationStatus); + + schema.coldDatasourceSchemaExec(); + // alpha has only 1 cold segment + 
Assert.assertNotNull(schema.getDatasource("alpha")); + // gamma has both hot and cold segment + Assert.assertNotNull(schema.getDatasource("gamma")); + // assert that cold schema for gamma doesn't contain any columns from hot segment + RowSignature rowSignature = schema.getDatasource("gamma").getRowSignature(); + Assert.assertTrue(rowSignature.contains("dim1")); + Assert.assertTrue(rowSignature.contains("c1")); + Assert.assertTrue(rowSignature.contains("c2")); + Assert.assertFalse(rowSignature.contains("c3")); + Assert.assertFalse(rowSignature.contains("c4")); + + Assert.assertEquals(new HashSet<>(Arrays.asList("alpha", "gamma")), schema.getDataSourceInformationMap().keySet()); + + druidDataSources.clear(); + druidDataSources.add( + new ImmutableDruidDataSource( + "beta", + Collections.emptyMap(), + Collections.singletonMap(coldSegmentBeta.getId(), coldSegmentBeta) + ) + ); + + druidDataSources.add( + new ImmutableDruidDataSource( + "gamma", + Collections.emptyMap(), + Collections.singletonMap(hotSegmentGamma.getId(), hotSegmentGamma) + ) + ); + + Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()) + .thenReturn(druidDataSources); + + schema.coldDatasourceSchemaExec(); + Assert.assertNotNull(schema.getDatasource("beta")); + // alpha doesn't have any segments + Assert.assertNull(schema.getDatasource("alpha")); + // gamma just has 1 hot segment + Assert.assertNull(schema.getDatasource("gamma")); + + Assert.assertNull(schema.getDatasource("doesnotexist")); + + Assert.assertEquals(Collections.singleton("beta"), schema.getDataSourceInformationMap().keySet()); + } + + private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns) { final DataSourceInformation fooDs = schema.getDatasource("foo"); final RowSignature fooRowSignature = fooDs.getRowSignature(); List columnNames = fooRowSignature.getColumnNames(); - Assert.assertEquals(6, columnNames.size()); + Assert.assertEquals(columns, columnNames.size()); Assert.assertEquals("__time", columnNames.get(0)); Assert.assertEquals(ColumnType.LONG, fooRowSignature.getColumnType(columnNames.get(0)).get()); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java index 7974ed460ebb..628b6ea39781 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java @@ -173,6 +173,16 @@ public void stop() callbackExec.shutdownNow(); } + /** + * Execute refresh on the broker in each cycle if CentralizedDatasourceSchema is enabled + * else if there are segments or datasources to be refreshed. + */ + @Override + protected boolean shouldRefresh() + { + return centralizedDatasourceSchemaConfig.isEnabled() || super.shouldRefresh(); + } + /** * Refreshes the set of segments in two steps: *
    @@ -196,6 +206,11 @@ public void refresh(final Set segmentsToRefresh, final Set da // segmentMetadataInfo keys should be a superset of all other sets including datasources to refresh final Set dataSourcesToQuery = new HashSet<>(segmentMetadataInfo.keySet()); + // this is the complete set of datasources polled from the Coordinator + final Set polledDatasources = queryDataSources(); + + dataSourcesToQuery.addAll(polledDatasources); + log.debug("Querying schema for [%s] datasources from Coordinator.", dataSourcesToQuery); // Fetch datasource information from the Coordinator @@ -227,14 +242,7 @@ public void refresh(final Set segmentsToRefresh, final Set da // Remove those datasource for which we received schema from the Coordinator. dataSourcesToRebuild.removeAll(polledDataSourceMetadata.keySet()); - if (centralizedDatasourceSchemaConfig.isEnabled()) { - // this is a hacky way to ensure refresh is executed even if there are no new segments to refresh - // once, CentralizedDatasourceSchema feature is GA, brokers should simply poll schema for all datasources - dataSourcesNeedingRebuild.addAll(segmentMetadataInfo.keySet()); - } else { - dataSourcesNeedingRebuild.clear(); - } - log.debug("DatasourcesNeedingRebuild are [%s]", dataSourcesNeedingRebuild); + dataSourcesNeedingRebuild.clear(); } // Rebuild the datasources. @@ -267,6 +275,23 @@ protected void removeSegmentAction(SegmentId segmentId) // noop, no additional action needed when segment is removed. } + private Set queryDataSources() + { + Set dataSources = new HashSet<>(); + + try { + Set polled = FutureUtils.getUnchecked(coordinatorClient.fetchDataSourcesWithUsedSegments(), true); + if (polled != null) { + dataSources.addAll(polled); + } + } + catch (Exception e) { + log.debug(e, "Failed to query datasources from the Coordinator."); + } + + return dataSources; + } + private Map queryDataSourceInformation(Set dataSourcesToQuery) { Stopwatch stopwatch = Stopwatch.createStarted(); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java index 23b2759286c5..65610ce99f28 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java @@ -74,11 +74,13 @@ import org.apache.druid.sql.calcite.table.DatasourceTable; import org.apache.druid.sql.calcite.table.DruidTable; import org.apache.druid.sql.calcite.util.CalciteTests; +import org.apache.druid.sql.calcite.util.TestTimelineServerView; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.LinearShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.EasyMock; +import org.joda.time.Period; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -332,6 +334,9 @@ public void testBrokerPollsAllDSSchema() throws InterruptedException ArgumentCaptor> argumentCaptor = ArgumentCaptor.forClass(Set.class); CoordinatorClient coordinatorClient = Mockito.mock(CoordinatorClient.class); Mockito.when(coordinatorClient.fetchDataSourceInformation(argumentCaptor.capture())).thenReturn(Futures.immediateFuture(null)); + + Set datsources = Sets.newHashSet(DATASOURCE1, DATASOURCE2, DATASOURCE3, SOME_DATASOURCE, "xyz", "coldDS"); + 
Mockito.when(coordinatorClient.fetchDataSourcesWithUsedSegments()).thenReturn(Futures.immediateFuture(datsources)); BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, @@ -347,7 +352,7 @@ public void testBrokerPollsAllDSSchema() throws InterruptedException schema.start(); schema.awaitInitialization(); - Assert.assertEquals(Sets.newHashSet(DATASOURCE1, DATASOURCE2, DATASOURCE3, SOME_DATASOURCE), argumentCaptor.getValue()); + Assert.assertEquals(datsources, argumentCaptor.getValue()); refreshLatch = new CountDownLatch(1); serverView.addSegment(newSegment("xyz", 0), ServerType.HISTORICAL); @@ -355,7 +360,87 @@ public void testBrokerPollsAllDSSchema() throws InterruptedException refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS); // verify that previously refreshed are included in the last coordinator poll - Assert.assertEquals(Sets.newHashSet(DATASOURCE1, DATASOURCE2, DATASOURCE3, SOME_DATASOURCE, "xyz"), argumentCaptor.getValue()); + Assert.assertEquals(datsources, argumentCaptor.getValue()); + } + + @Test + public void testRefreshOnEachCycleCentralizedDatasourceSchemaEnabled() throws InterruptedException + { + CentralizedDatasourceSchemaConfig config = CentralizedDatasourceSchemaConfig.create(); + config.setEnabled(true); + + serverView = new TestTimelineServerView(walker.getSegments(), Collections.emptyList()); + druidServers = serverView.getDruidServers(); + + BrokerSegmentMetadataCacheConfig metadataCacheConfig = BrokerSegmentMetadataCacheConfig.create("PT1S"); + metadataCacheConfig.setMetadataRefreshPeriod(Period.parse("PT0.001S")); + BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache( + CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), + serverView, + SEGMENT_CACHE_CONFIG_DEFAULT, + new NoopEscalator(), + new InternalQueryConfig(), + new NoopServiceEmitter(), + new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), + new NoopCoordinatorClient(), + config + ) { + @Override + public void refresh(Set segmentsToRefresh, Set dataSourcesToRebuild) + throws IOException + { + super.refresh(segmentsToRefresh, dataSourcesToRebuild); + refreshLatch.countDown(); + } + }; + + // refresh should be executed more than once, with the feature disabled refresh should be executed only once + refreshLatch = new CountDownLatch(3); + schema.start(); + schema.awaitInitialization(); + + refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS); + + Assert.assertEquals(0, refreshLatch.getCount()); + } + + @Test + public void testRefreshOnEachCycleCentralizedDatasourceSchemaDisabled() throws InterruptedException + { + BrokerSegmentMetadataCacheConfig metadataCacheConfig = BrokerSegmentMetadataCacheConfig.create("PT1S"); + metadataCacheConfig.setMetadataRefreshPeriod(Period.parse("PT0.001S")); + + serverView = new TestTimelineServerView(walker.getSegments(), Collections.emptyList()); + druidServers = serverView.getDruidServers(); + + BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache( + CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), + serverView, + SEGMENT_CACHE_CONFIG_DEFAULT, + new NoopEscalator(), + new InternalQueryConfig(), + new NoopServiceEmitter(), + new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), + new NoopCoordinatorClient(), + CentralizedDatasourceSchemaConfig.create() + ) { + @Override + public void refresh(Set segmentsToRefresh, Set dataSourcesToRebuild) + throws IOException + { + 
super.refresh(segmentsToRefresh, dataSourcesToRebuild); + refreshLatch.countDown(); + } + }; + + // refresh should be executed only once + refreshLatch = new CountDownLatch(3); + schema.start(); + schema.awaitInitialization(); + + refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS); + + Assert.assertEquals(2, refreshLatch.getCount()); } @Test