Skip to content

Commit

Permalink
local backend: fix worker err overriden by job generation err (pingca…
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored and zimulala committed Nov 10, 2023
1 parent 38cb4f3 commit b52a427
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 18 deletions.
3 changes: 3 additions & 0 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,9 @@ func (e *Engine) GetFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []by
LowerBound: lowerBound,
UpperBound: upperBound,
}
failpoint.Inject("mockGetFirstAndLastKey", func() {
failpoint.Return(lowerBound, upperBound, nil)
})

iter := e.newKVIter(context.Background(), opt)
//nolint: errcheck
Expand Down
40 changes: 22 additions & 18 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1674,26 +1674,30 @@ func (local *Backend) doImport(ctx context.Context, engine common.Engine, region

failpoint.Label("afterStartWorker")

err := local.prepareAndSendJob(
workerCtx,
engine,
regionRanges,
regionSplitSize,
regionSplitKeys,
jobToWorkerCh,
&jobWg,
)
if err != nil {
firstErr.Set(err)
workGroup.Go(func() error {
err := local.prepareAndSendJob(
workerCtx,
engine,
regionRanges,
regionSplitSize,
regionSplitKeys,
jobToWorkerCh,
&jobWg,
)
if err != nil {
return err
}

jobWg.Wait()
workerCancel()
_ = workGroup.Wait()
return firstErr.Get()
return nil
})
if err := workGroup.Wait(); err != nil {
if !common.IsContextCanceledError(err) {
log.FromContext(ctx).Error("do import meets error", zap.Error(err))
}
firstErr.Set(err)
}

jobWg.Wait()
workerCancel()
firstErr.Set(workGroup.Wait())
firstErr.Set(ctx.Err())
return firstErr.Get()
}

Expand Down
37 changes: 37 additions & 0 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2151,6 +2151,43 @@ func TestCtxCancelIsIgnored(t *testing.T) {
require.ErrorContains(t, err, "the remaining storage capacity of TiKV")
}

func TestWorkerFailedWhenGeneratingJobs(t *testing.T) {
backup := maxRetryBackoffSecond
maxRetryBackoffSecond = 1
t.Cleanup(func() {
maxRetryBackoffSecond = backup
})

_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()")
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/sendDummyJob", "return()")
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockGetFirstAndLastKey", "return()")
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace", "return()")
t.Cleanup(func() {
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter")
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/sendDummyJob")
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockGetFirstAndLastKey")
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace")
})

initRanges := []common.Range{
{Start: []byte{'c'}, End: []byte{'d'}},
}

ctx := context.Background()
l := &Backend{
BackendConfig: BackendConfig{
WorkerConcurrency: 1,
},
splitCli: initTestSplitClient(
[][]byte{{1}, {11}},
panicSplitRegionClient{},
),
}
e := &Engine{}
err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
require.ErrorContains(t, err, "the remaining storage capacity of TiKV")
}

func TestExternalEngine(t *testing.T) {
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()")
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipStartWorker", "return()")
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@ func (c *testSplitClient) GetOperator(ctx context.Context, regionID uint64) (*pd
func (c *testSplitClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*split.RegionInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()

if err := ctx.Err(); err != nil {
return nil, err
}

if c.hook != nil {
key, endKey, limit = c.hook.BeforeScanRegions(ctx, key, endKey, limit)
}
Expand Down

0 comments on commit b52a427

Please sign in to comment.