Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move tightly coupled metrics out of packages #1242

Merged
merged 4 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type DB interface {

// Impl returns the underlying database object
Impl() any

// WithListener registers an EventListener
WithListener(listener EventListener) DB
}

// Iterator is an iterator over a DB's key/value pairs.
Expand Down
15 changes: 15 additions & 0 deletions db/event_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package db

type EventListener interface {
OnIO(write bool)
}

type SelectiveListener struct {
OnIOCb func(write bool)
}

func (l *SelectiveListener) OnIO(write bool) {
if l.OnIOCb != nil {
l.OnIOCb(write)
}

Check warning on line 14 in db/event_listener.go

View check run for this annotation

Codecov / codecov/patch

db/event_listener.go#L13-L14

Added lines #L13 - L14 were not covered by tests
}
33 changes: 11 additions & 22 deletions db/pebble/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,17 @@ import (
"sync"

"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/metrics"
"github.com/NethermindEth/juno/utils"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/vfs"
"github.com/prometheus/client_golang/prometheus"
)

var _ db.DB = (*DB)(nil)

type DB struct {
pebble *pebble.DB
wMutex *sync.Mutex

// metrics
readCounter prometheus.Counter
writeCounter prometheus.Counter
pebble *pebble.DB
wMutex *sync.Mutex
listener db.EventListener
}

// New opens a new database at the given path
Expand All @@ -32,17 +27,6 @@ func New(path string, logger pebble.Logger) (db.DB, error) {
if err != nil {
return nil, err
}

pDB.readCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "db",
Name: "read",
})
pDB.writeCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "db",
Name: "write",
})
metrics.MustRegister(pDB.readCounter, pDB.writeCounter)

return pDB, nil
}

Expand All @@ -67,14 +51,19 @@ func newPebble(path string, options *pebble.Options) (*DB, error) {
if err != nil {
return nil, err
}
return &DB{pebble: pDB, wMutex: new(sync.Mutex)}, nil
return &DB{pebble: pDB, wMutex: new(sync.Mutex), listener: &db.SelectiveListener{}}, nil
}

// WithListener registers an EventListener
func (d *DB) WithListener(listener db.EventListener) db.DB {
omerfirmak marked this conversation as resolved.
Show resolved Hide resolved
d.listener = listener
return d
}

// NewTransaction : see db.DB.NewTransaction
func (d *DB) NewTransaction(update bool) db.Transaction {
txn := &Transaction{
readCounter: d.readCounter,
writeCounter: d.writeCounter,
listener: d.listener,
}
if update {
d.wMutex.Lock()
Expand Down
21 changes: 20 additions & 1 deletion db/pebble/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,31 @@ var noop = func(val []byte) error {
return nil
}

type eventListener struct {
WriteCount int
ReadCount int
}

func (l *eventListener) OnIO(write bool) {
if write {
l.WriteCount++
} else {
l.ReadCount++
}
}

func TestTransaction(t *testing.T) {
listener := eventListener{}
t.Run("new transaction can retrieve existing value", func(t *testing.T) {
testDB := pebble.NewMemTest()
testDB := pebble.NewMemTest().WithListener(&listener)
t.Cleanup(func() {
require.NoError(t, testDB.Close())
})

txn := testDB.NewTransaction(true)
require.NoError(t, txn.Set([]byte("key"), []byte("value")))
assert.Equal(t, 1, listener.WriteCount)
assert.Equal(t, 0, listener.ReadCount)

require.NoError(t, txn.Commit())

Expand All @@ -33,6 +49,9 @@ func TestTransaction(t *testing.T) {
assert.Equal(t, "value", string(val))
return nil
}))
assert.Equal(t, 1, listener.WriteCount)
assert.Equal(t, 1, listener.ReadCount)

require.NoError(t, readOnlyTxn.Discard())
})

Expand Down
18 changes: 4 additions & 14 deletions db/pebble/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/utils"
"github.com/cockroachdb/pebble"
"github.com/prometheus/client_golang/prometheus"
)

var ErrDiscardedTransaction = errors.New("discarded txn")
Expand All @@ -19,10 +18,7 @@ type Transaction struct {
batch *pebble.Batch
snapshot *pebble.Snapshot
lock *sync.Mutex

// metrics
readCounter prometheus.Counter
writeCounter prometheus.Counter
listener db.EventListener
}

// Discard : see db.Transaction.Discard
Expand Down Expand Up @@ -64,9 +60,7 @@ func (t *Transaction) Set(key, val []byte) error {
return errors.New("empty key")
}

if t.writeCounter != nil {
t.writeCounter.Inc()
}
t.listener.OnIO(true)
return t.batch.Set(key, val, pebble.Sync)
}

Expand All @@ -76,9 +70,7 @@ func (t *Transaction) Delete(key []byte) error {
return errors.New("read only transaction")
}

if t.writeCounter != nil {
t.writeCounter.Inc()
}
t.listener.OnIO(true)
return t.batch.Delete(key, pebble.Sync)
}

