Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Retry when placement PutBundles failed #30590

Merged
merged 3 commits into from
Dec 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v
return ver, errors.Trace(err)
}

if err = infosync.PutRuleBundles(context.TODO(), bundles); err != nil {
if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
Expand Down Expand Up @@ -1040,7 +1040,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
if job.Type == model.ActionAddTablePartition {
// It is rolled back from adding table partition, just remove addingDefinitions from tableInfo.
physicalTableIDs, pNames, rollbackBundles := rollbackAddingPartitionInfo(tblInfo)
err = infosync.PutRuleBundles(context.TODO(), rollbackBundles)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), rollbackBundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
Expand Down Expand Up @@ -1208,7 +1208,7 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
return ver, errors.Trace(err)
}

err = infosync.PutRuleBundles(context.TODO(), bundles)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
Expand Down Expand Up @@ -1412,7 +1412,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
return ver, errors.Trace(err)
}

if err = infosync.PutRuleBundles(context.TODO(), bundles); err != nil {
if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/placement_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func onAlterPlacementPolicy(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
cp := bundle.Clone()
bundles = append(bundles, cp.Reset(placement.RuleIndexPartition, []int64{id}))
}
err = infosync.PutRuleBundles(context.TODO(), bundles)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
Expand Down
8 changes: 4 additions & 4 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
}

// Send the placement bundle to PD.
err = infosync.PutRuleBundles(context.TODO(), bundles)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
Expand Down Expand Up @@ -580,7 +580,7 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro
return ver, errors.Trace(err)
}

err = infosync.PutRuleBundles(context.TODO(), bundles)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
if err != nil {
job.State = model.JobStateCancelled
return 0, errors.Wrapf(err, "failed to notify PD the placement rules")
Expand Down Expand Up @@ -1302,7 +1302,7 @@ func onAlterTablePartitionOptions(d *ddlCtx, t *meta.Meta, job *model.Job) (ver

// Send the placement bundle to PD.
if bundle != nil {
err = infosync.PutRuleBundles(context.TODO(), []*placement.Bundle{bundle})
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), []*placement.Bundle{bundle})
}

if err != nil {
Expand Down Expand Up @@ -1353,7 +1353,7 @@ func onAlterTablePlacement(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,

// Send the placement bundle to PD.
if bundle != nil {
err = infosync.PutRuleBundles(context.TODO(), []*placement.Bundle{bundle})
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), []*placement.Bundle{bundle})
}

