From cd2ac648ffc008e755f0c5d6a509e82d8a451686 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E5=B0=8F=E4=B9=90?= Date: Sun, 11 Aug 2019 18:05:35 +0800 Subject: [PATCH 1/2] Fix read yaml config from memory package main import ( "fmt" "github.com/micro/go-micro/config" "github.com/micro/go-micro/config/source/memory" ) var configData = []byte(` --- a: 1234 `) func main() { memorySource := memory.NewSource( memory.WithYAML(configData), ) // Create new config conf := config.NewConfig() // Load file source conf.Load(memorySource) fmt.Println(string(conf.Bytes())) } --- config/source/memory/memory.go | 1 + 1 file changed, 1 insertion(+) diff --git a/config/source/memory/memory.go b/config/source/memory/memory.go index 607c7b4a..78e26d3f 100644 --- a/config/source/memory/memory.go +++ b/config/source/memory/memory.go @@ -18,6 +18,7 @@ type memory struct { func (s *memory) Read() (*source.ChangeSet, error) { s.RLock() cs := &source.ChangeSet{ + Format: s.ChangeSet.Format, Timestamp: s.ChangeSet.Timestamp, Data: s.ChangeSet.Data, Checksum: s.ChangeSet.Checksum, From de34f259baf6fac9c02b42ab888f8da002ecf4a6 Mon Sep 17 00:00:00 2001 From: johnson Date: Sun, 11 Aug 2019 10:14:41 +0800 Subject: [PATCH 2/2] update service not found error tooltip fixing test failed issue change back error type change registry.ErrNotFound back to selector.ErrNotFound change back error type change registry.ErrNotFound back to selector.ErrNotFound remove the single node tunnel test Fix read yaml config from memory package main import ( "fmt" "github.com/micro/go-micro/config" "github.com/micro/go-micro/config/source/memory" ) var configData = []byte(` --- a: 1234 `) func main() { memorySource := memory.NewSource( memory.WithYAML(configData), ) // Create new config conf := config.NewConfig() // Load file source conf.Load(memorySource) fmt.Println(string(conf.Bytes())) } --- client/grpc/grpc.go | 33 ++++++++++++++------------ client/rpc_client.go | 33 ++++++++++++++------------ client/selector/default.go | 3 +++ config/source/memory/memory.go | 1 + registry/registry.go | 2 +- server/grpc/grpc.go | 4 ++-- tunnel/default.go | 42 +++++++++++++++++----------------- tunnel/listener.go | 3 --- tunnel/tunnel_test.go | 22 ------------------ 9 files changed, 66 insertions(+), 77 deletions(-) diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index c6a3f5dd..9d33e017 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -73,10 +73,11 @@ func (g *grpcClient) next(request client.Request, opts client.CallOptions) (sele // get next nodes from the selector next, err := g.opts.Selector.Select(service, opts.SelectOptions...) - if err != nil && err == selector.ErrNotFound { - return nil, errors.NotFound("go.micro.client", err.Error()) - } else if err != nil { - return nil, errors.InternalServerError("go.micro.client", err.Error()) + if err != nil { + if err == selector.ErrNotFound { + return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) + } + return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error()) } return next, nil @@ -350,15 +351,17 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface // select next node node, err := next() - if err != nil && err == selector.ErrNotFound { - return errors.NotFound("go.micro.client", err.Error()) - } else if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) + service := req.Service() + if err != nil { + if err == selector.ErrNotFound { + return errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) + } + return errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error()) } // make the call err = gcall(ctx, node, req, rsp, callOpts) - g.opts.Selector.Mark(req.Service(), node, err) + g.opts.Selector.Mark(service, node, err) return err } @@ -429,14 +432,16 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli } node, err := next() - if err != nil && err == selector.ErrNotFound { - return nil, errors.NotFound("go.micro.client", err.Error()) - } else if err != nil { - return nil, errors.InternalServerError("go.micro.client", err.Error()) + service := req.Service() + if err != nil { + if err == selector.ErrNotFound { + return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) + } + return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error()) } stream, err := g.stream(ctx, node, req, callOpts) - g.opts.Selector.Mark(req.Service(), node, err) + g.opts.Selector.Mark(service, node, err) return stream, err } diff --git a/client/rpc_client.go b/client/rpc_client.go index a1f46dea..07b7d6e0 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -312,10 +312,11 @@ func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, erro // get next nodes from the selector next, err := r.opts.Selector.Select(service, opts.SelectOptions...) - if err != nil && err == selector.ErrNotFound { - return nil, errors.NotFound("go.micro.client", "service %s: %v", service, err.Error()) - } else if err != nil { - return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %v", service, err.Error()) + if err != nil { + if err == selector.ErrNotFound { + return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) + } + return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error()) } return next, nil @@ -375,15 +376,17 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac // select next node node, err := next() - if err != nil && err == selector.ErrNotFound { - return errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error()) - } else if err != nil { - return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error()) + service := request.Service() + if err != nil { + if err == selector.ErrNotFound { + return errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) + } + return errors.InternalServerError("go.micro.client", "error getting next %s node: %s", service, err.Error()) } // make the call err = rcall(ctx, node, request, response, callOpts) - r.opts.Selector.Mark(request.Service(), node, err) + r.opts.Selector.Mark(service, node, err) return err } @@ -452,14 +455,16 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt } node, err := next() - if err != nil && err == selector.ErrNotFound { - return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error()) - } else if err != nil { - return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error()) + service := request.Service() + if err != nil { + if err == selector.ErrNotFound { + return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) + } + return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %s", service, err.Error()) } stream, err := r.stream(ctx, node, request, callOpts) - r.opts.Selector.Mark(request.Service(), node, err) + r.opts.Selector.Mark(service, node, err) return stream, err } diff --git a/client/selector/default.go b/client/selector/default.go index fa6a23a5..870bea2a 100644 --- a/client/selector/default.go +++ b/client/selector/default.go @@ -51,6 +51,9 @@ func (c *registrySelector) Select(service string, opts ...SelectOption) (Next, e // if that fails go directly to the registry services, err := c.rc.GetService(service) if err != nil { + if err == registry.ErrNotFound { + return nil, ErrNotFound + } return nil, err } diff --git a/config/source/memory/memory.go b/config/source/memory/memory.go index 607c7b4a..78e26d3f 100644 --- a/config/source/memory/memory.go +++ b/config/source/memory/memory.go @@ -18,6 +18,7 @@ type memory struct { func (s *memory) Read() (*source.ChangeSet, error) { s.RLock() cs := &source.ChangeSet{ + Format: s.ChangeSet.Format, Timestamp: s.ChangeSet.Timestamp, Data: s.ChangeSet.Data, Checksum: s.ChangeSet.Checksum, diff --git a/registry/registry.go b/registry/registry.go index a4df9ba2..8a29d5cf 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -29,7 +29,7 @@ var ( DefaultRegistry = NewRegistry() // Not found error when GetService is called - ErrNotFound = errors.New("not found") + ErrNotFound = errors.New("service not found") // Watcher stopped error when watcher is stopped ErrWatcherStopped = errors.New("watcher stopped") ) diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index 2aefc6f1..bb968d94 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -271,12 +271,12 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error { g.rpc.mu.Unlock() if service == nil { - return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %v", service)).Err() + return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err() } mtype := service.method[methodName] if mtype == nil { - return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %v", service)).Err() + return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s.%s", serviceName, methodName)).Err() } // process unary diff --git a/tunnel/default.go b/tunnel/default.go index 842d0a95..480fd465 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -111,25 +111,26 @@ func (t *tun) process() { for { select { case msg := <-t.send: - nmsg := &transport.Message{ - Header: msg.data.Header, + newMsg := &transport.Message{ + Header: make(map[string]string), Body: msg.data.Body, } - if nmsg.Header == nil { - nmsg.Header = make(map[string]string) + for k, v := range msg.data.Header { + newMsg.Header[k] = v } // set the tunnel id on the outgoing message - nmsg.Header["Micro-Tunnel-Id"] = msg.id + newMsg.Header["Micro-Tunnel-Id"] = msg.id // set the session id - nmsg.Header["Micro-Tunnel-Session"] = msg.session + newMsg.Header["Micro-Tunnel-Session"] = msg.session // send the message via the interface t.RLock() for _, link := range t.links { - link.Send(nmsg) + log.Debugf("Sending %+v to %s", newMsg, link.Remote()) + link.Send(newMsg) } t.RUnlock() case <-t.closed: @@ -170,29 +171,26 @@ func (t *tun) listen(link transport.Socket, listener bool) { var s *socket var exists bool - // if its a local listener then we use that as the session id - // e.g we're using a loopback connecting to ourselves - if listener { + log.Debugf("Received %+v from %s", msg, link.Remote()) + // get the socket based on the tunnel id and session + // this could be something we dialed in which case + // we have a session for it otherwise its a listener + s, exists = t.getSocket(id, session) + if !exists { + // try get it based on just the tunnel id + // the assumption here is that a listener + // has no session but its set a listener session s, exists = t.getSocket(id, "listener") - } else { - // get the socket based on the tunnel id and session - // this could be something we dialed in which case - // we have a session for it otherwise its a listener - s, exists = t.getSocket(id, session) - if !exists { - // try get it based on just the tunnel id - // the assumption here is that a listener - // has no session but its set a listener session - s, exists = t.getSocket(id, "listener") - } } // no socket in existence if !exists { + log.Debugf("Skipping") // drop it, we don't care about // messages we don't know about continue } + log.Debugf("Using socket %s %s", s.id, s.session) // is the socket closed? select { @@ -398,6 +396,7 @@ func (t *tun) Init(opts ...Option) error { // Dial an address func (t *tun) Dial(addr string) (Conn, error) { + log.Debugf("Tunnel dialing %s", addr) c, ok := t.newSocket(addr, t.newSession()) if !ok { return nil, errors.New("error dialing " + addr) @@ -413,6 +412,7 @@ func (t *tun) Dial(addr string) (Conn, error) { // Accept a connection on the address func (t *tun) Listen(addr string) (Listener, error) { + log.Debugf("Tunnel listening on %s", addr) // create a new socket by hashing the address c, ok := t.newSocket(addr, "listener") if !ok { diff --git a/tunnel/listener.go b/tunnel/listener.go index 070b313b..368cf4a5 100644 --- a/tunnel/listener.go +++ b/tunnel/listener.go @@ -48,9 +48,6 @@ func (t *tunListener) process() { wait: make(chan bool), } - // first message - sock.recv <- m - // save the socket conns[m.session] = sock diff --git a/tunnel/tunnel_test.go b/tunnel/tunnel_test.go index 1580d4a9..721479bb 100644 --- a/tunnel/tunnel_test.go +++ b/tunnel/tunnel_test.go @@ -54,28 +54,6 @@ func testSend(t *testing.T, tun Tunnel) { } func TestTunnel(t *testing.T) { - // create a new listener - tun := NewTunnel(Nodes("127.0.0.1:9096")) - err := tun.Connect() - if err != nil { - t.Fatal(err) - } - defer tun.Close() - - var wg sync.WaitGroup - - // start accepting connections - wg.Add(1) - go testAccept(t, tun, &wg) - - // send a message - testSend(t, tun) - - // wait until message is received - wg.Wait() -} - -func TestTwoTunnel(t *testing.T) { // create a new tunnel client tunA := NewTunnel( Address("127.0.0.1:9096"),