Skip to content

Commit

Permalink
mvcc: add metrics to characterize large reads
Browse files Browse the repository at this point in the history
  • Loading branch information
jingyih committed Feb 12, 2019
1 parent 7e2c528 commit 676d2dd
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 21 deletions.
3 changes: 3 additions & 0 deletions mvcc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ func (b *backend) SizeInUse() int64 {
func (b *backend) run() {
defer close(b.donec)
t := time.NewTimer(b.batchInterval)
start := time.Now()
defer t.Stop()
for {
select {
Expand All @@ -315,6 +316,8 @@ func (b *backend) run() {
if b.batchTx.safePending() != 0 {
b.batchTx.Commit()
}
batchIntervalSec.Observe(time.Since(start).Seconds() * 1e3)
start = time.Now()
t.Reset(b.batchInterval)
b.createConcurrentReadTxs()
}
Expand Down
12 changes: 12 additions & 0 deletions mvcc/backend/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ var (
// highest bucket start of 0.01 sec * 2^16 == 655.36 sec
Buckets: prometheus.ExponentialBuckets(.01, 2, 17),
})

batchIntervalSec = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd_debugging",
Subsystem: "backend",
Name: "batch_interval_duration_milliseconds",
Help: "The distributions of batch interval in mvcc backend.",

// lowest bucket start of upper bound 0.1 msec (10 us) with factor 2
// highest bucket start of 0.1 msec * 2^13 == 819.2 msec
Buckets: prometheus.ExponentialBuckets(0.1, 2, 14),
})
)

func init() {
Expand All @@ -92,4 +103,5 @@ func init() {
prometheus.MustRegister(writeSec)
prometheus.MustRegister(defragSec)
prometheus.MustRegister(snapshotTransferSec)
prometheus.MustRegister(batchIntervalSec)
}
1 change: 1 addition & 0 deletions mvcc/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type ReadView interface {
type TxnRead interface {
ReadView
// End marks the transaction is complete and ready to commit.
IsCommittedRead() bool
End()
}

Expand Down
31 changes: 18 additions & 13 deletions mvcc/kvstore_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ type storeTxnRead struct {
// for creating concurrent read tx when the read is expensive.
b backend.Backend
// is the transcation readonly?
ro bool
ro bool
isCommittedRead bool

firstRev int64
rev int64
Expand All @@ -48,16 +49,18 @@ func (s *store) Read() TxnRead {
firstRev, rev := s.compactMainRev, s.currentRev
s.revMu.RUnlock()
return newMetricsTxnRead(&storeTxnRead{
s: s,
tx: tx,
b: s.b,
ro: readonly,
firstRev: firstRev,
rev: rev})
s: s,
tx: tx,
b: s.b,
ro: readonly,
isCommittedRead: false,
firstRev: firstRev,
rev: rev})
}

func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
func (tr *storeTxnRead) Rev() int64 { return tr.rev }
func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
func (tr *storeTxnRead) Rev() int64 { return tr.rev }
func (tr *storeTxnRead) IsCommittedRead() bool { return tr.isCommittedRead }

func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
return tr.rangeKeys(key, end, tr.Rev(), ro)
Expand All @@ -84,10 +87,11 @@ func (s *store) Write() TxnWrite {
tx.Lock()
tw := &storeTxnWrite{
storeTxnRead: storeTxnRead{
s: s,
txlocked: true,
tx: tx,
ro: readwrite,
s: s,
txlocked: true,
tx: tx,
ro: readwrite,
isCommittedRead: false,
},
tx: tx,
beginRev: s.currentRev,
Expand Down Expand Up @@ -161,6 +165,7 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
if limit > expensiveReadLimit && !tr.txlocked && tr.ro { // first expensive read in a read only transcation
// too many keys to range. upgrade the read transcation to concurrent read tx.
tr.tx = tr.b.CommittedReadTx()
tr.isCommittedRead = true
}
if !tr.txlocked {
tr.tx.Lock()
Expand Down
47 changes: 47 additions & 0 deletions mvcc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,49 @@ var (
// highest bucket start of 0.01 sec * 2^14 == 163.84 sec
Buckets: prometheus.ExponentialBuckets(.01, 2, 15),
})

committedReadCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
Name: "committed_read_total",
Help: "Total number of committed read.",
})



rangeSec = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
Name: "range_duration_milliseconds",
Help: "The latency distributions of range in mvcc store.",

// lowest bucket start of upper bound 0.01 msec (10 us) with factor 2
// highest bucket start of 0.01 msec * 2^13 == 8.192 sec
Buckets: prometheus.ExponentialBuckets(0.01, 2, 14),
})

