From 536216fd019f233fff68b9ce893f1855d32fe775 Mon Sep 17 00:00:00 2001 From: Asim Date: Wed, 23 Dec 2015 19:16:55 +0000 Subject: [PATCH] Fix the rest of the code --- examples/pubsub/main.go | 5 +++-- server/subscriber.go | 10 ++++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/examples/pubsub/main.go b/examples/pubsub/main.go index 063b065b80..3fcd3100c2 100644 --- a/examples/pubsub/main.go +++ b/examples/pubsub/main.go @@ -33,8 +33,9 @@ func pub() { } func sub() { - _, err := broker.Subscribe(topic, func(msg *broker.Message) { - fmt.Println("[sub] received message:", string(msg.Body), "header", msg.Header) + _, err := broker.Subscribe(topic, func(p broker.Publication) error { + fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header) + return nil }) if err != nil { fmt.Println(err) diff --git a/server/subscriber.go b/server/subscriber.go index 4a06efbf06..7cca43cbc8 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -155,11 +155,12 @@ func validateSubscriber(sub Subscriber) error { } func (s *rpcServer) createSubHandler(sb *subscriber, opts options) broker.Handler { - return func(msg *broker.Message) { + return func(p broker.Publication) error { + msg := p.Message() ct := msg.Header["Content-Type"] cf, err := s.newCodec(ct) if err != nil { - return + return err } hdr := make(map[string]string) @@ -190,11 +191,11 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts options) broker.Handle defer co.Close() if err := co.ReadHeader(&codec.Message{}, codec.Publication); err != nil { - continue + return err } if err := co.ReadBody(req.Interface()); err != nil { - continue + return err } fn := func(ctx context.Context, msg Publication) error { @@ -225,6 +226,7 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts options) broker.Handle message: req.Interface(), }) } + return nil } }