Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable querying entirely cold datasources #16676

Merged
merged 32 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
ca33d8b
Fix build
findingrish Mar 14, 2024
1abba25
Merge branch 'master' of github.com:findingrish/druid
findingrish Jun 10, 2024
033c420
temp changes
findingrish Jun 20, 2024
96e77c1
Coordinator changes to process schema for cold segments
findingrish Jul 1, 2024
1739dce
Changes in the broker metadata cache to fetch used datasources schema
findingrish Jul 1, 2024
6ed0b8b
Refactor DruidCoordinator, broker metadata cache changes to execute r…
findingrish Jul 2, 2024
966f5ac
Fix tests
findingrish Jul 2, 2024
d586a8e
Merge remote-tracking branch 'upstream/master' into cold_ds_schema
findingrish Jul 2, 2024
b72b822
Update MetadataResourceTest
findingrish Jul 2, 2024
ad58f9b
minor changes
findingrish Jul 2, 2024
2944959
Fix refresh condition on Broker, add UTs
findingrish Jul 2, 2024
b09df54
Fix test
findingrish Jul 3, 2024
254cea2
Merge remote-tracking branch 'upstream/master' into cold_ds_schema
findingrish Jul 3, 2024
25d23c6
minor code changes, add test
findingrish Jul 3, 2024
7169b6d
revert changes
findingrish Jul 7, 2024
89e2d64
Fix test
findingrish Jul 7, 2024
ba380ec
checkstyle
findingrish Jul 8, 2024
d3c112c
Update docs
findingrish Jul 8, 2024
debcb42
review comments
findingrish Jul 8, 2024
479382c
Doc changes
findingrish Jul 8, 2024
87113ca
Update threshold for logging cold schema processing stats
findingrish Jul 8, 2024
2a66ae1
Minor changes
findingrish Jul 9, 2024
695ba92
add debug logging
findingrish Jul 9, 2024
a265cf0
Merge hot and cold schema only while querying datasource schema
findingrish Jul 10, 2024
8182f39
Update tests
findingrish Jul 10, 2024
766082e
Update docs
findingrish Jul 10, 2024
2bea1fb
Merge remote-tracking branch 'upstream/master' into cold_ds_schema
findingrish Jul 10, 2024
8582f5d
Update test
findingrish Jul 10, 2024
7ed2d96
Update tests with SegmentReplicationStatus mock
findingrish Jul 10, 2024
47bf4ab
Merge remote-tracking branch 'upstream/master' into cold_ds_schema
findingrish Jul 11, 2024
8fb0a04
Minor change
findingrish Jul 11, 2024
87a0926
Minor change
findingrish Jul 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge remote-tracking branch 'upstream/master' into cold_ds_schema
  • Loading branch information
findingrish committed Jul 2, 2024
commit d586a8e4b5936acbf3563c165264c8d31b83beaf
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,18 @@ public interface CoordinatorClient
ListenableFuture<List<DataSourceInformation>> fetchDataSourceInformation(Set<String> datasources);

/**
* Retrieves list of used datasources.
* Fetch bootstrap segments from the coordinator. The results must be streamed back to the caller as the
* result set can be large.
*/
ListenableFuture<List<String>> fetchDataSources();
ListenableFuture<BootstrapSegmentsResponse> fetchBootstrapSegments();

/**
* Returns a new instance backed by a ServiceClient which follows the provided retryPolicy
*
* @param retryPolicy retry policy that the ServiceClient backing the returned instance follows
* @return a new CoordinatorClient instance using the given retry policy
*/
CoordinatorClient withRetryPolicy(ServiceRetryPolicy retryPolicy);

/**
* Retrieves list of used datasources.
*
* @return future that resolves to the list of used datasources, one String per datasource
* (presumably the datasource name; confirm against the Coordinator API)
*/
ListenableFuture<List<String>> fetchDataSources();
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,24 @@ public ListenableFuture<List<DataSourceInformation>> fetchDataSourceInformation(
}

@Override
public ListenableFuture<List<String>> fetchDataSources()
public ListenableFuture<BootstrapSegmentsResponse> fetchBootstrapSegments()
{
final String path = "/druid/coordinator/v1/metadata/datasources";
final String path = "/druid/coordinator/v1/metadata/bootstrapSegments";
return FutureUtils.transform(
client.asyncRequest(
new RequestBuilder(HttpMethod.GET, path),
new BytesFullResponseHandler()
new RequestBuilder(HttpMethod.POST, path),
new InputStreamResponseHandler()
),
holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), new TypeReference<List<String>>() {})
in -> new BootstrapSegmentsResponse(
new JsonParserIterator<>(
// Some servers, like the Broker, may have PruneLoadSpec set to true for optimization reasons.
// We specifically use LoadableDataSegment here instead of DataSegment so the callers can still correctly
// load the bootstrap segments, as the load specs are guaranteed not to be pruned.
jsonMapper.getTypeFactory().constructType(LoadableDataSegment.class),
Futures.immediateFuture(in),
jsonMapper
)
)
);
}

Expand All @@ -179,4 +188,17 @@ public CoordinatorClientImpl withRetryPolicy(ServiceRetryPolicy retryPolicy)
{
return new CoordinatorClientImpl(client.withRetryPolicy(retryPolicy), jsonMapper);
}

/**
 * Retrieves the list of used datasources by sending a GET request to
 * {@code /druid/coordinator/v1/metadata/datasources} and deserializing the full response content
 * as a JSON list of strings.
 *
 * @return future that resolves to the deserialized list once the response has been read
 */
