From 7d7350f2b2947606ac9cfef6beb17417570cfb49 Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Tue, 7 Mar 2023 15:33:19 +0800 Subject: [PATCH] lightning: replace grpc gzip compressor with klauspost/compress/gzip (#41974) close pingcap/tidb#41970 --- br/pkg/lightning/backend/local/BUILD.bazel | 4 +- br/pkg/lightning/backend/local/compress.go | 76 +++++++++++++ .../lightning/backend/local/compress_test.go | 101 ++++++++++++++++++ br/pkg/lightning/backend/local/local.go | 11 +- build/nogo_config.json | 4 +- 5 files changed, 189 insertions(+), 7 deletions(-) create mode 100644 br/pkg/lightning/backend/local/compress.go create mode 100644 br/pkg/lightning/backend/local/compress_test.go diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel index 60ec468fb8cb6..272790a405e70 100644 --- a/br/pkg/lightning/backend/local/BUILD.bazel +++ b/br/pkg/lightning/backend/local/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "local", srcs = [ + "compress.go", "duplicate.go", "engine.go", "iterator.go", @@ -55,6 +56,7 @@ go_library( "@com_github_docker_go_units//:go-units", "@com_github_google_btree//:btree", "@com_github_google_uuid//:uuid", + "@com_github_klauspost_compress//gzip", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/errorpb", @@ -71,7 +73,6 @@ go_library( "@org_golang_google_grpc//codes", "@org_golang_google_grpc//credentials", "@org_golang_google_grpc//credentials/insecure", - "@org_golang_google_grpc//encoding/gzip", "@org_golang_google_grpc//keepalive", "@org_golang_google_grpc//status", "@org_golang_x_exp//slices", @@ -87,6 +88,7 @@ go_test( name = "local_test", timeout = "short", srcs = [ + "compress_test.go", "duplicate_test.go", "engine_test.go", "iterator_test.go", diff --git a/br/pkg/lightning/backend/local/compress.go b/br/pkg/lightning/backend/local/compress.go new file mode 100644 index 0000000000000..bf5aea0924150 --- /dev/null +++ b/br/pkg/lightning/backend/local/compress.go @@ -0,0 +1,76 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package local + +import ( + "io" + "sync" + + "github.com/klauspost/compress/gzip" // faster than stdlib + "google.golang.org/grpc" +) + +var ( + _ grpc.Compressor = (*gzipCompressor)(nil) + _ grpc.Decompressor = (*gzipDecompressor)(nil) +) + +type gzipCompressor struct{} + +var gzipWriterPool = sync.Pool{ + New: func() any { + return gzip.NewWriter(io.Discard) + }, +} + +func (c *gzipCompressor) Do(w io.Writer, p []byte) error { + z := gzipWriterPool.Get().(*gzip.Writer) + defer gzipWriterPool.Put(z) + z.Reset(w) + if _, err := z.Write(p); err != nil { + return err + } + return z.Close() +} + +func (c *gzipCompressor) Type() string { + return "gzip" +} + +type gzipDecompressor struct{} + +var gzipReaderPool = sync.Pool{ + New: func() any { + return &gzip.Reader{} + }, +} + +func (d *gzipDecompressor) Do(r io.Reader) ([]byte, error) { + z := gzipReaderPool.Get().(*gzip.Reader) + if err := z.Reset(r); err != nil { + gzipReaderPool.Put(z) + return nil, err + } + + defer func() { + _ = z.Close() + gzipReaderPool.Put(z) + }() + return io.ReadAll(z) +} + +func (d *gzipDecompressor) Type() string { + return "gzip" +} diff --git a/br/pkg/lightning/backend/local/compress_test.go b/br/pkg/lightning/backend/local/compress_test.go new file mode 100644 index 0000000000000..04bffc7eaf907 --- /dev/null +++ b/br/pkg/lightning/backend/local/compress_test.go @@ -0,0 +1,101 @@ +package local + +import ( + "bytes" + "compress/gzip" // use standard library to verify the result + "crypto/rand" + "io" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc" +) + +func TestGzipCompressor(t *testing.T) { + compressor := &gzipCompressor{} + require.Equal(t, "gzip", compressor.Type()) + + input := make([]byte, 1<<20) + _, err := rand.Read(input) + require.NoError(t, err) + + buf := &bytes.Buffer{} + err = compressor.Do(buf, input) + require.NoError(t, err) + + compressed := buf.Bytes() + z, err := gzip.NewReader(bytes.NewReader(compressed)) + require.NoError(t, err) + uncompressed, err := io.ReadAll(z) + require.NoError(t, err) + require.NoError(t, z.Close()) + + require.Equal(t, input, uncompressed) +} + +func TestGzipDecompressor(t *testing.T) { + decompressor := &gzipDecompressor{} + require.Equal(t, "gzip", decompressor.Type()) + + input := make([]byte, 1<<20) + _, err := rand.Read(input) + require.NoError(t, err) + + buf := &bytes.Buffer{} + z := gzip.NewWriter(buf) + _, err = z.Write(input) + require.NoError(t, err) + require.NoError(t, z.Close()) + + uncompressed, err := decompressor.Do(bytes.NewReader(buf.Bytes())) + require.NoError(t, err) + + require.Equal(t, input, uncompressed) +} + +func BenchmarkGzipCompressor(b *testing.B) { + benchCompressor(b, &gzipCompressor{}) +} + +func BenchmarkGrpcGzipCompressor(b *testing.B) { + benchCompressor(b, grpc.NewGZIPCompressor()) +} + +func benchCompressor(b *testing.B, compressor grpc.Compressor) { + input := make([]byte, 1<<20) + _, err := rand.Read(input) + require.NoError(b, err) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + buf := &bytes.Buffer{} + err = compressor.Do(buf, input) + require.NoError(b, err) + } +} + +func BenchmarkGzipDecompressor(b *testing.B) { + benchDecompressor(b, &gzipDecompressor{}) +} + +func BenchmarkGrpcGzipDecompressor(b *testing.B) { + benchDecompressor(b, grpc.NewGZIPDecompressor()) +} + +func benchDecompressor(b *testing.B, decompressor grpc.Decompressor) { + input := make([]byte, 1<<20) + _, err := rand.Read(input) + require.NoError(b, err) + + buf := &bytes.Buffer{} + z := gzip.NewWriter(buf) + _, err = z.Write(input) + require.NoError(b, err) + require.NoError(b, z.Close()) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := decompressor.Do(bytes.NewReader(buf.Bytes())) + require.NoError(b, err) + } +} diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 32effde381d3c..41b7c0dc5fc77 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -75,7 +75,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" ) @@ -88,12 +87,10 @@ const ( // A large retry times is for tolerating tikv cluster failures. maxWriteAndIngestRetryTimes = 30 maxRetryBackoffSecond = 30 - maxRetryBackoffTime = 30 * time.Second gRPCKeepAliveTime = 10 * time.Minute gRPCKeepAliveTimeout = 5 * time.Minute gRPCBackOffMaxDelay = 10 * time.Minute - writeStallSleepTime = 10 * time.Second // The max ranges count in a batch to split and scatter. maxBatchSplitRanges = 4096 @@ -183,9 +180,13 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64) ) switch f.compressionType { case config.CompressionNone: - // do nothing + // do nothing case config.CompressionGzip: - opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name))) + // Use custom compressor/decompressor to speed up compression/decompression. + // Note that here we don't use grpc.UseCompressor option although it's the recommended way. + // Because gprc-go uses a global registry to store compressor/decompressor, we can't make sure + // the compressor/decompressor is not registered by other components. + opts = append(opts, grpc.WithCompressor(&gzipCompressor{}), grpc.WithDecompressor(&gzipDecompressor{})) default: return nil, common.ErrInvalidConfig.GenWithStack("unsupported compression type %s", f.compressionType) } diff --git a/build/nogo_config.json b/build/nogo_config.json index fed4526d791c9..12de0e45b4738 100644 --- a/build/nogo_config.json +++ b/build/nogo_config.json @@ -755,7 +755,9 @@ "br/pkg/restore/split/client.go": "github.com/golang/protobuf deprecated", "br/pkg/streamhelper/advancer_cliext.go": "github.com/golang/protobuf deprecated", "br/pkg/lightning/checkpoints/checkpoints.go": "cfg.TikvImporter.Addr is deprecated", - "br/pkg/lightning/checkpoints/glue_checkpoint.go": "cfg.TikvImporter.Addr is deprecated" + "br/pkg/lightning/checkpoints/glue_checkpoint.go": "cfg.TikvImporter.Addr is deprecated", + "br/pkg/lightning/backend/local/local.go": "grpc Compressor/Decompressor is deprecated", + "br/pkg/lightning/backend/local/compress.go": "grpc Compressor/Decompressor is deprecated" }, "only_files": { "util/gctuner": "util/gctuner",