Skip to content

Commit

Permalink
EQL: Add CircuitBreaker for sequence queries (#74381) (#74683)
Browse files Browse the repository at this point in the history
The sequence matching algorithm holds some structures to keep track of
the matched and potentially matching sequences of events. When a large
number of events or sequence stages need to be processed, and also when
the requested size of the query (number of sequences to return) is large,
those structures can potentially increase the memory footprint.

Add a CircuitBreaker, configurable through cluster settings, that
accounts for the memory used during the execution of a sequence
query. The memory accounting takes place once every fetch_size
processed events (docs), to avoid significant performance overhead.

(cherry picked from commit c6f0fb8)
  • Loading branch information
matriv committed Jun 29, 2021
1 parent 7f4df16 commit 5745131
Show file tree
Hide file tree
Showing 17 changed files with 414 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.xpack.eql.analysis.PostAnalyzer;
import org.elasticsearch.xpack.eql.analysis.PreAnalyzer;
import org.elasticsearch.xpack.eql.analysis.Verifier;
Expand All @@ -29,7 +29,6 @@

public class PlanExecutor {
private final Client client;
private final NamedWriteableRegistry writableRegistry;

private final IndexResolver indexResolver;
private final FunctionRegistry functionRegistry;
Expand All @@ -39,15 +38,16 @@ public class PlanExecutor {
private final Verifier verifier;
private final Optimizer optimizer;
private final Planner planner;
private final CircuitBreaker circuitBreaker;

private final Metrics metrics;


public PlanExecutor(Client client, IndexResolver indexResolver, NamedWriteableRegistry writeableRegistry) {
public PlanExecutor(Client client, IndexResolver indexResolver, CircuitBreaker circuitBreaker) {
this.client = client;
this.writableRegistry = writeableRegistry;

this.indexResolver = indexResolver;
this.circuitBreaker = circuitBreaker;

this.functionRegistry = new EqlFunctionRegistry();

this.metrics = new Metrics();
Expand All @@ -60,7 +60,18 @@ public PlanExecutor(Client client, IndexResolver indexResolver, NamedWriteableRe
}

/**
 * Creates a new {@link EqlSession} for the given configuration, handing it the client,
 * index resolver, analyzers, function registry, verifier, optimizer and planner held by
 * this executor.
 *
 * @param cfg the configuration the session is created with
 * @return the newly constructed session
 */
private EqlSession newSession(EqlConfiguration cfg) {
return new EqlSession(client, cfg, indexResolver, preAnalyzer, postAnalyzer, functionRegistry, verifier, optimizer, planner, this);
return new EqlSession(
client,
cfg,
indexResolver,
preAnalyzer,
postAnalyzer,
functionRegistry,
verifier,
optimizer,
planner,
circuitBreaker
);
}

public void eql(EqlConfiguration cfg, String eql, ParserParams parserParams, ActionListener<Results> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public Executable assemble(List<List<Attribute>> listOfKeys,
}

int completionStage = criteria.size() - 1;
SequenceMatcher matcher = new SequenceMatcher(completionStage, descending, maxSpan, limit);
SequenceMatcher matcher = new SequenceMatcher(completionStage, descending, maxSpan, limit, session.circuitBreaker());

TumblingWindow w = new TumblingWindow(new PITAwareQueryClient(session),
criteria.subList(0, completionStage),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@

package org.elasticsearch.xpack.eql.execution.search;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.search.SearchHit;

import java.util.Objects;

import static org.elasticsearch.xpack.eql.util.SearchHitUtils.qualifiedIndex;

public class HitReference {
public class HitReference implements Accountable {

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(HitReference.class);

private final String index;
private final String id;
Expand All @@ -35,6 +39,12 @@ public String id() {
return id;
}

/**
 * Estimates the memory held by this reference: the shallow instance size plus the
 * size of the {@code id} string.
 *
 * @return the estimated number of bytes used
 */
@Override
public long ramBytesUsed() {
    // The index name is intentionally left out: that string is cached in TumblingWindow,
    // so there is no need to account for it on every reference.
    final long idBytes = RamUsageEstimator.sizeOf(id);
    return SHALLOW_SIZE + idBytes;
}

@Override
public int hashCode() {
return Objects.hash(index, id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@

package org.elasticsearch.xpack.eql.execution.search;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import java.util.Objects;

public class Ordinal implements Comparable<Ordinal> {
public class Ordinal implements Comparable<Ordinal>, Accountable {

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(Ordinal.class);

private final long timestamp;
private final Comparable<Object> tiebreaker;
Expand All @@ -33,6 +37,11 @@ public long implicitTiebreaker() {
return implicitTiebreaker;
}

/**
 * Reports the memory used by this ordinal as the shallow size of an {@code Ordinal} instance.
 *
 * @return the precomputed shallow instance size, in bytes
 */
@Override
public long ramBytesUsed() {
// NOTE(review): only the shallow size is returned; the object referenced by
// 'tiebreaker' is not measured here - confirm this is intentional.
return SHALLOW_SIZE;
}

@Override
public int hashCode() {
return Objects.hash(timestamp, tiebreaker, implicitTiebreaker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.eql.execution.sequence;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.xpack.eql.execution.search.Ordinal;

Expand All @@ -18,13 +20,15 @@
* Dedicated collection for mapping a key to a list of sequences
* The list represents the sequence for each stage (based on its index) and is fixed in size
*/
class KeyToSequences {
class KeyToSequences implements Accountable {

/**
* Utility class holding the sequencegroup/until tuple that also handles
* lazy initialization.
*/
private class SequenceEntry {
private static class SequenceEntry implements Accountable {

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(SequenceEntry.class);

private final SequenceGroup[] groups;
// created lazily
Expand Down Expand Up @@ -52,6 +56,16 @@ void until(Ordinal ordinal) {
}
until.add(ordinal);
}

/**
 * Estimates the memory held by this entry: its shallow size, the {@code until}
 * structure when it exists, and the {@code groups} array.
 *
 * @return the estimated number of bytes used
 */
@Override
public long ramBytesUsed() {
    // 'until' is created lazily, so it contributes nothing until it has been initialized
    final long untilBytes = until == null ? 0L : until.ramBytesUsed();
    final long groupsBytes = RamUsageEstimator.sizeOf(groups);
    return SHALLOW_SIZE + untilBytes + groupsBytes;
}
}

private final int listSize;
Expand Down Expand Up @@ -133,6 +147,11 @@ public void clear() {
keyToSequences.clear();
}

/**
 * Estimates the memory held by this collection by measuring the backing
 * {@code keyToSequences} map with {@link RamUsageEstimator#sizeOfMap}.
 *
 * @return the estimated number of bytes used by the map
 */
@Override
public long ramBytesUsed() {
    final long mapBytes = RamUsageEstimator.sizeOfMap(keyToSequences);
    return mapBytes;
}

@Override
public String toString() {
return LoggerMessageFormat.format(null, "Keys=[{}]", keyToSequences.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.eql.execution.sequence;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.xpack.eql.execution.search.HitReference;
import org.elasticsearch.xpack.eql.execution.search.Ordinal;

Expand All @@ -15,7 +17,9 @@
/**
* A match within a sequence, holding the result and occurrence time.
*/
class Match {
class Match implements Accountable {

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(Match.class);

private final Ordinal ordinal;
private final HitReference hit;
Expand All @@ -33,6 +37,11 @@ HitReference hit() {
return hit;
}

/**
 * Estimates the memory held by this match: the shallow instance size plus the
 * sizes reported for its {@code ordinal} and its {@code hit}.
 *
 * @return the estimated number of bytes used
 */
@Override
public long ramBytesUsed() {
    final long ordinalBytes = RamUsageEstimator.sizeOf(ordinal);
    final long hitBytes = RamUsageEstimator.sizeOf(hit);
    return SHALLOW_SIZE + ordinalBytes + hitBytes;
}

@Override
public int hashCode() {
return Objects.hash(ordinal, hit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.eql.execution.sequence;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.xpack.eql.execution.search.Ordinal;

Expand All @@ -21,7 +23,9 @@
* List of in-flight ordinals for a given key. For fast lookup, typically associated with a stage.
* this class expects the insertion to be ordered
*/
abstract class OrdinalGroup<E> implements Iterable<Ordinal> {
abstract class OrdinalGroup<E> implements Iterable<Ordinal>, Accountable {

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(OrdinalGroup.class);

private final Function<E, Ordinal> extractor;

Expand Down Expand Up @@ -131,6 +135,11 @@ public Ordinal next() {
};
}

/**
 * Estimates the memory held by this group: the shallow instance size plus the
 * estimated size of the {@code elements} collection.
 *
 * @return the estimated number of bytes used
 */
@Override
public long ramBytesUsed() {
    final long elementsBytes = RamUsageEstimator.sizeOfCollection(elements);
    return SHALLOW_SIZE + elementsBytes;
}

@Override
public int hashCode() {
return elements.hashCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.eql.execution.sequence;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.execution.search.HitReference;
import org.elasticsearch.xpack.eql.execution.search.Ordinal;
Expand All @@ -25,7 +27,9 @@
* Defined by its key and stage.
* This class is NOT immutable (to optimize memory) which means its associations need to be managed.
*/
public class Sequence implements Comparable<Sequence> {
public class Sequence implements Comparable<Sequence>, Accountable {

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(Sequence.class);

private final SequenceKey key;
private final int stages;
Expand Down Expand Up @@ -70,6 +74,11 @@ public List<HitReference> hits() {
return hits;
}

/**
 * Estimates the memory held by this sequence: the shallow instance size plus the
 * sizes reported for its {@code key} and its {@code matches}.
 *
 * @return the estimated number of bytes used
 */
@Override
public long ramBytesUsed() {
    final long keyBytes = RamUsageEstimator.sizeOf(key);
    final long matchesBytes = RamUsageEstimator.sizeOf(matches);
    return SHALLOW_SIZE + keyBytes + matchesBytes;
}

@Override
public int compareTo(Sequence o) {
return ordinal().compareTo(o.ordinal());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.eql.execution.sequence;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.util.CollectionUtils;

import java.util.Arrays;
Expand All @@ -15,7 +17,9 @@

import static java.util.Collections.emptyList;

public class SequenceKey {
public class SequenceKey implements Accountable {

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(SequenceKey.class);

public static final SequenceKey NONE = new SequenceKey();

Expand All @@ -31,6 +35,11 @@ public List<Object> asList() {
return keys == null ? emptyList() : Arrays.asList(keys);
}

/**
 * Estimates the memory held by this key: the shallow instance size plus the size
 * that {@link RamUsageEstimator#sizeOfObject} reports for {@code keys}.
 *
 * @return the estimated number of bytes used
 */
@Override
public long ramBytesUsed() {
    final long keysBytes = RamUsageEstimator.sizeOfObject(keys);
    return SHALLOW_SIZE + keysBytes;
}

@Override
public int hashCode() {
return hashCode;
Expand Down
Loading

0 comments on commit 5745131

Please sign in to comment.