Skip to content

Commit

Permalink
refactor naming package
Browse files Browse the repository at this point in the history
  • Loading branch information
iamqizhao committed Oct 7, 2015
1 parent 63a6c41 commit 8d7cb92
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 111 deletions.
178 changes: 87 additions & 91 deletions naming/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,115 +34,49 @@
package etcd

import (
"sync"

etcdcl "github.com/coreos/etcd/client"
"golang.org/x/net/context"
"google.golang.org/grpc/naming"
)
// update defines an etcd key-value update.
type update struct {
key, val string
}

// getNode reports the set of changes starting from node recursively.
func getNode(node *etcdcl.Node) (updates []*update) {
for _, v := range node.Nodes {
updates = append(updates, getNode(v)...)
}
// getNode builds the key-value map starting from node recursively. It returns the
// max etcdcl.Node.ModifiedIndex starting from that node.
func getNode(node *etcdcl.Node, kv map[string]string) uint64 {
if !node.Dir {
u := &update{
key: node.Key,
val: node.Value,
}
updates = []*update{u}
kv[node.Key] = node.Value
return node.ModifiedIndex
}
return
}

type watcher struct {
wr etcdcl.Watcher
ctx context.Context
cancel context.CancelFunc
kv map[string]string
}

func (w *watcher) Next() (nu []*naming.Update, _ error) {
for {
resp, err := w.wr.Next(w.ctx)
if err != nil {
return nil, err
}
updates := getNode(resp.Node)
for _, u := range updates {
switch resp.Action {
case "set":
if resp.PrevNode == nil {
w.kv[u.key] = u.val
nu = append(nu, &naming.Update{
Op: naming.Add,
Addr: u.val,
})
} else {
nu = append(nu, &naming.Update{
Op: naming.Delete,
Addr: w.kv[u.key],
})
nu = append(nu, &naming.Update{
Op: naming.Add,
Addr: u.val,
})
w.kv[u.key] = u.val
}
case "delete":
nu = append(nu, &naming.Update{
Op: naming.Delete,
Addr: w.kv[u.key],
})
delete(w.kv, u.key)
}
}
if len(nu) > 0 {
break
var max uint64
for _, v := range node.Nodes {
i := getNode(v, kv)
if max < i {
max = i
}
}
return nu, nil
}

func (w *watcher) Stop() {
w.cancel()
return max
}

type resolver struct {
kapi etcdcl.KeysAPI
kv map[string]string
}

func (r *resolver) NewWatcher(target string) naming.Watcher {
ctx, cancel := context.WithCancel(context.Background())
w := &watcher{
wr: r.kapi.Watcher(target, &etcdcl.WatcherOptions{Recursive: true}),
ctx: ctx,
cancel: cancel,
}
for k, v := range r.kv {
w.kv[k] = v
}
return w
}

func (r *resolver) Resolve(target string) (nu []*naming.Update, _ error) {
func (r *resolver) Resolve(target string) (naming.Watcher, error) {
resp, err := r.kapi.Get(context.Background(), target, &etcdcl.GetOptions{Recursive: true})
if err != nil {
return nil, err
}
updates := getNode(resp.Node)
for _, u := range updates {
r.kv[u.key] = u.val
nu = append(nu, &naming.Update{
Op: naming.Add,
Addr: u.val,
})
}
return nu, nil
kv := make(map[string]string)
// Record the index in order to avoid missing updates between Get returning and
// watch starting.
index := getNode(resp.Node, kv)
return &watcher{
wr: r.kapi.Watcher(target, &etcdcl.WatcherOptions{
AfterIndex: index,
Recursive: true}),
kv: kv,
}, nil
}

// NewResolver creates an etcd-based naming.Resolver.
Expand All @@ -153,6 +87,68 @@ func NewResolver(cfg etcdcl.Config) (naming.Resolver, error) {
}
return &resolver{
kapi: etcdcl.NewKeysAPI(c),
kv: make(map[string]string),
}, nil
}

type watcher struct {
wr etcdcl.Watcher
kv map[string]string
}

var once sync.Once

func (w *watcher) Next(ctx context.Context) (nu []*naming.Update, err error) {
once.Do(func() {
select {
case <-ctx.Done():
err = ctx.Err()
default:
for _, v := range w.kv {
nu = append(nu, &naming.Update{
Op: naming.Add,
Addr: v,
})
}
}
})
if len(nu) > 0 || err != nil {
// once.Do ran. Return directly.
return
}
for {
resp, err := w.wr.Next(ctx)
if err != nil {
return nil, err
}
if resp.Node.Dir {
continue
}
switch resp.Action {
case "set":
if resp.PrevNode == nil {
nu = append(nu, &naming.Update{
Op: naming.Add,
Addr: resp.Node.Value,
})
w.kv[resp.Node.Key] = resp.Node.Value
} else {
nu = append(nu, &naming.Update{
Op: naming.Delete,
Addr: w.kv[resp.Node.Key],
})
nu = append(nu, &naming.Update{
Op: naming.Add,
Addr: resp.Node.Value,
})
w.kv[resp.Node.Key] = resp.Node.Value
}
case "delete":
nu = append(nu, &naming.Update{
Op: naming.Delete,
Addr: resp.Node.Value,
})
delete(w.kv, resp.Node.Key)
}
return nu, nil
}
}
41 changes: 21 additions & 20 deletions naming/naming.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,40 +35,41 @@
// The interface is EXPERIMENTAL and may be suject to change.
package naming

// OP defines the corresponding operations for a name resolution change.
type OP uint8
import (
"golang.org/x/net/context"
)

// Operation defines the corresponding operations for a name resolution change.
type Operation uint8

const (
// Add indicates a new address is added.
Add = iota
Add Operation = iota
// Delete indicates an exisiting address is deleted.
Delete
)

type ServiceConfig interface{}

// Update defines a name resolution change.
// Update defines a name resolution update. Notice that it is not valid having both
// empty string Addr and nil Metadata in an Update.
type Update struct {
// Op indicates the operation of the update.
Op OP
Addr string
Config ServiceConfig
Op Operation
// Addr is the updated address. It is empty string if there is no address update.
Addr string
// Metadata is the updated metadata. It is nil if there is no metadata update.
// Metadata is not required for a custom naming implementation.
Metadata interface{}
}

// Resolver does one-shot name resolution and creates a Watcher to
// watch the future updates.
// Resolver creates a Watcher for a target to track its resolution changes.
type Resolver interface {
// Resolve returns the name resolution results.
Resolve(target string) ([]*Update, error)
// NewWatcher creates a Watcher to watch the changes on target.
NewWatcher(target string) Watcher
// Resolve creates a Watcher for target.
Resolve(target string) (Watcher, error)
}

// Watcher watches the updates for a particular target.
// Watcher watches for the updates on the specified target.
type Watcher interface {
// Next blocks until an update or error happens. It may return one or more
// updates.
Next() ([]*Update, error)
// Stop stops the Watcher.
Stop()
// updates. The first call should get the full set of the results.
Next(ctx context.Context) ([]*Update, error)
}

0 comments on commit 8d7cb92

Please sign in to comment.