Skip to content

Commit

Permalink
feat: added max txs per block to config.toml (#317)
Browse files Browse the repository at this point in the history
* feat: added max txs per block to config.toml

* chore: added test cases for ReapMaxBytesMaxGasMaxTxs

* chore: updated confusing comment for max_txs
  • Loading branch information
iproudhon authored Sep 27, 2021
1 parent d93b500 commit 6bcf753
Show file tree
Hide file tree
Showing 15 changed files with 100 additions and 28 deletions.
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,9 @@ type ConsensusConfig struct {
CreateEmptyBlocks bool `mapstructure:"create_empty_blocks"`
CreateEmptyBlocksInterval time.Duration `mapstructure:"create_empty_blocks_interval"`

// Max transactions per block when creating a block. Not a global configuration. No limit if <= 0.
MaxTxs int64 `mapstructure:"max_txs"`

// Reactor sleep duration parameters
PeerGossipSleepDuration time.Duration `mapstructure:"peer_gossip_sleep_duration"`
PeerQueryMaj23SleepDuration time.Duration `mapstructure:"peer_query_maj23_sleep_duration"`
Expand Down
3 changes: 3 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,9 @@ skip_timeout_commit = {{ .Consensus.SkipTimeoutCommit }}
create_empty_blocks = {{ .Consensus.CreateEmptyBlocks }}
create_empty_blocks_interval = "{{ .Consensus.CreateEmptyBlocksInterval }}"
# Max transactions per block. No limit if <= 0.
max_txs = {{ .Consensus.MaxTxs }}
# Reactor sleep duration parameters
peer_gossip_sleep_duration = "{{ .Consensus.PeerGossipSleepDuration }}"
peer_query_maj23_sleep_duration = "{{ .Consensus.PeerQueryMaj23SleepDuration }}"
Expand Down
2 changes: 1 addition & 1 deletion consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
message := lazyProposer.state.MakeHashMessage(lazyProposer.Round)
proof, _ := lazyProposer.privValidator.GenerateVRFProof(message)
block, blockParts := lazyProposer.blockExec.CreateProposalBlock(
lazyProposer.Height, lazyProposer.state, commit, proposerAddr, lazyProposer.Round, proof,
lazyProposer.Height, lazyProposer.state, commit, proposerAddr, lazyProposer.Round, proof, 0,
)

// Flush the WAL. Otherwise, we may not recompute the same proposal to sign,
Expand Down
2 changes: 1 addition & 1 deletion consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func createProposalBlockSlim(cs *State, vs *validatorStub, round int32) (*types.
cs.Logger.Error("enterPropose: Cannot generate vrf proof: %s", err.Error())
return nil, nil
}
return cs.blockExec.CreateProposalBlock(cs.Height, cs.state, commit, proposerAddr, round, proof)
return cs.blockExec.CreateProposalBlock(cs.Height, cs.state, commit, proposerAddr, round, proof, 0)
}

func addVotes(to *State, votes ...*types.Vote) {
Expand Down
12 changes: 11 additions & 1 deletion consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,17 @@ func TestMempoolRmBadTx(t *testing.T) {

// check for the tx
for {
txs := assertMempool(cs.txNotifier).ReapMaxBytesMaxGas(int64(len(txBytes)), -1)
txs := assertMempool(cs.txNotifier).ReapMaxTxs(1)
if len(txs) == 0 {
emptyMempoolCh <- struct{}{}
return
}
txs = assertMempool(cs.txNotifier).ReapMaxBytesMaxGasMaxTxs(int64(len(txBytes)), -1, 1)
if len(txs) == 0 {
emptyMempoolCh <- struct{}{}
return
}
txs = assertMempool(cs.txNotifier).ReapMaxBytesMaxGas(int64(len(txBytes)), -1)
if len(txs) == 0 {
emptyMempoolCh <- struct{}{}
return
Expand Down
5 changes: 3 additions & 2 deletions consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ func (emptyMempool) CheckTxSync(_ types.Tx, _ mempl.TxInfo) (*abci.Response, err
}
func (emptyMempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(error), _ func(*abci.Response)) {
}
func (emptyMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
func (emptyMempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} }
func (emptyMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
func (emptyMempool) ReapMaxBytesMaxGasMaxTxs(_, _, _ int64) types.Txs { return types.Txs{} }
func (emptyMempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} }
func (emptyMempool) Update(
_ *types.Block,
_ []*abci.ResponseDeliverTx,
Expand Down
2 changes: 1 addition & 1 deletion consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -1257,7 +1257,7 @@ func (cs *State) createProposalBlock(round int32) (block *types.Block, blockPart
return
}

return cs.blockExec.CreateProposalBlock(cs.Height, cs.state, commit, proposerAddr, round, proof)
return cs.blockExec.CreateProposalBlock(cs.Height, cs.state, commit, proposerAddr, round, proof, cs.config.MaxTxs)
}

// Enter: `timeoutPropose` after entering Propose.
Expand Down
38 changes: 38 additions & 0 deletions mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,44 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
return txs
}

// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) ReapMaxBytesMaxGasMaxTxs(maxBytes, maxGas, maxTxs int64) types.Txs {
mem.updateMtx.RLock()
defer mem.updateMtx.RUnlock()

var totalGas int64

if maxTxs <= 0 {
maxTxs = int64(mem.txs.Len())
}

// TODO: we will get a performance boost if we have a good estimate of avg
// size per tx, and set the initial capacity based off of that.
// txs := make([]types.Tx, 0, tmmath.MinInt(mem.txs.Len(), max/mem.avgTxSize))
txs := make([]types.Tx, 0, mem.txs.Len())
protoTxs := tmproto.Data{}
for e := mem.txs.Front(); e != nil && len(txs) < int(maxTxs); e = e.Next() {
memTx := e.Value.(*mempoolTx)

protoTxs.Txs = append(protoTxs.Txs, memTx.tx)
// Check total size requirement
if maxBytes > -1 && int64(protoTxs.Size()) > maxBytes {
return txs
}
// Check total gas requirement.
// If maxGas is negative, skip this check.
// Since newTotalGas < masGas, which
// must be non-negative, it follows that this won't overflow.
newTotalGas := totalGas + memTx.gasWanted
if maxGas > -1 && newTotalGas > maxGas {
return txs
}
totalGas = newTotalGas
txs = append(txs, memTx.tx)
}
return txs
}

// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) ReapMaxTxs(max int) types.Txs {
mem.updateMtx.RLock()
Expand Down
41 changes: 25 additions & 16 deletions mempool/clist_mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,26 +114,35 @@ func TestReapMaxBytesMaxGas(t *testing.T) {
maxBytes int64
maxGas int64
expectedNumTxs int
maxTxs int64
}{
{20, -1, -1, 20},
{20, -1, 0, 0},
{20, -1, 10, 10},
{20, -1, 30, 20},
{20, 0, -1, 0},
{20, 0, 10, 0},
{20, 10, 10, 0},
{20, 24, 10, 1},
{20, 240, 5, 5},
{20, 240, -1, 10},
{20, 240, 10, 10},
{20, 240, 15, 10},
{20, 20000, -1, 20},
{20, 20000, 5, 5},
{20, 20000, 30, 20},
{20, -1, -1, 20, 0},
{20, -1, 0, 0, 0},
{20, -1, 10, 10, 0},
{20, -1, 30, 20, 0},
{20, 0, -1, 0, 0},
{20, 0, 10, 0, 0},
{20, 10, 10, 0, 0},
{20, 24, 10, 1, 0},
{20, 240, 5, 5, 0},
{20, 240, -1, 10, 0},
{20, 240, 10, 10, 0},
{20, 240, 15, 10, 0},
{20, 20000, -1, 20, 0},
{20, 20000, 5, 5, 0},
{20, 20000, 30, 20, 0},
{20, 20000, 30, 20, 0},
{20, 20000, 30, 10, 10},
{20, 20000, 30, 20, 100},
}
for tcIndex, tt := range tests {
checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID)
got := mempool.ReapMaxBytesMaxGas(tt.maxBytes, tt.maxGas)
var got types.Txs
if tt.maxTxs <= 0 {
got = mempool.ReapMaxBytesMaxGas(tt.maxBytes, tt.maxGas)
} else {
got = mempool.ReapMaxBytesMaxGasMaxTxs(tt.maxBytes, tt.maxGas, tt.maxTxs)
}
assert.Equal(t, tt.expectedNumTxs, len(got), "Got %d txs, expected %d, tc #%d",
len(got), tt.expectedNumTxs, tcIndex)
mempool.Flush()
Expand Down
3 changes: 3 additions & 0 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type Mempool interface {
// transactions (~ all available transactions).
ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs

// Puts cap on txs as well on top of ReapMaxBytesMaxGas
ReapMaxBytesMaxGasMaxTxs(maxBytes, maxGas, maxTxs int64) types.Txs

// ReapMaxTxs reaps up to max transactions from the mempool.
// If max is negative, there is no cap on the size of all returned
// transactions (~ all available transactions).
Expand Down
5 changes: 3 additions & 2 deletions mempool/mock/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ func (Mempool) CheckTxSync(_ types.Tx, _ mempl.TxInfo) (*abci.Response, error) {
}
func (Mempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(error), _ func(*abci.Response)) {
}
func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
func (Mempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} }
func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
func (Mempool) ReapMaxBytesMaxGasMaxTxs(_, _, _ int64) types.Txs { return types.Txs{} }
func (Mempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} }
func (Mempool) Update(
_ *types.Block,
_ []*abci.ResponseDeliverTx,
Expand Down
2 changes: 2 additions & 0 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func TestCreateProposalBlock(t *testing.T) {
proposerAddr,
0,
proof,
0,
)

// check that the part set does not exceed the maximum block size
Expand Down Expand Up @@ -370,6 +371,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
proposerAddr,
0,
proof,
0,
)

pb, err := block.ToProto()
Expand Down
3 changes: 2 additions & 1 deletion state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (blockExec *BlockExecutor) CreateProposalBlock(
proposerAddr []byte,
round int32,
proof crypto.Proof,
maxTxs int64,
) (*types.Block, *types.PartSet) {

maxBytes := state.ConsensusParams.Block.MaxBytes
Expand All @@ -135,7 +136,7 @@ func (blockExec *BlockExecutor) CreateProposalBlock(
// Fetch a limited amount of valid txs
maxDataBytes := types.MaxDataBytes(maxBytes, commit, evidence)

txs := blockExec.mempool.ReapMaxBytesMaxGas(maxDataBytes, maxGas)
txs := blockExec.mempool.ReapMaxBytesMaxGasMaxTxs(maxDataBytes, maxGas, maxTxs)

return state.MakeBlock(height, txs, commit, evidence, proposerAddr, round, proof)
}
Expand Down
5 changes: 3 additions & 2 deletions test/maverick/consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ func (emptyMempool) CheckTxSync(_ types.Tx, _ mempl.TxInfo) (*abci.Response, err
}
func (emptyMempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(error), _ func(*abci.Response)) {
}
func (emptyMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
func (emptyMempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} }
func (emptyMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
func (emptyMempool) ReapMaxBytesMaxGasMaxTxs(_, _, _ int64) types.Txs { return types.Txs{} }
func (emptyMempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} }
func (emptyMempool) Update(
_ *types.Block,
_ []*abci.ResponseDeliverTx,
Expand Down
2 changes: 1 addition & 1 deletion test/maverick/consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -1278,7 +1278,7 @@ func (cs *State) createProposalBlock(round int32) (block *types.Block, blockPart
cs.Logger.Error(fmt.Sprintf("enterPropose: %v", err))
return
}
return cs.blockExec.CreateProposalBlock(cs.Height, cs.state, commit, proposerAddr, round, proof)
return cs.blockExec.CreateProposalBlock(cs.Height, cs.state, commit, proposerAddr, round, proof, 0)
}

// Enter: any +2/3 prevotes at next round.
Expand Down

0 comments on commit 6bcf753

Please sign in to comment.