Speed up kill tasks by deleting segments in batch (#14131)
* allow for batched delete of segments instead of deleting segment data one by one

create a new batch-delete method in DataSegmentKiller whose default implementation
iterates through all segments and calls delete on each (see the sketch after this
commit message). This enables a gradual rollout: other deep-storage implementations
can move to a batched delete on their own schedule

* clean up batch-delete segments

* batch delete with the omni data segment killer

cleaned up the code; still need to add tests and docs for this functionality

* update javadoc to explain how it will try to use batch delete if the function is overridden

* rename killBatch to kill
add unit tests

* add OmniDataSegmentKillerTest for deleting multiple segments at a time; fix checkstyle

* explain test peculiarity better

* clean up batch kill in s3.

* remove unused return value. cleanup comments and fix checkstyle

* default to batch delete; more specific javadocs; list segments that couldn't be deleted
when there was a client or server error

* simplify error handling

* add tests where an exception is thrown when killing multiple s3 segments

* add a test where two delete calls with the s3 client fail

* fix javadoc for kill(List<DataSegment> segments); clean up tests; remove feature flag

* fix typo in javadocs

* fix test failure

* fix checkstyle and improve tests

* fix intellij inspections issues

* address comments; make multi-segment delete not assume all segments share the same bucket

* fix test errors

* better grammar and punctuation; fix test; better logging for exceptions

* remove unused code

* avoid extra arraylist instantiation

* fix broken test

* fix broken test

* fix tests to use Assert.assertThrows
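
The bullet about the default batch-delete method describes the core interface change. A minimal sketch of that shape, assuming only the signatures visible elsewhere in this diff; the actual interface is org.apache.druid.segment.loading.DataSegmentKiller and has other members (such as descriptorPath) not shown here:

import java.util.List;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;

public interface DataSegmentKiller
{
  /**
   * Kill a single segment's data from deep storage.
   */
  void kill(DataSegment segment) throws SegmentLoadingException;

  /**
   * Default batch kill: iterate and delegate to the single-segment kill.
   * Implementations such as S3DataSegmentKiller override this to issue
   * batched delete requests instead.
   */
  default void kill(List<DataSegment> segments) throws SegmentLoadingException
  {
    for (DataSegment segment : segments) {
      kill(segment);
    }
  }
}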
TSFenwick authored Jul 27, 2023
1 parent ee9cfc7 commit 9a9038c
Showing 6 changed files with 396 additions and 6 deletions.
@@ -20,8 +20,11 @@
package org.apache.druid.storage.s3;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.MapUtils;
@@ -31,7 +34,11 @@
import org.apache.druid.timeline.DataSegment;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
*
@@ -40,12 +47,15 @@ public class S3DataSegmentKiller implements DataSegmentKiller
{
private static final Logger log = new Logger(S3DataSegmentKiller.class);

// AWS limits a single multi-object delete request to at most 1000 objects.
private static final int MAX_MULTI_OBJECT_DELETE_SIZE = 1000;

/**
* Any implementation of DataSegmentKiller is initialized when an ingestion job starts if the extension is loaded,
* even when the implementation of DataSegmentKiller is not used. As a result, if we have an s3 client instead
* of a supplier of it, it can cause unnecessary config validation for s3 even when it's not used at all.
* To perform the config validation only when it is actually used, we use a supplier.
*
* <p>
* See OmniDataSegmentKiller for how DataSegmentKillers are initialized.
*/
private final Supplier<ServerSideEncryptingAmazonS3> s3ClientSupplier;
@@ -64,13 +74,116 @@ public S3DataSegmentKiller(
this.inputDataConfig = inputDataConfig;
}

@Override
public void kill(List<DataSegment> segments) throws SegmentLoadingException
{
if (segments.isEmpty()) {
return;
}
if (segments.size() == 1) {
kill(segments.get(0));
return;
}

// create a map of bucket to keys to delete
Map<String, List<DeleteObjectsRequest.KeyVersion>> bucketToKeysToDelete = new HashMap<>();
for (DataSegment segment : segments) {
String s3Bucket = MapUtils.getString(segment.getLoadSpec(), S3DataSegmentPuller.BUCKET);
String path = MapUtils.getString(segment.getLoadSpec(), S3DataSegmentPuller.KEY);
List<DeleteObjectsRequest.KeyVersion> keysToDelete = bucketToKeysToDelete.computeIfAbsent(
s3Bucket,
k -> new ArrayList<>()
);
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(path));
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(DataSegmentKiller.descriptorPath(path)));
}

final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
boolean shouldThrowException = false;
for (Map.Entry<String, List<DeleteObjectsRequest.KeyVersion>> bucketToKeys : bucketToKeysToDelete.entrySet()) {
String s3Bucket = bucketToKeys.getKey();
List<DeleteObjectsRequest.KeyVersion> keysToDelete = bucketToKeys.getValue();
boolean hadException = deleteKeysForBucket(s3Client, s3Bucket, keysToDelete);
if (hadException) {
shouldThrowException = true;
}
}
if (shouldThrowException) {
// The exception's error message gets cut off without providing any details; the task logs have
// the full details. This is a shortcut to reasonably handle the many different ways the
// deletion could fail.
throw new SegmentLoadingException(
"Couldn't delete segments from S3. See the task logs for more details."
);
}
}

/**
* Delete the given keys from a single S3 bucket.
*
* @param s3Client client used to communicate with s3
* @param s3Bucket the bucket where the keys exist
* @param keysToDelete the keys to delete
* @return true if there was an issue deleting one or more keys, false if all deletions were
* successful
*/
private boolean deleteKeysForBucket(
ServerSideEncryptingAmazonS3 s3Client,
String s3Bucket,
List<DeleteObjectsRequest.KeyVersion> keysToDelete
)
{
boolean hadException = false;
DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(s3Bucket);
deleteObjectsRequest.setQuiet(true);
List<List<DeleteObjectsRequest.KeyVersion>> keysChunks = Lists.partition(
keysToDelete,
MAX_MULTI_OBJECT_DELETE_SIZE
);
for (List<DeleteObjectsRequest.KeyVersion> chunkOfKeys : keysChunks) {
List<String> keysToDeleteStrings = chunkOfKeys.stream().map(
DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.toList());
try {
deleteObjectsRequest.setKeys(chunkOfKeys);
log.info(
"Removing from bucket: [%s] the following index files: [%s] from s3!",
s3Bucket,
keysToDeleteStrings
);
s3Client.deleteObjects(deleteObjectsRequest);
}
catch (MultiObjectDeleteException e) {
hadException = true;
Map<String, List<String>> errorToKeys = new HashMap<>();
for (MultiObjectDeleteException.DeleteError error : e.getErrors()) {
errorToKeys.computeIfAbsent(error.getMessage(), k -> new ArrayList<>()).add(error.getKey());
}
errorToKeys.forEach((key, value) -> log.error(
"Unable to delete from bucket [%s], the following keys [%s], because [%s]",
s3Bucket,
String.join(", ", value),
key
));
}
catch (AmazonServiceException e) {
hadException = true;
log.noStackTrace().warn(e,
"Unable to delete from bucket [%s], the following keys [%s]",
s3Bucket,
chunkOfKeys.stream().map(DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.joining(", "))
);
}
}
return hadException;
}

@Override
public void kill(DataSegment segment) throws SegmentLoadingException
{
try {
Map<String, Object> loadSpec = segment.getLoadSpec();
String s3Bucket = MapUtils.getString(loadSpec, "bucket");
String s3Path = MapUtils.getString(loadSpec, "key");
String s3Bucket = MapUtils.getString(loadSpec, S3DataSegmentPuller.BUCKET);
String s3Path = MapUtils.getString(loadSpec, S3DataSegmentPuller.KEY);
String s3DescriptorPath = DataSegmentKiller.descriptorPath(s3Path);

final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
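
A note on the chunking in deleteKeysForBucket above: Guava's Lists.partition is what bounds each request to MAX_MULTI_OBJECT_DELETE_SIZE keys. A small illustration (the key names and counts here are made up for the example):

// 1002 hypothetical keys split at the 1000-key S3 limit.
List<DeleteObjectsRequest.KeyVersion> keys = new ArrayList<>();
for (int i = 0; i < 1002; i++) {
  keys.add(new DeleteObjectsRequest.KeyVersion("segment/" + i));
}
List<List<DeleteObjectsRequest.KeyVersion>> chunks = Lists.partition(keys, 1000);
// chunks.size() == 2: the first chunk has 1000 keys and the second has the
// remaining 2, so the killer issues two deleteObjects calls for this bucket.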
@@ -19,14 +19,20 @@

package org.apache.druid.storage.s3;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
@@ -42,7 +48,10 @@
public class S3DataSegmentKillerTest extends EasyMockSupport
{
private static final String KEY_1 = "key1";
private static final String KEY_1_PATH = KEY_1 + "/";
private static final String KEY_1_DESCRIPTOR_PATH = KEY_1_PATH + "descriptor.json";
private static final String KEY_2 = "key2";
private static final String KEY_2_PATH = KEY_2 + "/";
private static final String TEST_BUCKET = "test_bucket";
private static final String TEST_PREFIX = "test_prefix";
private static final URI PREFIX_URI = URI.create(StringUtils.format("s3://%s/%s", TEST_BUCKET, TEST_PREFIX));
@@ -52,6 +61,30 @@ public class S3DataSegmentKillerTest extends EasyMockSupport
private static final Exception RECOVERABLE_EXCEPTION = new SdkClientException(new IOException());
private static final Exception NON_RECOVERABLE_EXCEPTION = new SdkClientException(new NullPointerException());

private static final DataSegment DATA_SEGMENT_1 = new DataSegment(
"test",
Intervals.of("2015-04-12/2015-04-13"),
"1",
ImmutableMap.of("bucket", TEST_BUCKET, "key", KEY_1_PATH),
null,
null,
NoneShardSpec.instance(),
0,
1
);

private static final DataSegment DATA_SEGMENT_2 = new DataSegment(
"test",
Intervals.of("2015-04-13/2015-04-14"),
"1",
ImmutableMap.of("bucket", TEST_BUCKET, "key", KEY_2_PATH),
null,
null,
NoneShardSpec.instance(),
0,
1
);

@Mock
private ServerSideEncryptingAmazonS3 s3Client;
@Mock
@@ -213,4 +246,163 @@ public void test_killAll_nonrecoverableExceptionWhenListingObjects_deletesAllSeg
Assert.assertTrue(ioExceptionThrown);
EasyMock.verify(s3Client, segmentPusherConfig, inputDataConfig);
}

@Test
public void test_kill_singleSegment_doesntexist_passes() throws SegmentLoadingException
{
EasyMock.expect(s3Client.doesObjectExist(TEST_BUCKET, KEY_1_PATH)).andReturn(false);
EasyMock.expectLastCall().once();
EasyMock.expect(s3Client.doesObjectExist(TEST_BUCKET, KEY_1_DESCRIPTOR_PATH)).andReturn(false);
EasyMock.expectLastCall().once();
EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);

segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);
segmentKiller.kill(DATA_SEGMENT_1);
}

@Test
public void test_kill_singleSegment_exists_passes() throws SegmentLoadingException
{
EasyMock.expect(s3Client.doesObjectExist(TEST_BUCKET, KEY_1_PATH)).andReturn(true);
EasyMock.expectLastCall().once();

s3Client.deleteObject(TEST_BUCKET, KEY_1_PATH);
EasyMock.expectLastCall().andVoid();

EasyMock.expect(s3Client.doesObjectExist(TEST_BUCKET, KEY_1_DESCRIPTOR_PATH)).andReturn(true);
EasyMock.expectLastCall().once();

s3Client.deleteObject(TEST_BUCKET, KEY_1_DESCRIPTOR_PATH);
EasyMock.expectLastCall().andVoid();

EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);

segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);
segmentKiller.kill(DATA_SEGMENT_1);
}

@Test
public void test_kill_listOfOneSegment() throws SegmentLoadingException
{
EasyMock.expect(s3Client.doesObjectExist(TEST_BUCKET, KEY_1_PATH)).andReturn(true);
EasyMock.expectLastCall().once();

s3Client.deleteObject(TEST_BUCKET, KEY_1_PATH);
EasyMock.expectLastCall().andVoid();

EasyMock.expect(s3Client.doesObjectExist(TEST_BUCKET, KEY_1_DESCRIPTOR_PATH)).andReturn(true);
EasyMock.expectLastCall().once();

s3Client.deleteObject(TEST_BUCKET, KEY_1_DESCRIPTOR_PATH);
EasyMock.expectLastCall().andVoid();


EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);
segmentKiller.kill(ImmutableList.of(DATA_SEGMENT_1));
}

@Test
public void test_kill_listOfNoSegments() throws SegmentLoadingException
{
EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);
segmentKiller.kill(ImmutableList.of());
// EasyMock fails with an assertion error on any unexpected method call on a mock. No expectations
// are set up here because kill on an empty list should not interact with the mocks at all.
}

@Test
public void test_kill_listOfSegments() throws SegmentLoadingException
{
DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(TEST_BUCKET);
deleteObjectsRequest.withKeys(KEY_1_PATH, KEY_1_PATH);
// matching on exact request equality is brittle, so accept any DeleteObjectsRequest
s3Client.deleteObjects(EasyMock.anyObject(DeleteObjectsRequest.class));
EasyMock.expectLastCall().andVoid().times(2);


EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);
segmentKiller.kill(ImmutableList.of(DATA_SEGMENT_1, DATA_SEGMENT_1));
}

@Test
public void test_kill_listOfSegments_multiDeleteExceptionIsThrown()
{
DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(TEST_BUCKET);
deleteObjectsRequest.withKeys(KEY_1_PATH, KEY_2_PATH);
// matching on exact request equality is brittle, so accept any DeleteObjectsRequest
s3Client.deleteObjects(EasyMock.anyObject(DeleteObjectsRequest.class));
MultiObjectDeleteException.DeleteError deleteError = new MultiObjectDeleteException.DeleteError();
deleteError.setKey(KEY_1_PATH);
MultiObjectDeleteException multiObjectDeleteException = new MultiObjectDeleteException(
ImmutableList.of(deleteError),
ImmutableList.of());
EasyMock.expectLastCall().andThrow(multiObjectDeleteException).once();

EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);

SegmentLoadingException thrown = Assert.assertThrows(
SegmentLoadingException.class,
() -> segmentKiller.kill(ImmutableList.of(DATA_SEGMENT_1, DATA_SEGMENT_2))
);
Assert.assertEquals("Couldn't delete segments from S3. See the task logs for more details.", thrown.getMessage());
}

@Test
public void test_kill_listOfSegments_multiDeleteExceptionIsThrownMultipleTimes()
{
DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(TEST_BUCKET);
deleteObjectsRequest.withKeys(KEY_1_PATH, KEY_2_PATH);
// matching on exact request equality is brittle, so accept any DeleteObjectsRequest
s3Client.deleteObjects(EasyMock.anyObject(DeleteObjectsRequest.class));
MultiObjectDeleteException.DeleteError deleteError = new MultiObjectDeleteException.DeleteError();
deleteError.setKey(KEY_1_PATH);
MultiObjectDeleteException multiObjectDeleteException = new MultiObjectDeleteException(
ImmutableList.of(deleteError),
ImmutableList.of());
EasyMock.expectLastCall().andThrow(multiObjectDeleteException).once();
MultiObjectDeleteException.DeleteError deleteError2 = new MultiObjectDeleteException.DeleteError();
deleteError2.setKey(KEY_2_PATH);
MultiObjectDeleteException multiObjectDeleteException2 = new MultiObjectDeleteException(
ImmutableList.of(deleteError2),
ImmutableList.of());
EasyMock.expectLastCall().andThrow(multiObjectDeleteException2).once();

EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);
ImmutableList.Builder<DataSegment> builder = ImmutableList.builder();
// the limit is 1000 keys per chunk, and each segment contributes 2 keys (the index and its
// descriptor), so 501 segments yield 1002 keys and require 2 delete calls via the s3 client
for (int ii = 0; ii < 501; ii++) {
builder.add(DATA_SEGMENT_1);
}
SegmentLoadingException thrown = Assert.assertThrows(
SegmentLoadingException.class,
() -> segmentKiller.kill(builder.build())
);

Assert.assertEquals("Couldn't delete segments from S3. See the task logs for more details.", thrown.getMessage());
}

@Test
public void test_kill_listOfSegments_amazonServiceExceptionExceptionIsThrown()
{
DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(TEST_BUCKET);
deleteObjectsRequest.withKeys(KEY_1_PATH, KEY_2_PATH);
// matching on exact request equality is brittle, so accept any DeleteObjectsRequest
s3Client.deleteObjects(EasyMock.anyObject(DeleteObjectsRequest.class));
EasyMock.expectLastCall().andThrow(new AmazonServiceException("")).once();

EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);

SegmentLoadingException thrown = Assert.assertThrows(
SegmentLoadingException.class,
() -> segmentKiller.kill(ImmutableList.of(DATA_SEGMENT_1, DATA_SEGMENT_2))
);
Assert.assertEquals("Couldn't delete segments from S3. See the task logs for more details.", thrown.getMessage());
}
}
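
The tests above accept any DeleteObjectsRequest rather than matching a specific one. A possible alternative, sketched here as a suggestion rather than the committed approach, is EasyMock's Capture (org.easymock.Capture), which records the actual request so the test can assert on the keys it carried:

// Sketch of a capture-based variant of test_kill_listOfSegments.
Capture<DeleteObjectsRequest> captured = EasyMock.newCapture();
s3Client.deleteObjects(EasyMock.capture(captured));
EasyMock.expectLastCall().andVoid();

EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);
segmentKiller.kill(ImmutableList.of(DATA_SEGMENT_1, DATA_SEGMENT_2));

// Both segments share TEST_BUCKET, so one request carries all four keys
// (each segment's index key plus its descriptor key).
List<String> deletedKeys = captured.getValue().getKeys().stream()
    .map(DeleteObjectsRequest.KeyVersion::getKey)
    .collect(Collectors.toList());
Assert.assertTrue(deletedKeys.contains(KEY_1_PATH));
Assert.assertTrue(deletedKeys.contains(KEY_2_PATH));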
@@ -129,9 +129,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception

// Kill segments
toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
-      for (DataSegment segment : unusedSegments) {
-        toolbox.getDataSegmentKiller().kill(segment);
-      }
+      toolbox.getDataSegmentKiller().kill(unusedSegments);

return TaskStatus.success(getId());
}
(Diff for the remaining three changed files not shown.)
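
One of the files not shown is the OmniDataSegmentKiller change ("batch delete with the omni data segment killer" in the commit message). Based on that message, its batch kill presumably groups segments by load-spec type and delegates each group to the matching killer; the sketch below is an assumption of that shape, and getKiller is a made-up name for whatever lookup the real class uses:

// Hypothetical sketch of a type-dispatching batch kill.
public void kill(List<DataSegment> segments) throws SegmentLoadingException
{
  Map<String, List<DataSegment>> segmentsByType = new HashMap<>();
  for (DataSegment segment : segments) {
    String type = MapUtils.getString(segment.getLoadSpec(), "type");
    segmentsByType.computeIfAbsent(type, t -> new ArrayList<>()).add(segment);
  }
  for (Map.Entry<String, List<DataSegment>> entry : segmentsByType.entrySet()) {
    // getKiller(type) is assumed here; the real lookup may differ.
    getKiller(entry.getKey()).kill(entry.getValue());
  }
}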
