diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go
index 865f2d72485..c943f73c246 100644
--- a/cdc/processor/processor.go
+++ b/cdc/processor/processor.go
@@ -466,13 +466,14 @@ func (p *processor) GetTableStatus(tableID model.TableID, collectStat bool) tabl
 	}
 }
 
+// getStatsFromSourceManagerAndSinkManager assembles the tablepb.Stats of a table
+// from its puller stats and the given sink stats. The deprecated CurrentTs field
+// is intentionally left unset.
 func (p *processor) getStatsFromSourceManagerAndSinkManager(tableID model.TableID, sinkStats sinkmanager.TableStats) tablepb.Stats {
 	pullerStats := p.sourceManager.GetTablePullerStats(tableID)
-	now := p.upstream.PDClock.CurrentTime()
 
 	stats := tablepb.Stats{
 		RegionCount: pullerStats.RegionCount,
-		CurrentTs:   oracle.ComposeTS(oracle.GetPhysical(now), 0),
 		BarrierTs:   sinkStats.BarrierTs,
 		StageCheckpoints: map[string]tablepb.Checkpoint{
 			"puller-ingress": {
diff --git a/cdc/processor/tablepb/table.proto b/cdc/processor/tablepb/table.proto
index a3d771eea2d..3bef599da76 100644
--- a/cdc/processor/tablepb/table.proto
+++ b/cdc/processor/tablepb/table.proto
@@ -64,7 +64,7 @@ message Stats {
     // Number of captured regions.
     uint64 region_count = 1;
     // The current timestamp from the table's point of view.
-    uint64 current_ts = 2 [(gogoproto.casttype) = "github.com/pingcap/tiflow/cdc/model.Ts"];
+    uint64 current_ts = 2 [(gogoproto.casttype) = "github.com/pingcap/tiflow/cdc/model.Ts", deprecated = true]; // Deprecated: Do not use this field.
     // Checkponits at each stage.
     map<string, Checkpoint> stage_checkpoints = 3 [(gogoproto.nullable) = false];
     // The barrier timestamp of the table.
diff --git a/cdc/scheduler/internal/v3/coordinator.go b/cdc/scheduler/internal/v3/coordinator.go index 57f52fc3831..28f9f29199e 100644 --- a/cdc/scheduler/internal/v3/coordinator.go +++ b/cdc/scheduler/internal/v3/coordinator.go @@ -426,7 +426,13 @@ func (c *coordinator) maybeCollectMetrics() { } c.lastCollectTime = now + pdTime := now + // only nil in unit test + if c.pdClock != nil { + pdTime = c.pdClock.CurrentTime() + } + c.schedulerM.CollectMetrics() - c.replicationM.CollectMetrics() + c.replicationM.CollectMetrics(pdTime) c.captureM.CollectMetrics() } diff --git a/cdc/scheduler/internal/v3/replication/replication_manager.go b/cdc/scheduler/internal/v3/replication/replication_manager.go index e8c943482bb..65fc3093117 100644 --- a/cdc/scheduler/internal/v3/replication/replication_manager.go +++ b/cdc/scheduler/internal/v3/replication/replication_manager.go @@ -716,7 +716,7 @@ func (r *Manager) logSlowTableInfo(currentTables []model.TableID, currentTime ti } // CollectMetrics collects metrics. -func (r *Manager) CollectMetrics() { +func (r *Manager) CollectMetrics(currentPDTime time.Time) { cf := r.changefeedID tableGauge. WithLabelValues(cf.Namespace, cf.ID).Set(float64(len(r.tables))) @@ -733,13 +733,12 @@ func (r *Manager) CollectMetrics() { WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyRTs)) // Slow table latency metrics. - phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs) for stage, checkpoint := range table.Stats.StageCheckpoints { // Checkpoint ts phyCkpTs := oracle.ExtractPhysical(checkpoint.CheckpointTs) slowestTableStageCheckpointTsGaugeVec. WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyCkpTs)) - checkpointLag := float64(phyCurrentTs-phyCkpTs) / 1e3 + checkpointLag := currentPDTime.Sub(oracle.GetTimeFromTS(checkpoint.CheckpointTs)).Seconds() slowestTableStageCheckpointTsLagGaugeVec. WithLabelValues(cf.Namespace, cf.ID, stage).Set(checkpointLag) slowestTableStageCheckpointTsLagHistogramVec. 
@@ -748,7 +747,7 @@ func (r *Manager) CollectMetrics() { phyRTs := oracle.ExtractPhysical(checkpoint.ResolvedTs) slowestTableStageResolvedTsGaugeVec. WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyRTs)) - resolvedTsLag := float64(phyCurrentTs-phyRTs) / 1e3 + resolvedTsLag := currentPDTime.Sub(oracle.GetTimeFromTS(checkpoint.ResolvedTs)).Seconds() slowestTableStageResolvedTsLagGaugeVec. WithLabelValues(cf.Namespace, cf.ID, stage).Set(resolvedTsLag) slowestTableStageResolvedTsLagHistogramVec. @@ -759,7 +758,7 @@ func (r *Manager) CollectMetrics() { phyBTs := oracle.ExtractPhysical(table.Stats.BarrierTs) slowestTableStageResolvedTsGaugeVec. WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyBTs)) - barrierTsLag := float64(phyCurrentTs-phyBTs) / 1e3 + barrierTsLag := currentPDTime.Sub(oracle.GetTimeFromTS(table.Stats.BarrierTs)).Seconds() slowestTableStageResolvedTsLagGaugeVec. WithLabelValues(cf.Namespace, cf.ID, stage).Set(barrierTsLag) slowestTableStageResolvedTsLagHistogramVec. @@ -809,8 +808,7 @@ func (r *Manager) CollectMetrics() { phyCkptTs := oracle.ExtractPhysical(pullerCkpt.ResolvedTs) slowestTablePullerResolvedTs.WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyCkptTs)) - phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs) - lag := float64(phyCurrentTs-phyCkptTs) / 1e3 + lag := currentPDTime.Sub(oracle.GetTimeFromTS(pullerCkpt.ResolvedTs)).Seconds() slowestTablePullerResolvedTsLag.WithLabelValues(cf.Namespace, cf.ID).Set(lag) } }