diff --git a/README.md b/README.md index 0901dcf625edc..0bed69fe5a1bf 100644 --- a/README.md +++ b/README.md @@ -44,8 +44,7 @@ See the [Get Started](https://pingcap.github.io/tidb-dev-guide/get-started/intro You can join the following groups or channels to discuss or ask questions about TiDB, and to keep yourself informed of the latest TiDB updates: - Seek help when you use TiDB - - [TiDB Forum](https://ask.pingcap.com/) - - [Chinese TiDB Forum](https://asktug.com) + - TiDB Forum: [English](https://ask.pingcap.com/), [Chinese](https://asktug.com) - Slack channels: [#everyone](https://slack.tidb.io/invite?team=tidb-community&channel=everyone&ref=pingcap-tidb) (English), [#tidb-japan](https://slack.tidb.io/invite?team=tidb-community&channel=tidb-japan&ref=github-tidb) (Japanese) - [Stack Overflow](https://stackoverflow.com/questions/tagged/tidb) (questions tagged with #tidb) - Discuss TiDB's implementation and design diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel index b09a1abad85ba..2cd903d3bc290 100644 --- a/br/pkg/lightning/backend/local/BUILD.bazel +++ b/br/pkg/lightning/backend/local/BUILD.bazel @@ -111,6 +111,7 @@ go_test( "//br/pkg/restore/split", "//br/pkg/utils", "//ddl", + "//keyspace", "//kv", "//parser", "//parser/ast", diff --git a/br/pkg/lightning/backend/local/duplicate.go b/br/pkg/lightning/backend/local/duplicate.go index 8877c16ae7740..fe6cd110a026c 100644 --- a/br/pkg/lightning/backend/local/duplicate.go +++ b/br/pkg/lightning/backend/local/duplicate.go @@ -387,6 +387,7 @@ type DuplicateManager struct { tableName string splitCli split.SplitClient tikvCli *tikv.KVStore + tikvCodec tikv.Codec errorMgr *errormanager.ErrorManager decoder *kv.TableKVDecoder logger log.Logger @@ -401,6 +402,7 @@ func NewDuplicateManager( tableName string, splitCli split.SplitClient, tikvCli *tikv.KVStore, + tikvCodec tikv.Codec, errMgr *errormanager.ErrorManager, sessOpts *kv.SessionOptions, concurrency int, @@ -417,6 +419,7 @@ func NewDuplicateManager( tableName: tableName, splitCli: splitCli, tikvCli: tikvCli, + tikvCodec: tikvCodec, errorMgr: errMgr, decoder: decoder, logger: logger, @@ -439,6 +442,10 @@ func (m *DuplicateManager) RecordDataConflictError(ctx context.Context, stream D if err != nil { return errors.Trace(err) } + key, err = m.tikvCodec.DecodeKey(key) + if err != nil { + return errors.Trace(err) + } m.hasDupe.Store(true) h, err := m.decoder.DecodeHandleFromRowKey(key) @@ -504,6 +511,10 @@ func (m *DuplicateManager) RecordIndexConflictError(ctx context.Context, stream if err != nil { return errors.Trace(err) } + key, err = m.tikvCodec.DecodeKey(key) + if err != nil { + return errors.Trace(err) + } m.hasDupe.Store(true) h, err := m.decoder.DecodeHandleFromIndex(indexInfo, key, val) @@ -581,6 +592,11 @@ func (m *DuplicateManager) buildDupTasks() ([]dupTask, error) { putToTaskFunc(ranges, indexInfo) }) } + + // Encode all the tasks + for i := range tasks { + tasks[i].StartKey, tasks[i].EndKey = m.tikvCodec.EncodeRange(tasks[i].StartKey, tasks[i].EndKey) + } return tasks, nil } diff --git a/br/pkg/lightning/backend/local/duplicate_test.go b/br/pkg/lightning/backend/local/duplicate_test.go index d1db76aae92f8..7c7d8a6182e25 100644 --- a/br/pkg/lightning/backend/local/duplicate_test.go +++ b/br/pkg/lightning/backend/local/duplicate_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/backend/local" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/ddl" + 
"github.com/pingcap/tidb/keyspace" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" @@ -52,7 +53,7 @@ func TestBuildDupTask(t *testing.T) { {&lkv.SessionOptions{IndexID: info.Indices[1].ID}, false}, } for _, tc := range testCases { - dupMgr, err := local.NewDuplicateManager(tbl, "t", nil, nil, nil, + dupMgr, err := local.NewDuplicateManager(tbl, "t", nil, nil, keyspace.CodecV1, nil, tc.sessOpt, 4, atomic.NewBool(false), log.FromContext(context.Background())) require.NoError(t, err) tasks, err := local.BuildDuplicateTaskForTest(dupMgr) diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go index 9b757ed91fde4..04ac7dce7a7fe 100644 --- a/br/pkg/lightning/backend/local/engine.go +++ b/br/pkg/lightning/backend/local/engine.go @@ -42,6 +42,7 @@ import ( "github.com/pingcap/tidb/br/pkg/membuf" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/util/hack" + "github.com/tikv/client-go/v2/tikv" "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -1045,6 +1046,8 @@ type Writer struct { batchSize int64 lastMetaSeq int32 + + tikvCodec tikv.Codec } func (w *Writer) appendRowsSorted(kvs []common.KvPair) error { @@ -1127,6 +1130,10 @@ func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames [ return errorEngineClosed } + for i := range kvs { + kvs[i].Key = w.tikvCodec.EncodeKey(kvs[i].Key) + } + w.Lock() defer w.Unlock() diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 5b59ce5b37d65..05ae60c4497d7 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -349,12 +349,13 @@ func checkTiFlashVersion(ctx context.Context, g glue.Glue, checkCtx *backend.Che type local struct { engines sync.Map // sync version of map[uuid.UUID]*Engine - pdCtl *pdutil.PdController - splitCli split.SplitClient - tikvCli *tikvclient.KVStore - tls *common.TLS - pdAddr string - g glue.Glue + pdCtl *pdutil.PdController + splitCli split.SplitClient + tikvCli *tikvclient.KVStore + tls *common.TLS + pdAddr string + g glue.Glue + tikvCodec tikvclient.Codec localStoreDir string @@ -419,6 +420,7 @@ func NewLocalBackend( g glue.Glue, maxOpenFiles int, errorMgr *errormanager.ErrorManager, + keyspaceName string, ) (backend.Backend, error) { localFile := cfg.TikvImporter.SortedKVDir rangeConcurrency := cfg.TikvImporter.RangeConcurrency @@ -460,8 +462,19 @@ func NewLocalBackend( if err != nil { return backend.MakeBackend(nil), common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs() } - rpcCli := tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig())) - pdCliForTiKV := tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCtl.GetPDClient()) + + var pdCliForTiKV *tikvclient.CodecPDClient + if keyspaceName == "" { + pdCliForTiKV = tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCtl.GetPDClient()) + } else { + pdCliForTiKV, err = tikvclient.NewCodecPDClientWithKeyspace(tikvclient.ModeTxn, pdCtl.GetPDClient(), keyspaceName) + if err != nil { + return backend.MakeBackend(nil), common.ErrCreatePDClient.Wrap(err).GenWithStackByArgs() + } + } + + tikvCodec := pdCliForTiKV.GetCodec() + rpcCli := tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()), tikvclient.WithCodec(tikvCodec)) tikvCli, err := tikvclient.NewKVStore("lightning-local-backend", pdCliForTiKV, spkv, rpcCli) if err != nil { return backend.MakeBackend(nil), 
common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs() @@ -484,13 +497,14 @@ func NewLocalBackend( LastAlloc = alloc } local := &local{ - engines: sync.Map{}, - pdCtl: pdCtl, - splitCli: splitCli, - tikvCli: tikvCli, - tls: tls, - pdAddr: cfg.TiDB.PdAddr, - g: g, + engines: sync.Map{}, + pdCtl: pdCtl, + splitCli: splitCli, + tikvCli: tikvCli, + tls: tls, + pdAddr: cfg.TiDB.PdAddr, + g: g, + tikvCodec: tikvCodec, localStoreDir: localFile, rangeConcurrency: worker.NewPool(ctx, rangeConcurrency, "range"), @@ -975,6 +989,7 @@ func (local *local) WriteToTiKV( Start: firstKey, End: lastKey, }, + ApiVersion: local.tikvCodec.GetAPIVersion(), } leaderID := region.Leader.GetId() @@ -1676,7 +1691,7 @@ func (local *local) CollectLocalDuplicateRows(ctx context.Context, tbl table.Tab }() atomicHasDupe := atomic.NewBool(false) - duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli, + duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli, local.tikvCodec, local.errorMgr, opts, local.dupeConcurrency, atomicHasDupe, log.FromContext(ctx)) if err != nil { return false, errors.Trace(err) @@ -1694,7 +1709,7 @@ func (local *local) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Ta }() atomicHasDupe := atomic.NewBool(false) - duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli, + duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli, local.tikvCodec, local.errorMgr, opts, local.dupeConcurrency, atomicHasDupe, log.FromContext(ctx)) if err != nil { return false, errors.Trace(err) @@ -1908,16 +1923,17 @@ func (local *local) LocalWriter(ctx context.Context, cfg *backend.LocalWriterCon return nil, errors.Errorf("could not find engine for %s", engineUUID.String()) } engine := e.(*Engine) - return openLocalWriter(cfg, engine, local.localWriterMemCacheSize, local.bufferPool.NewBuffer()) + return openLocalWriter(cfg, engine, local.tikvCodec, local.localWriterMemCacheSize, local.bufferPool.NewBuffer()) } -func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, cacheSize int64, kvBuffer *membuf.Buffer) (*Writer, error) { +func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, tikvCodec tikvclient.Codec, cacheSize int64, kvBuffer *membuf.Buffer) (*Writer, error) { w := &Writer{ engine: engine, memtableSizeLimit: cacheSize, kvBuffer: kvBuffer, isKVSorted: cfg.IsKVSorted, isWriteBatchSorted: true, + tikvCodec: tikvCodec, } // pre-allocate a long enough buffer to avoid a lot of runtime.growslice // this can help save about 3% of CPU. 
diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index a485bdecaca4a..04d63ffb7479b 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -47,6 +47,7 @@ import ( "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/keyspace" tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/tablecodec" @@ -332,7 +333,7 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) { pool := membuf.NewPool() defer pool.Destroy() kvBuffer := pool.NewBuffer() - w, err := openLocalWriter(&backend.LocalWriterConfig{IsKVSorted: sorted}, f, 1024, kvBuffer) + w, err := openLocalWriter(&backend.LocalWriterConfig{IsKVSorted: sorted}, f, keyspace.CodecV1, 1024, kvBuffer) require.NoError(t, err) ctx := context.Background() @@ -1290,6 +1291,7 @@ func TestCheckPeersBusy(t *testing.T) { bufferPool: membuf.NewPool(), supportMultiIngest: true, shouldCheckWriteStall: true, + tikvCodec: keyspace.CodecV1, } db, tmpPath := makePebbleDB(t, nil) diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index 46c38e112b57d..ae83d41efd7f6 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -412,6 +412,38 @@ var ( taskCfgRecorderKey = "taskCfgRecorderKey" ) +func getKeyspaceName(g glue.Glue) (string, error) { + db, err := g.GetDB() + if err != nil { + return "", err + } + if db == nil { + return "", nil + } + + rows, err := db.Query("show config where Type = 'tidb' and name = 'keyspace-name'") + if err != nil { + return "", err + } + //nolint: errcheck + defer rows.Close() + + var ( + _type string + _instance string + _name string + value string + ) + if rows.Next() { + err = rows.Scan(&_type, &_instance, &_name, &value) + if err != nil { + return "", err + } + } + + return value, rows.Err() +} + func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *options) (err error) { build.LogInfo(build.Lightning) o.logger.Info("cfg", zap.Stringer("cfg", taskCfg)) @@ -541,6 +573,13 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti dbMetas := mdl.GetDatabases() web.BroadcastInitProgress(dbMetas) + keyspaceName, err := getKeyspaceName(g) + if err != nil { + o.logger.Error("fail to get keyspace name", zap.Error(err)) + return errors.Trace(err) + } + o.logger.Info("acquired keyspace name", zap.String("keyspaceName", keyspaceName)) + param := &restore.ControllerParam{ DBMetas: dbMetas, Status: &l.status, @@ -550,6 +589,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti CheckpointStorage: o.checkpointStorage, CheckpointName: o.checkpointName, DupIndicator: o.dupIndicator, + KeyspaceName: keyspaceName, } var procedure *restore.Controller diff --git a/br/pkg/lightning/restore/BUILD.bazel b/br/pkg/lightning/restore/BUILD.bazel index 06e503e0519db..647c4cea4d191 100644 --- a/br/pkg/lightning/restore/BUILD.bazel +++ b/br/pkg/lightning/restore/BUILD.bazel @@ -45,6 +45,7 @@ go_library( "//br/pkg/version/build", "//ddl", "//errno", + "//keyspace", "//kv", "//meta/autoid", "//parser", diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 543eddc3435fd..1032419ab777f 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -54,6 +54,7 @@ import ( 
"github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/version" "github.com/pingcap/tidb/br/pkg/version/build" + "github.com/pingcap/tidb/keyspace" tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/model" @@ -231,6 +232,8 @@ type Controller struct { preInfoGetter PreRestoreInfoGetter precheckItemBuilder *PrecheckItemBuilder + + keyspaceName string } // LightningStatus provides the finished bytes and total bytes of the current task. @@ -266,6 +269,8 @@ type ControllerParam struct { CheckpointName string // DupIndicator can expose the duplicate detection result to the caller DupIndicator *atomic.Bool + // Keyspace name + KeyspaceName string } func NewRestoreController( @@ -353,7 +358,7 @@ func NewRestoreControllerWithPauser( } } - backend, err = local.NewLocalBackend(ctx, tls, cfg, p.Glue, maxOpenFiles, errorMgr) + backend, err = local.NewLocalBackend(ctx, tls, cfg, p.Glue, maxOpenFiles, errorMgr, p.KeyspaceName) if err != nil { return nil, common.NormalizeOrWrapErr(common.ErrUnknown, err) } @@ -437,6 +442,8 @@ func NewRestoreControllerWithPauser( preInfoGetter: preInfoGetter, precheckItemBuilder: preCheckBuilder, + + keyspaceName: p.KeyspaceName, } return rc, nil @@ -1500,7 +1507,7 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) { // Disable GC because TiDB enables GC already. kvStore, err = driver.TiKVDriver{}.OpenWithOptions( - fmt.Sprintf("tikv://%s?disableGC=true", rc.cfg.TiDB.PdAddr), + fmt.Sprintf("tikv://%s?disableGC=true&keyspaceName=%s", rc.cfg.TiDB.PdAddr, rc.keyspaceName), driver.WithSecurity(rc.tls.ToTiKVSecurityConfig()), ) if err != nil { @@ -2333,7 +2340,14 @@ func (cr *chunkRestore) deliverLoop( var startRealOffset, currRealOffset int64 // save to 0 at first for hasMoreKVs { - var dataChecksum, indexChecksum verify.KVChecksum + c := keyspace.CodecV1 + if t.kvStore != nil { + c = t.kvStore.GetCodec() + } + var ( + dataChecksum = verify.NewKVChecksumWithKeyspace(c) + indexChecksum = verify.NewKVChecksumWithKeyspace(c) + ) var columns []string var kvPacket []deliveredKVs // init these two field as checkpoint current value, so even if there are no kv pairs delivered, @@ -2360,7 +2374,7 @@ func (cr *chunkRestore) deliverLoop( hasMoreKVs = false break populate } - p.kvs.ClassifyAndAppend(&dataKVs, &dataChecksum, &indexKVs, &indexChecksum) + p.kvs.ClassifyAndAppend(&dataKVs, dataChecksum, &indexKVs, indexChecksum) columns = p.columns currOffset = p.offset currRealOffset = p.realOffset @@ -2427,8 +2441,8 @@ func (cr *chunkRestore) deliverLoop( // No need to apply a lock since this is the only thread updating `cr.chunk.**`. // In local mode, we should write these checkpoints after engine flushed. 
lastOffset := cr.chunk.Chunk.Offset - cr.chunk.Checksum.Add(&dataChecksum) - cr.chunk.Checksum.Add(&indexChecksum) + cr.chunk.Checksum.Add(dataChecksum) + cr.chunk.Checksum.Add(indexChecksum) cr.chunk.Chunk.Offset = currOffset cr.chunk.Chunk.RealOffset = currRealOffset cr.chunk.Chunk.PrevRowIDMax = rowID diff --git a/br/pkg/lightning/verification/BUILD.bazel b/br/pkg/lightning/verification/BUILD.bazel index 9f308aeb81e11..122e4b0e8535f 100644 --- a/br/pkg/lightning/verification/BUILD.bazel +++ b/br/pkg/lightning/verification/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//br/pkg/lightning/common", + "@com_github_tikv_client_go_v2//tikv", "@org_uber_go_zap//zapcore", ], ) diff --git a/br/pkg/lightning/verification/checksum.go b/br/pkg/lightning/verification/checksum.go index 0a44177f80fe4..9eb2130074eec 100644 --- a/br/pkg/lightning/verification/checksum.go +++ b/br/pkg/lightning/verification/checksum.go @@ -19,15 +19,18 @@ import ( "hash/crc64" "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/tikv/client-go/v2/tikv" "go.uber.org/zap/zapcore" ) var ecmaTable = crc64.MakeTable(crc64.ECMA) type KVChecksum struct { - bytes uint64 - kvs uint64 - checksum uint64 + base uint64 + prefixLen int + bytes uint64 + kvs uint64 + checksum uint64 } func NewKVChecksum(checksum uint64) *KVChecksum { @@ -36,6 +39,14 @@ func NewKVChecksum(checksum uint64) *KVChecksum { } } +func NewKVChecksumWithKeyspace(k tikv.Codec) *KVChecksum { + ks := k.GetKeyspace() + return &KVChecksum{ + base: crc64.Update(0, ecmaTable, ks), + prefixLen: len(ks), + } +} + func MakeKVChecksum(bytes uint64, kvs uint64, checksum uint64) KVChecksum { return KVChecksum{ bytes: bytes, @@ -45,10 +56,10 @@ func MakeKVChecksum(bytes uint64, kvs uint64, checksum uint64) KVChecksum { } func (c *KVChecksum) UpdateOne(kv common.KvPair) { - sum := crc64.Update(0, ecmaTable, kv.Key) + sum := crc64.Update(c.base, ecmaTable, kv.Key) sum = crc64.Update(sum, ecmaTable, kv.Val) - c.bytes += uint64(len(kv.Key) + len(kv.Val)) + c.bytes += uint64(c.prefixLen + len(kv.Key) + len(kv.Val)) c.kvs++ c.checksum ^= sum } @@ -62,11 +73,12 @@ func (c *KVChecksum) Update(kvs []common.KvPair) { ) for _, pair := range kvs { - sum = crc64.Update(0, ecmaTable, pair.Key) + sum = crc64.Update(c.base, ecmaTable, pair.Key) sum = crc64.Update(sum, ecmaTable, pair.Val) checksum ^= sum kvNum++ - bytes += (len(pair.Key) + len(pair.Val)) + bytes += c.prefixLen + bytes += len(pair.Key) + len(pair.Val) } c.bytes += uint64(bytes) diff --git a/cmd/explaintest/r/collation_check_use_collation_disabled.result b/cmd/explaintest/r/collation_check_use_collation_disabled.result index 2c0bd306f445b..9e633133b1f4f 100644 --- a/cmd/explaintest/r/collation_check_use_collation_disabled.result +++ b/cmd/explaintest/r/collation_check_use_collation_disabled.result @@ -154,4 +154,28 @@ insert into t1 values ('-1'); insert into t2 values (0x2d31, ''); select * from t1, t2 where t1.a in (t2.b, 3); a b c +drop table if exists t0; +drop table if exists t1; +CREATE TABLE t0(c0 BOOL, c1 INT); +CREATE TABLE t1 LIKE t0; +CREATE VIEW v0(c0) AS SELECT IS_IPV4(t0.c1) FROM t0, t1; +INSERT INTO t0(c0, c1) VALUES (true, 0); +INSERT INTO t1(c0, c1) VALUES (true, 2); +SELECT v0.c0 FROM v0; +c0 +0 +SELECT (v0.c0)NOT LIKE(BINARY v0.c0) FROM v0; +(v0.c0)NOT LIKE(BINARY v0.c0) +0 +SELECT v0.c0 FROM v0 WHERE (v0.c0)NOT LIKE(BINARY v0.c0); +c0 +desc format='brief' SELECT v0.c0 FROM v0 WHERE (v0.c0)NOT LIKE(BINARY v0.c0); +id estRows task access object operator info 
+Projection 80000000.00 root is_ipv4(cast(collation_check_use_collation.t0.c1, var_string(20)))->Column#7 +└─HashJoin 80000000.00 root CARTESIAN inner join + ├─Selection(Build) 8000.00 root not(like(cast(is_ipv4(cast(collation_check_use_collation.t0.c1, var_string(20))), var_string(20)), cast(is_ipv4(cast(collation_check_use_collation.t0.c1, var_string(20))), binary(1)), 92)) + │ └─TableReader 10000.00 root data:TableFullScan + │ └─TableFullScan 10000.00 cop[tikv] table:t0 keep order:false, stats:pseudo + └─TableReader(Probe) 10000.00 root data:TableFullScan + └─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo use test diff --git a/cmd/explaintest/r/collation_check_use_collation_enabled.result b/cmd/explaintest/r/collation_check_use_collation_enabled.result index 838c6beba6535..3f1113fbcd868 100644 --- a/cmd/explaintest/r/collation_check_use_collation_enabled.result +++ b/cmd/explaintest/r/collation_check_use_collation_enabled.result @@ -173,4 +173,28 @@ insert into t1 values ('-1'); insert into t2 values (0x2d31, ''); select * from t1, t2 where t1.a in (t2.b, 3); a b c +drop table if exists t0; +drop table if exists t1; +CREATE TABLE t0(c0 BOOL, c1 INT); +CREATE TABLE t1 LIKE t0; +CREATE VIEW v0(c0) AS SELECT IS_IPV4(t0.c1) FROM t0, t1; +INSERT INTO t0(c0, c1) VALUES (true, 0); +INSERT INTO t1(c0, c1) VALUES (true, 2); +SELECT v0.c0 FROM v0; +c0 +0 +SELECT (v0.c0)NOT LIKE(BINARY v0.c0) FROM v0; +(v0.c0)NOT LIKE(BINARY v0.c0) +0 +SELECT v0.c0 FROM v0 WHERE (v0.c0)NOT LIKE(BINARY v0.c0); +c0 +desc format='brief' SELECT v0.c0 FROM v0 WHERE (v0.c0)NOT LIKE(BINARY v0.c0); +id estRows task access object operator info +Projection 80000000.00 root is_ipv4(cast(collation_check_use_collation.t0.c1, var_string(20)))->Column#7 +└─HashJoin 80000000.00 root CARTESIAN inner join + ├─Selection(Build) 8000.00 root not(like(cast(is_ipv4(cast(collation_check_use_collation.t0.c1, var_string(20))), var_string(20)), cast(is_ipv4(cast(collation_check_use_collation.t0.c1, var_string(20))), binary(1)), 92)) + │ └─TableReader 10000.00 root data:TableFullScan + │ └─TableFullScan 10000.00 cop[tikv] table:t0 keep order:false, stats:pseudo + └─TableReader(Probe) 10000.00 root data:TableFullScan + └─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo use test diff --git a/cmd/explaintest/r/subquery.result b/cmd/explaintest/r/subquery.result index 0cf9302f425c0..ea5a17f2ff3e3 100644 --- a/cmd/explaintest/r/subquery.result +++ b/cmd/explaintest/r/subquery.result @@ -56,13 +56,11 @@ insert into exam values(1, 'math', 100); set names utf8 collate utf8_general_ci; explain format = 'brief' select * from stu where stu.name not in (select 'guo' from exam where exam.stu_id = stu.id); id estRows task access object operator info -Apply 10000.00 root CARTESIAN anti semi join, other cond:eq(test.stu.name, Column#8) +HashJoin 8000.00 root anti semi join, equal:[eq(test.stu.id, test.exam.stu_id)], other cond:eq(test.stu.name, "guo") ├─TableReader(Build) 10000.00 root data:TableFullScan -│ └─TableFullScan 10000.00 cop[tikv] table:stu keep order:false, stats:pseudo -└─Projection(Probe) 100000.00 root guo->Column#8 - └─TableReader 100000.00 root data:Selection - └─Selection 100000.00 cop[tikv] eq(test.exam.stu_id, test.stu.id) - └─TableFullScan 100000000.00 cop[tikv] table:exam keep order:false, stats:pseudo +│ └─TableFullScan 10000.00 cop[tikv] table:exam keep order:false, stats:pseudo +└─TableReader(Probe) 10000.00 root data:TableFullScan + └─TableFullScan 10000.00 cop[tikv] table:stu keep 
order:false, stats:pseudo select * from stu where stu.name not in (select 'guo' from exam where exam.stu_id = stu.id); id name set names utf8mb4; diff --git a/cmd/explaintest/t/collation_check_use_collation.test b/cmd/explaintest/t/collation_check_use_collation.test index 62fbea05ae628..adcd8695b38c0 100644 --- a/cmd/explaintest/t/collation_check_use_collation.test +++ b/cmd/explaintest/t/collation_check_use_collation.test @@ -110,5 +110,19 @@ insert into t1 values ('-1'); insert into t2 values (0x2d31, ''); select * from t1, t2 where t1.a in (t2.b, 3); +# issue 38736 +drop table if exists t0; +drop table if exists t1; +CREATE TABLE t0(c0 BOOL, c1 INT); +CREATE TABLE t1 LIKE t0; +CREATE VIEW v0(c0) AS SELECT IS_IPV4(t0.c1) FROM t0, t1; +INSERT INTO t0(c0, c1) VALUES (true, 0); +INSERT INTO t1(c0, c1) VALUES (true, 2); + +SELECT v0.c0 FROM v0; +SELECT (v0.c0)NOT LIKE(BINARY v0.c0) FROM v0; +SELECT v0.c0 FROM v0 WHERE (v0.c0)NOT LIKE(BINARY v0.c0); +desc format='brief' SELECT v0.c0 FROM v0 WHERE (v0.c0)NOT LIKE(BINARY v0.c0); + # cleanup environment use test diff --git a/ddl/ingest/backend.go b/ddl/ingest/backend.go index 26344359dd6b9..a94639d89d6bb 100644 --- a/ddl/ingest/backend.go +++ b/ddl/ingest/backend.go @@ -19,7 +19,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" - "github.com/pingcap/tidb/br/pkg/lightning/config" + lightning "github.com/pingcap/tidb/br/pkg/lightning/config" tikv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/table" @@ -33,7 +33,7 @@ type BackendContext struct { jobID int64 backend *backend.Backend ctx context.Context - cfg *config.Config + cfg *lightning.Config EngMgr engineManager sysVars map[string]string diskRoot DiskRoot @@ -99,7 +99,7 @@ func (bc *BackendContext) Flush(indexID int64) error { logutil.BgLogger().Info(LitInfoUnsafeImport, zap.Int64("index ID", indexID), zap.Uint64("current disk usage", bc.diskRoot.CurrentUsage()), zap.Uint64("max disk quota", bc.diskRoot.MaxQuota())) - err = bc.backend.UnsafeImportAndReset(bc.ctx, ei.uuid, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio), int64(config.SplitRegionKeys)) + err = bc.backend.UnsafeImportAndReset(bc.ctx, ei.uuid, int64(lightning.SplitRegionSize)*int64(lightning.MaxSplitRegionSizeRatio), int64(lightning.SplitRegionKeys)) if err != nil { logutil.BgLogger().Error(LitErrIngestDataErr, zap.Int64("index ID", indexID), zap.Error(err), zap.Uint64("current disk usage", bc.diskRoot.CurrentUsage()), diff --git a/ddl/ingest/backend_mgr.go b/ddl/ingest/backend_mgr.go index 14bb4fb3aa67a..49788c9760b58 100644 --- a/ddl/ingest/backend_mgr.go +++ b/ddl/ingest/backend_mgr.go @@ -56,7 +56,7 @@ func (m *backendCtxManager) Register(ctx context.Context, unique bool, jobID int if !ok { return nil, genBackendAllocMemFailedErr(m.memRoot, jobID) } - cfg, err := generateLightningConfig(m.memRoot, jobID, unique) + cfg, err := genConfig(m.memRoot, jobID, unique) if err != nil { logutil.BgLogger().Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err)) return nil, err @@ -67,7 +67,7 @@ func (m *backendCtxManager) Register(ctx context.Context, unique bool, jobID int return nil, err } - bcCtx := newBackendContext(ctx, jobID, &bd, cfg, defaultImportantVariables, m.memRoot, m.diskRoot) + bcCtx := newBackendContext(ctx, jobID, &bd, cfg.Lightning, defaultImportantVariables, m.memRoot, m.diskRoot) m.Store(jobID, bcCtx) m.memRoot.Consume(StructSizeBackendCtx) @@ -80,15 +80,16 @@ func (m 
*backendCtxManager) Register(ctx context.Context, unique bool, jobID int return bc, nil } -func createLocalBackend(ctx context.Context, cfg *config.Config, glue glue.Glue) (backend.Backend, error) { - tls, err := cfg.ToTLS() +func createLocalBackend(ctx context.Context, cfg *Config, glue glue.Glue) (backend.Backend, error) { + tls, err := cfg.Lightning.ToTLS() if err != nil { logutil.BgLogger().Error(LitErrCreateBackendFail, zap.Error(err)) return backend.Backend{}, err } - errorMgr := errormanager.New(nil, cfg, log.Logger{Logger: logutil.BgLogger()}) - return local.NewLocalBackend(ctx, tls, cfg, glue, int(LitRLimit), errorMgr) + logutil.BgLogger().Info("[ddl-ingest] create local backend for adding index", zap.String("keyspaceName", cfg.KeyspaceName)) + errorMgr := errormanager.New(nil, cfg.Lightning, log.Logger{Logger: logutil.BgLogger()}) + return local.NewLocalBackend(ctx, tls, cfg.Lightning, glue, int(LitRLimit), errorMgr, cfg.KeyspaceName) } func newBackendContext(ctx context.Context, jobID int64, be *backend.Backend, diff --git a/ddl/ingest/config.go b/ddl/ingest/config.go index 7fd251a361939..7362c2fbc5823 100644 --- a/ddl/ingest/config.go +++ b/ddl/ingest/config.go @@ -20,8 +20,8 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" - "github.com/pingcap/tidb/br/pkg/lightning/config" - tidbconf "github.com/pingcap/tidb/config" + lightning "github.com/pingcap/tidb/br/pkg/lightning/config" + tidb "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/size" "go.uber.org/zap" @@ -30,10 +30,16 @@ import ( // ImporterRangeConcurrencyForTest is only used for test. var ImporterRangeConcurrencyForTest *atomic.Int32 -func generateLightningConfig(memRoot MemRoot, jobID int64, unique bool) (*config.Config, error) { - tidbCfg := tidbconf.GetGlobalConfig() - cfg := config.NewConfig() - cfg.TikvImporter.Backend = config.BackendLocal +// Config is the configuration for the lightning local backend used in DDL. +type Config struct { + Lightning *lightning.Config + KeyspaceName string +} + +func genConfig(memRoot MemRoot, jobID int64, unique bool) (*Config, error) { + tidbCfg := tidb.GetGlobalConfig() + cfg := lightning.NewConfig() + cfg.TikvImporter.Backend = lightning.BackendLocal // Each backend will build a single dir in lightning dir. cfg.TikvImporter.SortedKVDir = filepath.Join(LitSortPath, encodeBackendTag(jobID)) if ImporterRangeConcurrencyForTest != nil { @@ -47,9 +53,9 @@ func generateLightningConfig(memRoot MemRoot, jobID int64, unique bool) (*config adjustImportMemory(memRoot, cfg) cfg.Checkpoint.Enable = true if unique { - cfg.TikvImporter.DuplicateResolution = config.DupeResAlgErr + cfg.TikvImporter.DuplicateResolution = lightning.DupeResAlgErr } else { - cfg.TikvImporter.DuplicateResolution = config.DupeResAlgNone + cfg.TikvImporter.DuplicateResolution = lightning.DupeResAlgNone } cfg.TiDB.PdAddr = tidbCfg.Path cfg.TiDB.Host = "127.0.0.1" @@ -59,7 +65,12 @@ func generateLightningConfig(memRoot MemRoot, jobID int64, unique bool) (*config cfg.Security.CertPath = tidbCfg.Security.ClusterSSLCert cfg.Security.KeyPath = tidbCfg.Security.ClusterSSLKey - return cfg, err + c := &Config{ + Lightning: cfg, + KeyspaceName: tidb.GetGlobalKeyspaceName(), + } + + return c, err } var ( @@ -83,7 +94,7 @@ func generateLocalEngineConfig(id int64, dbName, tbName string) *backend.EngineC } // adjustImportMemory adjusts the lightning memory parameters according to the memory root's max limitation. 
-func adjustImportMemory(memRoot MemRoot, cfg *config.Config) { +func adjustImportMemory(memRoot MemRoot, cfg *lightning.Config) { var scale int64 // Try aggressive resource usage successful. if tryAggressiveMemory(memRoot, cfg) { @@ -104,8 +115,8 @@ func adjustImportMemory(memRoot MemRoot, cfg *config.Config) { return } - cfg.TikvImporter.LocalWriterMemCacheSize /= config.ByteSize(scale) - cfg.TikvImporter.EngineMemCacheSize /= config.ByteSize(scale) + cfg.TikvImporter.LocalWriterMemCacheSize /= lightning.ByteSize(scale) + cfg.TikvImporter.EngineMemCacheSize /= lightning.ByteSize(scale) // TODO: adjust range concurrency number to control total concurrency in the future. logutil.BgLogger().Info(LitInfoChgMemSetting, zap.Int64("local writer memory cache size", int64(cfg.TikvImporter.LocalWriterMemCacheSize)), @@ -114,7 +125,7 @@ func adjustImportMemory(memRoot MemRoot, cfg *config.Config) { } // tryAggressiveMemory lightning memory parameters according memory root's max limitation. -func tryAggressiveMemory(memRoot MemRoot, cfg *config.Config) bool { +func tryAggressiveMemory(memRoot MemRoot, cfg *lightning.Config) bool { var defaultMemSize int64 defaultMemSize = int64(int(cfg.TikvImporter.LocalWriterMemCacheSize) * cfg.TikvImporter.RangeConcurrency) defaultMemSize += int64(cfg.TikvImporter.EngineMemCacheSize) diff --git a/executor/tiflashtest/tiflash_test.go b/executor/tiflashtest/tiflash_test.go index e8cd94d889188..fec246a5c5057 100644 --- a/executor/tiflashtest/tiflash_test.go +++ b/executor/tiflashtest/tiflash_test.go @@ -1270,7 +1270,7 @@ func TestDisaggregatedTiFlash(t *testing.T) { tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"") err = tk.ExecToErr("select * from t;") - require.Contains(t, err.Error(), "Please check tiflash_compute node is available") + require.Contains(t, err.Error(), "tiflash_compute node is unavailable") config.UpdateGlobal(func(conf *config.Config) { conf.DisaggregatedTiFlash = false @@ -1304,9 +1304,6 @@ func TestDisaggregatedTiFlashQuery(t *testing.T) { require.NoError(t, err) tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"") - needCheckTiFlashComputeNode := "false" - failpoint.Enable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery", fmt.Sprintf("return(%s)", needCheckTiFlashComputeNode)) - defer failpoint.Disable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery") tk.MustExec("explain select max( tbl_1.col_1 ) as r0 , sum( tbl_1.col_1 ) as r1 , sum( tbl_1.col_8 ) as r2 from tbl_1 where tbl_1.col_8 != 68 or tbl_1.col_3 between null and 939 order by r0,r1,r2;") tk.MustExec("set @@tidb_partition_prune_mode = 'static';") diff --git a/expression/collation.go b/expression/collation.go index eebab0aa5bc1f..8b11d3198a40e 100644 --- a/expression/collation.go +++ b/expression/collation.go @@ -299,14 +299,6 @@ func deriveCollation(ctx sessionctx.Context, funcName string, args []Expression, return ec, nil } -// DeriveCollationFromExprs derives collation information from these expressions. -// Deprecated, use CheckAndDeriveCollationFromExprs instead. -// TODO: remove this function after the all usage is replaced by CheckAndDeriveCollationFromExprs -func DeriveCollationFromExprs(ctx sessionctx.Context, exprs ...Expression) (dstCharset, dstCollation string) { - collation := inferCollation(exprs...) - return collation.Charset, collation.Collation -} - // CheckAndDeriveCollationFromExprs derives collation information from these expressions, return error if derives collation error. 
func CheckAndDeriveCollationFromExprs(ctx sessionctx.Context, funcName string, evalType types.EvalType, args ...Expression) (et *ExprCollation, err error) { ec := inferCollation(args...) diff --git a/expression/integration_test.go b/expression/integration_test.go index a49df593d201c..5555de7e0aa62 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -7893,3 +7893,12 @@ func TestIssue39146(t *testing.T) { tk.MustExec("set @@tidb_enable_vectorized_expression = off;") tk.MustQuery(`select str_to_date(substr(dest,1,6),'%H%i%s') from sun;`).Check(testkit.Rows("20:23:10")) } + +func TestIssue40536(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("CREATE TABLE `6bf9e76d-ab44-4031-8a07-418b10741580` (\n `e0b5f703-6cfe-49b4-bc21-16a6455e43a7` set('7','va','ung60','ow','1g','gxwz5','uhnh','k','5la1','q8d9c','1f') NOT NULL DEFAULT '7,1g,uhnh,5la1,q8d9c',\n `fbc3527f-9617-4b9d-a5dc-4be31c00d8a5` datetime DEFAULT '6449-09-28 14:39:04',\n PRIMARY KEY (`e0b5f703-6cfe-49b4-bc21-16a6455e43a7`) /*T![clustered_index] CLUSTERED */\n) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin;") + tk.MustExec("CREATE TABLE `8919f3f4-25be-4a1a-904a-bb5e863d8fc8` (\n `9804d5f2-cbc7-43b7-b241-ea2656dc941a` enum('s951','36d','ua65','49yru','6l2em','4ea','jf2d2','vprsc','3yl7n','hz','ov') DEFAULT '4ea',\n `323cdbcb-0c14-4362-90ab-ea42caaed6a5` year(4) NOT NULL DEFAULT '1983',\n `b9b70f39-1a02-4114-9d7d-fa6259c1b691` time DEFAULT '20:18:04',\n PRIMARY KEY (`323cdbcb-0c14-4362-90ab-ea42caaed6a5`) /*T![clustered_index] CLUSTERED */,\n KEY `a704d6bb-772b-44ea-8cb0-6f7491c1aaa6` (`323cdbcb-0c14-4362-90ab-ea42caaed6a5`,`9804d5f2-cbc7-43b7-b241-ea2656dc941a`)\n) ENGINE=InnoDB DEFAULT CHARSET=ascii COLLATE=ascii_bin;") + tk.MustExec("delete from `6bf9e76d-ab44-4031-8a07-418b10741580` where not( `6bf9e76d-ab44-4031-8a07-418b10741580`.`e0b5f703-6cfe-49b4-bc21-16a6455e43a7` in ( select `9804d5f2-cbc7-43b7-b241-ea2656dc941a` from `8919f3f4-25be-4a1a-904a-bb5e863d8fc8` where `6bf9e76d-ab44-4031-8a07-418b10741580`.`e0b5f703-6cfe-49b4-bc21-16a6455e43a7` in ( '1f' ) and `6bf9e76d-ab44-4031-8a07-418b10741580`.`e0b5f703-6cfe-49b4-bc21-16a6455e43a7` in ( '1g' ,'va' ,'uhnh' ) ) ) and not( IsNull( `6bf9e76d-ab44-4031-8a07-418b10741580`.`e0b5f703-6cfe-49b4-bc21-16a6455e43a7` ) );\n") +} diff --git a/expression/util.go b/expression/util.go index 3f4b826239a1d..929dce489791d 100644 --- a/expression/util.go +++ b/expression/util.go @@ -415,7 +415,6 @@ func ColumnSubstituteImpl(expr Expression, schema *Schema, newExprs []Expression if v.InOperand { newExpr = SetExprColumnInOperand(newExpr) } - newExpr.SetCoercibility(v.Coercibility()) return true, false, newExpr case *ScalarFunction: substituted := false @@ -438,7 +437,11 @@ func ColumnSubstituteImpl(expr Expression, schema *Schema, newExprs []Expression // cowExprRef is a copy-on-write util, args array allocation happens only // when expr in args is changed refExprArr := cowExprRef{v.GetArgs(), nil} - _, coll := DeriveCollationFromExprs(v.GetCtx(), v.GetArgs()...) + oldCollEt, err := CheckAndDeriveCollationFromExprs(v.GetCtx(), v.FuncName.L, v.RetType.EvalType(), v.GetArgs()...) 
+ if err != nil { + logutil.BgLogger().Error("Unexpected error happened during ColumnSubstitution", zap.Stack("stack")) + return false, false, v + } var tmpArgForCollCheck []Expression if collate.NewCollationEnabled() { tmpArgForCollCheck = make([]Expression, len(v.GetArgs())) @@ -454,9 +457,18 @@ func ColumnSubstituteImpl(expr Expression, schema *Schema, newExprs []Expression changed = false copy(tmpArgForCollCheck, refExprArr.Result()) tmpArgForCollCheck[idx] = newFuncExpr - _, newColl := DeriveCollationFromExprs(v.GetCtx(), tmpArgForCollCheck...) - if coll == newColl { - changed = checkCollationStrictness(coll, newFuncExpr.GetType().GetCollate()) + newCollEt, err := CheckAndDeriveCollationFromExprs(v.GetCtx(), v.FuncName.L, v.RetType.EvalType(), tmpArgForCollCheck...) + if err != nil { + logutil.BgLogger().Error("Unexpected error happened during ColumnSubstitution", zap.Stack("stack")) + return false, failed, v + } + if oldCollEt.Collation == newCollEt.Collation { + if newFuncExpr.GetType().GetCollate() == arg.GetType().GetCollate() && newFuncExpr.Coercibility() == arg.Coercibility() { + // It's safe to use the new expression, otherwise some cases in projection push-down will be wrong. + changed = true + } else { + changed = checkCollationStrictness(oldCollEt.Collation, newFuncExpr.GetType().GetCollate()) + } } } hasFail = hasFail || failed || oldChanged != changed diff --git a/parser/parser.go b/parser/parser.go index 0412b37bce71c..5aab37d65a5ef 100644 --- a/parser/parser.go +++ b/parser/parser.go @@ -15582,19 +15582,22 @@ yynewstate: { field := yyS[yypt-0].item.(*ast.SelectField) field.Offset = parser.startOffset(&yyS[yypt]) + if field.Expr != nil && field.AsName.O == "" { + endOffset := parser.yylval.offset + field.SetText(parser.lexer.client, strings.TrimSpace(parser.src[field.Offset:endOffset])) + } parser.yyVAL.item = []*ast.SelectField{field} } case 689: { fl := yyS[yypt-2].item.([]*ast.SelectField) - last := fl[len(fl)-1] - if last.Expr != nil && last.AsName.O == "" { - lastEnd := parser.endOffset(&yyS[yypt-1]) - last.SetText(parser.lexer.client, parser.src[last.Offset:lastEnd]) + field := yyS[yypt-0].item.(*ast.SelectField) + field.Offset = parser.startOffset(&yyS[yypt]) + if field.Expr != nil && field.AsName.O == "" { + endOffset := parser.yylval.offset + field.SetText(parser.lexer.client, strings.TrimSpace(parser.src[field.Offset:endOffset])) } - newField := yyS[yypt-0].item.(*ast.SelectField) - newField.Offset = parser.startOffset(&yyS[yypt]) - parser.yyVAL.item = append(fl, newField) + parser.yyVAL.item = append(fl, field) } case 690: { @@ -17543,30 +17546,6 @@ yynewstate: if yyS[yypt-1].item != nil { st.LockInfo = yyS[yypt-1].item.(*ast.SelectLockInfo) } - lastField := st.Fields.Fields[len(st.Fields.Fields)-1] - if lastField.Expr != nil && lastField.AsName.O == "" { - src := parser.src - var lastEnd int - if yyS[yypt-5].item != nil { - lastEnd = yyS[yypt-5].offset - 1 - } else if yyS[yypt-4].item != nil { - lastEnd = yyS[yypt-4].offset - 1 - } else if yyS[yypt-3].item != nil { - lastEnd = yyS[yypt-3].offset - 1 - } else if yyS[yypt-2].item != nil { - lastEnd = yyS[yypt-2].offset - 1 - } else if st.LockInfo != nil && st.LockInfo.LockType != ast.SelectLockNone { - lastEnd = yyS[yypt-1].offset - 1 - } else if yyS[yypt-0].item != nil { - lastEnd = yyS[yypt].offset - 1 - } else { - lastEnd = len(src) - if src[lastEnd-1] == ';' { - lastEnd-- - } - } - lastField.SetText(parser.lexer.client, src[lastField.Offset:lastEnd]) - } if yyS[yypt-5].item != nil { st.Where = 
yyS[yypt-5].item.(ast.ExprNode) } diff --git a/parser/parser.y b/parser/parser.y index 51015c919f364..11b53153ed9e7 100644 --- a/parser/parser.y +++ b/parser/parser.y @@ -5903,19 +5903,22 @@ FieldList: { field := $1.(*ast.SelectField) field.Offset = parser.startOffset(&yyS[yypt]) + if field.Expr != nil && field.AsName.O == "" { + endOffset := parser.yylval.offset + field.SetText(parser.lexer.client, strings.TrimSpace(parser.src[field.Offset:endOffset])) + } $$ = []*ast.SelectField{field} } | FieldList ',' Field { fl := $1.([]*ast.SelectField) - last := fl[len(fl)-1] - if last.Expr != nil && last.AsName.O == "" { - lastEnd := parser.endOffset(&yyS[yypt-1]) - last.SetText(parser.lexer.client, parser.src[last.Offset:lastEnd]) + field := $3.(*ast.SelectField) + field.Offset = parser.startOffset(&yyS[yypt]) + if field.Expr != nil && field.AsName.O == "" { + endOffset := parser.yylval.offset + field.SetText(parser.lexer.client, strings.TrimSpace(parser.src[field.Offset:endOffset])) } - newField := $3.(*ast.SelectField) - newField.Offset = parser.startOffset(&yyS[yypt]) - $$ = append(fl, newField) + $$ = append(fl, field) } GroupByClause: @@ -8677,30 +8680,6 @@ SelectStmt: if $6 != nil { st.LockInfo = $6.(*ast.SelectLockInfo) } - lastField := st.Fields.Fields[len(st.Fields.Fields)-1] - if lastField.Expr != nil && lastField.AsName.O == "" { - src := parser.src - var lastEnd int - if $2 != nil { - lastEnd = yyS[yypt-5].offset - 1 - } else if $3 != nil { - lastEnd = yyS[yypt-4].offset - 1 - } else if $4 != nil { - lastEnd = yyS[yypt-3].offset - 1 - } else if $5 != nil { - lastEnd = yyS[yypt-2].offset - 1 - } else if st.LockInfo != nil && st.LockInfo.LockType != ast.SelectLockNone { - lastEnd = yyS[yypt-1].offset - 1 - } else if $7 != nil { - lastEnd = yyS[yypt].offset - 1 - } else { - lastEnd = len(src) - if src[lastEnd-1] == ';' { - lastEnd-- - } - } - lastField.SetText(parser.lexer.client, src[lastField.Offset:lastEnd]) - } if $2 != nil { st.Where = $2.(ast.ExprNode) } @@ -10795,7 +10774,7 @@ ShowStmt: | "SHOW" "CREATE" "RESOURCE" "GROUP" ResourceGroupName { $$ = &ast.ShowStmt{ - Tp: ast.ShowCreateResourceGroup, + Tp: ast.ShowCreateResourceGroup, ResourceGroupName: $5, } } diff --git a/parser/parser_test.go b/parser/parser_test.go index 2fb9b073fd398..a5dbb70d9cf67 100644 --- a/parser/parser_test.go +++ b/parser/parser_test.go @@ -7142,3 +7142,21 @@ func TestTTLTableOption(t *testing.T) { RunTest(t, table, false) } + +func TestMultiStmt(t *testing.T) { + p := parser.New() + stmts, _, err := p.Parse("SELECT 'foo'; SELECT 'foo;bar','baz'; select 'foo' , 'bar' , 'baz' ;select 1", "", "") + require.NoError(t, err) + require.Equal(t, len(stmts), 4) + stmt1 := stmts[0].(*ast.SelectStmt) + stmt2 := stmts[1].(*ast.SelectStmt) + stmt3 := stmts[2].(*ast.SelectStmt) + stmt4 := stmts[3].(*ast.SelectStmt) + require.Equal(t, "'foo'", stmt1.Fields.Fields[0].Text()) + require.Equal(t, "'foo;bar'", stmt2.Fields.Fields[0].Text()) + require.Equal(t, "'baz'", stmt2.Fields.Fields[1].Text()) + require.Equal(t, "'foo'", stmt3.Fields.Fields[0].Text()) + require.Equal(t, "'bar'", stmt3.Fields.Fields[1].Text()) + require.Equal(t, "'baz'", stmt3.Fields.Fields[2].Text()) + require.Equal(t, "1", stmt4.Fields.Fields[0].Text()) +} diff --git a/planner/core/BUILD.bazel b/planner/core/BUILD.bazel index 3afbdf3b8a0bc..12c2730c59364 100644 --- a/planner/core/BUILD.bazel +++ b/planner/core/BUILD.bazel @@ -107,7 +107,6 @@ go_library( "//sessiontxn/staleread", "//statistics", "//statistics/handle", - "//store/driver/backoff", 
"//table", "//table/tables", "//table/temptable", diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 205a2d8242b4b..61575810da1fb 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1325,8 +1325,11 @@ func (ijHelper *indexJoinBuildHelper) resetContextForIndex(innerKeys []*expressi if ijHelper.curIdxOff2KeyOff[i] >= 0 { // Don't use the join columns if their collations are unmatched and the new collation is enabled. if collate.NewCollationEnabled() && types.IsString(idxCol.RetType.GetType()) && types.IsString(outerKeys[ijHelper.curIdxOff2KeyOff[i]].RetType.GetType()) { - _, coll := expression.DeriveCollationFromExprs(nil, idxCol, outerKeys[ijHelper.curIdxOff2KeyOff[i]]) - if !collate.CompatibleCollate(idxCol.GetType().GetCollate(), coll) { + et, err := expression.CheckAndDeriveCollationFromExprs(ijHelper.innerPlan.ctx, "equal", types.ETInt, idxCol, outerKeys[ijHelper.curIdxOff2KeyOff[i]]) + if err != nil { + logutil.BgLogger().Error("Unexpected error happened during constructing index join", zap.Stack("stack")) + } + if !collate.CompatibleCollate(idxCol.GetType().GetCollate(), et.Collation) { ijHelper.curIdxOff2KeyOff[i] = -1 } } diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 6c5d2c49fddf0..1ecc9f995243c 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -2015,15 +2015,16 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid } // In disaggregated tiflash mode, only MPP is allowed, cop and batchCop is deprecated. // So if prop.TaskTp is RootTaskType, have to use mppTask then convert to rootTask. - isDisaggregatedTiFlashPath := config.GetGlobalConfig().DisaggregatedTiFlash && ts.StoreType == kv.TiFlash + isDisaggregatedTiFlash := config.GetGlobalConfig().DisaggregatedTiFlash + isDisaggregatedTiFlashPath := isDisaggregatedTiFlash && ts.StoreType == kv.TiFlash canMppConvertToRootForDisaggregatedTiFlash := isDisaggregatedTiFlashPath && prop.TaskTp == property.RootTaskType && ds.SCtx().GetSessionVars().IsMPPAllowed() if prop.TaskTp == property.MppTaskType || canMppConvertToRootForDisaggregatedTiFlash { if ts.KeepOrder { return invalidTask, nil } - if prop.MPPPartitionTp != property.AnyType || (ts.isPartition && !canMppConvertToRootForDisaggregatedTiFlash) { + if prop.MPPPartitionTp != property.AnyType || (ts.isPartition && !isDisaggregatedTiFlash) { // If ts is a single partition, then this partition table is in static-only prune, then we should not choose mpp execution. - // But in disaggregated tiflash mode, we can only use mpp, so we add ExchangeSender and ExchangeReceiver above TableScan for static pruning partition table. + // But in disaggregated tiflash mode, we enable using mpp for static pruning partition table, because cop and batchCop is deprecated. ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because table `" + ds.tableInfo.Name.O + "`is a partition table which is not supported when `@@tidb_partition_prune_mode=static`.") return invalidTask, nil } @@ -2052,7 +2053,11 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid // So have to return a rootTask, but prop requires mppTask, cannot meet this requirement. task = invalidTask } else if prop.TaskTp == property.RootTaskType { - // when got here, canMppConvertToRootForDisaggregatedTiFlash is true. + // When got here, canMppConvertToRootForDisaggregatedTiFlash is true. 
+ // This is for situations like cannot generate mppTask for some operators. + // Such as when the build side of HashJoin is Projection, + // which cannot pushdown to tiflash(because TiFlash doesn't support some expr in Proj) + // So HashJoin cannot pushdown to tiflash. But we still want TableScan to run on tiflash. task = mppTask task = task.convertToRootTask(ds.ctx) if !task.invalid() { diff --git a/planner/core/indexmerge_path.go b/planner/core/indexmerge_path.go index aad7f412441d3..27500e1d8d816 100644 --- a/planner/core/indexmerge_path.go +++ b/planner/core/indexmerge_path.go @@ -587,8 +587,7 @@ func (ds *DataSource) generateIndexMerge4MVIndex(normalPathCnt int, filters []ex } accessFilters, remainingFilters := ds.collectFilters4MVIndex(filters, idxCols) - if len(accessFilters) == 0 && // cannot use any filter on this MVIndex - !ds.possibleAccessPaths[idx].Forced { // whether this index is forced by use-index hint + if len(accessFilters) == 0 { // cannot use any filter on this MVIndex continue } diff --git a/planner/core/indexmerge_path_test.go b/planner/core/indexmerge_path_test.go index 6c2ab7a8ce087..b1487249b0902 100644 --- a/planner/core/indexmerge_path_test.go +++ b/planner/core/indexmerge_path_test.go @@ -15,6 +15,7 @@ package core_test import ( + "context" "fmt" "math/rand" "strings" @@ -224,6 +225,7 @@ func TestEnforceMVIndex(t *testing.T) { var output []struct { SQL string Plan []string + Err string } planSuiteData := core.GetIndexMergeSuiteData() planSuiteData.LoadTestCases(t, &input, &output) @@ -232,11 +234,21 @@ func TestEnforceMVIndex(t *testing.T) { testdata.OnRecord(func() { output[i].SQL = query }) - result := tk.MustQuery("explain format = 'brief' " + query) - testdata.OnRecord(func() { - output[i].Plan = testdata.ConvertRowsToStrings(result.Rows()) - }) - result.Check(testkit.Rows(output[i].Plan...)) + rs, err := tk.Exec("explain format = 'brief' " + query) + if err != nil { + testdata.OnRecord(func() { + output[i].Err = err.Error() + output[i].Plan = nil + }) + require.Equal(t, output[i].Err, err.Error()) + } else { + result := tk.ResultSetToResultWithCtx(context.Background(), rs, "") + testdata.OnRecord(func() { + output[i].Err = "" + output[i].Plan = testdata.ConvertRowsToStrings(result.Rows()) + }) + result.Check(testkit.Rows(output[i].Plan...)) + } } } @@ -270,6 +282,26 @@ func TestMVIndexInvisible(t *testing.T) { ` └─TableRowIDScan(Probe) 10.00 cop[tikv] table:t keep order:false, stats:pseudo`)) } +func TestMVIndexFullScan(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + tk.MustExec(`create table t(j json, index kj((cast(j as signed array))))`) + tk.MustExec(`insert into t values ('[1]')`) + tk.MustExec(`insert into t values ('[1, 2]')`) + tk.MustExec(`insert into t values ('[]')`) + tk.MustExec(`insert into t values (NULL)`) + + tk.MustQuery(`select /*+ use_index_merge(t, kj) */ count(*) from t`).Check(testkit.Rows("4")) + tk.MustQuery(`select /*+ use_index_merge(t, kj) */ count(*) from t where (1 member of (j))`).Check(testkit.Rows("2")) + tk.MustQuery(`select /*+ use_index_merge(t, kj) */ count(*) from t where json_contains((j), '[1]')`).Check(testkit.Rows("2")) + tk.MustQuery(`select /*+ use_index_merge(t, kj) */ count(*) from t where json_overlaps((j), '[1]')`).Check(testkit.Rows("2")) + + // Forbid IndexMerge+IndexFullScan since IndexFullScan on MVIndex cannot read all rows some cases. 
+ tk.MustGetErrMsg(`select /*+ use_index(t, kj) */ count(*) from t`, "[planner:1815]Internal : Can't find a proper physical plan for this query") +} + func TestMVIndexRandom(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) @@ -284,8 +316,8 @@ func TestMVIndexRandom(t *testing.T) { {"unsigned", randMVIndexValOpts{"unsigned", 0, 3}, randMVIndexValOpts{"unsigned", 0, 3}}, // unsigned-index + unsigned-values {"char(3)", randMVIndexValOpts{"string", 3, 3}, randMVIndexValOpts{"string", 3, 3}}, {"char(3)", randMVIndexValOpts{"string", 3, 3}, randMVIndexValOpts{"string", 1, 3}}, - //{"char(3)", randMVIndexValOpts{"string", 3, 3}, randMVIndexValOpts{"string", 5, 3}}, - //{"date", randMVIndexValOpts{"date", 0, 3}, randMVIndexValOpts{"date", 0, 3}}, + {"char(3)", randMVIndexValOpts{"string", 3, 3}, randMVIndexValOpts{"string", 5, 3}}, + {"date", randMVIndexValOpts{"date", 0, 3}, randMVIndexValOpts{"date", 0, 3}}, } { tk.MustExec("drop table if exists t") tk.MustExec(fmt.Sprintf(`create table t(a int, j json, index kj((cast(j as %v array))))`, testCase.indexType)) @@ -306,7 +338,7 @@ func TestMVIndexRandom(t *testing.T) { for i := 0; i < nQueries; i++ { conds := randMVIndexConds(rand.Intn(3)+1, testCase.queryValsOpts) r1 := tk.MustQuery("select /*+ ignore_index(t, kj) */ * from t where " + conds).Sort() - tk.MustQuery("select /*+ use_index(t, kj) */ * from t where " + conds).Sort().Check(r1.Rows()) + tk.MustQuery("select /*+ use_index_merge(t, kj) */ * from t where " + conds).Sort().Check(r1.Rows()) } } } diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 2d73534fc2e1e..f6e566bac43a3 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" @@ -50,7 +49,6 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle" - "github.com/pingcap/tidb/store/driver/backoff" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/table/temptable" @@ -67,7 +65,6 @@ import ( "github.com/pingcap/tidb/util/plancodec" "github.com/pingcap/tidb/util/set" "github.com/pingcap/tidb/util/size" - "github.com/tikv/client-go/v2/tikv" ) const ( @@ -692,13 +689,6 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) { ds.preferStoreType = 0 return } - if config.GetGlobalConfig().DisaggregatedTiFlash && !isTiFlashComputeNodeAvailable(ds.ctx) { - // TiFlash is in disaggregated mode, need to make sure tiflash_compute node is available. 
- errMsg := "No available tiflash_compute node" - warning := ErrInternal.GenWithStack(errMsg) - ds.ctx.GetSessionVars().StmtCtx.AppendWarning(warning) - return - } for _, path := range ds.possibleAccessPaths { if path.StoreType == kv.TiFlash { ds.preferStoreType |= preferTiFlash @@ -716,15 +706,6 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) { } } -func isTiFlashComputeNodeAvailable(ctx sessionctx.Context) bool { - bo := backoff.NewBackofferWithVars(context.Background(), 5000, nil) - stores, err := ctx.GetStore().(tikv.Storage).GetRegionCache().GetTiFlashComputeStores(bo.TiKVBackoffer()) - if err != nil || len(stores) == 0 { - return false - } - return true -} - func resetNotNullFlag(schema *expression.Schema, start, end int) { for i := start; i < end; i++ { col := *schema.Columns[i] diff --git a/planner/core/physical_plan_test.go b/planner/core/physical_plan_test.go index 640c0f04630c1..a8a7a1918cfc0 100644 --- a/planner/core/physical_plan_test.go +++ b/planner/core/physical_plan_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/infoschema" @@ -2626,3 +2627,75 @@ func TestCountStarForTiFlash(t *testing.T) { tk.MustQuery("explain format = 'brief' " + ts).Check(testkit.Rows(output[i].Plan...)) } } + +func TestHashAggPushdownToTiFlashCompute(t *testing.T) { + var ( + input []string + output []struct { + SQL string + Plan []string + Warning []string + } + ) + planSuiteData := core.GetPlanSuiteData() + planSuiteData.LoadTestCases(t, &input, &output) + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists tbl_15;") + tk.MustExec(`create table tbl_15 (col_89 text (473) collate utf8mb4_bin , + col_90 timestamp default '1976-04-03' , + col_91 tinyint unsigned not null , + col_92 tinyint , + col_93 double not null , + col_94 datetime not null default '1970-06-08' , + col_95 datetime default '2028-02-13' , + col_96 int unsigned not null default 2532480521 , + col_97 char (168) default '') partition by hash (col_91) partitions 4;`) + + tk.MustExec("drop table if exists tbl_16;") + tk.MustExec(`create table tbl_16 (col_98 text (246) not null , + col_99 decimal (30 ,19) , + col_100 mediumint unsigned , + col_101 text (410) collate utf8mb4_bin , + col_102 date not null , + col_103 timestamp not null default '2003-08-27' , + col_104 text (391) not null , + col_105 date default '2010-10-24' , + col_106 text (9) not null,primary key (col_100, col_98(5), col_103), + unique key idx_23 (col_100, col_106 (3), col_101 (3))) partition by hash (col_100) partitions 2;`) + + config.UpdateGlobal(func(conf *config.Config) { + conf.DisaggregatedTiFlash = true + }) + defer config.UpdateGlobal(func(conf *config.Config) { + conf.DisaggregatedTiFlash = false + }) + + dom := domain.GetDomain(tk.Session()) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + require.True(t, exists) + for _, tblInfo := range db.Tables { + tableName := tblInfo.Name.L + if tableName == "tbl_15" || tableName == "tbl_16" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + + tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1;") + tk.MustExec("set @@tidb_partition_prune_mode = 'static';") + tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash';") + + for i, ts := 
range input { + testdata.OnRecord(func() { + output[i].SQL = ts + output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery("explain format = 'brief' " + ts).Rows()) + }) + tk.MustQuery("explain format = 'brief' " + ts).Check(testkit.Rows(output[i].Plan...)) + } +} diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index d14a6bf51ea49..5535faa97ab92 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -26,7 +26,6 @@ import ( "unsafe" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" @@ -1453,8 +1452,6 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath, isolationReadEngines := ctx.GetSessionVars().GetIsolationReadEngines() availableEngine := map[kv.StoreType]struct{}{} var availableEngineStr string - var outputComputeNodeErrMsg bool - noTiFlashComputeNode := config.GetGlobalConfig().DisaggregatedTiFlash && !isTiFlashComputeNodeAvailable(ctx) for i := len(paths) - 1; i >= 0; i-- { // availableEngineStr is for warning message. if _, ok := availableEngine[paths[i].StoreType]; !ok { @@ -1464,20 +1461,7 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath, } availableEngineStr += paths[i].StoreType.Name() } - _, exists := isolationReadEngines[paths[i].StoreType] - // Prune this path if: - // 1. path.StoreType doesn't exists in isolationReadEngines or - // 2. TiFlash is disaggregated and the number of tiflash_compute node is zero. - shouldPruneTiFlashCompute := noTiFlashComputeNode && exists && paths[i].StoreType == kv.TiFlash - failpoint.Inject("testDisaggregatedTiFlashQuery", func(val failpoint.Value) { - // Ignore check if tiflash_compute node number. - // After we support disaggregated tiflash in test framework, can delete this failpoint. - shouldPruneTiFlashCompute = val.(bool) - }) - if shouldPruneTiFlashCompute { - outputComputeNodeErrMsg = true - } - if (!exists && paths[i].StoreType != kv.TiDB) || shouldPruneTiFlashCompute { + if _, ok := isolationReadEngines[paths[i].StoreType]; !ok && paths[i].StoreType != kv.TiDB { paths = append(paths[:i], paths[i+1:]...) } } @@ -1486,11 +1470,7 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath, if len(paths) == 0 { helpMsg := "" if engineVals == "tiflash" { - if outputComputeNodeErrMsg { - helpMsg = ". Please check tiflash_compute node is available" - } else { - helpMsg = ". Please check tiflash replica or ensure the query is readonly" - } + helpMsg = ". Please check tiflash replica or ensure the query is readonly" } err = ErrInternal.GenWithStackByArgs(fmt.Sprintf("No access path for table '%s' is found with '%v' = '%v', valid values can be '%s'%s.", tblName.String(), variable.TiDBIsolationReadEngines, engineVals, availableEngineStr, helpMsg)) diff --git a/planner/core/rule_aggregation_push_down.go b/planner/core/rule_aggregation_push_down.go index c9326929b550f..24aef4161a8ec 100644 --- a/planner/core/rule_aggregation_push_down.go +++ b/planner/core/rule_aggregation_push_down.go @@ -405,6 +405,12 @@ func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, u if err != nil { return nil, err } + // Update mode of new generated firstRow as other agg funcs. 
+ if len(agg.AggFuncs) != 0 { + firstRow.Mode = agg.AggFuncs[0].Mode + } else { + firstRow.Mode = aggregation.Partial1Mode + } newAgg.AggFuncs = append(newAgg.AggFuncs, firstRow) } tmpSchema := expression.NewSchema(newAgg.GetGroupByCols()...) diff --git a/planner/core/task.go b/planner/core/task.go index 5d7ca6e5fd424..ff4e22756f15a 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1163,6 +1163,12 @@ func (p *PhysicalUnionAll) attach2MppTasks(tasks ...task) task { func (p *PhysicalUnionAll) attach2Task(tasks ...task) task { for _, t := range tasks { if _, ok := t.(*mppTask); ok { + if p.TP() == plancodec.TypePartitionUnion { + // In attach2MppTasks(), will attach PhysicalUnion to mppTask directly. + // But PartitionUnion cannot pushdown to tiflash, so here disable PartitionUnion pushdown to tiflash explicitly. + // For now, return invalidTask immediately, we can refine this by letting childTask of PartitionUnion convert to rootTask. + return invalidTask + } return p.attach2MppTasks(tasks...) } } diff --git a/planner/core/testdata/index_merge_suite_in.json b/planner/core/testdata/index_merge_suite_in.json index c64b00b983b78..d660364305397 100644 --- a/planner/core/testdata/index_merge_suite_in.json +++ b/planner/core/testdata/index_merge_suite_in.json @@ -7,7 +7,11 @@ "select /*+ use_index(t, kj) */ * from t where a<10", "select /*+ use_index(t, kj) */ * from t where (1 member of (j))", "select /*+ use_index(t, kj) */ * from t where (1 member of (j)) and a=10", - "select /*+ use_index(t, kj) */ * from t where (1 member of (j)) or a=10" + "select /*+ use_index(t, kj) */ * from t where (1 member of (j)) or a=10", + "select /*+ use_index_merge(t, kj) */ * from t", + "select /*+ use_index_merge(t, kj) */ a from t", + "select /*+ use_index_merge(t, kj) */ * from t where a<10", + "select /*+ use_index_merge(t, kj) */ * from t where (1 member of (j)) or a=10" ] }, { diff --git a/planner/core/testdata/index_merge_suite_out.json b/planner/core/testdata/index_merge_suite_out.json index a28164f5787fe..2c66948aca057 100644 --- a/planner/core/testdata/index_merge_suite_out.json +++ b/planner/core/testdata/index_merge_suite_out.json @@ -4,28 +4,18 @@ "Cases": [ { "SQL": "select /*+ use_index(t, kj) */ * from t", - "Plan": [ - "IndexMerge 10000.00 root type: union", - "├─IndexFullScan(Build) 10000.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) keep order:false, stats:pseudo", - "└─TableRowIDScan(Probe) 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" - ] + "Plan": null, + "Err": "[planner:1815]Internal : Can't find a proper physical plan for this query" }, { "SQL": "select /*+ use_index(t, kj) */ a from t", - "Plan": [ - "IndexMerge 10000.00 root type: union", - "├─IndexFullScan(Build) 10000.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) keep order:false, stats:pseudo", - "└─TableRowIDScan(Probe) 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" - ] + "Plan": null, + "Err": "[planner:1815]Internal : Can't find a proper physical plan for this query" }, { "SQL": "select /*+ use_index(t, kj) */ * from t where a<10", - "Plan": [ - "IndexMerge 3323.33 root type: union", - "├─IndexFullScan(Build) 10000.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) keep order:false, stats:pseudo", - "└─Selection(Probe) 3323.33 cop[tikv] lt(test.t.a, 10)", - " └─TableRowIDScan 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" - ] + "Plan": null, + "Err": "[planner:1815]Internal : Can't find a proper physical plan for this query" }, { "SQL": "select /*+ 
use_index(t, kj) */ * from t where (1 member of (j))", @@ -34,7 +24,8 @@ "└─IndexMerge 10.00 root type: union", " ├─IndexRangeScan(Build) 10.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[1,1], keep order:false, stats:pseudo", " └─TableRowIDScan(Probe) 10.00 cop[tikv] table:t keep order:false, stats:pseudo" - ] + ], + "Err": "" }, { "SQL": "select /*+ use_index(t, kj) */ * from t where (1 member of (j)) and a=10", @@ -44,16 +35,47 @@ " ├─IndexRangeScan(Build) 10.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) range:[1,1], keep order:false, stats:pseudo", " └─Selection(Probe) 0.01 cop[tikv] eq(test.t.a, 10)", " └─TableRowIDScan 10.00 cop[tikv] table:t keep order:false, stats:pseudo" - ] + ], + "Err": "" }, { "SQL": "select /*+ use_index(t, kj) */ * from t where (1 member of (j)) or a=10", + "Plan": null, + "Err": "[planner:1815]Internal : Can't find a proper physical plan for this query" + }, + { + "SQL": "select /*+ use_index_merge(t, kj) */ * from t", + "Plan": [ + "TableReader 10000.00 root data:TableFullScan", + "└─TableFullScan 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" + ], + "Err": "" + }, + { + "SQL": "select /*+ use_index_merge(t, kj) */ a from t", + "Plan": [ + "TableReader 10000.00 root data:TableFullScan", + "└─TableFullScan 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" + ], + "Err": "" + }, + { + "SQL": "select /*+ use_index_merge(t, kj) */ * from t where a<10", + "Plan": [ + "TableReader 3323.33 root data:Selection", + "└─Selection 3323.33 cop[tikv] lt(test.t.a, 10)", + " └─TableFullScan 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" + ], + "Err": "" + }, + { + "SQL": "select /*+ use_index_merge(t, kj) */ * from t where (1 member of (j)) or a=10", "Plan": [ "Selection 8000.00 root or(json_memberof(cast(1, json BINARY), test.t.j), eq(test.t.a, 10))", - "└─IndexMerge 10000.00 root type: union", - " ├─IndexFullScan(Build) 10000.00 cop[tikv] table:t, index:kj(cast(`j` as signed array)) keep order:false, stats:pseudo", - " └─TableRowIDScan(Probe) 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" - ] + "└─TableReader 10000.00 root data:TableFullScan", + " └─TableFullScan 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" + ], + "Err": "" } ] }, diff --git a/planner/core/testdata/plan_suite_in.json b/planner/core/testdata/plan_suite_in.json index d433f5dd88dbe..6f6e74fac3cfa 100644 --- a/planner/core/testdata/plan_suite_in.json +++ b/planner/core/testdata/plan_suite_in.json @@ -1186,5 +1186,13 @@ "select a, count(*) from t group by a -- shouldn't be rewritten", "select sum(a) from t -- sum shouldn't be rewritten" ] + }, + { + "name": "TestHashAggPushdownToTiFlashCompute", + "cases": [ + "select /*+ agg_to_cop() hash_agg() */ avg( distinct tbl_15.col_96 ) as r0 , min( tbl_15.col_92 ) as r1 , sum( distinct tbl_15.col_91 ) as r2 , max( tbl_15.col_92 ) as r3 from tbl_15 where tbl_15.col_94 != '2033-01-09' and tbl_15.col_93 > 7623.679908049186 order by r0,r1,r2,r3 limit 79 ;", + "select /*+ agg_to_cop() hash_agg() */ count(1) from tbl_15 ;", + "select /*+ agg_to_cop() stream_agg() */ avg( tbl_16.col_100 ) as r0 from tbl_16 where tbl_16.col_100 in ( 10672141 ) or tbl_16.col_104 in ( 'yfEG1t!*b' ,'C1*bqx_qyO' ,'vQ^yUpKHr&j#~' ) group by tbl_16.col_100 order by r0 limit 20 ;" + ] } ] diff --git a/planner/core/testdata/plan_suite_out.json b/planner/core/testdata/plan_suite_out.json index 31964823e95f2..14213e2223dab 100644 --- a/planner/core/testdata/plan_suite_out.json +++ 
b/planner/core/testdata/plan_suite_out.json @@ -7501,5 +7501,104 @@ "Warning": null } ] + }, + { + "Name": "TestHashAggPushdownToTiFlashCompute", + "Cases": [ + { + "SQL": "select /*+ agg_to_cop() hash_agg() */ avg( distinct tbl_15.col_96 ) as r0 , min( tbl_15.col_92 ) as r1 , sum( distinct tbl_15.col_91 ) as r2 , max( tbl_15.col_92 ) as r3 from tbl_15 where tbl_15.col_94 != '2033-01-09' and tbl_15.col_93 > 7623.679908049186 order by r0,r1,r2,r3 limit 79 ;", + "Plan": [ + "Limit 1.00 root offset:0, count:79", + "└─Sort 1.00 root Column#11, Column#12, Column#13, Column#14", + " └─HashAgg 1.00 root funcs:avg(distinct Column#89)->Column#11, funcs:min(Column#90)->Column#12, funcs:sum(distinct Column#91)->Column#13, funcs:max(Column#92)->Column#14", + " └─Projection 7100.44 root cast(test.tbl_15.col_96, decimal(10,0) UNSIGNED BINARY)->Column#89, Column#15, cast(test.tbl_15.col_91, decimal(3,0) UNSIGNED BINARY)->Column#91, Column#16", + " └─PartitionUnion 7100.44 root ", + " ├─TableReader 1775.11 root data:ExchangeSender", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#18)->Column#15, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#20)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", + " │ └─ExchangeReceiver 1775.11 mpp[tiflash] ", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.tbl_15.col_96, collate: binary], [name: test.tbl_15.col_91, collate: binary]", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#18, funcs:max(test.tbl_15.col_92)->Column#20", + " │ └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p0 keep order:false, stats:pseudo", + " ├─TableReader 1775.11 root data:ExchangeSender", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#30)->Column#15, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#32)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", + " │ └─ExchangeReceiver 1775.11 mpp[tiflash] ", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.tbl_15.col_96, collate: binary], [name: test.tbl_15.col_91, collate: binary]", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#30, funcs:max(test.tbl_15.col_92)->Column#32", + " │ └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p1 keep order:false, stats:pseudo", + " ├─TableReader 1775.11 root data:ExchangeSender", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#42)->Column#15, 
funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#44)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", + " │ └─ExchangeReceiver 1775.11 mpp[tiflash] ", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.tbl_15.col_96, collate: binary], [name: test.tbl_15.col_91, collate: binary]", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#42, funcs:max(test.tbl_15.col_92)->Column#44", + " │ └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p2 keep order:false, stats:pseudo", + " └─TableReader 1775.11 root data:ExchangeSender", + " └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#54)->Column#15, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#56)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", + " └─ExchangeReceiver 1775.11 mpp[tiflash] ", + " └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.tbl_15.col_96, collate: binary], [name: test.tbl_15.col_91, collate: binary]", + " └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#54, funcs:max(test.tbl_15.col_92)->Column#56", + " └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", + " └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p3 keep order:false, stats:pseudo" + ], + "Warning": null + }, + { + "SQL": "select /*+ agg_to_cop() hash_agg() */ count(1) from tbl_15 ;", + "Plan": [ + "HashAgg 1.00 root funcs:count(Column#12)->Column#11", + "└─PartitionUnion 4.00 root ", + " ├─HashAgg 1.00 root funcs:count(Column#13)->Column#12", + " │ └─TableReader 1.00 root data:ExchangeSender", + " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1.00 mpp[tiflash] funcs:count(test.tbl_15.col_91)->Column#13", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p0 keep order:false, stats:pseudo", + " ├─HashAgg 1.00 root funcs:count(Column#14)->Column#12", + " │ └─TableReader 1.00 root data:ExchangeSender", + " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1.00 mpp[tiflash] funcs:count(test.tbl_15.col_91)->Column#14", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p1 keep order:false, stats:pseudo", + " ├─HashAgg 1.00 root funcs:count(Column#15)->Column#12", + " │ └─TableReader 1.00 root data:ExchangeSender", + " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1.00 mpp[tiflash] funcs:count(test.tbl_15.col_91)->Column#15", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p2 keep order:false, stats:pseudo", + " └─HashAgg 1.00 root funcs:count(Column#16)->Column#12", + " └─TableReader 1.00 root data:ExchangeSender", + " └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg 1.00 mpp[tiflash] funcs:count(test.tbl_15.col_91)->Column#16", + " └─TableFullScan 10000.00 mpp[tiflash] 
table:tbl_15, partition:p3 keep order:false, stats:pseudo" + ], + "Warning": null + }, + { + "SQL": "select /*+ agg_to_cop() stream_agg() */ avg( tbl_16.col_100 ) as r0 from tbl_16 where tbl_16.col_100 in ( 10672141 ) or tbl_16.col_104 in ( 'yfEG1t!*b' ,'C1*bqx_qyO' ,'vQ^yUpKHr&j#~' ) group by tbl_16.col_100 order by r0 limit 20 ;", + "Plan": [ + "TopN 20.00 root Column#10, offset:0, count:20", + "└─HashAgg 63.95 root group by:test.tbl_16.col_100, funcs:avg(Column#11, Column#12)->Column#10", + " └─PartitionUnion 63.95 root ", + " ├─StreamAgg 31.98 root group by:Column#22, funcs:count(Column#19)->Column#11, funcs:sum(Column#20)->Column#12, funcs:firstrow(Column#21)->test.tbl_16.col_100", + " │ └─Projection 39.97 root test.tbl_16.col_100, cast(test.tbl_16.col_100, decimal(8,0) UNSIGNED BINARY)->Column#20, test.tbl_16.col_100, test.tbl_16.col_100", + " │ └─Sort 39.97 root test.tbl_16.col_100", + " │ └─TableReader 39.97 root data:ExchangeSender", + " │ └─ExchangeSender 39.97 mpp[tiflash] ExchangeType: PassThrough", + " │ └─Selection 39.97 mpp[tiflash] or(eq(test.tbl_16.col_100, 10672141), in(test.tbl_16.col_104, \"yfEG1t!*b\", \"C1*bqx_qyO\", \"vQ^yUpKHr&j#~\"))", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_16, partition:p0 keep order:false, stats:pseudo", + " └─StreamAgg 31.98 root group by:Column#26, funcs:count(Column#23)->Column#11, funcs:sum(Column#24)->Column#12, funcs:firstrow(Column#25)->test.tbl_16.col_100", + " └─Projection 39.97 root test.tbl_16.col_100, cast(test.tbl_16.col_100, decimal(8,0) UNSIGNED BINARY)->Column#24, test.tbl_16.col_100, test.tbl_16.col_100", + " └─Sort 39.97 root test.tbl_16.col_100", + " └─TableReader 39.97 root data:ExchangeSender", + " └─ExchangeSender 39.97 mpp[tiflash] ExchangeType: PassThrough", + " └─Selection 39.97 mpp[tiflash] or(eq(test.tbl_16.col_100, 10672141), in(test.tbl_16.col_104, \"yfEG1t!*b\", \"C1*bqx_qyO\", \"vQ^yUpKHr&j#~\"))", + " └─TableFullScan 10000.00 mpp[tiflash] table:tbl_16, partition:p1 keep order:false, stats:pseudo" + ], + "Warning": null + } + ] } ] diff --git a/store/driver/tikv_driver.go b/store/driver/tikv_driver.go index d732665bb45a4..8a28b91b6cb2c 100644 --- a/store/driver/tikv_driver.go +++ b/store/driver/tikv_driver.go @@ -113,7 +113,6 @@ func TrySetupGlobalResourceController(ctx context.Context, serverID uint64, s kv // TiKVDriver implements engine TiKV. type TiKVDriver struct { - keyspaceName string pdConfig config.PDClient security config.Security tikvConfig config.TiKVClient diff --git a/ttl/cache/task.go b/ttl/cache/task.go index c88a3fe0abb2e..3ef1e2ebee811 100644 --- a/ttl/cache/task.go +++ b/ttl/cache/task.go @@ -59,7 +59,7 @@ func SelectFromTTLTaskWithID(jobID string, scanID int64) (string, []interface{}) // PeekWaitingTTLTask returns an SQL statement to get `limit` waiting ttl task func PeekWaitingTTLTask(limit int, hbExpire time.Time) (string, []interface{}) { - return selectFromTTLTask + " WHERE status = 'waiting' OR owner_hb_time < %? ORDER BY created_time ASC LIMIT %?", []interface{}{hbExpire.Format("2006-01-02 15:04:05"), limit} + return selectFromTTLTask + " WHERE status = 'waiting' OR (owner_hb_time < %? 
AND status = 'running') ORDER BY created_time ASC LIMIT %?", []interface{}{hbExpire.Format("2006-01-02 15:04:05"), limit}
 }
 
 // InsertIntoTTLTask returns an SQL statement to insert a ttl task into mysql.tidb_ttl_task
diff --git a/ttl/ttlworker/task_manager_integration_test.go b/ttl/ttlworker/task_manager_integration_test.go
index 9e3bad19b2acd..0419a14f4133f 100644
--- a/ttl/ttlworker/task_manager_integration_test.go
+++ b/ttl/ttlworker/task_manager_integration_test.go
@@ -186,6 +186,17 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) {
 	m2.SetScanWorkers4Test([]ttlworker.Worker{scanWorker2})
 	m2.RescheduleTasks(sessionFactory(), now.Add(time.Hour))
 	tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("running task-manager-2"))
+
+	// another task manager shouldn't fetch this task if it has finished
+	task := m2.GetRunningTasks()[0]
+	task.SetResult(nil)
+	m2.CheckFinishedTask(sessionFactory(), now)
+	scanWorker3 := ttlworker.NewMockScanWorker(t)
+	scanWorker3.Start()
+	m3 := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-3")
+	m3.SetScanWorkers4Test([]ttlworker.Worker{scanWorker3})
+	m3.RescheduleTasks(sessionFactory(), now.Add(time.Hour))
+	tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("finished task-manager-2"))
 }
 
 func TestTaskMetrics(t *testing.T) {
diff --git a/ttl/ttlworker/task_manager_test.go b/ttl/ttlworker/task_manager_test.go
index 37365cb9757f6..cffb8e071b62d 100644
--- a/ttl/ttlworker/task_manager_test.go
+++ b/ttl/ttlworker/task_manager_test.go
@@ -54,6 +54,24 @@ func (m *taskManager) ReportMetrics() {
 	m.reportMetrics()
 }
 
+// CheckFinishedTask is an exported version of checkFinishedTask
+func (m *taskManager) CheckFinishedTask(se session.Session, now time.Time) {
+	m.checkFinishedTask(se, now)
+}
+
+// GetRunningTasks returns the manager's running scan tasks, exported for testing
+func (m *taskManager) GetRunningTasks() []*runningScanTask {
+	return m.runningTasks
+}
+
+// SetResult sets the execution result of a running scan task, exported for testing
+func (t *runningScanTask) SetResult(err error) {
+	t.result = &ttlScanTaskExecResult{
+		task: t.ttlScanTask,
+		err:  err,
+	}
+}
+
 func TestResizeWorkers(t *testing.T) {
 	tbl := newMockTTLTbl(t, "t1")
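
Editor's note on the TTL change above: with the revised PeekWaitingTTLTask predicate, a task whose owner heartbeat has expired can only be rescheduled while it is still marked 'running'; once checkFinishedTask has moved it to 'finished', no other task manager picks it up, which is what the new assertion in TestTaskScheduleExpireHeartBeat verifies. Below is a minimal sketch (not part of the patch) of how the helper behaves under that predicate; the two-hour threshold is a hypothetical value chosen for illustration, and the snippet assumes the ttl/cache package shown in the hunk above.

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tidb/ttl/cache"
)

func main() {
	// Treat heartbeats older than two hours as expired (hypothetical threshold).
	hbExpire := time.Now().Add(-2 * time.Hour)

	// With the patched predicate, the returned statement only matches tasks that
	// are 'waiting', or 'running' with owner_hb_time older than hbExpire; tasks
	// already marked 'finished' are no longer eligible to be stolen.
	sql, args := cache.PeekWaitingTTLTask(4, hbExpire)
	fmt.Println(sql)  // ... WHERE status = 'waiting' OR (owner_hb_time < %? AND status = 'running') ORDER BY created_time ASC LIMIT %?
	fmt.Println(args) // [hbExpire formatted as "2006-01-02 15:04:05", 4]
}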