Skip to content

Commit

Permalink
Merge branch 'master' into revert-35438-fix-mpp-netcost
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored Jun 24, 2022
2 parents 525703a + d0c1d1f commit 48e39c3
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 19 deletions.
13 changes: 8 additions & 5 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,14 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
// If RunWorker is true, we need campaign owner and do DDL job.
// Otherwise, we needn't do that.
if RunWorker {
d.ownerManager.SetBeOwnerHook(func() {
var err error
d.ddlSeqNumMu.seqNum, err = d.GetNextDDLSeqNum()
if err != nil {
logutil.BgLogger().Error("error when getting the ddl history count", zap.Error(err))
}
})

err := d.ownerManager.CampaignOwner()
if err != nil {
return errors.Trace(err)
Expand All @@ -497,11 +505,6 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
asyncNotify(worker.ddlJobCh)
}

d.ddlSeqNumMu.seqNum, err = d.GetNextDDLSeqNum()
if err != nil {
return err
}

go d.schemaSyncer.StartCleanWork()
if config.TableLockEnabled() {
d.wg.Add(1)
Expand Down
31 changes: 21 additions & 10 deletions owner/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ type Manager interface {
Cancel()
// RequireOwner requires the ownerManager is owner.
RequireOwner(ctx context.Context) error

// SetBeOwnerHook sets a hook. The hook is called before becoming an owner.
SetBeOwnerHook(hook func())
}

const (
Expand All @@ -68,16 +71,17 @@ type DDLOwnerChecker interface {

// ownerManager represents the structure which is used for electing owner.
type ownerManager struct {
id string // id is the ID of the manager.
key string
ctx context.Context
prompt string
logPrefix string
logCtx context.Context
etcdCli *clientv3.Client
cancel context.CancelFunc
elec unsafe.Pointer
wg sync.WaitGroup
id string // id is the ID of the manager.
key string
ctx context.Context
prompt string
logPrefix string
logCtx context.Context
etcdCli *clientv3.Client
cancel context.CancelFunc
elec unsafe.Pointer
wg sync.WaitGroup
beOwnerHook func()
}

// NewOwnerManager creates a new Manager.
Expand Down Expand Up @@ -117,6 +121,10 @@ func (m *ownerManager) RequireOwner(ctx context.Context) error {
return nil
}

func (m *ownerManager) SetBeOwnerHook(hook func()) {
m.beOwnerHook = hook
}

// ManagerSessionTTL is the etcd session's TTL in seconds. It's exported for testing.
var ManagerSessionTTL = 60

Expand Down Expand Up @@ -166,6 +174,9 @@ func (m *ownerManager) ResignOwner(ctx context.Context) error {
}

func (m *ownerManager) toBeOwner(elec *concurrency.Election) {
if m.beOwnerHook != nil {
m.beOwnerHook()
}
atomic.StorePointer(&m.elec, unsafe.Pointer(elec))
}

Expand Down
14 changes: 11 additions & 3 deletions owner/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ var _ Manager = &mockManager{}
// It's used for local store and testing.
// So this worker will always be the owner.
type mockManager struct {
owner int32
id string // id is the ID of manager.
cancel context.CancelFunc
owner int32
id string // id is the ID of manager.
cancel context.CancelFunc
beOwnerHook func()
}

// NewMockManager creates a new mock Manager.
Expand All @@ -52,6 +53,9 @@ func (m *mockManager) IsOwner() bool {
}

func (m *mockManager) toBeOwner() {
if m.beOwnerHook != nil {
m.beOwnerHook()
}
atomic.StoreInt32(&m.owner, 1)
}

Expand Down Expand Up @@ -91,3 +95,7 @@ func (m *mockManager) ResignOwner(ctx context.Context) error {
func (m *mockManager) RequireOwner(context.Context) error {
return nil
}

func (m *mockManager) SetBeOwnerHook(hook func()) {
m.beOwnerHook = hook
}
4 changes: 3 additions & 1 deletion store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,9 @@ func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *ti
pagingRange := resp.pbResp.Range
// only paging requests need to calculate the next ranges
if pagingRange == nil {
return nil, errors.New("lastRange in paging should not be nil")
// If the storage engine doesn't support paging protocol, it should have return all the region data.
// So we finish here.
return nil, nil
}
// calculate next ranges and grow the paging size
task.ranges = worker.calculateRemain(task.ranges, pagingRange, worker.req.Desc)
Expand Down

0 comments on commit 48e39c3

Please sign in to comment.