Skip to content

Commit

Permalink
remove remote func methods
Browse files Browse the repository at this point in the history
  • Loading branch information
Asim Aslam committed Apr 14, 2018
1 parent 528b5f5 commit 0706837
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 39 deletions.
12 changes: 0 additions & 12 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ type Client interface {
NewProtoRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
NewJsonRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
CallRemote(ctx context.Context, addr string, req Request, rsp interface{}, opts ...CallOption) error
Stream(ctx context.Context, req Request, opts ...CallOption) (Streamer, error)
StreamRemote(ctx context.Context, addr string, req Request, opts ...CallOption) (Streamer, error)
Publish(ctx context.Context, p Publication, opts ...PublishOption) error
String() string
}
Expand Down Expand Up @@ -85,22 +83,12 @@ func Call(ctx context.Context, request Request, response interface{}, opts ...Ca
return DefaultClient.Call(ctx, request, response, opts...)
}

// Makes a synchronous call to the specified address using the default client
func CallRemote(ctx context.Context, address string, request Request, response interface{}, opts ...CallOption) error {
return DefaultClient.CallRemote(ctx, address, request, response, opts...)
}

// Creates a streaming connection with a service and returns responses on the
// channel passed in. It's up to the user to close the streamer.
func Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) {
return DefaultClient.Stream(ctx, request, opts...)
}

// Creates a streaming connection to the address specified.
func StreamRemote(ctx context.Context, address string, request Request, opts ...CallOption) (Streamer, error) {
return DefaultClient.StreamRemote(ctx, address, request, opts...)
}

// Publishes a publication using the default client. Using the underlying broker
// set within the options.
func Publish(ctx context.Context, p Publication) error {
Expand Down
9 changes: 9 additions & 0 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type Options struct {
type CallOptions struct {
SelectOptions []selector.SelectOption

// Address of remote host
Address string
// Backoff func
Backoff BackoffFunc
// Check if retriable func
Expand Down Expand Up @@ -226,6 +228,13 @@ func DialTimeout(d time.Duration) Option {

// Call Options

// WithAddress sets the remote address to use rather than using service discovery
func WithAddress(a string) CallOption {
return func(o *CallOptions) {
o.Address = a
}
}

func WithSelectOption(so ...selector.SelectOption) CallOption {
return func(o *CallOptions) {
o.SelectOptions = append(o.SelectOptions, so...)
Expand Down
52 changes: 25 additions & 27 deletions client/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/transport"
"sync/atomic"
Expand Down Expand Up @@ -213,13 +214,25 @@ func (r *rpcClient) Options() Options {
return r.opts
}

func (r *rpcClient) CallRemote(ctx context.Context, address string, request Request, response interface{}, opts ...CallOption) error {
// make a copy of call opts
callOpts := r.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) {
// return remote address
if len(opts.Address) > 0 {
return func() (*registry.Node, error) {
return &registry.Node{
Address: opts.Address,
}, nil
}, nil
}

// get next nodes from the selector
next, err := r.opts.Selector.Select(request.Service(), opts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
return r.call(ctx, address, request, response, callOpts)

return next, nil
}

func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {
Expand All @@ -229,12 +242,9 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
opt(&callOpts)
}

// get next nodes from the selector
next, err := r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
next, err := r.next(request, callOpts)
if err != nil {
return err
}

// check if we already have a deadline
Expand Down Expand Up @@ -330,28 +340,16 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
return gerr
}

func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Request, opts ...CallOption) (Streamer, error) {
// make a copy of call opts
callOpts := r.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
}
return r.stream(ctx, address, request, callOpts)
}

func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) {
// make a copy of call opts
callOpts := r.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
}

// get next nodes from the selector
next, err := r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
next, err := r.next(request, callOpts)
if err != nil {
return nil, err
}

// check if we already have a deadline
Expand Down
46 changes: 46 additions & 0 deletions client/rpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,52 @@ import (
"github.com/micro/go-micro/selector"
)

func TestCallAddress(t *testing.T) {
var called bool
service := "test.service"
method := "Test.Method"
address := "10.1.10.1:8080"

wrap := func(cf CallFunc) CallFunc {
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
called = true

if req.Service() != service {
return fmt.Errorf("expected service: %s got %s", service, req.Service())
}

if req.Method() != method {
return fmt.Errorf("expected service: %s got %s", method, req.Method())
}

if addr != address {
return fmt.Errorf("expected address: %s got %s", address, addr)
}

// don't do the call
return nil
}
}

r := mock.NewRegistry()
c := NewClient(
Registry(r),
WrapCall(wrap),
)
c.Options().Selector.Init(selector.Registry(r))

req := c.NewRequest(service, method, nil)

// test calling remote address
if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil {
t.Fatal("call with address error", err)
}

if !called {
t.Fatal("wrapper not called")
}

}
func TestCallWrapper(t *testing.T) {
var called bool
id := "test.1"
Expand Down

0 comments on commit 0706837

Please sign in to comment.