Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats: refine updating stats using feedback #6796

Merged
merged 6 commits into from
Jul 3, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 77 additions & 64 deletions statistics/feedback.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ type BucketFeedback struct {
feedback []feedback // All the feedback info in the same bucket.
lower *types.Datum // The lower bound of the new bucket.
upper *types.Datum // The upper bound of the new bucket.
scalar scalar // The scalar info for the boundary.
}

// buildBucketFeedback build the feedback for each bucket from the histogram feedback.
Expand Down Expand Up @@ -264,63 +263,24 @@ func (b *BucketFeedback) getBoundaries(num int) []types.Datum {

type bucket = feedback

// Get the fraction of the [lowerVal, upperVal] that intersect with the bucket boundary.
func (b *BucketFeedback) getFraction(lowerVal, upperVal *types.Datum) float64 {
var lower, upper float64
if b.lower.Kind() == types.KindBytes {
value := lowerVal.GetBytes()
lower = convertBytesToScalar(value[b.scalar.commonPfxLen:])
value = upperVal.GetBytes()
upper = convertBytesToScalar(value[b.scalar.commonPfxLen:])
} else {
lower = float64(lowerVal.GetInt64())
upper = float64(upperVal.GetInt64())
}
return calcFraction(b.scalar.lower, b.scalar.upper, upper) - calcFraction(b.scalar.lower, b.scalar.upper, lower)
}

func (b *BucketFeedback) getBucketCount(count float64) int64 {
// Get the scalar info for boundary.
prefixLen := commonPrefixLength(b.lower.GetBytes(), b.upper.GetBytes())
if b.lower.Kind() == types.KindBytes {
b.scalar.commonPfxLen = commonPrefixLength(b.lower.GetBytes(), b.upper.GetBytes())
b.scalar.lower = convertBytesToScalar(b.lower.GetBytes()[prefixLen:])
b.scalar.upper = convertBytesToScalar(b.upper.GetBytes()[prefixLen:])
} else {
b.scalar.lower = float64(b.lower.GetInt64())
b.scalar.upper = float64(b.upper.GetInt64())
}
// Use the feedback that covers most to update this bucket's count. We only consider feedback that covers at
// least minBucketFraction.
maxFraction := minBucketFraction
for _, fb := range b.feedback {
fraction := b.getFraction(fb.lower, fb.upper)
if fraction >= maxFraction {
maxFraction = fraction
count = float64(fb.count) / fraction
}
}
return int64(count)
}

// updateBucket split the bucket according to feedback.
func (b *BucketFeedback) splitBucket(newBktNum int, totalCount float64, count float64) []bucket {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment and method name not match.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The count should be renamed to originBucketCount?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/newBktNum/newNumBkts

// do not split if the count is already too small.
if newBktNum <= 1 || count < minBucketFraction*totalCount {
bkt := bucket{lower: b.lower, upper: b.upper, count: int64(count)}
return []bucket{bkt}
}
// Split the bucket.
bounds := b.getBoundaries(newBktNum)
bounds := b.getBoundaries(newBktNum + 1)
bkts := make([]bucket, 0, len(bounds)-1)
for i := 1; i < len(bounds); i++ {
newCount := int64(count * b.getFraction(&bounds[i-1], &bounds[i]))
bkt := bucket{&bounds[i-1], bounds[i].Copy(), 0, 0}
// get bucket count
_, ratio := getOverlapFraction(feedback{b.lower, b.upper, int64(count), 0}, bkt)
bucketCount := count * ratio
bucketCount = b.bucketCount(bkt, bucketCount)
// do not split if the count of result bucket is too small.
if float64(newCount) < minBucketFraction*totalCount {
if bucketCount < minBucketFraction*totalCount {
bounds[i] = bounds[i-1]
continue
}
bkts = append(bkts, bucket{lower: &bounds[i-1], upper: bounds[i].Copy(), count: newCount, repeat: 0})
bkt.count = int64(bucketCount)
bkts = append(bkts, bkt)
// To guarantee that each bucket's range will not overlap.
if bounds[i].Kind() == types.KindBytes {
bounds[i].SetBytes(kv.Key(bounds[i].GetBytes()).PrefixNext())
Expand All @@ -333,6 +293,68 @@ func (b *BucketFeedback) splitBucket(newBktNum int, totalCount float64, count fl
return bkts
}

func getFraction4PK(minValue, maxValue, lower, upper *types.Datum) (float64, float64) {
if minValue.Kind() == types.KindInt64 {
l, r := float64(minValue.GetInt64()), float64(maxValue.GetInt64())
return calcFraction(l, r, float64(lower.GetInt64())), calcFraction(l, r, float64(upper.GetInt64()))
}
l, r := float64(minValue.GetUint64()), float64(maxValue.GetUint64())
return calcFraction(l, r, float64(lower.GetUint64())), calcFraction(l, r, float64(upper.GetUint64()))
}

func getFraction4Index(minValue, maxValue, lower, upper *types.Datum, prefixLen int) (float64, float64) {
l, r := convertBytesToScalar(minValue.GetBytes()[prefixLen:]), convertBytesToScalar(maxValue.GetBytes()[prefixLen:])
return calcFraction(l, r, convertBytesToScalar(lower.GetBytes()[prefixLen:])),
calcFraction(l, r, convertBytesToScalar(upper.GetBytes()[prefixLen:]))
}

// getOverlapFraction gets the overlap fraction of feedback and bucket range. In order to get the bucket count, it also
// returns the ratio between feedback fraction and bucket fraction.
func getOverlapFraction(fb feedback, bkt bucket) (float64, float64) {
datums := make([]types.Datum, 0, 4)
datums = append(datums, *fb.lower, *fb.upper)
datums = append(datums, *bkt.lower, *bkt.upper)
err := types.SortDatums(nil, datums)
if err != nil {
return 0, 0
}
var fbLower, fbUpper, bktLower, bktUpper float64
minValue, maxValue := &datums[0], &datums[3]
if datums[0].Kind() == types.KindBytes {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about add comment at the declaration of bucket to tell that the datum in bucket can only be Bytes or Int, and Bytes is for index and Int is for int pk.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

prefixLen := commonPrefixLength(minValue.GetBytes(), maxValue.GetBytes())
fbLower, fbUpper = getFraction4Index(minValue, maxValue, fb.lower, fb.upper, prefixLen)
bktLower, bktUpper = getFraction4Index(minValue, maxValue, bkt.lower, bkt.upper, prefixLen)
} else {
fbLower, fbUpper = getFraction4PK(minValue, maxValue, fb.lower, fb.upper)
bktLower, bktUpper = getFraction4PK(minValue, maxValue, bkt.lower, bkt.upper)
}
ratio := (bktUpper - bktLower) / (fbUpper - fbLower)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ratio seems not consistent with the comment, should be (fbUpper - fbLower) /(bktUpper - bktLower)?

And later we calculate the new bucket count with count * ratio.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the comment is wrong. The count is the feedback count, so this ratio is right.

// full overlap
if fbLower <= bktLower && bktUpper <= fbUpper {
return bktUpper - bktLower, ratio
}
if bktLower <= fbLower && fbUpper <= bktUpper {
return fbUpper - fbLower, ratio
}
// partial overlap
overlap := math.Min(bktUpper-fbLower, fbUpper-bktLower)
return overlap, ratio
}

func (b *BucketFeedback) bucketCount(bkt bucket, defaultCount float64) float64 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about name it refineBucketCount?
And add some comment about this method.

bestFraction := minBucketFraction
count := defaultCount
for _, fb := range b.feedback {
fraction, ratio := getOverlapFraction(fb, bkt)
// choose the max overlap fraction
if fraction > bestFraction {
bestFraction = fraction
count = float64(fb.count) * ratio
}
}
return count
}

// Get the split count for the histogram.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to explain why choose this algorithm.

func getSplitCount(count, remainBuckets int) int {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about numFeedbacks instead of count?

remainBuckets = mathutil.Max(remainBuckets, 10)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better use another variable like splitCount rather than modify the input argument.

Expand Down Expand Up @@ -425,31 +447,18 @@ func mergeBuckets(bkts []bucket, isNewBuckets []bool, totalCount float64) []buck

func splitBuckets(h *Histogram, feedback *QueryFeedback) ([]bucket, []bool, int64) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comments about this function.

bktID2FB, fbNum := buildBucketFeedback(h, feedback)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/fbNum/totalNumFBs

counts := make([]int64, 0, h.Len())
for i := 0; i < h.Len(); i++ {
bkt, ok := bktID2FB[i]
if !ok {
counts = append(counts, h.bucketCount(i))
} else {
counts = append(counts, bkt.getBucketCount(float64(h.bucketCount(i))))
}
}
totCount := int64(0)
for _, count := range counts {
totCount += count
}
buckets := make([]bucket, 0, h.Len())
isNewBuckets := make([]bool, 0, h.Len())
splitCount := getSplitCount(fbNum, defaultBucketCount-h.Len())
for i := 0; i < h.Len(); i++ {
bkt, ok := bktID2FB[i]
// No feedback, just use the original one.
if !ok {
buckets = append(buckets, bucket{h.GetLower(i), h.GetUpper(i), counts[i], h.Buckets[i].Repeat})
buckets = append(buckets, bucket{h.GetLower(i), h.GetUpper(i), h.bucketCount(i), h.Buckets[i].Repeat})
isNewBuckets = append(isNewBuckets, false)
continue
}
bkts := bkt.splitBucket(splitCount*len(bkt.feedback)/fbNum, float64(totCount), float64(counts[i]))
bkts := bkt.splitBucket(splitCount*len(bkt.feedback)/fbNum, h.totalRowCount(), float64(h.bucketCount(i)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need a dedicated variable for splitCount*len(bkt.feedback)/fbNum and add a comment to explain why.

buckets = append(buckets, bkts...)
if len(bkts) == 1 {
isNewBuckets = append(isNewBuckets, false)
Expand All @@ -459,6 +468,10 @@ func splitBuckets(h *Histogram, feedback *QueryFeedback) ([]bucket, []bool, int6
}
}
}
totCount := int64(0)
for _, bkt := range buckets {
totCount += bkt.count
}
return buckets, isNewBuckets, totCount
}

Expand Down
51 changes: 36 additions & 15 deletions statistics/feedback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,17 @@ func (s *testFeedbackSuite) TestUpdateHistogram(c *C) {
q := NewQueryFeedback(0, genHistogram(), 0, false)
q.feedback = feedbacks
originBucketCount := defaultBucketCount
defaultBucketCount = 5
defaultBucketCount = 7
defer func() { defaultBucketCount = originBucketCount }()
c.Assert(UpdateHistogram(q.Hist(), q).ToString(0), Equals,
"column:0 ndv:0 totColSize:0\n"+
"num: 10000\tlower_bound: 0\tupper_bound: 1\trepeats: 0\n"+
"num: 10003\tlower_bound: 2\tupper_bound: 3\trepeats: 0\n"+
"num: 10021\tlower_bound: 4\tupper_bound: 20\trepeats: 0\n"+
"num: 10046\tlower_bound: 21\tupper_bound: 46\trepeats: 0\n"+
"num: 10059\tlower_bound: 47\tupper_bound: 60\trepeats: 0")
"num: 10008\tlower_bound: 2\tupper_bound: 7\trepeats: 0\n"+
"num: 10019\tlower_bound: 8\tupper_bound: 19\trepeats: 0\n"+
"num: 10019\tlower_bound: 20\tupper_bound: 20\trepeats: 0\n"+
"num: 10037\tlower_bound: 21\tupper_bound: 39\trepeats: 0\n"+
"num: 10055\tlower_bound: 40\tupper_bound: 58\trepeats: 0\n"+
"num: 10057\tlower_bound: 59\tupper_bound: 60\trepeats: 0")
}

func (s *testFeedbackSuite) TestSplitBuckets(c *C) {
Expand All @@ -93,10 +95,10 @@ func (s *testFeedbackSuite) TestSplitBuckets(c *C) {
"num: 1\tlower_bound: 2\tupper_bound: 3\trepeats: 0\n"+
"num: 1\tlower_bound: 5\tupper_bound: 7\trepeats: 0\n"+
"num: 6\tlower_bound: 10\tupper_bound: 15\trepeats: 0\n"+
"num: 10\tlower_bound: 16\tupper_bound: 20\trepeats: 0\n"+
"num: 10\tlower_bound: 30\tupper_bound: 50\trepeats: 0")
"num: 6\tlower_bound: 16\tupper_bound: 20\trepeats: 0\n"+
"num: 6\tlower_bound: 30\tupper_bound: 50\trepeats: 0")
c.Assert(isNewBuckets, DeepEquals, []bool{false, false, false, true, true, false})
c.Assert(totalCount, Equals, int64(11))
c.Assert(totalCount, Equals, int64(6))

// test do not split if the bucket count is too small
feedbacks = []feedback{newFeedback(0, 1, 100000)}
Expand All @@ -111,26 +113,45 @@ func (s *testFeedbackSuite) TestSplitBuckets(c *C) {
"num: 100000\tlower_bound: 0\tupper_bound: 1\trepeats: 0\n"+
"num: 100000\tlower_bound: 2\tupper_bound: 3\trepeats: 0\n"+
"num: 100000\tlower_bound: 5\tupper_bound: 7\trepeats: 0\n"+
"num: 100002\tlower_bound: 10\tupper_bound: 20\trepeats: 0\n"+
"num: 100002\tlower_bound: 30\tupper_bound: 50\trepeats: 0")
c.Assert(isNewBuckets, DeepEquals, []bool{false, false, false, false, false})
c.Assert(totalCount, Equals, int64(100002))
"num: 100001\tlower_bound: 10\tupper_bound: 15\trepeats: 0\n"+
"num: 100001\tlower_bound: 16\tupper_bound: 20\trepeats: 0\n"+
"num: 100001\tlower_bound: 30\tupper_bound: 50\trepeats: 0")
c.Assert(isNewBuckets, DeepEquals, []bool{false, false, false, true, true, false})
c.Assert(totalCount, Equals, int64(100001))

// test do not split if the result bucket count is too small
h := NewHistogram(0, 0, 0, 0, types.NewFieldType(mysql.TypeLong), 5, 0)
appendBucket(h, 0, 1000000)
h.Buckets[0].Count = 1000000
feedbacks = feedbacks[:0]
for i := 0; i < 100; i++ {
feedbacks = append(feedbacks, newFeedback(0, 101, 1))
feedbacks = append(feedbacks, newFeedback(0, 10, 1))
}
q = NewQueryFeedback(0, h, 0, false)
q.feedback = feedbacks
buckets, isNewBuckets, totalCount = splitBuckets(q.Hist(), q)
c.Assert(buildNewHistogram(q.Hist(), buckets).ToString(0), Equals,
"column:0 ndv:0 totColSize:0\n"+
"num: 9900\tlower_bound: 0\tupper_bound: 1000000\trepeats: 0")
"num: 1000000\tlower_bound: 0\tupper_bound: 1000000\trepeats: 0")
c.Assert(isNewBuckets, DeepEquals, []bool{false})
c.Assert(totalCount, Equals, int64(9900))
c.Assert(totalCount, Equals, int64(1000000))

// test split even if the feedback range is too small
h = NewHistogram(0, 0, 0, 0, types.NewFieldType(mysql.TypeLong), 5, 0)
appendBucket(h, 0, 1000000)
feedbacks = feedbacks[:0]
for i := 0; i < 100; i++ {
feedbacks = append(feedbacks, newFeedback(0, 10, 1))
}
q = NewQueryFeedback(0, h, 0, false)
q.feedback = feedbacks
buckets, isNewBuckets, totalCount = splitBuckets(q.Hist(), q)
c.Assert(buildNewHistogram(q.Hist(), buckets).ToString(0), Equals,
"column:0 ndv:0 totColSize:0\n"+
"num: 1\tlower_bound: 0\tupper_bound: 10\trepeats: 0\n"+
"num: 1\tlower_bound: 11\tupper_bound: 1000000\trepeats: 0")
c.Assert(isNewBuckets, DeepEquals, []bool{true, true})
c.Assert(totalCount, Equals, int64(1))
}

func (s *testFeedbackSuite) TestMergeBuckets(c *C) {
Expand Down