From a26be01b32983e87f41ce8f064575021d4d8bccb Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Mon, 27 Jul 2020 14:05:55 +0530 Subject: [PATCH 1/7] fix(task): Handle closed DB state in compare function --- worker/task.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/worker/task.go b/worker/task.go index 5d0c1669df4..15f4328af82 100644 --- a/worker/task.go +++ b/worker/task.go @@ -1224,8 +1224,14 @@ func (qs *queryState) handleCompareFunction(ctx context.Context, arg funcArgs) e return ctx.Err() default: } + var filterErr error algo.ApplyFilter(arg.out.UidMatrix[row], func(uid uint64, i int) bool { + // Stop processing the filter on the first occurrence of an error. + if filterErr != nil { + return false + } + switch lang { case "": if isList { @@ -1252,6 +1258,14 @@ func (qs *queryState) handleCompareFunction(ctx context.Context, arg funcArgs) e return false } + // Context could get cancelled while we were applying the filter. + select { + case <-ctx.Done(): + filterErr = ctx.Err() + return false + default: + } + pl, err := posting.GetNoStore(x.DataKey(attr, uid), arg.q.ReadTs) if err != nil { filterErr = err From 56082903bdcb2708ee938c04734277acbaeac258 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 29 Jul 2020 13:25:13 +0530 Subject: [PATCH 2/7] Revert "fix(task): Handle closed DB state in compare function" This reverts commit a26be01b32983e87f41ce8f064575021d4d8bccb. --- worker/task.go | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/worker/task.go b/worker/task.go index 15f4328af82..5d0c1669df4 100644 --- a/worker/task.go +++ b/worker/task.go @@ -1224,14 +1224,8 @@ func (qs *queryState) handleCompareFunction(ctx context.Context, arg funcArgs) e return ctx.Err() default: } - var filterErr error algo.ApplyFilter(arg.out.UidMatrix[row], func(uid uint64, i int) bool { - // Stop processing the filter on the first occurrence of an error. - if filterErr != nil { - return false - } - switch lang { case "": if isList { @@ -1258,14 +1252,6 @@ func (qs *queryState) handleCompareFunction(ctx context.Context, arg funcArgs) e return false } - // Context could get cancelled while we were applying the filter. - select { - case <-ctx.Done(): - filterErr = ctx.Err() - return false - default: - } - pl, err := posting.GetNoStore(x.DataKey(attr, uid), arg.q.ReadTs) if err != nil { filterErr = err From 561af6af75f167d20587d7cce0d1417d4d7e3fa4 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 29 Jul 2020 15:27:24 +0530 Subject: [PATCH 3/7] Use atomic uint32 to denote closed badger state --- posting/mvcc.go | 11 +++++++++++ worker/server_state.go | 4 ++++ 2 files changed, 15 insertions(+) diff --git a/posting/mvcc.go b/posting/mvcc.go index c15b8744618..64282fcd83a 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -52,6 +52,14 @@ var ( // an invalid key (e.g the key to a single part of a larger multi-part list). ErrInvalidKey = errors.Errorf("cannot read posting list using multi-part list key") + // ErrBadgerClosed is returned when Badger is closed and then we try to + // read something from it. + ErrBadgerClosed = errors.Errorf("read after closing badger") + + // BadgerClosed denotes if the badger store is closed. It should be accessed atomically. + // We set it to 1 just before closing badger DB. + BadgerClosed uint32 + // IncrRollup is used to batch keys for rollup incrementally. IncrRollup = &incrRollupi{ keysCh: make(chan *[][]byte), @@ -352,6 +360,9 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { } func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { + if atomic.LoadUint32(&BadgerClosed) == 1 { + return nil, ErrBadgerClosed + } txn := pstore.NewTransactionAt(readTs, false) defer txn.Discard() diff --git a/worker/server_state.go b/worker/server_state.go index 898b5a99a88..dc07db3edcd 100644 --- a/worker/server_state.go +++ b/worker/server_state.go @@ -20,11 +20,13 @@ import ( "context" "math" "os" + "sync/atomic" "time" "github.com/dgraph-io/badger/v2" "github.com/dgraph-io/badger/v2/options" "github.com/dgraph-io/badger/v2/y" + "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" "github.com/golang/glog" @@ -185,6 +187,8 @@ func (s *ServerState) initStorage() { // Dispose stops and closes all the resources inside the server state. func (s *ServerState) Dispose() { s.gcCloser.SignalAndWait() + atomic.StoreUint32(&posting.BadgerClosed, 1) + if err := s.Pstore.Close(); err != nil { glog.Errorf("Error while closing postings store: %v", err) } From 9e750220a6cab9b07b9f88d1a2fded01d5e0ae5e Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 26 Aug 2020 21:50:12 +0530 Subject: [PATCH 4/7] Revert "Use atomic uint32 to denote closed badger state" This reverts commit 561af6af75f167d20587d7cce0d1417d4d7e3fa4. --- posting/mvcc.go | 11 ----------- worker/server_state.go | 4 ---- 2 files changed, 15 deletions(-) diff --git a/posting/mvcc.go b/posting/mvcc.go index 64282fcd83a..c15b8744618 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -52,14 +52,6 @@ var ( // an invalid key (e.g the key to a single part of a larger multi-part list). ErrInvalidKey = errors.Errorf("cannot read posting list using multi-part list key") - // ErrBadgerClosed is returned when Badger is closed and then we try to - // read something from it. - ErrBadgerClosed = errors.Errorf("read after closing badger") - - // BadgerClosed denotes if the badger store is closed. It should be accessed atomically. - // We set it to 1 just before closing badger DB. - BadgerClosed uint32 - // IncrRollup is used to batch keys for rollup incrementally. IncrRollup = &incrRollupi{ keysCh: make(chan *[][]byte), @@ -360,9 +352,6 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { } func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { - if atomic.LoadUint32(&BadgerClosed) == 1 { - return nil, ErrBadgerClosed - } txn := pstore.NewTransactionAt(readTs, false) defer txn.Discard() diff --git a/worker/server_state.go b/worker/server_state.go index dc07db3edcd..898b5a99a88 100644 --- a/worker/server_state.go +++ b/worker/server_state.go @@ -20,13 +20,11 @@ import ( "context" "math" "os" - "sync/atomic" "time" "github.com/dgraph-io/badger/v2" "github.com/dgraph-io/badger/v2/options" "github.com/dgraph-io/badger/v2/y" - "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" "github.com/golang/glog" @@ -187,8 +185,6 @@ func (s *ServerState) initStorage() { // Dispose stops and closes all the resources inside the server state. func (s *ServerState) Dispose() { s.gcCloser.SignalAndWait() - atomic.StoreUint32(&posting.BadgerClosed, 1) - if err := s.Pstore.Close(); err != nil { glog.Errorf("Error while closing postings store: %v", err) } From 6533daa16339163d8db7c3ad09e65182be4a244d Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 26 Aug 2020 21:55:46 +0530 Subject: [PATCH 5/7] Update badger to v2.2007.2 --- dgraph/cmd/bulk/reduce.go | 2 +- dgraph/cmd/zero/run.go | 2 +- go.mod | 2 +- go.sum | 4 ++-- posting/mvcc.go | 3 +++ worker/server_state.go | 8 +++----- 6 files changed, 11 insertions(+), 10 deletions(-) diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 19085e38dcc..4031702b725 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -116,7 +116,7 @@ func (r *reducer) createBadger(i int) *badger.DB { opt := badger.DefaultOptions(r.opt.shardOutputDirs[i]).WithSyncWrites(false). WithTableLoadingMode(bo.MemoryMap).WithValueThreshold(1 << 10 /* 1 KB */). - WithLogger(nil).WithMaxCacheSize(1 << 20). + WithLogger(nil).WithBlockCacheSize(1 << 20). WithEncryptionKey(r.opt.EncryptionKey).WithCompression(bo.None) // Overwrite badger options based on the options provided by the user. diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go index c420f185c10..8e54f50e5da 100644 --- a/dgraph/cmd/zero/run.go +++ b/dgraph/cmd/zero/run.go @@ -231,7 +231,7 @@ func run() { // Open raft write-ahead log and initialize raft node. x.Checkf(os.MkdirAll(opts.w, 0700), "Error while creating WAL dir.") kvOpt := badger.LSMOnlyOptions(opts.w).WithSyncWrites(false).WithTruncate(true). - WithValueLogFileSize(64 << 20).WithMaxCacheSize(10 << 20).WithLoadBloomsOnOpen(false) + WithValueLogFileSize(64 << 20).WithBlockCacheSize(10 << 20).WithLoadBloomsOnOpen(false) kvOpt.ZSTDCompressionLevel = 3 diff --git a/go.mod b/go.mod index a3d0573f9f6..1fbe2c794c3 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/blevesearch/segment v0.0.0-20160915185041-762005e7a34f // indirect github.com/blevesearch/snowballstem v0.0.0-20180110192139-26b06a2c243d // indirect github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd - github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200718033852-37ee16d8ad1c + github.com/dgraph-io/badger/v2 v2.2007.2-0.20200826122734-bc243f38bfe1 github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453 github.com/dgraph-io/graphql-transport-ws v0.0.0-20200715131837-c0460019ead2 github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de diff --git a/go.sum b/go.sum index 59a58118f0d..be4345bf0f9 100644 --- a/go.sum +++ b/go.sum @@ -78,8 +78,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo= github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= -github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200718033852-37ee16d8ad1c h1:LoEZfU53r3H1et4WY9M0h1c3fuCljBnn3pk/7TB5eWY= -github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200718033852-37ee16d8ad1c/go.mod h1:26P/7fbL4kUZVEVKLAKXkBXKOydDmM2p1e+NhhnBCAE= +github.com/dgraph-io/badger/v2 v2.2007.2-0.20200826122734-bc243f38bfe1 h1:vPPlQYByX3+Z3NOaB06i7VCb0bNOznVQ9eEnqLxbrH0= +github.com/dgraph-io/badger/v2 v2.2007.2-0.20200826122734-bc243f38bfe1/go.mod h1:26P/7fbL4kUZVEVKLAKXkBXKOydDmM2p1e+NhhnBCAE= github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453 h1:DTgOrw91nMIukDm/WEvdobPLl0LgeDd/JE66+24jBks= github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453/go.mod h1:Co+FwJrnndSrPORO8Gdn20dR7FPTfmXr0W/su0Ve/Ig= github.com/dgraph-io/graphql-transport-ws v0.0.0-20200715131837-c0460019ead2 h1:NSl3XXyON9bgmBJSAvr5FPrgILAovtoTs7FwdtaZZq0= diff --git a/posting/mvcc.go b/posting/mvcc.go index c15b8744618..80f40d7e617 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -352,6 +352,9 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { } func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { + if pstore.IsClosed() { + return nil, badger.ErrDBClosed + } txn := pstore.NewTransactionAt(readTs, false) defer txn.Discard() diff --git a/worker/server_state.go b/worker/server_state.go index 898b5a99a88..eddd8fe4c76 100644 --- a/worker/server_state.go +++ b/worker/server_state.go @@ -130,7 +130,7 @@ func (s *ServerState) initStorage() { opt := badger.LSMOnlyOptions(Config.WALDir) opt = setBadgerOptions(opt) opt.ValueLogMaxEntries = 10000 // Allow for easy space reclamation. - opt.MaxCacheSize = 10 << 20 // 10 mb of cache size for WAL. + opt.BlockCacheSize = 10 << 20 // 10 mb of cache size for WAL. // We should always force load LSM tables to memory, disregarding user settings, because // Raft.Advance hits the WAL many times. If the tables are not in memory, retrieval slows @@ -157,10 +157,8 @@ func (s *ServerState) initStorage() { opt := badger.DefaultOptions(Config.PostingDir). WithValueThreshold(1 << 10 /* 1KB */). WithNumVersionsToKeep(math.MaxInt32). - WithMaxCacheSize(1 << 30). - WithKeepBlockIndicesInCache(true). - WithKeepBlocksInCache(true). - WithMaxBfCacheSize(500 << 20) // 500 MB of bloom filter cache. + WithBlockCacheSize(1 << 30). + WithIndexCacheSize(500 << 20) // 500 MB of bloom filter cache. opt = setBadgerOptions(opt) // Print the options w/o exposing key. From 999dd8f82fa8da7822eeab0d67c77190240427fe Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Thu, 27 Aug 2020 15:14:26 +0530 Subject: [PATCH 6/7] Revert "Update badger to v2.2007.2" This reverts commit 6533daa16339163d8db7c3ad09e65182be4a244d. --- dgraph/cmd/bulk/reduce.go | 2 +- dgraph/cmd/zero/run.go | 2 +- go.mod | 2 +- go.sum | 4 ++-- posting/mvcc.go | 3 --- worker/server_state.go | 8 +++++--- 6 files changed, 10 insertions(+), 11 deletions(-) diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 4031702b725..19085e38dcc 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -116,7 +116,7 @@ func (r *reducer) createBadger(i int) *badger.DB { opt := badger.DefaultOptions(r.opt.shardOutputDirs[i]).WithSyncWrites(false). WithTableLoadingMode(bo.MemoryMap).WithValueThreshold(1 << 10 /* 1 KB */). - WithLogger(nil).WithBlockCacheSize(1 << 20). + WithLogger(nil).WithMaxCacheSize(1 << 20). WithEncryptionKey(r.opt.EncryptionKey).WithCompression(bo.None) // Overwrite badger options based on the options provided by the user. diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go index 8e54f50e5da..c420f185c10 100644 --- a/dgraph/cmd/zero/run.go +++ b/dgraph/cmd/zero/run.go @@ -231,7 +231,7 @@ func run() { // Open raft write-ahead log and initialize raft node. x.Checkf(os.MkdirAll(opts.w, 0700), "Error while creating WAL dir.") kvOpt := badger.LSMOnlyOptions(opts.w).WithSyncWrites(false).WithTruncate(true). - WithValueLogFileSize(64 << 20).WithBlockCacheSize(10 << 20).WithLoadBloomsOnOpen(false) + WithValueLogFileSize(64 << 20).WithMaxCacheSize(10 << 20).WithLoadBloomsOnOpen(false) kvOpt.ZSTDCompressionLevel = 3 diff --git a/go.mod b/go.mod index 1fbe2c794c3..a3d0573f9f6 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/blevesearch/segment v0.0.0-20160915185041-762005e7a34f // indirect github.com/blevesearch/snowballstem v0.0.0-20180110192139-26b06a2c243d // indirect github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd - github.com/dgraph-io/badger/v2 v2.2007.2-0.20200826122734-bc243f38bfe1 + github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200718033852-37ee16d8ad1c github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453 github.com/dgraph-io/graphql-transport-ws v0.0.0-20200715131837-c0460019ead2 github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de diff --git a/go.sum b/go.sum index be4345bf0f9..59a58118f0d 100644 --- a/go.sum +++ b/go.sum @@ -78,8 +78,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo= github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= -github.com/dgraph-io/badger/v2 v2.2007.2-0.20200826122734-bc243f38bfe1 h1:vPPlQYByX3+Z3NOaB06i7VCb0bNOznVQ9eEnqLxbrH0= -github.com/dgraph-io/badger/v2 v2.2007.2-0.20200826122734-bc243f38bfe1/go.mod h1:26P/7fbL4kUZVEVKLAKXkBXKOydDmM2p1e+NhhnBCAE= +github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200718033852-37ee16d8ad1c h1:LoEZfU53r3H1et4WY9M0h1c3fuCljBnn3pk/7TB5eWY= +github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200718033852-37ee16d8ad1c/go.mod h1:26P/7fbL4kUZVEVKLAKXkBXKOydDmM2p1e+NhhnBCAE= github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453 h1:DTgOrw91nMIukDm/WEvdobPLl0LgeDd/JE66+24jBks= github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453/go.mod h1:Co+FwJrnndSrPORO8Gdn20dR7FPTfmXr0W/su0Ve/Ig= github.com/dgraph-io/graphql-transport-ws v0.0.0-20200715131837-c0460019ead2 h1:NSl3XXyON9bgmBJSAvr5FPrgILAovtoTs7FwdtaZZq0= diff --git a/posting/mvcc.go b/posting/mvcc.go index 80f40d7e617..c15b8744618 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -352,9 +352,6 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { } func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { - if pstore.IsClosed() { - return nil, badger.ErrDBClosed - } txn := pstore.NewTransactionAt(readTs, false) defer txn.Discard() diff --git a/worker/server_state.go b/worker/server_state.go index eddd8fe4c76..898b5a99a88 100644 --- a/worker/server_state.go +++ b/worker/server_state.go @@ -130,7 +130,7 @@ func (s *ServerState) initStorage() { opt := badger.LSMOnlyOptions(Config.WALDir) opt = setBadgerOptions(opt) opt.ValueLogMaxEntries = 10000 // Allow for easy space reclamation. - opt.BlockCacheSize = 10 << 20 // 10 mb of cache size for WAL. + opt.MaxCacheSize = 10 << 20 // 10 mb of cache size for WAL. // We should always force load LSM tables to memory, disregarding user settings, because // Raft.Advance hits the WAL many times. If the tables are not in memory, retrieval slows @@ -157,8 +157,10 @@ func (s *ServerState) initStorage() { opt := badger.DefaultOptions(Config.PostingDir). WithValueThreshold(1 << 10 /* 1KB */). WithNumVersionsToKeep(math.MaxInt32). - WithBlockCacheSize(1 << 30). - WithIndexCacheSize(500 << 20) // 500 MB of bloom filter cache. + WithMaxCacheSize(1 << 30). + WithKeepBlockIndicesInCache(true). + WithKeepBlocksInCache(true). + WithMaxBfCacheSize(500 << 20) // 500 MB of bloom filter cache. opt = setBadgerOptions(opt) // Print the options w/o exposing key. From 1fe0b2a9b5016ef49a0ad655cbbf1b15a1007268 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Thu, 27 Aug 2020 15:15:28 +0530 Subject: [PATCH 7/7] Return err on closed DB --- posting/mvcc.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/posting/mvcc.go b/posting/mvcc.go index 9dcfaf98a75..a0bfc16f7dc 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -417,6 +417,9 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { } } + if pstore.IsClosed() { + return nil, badger.ErrDBClosed + } txn := pstore.NewTransactionAt(readTs, false) defer txn.Discard()