diff --git a/integration/metrics_test.go b/integration/metrics_test.go index 19547102bfa..7dcac71e00a 100644 --- a/integration/metrics_test.go +++ b/integration/metrics_test.go @@ -16,6 +16,7 @@ package integration import ( "context" + "fmt" "net/http" "strconv" "testing" @@ -103,22 +104,39 @@ func testMetricDbSizeDefrag(t *testing.T, name string) { t.Fatal(kerr) } - // Put to move PendingPages to FreePages - if _, err = kvc.Put(context.TODO(), putreq); err != nil { - t.Fatal(err) - } - time.Sleep(500 * time.Millisecond) + validateAfterCompactionInUse := func() error { + // Put to move PendingPages to FreePages + if _, err = kvc.Put(context.TODO(), putreq); err != nil { + t.Fatal(err) + } + time.Sleep(500 * time.Millisecond) - afterCompactionInUse, err := clus.Members[0].Metric("etcd_mvcc_db_total_size_in_use_in_bytes") - if err != nil { - t.Fatal(err) - } - aciu, err := strconv.Atoi(afterCompactionInUse) - if err != nil { - t.Fatal(err) + afterCompactionInUse, err := clus.Members[0].Metric("etcd_mvcc_db_total_size_in_use_in_bytes") + if err != nil { + t.Fatal(err) + } + aciu, err := strconv.Atoi(afterCompactionInUse) + if err != nil { + t.Fatal(err) + } + if biu <= aciu { + return fmt.Errorf("expected less than %d, got %d after compaction", biu, aciu) + } + return nil } - if biu <= aciu { - t.Fatalf("expected less than %d, got %d after compaction", biu, aciu) + + // backend rollbacks read transaction asynchronously (PR #10523), + // which causes the result to be flaky. Retry 3 times. + maxRetry, retry := 3, 0 + for { + err := validateAfterCompactionInUse() + if err == nil { + break + } + retry++ + if retry >= maxRetry { + t.Fatalf(err.Error()) + } } // defrag should give freed space back to fs diff --git a/mvcc/backend/backend.go b/mvcc/backend/backend.go index 0d3cd87ec4c..bffd74950b4 100644 --- a/mvcc/backend/backend.go +++ b/mvcc/backend/backend.go @@ -49,8 +49,11 @@ var ( ) type Backend interface { + // ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523. ReadTx() ReadTx BatchTx() BatchTx + // ConcurrentReadTx returns a non-blocking read transaction. + ConcurrentReadTx() ReadTx Snapshot() Snapshot Hash(ignores map[IgnoreKey]struct{}) (uint32, error) @@ -63,6 +66,8 @@ type Backend interface { // Since the backend can manage free space in a non-byte unit such as // number of pages, the returned value can be not exactly accurate in bytes. SizeInUse() int64 + // OpenReadTxN returns the number of currently open read transactions in the backend. + OpenReadTxN() int64 Defrag() error ForceCommit() Close() error @@ -87,6 +92,8 @@ type backend struct { sizeInUse int64 // commits counts number of commits since start commits int64 + // openReadTxN is the number of currently open read transactions in the backend + openReadTxN int64 mu sync.RWMutex db *bolt.DB @@ -166,6 +173,7 @@ func newBackend(bcfg BackendConfig) *backend { txBuffer: txBuffer{make(map[string]*bucketBuffer)}, }, buckets: make(map[string]*bolt.Bucket), + txWg: new(sync.WaitGroup), }, stopc: make(chan struct{}), @@ -187,6 +195,24 @@ func (b *backend) BatchTx() BatchTx { func (b *backend) ReadTx() ReadTx { return b.readTx } +// ConcurrentReadTx creates and returns a new ReadTx, which: +// A) creates and keeps a copy of backend.readTx.txReadBuffer, +// B) references the boltdb read Tx (and its bucket cache) of current batch interval. 
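+// The buffer copy gives the caller a stable view of writes that have not yet
+// been flushed to boltdb, while the shared boltdb read Tx serves already
+// committed keys. Because the returned ReadTx holds no lock on readTx after
+// creation, long-running reads no longer block writes for their whole duration.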
+func (b *backend) ConcurrentReadTx() ReadTx { + b.readTx.RLock() + defer b.readTx.RUnlock() + // prevent boltdb read Tx from been rolled back until store read Tx is done. Needs to be called when holding readTx.RLock(). + b.readTx.txWg.Add(1) + // TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval. + return &concurrentReadTx{ + buf: b.readTx.buf.unsafeCopy(), + tx: b.readTx.tx, + txMu: &b.readTx.txMu, + buckets: b.readTx.buckets, + txWg: b.readTx.txWg, + } +} + // ForceCommit forces the current batching tx to commit. func (b *backend) ForceCommit() { b.batchTx.Commit() @@ -491,8 +517,10 @@ func (b *backend) begin(write bool) *bolt.Tx { size := tx.Size() db := tx.DB() + stats := db.Stats() atomic.StoreInt64(&b.size, size) - atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize))) + atomic.StoreInt64(&b.sizeInUse, size-(int64(stats.FreePageN)*int64(db.Info().PageSize))) + atomic.StoreInt64(&b.openReadTxN, int64(stats.OpenTxN)) return tx } @@ -509,6 +537,10 @@ func (b *backend) unsafeBegin(write bool) *bolt.Tx { return tx } +func (b *backend) OpenReadTxN() int64 { + return atomic.LoadInt64(&b.openReadTxN) +} + // NewTmpBackend creates a backend implementation for testing. func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) { dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test") diff --git a/mvcc/backend/backend_test.go b/mvcc/backend/backend_test.go index 69bd2142368..d8fc2ec6422 100644 --- a/mvcc/backend/backend_test.go +++ b/mvcc/backend/backend_test.go @@ -250,6 +250,35 @@ func TestBackendWriteback(t *testing.T) { } } +// TestConcurrentReadTx ensures that current read transaction can see all prior writes stored in read buffer +func TestConcurrentReadTx(t *testing.T) { + b, tmpPath := NewTmpBackend(time.Hour, 10000) + defer cleanup(b, tmpPath) + + wtx1 := b.BatchTx() + wtx1.Lock() + wtx1.UnsafeCreateBucket([]byte("key")) + wtx1.UnsafePut([]byte("key"), []byte("abc"), []byte("ABC")) + wtx1.UnsafePut([]byte("key"), []byte("overwrite"), []byte("1")) + wtx1.Unlock() + + wtx2 := b.BatchTx() + wtx2.Lock() + wtx2.UnsafePut([]byte("key"), []byte("def"), []byte("DEF")) + wtx2.UnsafePut([]byte("key"), []byte("overwrite"), []byte("2")) + wtx2.Unlock() + + rtx := b.ConcurrentReadTx() + rtx.RLock() // no-op + k, v := rtx.UnsafeRange([]byte("key"), []byte("abc"), []byte("\xff"), 0) + rtx.RUnlock() + wKey := [][]byte{[]byte("abc"), []byte("def"), []byte("overwrite")} + wVal := [][]byte{[]byte("ABC"), []byte("DEF"), []byte("2")} + if !reflect.DeepEqual(wKey, k) || !reflect.DeepEqual(wVal, v) { + t.Errorf("want k=%+v, v=%+v; got k=%+v, v=%+v", wKey, wVal, k, v) + } +} + // TestBackendWritebackForEach checks that partially written / buffered // data is visited in the same order as fully committed data. 
func TestBackendWritebackForEach(t *testing.T) { diff --git a/mvcc/backend/batch_tx.go b/mvcc/backend/batch_tx.go index 77d0648b8c4..d5c8a88c353 100644 --- a/mvcc/backend/batch_tx.go +++ b/mvcc/backend/batch_tx.go @@ -306,13 +306,18 @@ func (t *batchTxBuffered) commit(stop bool) { func (t *batchTxBuffered) unsafeCommit(stop bool) { if t.backend.readTx.tx != nil { - if err := t.backend.readTx.tx.Rollback(); err != nil { - if t.backend.lg != nil { - t.backend.lg.Fatal("failed to rollback tx", zap.Error(err)) - } else { - plog.Fatalf("cannot rollback tx (%s)", err) + // wait all store read transactions using the current boltdb tx to finish, + // then close the boltdb tx + go func(tx *bolt.Tx, wg *sync.WaitGroup) { + wg.Wait() + if err := tx.Rollback(); err != nil { + if t.backend.lg != nil { + t.backend.lg.Fatal("failed to rollback tx", zap.Error(err)) + } else { + plog.Fatalf("cannot rollback tx (%s)", err) + } } - } + }(t.backend.readTx.tx, t.backend.readTx.txWg) t.backend.readTx.reset() } diff --git a/mvcc/backend/read_tx.go b/mvcc/backend/read_tx.go index 7b8d855eb76..91fe72ec558 100644 --- a/mvcc/backend/read_tx.go +++ b/mvcc/backend/read_tx.go @@ -42,10 +42,13 @@ type readTx struct { mu sync.RWMutex buf txReadBuffer - // txmu protects accesses to buckets and tx on Range requests. - txmu sync.RWMutex + // TODO: group and encapsulate {txMu, tx, buckets, txWg}, as they share the same lifecycle. + // txMu protects accesses to buckets and tx on Range requests. + txMu sync.RWMutex tx *bolt.Tx buckets map[string]*bolt.Bucket + // txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done. + txWg *sync.WaitGroup } func (rt *readTx) Lock() { rt.mu.Lock() } @@ -71,23 +74,23 @@ func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][] // find/cache bucket bn := string(bucketName) - rt.txmu.RLock() + rt.txMu.RLock() bucket, ok := rt.buckets[bn] - rt.txmu.RUnlock() + rt.txMu.RUnlock() if !ok { - rt.txmu.Lock() + rt.txMu.Lock() bucket = rt.tx.Bucket(bucketName) rt.buckets[bn] = bucket - rt.txmu.Unlock() + rt.txMu.Unlock() } // ignore missing bucket since may have been created in this batch if bucket == nil { return keys, vals } - rt.txmu.Lock() + rt.txMu.Lock() c := bucket.Cursor() - rt.txmu.Unlock() + rt.txMu.Unlock() k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys))) return append(k2, keys...), append(v2, vals...) @@ -108,9 +111,9 @@ func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) err if err := rt.buf.ForEach(bucketName, getDups); err != nil { return err } - rt.txmu.Lock() + rt.txMu.Lock() err := unsafeForEach(rt.tx, bucketName, visitNoDup) - rt.txmu.Unlock() + rt.txMu.Unlock() if err != nil { return err } @@ -121,4 +124,87 @@ func (rt *readTx) reset() { rt.buf.reset() rt.buckets = make(map[string]*bolt.Bucket) rt.tx = nil + rt.txWg = new(sync.WaitGroup) +} + +// TODO: create a base type for readTx and concurrentReadTx to avoid duplicated function implementation? +type concurrentReadTx struct { + buf txReadBuffer + txMu *sync.RWMutex + tx *bolt.Tx + buckets map[string]*bolt.Bucket + txWg *sync.WaitGroup +} + +func (rt *concurrentReadTx) Lock() {} +func (rt *concurrentReadTx) Unlock() {} + +// RLock is no-op. concurrentReadTx does not need to be locked after it is created. +func (rt *concurrentReadTx) RLock() {} + +// RUnlock signals the end of concurrentReadTx. 
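+// It decrements the txWg counter shared with backend.readTx, allowing the
+// goroutine spawned in batchTxBuffered.unsafeCommit to roll back the boltdb
+// read Tx once every concurrentReadTx of that batch interval has finished.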
+func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() } + +func (rt *concurrentReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error { + dups := make(map[string]struct{}) + getDups := func(k, v []byte) error { + dups[string(k)] = struct{}{} + return nil + } + visitNoDup := func(k, v []byte) error { + if _, ok := dups[string(k)]; ok { + return nil + } + return visitor(k, v) + } + if err := rt.buf.ForEach(bucketName, getDups); err != nil { + return err + } + rt.txMu.Lock() + err := unsafeForEach(rt.tx, bucketName, visitNoDup) + rt.txMu.Unlock() + if err != nil { + return err + } + return rt.buf.ForEach(bucketName, visitor) +} + +func (rt *concurrentReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) { + if endKey == nil { + // forbid duplicates for single keys + limit = 1 + } + if limit <= 0 { + limit = math.MaxInt64 + } + if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) { + panic("do not use unsafeRange on non-keys bucket") + } + keys, vals := rt.buf.Range(bucketName, key, endKey, limit) + if int64(len(keys)) == limit { + return keys, vals + } + + // find/cache bucket + bn := string(bucketName) + rt.txMu.RLock() + bucket, ok := rt.buckets[bn] + rt.txMu.RUnlock() + if !ok { + rt.txMu.Lock() + bucket = rt.tx.Bucket(bucketName) + rt.buckets[bn] = bucket + rt.txMu.Unlock() + } + + // ignore missing bucket since may have been created in this batch + if bucket == nil { + return keys, vals + } + rt.txMu.Lock() + c := bucket.Cursor() + rt.txMu.Unlock() + + k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys))) + return append(k2, keys...), append(v2, vals...) } diff --git a/mvcc/backend/tx_buffer.go b/mvcc/backend/tx_buffer.go index 56e885dbfbc..d73463823ca 100644 --- a/mvcc/backend/tx_buffer.go +++ b/mvcc/backend/tx_buffer.go @@ -88,6 +88,19 @@ func (txr *txReadBuffer) ForEach(bucketName []byte, visitor func(k, v []byte) er return nil } +// unsafeCopy returns a copy of txReadBuffer, caller should acquire backend.readTx.RLock() +func (txr *txReadBuffer) unsafeCopy() txReadBuffer { + txrCopy := txReadBuffer{ + txBuffer: txBuffer{ + buckets: make(map[string]*bucketBuffer, len(txr.txBuffer.buckets)), + }, + } + for bucketName, bucket := range txr.txBuffer.buckets { + txrCopy.txBuffer.buckets[bucketName] = bucket.Copy() + } + return txrCopy +} + type kv struct { key []byte val []byte @@ -179,3 +192,12 @@ func (bb *bucketBuffer) Less(i, j int) bool { return bytes.Compare(bb.buf[i].key, bb.buf[j].key) < 0 } func (bb *bucketBuffer) Swap(i, j int) { bb.buf[i], bb.buf[j] = bb.buf[j], bb.buf[i] } + +func (bb *bucketBuffer) Copy() *bucketBuffer { + bbCopy := bucketBuffer{ + buf: make([]kv, len(bb.buf)), + used: bb.used, + } + copy(bbCopy.buf, bb.buf) + return &bbCopy +} diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index 187f94029b7..bc6b895ed7a 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -354,6 +354,9 @@ func (s *store) restore() error { reportDbTotalSizeInUseInBytesMu.Lock() reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) } reportDbTotalSizeInUseInBytesMu.Unlock() + reportDbOpenReadTxNMu.Lock() + reportDbOpenReadTxN = func() float64 { return float64(b.OpenReadTxN()) } + reportDbOpenReadTxNMu.Unlock() min, max := newRevBytes(), newRevBytes() revToBytes(revision{main: 1}, min) diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index 91183969901..4683fb64ade 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -15,6 +15,7 @@ package mvcc import ( + "bytes" "crypto/rand" 
"encoding/binary" "fmt" @@ -22,6 +23,8 @@ import ( mrand "math/rand" "os" "reflect" + "sort" + "strconv" "sync" "testing" "time" @@ -645,30 +648,173 @@ func TestTxnPut(t *testing.T) { } } -func TestTxnBlockBackendForceCommit(t *testing.T) { +// TestConcurrentReadNotBlockingWrite ensures Read does not blocking Write after its creation +func TestConcurrentReadNotBlockingWrite(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) defer os.Remove(tmpPath) - txn := s.Read() + // write something to read later + s.Put([]byte("foo"), []byte("bar"), lease.NoLease) + // readTx simulates a long read request + readTx1 := s.Read() + + // write should not be blocked by reads done := make(chan struct{}) go func() { - s.b.ForceCommit() + s.Put([]byte("foo"), []byte("newBar"), lease.NoLease) // this is a write Txn done <- struct{}{} }() select { case <-done: - t.Fatalf("failed to block ForceCommit") - case <-time.After(100 * time.Millisecond): + case <-time.After(1 * time.Second): + t.Fatalf("write should not be blocked by read") + } + + // readTx2 simulates a short read request + readTx2 := s.Read() + ro := RangeOptions{Limit: 1, Rev: 0, Count: false} + ret, err := readTx2.Range([]byte("foo"), nil, ro) + if err != nil { + t.Fatalf("failed to range: %v", err) + } + // readTx2 should see the result of new write + w := mvccpb.KeyValue{ + Key: []byte("foo"), + Value: []byte("newBar"), + CreateRevision: 2, + ModRevision: 3, + Version: 2, + } + if !reflect.DeepEqual(ret.KVs[0], w) { + t.Fatalf("range result = %+v, want = %+v", ret.KVs[0], w) + } + readTx2.End() + + ret, err = readTx1.Range([]byte("foo"), nil, ro) + if err != nil { + t.Fatalf("failed to range: %v", err) + } + // readTx1 should not see the result of new write + w = mvccpb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateRevision: 2, + ModRevision: 2, + Version: 1, + } + if !reflect.DeepEqual(ret.KVs[0], w) { + t.Fatalf("range result = %+v, want = %+v", ret.KVs[0], w) + } + readTx1.End() +} + +// TestConcurrentReadTxAndWrite creates random concurrent Reads and Writes, and ensures Reads always see latest Writes +func TestConcurrentReadTxAndWrite(t *testing.T) { + var ( + numOfReads = 100 + numOfWrites = 100 + maxNumOfPutsPerWrite = 10 + committedKVs kvs // committedKVs records the key-value pairs written by the finished Write Txns + mu sync.Mutex // mu protectes committedKVs + ) + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) + defer os.Remove(tmpPath) + + var wg sync.WaitGroup + wg.Add(numOfWrites) + for i := 0; i < numOfWrites; i++ { + go func() { + defer wg.Done() + time.Sleep(time.Duration(mrand.Intn(100)) * time.Millisecond) // random starting time + + tx := s.Write() + numOfPuts := mrand.Intn(maxNumOfPutsPerWrite) + 1 + var pendingKvs kvs + for j := 0; j < numOfPuts; j++ { + k := []byte(strconv.Itoa(mrand.Int())) + v := []byte(strconv.Itoa(mrand.Int())) + tx.Put(k, v, lease.NoLease) + pendingKvs = append(pendingKvs, kv{k, v}) + } + // reads should not see above Puts until write is finished + mu.Lock() + committedKVs = merge(committedKVs, pendingKvs) // update shared data structure + tx.End() + mu.Unlock() + }() + } + + wg.Add(numOfReads) + for i := 0; i < numOfReads; i++ { + go func() { + defer wg.Done() + time.Sleep(time.Duration(mrand.Intn(100)) * time.Millisecond) // random starting time + + mu.Lock() + wKVs := make(kvs, len(committedKVs)) + copy(wKVs, committedKVs) + tx := s.Read() + mu.Unlock() 
+ // get all keys in backend store, and compare with wKVs + ret, err := tx.Range([]byte("\x00000000"), []byte("\xffffffff"), RangeOptions{}) + tx.End() + if err != nil { + t.Errorf("failed to range keys: %v", err) + return + } + if len(wKVs) == 0 && len(ret.KVs) == 0 { // no committed KVs yet + return + } + var result kvs + for _, keyValue := range ret.KVs { + result = append(result, kv{keyValue.Key, keyValue.Value}) + } + if !reflect.DeepEqual(wKVs, result) { + t.Errorf("unexpected range result") // too many key value pairs, skip printing them + } + }() } - txn.End() + // wait until go routines finish or timeout + doneC := make(chan struct{}) + go func() { + wg.Wait() + close(doneC) + }() select { - case <-done: - case <-time.After(5 * time.Second): // wait 5 seconds for CI with slow IO - testutil.FatalStack(t, "failed to execute ForceCommit") + case <-doneC: + case <-time.After(5 * time.Minute): + testutil.FatalStack(t, "timeout") + } +} + +type kv struct { + key []byte + val []byte +} + +type kvs []kv + +func (kvs kvs) Len() int { return len(kvs) } +func (kvs kvs) Less(i, j int) bool { return bytes.Compare(kvs[i].key, kvs[j].key) < 0 } +func (kvs kvs) Swap(i, j int) { kvs[i], kvs[j] = kvs[j], kvs[i] } + +func merge(dst, src kvs) kvs { + dst = append(dst, src...) + sort.Stable(dst) + // remove duplicates, using only the newest value + // ref: tx_buffer.go + widx := 0 + for ridx := 1; ridx < len(dst); ridx++ { + if !bytes.Equal(dst[widx].key, dst[ridx].key) { + widx++ + } + dst[widx] = dst[ridx] } + return dst[:widx+1] } // TODO: test attach key to lessor @@ -754,9 +900,11 @@ type fakeBackend struct { func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx } func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx } +func (b *fakeBackend) ConcurrentReadTx() backend.ReadTx { return b.tx } func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil } func (b *fakeBackend) Size() int64 { return 0 } func (b *fakeBackend) SizeInUse() int64 { return 0 } +func (b *fakeBackend) OpenReadTxN() int64 { return 0 } func (b *fakeBackend) Snapshot() backend.Snapshot { return nil } func (b *fakeBackend) ForceCommit() {} func (b *fakeBackend) Defrag() error { return nil } diff --git a/mvcc/kvstore_txn.go b/mvcc/kvstore_txn.go index 088ea734141..9698254644d 100644 --- a/mvcc/kvstore_txn.go +++ b/mvcc/kvstore_txn.go @@ -31,13 +31,11 @@ type storeTxnRead struct { func (s *store) Read() TxnRead { s.mu.RLock() - tx := s.b.ReadTx() s.revMu.RLock() - // tx.RLock() blocks txReadBuffer for reading, which could potentially block the following two operations: - // A) writeback from txWriteBuffer to txReadBuffer at the end of a write transaction (TxnWrite). - // B) starting of a new backend batch transaction, where the pending changes need to be committed to boltdb - // and txReadBuffer needs to be reset. - tx.RLock() + // backend holds b.readTx.RLock() only when creating the concurrentReadTx. After + // ConcurrentReadTx is created, it will not block write transaction. + tx := s.b.ConcurrentReadTx() + tx.RLock() // RLock is no-op. concurrentReadTx does not need to be locked after it is created. firstRev, rev := s.compactMainRev, s.currentRev s.revMu.RUnlock() return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev}) @@ -51,7 +49,7 @@ func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult, } func (tr *storeTxnRead) End() { - tr.tx.RUnlock() + tr.tx.RUnlock() // RUnlock signals the end of concurrentReadTx. 
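+	// The underlying boltdb read Tx is not rolled back here; the backend rolls
+	// it back asynchronously once every reader sharing it has called RUnlock.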
tr.s.mu.RUnlock() } diff --git a/mvcc/metrics.go b/mvcc/metrics.go index 9163cc7c66d..4f3c49aefa0 100644 --- a/mvcc/metrics.go +++ b/mvcc/metrics.go @@ -194,6 +194,23 @@ var ( reportDbTotalSizeInUseInBytesMu sync.RWMutex reportDbTotalSizeInUseInBytes = func() float64 { return 0 } + dbOpenReadTxN = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "mvcc", + Name: "db_open_read_transactions", + Help: "The number of currently open read transactions", + }, + + func() float64 { + reportDbOpenReadTxNMu.RLock() + defer reportDbOpenReadTxNMu.RUnlock() + return reportDbOpenReadTxN() + }, + ) + // overridden by mvcc initialization + reportDbOpenReadTxNMu sync.RWMutex + reportDbOpenReadTxN = func() float64 { return 0 } + hashSec = prometheus.NewHistogram(prometheus.HistogramOpts{ Namespace: "etcd", Subsystem: "mvcc", @@ -237,6 +254,7 @@ func init() { prometheus.MustRegister(dbTotalSize) prometheus.MustRegister(dbTotalSizeDebugging) prometheus.MustRegister(dbTotalSizeInUse) + prometheus.MustRegister(dbOpenReadTxN) prometheus.MustRegister(hashSec) prometheus.MustRegister(hashRevSec) }
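For readers of this patch, below is a minimal, self-contained sketch of the concurrency pattern it introduces: copy the unflushed read buffer under a short lock, share the long-lived boltdb read view, and defer releasing that view behind a WaitGroup. All names here (store, view, reader, and their methods) are hypothetical stand-ins, not the etcd backend API; see ConcurrentReadTx, concurrentReadTx.RUnlock, and batchTxBuffered.unsafeCommit above for the real implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// view stands in for a boltdb read transaction: a read-only snapshot of
// committed data that must not be released while readers still use it.
type view struct{ data map[string]string }

// store is a simplified stand-in for backend+readTx: buffered writes, a shared
// committed view, and a WaitGroup tracking readers of that view (cf. txWg).
type store struct {
	mu  sync.RWMutex      // guards buf, cur, wg
	buf map[string]string // unflushed writes, analogous to txReadBuffer
	cur *view             // shared committed view, analogous to the boltdb read Tx
	wg  *sync.WaitGroup   // readers still using cur
}

// reader is the analogue of concurrentReadTx: a private buffer copy plus a
// reference to the shared view, registered with the view's WaitGroup.
type reader struct {
	buf map[string]string
	v   *view
	wg  *sync.WaitGroup
}

// newReader mirrors ConcurrentReadTx: it holds the lock only long enough to
// copy the buffer and register with the WaitGroup, so writers are never
// blocked for the duration of a read.
func (s *store) newReader() *reader {
	s.mu.RLock()
	defer s.mu.RUnlock()
	cp := make(map[string]string, len(s.buf))
	for k, v := range s.buf {
		cp[k] = v
	}
	s.wg.Add(1) // must happen under the lock, like txWg.Add in ConcurrentReadTx
	return &reader{buf: cp, v: s.cur, wg: s.wg}
}

func (r *reader) get(k string) (string, bool) {
	if v, ok := r.buf[k]; ok { // buffered (newer) writes win over the view
		return v, true
	}
	v, ok := r.v.data[k]
	return v, ok
}

// done mirrors concurrentReadTx.RUnlock.
func (r *reader) done() { r.wg.Done() }

func (s *store) put(k, v string) {
	s.mu.Lock()
	s.buf[k] = v
	s.mu.Unlock()
}

// commit mirrors the batch-interval commit: the old view is released only
// after all of its readers have finished (cf. unsafeCommit).
func (s *store) commit() {
	s.mu.Lock()
	old, oldWg := s.cur, s.wg
	next := &view{data: make(map[string]string, len(old.data)+len(s.buf))}
	for k, v := range old.data {
		next.data[k] = v
	}
	for k, v := range s.buf {
		next.data[k] = v
	}
	s.cur, s.buf, s.wg = next, map[string]string{}, new(sync.WaitGroup)
	s.mu.Unlock()

	go func() {
		oldWg.Wait()   // wait for in-flight readers of the old view,
		old.data = nil // then release it (the Rollback in the real code)
	}()
}

func main() {
	s := &store{buf: map[string]string{}, cur: &view{data: map[string]string{}}, wg: new(sync.WaitGroup)}
	s.put("foo", "bar")

	r := s.newReader() // long read: holds no lock after this point
	s.put("foo", "newBar")
	s.commit() // does not wait for r; the old view is released after r.done()

	v, _ := r.get("foo")
	fmt.Println(v) // "bar": r reads its own buffer copy
	r.done()
}
```

The cost of this scheme is one read-buffer copy per read transaction, which is why the TODO above suggests copying lazily; the benefit, exercised by TestConcurrentReadNotBlockingWrite and TestConcurrentReadTxAndWrite, is that a slow reader no longer stalls writes or the batch commit.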