Skip to content

Commit

Permalink
Refactor: Cleanup test impls of ServiceEmitter (#15683)
Browse files Browse the repository at this point in the history
  • Loading branch information
kfaraz authored Jan 15, 2024
1 parent 08c01f1 commit 18d2a89
Show file tree
Hide file tree
Showing 13 changed files with 91 additions and 282 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,14 @@
import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.AlertBuilder;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
Expand All @@ -90,7 +92,6 @@
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
Expand Down Expand Up @@ -166,7 +167,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
private TaskQueue taskQueue;
private String topic;
private RowIngestionMetersFactory rowIngestionMetersFactory;
private ExceptionCapturingServiceEmitter serviceEmitter;
private StubServiceEmitter serviceEmitter;
private SupervisorStateManagerConfig supervisorConfig;
private KafkaSupervisorIngestionSpec ingestionSchema;

Expand Down Expand Up @@ -222,7 +223,7 @@ public void setupTest()

topic = getTopic();
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
serviceEmitter = new ExceptionCapturingServiceEmitter();
serviceEmitter = new StubServiceEmitter("KafkaSupervisorTest", "localhost");
EmittingLogger.registerEmitter(serviceEmitter);
supervisorConfig = new SupervisorStateManagerConfig();
ingestionSchema = EasyMock.createMock(KafkaSupervisorIngestionSpec.class);
Expand Down Expand Up @@ -3340,9 +3341,7 @@ public void testCheckpointForInactiveTaskGroup()

verifyAll();

Assert.assertNull(serviceEmitter.getStackTrace(), serviceEmitter.getStackTrace());
Assert.assertNull(serviceEmitter.getExceptionMessage(), serviceEmitter.getExceptionMessage());
Assert.assertNull(serviceEmitter.getExceptionClass());
Assert.assertTrue(serviceEmitter.getAlerts().isEmpty());
}

@Test(timeout = 60_000L)
Expand Down Expand Up @@ -3426,18 +3425,19 @@ public void testCheckpointForUnknownTaskGroup()

verifyAll();

while (serviceEmitter.getStackTrace() == null) {
while (serviceEmitter.getAlerts().isEmpty()) {
Thread.sleep(100);
}

Assert.assertTrue(
serviceEmitter.getStackTrace().startsWith("org.apache.druid.java.util.common.ISE: Cannot find")
AlertEvent alert = serviceEmitter.getAlerts().get(0);
Assert.assertEquals(
"SeekableStreamSupervisor[testDS] failed to handle notice",
alert.getDescription()
);
Assert.assertEquals(
"Cannot find taskGroup [0] among all activelyReadingTaskGroups [{}]",
serviceEmitter.getExceptionMessage()
alert.getDataMap().get(AlertBuilder.EXCEPTION_MESSAGE_KEY)
);
Assert.assertEquals(ISE.class.getName(), serviceEmitter.getExceptionClass());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,14 @@
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.AlertBuilder;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
Expand All @@ -86,7 +88,6 @@
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
Expand All @@ -97,8 +98,6 @@
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.easymock.IAnswer;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.After;
Expand Down Expand Up @@ -153,7 +152,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
private SeekableStreamIndexTaskClient<String, String> taskClient;
private TaskQueue taskQueue;
private RowIngestionMetersFactory rowIngestionMetersFactory;
private ExceptionCapturingServiceEmitter serviceEmitter;
private StubServiceEmitter serviceEmitter;
private SupervisorStateManagerConfig supervisorConfig;

public KinesisSupervisorTest()
Expand Down Expand Up @@ -213,7 +212,7 @@ public void setupTest()
null
);
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
serviceEmitter = new ExceptionCapturingServiceEmitter();
serviceEmitter = new StubServiceEmitter("KinesisSupervisorTest", "localhost");
EmittingLogger.registerEmitter(serviceEmitter);
supervisorConfig = new SupervisorStateManagerConfig();
}
Expand Down Expand Up @@ -3317,9 +3316,7 @@ public void testCheckpointForInactiveTaskGroup() throws InterruptedException

verifyAll();

Assert.assertNull(serviceEmitter.getStackTrace(), serviceEmitter.getStackTrace());
Assert.assertNull(serviceEmitter.getExceptionMessage(), serviceEmitter.getExceptionMessage());
Assert.assertNull(serviceEmitter.getExceptionClass());
Assert.assertTrue(serviceEmitter.getAlerts().isEmpty());
}

@Test(timeout = 60_000L)
Expand Down Expand Up @@ -3443,20 +3440,19 @@ public void testCheckpointForUnknownTaskGroup()

verifyAll();

