Skip to content

Commit

Permalink
statstics: correctly handle error when merging global stats (#47770)
Browse files Browse the repository at this point in the history
close #47771
  • Loading branch information
hawkingrei authored Oct 19, 2023
1 parent 90bd2dd commit d74298c
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 29 deletions.
2 changes: 1 addition & 1 deletion pkg/statistics/handle/globalstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ go_test(
"topn_bench_test.go",
],
flaky = True,
shard_count = 14,
shard_count = 18,
deps = [
":globalstats",
"//pkg/config",
Expand Down
103 changes: 75 additions & 28 deletions pkg/statistics/handle/globalstats/global_stats_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"github.com/pingcap/tidb/pkg/statistics/handle/storage"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -79,8 +81,11 @@ type AsyncMergePartitionStats2GlobalStats struct {
PartitionDefinition map[int64]model.PartitionDefinition
tableInfo map[int64]*model.TableInfo
// key is partition id and histID
skipPartition map[skipItem]struct{}
exitWhenErrChan chan struct{}
skipPartition map[skipItem]struct{}
// ioWorker meet error, it will close this channel to notify cpuWorker.
ioWorkerExitWhenErrChan chan struct{}
// cpuWorker exit, it will close this channel to notify ioWorker.
cpuWorkerExitChan chan struct{}
globalTableInfo *model.TableInfo
histIDs []int64
globalStatsNDV []int64
Expand All @@ -97,20 +102,21 @@ func NewAsyncMergePartitionStats2GlobalStats(
is infoschema.InfoSchema) (*AsyncMergePartitionStats2GlobalStats, error) {
partitionNum := len(globalTableInfo.Partition.Definitions)
return &AsyncMergePartitionStats2GlobalStats{
statsHandle: statsHandle,
cmsketch: make(chan mergeItem[*statistics.CMSketch], 5),
fmsketch: make(chan mergeItem[*statistics.FMSketch], 5),
histogramAndTopn: make(chan mergeItem[*StatsWrapper]),
PartitionDefinition: make(map[int64]model.PartitionDefinition),
tableInfo: make(map[int64]*model.TableInfo),
partitionIDs: make([]int64, 0, partitionNum),
exitWhenErrChan: make(chan struct{}),
skipPartition: make(map[skipItem]struct{}),
allPartitionStats: make(map[int64]*statistics.Table),
globalTableInfo: globalTableInfo,
histIDs: histIDs,
is: is,
partitionNum: partitionNum,
statsHandle: statsHandle,
cmsketch: make(chan mergeItem[*statistics.CMSketch], 5),
fmsketch: make(chan mergeItem[*statistics.FMSketch], 5),
histogramAndTopn: make(chan mergeItem[*StatsWrapper]),
PartitionDefinition: make(map[int64]model.PartitionDefinition),
tableInfo: make(map[int64]*model.TableInfo),
partitionIDs: make([]int64, 0, partitionNum),
ioWorkerExitWhenErrChan: make(chan struct{}),
cpuWorkerExitChan: make(chan struct{}),
skipPartition: make(map[skipItem]struct{}),
allPartitionStats: make(map[int64]*statistics.Table),
globalTableInfo: globalTableInfo,
histIDs: histIDs,
is: is,
partitionNum: partitionNum,
}, nil
}

Expand Down Expand Up @@ -218,25 +224,32 @@ func (a *AsyncMergePartitionStats2GlobalStats) dealErrPartitionColumnStatsMissin
func (a *AsyncMergePartitionStats2GlobalStats) ioWorker(sctx sessionctx.Context, isIndex bool) (err error) {
defer func() {
if r := recover(); r != nil {
close(a.exitWhenErrChan)
logutil.BgLogger().Warn("ioWorker panic", zap.Stack("stack"), zap.Any("error", r), zap.String("category", "stats"))
close(a.ioWorkerExitWhenErrChan)
err = errors.New(fmt.Sprint(r))
}
}()
err = a.loadFmsketch(sctx, isIndex)
if err != nil {
close(a.exitWhenErrChan)
close(a.ioWorkerExitWhenErrChan)
return err
}
close(a.fmsketch)
err = a.loadCMsketch(sctx, isIndex)
if err != nil {
close(a.exitWhenErrChan)
close(a.ioWorkerExitWhenErrChan)
return err
}
close(a.cmsketch)
failpoint.Inject("PanicSameTime", func(val failpoint.Value) {
if val, _ := val.(bool); val {
time.Sleep(1 * time.Second)
panic("test for PanicSameTime")
}
})
err = a.loadHistogramAndTopN(sctx, a.globalTableInfo, isIndex)
if err != nil {
close(a.exitWhenErrChan)
close(a.ioWorkerExitWhenErrChan)
return err
}
close(a.histogramAndTopn)
Expand All @@ -246,13 +259,14 @@ func (a *AsyncMergePartitionStats2GlobalStats) ioWorker(sctx sessionctx.Context,
func (a *AsyncMergePartitionStats2GlobalStats) cpuWorker(stmtCtx *stmtctx.StatementContext, sctx sessionctx.Context, opts map[ast.AnalyzeOptionType]uint64, isIndex bool, tz *time.Location, analyzeVersion int) (err error) {
defer func() {
if r := recover(); r != nil {
close(a.exitWhenErrChan)
logutil.BgLogger().Warn("cpuWorker panic", zap.Stack("stack"), zap.Any("error", r), zap.String("category", "stats"))
err = errors.New(fmt.Sprint(r))
}
close(a.cpuWorkerExitChan)
}()
a.dealFMSketch()
select {
case <-a.exitWhenErrChan:
case <-a.ioWorkerExitWhenErrChan:
return nil
default:
for i := 0; i < a.globalStats.Num; i++ {
Expand All @@ -267,10 +281,18 @@ func (a *AsyncMergePartitionStats2GlobalStats) cpuWorker(stmtCtx *stmtctx.Statem
}
err = a.dealCMSketch()
if err != nil {
logutil.BgLogger().Warn("dealCMSketch failed", zap.Error(err), zap.String("category", "stats"))
return err
}
failpoint.Inject("PanicSameTime", func(val failpoint.Value) {
if val, _ := val.(bool); val {
time.Sleep(1 * time.Second)
panic("test for PanicSameTime")
}
})
err = a.dealHistogramAndTopN(stmtCtx, sctx, opts, isIndex, tz, analyzeVersion)
if err != nil {
logutil.BgLogger().Warn("dealHistogramAndTopN failed", zap.Error(err), zap.String("category", "stats"))
return err
}
return nil
Expand Down Expand Up @@ -337,7 +359,8 @@ func (a *AsyncMergePartitionStats2GlobalStats) loadFmsketch(sctx sessionctx.Cont
case a.fmsketch <- mergeItem[*statistics.FMSketch]{
fmsketch, i,
}:
case <-a.exitWhenErrChan:
case <-a.cpuWorkerExitChan:
logutil.BgLogger().Warn("ioWorker detects CPUWorker has exited", zap.String("category", "stats"))
return nil
}
}
Expand Down Expand Up @@ -367,7 +390,8 @@ func (a *AsyncMergePartitionStats2GlobalStats) loadCMsketch(sctx sessionctx.Cont
case a.cmsketch <- mergeItem[*statistics.CMSketch]{
cmsketch, i,
}:
case <-a.exitWhenErrChan:
case <-a.cpuWorkerExitChan:
logutil.BgLogger().Warn("ioWorker detects CPUWorker has exited", zap.String("category", "stats"))
return nil
}
}
Expand All @@ -376,6 +400,12 @@ func (a *AsyncMergePartitionStats2GlobalStats) loadCMsketch(sctx sessionctx.Cont
}

func (a *AsyncMergePartitionStats2GlobalStats) loadHistogramAndTopN(sctx sessionctx.Context, tableInfo *model.TableInfo, isIndex bool) error {
failpoint.Inject("ErrorSameTime", func(val failpoint.Value) {
if val, _ := val.(bool); val {
time.Sleep(1 * time.Second)
failpoint.Return(errors.New("ErrorSameTime returned error"))
}
})
for i := 0; i < a.globalStats.Num; i++ {
hists := make([]*statistics.Histogram, 0, a.partitionNum)
topn := make([]*statistics.TopN, 0, a.partitionNum)
Expand All @@ -402,7 +432,8 @@ func (a *AsyncMergePartitionStats2GlobalStats) loadHistogramAndTopN(sctx session
case a.histogramAndTopn <- mergeItem[*StatsWrapper]{
NewStatsWrapper(hists, topn), i,
}:
case <-a.exitWhenErrChan:
case <-a.cpuWorkerExitChan:
logutil.BgLogger().Warn("ioWorker detects CPUWorker has exited", zap.String("category", "stats"))
return nil
}
}
Expand All @@ -422,13 +453,18 @@ func (a *AsyncMergePartitionStats2GlobalStats) dealFMSketch() {
} else {
a.globalStats.Fms[fms.idx].MergeFMSketch(fms.item)
}
case <-a.exitWhenErrChan:
case <-a.ioWorkerExitWhenErrChan:
return
}
}
}

func (a *AsyncMergePartitionStats2GlobalStats) dealCMSketch() error {
failpoint.Inject("dealCMSketchErr", func(val failpoint.Value) {
if val, _ := val.(bool); val {
failpoint.Return(errors.New("dealCMSketch returned error"))
}
})
for {
select {
case cms, ok := <-a.cmsketch:
Expand All @@ -443,13 +479,24 @@ func (a *AsyncMergePartitionStats2GlobalStats) dealCMSketch() error {
return err
}
}
case <-a.exitWhenErrChan:
case <-a.ioWorkerExitWhenErrChan:
return nil
}
}
}

func (a *AsyncMergePartitionStats2GlobalStats) dealHistogramAndTopN(stmtCtx *stmtctx.StatementContext, sctx sessionctx.Context, opts map[ast.AnalyzeOptionType]uint64, isIndex bool, tz *time.Location, analyzeVersion int) (err error) {
failpoint.Inject("dealHistogramAndTopNErr", func(val failpoint.Value) {
if val, _ := val.(bool); val {
failpoint.Return(errors.New("dealHistogramAndTopNErr returned error"))
}
})
failpoint.Inject("ErrorSameTime", func(val failpoint.Value) {
if val, _ := val.(bool); val {
time.Sleep(1 * time.Second)
failpoint.Return(errors.New("ErrorSameTime returned error"))
}
})
for {
select {
case item, ok := <-a.histogramAndTopn:
Expand Down Expand Up @@ -478,7 +525,7 @@ func (a *AsyncMergePartitionStats2GlobalStats) dealHistogramAndTopN(stmtCtx *stm
a.globalStats.Hg[item.idx].Buckets[j].NDV = 0
}
a.globalStats.Hg[item.idx].NDV = a.globalStatsNDV[item.idx]
case <-a.exitWhenErrChan:
case <-a.ioWorkerExitWhenErrChan:
return nil
}
}
Expand Down
36 changes: 36 additions & 0 deletions pkg/statistics/handle/globalstats/globalstats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,24 @@ func TestGlobalStatsPanicInIOWorker(t *testing.T) {
simpleTest(t)
}

func TestGlobalStatsWithCMSketchErr(t *testing.T) {
fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/dealCMSketchErr"
require.NoError(t, failpoint.Enable(fpName, `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable(fpName))
}()
simpleTest(t)
}

func TestGlobalStatsWithHistogramAndTopNErr(t *testing.T) {
fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/dealHistogramAndTopNErr"
require.NoError(t, failpoint.Enable(fpName, `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable(fpName))
}()
simpleTest(t)
}

func TestGlobalStatsPanicInCPUWorker(t *testing.T) {
fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/PanicInCPUWorker"
require.NoError(t, failpoint.Enable(fpName, "panic(\"inject panic\")"))
Expand All @@ -85,6 +103,24 @@ func TestGlobalStatsPanicInCPUWorker(t *testing.T) {
simpleTest(t)
}

func TestGlobalStatsPanicSametime(t *testing.T) {
fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/PanicSameTime"
require.NoError(t, failpoint.Enable(fpName, `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable(fpName))
}()
simpleTest(t)
}

func TestGlobalStatsErrorSametime(t *testing.T) {
fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/ErrorSameTime"
require.NoError(t, failpoint.Enable(fpName, `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable(fpName))
}()
simpleTest(t)
}

func TestBuildGlobalLevelStats(t *testing.T) {
store := testkit.CreateMockStore(t)
testKit := testkit.NewTestKit(t, store)
Expand Down

0 comments on commit d74298c

Please sign in to comment.