diff --git a/executor/builder.go b/executor/builder.go
index 20c4f9525e92a..32340f910a6bd 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -681,6 +681,9 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
 		},
 	}
 
+	var defaultLoadDataBatchCnt uint64 = 20000 // TODO: this will be changed to a variable in another PR
+	loadDataExec.loadDataInfo.SetMaxRowsInBatch(defaultLoadDataBatchCnt)
+
 	return loadDataExec
 }
 
diff --git a/executor/insert_common.go b/executor/insert_common.go
index 861630ea03c11..86f2e429c819e 100644
--- a/executor/insert_common.go
+++ b/executor/insert_common.go
@@ -35,6 +35,7 @@ type InsertValues struct {
 	batchChecker
 
 	rowCount       uint64
+	curBatchCnt    uint64
 	maxRowsInBatch uint64
 	lastInsertID   uint64
 	hasRefCols     bool
@@ -378,6 +379,20 @@ func (e *InsertValues) getRow(ctx context.Context, vals []types.Datum) ([]types.
 	return e.fillRow(ctx, row, hasValue)
 }
 
+func (e *InsertValues) getRowInPlace(ctx context.Context, vals []types.Datum, rowBuf []types.Datum) ([]types.Datum, error) {
+	hasValue := make([]bool, len(e.Table.Cols()))
+	for i, v := range vals {
+		casted, err := table.CastValue(e.ctx, v, e.insertColumns[i].ToInfo())
+		if e.filterErr(err) != nil {
+			return nil, err
+		}
+		offset := e.insertColumns[i].Offset
+		rowBuf[offset] = casted
+		hasValue[offset] = true
+	}
+	return e.fillRow(ctx, rowBuf, hasValue)
+}
+
 func (e *InsertValues) filterErr(err error) error {
 	if err == nil {
 		return nil
@@ -547,10 +562,10 @@ func (e *InsertValues) batchCheckAndInsert(rows [][]types.Datum, addRecord func(
 	}
 	// append warnings and get no duplicated error rows
 	for i, r := range e.toBeCheckedRows {
+		skip := false
 		if r.handleKey != nil {
 			if _, found := e.dupKVs[string(r.handleKey.newKV.key)]; found {
-				rows[i] = nil
 				e.ctx.GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr)
 				continue
 			}
 		}
@@ -557,15 +572,15 @@ func (e *InsertValues) batchCheckAndInsert(rows [][]types.Datum, addRecord func(
 		for _, uk := range r.uniqueKeys {
 			if _, found := e.dupKVs[string(uk.newKV.key)]; found {
-				// If duplicate keys were found in BatchGet, mark row = nil.
-				rows[i] = nil
+				// If duplicate keys were found in BatchGet, skip this row.
 				e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
+				skip = true
 				break
 			}
 		}
 		// If row was checked with no duplicate keys,
-		// it should be add to values map for the further row check.
+		// it should be added to the values map for the further row check.
 		// There may be duplicate keys inside the insert statement.
-		if rows[i] != nil {
+		if !skip {
 			e.ctx.GetSessionVars().StmtCtx.AddCopiedRows(1)
 			_, err = addRecord(rows[i])
 			if err != nil {
diff --git a/executor/load_data.go b/executor/load_data.go
index 16d55d5f36114..dc4c4f4024444 100644
--- a/executor/load_data.go
+++ b/executor/load_data.go
@@ -25,6 +25,7 @@ import (
 	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/chunk"
+	"github.com/pingcap/tidb/util/hack"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/stringutil"
 	"go.uber.org/zap"
@@ -112,7 +113,12 @@ type LoadDataInfo struct {
 // SetMaxRowsInBatch sets the max number of rows to insert in a batch.
 func (e *LoadDataInfo) SetMaxRowsInBatch(limit uint64) {
 	e.maxRowsInBatch = limit
-	e.rows = make([][]types.Datum, 0, limit)
+	if uint64(cap(e.rows)) < limit {
+		e.rows = make([][]types.Datum, 0, limit)
+		for i := 0; uint64(i) < limit; i++ {
+			e.rows = append(e.rows, make([]types.Datum, len(e.Table.Cols())))
+		}
+	}
 }
 
 // getValidData returns prevData and curData that starts from starting symbol.
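Reviewer note: the builder.go, insert_common.go, and load_data.go hunks above all serve one allocation-saving idea. SetMaxRowsInBatch now preallocates `limit` row buffers up front, getRowInPlace casts values into a caller-supplied buffer instead of allocating a fresh row per line, and curBatchCnt tracks how many buffers are filled so a batch reset no longer discards them. Below is a minimal standalone sketch of this reuse pattern; every name in it (batchWriter, datum, addRow, flush) is illustrative, not TiDB's API. The load_data.go diff continues after it.

```go
// Standalone sketch of the buffer-reuse pattern introduced by this PR.
package main

import "fmt"

type datum struct{ val string } // stand-in for types.Datum

type batchWriter struct {
	rows        [][]datum // preallocated once: maxRows buffers of numCols each
	curBatchCnt int       // number of buffers currently filled
}

func newBatchWriter(maxRows, numCols int) *batchWriter {
	rows := make([][]datum, 0, maxRows)
	for i := 0; i < maxRows; i++ {
		rows = append(rows, make([]datum, numCols))
	}
	return &batchWriter{rows: rows}
}

// addRow decodes into the next preallocated buffer in place, mirroring
// getRowInPlace writing into e.rows[e.curBatchCnt].
func (w *batchWriter) addRow(cols []string) {
	buf := w.rows[w.curBatchCnt]
	for i, c := range cols {
		buf[i] = datum{val: c}
	}
	w.curBatchCnt++
}

// flush hands off the filled prefix and resets the counter; the buffers
// themselves are reused by the next batch (curBatchCnt = 0 replaces the
// old rows = rows[:0] reset).
func (w *batchWriter) flush() {
	batch := w.rows[:w.curBatchCnt]
	fmt.Printf("flushing %d rows\n", len(batch))
	w.curBatchCnt = 0
}

func main() {
	w := newBatchWriter(4, 2)
	w.addRow([]string{"1", "a"})
	w.addRow([]string{"2", "b"})
	w.flush()                    // flushing 2 rows
	w.addRow([]string{"3", "c"}) // overwrites the first buffer, no allocation
	w.flush()                    // flushing 1 rows
}
```

Because a flush only resets the counter, the next batch overwrites the same buffers, so the steady-state allocation cost per row drops to zero.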
@@ -174,7 +180,7 @@ func (e *LoadDataInfo) getLine(prevData, curData []byte) ([]byte, []byte, bool)
 	}
 	endIdx := -1
 	if len(curData) >= curStartIdx {
-		endIdx = strings.Index(string(curData[curStartIdx:]), e.LinesInfo.Terminated)
+		endIdx = strings.Index(string(hack.String(curData[curStartIdx:])), e.LinesInfo.Terminated)
 	}
 	if endIdx == -1 {
 		// no terminated symbol
@@ -253,8 +259,9 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte)
 		if err != nil {
 			return nil, false, err
 		}
-		e.rows = append(e.rows, e.colsToRow(ctx, cols))
+		e.colsToRow(ctx, cols)
 		e.rowCount++
+		e.curBatchCnt++
 		if e.maxRowsInBatch != 0 && e.rowCount%e.maxRowsInBatch == 0 {
 			reachLimit = true
 			logutil.BgLogger().Info("batch limit hit when inserting rows", zap.Int("maxBatchRows", e.maxChunkSize),
@@ -268,15 +275,15 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte)
 
-// CheckAndInsertOneBatch is used to commit one transaction batch full filled data
+// CheckAndInsertOneBatch is used to commit one transaction batch of fully filled rows
 func (e *LoadDataInfo) CheckAndInsertOneBatch() error {
 	var err error
-	if len(e.rows) == 0 {
+	if e.curBatchCnt == 0 {
 		return err
 	}
-	e.ctx.GetSessionVars().StmtCtx.AddRecordRows(uint64(len(e.rows)))
-	err = e.batchCheckAndInsert(e.rows, e.addRecordLD)
+	e.ctx.GetSessionVars().StmtCtx.AddRecordRows(e.curBatchCnt)
+	err = e.batchCheckAndInsert(e.rows[0:e.curBatchCnt], e.addRecordLD)
 	if err != nil {
 		return err
 	}
-	e.rows = e.rows[:0]
+	e.curBatchCnt = 0
 	return err
 }
@@ -312,7 +319,7 @@ func (e *LoadDataInfo) colsToRow(ctx context.Context, cols []field) []types.Datu
 			e.row[i].SetString(string(cols[i].str))
 		}
 	}
-	row, err := e.getRow(ctx, e.row)
+	row, err := e.getRowInPlace(ctx, e.row, e.rows[e.curBatchCnt])
 	if err != nil {
 		e.handleWarning(err)
 		return nil
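Reviewer note: the hack.String call in getLine exists to make the byte-to-string conversion zero-copy. A plain string(curData[curStartIdx:]) copies the slice on every scan for the line terminator, which adds up over a large input file. Below is a sketch of the same idea; it assumes a Go 1.20+ toolchain and uses unsafe.String to illustrate the intent, which is not necessarily how util/hack implements it.

```go
// Sketch of a zero-copy []byte-to-string conversion, in the spirit of
// hack.String (illustrative, not the util/hack implementation).
package main

import (
	"fmt"
	"strings"
	"unsafe"
)

// zeroCopyString aliases b's backing array as a string without copying.
// The result is valid only while b is alive and unmodified.
func zeroCopyString(b []byte) string {
	if len(b) == 0 {
		return ""
	}
	return unsafe.String(&b[0], len(b)) // Go 1.20+
}

func main() {
	chunk := []byte("1,alice\n2,bob\n")
	// Scan for the line terminator without copying the whole chunk,
	// as getLine does for every block of input data.
	idx := strings.Index(zeroCopyString(chunk), "\n")
	fmt.Println(idx) // 7
}
```

An alternative without unsafe would be bytes.Index on the raw slice, at the cost of converting e.LinesInfo.Terminated to []byte on each call instead.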