while (serviceEmitter.getStackTrace() == null) {
while (serviceEmitter.getAlerts().isEmpty()) {
Thread.sleep(100);
}

MatcherAssert.assertThat(
serviceEmitter.getStackTrace(),
CoreMatchers.startsWith("org.apache.druid.java.util.common.ISE: Cannot find taskGroup")
final AlertEvent alert = serviceEmitter.getAlerts().get(0);
Assert.assertEquals(
"SeekableStreamSupervisor[testDS] failed to handle notice",
alert.getDescription()
);

Assert.assertEquals(
"Cannot find taskGroup [0] among all activelyReadingTaskGroups [{}]",
serviceEmitter.getExceptionMessage()
alert.getDataMap().get(AlertBuilder.EXCEPTION_MESSAGE_KEY)
);
Assert.assertEquals(ISE.class.getName(), serviceEmitter.getExceptionClass());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.CachingEmitter;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
Expand Down Expand Up @@ -375,18 +375,17 @@ private static CompactionTask.CompactionTuningConfig createTuningConfig()

@Mock
private Clock clock;
private CachingEmitter emitter;
private StubServiceEmitter emitter;

@Before
public void setup()
{
final IndexIO testIndexIO = new TestIndexIO(OBJECT_MAPPER, SEGMENT_MAP);
emitter = new CachingEmitter();
emitter = new StubServiceEmitter();
toolbox = makeTaskToolbox(
new TestTaskActionClient(new ArrayList<>(SEGMENT_MAP.keySet())),
testIndexIO,
SEGMENT_MAP,
emitter
SEGMENT_MAP
);
Mockito.when(clock.millis()).thenReturn(0L, 10_000L);
segmentCacheManagerFactory = new SegmentCacheManagerFactory(OBJECT_MAPPER);
Expand Down Expand Up @@ -1551,9 +1550,7 @@ public void testQueryGranularityAndSegmentGranularityNonNull() throws IOExceptio
new PeriodGranularity(Period.months(3), null, null),
BatchIOConfig.DEFAULT_DROP_EXISTING
);
Assert.assertEquals(10_000L, emitter.getLastEmittedEvent().toMap().get("value"));
Assert.assertEquals("compact/segmentAnalyzer/fetchAndProcessMillis", emitter.getLastEmittedEvent().toMap().get("metric"));
Assert.assertEquals("metrics", emitter.getLastEmittedEvent().getFeed());
emitter.verifyValue("compact/segmentAnalyzer/fetchAndProcessMillis", 10_000L);
}

@Test
Expand Down Expand Up @@ -1929,11 +1926,10 @@ public ListenableFuture<List<DataSegment>> fetchUsedSegments(
}
}

private static TaskToolbox makeTaskToolbox(
private TaskToolbox makeTaskToolbox(
TaskActionClient taskActionClient,
IndexIO indexIO,
Map<DataSegment, File> segments,
CachingEmitter emitter
Map<DataSegment, File> segments
)
{
final SegmentCacheManager segmentCacheManager = new NoopSegmentCacheManager()
Expand Down Expand Up @@ -1974,7 +1970,7 @@ public void cleanup(DataSegment segment)
.segmentCacheManager(segmentCacheManager)
.taskLogPusher(null)
.attemptId("1")
.emitter(new ServiceEmitter("service", "host", emitter))
.emitter(emitter)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
Expand Down Expand Up @@ -130,8 +129,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@RunWith(Parameterized.class)
public class IndexTaskTest extends IngestionTestBase
Expand Down Expand Up @@ -1368,13 +1365,10 @@ public void testWaitForSegmentAvailabilityMultipleSegmentsSuccess() throws IOExc
}

@Test
public void testWaitForSegmentAvailabilityEmitsExpectedMetric() throws IOException, InterruptedException
public void testWaitForSegmentAvailabilityEmitsExpectedMetric() throws IOException
{
final File tmpDir = temporaryFolder.newFolder();

LatchableServiceEmitter latchEmitter = new LatchableServiceEmitter();
latchEmitter.latch = new CountDownLatch(1);

TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class);

DataSegment mockDataSegment1 = EasyMock.createMock(DataSegment.class);
Expand Down Expand Up @@ -1414,16 +1408,17 @@ public void testWaitForSegmentAvailabilityEmitsExpectedMetric() throws IOExcepti
EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory())
.andReturn(new NoopSegmentHandoffNotifierFactory())
.once();
final StubServiceEmitter emitter = new StubServiceEmitter("IndexTaskTest", "localhost");
EasyMock.expect(mockToolbox.getEmitter())
.andReturn(latchEmitter).anyTimes();
.andReturn(emitter).anyTimes();

EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once();

EasyMock.replay(mockToolbox);
EasyMock.replay(mockDataSegment1, mockDataSegment2);

Assert.assertTrue(indexTask.waitForSegmentAvailability(mockToolbox, segmentsToWaitFor, 30000));
latchEmitter.latch.await(300000, TimeUnit.MILLISECONDS);
emitter.verifyEmitted("task/segmentAvailability/wait/time", 1);
EasyMock.verify(mockToolbox);
EasyMock.verify(mockDataSegment1, mockDataSegment2);
}
Expand Down Expand Up @@ -3026,27 +3021,6 @@ private static IndexIngestionSpec createIngestionSpec(
}
}

/**
* Used to test that expected metric is emitted by AbstractBatchIndexTask#waitForSegmentAvailability
*/
private static class LatchableServiceEmitter extends ServiceEmitter
{
private CountDownLatch latch;

private LatchableServiceEmitter()
{
super("", "", null);
}

@Override
public void emit(Event event)
{
if (latch != null && "task/segmentAvailability/wait/time".equals(event.toMap().get("metric"))) {
latch.countDown();
}
}
}

@Test
public void testEqualsAndHashCode()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,9 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.CachingEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
Expand Down Expand Up @@ -711,7 +710,7 @@ public File getStorageDirectory()
.shuffleClient(new LocalShuffleClient(intermediaryDataManager))
.taskLogPusher(null)
.attemptId("1")
.emitter(new ServiceEmitter("service", "host", new CachingEmitter()))
.emitter(new StubServiceEmitter())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifie
private final List<AlertEvent> alertEvents = new ArrayList<>();
private final Map<String, List<ServiceMetricEvent>> metricEvents = new HashMap<>();

public StubServiceEmitter()
{
super("testing", "localhost", null);
}

public StubServiceEmitter(String service, String host)
{
super(service, host, null);
Expand Down
Loading

0 comments on commit 18d2a89

Please sign in to comment.