Skip to content

Commit

Permalink
Clean up SqlSegmentsMetadataManager and corresponding tests (#16044)
Browse files Browse the repository at this point in the history
Changes:

Improve `SqlSegmentsMetadataManager`
- Break the loop in `populateUsedFlagLastUpdated` before going to sleep if there are no more segments to update
- Add comments and clean up logs

Refactor `SqlSegmentsMetadataManagerTest`
- Merge `SqlSegmentsMetadataManagerEmptyTest` into this test
- Add method `testPollEmpty`
- Shave a few seconds off the tests by reducing poll duration
- Simplify creation of test segments
- Some renames here and there
- Remove unused methods
- Move `TestDerbyConnector.allowLastUsedFlagToBeNull` to this class

Other minor changes
- Add javadoc to `NoneShardSpec`
- Use lambda in `SqlSegmentMetadataPublisher`
  • Loading branch information
kfaraz committed Mar 8, 2024
1 parent 3caacba commit 5f20372
Show file tree
Hide file tree
Showing 6 changed files with 433 additions and 575 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@
import java.util.Map;

/**
* {@link ShardSpec} with no partitioning in a time chunk, i.e. a single segment
* per time chunk. This shard spec has been deprecated and is not generated by
* the Druid code anymore. The class has been retained only for backward
* compatibility reasons.
* <p>
* For more information, refer to
* <a href="https://github.com/apache/druid/pull/6883">PR #6883</a>.
*
* @deprecated Since Druid 0.15.0. Segments generated by Druid 0.15.0 onwards
* do not use this shard spec.
*/
@Deprecated
public class NoneShardSpec implements ShardSpec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,13 @@
package org.apache.druid.metadata;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -79,8 +76,7 @@ public void publishSegment(final DataSegment segment) throws IOException
);
}

@VisibleForTesting
void publishSegment(
private void publishSegment(
final String segmentId,
final String dataSource,
final String createdDate,
Expand All @@ -96,31 +92,18 @@ void publishSegment(
try {
final DBI dbi = connector.getDBI();
List<Map<String, Object>> exists = dbi.withHandle(
new HandleCallback<List<Map<String, Object>>>()
{
@Override
public List<Map<String, Object>> withHandle(Handle handle)
{
return handle.createQuery(
StringUtils.format("SELECT id FROM %s WHERE id=:id", config.getSegmentsTable())
)
.bind("id", segmentId)
.list();
}
}
handle -> handle.createQuery(
StringUtils.format("SELECT id FROM %s WHERE id=:id", config.getSegmentsTable())
).bind("id", segmentId).list()
);

if (!exists.isEmpty()) {
log.info("Found [%s] in DB, not updating DB", segmentId);
log.info("Skipping publish of segment[%s] as it already exists in the metadata store.", segmentId);
return;
}

dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle)
{
handle ->
handle.createStatement(statement)
.bind("id", segmentId)
.bind("dataSource", dataSource)
Expand All @@ -132,11 +115,7 @@ public Void withHandle(Handle handle)
.bind("used", used)
.bind("payload", payload)
.bind("used_status_last_updated", usedFlagLastUpdated)
.execute();

return null;
}
}
.execute()
);
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;

import javax.annotation.Nullable;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -335,7 +335,7 @@ public void populateUsedFlagLastUpdatedAsync()
{
ExecutorService executorService = Executors.newSingleThreadExecutor();
usedFlagLastUpdatedPopulationFuture = executorService.submit(
() -> populateUsedFlagLastUpdated()
this::populateUsedFlagLastUpdated
);
}

Expand All @@ -347,70 +347,68 @@ public void populateUsedFlagLastUpdatedAsync()
@VisibleForTesting
void populateUsedFlagLastUpdated()
{
String segmentsTable = getSegmentsTable();
final String segmentsTable = getSegmentsTable();
log.info(
"Populating used_status_last_updated with non-NULL values for unused segments in [%s]",
"Populating column 'used_status_last_updated' with non-NULL values for unused segments in table[%s].",
segmentsTable
);

int limit = 100;
final int batchSize = 100;
int totalUpdatedEntries = 0;

// Update the rows in batches of size 100
while (true) {
List<String> segmentsToUpdate = new ArrayList<>(100);
final List<String> segmentsToUpdate = new ArrayList<>(batchSize);
int numUpdatedRows;
try {
connector.retryWithHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle)
{
segmentsToUpdate.addAll(handle.createQuery(
StringUtils.format(
"SELECT id FROM %1$s WHERE used_status_last_updated IS NULL and used = :used %2$s",
segmentsTable,
connector.limitClause(limit)
)
).bind("used", false).mapTo(String.class).list());
return null;
}
handle -> {
segmentsToUpdate.addAll(handle.createQuery(
StringUtils.format(
"SELECT id FROM %1$s WHERE used_status_last_updated IS NULL and used = :used %2$s",
segmentsTable,
connector.limitClause(batchSize)
)
).bind("used", false).mapTo(String.class).list());
return null;
}
);

if (segmentsToUpdate.isEmpty()) {
// We have no segments to process
break;
}

connector.retryWithHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle)
{
Batch updateBatch = handle.createBatch();
String sql = "UPDATE %1$s SET used_status_last_updated = '%2$s' WHERE id = '%3$s'";
String now = DateTimes.nowUtc().toString();
for (String id : segmentsToUpdate) {
updateBatch.add(StringUtils.format(sql, segmentsTable, now, id));
}
updateBatch.execute();
return null;
numUpdatedRows = connector.retryWithHandle(
handle -> {
final Batch updateBatch = handle.createBatch();
final String sql = "UPDATE %1$s SET used_status_last_updated = '%2$s' WHERE id = '%3$s'";
String now = DateTimes.nowUtc().toString();
for (String id : segmentsToUpdate) {
updateBatch.add(StringUtils.format(sql, segmentsTable, now, id));
}
int[] results = updateBatch.execute();
return Arrays.stream(results).sum();
}
);
totalUpdatedEntries += numUpdatedRows;
}
catch (Exception e) {
log.warn(e, "Population of used_status_last_updated in [%s] has failed. There may be unused segments with"
+ " NULL values for used_status_last_updated that won't be killed!", segmentsTable);
log.warn(e, "Populating column 'used_status_last_updated' in table[%s] has failed. There may be unused segments with"
+ " NULL values for 'used_status_last_updated' that won't be killed!", segmentsTable);
return;
}

totalUpdatedEntries += segmentsToUpdate.size();
log.info("Updated a batch of %d rows in [%s] with a valid used_status_last_updated date",
segmentsToUpdate.size(),
segmentsTable
log.debug(
"Updated a batch of [%d] rows in table[%s] with a valid used_status_last_updated date",
segmentsToUpdate.size(), segmentsTable
);

// Do not wait if there are no more segments to update
if (segmentsToUpdate.size() == numUpdatedRows && numUpdatedRows < batchSize) {
break;
}

// Wait for some time before processing the next batch
try {
Thread.sleep(10000);
}
Expand All @@ -420,9 +418,8 @@ public Void withHandle(Handle handle)
}
}
log.info(
"Finished updating [%s] with a valid used_status_last_updated date. %d rows updated",
segmentsTable,
totalUpdatedEntries
"Populated column 'used_status_last_updated' in table[%s] in [%d] rows.",
segmentsTable, totalUpdatedEntries
);
}

Expand Down

This file was deleted.

Loading

0 comments on commit 5f20372

Please sign in to comment.