diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index f63b77fb583..ebd65e4015f 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -378,11 +378,9 @@ func (p *processor) getStatsFromSourceManagerAndSinkManager( span tablepb.Span, sinkStats sinkmanager.TableStats, ) tablepb.Stats { pullerStats := p.sourceManager.r.GetTablePullerStats(span) - now := p.upstream.PDClock.CurrentTime() stats := tablepb.Stats{ RegionCount: pullerStats.RegionCount, - CurrentTs: oracle.ComposeTS(oracle.GetPhysical(now), 0), BarrierTs: sinkStats.BarrierTs, StageCheckpoints: map[string]tablepb.Checkpoint{ "puller-ingress": { diff --git a/cdc/processor/tablepb/table.proto b/cdc/processor/tablepb/table.proto index 3727edc0c13..006a8f5f92f 100644 --- a/cdc/processor/tablepb/table.proto +++ b/cdc/processor/tablepb/table.proto @@ -65,7 +65,7 @@ message Stats { // Number of captured regions. uint64 region_count = 1; // The current timestamp from the table's point of view. - uint64 current_ts = 2 [(gogoproto.casttype) = "Ts"]; + uint64 current_ts = 2 [(gogoproto.casttype) = "Ts", deprecated = true]; // Deprecated: Do not use this field. // Checkponits at each stage. map stage_checkpoints = 3 [(gogoproto.nullable) = false]; // The barrier timestamp of the table. diff --git a/cdc/scheduler/internal/v3/coordinator.go b/cdc/scheduler/internal/v3/coordinator.go index 8537598500f..45d1c293951 100644 --- a/cdc/scheduler/internal/v3/coordinator.go +++ b/cdc/scheduler/internal/v3/coordinator.go @@ -446,7 +446,13 @@ func (c *coordinator) maybeCollectMetrics() { } c.lastCollectTime = now + pdTime := now + // only nil in unit test + if c.pdClock != nil { + pdTime = c.pdClock.CurrentTime() + } + c.schedulerM.CollectMetrics() - c.replicationM.CollectMetrics() + c.replicationM.CollectMetrics(pdTime) c.captureM.CollectMetrics() } diff --git a/cdc/scheduler/internal/v3/replication/replication_manager.go b/cdc/scheduler/internal/v3/replication/replication_manager.go index 5f7933e8a06..f1247751d6c 100644 --- a/cdc/scheduler/internal/v3/replication/replication_manager.go +++ b/cdc/scheduler/internal/v3/replication/replication_manager.go @@ -769,7 +769,7 @@ func (r *Manager) logSlowTableInfo(currentPDTime time.Time) { } // CollectMetrics collects metrics. -func (r *Manager) CollectMetrics() { +func (r *Manager) CollectMetrics(currentPDTime time.Time) { cf := r.changefeedID tableGauge. WithLabelValues(cf.Namespace, cf.ID).Set(float64(r.spans.Len())) @@ -786,13 +786,12 @@ func (r *Manager) CollectMetrics() { WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyRTs)) // Slow table latency metrics. - phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs) for stage, checkpoint := range table.Stats.StageCheckpoints { // Checkpoint ts phyCkpTs := oracle.ExtractPhysical(checkpoint.CheckpointTs) slowestTableStageCheckpointTsGaugeVec. WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyCkpTs)) - checkpointLag := float64(phyCurrentTs-phyCkpTs) / 1e3 + checkpointLag := currentPDTime.Sub(oracle.GetTimeFromTS(checkpoint.CheckpointTs)).Seconds() slowestTableStageCheckpointTsLagGaugeVec. WithLabelValues(cf.Namespace, cf.ID, stage).Set(checkpointLag) slowestTableStageCheckpointTsLagHistogramVec. @@ -801,7 +800,7 @@ func (r *Manager) CollectMetrics() { phyRTs := oracle.ExtractPhysical(checkpoint.ResolvedTs) slowestTableStageResolvedTsGaugeVec. WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyRTs)) - resolvedTsLag := float64(phyCurrentTs-phyRTs) / 1e3 + resolvedTsLag := currentPDTime.Sub(oracle.GetTimeFromTS(checkpoint.ResolvedTs)).Seconds() slowestTableStageResolvedTsLagGaugeVec. WithLabelValues(cf.Namespace, cf.ID, stage).Set(resolvedTsLag) slowestTableStageResolvedTsLagHistogramVec. @@ -812,7 +811,7 @@ func (r *Manager) CollectMetrics() { phyBTs := oracle.ExtractPhysical(table.Stats.BarrierTs) slowestTableStageResolvedTsGaugeVec. WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyBTs)) - barrierTsLag := float64(phyCurrentTs-phyBTs) / 1e3 + barrierTsLag := currentPDTime.Sub(oracle.GetTimeFromTS(table.Stats.BarrierTs)).Seconds() slowestTableStageResolvedTsLagGaugeVec. WithLabelValues(cf.Namespace, cf.ID, stage).Set(barrierTsLag) slowestTableStageResolvedTsLagHistogramVec. @@ -863,8 +862,7 @@ func (r *Manager) CollectMetrics() { phyCkptTs := oracle.ExtractPhysical(pullerCkpt.ResolvedTs) slowestTablePullerResolvedTs.WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyCkptTs)) - phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs) - lag := float64(phyCurrentTs-phyCkptTs) / 1e3 + lag := currentPDTime.Sub(oracle.GetTimeFromTS(pullerCkpt.ResolvedTs)).Seconds() slowestTablePullerResolvedTsLag.WithLabelValues(cf.Namespace, cf.ID).Set(lag) } }