Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add metric for measuring performance #229

Merged
merged 12 commits into from
May 6, 2021
2 changes: 1 addition & 1 deletion blockchain/v0/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ FOR_LOOP:
// TODO: same thing for app - but we would need a way to
// get the hash without persisting the state
var err error
state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first)
state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first, nil)
if err != nil {
// TODO This is bad, are we zombie?
panic(fmt.Sprintf("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
Expand Down
2 changes: 1 addition & 1 deletion blockchain/v0/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func newBlockchainReactor(
thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()}

state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock)
state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock, nil)
if err != nil {
panic(fmt.Errorf("error apply block: %w", err))
}
Expand Down
2 changes: 1 addition & 1 deletion blockchain/v1/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ func (bcR *BlockchainReactor) processBlock() error {

bcR.store.SaveBlock(first, firstParts, second.LastCommit)

bcR.state, _, err = bcR.blockExec.ApplyBlock(bcR.state, firstID, first)
bcR.state, _, err = bcR.blockExec.ApplyBlock(bcR.state, firstID, first, nil)
if err != nil {
panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
}
Expand Down
2 changes: 1 addition & 1 deletion blockchain/v1/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func newBlockchainReactor(
thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()}

state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock)
state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock, nil)
if err != nil {
panic(fmt.Errorf("error apply block: %w", err))
}
Expand Down
2 changes: 1 addition & 1 deletion blockchain/v2/processor_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func newProcessorContext(st blockStore, ex blockApplier, s state.State) *pContex
}

func (pc *pContext) applyBlock(blockID types.BlockID, block *types.Block) error {
newState, _, err := pc.applier.ApplyBlock(pc.state, blockID, block)
newState, _, err := pc.applier.ApplyBlock(pc.state, blockID, block, nil)
pc.state = newState
return err
}
Expand Down
3 changes: 2 additions & 1 deletion blockchain/v2/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ type blockVerifier interface {
}

type blockApplier interface {
ApplyBlock(state state.State, blockID types.BlockID, block *types.Block) (state.State, int64, error)
ApplyBlock(state state.State, blockID types.BlockID, block *types.Block, times *state.CommitStepTimes) (
state.State, int64, error)
}

// XXX: unify naming in this package around tmState
Expand Down
4 changes: 2 additions & 2 deletions blockchain/v2/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type mockBlockApplier struct {

// XXX: Add whitelist/blacklist?
func (mba *mockBlockApplier) ApplyBlock(
state sm.State, blockID types.BlockID, block *types.Block,
state sm.State, blockID types.BlockID, block *types.Block, stepTimes *sm.CommitStepTimes,
) (sm.State, int64, error) {
state.LastBlockHeight++
return state, 0, nil
Expand Down Expand Up @@ -542,7 +542,7 @@ func newReactorStore(
thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()}

state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock)
state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock, nil)
if err != nil {
panic(fmt.Errorf("error apply block: %w", err))
}
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,7 +1087,7 @@ func DefaultInstrumentationConfig() *InstrumentationConfig {
Prometheus: false,
PrometheusListenAddr: ":26660",
MaxOpenConnections: 3,
Namespace: "tendermint",
Namespace: "ostracon",
}
}

Expand Down
92 changes: 92 additions & 0 deletions consensus/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,25 @@ type Metrics struct {

// Number of blockparts transmitted by peer.
BlockParts metrics.Counter

// ////////////////////////////////////
// Metrics for measuring performance
// ////////////////////////////////////

// Number of blocks that are we couldn't receive
MissingProposal metrics.Gauge

// Number of rounds turned over.
RoundFailures metrics.Histogram

// Execution time profiling of each step
DurationProposal metrics.Histogram
DurationPrevote metrics.Histogram
DurationPrecommit metrics.Histogram
DurationCommitExecuting metrics.Histogram
DurationCommitCommitting metrics.Histogram
DurationCommitRechecking metrics.Histogram
DurationWaitingForNewRound metrics.Histogram
}

