Skip to content

Commit

Permalink
rc: resource name support import into (#46902)
Browse files Browse the repository at this point in the history
close #46656
  • Loading branch information
okJiang authored Sep 13, 2023
1 parent 8f8c433 commit b2ebb3d
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 13 deletions.
10 changes: 5 additions & 5 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,9 +740,9 @@ func pickBackfillType(ctx context.Context, job *model.Job, unique bool, d *ddlCt
return model.ReorgTypeNone, err
}
if variable.EnableDistTask.Load() {
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, d.etcdCli)
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, d.etcdCli, job.ReorgMeta.ResourceGroupName)
} else {
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, nil)
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, nil, job.ReorgMeta.ResourceGroupName)
}
if err != nil {
return model.ReorgTypeNone, err
Expand Down Expand Up @@ -921,7 +921,7 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
return true, 0, nil
}
ctx := logutil.WithCategory(w.ctx, "ddl-ingest")
bc, err = ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, job.ID, nil)
bc, err = ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, job.ID, nil, job.ReorgMeta.ResourceGroupName)
if err != nil {
ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err)
return false, ver, errors.Trace(err)
Expand Down Expand Up @@ -1833,7 +1833,7 @@ func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgIn
// addTableIndex handles the add index reorganization state for a table.
func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
// TODO: Support typeAddIndexMergeTmpWorker.
if reorgInfo.Job.ReorgMeta.IsDistReorg && !reorgInfo.mergingTmpIdx {
if reorgInfo.ReorgMeta.IsDistReorg && !reorgInfo.mergingTmpIdx {
if reorgInfo.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
err := w.executeDistGlobalTask(reorgInfo)
if err != nil {
Expand All @@ -1845,7 +1845,7 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
}
if indexInfo.Unique {
ctx := logutil.WithCategory(w.ctx, "ddl-ingest")
bc, err := ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, reorgInfo.ID, nil)
bc, err := ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, reorgInfo.ID, nil, reorgInfo.ReorgMeta.ResourceGroupName)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions ddl/ingest/backend_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
// BackendCtxMgr is used to manage the backend context.
type BackendCtxMgr interface {
CheckAvailable() (bool, error)
Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client) (BackendCtx, error)
Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, resourceGroupName string) (BackendCtx, error)
Unregister(jobID int64)
Load(jobID int64) (BackendCtx, bool)
}
Expand Down Expand Up @@ -87,7 +87,7 @@ func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
}

// Register creates a new backend and registers it to the backend context.
func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client) (BackendCtx, error) {
func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, resourceGroupName string) (BackendCtx, error) {
bc, exist := m.Load(jobID)
if !exist {
m.memRoot.RefreshConsumption()
Expand All @@ -100,7 +100,7 @@ func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int6
logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err
}
bd, err := createLocalBackend(ctx, cfg)
bd, err := createLocalBackend(ctx, cfg, resourceGroupName)
if err != nil {
logutil.Logger(ctx).Error(LitErrCreateBackendFail, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err
Expand All @@ -119,7 +119,7 @@ func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int6
return bc, nil
}

func createLocalBackend(ctx context.Context, cfg *Config) (*local.Backend, error) {
func createLocalBackend(ctx context.Context, cfg *Config, resourceGroupName string) (*local.Backend, error) {
tls, err := cfg.Lightning.ToTLS()
if err != nil {
logutil.Logger(ctx).Error(LitErrCreateBackendFail, zap.Error(err))
Expand All @@ -134,7 +134,7 @@ func createLocalBackend(ctx context.Context, cfg *Config) (*local.Backend, error
if cfg.IsRaftKV2 {
raftKV2SwitchModeDuration = config.DefaultSwitchTiKVModeInterval
}
backendConfig := local.NewBackendConfig(cfg.Lightning, int(LitRLimit), cfg.KeyspaceName, "", kvutil.ExplicitTypeDDL, raftKV2SwitchModeDuration)
backendConfig := local.NewBackendConfig(cfg.Lightning, int(LitRLimit), cfg.KeyspaceName, resourceGroupName, kvutil.ExplicitTypeDDL, raftKV2SwitchModeDuration)
return local.NewBackend(ctx, tls, backendConfig, regionSizeGetter)
}

Expand Down
2 changes: 1 addition & 1 deletion ddl/ingest/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (m *MockBackendCtxMgr) CheckAvailable() (bool, error) {
}

// Register implements BackendCtxMgr.Register interface.
func (m *MockBackendCtxMgr) Register(_ context.Context, _ bool, jobID int64, _ *clientv3.Client) (BackendCtx, error) {
func (m *MockBackendCtxMgr) Register(_ context.Context, _ bool, jobID int64, _ *clientv3.Client, _ string) (BackendCtx, error) {
logutil.BgLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID))
if mockCtx, ok := m.runningJobs[jobID]; ok {
return mockCtx, nil
Expand Down
2 changes: 1 addition & 1 deletion ddl/stage_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NewBackfillSchedulerHandle(ctx context.Context, taskMeta []byte, d *ddl,
return nil, errors.New("index info not found")
}

bc, err := ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, jobMeta.ID, d.etcdCli)
bc, err := ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, jobMeta.ID, d.etcdCli, jobMeta.ReorgMeta.ResourceGroupName)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/realtikvtest/addindextest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestAddIndexIngestLimitOneBackend(t *testing.T) {
tk2.MustExec("insert into t2 values (1, 1), (2, 2), (3, 3);")

// Mock there is a running ingest job.
_, err := ingest.LitBackCtxMgr.Register(context.Background(), false, 65535, nil)
_, err := ingest.LitBackCtxMgr.Register(context.Background(), false, 65535, nil, "")
require.NoError(t, err)
wg := &sync.WaitGroup{}
wg.Add(2)
Expand Down

0 comments on commit b2ebb3d

Please sign in to comment.