Skip to content

Commit

Permalink
[WIP]: broker ErrorHandler option (#1296)
Browse files Browse the repository at this point in the history
* broker ErrorHandler option

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* rewrite Event interface, add error

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* implement new interface

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* change ErrorHandler func to broker.Handler

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* fix

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
  • Loading branch information
vtolstov committed Mar 6, 2020
1 parent 11be2c6 commit 8ee5607
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 14 deletions.
1 change: 1 addition & 0 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Event interface {
Topic() string
Message() *Message
Ack() error
Error() error
}

// Subscriber is a convenience return type for the Subscribe method
Expand Down
29 changes: 25 additions & 4 deletions broker/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ type subscriber struct {
}

type publication struct {
t string
m *Message
t string
err error
m *Message
}

func (p *publication) Topic() string {
Expand All @@ -70,6 +71,10 @@ func (p *publication) Ack() error {
return nil
}

func (p *publication) Error() error {
return p.err
}

func (s *subscriber) Options() SubscribeOptions {
return s.opts
}
Expand Down Expand Up @@ -390,10 +395,26 @@ func (n *natsBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO

fn := func(msg *nats.Msg) {
var m Message
if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil {
pub := &publication{t: msg.Subject}
eh := n.opts.ErrorHandler
err := n.opts.Codec.Unmarshal(msg.Data, &m)
pub.err = err
pub.m = &m
if err != nil {
m.Body = msg.Data
log.Error(err)
if eh != nil {
eh(pub)
}
return
}
handler(&publication{m: &m, t: msg.Subject})
if err := handler(pub); err != nil {
pub.err = err
log.Error(err)
if eh != nil {
eh(pub)
}
}
}

var sub *nats.Subscription
Expand Down
10 changes: 10 additions & 0 deletions broker/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type memoryBroker struct {
type memoryEvent struct {
opts broker.Options
topic string
err error
message interface{}
}

Expand Down Expand Up @@ -120,6 +121,11 @@ func (m *memoryBroker) Publish(topic string, msg *broker.Message, opts ...broker

for _, sub := range subs {
if err := sub.handler(p); err != nil {
p.err = err
if eh := m.opts.ErrorHandler; eh != nil {
eh(p)
continue
}
return err
}
}
Expand Down Expand Up @@ -197,6 +203,10 @@ func (m *memoryEvent) Ack() error {
return nil
}

func (m *memoryEvent) Error() error {
return m.err
}

func (m *memorySubscriber) Options() broker.SubscribeOptions {
return m.opts
}
Expand Down
29 changes: 25 additions & 4 deletions broker/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ type subscriber struct {
}

type publication struct {
t string
m *broker.Message
t string
err error
m *broker.Message
}

func (p *publication) Topic() string {
Expand All @@ -67,6 +68,10 @@ func (p *publication) Ack() error {
return nil
}

func (p *publication) Error() error {
return p.err
}

func (s *subscriber) Options() broker.SubscribeOptions {
return s.opts
}
Expand Down Expand Up @@ -375,10 +380,26 @@ func (n *natsBroker) Subscribe(topic string, handler broker.Handler, opts ...bro

fn := func(msg *nats.Msg) {
var m broker.Message
if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil {
pub := &publication{t: msg.Subject}
eh := n.opts.ErrorHandler
err := n.opts.Codec.Unmarshal(msg.Data, &m)
pub.err = err
pub.m = &m
if err != nil {
m.Body = msg.Data
log.Error(err)
if eh != nil {
eh(pub)
}
return
}
handler(&publication{m: &m, t: msg.Subject})
if err := handler(pub); err != nil {
pub.err = err
log.Error(err)
if eh != nil {
eh(pub)
}
}
}

var sub *nats.Subscription
Expand Down
19 changes: 16 additions & 3 deletions broker/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@ import (
)

type Options struct {
Addrs []string
Secure bool
Codec codec.Marshaler
Addrs []string
Secure bool
Codec codec.Marshaler

// Handler executed when error happens in broker mesage
// processing
ErrorHandler Handler

TLSConfig *tls.Config
// Registry used for clustering
Registry registry.Registry
Expand Down Expand Up @@ -81,6 +86,14 @@ func DisableAutoAck() SubscribeOption {
}
}

// ErrorHandler will catch all broker errors that cant be handled
// in normal way, for example Codec errors
func ErrorHandler(h Handler) Option {
return func(o *Options) {
o.ErrorHandler = h
}
}

// Queue sets the name of the queue to share messages on
func Queue(name string) SubscribeOption {
return func(o *SubscribeOptions) {
Expand Down
11 changes: 8 additions & 3 deletions broker/service/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type serviceSub struct {

type serviceEvent struct {
topic string
err error
message *broker.Message
}

Expand All @@ -32,6 +33,10 @@ func (s *serviceEvent) Ack() error {
return nil
}

func (s *serviceEvent) Error() error {
return s.err
}

func (s *serviceSub) isClosed() bool {
select {
case <-s.closed:
Expand Down Expand Up @@ -71,14 +76,14 @@ func (s *serviceSub) run() error {
return err
}

// TODO: handle error
s.handler(&serviceEvent{
p := &serviceEvent{
topic: s.topic,
message: &broker.Message{
Header: msg.Header,
Body: msg.Body,
},
})
}
p.err = s.handler(p)
}
}

Expand Down
5 changes: 5 additions & 0 deletions server/rpc_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

// event is a broker event we handle on the server transport
type event struct {
err error
message *broker.Message
}

Expand All @@ -19,6 +20,10 @@ func (e *event) Message() *broker.Message {
return e.message
}

func (e *event) Error() error {
return e.err
}

func (e *event) Topic() string {
return e.message.Header["Micro-Topic"]
}
Expand Down
4 changes: 4 additions & 0 deletions tunnel/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ func (t *tunEvent) Ack() error {
return nil
}

func (t *tunEvent) Error() error {
return nil
}

func NewBroker(opts ...broker.Option) broker.Broker {
options := broker.Options{
Context: context.Background(),
Expand Down

0 comments on commit 8ee5607

Please sign in to comment.