Skip to content

Commit

Permalink
handle errors differently
Browse files Browse the repository at this point in the history
  • Loading branch information
asim committed Jan 22, 2019
1 parent d8ba18d commit 2ed676a
Showing 1 changed file with 14 additions and 26 deletions.
40 changes: 14 additions & 26 deletions server/rpc_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,27 +162,23 @@ func prepareMethod(method reflect.Method) *methodType {
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}
}

func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Writer, errmsg string, last bool) (err error) {
func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Writer, last bool) error {
msg := new(codec.Message)
msg.Type = codec.Response
resp := router.getResponse()
resp.msg = msg

// Encode the response header
resp.msg.Endpoint = req.msg.Endpoint
if errmsg != "" {
resp.msg.Error = errmsg
reply = invalidRequest
}
resp.msg.Id = req.msg.Id
sending.Lock()
err = cc.Write(resp.msg, reply)
err := cc.Write(resp.msg, reply)
sending.Unlock()
router.freeResponse(resp)
return err
}

func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Writer) {
func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Writer) error {
defer router.freeRequest(req)

function := mtype.method.Func
var returnValues []reflect.Value

Expand All @@ -206,18 +202,13 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
return nil
}

errmsg := ""
err := fn(ctx, r, replyv.Interface())
if err != nil {
errmsg = err.Error()
// execute handler
if err := fn(ctx, r, replyv.Interface()); err != nil {
return err
}

err = router.sendResponse(sending, req, replyv.Interface(), cc, errmsg, true)
if err != nil {
log.Log("rpc call: unable to send response: ", err)
}
router.freeRequest(req)
return
// send response
return router.sendResponse(sending, req, replyv.Interface(), cc, true)
}

// declare a local error to see if we errored out already
Expand Down Expand Up @@ -250,16 +241,15 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
// client.Stream request
r.stream = true

errmsg := ""
// execute handler
if err := fn(ctx, r, stream); err != nil {
errmsg = err.Error()
return err
}

// this is the last packet, we don't do anything with
// the error here (well sendStreamResponse will log it
// already)
router.sendResponse(sending, req, nil, cc, errmsg, true)
router.freeRequest(req)
return router.sendResponse(sending, req, nil, cc, true)
}

func (m *methodType) prepareContext(ctx context.Context) reflect.Value {
Expand Down Expand Up @@ -448,11 +438,9 @@ func (router *router) ServeRequest(ctx context.Context, r Request, rsp Response)
}
// send a response if we actually managed to read a header.
if req != nil {
router.sendResponse(sending, req, invalidRequest, rsp.Codec(), err.Error(), true)
router.freeRequest(req)
}
return err
}
service.call(ctx, router, sending, mtype, req, argv, replyv, rsp.Codec())
return nil
return service.call(ctx, router, sending, mtype, req, argv, replyv, rsp.Codec())
}

0 comments on commit 2ed676a

Please sign in to comment.