Expand All @@ -96,9 +88,7 @@ func (t *Transaction) Get(key []byte, cb func([]byte) error) error {
return ErrDiscardedTransaction
}

if t.readCounter != nil {
t.readCounter.Inc()
}
t.listener.OnIO(false)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return db.ErrKeyNotFound
Expand Down
37 changes: 37 additions & 0 deletions jsonrpc/event_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package jsonrpc

import "time"

type NewRequestListener interface {
OnNewRequest(method string)
}

type EventListener interface {
NewRequestListener
OnRequestHandled(method string, took time.Duration)
OnRequestFailed(method string, data any)
}

type SelectiveListener struct {
OnNewRequestCb func(method string)
OnRequestHandledCb func(method string, took time.Duration)
OnRequestFailedCb func(method string, data any)
}

func (l *SelectiveListener) OnNewRequest(method string) {
if l.OnNewRequestCb != nil {
l.OnNewRequestCb(method)
}

Check warning on line 24 in jsonrpc/event_listener.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/event_listener.go#L23-L24

Added lines #L23 - L24 were not covered by tests
}

func (l *SelectiveListener) OnRequestHandled(method string, took time.Duration) {
if l.OnRequestHandledCb != nil {
l.OnRequestHandledCb(method, took)
}

Check warning on line 30 in jsonrpc/event_listener.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/event_listener.go#L27-L30

Added lines #L27 - L30 were not covered by tests
}

func (l *SelectiveListener) OnRequestFailed(method string, data any) {
if l.OnRequestFailedCb != nil {
l.OnRequestFailedCb(method, data)
}

Check warning on line 36 in jsonrpc/event_listener.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/event_listener.go#L33-L36

Added lines #L33 - L36 were not covered by tests
}
39 changes: 39 additions & 0 deletions jsonrpc/event_listener_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package jsonrpc_test

import "time"

type CountingEventListener struct {
OnNewRequestLogs []string
OnRequestHandledCalls []struct {
method string
took time.Duration
}
OnRequestFailedCalls []struct {
method string
data any
}
}

func (l *CountingEventListener) OnNewRequest(method string) {
l.OnNewRequestLogs = append(l.OnNewRequestLogs, method)
}

func (l *CountingEventListener) OnRequestHandled(method string, took time.Duration) {
l.OnRequestHandledCalls = append(l.OnRequestHandledCalls, struct {
method string
took time.Duration
}{
method: method,
took: took,
})
}

func (l *CountingEventListener) OnRequestFailed(method string, data any) {
l.OnRequestFailedCalls = append(l.OnRequestFailedCalls, struct {
method string
data any
}{
method: method,
data: data,
})
}
28 changes: 14 additions & 14 deletions jsonrpc/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,30 @@ package jsonrpc
import (
"net/http"

"github.com/NethermindEth/juno/metrics"
"github.com/NethermindEth/juno/utils"
"github.com/prometheus/client_golang/prometheus"
)

const MaxRequestBodySize = 10 * 1024 * 1024 // 10MB

type HTTP struct {
rpc *Server
log utils.SimpleLogger
requests prometheus.Counter
rpc *Server
log utils.SimpleLogger

listener NewRequestListener
}

func NewHTTP(rpc *Server, log utils.SimpleLogger) *HTTP {
h := &HTTP{
rpc: rpc,
log: log,
requests: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "rpc",
Subsystem: "http",
Name: "requests",
}),
rpc: rpc,
log: log,
listener: &SelectiveListener{},
}
metrics.MustRegister(h.requests)
return h
}

// WithListener registers a NewRequestListener
func (h *HTTP) WithListener(listener NewRequestListener) *HTTP {
h.listener = listener
return h
}

Expand All @@ -45,7 +45,7 @@ func (h *HTTP) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
}

req.Body = http.MaxBytesReader(writer, req.Body, MaxRequestBodySize)
h.requests.Inc()
h.listener.OnNewRequest("any")
resp, err := h.rpc.HandleReader(req.Context(), req.Body)
writer.Header().Set("Content-Type", "application/json")
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion jsonrpc/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ func TestHTTP(t *testing.T) {
},
Params: []jsonrpc.Parameter{{Name: "msg"}},
}
listener := CountingEventListener{}
log := utils.NewNopZapLogger()
rpc := jsonrpc.NewServer(1, log)
rpc := jsonrpc.NewServer(1, log).WithListener(&listener)
require.NoError(t, rpc.RegisterMethod(method))

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -41,6 +42,7 @@ func TestHTTP(t *testing.T) {
resp, err := client.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
assert.Len(t, listener.OnNewRequestLogs, 1)
t.Cleanup(func() {
require.NoError(t, resp.Body.Close())
})
Expand Down Expand Up @@ -69,5 +71,6 @@ func TestHTTP(t *testing.T) {
require.Equal(t, http.StatusNotFound, resp.StatusCode)
require.NoError(t, resp.Body.Close())
})
assert.Len(t, listener.OnNewRequestLogs, 1)
})
}
Loading
Loading