Skip to content

Commit

Permalink
Merge branch 'non-prep-dml-hints' into non-prep-fixupup
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 committed Apr 17, 2023
2 parents 1a46daa + 1bb7784 commit 95d200c
Show file tree
Hide file tree
Showing 33 changed files with 933 additions and 624 deletions.
4 changes: 4 additions & 0 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,10 @@ func (en engine) GetID() int32 {
return en.id
}

// GetUUID returns the UUID stored in the engine's uuid field.
func (en engine) GetUUID() uuid.UUID {
return en.uuid
}

// ClosedEngine represents a closed engine, allowing ingestion into the target.
// This type is goroutine safe: you can share an instance and execute any method
// anywhere.
Expand Down
12 changes: 12 additions & 0 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1538,6 +1538,18 @@ func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges
return firstErr.Get()
}

// GetImportedKVCount reports how many KV pairs have been imported for the
// engine identified by engineUUID. It returns 0 when no engine with that UUID
// is found in local.engines.
func (local *Backend) GetImportedKVCount(engineUUID uuid.UUID) int64 {
	if loaded, found := local.engines.Load(engineUUID); found {
		return loaded.(*Engine).importedKVCount.Load()
	}
	// The count is queried after import but before clean up, so a missing
	// engine is not expected here.
	// todo: return error
	return 0
}

