Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Main patch tm v0.34.14 #366

Merged
merged 9 commits into from
Feb 16, 2022
10 changes: 5 additions & 5 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type ReqRes struct {
*types.Request
*types.Response // Not set atomically, so be sure to use WaitGroup.

mtx tmsync.RWMutex
mtx tmsync.Mutex
wg *sync.WaitGroup
done bool // Gets set to true once *after* WaitGroup.Done().
cb ResponseCallback // A single callback that may be set.
Expand All @@ -107,24 +107,24 @@ func NewReqRes(req *types.Request, cb ResponseCallback) *ReqRes {
// InvokeCallback invokes a thread-safe execution of the configured callback
// if non-nil.
func (reqRes *ReqRes) InvokeCallback() {
reqRes.mtx.RLock()
defer reqRes.mtx.RUnlock()
reqRes.mtx.Lock()
defer reqRes.mtx.Unlock()

if reqRes.cb != nil {
reqRes.cb(reqRes.Response)
}
}

func (reqRes *ReqRes) SetDone(res *types.Response) (set bool) {
reqRes.mtx.RLock()
reqRes.mtx.Lock()
// TODO should we panic if it's already done?
set = !reqRes.done
if set {
reqRes.Response = res
reqRes.done = true
reqRes.wg.Done()
}
reqRes.mtx.RUnlock()
reqRes.mtx.Unlock()

// NOTE `reqRes.cb` is immutable so we're safe to access it at here without `mtx`
if set && reqRes.cb != nil {
Expand Down
8 changes: 4 additions & 4 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ type grpcClient struct {
client types.ABCIApplicationClient
conn *grpc.ClientConn

mtx tmsync.RWMutex
mtx tmsync.Mutex
addr string
err error

globalCbMtx sync.RWMutex
globalCbMtx sync.Mutex
globalCb func(*types.Request, *types.Response) // listens to all callbacks
}

Expand Down Expand Up @@ -109,8 +109,8 @@ func (cli *grpcClient) StopForError(err error) {
}

func (cli *grpcClient) Error() error {
cli.mtx.RLock()
defer cli.mtx.RUnlock()
cli.mtx.Lock()
defer cli.mtx.Unlock()
return cli.err
}

Expand Down
26 changes: 12 additions & 14 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ type localClient struct {
service.BaseService

// TODO: remove `mtx` to increase concurrency. We could remove it because the app should protect itself.
mtx *tmsync.RWMutex
mtx *tmsync.Mutex
// CONTRACT: The application should protect itself from concurrency as an abci server.
types.Application

globalCbMtx tmsync.RWMutex
globalCbMtx tmsync.Mutex
globalCb GlobalCallback
}

Expand All @@ -30,16 +30,14 @@ var _ Client = (*localClient)(nil)
// methods of the given app.
//
// Both Async and Sync methods ignore the given context.Context parameter.
func NewLocalClient(mtx *tmsync.RWMutex, app types.Application) Client {
func NewLocalClient(mtx *tmsync.Mutex, app types.Application) Client {
if mtx == nil {
mtx = &tmsync.RWMutex{}
mtx = new(tmsync.Mutex)
}

cli := &localClient{
mtx: mtx,
Application: app,
}

cli.BaseService = *service.NewBaseService(nil, "localClient", cli)
return cli
}
Expand Down Expand Up @@ -78,8 +76,8 @@ func (app *localClient) EchoAsync(msg string, cb ResponseCallback) *ReqRes {
}

func (app *localClient) InfoAsync(req types.RequestInfo, cb ResponseCallback) *ReqRes {
app.mtx.RLock()
defer app.mtx.RUnlock()
app.mtx.Lock()
defer app.mtx.Unlock()

reqRes := NewReqRes(types.ToRequestInfo(req), cb)
res := app.Application.Info(req)
Expand Down Expand Up @@ -120,8 +118,8 @@ func (app *localClient) CheckTxAsync(req types.RequestCheckTx, cb ResponseCallba
}

func (app *localClient) QueryAsync(req types.RequestQuery, cb ResponseCallback) *ReqRes {
app.mtx.RLock()
defer app.mtx.RUnlock()
app.mtx.Lock()
defer app.mtx.Unlock()

reqRes := NewReqRes(types.ToRequestQuery(req), cb)
res := app.Application.Query(req)
Expand Down Expand Up @@ -234,8 +232,8 @@ func (app *localClient) EchoSync(msg string) (*types.ResponseEcho, error) {
}

func (app *localClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) {
app.mtx.RLock()
defer app.mtx.RUnlock()
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.Info(req)
return &res, nil
Expand Down Expand Up @@ -267,8 +265,8 @@ func (app *localClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCh
}

func (app *localClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) {
app.mtx.RLock()
defer app.mtx.RUnlock()
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.Query(req)
return &res, nil
Expand Down
8 changes: 4 additions & 4 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ type socketClient struct {
reqQueue chan *ReqRes
flushTimer *timer.ThrottleTimer

mtx tmsync.RWMutex
mtx tmsync.Mutex
err error
reqSent *list.List // list of requests sent, waiting for response

globalCbMtx tmsync.RWMutex
globalCbMtx tmsync.Mutex
globalCb GlobalCallback
}

Expand Down Expand Up @@ -101,8 +101,8 @@ func (cli *socketClient) OnStop() {

// Error returns an error if the client was stopped abruptly.
func (cli *socketClient) Error() error {
cli.mtx.RLock()
defer cli.mtx.RUnlock()
cli.mtx.Lock()
defer cli.mtx.Unlock()
return cli.err
}

Expand Down
69 changes: 69 additions & 0 deletions cmd/ostracon/commands/rollback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package commands

import (
"fmt"

"github.com/spf13/cobra"

"github.com/line/tm-db/v2/metadb"

cfg "github.com/line/ostracon/config"
"github.com/line/ostracon/state"
"github.com/line/ostracon/store"
)

var RollbackStateCmd = &cobra.Command{
Use: "rollback",
Short: "rollback ostracon state by one height",
Long: `
A state rollback is performed to recover from an incorrect application state transition,
when Ostracon has persisted an incorrect app hash and is thus unable to make
progress. Rollback overwrites a state at height n with the state at height n - 1.
The application should also roll back to height n - 1. No blocks are removed, so upon
restarting Ostracon the transactions in block n will be re-executed against the
application.
`,
RunE: func(cmd *cobra.Command, args []string) error {
height, hash, err := RollbackState(config)
if err != nil {
return fmt.Errorf("failed to rollback state: %w", err)
}

fmt.Printf("Rolled back state to height %d and hash %v", height, hash)
return nil
},
}

// RollbackState takes the state at the current height n and overwrites it with the state
// at height n - 1. Note state here refers to ostracon state not application state.
// Returns the latest state height and app hash alongside an error if there was one.
func RollbackState(config *cfg.Config) (int64, []byte, error) {
// use the parsed config to load the block and state store
blockStore, stateStore, err := loadStateAndBlockStore(config)
if err != nil {
return -1, nil, err
}

// rollback the last state
return state.Rollback(blockStore, stateStore)
}

func loadStateAndBlockStore(config *cfg.Config) (*store.BlockStore, state.Store, error) {
dbType := metadb.BackendType(config.DBBackend)

// Get BlockStore
blockStoreDB, err := metadb.NewDB("blockstore", dbType, config.DBDir())
if err != nil {
return nil, nil, err
}
blockStore := store.NewBlockStore(blockStoreDB)

// Get StateStore
stateDB, err := metadb.NewDB("state", dbType, config.DBDir())
if err != nil {
return nil, nil, err
}
stateStore := state.NewStore(stateDB)

return blockStore, stateStore, nil
}
4 changes: 1 addition & 3 deletions cmd/ostracon/commands/run_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ func AddNodeFlags(cmd *cobra.Command) {
"proxy_app",
config.ProxyApp,
"proxy app address, or one of: 'kvstore',"+
" 'persistent_kvstore',"+
" 'counter',"+
" 'counter_serial' or 'noop' for local testing.")
" 'persistent_kvstore', 'counter', 'e2e' or 'noop' for local testing.")
cmd.Flags().String("abci", config.ABCI, "specify abci transport (socket | grpc)")

// rpc flags
Expand Down
1 change: 1 addition & 0 deletions cmd/ostracon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func main() {
cmd.ShowNodeIDCmd,
cmd.GenNodeKeyCmd,
cmd.VersionCmd,
cmd.RollbackStateCmd,
debug.DebugCmd,
cli.NewCompletionCmd(rootCmd, true),
)
Expand Down
2 changes: 1 addition & 1 deletion consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
blockStore := store.NewBlockStore(blockDB)

// one for mempool, one for consensus
mtx := new(tmsync.RWMutex)
mtx := new(tmsync.Mutex)
proxyAppConnMem := abcicli.NewLocalClient(mtx, app)
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)

Expand Down
2 changes: 1 addition & 1 deletion consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ func newStateWithConfigAndBlockStoreWithLoggers(
blockStore := store.NewBlockStore(blockDB)

// one for mempool, one for consensus
mtx := new(tmsync.RWMutex)
mtx := new(tmsync.Mutex)
proxyAppConnMem := abcicli.NewLocalClient(mtx, app)
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)

Expand Down
2 changes: 1 addition & 1 deletion consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func TestReactorWithEvidence(t *testing.T) {
blockStore := store.NewBlockStore(blockDB)

// one for mempool, one for consensus
mtx := new(tmsync.RWMutex)
mtx := new(tmsync.Mutex)
proxyAppConnMem := abcicli.NewLocalClient(mtx, app)
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)

Expand Down
2 changes: 1 addition & 1 deletion evidence/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ func (evpool *Pool) processConsensusBuffer(state sm.State) {
)

case voteSet.VoteA.Height < state.LastBlockHeight:
voterSet, err := evpool.stateDB.LoadVoters(voteSet.VoteA.Height, state.VoterParams)
_, voterSet, _, _, err := evpool.stateDB.LoadVoters(voteSet.VoteA.Height, state.VoterParams)
if err != nil {
evpool.logger.Error("failed to load validator/voter set for conflicting votes", "height",
voteSet.VoteA.Height, "err", err,
Expand Down
27 changes: 13 additions & 14 deletions evidence/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,15 @@ func TestEvidencePoolBasic(t *testing.T) {
)

valSet, voterSet, privVals := types.RandVoterSet(1, 10)
state := createState(height+1, valSet)

blockStore.On("LoadBlockMeta", mock.AnythingOfType("int64")).Return(
&types.BlockMeta{Header: types.Header{Time: defaultEvidenceTime}},
)
stateStore.On(
"LoadValidators",
mock.AnythingOfType("int64"),
).Return(valSet, nil)
stateStore.On(
"LoadVoters",
mock.AnythingOfType("int64"),
mock.AnythingOfType("*types.VoterParams"),
).Return(voterSet, nil)
stateStore.On("Load").Return(createState(height+1, valSet), nil)
stateStore.On("LoadValidators", mock.AnythingOfType("int64")).Return(valSet, nil)
stateStore.On("LoadVoters", mock.AnythingOfType("int64"), state.VoterParams).Return(
valSet, voterSet, state.VoterParams, state.LastProofHash, nil)
stateStore.On("Load").Return(state, nil)

pool, err := evidence.NewPool(evidenceDB, stateStore, blockStore)
require.NoError(t, err)
Expand Down Expand Up @@ -273,14 +268,17 @@ func TestLightClientAttackEvidenceLifecycle(t *testing.T) {
LastBlockTime: defaultEvidenceTime.Add(2 * time.Hour),
LastBlockHeight: 110,
ConsensusParams: *types.DefaultConsensusParams(),
VoterParams: types.DefaultVoterParams(),
}
stateStore := &smmocks.Store{}
stateStore.On("LoadValidators", height).Return(trusted.ValidatorSet, nil)
stateStore.On("LoadValidators", commonHeight).Return(common.ValidatorSet, nil)
stateStore.On("LoadVoters", height, mock.AnythingOfType("*types.VoterParams")).Return(
ev.ConflictingBlock.VoterSet, nil) // Should use correct VoterSet for bls.VerifyAggregatedSignature
stateStore.On("LoadVoters", commonHeight, mock.AnythingOfType("*types.VoterParams")).Return(
ev.ConflictingBlock.VoterSet, nil) // Should use correct VoterSet for bls.VerifyAggregatedSignature
// Should use correct VoterSet for bls.VerifyAggregatedSignature
stateStore.On("LoadVoters", height, state.VoterParams).Return(
trusted.ValidatorSet, ev.ConflictingBlock.VoterSet, state.VoterParams, mock.Anything, nil)
// Should use correct VoterSet for bls.VerifyAggregatedSignature
stateStore.On("LoadVoters", commonHeight, state.VoterParams).Return(
trusted.ValidatorSet, ev.ConflictingBlock.VoterSet, state.VoterParams, state.LastProofHash, nil)
stateStore.On("Load").Return(state, nil)
blockStore := &mocks.BlockStore{}
blockStore.On("LoadBlockMeta", height).Return(&types.BlockMeta{Header: *trusted.Header})
Expand Down Expand Up @@ -480,5 +478,6 @@ func createState(height int64, valSet *types.ValidatorSet) sm.State {
LastBlockTime: defaultEvidenceTime,
Validators: valSet,
ConsensusParams: *types.DefaultConsensusParams(),
VoterParams: types.DefaultVoterParams(),
}
}
8 changes: 2 additions & 6 deletions evidence/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (evpool *Pool) verify(evidence types.Evidence) error {
// apply the evidence-specific verification logic
switch ev := evidence.(type) {
case *types.DuplicateVoteEvidence:
voterSet, err := evpool.stateDB.LoadVoters(evidence.Height(), state.VoterParams)
_, voterSet, _, _, err := evpool.stateDB.LoadVoters(evidence.Height(), state.VoterParams)
if err != nil {
return err
}
Expand All @@ -61,11 +61,7 @@ func (evpool *Pool) verify(evidence types.Evidence) error {
if err != nil {
return err
}
commonVals, err := evpool.stateDB.LoadValidators(evidence.Height())
if err != nil {
return err
}
commonVoters, err := evpool.stateDB.LoadVoters(evidence.Height(), state.VoterParams)
commonVals, commonVoters, _, _, err := evpool.stateDB.LoadVoters(evidence.Height(), state.VoterParams)
if err != nil {
return err
}
Expand Down
Loading