From afaf503bb07927bff128accc5a691946efc92a93 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 24 May 2023 14:55:19 +0800 Subject: [PATCH 1/6] loaddata: don't split transaction and revert some change Signed-off-by: lance6716 --- executor/builder.go | 1 - executor/importer/import.go | 38 +- executor/importer/import_test.go | 5 - executor/insert_common.go | 10 - executor/load_data.go | 535 ++++++------------- executor/test/loaddatatest/load_data_test.go | 22 +- executor/test/writetest/write_test.go | 2 +- planner/core/common_plans.go | 3 - planner/core/planbuilder.go | 1 - session/session.go | 21 +- 10 files changed, 177 insertions(+), 461 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 25c419ed5304a..d860d8c579f62 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -985,7 +985,6 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor { baseExecutor: base, loadDataWorker: worker, FileLocRef: v.FileLocRef, - OnDuplicate: v.OnDuplicate, } } diff --git a/executor/importer/import.go b/executor/importer/import.go index 50de1761b6fd8..edce036ff73bf 100644 --- a/executor/importer/import.go +++ b/executor/importer/import.go @@ -79,7 +79,6 @@ const ( addIndexOption = "add_index" analyzeOption = "analyze_table" threadOption = "thread" - batchSizeOption = "batch_size" maxWriteSpeedOption = "max_write_speed" splitFileOption = "split_file" recordErrorsOption = "record_errors" @@ -98,7 +97,6 @@ var ( addIndexOption: true, analyzeOption: true, threadOption: true, - batchSizeOption: true, maxWriteSpeedOption: true, splitFileOption: true, recordErrorsOption: true, @@ -163,7 +161,6 @@ type Plan struct { AddIndex bool Analyze config.PostOpLevel ThreadCnt int64 - BatchSize int64 MaxWriteSpeed config.ByteSize SplitFile bool MaxRecordedErrors int64 @@ -241,17 +238,10 @@ func getImportantSysVars(sctx sessionctx.Context) map[string]string { return res } -// NewPlan creates a new load data plan. -func NewPlan(userSctx sessionctx.Context, plan *plannercore.LoadData, tbl table.Table) (*Plan, error) { +// NewPlanFromLoadDataPlan creates a import plan from LOAD DATA. +func NewPlanFromLoadDataPlan(userSctx sessionctx.Context, plan *plannercore.LoadData, tbl table.Table) (*Plan, error) { fullTableName := common.UniqueTable(plan.Table.Schema.L, plan.Table.Name.L) logger := log.L().With(zap.String("table", fullTableName)) - var format string - if plan.Format != nil { - format = strings.ToLower(*plan.Format) - } else { - // without FORMAT 'xxx' clause, default to DELIMITED DATA - format = LoadDataFormatDelimitedData - } charset := plan.Charset if charset == nil { // https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-character-set @@ -267,23 +257,12 @@ func NewPlan(userSctx sessionctx.Context, plan *plannercore.LoadData, tbl table. 
plan.OnDuplicate != ast.OnDuplicateKeyHandlingIgnore p := &Plan{ - TableInfo: tbl.Meta(), - DBName: plan.Table.Schema.O, - DBID: plan.Table.DBInfo.ID, - + DBName: plan.Table.Schema.O, Path: plan.Path, - Format: format, + Format: LoadDataFormatDelimitedData, Restrictive: restrictive, IgnoreLines: plan.IgnoreLines, - - SQLMode: userSctx.GetSessionVars().SQLMode, - Charset: charset, - ImportantSysVars: getImportantSysVars(userSctx), - - DistSQLScanConcurrency: userSctx.GetSessionVars().DistSQLScanConcurrency(), - } - if err := p.initOptions(userSctx, plan.Options); err != nil { - return nil, err + Charset: charset, } return p, nil } @@ -430,7 +409,6 @@ func (p *Plan) initDefaultOptions() { p.AddIndex = true p.Analyze = config.OpLevelOptional p.ThreadCnt = int64(threadCnt) - p.BatchSize = 1000 p.MaxWriteSpeed = unlimitedWriteSpeed p.SplitFile = false p.MaxRecordedErrors = 100 @@ -526,12 +504,6 @@ func (p *Plan) initOptions(seCtx sessionctx.Context, options []*plannercore.Load return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name) } } - if opt, ok := specifiedOptions[batchSizeOption]; ok { - p.BatchSize, isNull, err = opt.Value.EvalInt(seCtx, chunk.Row{}) - if err != nil || isNull || p.BatchSize < 0 { - return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name) - } - } if opt, ok := specifiedOptions[maxWriteSpeedOption]; ok { v, isNull, err = opt.Value.EvalString(seCtx, chunk.Row{}) if err != nil || isNull { diff --git a/executor/importer/import_test.go b/executor/importer/import_test.go index 33d7dbecf3731..8f0783d1fb844 100644 --- a/executor/importer/import_test.go +++ b/executor/importer/import_test.go @@ -104,10 +104,6 @@ func TestInitOptions(t *testing.T) { {OptionStr: threadOption + "=-100", Err: exeerrors.ErrInvalidOptionVal}, {OptionStr: threadOption + "=null", Err: exeerrors.ErrInvalidOptionVal}, - {OptionStr: batchSizeOption + "='aa'", Err: exeerrors.ErrInvalidOptionVal}, - {OptionStr: batchSizeOption + "='11aa'", Err: exeerrors.ErrInvalidOptionVal}, - {OptionStr: batchSizeOption + "=null", Err: exeerrors.ErrInvalidOptionVal}, - {OptionStr: maxWriteSpeedOption + "='aa'", Err: exeerrors.ErrInvalidOptionVal}, {OptionStr: maxWriteSpeedOption + "='11aa'", Err: exeerrors.ErrInvalidOptionVal}, {OptionStr: maxWriteSpeedOption + "=null", Err: exeerrors.ErrInvalidOptionVal}, @@ -159,7 +155,6 @@ func TestInitOptions(t *testing.T) { analyzeOption+"='required', "+ distributedOption+"=false, "+ threadOption+"='100000', "+ - batchSizeOption+"=2000, "+ maxWriteSpeedOption+"='200mib', "+ splitFileOption+"=true, "+ recordErrorsOption+"=123, "+ diff --git a/executor/insert_common.go b/executor/insert_common.go index 215150d6a1a04..4ed9d9b9774d9 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -19,7 +19,6 @@ import ( "context" "fmt" "math" - "sync" "time" "github.com/pingcap/errors" @@ -94,11 +93,6 @@ type InsertValues struct { stats *InsertRuntimeStat - // in LOAD DATA, one InsertValues is used by two goroutine, we need to lock - // when using the txn - isLoadData bool - txnInUse sync.Mutex - // fkChecks contains the foreign key checkers. 
fkChecks []*FKCheckExec fkCascades []*FKCascadeExec @@ -1056,10 +1050,6 @@ func (e *InsertValues) allocAutoRandomID(ctx context.Context, fieldType *types.F if shardFmt.IncrementalMask()&autoRandomID != autoRandomID { return 0, autoid.ErrAutoRandReadFailed } - if e.isLoadData { - e.txnInUse.Lock() - defer e.txnInUse.Unlock() - } _, err = e.ctx.Txn(true) if err != nil { return 0, err diff --git a/executor/load_data.go b/executor/load_data.go index ef4f5ce904eb5..2331fc9306200 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -20,7 +20,6 @@ import ( "io" "math" "strings" - "sync" "sync/atomic" "time" @@ -28,11 +27,9 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/lightning/mydump" "github.com/pingcap/tidb/br/pkg/storage" - "github.com/pingcap/tidb/disttask/loaddata" "github.com/pingcap/tidb/executor/asyncloaddata" "github.com/pingcap/tidb/executor/importer" "github.com/pingcap/tidb/expression" - "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" @@ -40,7 +37,6 @@ import ( plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" - "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/sessiontxn" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" @@ -61,23 +57,16 @@ type LoadDataExec struct { baseExecutor FileLocRef ast.FileLocRefTp - OnDuplicate ast.OnDuplicateKeyHandlingType loadDataWorker *LoadDataWorker } // Next implements the Executor Next interface. -func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { - req.GrowAndReset(e.maxChunkSize) - ctx = kv.WithInternalSourceType(ctx, kv.InternalLoadData) - +func (e *LoadDataExec) Next(ctx context.Context, _ *chunk.Chunk) (err error) { switch e.FileLocRef { case ast.FileLocServerOrRemote: - _, err2 := e.loadDataWorker.loadRemote(ctx) - if err2 != nil { - return err2 - } + return e.loadDataWorker.loadRemote(ctx) case ast.FileLocClient: - // let caller use handleQuerySpecial to read data in this connection + // let caller use handleFileTransInConn to read data in this connection sctx := e.loadDataWorker.UserSctx val := sctx.Value(LoadDataVarKey) if val != nil { @@ -89,14 +78,6 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { return nil } -// commitTask is used for passing data from processStream goroutine to commitWork goroutine. 
-type commitTask struct { - cnt uint64 - rows [][]types.Datum - - fileSize int64 -} - type planInfo struct { ID int Columns []*ast.ColumnName @@ -107,14 +88,10 @@ type planInfo struct { type LoadDataWorker struct { UserSctx sessionctx.Context - importPlan *importer.Plan controller *importer.LoadDataController planInfo planInfo - // only use in distributed load data - stmt string - table table.Table - progress *asyncloaddata.Progress + table table.Table } func setNonRestrictiveFlags(stmtCtx *stmtctx.StatementContext) { @@ -131,7 +108,7 @@ func NewLoadDataWorker( plan *plannercore.LoadData, tbl table.Table, ) (w *LoadDataWorker, err error) { - importPlan, err := importer.NewPlan(userSctx, plan, tbl) + importPlan, err := importer.NewPlanFromLoadDataPlan(userSctx, plan, tbl) if err != nil { return nil, err } @@ -148,262 +125,137 @@ func NewLoadDataWorker( loadDataWorker := &LoadDataWorker{ UserSctx: userSctx, table: tbl, - importPlan: importPlan, - stmt: plan.Stmt, controller: controller, planInfo: planInfo{ ID: plan.ID(), Columns: plan.Columns, GenColExprs: plan.GenCols.Exprs, }, - progress: asyncloaddata.NewProgress(controller.ImportMode == importer.LogicalImportMode), } return loadDataWorker, nil } -func (e *LoadDataWorker) loadRemote(ctx context.Context) (int64, error) { +func (e *LoadDataWorker) loadRemote(ctx context.Context) error { if err2 := e.controller.InitDataFiles(ctx); err2 != nil { - return 0, err2 + return err2 } - return e.load(ctx, nil) + return e.load(ctx, e.controller.GetLoadDataReaderInfos()) } // LoadLocal reads from client connection and do load data job. func (e *LoadDataWorker) LoadLocal(ctx context.Context, r io.ReadCloser) error { - _, err := e.load(ctx, r) - return err -} - -func (e *LoadDataWorker) load(ctx context.Context, r io.ReadCloser) (jboID int64, err error) { - s, err2 := CreateSession(e.UserSctx) - if err2 != nil { - return 0, err2 - } - defer CloseSession(s) - - sqlExec := s.(sqlexec.SQLExecutor) - if err2 = e.controller.CheckRequirements(ctx, sqlExec); err2 != nil { - return 0, err2 - } - - job, err2 := asyncloaddata.CreateLoadDataJob( - ctx, - sqlExec, - e.GetInfilePath(), - e.controller.DBName, - e.table.Meta().Name.O, - e.controller.ImportMode, - e.UserSctx.GetSessionVars().User.String(), - ) - if err2 != nil { - return 0, err2 - } - - importCtx := ctx - - jobImporter, err2 := e.getJobImporter(importCtx, job, r) - if err2 != nil { - return 0, err2 + compressTp := mydump.ParseCompressionOnFileExtension(e.GetInfilePath()) + compressTp2, err := mydump.ToStorageCompressType(compressTp) + if err != nil { + return err } - - return job.ID, e.importJob(importCtx, jobImporter) + readers := []importer.LoadDataReaderInfo{{ + Opener: func(_ context.Context) (io.ReadSeekCloser, error) { + addedSeekReader := NewSimpleSeekerOnReadCloser(r) + return storage.InterceptDecompressReader(addedSeekReader, compressTp2) + }}} + return e.load(ctx, readers) } -func (e *LoadDataWorker) importJob(ctx context.Context, jobImporter importer.JobImporter) (err error) { - defer func() { - _ = jobImporter.Close() - }() - if e.controller.FileLocRef == ast.FileLocServerOrRemote { - e.progress.SourceFileSize = e.controller.TotalFileSize - } +func (e *LoadDataWorker) load(ctx context.Context, readerInfos []importer.LoadDataReaderInfo) error { + group, groupCtx := errgroup.WithContext(ctx) - param := jobImporter.Param() - job, group, groupCtx, done := param.Job, param.Group, param.GroupCtx, param.Done + encoder, committer, err := initEncodeCommitWorkers(e) + if err != nil { + return err + } 
- var result importer.JobImportResult - defer func() { - job.OnComplete(err, result.Msg) - }() + // main goroutine -> readerInfoCh -> processOneStream goroutines + readerInfoCh := make(chan importer.LoadDataReaderInfo, 1) + // processOneStream goroutines -> commitTaskCh -> commitWork goroutines + commitTaskCh := make(chan commitTask, taskQueueSize) + // commitWork goroutines -> done -> UpdateJobProgress goroutine - err = job.StartJob(ctx) - if err != nil { + // TODO: support explicit transaction and non-autocommit + if err = sessiontxn.NewTxn(groupCtx, e.UserSctx); err != nil { return err } - // UpdateJobProgress goroutine. + // processOneStream goroutines. + group.Go(func() error { + err2 := encoder.processStream(groupCtx, readerInfoCh, commitTaskCh) + if err2 == nil { + close(commitTaskCh) + } + return err2 + }) + // commitWork goroutines. group.Go(func() error { - // ProgressUpdateRoutineFn must be run in this group, since on job cancel/drop, we depend on it to trigger - // the cancel of the other routines in this group. - return job.ProgressUpdateRoutineFn(ctx, done, groupCtx.Done(), e.progress) + failpoint.Inject("BeforeCommitWork", nil) + return committer.commitWork(groupCtx, commitTaskCh) }) - jobImporter.Import() - err = group.Wait() - result = jobImporter.Result() - e.setResult(result) - return err -} -func (e *LoadDataWorker) setResult(result importer.JobImportResult) { - userStmtCtx := e.UserSctx.GetSessionVars().StmtCtx - userStmtCtx.SetMessage(result.Msg) - userStmtCtx.SetAffectedRows(result.Affected) - userStmtCtx.SetWarnings(result.Warnings) - userStmtCtx.LastInsertID = result.LastInsertID -} - -func (e *LoadDataWorker) getJobImporter(ctx context.Context, job *asyncloaddata.Job, r io.ReadCloser) (importer.JobImporter, error) { - group, groupCtx := errgroup.WithContext(ctx) - param := &importer.JobImportParam{ - Job: job, - Group: group, - GroupCtx: groupCtx, - Done: make(chan struct{}), - Progress: e.progress, - } - - if e.importPlan.Distributed { - // TODO: Right now some test cases will fail if we run single-node import using NewDistImporterCurrNode - // directly, so we use EnableDistTask(false on default) to difference them now. - if variable.EnableDistTask.Load() { - return loaddata.NewDistImporter(param, e.importPlan, e.stmt) +sendReaderInfoLoop: + for _, info := range readerInfos { + select { + case <-groupCtx.Done(): + break sendReaderInfoLoop + case readerInfoCh <- info: } - return loaddata.NewDistImporterCurrNode(param, e.importPlan, e.stmt) } - - if e.controller.ImportMode == importer.LogicalImportMode { - return newLogicalJobImporter(param, e, r) - } - // TODO: Replace it with NewDistImporterCurrNode after we fix the test cases. - return importer.NewTableImporter(param, e.controller) + close(readerInfoCh) + err = group.Wait() + e.setResult(encoder.exprWarnings) + return err } -// GetInfilePath get infile path. -func (e *LoadDataWorker) GetInfilePath() string { - return e.controller.Path -} +func (e *LoadDataWorker) setResult(colAssignExprWarnings []stmtctx.SQLWarn) { + stmtCtx := e.UserSctx.GetSessionVars().StmtCtx + numWarnings := uint64(stmtCtx.WarningCount()) + numRecords := stmtCtx.RecordRows() + numDeletes := stmtCtx.DeletedRows() + numSkipped := stmtCtx.RecordRows() - stmtCtx.CopiedRows() -// GetController get load data controller. -// used in unit test. 
-func (e *LoadDataWorker) GetController() *importer.LoadDataController { - return e.controller -} + // col assign expr warnings is generated during init, it's static + // we need to generate it for each row processed. + numWarnings += numRecords * uint64(len(colAssignExprWarnings)) -// TestLoad is a helper function for unit test. -func (e *LoadDataWorker) TestLoad(parser mydump.Parser) error { - jobImporter, err2 := newLogicalJobImporter(nil, e, nil) - if err2 != nil { - return err2 - } - err := ResetContextOfStmt(jobImporter.encodeWorker.ctx, &ast.LoadDataStmt{}) - if err != nil { - return err - } - setNonRestrictiveFlags(jobImporter.encodeWorker.ctx.GetSessionVars().StmtCtx) - err = ResetContextOfStmt(jobImporter.commitWorker.ctx, &ast.LoadDataStmt{}) - if err != nil { - return err + if numWarnings > math.MaxUint16 { + numWarnings = math.MaxUint16 } - setNonRestrictiveFlags(jobImporter.commitWorker.ctx.GetSessionVars().StmtCtx) - ctx := context.Background() - for i := uint64(0); i < jobImporter.controller.IgnoreLines; i++ { - //nolint: errcheck - _ = parser.ReadRow() - } - err = jobImporter.encodeWorker.readOneBatchRows(ctx, parser) - if err != nil { - return err - } - err = sessiontxn.NewTxn(ctx, jobImporter.commitWorker.ctx) - if err != nil { - return err - } - err = jobImporter.commitWorker.checkAndInsertOneBatch( - ctx, - jobImporter.encodeWorker.rows, - jobImporter.encodeWorker.curBatchCnt) - if err != nil { - return err - } - jobImporter.encodeWorker.resetBatch() - jobImporter.commitWorker.ctx.StmtCommit(ctx) - err = jobImporter.commitWorker.ctx.CommitTxn(ctx) - if err != nil { - return err + msg := fmt.Sprintf(mysql.MySQLErrName[mysql.ErrLoadInfo].Raw, numRecords, numDeletes, numSkipped, numWarnings) + warns := make([]stmtctx.SQLWarn, numWarnings) + n := copy(warns, stmtCtx.GetWarnings()) + for i := 0; i < int(numRecords) && n < len(warns); i++ { + n += copy(warns[n:], colAssignExprWarnings) } - result := jobImporter.Result() - e.setResult(result) - return nil -} - -// TODO: remove or rename this struct -type logicalJobImporter struct { - *importer.JobImportParam - // only used on interactive load data - userSctx sessionctx.Context - controller *importer.LoadDataController - // encodeWorker and commitWorker will share same InsertValues - encodeWorker *encodeWorker - commitWorker *commitWorker - readerInfos []importer.LoadDataReaderInfo + stmtCtx.SetMessage(msg) + stmtCtx.SetWarnings(warns) } -var _ importer.JobImporter = &logicalJobImporter{} - -func newLogicalJobImporter(param *importer.JobImportParam, e *LoadDataWorker, r io.ReadCloser) (*logicalJobImporter, error) { - ji := &logicalJobImporter{ - JobImportParam: param, - userSctx: e.UserSctx, - controller: e.controller, - } - compressTp := mydump.ParseCompressionOnFileExtension(e.GetInfilePath()) - compressTp2, err := mydump.ToStorageCompressType(compressTp) - if err != nil { - return nil, err - } - if err := ji.initEncodeCommitWorkers(e); err != nil { - return nil, err - } - if e.controller.FileLocRef == ast.FileLocClient { - ji.readerInfos = []importer.LoadDataReaderInfo{{ - Opener: func(_ context.Context) (io.ReadSeekCloser, error) { - addedSeekReader := NewSimpleSeekerOnReadCloser(r) - return storage.InterceptDecompressReader(addedSeekReader, compressTp2) - }}} - } else { - ji.readerInfos = e.controller.GetLoadDataReaderInfos() - } - return ji, nil -} - -func (ji *logicalJobImporter) initEncodeCommitWorkers(e *LoadDataWorker) (err error) { - insertValues, err2 := ji.createInsertValues(e) +func initEncodeCommitWorkers(e 
*LoadDataWorker) (*encodeWorker, *commitWorker, error) { + insertValues, err2 := createInsertValues(e) if err2 != nil { - return err2 + return nil, nil, err2 } colAssignExprs, exprWarnings, err2 := e.controller.CreateColAssignExprs(insertValues.ctx) if err2 != nil { - return err2 + return nil, nil, err2 } - ji.encodeWorker = &encodeWorker{ + enc := &encodeWorker{ InsertValues: insertValues, controller: e.controller, colAssignExprs: colAssignExprs, exprWarnings: exprWarnings, killed: &e.UserSctx.GetSessionVars().Killed, } - ji.encodeWorker.resetBatch() - ji.commitWorker = &commitWorker{ + enc.resetBatch() + com := &commitWorker{ InsertValues: insertValues, controller: e.controller, - progress: e.progress, } - return nil + return enc, com, nil } // createInsertValues creates InsertValues from userSctx. -func (ji *logicalJobImporter) createInsertValues(e *LoadDataWorker) (insertVal *InsertValues, err error) { +func createInsertValues(e *LoadDataWorker) (insertVal *InsertValues, err error) { insertColumns := e.controller.InsertColumns hasExtraHandle := false for _, col := range insertColumns { @@ -416,16 +268,14 @@ func (ji *logicalJobImporter) createInsertValues(e *LoadDataWorker) (insertVal * } } ret := &InsertValues{ - baseExecutor: newBaseExecutor(ji.userSctx, nil, e.planInfo.ID), + baseExecutor: newBaseExecutor(e.UserSctx, nil, e.planInfo.ID), Table: e.table, Columns: e.planInfo.Columns, GenExprs: e.planInfo.GenColExprs, - maxRowsInBatch: uint64(e.controller.BatchSize), + maxRowsInBatch: 1000, insertColumns: insertColumns, rowLen: len(insertColumns), hasExtraHandle: hasExtraHandle, - isLoadData: true, - txnInUse: sync.Mutex{}, } if len(insertColumns) > 0 { ret.initEvalBuffer() @@ -434,100 +284,6 @@ func (ji *logicalJobImporter) createInsertValues(e *LoadDataWorker) (insertVal * return ret, nil } -func (ji *logicalJobImporter) Param() *importer.JobImportParam { - return ji.JobImportParam -} - -// Import implements importer.JobImporter interface. -func (ji *logicalJobImporter) Import() { - // main goroutine -> readerInfoCh -> processOneStream goroutines - readerInfoCh := make(chan importer.LoadDataReaderInfo, 1) - // processOneStream goroutines -> commitTaskCh -> commitWork goroutines - commitTaskCh := make(chan commitTask, taskQueueSize) - // commitWork goroutines -> done -> UpdateJobProgress goroutine - - param := ji.JobImportParam - - waitStartCtx := make(chan struct{}) - // TODO: support explicit transaction - param.Group.Go(func() error { - err := sessiontxn.NewTxn(param.GroupCtx, ji.userSctx) - close(waitStartCtx) - return err - }) - - <-waitStartCtx - - // processOneStream goroutines. - param.Group.Go(func() error { - err2 := ji.encodeWorker.processStream(param.GroupCtx, readerInfoCh, commitTaskCh) - if err2 == nil { - close(commitTaskCh) - } - return err2 - }) - // commitWork goroutines. - param.Group.Go(func() error { - failpoint.Inject("BeforeCommitWork", nil) - err2 := ji.commitWorker.commitWork(param.GroupCtx, commitTaskCh) - if err2 == nil { - close(param.Done) - } - return err2 - }) - - for i := range ji.readerInfos { - select { - case <-param.GroupCtx.Done(): - return - case readerInfoCh <- ji.readerInfos[i]: - } - } - close(readerInfoCh) -} - -// Result implements the importer.JobImporter interface. 
-func (ji *logicalJobImporter) Result() importer.JobImportResult { - stmtCtx := ji.userSctx.GetSessionVars().StmtCtx - numWarnings := uint64(stmtCtx.WarningCount()) - numAffected := stmtCtx.AffectedRows() - numRecords := stmtCtx.RecordRows() - numDeletes := stmtCtx.DeletedRows() - numSkipped := stmtCtx.RecordRows() - stmtCtx.CopiedRows() - - // col assign expr warnings is generated during init, it's static - // we need to generate it for each row processed. - colAssignExprWarnings := ji.encodeWorker.exprWarnings - numWarnings += numRecords * uint64(len(colAssignExprWarnings)) - - if numWarnings > math.MaxUint16 { - numWarnings = math.MaxUint16 - } - - msg := fmt.Sprintf(mysql.MySQLErrName[mysql.ErrLoadInfo].Raw, numRecords, numDeletes, numSkipped, numWarnings) - warns := make([]stmtctx.SQLWarn, numWarnings) - n := copy(warns, stmtCtx.GetWarnings()) - for i := 0; i < int(numRecords) && n < len(warns); i++ { - n += copy(warns[n:], colAssignExprWarnings) - } - - return importer.JobImportResult{ - Msg: msg, - LastInsertID: ji.encodeWorker.lastInsertID, - Affected: numAffected, - Warnings: warns, - } -} - -// Close implements the importer.JobImporter interface. -func (ji *logicalJobImporter) Close() error { - v := ji.encodeWorker.InsertValues - if v.runtimeStats != nil && v.stats != nil { - ji.userSctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(v.id, v.stats) - } - return nil -} - // encodeWorker is a sub-worker of LoadDataWorker that dedicated to encode data. type encodeWorker struct { *InsertValues @@ -540,6 +296,12 @@ type encodeWorker struct { rows [][]types.Datum } +// commitTask is used for passing data from processStream goroutine to commitWork goroutine. +type commitTask struct { + cnt uint64 + rows [][]types.Datum +} + // processStream always trys to build a parser from channel and process it. When // it returns nil, it means all data is read. func (w *encodeWorker) processStream( @@ -588,10 +350,6 @@ func (w *encodeWorker) processOneStream( checkKilled := time.NewTicker(30 * time.Second) defer checkKilled.Stop() - var ( - loggedError = false - lastScannedSize = int64(0) - ) for { // prepare batch and enqueue task if err = w.readOneBatchRows(ctx, parser); err != nil { @@ -602,12 +360,6 @@ func (w *encodeWorker) processOneStream( } TrySendTask: - scannedSize, err := parser.ScannedPos() - if err != nil && !loggedError { - loggedError = true - logutil.Logger(ctx).Error(" LOAD DATA failed to read current file offset by seek", - zap.Error(err)) - } select { case <-ctx.Done(): return ctx.Err() @@ -618,12 +370,10 @@ func (w *encodeWorker) processOneStream( } goto TrySendTask case outCh <- commitTask{ - cnt: w.curBatchCnt, - rows: w.rows, - fileSize: scannedSize - lastScannedSize, + cnt: w.curBatchCnt, + rows: w.rows, }: } - lastScannedSize = scannedSize // reset rows buffer, will reallocate buffer but NOT reuse w.resetBatch() } @@ -763,7 +513,6 @@ func (w *encodeWorker) parserData2TableData( type commitWorker struct { *InsertValues controller *importer.LoadDataController - progress *asyncloaddata.Progress } // commitWork commit batch sequentially. 
When returns nil, it means the job is @@ -777,24 +526,24 @@ func (w *commitWorker) commitWork(ctx context.Context, inCh <-chan commitTask) ( zap.Stack("stack")) err = errors.Errorf("%v", r) } + + if err != nil { + background := context.Background() + w.ctx.StmtRollback(background, false) + w.ctx.RollbackTxn(background) + } else { + if err = w.ctx.CommitTxn(ctx); err != nil { + logutil.Logger(ctx).Error("commit error refresh", zap.Error(err)) + } + } }() var ( - taskCnt uint64 - backgroundCtx = context.Background() + taskCnt uint64 ) - err = sessiontxn.NewTxn(ctx, w.ctx) - if err != nil { - return err - } for { select { case <-ctx.Done(): - w.ctx.StmtRollback(backgroundCtx, false) - w.txnInUse.Lock() - //nolint:revive,all_revive - defer w.txnInUse.Unlock() - _ = w.ctx.RefreshTxnCtx(backgroundCtx) return ctx.Err() case task, ok := <-inCh: if !ok { @@ -804,47 +553,28 @@ func (w *commitWorker) commitWork(ctx context.Context, inCh <-chan commitTask) ( if err = w.commitOneTask(ctx, task); err != nil { return err } - w.progress.LoadedRowCnt.Add(task.cnt) - w.progress.LoadedFileSize.Add(task.fileSize) taskCnt++ logutil.Logger(ctx).Info("commit one task success", zap.Duration("commit time usage", time.Since(start)), zap.Uint64("keys processed", task.cnt), zap.Uint64("taskCnt processed", taskCnt), ) - failpoint.Inject("AfterCommitOneTask", nil) - failpoint.Inject("SyncAfterCommitOneTask", func() { - importer.TestSyncCh <- struct{}{} - <-importer.TestSyncCh - }) } } } -// commitOneTask insert Data from LoadDataWorker.rows, then make commit and refresh txn +// commitOneTask insert Data from LoadDataWorker.rows, then commit the modification +// like a statement. func (w *commitWorker) commitOneTask(ctx context.Context, task commitTask) error { - var err error - defer func() { - if err != nil { - w.ctx.StmtRollback(ctx, false) - } - }() - err = w.checkAndInsertOneBatch(ctx, task.rows, task.cnt) + err := w.checkAndInsertOneBatch(ctx, task.rows, task.cnt) if err != nil { logutil.Logger(ctx).Error("commit error CheckAndInsert", zap.Error(err)) return err } - failpoint.Inject("commitOneTaskErr", func() error { - return errors.New("mock commit one task error") + failpoint.Inject("commitOneTaskErr", func() { + failpoint.Return(errors.New("mock commit one task error")) }) w.ctx.StmtCommit(ctx) - w.txnInUse.Lock() - defer w.txnInUse.Unlock() - // Make sure that there are no retries when committing. - if err = w.ctx.RefreshTxnCtx(ctx); err != nil { - logutil.Logger(ctx).Error("commit error refresh", zap.Error(err)) - return err - } return nil } @@ -906,6 +636,61 @@ func (w *commitWorker) addRecordLD(ctx context.Context, row []types.Datum) error return nil } +// GetInfilePath get infile path. +func (e *LoadDataWorker) GetInfilePath() string { + return e.controller.Path +} + +// GetController get load data controller. +// used in unit test. +func (e *LoadDataWorker) GetController() *importer.LoadDataController { + return e.controller +} + +// TestLoadLocal is a helper function for unit test. 
+func (e *LoadDataWorker) TestLoadLocal(parser mydump.Parser) error { + if err := ResetContextOfStmt(e.UserSctx, &ast.LoadDataStmt{}); err != nil { + return err + } + setNonRestrictiveFlags(e.UserSctx.GetSessionVars().StmtCtx) + encoder, committer, err := initEncodeCommitWorkers(e) + if err != nil { + return err + } + + ctx := context.Background() + err = sessiontxn.NewTxn(ctx, e.UserSctx) + if err != nil { + return err + } + + for i := uint64(0); i < e.controller.IgnoreLines; i++ { + //nolint: errcheck + _ = parser.ReadRow() + } + + err = encoder.readOneBatchRows(ctx, parser) + if err != nil { + return err + } + + err = committer.checkAndInsertOneBatch( + ctx, + encoder.rows, + encoder.curBatchCnt) + if err != nil { + return err + } + encoder.resetBatch() + committer.ctx.StmtCommit(ctx) + err = committer.ctx.CommitTxn(ctx) + if err != nil { + return err + } + e.setResult(encoder.exprWarnings) + return nil +} + var _ io.ReadSeekCloser = (*SimpleSeekerOnReadCloser)(nil) // SimpleSeekerOnReadCloser provides Seek(0, SeekCurrent) on ReadCloser. diff --git a/executor/test/loaddatatest/load_data_test.go b/executor/test/loaddatatest/load_data_test.go index 78537d6e346aa..9336457e11fa6 100644 --- a/executor/test/loaddatatest/load_data_test.go +++ b/executor/test/loaddatatest/load_data_test.go @@ -51,7 +51,7 @@ func checkCases( nil) require.NoError(t, err) - err = ld.TestLoad(parser) + err = ld.TestLoadLocal(parser) require.NoError(t, err) require.Equal(t, tt.expectedMsg, tk.Session().LastMessage(), tt.expected) tk.MustQuery(selectSQL).Check(testkit.RowsWithSep("|", tt.expected...)) @@ -71,26 +71,6 @@ func TestLoadDataInitParam(t *testing.T) { require.ErrorIs(t, tk.ExecToErr("load data infile '' into table load_data_test"), exeerrors.ErrLoadDataEmptyPath) - require.ErrorIs(t, tk.ExecToErr("load data infile '/a' format '' into table load_data_test"), - exeerrors.ErrLoadDataUnsupportedFormat) - require.ErrorIs(t, tk.ExecToErr("load data infile '/a' format 'aaa' into table load_data_test"), - exeerrors.ErrLoadDataUnsupportedFormat) - require.ErrorIs(t, tk.ExecToErr("load data local infile '/a' format 'parquet' into table load_data_test"), - exeerrors.ErrLoadParquetFromLocal) - require.ErrorIs(t, tk.ExecToErr("load data local infile '/a' into table load_data_test with import_mode='physical'"), - exeerrors.ErrLoadDataLocalUnsupportedOption) - require.ErrorContains(t, tk.ExecToErr("load data infile '/a' format 'sql file' into table load_data_test fields terminated by 'a'"), - "cannot specify FIELDS ... or LINES") - require.ErrorContains(t, tk.ExecToErr("load data infile '/a' format 'parquet' into table load_data_test fields terminated by 'a'"), - "cannot specify FIELDS ... or LINES") - require.ErrorContains(t, tk.ExecToErr("load data infile '/a' format 'sql file' into table load_data_test lines terminated by 'a'"), - "cannot specify FIELDS ... or LINES") - require.ErrorContains(t, tk.ExecToErr("load data infile '/a' format 'parquet' into table load_data_test lines terminated by 'a'"), - "cannot specify FIELDS ... or LINES") - require.ErrorContains(t, tk.ExecToErr("load data infile '/a' format 'parquet' into table load_data_test ignore 0 lines"), - "cannot specify FIELDS ... or LINES") - require.ErrorContains(t, tk.ExecToErr("load data infile '/a' format 'parquet' into table load_data_test ignore 3 lines"), - "cannot specify FIELDS ... 
or LINES") require.ErrorContains(t, tk.ExecToErr("load data infile '/a' into table load_data_test fields defined null by 'a' optionally enclosed"), "must specify FIELDS [OPTIONALLY] ENCLOSED BY") require.ErrorContains(t, tk.ExecToErr("load data infile '/a' into table load_data_test lines terminated by ''"), diff --git a/executor/test/writetest/write_test.go b/executor/test/writetest/write_test.go index 7c8b78ddaffc6..92c271de0a1dd 100644 --- a/executor/test/writetest/write_test.go +++ b/executor/test/writetest/write_test.go @@ -1465,7 +1465,7 @@ func checkCases( nil) require.NoError(t, err) - err = ld.TestLoad(parser) + err = ld.TestLoadLocal(parser) require.NoError(t, err) require.Equal(t, tt.expectedMsg, tk.Session().LastMessage(), tt.expected) tk.MustQuery(selectSQL).Check(testkit.RowsWithSep("|", tt.expected...)) diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 5545deb42f267..3ca2e25f10afd 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -577,9 +577,6 @@ type LoadData struct { Options []*LoadDataOpt GenCols InsertGeneratedColumns - - // only use for distributed load data - Stmt string } // LoadDataOpt represents load data option. diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 8258755d75097..8b1c1643d32f8 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -4317,7 +4317,6 @@ func (b *PlanBuilder) buildLoadData(ctx context.Context, ld *ast.LoadDataStmt) ( ColumnAssignments: ld.ColumnAssignments, ColumnsAndUserVars: ld.ColumnsAndUserVars, Options: options, - Stmt: ld.Text(), }.Init(b.ctx) user := b.ctx.GetSessionVars().User var insertErr, deleteErr error diff --git a/session/session.go b/session/session.go index 18614041f51c3..9f93a8be58d80 100644 --- a/session/session.go +++ b/session/session.go @@ -2295,26 +2295,25 @@ func (s *session) validateStatementReadOnlyInStaleness(stmtNode ast.StmtNode) er return nil } -// querySpecialKeys contains the keys of special query, the special query will be handled by handleFileTransInConn method. -var querySpecialKeys = []fmt.Stringer{ +// fileTransInConnKeys contains the keys of queries that will be handled by handleFileTransInConn. +var fileTransInConnKeys = []fmt.Stringer{ executor.LoadDataVarKey, executor.LoadStatsVarKey, executor.IndexAdviseVarKey, executor.PlanReplayerLoadVarKey, } -func (s *session) hasQuerySpecial() bool { - found := false +func (s *session) hasFileTransInConn() bool { s.mu.RLock() - for _, k := range querySpecialKeys { + defer s.mu.RUnlock() + + for _, k := range fileTransInConnKeys { v := s.mu.values[k] if v != nil { - found = true - break + return true } } - s.mu.RUnlock() - return found + return false } // runStmt executes the sqlexec.Statement and commit or rollback the current transaction. @@ -2385,8 +2384,8 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec. } err = finishStmt(ctx, se, err, s) - if se.hasQuerySpecial() { - // The special query will be handled later in handleQuerySpecial, + if se.hasFileTransInConn() { + // The query will be handled later in handleFileTransInConn, // then should call the ExecStmt.FinishExecuteStmt to finish this statement. 
se.SetValue(ExecStmtVarKey, s.(*executor.ExecStmt)) } else { From 1fd1062c510345e80a396b4a09c576b00b50a060 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 24 May 2023 15:08:56 +0800 Subject: [PATCH 2/6] fix bazel Signed-off-by: lance6716 --- executor/BUILD.bazel | 1 - 1 file changed, 1 deletion(-) diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index d1a36a0b937be..ac14ebd8dda72 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -110,7 +110,6 @@ go_library( "//ddl/placement", "//ddl/schematracker", "//distsql", - "//disttask/loaddata", "//domain", "//domain/infosync", "//errno", From 5db0e761808301eb8cd7aca4cb83e87711d72f54 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 24 May 2023 15:32:52 +0800 Subject: [PATCH 3/6] fix CI Signed-off-by: lance6716 --- executor/importer/import_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/executor/importer/import_test.go b/executor/importer/import_test.go index 8f0783d1fb844..c0db2fa8e7d64 100644 --- a/executor/importer/import_test.go +++ b/executor/importer/import_test.go @@ -42,7 +42,6 @@ func TestInitDefaultOptions(t *testing.T) { require.Equal(t, config.OpLevelOptional, plan.Analyze) require.Equal(t, false, plan.Distributed) require.Equal(t, int64(runtime.NumCPU()), plan.ThreadCnt) - require.Equal(t, int64(1000), plan.BatchSize) require.Equal(t, unlimitedWriteSpeed, plan.MaxWriteSpeed) require.Equal(t, false, plan.SplitFile) require.Equal(t, int64(100), plan.MaxRecordedErrors) @@ -170,7 +169,6 @@ func TestInitOptions(t *testing.T) { require.False(t, plan.Distributed, sql) require.Equal(t, config.OpLevelRequired, plan.Analyze, sql) require.Equal(t, int64(runtime.NumCPU()), plan.ThreadCnt, sql) - require.Equal(t, int64(2000), plan.BatchSize, sql) require.Equal(t, config.ByteSize(200<<20), plan.MaxWriteSpeed, sql) require.True(t, plan.SplitFile, sql) require.Equal(t, int64(123), plan.MaxRecordedErrors, sql) From abd1decbe0750c1d0024eed601e75e6c19b297b4 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 24 May 2023 16:59:42 +0800 Subject: [PATCH 4/6] fix CI Signed-off-by: lance6716 --- executor/load_data.go | 9 +-------- server/server_test.go | 12 ++---------- tests/realtikvtest/loaddatatest/load_data_test.go | 1 + 3 files changed, 4 insertions(+), 18 deletions(-) diff --git a/executor/load_data.go b/executor/load_data.go index 2331fc9306200..e3741f16c49f5 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -626,14 +626,7 @@ func (w *commitWorker) addRecordLD(ctx context.Context, row []types.Datum) error if row == nil { return nil } - err := w.addRecord(ctx, row) - if err != nil { - if w.controller.Restrictive { - return err - } - w.handleWarning(err) - } - return nil + return w.addRecord(ctx, row) } // GetInfilePath get infile path. 
diff --git a/server/server_test.go b/server/server_test.go index d4a65d4507ce0..c7192882999c2 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1135,16 +1135,8 @@ func (cli *testServerClient) runTestLoadData(t *testing.T, server *Server) { require.NoError(t, err) } rs, err = dbt.GetDB().Exec(fmt.Sprintf("load data local infile %q into table test fields terminated by '\t- ' lines starting by 'xxx ' terminated by '\n' with batch_size = 3, thread=1", path)) - require.NoError(t, err) - lastID, err = rs.LastInsertId() - require.NoError(t, err) - require.Equal(t, int64(10), lastID) - affectedRows, err = rs.RowsAffected() - require.NoError(t, err) - require.Equal(t, int64(799), affectedRows) - rows = dbt.MustQuery("select * from test") - require.Truef(t, rows.Next(), "unexpected data") - require.NoError(t, rows.Close()) + // should be Transaction is too large + require.ErrorContains(t, err, "Transaction is too large") // don't support lines terminated is "" _, err = dbt.GetDB().Exec(fmt.Sprintf("load data local infile %q into table test lines terminated by '' with thread=1", path)) require.NotNil(t, err) diff --git a/tests/realtikvtest/loaddatatest/load_data_test.go b/tests/realtikvtest/loaddatatest/load_data_test.go index 7a9a214a8286e..8b5a31170991a 100644 --- a/tests/realtikvtest/loaddatatest/load_data_test.go +++ b/tests/realtikvtest/loaddatatest/load_data_test.go @@ -471,6 +471,7 @@ func (s *mockGCSSuite) testMixedCompression(importMode string, distributed bool) } func (s *mockGCSSuite) TestLoadSQLDump() { + s.T().Skip("skip due to LOAD DATA does not support FORMAT") s.testLoadSQLDump(importer.LogicalImportMode, false) //s.testLoadSQLDump(importer.PhysicalImportMode, false) //s.testLoadSQLDump(importer.PhysicalImportMode, true) From eddafdf50b34317f0728d6beda19469c4d24f5fb Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 24 May 2023 17:32:18 +0800 Subject: [PATCH 5/6] skip tests Signed-off-by: lance6716 --- tests/realtikvtest/loaddatatest/operate_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/realtikvtest/loaddatatest/operate_test.go b/tests/realtikvtest/loaddatatest/operate_test.go index bf9779f165970..133d5180d4d45 100644 --- a/tests/realtikvtest/loaddatatest/operate_test.go +++ b/tests/realtikvtest/loaddatatest/operate_test.go @@ -30,6 +30,7 @@ import ( ) func (s *mockGCSSuite) TestOperateRunningJob() { + s.T().Skip("skip for now") s.testOperateRunningJob(importer.LogicalImportMode) //s.testOperateRunningJob(importer.PhysicalImportMode) } From 0033a0c372ab235bda0855fda4d88ea15bcc949f Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 25 May 2023 11:07:13 +0800 Subject: [PATCH 6/6] fix CI Signed-off-by: lance6716 --- executor/test/loadremotetest/error_test.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/executor/test/loadremotetest/error_test.go b/executor/test/loadremotetest/error_test.go index 2b476914a54b8..b174951b762dd 100644 --- a/executor/test/loadremotetest/error_test.go +++ b/executor/test/loadremotetest/error_test.go @@ -71,14 +71,6 @@ func (s *mockGCSSuite) TestErrorMessage() { Content: []byte("1\t2\n" + "1\t4\n"), }) - err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s' - FORMAT '123' INTO TABLE t;`, gcsEndpoint)) - checkClientErrorMessage(s.T(), err, - "ERROR 8157 (HY000): The FORMAT '123' is not supported") - err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s' - FORMAT 'sql file' INTO TABLE t;`, gcsEndpoint)) - checkClientErrorMessage(s.T(), err, - "ERROR 8160 
(HY000): Failed to read source files. Reason: syntax error: unexpected Integer (1) at offset 1, expecting start of row. Only the following formats delimited text file (csv, tsv), parquet, sql are supported. Please provide the valid source file(s)") err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s' INTO TABLE t LINES STARTING BY '\n';`, gcsEndpoint)) checkClientErrorMessage(s.T(), err,
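A note on the overall shape of the change in PATCH 1/6: the logical-import path no longer goes through the asyncloaddata job / JobImporter machinery, and the commit worker no longer refreshes the transaction after every batch. Instead, load() starts one transaction up front, an encode goroutine feeds batches (now capped by the hard-coded maxRowsInBatch of 1000 rows) into a channel, and a single commit goroutine applies each batch as a statement and commits or rolls back the whole transaction exactly once when it finishes. The standalone Go sketch below illustrates that pipeline shape only; every name in it (load, commitTask with string rows, fakeTxn, batches) is a simplified stand-in and not the TiDB API, which uses session contexts, mydump parsers, and sessiontxn/StmtCommit calls as shown in the diff above.

// A minimal, self-contained sketch (not TiDB code) of the pipeline used by the
// reworked load(): one encode goroutine produces batches, one commit goroutine
// applies them, and the shared transaction is finished exactly once at the end.
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// commitTask mirrors the simplified struct in the patch: after the change it
// carries only the row count and the rows, with no per-task file-size field.
type commitTask struct {
	cnt  uint64
	rows [][]string
}

// fakeTxn stands in for the session transaction; the real code uses the
// session context with StmtCommit, CommitTxn and RollbackTxn.
type fakeTxn struct{ committed, rolledBack bool }

func (t *fakeTxn) commit() error { t.committed = true; return nil }
func (t *fakeTxn) rollback()     { t.rolledBack = true }

func load(ctx context.Context, batches [][][]string) error {
	group, gCtx := errgroup.WithContext(ctx)

	taskCh := make(chan commitTask, 1)
	txn := &fakeTxn{} // in the patch: sessiontxn.NewTxn(groupCtx, e.UserSctx)

	// Encode goroutine: the channel is closed only on success, so the
	// committer can tell "input finished" apart from "producer failed".
	group.Go(func() error {
		for _, rows := range batches {
			select {
			case <-gCtx.Done():
				return gCtx.Err()
			case taskCh <- commitTask{cnt: uint64(len(rows)), rows: rows}:
			}
		}
		close(taskCh)
		return nil
	})

	// Commit goroutine: each task is a statement-level commit; the whole job
	// shares one transaction that is committed or rolled back in the defer.
	group.Go(func() (err error) {
		defer func() {
			if err != nil {
				txn.rollback()
				return
			}
			err = txn.commit()
		}()
		for {
			select {
			case <-gCtx.Done():
				return gCtx.Err()
			case task, ok := <-taskCh:
				if !ok {
					return nil // all batches applied, commit in the defer
				}
				if task.cnt == 0 {
					return errors.New("empty batch")
				}
				// real code: checkAndInsertOneBatch + StmtCommit, no RefreshTxnCtx
			}
		}
	})

	return group.Wait()
}

func main() {
	err := load(context.Background(), [][][]string{{{"1", "a"}}, {{"2", "b"}}})
	fmt.Println("load finished, err =", err)
}

One design point the sketch preserves from the patch: the producer closes the task channel only on success, so the committer can distinguish "input finished, commit the transaction" from "producer failed, the errgroup context was cancelled, roll back". This is also why the server_test.go change in PATCH 4/6 now expects a "Transaction is too large" error: with batching no longer split across transactions, a large LOAD DATA accumulates in a single transaction.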