// ResetEngine resets the engine and reclaims the space.
func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error {
// the only way to reset the engine + reclaim the space is to delete and reopen it 🤷
Expand Down
1 change: 1 addition & 0 deletions disttask/loaddata/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//disttask/framework/dispatcher",
"//disttask/framework/proto",
"//disttask/framework/scheduler",
"//executor/asyncloaddata",
"//executor/importer",
"//table/tables",
"//util/logutil",
Expand Down
2 changes: 2 additions & 0 deletions disttask/loaddata/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/disttask/framework/dispatcher"
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/executor/asyncloaddata"
"github.com/pingcap/tidb/executor/importer"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -90,6 +91,7 @@ func generateSubtaskMetas(ctx context.Context, taskMeta *TaskMeta) ([]*SubtaskMe

tableImporter, err := importer.NewTableImporter(&importer.JobImportParam{
GroupCtx: ctx,
Progress: asyncloaddata.NewProgress(controller.ImportMode == importer.LogicalImportMode),
}, controller)
if err != nil {
return nil, err
Expand Down
2 changes: 2 additions & 0 deletions disttask/loaddata/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/disttask/framework/scheduler"
"github.com/pingcap/tidb/executor/asyncloaddata"
"github.com/pingcap/tidb/executor/importer"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -55,6 +56,7 @@ func (s *ImportScheduler) InitSubtaskExecEnv(ctx context.Context) error {

tableImporter, err := importer.NewTableImporter(&importer.JobImportParam{
GroupCtx: ctx,
Progress: asyncloaddata.NewProgress(controller.ImportMode == importer.LogicalImportMode),
}, controller)
if err != nil {
return err
Expand Down
13 changes: 1 addition & 12 deletions executor/asyncloaddata/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,18 @@ go_test(
name = "asyncloaddata_test",
timeout = "short",
srcs = [
"detach_test.go",
"main_test.go",
"operate_test.go",
"progress_test.go",
"show_test.go",
"util_test.go",
],
embed = [":asyncloaddata"],
flaky = True,
race = "on",
shard_count = 7,
shard_count = 6,
deps = [
"//br/pkg/lightning/config",
"//executor",
"//executor/importer",
"//kv",
"//parser/auth",
"//testkit",
"//util/sqlexec",
"@com_github_fsouza_fake_gcs_server//fakestorage",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_stretchr_testify//suite",
"@org_uber_go_goleak//:goleak",
],
)
88 changes: 0 additions & 88 deletions executor/asyncloaddata/detach_test.go

This file was deleted.

46 changes: 35 additions & 11 deletions executor/asyncloaddata/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,53 @@ import (
"go.uber.org/atomic"
)

// LogicalImportProgress is the progress info of the logical import mode.
type LogicalImportProgress struct {
// LoadedFileSize is the size, in bytes, of the data that has been loaded.
// It is larger than the actual loaded data size: reading happens one block
// at a time, and a block may generate multiple tasks that are executed
// concurrently, so the actual loaded data size cannot be known easily.
LoadedFileSize atomic.Int64
}

// PhysicalImportProgress is the progress info of the physical import mode.
type PhysicalImportProgress struct {
// EncodeFileSize is the size, in bytes, of the file data that has finished
// KV encoding. It should eventually equal SourceFileSize.
EncodeFileSize atomic.Int64
}

// Progress is the progress of the LOAD DATA task.
type Progress struct {
// SourceFileSize is the size of the source file in bytes. When we can't get
// the size of the source file, it will be set to -1.
// Currently, the value is read by seek(0, end), when LOAD DATA LOCAL we wrap
// SimpleSeekerOnReadCloser on MySQL client connection which doesn't support
// it.
SourceFileSize int64
// LoadedFileSize is the size of the data that will be loaded in bytes. It's
// larger than the actual loaded data size, but due to the fact that reading
// is once-a-block and a block may generate multiple tasks that are
// concurrently executed, we can't know the actual loaded data size easily.
LoadedFileSize *atomic.Int64
SourceFileSize int64
*LogicalImportProgress `json:",inline"`
*PhysicalImportProgress `json:",inline"`
// LoadedRowCnt is the number of rows that has been loaded.
LoadedRowCnt *atomic.Uint64
// for physical mode, it's the number of rows that has been imported into TiKV.
// in SHOW LOAD JOB we call it Imported_Rows, to make it compatible with 7.0,
// the variable name is not changed.
LoadedRowCnt atomic.Uint64
}

// NewProgress creates a new Progress.
func NewProgress() *Progress {
// todo: better pass import mode, but it causes import cycle.
func NewProgress(logicalImport bool) *Progress {
var li *LogicalImportProgress
var pi *PhysicalImportProgress
if logicalImport {
li = &LogicalImportProgress{}
} else {
pi = &PhysicalImportProgress{}
}
return &Progress{
SourceFileSize: -1,
LoadedFileSize: atomic.NewInt64(0),
LoadedRowCnt: atomic.NewUint64(0),
SourceFileSize: -1,
LogicalImportProgress: li,
PhysicalImportProgress: pi,
}
}

Expand Down
19 changes: 18 additions & 1 deletion executor/asyncloaddata/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
)

func TestProgressMarshalUnmarshal(t *testing.T) {
p := NewProgress()
p := NewProgress(true)
require.Nil(t, p.PhysicalImportProgress)
p.SourceFileSize = 123
p.LoadedFileSize.Store(456)
p.LoadedRowCnt.Store(789)
Expand All @@ -35,4 +36,20 @@ func TestProgressMarshalUnmarshal(t *testing.T) {
require.Equal(t, int64(111), p2.SourceFileSize)
require.Equal(t, int64(222), p2.LoadedFileSize.Load())
require.Equal(t, uint64(333), p2.LoadedRowCnt.Load())

p = NewProgress(false)
require.Nil(t, p.LogicalImportProgress)
p.SourceFileSize = 123
p.EncodeFileSize.Store(100)
p.LoadedRowCnt.Store(789)

s = p.String()
require.Equal(t, `{"SourceFileSize":123,"EncodeFileSize":100,"LoadedRowCnt":789}`, s)

s2 = `{"SourceFileSize":111,"EncodeFileSize":222,"LoadedRowCnt":333}`
p2, err = ProgressFromJSON([]byte(s2))
require.NoError(t, err)
require.Equal(t, int64(111), p2.SourceFileSize)
require.Equal(t, int64(222), p2.EncodeFileSize.Load())
require.Equal(t, uint64(333), p2.LoadedRowCnt.Load())
}
16 changes: 13 additions & 3 deletions executor/asyncloaddata/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"net/url"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand All @@ -33,6 +34,14 @@ import (
"go.uber.org/zap"
)

// Variables used only by tests.
var (
// TestSyncCh is used in unit tests to synchronize the execution of LOAD DATA.
TestSyncCh = make(chan struct{})
// TestLastLoadDataJobID holds the ID of the last created job; it is used in
// unit tests.
TestLastLoadDataJobID atomic.Int64
)

// Job import job.
type Job struct {
ID int64
Expand Down Expand Up @@ -84,12 +93,13 @@ func CreateLoadDataJob(
if len(rows) != 1 {
return nil, errors.Errorf("unexpected result length: %d", len(rows))
}

failpoint.Inject("SaveLastLoadDataJobID", func() {
TestLastLoadDataJobID.Store(rows[0].GetInt64(0))
})
return NewJob(rows[0].GetInt64(0), conn, user), nil
}

// TestSyncCh is used in unit test to synchronize the execution of LOAD DATA.
var TestSyncCh = make(chan struct{})

// StartJob tries to start a not-yet-started job with jobID. It will not return
// error when there's no matched job.
func (j *Job) StartJob(ctx context.Context) error {
Expand Down
1 change: 1 addition & 0 deletions executor/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ go_library(
"//util/stringutil",
"@com_github_docker_go_units//:go-units",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//tikv",
Expand Down
Loading

0 comments on commit 95d200c

Please sign in to comment.