Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cordinator(ticdc): Fix Puller Resolved TS Lag Calculation and Deprecate current_ts Field in Stats (#11624) #11643

Open
wants to merge 1 commit into
base: release-6.5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,13 +466,19 @@
}
}

<<<<<<< HEAD

Check failure on line 469 in cdc/processor/processor.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: non-declaration statement outside function body

Check failure on line 469 in cdc/processor/processor.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: non-declaration statement outside function body
func (p *processor) getStatsFromSourceManagerAndSinkManager(tableID model.TableID, sinkStats sinkmanager.TableStats) tablepb.Stats {
pullerStats := p.sourceManager.GetTablePullerStats(tableID)
now := p.upstream.PDClock.CurrentTime()
=======

Check failure on line 473 in cdc/processor/processor.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected ==, expecting }

Check failure on line 473 in cdc/processor/processor.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected ==, expecting }
func (p *processor) getStatsFromSourceManagerAndSinkManager(
span tablepb.Span, sinkStats sinkmanager.TableStats,
) tablepb.Stats {
pullerStats := p.sourceManager.r.GetTablePullerStats(span)
>>>>>>> 1b026f6b75 (cordinator(ticdc): Fix Puller Resolved TS Lag Calculation and Deprecate current_ts Field in Stats (#11624))

Check failure on line 478 in cdc/processor/processor.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected >>, expecting }

Check failure on line 478 in cdc/processor/processor.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

invalid character U+0023 '#'

Check failure on line 478 in cdc/processor/processor.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected >>, expecting }

Check failure on line 478 in cdc/processor/processor.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

invalid character U+0023 '#'

stats := tablepb.Stats{
RegionCount: pullerStats.RegionCount,
CurrentTs: oracle.ComposeTS(oracle.GetPhysical(now), 0),
BarrierTs: sinkStats.BarrierTs,
StageCheckpoints: map[string]tablepb.Checkpoint{
"puller-ingress": {
Expand Down
4 changes: 4 additions & 0 deletions cdc/processor/tablepb/table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ message Stats {
// Number of captured regions.
uint64 region_count = 1;
// The current timestamp from the table's point of view.
<<<<<<< HEAD
uint64 current_ts = 2 [(gogoproto.casttype) = "github.com/pingcap/tiflow/cdc/model.Ts"];
=======
uint64 current_ts = 2 [(gogoproto.casttype) = "Ts", deprecated = true]; // Deprecated: Do not use this field.
>>>>>>> 1b026f6b75 (cordinator(ticdc): Fix Puller Resolved TS Lag Calculation and Deprecate current_ts Field in Stats (#11624))
// Checkponits at each stage.
map<string, Checkpoint> stage_checkpoints = 3 [(gogoproto.nullable) = false];
// The barrier timestamp of the table.
Expand Down
8 changes: 7 additions & 1 deletion cdc/scheduler/internal/v3/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,13 @@ func (c *coordinator) maybeCollectMetrics() {
}
c.lastCollectTime = now

pdTime := now
// only nil in unit test
if c.pdClock != nil {
pdTime = c.pdClock.CurrentTime()
}

c.schedulerM.CollectMetrics()
c.replicationM.CollectMetrics()
c.replicationM.CollectMetrics(pdTime)
c.captureM.CollectMetrics()
}
12 changes: 5 additions & 7 deletions cdc/scheduler/internal/v3/replication/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ func (r *Manager) logSlowTableInfo(currentTables []model.TableID, currentTime ti
}

// CollectMetrics collects metrics.
func (r *Manager) CollectMetrics() {
func (r *Manager) CollectMetrics(currentPDTime time.Time) {
cf := r.changefeedID
tableGauge.
WithLabelValues(cf.Namespace, cf.ID).Set(float64(len(r.tables)))
Expand All @@ -733,13 +733,12 @@ func (r *Manager) CollectMetrics() {
WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyRTs))

// Slow table latency metrics.
phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs)
for stage, checkpoint := range table.Stats.StageCheckpoints {
// Checkpoint ts
phyCkpTs := oracle.ExtractPhysical(checkpoint.CheckpointTs)
slowestTableStageCheckpointTsGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyCkpTs))
checkpointLag := float64(phyCurrentTs-phyCkpTs) / 1e3
checkpointLag := currentPDTime.Sub(oracle.GetTimeFromTS(checkpoint.CheckpointTs)).Seconds()
slowestTableStageCheckpointTsLagGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(checkpointLag)
slowestTableStageCheckpointTsLagHistogramVec.
Expand All @@ -748,7 +747,7 @@ func (r *Manager) CollectMetrics() {
phyRTs := oracle.ExtractPhysical(checkpoint.ResolvedTs)
slowestTableStageResolvedTsGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyRTs))
resolvedTsLag := float64(phyCurrentTs-phyRTs) / 1e3
resolvedTsLag := currentPDTime.Sub(oracle.GetTimeFromTS(checkpoint.ResolvedTs)).Seconds()
slowestTableStageResolvedTsLagGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(resolvedTsLag)
slowestTableStageResolvedTsLagHistogramVec.
Expand All @@ -759,7 +758,7 @@ func (r *Manager) CollectMetrics() {
phyBTs := oracle.ExtractPhysical(table.Stats.BarrierTs)
slowestTableStageResolvedTsGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyBTs))
barrierTsLag := float64(phyCurrentTs-phyBTs) / 1e3
barrierTsLag := currentPDTime.Sub(oracle.GetTimeFromTS(table.Stats.BarrierTs)).Seconds()
slowestTableStageResolvedTsLagGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(barrierTsLag)
slowestTableStageResolvedTsLagHistogramVec.
Expand Down Expand Up @@ -809,8 +808,7 @@ func (r *Manager) CollectMetrics() {
phyCkptTs := oracle.ExtractPhysical(pullerCkpt.ResolvedTs)
slowestTablePullerResolvedTs.WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyCkptTs))

phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs)
lag := float64(phyCurrentTs-phyCkptTs) / 1e3
lag := currentPDTime.Sub(oracle.GetTimeFromTS(pullerCkpt.ResolvedTs)).Seconds()
slowestTablePullerResolvedTsLag.WithLabelValues(cf.Namespace, cf.ID).Set(lag)
}
}
Expand Down
Loading