Skip to content

Commit

Permalink
meta: fix the allocator batch size compute logic (#17271) (#17548)
Browse files Browse the repository at this point in the history
  • Loading branch information
sre-bot authored Jun 1, 2020
1 parent 58fc5bf commit 47571bc
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 16 deletions.
49 changes: 33 additions & 16 deletions meta/autoid/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ func (alloc *allocator) Rebase(tableID, requiredBase int64, allocIDs bool) error

// NextStep return new auto id step according to previous step and consuming time.
func NextStep(curStep int64, consumeDur time.Duration) int64 {
failpoint.Inject("mockAutoIDCustomize", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(3)
}
})
failpoint.Inject("mockAutoIDChange", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(step)
Expand Down Expand Up @@ -387,21 +392,19 @@ func (alloc *allocator) alloc4Signed(tableID int64, n uint64, increment, offset
consumeDur := startTime.Sub(alloc.lastAllocTime)
nextStep = NextStep(alloc.step, consumeDur)
}
// Although the step is customized by user, we still need to make sure nextStep is big enough for insert batch.
if nextStep <= n1 {
nextStep = mathutil.MinInt64(n1*2, maxStep)
}
// Store the step for non-customized-step allocator to calculate next dynamic step.
if !alloc.customStep {
alloc.step = nextStep
}
err := kv.RunInNewTxn(alloc.store, true, func(txn kv.Transaction) error {
m := meta.NewMeta(txn)
var err1 error
newBase, err1 = m.GetAutoTableID(alloc.dbID, tableID)
if err1 != nil {
return err1
}
// CalcNeededBatchSize calculates the total batch size needed on global base.
n1 = CalcNeededBatchSize(newBase, int64(n), increment, offset, alloc.isUnsigned)
// Although the step is customized by user, we still need to make sure nextStep is big enough for insert batch.
if nextStep < n1 {
nextStep = n1
}
tmpStep := mathutil.MinInt64(math.MaxInt64-newBase, nextStep)
// The global rest is not enough for alloc.
if tmpStep < n1 {
Expand All @@ -414,6 +417,10 @@ func (alloc *allocator) alloc4Signed(tableID int64, n uint64, increment, offset
if err != nil {
return 0, 0, err
}
// Store the step for non-customized-step allocator to calculate next dynamic step.
if !alloc.customStep {
alloc.step = nextStep
}
alloc.lastAllocTime = time.Now()
if newBase == math.MaxInt64 {
return 0, 0, ErrAutoincReadFailed
Expand Down Expand Up @@ -454,21 +461,19 @@ func (alloc *allocator) alloc4Unsigned(tableID int64, n uint64, increment, offse
consumeDur := startTime.Sub(alloc.lastAllocTime)
nextStep = NextStep(alloc.step, consumeDur)
}
// Although the step is customized by user, we still need to make sure nextStep is big enough for insert batch.
if nextStep <= n1 {
nextStep = mathutil.MinInt64(n1*2, maxStep)
}
// Store the step for non-customized-step allocator to calculate next dynamic step.
if !alloc.customStep {
alloc.step = nextStep
}
err := kv.RunInNewTxn(alloc.store, true, func(txn kv.Transaction) error {
m := meta.NewMeta(txn)
var err1 error
newBase, err1 = m.GetAutoTableID(alloc.dbID, tableID)
if err1 != nil {
return err1
}
// CalcNeededBatchSize calculates the total batch size needed on new base.
n1 = CalcNeededBatchSize(newBase, int64(n), increment, offset, alloc.isUnsigned)
// Although the step is customized by user, we still need to make sure nextStep is big enough for insert batch.
if nextStep < n1 {
nextStep = n1
}
tmpStep := int64(mathutil.MinUint64(math.MaxUint64-uint64(newBase), uint64(nextStep)))
// The global rest is not enough for alloc.
if tmpStep < n1 {
Expand All @@ -481,6 +486,10 @@ func (alloc *allocator) alloc4Unsigned(tableID int64, n uint64, increment, offse
if err != nil {
return 0, 0, err
}
// Store the step for non-customized-step allocator to calculate next dynamic step.
if !alloc.customStep {
alloc.step = nextStep
}
alloc.lastAllocTime = time.Now()
if uint64(newBase) == math.MaxUint64 {
return 0, 0, ErrAutoincReadFailed
Expand All @@ -497,3 +506,11 @@ func (alloc *allocator) alloc4Unsigned(tableID int64, n uint64, increment, offse
alloc.base = int64(uint64(alloc.base) + uint64(n1))
return min, alloc.base, nil
}

// TestModifyBaseAndEndInjection exported for testing modifying the base and end.
func TestModifyBaseAndEndInjection(alloc Allocator, base, end int64) {
alloc.(*allocator).mu.Lock()
alloc.(*allocator).base = base
alloc.(*allocator).end = end
alloc.(*allocator).mu.Unlock()
}
51 changes: 51 additions & 0 deletions meta/autoid/autoid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,3 +565,54 @@ func BenchmarkAllocator_Alloc(b *testing.B) {
alloc.Alloc(2, 1, 1, 1)
}
}

// Fix a computation logic bug in allocator computation.
func (*testSuite) TestAllocComputationIssue(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/meta/autoid/mockAutoIDCustomize", `return(true)`), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/meta/autoid/mockAutoIDCustomize"), IsNil)
}()

store, err := mockstore.NewMockTikvStore()
c.Assert(err, IsNil)
defer store.Close()

err = kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
m := meta.NewMeta(txn)
err = m.CreateDatabase(&model.DBInfo{ID: 1, Name: model.NewCIStr("a")})
c.Assert(err, IsNil)
err = m.CreateTableOrView(1, &model.TableInfo{ID: 1, Name: model.NewCIStr("t")})
c.Assert(err, IsNil)
err = m.CreateTableOrView(1, &model.TableInfo{ID: 2, Name: model.NewCIStr("t1")})
c.Assert(err, IsNil)
return nil
})
c.Assert(err, IsNil)

// Since the test here is applicable to any type of allocators, autoid.RowIDAllocType is chosen.
unsignedAlloc := autoid.NewAllocator(store, 1, true)
c.Assert(unsignedAlloc, NotNil)
signedAlloc := autoid.NewAllocator(store, 1, false)
c.Assert(signedAlloc, NotNil)

// the next valid two value must be 13 & 16, batch size = 6.
err = unsignedAlloc.Rebase(1, 10, false)
c.Assert(err, IsNil)
// the next valid two value must be 10 & 13, batch size = 6.
err = signedAlloc.Rebase(2, 7, false)
c.Assert(err, IsNil)
// Simulate the rest cache is not enough for next batch, assuming 10 & 13, batch size = 4.
autoid.TestModifyBaseAndEndInjection(unsignedAlloc, 9, 9)
// Simulate the rest cache is not enough for next batch, assuming 10 & 13, batch size = 4.
autoid.TestModifyBaseAndEndInjection(signedAlloc, 4, 6)

// Here will recompute the new allocator batch size base on new base = 10, which will get 6.
min, max, err := unsignedAlloc.Alloc(1, 2, 3, 1)
c.Assert(err, IsNil)
c.Assert(min, Equals, int64(10))
c.Assert(max, Equals, int64(16))
min, max, err = signedAlloc.Alloc(2, 2, 3, 1)
c.Assert(err, IsNil)
c.Assert(min, Equals, int64(7))
c.Assert(max, Equals, int64(13))
}

0 comments on commit 47571bc

Please sign in to comment.