diff --git a/go.mod b/go.mod index baeefbb..232badd 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,6 @@ go 1.13 require ( github.com/unistack-org/micro-codec-jsonrpc v0.0.0-20201102222451-ff6a69988bcd github.com/unistack-org/micro-codec-protorpc v0.0.0-20201102222610-3a343898c077 - github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201102230232-8a2b12201568 + github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201104214903-1fbf8b2e209e golang.org/x/net v0.0.0-20200904194848-62affa334b73 ) diff --git a/go.sum b/go.sum index 8658f1c..83b135e 100644 --- a/go.sum +++ b/go.sum @@ -305,8 +305,8 @@ github.com/unistack-org/micro/v3 v3.0.0-gamma/go.mod h1:iEtpu3wTYCRs3pQ3VsFEO7JB github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200909210629-caec730248b1/go.mod h1:mmqHR9WelHUXqg2mELjsQ+FJHcWs6mNmXg+wEYO2T3c= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200920135754-1cbd1d2bad83/go.mod h1:HUzMG4Mcy97958VxWTg8zuazZgwQ/aoLZ8wtBVONwRE= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200922103357-4c4fa00a5d94/go.mod h1:aL+8VhSXpx0SuEeXPOWUo5BgS7kyvWYobeXFay90UUM= -github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201102230232-8a2b12201568 h1:2h+k414Q3ABTRHByIvPJYZbi5s8qlCi9yG7x3wqaFDs= -github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201102230232-8a2b12201568/go.mod h1:LFvCXGOgcLIj2k/8eL71TpIpcJBN2SXXAUx8U6dz9Rw= +github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201104214903-1fbf8b2e209e h1:v27OUgoE2UOyCe6uLksdpG6oErx62nUXWIkTPxS7yIw= +github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201104214903-1fbf8b2e209e/go.mod h1:LFvCXGOgcLIj2k/8eL71TpIpcJBN2SXXAUx8U6dz9Rw= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/vultr/govultr v0.1.4/go.mod h1:9H008Uxr/C4vFNGLqKx232C206GL0PBHzOP0809bGNA= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= diff --git a/tcp.go b/tcp.go index 079ef53..bd56094 100644 --- a/tcp.go +++ b/tcp.go @@ -38,6 +38,8 @@ type tcpServer struct { subscribers map[*tcpSubscriber][]broker.Subscriber // used for first registration registered bool + // registry service instance + rsvc *registry.Service } func (h *tcpServer) newCodec(contentType string) (codec.NewCodec, error) { @@ -134,10 +136,19 @@ func (h *tcpServer) Subscribe(sb server.Subscriber) error { func (h *tcpServer) Register() error { h.Lock() - opts := h.opts + config := h.opts + rsvc := h.rsvc eps := h.hd.Endpoints() h.Unlock() + // if service already filled, reuse it and return early + if rsvc != nil { + if err := server.DefaultRegisterFunc(rsvc, config); err != nil { + return err + } + return nil + } + service, err := server.NewRegistryService(h) if err != nil { return err @@ -162,62 +173,79 @@ func (h *tcpServer) Register() error { } h.Unlock() - rOpts := []registry.RegisterOption{ - registry.RegisterTTL(opts.RegisterTTL), + h.RLock() + registered := h.registered + h.RUnlock() + + if !registered { + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("Registry [%s] Registering node: %s", config.Registry.String(), service.Nodes[0].Id) + } } - h.registerOnce.Do(func() { - logger.Infof("Registering node: %s", opts.Name+"-"+opts.Id) - }) - - if err := opts.Registry.Register(opts.Context, service, rOpts...); err != nil { + // register the service + if err := server.DefaultRegisterFunc(service, config); err != nil { return err } + // already registered? don't need to register subscribers + if registered { + return nil + } + h.Lock() defer h.Unlock() if h.registered { return nil } - h.registered = true - - subCtx := h.opts.Context for sb := range h.subscribers { - handler := h.createSubHandler(sb, opts) - var subOpts []broker.SubscribeOption + handler := h.createSubHandler(sb, config) + var opts []broker.SubscribeOption if queue := sb.Options().Queue; len(queue) > 0 { - subOpts = append(subOpts, broker.Queue(queue)) + opts = append(opts, broker.SubscribeGroup(queue)) } + + subCtx := config.Context if cx := sb.Options().Context; cx != nil { subCtx = cx } - if !sb.Options().AutoAck { - subOpts = append(subOpts, broker.DisableAutoAck()) + opts = append(opts, broker.SubscribeContext(subCtx)) + opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck)) + + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("Subscribing to topic: %s", sb.Topic()) } - sub, err := opts.Broker.Subscribe(subCtx, sb.Topic(), handler, subOpts...) + sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...) if err != nil { return err } h.subscribers[sb] = []broker.Subscriber{sub} } + + h.registered = true + h.rsvc = service + return nil } func (h *tcpServer) Deregister() error { h.Lock() - opts := h.opts + config := h.opts h.Unlock() - logger.Infof("Deregistering node: %s", opts.Name+"-"+opts.Id) - service, err := server.NewRegistryService(h) if err != nil { return err } - if err := opts.Registry.Deregister(opts.Context, service); err != nil { + + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("Deregistering node: %s", service.Nodes[0].Id) + } + + if err := server.DefaultDeregisterFunc(service, config); err != nil { return err } @@ -228,21 +256,32 @@ func (h *tcpServer) Deregister() error { } h.registered = false + wg := sync.WaitGroup{} subCtx := h.opts.Context for sb, subs := range h.subscribers { if cx := sb.Options().Context; cx != nil { subCtx = cx } + for _, sub := range subs { - logger.Infof("Unsubscribing from topic: %s", sub.Topic()) - if err := sub.Unsubscribe(subCtx); err != nil { - logger.Errorf("failed to unsubscribe topic: %s error: %v", sb.Topic(), err) - return err - } + wg.Add(1) + go func(s broker.Subscriber) { + defer wg.Done() + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("Unsubscribing from topic: %s", s.Topic()) + } + if err := s.Unsubscribe(subCtx); err != nil { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Unsubscribing from topic: %s err: %v", s.Topic(), err) + } + } + }(sub) } h.subscribers[sb] = nil } + wg.Wait() + h.Unlock() return nil } @@ -261,10 +300,10 @@ func (h *tcpServer) getListener() net.Listener { } func (h *tcpServer) Start() error { - h.Lock() - opts := h.opts + h.RLock() + config := h.opts hd := h.hd.Handler() - h.Unlock() + h.RUnlock() var err error var ts net.Listener @@ -273,35 +312,35 @@ func (h *tcpServer) Start() error { ts = l } else { // check the tls config for secure connect - if tc := opts.TLSConfig; tc != nil { - ts, err = tls.Listen("tcp", opts.Address, tc) + if tc := config.TLSConfig; tc != nil { + ts, err = tls.Listen("tcp", config.Address, tc) // otherwise just plain tcp listener } else { - ts, err = net.Listen("tcp", opts.Address) + ts, err = net.Listen("tcp", config.Address) } if err != nil { return err } - if opts.Context != nil { - if c, ok := opts.Context.Value(maxConnKey{}).(int); ok && c > 0 { + if config.Context != nil { + if c, ok := config.Context.Value(maxConnKey{}).(int); ok && c > 0 { ts = netutil.LimitListener(ts, c) } } } - logger.Infof("Listening on %s", ts.Addr().String()) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Info("Listening on %s", ts.Addr().String()) + } h.Lock() h.opts.Address = ts.Addr().String() h.Unlock() - if err = opts.Broker.Connect(h.opts.Context); err != nil { + if err = config.Broker.Connect(config.Context); err != nil { return err } - config := h.Options() - // register if err = h.Register(); err != nil { return err @@ -317,9 +356,9 @@ func (h *tcpServer) Start() error { t := new(time.Ticker) // only process if it exists - if opts.RegisterInterval > time.Duration(0) { + if config.RegisterInterval > time.Duration(0) { // new ticker - t = time.NewTicker(opts.RegisterInterval) + t = time.NewTicker(config.RegisterInterval) } // return error chan @@ -335,24 +374,24 @@ func (h *tcpServer) Start() error { h.RUnlock() rerr := h.opts.RegisterCheck(h.opts.Context) if rerr != nil && registered { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr) } // deregister self in case of error if err := h.Deregister(); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server %s-%s deregister error: %s", config.Name, config.Id, err) } } } else if rerr != nil && !registered { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, rerr) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server %s-%s register check error: %s", config.Name, config.Id, rerr) } continue } if err := h.Register(); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server %s-%s register error: %s", config.Name, config.Id, err) } } // wait for exit @@ -366,7 +405,7 @@ func (h *tcpServer) Start() error { // deregister h.Deregister() - opts.Broker.Disconnect(h.opts.Context) + config.Broker.Disconnect(config.Context) }() return nil @@ -384,7 +423,9 @@ func (h *tcpServer) String() string { func (s *tcpServer) serve(ln net.Listener, h Handler) { var tempDelay time.Duration // how long to sleep on accept failure - + s.RLock() + config := s.opts + s.RUnlock() for { c, err := ln.Accept() if err != nil { @@ -402,20 +443,20 @@ func (s *tcpServer) serve(ln net.Listener, h Handler) { if max := 1 * time.Second; tempDelay > max { tempDelay = max } - if logger.V(logger.ErrorLevel) { - logger.Errorf("tcp: Accept error: %v; retrying in %v", err, tempDelay) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("tcp: Accept error: %v; retrying in %v", err, tempDelay) } time.Sleep(tempDelay) continue } - if logger.V(logger.ErrorLevel) { - logger.Errorf("tcp: Accept error: %v", err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("tcp: Accept error: %v", err) } return } if err != nil { - logger.Errorf("tcp: accept err: %v", err) + config.Logger.Error("tcp: accept err: %v", err) return } go h.Serve(c)