Skip to content

Commit

Permalink
update service not found error tooltip
Browse files Browse the repository at this point in the history
fixing test failed issue

change back error type
change registry.ErrNotFound back to selector.ErrNotFound

change back error type
change registry.ErrNotFound back to selector.ErrNotFound

remove the single node tunnel test

Fix read yaml config from memory

package main

import (
	"fmt"

	"github.com/micro/go-micro/config"
	"github.com/micro/go-micro/config/source/memory"
)

var configData = []byte(`
---
a: 1234
`)

func main() {
	memorySource := memory.NewSource(
		memory.WithYAML(configData),
	)
	// Create new config
	conf := config.NewConfig()

	// Load file source
	conf.Load(memorySource)

	fmt.Println(string(conf.Bytes()))
}
  • Loading branch information
xpunch committed Aug 11, 2019
1 parent b13604f commit de34f25
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 77 deletions.
33 changes: 19 additions & 14 deletions client/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ func (g *grpcClient) next(request client.Request, opts client.CallOptions) (sele

// get next nodes from the selector
next, err := g.opts.Selector.Select(service, opts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
if err != nil {
if err == selector.ErrNotFound {
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
}
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error())
}

return next, nil
Expand Down Expand Up @@ -350,15 +351,17 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface

// select next node
node, err := next()
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
service := req.Service()
if err != nil {
if err == selector.ErrNotFound {
return errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
}
return errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error())
}

// make the call
err = gcall(ctx, node, req, rsp, callOpts)
g.opts.Selector.Mark(req.Service(), node, err)
g.opts.Selector.Mark(service, node, err)
return err
}

Expand Down Expand Up @@ -429,14 +432,16 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
}

node, err := next()
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
service := req.Service()
if err != nil {
if err == selector.ErrNotFound {
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
}
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error())
}

stream, err := g.stream(ctx, node, req, callOpts)
g.opts.Selector.Mark(req.Service(), node, err)
g.opts.Selector.Mark(service, node, err)
return stream, err
}

Expand Down
33 changes: 19 additions & 14 deletions client/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,11 @@ func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, erro

// get next nodes from the selector
next, err := r.opts.Selector.Select(service, opts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", "service %s: %v", service, err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %v", service, err.Error())
if err != nil {
if err == selector.ErrNotFound {
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
}
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error())
}

return next, nil
Expand Down Expand Up @@ -375,15 +376,17 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac

// select next node
node, err := next()
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
service := request.Service()
if err != nil {
if err == selector.ErrNotFound {
return errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
}
return errors.InternalServerError("go.micro.client", "error getting next %s node: %s", service, err.Error())
}

// make the call
err = rcall(ctx, node, request, response, callOpts)
r.opts.Selector.Mark(request.Service(), node, err)
r.opts.Selector.Mark(service, node, err)
return err
}

Expand Down Expand Up @@ -452,14 +455,16 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
}

node, err := next()
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
service := request.Service()
if err != nil {
if err == selector.ErrNotFound {
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
}
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %s", service, err.Error())
}

stream, err := r.stream(ctx, node, request, callOpts)
r.opts.Selector.Mark(request.Service(), node, err)
r.opts.Selector.Mark(service, node, err)
return stream, err
}

