Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable querying entirely cold datasources #16676

Merged
merged 32 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
ca33d8b
Fix build
findingrish Mar 14, 2024
1abba25
Merge branch 'master' of github.com:findingrish/druid
findingrish Jun 10, 2024
033c420
temp changes
findingrish Jun 20, 2024
96e77c1
Coordinator changes to process schema for cold segments
findingrish Jul 1, 2024
1739dce
Changes in the broker metadata cache to fetch used datasources schema
findingrish Jul 1, 2024
6ed0b8b
Refactor DruidCoordinator, broker metadata cache changes to execute r…
findingrish Jul 2, 2024
966f5ac
Fix tests
findingrish Jul 2, 2024
d586a8e
Merge remote-tracking branch 'upstream/master' into cold_ds_schema
findingrish Jul 2, 2024
b72b822
Update MetadataResourceTest
findingrish Jul 2, 2024
ad58f9b
minor changes
findingrish Jul 2, 2024
2944959
Fix refresh condition on Broker, add UTs
findingrish Jul 2, 2024
b09df54
Fix test
findingrish Jul 3, 2024
254cea2
Merge remote-tracking branch 'upstream/master' into cold_ds_schema
findingrish Jul 3, 2024
25d23c6
minor code changes, add test
findingrish Jul 3, 2024
7169b6d
revert changes
findingrish Jul 7, 2024
89e2d64
Fix test
findingrish Jul 7, 2024
ba380ec
checkstyle
findingrish Jul 8, 2024
d3c112c
Update docs
findingrish Jul 8, 2024
debcb42
review comments
findingrish Jul 8, 2024
479382c
Doc changes
findingrish Jul 8, 2024
87113ca
Update threshold for logging cold schema processing stats
findingrish Jul 8, 2024
2a66ae1
Minor changes
findingrish Jul 9, 2024
695ba92
add debug logging
findingrish Jul 9, 2024
a265cf0
Merge hot and cold schema only while querying datasource schema
findingrish Jul 10, 2024
8182f39
Update tests
findingrish Jul 10, 2024
766082e
Update docs
findingrish Jul 10, 2024
2bea1fb
Merge remote-tracking branch 'upstream/master' into cold_ds_schema
findingrish Jul 10, 2024
8582f5d
Update test
findingrish Jul 10, 2024
7ed2d96
Update tests with SegmentReplicationStatus mock
findingrish Jul 10, 2024
47bf4ab
Merge remote-tracking branch 'upstream/master' into cold_ds_schema
findingrish Jul 11, 2024
8fb0a04
Minor change
findingrish Jul 11, 2024
87a0926
Minor change
findingrish Jul 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix test
  • Loading branch information
findingrish committed Jul 7, 2024
commit 89e2d64d1cf49dea115ff7985258eb407893625c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordinator.SegmentReplicationStatusManager;
import org.apache.druid.server.coordinator.loading.SegmentReplicaCount;
import org.apache.druid.server.coordinator.loading.SegmentReplicationStatus;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.timeline.DataSegment;
Expand Down Expand Up @@ -506,6 +506,16 @@ private Set<SegmentId> filterSegmentWithCachedSchema(Set<SegmentId> segmentIds)
return cachedSegments;
}

@Nullable
private Integer getReplicationFactor(SegmentId segmentId)
{
  // Replication status is pushed from the coordinator duty cycle; until the
  // first update arrives there is nothing to report.
  if (segmentReplicationStatus == null) {
    return null;
  }

  final SegmentReplicaCount clusterReplicaCount =
      segmentReplicationStatus.getReplicaCountsInCluster(segmentId);

  // Unknown segment: no replica-count entry in the current status snapshot.
  if (clusterReplicaCount == null) {
    return null;
  }

  // Number of replicas required by the load/broadcast rules for this segment.
  return clusterReplicaCount.required();
}

