Skip to content

Commit

Permalink
Refactor: Add common method in AbstractBatchIndexTask to create inges…
Browse files Browse the repository at this point in the history
…tion stats report (#16202)

Changes
-  No functional changes
- Add method `AbstractBatchIndexTask.buildIngestionStatsReport()` used in several batch tasks
- Add utility method `AbstractBatchIndexTask.addBuildSegmentStatsToReport()`
- Use boolean argument to represent a full report instead of the String `full` 
in internal methods. (REST API remains unchanged.)
- Rename `IngestionStatsAndErrorsTaskReportData` to `IngestionStatsAndErrors`
- Clean up some of the methods
  • Loading branch information
kfaraz committed Mar 28, 2024
1 parent 3471352 commit 4df4896
Show file tree
Hide file tree
Showing 25 changed files with 470 additions and 877 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import org.apache.druid.data.input.kafkainput.KafkaStringHeaderFormat;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.IndexTaskTest;
Expand Down Expand Up @@ -1615,7 +1615,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception
newDataSchemaMetadata()
);

IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
IngestionStatsAndErrors reportData = getTaskReportData();

Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
Expand Down Expand Up @@ -1695,7 +1695,7 @@ public void testMultipleParseExceptionsFailure() throws Exception
Assert.assertEquals(ImmutableList.of(), publishedDescriptors());
Assert.assertNull(newDataSchemaMetadata());

IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
IngestionStatsAndErrors reportData = getTaskReportData();

Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
Expand Down Expand Up @@ -3061,7 +3061,7 @@ public void testParseExceptionsInIteratorConstructionSuccess() throws Exception
);

// Verify unparseable data
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
IngestionStatsAndErrors reportData = getTaskReportData();

ParseExceptionReport parseExceptionReport =
ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS);
Expand Down Expand Up @@ -3233,7 +3233,7 @@ public void testCompletionReportPartitionStats() throws Exception
TaskStatus status = future.get();

Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
IngestionStatsAndErrors reportData = getTaskReportData();
Assert.assertEquals(reportData.getRecordsProcessed().size(), 1);
Assert.assertEquals(reportData.getRecordsProcessed().values().iterator().next(), (Long) 6L);
}
Expand Down Expand Up @@ -3281,7 +3281,7 @@ public void testCompletionReportMultiplePartitionStats() throws Exception
TaskStatus status = future.get();

Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
IngestionStatsAndErrors reportData = getTaskReportData();
Assert.assertEquals(reportData.getRecordsProcessed().size(), 2);
Assert.assertTrue(reportData.getRecordsProcessed().values().containsAll(ImmutableSet.of(6L, 2L)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
Expand Down Expand Up @@ -1182,7 +1182,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception
newDataSchemaMetadata()
);

IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
IngestionStatsAndErrors reportData = getTaskReportData();

Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
Expand Down Expand Up @@ -1268,7 +1268,7 @@ public void testMultipleParseExceptionsFailure() throws Exception
Assert.assertEquals(ImmutableList.of(), publishedDescriptors());
Assert.assertNull(newDataSchemaMetadata());

IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
IngestionStatsAndErrors reportData = getTaskReportData();

Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,19 @@
import java.util.Map;
import java.util.Objects;

public class IngestionStatsAndErrorsTaskReportData
public class IngestionStatsAndErrors
{
@JsonProperty
private IngestionState ingestionState;

@JsonProperty
private Map<String, Object> unparseableEvents;

@JsonProperty
private Map<String, Object> rowStats;

@JsonProperty
@Nullable
private String errorMsg;

@JsonProperty
private boolean segmentAvailabilityConfirmed;

@JsonProperty
private long segmentAvailabilityWaitTimeMs;

@JsonProperty
private Map<String, Long> recordsProcessed;

@JsonProperty
private Long segmentsRead;
@JsonProperty
private Long segmentsPublished;

public IngestionStatsAndErrorsTaskReportData(
private final IngestionState ingestionState;
private final Map<String, Object> unparseableEvents;
private final Map<String, Object> rowStats;
private final String errorMsg;
private final boolean segmentAvailabilityConfirmed;
private final long segmentAvailabilityWaitTimeMs;
private final Map<String, Long> recordsProcessed;
private final Long segmentsRead;
private final Long segmentsPublished;

public IngestionStatsAndErrors(
@JsonProperty("ingestionState") IngestionState ingestionState,
@JsonProperty("unparseableEvents") Map<String, Object> unparseableEvents,
@JsonProperty("rowStats") Map<String, Object> rowStats,
Expand Down Expand Up @@ -139,12 +122,12 @@ public Long getSegmentsPublished()
return segmentsPublished;
}

public static IngestionStatsAndErrorsTaskReportData getPayloadFromTaskReports(
public static IngestionStatsAndErrors getPayloadFromTaskReports(
Map<String, TaskReport> taskReports
)
{
return (IngestionStatsAndErrorsTaskReportData) taskReports.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)
.getPayload();
return (IngestionStatsAndErrors) taskReports.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)
.getPayload();
}

@Override
Expand All @@ -156,7 +139,7 @@ public boolean equals(Object o)
if (o == null || getClass() != o.getClass()) {
return false;
}
IngestionStatsAndErrorsTaskReportData that = (IngestionStatsAndErrorsTaskReportData) o;
IngestionStatsAndErrors that = (IngestionStatsAndErrors) o;
return getIngestionState() == that.getIngestionState() &&
Objects.equals(getUnparseableEvents(), that.getUnparseableEvents()) &&
Objects.equals(getRowStats(), that.getRowStats()) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ public class IngestionStatsAndErrorsTaskReport implements TaskReport
private final String taskId;

@JsonProperty
private final IngestionStatsAndErrorsTaskReportData payload;
private final IngestionStatsAndErrors payload;

@JsonCreator
public IngestionStatsAndErrorsTaskReport(
@JsonProperty("taskId") String taskId,
@JsonProperty("payload") IngestionStatsAndErrorsTaskReportData payload
@JsonProperty("payload") IngestionStatsAndErrors payload
)
{
this.taskId = taskId;
Expand All @@ -57,7 +57,7 @@ public String getReportKey()
}

@Override
public IngestionStatsAndErrorsTaskReportData getPayload()
public IngestionStatsAndErrors getPayload()
{
return payload;
}
Expand Down
Loading

0 comments on commit 4df4896

Please sign in to comment.