if err != nil {
Expand Down
28 changes: 28 additions & 0 deletions domain/infosync/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package infosync

import (
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/util/dbterror"
)

// ErrHTTPServiceError is the error for an HTTP response whose status code is not '2xx'.
var ErrHTTPServiceError = dbterror.ClassDomain.NewStdErr(
	errno.ErrHTTPServiceError,
	mysql.Message("HTTP request failed with status %s", nil),
)
41 changes: 40 additions & 1 deletion domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ const (
TopologyPrometheus = "/topology/prometheus"
// TablePrometheusCacheExpiry is the expiry time for prometheus address cache.
TablePrometheusCacheExpiry = 10 * time.Second
// RequestRetryInterval is the sleep time before the next retry of an HTTP request.
RequestRetryInterval = 200 * time.Millisecond
// SyncBundlesMaxRetry is the maximum number of retries when syncing placement bundles.
SyncBundlesMaxRetry = 3
)

// ErrPrometheusAddrIsNotSet is the error that Prometheus address is not set in PD and etcd
Expand Down Expand Up @@ -353,7 +357,7 @@ func doRequest(ctx context.Context, addrs []string, route, method string, body i
return nil, err
}
if res.StatusCode != http.StatusOK {
err = errors.Errorf("%s", bodyBytes)
err = ErrHTTPServiceError.FastGen("%s", bodyBytes)
if res.StatusCode == http.StatusNotFound || res.StatusCode == http.StatusPreconditionFailed {
err = nil
bodyBytes = nil
Expand Down Expand Up @@ -427,6 +431,16 @@ func GetRuleBundle(ctx context.Context, name string) (*placement.Bundle, error)

// PutRuleBundles is used to post specific rule bundles to PD.
func PutRuleBundles(ctx context.Context, bundles []*placement.Bundle) error {
failpoint.Inject("putRuleBundlesError", func(isServiceError failpoint.Value) {
var err error
if isServiceError.(bool) {
err = ErrHTTPServiceError.FastGen("mock service error")
} else {
err = errors.New("mock other error")
}
failpoint.Return(err)
})

is, err := getGlobalInfoSyncer()
if err != nil {
return err
Expand All @@ -435,6 +449,31 @@ func PutRuleBundles(ctx context.Context, bundles []*placement.Bundle) error {
return is.placementManager.PutRuleBundles(ctx, bundles)
}

// PutRuleBundlesWithRetry posts the given rule bundles through PutRuleBundles and
// retries when an attempt fails.
//
// At most maxRetry+1 attempts are made (a negative maxRetry is treated as 0), with
// a pause of interval between consecutive attempts. A nil result, or an error that
// matches ErrHTTPServiceError, is returned right away without retrying. If ctx is
// done while waiting for the next attempt, the wait is abandoned and ctx.Err() is
// returned. Otherwise the error of the last attempt is returned.
func PutRuleBundlesWithRetry(ctx context.Context, bundles []*placement.Bundle, maxRetry int, interval time.Duration) (err error) {
	if maxRetry < 0 {
		maxRetry = 0
	}

	for i := 0; i <= maxRetry; i++ {
		err = PutRuleBundles(ctx, bundles)
		if err == nil || ErrHTTPServiceError.Equal(err) {
			return err
		}

		// The last attempt has failed: no point in logging a retry or waiting.
		if i == maxRetry {
			break
		}

		logutil.BgLogger().Warn("Error occurs when PutRuleBundles, retry", zap.Error(err))

		// Wait for the retry interval, but give up early when the caller's
		// context is cancelled instead of sleeping unconditionally.
		timer := time.NewTimer(interval)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
	}

	return err
}

// PutRuleBundlesWithDefaultRetry calls PutRuleBundlesWithRetry with the package
// defaults: SyncBundlesMaxRetry as the retry count and RequestRetryInterval as
// the pause between attempts.
func PutRuleBundlesWithDefaultRetry(ctx context.Context, bundles []*placement.Bundle) (err error) {
	err = PutRuleBundlesWithRetry(ctx, bundles, SyncBundlesMaxRetry, RequestRetryInterval)
	return err
}

func (is *InfoSyncer) getAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) {
allInfo := make(map[string]*ServerInfo)
if is.etcdCli == nil {
Expand Down
65 changes: 65 additions & 0 deletions domain/infosync/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util/testbridge"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/integration"
Expand Down Expand Up @@ -145,3 +147,66 @@ func (is *InfoSyncer) ttlKeyExists(ctx context.Context) (bool, error) {
}
return len(resp.Kvs) == 1, nil
}

// TestPutBundlesRetry checks how PutRuleBundlesWithRetry reacts to errors injected
// into PutRuleBundles through the putRuleBundlesError failpoint.
func TestPutBundlesRetry(t *testing.T) {
	const failpointName = "github.com/pingcap/tidb/domain/infosync/putRuleBundlesError"
	ctx := context.TODO()

	_, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, false)
	require.NoError(t, err)

	settings := &model.PlacementSettings{PrimaryRegion: "r1", Regions: "r1,r2"}
	bundle, err := placement.NewBundleFromOptions(settings)
	require.NoError(t, err)
	bundle = bundle.Reset(placement.RuleIndexTable, []int64{1024})

	cases := []struct {
		name string
		// term is the failpoint term; return(true) injects the mock service
		// error and return(false) injects the mock other error.
		term string
		// wantErr is the expected error text; empty means the call must succeed.
		wantErr string
	}{
		{name: "serviceErrorShouldNotRetry", term: "1*return(true)", wantErr: "[domain:8243]mock service error"},
		{name: "nonServiceErrorShouldRetry", term: "3*return(false)", wantErr: ""},
		{name: "nonServiceErrorRetryAndFail", term: "4*return(false)", wantErr: "mock other error"},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			// Start from a stored bundle that carries nothing but the ID.
			require.NoError(t, PutRuleBundles(ctx, []*placement.Bundle{{ID: bundle.ID}}))
			require.NoError(t, failpoint.Enable(failpointName, tc.term))
			defer func() {
				require.NoError(t, failpoint.Disable(failpointName))
			}()

			err := PutRuleBundlesWithRetry(ctx, []*placement.Bundle{bundle}, 3, time.Millisecond)

			if tc.wantErr != "" {
				// The put must have failed and left the stored bundle empty.
				require.Error(t, err)
				require.Equal(t, tc.wantErr, err.Error())

				got, err := GetRuleBundle(ctx, bundle.ID)
				require.NoError(t, err)
				require.True(t, got.IsEmpty())
				return
			}

			// The put must have succeeded and stored exactly the bundle we sent.
			require.NoError(t, err)

			got, err := GetRuleBundle(ctx, bundle.ID)
			require.NoError(t, err)

			gotJSON, err := json.Marshal(got)
			require.NoError(t, err)

			expectJSON, err := json.Marshal(bundle)
			require.NoError(t, err)

			require.Equal(t, expectJSON, gotJSON)
		})
	}
}
1 change: 1 addition & 0 deletions errno/errcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,7 @@ const (
ErrPlacementPolicyWithDirectOption = 8240
ErrPlacementPolicyInUse = 8241
ErrOptOnCacheTable = 8242
ErrHTTPServiceError = 8243
// TiKV/PD/TiFlash errors.
ErrPDServerTimeout = 9001
ErrTiKVServerTimeout = 9002
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,11 @@ error = '''
Information schema is changed during the execution of the statement(for example, table definition may be updated by other DDL ran in parallel). If you see this error often, try increasing `tidb_max_delta_schema_count`. [try again later]
'''

["domain:8243"]
error = '''
HTTP request failed with status %s
'''

["domain:9009"]
error = '''
Prometheus address is not set in PD and etcd
Expand Down
2 changes: 1 addition & 1 deletion store/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1914,7 +1914,7 @@ func (w *GCWorker) doGCPlacementRules(dr util.DelRangeTask) (err error) {
for _, id := range physicalTableIDs {
bundles = append(bundles, placement.NewBundle(id))
}
return infosync.PutRuleBundles(context.TODO(), bundles)
return infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
}

func (w *GCWorker) doGCLabelRules(dr util.DelRangeTask) (err error) {
Expand Down