Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rc: resource name support import into #46902

Merged
merged 3 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,9 +740,9 @@ func pickBackfillType(ctx context.Context, job *model.Job, unique bool, d *ddlCt
return model.ReorgTypeNone, err
}
if variable.EnableDistTask.Load() {
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, d.etcdCli)
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, d.etcdCli, job.ReorgMeta.ResourceGroupName)
} else {
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, nil)
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, nil, job.ReorgMeta.ResourceGroupName)
}
if err != nil {
return model.ReorgTypeNone, err
Expand Down Expand Up @@ -921,7 +921,7 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
return true, 0, nil
}
ctx := logutil.WithCategory(w.ctx, "ddl-ingest")
bc, err = ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, job.ID, nil)
bc, err = ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, job.ID, nil, job.ReorgMeta.ResourceGroupName)
if err != nil {
ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err)
return false, ver, errors.Trace(err)
Expand Down Expand Up @@ -1819,7 +1819,7 @@ func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgIn
// addTableIndex handles the add index reorganization state for a table.
func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
// TODO: Support typeAddIndexMergeTmpWorker.
if reorgInfo.Job.ReorgMeta.IsDistReorg && !reorgInfo.mergingTmpIdx {
if reorgInfo.ReorgMeta.IsDistReorg && !reorgInfo.mergingTmpIdx {
if reorgInfo.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
err := w.executeDistGlobalTask(reorgInfo)
if err != nil {
Expand All @@ -1831,7 +1831,7 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
}
if indexInfo.Unique {
ctx := logutil.WithCategory(w.ctx, "ddl-ingest")
bc, err := ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, reorgInfo.ID, nil)
bc, err := ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, reorgInfo.ID, nil, reorgInfo.ReorgMeta.ResourceGroupName)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions ddl/ingest/backend_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
// BackendCtxMgr is used to manage the backend context.
type BackendCtxMgr interface {
CheckAvailable() (bool, error)
Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client) (BackendCtx, error)
Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, resourceGroupName string) (BackendCtx, error)
Unregister(jobID int64)
Load(jobID int64) (BackendCtx, bool)
}
Expand Down Expand Up @@ -87,7 +87,7 @@ func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
}

// Register creates a new backend and registers it to the backend context.
func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client) (BackendCtx, error) {
func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, resourceGroupName string) (BackendCtx, error) {
bc, exist := m.Load(jobID)
if !exist {
m.memRoot.RefreshConsumption()
Expand All @@ -100,7 +100,7 @@ func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int6
logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err
}
bd, err := createLocalBackend(ctx, cfg)
bd, err := createLocalBackend(ctx, cfg, resourceGroupName)
if err != nil {
logutil.Logger(ctx).Error(LitErrCreateBackendFail, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err
Expand All @@ -119,7 +119,7 @@ func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int6
return bc, nil
}

func createLocalBackend(ctx context.Context, cfg *Config) (*local.Backend, error) {
func createLocalBackend(ctx context.Context, cfg *Config, resourceGroupName string) (*local.Backend, error) {
tls, err := cfg.Lightning.ToTLS()
if err != nil {
logutil.Logger(ctx).Error(LitErrCreateBackendFail, zap.Error(err))
Expand All @@ -134,7 +134,7 @@ func createLocalBackend(ctx context.Context, cfg *Config) (*local.Backend, error
if cfg.IsRaftKV2 {
raftKV2SwitchModeDuration = config.DefaultSwitchTiKVModeInterval
}
backendConfig := local.NewBackendConfig(cfg.Lightning, int(LitRLimit), cfg.KeyspaceName, "", kvutil.ExplicitTypeDDL, raftKV2SwitchModeDuration)
backendConfig := local.NewBackendConfig(cfg.Lightning, int(LitRLimit), cfg.KeyspaceName, resourceGroupName, kvutil.ExplicitTypeDDL, raftKV2SwitchModeDuration)
return local.NewBackend(ctx, tls, backendConfig, regionSizeGetter)
}

Expand Down
2 changes: 1 addition & 1 deletion ddl/ingest/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (m *MockBackendCtxMgr) CheckAvailable() (bool, error) {
}

// Register implements BackendCtxMgr.Register interface.
func (m *MockBackendCtxMgr) Register(_ context.Context, _ bool, jobID int64, _ *clientv3.Client) (BackendCtx, error) {
func (m *MockBackendCtxMgr) Register(_ context.Context, _ bool, jobID int64, _ *clientv3.Client, _ string) (BackendCtx, error) {
logutil.BgLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID))
if mockCtx, ok := m.runningJobs[jobID]; ok {
return mockCtx, nil
Expand Down
2 changes: 1 addition & 1 deletion ddl/stage_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func NewBackfillSchedulerHandle(ctx context.Context, taskMeta []byte, d *ddl,
return nil, errors.New("index info not found")
}

bc, err := ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, jobMeta.ID, d.etcdCli)
bc, err := ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, jobMeta.ID, d.etcdCli, jobMeta.ReorgMeta.ResourceGroupName)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/realtikvtest/addindextest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestAddIndexIngestLimitOneBackend(t *testing.T) {
tk2.MustExec("insert into t2 values (1, 1), (2, 2), (3, 3);")

// Mock there is a running ingest job.
_, err := ingest.LitBackCtxMgr.Register(context.Background(), false, 65535, nil)
_, err := ingest.LitBackCtxMgr.Register(context.Background(), false, 65535, nil, "")
require.NoError(t, err)
wg := &sync.WaitGroup{}
wg.Add(2)
Expand Down