Skip to content

Commit

Permalink
Add configurable codecs for broker/transport
Browse files Browse the repository at this point in the history
  • Loading branch information
Asim Aslam committed Dec 6, 2016
1 parent e102599 commit bd3c798
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 3 deletions.
10 changes: 10 additions & 0 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,22 @@ type Publication interface {
Ack() error
}

// Subscriber is a convenience return type for the Subscribe method
type Subscriber interface {
Options() SubscribeOptions
Topic() string
Unsubscribe() error
}

// Codec is used for encoding where the broker doesn't natively support
// headers in the message type. In this case the entire message is
// encoded as the payload
type Codec interface {
Marshal(interface{}) ([]byte, error)
Unmarshal([]byte, interface{}) error
String() string
}

var (
DefaultBroker Broker = newHttpBroker()
)
Expand Down
19 changes: 19 additions & 0 deletions broker/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package broker

import (
"encoding/json"
)

type jsonCodec struct{}

func (j jsonCodec) Marshal(v interface{}) ([]byte, error) {
return json.Marshal(v)
}

func (j jsonCodec) Unmarshal(d []byte, v interface{}) error {
return json.Unmarshal(d, v)
}

func (j jsonCodec) String() string {
return "json"
}
25 changes: 25 additions & 0 deletions broker/codec/json/json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package json

import (
"encoding/json"

"github.com/micro/go-micro/broker"
)

type jsonCodec struct{}

func (j jsonCodec) Marshal(v interface{}) ([]byte, error) {
return json.Marshal(v)
}

func (j jsonCodec) Unmarshal(d []byte, v interface{}) error {
return json.Unmarshal(d, v)
}

func (j jsonCodec) String() string {
return "json"
}

func NewCodec() broker.Codec {
return jsonCodec{}
}
34 changes: 34 additions & 0 deletions broker/codec/noop/noop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package noop

import (
"errors"

"github.com/micro/go-micro/broker"
)

type noopCodec struct{}

func (n noopCodec) Marshal(v interface{}) ([]byte, error) {
msg, ok := v.(*broker.Message)
if !ok {
return nil, errors.New("invalid message")
}
return msg.Body, nil
}

func (n noopCodec) Unmarshal(d []byte, v interface{}) error {
msg, ok := v.(*broker.Message)
if !ok {
return errors.New("invalid message")
}
msg.Body = d
return nil
}

func (n noopCodec) String() string {
return "noop"
}

func NewCodec() broker.Codec {
return noopCodec{}
}
6 changes: 3 additions & 3 deletions broker/http_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package broker
import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -96,6 +95,7 @@ func newTransport(config *tls.Config) *http.Transport {

func newHttpBroker(opts ...Option) Broker {
options := Options{
Codec: jsonCodec{},
Context: context.TODO(),
}

Expand Down Expand Up @@ -269,7 +269,7 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}

var m *Message
if err = json.Unmarshal(b, &m); err != nil {
if err = h.opts.Codec.Unmarshal(b, &m); err != nil {
errr := errors.InternalServerError("go.micro.broker", fmt.Sprintf("Error parsing request body: %v", err))
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
Expand Down Expand Up @@ -352,7 +352,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)

m.Header[":topic"] = topic

b, err := json.Marshal(m)
b, err := h.opts.Codec.Marshal(m)
if err != nil {
return err
}
Expand Down
9 changes: 9 additions & 0 deletions broker/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
type Options struct {
Addrs []string
Secure bool
Codec Codec
TLSConfig *tls.Config

// Other options for implementations of the interface
Expand Down Expand Up @@ -96,6 +97,14 @@ func Secure(b bool) Option {
}
}

// Codec sets the codec used for encoding/decoding used where
// a broker does not support headers
func SetCodec(c Codec) Option {
return func(o *Options) {
o.Codec = c
}
}

// Specify TLS Config
func TLSConfig(t *tls.Config) Option {
return func(o *Options) {
Expand Down

0 comments on commit bd3c798

Please sign in to comment.