Skip to content

Commit

Permalink
Merge branch 'replace_datumcompare_12' of github.com:wjhuang2016/tidb…
Browse files Browse the repository at this point in the history
… into replace_datumcompare_12
  • Loading branch information
wjhuang2016 committed Dec 9, 2021
2 parents c369069 + 6000688 commit 8e18988
Show file tree
Hide file tree
Showing 14 changed files with 897 additions and 678 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ gotest_in_verify_ci_part_2: failpoint-enable tools/bin/gotestsum tools/bin/gocov

race: failpoint-enable
@export log_level=debug; \
$(GOTEST) -timeout 20m -race $(PACKAGES) || { $(FAILPOINT_DISABLE); exit 1; }
$(GOTEST) -timeout 25m -race $(PACKAGES) || { $(FAILPOINT_DISABLE); exit 1; }
@$(FAILPOINT_DISABLE)

leak: failpoint-enable
Expand Down
14 changes: 14 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9487,3 +9487,17 @@ func (s *testSerialSuite) TestIssue28650(c *C) {
}()
}
}

func (s *testSerialSuite) TestIssue30289(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
fpName := "github.com/pingcap/tidb/executor/issue30289"
c.Assert(failpoint.Enable(fpName, `return(true)`), IsNil)
defer func() {
c.Assert(failpoint.Disable(fpName), IsNil)
}()
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
err := tk.QueryToErr("select /*+ hash_join(t1) */ * from t t1 join t t2 on t1.a=t2.a")
c.Assert(err.Error(), Matches, "issue30289 build return error")
}
13 changes: 12 additions & 1 deletion executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,13 @@ func (e *HashJoinExec) fetchProbeSideChunks(ctx context.Context) {
return
}
if !hasWaitedForBuild {
failpoint.Inject("issue30289", func(val failpoint.Value) {
if val.(bool) {
probeSideResult.Reset()
}
})
if probeSideResult.NumRows() == 0 && !e.useOuterToBuild {
e.finished.Store(true)
return
}
emptyBuild, buildErr := e.wait4BuildSide()
if buildErr != nil {
Expand Down Expand Up @@ -258,6 +262,13 @@ func (e *HashJoinExec) wait4BuildSide() (emptyBuild bool, err error) {
func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh <-chan struct{}) {
defer close(chkCh)
var err error
failpoint.Inject("issue30289", func(val failpoint.Value) {
if val.(bool) {
err = errors.Errorf("issue30289 build return error")
e.buildFinished <- errors.Trace(err)
return
}
})
for {
if e.finished.Load().(bool) {
return
Expand Down
28 changes: 21 additions & 7 deletions executor/shuffle.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,25 +142,39 @@ func (e *ShuffleExec) Close() error {
if !e.prepared {
for _, w := range e.workers {
for _, r := range w.receivers {
close(r.inputHolderCh)
close(r.inputCh)
if r.inputHolderCh != nil {
close(r.inputHolderCh)
}
if r.inputCh != nil {
close(r.inputCh)
}
}
close(w.outputHolderCh)
if w.outputHolderCh != nil {
close(w.outputHolderCh)
}
}
if e.outputCh != nil {
close(e.outputCh)
}
close(e.outputCh)
}
close(e.finishCh)
if e.finishCh != nil {
close(e.finishCh)
}
for _, w := range e.workers {
for _, r := range w.receivers {
for range r.inputCh {
if r.inputCh != nil {
for range r.inputCh {
}
}
}
// close child executor of each worker
if err := w.childExec.Close(); err != nil && firstErr == nil {
firstErr = err
}
}
for range e.outputCh { // workers exit before `e.outputCh` is closed.
if e.outputCh != nil {
for range e.outputCh { // workers exit before `e.outputCh` is closed.
}
}
e.executed = false

Expand Down
2 changes: 1 addition & 1 deletion expression/builtin_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -4856,7 +4856,7 @@ func (b *builtinUnixTimestampIntSig) evalIntWithCtx(ctx sessionctx.Context, row
}

tz := ctx.GetSessionVars().Location()
t, err := val.GoTime(tz)
t, err := val.AdjustedGoTime(tz)
if err != nil {
return 0, false, nil
}
Expand Down
2 changes: 1 addition & 1 deletion expression/builtin_time_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2390,7 +2390,7 @@ func (b *builtinUnixTimestampIntSig) vecEvalInt(input *chunk.Chunk, result *chun
continue
}

t, err := buf.GetTime(i).GoTime(getTimeZone(b.ctx))
t, err := buf.GetTime(i).AdjustedGoTime(getTimeZone(b.ctx))
if err != nil {
i64s[i] = 0
continue
Expand Down
21 changes: 21 additions & 0 deletions expression/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3978,6 +3978,8 @@ func (s *testIntegrationSuite) TestCompareBuiltin(c *C) {
result.Check(testkit.Rows("1 1 1"))
result = tk.MustQuery(`select INTERVAL(100, NULL, NULL, NULL, NULL, NULL, 100);`)
result.Check(testkit.Rows("6"))
result = tk.MustQuery(`SELECT INTERVAL(0,(1*5)/2) + INTERVAL(5,4,3);`)
result.Check(testkit.Rows("2"))

// for greatest
result = tk.MustQuery(`select greatest(1, 2, 3), greatest("a", "b", "c"), greatest(1.1, 1.2, 1.3), greatest("123a", 1, 2)`)
Expand Down Expand Up @@ -9365,6 +9367,25 @@ func (s *testIntegrationSuite) TestIssue30101(c *C) {
tk.MustQuery("select greatest(c1, c2) from t1;").Sort().Check(testkit.Rows("9223372036854775809"))
}

func (s *testIntegrationSuite) TestIssue28739(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`USE test`)
tk.MustExec("SET time_zone = 'Europe/Vilnius'")
tk.MustQuery("SELECT UNIX_TIMESTAMP('2020-03-29 03:45:00')").Check(testkit.Rows("1585443600"))
tk.MustQuery("SELECT FROM_UNIXTIME(UNIX_TIMESTAMP('2020-03-29 03:45:00'))").Check(testkit.Rows("2020-03-29 04:00:00"))
tk.MustExec(`DROP TABLE IF EXISTS t`)
tk.MustExec(`CREATE TABLE t (dt DATETIME NULL)`)
defer tk.MustExec(`DROP TABLE t`)
// Test the vector implememtation
tk.MustExec(`INSERT INTO t VALUES ('2021-10-31 02:30:00'), ('2021-03-28 02:30:00'), ('2020-10-04 02:15:00'), ('2020-03-29 03:45:00'), (NULL)`)
tk.MustQuery(`SELECT dt, UNIX_TIMESTAMP(dt) FROM t`).Sort().Check(testkit.Rows(
"2020-03-29 03:45:00 1585443600",
"2020-10-04 02:15:00 1601766900",
"2021-03-28 02:30:00 1616891400",
"2021-10-31 02:30:00 1635636600",
"<nil> <nil>"))
}

func (s *testIntegrationSuite) TestIssue30326(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
Loading

0 comments on commit 8e18988

Please sign in to comment.