From 974de5ba035ed2477e1352047e663a4ce93f6df6 Mon Sep 17 00:00:00 2001 From: Rajas Vanjape Date: Fri, 4 Sep 2020 23:24:02 +0530 Subject: [PATCH 1/3] Add separate compression flag for z and wal dirs --- dgraph/cmd/alpha/run.go | 28 +++++++++++++++++--------- worker/config.go | 8 ++++++-- worker/server_state.go | 19 +++++++++++------- x/x.go | 44 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 81 insertions(+), 18 deletions(-) diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index f1fbd257b97..84867c67100 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -116,8 +116,11 @@ they form a Raft group and provide synchronous replication. "log directory. mmap consumes more RAM, but provides better performance. If you pass "+ "two values separated by a comma the first value will be used for the postings "+ "directory and the second for the w directory.") - flag.Int("badger.compression_level", 3, - "The compression level for Badger. A higher value uses more resources.") + flag.String("badger.compression_level", "3,0", + "Specifies the compression level for the postings and write-ahead log directory."+ + " A higher value uses more resources. If you pass two values separated by a comma the "+ + "first value will be used for the postings directory and the second for the w directory. "+ + "If a single value is passed the value is used as compression level for both directories.") enc.RegisterFlags(flag) // Snapshot and Transactions. @@ -613,14 +616,21 @@ func run() { wstoreBlockCacheSize := (cachePercent[3] * (totalCache << 20)) / 100 wstoreIndexCacheSize := (cachePercent[4] * (totalCache << 20)) / 100 + compressionLevelString := Alpha.Conf.GetString("badger.compression_level") + compressionLevels, err := x.GetCompressionLevels(compressionLevelString) + x.Check(err) + postingDirCompressionLevel := compressionLevels[0] + wALDirCompressionLevel := compressionLevels[1] + opts := worker.Options{ - BadgerCompressionLevel: Alpha.Conf.GetInt("badger.compression_level"), - PostingDir: Alpha.Conf.GetString("postings"), - WALDir: Alpha.Conf.GetString("wal"), - PBlockCacheSize: pstoreBlockCacheSize, - PIndexCacheSize: pstoreIndexCacheSize, - WBlockCacheSize: wstoreBlockCacheSize, - WIndexCacheSize: wstoreIndexCacheSize, + PostingDir: Alpha.Conf.GetString("postings"), + WALDir: Alpha.Conf.GetString("wal"), + PostingDirCompressionLevel: postingDirCompressionLevel, + WALDirCompressionLevel: wALDirCompressionLevel, + PBlockCacheSize: pstoreBlockCacheSize, + PIndexCacheSize: pstoreIndexCacheSize, + WBlockCacheSize: wstoreBlockCacheSize, + WIndexCacheSize: wstoreIndexCacheSize, MutationsMode: worker.AllowMutations, AuthToken: Alpha.Conf.GetString("auth_token"), diff --git a/worker/config.go b/worker/config.go index 4220c8c177f..fe6c6dd91a2 100644 --- a/worker/config.go +++ b/worker/config.go @@ -48,10 +48,14 @@ type Options struct { // BadgerWalVlog is the name of the mode used to load the badger value log for the w directory. BadgerWalVlog string - // BadgerCompressionLevel is the ZSTD compression level used by badger. A + // WALDirCompressionLevel is the ZSTD compression level used by WAL directory. A // higher value means more CPU intensive compression and better compression // ratio. - BadgerCompressionLevel int + WALDirCompressionLevel int + // PostingDirCompressionLevel is the ZSTD compression level used by Postings directory. A + // higher value means more CPU intensive compression and better compression + // ratio. + PostingDirCompressionLevel int // WALDir is the path to the directory storing the write-ahead log. WALDir string // MutationsMode is the mode used to handle mutation requests. diff --git a/worker/server_state.go b/worker/server_state.go index 23d78a8bc02..8713e2e4097 100644 --- a/worker/server_state.go +++ b/worker/server_state.go @@ -83,20 +83,25 @@ func setBadgerOptions(opt badger.Options, wal bool) badger.Options { // Settings for the write-ahead log. badgerTables = Config.BadgerWalTables badgerVlog = Config.BadgerWalVlog - // Disable compression for WAL as it is supposed to be fast. Compression makes it a - // little slow (Though we save some disk space but it is not worth the slowness). - opt.Compression = options.None + glog.Infof("Setting WAL Dir Compression Level: %d", Config.WALDirCompressionLevel) + // Default value of WALDirCompressionLevel is 0 so compression will always + // be disabled, unless it is explicitly enabled by setting the value to greater than 0. + if Config.WALDirCompressionLevel != 0 { + // By default, compression is disabled in badger. + opt.Compression = options.ZSTD + opt.ZSTDCompressionLevel = Config.WALDirCompressionLevel + } } else { // Settings for the data directory. badgerTables = Config.BadgerTables badgerVlog = Config.BadgerVlog - glog.Infof("Setting Badger Compression Level: %d", Config.BadgerCompressionLevel) - // Default value of badgerCompressionLevel is 3 so compression will always + glog.Infof("Setting Posting Dir Compression Level: %d", Config.PostingDirCompressionLevel) + // Default value of postingDirCompressionLevel is 3 so compression will always // be enabled, unless it is explicitly disabled by setting the value to 0. - if Config.BadgerCompressionLevel != 0 { + if Config.PostingDirCompressionLevel != 0 { // By default, compression is disabled in badger. opt.Compression = options.ZSTD - opt.ZSTDCompressionLevel = Config.BadgerCompressionLevel + opt.ZSTDCompressionLevel = Config.PostingDirCompressionLevel } } diff --git a/x/x.go b/x/x.go index ac755d0c55b..4a5dc898f8c 100644 --- a/x/x.go +++ b/x/x.go @@ -1147,3 +1147,47 @@ func GetCachePercentages(cpString string, numExpected int) ([]int64, error) { return cachePercent, nil } + +// ParseCompressionLevel returns compression level(int) given the compression level(string) +func ParseCompressionLevel(compressionLevel string) (int, error) { + x, err := strconv.Atoi(compressionLevel) + if err != nil { + return 0, errors.Errorf("ERROR: unable to parse compression level(%s)", compressionLevel) + } + if x < 0 { + return 0, errors.Errorf("ERROR: compression level(%s) cannot be negative", compressionLevel) + } + return x, nil +} + +// GetCompressionLevels returns the slice of compression levels given the "," (comma) separated +// compression levels(integers) string. +func GetCompressionLevels(compressionLevelsString string) ([]int, error) { + compressionLevels := strings.Split(compressionLevelsString, ",") + // Validity checks + if len(compressionLevels) != 1 && len(compressionLevels) != 2 { + return nil, errors.Errorf("ERROR: expected single integer or two comma separated integers") + } + var compressionLevelsInt []int + if len(compressionLevels) == 1 { + x, err := ParseCompressionLevel(compressionLevels[0]) + if err != nil { + return nil, err + } + // Appending twice. One for PostingsDir, other for WALDir + compressionLevelsInt = append(compressionLevelsInt, x) + compressionLevelsInt = append(compressionLevelsInt, x) + } else { + x, err := ParseCompressionLevel(compressionLevels[0]) + if err != nil { + return nil, err + } + compressionLevelsInt = append(compressionLevelsInt, x) + x, err = ParseCompressionLevel(compressionLevels[1]) + if err != nil { + return nil, err + } + compressionLevelsInt = append(compressionLevelsInt, x) + } + return compressionLevelsInt, nil +} From b641b1e424eaa6aa2a6edd4bd5dbcd0804804e67 Mon Sep 17 00:00:00 2001 From: Rajas Vanjape Date: Tue, 8 Sep 2020 11:57:33 +0530 Subject: [PATCH 2/3] Address comments --- dgraph/cmd/alpha/run.go | 10 ++++++---- x/x.go | 21 ++++++--------------- 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index 84867c67100..41e4cbb17b1 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -117,10 +117,12 @@ they form a Raft group and provide synchronous replication. "two values separated by a comma the first value will be used for the postings "+ "directory and the second for the w directory.") flag.String("badger.compression_level", "3,0", - "Specifies the compression level for the postings and write-ahead log directory."+ - " A higher value uses more resources. If you pass two values separated by a comma the "+ - "first value will be used for the postings directory and the second for the w directory. "+ - "If a single value is passed the value is used as compression level for both directories.") + "Specifies the compression level for the postings and write-ahead log "+ + "directory. A higher value uses more resources. The value of 0 disables "+ + "compression. If you pass two values separated by a comma the first "+ + "value will be used for the postings directory and the second for "+ + "the w directory. If a single value is passed the value is used as "+ + "compression level for both directories.") enc.RegisterFlags(flag) // Snapshot and Transactions. diff --git a/x/x.go b/x/x.go index 4a5dc898f8c..48a26b4322f 100644 --- a/x/x.go +++ b/x/x.go @@ -1169,25 +1169,16 @@ func GetCompressionLevels(compressionLevelsString string) ([]int, error) { return nil, errors.Errorf("ERROR: expected single integer or two comma separated integers") } var compressionLevelsInt []int - if len(compressionLevels) == 1 { - x, err := ParseCompressionLevel(compressionLevels[0]) - if err != nil { - return nil, err - } - // Appending twice. One for PostingsDir, other for WALDir - compressionLevelsInt = append(compressionLevelsInt, x) - compressionLevelsInt = append(compressionLevelsInt, x) - } else { - x, err := ParseCompressionLevel(compressionLevels[0]) - if err != nil { - return nil, err - } - compressionLevelsInt = append(compressionLevelsInt, x) - x, err = ParseCompressionLevel(compressionLevels[1]) + for _, cLevel := range compressionLevels { + x, err := ParseCompressionLevel(cLevel) if err != nil { return nil, err } compressionLevelsInt = append(compressionLevelsInt, x) } + // Append the same compression level in case only one level was passed. + if len(compressionLevelsInt) == 1 { + compressionLevelsInt = append(compressionLevelsInt, compressionLevelsInt[0]) + } return compressionLevelsInt, nil } From fcfd122faa2efa574a5421e0ed6a79fe680ac55f Mon Sep 17 00:00:00 2001 From: Rajas Vanjape Date: Tue, 8 Sep 2020 15:45:50 +0530 Subject: [PATCH 3/3] Address comments --- dgraph/cmd/alpha/run.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index 45ca8a10f2d..1563119c40c 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -122,9 +122,9 @@ they form a Raft group and provide synchronous replication. "Specifies the compression level for the postings and write-ahead log "+ "directory. A higher value uses more resources. The value of 0 disables "+ "compression. If you pass two values separated by a comma the first "+ - "value will be used for the postings directory and the second for "+ - "the w directory. If a single value is passed the value is used as "+ - "compression level for both directories.") + "value will be used for the postings directory (p) and the second for "+ + "the wal directory (w). If a single value is passed the value is used "+ + "as compression level for both directories.") enc.RegisterFlags(flag) // Snapshot and Transactions. @@ -626,13 +626,13 @@ func run() { compressionLevels, err := x.GetCompressionLevels(compressionLevelString) x.Check(err) postingDirCompressionLevel := compressionLevels[0] - wALDirCompressionLevel := compressionLevels[1] + walDirCompressionLevel := compressionLevels[1] opts := worker.Options{ PostingDir: Alpha.Conf.GetString("postings"), WALDir: Alpha.Conf.GetString("wal"), PostingDirCompressionLevel: postingDirCompressionLevel, - WALDirCompressionLevel: wALDirCompressionLevel, + WALDirCompressionLevel: walDirCompressionLevel, PBlockCacheSize: pstoreBlockCacheSize, PIndexCacheSize: pstoreIndexCacheSize, WBlockCacheSize: wstoreBlockCacheSize,