Skip to content

Commit

Permalink
Switching emitter. This will allow for a per feed emitter designation. (
Browse files Browse the repository at this point in the history
#13363)

* Switching emitter. This will allow for a per feed emitter designation.

This will work by looking at an event's feed and direct it to a specific emitter. If no specific feed is specified for a feed.
The emitter can direct the event to a default emitter.

* fix checkstyle issues and make docs for switching emitter use basic event feeds

* fix broken docs, add test, and guard against misconfigurations

* add module test
add switching emitter module test

* fix broken SwitchingEmitterModuleTest

* add apache license to top of test

* fix checkstyle issues

* address comments by adding javadocs, removing a todo, and making druid docs more clear
  • Loading branch information
TSFenwick committed Dec 5, 2022
1 parent 45a8fa2 commit 10bec54
Show file tree
Hide file tree
Showing 7 changed files with 579 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.java.util.emitter.core;

import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* An emitter than that offers the ability to direct an event to multiple emitters based on the event's feed.
*/
public class SwitchingEmitter implements Emitter
{

private static final Logger log = new Logger(SwitchingEmitter.class);

private final Emitter[] defaultEmitters;

private final Map<String, List<Emitter>> feedToEmitters;
private final Set<Emitter> knownEmitters;

/**
* Constructor for the SwitchingEmitter
*
* @param feedToEmitters Map of feed to a list of emitters that correspond to each feed,
* @param defaultEmitter A list of emitters to use if there isn't a match of feed to an emitter
*/
public SwitchingEmitter(Map<String, List<Emitter>> feedToEmitters, Emitter[] defaultEmitter)
{
this.feedToEmitters = feedToEmitters;
this.defaultEmitters = defaultEmitter;
ImmutableSet.Builder<Emitter> emittersSetBuilder = new ImmutableSet.Builder<>();
emittersSetBuilder.addAll(Arrays.stream(defaultEmitter).iterator());
for (List<Emitter> emitterList : feedToEmitters.values()) {
for (Emitter emitter : emitterList) {
emittersSetBuilder.add(emitter);
}
}
this.knownEmitters = emittersSetBuilder.build();
}

/**
* Start the emitter. This will start all the emitters the SwitchingEmitter uses.
*/
@Override
@LifecycleStart
public void start()
{
log.info("Starting Switching Emitter.");

for (Emitter e : knownEmitters) {
log.info("Starting emitter %s.", e.getClass().getName());
e.start();
}
}

/**
* Emit an event. This method must not throw exceptions or block. The emitters that this uses must also not throw
* exceptions or block.
* <p>
* This emitter will direct events based on feed to a list of emitters specified. If there is no match the event will
* use a list of default emitters instead.
* <p>
* Emitters that this emitter uses that receive too many events and internal queues fill up, should drop events rather
* than blocking or consuming excessive memory.
* <p>
* If an emitter that this emitter uses receives input it considers to be invalid, or has an internal problem, it
* should deal with that by logging a warning rather than throwing an exception. Emitters that log warnings
* should consider throttling warnings to avoid excessive logs, since a busy Druid cluster can emit a high volume of
* events.
*
* @param event The event that will be emitted.
*/
@Override
public void emit(Event event)
{
// linear search is likely faster than hashed lookup
for (Map.Entry<String, List<Emitter>> feedToEmitters : feedToEmitters.entrySet()) {
if (feedToEmitters.getKey().equals(event.getFeed())) {
for (Emitter emitter : feedToEmitters.getValue()) {
emitter.emit(event);
}
return;
}
}
for (Emitter emitter : defaultEmitters) {
emitter.emit(event);
}
}

/**
* Triggers this emitter to tell all emitters that this uses to flush.
* @throws IOException
*/
@Override
public void flush() throws IOException
{
boolean fail = false;
log.info("Flushing Switching Emitter.");

for (Emitter e : knownEmitters) {
try {
log.info("Flushing emitter %s.", e.getClass().getName());
e.flush();
}
catch (IOException ex) {
log.error(ex, "Failed to flush emitter [%s]", e.getClass().getName());
fail = true;
}
}

if (fail) {
throw new IOException("failed to flush one or more emitters");
}
}

/**
* Closes all emitters that the SwitchingEmitter uses
* @throws IOException
*/
@Override
@LifecycleStop
public void close() throws IOException
{
boolean fail = false;
log.info("Closing Switching Emitter.");

for (Emitter e : knownEmitters) {
try {
log.info("Closing emitter %s.", e.getClass().getName());
e.close();
}
catch (IOException ex) {
log.error(ex, "Failed to close emitter [%s]", e.getClass().getName());
fail = true;
}
}

if (fail) {
throw new IOException("failed to close one or more emitters");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.java.util.emitter.core;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SwitchingEmitterTest
{

private static final String FEED_1 = "feed1";
private static final String FEED_2 = "feed2";
private static final String FEED_3 = "feed3";
private SwitchingEmitter switchingEmitter;

private Map<String, List<Emitter>> emitters;
private List<Emitter> defaultEmitters;

private Emitter feed1Emitter1;
private Emitter feed1Emitter2;
private Emitter feed2Emitter1;
private Emitter feed1AndFeed3Emitter;

private Set<Emitter> allEmitters;

@Before
public void setup()
{
this.defaultEmitters = ImmutableList.of(
EasyMock.createMock(Emitter.class),
EasyMock.createMock(Emitter.class)
);
this.feed1Emitter1 = EasyMock.createMock(Emitter.class);
this.feed1Emitter2 = EasyMock.createMock(Emitter.class);
this.feed2Emitter1 = EasyMock.createMock(Emitter.class);
this.feed1AndFeed3Emitter = EasyMock.createMock(Emitter.class);
this.emitters = ImmutableMap.of(FEED_1, ImmutableList.of(feed1Emitter1, feed1Emitter2, feed1AndFeed3Emitter),
FEED_2, ImmutableList.of(feed2Emitter1),
FEED_3, ImmutableList.of(feed1AndFeed3Emitter));

allEmitters = new HashSet<>();
allEmitters.addAll(defaultEmitters);
for (List<Emitter> feedEmitters : emitters.values()) {
allEmitters.addAll(feedEmitters);
}
this.switchingEmitter = new SwitchingEmitter(emitters, defaultEmitters.toArray(new Emitter[0]));
}

@Test
public void testStart()
{
for (Emitter emitter : allEmitters) {
emitter.start();
EasyMock.replay(emitter);
}

switchingEmitter.start();
}

@Test
public void testEmit()
{
// test emitting events to all 3 feeds and default emitter
Event feed1Event = EasyMock.createMock(Event.class);
Event feed2Event = EasyMock.createMock(Event.class);
Event feed3Event = EasyMock.createMock(Event.class);
Event eventWithNoMatchingFeed = EasyMock.createMock(Event.class);

EasyMock.expect(feed1Event.getFeed()).andReturn(FEED_1).anyTimes();
EasyMock.expect(feed2Event.getFeed()).andReturn(FEED_2).anyTimes();
EasyMock.expect(feed3Event.getFeed()).andReturn(FEED_3).anyTimes();
EasyMock.expect(eventWithNoMatchingFeed.getFeed()).andReturn("no-real-feed").anyTimes();
EasyMock.replay(feed1Event, feed2Event, feed3Event, eventWithNoMatchingFeed);

for (Emitter emitter : defaultEmitters) {
emitter.emit(eventWithNoMatchingFeed);
}
for (Emitter emitter : emitters.get("feed1")) {
emitter.emit(feed1Event);
}
for (Emitter emitter : emitters.get("feed2")) {
emitter.emit(feed2Event);
}
for (Emitter emitter : emitters.get("feed3")) {
emitter.emit(feed3Event);
}
for (Emitter emitter : allEmitters) {
EasyMock.replay(emitter);
}

switchingEmitter.emit(feed1Event);
switchingEmitter.emit(feed2Event);
switchingEmitter.emit(feed3Event);
switchingEmitter.emit(eventWithNoMatchingFeed);
}

@Test
public void testFlush() throws IOException
{
for (Emitter emitter : allEmitters) {
emitter.flush();
EasyMock.replay(emitter);
}

switchingEmitter.flush();
}

@Test
public void testClose() throws IOException
{
for (Emitter emitter : allEmitters) {
emitter.close();
EasyMock.replay(emitter);
}

switchingEmitter.close();
}

@After
public void tearDown()
{
for (Emitter emitter : allEmitters) {
EasyMock.verify(emitter);
}
}
}
9 changes: 9 additions & 0 deletions docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ There are several emitters available:
- [`parametrized`](#parametrized-http-emitter-module) operates like the `http` emitter but fine-tunes the recipient URL based on the event feed.
- [`composing`](#composing-emitter-module) initializes multiple emitter modules.
- [`graphite`](#graphite-emitter) emits metrics to a [Graphite](https://graphiteapp.org/) Carbon service.
- [`switching`](#switching-emitter) initializes and emits to multiple emitter modules based on the event feed.

##### Logging Emitter Module

Expand Down Expand Up @@ -483,6 +484,14 @@ Instead use `recipientBaseUrlPattern` described in the table below.

To use graphite as emitter set `druid.emitter=graphite`. For configuration details, see [Graphite emitter](../development/extensions-contrib/graphite.md) for the Graphite emitter Druid extension.

##### Switching Emitter

To use switching as emitter set `druid.emitter=switching`.

|Property|Description|Default|
|--------|-----------|-------|
|`druid.emitter.switching.emitters`|JSON map of feed to list of emitter modules that will be used for the mapped feed, e.g., {"metrics":["http"], "alerts":["logging"]}|{}|
|`druid.emitter.switching.defaultEmitters`|JSON list of emitter modules to load that will be used if there is no emitter specifically designated for that event's feed, e.g., ["logging","http"].|[]|

### Metadata storage

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public void configure(Binder binder)
binder.install(new HttpEmitterModule());
binder.install(new ParametrizedUriEmitterModule());
binder.install(new ComposingEmitterModule());
binder.install(new SwitchingEmitterModule());

binder.bind(Emitter.class).toProvider(new EmitterProvider(emitterType)).in(LazySingleton.class);

Expand Down
Loading

0 comments on commit 10bec54

Please sign in to comment.