op-supervisor: Create logdb for each chain #11043

Merged (1 commit, Jun 28, 2024)

op-supervisor: Create logdb for each chain.
ajsutton committed Jun 28, 2024
commit cd143707939af05a9ba97f26d22f0594fe75bd94
66 changes: 50 additions & 16 deletions op-supervisor/metrics/metrics.go
@@ -19,6 +19,9 @@ type Metricer interface {
 	CacheAdd(chainID *big.Int, label string, cacheSize int, evicted bool)
 	CacheGet(chainID *big.Int, label string, hit bool)
 
+	RecordDBEntryCount(chainID *big.Int, count int64)
+	RecordDBSearchEntriesRead(chainID *big.Int, count int64)
+
 	Document() []opmetrics.DocumentedMetric
 }

@@ -29,9 +32,12 @@ type Metrics struct {
 
 	opmetrics.RPCMetrics
 
-	SizeVec *prometheus.GaugeVec
-	GetVec  *prometheus.CounterVec
-	AddVec  *prometheus.CounterVec
+	CacheSizeVec *prometheus.GaugeVec
+	CacheGetVec  *prometheus.CounterVec
+	CacheAddVec  *prometheus.CounterVec
+
+	DBEntryCountVec        *prometheus.GaugeVec
+	DBSearchEntriesReadVec *prometheus.HistogramVec
 
 	info prometheus.GaugeVec
 	up   prometheus.Gauge
@@ -71,32 +77,48 @@ func NewMetrics(procName string) *Metrics {
 			Help: "1 if the op-supervisor has finished starting up",
 		}),
 
-		SizeVec: factory.NewGaugeVec(prometheus.GaugeOpts{
+		CacheSizeVec: factory.NewGaugeVec(prometheus.GaugeOpts{
 			Namespace: ns,
 			Name:      "source_rpc_cache_size",
-			Help:      "source rpc cache cache size",
+			Help:      "Source rpc cache cache size",
 		}, []string{
 			"chain",
 			"type",
 		}),
-		GetVec: factory.NewCounterVec(prometheus.CounterOpts{
+		CacheGetVec: factory.NewCounterVec(prometheus.CounterOpts{
 			Namespace: ns,
 			Name:      "source_rpc_cache_get",
-			Help:      "source rpc cache lookups, hitting or not",
+			Help:      "Source rpc cache lookups, hitting or not",
 		}, []string{
 			"chain",
 			"type",
 			"hit",
 		}),
-		AddVec: factory.NewCounterVec(prometheus.CounterOpts{
+		CacheAddVec: factory.NewCounterVec(prometheus.CounterOpts{
 			Namespace: ns,
 			Name:      "source_rpc_cache_add",
-			Help:      "source rpc cache additions, evicting previous values or not",
+			Help:      "Source rpc cache additions, evicting previous values or not",
 		}, []string{
 			"chain",
 			"type",
 			"evicted",
 		}),
+
+		DBEntryCountVec: factory.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: ns,
+			Name:      "logdb_entries_current",
+			Help:      "Current number of entries in the log database by chain ID",
+		}, []string{
+			"chain",
+		}),
+		DBSearchEntriesReadVec: factory.NewHistogramVec(prometheus.HistogramOpts{
+			Namespace: ns,
+			Name:      "logdb_search_entries_read",
+			Help:      "Entries read per search of the log database",
+			Buckets:   []float64{1, 2, 5, 10, 100, 200, 256},
+		}, []string{
+			"chain",
+		}),
 	}
 }

@@ -120,20 +142,32 @@ func (m *Metrics) RecordUp() {
 }
 
 func (m *Metrics) CacheAdd(chainID *big.Int, label string, cacheSize int, evicted bool) {
-	chain := chainID.String()
-	m.SizeVec.WithLabelValues(chain, label).Set(float64(cacheSize))
+	chain := chainIDLabel(chainID)
+	m.CacheSizeVec.WithLabelValues(chain, label).Set(float64(cacheSize))
 	if evicted {
-		m.AddVec.WithLabelValues(chain, label, "true").Inc()
+		m.CacheAddVec.WithLabelValues(chain, label, "true").Inc()
 	} else {
-		m.AddVec.WithLabelValues(chain, label, "false").Inc()
+		m.CacheAddVec.WithLabelValues(chain, label, "false").Inc()
 	}
 }
 
 func (m *Metrics) CacheGet(chainID *big.Int, label string, hit bool) {
-	chain := chainID.String()
+	chain := chainIDLabel(chainID)
 	if hit {
-		m.GetVec.WithLabelValues(chain, label, "true").Inc()
+		m.CacheGetVec.WithLabelValues(chain, label, "true").Inc()
 	} else {
-		m.GetVec.WithLabelValues(chain, label, "false").Inc()
+		m.CacheGetVec.WithLabelValues(chain, label, "false").Inc()
 	}
 }
+
+func (m *Metrics) RecordDBEntryCount(chainID *big.Int, count int64) {
+	m.DBEntryCountVec.WithLabelValues(chainIDLabel(chainID)).Set(float64(count))
+}
+
+func (m *Metrics) RecordDBSearchEntriesRead(chainID *big.Int, count int64) {
+	m.DBSearchEntriesReadVec.WithLabelValues(chainIDLabel(chainID)).Observe(float64(count))
+}
+
+func chainIDLabel(chainID *big.Int) string {
+	return chainID.Text(10)
+}
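
For context, a minimal sketch of how the two new per-chain series surface when scraped. The op_supervisor namespace prefix is an assumption (the ns value is outside this diff); the label rendering comes directly from chainIDLabel above.

package main

import (
	"fmt"
	"math/big"
)

// Mirrors chainIDLabel above: base-10 text gives a stable, human-readable
// label even for chain IDs too large for uint64.
func chainIDLabel(chainID *big.Int) string {
	return chainID.Text(10)
}

func main() {
	id, _ := new(big.Int).SetString("42984928492928428424243444", 10)
	// Assumed namespace "op_supervisor"; example scrape output:
	fmt.Printf("op_supervisor_logdb_entries_current{chain=%q} 1024\n", chainIDLabel(id))
	fmt.Printf("op_supervisor_logdb_search_entries_read_bucket{chain=%q,le=\"256\"} 7\n", chainIDLabel(id))
}
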
3 changes: 3 additions & 0 deletions op-supervisor/metrics/noop.go
@@ -19,3 +19,6 @@ func (*noopMetrics) RecordUp() {}
 
 func (m *noopMetrics) CacheAdd(_ *big.Int, _ string, _ int, _ bool) {}
 func (m *noopMetrics) CacheGet(_ *big.Int, _ string, _ bool)        {}
+
+func (m *noopMetrics) RecordDBEntryCount(_ *big.Int, _ int64)        {}
+func (m *noopMetrics) RecordDBSearchEntriesRead(_ *big.Int, _ int64) {}
55 changes: 42 additions & 13 deletions op-supervisor/supervisor/backend/backend.go
@@ -5,30 +5,28 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"math/big"
 	"sync/atomic"
 	"time"
 
+	"github.com/ethereum-optimism/optimism/op-service/client"
+	"github.com/ethereum-optimism/optimism/op-service/dial"
 	"github.com/ethereum-optimism/optimism/op-supervisor/config"
+	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
 	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/source"
+	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
+	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/common/hexutil"
 	"github.com/ethereum/go-ethereum/log"
-
-	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
-	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
 )
 
-type Metrics interface {
-	source.Metrics
-}
-
 type SupervisorBackend struct {
 	started atomic.Bool
 	logger  log.Logger
 
 	chainMonitors []*source.ChainMonitor
-
-	// TODO(protocol-quest#287): collection of logdbs per chain
-	// TODO(protocol-quest#288): collection of logdb updating services per chain
+	logDBs []*db.DB
 }
 
 var _ frontend.Backend = (*SupervisorBackend)(nil)
@@ -37,8 +35,23 @@ var _ io.Closer = (*SupervisorBackend)(nil)
 
 func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) {
 	chainMonitors := make([]*source.ChainMonitor, len(cfg.L2RPCs))
+	logDBs := make([]*db.DB, len(cfg.L2RPCs))
 	for i, rpc := range cfg.L2RPCs {
-		monitor, err := source.NewChainMonitor(ctx, logger, m, rpc)
+		rpcClient, chainID, err := createRpcClient(ctx, logger, rpc)
+		if err != nil {
+			return nil, err
+		}
+		cm := newChainMetrics(chainID, m)
+		path, err := prepLogDBPath(chainID, cfg.Datadir)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create datadir for chain %v: %w", chainID, err)
+		}
+		logDB, err := db.NewFromFile(logger, cm, path)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create logdb for chain %v at %v: %w", chainID, path, err)
+		}
+		logDBs[i] = logDB
+		monitor, err := source.NewChainMonitor(ctx, logger, cm, chainID, rpc, rpcClient)
 		if err != nil {
 			return nil, fmt.Errorf("failed to create monitor for rpc %v: %w", rpc, err)
 		}
@@ -47,14 +60,26 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) {
 	return &SupervisorBackend{
 		logger:        logger,
 		chainMonitors: chainMonitors,
+		logDBs:        logDBs,
 	}, nil
 }
 
+func createRpcClient(ctx context.Context, logger log.Logger, rpc string) (client.RPC, *big.Int, error) {
+	ethClient, err := dial.DialEthClientWithTimeout(ctx, 10*time.Second, logger, rpc)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to connect to rpc %v: %w", rpc, err)
+	}
+	chainID, err := ethClient.ChainID(ctx)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to load chain id for rpc %v: %w", rpc, err)
+	}
+	return client.NewBaseRPCClient(ethClient.Client()), chainID, nil
+}
+
 func (su *SupervisorBackend) Start(ctx context.Context) error {
 	if !su.started.CompareAndSwap(false, true) {
 		return errors.New("already started")
 	}
+	// TODO(protocol-quest#288): start logdb updating services of all chains
 	for _, monitor := range su.chainMonitors {
 		if err := monitor.Start(); err != nil {
 			return fmt.Errorf("failed to start chain monitor: %w", err)
@@ -67,13 +92,17 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error {
 	if !su.started.CompareAndSwap(true, false) {
 		return errors.New("already stopped")
 	}
+	// TODO(protocol-quest#288): stop logdb updating services of all chains
 	var errs error
 	for _, monitor := range su.chainMonitors {
 		if err := monitor.Stop(); err != nil {
 			errs = errors.Join(errs, fmt.Errorf("failed to stop chain monitor: %w", err))
 		}
 	}
+	for _, logDB := range su.logDBs {
+		if err := logDB.Close(); err != nil {
+			errs = errors.Join(errs, fmt.Errorf("failed to close logdb: %w", err))
+		}
+	}
 	return errs
 }
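
To make the new wiring concrete, a hypothetical caller written as if it lived in the backend package; the endpoints and datadir are made up, but the config fields (L2RPCs, Datadir) and the constructor signature match the diff above.

func runSupervisor(ctx context.Context, logger log.Logger, m Metrics) error {
	cfg := &config.Config{
		L2RPCs:  []string{"ws://op-node-a:8546", "ws://op-node-b:8546"}, // made-up endpoints
		Datadir: "/var/op-supervisor",                                   // gains one <chainID>/log.db per RPC
	}
	be, err := NewSupervisorBackend(ctx, logger, m, cfg)
	if err != nil {
		return err
	}
	if err := be.Start(ctx); err != nil { // starts each chain monitor
		return err
	}
	defer be.Stop(ctx) // stops monitors and closes every log DB
	// ... serve requests until shutdown ...
	return nil
}
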
@@ -1,11 +1,20 @@
-package source
+package backend
 
 import (
 	"math/big"
 
 	"github.com/ethereum-optimism/optimism/op-service/sources/caching"
+	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
 )
 
+type Metrics interface {
+	CacheAdd(chainID *big.Int, label string, cacheSize int, evicted bool)
+	CacheGet(chainID *big.Int, label string, hit bool)
+
+	RecordDBEntryCount(chainID *big.Int, count int64)
+	RecordDBSearchEntriesRead(chainID *big.Int, count int64)
+}
+
 // chainMetrics is an adapter between the metrics API expected by clients that assume there's only a single chain
 // and the actual metrics implementation which requires a chain ID to identify the source chain.
 type chainMetrics struct {
@@ -28,4 +37,13 @@ func (c *chainMetrics) CacheGet(label string, hit bool) {
 	c.delegate.CacheGet(c.chainID, label, hit)
 }
 
+func (c *chainMetrics) RecordDBEntryCount(count int64) {
+	c.delegate.RecordDBEntryCount(c.chainID, count)
+}
+
+func (c *chainMetrics) RecordDBSearchEntriesRead(count int64) {
+	c.delegate.RecordDBSearchEntriesRead(c.chainID, count)
+}
+
 var _ caching.Metrics = (*chainMetrics)(nil)
+var _ db.Metrics = (*chainMetrics)(nil)
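
The adapter's constructor is not part of the visible hunks (only its call site, newChainMetrics(chainID, m), appears in backend.go above), so the sketch below of how it binds a chain ID is an assumption based on the fields the methods dereference.

// Assumed constructor shape: capture the chain ID once, delegate everything.
func newChainMetrics(chainID *big.Int, delegate Metrics) *chainMetrics {
	return &chainMetrics{chainID: chainID, delegate: delegate}
}

// Usage: db.NewFromFile receives only the single-chain db.Metrics view,
// so each chain gets its own adapter:
//   cm := newChainMetrics(big.NewInt(10), m)
//   cm.RecordDBEntryCount(42) // forwards as m.RecordDBEntryCount(chainID, 42)
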
8 changes: 4 additions & 4 deletions op-supervisor/supervisor/backend/db/db.go
@@ -28,8 +28,8 @@ const (
 )
 
 type Metrics interface {
-	RecordEntryCount(count int64)
-	RecordSearchEntriesRead(count int64)
+	RecordDBEntryCount(count int64)
+	RecordDBSearchEntriesRead(count int64)
 }
 
 type logContext struct {
@@ -166,7 +166,7 @@ func (db *DB) trimInvalidTrailingEntries() error {
 }
 
 func (db *DB) updateEntryCountMetric() {
-	db.m.RecordEntryCount(db.lastEntryIdx() + 1)
+	db.m.RecordDBEntryCount(db.lastEntryIdx() + 1)
 }
 
 // ClosestBlockInfo returns the block number and hash of the highest recorded block at or before blockNum.
@@ -244,7 +244,7 @@ func (db *DB) findLogInfo(blockNum uint64, logIdx uint32) (TruncatedHash, *iterator, error) {
 	}
 	db.log.Trace("Starting search", "entry", entryIdx, "blockNum", i.current.blockNum, "logIdx", i.current.logIdx)
 	defer func() {
-		db.m.RecordSearchEntriesRead(i.entriesRead)
+		db.m.RecordDBSearchEntriesRead(i.entriesRead)
 	}()
 	for {
 		evtBlockNum, evtLogIdx, evtHash, err := i.NextLog()
4 changes: 2 additions & 2 deletions op-supervisor/supervisor/backend/db/db_test.go
@@ -881,11 +881,11 @@ type stubMetrics struct {
 	entriesReadForSearch int64
 }
 
-func (s *stubMetrics) RecordEntryCount(count int64) {
+func (s *stubMetrics) RecordDBEntryCount(count int64) {
 	s.entryCount = count
 }
 
-func (s *stubMetrics) RecordSearchEntriesRead(count int64) {
+func (s *stubMetrics) RecordDBSearchEntriesRead(count int64) {
 	s.entriesReadForSearch = count
 }
24 changes: 24 additions & 0 deletions op-supervisor/supervisor/backend/file_layout.go
@@ -0,0 +1,24 @@
+package backend
+
+import (
+	"fmt"
+	"math/big"
+	"os"
+	"path/filepath"
+)
+
+func prepLogDBPath(chainID *big.Int, datadir string) (string, error) {
+	dir, err := prepChainDir(chainID, datadir)
+	if err != nil {
+		return "", err
+	}
+	return filepath.Join(dir, "log.db"), nil
+}
+
+func prepChainDir(chainID *big.Int, datadir string) (string, error) {
+	dir := filepath.Join(datadir, chainID.Text(10))
+	if err := os.MkdirAll(dir, 0755); err != nil {
+		return "", fmt.Errorf("failed to create chain directory %v: %w", dir, err)
+	}
+	return dir, nil
+}
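
A usage sketch with a made-up datadir: prepLogDBPath both creates the per-chain directory and returns the DB file path inside it.

func exampleLayout() error {
	// Creates "/var/op-supervisor/10" (mode 0755) if it does not exist yet.
	path, err := prepLogDBPath(big.NewInt(10), "/var/op-supervisor") // made-up datadir
	if err != nil {
		return err
	}
	_ = path // "/var/op-supervisor/10/log.db"
	return nil
}

// With several chains the datadir ends up looking like:
//   /var/op-supervisor/
//     10/log.db
//     8453/log.db
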
28 changes: 28 additions & 0 deletions op-supervisor/supervisor/backend/file_layout_test.go
@@ -0,0 +1,28 @@
+package backend
+
+import (
+	"math/big"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestLogDBPath(t *testing.T) {
+	base := t.TempDir()
+	chainIDStr := "42984928492928428424243444"
+	chainID, ok := new(big.Int).SetString(chainIDStr, 10)
+	require.True(t, ok)
+	expected := filepath.Join(base, "subdir", chainIDStr, "log.db")
+	path, err := prepLogDBPath(chainID, filepath.Join(base, "subdir"))
+	require.NoError(t, err)
+	require.Equal(t, expected, path)
+
+	// Check it still works when directories exist
+	require.NoError(t, os.WriteFile(path, []byte("test"), 0o644))
+
+	path, err = prepLogDBPath(chainID, filepath.Join(base, "subdir"))
+	require.NoError(t, err)
+	require.Equal(t, expected, path)
+}