Skip to content

Commit

Permalink
Update options to be public. This means people can implement the inte…
Browse files Browse the repository at this point in the history
…rfaces and actually use the options
  • Loading branch information
Asim committed Dec 31, 2015
1 parent c2154fd commit 64b45f7
Show file tree
Hide file tree
Showing 17 changed files with 205 additions and 202 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type Example struct{}
func (e *Example) Call(ctx context.Context, req *example.Request, rsp *example.Response) error {
md, _ := c.GetMetadata(ctx)
log.Infof("Received Example.Call request with metadata: %v", md)
rsp.Msg = server.Config().Id() + ": Hello " + req.Name
rsp.Msg = server.Options().Id + ": Hello " + req.Name
return nil
}
```
Expand Down
3 changes: 2 additions & 1 deletion broker/broker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package broker

type Broker interface {
Options() Options
Address() string
Connect() error
Disconnect() error
Expand Down Expand Up @@ -28,7 +29,7 @@ type Publication interface {
}

type Subscriber interface {
Config() SubscribeOptions
Options() SubscribeOptions
Topic() string
Unsubscribe() error
}
Expand Down
7 changes: 6 additions & 1 deletion broker/http_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type httpBroker struct {
id string
address string
unsubscribe chan *httpSubscriber
opts Options

sync.RWMutex
subscribers map[string][]*httpSubscriber
Expand Down Expand Up @@ -85,7 +86,7 @@ func (h *httpPublication) Topic() string {
return h.t
}

func (h *httpSubscriber) Config() SubscribeOptions {
func (h *httpSubscriber) Options() SubscribeOptions {
return h.opts
}

Expand Down Expand Up @@ -213,6 +214,10 @@ func (h *httpBroker) Init(opts ...Option) error {
return nil
}

func (h *httpBroker) Options() Options {
return h.opts
}

func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
s, err := registry.GetService("topic:" + topic)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion broker/options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package broker

type Options struct{}
type Options struct {

// Other options to be used by broker implementations
Options map[string]string
}

type PublishOptions struct{}

Expand Down
8 changes: 4 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ type Streamer interface {
Close() error
}

type Option func(*options)
type CallOption func(*callOptions)
type PublishOption func(*publishOptions)
type RequestOption func(*requestOptions)
type Option func(*Options)
type CallOption func(*CallOptions)
type PublishOption func(*PublishOptions)
type RequestOption func(*RequestOptions)

var (
DefaultClient Client = newRpcClient()
Expand Down
74 changes: 43 additions & 31 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,87 +8,99 @@ import (
"github.com/micro/go-micro/transport"
)

type options struct {
contentType string
broker broker.Broker
codecs map[string]codec.NewCodec
registry registry.Registry
selector selector.Selector
transport transport.Transport
wrappers []Wrapper
type Options struct {
ContentType string
Broker broker.Broker
Codecs map[string]codec.NewCodec
Registry registry.Registry
Selector selector.Selector
Transport transport.Transport
Wrappers []Wrapper

// Other options to be used by client implementations
Options map[string]string
}

type callOptions struct {
selectOptions []selector.SelectOption
type CallOptions struct {
SelectOptions []selector.SelectOption

// Other options to be used by client implementations
Options map[string]string
}

type PublishOptions struct {
// Other options to be used by client implementations
Options map[string]string
}

type publishOptions struct{}
type RequestOptions struct {
Stream bool

type requestOptions struct {
stream bool
// Other options to be used by client implementations
Options map[string]string
}

// Broker to be used for pub/sub
func Broker(b broker.Broker) Option {
return func(o *options) {
o.broker = b
return func(o *Options) {
o.Broker = b
}
}

// Codec to be used to encode/decode requests for a given content type
func Codec(contentType string, c codec.NewCodec) Option {
return func(o *options) {
o.codecs[contentType] = c
return func(o *Options) {
o.Codecs[contentType] = c
}
}

// Default content type of the client
func ContentType(ct string) Option {
return func(o *options) {
o.contentType = ct
return func(o *Options) {
o.ContentType = ct
}
}

// Registry to find nodes for a given service
func Registry(r registry.Registry) Option {
return func(o *options) {
o.registry = r
return func(o *Options) {
o.Registry = r
}
}

// Transport to use for communication e.g http, rabbitmq, etc
func Transport(t transport.Transport) Option {
return func(o *options) {
o.transport = t
return func(o *Options) {
o.Transport = t
}
}

// Select is used to select a node to route a request to
func Selector(s selector.Selector) Option {
return func(o *options) {
o.selector = s
return func(o *Options) {
o.Selector = s
}
}

// Adds a Wrapper to a list of options passed into the client
func Wrap(w Wrapper) Option {
return func(o *options) {
o.wrappers = append(o.wrappers, w)
return func(o *Options) {
o.Wrappers = append(o.Wrappers, w)
}
}

// Call Options

func WithSelectOption(so selector.SelectOption) CallOption {
return func(o *callOptions) {
o.selectOptions = append(o.selectOptions, so)
return func(o *CallOptions) {
o.SelectOptions = append(o.SelectOptions, so)
}
}

// Request Options

func StreamingRequest() RequestOption {
return func(o *requestOptions) {
o.stream = true
return func(o *RequestOptions) {
o.Stream = true
}
}
58 changes: 29 additions & 29 deletions client/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,40 @@ import (

type rpcClient struct {
once sync.Once
opts options
opts Options
}

func newRpcClient(opt ...Option) Client {
var once sync.Once

opts := options{
codecs: make(map[string]codec.NewCodec),
opts := Options{
Codecs: make(map[string]codec.NewCodec),
}

for _, o := range opt {
o(&opts)
}

if len(opts.contentType) == 0 {
opts.contentType = defaultContentType
if len(opts.ContentType) == 0 {
opts.ContentType = defaultContentType
}

if opts.broker == nil {
opts.broker = broker.DefaultBroker
if opts.Broker == nil {
opts.Broker = broker.DefaultBroker
}

if opts.registry == nil {
opts.registry = registry.DefaultRegistry
if opts.Registry == nil {
opts.Registry = registry.DefaultRegistry
}

if opts.selector == nil {
opts.selector = selector.NewSelector(
selector.Registry(opts.registry),
if opts.Selector == nil {
opts.Selector = selector.NewSelector(
selector.Registry(opts.Registry),
)
}

if opts.transport == nil {
opts.transport = transport.DefaultTransport
if opts.Transport == nil {
opts.Transport = transport.DefaultTransport
}

rc := &rpcClient{
Expand All @@ -62,15 +62,15 @@ func newRpcClient(opt ...Option) Client {
c := Client(rc)

// wrap in reverse
for i := len(opts.wrappers); i > 0; i-- {
c = opts.wrappers[i-1](c)
for i := len(opts.Wrappers); i > 0; i-- {
c = opts.Wrappers[i-1](c)
}

return c
}

func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) {
if c, ok := r.opts.codecs[contentType]; ok {
if c, ok := r.opts.Codecs[contentType]; ok {
return c, nil
}
if cf, ok := defaultCodecs[contentType]; ok {
Expand Down Expand Up @@ -98,7 +98,7 @@ func (r *rpcClient) call(ctx context.Context, address string, request Request, r
return errors.InternalServerError("go.micro.client", err.Error())
}

c, err := r.opts.transport.Dial(address)
c, err := r.opts.Transport.Dial(address)
if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
}
Expand Down Expand Up @@ -131,7 +131,7 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request) (St
return nil, errors.InternalServerError("go.micro.client", err.Error())
}

c, err := r.opts.transport.Dial(address, transport.WithStream())
c, err := r.opts.Transport.Dial(address, transport.WithStream())
if err != nil {
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
}
Expand All @@ -154,12 +154,12 @@ func (r *rpcClient) CallRemote(ctx context.Context, address string, request Requ
}

func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {
var copts callOptions
var copts CallOptions
for _, opt := range opts {
opt(&copts)
}

next, err := r.opts.selector.Select(request.Service(), copts.selectOptions...)
next, err := r.opts.Selector.Select(request.Service(), copts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
Expand All @@ -179,7 +179,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
}

err = r.call(ctx, address, request, response)
r.opts.selector.Mark(request.Service(), node, err)
r.opts.Selector.Mark(request.Service(), node, err)
return err
}

Expand All @@ -188,12 +188,12 @@ func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Re
}

func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) {
var copts callOptions
var copts CallOptions
for _, opt := range opts {
opt(&copts)
}

next, err := r.opts.selector.Select(request.Service(), copts.selectOptions...)
next, err := r.opts.Selector.Select(request.Service(), copts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
Expand All @@ -213,7 +213,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
}

stream, err := r.stream(ctx, address, request)
r.opts.selector.Mark(request.Service(), node, err)
r.opts.Selector.Mark(request.Service(), node, err)
return stream, err
}

Expand All @@ -234,24 +234,24 @@ func (r *rpcClient) Publish(ctx context.Context, p Publication, opts ...PublishO
return errors.InternalServerError("go.micro.client", err.Error())
}
r.once.Do(func() {
r.opts.broker.Connect()
r.opts.Broker.Connect()
})

return r.opts.broker.Publish(p.Topic(), &broker.Message{
return r.opts.Broker.Publish(p.Topic(), &broker.Message{
Header: md,
Body: b.Bytes(),
})
}

func (r *rpcClient) NewPublication(topic string, message interface{}) Publication {
return newRpcPublication(topic, message, r.opts.contentType)
return newRpcPublication(topic, message, r.opts.ContentType)
}

func (r *rpcClient) NewProtoPublication(topic string, message interface{}) Publication {
return newRpcPublication(topic, message, "application/octet-stream")
}
func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, r.opts.contentType, reqOpts...)
return newRpcRequest(service, method, request, r.opts.ContentType, reqOpts...)
}

func (r *rpcClient) NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
Expand Down
Loading

0 comments on commit 64b45f7

Please sign in to comment.