Skip to content

Commit

Permalink
Fix the rest of the code
Browse files Browse the repository at this point in the history
  • Loading branch information
Asim committed Dec 23, 2015
1 parent 02aca81 commit 536216f
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 6 deletions.
5 changes: 3 additions & 2 deletions examples/pubsub/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ func pub() {
}

func sub() {
_, err := broker.Subscribe(topic, func(msg *broker.Message) {
fmt.Println("[sub] received message:", string(msg.Body), "header", msg.Header)
_, err := broker.Subscribe(topic, func(p broker.Publication) error {
fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)
return nil
})
if err != nil {
fmt.Println(err)
Expand Down
10 changes: 6 additions & 4 deletions server/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,12 @@ func validateSubscriber(sub Subscriber) error {
}

func (s *rpcServer) createSubHandler(sb *subscriber, opts options) broker.Handler {
return func(msg *broker.Message) {
return func(p broker.Publication) error {
msg := p.Message()
ct := msg.Header["Content-Type"]
cf, err := s.newCodec(ct)
if err != nil {
return
return err
}

hdr := make(map[string]string)
Expand Down Expand Up @@ -190,11 +191,11 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts options) broker.Handle
defer co.Close()

if err := co.ReadHeader(&codec.Message{}, codec.Publication); err != nil {
continue
return err
}

if err := co.ReadBody(req.Interface()); err != nil {
continue
return err
}

fn := func(ctx context.Context, msg Publication) error {
Expand Down Expand Up @@ -225,6 +226,7 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts options) broker.Handle
message: req.Interface(),
})
}
return nil
}
}

Expand Down

0 comments on commit 536216f

Please sign in to comment.