Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into OCC-ConfHandler2
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 committed Feb 10, 2020
2 parents 0994f18 + da2ed2b commit 3d0b8e1
Show file tree
Hide file tree
Showing 24 changed files with 435 additions and 147 deletions.
14 changes: 1 addition & 13 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1193,8 +1193,7 @@ func (s *serialTestStateChangeSuite) TestParallelFlashbackTable(c *C) {
tk.MustExec("create table t (a int);")
tk.MustExec("drop table if exists t")
// Test parallel flashback table.
ts := getDDLJobStartTime(tk, "test_db_state", "t")
sql1 := fmt.Sprintf("flashback table t until timestamp '%v' to t_flashback", ts)
sql1 := "flashback table t to t_flashback"
f := func(c *C, err1, err2 error) {
c.Assert(err1, IsNil)
c.Assert(err2, NotNil)
Expand All @@ -1203,14 +1202,3 @@ func (s *serialTestStateChangeSuite) TestParallelFlashbackTable(c *C) {
}
s.testControlParallelExecSQL(c, sql1, sql1, f)
}

func getDDLJobStartTime(tk *testkit.TestKit, dbName, tblName string) string {
re := tk.MustQuery("admin show ddl jobs 100")
rows := re.Rows()
for _, row := range rows {
if row[1] == dbName && row[2] == tblName && (row[3] == "drop table" || row[3] == "truncate table") {
return row[8].(string)
}
}
return ""
}
11 changes: 11 additions & 0 deletions executor/aggfuncs/aggfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,14 @@ type baseAggFunc struct {
func (*baseAggFunc) MergePartialResult(sctx sessionctx.Context, src, dst PartialResult) error {
return nil
}

// SlidingWindowAggFunc is the interface to evaluate the aggregate functions using sliding window.
type SlidingWindowAggFunc interface {
// Slide evaluates the aggregate functions using a sliding window. The input
// lastStart and lastEnd are the interval of the former sliding window,
// shiftStart, shiftEnd mean the sliding window offset. Note that the input
// PartialResult stores the intermediate result which will be used in the next
// sliding window, ensure call ResetPartialResult after a frame are evaluated
// completely.
Slide(sctx sessionctx.Context, rows []chunk.Row, lastStart, lastEnd uint64, shiftStart, shiftEnd uint64, pr PartialResult) error
}
175 changes: 175 additions & 0 deletions executor/aggfuncs/func_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,31 @@ func (e *countOriginal4Int) UpdatePartialResult(sctx sessionctx.Context, rowsInG
return nil
}

func (e *countOriginal4Int) Slide(sctx sessionctx.Context, rows []chunk.Row, lastStart, lastEnd uint64, shiftStart, shiftEnd uint64, pr PartialResult) error {
p := (*partialResult4Count)(pr)
for i := uint64(0); i < shiftStart; i++ {
_, isNull, err := e.args[0].EvalInt(sctx, rows[lastStart+i])
if err != nil {
return err
}
if isNull {
continue
}
*p--
}
for i := uint64(0); i < shiftEnd; i++ {
_, isNull, err := e.args[0].EvalInt(sctx, rows[lastEnd+i])
if err != nil {
return err
}
if isNull {
continue
}
*p++
}
return nil
}

type countOriginal4Real struct {
baseCount
}
Expand All @@ -80,6 +105,31 @@ func (e *countOriginal4Real) UpdatePartialResult(sctx sessionctx.Context, rowsIn
return nil
}

func (e *countOriginal4Real) Slide(sctx sessionctx.Context, rows []chunk.Row, lastStart, lastEnd uint64, shiftStart, shiftEnd uint64, pr PartialResult) error {
p := (*partialResult4Count)(pr)
for i := uint64(0); i < shiftStart; i++ {
_, isNull, err := e.args[0].EvalReal(sctx, rows[lastStart+i])
if err != nil {
return err
}
if isNull {
continue
}
*p--
}
for i := uint64(0); i < shiftEnd; i++ {
_, isNull, err := e.args[0].EvalReal(sctx, rows[lastEnd+i])
if err != nil {
return err
}
if isNull {
continue
}
*p++
}
return nil
}

type countOriginal4Decimal struct {
baseCount
}
Expand All @@ -102,6 +152,31 @@ func (e *countOriginal4Decimal) UpdatePartialResult(sctx sessionctx.Context, row
return nil
}

func (e *countOriginal4Decimal) Slide(sctx sessionctx.Context, rows []chunk.Row, lastStart, lastEnd uint64, shiftStart, shiftEnd uint64, pr PartialResult) error {
p := (*partialResult4Count)(pr)
for i := uint64(0); i < shiftStart; i++ {
_, isNull, err := e.args[0].EvalDecimal(sctx, rows[lastStart+i])
if err != nil {
return err
}
if isNull {
continue
}
*p--
}
for i := uint64(0); i < shiftEnd; i++ {
_, isNull, err := e.args[0].EvalDecimal(sctx, rows[lastEnd+i])
if err != nil {
return err
}
if isNull {
continue
}
*p++
}
return nil
}

type countOriginal4Time struct {
baseCount
}
Expand All @@ -124,6 +199,31 @@ func (e *countOriginal4Time) UpdatePartialResult(sctx sessionctx.Context, rowsIn
return nil
}

func (e *countOriginal4Time) Slide(sctx sessionctx.Context, rows []chunk.Row, lastStart, lastEnd uint64, shiftStart, shiftEnd uint64, pr PartialResult) error {
p := (*partialResult4Count)(pr)
for i := uint64(0); i < shiftStart; i++ {
_, isNull, err := e.args[0].EvalTime(sctx, rows[lastStart+i])
if err != nil {
return err
}
if isNull {
continue
}
*p--
}
for i := uint64(0); i < shiftEnd; i++ {
_, isNull, err := e.args[0].EvalTime(sctx, rows[lastEnd+i])
if err != nil {
return err
}
if isNull {
continue
}
*p++
}
return nil
}

type countOriginal4Duration struct {
baseCount
}
Expand All @@ -146,6 +246,31 @@ func (e *countOriginal4Duration) UpdatePartialResult(sctx sessionctx.Context, ro
return nil
}

func (e *countOriginal4Duration) Slide(sctx sessionctx.Context, rows []chunk.Row, lastStart, lastEnd uint64, shiftStart, shiftEnd uint64, pr PartialResult) error {
p := (*partialResult4Count)(pr)
for i := uint64(0); i < shiftStart; i++ {
_, isNull, err := e.args[0].EvalDuration(sctx, rows[lastStart+i])
if err != nil {
return err
}
if isNull {
continue
}
*p--
}
for i := uint64(0); i < shiftEnd; i++ {
_, isNull, err := e.args[0].EvalDuration(sctx, rows[lastEnd+i])
if err != nil {
return err
}
if isNull {
continue
}
*p++
}
return nil
}

type countOriginal4JSON struct {
baseCount
}
Expand All @@ -168,6 +293,31 @@ func (e *countOriginal4JSON) UpdatePartialResult(sctx sessionctx.Context, rowsIn
return nil
}

func (e *countOriginal4JSON) Slide(sctx sessionctx.Context, rows []chunk.Row, lastStart, lastEnd uint64, shiftStart, shiftEnd uint64, pr PartialResult) error {
p := (*partialResult4Count)(pr)
for i := uint64(0); i < shiftStart; i++ {
_, isNull, err := e.args[0].EvalJSON(sctx, rows[lastStart+i])
if err != nil {
return err
}
if isNull {
continue
}
*p--
}
for i := uint64(0); i < shiftEnd; i++ {
_, isNull, err := e.args[0].EvalJSON(sctx, rows[lastEnd+i])
if err != nil {
return err
}
if isNull {
continue
}
*p++
}
return nil
}

type countOriginal4String struct {
baseCount
}
Expand All @@ -190,6 +340,31 @@ func (e *countOriginal4String) UpdatePartialResult(sctx sessionctx.Context, rows
return nil
}

func (e *countOriginal4String) Slide(sctx sessionctx.Context, rows []chunk.Row, lastStart, lastEnd uint64, shiftStart, shiftEnd uint64, pr PartialResult) error {
p := (*partialResult4Count)(pr)
for i := uint64(0); i < shiftStart; i++ {
_, isNull, err := e.args[0].EvalString(sctx, rows[lastStart+i])
if err != nil {
return err
}
if isNull {
continue
}
*p--
}
for i := uint64(0); i < shiftEnd; i++ {
_, isNull, err := e.args[0].EvalString(sctx, rows[lastEnd+i])
if err != nil {
return err
}
if isNull {
continue
}
*p++
}
return nil
}

type countPartial struct {
baseCount
}
Expand Down
29 changes: 29 additions & 0 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,35 @@ func BenchmarkWindowFunctionsWithFrame(b *testing.B) {
}
}

func BenchmarkWindowFunctionsWithSlidingWindow(b *testing.B) {
b.ReportAllocs()
windowFuncs := []string{
ast.AggFuncCount,
}
rows := []int{1000, 100000}
ndvs := []int{10, 1000}
frames := []*core.WindowFrame{
{Type: ast.Rows, Start: &core.FrameBound{Type: ast.Preceding, Num: 10}, End: &core.FrameBound{Type: ast.Following, Num: 10}},
{Type: ast.Rows, Start: &core.FrameBound{Type: ast.Preceding, Num: 100}, End: &core.FrameBound{Type: ast.Following, Num: 100}},
}
for _, row := range rows {
for _, ndv := range ndvs {
for _, frame := range frames {
for _, windowFunc := range windowFuncs {
cas := defaultWindowTestCase()
cas.rows = row
cas.ndv = ndv
cas.windowFunc = windowFunc
cas.frame = frame
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkWindowExecWithCase(b, cas)
})
}
}
}
}
}

