Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable querying entirely cold datasources #16676

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix tests
  • Loading branch information
findingrish committed Jul 2, 2024
commit 966f5ac286d81aaaf14a2d46f394ff6c45b49616
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import it.unimi.dsi.fastutil.objects.Object2IntMaps;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.duty.RunRules;
Expand All @@ -43,6 +44,7 @@
/**
* Manages information about replication status of segments in a cluster.
*/
@LazySingleton
public class SegmentReplicationStatusManager
{
private final MetadataManager metadataManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.SegmentReplicationStatusManager;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.NoopEscalator;
Expand Down Expand Up @@ -110,7 +110,7 @@ public class CoordinatorSegmentDataCacheConcurrencyTest extends SegmentMetadataC
private SegmentSchemaCache segmentSchemaCache;
private SegmentSchemaBackFillQueue backFillQueue;
private SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
private DruidCoordinator druidCoordinator;
private SegmentReplicationStatusManager segmentReplicationStatusManager;
private Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier;
private final ObjectMapper mapper = TestHelper.makeJsonMapper();

Expand Down Expand Up @@ -206,7 +206,7 @@ public CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas)
SegmentsMetadataManagerConfig metadataManagerConfig = Mockito.mock(SegmentsMetadataManagerConfig.class);
Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000));
segmentsMetadataManagerConfigSupplier = Suppliers.ofInstance(metadataManagerConfig);
druidCoordinator = Mockito.mock(druidCoordinator);
segmentReplicationStatusManager = Mockito.mock(segmentReplicationStatusManager);

inventoryView.init();
initLatch.await();
Expand Down Expand Up @@ -247,7 +247,7 @@ public void testSegmentMetadataRefreshAndInventoryViewAddSegmentAndBrokerServerV
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
druidCoordinator,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -364,7 +364,7 @@ public void testSegmentMetadataRefreshAndDruidSchemaGetSegmentMetadata()
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
druidCoordinator,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.SegmentReplicationStatusManager;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AllowAllAuthenticator;
Expand Down Expand Up @@ -116,7 +116,7 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
private CountDownLatch buildTableLatch = new CountDownLatch(1);
private CountDownLatch markDataSourceLatch = new CountDownLatch(1);
private SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
private DruidCoordinator druidCoordinator;
private SegmentReplicationStatusManager segmentReplicationStatusManager;
private Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier;

@Before
Expand All @@ -129,7 +129,7 @@ public void setUp() throws Exception
SegmentsMetadataManagerConfig metadataManagerConfig = Mockito.mock(SegmentsMetadataManagerConfig.class);
Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000));
segmentsMetadataManagerConfigSupplier = Suppliers.ofInstance(metadataManagerConfig);
druidCoordinator = Mockito.mock(DruidCoordinator.class);
segmentReplicationStatusManager = Mockito.mock(SegmentReplicationStatusManager.class);
}

@After
Expand Down Expand Up @@ -161,7 +161,7 @@ public CoordinatorSegmentMetadataCache buildSchemaMarkAndTableLatch(SegmentMetad
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
druidCoordinator,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -336,7 +336,7 @@ public void testAllDatasourcesRebuiltOnDatasourceRemoval() throws IOException, I
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
druidCoordinator,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -550,7 +550,7 @@ public void testSegmentAddedCallbackAddNewHistoricalSegment() throws Interrupted
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
druidCoordinator,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -601,7 +601,7 @@ public void testSegmentAddedCallbackAddExistingSegment() throws InterruptedExcep
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
Mockito.mock(DruidCoordinator.class),
Mockito.mock(SegmentReplicationStatusManager.class),
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -656,7 +656,7 @@ public void testSegmentAddedCallbackAddNewRealtimeSegment() throws InterruptedEx
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
Mockito.mock(DruidCoordinator.class),
Mockito.mock(SegmentReplicationStatusManager.class),
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -708,7 +708,7 @@ public void testSegmentAddedCallbackAddNewBroadcastSegment() throws InterruptedE
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
druidCoordinator,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -752,7 +752,7 @@ public void testSegmentRemovedCallbackEmptyDataSourceAfterRemove() throws Interr
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
druidCoordinator,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -813,7 +813,7 @@ public void testSegmentRemovedCallbackNonEmptyDataSourceAfterRemove() throws Int
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
druidCoordinator,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -877,7 +877,7 @@ public void testServerSegmentRemovedCallbackRemoveUnknownSegment() throws Interr
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
druidCoordinator,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -915,7 +915,7 @@ public void testServerSegmentRemovedCallbackRemoveBrokerSegment() throws Interru
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
druidCoordinator,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -966,7 +966,7 @@ public void testServerSegmentRemovedCallbackRemoveHistoricalSegment() throws Int
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
druidCoordinator,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -1041,7 +1041,7 @@ public void testRunSegmentMetadataQueryWithContext() throws Exception
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
druidCoordinator,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
);

Expand Down Expand Up @@ -1213,7 +1213,7 @@ public void testRefreshShouldEmitMetrics() throws InterruptedException, IOExcept
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
druidCoordinator,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
)
{
Expand Down Expand Up @@ -1381,7 +1381,7 @@ public void testRealtimeSchemaAnnouncement() throws InterruptedException, IOExce
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
druidCoordinator,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
) {
@Override
Expand Down Expand Up @@ -1463,7 +1463,7 @@ public void testRealtimeSchemaAnnouncementDataSourceSchemaUpdated() throws Inter
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
druidCoordinator,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
) {
@Override
Expand Down Expand Up @@ -1646,7 +1646,7 @@ public void testSchemaBackfilling() throws InterruptedException
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
druidCoordinator,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
) {
@Override
Expand Down Expand Up @@ -1829,7 +1829,7 @@ public void testColdDatasourceSchema() throws IOException
new SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), schemaPayloadMap.build())
);

Mockito.when(druidCoordinator.getReplicationFactor(ArgumentMatchers.eq(coldSegment.getId()))).thenReturn(0);
Mockito.when(segmentReplicationStatusManager.getReplicationFactor(ArgumentMatchers.eq(coldSegment.getId()))).thenReturn(0);

ImmutableDruidDataSource druidDataSource =
new ImmutableDruidDataSource(
Expand All @@ -1850,7 +1850,7 @@ public void testColdDatasourceSchema() throws IOException
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
druidCoordinator,
segmentReplicationStatusManager,
segmentsMetadataManagerConfigSupplier
);

Expand Down
Loading