Merge branch 'master' into binding_set_parser
fzzf678 authored Dec 14, 2022
2 parents a68b7be + d2eca72 commit 4c2a986
Showing 31 changed files with 508 additions and 118 deletions.
2 changes: 1 addition & 1 deletion br/COMPATIBILITY_TEST.md
@@ -3,7 +3,7 @@
 ## Background
 
 We had some incompatibility issues in the past, which made BR cannot restore backed up data in some situations.
-So we need a test workflow to check the compatiblity.
+So we need a test workflow to check the compatibility.
 
 ## Goal
 
14 changes: 13 additions & 1 deletion br/pkg/lightning/backend/local/local.go
@@ -394,6 +394,13 @@ func openDuplicateDB(storeDir string) (*pebble.DB, error) {
     return pebble.Open(dbPath, opts)
 }
 
+var (
+    // RunInTest indicates whether the current process is running in test.
+    RunInTest bool
+    // LastAlloc is the last ID allocator.
+    LastAlloc manual.Allocator
+)
+
 // NewLocalBackend creates new connections to tikv.
 func NewLocalBackend(
     ctx context.Context,
@@ -461,6 +468,11 @@ func NewLocalBackend(
     } else {
         writeLimiter = noopStoreWriteLimiter{}
     }
+    alloc := manual.Allocator{}
+    if RunInTest {
+        alloc.RefCnt = new(atomic.Int64)
+        LastAlloc = alloc
+    }
     local := &local{
         engines: sync.Map{},
         pdCtl:   pdCtl,
@@ -486,7 +498,7 @@
         keyAdapter:          keyAdapter,
         errorMgr:            errorMgr,
         importClientFactory: importClientFactory,
-        bufferPool:          membuf.NewPool(membuf.WithAllocator(manual.Allocator{})),
+        bufferPool:          membuf.NewPool(membuf.WithAllocator(alloc)),
         writeLimiter:        writeLimiter,
         logger:              log.FromContext(ctx),
         encBuilder:          NewEncodingBuilder(ctx),
1 change: 1 addition & 0 deletions br/pkg/lightning/manual/BUILD.bazel
@@ -10,4 +10,5 @@ go_library(
     cgo = True,
     importpath = "github.com/pingcap/tidb/br/pkg/lightning/manual",
     visibility = ["//visibility:public"],
+    deps = ["@org_uber_go_atomic//:atomic"],
 )
31 changes: 28 additions & 3 deletions br/pkg/lightning/manual/allocator.go
@@ -14,8 +14,33 @@
 
 package manual
 
-type Allocator struct{}
+import (
+    "fmt"
+
+    "go.uber.org/atomic"
+)
 
-func (Allocator) Alloc(n int) []byte { return New(n) }
+type Allocator struct {
+    RefCnt *atomic.Int64
+}
 
-func (Allocator) Free(b []byte) { Free(b) }
+func (a Allocator) Alloc(n int) []byte {
+    if a.RefCnt != nil {
+        a.RefCnt.Add(1)
+    }
+    return New(n)
+}
+
+func (a Allocator) Free(b []byte) {
+    if a.RefCnt != nil {
+        a.RefCnt.Add(-1)
+    }
+    Free(b)
+}
+
+func (a Allocator) CheckRefCnt() error {
+    if a.RefCnt != nil && a.RefCnt.Load() != 0 {
+        return fmt.Errorf("memory leak detected, refCnt: %d", a.RefCnt.Load())
+    }
+    return nil
+}
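
Together with the `RunInTest`/`LastAlloc` hook added to `local.go` above, the new `RefCnt` field lets a test assert that every buffer allocated through the pool was returned. A minimal sketch of that usage (the test itself is illustrative, not part of this commit):

```go
package manual_test

import (
	"testing"

	"go.uber.org/atomic"

	"github.com/pingcap/tidb/br/pkg/lightning/manual"
)

func TestAllocatorRefCnt(t *testing.T) {
	// With a non-nil RefCnt, Alloc/Free maintain a live-buffer counter.
	alloc := manual.Allocator{RefCnt: new(atomic.Int64)}

	buf := alloc.Alloc(4096)
	if err := alloc.CheckRefCnt(); err == nil {
		t.Fatal("expected a leak report while buf is still live")
	}

	alloc.Free(buf)
	// After the matching Free, the counter is back to zero.
	if err := alloc.CheckRefCnt(); err != nil {
		t.Fatalf("unexpected leak report: %v", err)
	}
}
```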
29 changes: 28 additions & 1 deletion ddl/ingest/engine.go
@@ -82,6 +82,11 @@ func (ei *engineInfo) Clean() {
             zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
     }
     ei.openedEngine = nil
+    err = ei.closeWriters()
+    if err != nil {
+        logutil.BgLogger().Error(LitErrCloseWriterErr, zap.Error(err),
+            zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
+    }
     // Here the local intermediate files will be removed.
     err = closedEngine.Cleanup(ei.ctx)
     if err != nil {
@@ -101,8 +106,14 @@ func (ei *engineInfo) ImportAndClean() error {
         return err1
     }
     ei.openedEngine = nil
+    err := ei.closeWriters()
+    if err != nil {
+        logutil.BgLogger().Error(LitErrCloseWriterErr, zap.Error(err),
+            zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
+        return err
+    }
 
-    err := ei.diskRoot.UpdateUsageAndQuota()
+    err = ei.diskRoot.UpdateUsageAndQuota()
     if err != nil {
         logutil.BgLogger().Error(LitErrUpdateDiskStats, zap.Error(err),
             zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
@@ -181,6 +192,22 @@ func (ei *engineInfo) newWriterContext(workerID int) (*WriterContext, error) {
     }, nil
 }
 
+func (ei *engineInfo) closeWriters() error {
+    var firstErr error
+    for _, wid := range ei.writerCache.Keys() {
+        if w, ok := ei.writerCache.Load(wid); ok {
+            _, err := w.Close(ei.ctx)
+            if err != nil {
+                if firstErr == nil {
+                    firstErr = err
+                }
+            }
+        }
+        ei.writerCache.Delete(wid)
+    }
+    return firstErr
+}
+
 // WriteRow Write one row into local writer buffer.
 func (wCtx *WriterContext) WriteRow(key, idxVal []byte) error {
     kvs := make([]common.KvPair, 1)
1 change: 1 addition & 0 deletions ddl/ingest/message.go
@@ -54,6 +54,7 @@ const (
     LitInfoChgMemSetting  string = "[ddl-ingest] change memory setting for ingest"
     LitInfoInitMemSetting string = "[ddl-ingest] initial memory setting for ingest"
     LitInfoUnsafeImport   string = "[ddl-ingest] do a partial import data into the storage"
+    LitErrCloseWriterErr  string = "[ddl-ingest] close writer error"
 )
 
 func genBackendAllocMemFailedErr(memRoot MemRoot, jobID int64) error {
2 changes: 1 addition & 1 deletion docs/design/2019-11-05-index-advisor.md
@@ -57,7 +57,7 @@ for {
 Note that executing `Swap and Re-evaluate` algorithm is necessary as the `reduced_cost` sometimes is a joint effect of several indexes and it's hard to tell each index's independent contribution to the final `reduced_cost`. For example, assume there is an extremely slow query in input workload and the desired indexes for this query is `a` and `b`. However, the number of allowed recommended indexes for the whole workload is limited and for some reason, `a` ranks top `n` in the final score list while `b` is not. But there are chances that without `b`, `a` can no more optimize that extremely slow query.
 
 ----------------------------------------------
-### A quick exmaple for single-column index recommendation
+### A quick example for single-column index recommendation
 
 **Workload**:
 
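
The joint-effect argument in the paragraph above is easier to see with a toy model. The sketch below is hypothetical (the cost function and index names are made up) and only illustrates the `Swap and Re-evaluate` idea: a rejected index can be worth more than its individual score suggests.

```go
package main

import "fmt"

// cost stands in for the optimizer's estimated workload cost under a
// candidate index set. It encodes a joint effect: one slow query only
// becomes cheap when BOTH index a and index b are present.
func cost(set map[string]bool) float64 {
	if set["a"] && set["b"] {
		return 10
	}
	return 100
}

func clone(set map[string]bool) map[string]bool {
	out := make(map[string]bool, len(set))
	for k := range set {
		out[k] = true
	}
	return out
}

func main() {
	chosen := map[string]bool{"a": true, "c": true} // picked by individual score
	rejected := []string{"b", "d"}
	best := cost(chosen)

	// Swap and Re-evaluate: exchange each chosen index with each rejected
	// one, re-evaluate the whole workload, keep the swap if cost drops.
	for _, in := range []string{"a", "c"} {
		for _, cand := range rejected {
			trial := clone(chosen)
			delete(trial, in)
			trial[cand] = true
			if c := cost(trial); c < best {
				best, chosen = c, trial
			}
		}
	}
	fmt.Println(best, chosen) // 10 map[a:true b:true]
}
```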
2 changes: 1 addition & 1 deletion docs/design/2020-01-24-collations.md
@@ -233,7 +233,7 @@ The following features of the general collation algorithm will be supported:
   * Tertiary Weight i.e. case
   * PAD / NOPAD
 
-All of them are supported by `text/collate` package of Go, so it is possible to map Go collations to some of UCA-based collations in MySQL like `utf8mb4_unicode_ci`/`utf8mb4_0900_ai_ci`, if we ignore the differences between UCA versions: current `text/collate` uses UCA version `6.2.0` and it is not changable. However, the collations in MySQL are with different UCA versions marked in the names, for example, `utf8mb4_0900_ai_ci` uses version `9.0`.
+All of them are supported by `text/collate` package of Go, so it is possible to map Go collations to some of UCA-based collations in MySQL like `utf8mb4_unicode_ci`/`utf8mb4_0900_ai_ci`, if we ignore the differences between UCA versions: current `text/collate` uses UCA version `6.2.0` and it is not changeable. However, the collations in MySQL are with different UCA versions marked in the names, for example, `utf8mb4_0900_ai_ci` uses version `9.0`.
 
 For non-standard UCA implementations in MySQL, i.e. the `utf8mb4_general_ci`. The implementation depends on our choice to the [Compatibility with MySQL](#compatibility-with-mysql) chapter, if a 100% compatibility of `utf8mb4_general_ci` is chosen, we need to implement it by our hands.
 
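
As a concrete illustration of the mapping discussed above, `golang.org/x/text/collate` can already emulate the case-insensitive behavior of a MySQL `*_ci` collation; the caveat is the pinned UCA version. A small sketch (not part of this commit):

```go
package main

import (
	"fmt"

	"golang.org/x/text/collate"
	"golang.org/x/text/language"
)

func main() {
	// Case-insensitive comparison, roughly what a MySQL *_ci collation does.
	// Go's collator implements UCA 6.2.0, so it can only approximate MySQL
	// collations that are pinned to other UCA versions such as 9.0.
	c := collate.New(language.Und, collate.IgnoreCase)

	fmt.Println(c.CompareString("ABC", "abc")) // 0: equal under this collation
	fmt.Println(c.CompareString("a", "b"))     // -1
}
```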
1 change: 1 addition & 0 deletions domain/plan_replayer.go
@@ -527,6 +527,7 @@ type PlanReplayerDumpTask struct {
     TblStats map[int64]interface{}
 
     // variables used to dump the plan
+    StartTS         uint64
     SessionBindings []*bindinfo.BindRecord
     EncodedPlan     string
     SessionVars     *variable.SessionVars
42 changes: 37 additions & 5 deletions domain/plan_replayer_dump.go
@@ -41,6 +41,8 @@ import (
 )
 
 const (
+    // planReplayerSQLMeta indicates sql meta path for plan replayer
+    planReplayerSQLMeta = "sql_meta.toml"
     // PlanReplayerConfigFile indicates config file path for plan replayer
     PlanReplayerConfigFile = "config.toml"
     // PlanReplayerMetaFile meta file path for plan replayer
@@ -55,6 +57,11 @@
     PlanReplayerGlobalBindingFile = "global_bindings.sql"
 )
 
+const (
+    // PlanReplayerSQLMetaStartTS indicates the startTS in plan replayer sql meta
+    PlanReplayerSQLMetaStartTS = "startTS"
+)
+
 type tableNamePair struct {
     DBName    string
     TableName string
@@ -131,6 +138,7 @@ func (tne *tableNameExtractor) handleIsView(t *ast.TableName) (bool, error) {
 // DumpPlanReplayerInfo will dump the information about sqls.
 // The files will be organized into the following format:
 /*
+ |-sql_meta.toml
  |-meta.txt
  |-schema
  |   |-db1.table1.schema.txt
@@ -164,7 +172,7 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
     sessionVars := task.SessionVars
     execStmts := task.ExecStmts
     zw := zip.NewWriter(zf)
-    records := generateRecords(task)
+    var records []PlanReplayerStatusRecord
     defer func() {
         if err != nil {
             logutil.BgLogger().Error("dump plan replayer failed", zap.Error(err))
@@ -183,6 +191,12 @@
         }
         insertPlanReplayerStatus(ctx, sctx, records)
     }()
+
+    // Dump SQLMeta
+    if err = dumpSQLMeta(zw, task); err != nil {
+        return err
+    }
 
     // Dump config
     if err = dumpConfig(zw); err != nil {
         return err
@@ -244,10 +258,11 @@
     }
 
     if len(task.EncodedPlan) > 0 {
+        records = generateRecords(task)
         return dumpEncodedPlan(sctx, zw, task.EncodedPlan)
     }
     // Dump explain
-    return dumpExplain(sctx, zw, execStmts, task.Analyze)
+    return dumpExplain(sctx, zw, task, &records)
 }
 
 func generateRecords(task *PlanReplayerDumpTask) []PlanReplayerStatusRecord {
@@ -265,6 +280,19 @@ func generateRecords(task *PlanReplayerDumpTask) []PlanReplayerStatusRecord {
     return records
 }
 
+func dumpSQLMeta(zw *zip.Writer, task *PlanReplayerDumpTask) error {
+    cf, err := zw.Create(planReplayerSQLMeta)
+    if err != nil {
+        return errors.AddStack(err)
+    }
+    varMap := make(map[string]string)
+    varMap[PlanReplayerSQLMetaStartTS] = strconv.FormatUint(task.StartTS, 10)
+    if err := toml.NewEncoder(cf).Encode(varMap); err != nil {
+        return errors.AddStack(err)
+    }
+    return nil
+}
+
 func dumpConfig(zw *zip.Writer) error {
     cf, err := zw.Create(PlanReplayerConfigFile)
     if err != nil {
@@ -488,12 +516,12 @@ func dumpEncodedPlan(ctx sessionctx.Context, zw *zip.Writer, encodedPlan string)
     return nil
 }
 
-func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, execStmts []ast.StmtNode, isAnalyze bool) error {
-    for i, stmtExec := range execStmts {
+func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, task *PlanReplayerDumpTask, records *[]PlanReplayerStatusRecord) error {
+    for i, stmtExec := range task.ExecStmts {
         sql := stmtExec.Text()
         var recordSets []sqlexec.RecordSet
         var err error
-        if isAnalyze {
+        if task.Analyze {
             // Explain analyze
             recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.Background(), fmt.Sprintf("explain analyze %s", sql))
             if err != nil {
@@ -522,6 +550,10 @@ func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, execStmts []ast.StmtNode, isAnalyze bool) error {
                 return err
             }
         }
+        *records = append(*records, PlanReplayerStatusRecord{
+            OriginSQL: sql,
+            Token:     task.FileName,
+        })
     }
     return nil
 }
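
With this change the dump archive gains a `sql_meta.toml` entry carrying the statement's read timestamp. Assuming the `toml` import here resolves to the BurntSushi encoder used elsewhere in TiDB, the resulting file is a single key/value; a standalone sketch with a made-up timestamp:

```go
package main

import (
	"os"
	"strconv"

	"github.com/BurntSushi/toml"
)

func main() {
	// Hypothetical startTS; real tasks carry the statement's read timestamp.
	var startTS uint64 = 437580000000000000
	varMap := map[string]string{"startTS": strconv.FormatUint(startTS, 10)}

	// Writes: startTS = "437580000000000000"
	if err := toml.NewEncoder(os.Stdout).Encode(varMap); err != nil {
		panic(err)
	}
}
```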
13 changes: 9 additions & 4 deletions executor/compiler.go
@@ -157,13 +157,17 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecStmt, err error) {
         }
     }
     if c.Ctx.GetSessionVars().EnablePlanReplayerCapture && !c.Ctx.GetSessionVars().InRestrictedSQL {
-        checkPlanReplayerCaptureTask(c.Ctx, stmtNode)
+        startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS()
+        if err != nil {
+            return nil, err
+        }
+        checkPlanReplayerCaptureTask(c.Ctx, stmtNode, startTS)
     }
 
     return stmt, nil
 }
 
-func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode) {
+func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) {
     dom := domain.GetDomain(sctx)
     if dom == nil {
         return
@@ -178,21 +182,22 @@ func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode) {
     for _, task := range tasks {
         if task.SQLDigest == sqlDigest.String() {
             if task.PlanDigest == "*" || task.PlanDigest == planDigest.String() {
-                sendPlanReplayerDumpTask(sqlDigest.String(), planDigest.String(), sctx, stmtNode)
+                sendPlanReplayerDumpTask(sqlDigest.String(), planDigest.String(), sctx, stmtNode, startTS)
                 return
             }
         }
     }
 }
 
-func sendPlanReplayerDumpTask(sqlDigest, planDigest string, sctx sessionctx.Context, stmtNode ast.StmtNode) {
+func sendPlanReplayerDumpTask(sqlDigest, planDigest string, sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) {
     stmtCtx := sctx.GetSessionVars().StmtCtx
     handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle)
     dumpTask := &domain.PlanReplayerDumpTask{
         PlanReplayerTaskKey: domain.PlanReplayerTaskKey{
             SQLDigest:  sqlDigest,
             PlanDigest: planDigest,
         },
+        StartTS:         startTS,
         EncodePlan:      GetEncodedPlan,
         TblStats:        stmtCtx.TableStats,
         SessionBindings: handle.GetAllBindRecord(),
1 change: 1 addition & 0 deletions executor/executor.go
@@ -2167,6 +2167,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
     vars.ClearStmtVars()
     vars.PrevFoundInBinding = vars.FoundInBinding
     vars.FoundInBinding = false
+    vars.DurationWaitTS = 0
     return
 }
 
1 change: 1 addition & 0 deletions executor/executor_test.go
@@ -87,6 +87,7 @@ func checkFileName(s string) bool {
         "global_bindings.sql",
         "sql/sql0.sql",
         "explain/sql0.txt",
+        "sql_meta.toml",
     }
     for _, f := range files {
         if strings.Compare(f, s) == 0 {
22 changes: 15 additions & 7 deletions executor/plan_replayer.go
@@ -31,6 +31,7 @@ import (
     "github.com/pingcap/tidb/parser/ast"
     "github.com/pingcap/tidb/sessionctx"
     "github.com/pingcap/tidb/sessionctx/variable"
+    "github.com/pingcap/tidb/sessiontxn"
     "github.com/pingcap/tidb/statistics/handle"
     "github.com/pingcap/tidb/util/chunk"
     "github.com/pingcap/tidb/util/logutil"
@@ -133,7 +134,12 @@ func (e *PlanReplayerExec) createFile() error {
 func (e *PlanReplayerDumpInfo) dump(ctx context.Context) (err error) {
     fileName := e.FileName
     zf := e.File
+    startTS, err := sessiontxn.GetTxnManager(e.ctx).GetStmtReadTS()
+    if err != nil {
+        return err
+    }
     task := &domain.PlanReplayerDumpTask{
+        StartTS:     startTS,
         FileName:    fileName,
         Zf:          zf,
         SessionVars: e.ctx.GetSessionVars(),
@@ -375,21 +381,23 @@ func createSchemaAndItems(ctx sessionctx.Context, f *zip.File) error {
     if err != nil {
         return errors.AddStack(err)
     }
-    sqls := strings.Split(buf.String(), ";")
-    if len(sqls) != 3 {
-        return errors.New("plan replayer: create schema and tables failed")
-    }
+    originText := buf.String()
+    index1 := strings.Index(originText, ";")
+    createDatabaseSQL := originText[:index1+1]
+    index2 := strings.Index(originText[index1+1:], ";")
+    useDatabaseSQL := originText[index1+1:][:index2+1]
+    createTableSQL := originText[index1+1:][index2+1:]
     c := context.Background()
     // create database if not exists
-    _, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[0])
+    _, err = ctx.(sqlexec.SQLExecutor).Execute(c, createDatabaseSQL)
     logutil.BgLogger().Debug("plan replayer: skip error", zap.Error(err))
     // use database
-    _, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[1])
+    _, err = ctx.(sqlexec.SQLExecutor).Execute(c, useDatabaseSQL)
     if err != nil {
         return err
     }
     // create table or view
-    _, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[2])
+    _, err = ctx.(sqlexec.SQLExecutor).Execute(c, createTableSQL)
     if err != nil {
         return err
     }
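
The rewrite above replaces the exact-three-way `strings.Split` with two `strings.Index` cuts, so a dumped CREATE TABLE statement that itself contains semicolons no longer fails the restore. A standalone sketch of the new splitting (the input schema text is hypothetical):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// A schema file whose CREATE TABLE text contains a ';' of its own; a
	// naive strings.Split(text, ";") yields more than the expected 3 parts,
	// which the old code rejected outright.
	originText := "create database if not exists d; use d; create table t (a int comment 'x;y');"

	index1 := strings.Index(originText, ";")
	createDatabaseSQL := originText[:index1+1]
	rest := originText[index1+1:]
	index2 := strings.Index(rest, ";")
	useDatabaseSQL := rest[:index2+1]
	createTableSQL := rest[index2+1:]

	fmt.Printf("%q\n", createDatabaseSQL) // "create database if not exists d;"
	fmt.Printf("%q\n", useDatabaseSQL)    // " use d;"
	fmt.Printf("%q\n", createTableSQL)    // " create table t (a int comment 'x;y');"
}
```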