Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EQL: Add CircuitBreaker for sequence queries #74381

Merged
merged 18 commits into from
Jun 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.xpack.eql.analysis.PostAnalyzer;
import org.elasticsearch.xpack.eql.analysis.PreAnalyzer;
import org.elasticsearch.xpack.eql.analysis.Verifier;
Expand All @@ -29,7 +29,6 @@

public class PlanExecutor {
private final Client client;
private final NamedWriteableRegistry writableRegistry;

private final IndexResolver indexResolver;
private final FunctionRegistry functionRegistry;
Expand All @@ -39,15 +38,16 @@ public class PlanExecutor {
private final Verifier verifier;
private final Optimizer optimizer;
private final Planner planner;
private final CircuitBreaker circuitBreaker;

private final Metrics metrics;


public PlanExecutor(Client client, IndexResolver indexResolver, NamedWriteableRegistry writeableRegistry) {
public PlanExecutor(Client client, IndexResolver indexResolver, CircuitBreaker circuitBreaker) {
this.client = client;
this.writableRegistry = writeableRegistry;

this.indexResolver = indexResolver;
this.circuitBreaker = circuitBreaker;

this.functionRegistry = new EqlFunctionRegistry();

this.metrics = new Metrics();
Expand All @@ -60,7 +60,18 @@ public PlanExecutor(Client client, IndexResolver indexResolver, NamedWriteableRe
}

private EqlSession newSession(EqlConfiguration cfg) {
return new EqlSession(client, cfg, indexResolver, preAnalyzer, postAnalyzer, functionRegistry, verifier, optimizer, planner, this);
return new EqlSession(
client,
cfg,
indexResolver,
preAnalyzer,
postAnalyzer,
functionRegistry,
verifier,
optimizer,
planner,
circuitBreaker
);
}

public void eql(EqlConfiguration cfg, String eql, ParserParams parserParams, ActionListener<Results> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public Executable assemble(List<List<Attribute>> listOfKeys,
}

int completionStage = criteria.size() - 1;
SequenceMatcher matcher = new SequenceMatcher(completionStage, descending, maxSpan, limit);
SequenceMatcher matcher = new SequenceMatcher(completionStage, descending, maxSpan, limit, session.circuitBreaker());

TumblingWindow w = new TumblingWindow(new PITAwareQueryClient(session),
criteria.subList(0, completionStage),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@

package org.elasticsearch.xpack.eql.execution.search;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.search.SearchHit;

import java.util.Objects;

import static org.elasticsearch.xpack.eql.util.SearchHitUtils.qualifiedIndex;

public class HitReference {
public class HitReference implements Accountable {

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(HitReference.class);

private final String index;
private final String id;
Expand All @@ -35,6 +39,12 @@ public String id() {
return id;
}

@Override
public long ramBytesUsed() {
// index string is cached in TumblingWindow and there is no need of accounting for it
return SHALLOW_SIZE + RamUsageEstimator.sizeOf(id);
}

@Override
public int hashCode() {
return Objects.hash(index, id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@

package org.elasticsearch.xpack.eql.execution.search;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import java.util.Objects;

public class Ordinal implements Comparable<Ordinal> {
public class Ordinal implements Comparable<Ordinal>, Accountable {

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(Ordinal.class);

private final long timestamp;
private final Comparable<Object> tiebreaker;
Expand All @@ -33,6 +37,11 @@ public long implicitTiebreaker() {
return implicitTiebreaker;
}

@Override
public long ramBytesUsed() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above - since the fields are final and this method is going to be called multiple times it's worth considering caching the long (and thus increasing its memory size) to save on virtual calls.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's already cached, as it's computed once and saved in a static variable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method is incorrect - the timestamp and implicitTiebreaker need to be accounted (the tiebreaker comparable is only a reference so shallow).

Either these are accounted for or there's no point in tracking Ordinal at all.
It would be useful to see the impact the accounting takes on the call tree though some basic profiling.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those are accounted with the RamUsageEstimator.shallowSizeOfInstance(Ordinal.class), all 3 fields, and the tiebreaker just as an object Reference.

return SHALLOW_SIZE;
}

@Override
public int hashCode() {
return Objects.hash(timestamp, tiebreaker, implicitTiebreaker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.eql.execution.sequence;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.xpack.eql.execution.search.Ordinal;

Expand All @@ -18,13 +20,15 @@
* Dedicated collection for mapping a key to a list of sequences
* The list represents the sequence for each stage (based on its index) and is fixed in size
*/
class KeyToSequences {
class KeyToSequences implements Accountable {

/**
* Utility class holding the sequencegroup/until tuple that also handles
* lazy initialization.
*/
private class SequenceEntry {
private static class SequenceEntry implements Accountable {

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(SequenceEntry.class);

private final SequenceGroup[] groups;
// created lazily
Expand Down Expand Up @@ -52,6 +56,16 @@ void until(Ordinal ordinal) {
}
until.add(ordinal);
}

@Override
public long ramBytesUsed() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the ram used is computed in a batch, there's no point in keeping track per remote/until/add method. It's either incremental or a batch - currently it's a mix.

long size = SHALLOW_SIZE;
if (until != null) {
size += until.ramBytesUsed();
}
size += RamUsageEstimator.sizeOf(groups);
return size;
}
}

private final int listSize;
Expand Down Expand Up @@ -133,6 +147,11 @@ public void clear() {
keyToSequences.clear();
}

@Override
public long ramBytesUsed() {
return RamUsageEstimator.sizeOfMap(keyToSequences);
}

@Override
public String toString() {
return LoggerMessageFormat.format(null, "Keys=[{}]", keyToSequences.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.eql.execution.sequence;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.xpack.eql.execution.search.HitReference;
import org.elasticsearch.xpack.eql.execution.search.Ordinal;

Expand All @@ -15,7 +17,9 @@
/**
* A match within a sequence, holding the result and occurrence time.
*/
class Match {
class Match implements Accountable {

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(Match.class);

private final Ordinal ordinal;
private final HitReference hit;
Expand All @@ -33,6 +37,11 @@ HitReference hit() {
return hit;
}

@Override
public long ramBytesUsed() {
return SHALLOW_SIZE + RamUsageEstimator.sizeOf(ordinal) + RamUsageEstimator.sizeOf(hit);
}

@Override
public int hashCode() {
return Objects.hash(ordinal, hit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.eql.execution.sequence;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.xpack.eql.execution.search.Ordinal;

Expand All @@ -21,7 +23,9 @@
* List of in-flight ordinals for a given key. For fast lookup, typically associated with a stage.
* this class expects the insertion to be ordered
*/
abstract class OrdinalGroup<E> implements Iterable<Ordinal> {
abstract class OrdinalGroup<E> implements Iterable<Ordinal>, Accountable {

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(OrdinalGroup.class);

private final Function<E, Ordinal> extractor;

Expand Down Expand Up @@ -131,6 +135,11 @@ public Ordinal next() {
};
}

@Override
public long ramBytesUsed() {
return SHALLOW_SIZE + RamUsageEstimator.sizeOfCollection(elements);
}

@Override
public int hashCode() {
return elements.hashCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.eql.execution.sequence;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.execution.search.HitReference;
import org.elasticsearch.xpack.eql.execution.search.Ordinal;
Expand All @@ -25,7 +27,9 @@
* Defined by its key and stage.
* This class is NOT immutable (to optimize memory) which means its associations need to be managed.
*/
public class Sequence implements Comparable<Sequence> {
public class Sequence implements Comparable<Sequence>, Accountable {

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(Sequence.class);

private final SequenceKey key;
private final int stages;
Expand Down Expand Up @@ -70,6 +74,11 @@ public List<HitReference> hits() {
return hits;
}

@Override
public long ramBytesUsed() {
return SHALLOW_SIZE + RamUsageEstimator.sizeOf(key) + RamUsageEstimator.sizeOf(matches);
}

@Override
public int compareTo(Sequence o) {
return ordinal().compareTo(o.ordinal());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.eql.execution.sequence;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.util.CollectionUtils;

import java.util.Arrays;
Expand All @@ -15,7 +17,9 @@

import static java.util.Collections.emptyList;

public class SequenceKey {
public class SequenceKey implements Accountable {

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(SequenceKey.class);

public static final SequenceKey NONE = new SequenceKey();

Expand All @@ -31,6 +35,11 @@ public List<Object> asList() {
return keys == null ? emptyList() : Arrays.asList(keys);
}

@Override
public long ramBytesUsed() {
return SHALLOW_SIZE + RamUsageEstimator.sizeOfObject(keys);
}

@Override
public int hashCode() {
return hashCode;
Expand Down
Loading