Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-2.1] pump: Add support to stop write at a limit avaliable space (#647) #659

Merged
merged 3 commits into from
Jul 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/pump/pump.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ pd-urls = "http://127.0.0.1:2379"
# [storage]
# Set to `true` (default) for best reliability, which prevents data loss when there is a power failure.
# sync-log = true

# stop write when disk available space less then the configured size
# 42 MB -> 42000000, 42 mib -> 44040192
# default: 10 gib
# stop-write-at-available-space = "10 gib"

#
# we suggest using the default config of the embedded LSM DB now, do not change it useless you know what you are doing
# [storage.kv]
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/Shopify/toxiproxy v2.1.3+incompatible // indirect
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect
github.com/coreos/etcd v3.3.0-rc.0.0.20180530235116-2b3aa7e1d49d+incompatible
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/dustin/go-humanize v1.0.0
github.com/eapache/go-resiliency v1.1.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v0.0.0-20180227141424-093482f3f8ce // indirect
Expand Down
7 changes: 1 addition & 6 deletions pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func NewServer(cfg *Config) (*Server, error) {
options := storage.DefaultOptions()
options = options.WithKVConfig(cfg.Storage.KV)
options = options.WithSync(cfg.Storage.GetSyncLog())
options = options.WithStopWriteAtAvailableSpace(cfg.Storage.GetStopWriteAtAvailableSpace())

storage, err := storage.NewAppendWithResolver(cfg.DataDir, options, tiStore, lockResolver)
if err != nil {
Expand Down Expand Up @@ -184,12 +185,6 @@ func getPdClient(cfg *Config) (pd.Client, error) {

// WriteBinlog implements the gRPC interface of pump server
func (s *Server) WriteBinlog(ctx context.Context, in *binlog.WriteBinlogReq) (*binlog.WriteBinlogResp, error) {
// pump client will write some empty Payload to detect weather pump is working, should avoid this
if in.Payload == nil {
ret := new(binlog.WriteBinlogResp)
return ret, nil
}

atomic.AddInt64(&s.writeBinlogCount, 1)
return s.writeBinlog(ctx, in, false)
}
Expand Down
3 changes: 3 additions & 0 deletions pump/storage/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ import "github.com/pingcap/errors"
var (
// ErrWrongMagic means the magic number mismatch
ErrWrongMagic = errors.New("wrong magic")

// ErrNoAvailableSpace means no available space
ErrNoAvailableSpace = errors.New("no available space")
)
79 changes: 64 additions & 15 deletions pump/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import (
)

const (
maxTxnTimeoutSecond int64 = 600
chanSize = 1 << 20
maxTxnTimeoutSecond int64 = 600
chanSize = 1 << 20
defaultStopWriteAtAvailableSpace = 10 * (1 << 30)
)

var (
Expand Down Expand Up @@ -65,8 +66,9 @@ var _ Storage = &Append{}

// Append implement the Storage interface
type Append struct {
dir string
vlog *valueLog
dir string
vlog *valueLog
storageSize storageSize

metadata *leveldb.DB
sorter *sorter
Expand Down Expand Up @@ -212,6 +214,11 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL
}

append.wg.Add(1)
err = append.updateSize()
if err != nil {
return nil, errors.Trace(err)
}

go append.updateStatus()
return
}
Expand Down Expand Up @@ -293,6 +300,20 @@ func (a *Append) handleSortItem(items <-chan sortItem) (quit chan struct{}) {
return quit
}

func (a *Append) updateSize() error {
size, err := getStorageSize(a.dir)
if err != nil {
return errors.Annotatef(err, "update storage size failed, dir: %s", a.dir)
}

storageSizeGauge.WithLabelValues("capacity").Set(float64(size.capacity))
storageSizeGauge.WithLabelValues("available").Set(float64(size.available))

atomic.StoreUint64(&a.storageSize.available, size.available)
atomic.StoreUint64(&a.storageSize.capacity, size.capacity)
return nil
}

func (a *Append) updateStatus() {
defer a.wg.Done()

Expand Down Expand Up @@ -322,12 +343,9 @@ func (a *Append) updateStatus() {
atomic.StoreInt64(&a.latestTS, ts)
}
case <-updateSize:
size, err := getStorageSize(a.dir)
err := a.updateSize()
if err != nil {
log.Error("update sotrage size err: ", err)
} else {
storageSizeGauge.WithLabelValues("capacity").Set(float64(size.capacity))
storageSizeGauge.WithLabelValues("available").Set(float64(size.available))
log.Error("update size failed", zap.Error(err))
}
case <-logStatsTicker.C:
var stats leveldb.DBStats
Expand All @@ -344,6 +362,10 @@ func (a *Append) updateStatus() {
}
}

func (a *Append) writableOfSpace() bool {
return atomic.LoadUint64(&a.storageSize.available) > a.options.StopWriteAtAvailableSpace
}

func (a *Append) resolve(startTS int64) bool {
latestTS := atomic.LoadInt64(&a.latestTS)
if latestTS <= 0 {
Expand Down Expand Up @@ -618,8 +640,25 @@ func (a *Append) MaxCommitTS() int64 {
return atomic.LoadInt64(&a.maxCommitTS)
}

func isFakeBinlog(binlog *pb.Binlog) bool {
return binlog.StartTs > 0 && binlog.StartTs == binlog.CommitTs
}

// WriteBinlog implement Storage.WriteBinlog
func (a *Append) WriteBinlog(binlog *pb.Binlog) error {
if !a.writableOfSpace() {
// still accept fake binlog, so will not block drainer if fake binlog writes success
if !isFakeBinlog(binlog) {
return ErrNoAvailableSpace
}
}

// pump client will write some empty Payload to detect whether pump is working, should avoid this
// Unmarshal(nil) will success...
if binlog.StartTs == 0 && binlog.CommitTs == 0 {
return nil
}

return errors.Trace(a.writeBinlog(binlog).err)
}

Expand Down Expand Up @@ -1028,8 +1067,8 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte
}

type storageSize struct {
capacity int
available int
capacity uint64
available uint64
}

func getStorageSize(dir string) (size storageSize, err error) {
Expand All @@ -1056,16 +1095,26 @@ func getStorageSize(dir string) (size storageSize, err error) {
}

// Available blocks * size per block = available space in bytes
size.available = int(stat.Bavail) * int(bSize)
size.capacity = int(stat.Blocks) * int(bSize)
size.available = stat.Bavail * bSize
size.capacity = stat.Blocks * bSize

return
}

// Config holds the configuration of storage
type Config struct {
SyncLog *bool `toml:"sync-log" json:"sync-log"`
KV *KVConfig `toml:"kv" json:"kv"`
SyncLog *bool `toml:"sync-log" json:"sync-log"`
KV *KVConfig `toml:"kv" json:"kv"`
StopWriteAtAvailableSpace *HumanizeBytes `toml:"stop-write-at-available-space" json:"stop-write-at-available-space"`
}

// GetStopWriteAtAvailableSpace return stop write available space
func (c *Config) GetStopWriteAtAvailableSpace() uint64 {
if c.StopWriteAtAvailableSpace == nil {
return defaultStopWriteAtAvailableSpace
}

return c.StopWriteAtAvailableSpace.Uint64()
}

// GetSyncLog return sync-log config option
Expand Down
29 changes: 29 additions & 0 deletions pump/storage/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package storage
import (
"encoding/binary"
"sync/atomic"

"github.com/dustin/go-humanize"
"github.com/pingcap/errors"
)

var tsKeyPrefix = []byte("ts:")
Expand All @@ -25,6 +28,32 @@ func encodeTSKey(ts int64) []byte {
return buf
}

// HumanizeBytes is used for humanize configure
type HumanizeBytes uint64

// Uint64 return bytes
func (b HumanizeBytes) Uint64() uint64 {
return uint64(b)
}

// UnmarshalText implements UnmarshalText
func (b *HumanizeBytes) UnmarshalText(text []byte) error {
var err error

if len(text) == 0 {
*b = 0
return nil
}

n, err := humanize.ParseBytes(string(text))
if err != nil {
return errors.Annotatef(err, "text: %s", string(text))
}

*b = HumanizeBytes(n)
return nil
}

// test helper
type memOracle struct {
ts int64
Expand Down
20 changes: 20 additions & 0 deletions pump/storage/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sort"
"testing"

"github.com/BurntSushi/toml"
"github.com/pingcap/check"
)

Expand Down Expand Up @@ -39,3 +40,22 @@ func (e *EncodeTSKeySuite) TestEncodeTSKey(c *check.C) {

c.Assert(sorted, check.IsTrue)
}

type UtilSuite struct{}

var _ = check.Suite(&UtilSuite{})

func (u *UtilSuite) TestHumanizeBytes(c *check.C) {
var s = struct {
DiskSize HumanizeBytes `toml:"disk_size" json:"disk_size"`
}{}

tomlData := `
disk_size = "42 MB"

`

_, err := toml.Decode(tomlData, &s)
c.Assert(err, check.IsNil)
c.Assert(s.DiskSize.Uint64(), check.Equals, uint64(42*1000*1000))
}
11 changes: 9 additions & 2 deletions pump/storage/vlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ const (

// Options is the config options of Append and vlog
type Options struct {
ValueLogFileSize int64
Sync bool
ValueLogFileSize int64
Sync bool
StopWriteAtAvailableSpace uint64

KVConfig *KVConfig
}
Expand All @@ -46,6 +47,12 @@ func (o *Options) WithKVConfig(kvConfig *KVConfig) *Options {
return o
}

// WithStopWriteAtAvailableSpace set the Config
func (o *Options) WithStopWriteAtAvailableSpace(bytes uint64) *Options {
o.StopWriteAtAvailableSpace = bytes
return o
}

// WithValueLogFileSize set the ValueLogFileSize
func (o *Options) WithValueLogFileSize(size int64) *Options {
o.ValueLogFileSize = size
Expand Down