Skip to content

Commit

Permalink
kv-client(cdc): log slowest regions and region holes (#9933)
Browse files Browse the repository at this point in the history
ref #9222
  • Loading branch information
hicqu authored Oct 20, 2023
1 parent 747965d commit a71208a
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 4 deletions.
11 changes: 7 additions & 4 deletions cdc/kv/regionlock/region_range_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,9 @@ func (l *RegionRangeLock) CollectLockedRangeAttrs(
if action != nil {
action(item.regionID, &item.state)
}

r.HoleExists = r.HoleExists || spanz.EndCompare(lastEnd, item.startKey) < 0
if spanz.EndCompare(lastEnd, item.startKey) < 0 {
r.Holes = append(r.Holes, tablepb.Span{StartKey: lastEnd, EndKey: item.startKey})
}
ckpt := item.state.CheckpointTs.Load()
if ckpt > r.FastestRegion.CheckpointTs {
r.FastestRegion.RegionID = item.regionID
Expand All @@ -505,13 +506,15 @@ func (l *RegionRangeLock) CollectLockedRangeAttrs(
lastEnd = item.endKey
return true
})
r.HoleExists = r.HoleExists || spanz.EndCompare(lastEnd, l.totalSpan.EndKey) < 0
if spanz.EndCompare(lastEnd, l.totalSpan.EndKey) < 0 {
r.Holes = append(r.Holes, tablepb.Span{StartKey: lastEnd, EndKey: l.totalSpan.EndKey})
}
return
}

// CollectedLockedRangeAttrs is the result returned by
// `RegionRangeLock.CollectLockedRangeAttrs`. It summarizes the locked ranges
// visited in one traversal of a RegionRangeLock.
type CollectedLockedRangeAttrs struct {
// HoleExists is set when a gap is found between the end of one locked range
// and the start of the next, or between the last locked range and the end of
// the lock's total span.
// NOTE(review): this field appears next to Holes only because the text is a
// rendered diff; it looks superseded by Holes — confirm against the real file.
HoleExists bool
// Holes lists every such gap as a span running from the end key of the
// previous locked range to the start key of the next one (or to the end key
// of the total span for a trailing gap).
Holes []tablepb.Span
// FastestRegion describes the locked range with the largest checkpoint ts
// seen during the traversal.
FastestRegion LockedRangeAttrs
// SlowestRegion presumably describes the locked range with the smallest
// checkpoint ts; the code that fills it is not visible here — TODO confirm.
SlowestRegion LockedRangeAttrs
}
Expand Down
50 changes: 50 additions & 0 deletions cdc/kv/shared_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package kv
import (
"context"
"encoding/binary"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -40,6 +42,7 @@ import (
"github.com/pingcap/tiflow/pkg/version"
"github.com/prometheus/client_golang/prometheus"
kvclientv2 "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
Expand Down Expand Up @@ -246,6 +249,7 @@ func (s *SharedClient) Run(ctx context.Context) error {
g.Go(func() error { return s.requestRegionToStore(ctx, g) })
g.Go(func() error { return s.handleErrors(ctx) })
g.Go(func() error { return s.resolveLock(ctx) })
g.Go(func() error { return s.logSlowRegions(ctx) })

log.Info("event feed started",
zap.String("namespace", s.changefeed.Namespace),
Expand Down Expand Up @@ -689,6 +693,52 @@ func (s *SharedClient) resolveLock(ctx context.Context) error {
}
}

// logSlowRegions periodically inspects every subscribed table and logs
// diagnostics about lagging regions and uncovered key ranges.
//
// On each tick it collects the locked-range attributes of every entry in
// s.totalSpans and reports:
//   - the slowest region, if it is initialized and its checkpoint lags the
//     current PD time by more than 2*resolveLockMinInterval;
//   - the slowest region, if it is not initialized and was created more than
//     ten minutes before the current PD time;
//   - every hole (key range not covered by a locked region), if any exist.
//
// It runs until ctx is cancelled and then returns ctx.Err().
func (s *SharedClient) logSlowRegions(ctx context.Context) error {
	const (
		checkInterval     = 10 * time.Second
		slowInitThreshold = 10 * time.Minute
	)

	ticker := time.NewTicker(checkInterval)
	defer ticker.Stop()

	for {
		// Block until the next tick or until the context is cancelled.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}

		now := s.pdClock.CurrentTime()

		// The read lock is held for the whole pass over the subscriptions.
		s.totalSpans.RLock()
		for subID, table := range s.totalSpans.v {
			attrs := table.rangeLock.CollectLockedRangeAttrs(nil)

			switch {
			case attrs.SlowestRegion.Initialized:
				// Compare the checkpoint's physical time against PD time.
				checkpointAt := oracle.GetTimeFromTS(attrs.SlowestRegion.CheckpointTs)
				if now.Sub(checkpointAt) > 2*resolveLockMinInterval {
					log.Info("event feed finds a slow region",
						zap.String("namespace", s.changefeed.Namespace),
						zap.String("changefeed", s.changefeed.ID),
						zap.Any("subscriptionID", subID),
						zap.Any("slowRegion", attrs.SlowestRegion))
				}
			case now.Sub(attrs.SlowestRegion.Created) > slowInitThreshold:
				log.Info("event feed initializes a region too slow",
					zap.String("namespace", s.changefeed.Namespace),
					zap.String("changefeed", s.changefeed.ID),
					zap.Any("subscriptionID", subID),
					zap.Any("slowRegion", attrs.SlowestRegion))
			}

			if len(attrs.Holes) == 0 {
				continue
			}

			// Render holes as "[start,end), [start,end), ...".
			var rendered strings.Builder
			for i, hole := range attrs.Holes {
				if i > 0 {
					rendered.WriteString(", ")
				}
				fmt.Fprintf(&rendered, "[%s,%s)", hole.StartKey, hole.EndKey)
			}
			log.Info("event feed holes exist",
				zap.String("namespace", s.changefeed.Namespace),
				zap.String("changefeed", s.changefeed.ID),
				zap.Any("subscriptionID", subID),
				zap.String("holes", rendered.String()))
		}
		s.totalSpans.RUnlock()
	}
}

func (s *SharedClient) newRequestedTable(
subID SubscriptionID, span tablepb.Span, startTs uint64,
eventCh chan<- MultiplexingEvent,
Expand Down

0 comments on commit a71208a

Please sign in to comment.