putSec = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
Name: "put_duration_milliseconds",
Help: "The latency distributions of put in mvcc store.",

// lowest bucket start of upper bound 0.01 msec (10 us) with factor 2
// highest bucket start of 0.01 msec * 2^13 == 81.92 msec
Buckets: prometheus.ExponentialBuckets(0.01, 2, 14),
})

committedReadSec = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
Name: "committed_read_duration_milliseconds",
Help: "The latency distributions of committed read in mvcc store.",

// lowest bucket start of upper bound 0.01 msec (10 us) with factor 2
// highest bucket start of 0.01 msec * 2^15 == 327.68 msec
Buckets: prometheus.ExponentialBuckets(0.01, 2, 16),
})
)

func init() {
Expand All @@ -239,6 +282,10 @@ func init() {
prometheus.MustRegister(dbTotalSizeInUse)
prometheus.MustRegister(hashSec)
prometheus.MustRegister(hashRevSec)
prometheus.MustRegister(committedReadCounter)
prometheus.MustRegister(rangeSec)
prometheus.MustRegister(putSec)
prometheus.MustRegister(committedReadSec)
}

// ReportEventReceived reports that an event is received.
Expand Down
41 changes: 33 additions & 8 deletions mvcc/metrics_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,37 @@

package mvcc

import "go.etcd.io/etcd/lease"
import (
"go.etcd.io/etcd/lease"
"time"
"fmt"
)

type metricsTxnWrite struct {
TxnWrite
ranges uint
puts uint
deletes uint
ranges uint
puts uint
committedReads uint
deletes uint
rangeStart time.Time
putStart time.Time
}

func newMetricsTxnRead(tr TxnRead) TxnRead {
return &metricsTxnWrite{&txnReadWrite{tr}, 0, 0, 0}
txn := &metricsTxnWrite{}
txn.TxnWrite = &txnReadWrite{tr}
return txn
}

func newMetricsTxnWrite(tw TxnWrite) TxnWrite {
return &metricsTxnWrite{tw, 0, 0, 0}
txn := &metricsTxnWrite{}
txn.TxnWrite = tw
return txn
}

func (tw *metricsTxnWrite) Range(key, end []byte, ro RangeOptions) (*RangeResult, error) {
tw.ranges++
tw.rangeStart = time.Now()
return tw.TxnWrite.Range(key, end, ro)
}

Expand All @@ -43,6 +55,7 @@ func (tw *metricsTxnWrite) DeleteRange(key, end []byte) (n, rev int64) {

func (tw *metricsTxnWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
tw.puts++
tw.putStart = time.Now()
return tw.TxnWrite.Put(key, value, lease)
}

Expand All @@ -51,7 +64,19 @@ func (tw *metricsTxnWrite) End() {
if sum := tw.ranges + tw.puts + tw.deletes; sum > 1 {
txnCounter.Inc()
}
rangeCounter.Add(float64(tw.ranges))
putCounter.Add(float64(tw.puts))
deleteCounter.Add(float64(tw.deletes))
if tw.puts > 0 {
putCounter.Add(float64(tw.puts))
putSec.Observe(time.Since(tw.putStart).Seconds() * 1e3)
}
if tw.ranges > 0 {
// cannot determine if the read transaction is a committed read at the beginning of the transaction
if tw.TxnWrite.IsCommittedRead() {
committedReadSec.Observe(time.Since(tw.rangeStart).Seconds() * 1e3)
committedReadCounter.Inc()
} else {
rangeSec.Observe(time.Since(tw.rangeStart).Seconds() * 1e3)
rangeCounter.Add(float64(tw.ranges))
}
}
}

0 comments on commit 676d2dd

Please sign in to comment.