Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refines secondary sampling feature test to be realistic #916

Merged
merged 1 commit into from
Jun 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 91 additions & 54 deletions brave/src/test/java/brave/features/propagation/SecondarySampling.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,85 +16,118 @@
*/
package brave.features.propagation;

import brave.Tracing;
import brave.handler.MutableSpan;
import brave.propagation.B3SinglePropagation;
import brave.propagation.Propagation.Getter;
import brave.propagation.Propagation.KeyFactory;
import brave.propagation.Propagation.Setter;
import brave.propagation.TraceContext;
import brave.propagation.TraceContextOrSamplingFlags;
import brave.sampler.Sampler;
import com.google.common.base.Splitter;
import brave.sampler.RateLimitingSampler;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public final class SecondarySampling {
public static final class FinishedSpanHandler extends brave.handler.FinishedSpanHandler {
final Map<String, brave.handler.FinishedSpanHandler> configuredHandlers;

public FinishedSpanHandler(Map<String, brave.handler.FinishedSpanHandler> configuredHandlers) {
this.configuredHandlers = configuredHandlers;
public static SecondarySampling create() {
return new SecondarySampling();
}

Propagation.Factory propagationFactory = B3SinglePropagation.FACTORY;
final Map<String, brave.handler.FinishedSpanHandler> systemToHandlers = new ConcurrentHashMap<>();

SecondarySampling() {
}

public SecondarySampling putSystem(String system, brave.handler.FinishedSpanHandler handler) {
systemToHandlers.put(system, handler);
return this;
}

public SecondarySampling removeSystem(String system) {
systemToHandlers.remove(system);
return this;
}

public Tracing build(Tracing.Builder builder) {
return builder
.addFinishedSpanHandler(new FinishedSpanHandler(systemToHandlers))
// BRAVE6: we need a better collaboration model for propagation than wrapping, as it makes
// configuration complex.
.propagationFactory(new PropagationFactory(propagationFactory, systemToHandlers.keySet()))
.build();
}

static final class FinishedSpanHandler extends brave.handler.FinishedSpanHandler {
final Map<String, brave.handler.FinishedSpanHandler> systemToHandlers;

FinishedSpanHandler(Map<String, brave.handler.FinishedSpanHandler> systemToHandlers) {
this.systemToHandlers = systemToHandlers;
}

@Override public boolean handle(TraceContext context, MutableSpan span) {
Extra extra = context.findExtra(Extra.class);
if (extra == null) return true;
for (String state : extra.states.keySet()) {
brave.handler.FinishedSpanHandler handler = configuredHandlers.get(state);
if (handler != null) handler.handle(context, span);
for (String system : extra.systems.keySet()) {
brave.handler.FinishedSpanHandler handler = systemToHandlers.get(system);
if (handler != null && "1".equals(extra.systems.get(system).get("sampled"))) {
handler.handle(context, span);
}
}
return true;
}
}

public static final class PropagationFactory extends Propagation.Factory {
static final class PropagationFactory extends Propagation.Factory {
final Propagation.Factory delegate;
final Set<String> configuredStates;
final Set<String> configuredSystems;

PropagationFactory(Propagation.Factory delegate, Set<String> configuredStates) {
PropagationFactory(Propagation.Factory delegate, Set<String> configuredSystems) {
this.delegate = delegate;
this.configuredStates = configuredStates;
this.configuredSystems = configuredSystems;
}

@Override public boolean supportsJoin() {
return delegate.supportsJoin();
}

@Override public <K> Propagation<K> create(KeyFactory<K> keyFactory) {
return new Propagation<>(delegate.create(keyFactory), keyFactory, configuredStates);
return new Propagation<>(delegate.create(keyFactory), keyFactory, configuredSystems);
}
}

static final class Extra {
Map<String, Map<String, String>> states = new LinkedHashMap<>();
final Map<String, Map<String, String>> systems = new LinkedHashMap<>();

@Override public String toString() {
return states.entrySet()
.stream()
.map(s -> s.getKey() + ":" + Extra.toString(s.getValue()))
.collect(Collectors.joining(";"));
return systems.entrySet()
.stream()
.map(s -> s.getKey() + ":" + Extra.toString(s.getValue()))
.collect(Collectors.joining(";"));
}

static String toString(Map<String, String> s) {
return s.entrySet()
.stream()
.map(e -> e.getKey() + "=" + e.getValue())
.collect(Collectors.joining(","));
.stream()
.map(e -> e.getKey() + "=" + e.getValue())
.collect(Collectors.joining(","));
}
}

static class Propagation<K> implements brave.propagation.Propagation<K> {

final brave.propagation.Propagation<K> delegate;
final Set<String> configuredStates;
final Set<String> configuredSystems;
final K samplingKey;

Propagation(brave.propagation.Propagation<K> delegate, KeyFactory<K> keyFactory,
Set<String> configuredStates) {
Set<String> configuredSystems) {
this.delegate = delegate;
this.configuredStates = configuredStates;
this.configuredSystems = configuredSystems;
this.samplingKey = keyFactory.create("sampling");
}

Expand Down Expand Up @@ -126,21 +159,21 @@ static final class Injector<C, K> implements TraceContext.Injector<C> {
@Override public void inject(TraceContext traceContext, C carrier) {
delegate.inject(traceContext, carrier);
Extra sampled = traceContext.findExtra(Extra.class);
if (sampled == null || sampled.states.isEmpty()) return;
if (sampled == null || sampled.systems.isEmpty()) return;
setter.put(carrier, samplingKey, sampled.toString());
}
}

static final class Extractor<C, K> implements TraceContext.Extractor<C> {
final TraceContext.Extractor<C> delegate;
final Getter<C, K> getter;
final Set<String> configuredStates;
final Set<String> configuredSystems;
final K samplingKey;

Extractor(Propagation<K> propagation, Getter<C, K> getter) {
this.delegate = propagation.delegate.extractor(getter);
this.getter = getter;
this.configuredStates = propagation.configuredStates;
this.configuredSystems = propagation.configuredSystems;
this.samplingKey = propagation.samplingKey;
}

Expand All @@ -152,44 +185,48 @@ static final class Extractor<C, K> implements TraceContext.Extractor<C> {
TraceContextOrSamplingFlags.Builder builder = result.toBuilder().addExtra(extra);
if (maybeValue == null) return builder.build();

for (String entry : Splitter.on(";").split(maybeValue)) {
String[] nameValue = entry.split(":");
for (String entry : maybeValue.split(";", 100)) {
String[] nameValue = entry.split(":", 2);
String name = nameValue[0];
Map<String, String> state = Splitter.on(",").withKeyValueSeparator("=").split(nameValue[1]);

if (configuredStates.contains(name)) {
state = new LinkedHashMap<>(state); // make mutable
if (update(state)) {
if (state.get("sampled").equals("1")) {
builder.sampledLocal(); // this allows us to override the default decision
}
extra.states.put(name, state);
}
} else {
extra.states.put(name, state);

Map<String, String> systemToState = parseSystem(nameValue[1]);
if (configuredSystems.contains(name) && updateStateAndSample(systemToState)) {
builder.sampledLocal(); // this means data will be recorded
}
if (!systemToState.isEmpty()) extra.systems.put(name, systemToState);
}

return builder.build();
}
}

static boolean update(Map<String, String> state) {
// if there's a rate, convert it to a sampling decision
String rate = state.remove("rate");
if (rate != null) {
static Map<String, String> parseSystem(String system) {
Map<String, String> result = new LinkedHashMap<>();
for (String entry : system.split(",", 100)) {
String[] nameValue = entry.split("=", 2);
result.put(nameValue[0], nameValue[1]);
}
return result;
}

static boolean updateStateAndSample(Map<String, String> state) {
// if there's a tps, convert it to a sampling decision
String tps = state.remove("tps");
if (tps != null) {
// in real life the sampler would be cached
boolean decision = Sampler.create(Float.parseFloat(rate)).isSampled(1L);
boolean decision = RateLimitingSampler.create(Integer.parseInt(tps)).isSampled(1L);
state.put("sampled", decision ? "1" : "0");
} else if (state.containsKey("ttl")) {
// decrement ttl if there is one
return decision;
}

if (state.containsKey("ttl")) { // decrement ttl if there is one
String ttl = state.remove("ttl");
if (ttl != null && !ttl.equals("1")) {
state.put("ttl", Integer.toString(Integer.parseInt(ttl) - 1));
if (ttl.equals("1")) {
state.remove("sampled");
} else {
return false; // remove the out-dated decision
state.put("ttl", Integer.toString(Integer.parseInt(ttl) - 1));
}
}
return true;
return "1".equals(state.get("sampled"));
}
}
Loading