Skip to content

Commit

Permalink
Merge branch 'release-3.0' into automated-cherry-pick-of-pingcap#11237-…
Browse files Browse the repository at this point in the history
…release-3.0
  • Loading branch information
SunRunAway authored Jul 15, 2019
2 parents 5f6c7d5 + f83a13b commit 7229ef7
Show file tree
Hide file tree
Showing 59 changed files with 1,959 additions and 412 deletions.
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,11 @@ func GetGlobalConfig() *Config {
return globalConf.Load().(*Config)
}

// StoreGlobalConfig stores a new config to the globalConf. It mostly uses in the test to avoid some data races.
func StoreGlobalConfig(config *Config) {
globalConf.Store(config)
}

// ReloadGlobalConfig reloads global configuration for this server.
func ReloadGlobalConfig() error {
confReloadLock.Lock()
Expand Down
29 changes: 18 additions & 11 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
field_types "github.com/pingcap/parser/types"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -1300,23 +1301,29 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e

err = d.doDDLJob(ctx, job)
if err == nil {
var preSplitAndScatter func()
// do pre-split and scatter.
if tbInfo.ShardRowIDBits > 0 && tbInfo.PreSplitRegions > 0 {
preSplitAndScatter = func() { preSplitTableRegion(d.store, tbInfo, ctx.GetSessionVars().WaitTableSplitFinish) }
} else if atomic.LoadUint32(&EnableSplitTableRegion) != 0 {
sp, ok := d.store.(kv.SplitableStore)
if ok && atomic.LoadUint32(&EnableSplitTableRegion) != 0 {
var (
preSplit func()
scatterRegion bool
)
val, err := variable.GetGlobalSystemVar(ctx.GetSessionVars(), variable.TiDBScatterRegion)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] won't scatter region", zap.Error(err))
} else {
scatterRegion = variable.TiDBOptOn(val)
}
pi := tbInfo.GetPartitionInfo()
if pi != nil {
preSplitAndScatter = func() { splitPartitionTableRegion(d.store, pi) }
preSplit = func() { splitPartitionTableRegion(sp, pi, scatterRegion) }
} else {
preSplitAndScatter = func() { splitTableRegion(d.store, tbInfo.ID) }
preSplit = func() { splitTableRegion(sp, tbInfo, scatterRegion) }
}
}
if preSplitAndScatter != nil {
if ctx.GetSessionVars().WaitTableSplitFinish {
preSplitAndScatter()
if scatterRegion {
preSplit()
} else {
go preSplitAndScatter()
go preSplit()
}
}

Expand Down
121 changes: 121 additions & 0 deletions ddl/split_region.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
"context"

"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

func splitPartitionTableRegion(store kv.SplitableStore, pi *model.PartitionInfo, scatter bool) {
// Max partition count is 4096, should we sample and just choose some of the partition to split?
regionIDs := make([]uint64, 0, len(pi.Definitions))
for _, def := range pi.Definitions {
regionIDs = append(regionIDs, splitRecordRegion(store, def.ID, scatter))
}
if scatter {
waitScatterRegionFinish(store, regionIDs)
}
}

func splitTableRegion(store kv.SplitableStore, tbInfo *model.TableInfo, scatter bool) {
regionIDs := make([]uint64, 0, len(tbInfo.Indices)+1)
if tbInfo.ShardRowIDBits > 0 && tbInfo.PreSplitRegions > 0 {
// Example:
// ShardRowIDBits = 5
// PreSplitRegions = 3
//
// then will pre-split 2^(3-1) = 4 regions.
//
// in this code:
// max = 1 << (tblInfo.ShardRowIDBits - 1) = 1 << (5-1) = 16
// step := int64(1 << (tblInfo.ShardRowIDBits - tblInfo.PreSplitRegions)) = 1 << (5-3) = 4;
//
// then split regionID is below:
// 4 << 59 = 2305843009213693952
// 8 << 59 = 4611686018427387904
// 12 << 59 = 6917529027641081856
//
// The 4 pre-split regions range is below:
// 0 ~ 2305843009213693952
// 2305843009213693952 ~ 4611686018427387904
// 4611686018427387904 ~ 6917529027641081856
// 6917529027641081856 ~ 9223372036854775807 ( (1 << 63) - 1 )
//
// And the max _tidb_rowid is 9223372036854775807, it won't be negative number.

// Split table region.
step := int64(1 << (tbInfo.ShardRowIDBits - tbInfo.PreSplitRegions))
// The highest bit is the symbol bit,and alloc _tidb_rowid will always be positive number.
// So we only need to split the region for the positive number.
max := int64(1 << (tbInfo.ShardRowIDBits - 1))
for p := int64(step); p < max; p += step {
recordID := p << (64 - tbInfo.ShardRowIDBits)
recordPrefix := tablecodec.GenTableRecordPrefix(tbInfo.ID)
key := tablecodec.EncodeRecordKey(recordPrefix, recordID)
regionID, err := store.SplitRegion(key, scatter)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] pre split table region failed", zap.Int64("recordID", recordID), zap.Error(err))
} else {
regionIDs = append(regionIDs, regionID)
}
}
} else {
regionIDs = append(regionIDs, splitRecordRegion(store, tbInfo.ID, scatter))
}
regionIDs = append(regionIDs, splitIndexRegion(store, tbInfo, scatter)...)
if scatter {
waitScatterRegionFinish(store, regionIDs)
}
}

