Skip to content

Commit

Permalink
Registration Retry / Interval (micro#1651)
Browse files Browse the repository at this point in the history
* Change the default ttl to 90 seconds

* add retries to registration

* Add retry to web register
  • Loading branch information
asim committed May 20, 2020
1 parent e61edf6 commit a29676b
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 19 deletions.
33 changes: 26 additions & 7 deletions server/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/server"
"github.com/micro/go-micro/v2/util/addr"
"github.com/micro/go-micro/v2/util/backoff"
mgrpc "github.com/micro/go-micro/v2/util/grpc"
mnet "github.com/micro/go-micro/v2/util/net"
"golang.org/x/net/netutil"
Expand Down Expand Up @@ -566,16 +567,36 @@ func (g *grpcServer) Subscribe(sb server.Subscriber) error {
}

func (g *grpcServer) Register() error {

g.RLock()
rsvc := g.rsvc
config := g.opts
g.RUnlock()

regFunc := func(service *registry.Service) error {
var regErr error

for i := 0; i < 3; i++ {
// set the ttl
rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)}
// attempt to register
if err := config.Registry.Register(service, rOpts...); err != nil {
// set the error
regErr = err
// backoff then retry
time.Sleep(backoff.Do(i + 1))
continue
}
// success so nil error
regErr = nil
break
}

return regErr
}

// if service already filled, reuse it and return early
if rsvc != nil {
rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)}
if err := config.Registry.Register(rsvc, rOpts...); err != nil {
if err := regFunc(rsvc); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -677,10 +698,8 @@ func (g *grpcServer) Register() error {
}
}

// create registry options
rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)}

if err := config.Registry.Register(service, rOpts...); err != nil {
// register the service
if err := regFunc(service); err != nil {
return err
}

Expand Down
36 changes: 28 additions & 8 deletions server/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/transport"
"github.com/micro/go-micro/v2/util/addr"
"github.com/micro/go-micro/v2/util/backoff"
mnet "github.com/micro/go-micro/v2/util/net"
"github.com/micro/go-micro/v2/util/socket"
)
Expand Down Expand Up @@ -514,18 +515,39 @@ func (s *rpcServer) Subscribe(sb Subscriber) error {
}

func (s *rpcServer) Register() error {

s.RLock()
rsvc := s.rsvc
config := s.Options()
s.RUnlock()

if rsvc != nil {
regFunc := func(service *registry.Service) error {
// create registry options
rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)}
if err := config.Registry.Register(rsvc, rOpts...); err != nil {
return err

var regErr error

for i := 0; i < 3; i++ {
// attempt to register
if err := config.Registry.Register(service, rOpts...); err != nil {
// set the error
regErr = err
// backoff then retry
time.Sleep(backoff.Do(i + 1))
continue
}
// success so nil error
regErr = nil
break
}

return regErr
}

// have we registered before?
if rsvc != nil {
if err := regFunc(rsvc); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -635,10 +657,8 @@ func (s *rpcServer) Register() error {
}
}

// create registry options
rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)}

if err := config.Registry.Register(service, rOpts...); err != nil {
// register the service
if err := regFunc(service); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type Handler interface {
}

// Subscriber interface represents a subscription to a given topic using
// a specific subscriber function or object with endpoints. It mirrors
// a specific subscriber function or object with endpoints. It mirrors
// the handler in its behaviour.
type Subscriber interface {
Topic() string
Expand All @@ -145,7 +145,7 @@ var (
DefaultRouter = newRpcRouter()
DefaultRegisterCheck = func(context.Context) error { return nil }
DefaultRegisterInterval = time.Second * 30
DefaultRegisterTTL = time.Minute
DefaultRegisterTTL = time.Second * 90

// NewServer creates a new server
NewServer func(...Option) Server = newRpcServer
Expand Down
20 changes: 19 additions & 1 deletion web/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/micro/go-micro/v2/registry"
maddr "github.com/micro/go-micro/v2/util/addr"
authutil "github.com/micro/go-micro/v2/util/auth"
"github.com/micro/go-micro/v2/util/backoff"
mhttp "github.com/micro/go-micro/v2/util/http"
mnet "github.com/micro/go-micro/v2/util/net"
signalutil "github.com/micro/go-micro/v2/util/signal"
Expand Down Expand Up @@ -138,7 +139,24 @@ func (s *service) register() error {
return err
}

return r.Register(s.srv, registry.RegisterTTL(s.opts.RegisterTTL))
var regErr error

// try three times if necessary
for i := 0; i < 3; i++ {
// attempt to register
if err := r.Register(s.srv, registry.RegisterTTL(s.opts.RegisterTTL)); err != nil {
// set the error
regErr = err
// backoff then retry
time.Sleep(backoff.Do(i + 1))
continue
}
// success so nil error
regErr = nil
break
}

return regErr
}

func (s *service) deregister() error {
Expand Down
2 changes: 1 addition & 1 deletion web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var (
DefaultAddress = ":0"

// for registration
DefaultRegisterTTL = time.Minute
DefaultRegisterTTL = time.Second * 90
DefaultRegisterInterval = time.Second * 30

// static directory
Expand Down

0 comments on commit a29676b

Please sign in to comment.