From e0a5c87ebc6aacefe1a57946e264be24921a0202 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20Faruk=20IRMAK?= Date: Fri, 15 Sep 2023 18:43:17 +0300 Subject: [PATCH 1/4] Move metric counting out of db --- db/db.go | 3 +++ db/event_listener.go | 15 +++++++++++++++ db/pebble/db.go | 33 +++++++++++---------------------- db/pebble/db_test.go | 21 ++++++++++++++++++++- db/pebble/transaction.go | 18 ++++-------------- node/metrics.go | 28 ++++++++++++++++++++++++++++ node/node.go | 1 + 7 files changed, 82 insertions(+), 37 deletions(-) create mode 100644 db/event_listener.go create mode 100644 node/metrics.go diff --git a/db/db.go b/db/db.go index 2ec2ad7bde..8bec967a2f 100644 --- a/db/db.go +++ b/db/db.go @@ -26,6 +26,9 @@ type DB interface { // Impl returns the underlying database object Impl() any + + // WithListener registers an EventListener + WithListener(listener EventListener) DB } // Iterator is an iterator over a DB's key/value pairs. diff --git a/db/event_listener.go b/db/event_listener.go new file mode 100644 index 0000000000..8d871d3b5f --- /dev/null +++ b/db/event_listener.go @@ -0,0 +1,15 @@ +package db + +type EventListener interface { + OnIO(write bool) +} + +type SelectiveListener struct { + OnIOCb func(write bool) +} + +func (l *SelectiveListener) OnIO(write bool) { + if l.OnIOCb != nil { + l.OnIOCb(write) + } +} diff --git a/db/pebble/db.go b/db/pebble/db.go index 95bde89cf4..a86fad9c69 100644 --- a/db/pebble/db.go +++ b/db/pebble/db.go @@ -6,22 +6,17 @@ import ( "sync" "github.com/NethermindEth/juno/db" - "github.com/NethermindEth/juno/metrics" "github.com/NethermindEth/juno/utils" "github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble/vfs" - "github.com/prometheus/client_golang/prometheus" ) var _ db.DB = (*DB)(nil) type DB struct { - pebble *pebble.DB - wMutex *sync.Mutex - - // metrics - readCounter prometheus.Counter - writeCounter prometheus.Counter + pebble *pebble.DB + wMutex *sync.Mutex + listener db.EventListener } // New opens a new database at the given path @@ -32,17 +27,6 @@ func New(path string, logger pebble.Logger) (db.DB, error) { if err != nil { return nil, err } - - pDB.readCounter = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "db", - Name: "read", - }) - pDB.writeCounter = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "db", - Name: "write", - }) - metrics.MustRegister(pDB.readCounter, pDB.writeCounter) - return pDB, nil } @@ -67,14 +51,19 @@ func newPebble(path string, options *pebble.Options) (*DB, error) { if err != nil { return nil, err } - return &DB{pebble: pDB, wMutex: new(sync.Mutex)}, nil + return &DB{pebble: pDB, wMutex: new(sync.Mutex), listener: &db.SelectiveListener{}}, nil +} + +// WithListener registers an EventListener +func (d *DB) WithListener(listener db.EventListener) db.DB { + d.listener = listener + return d } // NewTransaction : see db.DB.NewTransaction func (d *DB) NewTransaction(update bool) db.Transaction { txn := &Transaction{ - readCounter: d.readCounter, - writeCounter: d.writeCounter, + listener: d.listener, } if update { d.wMutex.Lock() diff --git a/db/pebble/db_test.go b/db/pebble/db_test.go index c7f00847ad..8ae4a73cb7 100644 --- a/db/pebble/db_test.go +++ b/db/pebble/db_test.go @@ -16,15 +16,31 @@ var noop = func(val []byte) error { return nil } +type eventListener struct { + WriteCount int + ReadCount int +} + +func (l *eventListener) OnIO(write bool) { + if write { + l.WriteCount++ + } else { + l.ReadCount++ + } +} + func TestTransaction(t *testing.T) { + listener := eventListener{} t.Run("new transaction can retrieve existing value", func(t *testing.T) { - testDB := pebble.NewMemTest() + testDB := pebble.NewMemTest().WithListener(&listener) t.Cleanup(func() { require.NoError(t, testDB.Close()) }) txn := testDB.NewTransaction(true) require.NoError(t, txn.Set([]byte("key"), []byte("value"))) + assert.Equal(t, 1, listener.WriteCount) + assert.Equal(t, 0, listener.ReadCount) require.NoError(t, txn.Commit()) @@ -33,6 +49,9 @@ func TestTransaction(t *testing.T) { assert.Equal(t, "value", string(val)) return nil })) + assert.Equal(t, 1, listener.WriteCount) + assert.Equal(t, 1, listener.ReadCount) + require.NoError(t, readOnlyTxn.Discard()) }) diff --git a/db/pebble/transaction.go b/db/pebble/transaction.go index f54ae85de2..49594eab08 100644 --- a/db/pebble/transaction.go +++ b/db/pebble/transaction.go @@ -8,7 +8,6 @@ import ( "github.com/NethermindEth/juno/db" "github.com/NethermindEth/juno/utils" "github.com/cockroachdb/pebble" - "github.com/prometheus/client_golang/prometheus" ) var ErrDiscardedTransaction = errors.New("discarded txn") @@ -19,10 +18,7 @@ type Transaction struct { batch *pebble.Batch snapshot *pebble.Snapshot lock *sync.Mutex - - // metrics - readCounter prometheus.Counter - writeCounter prometheus.Counter + listener db.EventListener } // Discard : see db.Transaction.Discard @@ -64,9 +60,7 @@ func (t *Transaction) Set(key, val []byte) error { return errors.New("empty key") } - if t.writeCounter != nil { - t.writeCounter.Inc() - } + t.listener.OnIO(true) return t.batch.Set(key, val, pebble.Sync) } @@ -76,9 +70,7 @@ func (t *Transaction) Delete(key []byte) error { return errors.New("read only transaction") } - if t.writeCounter != nil { - t.writeCounter.Inc() - } + t.listener.OnIO(true) return t.batch.Delete(key, pebble.Sync) } @@ -96,9 +88,7 @@ func (t *Transaction) Get(key []byte, cb func([]byte) error) error { return ErrDiscardedTransaction } - if t.readCounter != nil { - t.readCounter.Inc() - } + t.listener.OnIO(false) if err != nil { if errors.Is(err, pebble.ErrNotFound) { return db.ErrKeyNotFound diff --git a/node/metrics.go b/node/metrics.go new file mode 100644 index 0000000000..52639fb184 --- /dev/null +++ b/node/metrics.go @@ -0,0 +1,28 @@ +package node + +import ( + "github.com/NethermindEth/juno/db" + "github.com/prometheus/client_golang/prometheus" +) + +func makeDBMetrics() db.EventListener { + readCounter := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "db", + Name: "read", + }) + writeCounter := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "db", + Name: "write", + }) + prometheus.MustRegister(readCounter, writeCounter) + + return &db.SelectiveListener{ + OnIOCb: func(write bool) { + if write { + writeCounter.Inc() + } else { + readCounter.Inc() + } + }, + } +} diff --git a/node/node.go b/node/node.go index e5c58b109e..126efc47a4 100644 --- a/node/node.go +++ b/node/node.go @@ -136,6 +136,7 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo,funlen services = append(services, makeRPCOverWebsocket(cfg.WebsocketHost, cfg.WebsocketPort, jsonrpcServer, log)) } if cfg.Metrics { + database.WithListener(makeDBMetrics()) services = append(services, makeMetrics(cfg.MetricsHost, cfg.MetricsPort)) } if cfg.GRPC { From ecc4260dbe74a6c620639993759921aec34a5481 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20Faruk=20IRMAK?= Date: Fri, 15 Sep 2023 12:00:30 +0300 Subject: [PATCH 2/4] Move metric counting out of jsonrpc instead export an EventListener interface --- jsonrpc/event_listener.go | 37 ++++++++++++++++++++ jsonrpc/event_listener_test.go | 39 +++++++++++++++++++++ jsonrpc/http.go | 28 +++++++-------- jsonrpc/http_test.go | 5 ++- jsonrpc/server.go | 54 +++++++++++----------------- jsonrpc/server_test.go | 40 +++++++++++++++------ jsonrpc/websocket.go | 28 +++++++-------- jsonrpc/websocket_test.go | 9 +++-- node/http.go | 14 +++++--- node/metrics.go | 64 ++++++++++++++++++++++++++++++++++ node/node.go | 5 +-- 11 files changed, 241 insertions(+), 82 deletions(-) create mode 100644 jsonrpc/event_listener.go create mode 100644 jsonrpc/event_listener_test.go diff --git a/jsonrpc/event_listener.go b/jsonrpc/event_listener.go new file mode 100644 index 0000000000..a6f909f371 --- /dev/null +++ b/jsonrpc/event_listener.go @@ -0,0 +1,37 @@ +package jsonrpc + +import "time" + +type NewRequestListener interface { + OnNewRequest(method string) +} + +type EventListener interface { + NewRequestListener + OnRequestHandled(method string, took time.Duration) + OnRequestFailed(method string, data any) +} + +type SelectiveListener struct { + OnNewRequestCb func(method string) + OnRequestHandledCb func(method string, took time.Duration) + OnRequestFailedCb func(method string, data any) +} + +func (l *SelectiveListener) OnNewRequest(method string) { + if l.OnNewRequestCb != nil { + l.OnNewRequestCb(method) + } +} + +func (l *SelectiveListener) OnRequestHandled(method string, took time.Duration) { + if l.OnRequestHandledCb != nil { + l.OnRequestHandledCb(method, took) + } +} + +func (l *SelectiveListener) OnRequestFailed(method string, data any) { + if l.OnRequestFailedCb != nil { + l.OnRequestFailedCb(method, data) + } +} diff --git a/jsonrpc/event_listener_test.go b/jsonrpc/event_listener_test.go new file mode 100644 index 0000000000..664b69e840 --- /dev/null +++ b/jsonrpc/event_listener_test.go @@ -0,0 +1,39 @@ +package jsonrpc_test + +import "time" + +type CountingEventListener struct { + OnNewRequestLogs []string + OnRequestHandledCalls []struct { + method string + took time.Duration + } + OnRequestFailedCalls []struct { + method string + data any + } +} + +func (l *CountingEventListener) OnNewRequest(method string) { + l.OnNewRequestLogs = append(l.OnNewRequestLogs, method) +} + +func (l *CountingEventListener) OnRequestHandled(method string, took time.Duration) { + l.OnRequestHandledCalls = append(l.OnRequestHandledCalls, struct { + method string + took time.Duration + }{ + method: method, + took: took, + }) +} + +func (l *CountingEventListener) OnRequestFailed(method string, data any) { + l.OnRequestFailedCalls = append(l.OnRequestFailedCalls, struct { + method string + data any + }{ + method: method, + data: data, + }) +} diff --git a/jsonrpc/http.go b/jsonrpc/http.go index f5f1289cf4..b068ee4c02 100644 --- a/jsonrpc/http.go +++ b/jsonrpc/http.go @@ -3,30 +3,30 @@ package jsonrpc import ( "net/http" - "github.com/NethermindEth/juno/metrics" "github.com/NethermindEth/juno/utils" - "github.com/prometheus/client_golang/prometheus" ) const MaxRequestBodySize = 10 * 1024 * 1024 // 10MB type HTTP struct { - rpc *Server - log utils.SimpleLogger - requests prometheus.Counter + rpc *Server + log utils.SimpleLogger + + listener NewRequestListener } func NewHTTP(rpc *Server, log utils.SimpleLogger) *HTTP { h := &HTTP{ - rpc: rpc, - log: log, - requests: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "rpc", - Subsystem: "http", - Name: "requests", - }), + rpc: rpc, + log: log, + listener: &SelectiveListener{}, } - metrics.MustRegister(h.requests) + return h +} + +// WithListener registers a NewRequestListener +func (h *HTTP) WithListener(listener NewRequestListener) *HTTP { + h.listener = listener return h } @@ -45,7 +45,7 @@ func (h *HTTP) ServeHTTP(writer http.ResponseWriter, req *http.Request) { } req.Body = http.MaxBytesReader(writer, req.Body, MaxRequestBodySize) - h.requests.Inc() + h.listener.OnNewRequest("any") resp, err := h.rpc.HandleReader(req.Context(), req.Body) writer.Header().Set("Content-Type", "application/json") if err != nil { diff --git a/jsonrpc/http_test.go b/jsonrpc/http_test.go index afd103647d..790ec325d0 100644 --- a/jsonrpc/http_test.go +++ b/jsonrpc/http_test.go @@ -22,8 +22,9 @@ func TestHTTP(t *testing.T) { }, Params: []jsonrpc.Parameter{{Name: "msg"}}, } + listener := CountingEventListener{} log := utils.NewNopZapLogger() - rpc := jsonrpc.NewServer(1, log) + rpc := jsonrpc.NewServer(1, log).WithListener(&listener) require.NoError(t, rpc.RegisterMethod(method)) ctx, cancel := context.WithCancel(context.Background()) @@ -41,6 +42,7 @@ func TestHTTP(t *testing.T) { resp, err := client.Do(req) require.NoError(t, err) require.Equal(t, http.StatusOK, resp.StatusCode) + assert.Len(t, listener.OnNewRequestLogs, 1) t.Cleanup(func() { require.NoError(t, resp.Body.Close()) }) @@ -69,5 +71,6 @@ func TestHTTP(t *testing.T) { require.Equal(t, http.StatusNotFound, resp.StatusCode) require.NoError(t, resp.Body.Close()) }) + assert.Len(t, listener.OnNewRequestLogs, 1) }) } diff --git a/jsonrpc/server.go b/jsonrpc/server.go index 85ab2e2ff8..0ceb322e05 100644 --- a/jsonrpc/server.go +++ b/jsonrpc/server.go @@ -11,10 +11,9 @@ import ( "reflect" "strings" "sync" + "time" - "github.com/NethermindEth/juno/metrics" "github.com/NethermindEth/juno/utils" - "github.com/prometheus/client_golang/prometheus" "github.com/sourcegraph/conc/pool" ) @@ -113,11 +112,7 @@ type Server struct { validator Validator pool *pool.Pool log utils.SimpleLogger - - // metrics - requests *prometheus.CounterVec - failedRequests *prometheus.CounterVec - requestLatencies *prometheus.HistogramVec + listener EventListener } type Validator interface { @@ -127,27 +122,12 @@ type Validator interface { // NewServer instantiates a JSONRPC server func NewServer(poolMaxGoroutines int, log utils.SimpleLogger) *Server { s := &Server{ - log: log, - methods: make(map[string]Method), - pool: pool.New().WithMaxGoroutines(poolMaxGoroutines), - requests: prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "rpc", - Subsystem: "server", - Name: "requests", - }, []string{"method"}), - failedRequests: prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "rpc", - Subsystem: "server", - Name: "failed_requests", - }, []string{"method"}), - requestLatencies: prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "rpc", - Subsystem: "server", - Name: "requests_latency", - }, []string{"method"}), - } - - metrics.MustRegister(s.requests, s.failedRequests, s.requestLatencies) + log: log, + methods: make(map[string]Method), + pool: pool.New().WithMaxGoroutines(poolMaxGoroutines), + listener: &SelectiveListener{}, + } + return s } @@ -157,6 +137,12 @@ func (s *Server) WithValidator(validator Validator) *Server { return s } +// WithListener registers an EventListener +func (s *Server) WithListener(listener EventListener) *Server { + s.listener = listener + return s +} + // RegisterMethod verifies and creates an endpoint that the server recognises. // // - name is the method name @@ -347,18 +333,20 @@ func (s *Server) handleRequest(ctx context.Context, req *request) (*response, er return res, nil } - timer := prometheus.NewTimer(s.requestLatencies.WithLabelValues(req.Method)) + handlerTimer := time.Now() + s.listener.OnNewRequest(req.Method) args, err := s.buildArguments(ctx, req.Params, calledMethod) if err != nil { res.Error = Err(InvalidParams, err.Error()) + s.listener.OnRequestFailed(req.Method, err) return res, nil } defer func() { - took := timer.ObserveDuration() - s.log.Debugw("Responding to RPC request", "method", req.Method, "id", req.ID, "took", took) + handlerTook := time.Since(handlerTimer) + s.listener.OnRequestHandled(req.Method, handlerTook) + s.log.Debugw("Responding to RPC request", "method", req.Method, "id", req.ID, "took", handlerTook) }() - s.requests.WithLabelValues(req.Method).Inc() tuple := reflect.ValueOf(calledMethod.Handler).Call(args) if res.ID == nil { // notification return nil, nil @@ -366,7 +354,7 @@ func (s *Server) handleRequest(ctx context.Context, req *request) (*response, er if errAny := tuple[1].Interface(); !isNil(errAny) { res.Error = errAny.(*Error) - s.failedRequests.WithLabelValues(req.Method).Inc() + s.listener.OnRequestFailed(req.Method, err) return res, nil } res.Result = tuple[0].Interface() diff --git a/jsonrpc/server_test.go b/jsonrpc/server_test.go index ebbc36a77b..c0342ec952 100644 --- a/jsonrpc/server_test.go +++ b/jsonrpc/server_test.go @@ -149,15 +149,19 @@ func TestHandle(t *testing.T) { }, }, } - server := jsonrpc.NewServer(1, utils.NewNopZapLogger()).WithValidator(validator.New()) + + listener := CountingEventListener{} + server := jsonrpc.NewServer(1, utils.NewNopZapLogger()).WithValidator(validator.New()).WithListener(&listener) for _, m := range methods { require.NoError(t, server.RegisterMethod(m)) } tests := map[string]struct { - isBatch bool - req string - res string + isBatch bool + req string + res string + checkNewRequestEvent bool + checkFailedEvent bool }{ "invalid json": { req: `{]`, @@ -188,8 +192,9 @@ func TestHandle(t *testing.T) { res: `{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid Params","data":"missing/unexpected params in list"},"id":3}`, }, "too many params": { - req: `{"jsonrpc" : "2.0", "method" : "method", "params" : [3, false, "error message", "too many"] , "id" : 3}`, - res: `{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid Params","data":"missing/unexpected params in list"},"id":3}`, + req: `{"jsonrpc" : "2.0", "method" : "method", "params" : [3, false, "error message", "too many"] , "id" : 3}`, + res: `{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid Params","data":"missing/unexpected params in list"},"id":3}`, + checkFailedEvent: true, }, "list params": { req: `{"jsonrpc" : "2.0", "method" : "method", "params" : [3, false, "error message"] , "id" : 3}`, @@ -341,12 +346,14 @@ func TestHandle(t *testing.T) { res: `[{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid Request","data":"unsupported RPC request version"},"id":5},{"jsonrpc":"2.0","result":{"doubled":88},"id":6}]`, }, "invalid value in struct": { - req: `{"jsonrpc" : "2.0", "method" : "validation", "params" : [ {"A": 0} ], "id" : 1}`, - res: `{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid Params","data":"Key: 'validationStruct.A' Error:Field validation for 'A' failed on the 'min' tag"},"id":1}`, + req: `{"jsonrpc" : "2.0", "method" : "validation", "params" : [ {"A": 0} ], "id" : 1}`, + res: `{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid Params","data":"Key: 'validationStruct.A' Error:Field validation for 'A' failed on the 'min' tag"},"id":1}`, + checkFailedEvent: true, }, "valid value in struct": { - req: `{"jsonrpc" : "2.0", "method" : "validation", "params" : [{"A": 1}], "id" : 1}`, - res: `{"jsonrpc":"2.0","result":1,"id":1}`, + req: `{"jsonrpc" : "2.0", "method" : "validation", "params" : [{"A": 1}], "id" : 1}`, + res: `{"jsonrpc":"2.0","result":1,"id":1}`, + checkNewRequestEvent: true, }, "invalid value in struct pointer": { req: `{"jsonrpc" : "2.0", "method" : "validationPointer", "params" : [ {"A": 0} ], "id" : 1}`, @@ -441,6 +448,10 @@ func TestHandle(t *testing.T) { for desc, test := range tests { t.Run(desc, func(t *testing.T) { + oldNewRequestEventCount := len(listener.OnNewRequestLogs) + oldRequestFailedEventCount := len(listener.OnRequestFailedCalls) + oldRequestHandledCalls := len(listener.OnRequestHandledCalls) + res, err := server.Handle(context.Background(), []byte(test.req)) require.NoError(t, err) @@ -449,6 +460,15 @@ func TestHandle(t *testing.T) { } else { assert.Equal(t, test.res, string(res)) } + if test.checkNewRequestEvent { + assert.Greater(t, len(listener.OnNewRequestLogs), oldNewRequestEventCount) + if !test.checkFailedEvent { + assert.Greater(t, len(listener.OnRequestHandledCalls), oldRequestHandledCalls) + } + } + if test.checkFailedEvent { + assert.Greater(t, len(listener.OnRequestFailedCalls), oldRequestFailedEventCount) + } }) } } diff --git a/jsonrpc/websocket.go b/jsonrpc/websocket.go index 8f15418ceb..8404b4f55f 100644 --- a/jsonrpc/websocket.go +++ b/jsonrpc/websocket.go @@ -7,9 +7,7 @@ import ( "strings" "time" - "github.com/NethermindEth/juno/metrics" "github.com/NethermindEth/juno/utils" - "github.com/prometheus/client_golang/prometheus" "nhooyr.io/websocket" ) @@ -19,8 +17,7 @@ type Websocket struct { rpc *Server log utils.SimpleLogger connParams *WebsocketConnParams - // metrics - requests prometheus.Counter + listener NewRequestListener } func NewWebsocket(rpc *Server, log utils.SimpleLogger) *Websocket { @@ -28,14 +25,9 @@ func NewWebsocket(rpc *Server, log utils.SimpleLogger) *Websocket { rpc: rpc, log: log, connParams: DefaultWebsocketConnParams(), - requests: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "rpc", - Subsystem: "ws", - Name: "requests", - }), + listener: &SelectiveListener{}, } - metrics.MustRegister(ws.requests) return ws } @@ -45,6 +37,12 @@ func (ws *Websocket) WithConnParams(p *WebsocketConnParams) *Websocket { return ws } +// WithListener registers a NewRequestListener +func (ws *Websocket) WithListener(listener NewRequestListener) *Websocket { + ws.listener = listener + return ws +} + // ServeHTTP processes an HTTP request and upgrades it to a websocket connection. // The connection's entire "lifetime" is spent in this function. func (ws *Websocket) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -56,7 +54,7 @@ func (ws *Websocket) ServeHTTP(w http.ResponseWriter, r *http.Request) { // TODO include connection information, such as the remote address, in the logs. - wsc := newWebsocketConn(conn, ws.rpc, ws.connParams, ws.requests) + wsc := newWebsocketConn(conn, ws.rpc, ws.connParams, ws.listener) err = wsc.ReadWriteLoop(r.Context()) @@ -99,16 +97,16 @@ type websocketConn struct { conn *websocket.Conn rpc *Server params *WebsocketConnParams - requests prometheus.Counter + listener NewRequestListener } -func newWebsocketConn(conn *websocket.Conn, rpc *Server, params *WebsocketConnParams, requests prometheus.Counter) *websocketConn { +func newWebsocketConn(conn *websocket.Conn, rpc *Server, params *WebsocketConnParams, listener NewRequestListener) *websocketConn { conn.SetReadLimit(params.ReadLimit) return &websocketConn{ conn: conn, rpc: rpc, params: params, - requests: requests, + listener: listener, } } @@ -123,7 +121,7 @@ func (wsc *websocketConn) ReadWriteLoop(ctx context.Context) error { // TODO write responses concurrently. Unlike gorilla/websocket, nhooyr.io/websocket // permits concurrent writes. - wsc.requests.Inc() + wsc.listener.OnNewRequest("any") // Decode the message, call the handler, encode the response. resp, err := wsc.rpc.Handle(ctx, r) if err != nil { diff --git a/jsonrpc/websocket_test.go b/jsonrpc/websocket_test.go index da28f366e1..a215c28765 100644 --- a/jsonrpc/websocket_test.go +++ b/jsonrpc/websocket_test.go @@ -14,7 +14,7 @@ import ( ) // The caller is responsible for closing the connection. -func testConnection(t *testing.T, ctx context.Context) *websocket.Conn { +func testConnection(t *testing.T, ctx context.Context, listener jsonrpc.EventListener) *websocket.Conn { method := jsonrpc.Method{ Name: "test_echo", Params: []jsonrpc.Parameter{{Name: "msg"}}, @@ -22,7 +22,7 @@ func testConnection(t *testing.T, ctx context.Context) *websocket.Conn { return msg, nil }, } - rpc := jsonrpc.NewServer(1, utils.NewNopZapLogger()) + rpc := jsonrpc.NewServer(1, utils.NewNopZapLogger()).WithListener(listener) require.NoError(t, rpc.RegisterMethod(method)) // Server @@ -39,7 +39,9 @@ func testConnection(t *testing.T, ctx context.Context) *websocket.Conn { func TestHandler(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - conn := testConnection(t, ctx) + + listener := CountingEventListener{} + conn := testConnection(t, ctx, &listener) msg := `{"jsonrpc" : "2.0", "method" : "test_echo", "params" : [ "abc123" ], "id" : 1}` err := conn.Write(context.Background(), websocket.MessageText, []byte(msg)) @@ -49,6 +51,7 @@ func TestHandler(t *testing.T) { _, got, err := conn.Read(context.Background()) require.NoError(t, err) assert.Equal(t, want, string(got)) + assert.Len(t, listener.OnNewRequestLogs, 1) require.NoError(t, conn.Close(websocket.StatusNormalClosure, "")) } diff --git a/node/http.go b/node/http.go index 411e621379..05198da58c 100644 --- a/node/http.go +++ b/node/http.go @@ -61,16 +61,22 @@ func makeHTTPService(host string, port uint16, handler http.Handler) *httpServic } } -func makeRPCOverHTTP(host string, port uint16, jsonrpcServer *jsonrpc.Server, log utils.SimpleLogger) *httpService { - httpHandler := jsonrpc.NewHTTP(jsonrpcServer, log) +func makeRPCOverHTTP(host string, port uint16, server *jsonrpc.Server, log utils.SimpleLogger, metricsEnabled bool) *httpService { + httpHandler := jsonrpc.NewHTTP(server, log) + if metricsEnabled { + httpHandler.WithListener(makeHTTPMetrics()) + } mux := http.NewServeMux() mux.Handle("/", httpHandler) mux.Handle("/v0_4", httpHandler) return makeHTTPService(host, port, mux) } -func makeRPCOverWebsocket(host string, port uint16, jsonrpcServer *jsonrpc.Server, log utils.SimpleLogger) *httpService { - wsHandler := jsonrpc.NewWebsocket(jsonrpcServer, log) +func makeRPCOverWebsocket(host string, port uint16, server *jsonrpc.Server, log utils.SimpleLogger, metricsEnabled bool) *httpService { + wsHandler := jsonrpc.NewWebsocket(server, log) + if metricsEnabled { + wsHandler.WithListener(makeWSMetrics()) + } mux := http.NewServeMux() mux.Handle("/", wsHandler) mux.Handle("/v0_4", wsHandler) diff --git a/node/metrics.go b/node/metrics.go index 52639fb184..4cece9968e 100644 --- a/node/metrics.go +++ b/node/metrics.go @@ -1,7 +1,10 @@ package node import ( + "time" + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/jsonrpc" "github.com/prometheus/client_golang/prometheus" ) @@ -26,3 +29,64 @@ func makeDBMetrics() db.EventListener { }, } } + +func makeHTTPMetrics() jsonrpc.NewRequestListener { + reqCounter := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "rpc", + Subsystem: "http", + Name: "requests", + }) + prometheus.MustRegister(reqCounter) + + return &jsonrpc.SelectiveListener{ + OnNewRequestCb: func(method string) { + reqCounter.Inc() + }, + } +} + +func makeWSMetrics() jsonrpc.NewRequestListener { + reqCounter := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "rpc", + Subsystem: "ws", + Name: "requests", + }) + prometheus.MustRegister(reqCounter) + + return &jsonrpc.SelectiveListener{ + OnNewRequestCb: func(method string) { + reqCounter.Inc() + }, + } +} + +func makeRPCMetrics() jsonrpc.EventListener { + requests := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "rpc", + Subsystem: "server", + Name: "requests", + }, []string{"method"}) + failedRequests := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "rpc", + Subsystem: "server", + Name: "failed_requests", + }, []string{"method"}) + requestLatencies := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "rpc", + Subsystem: "server", + Name: "requests_latency", + }, []string{"method"}) + prometheus.MustRegister(requests, failedRequests, requestLatencies) + + return &jsonrpc.SelectiveListener{ + OnNewRequestCb: func(method string) { + requests.WithLabelValues(method).Inc() + }, + OnRequestHandledCb: func(method string, took time.Duration) { + requestLatencies.WithLabelValues(method).Observe(took.Seconds()) + }, + OnRequestFailedCb: func(method string, data any) { + failedRequests.WithLabelValues(method).Inc() + }, + } +} diff --git a/node/node.go b/node/node.go index 126efc47a4..cd05fcd568 100644 --- a/node/node.go +++ b/node/node.go @@ -130,13 +130,14 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo,funlen } } if cfg.HTTP { - services = append(services, makeRPCOverHTTP(cfg.HTTPHost, cfg.HTTPPort, jsonrpcServer, log)) + services = append(services, makeRPCOverHTTP(cfg.HTTPHost, cfg.HTTPPort, jsonrpcServer, log, cfg.Metrics)) } if cfg.Websocket { - services = append(services, makeRPCOverWebsocket(cfg.WebsocketHost, cfg.WebsocketPort, jsonrpcServer, log)) + services = append(services, makeRPCOverWebsocket(cfg.WebsocketHost, cfg.WebsocketPort, jsonrpcServer, log, cfg.Metrics)) } if cfg.Metrics { database.WithListener(makeDBMetrics()) + jsonrpcServer.WithListener(makeRPCMetrics()) services = append(services, makeMetrics(cfg.MetricsHost, cfg.MetricsPort)) } if cfg.GRPC { From 1a5bf20d6c7d0e5c54e4eae4ef734141ba417c0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20Faruk=20IRMAK?= Date: Fri, 15 Sep 2023 18:27:57 +0300 Subject: [PATCH 3/4] Move metric counting out of sync --- node/metrics.go | 48 ++++++++++++++++++++++ node/node.go | 1 + sync/event_listener.go | 25 ++++++++++++ sync/sync.go | 93 ++++++++++-------------------------------- 4 files changed, 95 insertions(+), 72 deletions(-) create mode 100644 sync/event_listener.go diff --git a/node/metrics.go b/node/metrics.go index 4cece9968e..ac23151fe7 100644 --- a/node/metrics.go +++ b/node/metrics.go @@ -3,8 +3,10 @@ package node import ( "time" + "github.com/NethermindEth/juno/blockchain" "github.com/NethermindEth/juno/db" "github.com/NethermindEth/juno/jsonrpc" + "github.com/NethermindEth/juno/sync" "github.com/prometheus/client_golang/prometheus" ) @@ -90,3 +92,49 @@ func makeRPCMetrics() jsonrpc.EventListener { }, } } + +func makeSyncMetrics(syncReader sync.Reader, bcReader blockchain.Reader) sync.EventListener { + opTimerHistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "sync", + Name: "timers", + }, []string{"op"}) + blockCount := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "sync", + Name: "blocks", + }) + reorgCount := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "sync", + Name: "reorganisations", + }) + chainHeightGauge := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "sync", + Name: "blockchain_height", + }, func() float64 { + height, _ := bcReader.Height() + return float64(height) + }) + bestBlockGauge := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "sync", + Name: "best_known_block_number", + }, func() float64 { + bestHeader := syncReader.HighestBlockHeader() + if bestHeader != nil { + return float64(bestHeader.Number) + } + return 0 + }) + + prometheus.MustRegister(opTimerHistogram, blockCount, chainHeightGauge, bestBlockGauge, reorgCount) + + return &sync.SelectiveListener{ + OnSyncStepDoneCb: func(op string, blockNum uint64, took time.Duration) { + opTimerHistogram.WithLabelValues(op).Observe(took.Seconds()) + if op == sync.OpStore { + blockCount.Inc() + } + }, + OnReorgCb: func(blockNum uint64) { + reorgCount.Inc() + }, + } +} diff --git a/node/node.go b/node/node.go index cd05fcd568..cc3d78a691 100644 --- a/node/node.go +++ b/node/node.go @@ -138,6 +138,7 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo,funlen if cfg.Metrics { database.WithListener(makeDBMetrics()) jsonrpcServer.WithListener(makeRPCMetrics()) + synchronizer.WithListener(makeSyncMetrics(synchronizer, chain)) services = append(services, makeMetrics(cfg.MetricsHost, cfg.MetricsPort)) } if cfg.GRPC { diff --git a/sync/event_listener.go b/sync/event_listener.go new file mode 100644 index 0000000000..31e5f86c07 --- /dev/null +++ b/sync/event_listener.go @@ -0,0 +1,25 @@ +package sync + +import "time" + +type EventListener interface { + OnSyncStepDone(op string, blockNum uint64, took time.Duration) + OnReorg(blockNum uint64) +} + +type SelectiveListener struct { + OnSyncStepDoneCb func(op string, blockNum uint64, took time.Duration) + OnReorgCb func(blockNum uint64) +} + +func (l *SelectiveListener) OnSyncStepDone(op string, blockNum uint64, took time.Duration) { + if l.OnSyncStepDoneCb != nil { + l.OnSyncStepDoneCb(op, blockNum, took) + } +} + +func (l *SelectiveListener) OnReorg(blockNum uint64) { + if l.OnReorgCb != nil { + l.OnReorgCb(blockNum) + } +} diff --git a/sync/sync.go b/sync/sync.go index 9d7b78005e..218464a7c8 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -11,11 +11,9 @@ import ( "github.com/NethermindEth/juno/core" "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/db" - "github.com/NethermindEth/juno/metrics" "github.com/NethermindEth/juno/service" "github.com/NethermindEth/juno/starknetdata" "github.com/NethermindEth/juno/utils" - "github.com/prometheus/client_golang/prometheus" "github.com/sourcegraph/conc/stream" ) @@ -25,9 +23,9 @@ var ( ) const ( - opVerifyLabel = "verify" - opStoreLabel = "store" - opFetchLabel = "fetch" + OpVerify = "verify" + OpStore = "store" + OpFetch = "fetch" ) //go:generate mockgen -destination=../mocks/mock_synchronizer.go -package=mocks -mock_names Reader=MockSyncReader github.com/NethermindEth/juno/sync Reader @@ -43,19 +41,11 @@ type Synchronizer struct { startingBlockNumber *uint64 highestBlockHeader atomic.Pointer[core.Header] - log utils.SimpleLogger + log utils.SimpleLogger + listener EventListener pendingPollInterval time.Duration - - catchUpMode bool - - // metrics - opTimerHistogram *prometheus.HistogramVec - blockCount prometheus.Counter - chainHeightGauge prometheus.Gauge - bestBlockGauge prometheus.Gauge - reorgCount prometheus.Counter - transactionCount prometheus.Counter + catchUpMode bool } func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData, @@ -66,40 +56,14 @@ func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData, starknetData: starkNetData, log: log, pendingPollInterval: pendingPollInterval, - - opTimerHistogram: prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "sync", - Name: "timers", - }, []string{"op"}), - blockCount: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "sync", - Name: "blocks", - }), - chainHeightGauge: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "sync", - Name: "blockchain_height", - }), - bestBlockGauge: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "sync", - Name: "best_known_block_number", - }), - reorgCount: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "sync", - Name: "reorganisations", - }), - transactionCount: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "sync", - Name: "transactions", - }), + listener: &SelectiveListener{}, } - metrics.MustRegister( - s.opTimerHistogram, - s.blockCount, - s.chainHeightGauge, - s.bestBlockGauge, - s.reorgCount, - s.transactionCount, - ) + return s +} + +// WithListener registers an EventListener +func (s *Synchronizer) WithListener(listener EventListener) *Synchronizer { + s.listener = listener return s } @@ -191,9 +155,9 @@ func (s *Synchronizer) fetchUnknownClasses(ctx context.Context, stateUpdate *cor func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stateUpdate *core.StateUpdate, newClasses map[felt.Felt]core.Class, resetStreams context.CancelFunc, ) stream.Callback { - timer := prometheus.NewTimer(s.opTimerHistogram.WithLabelValues(opVerifyLabel)) + verifyTimer := time.Now() commitments, err := s.blockchain.SanityCheckNewHeight(block, stateUpdate, newClasses) - timer.ObserveDuration() + s.listener.OnSyncStepDone(OpVerify, block.Number, time.Since(verifyTimer)) return func() { select { case <-ctx.Done(): @@ -204,9 +168,9 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat resetStreams() return } - timer := prometheus.NewTimer(s.opTimerHistogram.WithLabelValues(opStoreLabel)) + storeTimer := time.Now() err = s.blockchain.Store(block, commitments, stateUpdate, newClasses) - timer.ObserveDuration() + s.listener.OnSyncStepDone(OpStore, block.Number, time.Since(storeTimer)) if err != nil { if errors.Is(err, blockchain.ErrParentDoesNotMatchHead) { @@ -231,14 +195,11 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat } if highestBlockHeader == nil || highestBlockHeader.Number < block.Number { - if s.highestBlockHeader.CompareAndSwap(highestBlockHeader, block.Header) { - s.bestBlockGauge.Set(float64(block.Header.Number)) - } + s.highestBlockHeader.CompareAndSwap(highestBlockHeader, block.Header) } s.log.Infow("Stored Block", "number", block.Number, "hash", block.Hash.ShortString(), "root", block.GlobalStateRoot.ShortString()) - s.updateStats(block) } } } @@ -290,9 +251,9 @@ func (s *Synchronizer) syncBlocks(syncCtx context.Context) { default: curHeight, curStreamCtx, curCancel := nextHeight, streamCtx, streamCancel fetchers.Go(func() stream.Callback { - timer := prometheus.NewTimer(s.opTimerHistogram.WithLabelValues(opFetchLabel)) + fetchTimer := time.Now() cb := s.fetcherTask(curStreamCtx, curHeight, verifiers, curCancel) - timer.ObserveDuration() + s.listener.OnSyncStepDone(OpFetch, curHeight, time.Since(fetchTimer)) return cb }) nextHeight++ @@ -331,7 +292,7 @@ func (s *Synchronizer) revertHead(forkBlock *core.Block) { } else { s.log.Infow("Reverted HEAD", "reverted", localHead) } - s.reorgCount.Inc() + s.listener.OnReorg(head.Number) } func (s *Synchronizer) pollPending(ctx context.Context, sem chan struct{}) { @@ -377,7 +338,6 @@ func (s *Synchronizer) pollLatest(ctx context.Context, sem chan struct{}) { } else { s.highestBlockHeader.Store(highestBlock.Header) } - s.bestBlockGauge.Set(float64(highestBlock.Header.Number)) }() default: } @@ -431,17 +391,6 @@ func (s *Synchronizer) fetchAndStorePending(ctx context.Context) error { }) } -func (s *Synchronizer) updateStats(block *core.Block) { - var ( - transactions = block.TransactionCount - currentHeight = block.Number - ) - - s.blockCount.Inc() - s.chainHeightGauge.Set(float64(currentHeight)) - s.transactionCount.Add(float64(transactions)) -} - func (s *Synchronizer) StartingBlockNumber() (uint64, error) { if s.startingBlockNumber == nil { return 0, errors.New("not running") From c702ab70caaca36a756af3f43888eaf5f1969599 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20Faruk=20IRMAK?= Date: Mon, 18 Sep 2023 12:12:46 +0300 Subject: [PATCH 4/4] Remove metrics package --- metrics/metrics.go | 13 ------------- node/node.go | 3 --- 2 files changed, 16 deletions(-) delete mode 100644 metrics/metrics.go diff --git a/metrics/metrics.go b/metrics/metrics.go deleted file mode 100644 index 1dbd534b74..0000000000 --- a/metrics/metrics.go +++ /dev/null @@ -1,13 +0,0 @@ -package metrics - -import ( - "github.com/prometheus/client_golang/prometheus" -) - -var Enabled = false - -func MustRegister(collectors ...prometheus.Collector) { - if Enabled { - prometheus.MustRegister(collectors...) - } -} diff --git a/node/node.go b/node/node.go index cc3d78a691..12209f7406 100644 --- a/node/node.go +++ b/node/node.go @@ -19,7 +19,6 @@ import ( "github.com/NethermindEth/juno/db/pebble" "github.com/NethermindEth/juno/jsonrpc" "github.com/NethermindEth/juno/l1" - "github.com/NethermindEth/juno/metrics" "github.com/NethermindEth/juno/migration" "github.com/NethermindEth/juno/p2p" "github.com/NethermindEth/juno/rpc" @@ -86,8 +85,6 @@ type Node struct { // New sets the config and logger to the StarknetNode. // Any errors while parsing the config on creating logger will be returned. func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo,funlen - metrics.Enabled = cfg.Metrics - if cfg.DatabasePath == "" { dirPrefix, err := utils.DefaultDataDir() if err != nil {