Skip to content

Commit

Permalink
Fix Cannot mark an unqueryable datasource's segments used / unused (#…
Browse files Browse the repository at this point in the history
…16127)

* * fix

* * address review comments

* * all remove the short-circuit for markUnused api

* * add test
  • Loading branch information
zachjsh committed Mar 15, 2024
1 parent 3eefc47 commit f3d77fe
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,12 @@ public Response getQueryableDataSources(
@Path("/{dataSourceName}")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(DatasourceResourceFilter.class)
public Response getDataSource(
public Response getQueryableDataSource(
@PathParam("dataSourceName") final String dataSourceName,
@QueryParam("full") final String full
)
{
final ImmutableDruidDataSource dataSource = getDataSource(dataSourceName);
final ImmutableDruidDataSource dataSource = getQueryableDataSource(dataSourceName);

if (dataSource == null) {
return logAndCreateDataSourceNotFoundResponse(dataSourceName);
Expand Down Expand Up @@ -209,31 +209,41 @@ public Response markAsUsedNonOvershadowedSegments(
MarkDataSourceSegmentsPayload payload
)
{
SegmentUpdateOperation operation = () -> {
final Interval interval = payload.getInterval();
if (interval != null) {
return segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(dataSourceName, interval);
} else {
final Set<String> segmentIds = payload.getSegmentIds();
if (segmentIds == null || segmentIds.isEmpty()) {
return 0;
}
if (payload == null || !payload.isValid()) {
log.warn("Invalid request payload: [%s]", payload);
return Response
.status(Response.Status.BAD_REQUEST)
.entity("Invalid request payload, either interval or segmentIds array must be specified")
.build();
} else {
SegmentUpdateOperation operation = () -> {

final Interval interval = payload.getInterval();
if (interval != null) {
return segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(dataSourceName, interval);
} else {
final Set<String> segmentIds = payload.getSegmentIds();
if (segmentIds == null || segmentIds.isEmpty()) {
return 0;
}

// Validate segmentIds
final List<String> invalidSegmentIds = new ArrayList<>();
for (String segmentId : segmentIds) {
if (SegmentId.iteratePossibleParsingsWithDataSource(dataSourceName, segmentId).isEmpty()) {
invalidSegmentIds.add(segmentId);
// Validate segmentIds
final List<String> invalidSegmentIds = new ArrayList<>();
for (String segmentId : segmentIds) {
if (SegmentId.iteratePossibleParsingsWithDataSource(dataSourceName, segmentId).isEmpty()) {
invalidSegmentIds.add(segmentId);
}
}
if (!invalidSegmentIds.isEmpty()) {
throw InvalidInput.exception("Could not parse invalid segment IDs[%s]", invalidSegmentIds);
}

return segmentsMetadataManager.markAsUsedNonOvershadowedSegments(dataSourceName, segmentIds);
}
if (!invalidSegmentIds.isEmpty()) {
throw InvalidInput.exception("Could not parse invalid segment IDs[%s]", invalidSegmentIds);
}
};

return segmentsMetadataManager.markAsUsedNonOvershadowedSegments(dataSourceName, segmentIds);
}
};
return performSegmentUpdate(dataSourceName, payload, operation);
return performSegmentUpdate(dataSourceName, operation);
}
}

@POST
Expand All @@ -246,61 +256,47 @@ public Response markSegmentsAsUnused(
final MarkDataSourceSegmentsPayload payload,
@Context final HttpServletRequest req
)
{
SegmentUpdateOperation operation = () -> {
final Interval interval = payload.getInterval();
final int numUpdatedSegments;
if (interval != null) {
numUpdatedSegments = segmentsMetadataManager.markAsUnusedSegmentsInInterval(dataSourceName, interval);
} else {
final Set<SegmentId> segmentIds =
payload.getSegmentIds()
.stream()
.map(idStr -> SegmentId.tryParse(dataSourceName, idStr))
.filter(Objects::nonNull)
.collect(Collectors.toSet());

// Filter out segmentIds that do not belong to this datasource
numUpdatedSegments = segmentsMetadataManager.markSegmentsAsUnused(
segmentIds.stream()
.filter(segmentId -> segmentId.getDataSource().equals(dataSourceName))
.collect(Collectors.toSet())
);
}
auditManager.doAudit(
AuditEntry.builder()
.key(dataSourceName)
.type("segment.markUnused")
.payload(payload)
.auditInfo(AuthorizationUtils.buildAuditInfo(req))
.request(AuthorizationUtils.buildRequestInfo("coordinator", req))
.build()
);
return numUpdatedSegments;
};
return performSegmentUpdate(dataSourceName, payload, operation);
}

private Response performSegmentUpdate(
String dataSourceName,
MarkDataSourceSegmentsPayload payload,
SegmentUpdateOperation operation
)
{
if (payload == null || !payload.isValid()) {
log.warn("Invalid request payload: [%s]", payload);
return Response
.status(Response.Status.BAD_REQUEST)
.entity("Invalid request payload, either interval or segmentIds array must be specified")
.build();
} else {
SegmentUpdateOperation operation = () -> {
final Interval interval = payload.getInterval();
final int numUpdatedSegments;
if (interval != null) {
numUpdatedSegments = segmentsMetadataManager.markAsUnusedSegmentsInInterval(dataSourceName, interval);
} else {
final Set<SegmentId> segmentIds =
payload.getSegmentIds()
.stream()
.map(idStr -> SegmentId.tryParse(dataSourceName, idStr))
.filter(Objects::nonNull)
.collect(Collectors.toSet());

// Filter out segmentIds that do not belong to this datasource
numUpdatedSegments = segmentsMetadataManager.markSegmentsAsUnused(
segmentIds.stream()
.filter(segmentId -> segmentId.getDataSource().equals(dataSourceName))
.collect(Collectors.toSet())
);
}
auditManager.doAudit(
AuditEntry.builder()
.key(dataSourceName)
.type("segment.markUnused")
.payload(payload)
.auditInfo(AuthorizationUtils.buildAuditInfo(req))
.request(AuthorizationUtils.buildRequestInfo("coordinator", req))
.build()
);
return numUpdatedSegments;
};
return performSegmentUpdate(dataSourceName, operation);
}

final ImmutableDruidDataSource dataSource = getDataSource(dataSourceName);
if (dataSource == null) {
return logAndCreateDataSourceNotFoundResponse(dataSourceName);
}

return performSegmentUpdate(dataSourceName, operation);
}

private static Response logAndCreateDataSourceNotFoundResponse(String dataSourceName)
Expand Down Expand Up @@ -434,7 +430,7 @@ public Response getIntervalsWithServedSegmentsOrAllServedSegmentsPerIntervals(
)
{
if (simple == null && full == null) {
final ImmutableDruidDataSource dataSource = getDataSource(dataSourceName);
final ImmutableDruidDataSource dataSource = getQueryableDataSource(dataSourceName);
if (dataSource == null) {
return logAndCreateDataSourceNotFoundResponse(dataSourceName);
}
Expand All @@ -460,7 +456,7 @@ public Response getServedSegmentsInInterval(
{
final Interval theInterval = Intervals.of(interval.replace('_', '/'));
if (simple == null && full == null) {
final ImmutableDruidDataSource dataSource = getDataSource(dataSourceName);
final ImmutableDruidDataSource dataSource = getQueryableDataSource(dataSourceName);
if (dataSource == null) {
return logAndCreateDataSourceNotFoundResponse(dataSourceName);
}
Expand Down Expand Up @@ -617,7 +613,7 @@ private Response getServedSegmentsInInterval(
Predicate<Interval> intervalFilter
)
{
final ImmutableDruidDataSource dataSource = getDataSource(dataSourceName);
final ImmutableDruidDataSource dataSource = getQueryableDataSource(dataSourceName);

if (dataSource == null) {
return logAndCreateDataSourceNotFoundResponse(dataSourceName);
Expand Down Expand Up @@ -667,7 +663,7 @@ public Response getAllServedSegments(
@QueryParam("full") String full
)
{
ImmutableDruidDataSource dataSource = getDataSource(dataSourceName);
ImmutableDruidDataSource dataSource = getQueryableDataSource(dataSourceName);
if (dataSource == null) {
return logAndCreateDataSourceNotFoundResponse(dataSourceName);
}
Expand All @@ -689,7 +685,7 @@ public Response getServedSegment(
@PathParam("segmentId") String segmentId
)
{
ImmutableDruidDataSource dataSource = getDataSource(dataSourceName);
ImmutableDruidDataSource dataSource = getQueryableDataSource(dataSourceName);
if (dataSource == null) {
return logAndCreateDataSourceNotFoundResponse(dataSourceName);
}
Expand Down Expand Up @@ -747,7 +743,7 @@ public Response getTiersWhereSegmentsAreServed(@PathParam("dataSourceName") Stri
}

@Nullable
private ImmutableDruidDataSource getDataSource(final String dataSourceName)
private ImmutableDruidDataSource getQueryableDataSource(final String dataSourceName)
{
List<DruidDataSource> dataSources = serverInventoryView
.getInventory()
Expand Down
Loading

0 comments on commit f3d77fe

Please sign in to comment.