type hashJoinTestCase struct {
rows int
cols []*types.FieldType
Expand Down
13 changes: 3 additions & 10 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error {
if s.JobID != 0 {
job, tblInfo, err = e.getRecoverTableByJobID(s, t, dom)
} else {
job, tblInfo, err = e.getRecoverTableByTableName(s.Table, "")
job, tblInfo, err = e.getRecoverTableByTableName(s.Table)
}
if err != nil {
return err
Expand Down Expand Up @@ -429,7 +429,7 @@ func (e *DDLExec) getRecoverTableByJobID(s *ast.RecoverTableStmt, t *meta.Meta,
return job, table.Meta(), nil
}

func (e *DDLExec) getRecoverTableByTableName(tableName *ast.TableName, ts string) (*model.Job, *model.TableInfo, error) {
func (e *DDLExec) getRecoverTableByTableName(tableName *ast.TableName) (*model.Job, *model.TableInfo, error) {
txn, err := e.ctx.Txn(true)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -464,9 +464,6 @@ func (e *DDLExec) getRecoverTableByTableName(tableName *ast.TableName, ts string
if err != nil {
return nil, nil, err
}
if len(ts) != 0 && ts != model.TSConvert2Time(job.StartTS).String() {
continue
}
// Get the snapshot infoSchema before drop table.
snapInfo, err := dom.GetSnapshotInfoSchema(job.StartTS)
if err != nil {
Expand Down Expand Up @@ -500,11 +497,7 @@ func (e *DDLExec) getRecoverTableByTableName(tableName *ast.TableName, ts string
}

func (e *DDLExec) executeFlashbackTable(s *ast.FlashBackTableStmt) error {
ts := s.Timestamp.GetString()
if len(ts) == 0 {
return errors.Errorf("The timestamp in flashback statement should be consistent with the drop/truncate DDL start time")
}
job, tblInfo, err := e.getRecoverTableByTableName(s.Table, ts)
job, tblInfo, err := e.getRecoverTableByTableName(s.Table)
if err != nil {
return err
}
Expand Down
6 changes: 5 additions & 1 deletion executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,14 @@ import (
)

var _ = Suite(&testExecSuite{})
var _ = SerialSuites(&testExecSerialSuite{})

type testExecSuite struct {
}

type testExecSerialSuite struct {
}

// mockSessionManager is a mocked session manager which is used for test.
type mockSessionManager struct {
PS []*util.ProcessInfo
Expand Down Expand Up @@ -240,7 +244,7 @@ func assertEqualStrings(c *C, got []field, expect []string) {
}
}

func (s *testExecSuite) TestSortSpillDisk(c *C) {
func (s *testExecSerialSuite) TestSortSpillDisk(c *C) {
originCfg := config.GetGlobalConfig()
newConf := *originCfg
newConf.OOMUseTmpStorage = true
Expand Down
Loading

0 comments on commit 3d0b8e1

Please sign in to comment.