From 07068379c6158c34585cbeb7824fa9405901f80c Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sat, 14 Apr 2018 16:16:58 +0100 Subject: [PATCH] remove remote func methods --- client/client.go | 12 --------- client/options.go | 9 +++++++ client/rpc_client.go | 52 +++++++++++++++++++-------------------- client/rpc_client_test.go | 46 ++++++++++++++++++++++++++++++++++ 4 files changed, 80 insertions(+), 39 deletions(-) diff --git a/client/client.go b/client/client.go index 8975694e68..4022d24991 100644 --- a/client/client.go +++ b/client/client.go @@ -17,9 +17,7 @@ type Client interface { NewProtoRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request NewJsonRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error - CallRemote(ctx context.Context, addr string, req Request, rsp interface{}, opts ...CallOption) error Stream(ctx context.Context, req Request, opts ...CallOption) (Streamer, error) - StreamRemote(ctx context.Context, addr string, req Request, opts ...CallOption) (Streamer, error) Publish(ctx context.Context, p Publication, opts ...PublishOption) error String() string } @@ -85,22 +83,12 @@ func Call(ctx context.Context, request Request, response interface{}, opts ...Ca return DefaultClient.Call(ctx, request, response, opts...) } -// Makes a synchronous call to the specified address using the default client -func CallRemote(ctx context.Context, address string, request Request, response interface{}, opts ...CallOption) error { - return DefaultClient.CallRemote(ctx, address, request, response, opts...) -} - // Creates a streaming connection with a service and returns responses on the // channel passed in. It's up to the user to close the streamer. func Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) { return DefaultClient.Stream(ctx, request, opts...) } -// Creates a streaming connection to the address specified. -func StreamRemote(ctx context.Context, address string, request Request, opts ...CallOption) (Streamer, error) { - return DefaultClient.StreamRemote(ctx, address, request, opts...) -} - // Publishes a publication using the default client. Using the underlying broker // set within the options. func Publish(ctx context.Context, p Publication) error { diff --git a/client/options.go b/client/options.go index f806500bb7..5d7a0965fd 100644 --- a/client/options.go +++ b/client/options.go @@ -40,6 +40,8 @@ type Options struct { type CallOptions struct { SelectOptions []selector.SelectOption + // Address of remote host + Address string // Backoff func Backoff BackoffFunc // Check if retriable func @@ -226,6 +228,13 @@ func DialTimeout(d time.Duration) Option { // Call Options +// WithAddress sets the remote address to use rather than using service discovery +func WithAddress(a string) CallOption { + return func(o *CallOptions) { + o.Address = a + } +} + func WithSelectOption(so ...selector.SelectOption) CallOption { return func(o *CallOptions) { o.SelectOptions = append(o.SelectOptions, so...) diff --git a/client/rpc_client.go b/client/rpc_client.go index e251b1a473..ad24473156 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -11,6 +11,7 @@ import ( "github.com/micro/go-micro/codec" "github.com/micro/go-micro/errors" "github.com/micro/go-micro/metadata" + "github.com/micro/go-micro/registry" "github.com/micro/go-micro/selector" "github.com/micro/go-micro/transport" "sync/atomic" @@ -213,13 +214,25 @@ func (r *rpcClient) Options() Options { return r.opts } -func (r *rpcClient) CallRemote(ctx context.Context, address string, request Request, response interface{}, opts ...CallOption) error { - // make a copy of call opts - callOpts := r.opts.CallOptions - for _, opt := range opts { - opt(&callOpts) +func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) { + // return remote address + if len(opts.Address) > 0 { + return func() (*registry.Node, error) { + return ®istry.Node{ + Address: opts.Address, + }, nil + }, nil + } + + // get next nodes from the selector + next, err := r.opts.Selector.Select(request.Service(), opts.SelectOptions...) + if err != nil && err == selector.ErrNotFound { + return nil, errors.NotFound("go.micro.client", err.Error()) + } else if err != nil { + return nil, errors.InternalServerError("go.micro.client", err.Error()) } - return r.call(ctx, address, request, response, callOpts) + + return next, nil } func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error { @@ -229,12 +242,9 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac opt(&callOpts) } - // get next nodes from the selector - next, err := r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...) - if err != nil && err == selector.ErrNotFound { - return errors.NotFound("go.micro.client", err.Error()) - } else if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) + next, err := r.next(request, callOpts) + if err != nil { + return err } // check if we already have a deadline @@ -330,15 +340,6 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac return gerr } -func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Request, opts ...CallOption) (Streamer, error) { - // make a copy of call opts - callOpts := r.opts.CallOptions - for _, opt := range opts { - opt(&callOpts) - } - return r.stream(ctx, address, request, callOpts) -} - func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) { // make a copy of call opts callOpts := r.opts.CallOptions @@ -346,12 +347,9 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt opt(&callOpts) } - // get next nodes from the selector - next, err := r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...) - if err != nil && err == selector.ErrNotFound { - return nil, errors.NotFound("go.micro.client", err.Error()) - } else if err != nil { - return nil, errors.InternalServerError("go.micro.client", err.Error()) + next, err := r.next(request, callOpts) + if err != nil { + return nil, err } // check if we already have a deadline diff --git a/client/rpc_client_test.go b/client/rpc_client_test.go index 7c3b9a98f8..9e5c87b44c 100644 --- a/client/rpc_client_test.go +++ b/client/rpc_client_test.go @@ -10,6 +10,52 @@ import ( "github.com/micro/go-micro/selector" ) +func TestCallAddress(t *testing.T) { + var called bool + service := "test.service" + method := "Test.Method" + address := "10.1.10.1:8080" + + wrap := func(cf CallFunc) CallFunc { + return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error { + called = true + + if req.Service() != service { + return fmt.Errorf("expected service: %s got %s", service, req.Service()) + } + + if req.Method() != method { + return fmt.Errorf("expected service: %s got %s", method, req.Method()) + } + + if addr != address { + return fmt.Errorf("expected address: %s got %s", address, addr) + } + + // don't do the call + return nil + } + } + + r := mock.NewRegistry() + c := NewClient( + Registry(r), + WrapCall(wrap), + ) + c.Options().Selector.Init(selector.Registry(r)) + + req := c.NewRequest(service, method, nil) + + // test calling remote address + if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil { + t.Fatal("call with address error", err) + } + + if !called { + t.Fatal("wrapper not called") + } + +} func TestCallWrapper(t *testing.T) { var called bool id := "test.1"