diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel index 2cd903d3bc290..42fbadabc9e11 100644 --- a/br/pkg/lightning/backend/local/BUILD.bazel +++ b/br/pkg/lightning/backend/local/BUILD.bazel @@ -70,6 +70,7 @@ go_library( "@org_golang_google_grpc//codes", "@org_golang_google_grpc//credentials", "@org_golang_google_grpc//credentials/insecure", + "@org_golang_google_grpc//encoding/gzip", "@org_golang_google_grpc//keepalive", "@org_golang_google_grpc//status", "@org_golang_x_exp//slices", diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 6f0a2e6a130fd..cd2a7dd44c5cc 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -20,6 +20,7 @@ import ( "fmt" "io" "math" + "net" "os" "path/filepath" "strings" @@ -77,6 +78,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" ) @@ -130,18 +132,25 @@ type ImportClientFactory interface { } type importClientFactoryImpl struct { - conns *common.GRPCConns - splitCli split.SplitClient - tls *common.TLS - tcpConcurrency int + conns *common.GRPCConns + splitCli split.SplitClient + tls *common.TLS + tcpConcurrency int + compressionType config.CompressionType } -func newImportClientFactoryImpl(splitCli split.SplitClient, tls *common.TLS, tcpConcurrency int) *importClientFactoryImpl { +func newImportClientFactoryImpl( + splitCli split.SplitClient, + tls *common.TLS, + tcpConcurrency int, + compressionType config.CompressionType, +) *importClientFactoryImpl { return &importClientFactoryImpl{ - conns: common.NewGRPCConns(), - splitCli: splitCli, - tls: tls, - tcpConcurrency: tcpConcurrency, + conns: common.NewGRPCConns(), + splitCli: splitCli, + tls: tls, + tcpConcurrency: tcpConcurrency, + compressionType: 
compressionType, } } @@ -150,11 +159,14 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64) if err != nil { return nil, errors.Trace(err) } - opt := grpc.WithTransportCredentials(insecure.NewCredentials()) + var opts []grpc.DialOption if f.tls.TLSConfig() != nil { - opt = grpc.WithTransportCredentials(credentials.NewTLS(f.tls.TLSConfig())) + opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(f.tls.TLSConfig()))) + } else { + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) } ctx, cancel := context.WithTimeout(ctx, dialTimeout) + defer cancel() bfConf := backoff.DefaultConfig bfConf.MaxDelay = gRPCBackOffMaxDelay @@ -163,10 +175,7 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64) if addr == "" { addr = store.GetAddress() } - conn, err := grpc.DialContext( - ctx, - addr, - opt, + opts = append(opts, grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: gRPCKeepAliveTime, @@ -174,7 +183,26 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64) PermitWithoutStream: true, }), ) - cancel() + switch f.compressionType { + case config.CompressionNone: + // do nothing + case config.CompressionGzip: + opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name))) + default: + return nil, common.ErrInvalidConfig.GenWithStack("unsupported compression type %s", f.compressionType) + } + + failpoint.Inject("LoggingImportBytes", func() { + opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) { + conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", target) + if err != nil { + return nil, err + } + return &loggingConn{Conn: conn}, nil + })) + }) + + conn, err := grpc.DialContext(ctx, addr, opts...) 
if err != nil { return nil, errors.Trace(err) } @@ -200,6 +228,15 @@ func (f *importClientFactoryImpl) Close() { f.conns.Close() } +type loggingConn struct { + net.Conn +} + +func (c loggingConn) Write(b []byte) (int, error) { + log.L().Debug("import write", zap.Int("bytes", len(b))) + return c.Conn.Write(b) +} + // Range record start and end key for localStoreDir.DB // so we can write it to tikv in streaming type Range struct { @@ -479,7 +516,7 @@ func NewLocalBackend( if err != nil { return backend.MakeBackend(nil), common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs() } - importClientFactory := newImportClientFactoryImpl(splitCli, tls, rangeConcurrency) + importClientFactory := newImportClientFactoryImpl(splitCli, tls, rangeConcurrency, cfg.TikvImporter.CompressKVPairs) duplicateDetection := cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone keyAdapter := KeyAdapter(noopKeyAdapter{}) if duplicateDetection { diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index 7038bd4d9255d..d535478c5f4c4 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -468,6 +468,58 @@ func (dra DuplicateResolutionAlgorithm) String() string { } } +// CompressionType is the config type of compression algorithm. +type CompressionType int + +const ( + // CompressionNone means no compression. + CompressionNone CompressionType = iota + // CompressionGzip means gzip compression. 
+ CompressionGzip +) + +func (t *CompressionType) UnmarshalTOML(v interface{}) error { + if val, ok := v.(string); ok { + return t.FromStringValue(val) + } + return errors.Errorf("invalid compression-type '%v', please choose valid option between ['gzip']", v) +} + +func (t CompressionType) MarshalText() ([]byte, error) { + return []byte(t.String()), nil +} + +func (t *CompressionType) FromStringValue(s string) error { + switch strings.ToLower(s) { + case "": + *t = CompressionNone + case "gz", "gzip": + *t = CompressionGzip + default: + return errors.Errorf("invalid compression-type '%s', please choose valid option between ['gzip']", s) + } + return nil +} + +func (t *CompressionType) MarshalJSON() ([]byte, error) { + return []byte(`"` + t.String() + `"`), nil +} + +func (t *CompressionType) UnmarshalJSON(data []byte) error { + return t.FromStringValue(strings.Trim(string(data), `"`)) +} + +func (t CompressionType) String() string { + switch t { + case CompressionGzip: + return "gzip" + case CompressionNone: + return "" + default: + panic(fmt.Sprintf("invalid compression type '%d'", t)) + } +} + // PostRestore has some options which will be executed after kv restored. 
type PostRestore struct { Checksum PostOpLevel `toml:"checksum" json:"checksum"` @@ -583,6 +635,7 @@ type TikvImporter struct { OnDuplicate string `toml:"on-duplicate" json:"on-duplicate"` MaxKVPairs int `toml:"max-kv-pairs" json:"max-kv-pairs"` SendKVPairs int `toml:"send-kv-pairs" json:"send-kv-pairs"` + CompressKVPairs CompressionType `toml:"compress-kv-pairs" json:"compress-kv-pairs"` RegionSplitSize ByteSize `toml:"region-split-size" json:"region-split-size"` RegionSplitKeys int `toml:"region-split-keys" json:"region-split-keys"` SortedKVDir string `toml:"sorted-kv-dir" json:"sorted-kv-dir"` diff --git a/br/pkg/lightning/config/config_test.go b/br/pkg/lightning/config/config_test.go index f590391740ec4..91edd8aa46c12 100644 --- a/br/pkg/lightning/config/config_test.go +++ b/br/pkg/lightning/config/config_test.go @@ -1144,3 +1144,17 @@ func TestCreateSeveralConfigsWithDifferentFilters(t *testing.T) { )) require.True(t, common.StringSliceEqual(config.GetDefaultFilter(), originalDefaultCfg)) } + +func TestCompressionType(t *testing.T) { + var ct config.CompressionType + require.NoError(t, ct.FromStringValue("")) + require.Equal(t, config.CompressionNone, ct) + require.NoError(t, ct.FromStringValue("gzip")) + require.Equal(t, config.CompressionGzip, ct) + require.NoError(t, ct.FromStringValue("gz")) + require.Equal(t, config.CompressionGzip, ct) + require.EqualError(t, ct.FromStringValue("zstd"), "invalid compression-type 'zstd', please choose valid option between ['gzip']") + + require.Equal(t, "", config.CompressionNone.String()) + require.Equal(t, "gzip", config.CompressionGzip.String()) +} diff --git a/br/tests/lightning_compress/config.toml b/br/tests/lightning_compress/config.toml index 000018c5c41d4..f4452fe7664a6 100644 --- a/br/tests/lightning_compress/config.toml +++ b/br/tests/lightning_compress/config.toml @@ -12,7 +12,3 @@ enable = true schema = "tidb_lightning_checkpoint_test" driver = "mysql" keep-after-success = true - -[tikv-importer] 
-send-kv-pairs=10 -region-split-size = 1024 diff --git a/br/tests/lightning_import_compress/config.toml b/br/tests/lightning_import_compress/config.toml new file mode 100644 index 0000000000000..30df6c8e0c98e --- /dev/null +++ b/br/tests/lightning_import_compress/config.toml @@ -0,0 +1,5 @@ +[tikv-importer] +backend = 'local' + +[mydumper.csv] +header = false diff --git a/br/tests/lightning_import_compress/config_gz.toml b/br/tests/lightning_import_compress/config_gz.toml new file mode 100644 index 0000000000000..d26e6ae237c18 --- /dev/null +++ b/br/tests/lightning_import_compress/config_gz.toml @@ -0,0 +1,6 @@ +[tikv-importer] +backend = 'local' +compress-kv-pairs = 'gz' + +[mydumper.csv] +header = false diff --git a/br/tests/lightning_import_compress/config_gzip.toml b/br/tests/lightning_import_compress/config_gzip.toml new file mode 100644 index 0000000000000..24a873a27599b --- /dev/null +++ b/br/tests/lightning_import_compress/config_gzip.toml @@ -0,0 +1,6 @@ +[tikv-importer] +backend = 'local' +compress-kv-pairs = 'gzip' + +[mydumper.csv] +header = false diff --git a/br/tests/lightning_import_compress/run.sh b/br/tests/lightning_import_compress/run.sh new file mode 100644 index 0000000000000..e6414881e4e0e --- /dev/null +++ b/br/tests/lightning_import_compress/run.sh @@ -0,0 +1,56 @@ +#!/bin/bash +# +# Copyright 2023 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +set -eu + +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/backend/local/LoggingImportBytes=return" + +mkdir -p "$TEST_DIR/data" + +cat <<EOF >"$TEST_DIR/data/test-schema-create.sql" +CREATE DATABASE test; +EOF +cat <<EOF >"$TEST_DIR/data/test.t-schema.sql" +CREATE TABLE test.t (id int primary key, a int, b int, c int); +EOF + +# Generate 200k rows. Total size is about 5MiB. +for i in {1..200000}; do + echo "$i,$i,$i,$i" >>"$TEST_DIR/data/test.t.0.csv" +done + +LOG_FILE1="$TEST_DIR/lightning-import-compress1.log" +LOG_FILE2="$TEST_DIR/lightning-import-compress2.log" +LOG_FILE3="$TEST_DIR/lightning-import-compress3.log" + +run_lightning --backend local -d "$TEST_DIR/data" --config "tests/$TEST_NAME/config.toml" --log-file "$LOG_FILE1" -L debug +run_sql 'DROP DATABASE test;' +run_lightning --backend local -d "$TEST_DIR/data" --config "tests/$TEST_NAME/config_gz.toml" --log-file "$LOG_FILE2" -L debug +run_sql 'DROP DATABASE test;' +run_lightning --backend local -d "$TEST_DIR/data" --config "tests/$TEST_NAME/config_gzip.toml" --log-file "$LOG_FILE3" -L debug + +uncompress=$(grep "import write" /tmp/backup_restore_test/lightning-import-compress1.log | + grep -Eo "bytes=[0-9]+" | sed 's/bytes=//g' | awk '{sum+=$1} END {print sum}') +gzip=$(grep "import write" /tmp/backup_restore_test/lightning-import-compress2.log | + grep -Eo "bytes=[0-9]+" | sed 's/bytes=//g' | awk '{sum+=$1} END {print sum}') +gz=$(grep "import write" /tmp/backup_restore_test/lightning-import-compress3.log | + grep -Eo "bytes=[0-9]+" | sed 's/bytes=//g' | awk '{sum+=$1} END {print sum}') + +echo "uncompress: ${uncompress}, gzip: ${gzip}, gz: ${gz}" +if [ "$uncompress" -le "$gzip" ] || [ "$uncompress" -le "$gz" ]; then + echo "compress is not working" + exit 1 +fi