Skip to content

Commit

Permalink
various fixes for broker and messaging in server (micro#1187)
Browse files Browse the repository at this point in the history
* provide broker disconnect messages in server

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* broker/eats: another fix

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
  • Loading branch information
vtolstov committed Feb 11, 2020
1 parent 2764de9 commit 79ad1e6
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 13 deletions.
24 changes: 16 additions & 8 deletions broker/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ func (n *natsBroker) serve(exit chan bool) error {
// register the cluster address
for {
select {
case err := <-n.closeCh:
if err != nil {
log.Log(err)
}
case <-exit:
// deregister on exit
n.opts.Registry.Deregister(&registry.Service{
Expand Down Expand Up @@ -282,7 +286,6 @@ func (n *natsBroker) Connect() error {
}

// set to connected
n.connected = true
}

status := nats.CLOSED
Expand Down Expand Up @@ -313,6 +316,9 @@ func (n *natsBroker) Connect() error {
return err
}
n.conn = c

n.connected = true

return nil
}
}
Expand All @@ -328,18 +334,19 @@ func (n *natsBroker) Disconnect() error {
// drain the connection if specified
if n.drain {
n.conn.Drain()
n.closeCh <- nil
}

// close the client connection
n.conn.Close()

// shutdown the local server
// and deregister
select {
case <-n.exit:
default:
close(n.exit)
if n.server != nil {
select {
case <-n.exit:
default:
close(n.exit)
}
}

// set not connected
Expand Down Expand Up @@ -439,7 +446,7 @@ func (n *natsBroker) onClose(conn *nats.Conn) {
}

func (n *natsBroker) onDisconnectedError(conn *nats.Conn, err error) {
n.closeCh <- nil
n.closeCh <- err
}

func (n *natsBroker) onAsyncError(conn *nats.Conn, sub *nats.Subscription, err error) {
Expand All @@ -459,7 +466,8 @@ func NewBroker(opts ...Option) Broker {
}

n := &natsBroker{
opts: options,
opts: options,
closeCh: make(chan error),
}
n.setOption(opts...)

Expand Down
2 changes: 1 addition & 1 deletion broker/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ func (n *natsBroker) onAsyncError(conn *nats.Conn, sub *nats.Subscription, err e
}

func (n *natsBroker) onDisconnectedError(conn *nats.Conn, err error) {
n.closeCh <- nil
n.closeCh <- err
}

func NewBroker(opts ...broker.Option) broker.Broker {
Expand Down
6 changes: 2 additions & 4 deletions server/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,10 +805,7 @@ func (g *grpcServer) Start() error {
return err
}

baddr := config.Broker.Address()
bname := config.Broker.String()

log.Logf("Broker [%s] Connected to %s", bname, baddr)
log.Logf("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
}

// announce self to the world
Expand Down Expand Up @@ -876,6 +873,7 @@ func (g *grpcServer) Start() error {
// close transport
ch <- nil

log.Logf("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
// disconnect broker
config.Broker.Disconnect()
}()
Expand Down
1 change: 1 addition & 0 deletions server/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,7 @@ func (s *rpcServer) Start() error {
// close transport listener
ch <- ts.Close()

log.Logf("Broker [%s] Disconnected from %s", bname, config.Broker.Address())
// disconnect the broker
config.Broker.Disconnect()

Expand Down

0 comments on commit 79ad1e6

Please sign in to comment.