@Override
public ListenableFuture<List<String>> fetchDataSources()
{
  final RequestBuilder request =
      new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/metadata/datasources");
  // Type token describing the expected payload shape: a JSON array of strings.
  final TypeReference<List<String>> dataSourceListType = new TypeReference<List<String>>() {};

  return FutureUtils.transform(
      client.asyncRequest(request, new BytesFullResponseHandler()),
      response -> JacksonUtils.readValue(jsonMapper, response.getContent(), dataSourceListType)
  );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,16 +242,6 @@ public Map<String, Double> getDatasourceToLoadStatus()
return loadStatus;
}

/**
* @return Set of broadcast segments determined by the latest run of the {@link RunRules} duty.
* If the coordinator runs haven't triggered or are delayed, this information may be stale.
*/
@Nullable
public Set<DataSegment> getBroadcastSegments()
{
return broadcastSegments;
}

@Nullable
public Long getTotalSizeOfSegmentsAwaitingCompaction(String dataSource)
{
Expand Down Expand Up @@ -701,6 +691,4 @@ public String toString()
return "DutiesRunnable{group='" + dutyGroupName + '\'' + '}';
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import javax.inject.Inject;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

/**
* Manages information about replication status of segments in a cluster.
Expand All @@ -49,6 +50,12 @@ public class SegmentReplicationStatusManager
{
private final MetadataManager metadataManager;

/**
* Set of broadcast segments determined in the latest coordinator run of the {@link RunRules} duty.
* This might contain stale information if the Coordinator duties haven't run or are delayed.
*/
private volatile Set<DataSegment> broadcastSegments = null;

/**
* Used to determine count of under-replicated or unavailable segments.
* Updated in each coordinator run in the {@link SegmentReplicationStatusManager.UpdateReplicationStatus} duty.
Expand All @@ -68,6 +75,16 @@ public SegmentReplicationStatusManager(MetadataManager metadataManager)
this.metadataManager = metadataManager;
}

/**
 * Returns the set of broadcast segments last recorded by the
 * {@link SegmentReplicationStatusManager.UpdateReplicationStatus} duty, which copies the value
 * determined by the {@link RunRules} duty from the coordinator runtime params.
 *
 * @return the most recently recorded broadcast segments, or null if none have been recorded yet.
 * The value may be stale if coordinator runs haven't triggered or are delayed.
 */
@Nullable
public Set<DataSegment> getBroadcastSegments()
{
  return this.broadcastSegments;
}

public Object2IntMap<String> getDatasourceToUnavailableSegmentCount()
{
if (segmentReplicationStatus == null) {
Expand Down Expand Up @@ -155,6 +172,7 @@ class UpdateReplicationStatus implements CoordinatorDuty
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
broadcastSegments = params.getBroadcastSegments();
segmentReplicationStatus = params.getSegmentReplicationStatus();

// Collect stats for unavailable and under-replicated segments
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ public Response getDataSourceInformation(
@ResourceFilters(DatasourceResourceFilter.class)
public Response getBootstrapSegments()
{
final Set<DataSegment> broadcastSegments = coordinator.getBroadcastSegments();
final Set<DataSegment> broadcastSegments = segmentReplicationStatusManager.getBroadcastSegments();
if (broadcastSegments == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE)
.entity("Bootstrap segments are not initialized yet."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public ListenableFuture<List<DataSourceInformation>> fetchDataSourceInformation(
}

@Override
public ListenableFuture<List<String>> fetchDataSources()
public ListenableFuture<BootstrapSegmentsResponse> fetchBootstrapSegments()
{
throw new UnsupportedOperationException();
}
Expand All @@ -75,4 +75,10 @@ public CoordinatorClient withRetryPolicy(ServiceRetryPolicy retryPolicy)
// Ignore retryPolicy for the test client.
return this;
}

/**
* Not supported by this test client.
*
* @throws UnsupportedOperationException always
*/
@Override
public ListenableFuture<List<String>> fetchDataSources()
{
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ public void testCoordinatorRun() throws Exception
coordinator.start();

Assert.assertNull(segmentReplicationStatusManager.getReplicationFactor(dataSegment.getId()));
Assert.assertNull(segmentReplicationStatusManager.getBroadcastSegments());

// Wait for this coordinator to become leader
leaderAnnouncerLatch.await();
Expand Down Expand Up @@ -298,7 +299,7 @@ public void testCoordinatorRun() throws Exception
segmentReplicationStatusManager.getDatasourceToUnavailableSegmentCount();
Assert.assertEquals(1, numsUnavailableUsedSegmentsPerDataSource.size());
Assert.assertEquals(0, numsUnavailableUsedSegmentsPerDataSource.getInt(dataSource));
Assert.assertEquals(0, coordinator.getBroadcastSegments().size());
Assert.assertEquals(0, segmentReplicationStatusManager.getBroadcastSegments().size());

Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
segmentReplicationStatusManager.getTierToDatasourceToUnderReplicatedCount(false);
Expand Down Expand Up @@ -581,7 +582,7 @@ public void testComputeUnderReplicationCountsPerDataSourcePerTierForSegmentsWith
coordinatorRunLatch.await();

Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getDatasourceToLoadStatus());
Assert.assertEquals(new HashSet<>(dataSegments.values()), coordinator.getBroadcastSegments());
Assert.assertEquals(new HashSet<>(dataSegments.values()), segmentReplicationStatusManager.getBroadcastSegments());

// Under-replicated counts are updated only after the next coordinator run
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
Expand Down
Loading
You are viewing a condensed version of this merge commit. You can view the full changes here.