func splitRecordRegion(store kv.SplitableStore, tableID int64, scatter bool) uint64 {
tableStartKey := tablecodec.GenTablePrefix(tableID)
regionID, err := store.SplitRegion(tableStartKey, scatter)
if err != nil {
// It will be automatically split by TiKV later.
logutil.Logger(context.Background()).Warn("[ddl] split table region failed", zap.Error(err))
}
return regionID
}

func splitIndexRegion(store kv.SplitableStore, tblInfo *model.TableInfo, scatter bool) []uint64 {
regionIDs := make([]uint64, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
indexPrefix := tablecodec.EncodeTableIndexPrefix(tblInfo.ID, idx.ID)
regionID, err := store.SplitRegion(indexPrefix, scatter)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] pre split table index region failed",
zap.Stringer("table", tblInfo.Name),
zap.Stringer("index", idx.Name),
zap.Error(err))
}
regionIDs = append(regionIDs, regionID)
}
return regionIDs
}

func waitScatterRegionFinish(store kv.SplitableStore, regionIDs []uint64) {
for _, regionID := range regionIDs {
err := store.WaitScatterRegionFinish(regionID)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err))
}
}
}
95 changes: 0 additions & 95 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import (
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
Expand Down Expand Up @@ -340,99 +338,6 @@ func checkSafePoint(w *worker, snapshotTS uint64) error {
return gcutil.ValidateSnapshot(ctx, snapshotTS)
}

type splitableStore interface {
SplitRegion(splitKey kv.Key) error
SplitRegionAndScatter(splitKey kv.Key) (uint64, error)
WaitScatterRegionFinish(regionID uint64) error
}

func splitPartitionTableRegion(store kv.Storage, pi *model.PartitionInfo) {
// Max partition count is 4096, should we sample and just choose some of the partition to split?
for _, def := range pi.Definitions {
splitTableRegion(store, def.ID)
}
}

func splitTableRegion(store kv.Storage, tableID int64) {
s, ok := store.(splitableStore)
if !ok {
return
}
tableStartKey := tablecodec.GenTablePrefix(tableID)
if err := s.SplitRegion(tableStartKey); err != nil {
// It will be automatically split by TiKV later.
logutil.Logger(ddlLogCtx).Warn("[ddl] split table region failed", zap.Error(err))
}
}

func preSplitTableRegion(store kv.Storage, tblInfo *model.TableInfo, waitTableSplitFinish bool) {
s, ok := store.(splitableStore)
if !ok {
return
}
regionIDs := make([]uint64, 0, 1<<(tblInfo.PreSplitRegions-1)+len(tblInfo.Indices))

// Example:
// ShardRowIDBits = 5
// PreSplitRegions = 3
//
// then will pre-split 2^(3-1) = 4 regions.
//
// in this code:
// max = 1 << (tblInfo.ShardRowIDBits - 1) = 1 << (5-1) = 16
// step := int64(1 << (tblInfo.ShardRowIDBits - tblInfo.PreSplitRegions)) = 1 << (5-3) = 4;
//
// then split regionID is below:
// 4 << 59 = 2305843009213693952
// 8 << 59 = 4611686018427387904
// 12 << 59 = 6917529027641081856
//
// The 4 pre-split regions range is below:
// 0 ~ 2305843009213693952
// 2305843009213693952 ~ 4611686018427387904
// 4611686018427387904 ~ 6917529027641081856
// 6917529027641081856 ~ 9223372036854775807 ( (1 << 63) - 1 )
//
// And the max _tidb_rowid is 9223372036854775807, it won't be negative number.

// Split table region.
step := int64(1 << (tblInfo.ShardRowIDBits - tblInfo.PreSplitRegions))
// The highest bit is the symbol bit,and alloc _tidb_rowid will always be positive number.
// So we only need to split the region for the positive number.
max := int64(1 << (tblInfo.ShardRowIDBits - 1))
for p := int64(step); p < max; p += step {
recordID := p << (64 - tblInfo.ShardRowIDBits)
recordPrefix := tablecodec.GenTableRecordPrefix(tblInfo.ID)
key := tablecodec.EncodeRecordKey(recordPrefix, recordID)
regionID, err := s.SplitRegionAndScatter(key)
if err != nil {
logutil.Logger(ddlLogCtx).Warn("[ddl] pre split table region failed", zap.Int64("recordID", recordID), zap.Error(err))
} else {
regionIDs = append(regionIDs, regionID)
}
}

// Split index region.
for _, idx := range tblInfo.Indices {
indexPrefix := tablecodec.EncodeTableIndexPrefix(tblInfo.ID, idx.ID)
regionID, err := s.SplitRegionAndScatter(indexPrefix)
if err != nil {
logutil.Logger(ddlLogCtx).Warn("[ddl] pre split table index region failed", zap.String("index", idx.Name.L), zap.Error(err))
} else {
regionIDs = append(regionIDs, regionID)
}
}
if !waitTableSplitFinish {
return
}
for _, regionID := range regionIDs {
err := s.WaitScatterRegionFinish(regionID)
if err != nil {
logutil.Logger(ddlLogCtx).Warn("[ddl] wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err))
}
}
}

func getTable(store kv.Storage, schemaID int64, tblInfo *model.TableInfo) (table.Table, error) {
alloc := autoid.NewAllocator(store, tblInfo.GetDBID(schemaID), tblInfo.IsAutoIncColUnsigned())
tbl, err := table.TableFromMeta(alloc, tblInfo)
Expand Down
52 changes: 52 additions & 0 deletions executor/admin_plugins.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"context"

"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/util/chunk"
)

// AdminPluginsExec indicates AdminPlugins executor.
type AdminPluginsExec struct {
baseExecutor
Action core.AdminPluginsAction
Plugins []string
}

// Next implements the Executor Next interface.
func (e *AdminPluginsExec) Next(ctx context.Context, _ *chunk.Chunk) error {
switch e.Action {
case core.Enable:
return e.changeDisableFlagAndFlush(false)
case core.Disable:
return e.changeDisableFlagAndFlush(true)
}
return nil
}

func (e *AdminPluginsExec) changeDisableFlagAndFlush(disabled bool) error {
dom := domain.GetDomain(e.ctx)
for _, pluginName := range e.Plugins {
err := plugin.ChangeDisableFlagAndFlush(dom, pluginName, disabled)
if err != nil {
return err
}
}
return nil
}
7 changes: 7 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
return b.buildChecksumTable(v)
case *plannercore.ReloadExprPushdownBlacklist:
return b.buildReloadExprPushdownBlacklist(v)
case *plannercore.AdminPlugins:
return b.buildAdminPlugins(v)
case *plannercore.DDL:
return b.buildDDL(v)
case *plannercore.Deallocate:
Expand Down Expand Up @@ -467,6 +469,10 @@ func (b *executorBuilder) buildReloadExprPushdownBlacklist(v *plannercore.Reload
return &ReloadExprPushdownBlacklistExec{baseExecutor{ctx: b.ctx}}
}

func (b *executorBuilder) buildAdminPlugins(v *plannercore.AdminPlugins) Executor {
return &AdminPluginsExec{baseExecutor: baseExecutor{ctx: b.ctx}, Action: v.Action, Plugins: v.Plugins}
}

func (b *executorBuilder) buildDeallocate(v *plannercore.Deallocate) Executor {
base := newBaseExecutor(b.ctx, nil, v.ExplainID())
base.initCap = chunk.ZeroCapacity
Expand Down Expand Up @@ -547,6 +553,7 @@ func (b *executorBuilder) buildShow(v *plannercore.Show) Executor {
DBName: model.NewCIStr(v.DBName),
Table: v.Table,
Column: v.Column,
IndexName: v.IndexName,
User: v.User,
Roles: v.Roles,
IfNotExists: v.IfNotExists,
Expand Down
Loading

0 comments on commit 7229ef7

Please sign in to comment.