mvcc: Limit number of bbolt read transactions opened by committed reads
jpbetz committed Feb 13, 2019
1 parent 1baaac0 commit 95567d3
Showing 6 changed files with 146 additions and 32 deletions.
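The heart of the change is a channel-of-channels handshake: each committed read hands the scheduler a reply channel and blocks, and the scheduler decides when, and how many, read transactions to grant. A minimal standalone sketch of that pattern (all names here are illustrative, not etcd code):

package main

import "fmt"

type readTx struct{ id int }

func main() {
	requests := make(chan chan *readTx)

	// Scheduler side: grant at most maxConcurrent transactions, one per blocked requester.
	const maxConcurrent = 10
	go func() {
		for i := 0; i < maxConcurrent; i++ {
			reply := <-requests // block until a reader asks for a tx
			reply <- &readTx{id: i}
		}
	}()

	// Reader side: hand over a reply channel, then block until a tx is granted.
	reply := make(chan *readTx)
	requests <- reply
	fmt.Println("granted read tx", (<-reply).id)
}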
33 changes: 8 additions & 25 deletions mvcc/backend/backend.go
@@ -46,6 +46,10 @@ var (

// minSnapshotWarningTimeout is the minimum threshold to trigger a long running snapshot warning.
minSnapshotWarningTimeout = 30 * time.Second

// maxConcurrentCommittedReadTxs is the maximum number of bbolt read transactions open at any
// time for committed reads. When this limit is hit, committed read transaction requests must wait.
maxConcurrentCommittedReadTxs = uint64(10)
)

type Backend interface {
@@ -103,7 +107,7 @@ type backend struct {

readTx *readTx

concurrentReadTxCh chan chan ReadTx
committedReadScheduler *ReadScheduler

stopc chan struct{}
donec chan struct{}
@@ -177,8 +181,6 @@ func newBackend(bcfg BackendConfig) *backend {
buckets: make(map[string]*bolt.Bucket),
},

concurrentReadTxCh: make(chan chan ReadTx),

stopc: make(chan struct{}),
donec: make(chan struct{}),

@@ -187,6 +189,7 @@ func newBackend(bcfg BackendConfig) *backend {
expensiveReadLimit: bcfg.ExpensiveReadLimit,
}
b.batchTx = newBatchTxBuffered(b)
b.committedReadScheduler = NewReadScheduler(b, maxConcurrentCommittedReadTxs)
go b.run()
return b
}
@@ -201,9 +204,7 @@ func (b *backend) BatchTx() BatchTx {
func (b *backend) ReadTx() ReadTx { return b.readTx }

func (b *backend) CommittedReadTx() ReadTx {
rch := make(chan ReadTx)
b.concurrentReadTxCh <- rch
return <-rch
return b.committedReadScheduler.RequestReadTx()
}

// ForceCommit forces the current batching tx to commit.
@@ -326,25 +327,7 @@ func (b *backend) run() {
batchIntervalSec.Observe(time.Since(start).Seconds())
start = time.Now()
t.Reset(b.batchInterval)
b.createConcurrentReadTxs()
}
}

func (b *backend) createConcurrentReadTxs() {
// do not allow too many concurrent read txs.
// TODO: improve this by having a global pending counter?
for i := 0; i < 100; i++ {
select {
case rch := <-b.concurrentReadTxCh:
rtx, err := b.db.Begin(false)
if err != nil {
plog.Fatalf("cannot begin read tx (%s)", err)
}
rch <- &concurrentReadTx{tx: rtx}
default:
// no more to create.
return
}
b.committedReadScheduler.BeginPendingReads()
}
}
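For context, this is how a caller might consume the new API, assuming the ReadTx interface defined in this package; the helper and bucket name are hypothetical, since the mvcc store code that drives committed reads is not part of this diff:

// rangeCommitted is a hypothetical helper, not part of this commit.
func rangeCommitted(b Backend, start, end []byte, limit int64) ([][]byte, [][]byte) {
	rtx := b.CommittedReadTx() // blocks until the scheduler grants a bbolt read tx
	rtx.Lock()                 // MonitoredReadTx.Lock also increments the open-reads gauge
	defer rtx.Unlock()         // Unlock decrements it again
	return rtx.UnsafeRange([]byte("key"), start, end, limit)
}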

72 changes: 72 additions & 0 deletions mvcc/backend/concurrent_read_scheduler.go
@@ -0,0 +1,72 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package backend

// ReadTxRequest is a channel on which a requested ReadTx is delivered once it becomes available.
type ReadTxRequest = chan ReadTx

// ReadScheduler limits how many bbolt read transactions may be open at once,
// parking committed read requests until capacity frees up.
type ReadScheduler struct {
maxConcurrentReadTxs uint64
readTxCh chan ReadTxRequest
b *backend
pendingReadCounter *GaugedCounter
openReadCounter *GaugedCounter
}

func NewReadScheduler(b *backend, maxConcurrentReadTxs uint64) *ReadScheduler {
return &ReadScheduler{
maxConcurrentReadTxs: maxConcurrentReadTxs,
readTxCh: make(chan ReadTxRequest),
b: b,
pendingReadCounter: &GaugedCounter{0, pendingReadGauge},
openReadCounter: &GaugedCounter{0, openReadGauge},
}
}

// RequestReadTx requests a new ReadTx and blocks until it is available.
func (r *ReadScheduler) RequestReadTx() ReadTx {
rch := make(chan ReadTx)
r.pendingReadCounter.Inc()
defer r.pendingReadCounter.Dec()
r.readTxCh <- rch
return <-rch
}

// BeginPendingReads begins pending read transactions and sends them
// to the channels of all blocked RequestReadTx() callers.
// It ensures no more than maxConcurrentReadTxs are open at the same time.
func (r *ReadScheduler) BeginPendingReads() {
// TODO(jpbetz): Rename. "Pending" already refers to buffered writes not yet flushed.
// do not allow too many concurrent read txs.
// TODO: improve this by having a global pending counter?
// TODO(jpbetz): This has the potential to backlog indefinitely under heavy load.
// If we're going to impose a limit here, we might want to do more to ensure we're
// managing context deadlines and cancellations as well.

// Guard against uint64 underflow if open reads already meet or exceed the limit.
concurrentReadTxs := r.openReadCounter.Value()
if concurrentReadTxs >= r.maxConcurrentReadTxs {
return
}
for i := uint64(0); i < r.maxConcurrentReadTxs-concurrentReadTxs; i++ {
select {
case rch := <-r.readTxCh:
rtx, err := r.b.db.Begin(false)
if err != nil {
plog.Fatalf("cannot begin read tx (%s)", err)
}
rch <- &MonitoredReadTx{r.openReadCounter, &concurrentReadTx{tx: rtx}}
default:
// no more to create.
return
}
}
}
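The two halves of the scheduler meet in the backend: readers park in RequestReadTx, and the batch loop drains them with BeginPendingReads after each commit. A sketch of that wiring, assuming the package's existing time import, with the ticker standing in for b.batchInterval; the function itself is illustrative, not part of this commit:

// runCommittedReads is a hypothetical illustration of the wiring.
func runCommittedReads(b *backend) {
	sched := NewReadScheduler(b, maxConcurrentCommittedReadTxs)

	// Reader side: each committed read blocks here until the next scheduling pass.
	go func() {
		rtx := sched.RequestReadTx()
		rtx.Lock()
		defer rtx.Unlock()
		// ... serve the read ...
	}()

	// Batch side: after each commit, grant whatever capacity remains,
	// i.e. maxConcurrentReadTxs - openReadCounter.Value().
	for range time.Tick(100 * time.Millisecond) {
		sched.BeginPendingReads()
	}
}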
7 changes: 7 additions & 0 deletions mvcc/backend/concurrent_read_tx.go
@@ -36,3 +36,10 @@ func (rt *concurrentReadTx) UnsafeRange(bucketName, key, endKey []byte, limit in
func (rt *concurrentReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
return unsafeForEach(rt.tx, bucketName, visitor)
}

// MonitoredReadTx delegates range and iteration calls to the wrapped ReadTx.
func (m *MonitoredReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
return m.Tx.UnsafeRange(bucketName, key, endKey, limit)
}

func (m *MonitoredReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
return m.Tx.UnsafeForEach(bucketName, visitor)
}
20 changes: 18 additions & 2 deletions mvcc/backend/metrics.go
@@ -87,12 +87,26 @@ var (
batchIntervalSec = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd_debugging",
Subsystem: "backend",
Name: "batch_interval_duration_Seconds",
Name: "batch_interval_duration_seconds",
Help: "The distributions of batch interval in mvcc backend.",

// lowest bucket start of upper bound 0.001 sec (1 msec) with factor 2
// highest bucket start of 1 msec * 2^13 == 8.192 sec
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 14),
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
})

openReadGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd_debugging",
Subsystem: "backend",
Name: "open_reads",
Help: "The number of open bbolt read transactions.",
})

pendingReadGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd_debugging",
Subsystem: "backend",
Name: "pending_reads",
Help: "The number of pending committed reads.",
})
)

Expand All @@ -104,4 +118,6 @@ func init() {
prometheus.MustRegister(defragSec)
prometheus.MustRegister(snapshotTransferSec)
prometheus.MustRegister(batchIntervalSec)
prometheus.MustRegister(openReadGauge)
prometheus.MustRegister(pendingReadGauge)
}
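One way to observe the new gauges in a test is the client_golang testutil helper; this hypothetical test assumes that package is available in the vendored prometheus client:

package backend

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestReadGaugesStartAtZero(t *testing.T) {
	// With no committed reads in flight, both gauges should read zero.
	if open := testutil.ToFloat64(openReadGauge); open != 0 {
		t.Fatalf("expected 0 open reads, got %v", open)
	}
	if pending := testutil.ToFloat64(pendingReadGauge); pending != 0 {
		t.Fatalf("expected 0 pending reads, got %v", pending)
	}
}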
38 changes: 38 additions & 0 deletions mvcc/backend/read_tx.go
@@ -18,7 +18,9 @@ import (
"bytes"
"math"
"sync"
"sync/atomic"

"github.com/prometheus/client_golang/prometheus"
bolt "go.etcd.io/bbolt"
)

@@ -118,3 +120,39 @@ func (rt *readTx) reset() {
rt.buckets = make(map[string]*bolt.Bucket)
rt.tx = nil
}

// MonitoredReadTx increments a GaugedCounter when the transaction is locked and decrements it
// when it is unlocked.
type MonitoredReadTx struct {
Counter *GaugedCounter
Tx ReadTx
}

func (m *MonitoredReadTx) Lock() {
m.Counter.Inc()
m.Tx.Lock()
}
func (m *MonitoredReadTx) Unlock() {
m.Tx.Unlock()
m.Counter.Dec()
}

// GaugedCounter is an atomic counter that also emits a prometheus gauge metric of the count.
type GaugedCounter struct {
count uint64 // atomic uint64
gauge prometheus.Gauge
}

func (c *GaugedCounter) Inc() {
c.gauge.Inc()
atomic.AddUint64(&c.count, 1)
}

func (c *GaugedCounter) Dec() {
c.gauge.Dec()
atomic.AddUint64(&c.count, ^uint64(0))
}

func (c *GaugedCounter) Value() uint64 {
return atomic.LoadUint64(&c.count)
}
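The ^uint64(0) in Dec is the standard atomic-decrement idiom: adding the all-ones word is two's-complement arithmetic for adding -1. A standalone check:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var count uint64
	atomic.AddUint64(&count, 1)            // count == 1
	atomic.AddUint64(&count, ^uint64(0))   // adds 2^64-1, which wraps around to count-1
	fmt.Println(atomic.LoadUint64(&count)) // prints 0
}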
8 changes: 3 additions & 5 deletions mvcc/metrics.go
@@ -226,8 +226,6 @@ var (
Help: "Total number of committed read.",
})



rangeSec = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd_debugging",
Subsystem: "mvcc",
@@ -236,7 +234,7 @@ var (

// lowest bucket start of upper bound 0.001 sec (1 msec) with factor 2
// highest bucket start of 1 msec * 2^13 == 8.192 sec
Buckets: prometheus.ExponentialBuckets(0.00001, 2, 14),
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
})

putSec = prometheus.NewHistogram(prometheus.HistogramOpts{
@@ -247,7 +245,7 @@ var (

// lowest bucket start of upper bound 0.001 sec (1 msec) with factor 2
// highest bucket start of 1 msec * 2^13 == 8.192 sec
Buckets: prometheus.ExponentialBuckets(0.00001, 2, 14),
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
})

committedReadSec = prometheus.NewHistogram(prometheus.HistogramOpts{
@@ -258,7 +256,7 @@ var (

// lowest bucket start of upper bound 0.01 sec (10 msec) with factor 2
// highest bucket start of 10 msec * 2^15 == 327.68 sec
Buckets: prometheus.ExponentialBuckets(0.00001, 2, 16),
Buckets: prometheus.ExponentialBuckets(0.01, 2, 16),
})
)
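To sanity-check the bucket comments above: prometheus.ExponentialBuckets(start, factor, count) returns count upper bounds start, start*factor, ..., start*factor^(count-1), so the first and last bounds can be printed directly:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	b := prometheus.ExponentialBuckets(0.01, 2, 16)
	fmt.Println(b[0], b[len(b)-1]) // 0.01 and 0.01 * 2^15 (~327.68)
}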
