Commit

stats: adjust datum type when using the index query feedback (#10614)
alivxxx authored and zz-jason committed Jun 11, 2019
1 parent d7129e2 commit 8cdccca
Showing 4 changed files with 127 additions and 34 deletions.
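
In brief: column-range feedback was previously decoded with codec.DecodeRange alone, so the resulting bucket bounds kept whatever kinds the codec produced rather than the column's actual field type. This commit threads the column's *types.FieldType through decodeFeedback, adds convertDatumsType and decodeColumnBounds to unflatten each decoded bound into that type, converts column ranges with convertRangeType before they are used in logForIndex and dumpFeedbackForIndex, and replaces the out-of-range math.MaxInt64/math.MinInt64 duration extremes in getMaxValue/getMinValue with types.MaxTime/types.MinTime. The standalone Go sketch below only illustrates the underlying idea; its types and helper are simplified stand-ins, not TiDB's API.

package main

import (
	"fmt"
	"time"
)

// fieldKind is a hypothetical stand-in for a column's field type.
type fieldKind int

const (
	kindInt fieldKind = iota
	kindDuration
	kindDate
)

// convertDatum converts a raw decoded value (an int64 "on the wire" in this
// sketch) into a value of the column's kind, mirroring what a helper such as
// convertDatumsType does for every decoded bucket bound.
func convertDatum(raw int64, kind fieldKind) (interface{}, error) {
	switch kind {
	case kindInt:
		return raw, nil
	case kindDuration:
		// Clamp to the MySQL TIME range [-838:59:59, 838:59:59] rather than
		// trusting extremes such as math.MaxInt64, which are not valid TIMEs.
		limit := 838*time.Hour + 59*time.Minute + 59*time.Second
		d := time.Duration(raw) * time.Second
		if d > limit {
			d = limit
		} else if d < -limit {
			d = -limit
		}
		return d, nil
	case kindDate:
		// Interpret the raw value as a day offset from 1970-01-01.
		return time.Unix(0, 0).UTC().AddDate(0, 0, int(raw)).Format("2006-01-02"), nil
	default:
		return nil, fmt.Errorf("unsupported field kind %d", kind)
	}
}

func main() {
	// A range bound decoded from feedback arrives as a raw integer...
	raw := int64(20)
	// ...and only becomes comparable with DATE bucket bounds after conversion.
	converted, err := convertDatum(raw, kindDate)
	fmt.Println(converted, err) // 1970-01-21 <nil>
}
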
79 changes: 58 additions & 21 deletions statistics/feedback.go
@@ -804,23 +804,46 @@ func decodeFeedbackForPK(q *QueryFeedback, pb *queryFeedback, isUnsigned bool) {
}
}

func decodeFeedbackForColumn(q *QueryFeedback, pb *queryFeedback) error {
func convertDatumsType(vals []types.Datum, ft *types.FieldType, loc *time.Location) error {
for i, val := range vals {
if val.Kind() == types.KindMinNotNull || val.Kind() == types.KindMaxValue {
continue
}
newVal, err := tablecodec.UnflattenDatums([]types.Datum{val}, []*types.FieldType{ft}, loc)
if err != nil {
return err
}
vals[i] = newVal[0]
}
return nil
}

func decodeColumnBounds(data []byte, ft *types.FieldType) ([]types.Datum, error) {
vals, err := codec.DecodeRange(data, 1)
if err != nil {
return nil, err
}
err = convertDatumsType(vals, ft, time.UTC)
return vals, err
}

func decodeFeedbackForColumn(q *QueryFeedback, pb *queryFeedback, ft *types.FieldType) error {
q.tp = colType
for i := 0; i < len(pb.ColumnRanges); i += 2 {
low, err := codec.DecodeRange(pb.ColumnRanges[i], 1)
low, err := decodeColumnBounds(pb.ColumnRanges[i], ft)
if err != nil {
return errors.Trace(err)
return err
}
high, err := codec.DecodeRange(pb.ColumnRanges[i+1], 1)
high, err := decodeColumnBounds(pb.ColumnRanges[i+1], ft)
if err != nil {
return errors.Trace(err)
return err
}
q.feedback = append(q.feedback, feedback{&low[0], &high[0], pb.Counts[i/2], 0})
}
return nil
}

func decodeFeedback(val []byte, q *QueryFeedback, c *CMSketch, isUnsigned bool) error {
func decodeFeedback(val []byte, q *QueryFeedback, c *CMSketch, ft *types.FieldType) error {
buf := bytes.NewBuffer(val)
dec := gob.NewDecoder(buf)
pb := &queryFeedback{}
@@ -831,9 +854,9 @@ func decodeFeedback(val []byte, q *QueryFeedback, c *CMSketch, isUnsigned bool)
if len(pb.IndexRanges) > 0 || len(pb.HashValues) > 0 {
decodeFeedbackForIndex(q, pb, c)
} else if len(pb.IntRanges) > 0 {
decodeFeedbackForPK(q, pb, isUnsigned)
decodeFeedbackForPK(q, pb, mysql.HasUnsignedFlag(ft.Flag))
} else {
err := decodeFeedbackForColumn(q, pb)
err := decodeFeedbackForColumn(q, pb, ft)
if err != nil {
return errors.Trace(err)
}
@@ -1014,10 +1037,13 @@ func logForIndex(prefix string, t *Table, idx *Index, ranges []*ranger.Range, ac
zap.String("equality", equalityString), zap.Uint32("expected equality", equalityCount),
zap.String("range", rangeString))
} else if colHist := t.columnByName(colName); colHist != nil && colHist.Histogram.Len() > 0 {
rangeString := colRangeToStr(colHist, &rang, -1, factor)
logutil.Logger(context.Background()).Debug(prefix, zap.String("index", idx.Info.Name.O), zap.Int64("actual", actual[i]),
zap.String("equality", equalityString), zap.Uint32("expected equality", equalityCount),
zap.String("range", rangeString))
err = convertRangeType(&rang, colHist.tp, time.UTC)
if err == nil {
rangeString := colRangeToStr(colHist, &rang, -1, factor)
logutil.Logger(context.Background()).Debug(prefix, zap.String("index", idx.Info.Name.O), zap.Int64("actual", actual[i]),
zap.String("equality", equalityString), zap.Uint32("expected equality", equalityCount),
zap.String("range", rangeString))
}
} else {
count, err := getPseudoRowCountByColumnRanges(sc, float64(t.Count), []*ranger.Range{&rang}, 0)
if err == nil {
@@ -1076,8 +1102,16 @@ func getNewCountForIndex(eqCount, rangeCount, totalCount, realCount float64) (fl
return eqCount * adjustFactor, rangeCount * adjustFactor
}

// dumpFeedbackForIndex dumps the feedback for index.
// For queries that contains both equality and range query, we will split them and update accordingly.
func convertRangeType(ran *ranger.Range, ft *types.FieldType, loc *time.Location) error {
err := convertDatumsType(ran.LowVal, ft, loc)
if err != nil {
return err
}
return convertDatumsType(ran.HighVal, ft, loc)
}

// dumpFeedbackForIndex dumps the feedback for index.
// For queries that contains both equality and range query, we will split them and update accordingly.
func dumpFeedbackForIndex(h *Handle, q *QueryFeedback, t *Table) error {
idx, ok := t.Indices[q.hist.ID]
if !ok {
@@ -1105,7 +1139,7 @@ func dumpFeedbackForIndex(h *Handle, q *QueryFeedback, t *Table) error {
continue
}
equalityCount := float64(idx.CMSketch.QueryBytes(bytes)) * idx.getIncreaseFactor(t.Count)
rang := ranger.Range{
rang := &ranger.Range{
LowVal: []types.Datum{ran.LowVal[rangePosition]},
HighVal: []types.Datum{ran.HighVal[rangePosition]},
}
@@ -1114,11 +1148,14 @@ func dumpFeedbackForIndex(h *Handle, q *QueryFeedback, t *Table) error {
rangeFB := &QueryFeedback{tableID: q.tableID}
// prefer index stats over column stats
if idx := t.indexStartWithColumn(colName); idx != nil && idx.Histogram.Len() != 0 {
rangeCount, err = t.GetRowCountByIndexRanges(sc, idx.ID, []*ranger.Range{&rang})
rangeCount, err = t.GetRowCountByIndexRanges(sc, idx.ID, []*ranger.Range{rang})
rangeFB.tp, rangeFB.hist = indexType, &idx.Histogram
} else if col := t.columnByName(colName); col != nil && col.Histogram.Len() != 0 {
rangeCount, err = t.GetRowCountByColumnRanges(sc, col.ID, []*ranger.Range{&rang})
rangeFB.tp, rangeFB.hist = colType, &col.Histogram
err = convertRangeType(rang, col.tp, time.UTC)
if err == nil {
rangeCount, err = t.GetRowCountByColumnRanges(sc, col.ID, []*ranger.Range{rang})
rangeFB.tp, rangeFB.hist = colType, &col.Histogram
}
} else {
continue
}
@@ -1130,7 +1167,7 @@ func dumpFeedbackForIndex(h *Handle, q *QueryFeedback, t *Table) error {
equalityCount, rangeCount = getNewCountForIndex(equalityCount, rangeCount, float64(t.Count), float64(q.feedback[i].count))
value := types.NewBytesDatum(bytes)
q.feedback[i] = feedback{lower: &value, upper: &value, count: int64(equalityCount)}
err = rangeFB.dumpRangeFeedback(h, &rang, rangeCount)
err = rangeFB.dumpRangeFeedback(h, rang, rangeCount)
if err != nil {
logutil.Logger(context.Background()).Debug("dump range feedback fail", zap.Error(err))
continue
Expand Down Expand Up @@ -1255,7 +1292,7 @@ func getMaxValue(ft *types.FieldType) (max types.Datum) {
case mysql.TypeNewDecimal:
max.SetMysqlDecimal(types.NewMaxOrMinDec(false, ft.Flen, ft.Decimal))
case mysql.TypeDuration:
max.SetMysqlDuration(types.Duration{Duration: math.MaxInt64})
max.SetMysqlDuration(types.Duration{Duration: types.MaxTime})
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeTimestamp:
if ft.Tp == mysql.TypeDate || ft.Tp == mysql.TypeDatetime {
max.SetMysqlTime(types.Time{Time: types.MaxDatetime, Type: ft.Tp})
Expand Down Expand Up @@ -1289,7 +1326,7 @@ func getMinValue(ft *types.FieldType) (min types.Datum) {
case mysql.TypeNewDecimal:
min.SetMysqlDecimal(types.NewMaxOrMinDec(true, ft.Flen, ft.Decimal))
case mysql.TypeDuration:
min.SetMysqlDuration(types.Duration{Duration: math.MinInt64})
min.SetMysqlDuration(types.Duration{Duration: types.MinTime})
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeTimestamp:
if ft.Tp == mysql.TypeDate || ft.Tp == mysql.TypeDatetime {
min.SetMysqlTime(types.Time{Time: types.MinDatetime, Type: ft.Tp})
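
A note on the getMaxValue/getMinValue hunks above: a types.Duration built from math.MaxInt64 or math.MinInt64 nanoseconds lies far outside what a MySQL TIME can hold, which is why the extremes switch to types.MaxTime and types.MinTime. The short standalone sketch below assumes those constants correspond to the usual MySQL TIME limits of +/-838:59:59 — the same bound the new test asserts as "-838:59:59".

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// math.MaxInt64 nanoseconds is roughly 292 years, far beyond what a MySQL
	// TIME value can represent, so the old extreme could never round-trip.
	fmt.Println(time.Duration(math.MaxInt64)) // 2562047h47m16.854775807s

	// Assumed equivalents of types.MaxTime / types.MinTime: the MySQL TIME
	// limits of +838:59:59 and -838:59:59 (the new test expects the latter
	// as a histogram lower bound).
	maxTime := 838*time.Hour + 59*time.Minute + 59*time.Second
	fmt.Println(maxTime, -maxTime) // 838h59m59s -838h59m59s
}
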
4 changes: 2 additions & 2 deletions statistics/feedback_test.go
@@ -236,7 +236,7 @@ func (s *testFeedbackSuite) TestFeedbackEncoding(c *C) {
val, err := encodeFeedback(q)
c.Assert(err, IsNil)
rq := &QueryFeedback{}
c.Assert(decodeFeedback(val, rq, nil, false), IsNil)
c.Assert(decodeFeedback(val, rq, nil, hist.tp), IsNil)
for _, fb := range rq.feedback {
fb.lower.SetBytes(codec.EncodeInt(nil, fb.lower.GetInt64()))
fb.upper.SetBytes(codec.EncodeInt(nil, fb.upper.GetInt64()))
@@ -251,7 +251,7 @@ func (s *testFeedbackSuite) TestFeedbackEncoding(c *C) {
c.Assert(err, IsNil)
rq = &QueryFeedback{}
cms := NewCMSketch(4, 4)
c.Assert(decodeFeedback(val, rq, cms, false), IsNil)
c.Assert(decodeFeedback(val, rq, cms, hist.tp), IsNil)
c.Assert(cms.QueryBytes(codec.EncodeInt(nil, 0)), Equals, uint32(1))
q.feedback = q.feedback[:1]
c.Assert(q.Equal(rq), IsTrue)
5 changes: 2 additions & 3 deletions statistics/update.go
@@ -15,7 +15,6 @@ package statistics

import (
"fmt"
"go.uber.org/zap/zapcore"
"math"
"strconv"
"strings"
@@ -25,7 +24,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx/variable"
@@ -34,6 +32,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/net/context"
)

@@ -555,7 +554,7 @@ func (h *Handle) handleSingleHistogramUpdate(is infoschema.InfoSchema, rows []ch
}
q := &QueryFeedback{}
for _, row := range rows {
err1 := decodeFeedback(row.GetBytes(3), q, cms, mysql.HasUnsignedFlag(hist.tp.Flag))
err1 := decodeFeedback(row.GetBytes(3), q, cms, hist.tp)
if err1 != nil {
logutil.Logger(context.Background()).Debug("decode feedback failed", zap.Error(err))
}
73 changes: 65 additions & 8 deletions statistics/update_test.go
@@ -1129,16 +1129,18 @@ func (s *testStatsUpdateSuite) TestIndexQueryFeedback(c *C) {
statistics.FeedbackProbability = 1

testKit.MustExec("use test")
testKit.MustExec("create table t (a bigint(64), b bigint(64), c bigint(64), index idx_ab(a,b), index idx_ac(a,c), index idx_b(b))")
testKit.MustExec("create table t (a bigint(64), b bigint(64), c bigint(64), d float, e double, f decimal(17,2), " +
"g time, h date, index idx_b(b), index idx_ab(a,b), index idx_ac(a,c), index idx_ad(a, d), index idx_ae(a, e), index idx_af(a, f)," +
" index idx_ag(a, g), index idx_ah(a, h))")
for i := 0; i < 20; i++ {
testKit.MustExec(fmt.Sprintf("insert into t values (1, %d, %d)", i, i))
testKit.MustExec(fmt.Sprintf(`insert into t values (1, %d, %d, %d, %d, %d, %d, "%s")`, i, i, i, i, i, i, fmt.Sprintf("1000-01-%02d", i+1)))
}
h := s.do.StatsHandle()
h.HandleDDLEvent(<-h.DDLEventCh())
c.Assert(h.DumpStatsDeltaToKV(statistics.DumpAll), IsNil)
testKit.MustExec("analyze table t with 3 buckets")
for i := 0; i < 20; i++ {
testKit.MustExec(fmt.Sprintf("insert into t values (1, %d, %d)", i, i))
testKit.MustExec(fmt.Sprintf(`insert into t values (1, %d, %d, %d, %d, %d, %d, "%s")`, i, i, i, i, i, i, fmt.Sprintf("1000-01-%02d", i+1)))
}
c.Assert(h.DumpStatsDeltaToKV(statistics.DumpAll), IsNil)
is := s.do.InfoSchema()
@@ -1156,12 +1158,12 @@ func (s *testStatsUpdateSuite) TestIndexQueryFeedback(c *C) {
}{
{
sql: "select * from t use index(idx_ab) where a = 1 and b < 21",
hist: "index:3 ndv:20\n" +
hist: "index:1 ndv:20\n" +
"num: 16 lower_bound: -inf upper_bound: 7 repeats: 0\n" +
"num: 16 lower_bound: 8 upper_bound: 15 repeats: 0\n" +
"num: 8 lower_bound: 16 upper_bound: 21 repeats: 0",
rangeID: tblInfo.Indices[2].ID,
idxID: tblInfo.Indices[0].ID,
rangeID: tblInfo.Indices[0].ID,
idxID: tblInfo.Indices[1].ID,
idxCols: 1,
eqCount: 39,
},
@@ -1172,17 +1174,72 @@ func (s *testStatsUpdateSuite) TestIndexQueryFeedback(c *C) {
"num: 13 lower_bound: 7 upper_bound: 13 repeats: 0\n" +
"num: 12 lower_bound: 14 upper_bound: 21 repeats: 0",
rangeID: tblInfo.Columns[2].ID,
idxID: tblInfo.Indices[1].ID,
idxID: tblInfo.Indices[2].ID,
idxCols: 0,
eqCount: 35,
},
{
sql: "select * from t use index(idx_ad) where a = 1 and d < 21",
hist: "column:4 ndv:20 totColSize:160\n" +
"num: 13 lower_bound: -10000000000000 upper_bound: 6 repeats: 0\n" +
"num: 12 lower_bound: 7 upper_bound: 13 repeats: 0\n" +
"num: 10 lower_bound: 14 upper_bound: 21 repeats: 0",
rangeID: tblInfo.Columns[3].ID,
idxID: tblInfo.Indices[3].ID,
idxCols: 0,
eqCount: 32,
},
{
sql: "select * from t use index(idx_ae) where a = 1 and e < 21",
hist: "column:5 ndv:20 totColSize:160\n" +
"num: 13 lower_bound: -100000000000000000000000 upper_bound: 6 repeats: 0\n" +
"num: 12 lower_bound: 7 upper_bound: 13 repeats: 0\n" +
"num: 10 lower_bound: 14 upper_bound: 21 repeats: 0",
rangeID: tblInfo.Columns[4].ID,
idxID: tblInfo.Indices[4].ID,
idxCols: 0,
eqCount: 32,
},
{
sql: "select * from t use index(idx_af) where a = 1 and f < 21",
hist: "column:6 ndv:20 totColSize:200\n" +
"num: 13 lower_bound: -999999999999999.99 upper_bound: 6.00 repeats: 0\n" +
"num: 12 lower_bound: 7.00 upper_bound: 13.00 repeats: 0\n" +
"num: 10 lower_bound: 14.00 upper_bound: 21.00 repeats: 0",
rangeID: tblInfo.Columns[5].ID,
idxID: tblInfo.Indices[5].ID,
idxCols: 0,
eqCount: 32,
},
{
sql: "select * from t use index(idx_ag) where a = 1 and g < 21",
hist: "column:7 ndv:20 totColSize:98\n" +
"num: 13 lower_bound: -838:59:59 upper_bound: 00:00:06 repeats: 0\n" +
"num: 11 lower_bound: 00:00:07 upper_bound: 00:00:13 repeats: 0\n" +
"num: 10 lower_bound: 00:00:14 upper_bound: 00:00:21 repeats: 0",
rangeID: tblInfo.Columns[6].ID,
idxID: tblInfo.Indices[6].ID,
idxCols: 0,
eqCount: 32,
},
{
sql: `select * from t use index(idx_ah) where a = 1 and h < "1000-01-21"`,
hist: "column:8 ndv:20 totColSize:180\n" +
"num: 13 lower_bound: 1000-01-01 upper_bound: 1000-01-07 repeats: 0\n" +
"num: 11 lower_bound: 1000-01-08 upper_bound: 1000-01-14 repeats: 0\n" +
"num: 10 lower_bound: 1000-01-15 upper_bound: 1000-01-21 repeats: 0",
rangeID: tblInfo.Columns[7].ID,
idxID: tblInfo.Indices[7].ID,
idxCols: 0,
eqCount: 32,
},
}
for i, t := range tests {
testKit.MustQuery(t.sql)
c.Assert(h.DumpStatsDeltaToKV(statistics.DumpAll), IsNil)
c.Assert(h.DumpStatsFeedbackToKV(), IsNil)
c.Assert(h.HandleUpdateStats(s.do.InfoSchema()), IsNil)
h.Update(is)
c.Assert(h.Update(is), IsNil)
tbl := h.GetTableStats(tblInfo)
if t.idxCols == 0 {
c.Assert(tbl.Columns[t.rangeID].ToString(0), Equals, tests[i].hist)
