Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add more structured logger #9697

Merged
merged 4 commits into from
May 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions etcdserver/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/raftsnap"

"go.uber.org/zap"
)

Expand Down
2 changes: 1 addition & 1 deletion etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ import (
"github.com/coreos/etcd/raftsnap"
"github.com/coreos/etcd/version"
"github.com/coreos/etcd/wal"
humanize "github.com/dustin/go-humanize"

"github.com/coreos/go-semver/semver"
"github.com/coreos/pkg/capnslog"
humanize "github.com/dustin/go-humanize"
"go.uber.org/zap"
)

Expand Down
6 changes: 5 additions & 1 deletion mvcc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,11 @@ func (b *backend) Snapshot() Snapshot {
defer b.mu.RUnlock()
tx, err := b.db.Begin(false)
if err != nil {
plog.Fatalf("cannot begin tx (%s)", err)
if b.lg != nil {
b.lg.Fatal("failed to begin tx", zap.Error(err))
} else {
plog.Fatalf("cannot begin tx (%s)", err)
}
}

stopc, donec := make(chan struct{}), make(chan struct{})
Expand Down
76 changes: 68 additions & 8 deletions mvcc/backend/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

bolt "github.com/coreos/bbolt"
"go.uber.org/zap"
)

type BatchTx interface {
Expand All @@ -47,7 +48,15 @@ type batchTx struct {
func (t *batchTx) UnsafeCreateBucket(name []byte) {
_, err := t.tx.CreateBucket(name)
if err != nil && err != bolt.ErrBucketExists {
plog.Fatalf("cannot create bucket %s (%v)", name, err)
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to create a bucket",
zap.String("bucket-name", string(name)),
zap.Error(err),
)
} else {
plog.Fatalf("cannot create bucket %s (%v)", name, err)
}
}
t.pending++
}
Expand All @@ -65,15 +74,30 @@ func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
bucket := t.tx.Bucket(bucketName)
if bucket == nil {
plog.Fatalf("bucket %s does not exist", bucketName)
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to find a bucket",
zap.String("bucket-name", string(bucketName)),
)
} else {
plog.Fatalf("bucket %s does not exist", bucketName)
}
}
if seq {
// it is useful to increase fill percent when the workloads are mostly append-only.
// this can delay the page split and reduce space usage.
bucket.FillPercent = 0.9
}
if err := bucket.Put(key, value); err != nil {
plog.Fatalf("cannot put key into bucket (%v)", err)
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to write to a bucket",
zap.String("bucket-name", string(bucketName)),
zap.Error(err),
)
} else {
plog.Fatalf("cannot put key into bucket (%v)", err)
}
}
t.pending++
}
Expand All @@ -82,7 +106,14 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
bucket := t.tx.Bucket(bucketName)
if bucket == nil {
plog.Fatalf("bucket %s does not exist", bucketName)
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to find a bucket",
zap.String("bucket-name", string(bucketName)),
)
} else {
plog.Fatalf("bucket %s does not exist", bucketName)
}
}
return unsafeRange(bucket.Cursor(), key, endKey, limit)
}
Expand Down Expand Up @@ -113,11 +144,26 @@ func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte
func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
bucket := t.tx.Bucket(bucketName)
if bucket == nil {
plog.Fatalf("bucket %s does not exist", bucketName)
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to find a bucket",
zap.String("bucket-name", string(bucketName)),
)
} else {
plog.Fatalf("bucket %s does not exist", bucketName)
}
}
err := bucket.Delete(key)
if err != nil {
plog.Fatalf("cannot delete key from bucket (%v)", err)
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to delete a key",
zap.String("bucket-name", string(bucketName)),
zap.Error(err),
)
} else {
plog.Fatalf("cannot delete key from bucket (%v)", err)
}
}
t.pending++
}
Expand Down Expand Up @@ -177,7 +223,14 @@ func (t *batchTx) commit(stop bool) {

t.pending = 0
if err != nil {
plog.Fatalf("cannot commit tx (%s)", err)
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to commit tx",
zap.Error(err),
)
} else {
plog.Fatalf("cannot commit tx (%s)", err)
}
}
}
if !stop {
Expand Down Expand Up @@ -236,7 +289,14 @@ func (t *batchTxBuffered) commit(stop bool) {
func (t *batchTxBuffered) unsafeCommit(stop bool) {
if t.backend.readTx.tx != nil {
if err := t.backend.readTx.tx.Rollback(); err != nil {
plog.Fatalf("cannot rollback tx (%s)", err)
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to rollback tx",
zap.Error(err),
)
} else {
plog.Fatalf("cannot rollback tx (%s)", err)
}
}
t.backend.readTx.reset()
}
Expand Down
18 changes: 9 additions & 9 deletions mvcc/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ func (ti *treeIndex) Put(key []byte, rev revision) {
defer ti.Unlock()
item := ti.tree.Get(keyi)
if item == nil {
keyi.put(rev.main, rev.sub)
keyi.put(ti.lg, rev.main, rev.sub)
ti.tree.ReplaceOrInsert(keyi)
return
}
okeyi := item.(*keyIndex)
okeyi.put(rev.main, rev.sub)
okeyi.put(ti.lg, rev.main, rev.sub)
}

func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
Expand All @@ -72,7 +72,7 @@ func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, v
if keyi = ti.keyIndex(keyi); keyi == nil {
return revision{}, revision{}, 0, ErrRevisionNotFound
}
return keyi.get(atRev)
return keyi.get(ti.lg, atRev)
}

