loaddata: don't split transaction and revert some change (#44136)
ref #44078
lance6716 authored May 25, 2023
1 parent 4d9a1f1 commit 6e11eef
Showing 15 changed files with 179 additions and 487 deletions.
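The core of the change: LOAD DATA had been splitting its work into separate per-batch transactions (sized by the batch_size option removed below), and this commit reverts it to running the whole load inside one transaction. A minimal sketch of the two shapes — generic database/sql code against a hypothetical table t, not the executor's actual implementation:

package loadsketch

import "database/sql"

// loadSplit commits every batch in its own transaction: a failure midway
// leaves earlier batches already visible. This is the behavior being removed.
func loadSplit(db *sql.DB, rows []string, batchSize int) error {
	for start := 0; start < len(rows); start += batchSize {
		end := start + batchSize
		if end > len(rows) {
			end = len(rows)
		}
		tx, err := db.Begin()
		if err != nil {
			return err
		}
		for _, r := range rows[start:end] {
			if _, err := tx.Exec("INSERT INTO t VALUES (?)", r); err != nil {
				_ = tx.Rollback()
				return err
			}
		}
		if err := tx.Commit(); err != nil {
			return err
		}
	}
	return nil
}

// loadSingle runs the whole load in a single transaction: either every row
// becomes visible at commit, or none do. This is the reverted behavior.
func loadSingle(db *sql.DB, rows []string) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	for _, r := range rows {
		if _, err := tx.Exec("INSERT INTO t VALUES (?)", r); err != nil {
			_ = tx.Rollback()
			return err
		}
	}
	return tx.Commit()
}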
1 change: 0 additions & 1 deletion executor/BUILD.bazel
@@ -110,7 +110,6 @@ go_library(
         "//ddl/placement",
         "//ddl/schematracker",
         "//distsql",
-        "//disttask/loaddata",
         "//domain",
         "//domain/infosync",
         "//errno",
1 change: 0 additions & 1 deletion executor/builder.go
@@ -985,7 +985,6 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
 		baseExecutor:   base,
 		loadDataWorker: worker,
 		FileLocRef:     v.FileLocRef,
-		OnDuplicate:    v.OnDuplicate,
 	}
 }
 
38 changes: 5 additions & 33 deletions executor/importer/import.go
@@ -79,7 +79,6 @@ const (
 	addIndexOption      = "add_index"
 	analyzeOption       = "analyze_table"
 	threadOption        = "thread"
-	batchSizeOption     = "batch_size"
 	maxWriteSpeedOption = "max_write_speed"
 	splitFileOption     = "split_file"
 	recordErrorsOption  = "record_errors"
@@ -98,7 +97,6 @@ var (
 	addIndexOption:      true,
 	analyzeOption:       true,
 	threadOption:        true,
-	batchSizeOption:     true,
 	maxWriteSpeedOption: true,
 	splitFileOption:     true,
 	recordErrorsOption:  true,
@@ -163,7 +161,6 @@ type Plan struct {
 	AddIndex          bool
 	Analyze           config.PostOpLevel
 	ThreadCnt         int64
-	BatchSize         int64
 	MaxWriteSpeed     config.ByteSize
 	SplitFile         bool
 	MaxRecordedErrors int64
@@ -241,17 +238,10 @@ func getImportantSysVars(sctx sessionctx.Context) map[string]string {
 	return res
 }
 
-// NewPlan creates a new load data plan.
-func NewPlan(userSctx sessionctx.Context, plan *plannercore.LoadData, tbl table.Table) (*Plan, error) {
+// NewPlanFromLoadDataPlan creates a import plan from LOAD DATA.
+func NewPlanFromLoadDataPlan(userSctx sessionctx.Context, plan *plannercore.LoadData, tbl table.Table) (*Plan, error) {
 	fullTableName := common.UniqueTable(plan.Table.Schema.L, plan.Table.Name.L)
 	logger := log.L().With(zap.String("table", fullTableName))
-	var format string
-	if plan.Format != nil {
-		format = strings.ToLower(*plan.Format)
-	} else {
-		// without FORMAT 'xxx' clause, default to DELIMITED DATA
-		format = LoadDataFormatDelimitedData
-	}
 	charset := plan.Charset
 	if charset == nil {
 		// https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-character-set
@@ -267,23 +257,12 @@ func NewPlan(userSctx sessionctx.Context, plan *plannercore.LoadData, tbl table.Table) (*Plan, error) {
 		plan.OnDuplicate != ast.OnDuplicateKeyHandlingIgnore
 
 	p := &Plan{
-		TableInfo: tbl.Meta(),
-		DBName:    plan.Table.Schema.O,
-		DBID:      plan.Table.DBInfo.ID,
-
+		DBName:      plan.Table.Schema.O,
 		Path:        plan.Path,
-		Format:      format,
+		Format:      LoadDataFormatDelimitedData,
 		Restrictive: restrictive,
 		IgnoreLines: plan.IgnoreLines,
-
-		SQLMode:          userSctx.GetSessionVars().SQLMode,
-		Charset:          charset,
-		ImportantSysVars: getImportantSysVars(userSctx),
-
-		DistSQLScanConcurrency: userSctx.GetSessionVars().DistSQLScanConcurrency(),
+		Charset:     charset,
 	}
-	if err := p.initOptions(userSctx, plan.Options); err != nil {
-		return nil, err
-	}
 	return p, nil
 }
@@ -430,7 +409,6 @@ func (p *Plan) initDefaultOptions() {
 	p.AddIndex = true
 	p.Analyze = config.OpLevelOptional
 	p.ThreadCnt = int64(threadCnt)
-	p.BatchSize = 1000
 	p.MaxWriteSpeed = unlimitedWriteSpeed
 	p.SplitFile = false
 	p.MaxRecordedErrors = 100
@@ -526,12 +504,6 @@ func (p *Plan) initOptions(seCtx sessionctx.Context, options []*plannercore.LoadDataOpt) error {
 			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
 		}
 	}
-	if opt, ok := specifiedOptions[batchSizeOption]; ok {
-		p.BatchSize, isNull, err = opt.Value.EvalInt(seCtx, chunk.Row{})
-		if err != nil || isNull || p.BatchSize < 0 {
-			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
-		}
-	}
 	if opt, ok := specifiedOptions[maxWriteSpeedOption]; ok {
 		v, isNull, err = opt.Value.EvalString(seCtx, chunk.Row{})
 		if err != nil || isNull {
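For reference, the removed batch_size branch followed the same evaluate-and-validate shape as the surviving options (thread, max_write_speed, record_errors): evaluate the expression to an integer via EvalInt and reject errors, NULL, and negative values. A self-contained sketch of that pattern with hypothetical stand-in types, not TiDB's expression evaluator:

package loadsketch

import "fmt"

// option models a parsed LOAD DATA option; val == nil stands for a NULL value.
type option struct {
	name string
	val  *int64
}

var errInvalidOptionVal = fmt.Errorf("invalid option value")

// initBatchSize mirrors the removed branch: a missing option falls back to
// the former default of 1000; NULL or negative values are rejected.
func initBatchSize(specified map[string]option) (int64, error) {
	opt, ok := specified["batch_size"]
	if !ok {
		return 1000, nil
	}
	if opt.val == nil || *opt.val < 0 {
		return 0, fmt.Errorf("%w for %s", errInvalidOptionVal, opt.name)
	}
	return *opt.val, nil
}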
7 changes: 0 additions & 7 deletions executor/importer/import_test.go
@@ -42,7 +42,6 @@ func TestInitDefaultOptions(t *testing.T) {
 	require.Equal(t, config.OpLevelOptional, plan.Analyze)
 	require.Equal(t, false, plan.Distributed)
 	require.Equal(t, int64(runtime.NumCPU()), plan.ThreadCnt)
-	require.Equal(t, int64(1000), plan.BatchSize)
 	require.Equal(t, unlimitedWriteSpeed, plan.MaxWriteSpeed)
 	require.Equal(t, false, plan.SplitFile)
 	require.Equal(t, int64(100), plan.MaxRecordedErrors)
@@ -104,10 +103,6 @@ func TestInitOptions(t *testing.T) {
 		{OptionStr: threadOption + "=-100", Err: exeerrors.ErrInvalidOptionVal},
 		{OptionStr: threadOption + "=null", Err: exeerrors.ErrInvalidOptionVal},
 
-		{OptionStr: batchSizeOption + "='aa'", Err: exeerrors.ErrInvalidOptionVal},
-		{OptionStr: batchSizeOption + "='11aa'", Err: exeerrors.ErrInvalidOptionVal},
-		{OptionStr: batchSizeOption + "=null", Err: exeerrors.ErrInvalidOptionVal},
-
 		{OptionStr: maxWriteSpeedOption + "='aa'", Err: exeerrors.ErrInvalidOptionVal},
 		{OptionStr: maxWriteSpeedOption + "='11aa'", Err: exeerrors.ErrInvalidOptionVal},
 		{OptionStr: maxWriteSpeedOption + "=null", Err: exeerrors.ErrInvalidOptionVal},
@@ -159,7 +154,6 @@ func TestInitOptions(t *testing.T) {
 		analyzeOption+"='required', "+
 		distributedOption+"=false, "+
 		threadOption+"='100000', "+
-		batchSizeOption+"=2000, "+
 		maxWriteSpeedOption+"='200mib', "+
 		splitFileOption+"=true, "+
 		recordErrorsOption+"=123, "+
@@ -175,7 +169,6 @@ func TestInitOptions(t *testing.T) {
 	require.False(t, plan.Distributed, sql)
 	require.Equal(t, config.OpLevelRequired, plan.Analyze, sql)
 	require.Equal(t, int64(runtime.NumCPU()), plan.ThreadCnt, sql)
-	require.Equal(t, int64(2000), plan.BatchSize, sql)
 	require.Equal(t, config.ByteSize(200<<20), plan.MaxWriteSpeed, sql)
 	require.True(t, plan.SplitFile, sql)
 	require.Equal(t, int64(123), plan.MaxRecordedErrors, sql)
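The deleted assertions belonged to a table-driven test over option strings, where every invalid value must surface as ErrInvalidOptionVal. A stripped-down sketch in the same style, reusing the hypothetical option type and initBatchSize validator from the previous sketch:

package loadsketch

import (
	"errors"
	"testing"
)

// TestInitBatchSizeErrors covers the invalid inputs the removed cases
// exercised: NULL and negative values must be rejected.
func TestInitBatchSizeErrors(t *testing.T) {
	neg := int64(-1)
	cases := []struct {
		name string
		opts map[string]option
	}{
		{"null value", map[string]option{"batch_size": {name: "batch_size", val: nil}}},
		{"negative value", map[string]option{"batch_size": {name: "batch_size", val: &neg}}},
	}
	for _, c := range cases {
		if _, err := initBatchSize(c.opts); !errors.Is(err, errInvalidOptionVal) {
			t.Errorf("%s: expected errInvalidOptionVal, got %v", c.name, err)
		}
	}
}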
10 changes: 0 additions & 10 deletions executor/insert_common.go
@@ -19,7 +19,6 @@ import (
 	"context"
 	"fmt"
 	"math"
-	"sync"
 	"time"
 
 	"github.com/pingcap/errors"
@@ -94,11 +93,6 @@ type InsertValues struct {
 
 	stats *InsertRuntimeStat
 
-	// in LOAD DATA, one InsertValues is used by two goroutine, we need to lock
-	// when using the txn
-	isLoadData bool
-	txnInUse   sync.Mutex
-
 	// fkChecks contains the foreign key checkers.
 	fkChecks   []*FKCheckExec
 	fkCascades []*FKCascadeExec
@@ -1056,10 +1050,6 @@ func (e *InsertValues) allocAutoRandomID(ctx context.Context, fieldType *types.FieldType) (int64, error) {
 	if shardFmt.IncrementalMask()&autoRandomID != autoRandomID {
 		return 0, autoid.ErrAutoRandReadFailed
 	}
-	if e.isLoadData {
-		e.txnInUse.Lock()
-		defer e.txnInUse.Unlock()
-	}
 	_, err = e.ctx.Txn(true)
 	if err != nil {
 		return 0, err
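The deleted fields and lock show the concurrency model being retired: while LOAD DATA ran its insert path on two goroutines, every use of the shared transaction had to take txnInUse; with the load back on a single goroutine, the flag, the mutex, and the sync import all go away. A generic sketch of the two ownership shapes, with hypothetical types rather than InsertValues itself:

package loadsketch

import "sync"

// txn stands in for a database transaction owned by the executor.
type txn struct{ rows int }

// sharedTxnWriter is the retired shape: two goroutines share one
// transaction, so every access must hold the mutex.
type sharedTxnWriter struct {
	mu  sync.Mutex
	txn *txn
}

func (w *sharedTxnWriter) write(n int) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.txn.rows += n
}

// singleTxnWriter is the shape after the revert: one goroutine owns the
// transaction, so no locking is needed on the write path.
type singleTxnWriter struct {
	txn *txn
}

func (w *singleTxnWriter) write(n int) {
	w.txn.rows += n
}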
(Diffs for the remaining 10 of the 15 changed files did not load in this capture.)