Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Dgraph): Add separate compression flag for z and wal dirs #6401

Merged
merged 4 commits into from
Sep 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,13 @@ they form a Raft group and provide synchronous replication.
"log directory. mmap consumes more RAM, but provides better performance. If you pass "+
"two values separated by a comma the first value will be used for the postings "+
"directory and the second for the w directory.")
flag.Int("badger.compression_level", 3,
"The compression level for Badger. A higher value uses more resources.")
flag.String("badger.compression_level", "3,0",
"Specifies the compression level for the postings and write-ahead log "+
"directory. A higher value uses more resources. The value of 0 disables "+
"compression. If you pass two values separated by a comma the first "+
"value will be used for the postings directory (p) and the second for "+
"the wal directory (w). If a single value is passed the value is used "+
"as compression level for both directories.")
enc.RegisterFlags(flag)

// Snapshot and Transactions.
Expand Down Expand Up @@ -617,14 +622,21 @@ func run() {
wstoreBlockCacheSize := (cachePercent[3] * (totalCache << 20)) / 100
wstoreIndexCacheSize := (cachePercent[4] * (totalCache << 20)) / 100

compressionLevelString := Alpha.Conf.GetString("badger.compression_level")
compressionLevels, err := x.GetCompressionLevels(compressionLevelString)
x.Check(err)
postingDirCompressionLevel := compressionLevels[0]
walDirCompressionLevel := compressionLevels[1]

opts := worker.Options{
BadgerCompressionLevel: Alpha.Conf.GetInt("badger.compression_level"),
PostingDir: Alpha.Conf.GetString("postings"),
WALDir: Alpha.Conf.GetString("wal"),
PBlockCacheSize: pstoreBlockCacheSize,
PIndexCacheSize: pstoreIndexCacheSize,
WBlockCacheSize: wstoreBlockCacheSize,
WIndexCacheSize: wstoreIndexCacheSize,
PostingDir: Alpha.Conf.GetString("postings"),
WALDir: Alpha.Conf.GetString("wal"),
PostingDirCompressionLevel: postingDirCompressionLevel,
WALDirCompressionLevel: walDirCompressionLevel,
PBlockCacheSize: pstoreBlockCacheSize,
PIndexCacheSize: pstoreIndexCacheSize,
WBlockCacheSize: wstoreBlockCacheSize,
WIndexCacheSize: wstoreIndexCacheSize,

MutationsMode: worker.AllowMutations,
AuthToken: Alpha.Conf.GetString("auth_token"),
Expand Down
8 changes: 6 additions & 2 deletions worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,14 @@ type Options struct {
// BadgerWalVlog is the name of the mode used to load the badger value log for the w directory.
BadgerWalVlog string

// BadgerCompressionLevel is the ZSTD compression level used by badger. A
// WALDirCompressionLevel is the ZSTD compression level used by WAL directory. A
// higher value means more CPU intensive compression and better compression
// ratio.
BadgerCompressionLevel int
WALDirCompressionLevel int
// PostingDirCompressionLevel is the ZSTD compression level used by Postings directory. A
// higher value means more CPU intensive compression and better compression
// ratio.
PostingDirCompressionLevel int
// WALDir is the path to the directory storing the write-ahead log.
WALDir string
// MutationsMode is the mode used to handle mutation requests.
Expand Down
19 changes: 12 additions & 7 deletions worker/server_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,25 @@ func setBadgerOptions(opt badger.Options, wal bool) badger.Options {
// Settings for the write-ahead log.
badgerTables = Config.BadgerWalTables
badgerVlog = Config.BadgerWalVlog
// Disable compression for WAL as it is supposed to be fast. Compression makes it a
// little slow (Though we save some disk space but it is not worth the slowness).
opt.Compression = options.None
glog.Infof("Setting WAL Dir Compression Level: %d", Config.WALDirCompressionLevel)
// Default value of WALDirCompressionLevel is 0 so compression will always
// be disabled, unless it is explicitly enabled by setting the value to greater than 0.
if Config.WALDirCompressionLevel != 0 {
// By default, compression is disabled in badger.
opt.Compression = options.ZSTD
opt.ZSTDCompressionLevel = Config.WALDirCompressionLevel
}
} else {
// Settings for the data directory.
badgerTables = Config.BadgerTables
badgerVlog = Config.BadgerVlog
glog.Infof("Setting Badger Compression Level: %d", Config.BadgerCompressionLevel)
// Default value of badgerCompressionLevel is 3 so compression will always
glog.Infof("Setting Posting Dir Compression Level: %d", Config.PostingDirCompressionLevel)
// Default value of postingDirCompressionLevel is 3 so compression will always
// be enabled, unless it is explicitly disabled by setting the value to 0.
if Config.BadgerCompressionLevel != 0 {
if Config.PostingDirCompressionLevel != 0 {
// By default, compression is disabled in badger.
opt.Compression = options.ZSTD
opt.ZSTDCompressionLevel = Config.BadgerCompressionLevel
opt.ZSTDCompressionLevel = Config.PostingDirCompressionLevel
}
}

Expand Down
35 changes: 35 additions & 0 deletions x/x.go
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,41 @@ func GetCachePercentages(cpString string, numExpected int) ([]int64, error) {
return cachePercent, nil
}

// ParseCompressionLevel returns compression level(int) given the compression level(string)
func ParseCompressionLevel(compressionLevel string) (int, error) {
x, err := strconv.Atoi(compressionLevel)
if err != nil {
return 0, errors.Errorf("ERROR: unable to parse compression level(%s)", compressionLevel)
}
if x < 0 {
return 0, errors.Errorf("ERROR: compression level(%s) cannot be negative", compressionLevel)
}
return x, nil
}

// GetCompressionLevels returns the slice of compression levels given the "," (comma) separated
// compression levels(integers) string.
func GetCompressionLevels(compressionLevelsString string) ([]int, error) {
compressionLevels := strings.Split(compressionLevelsString, ",")
// Validity checks
if len(compressionLevels) != 1 && len(compressionLevels) != 2 {
return nil, errors.Errorf("ERROR: expected single integer or two comma separated integers")
}
var compressionLevelsInt []int
for _, cLevel := range compressionLevels {
x, err := ParseCompressionLevel(cLevel)
if err != nil {
return nil, err
}
compressionLevelsInt = append(compressionLevelsInt, x)
}
// Append the same compression level in case only one level was passed.
if len(compressionLevelsInt) == 1 {
compressionLevelsInt = append(compressionLevelsInt, compressionLevelsInt[0])
}
return compressionLevelsInt, nil
}

func ToHex(i uint64) []byte {
var b [16]byte
tmp := strconv.AppendUint(b[:0], i, 16)
Expand Down