Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats: refine updating stats using feedback #6796

Merged
merged 6 commits into from
Jul 3, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 103 additions & 73 deletions statistics/feedback.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ type BucketFeedback struct {
feedback []feedback // All the feedback info in the same bucket.
lower *types.Datum // The lower bound of the new bucket.
upper *types.Datum // The upper bound of the new bucket.
scalar scalar // The scalar info for the boundary.
}

// buildBucketFeedback builds the feedback for each bucket from the histogram feedback.
Expand Down Expand Up @@ -300,65 +299,31 @@ func (b *BucketFeedback) getBoundaries(num int) []types.Datum {
return vals[:total]
}

// bucket is an alias of feedback: both carry a [lower, upper] boundary pair plus
// count/repeat info. There are only two types of datum in bucket: one is `Blob`,
// which is for index; the other one is `Int`, which is for primary key.
type bucket = feedback

// Get the fraction of the [lowerVal, upperVal] that intersect with the bucket boundary.
func (b *BucketFeedback) getFraction(lowerVal, upperVal *types.Datum) float64 {
var lower, upper float64
if b.lower.Kind() == types.KindBytes {
value := lowerVal.GetBytes()
lower = convertBytesToScalar(value[b.scalar.commonPfxLen:])
value = upperVal.GetBytes()
upper = convertBytesToScalar(value[b.scalar.commonPfxLen:])
} else {
lower = float64(lowerVal.GetInt64())
upper = float64(upperVal.GetInt64())
}
return calcFraction(b.scalar.lower, b.scalar.upper, upper) - calcFraction(b.scalar.lower, b.scalar.upper, lower)
}

func (b *BucketFeedback) getBucketCount(count float64) int64 {
// Get the scalar info for boundary.
prefixLen := commonPrefixLength(b.lower.GetBytes(), b.upper.GetBytes())
if b.lower.Kind() == types.KindBytes {
b.scalar.commonPfxLen = commonPrefixLength(b.lower.GetBytes(), b.upper.GetBytes())
b.scalar.lower = convertBytesToScalar(b.lower.GetBytes()[prefixLen:])
b.scalar.upper = convertBytesToScalar(b.upper.GetBytes()[prefixLen:])
} else {
b.scalar.lower = float64(b.lower.GetInt64())
b.scalar.upper = float64(b.upper.GetInt64())
}
// Use the feedback that covers most to update this bucket's count. We only consider feedback that covers at
// least minBucketFraction.
maxFraction := minBucketFraction
for _, fb := range b.feedback {
fraction := b.getFraction(fb.lower, fb.upper)
if fraction >= maxFraction {
maxFraction = fraction
count = float64(fb.count) / fraction
}
}
return int64(count)
}

