Skip to content

Commit

Permalink
Fixed router idempotency. Return registry.ErrWatchStopped from mdns reg
Browse files Browse the repository at this point in the history
  • Loading branch information
milosgajdos committed Jul 1, 2019
1 parent cff46c3 commit f6e064c
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 32 deletions.
63 changes: 34 additions & 29 deletions network/router/default_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,10 @@ func (r *router) manageServiceRoutes(w registry.Watcher, metric int) error {

for {
res, err := w.Next()
if err == registry.ErrWatcherStopped {
break
}

if err != nil {
watchErr = err
if err != registry.ErrWatcherStopped {
watchErr = err
}
break
}

Expand Down Expand Up @@ -181,14 +179,13 @@ func (r *router) watchTable(w Watcher) error {

var watchErr error

exit:
for {
event, err := w.Next()
if err == ErrWatcherStopped {
break
}

if err != nil {
watchErr = err
if err != ErrWatcherStopped {
watchErr = err
}
break
}

Expand All @@ -200,23 +197,21 @@ func (r *router) watchTable(w Watcher) error {

select {
case <-r.exit:
return nil
break exit
case r.advertChan <- u:
}
}

// close the advertisement channel
close(r.advertChan)

return watchErr
}

// manageStatus manages router status
func (r *router) manageStatus(errChan <-chan error) {
// watchError watches router errors
func (r *router) watchError(errChan <-chan error) {
defer r.wg.Done()

r.Lock()
r.status.Code = Running
r.status.Error = nil
r.Unlock()

var code StatusCode
var err error

Expand All @@ -229,14 +224,13 @@ func (r *router) manageStatus(errChan <-chan error) {

r.Lock()
defer r.Unlock()
r.status.Code = code
r.status.Error = err

// close the advertise channel
close(r.advertChan)
status := Status{
Code: code,
Error: err,
}
r.status = status

// stop the router if some error happened
// this will notify all watcher goroutines to stop
if err != nil && code != Stopped {
close(r.exit)
}
Expand Down Expand Up @@ -303,7 +297,14 @@ func (r *router) Advertise() (<-chan *Update, error) {
}()

r.wg.Add(1)
go r.manageStatus(errChan)
go r.watchError(errChan)

// mark router as running and set its Error to nil
status := Status{
Code: Running,
Error: nil,
}
r.status = status
}

return r.advertChan, nil
Expand Down Expand Up @@ -338,15 +339,19 @@ func (r *router) Status() Status {
// Stop stops the router
func (r *router) Stop() error {
r.RLock()
defer r.RUnlock()

// only close the channel if the router is running
if r.status.Code == Running {
// notify all goroutines to finish
close(r.exit)
// wait for all goroutines to finish
r.wg.Wait()
}
r.RUnlock()

// drain the advertise channel
for range r.advertChan {
}

// wait for all goroutines to finish
r.wg.Wait()

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion network/router/table_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

var (
// ErrWatcherStopped is returned when routing table watcher has been stopped
ErrWatcherStopped = errors.New("routing table watcher stopped")
ErrWatcherStopped = errors.New("watcher stopped")
)

// EventType defines routing table event
Expand Down
3 changes: 1 addition & 2 deletions registry/mdns_watcher.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package registry

import (
"errors"
"strings"

"github.com/micro/mdns"
Expand Down Expand Up @@ -63,7 +62,7 @@ func (m *mdnsWatcher) Next() (*Result, error) {
Service: service,
}, nil
case <-m.exit:
return nil, errors.New("watcher stopped")
return nil, ErrWatcherStopped
}
}
}
Expand Down

0 comments on commit f6e064c

Please sign in to comment.