Skip to content

Commit

Permalink
lightning: support compression when sending kv pairs to tikv (#41164)
Browse files Browse the repository at this point in the history
close #41163
  • Loading branch information
sleepymole authored Feb 9, 2023
1 parent 4997e7c commit b359ee7
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 21 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ go_library(
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_grpc//encoding/gzip",
"@org_golang_google_grpc//keepalive",
"@org_golang_google_grpc//status",
"@org_golang_x_exp//slices",
Expand Down
71 changes: 54 additions & 17 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"io"
"math"
"net"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -77,6 +78,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -130,18 +132,25 @@ type ImportClientFactory interface {
}

type importClientFactoryImpl struct {
conns *common.GRPCConns
splitCli split.SplitClient
tls *common.TLS
tcpConcurrency int
conns *common.GRPCConns
splitCli split.SplitClient
tls *common.TLS
tcpConcurrency int
compressionType config.CompressionType
}

func newImportClientFactoryImpl(splitCli split.SplitClient, tls *common.TLS, tcpConcurrency int) *importClientFactoryImpl {
func newImportClientFactoryImpl(
splitCli split.SplitClient,
tls *common.TLS,
tcpConcurrency int,
compressionType config.CompressionType,
) *importClientFactoryImpl {
return &importClientFactoryImpl{
conns: common.NewGRPCConns(),
splitCli: splitCli,
tls: tls,
tcpConcurrency: tcpConcurrency,
conns: common.NewGRPCConns(),
splitCli: splitCli,
tls: tls,
tcpConcurrency: tcpConcurrency,
compressionType: compressionType,
}
}

Expand All @@ -150,11 +159,14 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64)
if err != nil {
return nil, errors.Trace(err)
}
opt := grpc.WithTransportCredentials(insecure.NewCredentials())
var opts []grpc.DialOption
if f.tls.TLSConfig() != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(f.tls.TLSConfig()))
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(f.tls.TLSConfig())))
} else {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
ctx, cancel := context.WithTimeout(ctx, dialTimeout)
defer cancel()

bfConf := backoff.DefaultConfig
bfConf.MaxDelay = gRPCBackOffMaxDelay
Expand All @@ -163,18 +175,34 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64)
if addr == "" {
addr = store.GetAddress()
}
conn, err := grpc.DialContext(
ctx,
addr,
opt,
opts = append(opts,
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: gRPCKeepAliveTime,
Timeout: gRPCKeepAliveTimeout,
PermitWithoutStream: true,
}),
)
cancel()
switch f.compressionType {
case config.CompressionNone:
// do nothing
case config.CompressionGzip:
opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)))
default:
return nil, common.ErrInvalidConfig.GenWithStack("unsupported compression type %s", f.compressionType)
}

failpoint.Inject("LoggingImportBytes", func() {
opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", target)
if err != nil {
return nil, err
}
return &loggingConn{Conn: conn}, nil
}))
})

conn, err := grpc.DialContext(ctx, addr, opts...)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -200,6 +228,15 @@ func (f *importClientFactoryImpl) Close() {
f.conns.Close()
}

type loggingConn struct {
net.Conn
}

func (c loggingConn) Write(b []byte) (int, error) {
log.L().Debug("import write", zap.Int("bytes", len(b)))
return c.Conn.Write(b)
}

