Skip to content

Commit

Permalink
Merge branch 'master' into fix-40225-x
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored Jan 4, 2023
2 parents cf0a709 + f483b39 commit f707b16
Show file tree
Hide file tree
Showing 45 changed files with 2,527 additions and 13 deletions.
18 changes: 17 additions & 1 deletion autoid_service/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "autoid_service",
Expand All @@ -9,6 +9,7 @@ go_library(
"//config",
"//kv",
"//meta",
"//meta/autoid",
"//metrics",
"//owner",
"//parser/model",
Expand All @@ -23,3 +24,18 @@ go_library(
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "autoid_service_test",
srcs = ["autoid_test.go"],
embed = [":autoid_service"],
deps = [
"//parser/model",
"//testkit",
"@com_github_pingcap_kvproto//pkg/autoid",
"@com_github_stretchr_testify//require",
"@io_etcd_go_etcd_tests_v3//integration",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//credentials/insecure",
],
)
13 changes: 11 additions & 2 deletions autoid_service/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
autoid1 "github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -253,6 +254,7 @@ type Service struct {
func New(selfAddr string, etcdAddr []string, store kv.Storage, tlsConfig *tls.Config) *Service {
cfg := config.GetGlobalConfig()
etcdLogCfg := zap.NewProductionConfig()

cli, err := clientv3.New(clientv3.Config{
LogConfig: &etcdLogCfg,
Endpoints: etcdAddr,
Expand All @@ -270,9 +272,12 @@ func New(selfAddr string, etcdAddr []string, store kv.Storage, tlsConfig *tls.Co
if err != nil {
panic(err)
}
return newWithCli(selfAddr, cli, store)
}

func newWithCli(selfAddr string, cli *clientv3.Client, store kv.Storage) *Service {
l := owner.NewOwnerManager(context.Background(), cli, "autoid", selfAddr, autoIDLeaderPath)
err = l.CampaignOwner()
err := l.CampaignOwner()
if err != nil {
panic(err)
}
Expand All @@ -299,7 +304,7 @@ func (m *mockClient) Rebase(ctx context.Context, in *autoid.RebaseRequest, opts
var global = make(map[string]*mockClient)

// MockForTest is used for testing, the UT test and unistore use this.
func MockForTest(store kv.Storage) *mockClient {
func MockForTest(store kv.Storage) autoid.AutoIDAllocClient {
uuid := store.UUID()
ret, ok := global[uuid]
if !ok {
Expand Down Expand Up @@ -515,3 +520,7 @@ func (s *Service) Rebase(ctx context.Context, req *autoid.RebaseRequest) (*autoi
}
return &autoid.RebaseResponse{}, nil
}

func init() {
autoid1.MockForTest = MockForTest
}
202 changes: 202 additions & 0 deletions autoid_service/autoid_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package autoid

import (
"context"
"fmt"
"math"
"net"
"testing"
"time"

"github.com/pingcap/kvproto/pkg/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/tests/v3/integration"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type autoIDResp struct {
*autoid.AutoIDResponse
error
*testing.T
}

func (resp autoIDResp) check(min, max int64) {
require.NoError(resp.T, resp.error)
require.Equal(resp.T, resp.AutoIDResponse, &autoid.AutoIDResponse{Min: min, Max: max})
}

func (resp autoIDResp) checkErrmsg() {
require.NoError(resp.T, resp.error)
require.True(resp.T, len(resp.GetErrmsg()) > 0)
}

type rebaseResp struct {
*autoid.RebaseResponse
error
*testing.T
}

func (resp rebaseResp) check(msg string) {
require.NoError(resp.T, resp.error)
require.Equal(resp.T, string(resp.RebaseResponse.GetErrmsg()), msg)
}

func TestAPI(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
cli := MockForTest(store)
tk.MustExec("use test")
tk.MustExec("create table t (id int key auto_increment);")
is := dom.InfoSchema()
dbInfo, ok := is.SchemaByName(model.NewCIStr("test"))
require.True(t, ok)

tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tbInfo := tbl.Meta()

ctx := context.Background()
checkCurrValue := func(t *testing.T, cli autoid.AutoIDAllocClient, min, max int64) {
req := &autoid.AutoIDRequest{DbID: dbInfo.ID, TblID: tbInfo.ID, N: 0}
resp, err := cli.AllocAutoID(ctx, req)
require.NoError(t, err)
require.Equal(t, resp, &autoid.AutoIDResponse{Min: min, Max: max})
}
autoIDRequest := func(t *testing.T, cli autoid.AutoIDAllocClient, unsigned bool, n uint64, more ...int64) autoIDResp {
increment := int64(1)
offset := int64(1)
if len(more) >= 1 {
increment = more[0]
}
if len(more) >= 2 {
offset = more[1]
}
req := &autoid.AutoIDRequest{DbID: dbInfo.ID, TblID: tbInfo.ID, IsUnsigned: unsigned, N: n, Increment: increment, Offset: offset}
resp, err := cli.AllocAutoID(ctx, req)
return autoIDResp{resp, err, t}
}
rebaseRequest := func(t *testing.T, cli autoid.AutoIDAllocClient, unsigned bool, n int64, force ...struct{}) rebaseResp {
req := &autoid.RebaseRequest{
DbID: dbInfo.ID,
TblID: tbInfo.ID,
Base: n,
IsUnsigned: unsigned,
Force: len(force) > 0,
}
resp, err := cli.Rebase(ctx, req)
return rebaseResp{resp, err, t}
}
var force = struct{}{}

// basic auto id operation
autoIDRequest(t, cli, false, 1).check(0, 1)
autoIDRequest(t, cli, false, 10).check(1, 11)
checkCurrValue(t, cli, 11, 11)
autoIDRequest(t, cli, false, 128).check(11, 139)
autoIDRequest(t, cli, false, 1, 10, 5).check(139, 145)

// basic rebase operation
rebaseRequest(t, cli, false, 666).check("")
autoIDRequest(t, cli, false, 1).check(666, 667)

rebaseRequest(t, cli, false, 6666).check("")
autoIDRequest(t, cli, false, 1).check(6666, 6667)

// rebase will not decrease the value without 'force'
rebaseRequest(t, cli, false, 44).check("")
checkCurrValue(t, cli, 6667, 6667)
rebaseRequest(t, cli, false, 44, force).check("")
checkCurrValue(t, cli, 44, 44)

// max increase 1
rebaseRequest(t, cli, false, math.MaxInt64, force).check("")
checkCurrValue(t, cli, math.MaxInt64, math.MaxInt64)
autoIDRequest(t, cli, false, 1).checkErrmsg()

rebaseRequest(t, cli, true, 0, force).check("")
checkCurrValue(t, cli, 0, 0)
autoIDRequest(t, cli, true, 1).check(0, 1)
autoIDRequest(t, cli, true, 10).check(1, 11)
autoIDRequest(t, cli, true, 128).check(11, 139)
autoIDRequest(t, cli, true, 1, 10, 5).check(139, 145)

// max increase 1
rebaseRequest(t, cli, true, math.MaxInt64).check("")
checkCurrValue(t, cli, math.MaxInt64, math.MaxInt64)
autoIDRequest(t, cli, true, 1).check(math.MaxInt64, math.MinInt64)
autoIDRequest(t, cli, true, 1).check(math.MinInt64, math.MinInt64+1)

rebaseRequest(t, cli, true, -1).check("")
checkCurrValue(t, cli, -1, -1)
autoIDRequest(t, cli, true, 1).check(-1, 0)
}

func TestGRPC(t *testing.T) {
integration.BeforeTestExternal(t)
store := testkit.CreateMockStore(t)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
etcdCli := cluster.RandClient()

var addr string
var listener net.Listener
for port := 10080; ; port++ {
var err error
addr = fmt.Sprintf("127.0.0.1:%d", port)
listener, err = net.Listen("tcp", addr)
if err == nil {
break
}
}
defer listener.Close()

service := newWithCli(addr, etcdCli, store)
defer service.Close()

var i int
for !service.leaderShip.IsOwner() {
time.Sleep(100 * time.Millisecond)
i++
if i >= 20 {
break
}
}
require.Less(t, i, 20)

grpcServer := grpc.NewServer()
autoid.RegisterAutoIDAllocServer(grpcServer, service)
go func() {
grpcServer.Serve(listener)
}()
defer grpcServer.Stop()

grpcConn, err := grpc.Dial("127.0.0.1:10080", grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
cli := autoid.NewAutoIDAllocClient(grpcConn)
_, err = cli.AllocAutoID(context.Background(), &autoid.AutoIDRequest{
DbID: 0,
TblID: 0,
N: 1,
Increment: 1,
Offset: 1,
IsUnsigned: false,
})
require.NoError(t, err)
}
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ go_test(
flaky = True,
shard_count = 50,
deps = [
"//autoid_service",
"//config",
"//ddl/ingest",
"//ddl/placement",
Expand Down
1 change: 1 addition & 0 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/pingcap/errors"
_ "github.com/pingcap/tidb/autoid_service"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/schematracker"
Expand Down
8 changes: 8 additions & 0 deletions domain/historical_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@ package domain

import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics/handle"
)

var (
generateHistoricalStatsSuccessCounter = metrics.HistoricalStatsCounter.WithLabelValues("generate", "success")
generateHistoricalStatsFailedCounter = metrics.HistoricalStatsCounter.WithLabelValues("generate", "fail")
)

// HistoricalStatsWorker indicates for dump historical stats
type HistoricalStatsWorker struct {
tblCH chan int64
Expand Down Expand Up @@ -52,8 +58,10 @@ func (w *HistoricalStatsWorker) DumpHistoricalStats(tableID int64, statsHandle *
return errors.Errorf("cannot get DBInfo by TableID %d", tableID)
}
if _, err := statsHandle.RecordHistoricalStatsToStorage(dbInfo.Name.O, tblInfo); err != nil {
generateHistoricalStatsFailedCounter.Inc()
return errors.Errorf("record table %s.%s's historical stats failed", dbInfo.Name.O, tblInfo.Name.O)
}
generateHistoricalStatsSuccessCounter.Inc()
return nil
}

Expand Down
12 changes: 11 additions & 1 deletion domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/terror"
Expand Down Expand Up @@ -167,6 +168,13 @@ func insertPlanReplayerSuccessStatusRecord(ctx context.Context, sctx sessionctx.
}
}

var (
planReplayerCaptureTaskSendCounter = metrics.PlanReplayerTaskCounter.WithLabelValues("capture", "send")
planReplayerCaptureTaskDiscardCounter = metrics.PlanReplayerTaskCounter.WithLabelValues("capture", "discard")

planReplayerRegisterTaskGauge = metrics.PlanReplayerRegisterTaskGauge
)

type planReplayerHandle struct {
*planReplayerTaskCollectorHandle
*planReplayerTaskDumpHandle
Expand All @@ -181,9 +189,10 @@ func (h *planReplayerHandle) SendTask(task *PlanReplayerDumpTask) bool {
if !task.IsContinuesCapture {
h.planReplayerTaskCollectorHandle.removeTask(task.PlanReplayerTaskKey)
}
planReplayerCaptureTaskSendCounter.Inc()
return true
default:
// TODO: add metrics here
planReplayerCaptureTaskDiscardCounter.Inc()
// directly discard the task if the task channel is full in order not to block the query process
logutil.BgLogger().Warn("discard one plan replayer dump task",
zap.String("sql-digest", task.SQLDigest), zap.String("plan-digest", task.PlanDigest))
Expand Down Expand Up @@ -221,6 +230,7 @@ func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask() error {
}
}
h.setupTasks(tasks)
planReplayerRegisterTaskGauge.Set(float64(len(tasks)))
return nil
}

Expand Down
9 changes: 9 additions & 0 deletions domain/plan_replayer_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -145,6 +146,11 @@ func (tne *tableNameExtractor) handleIsView(t *ast.TableName) (bool, error) {
return true, nil
}

var (
planReplayerDumpTaskSuccess = metrics.PlanReplayerTaskCounter.WithLabelValues("dump", "success")
planReplayerDumpTaskFailed = metrics.PlanReplayerTaskCounter.WithLabelValues("dump", "fail")
)

// DumpPlanReplayerInfo will dump the information about sqls.
// The files will be organized into the following format:
/*
Expand Down Expand Up @@ -212,6 +218,9 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
zap.Strings("sqls", sqls))
}
errMsg = err.Error()
planReplayerDumpTaskFailed.Inc()
} else {
planReplayerDumpTaskSuccess.Inc()
}
err1 := zw.Close()
if err1 != nil {
Expand Down
Loading

0 comments on commit f707b16

Please sign in to comment.