Support for Alert Cooldown Configurations (base-org#137)
Ethen Pociask authored Aug 9, 2023
1 parent 6a6d7e1 commit 650c4df
Showing 33 changed files with 432 additions and 199 deletions.
23 changes: 21 additions & 2 deletions docs/architecture/alerting.markdown
@@ -52,5 +52,24 @@ The Slack alert destination is a configurable destination that allows alerts to
**NOTE: As of now, Pessimism can only post alerts to a single Slack channel.**

### Cooldown
**NOTE: This is currently unimplemented**
To ensure that alerts aren't spammed to destinations once invoked, a time based cooldown value should exist that specifies how long an heuristicSession must wait before it can propagate a trigged alert again. This value should be configurable by the user via a JSON-RPC request.
To ensure that alerts aren't spammed to destinations once invoked, a time-based cooldown value (`cooldown_time`) can be defined within the `alerting_params` of a heuristic session config. This value determines how long a heuristic session must wait before it is allowed to alert again.

An example of this is shown below:
```json
{
"network": "layer1",
"pipeline_type": "live",
"type": "balance_enforcement",
"start_height": null,
"alerting_params": {
"cooldown_time": 10,
"message": "",
"destination": "slack"
},
"heuristic_params": {
"address": "0xfC0157aA4F5DB7177830ACddB3D5a9BB5BE9cc5e",
"lower": 1,
"upper": 2
}
}
```
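
The cooldown behavior described above can be sketched in Go as follows. This is a minimal illustration, not the Pessimism implementation: `coolDownTracker`, `add`, `isCoolingDown`, and `suppressedAfter` are hypothetical names, and treating `cooldown_time` as a number of seconds is an assumption (the e2e test in this commit describes a value of 60 as one minute).

```go
package main

import (
	"fmt"
	"time"
)

// coolDownTracker is a hypothetical stand-in for a cooldown handler.
// It maps a session ID to the time at which that session may alert again.
type coolDownTracker struct {
	expiry map[string]time.Time
}

func newCoolDownTracker() *coolDownTracker {
	return &coolDownTracker{expiry: make(map[string]time.Time)}
}

// add starts a cooldown of coolDownSecs seconds for the session at time now.
// The unit (seconds) is an assumption made for this sketch.
func (c *coolDownTracker) add(sessionID string, coolDownSecs int, now time.Time) {
	c.expiry[sessionID] = now.Add(time.Duration(coolDownSecs) * time.Second)
}

// isCoolingDown reports whether the session is still inside its cooldown
// window at time now. Unknown sessions are never cooling down.
func (c *coolDownTracker) isCoolingDown(sessionID string, now time.Time) bool {
	until, ok := c.expiry[sessionID]
	return ok && now.Before(until)
}

// suppressedAfter reports whether an alert would be suppressed elapsedSecs
// seconds after a cooldown of coolDownSecs seconds was started.
func suppressedAfter(coolDownSecs, elapsedSecs int) bool {
	start := time.Unix(0, 0)
	c := newCoolDownTracker()
	c.add("session-1", coolDownSecs, start)
	return c.isCoolingDown("session-1", start.Add(time.Duration(elapsedSecs)*time.Second))
}

func main() {
	fmt.Println(suppressedAfter(10, 5))  // still inside the 10 second window
	fmt.Println(suppressedAfter(10, 10)) // the window has elapsed
}
```

Passing the current time in explicitly, rather than calling `time.Now()` inside the tracker, keeps the sketch deterministic and easy to test without sleeping.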
4 changes: 2 additions & 2 deletions docs/architecture/engine.markdown
@@ -83,12 +83,12 @@ type Heuristic interface {
The heuristic input type is a `RegisterType` that defines the type of data that the heuristic will receive as input. The heuristic input type is defined by the `InputType()` method of the `Heuristic` interface. The heuristic input type is used by the `RiskEngine` to determine if the input data is compatible with the heuristic. If the input data is not compatible with the heuristic, the `RiskEngine` will not execute the heuristic and will return an error.
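
A rough sketch of this compatibility check is shown below. Only the `InputType()` method name comes from the text; `registerType`, its values, `balanceHeuristic`, `execute`, and `errIncompatibleInput` are hypothetical names chosen for illustration, so this should not be read as the actual `RiskEngine` code.

```go
package main

import (
	"errors"
	"fmt"
)

// registerType and its values are hypothetical placeholders.
type registerType int

const (
	accountBalance registerType = iota
	eventLog
)

// heuristic mirrors only the InputType method mentioned in the text.
type heuristic interface {
	InputType() registerType
}

// balanceHeuristic is a hypothetical heuristic that consumes balance data.
type balanceHeuristic struct{}

func (balanceHeuristic) InputType() registerType { return accountBalance }

var errIncompatibleInput = errors.New("input type is incompatible with heuristic")

// execute refuses to run the heuristic when the type of the incoming data
// differs from the type the heuristic declares through InputType().
func execute(h heuristic, dataType registerType) error {
	if h.InputType() != dataType {
		return errIncompatibleInput
	}
	// Heuristic-specific invalidation logic would run here.
	return nil
}

func main() {
	fmt.Println(execute(balanceHeuristic{}, accountBalance)) // compatible input
	fmt.Println(execute(balanceHeuristic{}, eventLog))       // rejected input
}
```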

### Addressing
All heuristics have a boolean property `Addressing` which determines if the heuristic is addressable. To be addressable, an heuristic must only execute under the context of a single address.
All heuristics have a boolean property `Addressing` which determines if the heuristic is addressable. To be addressable, a heuristic must only execute under the context of a single address.

For example, a `balance_enforcement` heuristic session will be addressable because it only executes invalidation logic for the native ETH balance of a single address.

### Heuristic States
State is used to represent the current state of an heuristic. The state of an heuristic is represented by a `HeuristicState` type. The following states are supported:
State is used to represent the current state of a heuristic. The state of a heuristic is represented by a `HeuristicState` type. The following states are supported:
- `Running` - The heuristic is currently running and is being executed by the `RiskEngine`
- `Inactive` - The heuristic is currently inactive and is not being executed by the `RiskEngine`
- `Paused` - The heuristic is currently paused and is not being executed by the `RiskEngine`
4 changes: 2 additions & 2 deletions docs/architecture/etl.markdown
@@ -125,7 +125,7 @@ graph LR;

### (TBD) Aggregator
**NOTE - This component type is still in-development**
Aggregators are used to solve the problem where a pipe or an heuristic input will require multiple sources of data to perform an execution sequence. Since aggregators are subscribing to more than one data stream with different output frequencies, they must employ a synchronization policy for collecting and propagating multi-data inputs within a highly asynchronous environment.
Aggregators are used to solve the problem where a pipe or a heuristic input will require multiple sources of data to perform an execution sequence. Since aggregators are subscribing to more than one data stream with different output frequencies, they must employ a synchronization policy for collecting and propagating multi-data inputs within a highly asynchronous environment.

#### Attributes
* Able to read heterogeneous transit data from an arbitrary number of component ingresses
@@ -137,7 +137,7 @@ _Only send output at the update of a single ingress stream_

Single Value Subscription refers to a synchronization policy where a bucketed multi-data tuple is submitted every time there’s an update to a single input data queue.
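
As a rough sketch of this policy, the aggregator below buffers the latest value from a secondary stream and only emits a tuple when the primary stream updates. All names here (`tuple`, `singleValueAggregator`, `onPrimary`, `onSecondary`) are hypothetical and the values are plain block heights; the real component is still in development, so this only illustrates the idea.

```go
package main

import "fmt"

// tuple is a hypothetical bucketed pair holding the latest value seen on
// each of the two input streams.
type tuple struct {
	primary, secondary uint64
}

// singleValueAggregator buffers the latest secondary value but only emits
// output when the primary stream is updated.
type singleValueAggregator struct {
	latestSecondary uint64
	out             []tuple
}

// onSecondary records the newest secondary value without emitting anything.
func (a *singleValueAggregator) onSecondary(v uint64) {
	a.latestSecondary = v
}

// onPrimary emits a tuple that pairs the new primary value with the most
// recently buffered secondary value.
func (a *singleValueAggregator) onPrimary(v uint64) {
	a.out = append(a.out, tuple{primary: v, secondary: a.latestSecondary})
}

func main() {
	agg := &singleValueAggregator{}
	agg.onSecondary(1)
	agg.onSecondary(2) // overwrites 1; nothing is emitted yet
	agg.onPrimary(1)   // emits {1 2}
	agg.onSecondary(3)
	agg.onPrimary(2) // emits {2 3}
	fmt.Println(agg.out)
}
```

Keeping only the latest secondary value is one possible design choice; a bucketed variant could instead collect every secondary value seen between two primary updates.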

For example we can have an heuristic that subscribes to blocks from two heterogenous chains (layer1, layer2) or `{ChainA, ChainB}`, let's assume `BLOCK_TIME(ChainA) > BLOCK_TIME(ChainB)`.
For example, we can have a heuristic that subscribes to blocks from two heterogeneous chains (layer1, layer2), or `{ChainA, ChainB}`. Let's assume `BLOCK_TIME(ChainA) > BLOCK_TIME(ChainB)`.

We can either specify that the heuristic will run every time there's an update or a new block from `ChainA`:
```
2 changes: 1 addition & 1 deletion docs/openapi/swagger.yml
@@ -38,7 +38,7 @@ paths:
get:
tags:
- heuristic
summary: Returns an heuristic session.
summary: Returns a heuristic session.
description: >-
Returns heuristic session metadata for some specified session uuid. If no uuid is specified, then all sessions are returned. **NOTE - This is currently unimplemented.**
parameters:
16 changes: 8 additions & 8 deletions e2e/e2e.go
@@ -36,7 +36,7 @@ type SysTestSuite struct {
Close func()

TestSlackSvr *TestSlackServer
TestPagerdutyServer *TestPagerdutyServer
TestPagerDutyServer *TestPagerDutyServer
}

// L2TestSuite ... Stores all the information needed to run an e2e L2Geth test
@@ -51,7 +51,7 @@ type L2TestSuite struct {
Close func()

TestSlackSvr *TestSlackServer
TestPagerdutyServer *TestPagerdutyServer
TestPagerDutyServer *TestPagerDutyServer
}

// CreateL2TestSuite ... Creates a new L2Geth test suite
@@ -72,7 +72,7 @@ func CreateL2TestSuite(t *testing.T) *L2TestSuite {
slackServer := NewTestSlackServer()
appCfg.AlertConfig.SlackConfig.URL = slackServer.Server.URL

pagerdutyServer := NewTestPagerdutyServer()
pagerdutyServer := NewTestPagerDutyServer()
appCfg.AlertConfig.MediumPagerDutyCfg.AlertEventsURL = pagerdutyServer.Server.URL

pess, kill, err := app.NewPessimismApp(ctx, appCfg)
@@ -98,7 +98,7 @@ func CreateL2TestSuite(t *testing.T) *L2TestSuite {
},
AppCfg: appCfg,
TestSlackSvr: slackServer,
TestPagerdutyServer: pagerdutyServer,
TestPagerDutyServer: pagerdutyServer,
}
}

@@ -129,7 +129,7 @@ func CreateSysTestSuite(t *testing.T) *SysTestSuite {
slackServer := NewTestSlackServer()
appCfg.AlertConfig.SlackConfig.URL = slackServer.Server.URL

pagerdutyServer := NewTestPagerdutyServer()
pagerdutyServer := NewTestPagerDutyServer()
appCfg.AlertConfig.MediumPagerDutyCfg.AlertEventsURL = pagerdutyServer.Server.URL

pess, kill, err := app.NewPessimismApp(ctx, appCfg)
@@ -155,7 +155,7 @@ func CreateSysTestSuite(t *testing.T) *SysTestSuite {
},
AppCfg: appCfg,
TestSlackSvr: slackServer,
TestPagerdutyServer: pagerdutyServer,
TestPagerDutyServer: pagerdutyServer,
}
}

@@ -189,10 +189,10 @@ func DefaultTestConfig() *config.Config {
URL: "",
Channel: "test",
},
MediumPagerDutyCfg: &client.PagerdutyConfig{
MediumPagerDutyCfg: &client.PagerDutyConfig{
AlertEventsURL: "",
},
HighPagerDutyCfg: &client.PagerdutyConfig{
HighPagerDutyCfg: &client.PagerDutyConfig{
AlertEventsURL: "",
},
},
88 changes: 83 additions & 5 deletions e2e/e2e_test.go
@@ -85,7 +85,7 @@ func Test_Balance_Enforcement(t *testing.T) {
Data: nil,
})

assert.Equal(t, len(ts.TestPagerdutyServer.PagerdutyAlerts()), 0, "No alerts should be sent before the transaction is sent")
assert.Equal(t, len(ts.TestPagerDutyServer.PagerDutyAlerts()), 0, "No alerts should be sent before the transaction is sent")

// Send the transaction to drain Alice's account of almost all ETH.
_, err = ts.L2Geth.AddL2Block(context.Background(), drainAliceTx)
@@ -95,7 +95,7 @@ func Test_Balance_Enforcement(t *testing.T) {
time.Sleep(1 * time.Second)

// Check that the balance enforcement was triggered using the mocked server cache.
posts := ts.TestPagerdutyServer.PagerdutyAlerts()
posts := ts.TestPagerDutyServer.PagerDutyAlerts()
slackPosts := ts.TestSlackSvr.SlackAlerts()
assert.Greater(t, len(slackPosts), 0, "No balance enforcement alert was sent")
assert.Greater(t, len(posts), 0, "No balance enforcement alert was sent")
@@ -124,14 +124,92 @@ func Test_Balance_Enforcement(t *testing.T) {
// Wait for Pessimism to process the balance change.
time.Sleep(1 * time.Second)

// Empty the mocked Pagerduty server cache.
ts.TestPagerdutyServer.ClearAlerts()
// Empty the mocked PagerDuty server cache.
ts.TestPagerDutyServer.ClearAlerts()

// Wait to ensure that no new alerts are sent.
time.Sleep(1 * time.Second)

// Ensure that no new alerts were sent.
assert.Equal(t, len(ts.TestPagerdutyServer.Payloads), 0, "No alerts should be sent after the transaction is sent")
assert.Equal(t, len(ts.TestPagerDutyServer.Payloads), 0, "No alerts should be sent after the transaction is sent")
}

// Test_Balance_Enforce_With_CoolDown ... Tests the E2E flow of a single
// balance enforcement heuristic session on L2 network with a cooldown.
func Test_Balance_Enforce_With_CoolDown(t *testing.T) {

ts := e2e.CreateL2TestSuite(t)
defer ts.Close()

alice := ts.L2Cfg.Secrets.Addresses().Alice
bob := ts.L2Cfg.Secrets.Addresses().Bob

alertMsg := "one baby to another says:"
// Deploy a balance enforcement heuristic session for Alice using a cooldown.
err := ts.App.BootStrap([]*models.SessionRequestParams{{
Network: core.Layer2.String(),
PType: core.Live.String(),
HeuristicType: core.BalanceEnforcement.String(),
StartHeight: nil,
EndHeight: nil,
AlertingParams: &core.AlertPolicy{
// Set a cooldown of one minute.
CoolDown: 60,
Dest: core.Slack.String(),
Msg: alertMsg,
},
SessionParams: map[string]interface{}{
"address": alice.String(),
"lower": 3, // i.e. alert if balance is less than 3 ETH
},
}})

assert.NoError(t, err, "Failed to bootstrap balance enforcement heuristic session")

// Get Alice's balance.
aliceAmt, err := ts.L2Geth.L2Client.BalanceAt(context.Background(), alice, nil)
assert.NoError(t, err, "Failed to get Alice's balance")

// Determine the gas cost of the transaction.
gasAmt := 1_000_001
bigAmt := big.NewInt(1_000_001)
gasPrice := big.NewInt(int64(ts.L2Cfg.DeployConfig.L2GenesisBlockGasLimit))

gasCost := gasPrice.Mul(gasPrice, bigAmt)

signer := types.LatestSigner(ts.L2Geth.L2ChainConfig)

// Create a transaction from Alice to Bob that will drain almost all of Alice's ETH.
drainAliceTx := types.MustSignNewTx(ts.L2Cfg.Secrets.Alice, signer, &types.DynamicFeeTx{
ChainID: big.NewInt(int64(ts.L2Cfg.DeployConfig.L2ChainID)),
Nonce: 0,
GasTipCap: big.NewInt(100),
GasFeeCap: big.NewInt(100000),
Gas: uint64(gasAmt),
To: &bob,
// Subtract the gas cost from the amount sent to Bob.
Value: aliceAmt.Sub(aliceAmt, gasCost),
Data: nil,
})

// Send the transaction to drain Alice's account of almost all ETH.
_, err = ts.L2Geth.AddL2Block(context.Background(), drainAliceTx)
assert.NoError(t, err, "Failed to create L2 block with transaction")

// Wait for Pessimism to process the balance change and send a notification to the mocked Slack server.
time.Sleep(2 * time.Second)

// Check that the balance enforcement was triggered using the mocked server cache.
posts := ts.TestSlackSvr.SlackAlerts()

assert.Equal(t, 1, len(posts), "No balance enforcement alert was sent")
assert.Contains(t, posts[0].Text, "balance_enforcement", "Balance enforcement alert was not sent")
assert.Contains(t, posts[0].Text, alertMsg)

// Ensure that no new alerts are sent for provided cooldown period.
time.Sleep(1 * time.Second)
posts = ts.TestSlackSvr.SlackAlerts()
assert.Equal(t, 1, len(posts), "No alerts should be sent after the transaction is sent")
}

// Test_Contract_Event ... Tests the E2E flow of a single
32 changes: 16 additions & 16 deletions e2e/test_pagerduty_server.go
@@ -12,22 +12,22 @@ import (
"go.uber.org/zap"
)

// TestPagerdutyServer ... Mock server for testing pagerduty alerts
type TestPagerdutyServer struct {
// TestPagerDutyServer ... Mock server for testing pagerduty alerts
type TestPagerDutyServer struct {
Server *httptest.Server
Payloads []*client.PagerdutyRequest
Payloads []*client.PagerDutyRequest
}

// NewTestPagerdutyServer ... Creates a new mock pagerduty server
func NewTestPagerdutyServer() *TestPagerdutyServer {
ts := &TestPagerdutyServer{
Payloads: []*client.PagerdutyRequest{},
// NewTestPagerDutyServer ... Creates a new mock pagerduty server
func NewTestPagerDutyServer() *TestPagerDutyServer {
ts := &TestPagerDutyServer{
Payloads: []*client.PagerDutyRequest{},
}

ts.Server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch strings.TrimSpace(r.URL.Path) {
case "/":
ts.mockPagerdutyPost(w, r)
ts.mockPagerDutyPost(w, r)
default:
http.NotFoundHandler().ServeHTTP(w, r)
}
@@ -37,13 +37,13 @@ func NewTestPagerdutyServer() *TestPagerdutyServer {
}

// Close ... Closes the server
func (svr *TestPagerdutyServer) Close() {
func (svr *TestPagerDutyServer) Close() {
svr.Server.Close()
}

// mockPagerdutyPost ... Mocks a pagerduty post request
func (svr *TestPagerdutyServer) mockPagerdutyPost(w http.ResponseWriter, r *http.Request) {
var alert *client.PagerdutyRequest
// mockPagerDutyPost ... Mocks a pagerduty post request
func (svr *TestPagerDutyServer) mockPagerDutyPost(w http.ResponseWriter, r *http.Request) {
var alert *client.PagerDutyRequest

if err := json.NewDecoder(r.Body).Decode(&alert); err != nil {
w.WriteHeader(http.StatusBadRequest)
@@ -57,14 +57,14 @@ func (svr *TestPagerdutyServer) mockPagerdutyPost(w http.ResponseWriter, r *http
_, _ = w.Write([]byte(`{"status":"success", "message":""}`))
}

// PagerdutyAlerts ... Returns the pagerduty alerts
func (svr *TestPagerdutyServer) PagerdutyAlerts() []*client.PagerdutyRequest {
// PagerDutyAlerts ... Returns the pagerduty alerts
func (svr *TestPagerDutyServer) PagerDutyAlerts() []*client.PagerDutyRequest {
logging.NoContext().Info("Payloads", zap.Any("payloads", svr.Payloads))

return svr.Payloads
}

// ClearAlerts ... Clears the alerts
func (svr *TestPagerdutyServer) ClearAlerts() {
svr.Payloads = []*client.PagerdutyRequest{}
func (svr *TestPagerDutyServer) ClearAlerts() {
svr.Payloads = []*client.PagerDutyRequest{}
}
44 changes: 44 additions & 0 deletions internal/alert/cooldown _test.go
@@ -0,0 +1,44 @@
package alert_test

import (
"fmt"
"testing"
"time"

"github.com/base-org/pessimism/internal/alert"
"github.com/base-org/pessimism/internal/core"
"github.com/stretchr/testify/assert"
)

func Test_CoolDown(t *testing.T) {
var testCases = []struct {
name string
construction func() alert.CoolDownHandler
testFunc func(t *testing.T, cdh alert.CoolDownHandler)
}{
{
name: "Test_CoolDownHandler",
construction: alert.NewCoolDownHandler,
testFunc: func(t *testing.T, cdh alert.CoolDownHandler) {
// Add a cooldown for one second
cdh.Add(core.NilSUUID(), time.Second)

cooled := cdh.IsCoolDown(core.NilSUUID())
assert.True(t, cooled)

// Sleep for one second
time.Sleep(time.Second)
cdh.Update()
cooled = cdh.IsCoolDown(core.NilSUUID())
assert.False(t, cooled)
},
},
}

for i, tc := range testCases {
t.Run(fmt.Sprintf("%d-%s", i, tc.name), func(t *testing.T) {
tc.testFunc(t, tc.construction())
})
}

}