diff --git a/ddl/index.go b/ddl/index.go index d664472f7539f..be91c301e55b0 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -740,9 +740,9 @@ func pickBackfillType(ctx context.Context, job *model.Job, unique bool, d *ddlCt return model.ReorgTypeNone, err } if variable.EnableDistTask.Load() { - _, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, d.etcdCli) + _, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, d.etcdCli, job.ReorgMeta.ResourceGroupName) } else { - _, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, nil) + _, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, nil, job.ReorgMeta.ResourceGroupName) } if err != nil { return model.ReorgTypeNone, err @@ -921,7 +921,7 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, return true, 0, nil } ctx := logutil.WithCategory(w.ctx, "ddl-ingest") - bc, err = ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, job.ID, nil) + bc, err = ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, job.ID, nil, job.ReorgMeta.ResourceGroupName) if err != nil { ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err) return false, ver, errors.Trace(err) @@ -1819,7 +1819,7 @@ func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgIn // addTableIndex handles the add index reorganization state for a table. func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error { // TODO: Support typeAddIndexMergeTmpWorker. - if reorgInfo.Job.ReorgMeta.IsDistReorg && !reorgInfo.mergingTmpIdx { + if reorgInfo.ReorgMeta.IsDistReorg && !reorgInfo.mergingTmpIdx { if reorgInfo.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge { err := w.executeDistGlobalTask(reorgInfo) if err != nil { @@ -1831,7 +1831,7 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error { } if indexInfo.Unique { ctx := logutil.WithCategory(w.ctx, "ddl-ingest") - bc, err := ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, reorgInfo.ID, nil) + bc, err := ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, reorgInfo.ID, nil, reorgInfo.ReorgMeta.ResourceGroupName) if err != nil { return err } diff --git a/ddl/ingest/backend_mgr.go b/ddl/ingest/backend_mgr.go index c3254c134dd1c..d1e9f907e460c 100644 --- a/ddl/ingest/backend_mgr.go +++ b/ddl/ingest/backend_mgr.go @@ -35,7 +35,7 @@ import ( // BackendCtxMgr is used to manage the backend context. type BackendCtxMgr interface { CheckAvailable() (bool, error) - Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client) (BackendCtx, error) + Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, resourceGroupName string) (BackendCtx, error) Unregister(jobID int64) Load(jobID int64) (BackendCtx, bool) } @@ -87,7 +87,7 @@ func (m *litBackendCtxMgr) CheckAvailable() (bool, error) { } // Register creates a new backend and registers it to the backend context. -func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client) (BackendCtx, error) { +func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, resourceGroupName string) (BackendCtx, error) { bc, exist := m.Load(jobID) if !exist { m.memRoot.RefreshConsumption() @@ -100,7 +100,7 @@ func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int6 logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err)) return nil, err } - bd, err := createLocalBackend(ctx, cfg) + bd, err := createLocalBackend(ctx, cfg, resourceGroupName) if err != nil { logutil.Logger(ctx).Error(LitErrCreateBackendFail, zap.Int64("job ID", jobID), zap.Error(err)) return nil, err @@ -119,7 +119,7 @@ func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int6 return bc, nil } -func createLocalBackend(ctx context.Context, cfg *Config) (*local.Backend, error) { +func createLocalBackend(ctx context.Context, cfg *Config, resourceGroupName string) (*local.Backend, error) { tls, err := cfg.Lightning.ToTLS() if err != nil { logutil.Logger(ctx).Error(LitErrCreateBackendFail, zap.Error(err)) @@ -134,7 +134,7 @@ func createLocalBackend(ctx context.Context, cfg *Config) (*local.Backend, error if cfg.IsRaftKV2 { raftKV2SwitchModeDuration = config.DefaultSwitchTiKVModeInterval } - backendConfig := local.NewBackendConfig(cfg.Lightning, int(LitRLimit), cfg.KeyspaceName, "", kvutil.ExplicitTypeDDL, raftKV2SwitchModeDuration) + backendConfig := local.NewBackendConfig(cfg.Lightning, int(LitRLimit), cfg.KeyspaceName, resourceGroupName, kvutil.ExplicitTypeDDL, raftKV2SwitchModeDuration) return local.NewBackend(ctx, tls, backendConfig, regionSizeGetter) } diff --git a/ddl/ingest/mock.go b/ddl/ingest/mock.go index b2e8d9ed32675..e0c8812b28c9b 100644 --- a/ddl/ingest/mock.go +++ b/ddl/ingest/mock.go @@ -47,7 +47,7 @@ func (m *MockBackendCtxMgr) CheckAvailable() (bool, error) { } // Register implements BackendCtxMgr.Register interface. -func (m *MockBackendCtxMgr) Register(_ context.Context, _ bool, jobID int64, _ *clientv3.Client) (BackendCtx, error) { +func (m *MockBackendCtxMgr) Register(_ context.Context, _ bool, jobID int64, _ *clientv3.Client, _ string) (BackendCtx, error) { logutil.BgLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID)) if mockCtx, ok := m.runningJobs[jobID]; ok { return mockCtx, nil diff --git a/ddl/stage_scheduler.go b/ddl/stage_scheduler.go index 852052f12f9ca..c8274be428c28 100644 --- a/ddl/stage_scheduler.go +++ b/ddl/stage_scheduler.go @@ -65,7 +65,7 @@ func NewBackfillSchedulerHandle(ctx context.Context, taskMeta []byte, d *ddl, return nil, errors.New("index info not found") } - bc, err := ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, jobMeta.ID, d.etcdCli) + bc, err := ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, jobMeta.ID, d.etcdCli, jobMeta.ReorgMeta.ResourceGroupName) if err != nil { return nil, errors.Trace(err) } diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go index 6bc617e50d1ac..becc553e5a98d 100644 --- a/tests/realtikvtest/addindextest/integration_test.go +++ b/tests/realtikvtest/addindextest/integration_test.go @@ -84,7 +84,7 @@ func TestAddIndexIngestLimitOneBackend(t *testing.T) { tk2.MustExec("insert into t2 values (1, 1), (2, 2), (3, 3);") // Mock there is a running ingest job. - _, err := ingest.LitBackCtxMgr.Register(context.Background(), false, 65535, nil) + _, err := ingest.LitBackCtxMgr.Register(context.Background(), false, 65535, nil, "") require.NoError(t, err) wg := &sync.WaitGroup{} wg.Add(2)