// PrometheusMetrics returns Metrics build using Prometheus client library.
Expand Down Expand Up @@ -186,6 +205,68 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Name: "block_parts",
Help: "Number of blockparts transmitted by peer.",
}, append(labels, "peer_id")).With(labelsAndValues...),
MissingProposal: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "missing_proposal",
Help: "Number of blocks we couldn't receive",
}, labels).With(labelsAndValues...),
RoundFailures: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "round_failures",
Help: "Number of rounds failed on consensus",
Buckets: stdprometheus.LinearBuckets(0, 1, 5),
}, labels).With(labelsAndValues...),
DurationProposal: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "duration_proposal",
Help: "Duration of proposal step",
Buckets: stdprometheus.LinearBuckets(100, 100, 10),
}, labels).With(labelsAndValues...),
DurationPrevote: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "duration_prevote",
Help: "Duration of prevote step",
Buckets: stdprometheus.LinearBuckets(100, 100, 10),
}, labels).With(labelsAndValues...),
DurationPrecommit: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "duration_precommit",
Help: "Duration of precommit step",
Buckets: stdprometheus.LinearBuckets(100, 100, 10),
}, labels).With(labelsAndValues...),
DurationCommitExecuting: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "duration_commit_executing",
Help: "Duration of executing block txs",
Buckets: stdprometheus.LinearBuckets(100, 100, 10),
}, labels).With(labelsAndValues...),
DurationCommitCommitting: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "duration_commit_committing",
Help: "Duration of committing updated state",
Buckets: stdprometheus.LinearBuckets(100, 100, 10),
}, labels).With(labelsAndValues...),
DurationCommitRechecking: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "duration_commit_rechecking",
Help: "Duration of rechecking mempool txs",
Buckets: stdprometheus.LinearBuckets(100, 100, 10),
}, labels).With(labelsAndValues...),
DurationWaitingForNewRound: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "duration_waiting_for_new_round",
Help: "Duration of waiting for next new round",
Buckets: stdprometheus.LinearBuckets(100, 100, 10),
}, labels).With(labelsAndValues...),
}
}

Expand Down Expand Up @@ -216,5 +297,16 @@ func NopMetrics() *Metrics {
FastSyncing: discard.NewGauge(),
StateSyncing: discard.NewGauge(),
BlockParts: discard.NewCounter(),

MissingProposal: discard.NewGauge(),
RoundFailures: discard.NewHistogram(),

DurationProposal: discard.NewHistogram(),
DurationPrevote: discard.NewHistogram(),
DurationPrecommit: discard.NewHistogram(),
DurationCommitExecuting: discard.NewHistogram(),
DurationCommitCommitting: discard.NewHistogram(),
DurationCommitRechecking: discard.NewHistogram(),
DurationWaitingForNewRound: discard.NewHistogram(),
}
}
2 changes: 1 addition & 1 deletion consensus/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.Ap
blockExec.SetEventBus(h.eventBus)