@VisibleForTesting
protected void coldDatasourceSchemaExec()
{
Expand All @@ -522,7 +532,7 @@ protected void coldDatasourceSchemaExec()
Collection<DataSegment> dataSegments = dataSource.getSegments();

for (DataSegment segment : dataSegments) {
Integer replicationFactor = segmentReplicationStatusManager.getReplicationFactor(segment.getId());
Integer replicationFactor = getReplicationFactor(segment.getId());
if (replicationFactor != null && replicationFactor != 0) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,9 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
broadcastSegments = params.getBroadcastSegments();
segmentReplicationStatus = params.getSegmentReplicationStatus();
if (coordinatorSegmentMetadataCache != null) {
coordinatorSegmentMetadataCache.updateSegmentReplicationStatus(segmentReplicationStatus);
}

// Collect stats for unavailable and under-replicated segments
final CoordinatorRunStats stats = params.getCoordinatorStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ void incrementQueued(SegmentAction action)
* @param required Number of replicas as required by load or broadcast rules.
* @param numLoadingServers Number of servers that can load replicas of this segment.
*/
void setRequired(int required, int numLoadingServers)
public void setRequired(int required, int numLoadingServers)
{
this.required = required;
this.requiredAndLoadable = Math.min(required, numLoadingServers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.SegmentReplicationStatusManager;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.NoopEscalator;
Expand Down Expand Up @@ -110,7 +109,6 @@ public class CoordinatorSegmentDataCacheConcurrencyTest extends SegmentMetadataC
private SegmentSchemaCache segmentSchemaCache;
private SegmentSchemaBackFillQueue backFillQueue;
private SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
private SegmentReplicationStatusManager segmentReplicationStatusManager;
private Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier;
private final ObjectMapper mapper = TestHelper.makeJsonMapper();

Expand Down Expand Up @@ -206,7 +204,6 @@ public CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas)
SegmentsMetadataManagerConfig metadataManagerConfig = Mockito.mock(SegmentsMetadataManagerConfig.class);
Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000));
segmentsMetadataManagerConfigSupplier = Suppliers.ofInstance(metadataManagerConfig);
segmentReplicationStatusManager = Mockito.mock(SegmentReplicationStatusManager.class);

inventoryView.init();
initLatch.await();
Expand Down Expand Up @@ -247,7 +244,6 @@ public void testSegmentMetadataRefreshAndInventoryViewAddSegmentAndBrokerServerV
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -364,7 +360,6 @@ public void testSegmentMetadataRefreshAndDruidSchemaGetSegmentMetadata()
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@
import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.SegmentReplicationStatusManager;
import org.apache.druid.server.coordinator.loading.SegmentReplicaCount;
import org.apache.druid.server.coordinator.loading.SegmentReplicationStatus;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AllowAllAuthenticator;
Expand All @@ -95,6 +96,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
Expand All @@ -116,7 +118,6 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
private CountDownLatch buildTableLatch = new CountDownLatch(1);
private CountDownLatch markDataSourceLatch = new CountDownLatch(1);
private SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
private SegmentReplicationStatusManager segmentReplicationStatusManager;
private Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier;

@Before
Expand All @@ -129,7 +130,6 @@ public void setUp() throws Exception
SegmentsMetadataManagerConfig metadataManagerConfig = Mockito.mock(SegmentsMetadataManagerConfig.class);
Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000));
segmentsMetadataManagerConfigSupplier = Suppliers.ofInstance(metadataManagerConfig);
segmentReplicationStatusManager = Mockito.mock(SegmentReplicationStatusManager.class);
}

@After
Expand Down Expand Up @@ -161,7 +161,6 @@ public CoordinatorSegmentMetadataCache buildSchemaMarkAndTableLatch(SegmentMetad
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -336,7 +335,6 @@ public void testAllDatasourcesRebuiltOnDatasourceRemoval() throws IOException, I
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -550,7 +548,6 @@ public void testSegmentAddedCallbackAddNewHistoricalSegment() throws Interrupted
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -601,7 +598,6 @@ public void testSegmentAddedCallbackAddExistingSegment() throws InterruptedExcep
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
Mockito.mock(SegmentReplicationStatusManager.class),
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -656,7 +652,6 @@ public void testSegmentAddedCallbackAddNewRealtimeSegment() throws InterruptedEx
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
Mockito.mock(SegmentReplicationStatusManager.class),
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -708,7 +703,6 @@ public void testSegmentAddedCallbackAddNewBroadcastSegment() throws InterruptedE
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -752,7 +746,6 @@ public void testSegmentRemovedCallbackEmptyDataSourceAfterRemove() throws Interr
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -813,7 +806,6 @@ public void testSegmentRemovedCallbackNonEmptyDataSourceAfterRemove() throws Int
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -877,7 +869,6 @@ public void testServerSegmentRemovedCallbackRemoveUnknownSegment() throws Interr
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -915,7 +906,6 @@ public void testServerSegmentRemovedCallbackRemoveBrokerSegment() throws Interru
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -966,7 +956,6 @@ public void testServerSegmentRemovedCallbackRemoveHistoricalSegment() throws Int
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -1041,7 +1030,6 @@ public void testRunSegmentMetadataQueryWithContext() throws Exception
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
);

