Skip to content

Commit

Permalink
Audit API DELETE datasource (markAllSegmentsAsUnused) (#15653)
Browse files Browse the repository at this point in the history
Changes:
- Add audit for `DELETE /druid/coordinator/v1/datasources/{datasourceName}`
- Minor refactor
  • Loading branch information
kfaraz authored Jan 11, 2024
1 parent ee77fa7 commit f445ba4
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ public Response getDataSource(
return Response.ok(getSimpleDatasource(dataSourceName)).build();
}

private interface MarkSegments
private interface SegmentUpdateOperation
{
int markSegments() throws UnknownSegmentIdsException;
int perform() throws UnknownSegmentIdsException;
}

@POST
Expand All @@ -193,9 +193,9 @@ private interface MarkSegments
@ResourceFilters(DatasourceResourceFilter.class)
public Response markAsUsedAllNonOvershadowedSegments(@PathParam("dataSourceName") final String dataSourceName)
{
MarkSegments markSegments = () -> segmentsMetadataManager.markAsUsedAllNonOvershadowedSegmentsInDataSource(
SegmentUpdateOperation operation = () -> segmentsMetadataManager.markAsUsedAllNonOvershadowedSegmentsInDataSource(
dataSourceName);
return doMarkSegments("markAsUsedAllNonOvershadowedSegments", dataSourceName, markSegments);
return performSegmentUpdate(dataSourceName, operation);
}

@POST
Expand All @@ -207,7 +207,7 @@ public Response markAsUsedNonOvershadowedSegments(
MarkDataSourceSegmentsPayload payload
)
{
MarkSegments markSegments = () -> {
SegmentUpdateOperation operation = () -> {
final Interval interval = payload.getInterval();
if (interval != null) {
return segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(dataSourceName, interval);
Expand All @@ -216,7 +216,7 @@ public Response markAsUsedNonOvershadowedSegments(
return segmentsMetadataManager.markAsUsedNonOvershadowedSegments(dataSourceName, segmentIds);
}
};
return doMarkSegmentsWithPayload("markAsUsedNonOvershadowedSegments", dataSourceName, payload, markSegments);
return performSegmentUpdate(dataSourceName, payload, operation);
}

@POST
Expand All @@ -230,13 +230,11 @@ public Response markSegmentsAsUnused(
@Context final HttpServletRequest req
)
{
MarkSegments markSegments = () -> {
SegmentUpdateOperation operation = () -> {
final Interval interval = payload.getInterval();
final int numUpdatedSegments;
final Object auditPayload;
if (interval != null) {
numUpdatedSegments = segmentsMetadataManager.markAsUnusedSegmentsInInterval(dataSourceName, interval);
auditPayload = Collections.singletonMap("interval", interval);
} else {
final Set<SegmentId> segmentIds =
payload.getSegmentIds()
Expand All @@ -245,33 +243,31 @@ public Response markSegmentsAsUnused(
.filter(Objects::nonNull)
.collect(Collectors.toSet());

// Note: segments for the "wrong" datasource are ignored.
// Filter out segmentIds that do not belong to this datasource
numUpdatedSegments = segmentsMetadataManager.markSegmentsAsUnused(
segmentIds.stream()
.filter(segmentId -> segmentId.getDataSource().equals(dataSourceName))
.collect(Collectors.toSet())
);
auditPayload = Collections.singletonMap("segmentIds", segmentIds);
}
auditManager.doAudit(
AuditEntry.builder()
.key(dataSourceName)
.type("segment.markUnused")
.payload(auditPayload)
.payload(payload)
.auditInfo(AuthorizationUtils.buildAuditInfo(req))
.request(AuthorizationUtils.buildRequestInfo("coordinator", req))
.build()
);
return numUpdatedSegments;
};
return doMarkSegmentsWithPayload("markSegmentsAsUnused", dataSourceName, payload, markSegments);
return performSegmentUpdate(dataSourceName, payload, operation);
}

private Response doMarkSegmentsWithPayload(
String method,
private Response performSegmentUpdate(
String dataSourceName,
MarkDataSourceSegmentsPayload payload,
MarkSegments markSegments
SegmentUpdateOperation operation
)
{
if (payload == null || !payload.isValid()) {
Expand All @@ -287,7 +283,7 @@ private Response doMarkSegmentsWithPayload(
return logAndCreateDataSourceNotFoundResponse(dataSourceName);
}

return doMarkSegments(method, dataSourceName, markSegments);
return performSegmentUpdate(dataSourceName, operation);
}

private static Response logAndCreateDataSourceNotFoundResponse(String dataSourceName)
Expand All @@ -296,21 +292,21 @@ private static Response logAndCreateDataSourceNotFoundResponse(String dataSource
return Response.noContent().build();
}

private static Response doMarkSegments(String method, String dataSourceName, MarkSegments markSegments)
private static Response performSegmentUpdate(String dataSourceName, SegmentUpdateOperation operation)
{
try {
int numChangedSegments = markSegments.markSegments();
int numChangedSegments = operation.perform();
return Response.ok(ImmutableMap.of("numChangedSegments", numChangedSegments)).build();
}
catch (UnknownSegmentIdsException e) {
log.warn("Segment ids %s are not found", e.getUnknownSegmentIds());
log.warn("Could not find segmentIds[%s]", e.getUnknownSegmentIds());
return Response
.status(Response.Status.NOT_FOUND)
.entity(ImmutableMap.of("message", e.getMessage()))
.build();
}
catch (Exception e) {
log.error(e, "Error occurred during [%s] call, data source: [%s]", method, dataSourceName);
log.error(e, "Error occurred while updating segments for data source[%s]", dataSourceName);
return Response
.serverError()
.entity(ImmutableMap.of("error", "Exception occurred.", "message", Throwables.getRootCause(e).toString()))
Expand Down Expand Up @@ -345,8 +341,23 @@ public Response markAsUnusedAllSegmentsOrKillUnusedSegmentsInInterval(
if (killSegments) {
return killUnusedSegmentsInInterval(dataSourceName, interval, req);
} else {
MarkSegments markSegments = () -> segmentsMetadataManager.markAsUnusedAllSegmentsInDataSource(dataSourceName);
return doMarkSegments("markAsUnusedAllSegments", dataSourceName, markSegments);
SegmentUpdateOperation operation = () -> segmentsMetadataManager.markAsUnusedAllSegmentsInDataSource(dataSourceName);
final Response response = performSegmentUpdate(dataSourceName, operation);

final int responseCode = response.getStatus();
if (responseCode >= 200 && responseCode < 300) {
auditManager.doAudit(
AuditEntry.builder()
.key(dataSourceName)
.type("segment.markUnused")
.payload(response.getEntity())
.auditInfo(AuthorizationUtils.buildAuditInfo(req))
.request(AuthorizationUtils.buildRequestInfo("coordinator", req))
.build()
);
}

return response;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ public void testKillSegmentsInIntervalInDataSource()
}

@Test
public void testMarkAsUnusedAllSegmentsInDataSource()
public void testMarkAsUnusedAllSegmentsInDataSourceBadRequest()
{
OverlordClient overlordClient = EasyMock.createStrictMock(OverlordClient.class);
EasyMock.replay(overlordClient, server);
Expand All @@ -626,6 +626,22 @@ public void testMarkAsUnusedAllSegmentsInDataSource()
EasyMock.verify(overlordClient, server);
}

@Test
public void testMarkAsUnusedAllSegmentsInDataSource()
{
prepareRequestForAudit();

OverlordClient overlordClient = EasyMock.createStrictMock(OverlordClient.class);
EasyMock.replay(overlordClient, server);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, overlordClient, null, null, auditManager);
Response response = dataSourcesResource
.markAsUnusedAllSegmentsOrKillUnusedSegmentsInInterval("datasource", null, null, request);
Assert.assertEquals(200, response.getStatus());

EasyMock.verify(request);
}

@Test
public void testIsHandOffComplete()
{
Expand Down

0 comments on commit f445ba4

Please sign in to comment.