Skip to content

Commit

Permalink
loaddata: revert concurrent writing to TiKV and WITH detached (#44056)
Browse files Browse the repository at this point in the history
ref #44078
  • Loading branch information
lance6716 authored May 23, 2023
1 parent 8c1fae8 commit 0ac7473
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 277 deletions.
1 change: 0 additions & 1 deletion executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,6 @@ go_test(
"join_pkg_test.go",
"join_test.go",
"joiner_test.go",
"load_data_test.go",
"main_test.go",
"memtable_reader_test.go",
"merge_join_test.go",
Expand Down
3 changes: 1 addition & 2 deletions executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,13 @@ const (
maxWriteSpeedOption = "max_write_speed"
splitFileOption = "split_file"
recordErrorsOption = "record_errors"
detachedOption = "detached"

// test option, not for user
distributedOption = "__distributed"
)

var (
detachedOption = plannercore.DetachedOption

// name -> whether the option has value
supportedOptions = map[string]bool{
importModeOption: true,
Expand Down
10 changes: 10 additions & 0 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"fmt"
"math"
"sync"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -93,6 +94,11 @@ type InsertValues struct {

stats *InsertRuntimeStat

// in LOAD DATA, one InsertValues is used by two goroutine, we need to lock
// when using the txn
isLoadData bool
txnInUse sync.Mutex

// fkChecks contains the foreign key checkers.
fkChecks []*FKCheckExec
fkCascades []*FKCascadeExec
Expand Down Expand Up @@ -1050,6 +1056,10 @@ func (e *InsertValues) allocAutoRandomID(ctx context.Context, fieldType *types.F
if shardFmt.IncrementalMask()&autoRandomID != autoRandomID {
return 0, autoid.ErrAutoRandReadFailed
}
if e.isLoadData {
e.txnInUse.Lock()
defer e.txnInUse.Unlock()
}
_, err = e.ctx.Txn(true)
if err != nil {
return 0, err
Expand Down
Loading

0 comments on commit 0ac7473

Please sign in to comment.