Skip to content

Commit

Permalink
This is an automated cherry-pick of #11624
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
wlwilliamx authored and ti-chi-bot committed Oct 10, 2024
1 parent bcb2bd3 commit f9080bd
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 9 deletions.
8 changes: 7 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,13 +466,19 @@ func (p *processor) GetTableStatus(tableID model.TableID, collectStat bool) tabl
}
}

<<<<<<< HEAD

Check failure on line 469 in cdc/processor/processor.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: non-declaration statement outside function body

Check failure on line 469 in cdc/processor/processor.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: non-declaration statement outside function body
func (p *processor) getStatsFromSourceManagerAndSinkManager(tableID model.TableID, sinkStats sinkmanager.TableStats) tablepb.Stats {
pullerStats := p.sourceManager.GetTablePullerStats(tableID)
now := p.upstream.PDClock.CurrentTime()
=======

Check failure on line 473 in cdc/processor/processor.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected ==, expecting }

Check failure on line 473 in cdc/processor/processor.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected ==, expecting }
func (p *processor) getStatsFromSourceManagerAndSinkManager(
span tablepb.Span, sinkStats sinkmanager.TableStats,
) tablepb.Stats {
pullerStats := p.sourceManager.r.GetTablePullerStats(span)
>>>>>>> 1b026f6b75 (cordinator(ticdc): Fix Puller Resolved TS Lag Calculation and Deprecate current_ts Field in Stats (#11624))

Check failure on line 478 in cdc/processor/processor.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected >>, expecting }

Check failure on line 478 in cdc/processor/processor.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

invalid character U+0023 '#'

Check failure on line 478 in cdc/processor/processor.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected >>, expecting }

Check failure on line 478 in cdc/processor/processor.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

invalid character U+0023 '#'

stats := tablepb.Stats{
RegionCount: pullerStats.RegionCount,
CurrentTs: oracle.ComposeTS(oracle.GetPhysical(now), 0),
BarrierTs: sinkStats.BarrierTs,
StageCheckpoints: map[string]tablepb.Checkpoint{
"puller-ingress": {
Expand Down
4 changes: 4 additions & 0 deletions cdc/processor/tablepb/table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ message Stats {
// Number of captured regions.
uint64 region_count = 1;
// The current timestamp from the table's point of view.
<<<<<<< HEAD
uint64 current_ts = 2 [(gogoproto.casttype) = "github.com/pingcap/tiflow/cdc/model.Ts"];
=======
uint64 current_ts = 2 [(gogoproto.casttype) = "Ts", deprecated = true]; // Deprecated: Do not use this field.
>>>>>>> 1b026f6b75 (cordinator(ticdc): Fix Puller Resolved TS Lag Calculation and Deprecate current_ts Field in Stats (#11624))
// Checkponits at each stage.
map<string, Checkpoint> stage_checkpoints = 3 [(gogoproto.nullable) = false];
// The barrier timestamp of the table.
Expand Down
8 changes: 7 additions & 1 deletion cdc/scheduler/internal/v3/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,13 @@ func (c *coordinator) maybeCollectMetrics() {
}
c.lastCollectTime = now

pdTime := now
// only nil in unit test
if c.pdClock != nil {
pdTime = c.pdClock.CurrentTime()
}

c.schedulerM.CollectMetrics()
c.replicationM.CollectMetrics()
c.replicationM.CollectMetrics(pdTime)
c.captureM.CollectMetrics()
}
12 changes: 5 additions & 7 deletions cdc/scheduler/internal/v3/replication/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ func (r *Manager) logSlowTableInfo(currentTables []model.TableID, currentTime ti
}

// CollectMetrics collects metrics.
func (r *Manager) CollectMetrics() {
func (r *Manager) CollectMetrics(currentPDTime time.Time) {
cf := r.changefeedID
tableGauge.
WithLabelValues(cf.Namespace, cf.ID).Set(float64(len(r.tables)))
Expand All @@ -733,13 +733,12 @@ func (r *Manager) CollectMetrics() {
WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyRTs))

// Slow table latency metrics.
phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs)
for stage, checkpoint := range table.Stats.StageCheckpoints {
// Checkpoint ts
phyCkpTs := oracle.ExtractPhysical(checkpoint.CheckpointTs)
slowestTableStageCheckpointTsGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyCkpTs))
checkpointLag := float64(phyCurrentTs-phyCkpTs) / 1e3
checkpointLag := currentPDTime.Sub(oracle.GetTimeFromTS(checkpoint.CheckpointTs)).Seconds()
slowestTableStageCheckpointTsLagGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(checkpointLag)
slowestTableStageCheckpointTsLagHistogramVec.
Expand All @@ -748,7 +747,7 @@ func (r *Manager) CollectMetrics() {
phyRTs := oracle.ExtractPhysical(checkpoint.ResolvedTs)
slowestTableStageResolvedTsGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyRTs))
resolvedTsLag := float64(phyCurrentTs-phyRTs) / 1e3
resolvedTsLag := currentPDTime.Sub(oracle.GetTimeFromTS(checkpoint.ResolvedTs)).Seconds()
slowestTableStageResolvedTsLagGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(resolvedTsLag)
slowestTableStageResolvedTsLagHistogramVec.
Expand All @@ -759,7 +758,7 @@ func (r *Manager) CollectMetrics() {
phyBTs := oracle.ExtractPhysical(table.Stats.BarrierTs)
slowestTableStageResolvedTsGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyBTs))
barrierTsLag := float64(phyCurrentTs-phyBTs) / 1e3
barrierTsLag := currentPDTime.Sub(oracle.GetTimeFromTS(table.Stats.BarrierTs)).Seconds()
slowestTableStageResolvedTsLagGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(barrierTsLag)
slowestTableStageResolvedTsLagHistogramVec.
Expand Down Expand Up @@ -809,8 +808,7 @@ func (r *Manager) CollectMetrics() {
phyCkptTs := oracle.ExtractPhysical(pullerCkpt.ResolvedTs)
slowestTablePullerResolvedTs.WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyCkptTs))

phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs)
lag := float64(phyCurrentTs-phyCkptTs) / 1e3
lag := currentPDTime.Sub(oracle.GetTimeFromTS(pullerCkpt.ResolvedTs)).Seconds()
slowestTablePullerResolvedTsLag.WithLabelValues(cf.Namespace, cf.ID).Set(lag)
}
}
Expand Down

0 comments on commit f9080bd

Please sign in to comment.