// Range record start and end key for localStoreDir.DB
// so we can write it to tikv in streaming
type Range struct {
Expand Down Expand Up @@ -479,7 +516,7 @@ func NewLocalBackend(
if err != nil {
return backend.MakeBackend(nil), common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
}
importClientFactory := newImportClientFactoryImpl(splitCli, tls, rangeConcurrency)
importClientFactory := newImportClientFactoryImpl(splitCli, tls, rangeConcurrency, cfg.TikvImporter.CompressKVPairs)
duplicateDetection := cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone
keyAdapter := KeyAdapter(noopKeyAdapter{})
if duplicateDetection {
Expand Down
53 changes: 53 additions & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,58 @@ func (dra DuplicateResolutionAlgorithm) String() string {
}
}

// CompressionType is the config type of compression algorithm.
type CompressionType int

const (
// CompressionNone means no compression.
CompressionNone CompressionType = iota
// CompressionGzip means gzip compression.
CompressionGzip
)

func (t *CompressionType) UnmarshalTOML(v interface{}) error {
if val, ok := v.(string); ok {
return t.FromStringValue(val)
}
return errors.Errorf("invalid compression-type '%v', please choose valid option between ['gzip']", v)
}

func (t CompressionType) MarshalText() ([]byte, error) {
return []byte(t.String()), nil
}

func (t *CompressionType) FromStringValue(s string) error {
switch strings.ToLower(s) {
case "":
*t = CompressionNone
case "gz", "gzip":
*t = CompressionGzip
default:
return errors.Errorf("invalid compression-type '%s', please choose valid option between ['gzip']", s)
}
return nil
}

func (t *CompressionType) MarshalJSON() ([]byte, error) {
return []byte(`"` + t.String() + `"`), nil
}

func (t *CompressionType) UnmarshalJSON(data []byte) error {
return t.FromStringValue(strings.Trim(string(data), `"`))
}

func (t CompressionType) String() string {
switch t {
case CompressionGzip:
return "gzip"
case CompressionNone:
return ""
default:
panic(fmt.Sprintf("invalid compression type '%d'", t))
}
}

// PostRestore has some options which will be executed after kv restored.
type PostRestore struct {
Checksum PostOpLevel `toml:"checksum" json:"checksum"`
Expand Down Expand Up @@ -583,6 +635,7 @@ type TikvImporter struct {
OnDuplicate string `toml:"on-duplicate" json:"on-duplicate"`
MaxKVPairs int `toml:"max-kv-pairs" json:"max-kv-pairs"`
SendKVPairs int `toml:"send-kv-pairs" json:"send-kv-pairs"`
CompressKVPairs CompressionType `toml:"compress-kv-pairs" json:"compress-kv-pairs"`
RegionSplitSize ByteSize `toml:"region-split-size" json:"region-split-size"`
RegionSplitKeys int `toml:"region-split-keys" json:"region-split-keys"`
SortedKVDir string `toml:"sorted-kv-dir" json:"sorted-kv-dir"`
Expand Down
14 changes: 14 additions & 0 deletions br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1144,3 +1144,17 @@ func TestCreateSeveralConfigsWithDifferentFilters(t *testing.T) {
))
require.True(t, common.StringSliceEqual(config.GetDefaultFilter(), originalDefaultCfg))
}

func TestCompressionType(t *testing.T) {
var ct config.CompressionType
require.NoError(t, ct.FromStringValue(""))
require.Equal(t, config.CompressionNone, ct)
require.NoError(t, ct.FromStringValue("gzip"))
require.Equal(t, config.CompressionGzip, ct)
require.NoError(t, ct.FromStringValue("gz"))
require.Equal(t, config.CompressionGzip, ct)
require.EqualError(t, ct.FromStringValue("zstd"), "invalid compression-type 'zstd', please choose valid option between ['gzip']")

require.Equal(t, "", config.CompressionNone.String())
require.Equal(t, "gzip", config.CompressionGzip.String())
}
4 changes: 0 additions & 4 deletions br/tests/lightning_compress/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,3 @@ enable = true
schema = "tidb_lightning_checkpoint_test"
driver = "mysql"
keep-after-success = true

[tikv-importer]
send-kv-pairs=10
region-split-size = 1024
5 changes: 5 additions & 0 deletions br/tests/lightning_import_compress/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[tikv-importer]
backend = 'local'

[mydumper.csv]
header = false
6 changes: 6 additions & 0 deletions br/tests/lightning_import_compress/config_gz.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[tikv-importer]
backend = 'local'
compress-kv-pairs = 'gz'

[mydumper.csv]
header = false
6 changes: 6 additions & 0 deletions br/tests/lightning_import_compress/config_gzip.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[tikv-importer]
backend = 'local'
compress-kv-pairs = 'gzip'

[mydumper.csv]
header = false
56 changes: 56 additions & 0 deletions br/tests/lightning_import_compress/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/bin/bash
#
# Copyright 2023 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu

export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/backend/local/LoggingImportBytes=return"

mkdir -p "$TEST_DIR/data"

cat <<EOF >"$TEST_DIR/data/test-schema-create.sql"
CREATE DATABASE test;
EOF
cat <<EOF >"$TEST_DIR/data/test.t-schema.sql"
CREATE TABLE test.t (id int primary key, a int, b int, c int);
EOF

# Generate 200k rows. Total size is about 5MiB.
for i in {1..200000}; do
echo "$i,$i,$i,$i" >>"$TEST_DIR/data/test.t.0.csv"
done

LOG_FILE1="$TEST_DIR/lightning-import-compress1.log"
LOG_FILE2="$TEST_DIR/lightning-import-compress2.log"
LOG_FILE3="$TEST_DIR/lightning-import-compress3.log"

run_lightning --backend local -d "$TEST_DIR/data" --config "tests/$TEST_NAME/config.toml" --log-file "$LOG_FILE1" -L debug
run_sql 'DROP DATABASE test;'
run_lightning --backend local -d "$TEST_DIR/data" --config "tests/$TEST_NAME/config_gz.toml" --log-file "$LOG_FILE2" -L debug
run_sql 'DROP DATABASE test;'
run_lightning --backend local -d "$TEST_DIR/data" --config "tests/$TEST_NAME/config_gzip.toml" --log-file "$LOG_FILE3" -L debug

uncompress=$(grep "import write" /tmp/backup_restore_test/lightning-import-compress1.log |
grep -Eo "bytes=[0-9]+" | sed 's/bytes=//g' | awk '{sum+=$1} END {print sum}')
gzip=$(grep "import write" /tmp/backup_restore_test/lightning-import-compress2.log |
grep -Eo "bytes=[0-9]+" | sed 's/bytes=//g' | awk '{sum+=$1} END {print sum}')
gz=$(grep "import write" /tmp/backup_restore_test/lightning-import-compress3.log |
grep -Eo "bytes=[0-9]+" | sed 's/bytes=//g' | awk '{sum+=$1} END {print sum}')

echo "uncompress: ${uncompress}, gzip: ${gzip}, gz: ${gz}"
if [ "$uncompress" -le "$gzip" ] || [ "$uncompress" -le "$gz" ]; then
echo "compress is not working"
exit 1
fi

0 comments on commit b359ee7

Please sign in to comment.