diff --git a/cmd/pump/pump.toml b/cmd/pump/pump.toml index 65a906f0e..28ffa75ea 100644 --- a/cmd/pump/pump.toml +++ b/cmd/pump/pump.toml @@ -30,6 +30,12 @@ pd-urls = "http://127.0.0.1:2379" # [storage] # Set to `true` (default) for best reliability, which prevents data loss when there is a power failure. # sync-log = true + +# stop write when disk available space less then the configured size +# 42 MB -> 42000000, 42 mib -> 44040192 +# default: 10 gib +# stop-write-at-available-space = "10 gib" + # # we suggest using the default config of the embedded LSM DB now, do not change it useless you know what you are doing # [storage.kv] diff --git a/go.mod b/go.mod index fee214e55..ac0444bad 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/Shopify/toxiproxy v2.1.3+incompatible // indirect github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect github.com/coreos/etcd v3.3.0-rc.0.0.20180530235116-2b3aa7e1d49d+incompatible - github.com/dustin/go-humanize v1.0.0 // indirect + github.com/dustin/go-humanize v1.0.0 github.com/eapache/go-resiliency v1.1.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect github.com/eapache/queue v0.0.0-20180227141424-093482f3f8ce // indirect diff --git a/pump/server.go b/pump/server.go index fa18fe404..461486cd3 100644 --- a/pump/server.go +++ b/pump/server.go @@ -129,6 +129,7 @@ func NewServer(cfg *Config) (*Server, error) { options := storage.DefaultOptions() options = options.WithKVConfig(cfg.Storage.KV) options = options.WithSync(cfg.Storage.GetSyncLog()) + options = options.WithStopWriteAtAvailableSpace(cfg.Storage.GetStopWriteAtAvailableSpace()) storage, err := storage.NewAppendWithResolver(cfg.DataDir, options, tiStore, lockResolver) if err != nil { @@ -184,12 +185,6 @@ func getPdClient(cfg *Config) (pd.Client, error) { // WriteBinlog implements the gRPC interface of pump server func (s *Server) WriteBinlog(ctx context.Context, in *binlog.WriteBinlogReq) (*binlog.WriteBinlogResp, error) { - // pump client will write some empty Payload to detect weather pump is working, should avoid this - if in.Payload == nil { - ret := new(binlog.WriteBinlogResp) - return ret, nil - } - atomic.AddInt64(&s.writeBinlogCount, 1) return s.writeBinlog(ctx, in, false) } diff --git a/pump/storage/errors.go b/pump/storage/errors.go index fce0c3756..9b86b299d 100644 --- a/pump/storage/errors.go +++ b/pump/storage/errors.go @@ -5,4 +5,7 @@ import "github.com/pingcap/errors" var ( // ErrWrongMagic means the magic number mismatch ErrWrongMagic = errors.New("wrong magic") + + // ErrNoAvailableSpace means no available space + ErrNoAvailableSpace = errors.New("no available space") ) diff --git a/pump/storage/storage.go b/pump/storage/storage.go index 3e26888c6..a27224194 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -27,8 +27,9 @@ import ( ) const ( - maxTxnTimeoutSecond int64 = 600 - chanSize = 1 << 20 + maxTxnTimeoutSecond int64 = 600 + chanSize = 1 << 20 + defaultStopWriteAtAvailableSpace = 10 * (1 << 30) ) var ( @@ -65,8 +66,9 @@ var _ Storage = &Append{} // Append implement the Storage interface type Append struct { - dir string - vlog *valueLog + dir string + vlog *valueLog + storageSize storageSize metadata *leveldb.DB sorter *sorter @@ -212,6 +214,11 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL } append.wg.Add(1) + err = append.updateSize() + if err != nil { + return nil, errors.Trace(err) + } + go append.updateStatus() return } @@ -293,6 +300,20 @@ func (a *Append) handleSortItem(items <-chan sortItem) (quit chan struct{}) { return quit } +func (a *Append) updateSize() error { + size, err := getStorageSize(a.dir) + if err != nil { + return errors.Annotatef(err, "update storage size failed, dir: %s", a.dir) + } + + storageSizeGauge.WithLabelValues("capacity").Set(float64(size.capacity)) + storageSizeGauge.WithLabelValues("available").Set(float64(size.available)) + + atomic.StoreUint64(&a.storageSize.available, size.available) + atomic.StoreUint64(&a.storageSize.capacity, size.capacity) + return nil +} + func (a *Append) updateStatus() { defer a.wg.Done() @@ -322,12 +343,9 @@ func (a *Append) updateStatus() { atomic.StoreInt64(&a.latestTS, ts) } case <-updateSize: - size, err := getStorageSize(a.dir) + err := a.updateSize() if err != nil { - log.Error("update sotrage size err: ", err) - } else { - storageSizeGauge.WithLabelValues("capacity").Set(float64(size.capacity)) - storageSizeGauge.WithLabelValues("available").Set(float64(size.available)) + log.Error("update size failed", zap.Error(err)) } case <-logStatsTicker.C: var stats leveldb.DBStats @@ -344,6 +362,10 @@ func (a *Append) updateStatus() { } } +func (a *Append) writableOfSpace() bool { + return atomic.LoadUint64(&a.storageSize.available) > a.options.StopWriteAtAvailableSpace +} + func (a *Append) resolve(startTS int64) bool { latestTS := atomic.LoadInt64(&a.latestTS) if latestTS <= 0 { @@ -618,8 +640,25 @@ func (a *Append) MaxCommitTS() int64 { return atomic.LoadInt64(&a.maxCommitTS) } +func isFakeBinlog(binlog *pb.Binlog) bool { + return binlog.StartTs > 0 && binlog.StartTs == binlog.CommitTs +} + // WriteBinlog implement Storage.WriteBinlog func (a *Append) WriteBinlog(binlog *pb.Binlog) error { + if !a.writableOfSpace() { + // still accept fake binlog, so will not block drainer if fake binlog writes success + if !isFakeBinlog(binlog) { + return ErrNoAvailableSpace + } + } + + // pump client will write some empty Payload to detect whether pump is working, should avoid this + // Unmarshal(nil) will success... + if binlog.StartTs == 0 && binlog.CommitTs == 0 { + return nil + } + return errors.Trace(a.writeBinlog(binlog).err) } @@ -1028,8 +1067,8 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte } type storageSize struct { - capacity int - available int + capacity uint64 + available uint64 } func getStorageSize(dir string) (size storageSize, err error) { @@ -1056,16 +1095,26 @@ func getStorageSize(dir string) (size storageSize, err error) { } // Available blocks * size per block = available space in bytes - size.available = int(stat.Bavail) * int(bSize) - size.capacity = int(stat.Blocks) * int(bSize) + size.available = stat.Bavail * bSize + size.capacity = stat.Blocks * bSize return } // Config holds the configuration of storage type Config struct { - SyncLog *bool `toml:"sync-log" json:"sync-log"` - KV *KVConfig `toml:"kv" json:"kv"` + SyncLog *bool `toml:"sync-log" json:"sync-log"` + KV *KVConfig `toml:"kv" json:"kv"` + StopWriteAtAvailableSpace *HumanizeBytes `toml:"stop-write-at-available-space" json:"stop-write-at-available-space"` +} + +// GetStopWriteAtAvailableSpace return stop write available space +func (c *Config) GetStopWriteAtAvailableSpace() uint64 { + if c.StopWriteAtAvailableSpace == nil { + return defaultStopWriteAtAvailableSpace + } + + return c.StopWriteAtAvailableSpace.Uint64() } // GetSyncLog return sync-log config option diff --git a/pump/storage/util.go b/pump/storage/util.go index 19dcad8e9..7eb4debba 100644 --- a/pump/storage/util.go +++ b/pump/storage/util.go @@ -3,6 +3,9 @@ package storage import ( "encoding/binary" "sync/atomic" + + "github.com/dustin/go-humanize" + "github.com/pingcap/errors" ) var tsKeyPrefix = []byte("ts:") @@ -25,6 +28,32 @@ func encodeTSKey(ts int64) []byte { return buf } +// HumanizeBytes is used for humanize configure +type HumanizeBytes uint64 + +// Uint64 return bytes +func (b HumanizeBytes) Uint64() uint64 { + return uint64(b) +} + +// UnmarshalText implements UnmarshalText +func (b *HumanizeBytes) UnmarshalText(text []byte) error { + var err error + + if len(text) == 0 { + *b = 0 + return nil + } + + n, err := humanize.ParseBytes(string(text)) + if err != nil { + return errors.Annotatef(err, "text: %s", string(text)) + } + + *b = HumanizeBytes(n) + return nil +} + // test helper type memOracle struct { ts int64 diff --git a/pump/storage/util_test.go b/pump/storage/util_test.go index b8b1038ab..d9d53929f 100644 --- a/pump/storage/util_test.go +++ b/pump/storage/util_test.go @@ -5,6 +5,7 @@ import ( "sort" "testing" + "github.com/BurntSushi/toml" "github.com/pingcap/check" ) @@ -39,3 +40,22 @@ func (e *EncodeTSKeySuite) TestEncodeTSKey(c *check.C) { c.Assert(sorted, check.IsTrue) } + +type UtilSuite struct{} + +var _ = check.Suite(&UtilSuite{}) + +func (u *UtilSuite) TestHumanizeBytes(c *check.C) { + var s = struct { + DiskSize HumanizeBytes `toml:"disk_size" json:"disk_size"` + }{} + + tomlData := ` +disk_size = "42 MB" + + ` + + _, err := toml.Decode(tomlData, &s) + c.Assert(err, check.IsNil) + c.Assert(s.DiskSize.Uint64(), check.Equals, uint64(42*1000*1000)) +} diff --git a/pump/storage/vlog.go b/pump/storage/vlog.go index f20263959..964c06467 100644 --- a/pump/storage/vlog.go +++ b/pump/storage/vlog.go @@ -26,8 +26,9 @@ const ( // Options is the config options of Append and vlog type Options struct { - ValueLogFileSize int64 - Sync bool + ValueLogFileSize int64 + Sync bool + StopWriteAtAvailableSpace uint64 KVConfig *KVConfig } @@ -46,6 +47,12 @@ func (o *Options) WithKVConfig(kvConfig *KVConfig) *Options { return o } +// WithStopWriteAtAvailableSpace set the Config +func (o *Options) WithStopWriteAtAvailableSpace(bytes uint64) *Options { + o.StopWriteAtAvailableSpace = bytes + return o +} + // WithValueLogFileSize set the ValueLogFileSize func (o *Options) WithValueLogFileSize(size int64) *Options { o.ValueLogFileSize = size