Skip to content

Commit

Permalink
Wait() option now accept *sync.WaitGroup
Browse files Browse the repository at this point in the history
The original signature accept a boolean, and it feel like a little
verbose, since when people pass in this option, he/she always want to
pass a `true`.

Now if input `wg` is nil, it has same effect as passing `true` in
original code. Furthermore, if user want's finer grained control during
shutdown, one can pass in a predefined `wg`, so that server will wait
against it during shutdown.
  • Loading branch information
magodo committed May 27, 2019
1 parent e035664 commit ebc479e
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 15 deletions.
2 changes: 1 addition & 1 deletion function.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func newFunction(opts ...Option) Function {

service.Server().Init(
// ensure the service waits for requests to finish
server.Wait(true),
server.Wait(nil),
// wrap handlers and subscribers to finish execution
server.WrapHandler(fnHandlerWrapper(fn)),
server.WrapSubscriber(fnSubWrapper(fn)),
Expand Down
11 changes: 6 additions & 5 deletions server/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@ package server

import (
"context"
"sync"
)

type serverKey struct{}

func wait(ctx context.Context) bool {
func wait(ctx context.Context) *sync.WaitGroup {
if ctx == nil {
return false
return nil
}
wait, ok := ctx.Value("wait").(bool)
wg, ok := ctx.Value("wait").(*sync.WaitGroup)
if !ok {
return false
return nil
}
return wait
return wg
}

func FromContext(ctx context.Context) (Server, bool) {
Expand Down
11 changes: 9 additions & 2 deletions server/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"context"
"sync"
"time"

"github.com/micro/go-micro/broker"
Expand Down Expand Up @@ -198,12 +199,18 @@ func WithRouter(r Router) Option {
}

// Wait tells the server to wait for requests to finish before exiting
func Wait(b bool) Option {
// If `wg` is nil, server only wait for completion of rpc handler.
// For user need finer grained control, pass a concrete `wg` here, server will
// wait against it on stop.
func Wait(wg *sync.WaitGroup) Option {
return func(o *Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, "wait", b)
if wg == nil {
wg = new(sync.WaitGroup)
}
o.Context = context.WithValue(o.Context, "wait", wg)
}
}

Expand Down
23 changes: 16 additions & 7 deletions server/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type rpcServer struct {
// used for first registration
registered bool
// graceful exit
wg sync.WaitGroup
wg *sync.WaitGroup
}

func newRpcServer(opts ...Option) Server {
Expand All @@ -42,6 +42,7 @@ func newRpcServer(opts ...Option) Server {
handlers: make(map[string]Handler),
subscribers: make(map[*subscriber][]broker.Subscriber),
exit: make(chan chan error),
wg: wait(options.Context),
}
}

Expand All @@ -63,8 +64,10 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
return
}

// add to wait group
s.wg.Add(1)
// add to wait group if "wait" is opt-in
if s.wg != nil {
s.wg.Add(1)
}

// we use this Timeout header to set a server deadline
to := msg.Header["Timeout"]
Expand Down Expand Up @@ -111,7 +114,9 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
},
Body: []byte(err.Error()),
})
s.wg.Done()
if s.wg != nil {
s.wg.Done()
}
return
}
}
Expand Down Expand Up @@ -167,12 +172,16 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
if err != nil {
log.Logf("rpc: unable to write error response: %v", err)
}
s.wg.Done()
if s.wg != nil {
s.wg.Done()
}
return
}

// done
s.wg.Done()
if s.wg != nil {
s.wg.Done()
}
}
}

Expand Down Expand Up @@ -555,7 +564,7 @@ func (s *rpcServer) Start() error {
}

// wait for requests to finish
if wait(s.opts.Context) {
if s.wg != nil {
s.wg.Wait()
}

Expand Down

0 comments on commit ebc479e

Please sign in to comment.