Skip to content

Commit

Permalink
Revert "abci: change client to use multi-reader mutexes (#6306)" (bac…
Browse files Browse the repository at this point in the history
…kport #7106) (#7109)
  • Loading branch information
tnasu committed Feb 10, 2022
1 parent 401a2e0 commit c7d2f1e
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 32 deletions.
10 changes: 5 additions & 5 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type ReqRes struct {
*types.Request
*types.Response // Not set atomically, so be sure to use WaitGroup.

mtx tmsync.RWMutex
mtx tmsync.Mutex
wg *sync.WaitGroup
done bool // Gets set to true once *after* WaitGroup.Done().
cb ResponseCallback // A single callback that may be set.
Expand All @@ -107,24 +107,24 @@ func NewReqRes(req *types.Request, cb ResponseCallback) *ReqRes {
// InvokeCallback invokes a thread-safe execution of the configured callback
// if non-nil.
func (reqRes *ReqRes) InvokeCallback() {
reqRes.mtx.RLock()
defer reqRes.mtx.RUnlock()
reqRes.mtx.Lock()
defer reqRes.mtx.Unlock()

if reqRes.cb != nil {
reqRes.cb(reqRes.Response)
}
}

func (reqRes *ReqRes) SetDone(res *types.Response) (set bool) {
reqRes.mtx.RLock()
reqRes.mtx.Lock()
// TODO should we panic if it's already done?
set = !reqRes.done
if set {
reqRes.Response = res
reqRes.done = true
reqRes.wg.Done()
}
reqRes.mtx.RUnlock()
reqRes.mtx.Unlock()

// NOTE `reqRes.cb` is immutable so we're safe to access it at here without `mtx`
if set && reqRes.cb != nil {
Expand Down
8 changes: 4 additions & 4 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ type grpcClient struct {
client types.ABCIApplicationClient
conn *grpc.ClientConn

mtx tmsync.RWMutex
mtx tmsync.Mutex
addr string
err error

globalCbMtx sync.RWMutex
globalCbMtx sync.Mutex
globalCb func(*types.Request, *types.Response) // listens to all callbacks
}

Expand Down Expand Up @@ -109,8 +109,8 @@ func (cli *grpcClient) StopForError(err error) {
}

func (cli *grpcClient) Error() error {
cli.mtx.RLock()
defer cli.mtx.RUnlock()
cli.mtx.Lock()
defer cli.mtx.Unlock()
return cli.err
}

Expand Down
26 changes: 12 additions & 14 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ type localClient struct {
service.BaseService

// TODO: remove `mtx` to increase concurrency. We could remove it because the app should protect itself.
mtx *tmsync.RWMutex
mtx *tmsync.Mutex
// CONTRACT: The application should protect itself from concurrency as an abci server.
types.Application

globalCbMtx tmsync.RWMutex
globalCbMtx tmsync.Mutex
globalCb GlobalCallback
}

Expand All @@ -30,16 +30,14 @@ var _ Client = (*localClient)(nil)
// methods of the given app.
//
// Both Async and Sync methods ignore the given context.Context parameter.
func NewLocalClient(mtx *tmsync.RWMutex, app types.Application) Client {
func NewLocalClient(mtx *tmsync.Mutex, app types.Application) Client {
if mtx == nil {
mtx = &tmsync.RWMutex{}
mtx = new(tmsync.Mutex)
}

cli := &localClient{
mtx: mtx,
Application: app,
}

cli.BaseService = *service.NewBaseService(nil, "localClient", cli)
return cli
}
Expand Down Expand Up @@ -78,8 +76,8 @@ func (app *localClient) EchoAsync(msg string, cb ResponseCallback) *ReqRes {
}

func (app *localClient) InfoAsync(req types.RequestInfo, cb ResponseCallback) *ReqRes {
app.mtx.RLock()
defer app.mtx.RUnlock()
app.mtx.Lock()
defer app.mtx.Unlock()

reqRes := NewReqRes(types.ToRequestInfo(req), cb)
res := app.Application.Info(req)
Expand Down Expand Up @@ -120,8 +118,8 @@ func (app *localClient) CheckTxAsync(req types.RequestCheckTx, cb ResponseCallba
}

func (app *localClient) QueryAsync(req types.RequestQuery, cb ResponseCallback) *ReqRes {
app.mtx.RLock()
defer app.mtx.RUnlock()
app.mtx.Lock()
defer app.mtx.Unlock()

reqRes := NewReqRes(types.ToRequestQuery(req), cb)
res := app.Application.Query(req)
Expand Down Expand Up @@ -234,8 +232,8 @@ func (app *localClient) EchoSync(msg string) (*types.ResponseEcho, error) {
}

func (app *localClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) {
app.mtx.RLock()
defer app.mtx.RUnlock()
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.Info(req)
return &res, nil
Expand Down Expand Up @@ -267,8 +265,8 @@ func (app *localClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCh
}

func (app *localClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) {
app.mtx.RLock()
defer app.mtx.RUnlock()
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.Query(req)
return &res, nil
Expand Down
8 changes: 4 additions & 4 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ type socketClient struct {
reqQueue chan *ReqRes
flushTimer *timer.ThrottleTimer

mtx tmsync.RWMutex
mtx tmsync.Mutex
err error
reqSent *list.List // list of requests sent, waiting for response

globalCbMtx tmsync.RWMutex
globalCbMtx tmsync.Mutex
globalCb GlobalCallback
}

Expand Down Expand Up @@ -101,8 +101,8 @@ func (cli *socketClient) OnStop() {

// Error returns an error if the client was stopped abruptly.
func (cli *socketClient) Error() error {
cli.mtx.RLock()
defer cli.mtx.RUnlock()
cli.mtx.Lock()
defer cli.mtx.Unlock()
return cli.err
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
blockStore := store.NewBlockStore(blockDB)

// one for mempool, one for consensus
mtx := new(tmsync.RWMutex)
mtx := new(tmsync.Mutex)
proxyAppConnMem := abcicli.NewLocalClient(mtx, app)
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)

Expand Down
2 changes: 1 addition & 1 deletion consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ func newStateWithConfigAndBlockStoreWithLoggers(
blockStore := store.NewBlockStore(blockDB)

// one for mempool, one for consensus
mtx := new(tmsync.RWMutex)
mtx := new(tmsync.Mutex)
proxyAppConnMem := abcicli.NewLocalClient(mtx, app)
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)

Expand Down
2 changes: 1 addition & 1 deletion consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func TestReactorWithEvidence(t *testing.T) {
blockStore := store.NewBlockStore(blockDB)

// one for mempool, one for consensus
mtx := new(tmsync.RWMutex)
mtx := new(tmsync.Mutex)
proxyAppConnMem := abcicli.NewLocalClient(mtx, app)
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)

Expand Down
4 changes: 2 additions & 2 deletions proxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ type ClientCreator interface {
// local proxy uses a mutex on an in-proc app

type localClientCreator struct {
mtx *tmsync.RWMutex
mtx *tmsync.Mutex
app types.Application
}

// NewLocalClientCreator returns a ClientCreator for the given app,
// which will be running locally.
func NewLocalClientCreator(app types.Application) ClientCreator {
return &localClientCreator{
mtx: new(tmsync.RWMutex),
mtx: new(tmsync.Mutex),
app: app,
}
}
Expand Down

0 comments on commit c7d2f1e

Please sign in to comment.