diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index c6a3f5dd..9d33e017 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -73,10 +73,11 @@ func (g *grpcClient) next(request client.Request, opts client.CallOptions) (sele // get next nodes from the selector next, err := g.opts.Selector.Select(service, opts.SelectOptions...) - if err != nil && err == selector.ErrNotFound { - return nil, errors.NotFound("go.micro.client", err.Error()) - } else if err != nil { - return nil, errors.InternalServerError("go.micro.client", err.Error()) + if err != nil { + if err == selector.ErrNotFound { + return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) + } + return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error()) } return next, nil @@ -350,15 +351,17 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface // select next node node, err := next() - if err != nil && err == selector.ErrNotFound { - return errors.NotFound("go.micro.client", err.Error()) - } else if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) + service := req.Service() + if err != nil { + if err == selector.ErrNotFound { + return errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) + } + return errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error()) } // make the call err = gcall(ctx, node, req, rsp, callOpts) - g.opts.Selector.Mark(req.Service(), node, err) + g.opts.Selector.Mark(service, node, err) return err } @@ -429,14 +432,16 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli } node, err := next() - if err != nil && err == selector.ErrNotFound { - return nil, errors.NotFound("go.micro.client", err.Error()) - } else if err != nil { - return nil, errors.InternalServerError("go.micro.client", err.Error()) + service := req.Service() + if err != nil { + if err == selector.ErrNotFound { + return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) + } + return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error()) } stream, err := g.stream(ctx, node, req, callOpts) - g.opts.Selector.Mark(req.Service(), node, err) + g.opts.Selector.Mark(service, node, err) return stream, err } diff --git a/client/rpc_client.go b/client/rpc_client.go index a1f46dea..07b7d6e0 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -312,10 +312,11 @@ func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, erro // get next nodes from the selector next, err := r.opts.Selector.Select(service, opts.SelectOptions...) - if err != nil && err == selector.ErrNotFound { - return nil, errors.NotFound("go.micro.client", "service %s: %v", service, err.Error()) - } else if err != nil { - return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %v", service, err.Error()) + if err != nil { + if err == selector.ErrNotFound { + return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) + } + return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error()) } return next, nil @@ -375,15 +376,17 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac // select next node node, err := next() - if err != nil && err == selector.ErrNotFound { - return errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error()) - } else if err != nil { - return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error()) + service := request.Service() + if err != nil { + if err == selector.ErrNotFound { + return errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) + } + return errors.InternalServerError("go.micro.client", "error getting next %s node: %s", service, err.Error()) } // make the call err = rcall(ctx, node, request, response, callOpts) - r.opts.Selector.Mark(request.Service(), node, err) + r.opts.Selector.Mark(service, node, err) return err } @@ -452,14 +455,16 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt } node, err := next() - if err != nil && err == selector.ErrNotFound { - return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error()) - } else if err != nil { - return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error()) + service := request.Service() + if err != nil { + if err == selector.ErrNotFound { + return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) + } + return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %s", service, err.Error()) } stream, err := r.stream(ctx, node, request, callOpts) - r.opts.Selector.Mark(request.Service(), node, err) + r.opts.Selector.Mark(service, node, err) return stream, err } diff --git a/client/selector/default.go b/client/selector/default.go index fa6a23a5..870bea2a 100644 --- a/client/selector/default.go +++ b/client/selector/default.go @@ -51,6 +51,9 @@ func (c *registrySelector) Select(service string, opts ...SelectOption) (Next, e // if that fails go directly to the registry services, err := c.rc.GetService(service) if err != nil { + if err == registry.ErrNotFound { + return nil, ErrNotFound + } return nil, err } diff --git a/config/source/memory/memory.go b/config/source/memory/memory.go index 607c7b4a..78e26d3f 100644 --- a/config/source/memory/memory.go +++ b/config/source/memory/memory.go @@ -18,6 +18,7 @@ type memory struct { func (s *memory) Read() (*source.ChangeSet, error) { s.RLock() cs := &source.ChangeSet{ + Format: s.ChangeSet.Format, Timestamp: s.ChangeSet.Timestamp, Data: s.ChangeSet.Data, Checksum: s.ChangeSet.Checksum, diff --git a/registry/registry.go b/registry/registry.go index a4df9ba2..8a29d5cf 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -29,7 +29,7 @@ var ( DefaultRegistry = NewRegistry() // Not found error when GetService is called - ErrNotFound = errors.New("not found") + ErrNotFound = errors.New("service not found") // Watcher stopped error when watcher is stopped ErrWatcherStopped = errors.New("watcher stopped") ) diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index 2aefc6f1..bb968d94 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -271,12 +271,12 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error { g.rpc.mu.Unlock() if service == nil { - return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %v", service)).Err() + return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err() } mtype := service.method[methodName] if mtype == nil { - return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %v", service)).Err() + return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s.%s", serviceName, methodName)).Err() } // process unary diff --git a/tunnel/default.go b/tunnel/default.go index 842d0a95..480fd465 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -111,25 +111,26 @@ func (t *tun) process() { for { select { case msg := <-t.send: - nmsg := &transport.Message{ - Header: msg.data.Header, + newMsg := &transport.Message{ + Header: make(map[string]string), Body: msg.data.Body, } - if nmsg.Header == nil { - nmsg.Header = make(map[string]string) + for k, v := range msg.data.Header { + newMsg.Header[k] = v } // set the tunnel id on the outgoing message - nmsg.Header["Micro-Tunnel-Id"] = msg.id + newMsg.Header["Micro-Tunnel-Id"] = msg.id // set the session id - nmsg.Header["Micro-Tunnel-Session"] = msg.session + newMsg.Header["Micro-Tunnel-Session"] = msg.session // send the message via the interface t.RLock() for _, link := range t.links { - link.Send(nmsg) + log.Debugf("Sending %+v to %s", newMsg, link.Remote()) + link.Send(newMsg) } t.RUnlock() case <-t.closed: @@ -170,29 +171,26 @@ func (t *tun) listen(link transport.Socket, listener bool) { var s *socket var exists bool - // if its a local listener then we use that as the session id - // e.g we're using a loopback connecting to ourselves - if listener { + log.Debugf("Received %+v from %s", msg, link.Remote()) + // get the socket based on the tunnel id and session + // this could be something we dialed in which case + // we have a session for it otherwise its a listener + s, exists = t.getSocket(id, session) + if !exists { + // try get it based on just the tunnel id + // the assumption here is that a listener + // has no session but its set a listener session s, exists = t.getSocket(id, "listener") - } else { - // get the socket based on the tunnel id and session - // this could be something we dialed in which case - // we have a session for it otherwise its a listener - s, exists = t.getSocket(id, session) - if !exists { - // try get it based on just the tunnel id - // the assumption here is that a listener - // has no session but its set a listener session - s, exists = t.getSocket(id, "listener") - } } // no socket in existence if !exists { + log.Debugf("Skipping") // drop it, we don't care about // messages we don't know about continue } + log.Debugf("Using socket %s %s", s.id, s.session) // is the socket closed? select { @@ -398,6 +396,7 @@ func (t *tun) Init(opts ...Option) error { // Dial an address func (t *tun) Dial(addr string) (Conn, error) { + log.Debugf("Tunnel dialing %s", addr) c, ok := t.newSocket(addr, t.newSession()) if !ok { return nil, errors.New("error dialing " + addr) @@ -413,6 +412,7 @@ func (t *tun) Dial(addr string) (Conn, error) { // Accept a connection on the address func (t *tun) Listen(addr string) (Listener, error) { + log.Debugf("Tunnel listening on %s", addr) // create a new socket by hashing the address c, ok := t.newSocket(addr, "listener") if !ok { diff --git a/tunnel/listener.go b/tunnel/listener.go index 070b313b..368cf4a5 100644 --- a/tunnel/listener.go +++ b/tunnel/listener.go @@ -48,9 +48,6 @@ func (t *tunListener) process() { wait: make(chan bool), } - // first message - sock.recv <- m - // save the socket conns[m.session] = sock diff --git a/tunnel/tunnel_test.go b/tunnel/tunnel_test.go index 1580d4a9..721479bb 100644 --- a/tunnel/tunnel_test.go +++ b/tunnel/tunnel_test.go @@ -54,28 +54,6 @@ func testSend(t *testing.T, tun Tunnel) { } func TestTunnel(t *testing.T) { - // create a new listener - tun := NewTunnel(Nodes("127.0.0.1:9096")) - err := tun.Connect() - if err != nil { - t.Fatal(err) - } - defer tun.Close() - - var wg sync.WaitGroup - - // start accepting connections - wg.Add(1) - go testAccept(t, tun, &wg) - - // send a message - testSend(t, tun) - - // wait until message is received - wg.Wait() -} - -func TestTwoTunnel(t *testing.T) { // create a new tunnel client tunA := NewTunnel( Address("127.0.0.1:9096"),