Expand Down
3 changes: 3 additions & 0 deletions client/selector/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ func (c *registrySelector) Select(service string, opts ...SelectOption) (Next, e
// if that fails go directly to the registry
services, err := c.rc.GetService(service)
if err != nil {
if err == registry.ErrNotFound {
return nil, ErrNotFound
}
return nil, err
}

Expand Down
1 change: 1 addition & 0 deletions config/source/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type memory struct {
func (s *memory) Read() (*source.ChangeSet, error) {
s.RLock()
cs := &source.ChangeSet{
Format: s.ChangeSet.Format,
Timestamp: s.ChangeSet.Timestamp,
Data: s.ChangeSet.Data,
Checksum: s.ChangeSet.Checksum,
Expand Down
2 changes: 1 addition & 1 deletion registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var (
DefaultRegistry = NewRegistry()

// Not found error when GetService is called
ErrNotFound = errors.New("not found")
ErrNotFound = errors.New("service not found")
// Watcher stopped error when watcher is stopped
ErrWatcherStopped = errors.New("watcher stopped")
)
Expand Down
4 changes: 2 additions & 2 deletions server/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,12 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error {
g.rpc.mu.Unlock()

if service == nil {
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %v", service)).Err()
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err()
}

mtype := service.method[methodName]
if mtype == nil {
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %v", service)).Err()
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s.%s", serviceName, methodName)).Err()
}

// process unary
Expand Down
42 changes: 21 additions & 21 deletions tunnel/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,25 +111,26 @@ func (t *tun) process() {
for {
select {
case msg := <-t.send:
nmsg := &transport.Message{
Header: msg.data.Header,
newMsg := &transport.Message{
Header: make(map[string]string),
Body: msg.data.Body,
}

if nmsg.Header == nil {
nmsg.Header = make(map[string]string)
for k, v := range msg.data.Header {
newMsg.Header[k] = v
}

// set the tunnel id on the outgoing message
nmsg.Header["Micro-Tunnel-Id"] = msg.id
newMsg.Header["Micro-Tunnel-Id"] = msg.id

// set the session id
nmsg.Header["Micro-Tunnel-Session"] = msg.session
newMsg.Header["Micro-Tunnel-Session"] = msg.session

// send the message via the interface
t.RLock()
for _, link := range t.links {
link.Send(nmsg)
log.Debugf("Sending %+v to %s", newMsg, link.Remote())
link.Send(newMsg)
}
t.RUnlock()
case <-t.closed:
Expand Down Expand Up @@ -170,29 +171,26 @@ func (t *tun) listen(link transport.Socket, listener bool) {
var s *socket
var exists bool

// if its a local listener then we use that as the session id
// e.g we're using a loopback connecting to ourselves
if listener {
log.Debugf("Received %+v from %s", msg, link.Remote())
// get the socket based on the tunnel id and session
// this could be something we dialed in which case
// we have a session for it otherwise its a listener
s, exists = t.getSocket(id, session)
if !exists {
// try get it based on just the tunnel id
// the assumption here is that a listener
// has no session but its set a listener session
s, exists = t.getSocket(id, "listener")
} else {
// get the socket based on the tunnel id and session
// this could be something we dialed in which case
// we have a session for it otherwise its a listener
s, exists = t.getSocket(id, session)
if !exists {
// try get it based on just the tunnel id
// the assumption here is that a listener
// has no session but its set a listener session
s, exists = t.getSocket(id, "listener")
}
}

// no socket in existence
if !exists {
log.Debugf("Skipping")
// drop it, we don't care about
// messages we don't know about
continue
}
log.Debugf("Using socket %s %s", s.id, s.session)

// is the socket closed?
select {
Expand Down Expand Up @@ -398,6 +396,7 @@ func (t *tun) Init(opts ...Option) error {

// Dial an address
func (t *tun) Dial(addr string) (Conn, error) {
log.Debugf("Tunnel dialing %s", addr)
c, ok := t.newSocket(addr, t.newSession())
if !ok {
return nil, errors.New("error dialing " + addr)
Expand All @@ -413,6 +412,7 @@ func (t *tun) Dial(addr string) (Conn, error) {

// Accept a connection on the address
func (t *tun) Listen(addr string) (Listener, error) {
log.Debugf("Tunnel listening on %s", addr)
// create a new socket by hashing the address
c, ok := t.newSocket(addr, "listener")
if !ok {
Expand Down
3 changes: 0 additions & 3 deletions tunnel/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ func (t *tunListener) process() {
wait: make(chan bool),
}

// first message
sock.recv <- m

// save the socket
conns[m.session] = sock

Expand Down
22 changes: 0 additions & 22 deletions tunnel/tunnel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,28 +54,6 @@ func testSend(t *testing.T, tun Tunnel) {
}

func TestTunnel(t *testing.T) {
// create a new listener
tun := NewTunnel(Nodes("127.0.0.1:9096"))
err := tun.Connect()
if err != nil {
t.Fatal(err)
}
defer tun.Close()

var wg sync.WaitGroup

// start accepting connections
wg.Add(1)
go testAccept(t, tun, &wg)

// send a message
testSend(t, tun)

// wait until message is received
wg.Wait()
}

func TestTwoTunnel(t *testing.T) {
// create a new tunnel client
tunA := NewTunnel(
Address("127.0.0.1:9096"),
Expand Down

0 comments on commit de34f25

Please sign in to comment.