Expand Down Expand Up @@ -1213,7 +1201,6 @@ public void testRefreshShouldEmitMetrics() throws InterruptedException, IOExcept
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -1381,7 +1368,6 @@ public void testRealtimeSchemaAnnouncement() throws InterruptedException, IOExce
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
) {
@Override
Expand Down Expand Up @@ -1463,7 +1449,6 @@ public void testRealtimeSchemaAnnouncementDataSourceSchemaUpdated() throws Inter
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
) {
@Override
Expand Down Expand Up @@ -1646,7 +1631,6 @@ public void testSchemaBackfilling() throws InterruptedException
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
) {
@Override
Expand Down Expand Up @@ -1829,8 +1813,6 @@ private CoordinatorSegmentMetadataCache setupForColdDatasourceSchemaTest()
new SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), schemaPayloadMap.build())
);

Mockito.when(segmentReplicationStatusManager.getReplicationFactor(ArgumentMatchers.eq(coldSegment.getId()))).thenReturn(0);

ImmutableDruidDataSource druidDataSource =
new ImmutableDruidDataSource(
coldSegment.getDataSource(),
Expand All @@ -1840,7 +1822,7 @@ private CoordinatorSegmentMetadataCache setupForColdDatasourceSchemaTest()

Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.singletonList(druidDataSource));

return new CoordinatorSegmentMetadataCache(
CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
getQueryLifecycleFactory(walker),
serverView,
SEGMENT_CACHE_CONFIG_DEFAULT,
Expand All @@ -1850,9 +1832,16 @@ private CoordinatorSegmentMetadataCache setupForColdDatasourceSchemaTest()
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
);

SegmentReplicaCount segmentReplicaCount = new SegmentReplicaCount();
segmentReplicaCount.setRequired(0, 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't the values already be zero in a fresh instance?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I was trying to be explicit about setting it to 0, so that it is clear in the test that the given segment is unavailable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just add a comment to that effect and/or use test method names that clarify that point.
Invoking this method requires making it public which doesn't really seem necessary since it is only going to be used in this test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Updated.


SegmentReplicationStatus segmentReplicationStatus = new SegmentReplicationStatus(Collections.singletonMap(coldSegment.getId(), Collections.singletonMap("default", segmentReplicaCount)));
schema.updateSegmentReplicationStatus(segmentReplicationStatus);

return schema;
}

@Test
Expand Down Expand Up @@ -1940,9 +1929,6 @@ public void testColdDatasourceSchema_verifyStaleDatasourceRemoved()
new SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), schemaPayloadMap.build())
);

Mockito.when(segmentReplicationStatusManager.getReplicationFactor(ArgumentMatchers.eq(coldSegmentAlpha.getId()))).thenReturn(0);
Mockito.when(segmentReplicationStatusManager.getReplicationFactor(ArgumentMatchers.eq(coldSegmentBeta.getId()))).thenReturn(0);

ImmutableDruidDataSource druidDataSource =
new ImmutableDruidDataSource(
coldSegmentAlpha.getDataSource(),
Expand All @@ -1963,10 +1949,21 @@ public void testColdDatasourceSchema_verifyStaleDatasourceRemoved()
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
);

SegmentReplicaCount segmentReplicaCount = new SegmentReplicaCount();
segmentReplicaCount.setRequired(0, 0);

Map<SegmentId, Map<String, SegmentReplicaCount>> segmentIdSegmentReplicaCountMap = new HashMap<>();

Map<String, SegmentReplicaCount> segmentReplicaCountMap = Collections.singletonMap("default", segmentReplicaCount);
segmentIdSegmentReplicaCountMap.put(coldSegmentAlpha.getId(), segmentReplicaCountMap);
segmentIdSegmentReplicaCountMap.put(coldSegmentBeta.getId(), segmentReplicaCountMap);
SegmentReplicationStatus segmentReplicationStatus = new SegmentReplicationStatus(segmentIdSegmentReplicaCountMap);

schema.updateSegmentReplicationStatus(segmentReplicationStatus);

schema.coldDatasourceSchemaExec();
Assert.assertNotNull(schema.getDatasource("alpha"));

Expand Down
Loading