From f8fb6ab00951ed245d69e1a1b722e317aa936a49 Mon Sep 17 00:00:00 2001 From: "Kim, JinSan" Date: Tue, 22 Dec 2020 19:01:49 +0900 Subject: [PATCH] chore: remove abci socket implementation --- abci/client/client.go | 4 +- abci/client/socket_client.go | 406 -------------------------- abci/client/socket_client_test.go | 126 -------- abci/cmd/abci-cli/abci-cli.go | 2 +- abci/example/example_test.go | 78 +---- abci/example/kvstore/kvstore_test.go | 33 +-- abci/server/server.go | 2 - abci/server/socket_server.go | 252 ---------------- abci/tests/client_server_test.go | 2 +- abci/tests/test_app/main.go | 2 +- abci/tests/test_app/test.sh | 8 +- config/config.go | 4 +- docs/app-dev/abci-cli.md | 2 +- docs/guides/java.md | 2 +- docs/guides/kotlin.md | 2 +- docs/tendermint-core/configuration.md | 4 +- mempool/clist_mempool_test.go | 4 +- proxy/app_conn_test.go | 20 +- test/app/test.sh | 61 ++-- 19 files changed, 45 insertions(+), 969 deletions(-) delete mode 100644 abci/client/socket_client.go delete mode 100644 abci/client/socket_client_test.go delete mode 100644 abci/server/socket_server.go diff --git a/abci/client/client.go b/abci/client/client.go index 4f7c7b69a..c52b8a4cb 100644 --- a/abci/client/client.go +++ b/abci/client/client.go @@ -52,11 +52,9 @@ type Client interface { //---------------------------------------- // NewClient returns a new ABCI client of the specified transport type. -// It returns an error if the transport is not "socket" or "grpc" +// It returns an error if the transport is not "grpc" func NewClient(addr, transport string, mustConnect bool) (client Client, err error) { switch transport { - case "socket": - client = NewSocketClient(addr, mustConnect) case "grpc": client = NewGRPCClient(addr, mustConnect) default: diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go deleted file mode 100644 index 7898a8f26..000000000 --- a/abci/client/socket_client.go +++ /dev/null @@ -1,406 +0,0 @@ -package abcicli - -import ( - "bufio" - "container/list" - "errors" - "fmt" - "io" - "net" - "reflect" - "sync" - "time" - - "github.com/tendermint/tendermint/abci/types" - tmnet "github.com/tendermint/tendermint/libs/net" - "github.com/tendermint/tendermint/libs/service" - "github.com/tendermint/tendermint/libs/timer" -) - -const reqQueueSize = 256 // TODO make configurable -// const maxResponseSize = 1048576 // 1MB TODO make configurable -const flushThrottleMS = 20 // Don't wait longer than... - -var _ Client = (*socketClient)(nil) - -// This is goroutine-safe, but users should beware that -// the application in general is not meant to be interfaced -// with concurrent callers. -type socketClient struct { - service.BaseService - - addr string - mustConnect bool - conn net.Conn - - reqQueue chan *ReqRes - flushTimer *timer.ThrottleTimer - - mtx sync.Mutex - err error - reqSent *list.List // list of requests sent, waiting for response - resCb func(*types.Request, *types.Response) // called on all requests, if set. - -} - -func NewSocketClient(addr string, mustConnect bool) Client { - cli := &socketClient{ - reqQueue: make(chan *ReqRes, reqQueueSize), - flushTimer: timer.NewThrottleTimer("socketClient", flushThrottleMS), - mustConnect: mustConnect, - - addr: addr, - reqSent: list.New(), - resCb: nil, - } - cli.BaseService = *service.NewBaseService(nil, "socketClient", cli) - return cli -} - -func (cli *socketClient) OnStart() error { - var err error - var conn net.Conn -RETRY_LOOP: - for { - conn, err = tmnet.Connect(cli.addr) - if err != nil { - if cli.mustConnect { - return err - } - cli.Logger.Error(fmt.Sprintf("abci.socketClient failed to connect to %v. Retrying...", cli.addr), "err", err) - time.Sleep(time.Second * dialRetryIntervalSeconds) - continue RETRY_LOOP - } - cli.conn = conn - - go cli.sendRequestsRoutine(conn) - go cli.recvResponseRoutine(conn) - - return nil - } -} - -func (cli *socketClient) OnStop() { - if cli.conn != nil { - cli.conn.Close() - } - - cli.mtx.Lock() - defer cli.mtx.Unlock() - cli.flushQueue() -} - -// Stop the client and set the error -func (cli *socketClient) StopForError(err error) { - if !cli.IsRunning() { - return - } - - cli.mtx.Lock() - if cli.err == nil { - cli.err = err - } - cli.mtx.Unlock() - - cli.Logger.Error(fmt.Sprintf("Stopping abci.socketClient for error: %v", err.Error())) - cli.Stop() -} - -func (cli *socketClient) Error() error { - cli.mtx.Lock() - defer cli.mtx.Unlock() - return cli.err -} - -// Set listener for all responses -// NOTE: callback may get internally generated flush responses. -func (cli *socketClient) SetResponseCallback(resCb Callback) { - cli.mtx.Lock() - cli.resCb = resCb - cli.mtx.Unlock() -} - -//---------------------------------------- - -func (cli *socketClient) sendRequestsRoutine(conn io.Writer) { - - w := bufio.NewWriter(conn) - for { - select { - case <-cli.flushTimer.Ch: - select { - case cli.reqQueue <- NewReqRes(types.ToRequestFlush()): - default: - // Probably will fill the buffer, or retry later. - } - case <-cli.Quit(): - return - case reqres := <-cli.reqQueue: - cli.willSendReq(reqres) - err := types.WriteMessage(reqres.Request, w) - if err != nil { - cli.StopForError(fmt.Errorf("error writing msg: %v", err)) - return - } - // cli.Logger.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request) - if _, ok := reqres.Request.Value.(*types.Request_Flush); ok { - err = w.Flush() - if err != nil { - cli.StopForError(fmt.Errorf("error flushing writer: %v", err)) - return - } - } - } - } -} - -func (cli *socketClient) recvResponseRoutine(conn io.Reader) { - - r := bufio.NewReader(conn) // Buffer reads - for { - var res = &types.Response{} - err := types.ReadMessage(r, res) - if err != nil { - cli.StopForError(err) - return - } - switch r := res.Value.(type) { - case *types.Response_Exception: - // XXX After setting cli.err, release waiters (e.g. reqres.Done()) - cli.StopForError(errors.New(r.Exception.Error)) - return - default: - // cli.Logger.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res) - err := cli.didRecvResponse(res) - if err != nil { - cli.StopForError(err) - return - } - } - } -} - -func (cli *socketClient) willSendReq(reqres *ReqRes) { - cli.mtx.Lock() - defer cli.mtx.Unlock() - cli.reqSent.PushBack(reqres) -} - -func (cli *socketClient) didRecvResponse(res *types.Response) error { - cli.mtx.Lock() - defer cli.mtx.Unlock() - - // Get the first ReqRes - next := cli.reqSent.Front() - if next == nil { - return fmt.Errorf("unexpected result type %v when nothing expected", reflect.TypeOf(res.Value)) - } - reqres := next.Value.(*ReqRes) - if !resMatchesReq(reqres.Request, res) { - return fmt.Errorf("unexpected result type %v when response to %v expected", - reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value)) - } - - reqres.Response = res // Set response - reqres.Done() // Release waiters - cli.reqSent.Remove(next) // Pop first item from linked list - - // Notify client listener if set (global callback). - if cli.resCb != nil { - cli.resCb(reqres.Request, res) - } - - // Notify reqRes listener if set (request specific callback). - // NOTE: it is possible this callback isn't set on the reqres object. - // at this point, in which case it will be called after, when it is set. - if cb := reqres.GetCallback(); cb != nil { - cb(res) - } - - return nil -} - -//---------------------------------------- - -func (cli *socketClient) EchoAsync(msg string) *ReqRes { - return cli.queueRequest(types.ToRequestEcho(msg)) -} - -func (cli *socketClient) FlushAsync() *ReqRes { - return cli.queueRequest(types.ToRequestFlush()) -} - -func (cli *socketClient) InfoAsync(req types.RequestInfo) *ReqRes { - return cli.queueRequest(types.ToRequestInfo(req)) -} - -func (cli *socketClient) SetOptionAsync(req types.RequestSetOption) *ReqRes { - return cli.queueRequest(types.ToRequestSetOption(req)) -} - -func (cli *socketClient) DeliverTxAsync(req types.RequestDeliverTx) *ReqRes { - return cli.queueRequest(types.ToRequestDeliverTx(req)) -} - -func (cli *socketClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes { - return cli.queueRequest(types.ToRequestCheckTx(req)) -} - -func (cli *socketClient) QueryAsync(req types.RequestQuery) *ReqRes { - return cli.queueRequest(types.ToRequestQuery(req)) -} - -func (cli *socketClient) CommitAsync() *ReqRes { - return cli.queueRequest(types.ToRequestCommit()) -} - -func (cli *socketClient) InitChainAsync(req types.RequestInitChain) *ReqRes { - return cli.queueRequest(types.ToRequestInitChain(req)) -} - -func (cli *socketClient) BeginBlockAsync(req types.RequestBeginBlock) *ReqRes { - return cli.queueRequest(types.ToRequestBeginBlock(req)) -} - -func (cli *socketClient) EndBlockAsync(req types.RequestEndBlock) *ReqRes { - return cli.queueRequest(types.ToRequestEndBlock(req)) -} - -//---------------------------------------- - -func (cli *socketClient) FlushSync() error { - reqRes := cli.queueRequest(types.ToRequestFlush()) - if err := cli.Error(); err != nil { - return err - } - reqRes.Wait() // NOTE: if we don't flush the queue, its possible to get stuck here - return cli.Error() -} - -func (cli *socketClient) EchoSync(msg string) (*types.ResponseEcho, error) { - reqres := cli.queueRequest(types.ToRequestEcho(msg)) - cli.FlushSync() - return reqres.Response.GetEcho(), cli.Error() -} - -func (cli *socketClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) { - reqres := cli.queueRequest(types.ToRequestInfo(req)) - cli.FlushSync() - return reqres.Response.GetInfo(), cli.Error() -} - -func (cli *socketClient) SetOptionSync(req types.RequestSetOption) (*types.ResponseSetOption, error) { - reqres := cli.queueRequest(types.ToRequestSetOption(req)) - cli.FlushSync() - return reqres.Response.GetSetOption(), cli.Error() -} - -func (cli *socketClient) DeliverTxSync(req types.RequestDeliverTx) (*types.ResponseDeliverTx, error) { - reqres := cli.queueRequest(types.ToRequestDeliverTx(req)) - cli.FlushSync() - return reqres.Response.GetDeliverTx(), cli.Error() -} - -func (cli *socketClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCheckTx, error) { - reqres := cli.queueRequest(types.ToRequestCheckTx(req)) - cli.FlushSync() - return reqres.Response.GetCheckTx(), cli.Error() -} - -func (cli *socketClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) { - reqres := cli.queueRequest(types.ToRequestQuery(req)) - cli.FlushSync() - return reqres.Response.GetQuery(), cli.Error() -} - -func (cli *socketClient) CommitSync() (*types.ResponseCommit, error) { - reqres := cli.queueRequest(types.ToRequestCommit()) - cli.FlushSync() - return reqres.Response.GetCommit(), cli.Error() -} - -func (cli *socketClient) InitChainSync(req types.RequestInitChain) (*types.ResponseInitChain, error) { - reqres := cli.queueRequest(types.ToRequestInitChain(req)) - cli.FlushSync() - return reqres.Response.GetInitChain(), cli.Error() -} - -func (cli *socketClient) BeginBlockSync(req types.RequestBeginBlock) (*types.ResponseBeginBlock, error) { - reqres := cli.queueRequest(types.ToRequestBeginBlock(req)) - cli.FlushSync() - return reqres.Response.GetBeginBlock(), cli.Error() -} - -func (cli *socketClient) EndBlockSync(req types.RequestEndBlock) (*types.ResponseEndBlock, error) { - reqres := cli.queueRequest(types.ToRequestEndBlock(req)) - cli.FlushSync() - return reqres.Response.GetEndBlock(), cli.Error() -} - -//---------------------------------------- - -func (cli *socketClient) queueRequest(req *types.Request) *ReqRes { - reqres := NewReqRes(req) - - // TODO: set cli.err if reqQueue times out - cli.reqQueue <- reqres - - // Maybe auto-flush, or unset auto-flush - switch req.Value.(type) { - case *types.Request_Flush: - cli.flushTimer.Unset() - default: - cli.flushTimer.Set() - } - - return reqres -} - -func (cli *socketClient) flushQueue() { - // mark all in-flight messages as resolved (they will get cli.Error()) - for req := cli.reqSent.Front(); req != nil; req = req.Next() { - reqres := req.Value.(*ReqRes) - reqres.Done() - } - - // mark all queued messages as resolved -LOOP: - for { - select { - case reqres := <-cli.reqQueue: - reqres.Done() - default: - break LOOP - } - } -} - -//---------------------------------------- - -func resMatchesReq(req *types.Request, res *types.Response) (ok bool) { - switch req.Value.(type) { - case *types.Request_Echo: - _, ok = res.Value.(*types.Response_Echo) - case *types.Request_Flush: - _, ok = res.Value.(*types.Response_Flush) - case *types.Request_Info: - _, ok = res.Value.(*types.Response_Info) - case *types.Request_SetOption: - _, ok = res.Value.(*types.Response_SetOption) - case *types.Request_DeliverTx: - _, ok = res.Value.(*types.Response_DeliverTx) - case *types.Request_CheckTx: - _, ok = res.Value.(*types.Response_CheckTx) - case *types.Request_Commit: - _, ok = res.Value.(*types.Response_Commit) - case *types.Request_Query: - _, ok = res.Value.(*types.Response_Query) - case *types.Request_InitChain: - _, ok = res.Value.(*types.Response_InitChain) - case *types.Request_BeginBlock: - _, ok = res.Value.(*types.Response_BeginBlock) - case *types.Request_EndBlock: - _, ok = res.Value.(*types.Response_EndBlock) - } - return ok -} diff --git a/abci/client/socket_client_test.go b/abci/client/socket_client_test.go deleted file mode 100644 index 37bc2b57a..000000000 --- a/abci/client/socket_client_test.go +++ /dev/null @@ -1,126 +0,0 @@ -package abcicli_test - -import ( - "errors" - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - abcicli "github.com/tendermint/tendermint/abci/client" - "github.com/tendermint/tendermint/abci/server" - "github.com/tendermint/tendermint/abci/types" - tmrand "github.com/tendermint/tendermint/libs/rand" - "github.com/tendermint/tendermint/libs/service" -) - -type errorStopper interface { - StopForError(error) -} - -func TestSocketClientStopForErrorDeadlock(t *testing.T) { - c := abcicli.NewSocketClient(":80", false).(errorStopper) - err := errors.New("foo-tendermint") - - // See Issue https://github.com/tendermint/abci/issues/114 - doneChan := make(chan bool) - go func() { - defer close(doneChan) - c.StopForError(err) - c.StopForError(err) - }() - - select { - case <-doneChan: - case <-time.After(time.Second * 4): - t.Fatalf("Test took too long, potential deadlock still exists") - } -} - -func TestProperSyncCalls(t *testing.T) { - app := slowApp{} - - s, c := setupClientServer(t, app) - defer s.Stop() - defer c.Stop() - - resp := make(chan error, 1) - go func() { - // This is BeginBlockSync unrolled.... - reqres := c.BeginBlockAsync(types.RequestBeginBlock{}) - c.FlushSync() - res := reqres.Response.GetBeginBlock() - require.NotNil(t, res) - resp <- c.Error() - }() - - select { - case <-time.After(time.Second): - require.Fail(t, "No response arrived") - case err, ok := <-resp: - require.True(t, ok, "Must not close channel") - assert.NoError(t, err, "This should return success") - } -} - -func TestHangingSyncCalls(t *testing.T) { - app := slowApp{} - - s, c := setupClientServer(t, app) - defer s.Stop() - defer c.Stop() - - resp := make(chan error, 1) - go func() { - // Start BeginBlock and flush it - reqres := c.BeginBlockAsync(types.RequestBeginBlock{}) - flush := c.FlushAsync() - // wait 20 ms for all events to travel socket, but - // no response yet from server - time.Sleep(20 * time.Millisecond) - // kill the server, so the connections break - s.Stop() - - // wait for the response from BeginBlock - reqres.Wait() - flush.Wait() - resp <- c.Error() - }() - - select { - case <-time.After(time.Second): - require.Fail(t, "No response arrived") - case err, ok := <-resp: - require.True(t, ok, "Must not close channel") - assert.Error(t, err, "We should get EOF error") - } -} - -func setupClientServer(t *testing.T, app types.Application) ( - service.Service, abcicli.Client) { - // some port between 20k and 30k - port := 20000 + tmrand.Int32()%10000 - addr := fmt.Sprintf("localhost:%d", port) - - s, err := server.NewServer(addr, "socket", app) - require.NoError(t, err) - err = s.Start() - require.NoError(t, err) - - c := abcicli.NewSocketClient(addr, true) - err = c.Start() - require.NoError(t, err) - - return s, c -} - -type slowApp struct { - types.BaseApplication -} - -func (slowApp) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginBlock { - time.Sleep(200 * time.Millisecond) - return types.ResponseBeginBlock{} -} diff --git a/abci/cmd/abci-cli/abci-cli.go b/abci/cmd/abci-cli/abci-cli.go index d5a9aca27..0afcd8e54 100644 --- a/abci/cmd/abci-cli/abci-cli.go +++ b/abci/cmd/abci-cli/abci-cli.go @@ -116,7 +116,7 @@ func addGlobalFlags() { "", "tcp://0.0.0.0:26658", "address of application socket") - RootCmd.PersistentFlags().StringVarP(&flagAbci, "abci", "", "socket", "either socket or grpc") + RootCmd.PersistentFlags().StringVarP(&flagAbci, "abci", "", "grpc", "grpc") RootCmd.PersistentFlags().BoolVarP(&flagVerbose, "verbose", "v", diff --git a/abci/example/example_test.go b/abci/example/example_test.go index d40976015..2efbdfee4 100644 --- a/abci/example/example_test.go +++ b/abci/example/example_test.go @@ -3,12 +3,9 @@ package example import ( "fmt" "net" - "reflect" "testing" "time" - "github.com/stretchr/testify/require" - "google.golang.org/grpc" "golang.org/x/net/context" @@ -16,7 +13,6 @@ import ( "github.com/tendermint/tendermint/libs/log" tmnet "github.com/tendermint/tendermint/libs/net" - abcicli "github.com/tendermint/tendermint/abci/client" "github.com/tendermint/tendermint/abci/example/code" "github.com/tendermint/tendermint/abci/example/kvstore" abciserver "github.com/tendermint/tendermint/abci/server" @@ -25,12 +21,7 @@ import ( func TestKVStore(t *testing.T) { fmt.Println("### Testing KVStore") - testStream(t, kvstore.NewApplication()) -} - -func TestBaseApp(t *testing.T) { - fmt.Println("### Testing BaseApp") - testStream(t, types.NewBaseApplication()) + testGRPCSync(t, types.NewGRPCApplication(kvstore.NewApplication())) } func TestGRPC(t *testing.T) { @@ -38,72 +29,6 @@ func TestGRPC(t *testing.T) { testGRPCSync(t, types.NewGRPCApplication(types.NewBaseApplication())) } -func testStream(t *testing.T, app types.Application) { - numDeliverTxs := 20000 - - // Start the listener - server := abciserver.NewSocketServer("unix://test.sock", app) - server.SetLogger(log.TestingLogger().With("module", "abci-server")) - if err := server.Start(); err != nil { - require.NoError(t, err, "Error starting socket server") - } - defer server.Stop() - - // Connect to the socket - client := abcicli.NewSocketClient("unix://test.sock", false) - client.SetLogger(log.TestingLogger().With("module", "abci-client")) - if err := client.Start(); err != nil { - t.Fatalf("Error starting socket client: %v", err.Error()) - } - defer client.Stop() - - done := make(chan struct{}) - counter := 0 - client.SetResponseCallback(func(req *types.Request, res *types.Response) { - // Process response - switch r := res.Value.(type) { - case *types.Response_DeliverTx: - counter++ - if r.DeliverTx.Code != code.CodeTypeOK { - t.Error("DeliverTx failed with ret_code", r.DeliverTx.Code) - } - if counter > numDeliverTxs { - t.Fatalf("Too many DeliverTx responses. Got %d, expected %d", counter, numDeliverTxs) - } - if counter == numDeliverTxs { - go func() { - time.Sleep(time.Second * 1) // Wait for a bit to allow counter overflow - close(done) - }() - return - } - case *types.Response_Flush: - // ignore - default: - t.Error("Unexpected response type", reflect.TypeOf(res.Value)) - } - }) - - // Write requests - for counter := 0; counter < numDeliverTxs; counter++ { - // Send request - reqRes := client.DeliverTxAsync(types.RequestDeliverTx{Tx: []byte("test")}) - _ = reqRes - // check err ? - - // Sometimes send flush messages - if counter%123 == 0 { - client.FlushAsync() - // check err ? - } - } - - // Send final flush message - client.FlushAsync() - - <-done -} - //------------------------- // test grpc @@ -151,6 +76,5 @@ func testGRPCSync(t *testing.T, app types.ABCIApplicationServer) { time.Sleep(time.Second * 1) // Wait for a bit to allow counter overflow }() } - } } diff --git a/abci/example/kvstore/kvstore_test.go b/abci/example/kvstore/kvstore_test.go index 4d8c829ad..3bf80faf9 100644 --- a/abci/example/kvstore/kvstore_test.go +++ b/abci/example/kvstore/kvstore_test.go @@ -226,28 +226,6 @@ func valsEqual(t *testing.T, vals1, vals2 []types.ValidatorUpdate) { } } -func makeSocketClientServer(app types.Application, name string) (abcicli.Client, service.Service, error) { - // Start the listener - socket := fmt.Sprintf("unix://%s.sock", name) - logger := log.TestingLogger() - - server := abciserver.NewSocketServer(socket, app) - server.SetLogger(logger.With("module", "abci-server")) - if err := server.Start(); err != nil { - return nil, nil, err - } - - // Connect to the socket - client := abcicli.NewSocketClient(socket, false) - client.SetLogger(logger.With("module", "abci-client")) - if err := client.Start(); err != nil { - server.Stop() - return nil, nil, err - } - - return client, server, nil -} - func makeGRPCClientServer(app types.Application, name string) (abcicli.Client, service.Service, error) { // Start the listener socket := fmt.Sprintf("unix://%s.sock", name) @@ -270,17 +248,8 @@ func makeGRPCClientServer(app types.Application, name string) (abcicli.Client, s } func TestClientServer(t *testing.T) { - // set up socket app - kvstore := NewApplication() - client, server, err := makeSocketClientServer(kvstore, "kvstore-socket") - require.Nil(t, err) - defer server.Stop() - defer client.Stop() - - runClientTests(t, client) - // set up grpc app - kvstore = NewApplication() + kvstore := NewApplication() gclient, gserver, err := makeGRPCClientServer(kvstore, "kvstore-grpc") require.Nil(t, err) defer gserver.Stop() diff --git a/abci/server/server.go b/abci/server/server.go index 6dd13ad02..13e6404b1 100644 --- a/abci/server/server.go +++ b/abci/server/server.go @@ -19,8 +19,6 @@ func NewServer(protoAddr, transport string, app types.Application) (service.Serv var s service.Service var err error switch transport { - case "socket": - s = NewSocketServer(protoAddr, app) case "grpc": s = NewGRPCServer(protoAddr, types.NewGRPCApplication(app)) default: diff --git a/abci/server/socket_server.go b/abci/server/socket_server.go deleted file mode 100644 index e68d79599..000000000 --- a/abci/server/socket_server.go +++ /dev/null @@ -1,252 +0,0 @@ -package server - -import ( - "bufio" - "fmt" - "io" - "net" - "os" - "runtime" - "sync" - - "github.com/tendermint/tendermint/abci/types" - tmlog "github.com/tendermint/tendermint/libs/log" - tmnet "github.com/tendermint/tendermint/libs/net" - "github.com/tendermint/tendermint/libs/service" -) - -// var maxNumberConnections = 2 - -type SocketServer struct { - service.BaseService - isLoggerSet bool - - proto string - addr string - listener net.Listener - - connsMtx sync.Mutex - conns map[int]net.Conn - nextConnID int - - appMtx sync.Mutex - app types.Application -} - -func NewSocketServer(protoAddr string, app types.Application) service.Service { - proto, addr := tmnet.ProtocolAndAddress(protoAddr) - s := &SocketServer{ - proto: proto, - addr: addr, - listener: nil, - app: app, - conns: make(map[int]net.Conn), - } - s.BaseService = *service.NewBaseService(nil, "ABCIServer", s) - return s -} - -func (s *SocketServer) SetLogger(l tmlog.Logger) { - s.BaseService.SetLogger(l) - s.isLoggerSet = true -} - -func (s *SocketServer) OnStart() error { - ln, err := net.Listen(s.proto, s.addr) - if err != nil { - return err - } - - s.listener = ln - go s.acceptConnectionsRoutine() - - return nil -} - -func (s *SocketServer) OnStop() { - if err := s.listener.Close(); err != nil { - s.Logger.Error("Error closing listener", "err", err) - } - - s.connsMtx.Lock() - defer s.connsMtx.Unlock() - for id, conn := range s.conns { - delete(s.conns, id) - if err := conn.Close(); err != nil { - s.Logger.Error("Error closing connection", "id", id, "conn", conn, "err", err) - } - } -} - -func (s *SocketServer) addConn(conn net.Conn) int { - s.connsMtx.Lock() - defer s.connsMtx.Unlock() - - connID := s.nextConnID - s.nextConnID++ - s.conns[connID] = conn - - return connID -} - -// deletes conn even if close errs -func (s *SocketServer) rmConn(connID int) error { - s.connsMtx.Lock() - defer s.connsMtx.Unlock() - - conn, ok := s.conns[connID] - if !ok { - return fmt.Errorf("connection %d does not exist", connID) - } - - delete(s.conns, connID) - return conn.Close() -} - -func (s *SocketServer) acceptConnectionsRoutine() { - for { - // Accept a connection - s.Logger.Info("Waiting for new connection...") - conn, err := s.listener.Accept() - if err != nil { - if !s.IsRunning() { - return // Ignore error from listener closing. - } - s.Logger.Error("Failed to accept connection", "err", err) - continue - } - - s.Logger.Info("Accepted a new connection") - - connID := s.addConn(conn) - - closeConn := make(chan error, 2) // Push to signal connection closed - responses := make(chan *types.Response, 1000) // A channel to buffer responses - - // Read requests from conn and deal with them - go s.handleRequests(closeConn, conn, responses) - // Pull responses from 'responses' and write them to conn. - go s.handleResponses(closeConn, conn, responses) - - // Wait until signal to close connection - go s.waitForClose(closeConn, connID) - } -} - -func (s *SocketServer) waitForClose(closeConn chan error, connID int) { - err := <-closeConn - switch { - case err == io.EOF: - s.Logger.Error("Connection was closed by client") - case err != nil: - s.Logger.Error("Connection error", "err", err) - default: - // never happens - s.Logger.Error("Connection was closed") - } - - // Close the connection - if err := s.rmConn(connID); err != nil { - s.Logger.Error("Error closing connection", "err", err) - } -} - -// Read requests from conn and deal with them -func (s *SocketServer) handleRequests(closeConn chan error, conn io.Reader, responses chan<- *types.Response) { - var count int - var bufReader = bufio.NewReader(conn) - - defer func() { - // make sure to recover from any app-related panics to allow proper socket cleanup - r := recover() - if r != nil { - const size = 64 << 10 - buf := make([]byte, size) - buf = buf[:runtime.Stack(buf, false)] - err := fmt.Errorf("recovered from panic: %v\n%s", r, buf) - if !s.isLoggerSet { - fmt.Fprintln(os.Stderr, err) - } - closeConn <- err - s.appMtx.Unlock() - } - }() - - for { - - var req = &types.Request{} - err := types.ReadMessage(bufReader, req) - if err != nil { - if err == io.EOF { - closeConn <- err - } else { - closeConn <- fmt.Errorf("error reading message: %w", err) - } - return - } - s.appMtx.Lock() - count++ - s.handleRequest(req, responses) - s.appMtx.Unlock() - } -} - -func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types.Response) { - switch r := req.Value.(type) { - case *types.Request_Echo: - responses <- types.ToResponseEcho(r.Echo.Message) - case *types.Request_Flush: - responses <- types.ToResponseFlush() - case *types.Request_Info: - res := s.app.Info(*r.Info) - responses <- types.ToResponseInfo(res) - case *types.Request_SetOption: - res := s.app.SetOption(*r.SetOption) - responses <- types.ToResponseSetOption(res) - case *types.Request_DeliverTx: - res := s.app.DeliverTx(*r.DeliverTx) - responses <- types.ToResponseDeliverTx(res) - case *types.Request_CheckTx: - res := s.app.CheckTx(*r.CheckTx) - responses <- types.ToResponseCheckTx(res) - case *types.Request_Commit: - res := s.app.Commit() - responses <- types.ToResponseCommit(res) - case *types.Request_Query: - res := s.app.Query(*r.Query) - responses <- types.ToResponseQuery(res) - case *types.Request_InitChain: - res := s.app.InitChain(*r.InitChain) - responses <- types.ToResponseInitChain(res) - case *types.Request_BeginBlock: - res := s.app.BeginBlock(*r.BeginBlock) - responses <- types.ToResponseBeginBlock(res) - case *types.Request_EndBlock: - res := s.app.EndBlock(*r.EndBlock) - responses <- types.ToResponseEndBlock(res) - default: - responses <- types.ToResponseException("Unknown request") - } -} - -// Pull responses from 'responses' and write them to conn. -func (s *SocketServer) handleResponses(closeConn chan error, conn io.Writer, responses <-chan *types.Response) { - var count int - var bufWriter = bufio.NewWriter(conn) - for { - var res = <-responses - err := types.WriteMessage(res, bufWriter) - if err != nil { - closeConn <- fmt.Errorf("error writing message: %w", err) - return - } - if _, ok := res.Value.(*types.Response_Flush); ok { - err = bufWriter.Flush() - if err != nil { - closeConn <- fmt.Errorf("error flushing write buffer: %w", err) - return - } - } - count++ - } -} diff --git a/abci/tests/client_server_test.go b/abci/tests/client_server_test.go index 2ef64e66a..bc17ef9f9 100644 --- a/abci/tests/client_server_test.go +++ b/abci/tests/client_server_test.go @@ -12,7 +12,7 @@ import ( func TestClientServerNoAddrPrefix(t *testing.T) { addr := "localhost:26658" - transport := "socket" + transport := "grpc" app := kvstore.NewApplication() server, err := abciserver.NewServer(addr, transport, app) diff --git a/abci/tests/test_app/main.go b/abci/tests/test_app/main.go index ca298d7e2..8858ee712 100644 --- a/abci/tests/test_app/main.go +++ b/abci/tests/test_app/main.go @@ -16,7 +16,7 @@ var abciType string func init() { abciType = os.Getenv("ABCI") if abciType == "" { - abciType = "socket" + abciType = "grpc" } } diff --git a/abci/tests/test_app/test.sh b/abci/tests/test_app/test.sh index 0d8301831..b8af29f59 100755 --- a/abci/tests/test_app/test.sh +++ b/abci/tests/test_app/test.sh @@ -12,15 +12,9 @@ DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )" # Change into that dir because we expect that. cd "$DIR" -echo "RUN COUNTER OVER SOCKET" -# test golang counter -ABCI_APP="counter" go run -mod=readonly ./*.go -echo "----------------------" - - echo "RUN COUNTER OVER GRPC" # test golang counter via grpc -ABCI_APP="counter --abci=grpc" ABCI="grpc" go run -mod=readonly ./*.go +ABCI_APP="counter" go run -mod=readonly ./*.go echo "----------------------" # test nodejs counter diff --git a/config/config.go b/config/config.go index dbc8f8311..1b8586d43 100644 --- a/config/config.go +++ b/config/config.go @@ -203,7 +203,7 @@ type BaseConfig struct { //nolint: maligned // A JSON file containing the private key to use for p2p authenticated encryption NodeKey string `mapstructure:"node_key_file"` - // Mechanism to connect to the ABCI application: socket | grpc + // Mechanism to connect to the ABCI application: grpc ABCI string `mapstructure:"abci"` // TCP or UNIX socket address for the profiling server to listen on @@ -223,7 +223,7 @@ func DefaultBaseConfig() BaseConfig { NodeKey: defaultNodeKeyPath, Moniker: defaultMoniker, ProxyApp: "tcp://127.0.0.1:26658", - ABCI: "socket", + ABCI: "grpc", LogLevel: DefaultPackageLogLevels(), LogFormat: LogFormatPlain, ProfListenAddress: "", diff --git a/docs/app-dev/abci-cli.md b/docs/app-dev/abci-cli.md index ec8b0abf3..6bd072b9c 100644 --- a/docs/app-dev/abci-cli.md +++ b/docs/app-dev/abci-cli.md @@ -44,7 +44,7 @@ Available Commands: set_option Set an options on the application Flags: - --abci string socket or grpc (default "socket") + --abci string grpc (default "grpc") --address string address of application socket (default "tcp://127.0.0.1:26658") -h, --help help for abci-cli -v, --verbose print the command and results as if it were a console session diff --git a/docs/guides/java.md b/docs/guides/java.md index 12bbc4565..0d81a4567 100644 --- a/docs/guides/java.md +++ b/docs/guides/java.md @@ -27,7 +27,7 @@ If you use Golang, you can run your app and Tendermint Core in the same process Please refer to [Writing a built-in Tendermint Core application in Go](./go-built-in.md) guide for details. If you choose another language, like we did in this guide, you have to write a separate app, -which will communicate with Tendermint Core via a socket (UNIX or TCP) or gRPC. +which will communicate with Tendermint Core via a gRPC. This guide will show you how to build external application using RPC server. Having a separate application might give you better security guarantees as two diff --git a/docs/guides/kotlin.md b/docs/guides/kotlin.md index 0c15098a4..90af6f3de 100644 --- a/docs/guides/kotlin.md +++ b/docs/guides/kotlin.md @@ -27,7 +27,7 @@ If you use Golang, you can run your app and Tendermint Core in the same process Please refer to [Writing a built-in Tendermint Core application in Go](./go-built-in.md) guide for details. If you choose another language, like we did in this guide, you have to write a separate app, -which will communicate with Tendermint Core via a socket (UNIX or TCP) or gRPC. +which will communicate with Tendermint Core via a gRPC. This guide will show you how to build external application using RPC server. Having a separate application might give you better security guarantees as two diff --git a/docs/tendermint-core/configuration.md b/docs/tendermint-core/configuration.md index 141645f26..2a242a3d8 100644 --- a/docs/tendermint-core/configuration.md +++ b/docs/tendermint-core/configuration.md @@ -81,8 +81,8 @@ priv_validator_laddr = "" # Path to the JSON file containing the private key to use for node authentication in the p2p protocol node_key_file = "config/node_key.json" -# Mechanism to connect to the ABCI application: socket | grpc -abci = "socket" +# Mechanism to connect to the ABCI application: grpc +abci = "grpc" # TCP or UNIX socket address for the profiling server to listen on prof_laddr = "" diff --git a/mempool/clist_mempool_test.go b/mempool/clist_mempool_test.go index 17ab83f33..e23bb64d3 100644 --- a/mempool/clist_mempool_test.go +++ b/mempool/clist_mempool_test.go @@ -583,10 +583,10 @@ func newRemoteApp( clientCreator proxy.ClientCreator, server service.Service, ) { - clientCreator = proxy.NewRemoteClientCreator(addr, "socket", true) + clientCreator = proxy.NewRemoteClientCreator(addr, "grpc", true) // Start server - server = abciserver.NewSocketServer(addr, app) + server = abciserver.NewGRPCServer(addr, abci.NewGRPCApplication(app)) server.SetLogger(log.TestingLogger().With("module", "abci-server")) if err := server.Start(); err != nil { t.Fatalf("Error starting socket server: %v", err.Error()) diff --git a/proxy/app_conn_test.go b/proxy/app_conn_test.go index ca15f8977..cc952485f 100644 --- a/proxy/app_conn_test.go +++ b/proxy/app_conn_test.go @@ -43,17 +43,17 @@ func (app *appConnTest) InfoSync(req types.RequestInfo) (*types.ResponseInfo, er //---------------------------------------- -var SOCKET = "socket" +var GRPC = "grpc" func TestEcho(t *testing.T) { sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6)) - clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true) + clientCreator := NewRemoteClientCreator(sockPath, GRPC, true) // Start server - s := server.NewSocketServer(sockPath, kvstore.NewApplication()) + s := server.NewGRPCServer(sockPath, types.NewGRPCApplication(kvstore.NewApplication())) s.SetLogger(log.TestingLogger().With("module", "abci-server")) if err := s.Start(); err != nil { - t.Fatalf("Error starting socket server: %v", err.Error()) + t.Fatalf("Error starting grpc server: %v", err.Error()) } defer s.Stop() @@ -81,13 +81,13 @@ func TestEcho(t *testing.T) { func BenchmarkEcho(b *testing.B) { b.StopTimer() // Initialize sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6)) - clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true) + clientCreator := NewRemoteClientCreator(sockPath, GRPC, true) // Start server - s := server.NewSocketServer(sockPath, kvstore.NewApplication()) + s := server.NewGRPCServer(sockPath, types.NewGRPCApplication(kvstore.NewApplication())) s.SetLogger(log.TestingLogger().With("module", "abci-server")) if err := s.Start(); err != nil { - b.Fatalf("Error starting socket server: %v", err.Error()) + b.Fatalf("Error starting grpc server: %v", err.Error()) } defer s.Stop() @@ -120,13 +120,13 @@ func BenchmarkEcho(b *testing.B) { func TestInfo(t *testing.T) { sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6)) - clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true) + clientCreator := NewRemoteClientCreator(sockPath, GRPC, true) // Start server - s := server.NewSocketServer(sockPath, kvstore.NewApplication()) + s := server.NewGRPCServer(sockPath, types.NewGRPCApplication(kvstore.NewApplication())) s.SetLogger(log.TestingLogger().With("module", "abci-server")) if err := s.Start(); err != nil { - t.Fatalf("Error starting socket server: %v", err.Error()) + t.Fatalf("Error starting grpc server: %v", err.Error()) } defer s.Stop() diff --git a/test/app/test.sh b/test/app/test.sh index dc60bfc1f..6ec48a98d 100755 --- a/test/app/test.sh +++ b/test/app/test.sh @@ -1,8 +1,7 @@ #! /bin/bash set -ex -#- kvstore over socket, curl -#- counter over socket, curl +#- kvstore over grpc, curl #- counter over grpc, curl #- counter over grpc, grpc @@ -11,57 +10,40 @@ set -ex export PATH="$GOBIN:$PATH" export TMHOME=$HOME/.tendermint_app -function kvstore_over_socket(){ +function kvstore_over_grpc(){ rm -rf $TMHOME tendermint init - echo "Starting kvstore_over_socket" - abci-cli kvstore > /dev/null & + echo "Starting kvstore_over_grpc" + abci-cli kvstore --abci grpc > /dev/null & pid_kvstore=$! - tendermint node > tendermint.log & + tendermint node --abci grpc > tendermint.log & pid_tendermint=$! sleep 5 echo "running test" - bash test/app/kvstore_test.sh "KVStore over Socket" + bash test/app/kvstore_test.sh "KVStore over GRPC" kill -9 $pid_kvstore $pid_tendermint } # start tendermint first -function kvstore_over_socket_reorder(){ +function kvstore_over_grpc_reorder(){ rm -rf $TMHOME tendermint init - echo "Starting kvstore_over_socket_reorder (ie. start tendermint first)" - tendermint node > tendermint.log & + echo "Starting kvstore_over_grpc_reorder (ie. start tendermint first)" + tendermint node --abci grpc > tendermint.log & pid_tendermint=$! sleep 2 - abci-cli kvstore > /dev/null & + abci-cli kvstore --abci grpc > /dev/null & pid_kvstore=$! sleep 5 echo "running test" - bash test/app/kvstore_test.sh "KVStore over Socket" + bash test/app/kvstore_test.sh "KVStore over GRPC" kill -9 $pid_kvstore $pid_tendermint } - -function counter_over_socket() { - rm -rf $TMHOME - tendermint init - echo "Starting counter_over_socket" - abci-cli counter --serial > /dev/null & - pid_counter=$! - tendermint node > tendermint.log & - pid_tendermint=$! - sleep 5 - - echo "running test" - bash test/app/counter_test.sh "Counter over Socket" - - kill -9 $pid_counter $pid_tendermint -} - function counter_over_grpc() { rm -rf $TMHOME tendermint init @@ -96,17 +78,14 @@ function counter_over_grpc_grpc() { kill -9 $pid_counter $pid_tendermint } -case "$1" in - "kvstore_over_socket") - kvstore_over_socket +case "$1" in + "kvstore_over_grpc") + kvstore_over_grpc ;; -"kvstore_over_socket_reorder") - kvstore_over_socket_reorder + "kvstore_over_grpc_reorder") + kvstore_over_grpc_reorder ;; - "counter_over_socket") - counter_over_socket - ;; -"counter_over_grpc") + "counter_over_grpc") counter_over_grpc ;; "counter_over_grpc_grpc") @@ -114,11 +93,9 @@ case "$1" in ;; *) echo "Running all" - kvstore_over_socket - echo "" - kvstore_over_socket_reorder + kvstore_over_grpc echo "" - counter_over_socket + kvstore_over_grpc_reorder echo "" counter_over_grpc echo ""