Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support go time package format gc time (#996) #997

Merged
merged 11 commits into from
Aug 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions pkg/util/duration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"encoding/json"
"fmt"
"strconv"
"time"

"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
)

var empty = ""
var _ toml.TextMarshaler = Duration(empty)
var _ toml.TextUnmarshaler = (*Duration)(&empty)
var _ json.Marshaler = Duration(empty)
var _ json.Unmarshaler = (*Duration)(&empty)

// Duration is a wrapper of time.Duration for TOML and JSON.
type Duration string

// NewDuration creates a Duration from time.Duration.
func NewDuration(duration time.Duration) Duration {
return Duration(duration.String())
}

// MarshalJSON returns the duration as a JSON string.
func (d Duration) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf(`"%s"`, d)), nil
}

// UnmarshalJSON parses a JSON string into the duration.
func (d *Duration) UnmarshalJSON(text []byte) error {
s, err := strconv.Unquote(string(text))
if err != nil {
return errors.WithStack(err)
}
td := Duration(s)
_, err = td.ParseDuration()
if err != nil {
return errors.WithStack(err)
}
*d = Duration(s)
return nil
}

// UnmarshalText parses a TOML string into the duration.
func (d *Duration) UnmarshalText(text []byte) error {
var err error
td := Duration(text)
_, err = td.ParseDuration()
if err != nil {
return errors.WithStack(err)
}
*d = Duration(text)
return nil
}

// MarshalText returns the duration as a JSON string.
func (d Duration) MarshalText() ([]byte, error) {
return []byte(d), nil
}

// ParseDuration parses gc durations. The default unit is day.
func (d Duration) ParseDuration() (time.Duration, error) {
gc := string(d)
t, err := strconv.ParseUint(gc, 10, 64)
if err == nil {
return time.Duration(t) * 24 * time.Hour, nil
}
gcDuration, err := time.ParseDuration(gc)
if err != nil {
return 0, errors.Annotatef(err, "unsupported gc time %s, etc: use 7 for 7 day, 7h for 7 hour", gc)
}
return gcDuration, nil
}
42 changes: 42 additions & 0 deletions pkg/util/duration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"time"

. "github.com/pingcap/check"
)

type durationSuite struct{}

var _ = Suite(&durationSuite{})