var err error
state, _, err = blockExec.ApplyBlock(state, meta.BlockID, block)
state, _, err = blockExec.ApplyBlock(state, meta.BlockID, block, nil)
if err != nil {
return sm.State{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ func applyBlock(stateStore sm.Store, st sm.State, blk *types.Block, proxyApp pro
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)

blkID := types.BlockID{Hash: blk.Hash(), PartSetHeader: blk.MakePartSet(testPartSize).Header()}
newState, _, err := blockExec.ApplyBlock(st, blkID, blk)
newState, _, err := blockExec.ApplyBlock(st, blkID, blk, nil)
if err != nil {
panic(err)
}
Expand Down
73 changes: 72 additions & 1 deletion consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,53 @@ type timeoutInfo struct {
Step cstypes.RoundStepType `json:"step"`
}

type StepTimes struct {
Proposal types.StepDuration
Prevote types.StepDuration
Precommit types.StepDuration
sm.CommitStepTimes
WaitingForNewRound types.StepDuration
}

func (st *StepTimes) StartNewRound() time.Time {
now := tmtime.Now()
if st.Current == &st.WaitingForNewRound {
st.Current.End = now
}
st.Current = &st.Proposal
st.Current.Start = now
return now
}

func (st *StepTimes) ToPrevoteStep() time.Time {
return st.ToNextStep(&st.Proposal, &st.Prevote)
}

func (st *StepTimes) ToPrecommitStep() time.Time {
return st.ToNextStep(&st.Prevote, &st.Precommit)
}

func (st *StepTimes) ToCommitExecuting() time.Time {
return st.ToNextStep(&st.Precommit, &st.CommitExecuting)
}

func (st *StepTimes) EndRound() time.Time {
now := tmtime.Now()
if st.Current == &st.CommitRechecking {
st.Current.End = now
st.Current = &st.WaitingForNewRound
}
return now
}

func (st *StepTimes) StartWaiting() time.Time {
now := tmtime.Now()
if st.Current == &st.WaitingForNewRound {
st.Current.Start = now
}
return now
}

func (ti *timeoutInfo) String() string {
return fmt.Sprintf("%v ; %d/%d %v", ti.Duration, ti.Height, ti.Round, ti.Step)
}
Expand Down Expand Up @@ -139,6 +186,9 @@ type State struct {

// for reporting metrics
metrics *Metrics

// times of each step
stepTimes *StepTimes
}

// StateOption sets an optional parameter on the State.
Expand Down Expand Up @@ -169,6 +219,7 @@ func NewState(
evpool: evpool,
evsw: tmevents.NewEventSwitch(),
metrics: NopMetrics(),
stepTimes: &StepTimes{},
}

// set function defaults (may be overwritten before calling Start)
Expand Down Expand Up @@ -968,7 +1019,8 @@ func (cs *State) enterNewRound(height int64, round int32) {
return
}

if now := tmtime.Now(); cs.StartTime.After(now) {
now := cs.stepTimes.StartNewRound()
if cs.StartTime.After(now) {
logger.Debug("need to set a buffer and log message here for sanity", "start_time", cs.StartTime, "now", now)
}

Expand Down Expand Up @@ -1221,6 +1273,7 @@ func (cs *State) enterPrevote(height int64, round int32) {
// Done enterPrevote:
cs.updateRoundStep(round, cstypes.RoundStepPrevote)
cs.newStep()
cs.stepTimes.ToPrevoteStep()
}()

logger.Debug("entering prevote step", "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step))
Expand All @@ -1246,6 +1299,8 @@ func (cs *State) defaultDoPrevote(height int64, round int32) {
if cs.ProposalBlock == nil {
logger.Debug("prevote step: ProposalBlock is nil")
cs.signAddVote(tmproto.PrevoteType, nil, types.PartSetHeader{})
// increase missing proposal by one
cs.metrics.MissingProposal.Add(1)
return
}

Expand Down Expand Up @@ -1319,6 +1374,7 @@ func (cs *State) enterPrecommit(height int64, round int32) {
// Done enterPrecommit:
cs.updateRoundStep(round, cstypes.RoundStepPrecommit)
cs.newStep()
cs.stepTimes.ToPrecommitStep()
}()

// check for a polka
Expand Down Expand Up @@ -1632,13 +1688,15 @@ func (cs *State) finalizeCommit(height int64) {
retainHeight int64
)

cs.stepTimes.ToCommitExecuting()
stateCopy, retainHeight, err = cs.blockExec.ApplyBlock(
stateCopy,
types.BlockID{
Hash: block.Hash(),
PartSetHeader: blockParts.Header(),
},
block,
&cs.stepTimes.CommitStepTimes,
)
if err != nil {
logger.Error("failed to apply block", "err", err)
Expand All @@ -1657,6 +1715,8 @@ func (cs *State) finalizeCommit(height int64) {
}
}

cs.stepTimes.EndRound()

// must be called before we update state
cs.recordMetrics(height, block)

Expand All @@ -1678,6 +1738,8 @@ func (cs *State) finalizeCommit(height int64) {
// * cs.Height has been increment to height+1
// * cs.Step is now cstypes.RoundStepNewHeight
// * cs.StartTime is set to when we will start round0.

cs.stepTimes.StartWaiting()
}

func (cs *State) pruneBlocks(retainHeight int64) (uint64, error) {
Expand Down Expand Up @@ -1782,6 +1844,15 @@ func (cs *State) recordMetrics(height int64, block *types.Block) {
cs.metrics.TotalTxs.Add(float64(len(block.Data.Txs)))
cs.metrics.BlockSizeBytes.Set(float64(block.Size()))
cs.metrics.CommittedHeight.Set(float64(block.Height))

cs.metrics.RoundFailures.Observe(float64(cs.Round))
cs.metrics.DurationProposal.Observe(cs.stepTimes.Proposal.GetDuration())
cs.metrics.DurationPrevote.Observe(cs.stepTimes.Prevote.GetDuration())
cs.metrics.DurationPrecommit.Observe(cs.stepTimes.Precommit.GetDuration())
cs.metrics.DurationCommitExecuting.Observe(cs.stepTimes.CommitExecuting.GetDuration())
cs.metrics.DurationCommitCommitting.Observe(cs.stepTimes.CommitCommitting.GetDuration())
cs.metrics.DurationCommitRechecking.Observe(cs.stepTimes.CommitRechecking.GetDuration())
cs.metrics.DurationWaitingForNewRound.Observe(cs.stepTimes.WaitingForNewRound.GetDuration())
}

//-----------------------------------------------------------------------------
Expand Down
Loading