Skip to content

Commit

Permalink
lightning: replace grpc gzip compressor with klauspost/compress/gzip (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sleepymole authored Mar 7, 2023
1 parent 0a57d81 commit 7d7350f
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 7 deletions.
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "local",
srcs = [
"compress.go",
"duplicate.go",
"engine.go",
"iterator.go",
Expand Down Expand Up @@ -55,6 +56,7 @@ go_library(
"@com_github_docker_go_units//:go-units",
"@com_github_google_btree//:btree",
"@com_github_google_uuid//:uuid",
"@com_github_klauspost_compress//gzip",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/errorpb",
Expand All @@ -71,7 +73,6 @@ go_library(
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_grpc//encoding/gzip",
"@org_golang_google_grpc//keepalive",
"@org_golang_google_grpc//status",
"@org_golang_x_exp//slices",
Expand All @@ -87,6 +88,7 @@ go_test(
name = "local_test",
timeout = "short",
srcs = [
"compress_test.go",
"duplicate_test.go",
"engine_test.go",
"iterator_test.go",
Expand Down
76 changes: 76 additions & 0 deletions br/pkg/lightning/backend/local/compress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package local

import (
"io"
"sync"

"github.com/klauspost/compress/gzip" // faster than stdlib
"google.golang.org/grpc"
)

var (
_ grpc.Compressor = (*gzipCompressor)(nil)
_ grpc.Decompressor = (*gzipDecompressor)(nil)
)

type gzipCompressor struct{}

var gzipWriterPool = sync.Pool{
New: func() any {
return gzip.NewWriter(io.Discard)
},
}

func (c *gzipCompressor) Do(w io.Writer, p []byte) error {
z := gzipWriterPool.Get().(*gzip.Writer)
defer gzipWriterPool.Put(z)
z.Reset(w)
if _, err := z.Write(p); err != nil {
return err
}
return z.Close()
}

func (c *gzipCompressor) Type() string {
return "gzip"
}

type gzipDecompressor struct{}

var gzipReaderPool = sync.Pool{
New: func() any {
return &gzip.Reader{}
},
}

func (d *gzipDecompressor) Do(r io.Reader) ([]byte, error) {
z := gzipReaderPool.Get().(*gzip.Reader)
if err := z.Reset(r); err != nil {
gzipReaderPool.Put(z)
return nil, err
}

defer func() {
_ = z.Close()
gzipReaderPool.Put(z)
}()
return io.ReadAll(z)
}

func (d *gzipDecompressor) Type() string {
return "gzip"
}
101 changes: 101 additions & 0 deletions br/pkg/lightning/backend/local/compress_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package local

import (
"bytes"
"compress/gzip" // use standard library to verify the result
"crypto/rand"
"io"
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

func TestGzipCompressor(t *testing.T) {
compressor := &gzipCompressor{}
require.Equal(t, "gzip", compressor.Type())

input := make([]byte, 1<<20)
_, err := rand.Read(input)
require.NoError(t, err)

buf := &bytes.Buffer{}
err = compressor.Do(buf, input)
require.NoError(t, err)

compressed := buf.Bytes()
z, err := gzip.NewReader(bytes.NewReader(compressed))
require.NoError(t, err)
uncompressed, err := io.ReadAll(z)
require.NoError(t, err)
require.NoError(t, z.Close())

require.Equal(t, input, uncompressed)
}

func TestGzipDecompressor(t *testing.T) {
decompressor := &gzipDecompressor{}
require.Equal(t, "gzip", decompressor.Type())

input := make([]byte, 1<<20)
_, err := rand.Read(input)
require.NoError(t, err)

buf := &bytes.Buffer{}
z := gzip.NewWriter(buf)
_, err = z.Write(input)
require.NoError(t, err)
require.NoError(t, z.Close())

uncompressed, err := decompressor.Do(bytes.NewReader(buf.Bytes()))
require.NoError(t, err)

require.Equal(t, input, uncompressed)
}

func BenchmarkGzipCompressor(b *testing.B) {
benchCompressor(b, &gzipCompressor{})
}

func BenchmarkGrpcGzipCompressor(b *testing.B) {
benchCompressor(b, grpc.NewGZIPCompressor())
}

func benchCompressor(b *testing.B, compressor grpc.Compressor) {
input := make([]byte, 1<<20)
_, err := rand.Read(input)
require.NoError(b, err)

b.ResetTimer()
for i := 0; i < b.N; i++ {
buf := &bytes.Buffer{}
err = compressor.Do(buf, input)
require.NoError(b, err)
}
}

func BenchmarkGzipDecompressor(b *testing.B) {
benchDecompressor(b, &gzipDecompressor{})
}

func BenchmarkGrpcGzipDecompressor(b *testing.B) {
benchDecompressor(b, grpc.NewGZIPDecompressor())
}

func benchDecompressor(b *testing.B, decompressor grpc.Decompressor) {
input := make([]byte, 1<<20)
_, err := rand.Read(input)
require.NoError(b, err)

buf := &bytes.Buffer{}
z := gzip.NewWriter(buf)
_, err = z.Write(input)
require.NoError(b, err)
require.NoError(b, z.Close())

b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := decompressor.Do(bytes.NewReader(buf.Bytes()))
require.NoError(b, err)
}
}
11 changes: 6 additions & 5 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
)
Expand All @@ -88,12 +87,10 @@ const (
// A large retry times is for tolerating tikv cluster failures.
maxWriteAndIngestRetryTimes = 30
maxRetryBackoffSecond = 30
maxRetryBackoffTime = 30 * time.Second

gRPCKeepAliveTime = 10 * time.Minute
gRPCKeepAliveTimeout = 5 * time.Minute
gRPCBackOffMaxDelay = 10 * time.Minute
writeStallSleepTime = 10 * time.Second

// The max ranges count in a batch to split and scatter.
maxBatchSplitRanges = 4096
Expand Down Expand Up @@ -183,9 +180,13 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64)
)
switch f.compressionType {
case config.CompressionNone:
// do nothing
// do nothing
case config.CompressionGzip:
opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)))
// Use custom compressor/decompressor to speed up compression/decompression.
// Note that here we don't use grpc.UseCompressor option although it's the recommended way.
// Because gprc-go uses a global registry to store compressor/decompressor, we can't make sure
// the compressor/decompressor is not registered by other components.
opts = append(opts, grpc.WithCompressor(&gzipCompressor{}), grpc.WithDecompressor(&gzipDecompressor{}))
default:
return nil, common.ErrInvalidConfig.GenWithStack("unsupported compression type %s", f.compressionType)
}
Expand Down
4 changes: 3 additions & 1 deletion build/nogo_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,9 @@
"br/pkg/restore/split/client.go": "github.com/golang/protobuf deprecated",
"br/pkg/streamhelper/advancer_cliext.go": "github.com/golang/protobuf deprecated",
"br/pkg/lightning/checkpoints/checkpoints.go": "cfg.TikvImporter.Addr is deprecated",
"br/pkg/lightning/checkpoints/glue_checkpoint.go": "cfg.TikvImporter.Addr is deprecated"
"br/pkg/lightning/checkpoints/glue_checkpoint.go": "cfg.TikvImporter.Addr is deprecated",
"br/pkg/lightning/backend/local/local.go": "grpc Compressor/Decompressor is deprecated",
"br/pkg/lightning/backend/local/compress.go": "grpc Compressor/Decompressor is deprecated"
},
"only_files": {
"util/gctuner": "util/gctuner",
Expand Down

0 comments on commit 7d7350f

Please sign in to comment.