func (s *durationSuite) TestParseDuration(c *C) {
gc := Duration("7")
expectDuration := 7 * 24 * time.Hour
duration, err := gc.ParseDuration()
c.Assert(err, IsNil)
c.Assert(duration, Equals, expectDuration)

gc = "30m"
expectDuration = 30 * time.Minute
duration, err = gc.ParseDuration()
c.Assert(err, IsNil)
c.Assert(duration, Equals, expectDuration)

gc = "7d"
_, err = gc.ParseDuration()
c.Assert(err, NotNil)
}
16 changes: 10 additions & 6 deletions pump/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const (
defaultListenAddr = "127.0.0.1:8250"
defautMaxKafkaSize = 1024 * 1024 * 1024
defaultHeartbeatInterval = 2
defaultGC = 7
defaultGC = "7"
defaultDataDir = "data.pump"

// default interval time to generate fake binlog, the unit is second
Expand All @@ -64,8 +64,8 @@ type Config struct {
EtcdDialTimeout time.Duration
DataDir string `toml:"data-dir" json:"data-dir"`
HeartbeatInterval int `toml:"heartbeat-interval" json:"heartbeat-interval"`
// pump only stores binlog events whose ts >= current time - GC(day)
GC int `toml:"gc" json:"gc"`
// pump only stores binlog events whose ts >= current time - GC Time. The default unit is day
GC util.Duration `toml:"gc" json:"gc"`
LogFile string `toml:"log-file" json:"log-file"`
Security security.Config `toml:"security" json:"security"`

Expand Down Expand Up @@ -99,7 +99,7 @@ func NewConfig() *Config {
fs.StringVar(&cfg.EtcdURLs, "pd-urls", defaultEtcdURLs, "a comma separated list of the PD endpoints")
fs.StringVar(&cfg.DataDir, "data-dir", "", "the path to store binlog data")
fs.IntVar(&cfg.HeartbeatInterval, "heartbeat-interval", defaultHeartbeatInterval, "number of seconds between heartbeat ticks")
fs.IntVar(&cfg.GC, "gc", defaultGC, "recycle binlog files older than gc days")
fs.StringVar((*string)(&cfg.GC), "gc", defaultGC, "recycle binlog files older than gc time. default unit is day. also accept 8h format time(max unit is hour)")
fs.StringVar(&cfg.LogLevel, "L", "info", "log level: debug, info, warn, error, fatal")
fs.StringVar(&cfg.MetricsAddr, "metrics-addr", "", "prometheus pushgateway address, leaves it empty will disable prometheus push")
fs.IntVar(&cfg.MetricsInterval, "metrics-interval", 15, "prometheus client push interval in second, set \"0\" to disable prometheus push")
Expand Down Expand Up @@ -185,8 +185,12 @@ func (cfg *Config) configFromFile(path string) error {
// validate checks whether the configuration is valid
func (cfg *Config) validate() error {
// check GC
if cfg.GC <= 0 {
return errors.Errorf("GC is %d, must bigger than 0", cfg.GC)
if duration, err := cfg.GC.ParseDuration(); err == nil {
if duration <= 0 {
return errors.Errorf("GC is %s, must bigger than 0", cfg.GC)
}
} else {
return errors.Errorf("parse GC time failed, err: %s", err)
}

// check ListenAddr
Expand Down
98 changes: 97 additions & 1 deletion pump/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"io/ioutil"
"os"
"path"
"time"

"github.com/BurntSushi/toml"
. "github.com/pingcap/check"
"github.com/pingcap/tidb-binlog/pkg/util"
)

var _ = Suite(&testConfigSuite{})
Expand All @@ -29,7 +31,7 @@ type testConfigSuite struct{}

func (s *testConfigSuite) TestValidate(c *C) {
cfg := Config{}
cfg.GC = 1
cfg.GC = util.NewDuration(24 * time.Hour)
cfg.ListenAddr = "http://:8250"
cfg.EtcdURLs = "http://192.168.10.23:7777"

Expand Down Expand Up @@ -155,6 +157,100 @@ func (s *testConfigSuite) TestConfigParsingFileWithInvalidArgs(c *C) {
c.Assert(err, ErrorMatches, ".*contained unknown configuration options: unrecognized-option-test.*")
}

func (s *testConfigSuite) TestConfigParsingIntegerDuration(c *C) {
yc := struct {
ListenAddr string `toml:"addr" json:"addr"`
AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"`
EtcdURLs string `toml:"pd-urls" json:"pd-urls"`
BinlogDir string `toml:"data-dir" json:"data-dir"`
GC int `toml:"gc" json:"gc"`
HeartbeatInterval uint `toml:"heartbeat-interval" json:"heartbeat-interval"`
}{
"192.168.199.100:8260",
"192.168.199.100:8260",
"http://192.168.199.110:2379,http://hostname:2379",
"/tmp/pump",
5,
1500,
}

var buf bytes.Buffer
e := toml.NewEncoder(&buf)
err := e.Encode(yc)
c.Assert(err, IsNil)

configFilename := path.Join(c.MkDir(), "pump_config_gc_int.toml")
err = ioutil.WriteFile(configFilename, buf.Bytes(), 0644)
c.Assert(err, IsNil)

args := []string{
"--config",
configFilename,
"-L", "debug",
}

cfg := NewConfig()
err = cfg.Parse(args)
c.Assert(err, IsNil)
duration, err := cfg.GC.ParseDuration()
c.Assert(err, IsNil)
c.Assert(duration, Equals, 5*24*time.Hour)

// test whether gc config can be covered by command lines
args = []string{
"--config",
configFilename,
"-L", "debug",
"--gc", "3",
}
cfg = NewConfig()
err = cfg.Parse(args)
c.Assert(err, IsNil)
duration, err = cfg.GC.ParseDuration()
c.Assert(err, IsNil)
c.Assert(duration, Equals, 3*24*time.Hour)
}

func (s *testConfigSuite) TestConfigParsingStringDuration(c *C) {
yc := struct {
ListenAddr string `toml:"addr" json:"addr"`
AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"`
EtcdURLs string `toml:"pd-urls" json:"pd-urls"`
BinlogDir string `toml:"data-dir" json:"data-dir"`
GC string `toml:"gc" json:"gc"`
HeartbeatInterval uint `toml:"heartbeat-interval" json:"heartbeat-interval"`
}{
"192.168.199.100:8260",
"192.168.199.100:8260",
"http://192.168.199.110:2379,http://hostname:2379",
"/tmp/pump",
"30m",
1500,
}

var buf bytes.Buffer
e := toml.NewEncoder(&buf)
err := e.Encode(yc)
c.Assert(err, IsNil)

configFilename := path.Join(c.MkDir(), "pump_config_gc_str.toml")
err = ioutil.WriteFile(configFilename, buf.Bytes(), 0644)
c.Assert(err, IsNil)

args := []string{
"--config",
configFilename,
"-L", "debug",
}

cfg := NewConfig()
err = cfg.Parse(args)
c.Assert(err, IsNil)
duration, err := cfg.GC.ParseDuration()
c.Assert(err, IsNil)
c.Assert(duration, Equals, 30*time.Minute)
}

func mustSuccess(c *C, err error) {
c.Assert(err, IsNil)
}
Expand Down
6 changes: 5 additions & 1 deletion pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ func init() {

// NewServer returns a instance of pump server
func NewServer(cfg *Config) (*Server, error) {
gcDuration, err := cfg.GC.ParseDuration()
if err != nil {
return nil, errors.Trace(err)
}
var metrics *util.MetricClient
if cfg.MetricsAddr != "" && cfg.MetricsInterval != 0 {
metrics = util.NewMetricClient(
Expand Down Expand Up @@ -175,7 +179,7 @@ func NewServer(cfg *Config) (*Server, error) {
cancel: cancel,
metrics: metrics,
tiStore: tiStore,
gcDuration: time.Duration(cfg.GC) * 24 * time.Hour,
gcDuration: gcDuration,
pdCli: pdCli,
cfg: cfg,
triggerGC: make(chan time.Time),
Expand Down
3 changes: 2 additions & 1 deletion pump/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ func (s *newServerSuite) SetUpTest(c *C) {
LogLevel: "debug",
MetricsAddr: "192.168.199.100:5000",
MetricsInterval: 15,
GC: "7",
Security: security.Config{
SSLCA: "/path/to/ca.pem",
SSLCert: "/path/to/drainer.pem",
Expand Down Expand Up @@ -713,7 +714,7 @@ func (s *startServerSuite) TestStartPumpServer(c *C) {
ctx: ctx,
cancel: cancel,
tiStore: nil,
gcDuration: time.Duration(cfg.GC) * 24 * time.Hour,
gcDuration: 24 * time.Hour,
pdCli: nil,
cfg: cfg,
triggerGC: make(chan time.Time),
Expand Down