// splitBucket splits the bucket according to feedback.
func (b *BucketFeedback) splitBucket(newBktNum int, totalCount float64, count float64) []bucket {
// do not split if the count is already too small.
if newBktNum <= 1 || count < minBucketFraction*totalCount {
bkt := bucket{lower: b.lower, upper: b.upper, count: int64(count)}
return []bucket{bkt}
}
// splitBucket firstly splits this "BucketFeedback" to "newNumBkts" new buckets,
// calculates the count for each new bucket, merge the new bucket whose count
// is smaller than "minBucketFraction*totalCount" with the next new bucket
// until the last new bucket.
func (b *BucketFeedback) splitBucket(newNumBkts int, totalCount float64, originBucketCount float64) []bucket {
// Split the bucket.
bounds := b.getBoundaries(newBktNum)
bounds := b.getBoundaries(newNumBkts + 1)
bkts := make([]bucket, 0, len(bounds)-1)
for i := 1; i < len(bounds); i++ {
newCount := int64(count * b.getFraction(&bounds[i-1], &bounds[i]))
newBkt := bucket{&bounds[i-1], bounds[i].Copy(), 0, 0}
// get bucket count
_, ratio := getOverlapFraction(feedback{b.lower, b.upper, int64(originBucketCount), 0}, newBkt)
countInNewBkt := originBucketCount * ratio
countInNewBkt = b.refineBucketCount(newBkt, countInNewBkt)
// do not split if the count of result bucket is too small.
if float64(newCount) < minBucketFraction*totalCount {
if countInNewBkt < minBucketFraction*totalCount {
bounds[i] = bounds[i-1]
continue
}
bkts = append(bkts, bucket{lower: &bounds[i-1], upper: bounds[i].Copy(), count: newCount, repeat: 0})
newBkt.count = int64(countInNewBkt)
bkts = append(bkts, newBkt)
// To guarantee that each bucket's range will not overlap.
if bounds[i].Kind() == types.KindBytes {
bounds[i].SetBytes(kv.Key(bounds[i].GetBytes()).PrefixNext())
Expand All @@ -371,11 +336,82 @@ func (b *BucketFeedback) splitBucket(newBktNum int, totalCount float64, count fl
return bkts
}

// Get the split count for the histogram.
func getSplitCount(count, remainBuckets int) int {
remainBuckets = mathutil.Max(remainBuckets, 10)
// getFraction4PK returns the relative positions (fractions) of lower and upper
// within the value range [minValue, maxValue] for an integer primary key.
// Both signed and unsigned integer datums are handled.
func getFraction4PK(minValue, maxValue, lower, upper *types.Datum) (float64, float64) {
	if minValue.Kind() != types.KindInt64 {
		// Unsigned primary key.
		left, right := float64(minValue.GetUint64()), float64(maxValue.GetUint64())
		return calcFraction(left, right, float64(lower.GetUint64())), calcFraction(left, right, float64(upper.GetUint64()))
	}
	// Signed primary key.
	left, right := float64(minValue.GetInt64()), float64(maxValue.GetInt64())
	return calcFraction(left, right, float64(lower.GetInt64())), calcFraction(left, right, float64(upper.GetInt64()))
}

// getFraction4Index returns the relative positions (fractions) of lower and upper
// within [minValue, maxValue] for index (bytes) datums. The common prefix of
// length prefixLen is stripped before converting the remaining bytes to scalars.
func getFraction4Index(minValue, maxValue, lower, upper *types.Datum, prefixLen int) (float64, float64) {
	left := convertBytesToScalar(minValue.GetBytes()[prefixLen:])
	right := convertBytesToScalar(maxValue.GetBytes()[prefixLen:])
	lowerFrac := calcFraction(left, right, convertBytesToScalar(lower.GetBytes()[prefixLen:]))
	upperFrac := calcFraction(left, right, convertBytesToScalar(upper.GetBytes()[prefixLen:]))
	return lowerFrac, upperFrac
}

// getOverlapFraction gets the overlap fraction of feedback and bucket range. In order to get the bucket count, it also
// returns the ratio between bucket fraction and feedback fraction.
func getOverlapFraction(fb feedback, bkt bucket) (float64, float64) {
datums := make([]types.Datum, 0, 4)
datums = append(datums, *fb.lower, *fb.upper)
datums = append(datums, *bkt.lower, *bkt.upper)
err := types.SortDatums(nil, datums)
if err != nil {
return 0, 0
}
var fbLower, fbUpper, bktLower, bktUpper float64
minValue, maxValue := &datums[0], &datums[3]
if datums[0].Kind() == types.KindBytes {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about add comment at the declaration of bucket to tell that the datum in bucket can only be Bytes or Int, and Bytes is for index and Int is for int pk.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

prefixLen := commonPrefixLength(minValue.GetBytes(), maxValue.GetBytes())
fbLower, fbUpper = getFraction4Index(minValue, maxValue, fb.lower, fb.upper, prefixLen)
bktLower, bktUpper = getFraction4Index(minValue, maxValue, bkt.lower, bkt.upper, prefixLen)
} else {
fbLower, fbUpper = getFraction4PK(minValue, maxValue, fb.lower, fb.upper)
bktLower, bktUpper = getFraction4PK(minValue, maxValue, bkt.lower, bkt.upper)
}
ratio := (bktUpper - bktLower) / (fbUpper - fbLower)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ratio seems not consistent with the comment, should be (fbUpper - fbLower) /(bktUpper - bktLower)?

And later we calculate the new bucket count with count * ratio.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the comment is wrong. The count is the feedback count, so this ratio is right.

// full overlap
if fbLower <= bktLower && bktUpper <= fbUpper {
return bktUpper - bktLower, ratio
}
if bktLower <= fbLower && fbUpper <= bktUpper {
return fbUpper - fbLower, ratio
}
// partial overlap
overlap := math.Min(bktUpper-fbLower, fbUpper-bktLower)
return overlap, ratio
}

// refineBucketCount refines the newly split bucket count. Among all feedback
// items, the one whose range overlaps the bucket the most (and by at least
// minBucketFraction) determines the count; if none qualifies, defaultCount is kept.
func (b *BucketFeedback) refineBucketCount(bkt bucket, defaultCount float64) float64 {
	refined := defaultCount
	largest := minBucketFraction
	for _, fb := range b.feedback {
		overlap, ratio := getOverlapFraction(fb, bkt)
		// Keep the feedback with the maximum overlap fraction.
		if overlap > largest {
			largest = overlap
			refined = float64(fb.count) * ratio
		}
	}
	return refined
}

const (
	// defaultSplitCount is the lower bound on available buckets used when
	// computing the split count (see getSplitCount).
	defaultSplitCount = 10
	// splitPerFeedback caps how aggressively we split: one split is granted
	// per this many feedback items.
	splitPerFeedback = 10
)

// getSplitCount gets the split count for the histogram. It is based on the intuition that:
// 1: If we have more remaining unused buckets, we can split more.
// 2: We cannot split too aggressively, thus we make it split every `splitPerFeedback`.
func getSplitCount(numFeedbacks, remainBuckets int) int {
	// Split more if we have more buckets available, but never fewer than
	// defaultSplitCount; the feedback volume then caps the final count.
	splitCount := mathutil.Max(remainBuckets, defaultSplitCount)
	return mathutil.Min(splitCount, numFeedbacks/splitPerFeedback)
}

type bucketScore struct {
Expand Down Expand Up @@ -461,33 +497,23 @@ func mergeBuckets(bkts []bucket, isNewBuckets []bool, totalCount float64) []buck
return bkts
}

// splitBuckets split the histogram buckets according to the feedback.
func splitBuckets(h *Histogram, feedback *QueryFeedback) ([]bucket, []bool, int64) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comments about this function.

bktID2FB, fbNum := buildBucketFeedback(h, feedback)
counts := make([]int64, 0, h.Len())
for i := 0; i < h.Len(); i++ {
bkt, ok := bktID2FB[i]
if !ok {
counts = append(counts, h.bucketCount(i))
} else {
counts = append(counts, bkt.getBucketCount(float64(h.bucketCount(i))))
}
}
totCount := int64(0)
for _, count := range counts {
totCount += count
}
bktID2FB, numTotalFBs := buildBucketFeedback(h, feedback)
buckets := make([]bucket, 0, h.Len())
isNewBuckets := make([]bool, 0, h.Len())
splitCount := getSplitCount(fbNum, defaultBucketCount-h.Len())
splitCount := getSplitCount(numTotalFBs, defaultBucketCount-h.Len())
for i := 0; i < h.Len(); i++ {
bkt, ok := bktID2FB[i]
bktFB, ok := bktID2FB[i]
// No feedback, just use the original one.
if !ok {
buckets = append(buckets, bucket{h.GetLower(i), h.GetUpper(i), counts[i], h.Buckets[i].Repeat})
buckets = append(buckets, bucket{h.GetLower(i), h.GetUpper(i), h.bucketCount(i), h.Buckets[i].Repeat})
isNewBuckets = append(isNewBuckets, false)
continue
}
bkts := bkt.splitBucket(splitCount*len(bkt.feedback)/fbNum, float64(totCount), float64(counts[i]))
// Distribute the total split count to bucket based on number of bucket feedback.
newBktNums := splitCount * len(bktFB.feedback) / numTotalFBs
bkts := bktFB.splitBucket(newBktNums, h.totalRowCount(), float64(h.bucketCount(i)))
buckets = append(buckets, bkts...)
if len(bkts) == 1 {
isNewBuckets = append(isNewBuckets, false)
Expand All @@ -497,6 +523,10 @@ func splitBuckets(h *Histogram, feedback *QueryFeedback) ([]bucket, []bool, int6
}
}
}
totCount := int64(0)
for _, bkt := range buckets {
totCount += bkt.count
}
return buckets, isNewBuckets, totCount
}

Expand Down
51 changes: 36 additions & 15 deletions statistics/feedback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,17 @@ func (s *testFeedbackSuite) TestUpdateHistogram(c *C) {
q := NewQueryFeedback(0, genHistogram(), 0, false)
q.feedback = feedbacks
originBucketCount := defaultBucketCount
defaultBucketCount = 5
defaultBucketCount = 7
defer func() { defaultBucketCount = originBucketCount }()
c.Assert(UpdateHistogram(q.Hist(), q).ToString(0), Equals,
"column:0 ndv:0 totColSize:0\n"+
"num: 10000\tlower_bound: 0\tupper_bound: 1\trepeats: 0\n"+
"num: 10003\tlower_bound: 2\tupper_bound: 3\trepeats: 0\n"+
"num: 10021\tlower_bound: 4\tupper_bound: 20\trepeats: 0\n"+
"num: 10046\tlower_bound: 21\tupper_bound: 46\trepeats: 0\n"+
"num: 10059\tlower_bound: 47\tupper_bound: 60\trepeats: 0")
"num: 10008\tlower_bound: 2\tupper_bound: 7\trepeats: 0\n"+
"num: 10019\tlower_bound: 8\tupper_bound: 19\trepeats: 0\n"+
"num: 10019\tlower_bound: 20\tupper_bound: 20\trepeats: 0\n"+
"num: 10037\tlower_bound: 21\tupper_bound: 39\trepeats: 0\n"+
"num: 10055\tlower_bound: 40\tupper_bound: 58\trepeats: 0\n"+
"num: 10057\tlower_bound: 59\tupper_bound: 60\trepeats: 0")
}

func (s *testFeedbackSuite) TestSplitBuckets(c *C) {
Expand All @@ -93,10 +95,10 @@ func (s *testFeedbackSuite) TestSplitBuckets(c *C) {
"num: 1\tlower_bound: 2\tupper_bound: 3\trepeats: 0\n"+
"num: 1\tlower_bound: 5\tupper_bound: 7\trepeats: 0\n"+
"num: 6\tlower_bound: 10\tupper_bound: 15\trepeats: 0\n"+
"num: 10\tlower_bound: 16\tupper_bound: 20\trepeats: 0\n"+
"num: 10\tlower_bound: 30\tupper_bound: 50\trepeats: 0")
"num: 6\tlower_bound: 16\tupper_bound: 20\trepeats: 0\n"+
"num: 6\tlower_bound: 30\tupper_bound: 50\trepeats: 0")
c.Assert(isNewBuckets, DeepEquals, []bool{false, false, false, true, true, false})
c.Assert(totalCount, Equals, int64(11))
c.Assert(totalCount, Equals, int64(6))

// test do not split if the bucket count is too small
feedbacks = []feedback{newFeedback(0, 1, 100000)}
Expand All @@ -111,26 +113,45 @@ func (s *testFeedbackSuite) TestSplitBuckets(c *C) {
"num: 100000\tlower_bound: 0\tupper_bound: 1\trepeats: 0\n"+
"num: 100000\tlower_bound: 2\tupper_bound: 3\trepeats: 0\n"+
"num: 100000\tlower_bound: 5\tupper_bound: 7\trepeats: 0\n"+
"num: 100002\tlower_bound: 10\tupper_bound: 20\trepeats: 0\n"+
"num: 100002\tlower_bound: 30\tupper_bound: 50\trepeats: 0")
c.Assert(isNewBuckets, DeepEquals, []bool{false, false, false, false, false})
c.Assert(totalCount, Equals, int64(100002))
"num: 100001\tlower_bound: 10\tupper_bound: 15\trepeats: 0\n"+
"num: 100001\tlower_bound: 16\tupper_bound: 20\trepeats: 0\n"+
"num: 100001\tlower_bound: 30\tupper_bound: 50\trepeats: 0")
c.Assert(isNewBuckets, DeepEquals, []bool{false, false, false, true, true, false})
c.Assert(totalCount, Equals, int64(100001))

// test do not split if the result bucket count is too small
h := NewHistogram(0, 0, 0, 0, types.NewFieldType(mysql.TypeLong), 5, 0)
appendBucket(h, 0, 1000000)
h.Buckets[0].Count = 1000000
feedbacks = feedbacks[:0]
for i := 0; i < 100; i++ {
feedbacks = append(feedbacks, newFeedback(0, 101, 1))
feedbacks = append(feedbacks, newFeedback(0, 10, 1))
}
q = NewQueryFeedback(0, h, 0, false)
q.feedback = feedbacks
buckets, isNewBuckets, totalCount = splitBuckets(q.Hist(), q)
c.Assert(buildNewHistogram(q.Hist(), buckets).ToString(0), Equals,
"column:0 ndv:0 totColSize:0\n"+
"num: 9900\tlower_bound: 0\tupper_bound: 1000000\trepeats: 0")
"num: 1000000\tlower_bound: 0\tupper_bound: 1000000\trepeats: 0")
c.Assert(isNewBuckets, DeepEquals, []bool{false})
c.Assert(totalCount, Equals, int64(9900))
c.Assert(totalCount, Equals, int64(1000000))

// test split even if the feedback range is too small
h = NewHistogram(0, 0, 0, 0, types.NewFieldType(mysql.TypeLong), 5, 0)
appendBucket(h, 0, 1000000)
feedbacks = feedbacks[:0]
for i := 0; i < 100; i++ {
feedbacks = append(feedbacks, newFeedback(0, 10, 1))
}
q = NewQueryFeedback(0, h, 0, false)
q.feedback = feedbacks
buckets, isNewBuckets, totalCount = splitBuckets(q.Hist(), q)
c.Assert(buildNewHistogram(q.Hist(), buckets).ToString(0), Equals,
"column:0 ndv:0 totColSize:0\n"+
"num: 1\tlower_bound: 0\tupper_bound: 10\trepeats: 0\n"+
"num: 1\tlower_bound: 11\tupper_bound: 1000000\trepeats: 0")
c.Assert(isNewBuckets, DeepEquals, []bool{true, true})
c.Assert(totalCount, Equals, int64(1))
}

func (s *testFeedbackSuite) TestMergeBuckets(c *C) {
Expand Down