func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex {
Expand Down Expand Up @@ -112,7 +112,7 @@ func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {
return []revision{rev}
}
ti.visit(key, end, func(ki *keyIndex) {
if rev, _, _, err := ki.get(atRev); err == nil {
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
revs = append(revs, rev)
}
})
Expand All @@ -128,7 +128,7 @@ func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []
return [][]byte{key}, []revision{rev}
}
ti.visit(key, end, func(ki *keyIndex) {
if rev, _, _, err := ki.get(atRev); err == nil {
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
revs = append(revs, rev)
keys = append(keys, ki.key)
}
Expand All @@ -147,7 +147,7 @@ func (ti *treeIndex) Tombstone(key []byte, rev revision) error {
}

ki := item.(*keyIndex)
return ki.tombstone(rev.main, rev.sub)
return ki.tombstone(ti.lg, rev.main, rev.sub)
}

// RangeSince returns all revisions from key(including) to end(excluding)
Expand All @@ -165,7 +165,7 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {
return nil
}
keyi = item.(*keyIndex)
return keyi.since(rev)
return keyi.since(ti.lg, rev)
}

endi := &keyIndex{key: end}
Expand All @@ -175,7 +175,7 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {
return false
}
curKeyi := item.(*keyIndex)
revs = append(revs, curKeyi.since(rev)...)
revs = append(revs, curKeyi.since(ti.lg, rev)...)
return true
})
sort.Sort(revisions(revs))
Expand All @@ -199,7 +199,7 @@ func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
//Lock is needed here to prevent modification to the keyIndex while
//compaction is going on or revision added to empty before deletion
ti.Lock()
keyi.compact(rev, available)
keyi.compact(ti.lg, rev, available)
if keyi.isEmpty() {
item := ti.tree.Delete(keyi)
if item == nil {
Expand Down
4 changes: 2 additions & 2 deletions mvcc/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,10 @@ func restore(ti *treeIndex, key []byte, created, modified revision, ver int64) {
defer ti.Unlock()
item := ti.tree.Get(keyi)
if item == nil {
keyi.restore(created, modified, ver)
keyi.restore(ti.lg, created, modified, ver)
ti.tree.ReplaceOrInsert(keyi)
return
}
okeyi := item.(*keyIndex)
okeyi.put(modified.main, modified.sub)
okeyi.put(ti.lg, modified.main, modified.sub)
}
72 changes: 59 additions & 13 deletions mvcc/key_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"

"github.com/google/btree"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -73,11 +74,21 @@ type keyIndex struct {
}

// put puts a revision to the keyIndex.
func (ki *keyIndex) put(main int64, sub int64) {
func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
rev := revision{main: main, sub: sub}

if !rev.GreaterThan(ki.modified) {
plog.Panicf("store.keyindex: put with unexpected smaller revision [%v / %v]", rev, ki.modified)
if lg != nil {
lg.Panic(
"'put' with an unexpected smaller revision",
zap.Int64("given-revision-main", rev.main),
zap.Int64("given-revision-sub", rev.sub),
zap.Int64("modified-revision-main", ki.modified.main),
zap.Int64("modified-revision-sub", ki.modified.sub),
)
} else {
plog.Panicf("store.keyindex: put with unexpected smaller revision [%v / %v]", rev, ki.modified)
}
}
if len(ki.generations) == 0 {
ki.generations = append(ki.generations, generation{})
Expand All @@ -92,9 +103,16 @@ func (ki *keyIndex) put(main int64, sub int64) {
ki.modified = rev
}

func (ki *keyIndex) restore(created, modified revision, ver int64) {
func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int64) {
if len(ki.generations) != 0 {
plog.Panicf("store.keyindex: cannot restore non-empty keyIndex")
if lg != nil {
lg.Panic(
"'restore' got an unexpected non-empty generations",
zap.Int("generations-size", len(ki.generations)),
)
} else {
plog.Panicf("store.keyindex: cannot restore non-empty keyIndex")
}
}

ki.modified = modified
Expand All @@ -106,24 +124,38 @@ func (ki *keyIndex) restore(created, modified revision, ver int64) {
// tombstone puts a revision, pointing to a tombstone, to the keyIndex.
// It also creates a new empty generation in the keyIndex.
// It returns ErrRevisionNotFound when tombstone on an empty generation.
func (ki *keyIndex) tombstone(main int64, sub int64) error {
func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
if ki.isEmpty() {
plog.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key))
if lg != nil {
lg.Panic(
"'tombstone' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
} else {
plog.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key))
}
}
if ki.generations[len(ki.generations)-1].isEmpty() {
return ErrRevisionNotFound
}
ki.put(main, sub)
ki.put(lg, main, sub)
ki.generations = append(ki.generations, generation{})
keysGauge.Dec()
return nil
}

// get gets the modified, created revision and version of the key that satisfies the given atRev.
// Rev must be higher than or equal to the given atRev.
func (ki *keyIndex) get(atRev int64) (modified, created revision, ver int64, err error) {
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
if ki.isEmpty() {
plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
if lg != nil {
lg.Panic(
"'get' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
} else {
plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
}
}
g := ki.findGeneration(atRev)
if g.isEmpty() {
Expand All @@ -141,9 +173,16 @@ func (ki *keyIndex) get(atRev int64) (modified, created revision, ver int64, err
// since returns revisions since the given rev. Only the revision with the
// largest sub revision will be returned if multiple revisions have the same
// main revision.
func (ki *keyIndex) since(rev int64) []revision {
func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision {
if ki.isEmpty() {
plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
if lg != nil {
lg.Panic(
"'since' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
} else {
plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
}
}
since := revision{rev, 0}
var gi int
Expand Down Expand Up @@ -182,9 +221,16 @@ func (ki *keyIndex) since(rev int64) []revision {
// revision than the given atRev except the largest one (If the largest one is
// a tombstone, it will not be kept).
// If a generation becomes empty during compaction, it will be removed.
func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) {
func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) {
if ki.isEmpty() {
plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key))
if lg != nil {
lg.Panic(
"'compact' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
} else {
plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key))
}
}

genIdx, revIndex := ki.doCompact(atRev, available)
Expand Down
Loading