Skip to content

Commit

Permalink
Merge pull request #4 from micro/master
Browse files Browse the repository at this point in the history
pull
  • Loading branch information
lpxxn committed Jun 27, 2019
2 parents c282125 + 0da8256 commit 5334203
Show file tree
Hide file tree
Showing 18 changed files with 1,011 additions and 241 deletions.
4 changes: 2 additions & 2 deletions client/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ func (g *grpcClient) next(request client.Request, opts client.CallOptions) (sele

// get proxy address
if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 {
opts.Address = prx
opts.Address = []string{prx}
}

// return remote address
if len(opts.Address) > 0 {
return func() (*registry.Node, error) {
return &registry.Node{
Address: opts.Address,
Address: opts.Address[0],
}, nil
}, nil
}
Expand Down
8 changes: 4 additions & 4 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type Options struct {
type CallOptions struct {
SelectOptions []selector.SelectOption

// Address of remote host
Address string
// Address of remote hosts
Address []string
// Backoff func
Backoff BackoffFunc
// Check if retriable func
Expand Down Expand Up @@ -245,8 +245,8 @@ func WithExchange(e string) PublishOption {
}
}

// WithAddress sets the remote address to use rather than using service discovery
func WithAddress(a string) CallOption {
// WithAddress sets the remote addresses to use rather than using service discovery
func WithAddress(a ...string) CallOption {
return func(o *CallOptions) {
o.Address = a
}
Expand Down
29 changes: 18 additions & 11 deletions client/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,29 +283,36 @@ func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, erro

// get proxy address
if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 {
opts.Address = prx
opts.Address = []string{prx}
}

// return remote address
if len(opts.Address) > 0 {
address := opts.Address
port := 0
var nodes []*registry.Node

host, sport, err := net.SplitHostPort(opts.Address)
if err == nil {
address = host
port, _ = strconv.Atoi(sport)
}
for _, addr := range opts.Address {
address := addr
port := 0

return func() (*registry.Node, error) {
return &registry.Node{
host, sport, err := net.SplitHostPort(addr)
if err == nil {
address = host
port, _ = strconv.Atoi(sport)
}

nodes = append(nodes, &registry.Node{
Address: address,
Port: port,
// Set the protocol
Metadata: map[string]string{
"protocol": "mucp",
},
}, nil
})
}

// crude return method
return func() (*registry.Node, error) {
return nodes[time.Now().Unix()%int64(len(nodes))], nil
}, nil
}

Expand Down
287 changes: 287 additions & 0 deletions client/selector/router/router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
// Package router is a network/router selector
package router

import (
"context"
"fmt"
"net"
"os"
"sort"
"strconv"
"sync"

"github.com/micro/go-micro/client"
"github.com/micro/go-micro/client/selector"
"github.com/micro/go-micro/network/router"
pb "github.com/micro/go-micro/network/router/proto"
"github.com/micro/go-micro/registry"
)

type routerSelector struct {
opts selector.Options

// the router
r router.Router

// the client we have
c client.Client

// the client for the remote router
rs pb.RouterService

// name of the router
name string

// address of the remote router
addr string

// whether to use the remote router
remote bool
}

type clientKey struct{}
type routerKey struct{}

// getRoutes returns the routes whether they are remote or local
func (r *routerSelector) getRoutes(service string) ([]router.Route, error) {
if !r.remote {
// lookup router for routes for the service
return r.r.Table().Lookup(router.NewQuery(
router.QueryDestination(service),
))
}

// lookup the remote router

var addrs []string

// set the remote address if specified
if len(r.addr) > 0 {
addrs = append(addrs, r.addr)
} else {
// we have a name so we need to check the registry
services, err := r.c.Options().Registry.GetService(r.name)
if err != nil {
return nil, err
}

for _, service := range services {
for _, node := range service.Nodes {
addr := node.Address
if node.Port > 0 {
addr = fmt.Sprintf("%s:%d", node.Address, node.Port)
}
addrs = append(addrs, addr)
}
}
}

// no router addresses available
if len(addrs) == 0 {
return nil, selector.ErrNoneAvailable
}

var pbRoutes *pb.LookupResponse
var err error

// TODO: implement backoff and retries
for _, addr := range addrs {
// call the router
pbRoutes, err = r.rs.Lookup(context.Background(), &pb.LookupRequest{
Query: &pb.Query{
Destination: service,
},
}, client.WithAddress(addr))
if err != nil {
continue
}
break
}

// errored out
if err != nil {
return nil, err
}

// no routes
if pbRoutes == nil {
return nil, selector.ErrNoneAvailable
}

var routes []router.Route

// convert from pb to []*router.Route
for _, r := range pbRoutes.Routes {
routes = append(routes, router.Route{
Destination: r.Destination,
Gateway: r.Gateway,
Router: r.Router,
Network: r.Network,
Metric: int(r.Metric),
})
}

return routes, nil
}

func (r *routerSelector) Init(opts ...selector.Option) error {
// no op
return nil
}

func (r *routerSelector) Options() selector.Options {
return r.opts
}

func (r *routerSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
// TODO: pull routes asynchronously and cache
routes, err := r.getRoutes(service)
if err != nil {
return nil, err
}

// no routes return not found error
if len(routes) == 0 {
return nil, selector.ErrNotFound
}

// TODO: apply filters by pseudo constructing service

// sort the routes based on metric
sort.Slice(routes, func(i, j int) bool {
return routes[i].Metric < routes[j].Metric
})

// roundrobin assuming routes are in metric preference order
var i int
var mtx sync.Mutex

return func() (*registry.Node, error) {
// get index and increment counter with every call to next
mtx.Lock()
idx := i
i++
mtx.Unlock()

// get route based on idx
route := routes[idx%len(routes)]

// defaults to gateway and no port
address := route.Gateway
port := 0

// check if its host:port
host, pr, err := net.SplitHostPort(address)
if err == nil {
pp, _ := strconv.Atoi(pr)
// set port
port = pp
// set address
address = host
}

// return as a node
return &registry.Node{
// TODO: add id and metadata if we can
Address: address,
Port: port,
}, nil
}, nil
}

func (r *routerSelector) Mark(service string, node *registry.Node, err error) {
// TODO: pass back metrics or information to the router
return
}

func (r *routerSelector) Reset(service string) {
// TODO: reset the metrics or information at the router
return
}

func (r *routerSelector) Close() error {
// stop the router advertisements
return r.r.Stop()
}

func (r *routerSelector) String() string {
return "router"
}

// NewSelector returns a new router based selector
func NewSelector(opts ...selector.Option) selector.Selector {
options := selector.Options{
Context: context.Background(),
}

for _, o := range opts {
o(&options)
}

// set default registry if not set
if options.Registry == nil {
options.Registry = registry.DefaultRegistry
}

// try get router from the context
r, ok := options.Context.Value(routerKey{}).(router.Router)
if !ok {
// TODO: Use router.DefaultRouter?
r = router.NewRouter(
router.Registry(options.Registry),
)
}

// try get client from the context
c, ok := options.Context.Value(clientKey{}).(client.Client)
if !ok {
c = client.DefaultClient
}

// get the router from env vars if its a remote service
remote := true
routerName := os.Getenv("MICRO_ROUTER")
routerAddress := os.Getenv("MICRO_ROUTER_ADDRESS")

// start the router advertisements if we're running it locally
if len(routerName) == 0 && len(routerAddress) == 0 {
go r.Advertise()
remote = false
}

return &routerSelector{
opts: options,
// set the internal router
r: r,
// set the client
c: c,
// set the router client
rs: pb.NewRouterService(routerName, c),
// name of the router
name: routerName,
// address of router
addr: routerAddress,
// let ourselves know to use the remote router
remote: remote,
}
}

// WithClient sets the client for the request
func WithClient(c client.Client) selector.Option {
return func(o *selector.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, clientKey{}, c)
}
}

// WithRouter sets the router as an option
func WithRouter(r router.Router) selector.Option {
return func(o *selector.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, routerKey{}, r)
}
}
2 changes: 2 additions & 0 deletions config/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
// selectors
"github.com/micro/go-micro/client/selector"
"github.com/micro/go-micro/client/selector/dns"
"github.com/micro/go-micro/client/selector/router"
"github.com/micro/go-micro/client/selector/static"

// transports
Expand Down Expand Up @@ -196,6 +197,7 @@ var (
"default": selector.NewSelector,
"dns": dns.NewSelector,
"cache": selector.NewSelector,
"router": router.NewSelector,
"static": static.NewSelector,
}

Expand Down
Loading

0 comments on commit 5334203

Please sign in to comment.