Skip to content

Commit

Permalink
coordinator(ticdc): Fix Puller Resolved TS Lag Calculation and Depreca…
Browse files Browse the repository at this point in the history
…te current_ts Field in Stats (#11624)

close #11561
  • Loading branch information
wlwilliamx authored and ti-chi-bot committed Oct 10, 2024
1 parent e2b6bb1 commit 3f07a1d
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 11 deletions.
2 changes: 0 additions & 2 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,11 +378,9 @@ func (p *processor) getStatsFromSourceManagerAndSinkManager(
span tablepb.Span, sinkStats sinkmanager.TableStats,
) tablepb.Stats {
pullerStats := p.sourceManager.r.GetTablePullerStats(span)
now := p.upstream.PDClock.CurrentTime()

stats := tablepb.Stats{
RegionCount: pullerStats.RegionCount,
CurrentTs: oracle.ComposeTS(oracle.GetPhysical(now), 0),
BarrierTs: sinkStats.BarrierTs,
StageCheckpoints: map[string]tablepb.Checkpoint{
"puller-ingress": {
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/tablepb/table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ message Stats {
// Number of captured regions.
uint64 region_count = 1;
// The current timestamp from the table's point of view.
uint64 current_ts = 2 [(gogoproto.casttype) = "Ts"];
uint64 current_ts = 2 [(gogoproto.casttype) = "Ts", deprecated = true]; // Deprecated: Do not use this field.
// Checkpoints at each stage.
map<string, Checkpoint> stage_checkpoints = 3 [(gogoproto.nullable) = false];
// The barrier timestamp of the table.
Expand Down
8 changes: 7 additions & 1 deletion cdc/scheduler/internal/v3/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,13 @@ func (c *coordinator) maybeCollectMetrics() {
}
c.lastCollectTime = now

pdTime := now
// only nil in unit test
if c.pdClock != nil {
pdTime = c.pdClock.CurrentTime()
}

c.schedulerM.CollectMetrics()
c.replicationM.CollectMetrics()
c.replicationM.CollectMetrics(pdTime)
c.captureM.CollectMetrics()
}
12 changes: 5 additions & 7 deletions cdc/scheduler/internal/v3/replication/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,7 @@ func (r *Manager) logSlowTableInfo(currentPDTime time.Time) {
}

// CollectMetrics collects metrics.
func (r *Manager) CollectMetrics() {
func (r *Manager) CollectMetrics(currentPDTime time.Time) {
cf := r.changefeedID
tableGauge.
WithLabelValues(cf.Namespace, cf.ID).Set(float64(r.spans.Len()))
Expand All @@ -786,13 +786,12 @@ func (r *Manager) CollectMetrics() {
WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyRTs))

// Slow table latency metrics.
phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs)
for stage, checkpoint := range table.Stats.StageCheckpoints {
// Checkpoint ts
phyCkpTs := oracle.ExtractPhysical(checkpoint.CheckpointTs)
slowestTableStageCheckpointTsGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyCkpTs))
checkpointLag := float64(phyCurrentTs-phyCkpTs) / 1e3
checkpointLag := currentPDTime.Sub(oracle.GetTimeFromTS(checkpoint.CheckpointTs)).Seconds()
slowestTableStageCheckpointTsLagGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(checkpointLag)
slowestTableStageCheckpointTsLagHistogramVec.
Expand All @@ -801,7 +800,7 @@ func (r *Manager) CollectMetrics() {
phyRTs := oracle.ExtractPhysical(checkpoint.ResolvedTs)
slowestTableStageResolvedTsGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyRTs))
resolvedTsLag := float64(phyCurrentTs-phyRTs) / 1e3
resolvedTsLag := currentPDTime.Sub(oracle.GetTimeFromTS(checkpoint.ResolvedTs)).Seconds()
slowestTableStageResolvedTsLagGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(resolvedTsLag)
slowestTableStageResolvedTsLagHistogramVec.
Expand All @@ -812,7 +811,7 @@ func (r *Manager) CollectMetrics() {
phyBTs := oracle.ExtractPhysical(table.Stats.BarrierTs)
slowestTableStageResolvedTsGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyBTs))
barrierTsLag := float64(phyCurrentTs-phyBTs) / 1e3
barrierTsLag := currentPDTime.Sub(oracle.GetTimeFromTS(table.Stats.BarrierTs)).Seconds()
slowestTableStageResolvedTsLagGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(barrierTsLag)
slowestTableStageResolvedTsLagHistogramVec.
Expand Down Expand Up @@ -863,8 +862,7 @@ func (r *Manager) CollectMetrics() {
phyCkptTs := oracle.ExtractPhysical(pullerCkpt.ResolvedTs)
slowestTablePullerResolvedTs.WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyCkptTs))

phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs)
lag := float64(phyCurrentTs-phyCkptTs) / 1e3
lag := currentPDTime.Sub(oracle.GetTimeFromTS(pullerCkpt.ResolvedTs)).Seconds()
slowestTablePullerResolvedTsLag.WithLabelValues(cf.Namespace, cf.ID).Set(lag)
}
}
Expand Down

0 comments on commit 3f07a1d

Please sign in to comment.