EQL: Add CircuitBreaker for sequence queries #74381
@@ -34,23 +40,42 @@ | |||
this.groups = new SequenceGroup[stages]; | |||
} | |||
|
|||
void add(int stage, Sequence sequence) { | |||
long add(int stage, Sequence sequence) { |
Accounting/ram usage should not leak into the return values of add/remove/until.
} | ||
|
||
@Override | ||
public long ramBytesUsed() { |
Since the RAM used is computed in a batch, there's no point in keeping track of it per remove/until/add method. It's either incremental or a batch - currently it's a mix.
|
||
stats.seen++; | ||
|
||
bytesUsed += sequence.ramBytesUsed(); | ||
addAccountedMemory(bytesUsed, "matcher_sequence"); |
Please put the labels ("matcher_sequence") into a dedicated class.
Left a few comments.
It's a good start; however, I see a number of issues:
- The estimation is too intrusive.
Modifying the signatures of the methods that add/remove matches so that they return estimates is a no-go. The estimation should be 'invisible' to the logic.
- Too expensive
Most objects are really light - a couple of strings and some ints. Whether the object tree is estimated on the fly or its result is cached, this adds significant overhead (whether it's method calls or memory usage) since it's per object.
Furthermore, the memory is evaluated for every hit, which is accurate but expensive.
I think moving the estimation outside the object will work better (the index strings are cached, for example, which is better addressed across objects instead of per instance), and the size can be estimated and thus cached.
That's because, for example, the doc id typically has the same length (so it's the same size across hits), the tiebreaker is the same across all hits, while the timestamp is typically the same per batch of results.
Moreover the breaker check can be done per batch of entries (say every 100/256/500/1k hits) instead of every single one.
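To make the batching suggestion concrete, here is a minimal sketch. It assumes a uniform per-hit size estimate and uses a stand-in `SimpleBreaker` class; `BatchedAccounting`, `SimpleBreaker`, `reserve` and `onHit` are hypothetical names for illustration, not the Elasticsearch `CircuitBreaker` API or the code in this PR.

```java
/** Hypothetical sketch; SimpleBreaker is a stand-in, not the Elasticsearch CircuitBreaker API. */
final class BatchedAccounting {

    /** Minimal breaker: rejects a reservation that would exceed the limit. */
    static final class SimpleBreaker {
        private final long limitBytes;
        private long usedBytes;

        SimpleBreaker(long limitBytes) {
            this.limitBytes = limitBytes;
        }

        void reserve(long bytes) {
            if (usedBytes + bytes > limitBytes) {
                throw new IllegalStateException("memory limit of " + limitBytes + " bytes exceeded");
            }
            usedBytes += bytes;
        }

        long usedBytes() {
            return usedBytes;
        }
    }

    private final SimpleBreaker breaker;
    private final int batchSize;
    private final long bytesPerHit; // assumed uniform across hits in this sketch
    private int pendingHits;
    private int breakerCalls;

    BatchedAccounting(SimpleBreaker breaker, int batchSize, long bytesPerHit) {
        this.breaker = breaker;
        this.batchSize = batchSize;
        this.bytesPerHit = bytesPerHit;
    }

    /** Called once per hit; only touches the breaker when a full batch has accumulated. */
    void onHit() {
        pendingHits++;
        if (pendingHits >= batchSize) {
            flush();
        }
    }

    /** Reports all pending hits to the breaker in a single call. */
    void flush() {
        if (pendingHits == 0) {
            return;
        }
        long bytes = pendingHits * bytesPerHit;
        pendingHits = 0;
        breakerCalls++;
        breaker.reserve(bytes);
    }

    int breakerCalls() {
        return breakerCalls;
    }

    public static void main(String[] args) {
        SimpleBreaker breaker = new SimpleBreaker(1_000_000);
        BatchedAccounting accounting = new BatchedAccounting(breaker, 256, 64);
        for (int i = 0; i < 1000; i++) {
            accounting.onHit();
        }
        accounting.flush(); // account for the last, partial batch
        System.out.println(accounting.breakerCalls() + " breaker calls, " + breaker.usedBytes() + " bytes");
    }
}
```

With a batch size of 256, the 1000 hits in `main` result in four breaker calls rather than 1000, at the cost of detecting a breach up to one batch late.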
|
||
TumblingWindow w = new TumblingWindow(new PITAwareQueryClient(session), | ||
criteria.subList(0, completionStage), | ||
criteria.get(completionStage), | ||
matcher); | ||
matcher, | ||
session.circuitBreaker()); |
The last parameter can be extracted in the TumblingWindow constructor from the matcher.
matcher doesn't have circuitBreaker anymore, as all the calculations are done in the TumblingWindow.
@@ -33,6 +37,11 @@ public String id() { | |||
return id; | |||
} | |||
|
|||
@Override | |||
public long ramBytesUsed() { | |||
return SHALLOW_SIZE + RamUsageEstimator.sizeOf(index) + RamUsageEstimator.sizeOf(id); |
Since this doesn't change during the lifecycle of the object, it can be computed as a constant inside the constructor.
As a separate concern, the index string is cached during the same request to reduce object churn. The estimation should take this into account, otherwise it overestimates, which becomes an issue across a lot of objects.
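A rough sketch of both points, assuming the index string is shared between hits and therefore not charged per instance. The class name `HitReferenceSketch`, the `SHALLOW_SIZE` value and the `roughStringBytes` formula are made-up placeholders; the PR itself uses Lucene's `RamUsageEstimator`, which is not reproduced here.

```java
/** Hypothetical sketch of a hit reference whose size is computed once, in the constructor. */
final class HitReferenceSketch {

    // Assumed fixed per-instance overhead; a placeholder, not a measured value.
    private static final long SHALLOW_SIZE = 32;

    private final String index;
    private final String id;
    private final long ramBytesUsed;

    HitReferenceSketch(String index, String id) {
        this.index = index;
        this.id = id;
        // The fields are final, so the estimate never changes after construction.
        // The index string is assumed to be shared (cached) across hits, so only the id is counted.
        this.ramBytesUsed = SHALLOW_SIZE + roughStringBytes(id);
    }

    /** Placeholder estimate: an assumed fixed overhead plus one byte per character. */
    static long roughStringBytes(String s) {
        return 40 + s.length();
    }

    String index() {
        return index;
    }

    String id() {
        return id;
    }

    /** Returns the cached value; no recomputation per call. */
    long ramBytesUsed() {
        return ramBytesUsed;
    }

    public static void main(String[] args) {
        HitReferenceSketch hit = new HitReferenceSketch("logs-2021", "abc");
        System.out.println(hit.ramBytesUsed());
    }
}
```

The trade-off raised in the review still applies: caching the value adds a `long` to every instance, so it only pays off if the method is called repeatedly.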
@@ -33,6 +37,15 @@ public long implicitTiebreaker() { | |||
return implicitTiebreaker; | |||
} | |||
|
|||
@Override | |||
public long ramBytesUsed() { |
Same as above - since the fields are final and this method is going to be called multiple times it's worth considering caching the long (and thus increasing its memory size) to save on virtual calls.
That's already cached, as it's computed once and saved in a static variable.
The method is incorrect - the timestamp and implicitTiebreaker need to be accounted (the tiebreaker comparable is only a reference so shallow).
Either these are accounted for or there's no point in tracking Ordinal at all.
It would be useful to see the impact the accounting has on the call tree through some basic profiling.
Those are accounted with the RamUsageEstimator.shallowSizeOfInstance(Ordinal.class), all 3 fields, and the tiebreaker just as an object reference.
This is reverted in favour of calling a "batch" estimation after the calls to
This is done only in
Could you please provide some details on this idea? what do you mean "outside the object"? |
Looks good - should be moved to a proper PR and shared with the rest of the group.
|
||
private final Logger log = LogManager.getLogger(SequenceMatcher.class); | ||
|
||
static class Stats { | ||
static class Stats implements Accountable { |
I don't think we need to count this class since it's just one per matcher.
public class Limit { | ||
public class Limit implements Accountable { | ||
|
||
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(Limit.class); |
I believe there's only one instance of this class so no need to account for it.
Pinging @elastic/es-ql (Team:QL) |
Left another round of comments, essentially around moving the circuit breaker directly into SequenceMatcher (which contains all the hooks and data structures needed).
@@ -16,7 +16,7 @@ | |||
|
|||
import static java.util.Collections.emptyList; | |||
|
|||
public class Limit { | |||
public class Limit { |
Should be removed.
@@ -12,6 +12,7 @@ | |||
import java.util.Objects; | |||
|
|||
public class KeyAndOrdinal { | |||
|
No change, should be removed.
class KeyToSequences { | ||
class KeyToSequences implements Accountable { | ||
|
||
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(KeyToSequences.class); |
Since there's only one instance for this class, I don't think the shallow size is necessary.
|
||
private final Logger log = LogManager.getLogger(SequenceMatcher.class); | ||
|
||
static class Stats { | ||
static class Stats { |
Extra whitespace, needs removal.
Still there.
public class SequenceMatcher { | ||
public class SequenceMatcher implements Accountable { | ||
|
||
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(SequenceMatcher.class); |
There's only one instance for this class, no need to account for it.
|
||
@SuppressWarnings(value = { "unchecked", "rawtypes" }) | ||
StageToKeys(int stages) { | ||
// use asList to create an immutable list already initialized to null | ||
this.stageToKey = Arrays.asList(new Set[stages]); | ||
ramBytesUsed = SHALLOW_SIZE + RamUsageEstimator.sizeOfCollection(stageToKey); |
This one is wrong. The stageToKey list has a fixed length, but each entry contains a set of SequenceKeys which varies in size, so the estimate is supposed to be dynamic.
@@ -53,6 +54,7 @@ | |||
public class TumblingWindow implements Executable { | |||
|
|||
private static final int CACHE_MAX_SIZE = 64; | |||
private static final String CIRCUIT_BREAKER_LABEL = "sequence_matches"; |
Not sure what's the convention for circuit breakers however I would expect an eql prefix - if this is not picked up from the package name/plugin then let's add it here.
Regarding the name, I would relabel it to "sequences" and differentiate between "completed" and "in-flight" or "in-progress"
https://github.com/elastic/elasticsearch/pull/74381/files#diff-92c7ef4c47b713f656bb9dd1c5dc054ccfa0473e6c485e4d0c362d9e7671539bR54
The name is "eql_sequence", I will rename it to plain "eql", and use sequences_completes and sequences_inflight for the method call.
@@ -58,7 +56,7 @@ public void testCancellationBeforeFieldCaps() throws InterruptedException { | |||
ClusterService mockClusterService = mockClusterService(); | |||
|
|||
IndexResolver indexResolver = new IndexResolver(client, randomAlphaOfLength(10), DefaultDataTypeRegistry.INSTANCE); | |||
PlanExecutor planExecutor = new PlanExecutor(client, indexResolver, new NamedWriteableRegistry(Collections.emptyList())); | |||
PlanExecutor planExecutor = new PlanExecutor(client, indexResolver, new NoopCircuitBreaker("test")); |
Let's define only one method in the test for creating the executor and reuse that instead:
PlanExecutor planExecutor = planExecutor();
...
public PlanExecutor planExecutor() {
    return new PlanExecutor(client, indexResolver, new NoopCircuitBreaker("test"));
}
@@ -285,6 +290,11 @@ public void clear() { | |||
completed.clear(); | |||
} | |||
|
|||
@Override | |||
public long ramBytesUsed() { | |||
return SHALLOW_SIZE + RamUsageEstimator.sizeOf(keyToSequences) + RamUsageEstimator.sizeOf(stageToKeys); |
One needs to add completed, which represents the completed sequences, while the other two structures represent the in-flight sequences.
I'm not sure whether we want to differentiate between them or not (and label them appropriately since the results are typically significantly smaller than the intermediate data).
// and for each subquery every "fetch_size" docs. Doing RAM accounting on object creation is | ||
// expensive, so we just calculate the difference in bytes of the total memory that the matcher's | ||
// structure occupy, before and after the match() call. | ||
private boolean match(int stage, Iterable<Tuple<KeyAndOrdinal, HitReference>> hits) { |
Since all the data and thus tracking is done in the sequence matcher, it's easier to move the accounting there.
Keep the TumblingWindow intact, move the tracking directly into matcher.match (no need to wrap it) including clearing the memory.
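One way to picture the suggestion is a matcher that reports only the change in its estimated footprint after each match() call and again when it is cleared. This is a sketch under assumptions: `DeltaTrackingMatcher`, its `estimate()` formula and the use of a `LongConsumer` as the breaker hook are all hypothetical and stand in for the real SequenceMatcher structures and the Elasticsearch breaker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

/** Hypothetical sketch: the matcher owns the data, so it also owns the accounting. */
final class DeltaTrackingMatcher {

    private final List<String> inFlight = new ArrayList<>();
    private final LongConsumer breakerAdjust; // receives positive or negative byte deltas
    private long accountedBytes;

    DeltaTrackingMatcher(LongConsumer breakerAdjust) {
        this.breakerAdjust = breakerAdjust;
    }

    /** Processes a batch of hits, then accounts once for the whole batch. */
    boolean match(List<String> hits) {
        inFlight.addAll(hits);
        trackMemory();
        return true;
    }

    /** Releases the structures and gives the accounted memory back. */
    void clear() {
        inFlight.clear();
        trackMemory();
    }

    /** Reports the difference between the current estimate and what was already accounted. */
    private void trackMemory() {
        long current = estimate();
        long delta = current - accountedBytes;
        breakerAdjust.accept(delta); // may throw in a real breaker; state is updated only on success
        accountedBytes = current;
    }

    /** Placeholder estimate: an assumed 16 bytes of overhead plus 2 bytes per character. */
    private long estimate() {
        long total = 0;
        for (String s : inFlight) {
            total += 16 + 2L * s.length();
        }
        return total;
    }

    public static void main(String[] args) {
        long[] used = new long[1];
        DeltaTrackingMatcher matcher = new DeltaTrackingMatcher(delta -> used[0] += delta);
        matcher.match(List.of("ab", "cde"));
        System.out.println("after match: " + used[0]);
        matcher.clear();
        System.out.println("after clear: " + used[0]);
    }
}
```

The caller (the window, in the PR's terms) stays unaware of memory tracking; it only calls match() and clear().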
} | ||
log.trace("{}", stats); | ||
return true; | ||
trackMemory(ramBytesUsedInFlight, ramBytesUsedCompleted); |
I noticed that trackMemory is not called in the if (headLimit) {... branch. Why is that?
Because it's an early exit, and no new objects are created/added in the structures.
|
||
@Override | ||
public void setCircuitBreaker(CircuitBreaker circuitBreaker) { | ||
//assert circuitBreaker.getName().equals(TRAINED_MODEL_CIRCUIT_BREAKER_NAME); |
//assert circuitBreaker.getName().equals(TRAINED_MODEL_CIRCUIT_BREAKER_NAME); |
})); | ||
|
||
CIRCUIT_BREAKER.startBreaking(); | ||
window.execute(wrap(p -> {}, ex -> assertEquals(CircuitBreakingException.class, ex.getClass()))); |
I think this test would also pass if the circuit breaker never throws / is never called.
Thx! Fixed with expectThrows()
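The concern generalizes beyond this test: an assertion placed inside a failure callback only runs if the callback runs. The sketch below reproduces the problem in plain Java; `execute`, `weakCheckPasses` and `strongCheckPasses` are hypothetical helpers and do not correspond to the test framework's `wrap` or `expectThrows`.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical sketch of a vacuously passing check versus one that requires the failure. */
final class VacuousAssertionDemo {

    /** Stand-in for an async operation: reports a failure to the callback only when shouldFail is true. */
    static void execute(boolean shouldFail, Consumer<Exception> onFailure) {
        if (shouldFail) {
            onFailure.accept(new IllegalStateException("breaker tripped"));
        }
    }

    /** Weak: the check lives inside the callback, so nothing fails if the callback never runs. */
    static boolean weakCheckPasses(boolean shouldFail) {
        try {
            execute(shouldFail, ex -> {
                if (!(ex instanceof IllegalStateException)) {
                    throw new AssertionError("unexpected exception type");
                }
            });
            return true;
        } catch (AssertionError e) {
            return false;
        }
    }

    /** Strong: capture the failure first, then require that it is present and of the right type. */
    static boolean strongCheckPasses(boolean shouldFail) {
        AtomicReference<Exception> captured = new AtomicReference<>();
        execute(shouldFail, captured::set);
        return captured.get() instanceof IllegalStateException;
    }

    public static void main(String[] args) {
        System.out.println("weak check, no failure raised:   " + weakCheckPasses(false));
        System.out.println("strong check, no failure raised: " + strongCheckPasses(false));
        System.out.println("strong check, failure raised:    " + strongCheckPasses(true));
    }
}
```

The weak check reports success even when no failure is ever raised, which is the situation described in the comment above; the strong check does not.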
Is CircuitBreakerTests enough? It only tests that a breaker trips when a sequence runs, but nothing checks when it trips or whether it reaches the limit.
I've added a more low-level test with b246745
LGTM.
|
||
public static final SequenceKey NONE = new SequenceKey(); | ||
|
||
private final Object[] keys; | ||
private final int hashCode; | ||
|
||
SequenceKey(Object... keys) { | ||
public SequenceKey(Object... keys) { |
Why public?
Moved the tests into the sequence package to avoid this, so I reverted it.
The sequence matching algorithm holds some structures to keep track of the matched and potentially matching sequences of events. When a large number of events or sequence stages needs to be processed, or when the requested size of the query (number of sequences to return) is large, those structures can noticeably increase the memory footprint. Add a CircuitBreaker, configurable through cluster settings, which accounts for the memory used during the execution of a sequence query. The memory accounting takes place every fetch_size processed events (docs), to avoid significant performance overhead. (cherry picked from commit c6f0fb8)
Add documentation for the newly introduced CircuitBreaker, which is used to restrict the memory usage for an EQL sequence query to avoid OutOfMemory exceptions. Follows: elastic#74381
Add documentation for the newly introduced CircuitBreaker, which is used to restrict the memory usage for an EQL sequence query to avoid OutOfMemory exceptions. Follows: #74381 Co-authored-by: Marios Trivyzas <matriv@gmail.com>
The sequence matching algorithm holds some structures to keep track of
the matched and potentially matching sequences of events. When a large
number of events or sequence stages needs to be processed, or when
the requested size of the query (number of sequences to return) is large,
those structures can noticeably increase the memory footprint.
Add a CircuitBreaker, configurable through cluster settings,
which accounts for the memory used during the execution of a sequence
query. The memory accounting takes place every fetch_size processed
events (docs), to avoid significant performance overhead.