From 7d3dc63ae43527ac5408b2417be83a64d138c05c Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Thu, 5 Nov 2020 01:55:38 +0300 Subject: [PATCH] fix logger usage Signed-off-by: Vasiliy Tolstov --- go.mod | 2 +- go.sum | 4 +- http.go | 144 ++++++++++++++++++++++++++++++++------------------------ 3 files changed, 85 insertions(+), 65 deletions(-) diff --git a/go.mod b/go.mod index b9a53b3..257129c 100644 --- a/go.mod +++ b/go.mod @@ -5,5 +5,5 @@ go 1.13 require ( github.com/unistack-org/micro-codec-jsonrpc v0.0.0-20201102222451-ff6a69988bcd github.com/unistack-org/micro-codec-protorpc v0.0.0-20201102222610-3a343898c077 - github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201102230232-8a2b12201568 + github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201104214903-1fbf8b2e209e ) diff --git a/go.sum b/go.sum index ff17481..03e5a40 100644 --- a/go.sum +++ b/go.sum @@ -305,8 +305,8 @@ github.com/unistack-org/micro/v3 v3.0.0-gamma/go.mod h1:iEtpu3wTYCRs3pQ3VsFEO7JB github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200909210629-caec730248b1/go.mod h1:mmqHR9WelHUXqg2mELjsQ+FJHcWs6mNmXg+wEYO2T3c= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200920135754-1cbd1d2bad83/go.mod h1:HUzMG4Mcy97958VxWTg8zuazZgwQ/aoLZ8wtBVONwRE= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200922103357-4c4fa00a5d94/go.mod h1:aL+8VhSXpx0SuEeXPOWUo5BgS7kyvWYobeXFay90UUM= -github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201102230232-8a2b12201568 h1:2h+k414Q3ABTRHByIvPJYZbi5s8qlCi9yG7x3wqaFDs= -github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201102230232-8a2b12201568/go.mod h1:LFvCXGOgcLIj2k/8eL71TpIpcJBN2SXXAUx8U6dz9Rw= +github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201104214903-1fbf8b2e209e h1:v27OUgoE2UOyCe6uLksdpG6oErx62nUXWIkTPxS7yIw= +github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201104214903-1fbf8b2e209e/go.mod h1:LFvCXGOgcLIj2k/8eL71TpIpcJBN2SXXAUx8U6dz9Rw= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/vultr/govultr v0.1.4/go.mod h1:9H008Uxr/C4vFNGLqKx232C206GL0PBHzOP0809bGNA= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= diff --git a/http.go b/http.go index 5e6f7d9..ea2567d 100644 --- a/http.go +++ b/http.go @@ -31,13 +31,14 @@ var ( type httpServer struct { sync.RWMutex - opts server.Options - hd server.Handler - exit chan chan error - registerOnce sync.Once - subscribers map[*httpSubscriber][]broker.Subscriber + opts server.Options + hd server.Handler + exit chan chan error + subscribers map[*httpSubscriber][]broker.Subscriber // used for first registration registered bool + // registry service instance + rsvc *registry.Service } func (h *httpServer) newCodec(contentType string) (codec.NewCodec, error) { @@ -131,10 +132,19 @@ func (h *httpServer) Subscribe(sb server.Subscriber) error { } func (h *httpServer) Register() error { - h.Lock() - opts := h.opts + h.RLock() eps := h.hd.Endpoints() - h.Unlock() + rsvc := h.rsvc + config := h.opts + h.RUnlock() + + // if service already filled, reuse it and return early + if rsvc != nil { + if err := server.DefaultRegisterFunc(rsvc, config); err != nil { + return err + } + return nil + } service, err := server.NewRegistryService(h) if err != nil { @@ -154,78 +164,88 @@ func (h *httpServer) Register() error { sort.Slice(subscriberList, func(i, j int) bool { return subscriberList[i].topic > subscriberList[j].topic }) + for _, e := range subscriberList { service.Endpoints = append(service.Endpoints, e.Endpoints()...) } h.Unlock() - rOpts := []registry.RegisterOption{ - registry.RegisterTTL(opts.RegisterTTL), + h.RLock() + registered := h.registered + h.RUnlock() + + if !registered { + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("Registry [%s] Registering node: %s", config.Registry.String(), service.Nodes[0].Id) + } } - h.registerOnce.Do(func() { - logger.Infof("Registering node: %s", opts.Name+"-"+opts.Id) - }) - - if err := opts.Registry.Register(h.opts.Context, service, rOpts...); err != nil { + // register the service + if err := server.DefaultRegisterFunc(service, config); err != nil { return err } + // already registered? don't need to register subscribers + if registered { + return nil + } + h.Lock() defer h.Unlock() - if h.registered { - return nil - } - h.registered = true - - subCtx := h.opts.Context - for sb := range h.subscribers { - handler := h.createSubHandler(sb, opts) - var subOpts []broker.SubscribeOption + handler := h.createSubHandler(sb, config) + var opts []broker.SubscribeOption if queue := sb.Options().Queue; len(queue) > 0 { - subOpts = append(subOpts, broker.Queue(queue)) + opts = append(opts, broker.SubscribeGroup(queue)) } + subCtx := config.Context if cx := sb.Options().Context; cx != nil { subCtx = cx } + opts = append(opts, broker.SubscribeContext(subCtx)) + opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck)) - if !sb.Options().AutoAck { - subOpts = append(subOpts, broker.DisableAutoAck()) - } - - sub, err := opts.Broker.Subscribe(subCtx, sb.Topic(), handler, subOpts...) + sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...) if err != nil { return err } h.subscribers[sb] = []broker.Subscriber{sub} } + + h.registered = true + h.rsvc = service + return nil } func (h *httpServer) Deregister() error { - h.Lock() - opts := h.opts - h.Unlock() - - logger.Infof("Deregistering node: %s", opts.Name+"-"+opts.Id) + h.RLock() + config := h.opts + h.RUnlock() service, err := server.NewRegistryService(h) if err != nil { return err } - if err := opts.Registry.Deregister(h.opts.Context, service); err != nil { + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("Deregistering node: %s", service.Nodes[0].Id) + } + + if err := server.DefaultDeregisterFunc(service, config); err != nil { return err } h.Lock() + h.rsvc = nil + if !h.registered { h.Unlock() return nil } + h.registered = false subCtx := h.opts.Context @@ -235,9 +255,9 @@ func (h *httpServer) Deregister() error { } for _, sub := range subs { - logger.Infof("Unsubscribing from topic: %s", sub.Topic()) + config.Logger.Info("Unsubscribing from topic: %s", sub.Topic()) if err := sub.Unsubscribe(subCtx); err != nil { - logger.Errorf("failed to unsubscribe topic: %s, error: %v", sb.Topic(), err) + config.Logger.Error("failed to unsubscribe topic: %s, error: %v", sb.Topic(), err) return err } } @@ -248,19 +268,19 @@ func (h *httpServer) Deregister() error { } func (h *httpServer) Start() error { - h.Lock() - opts := h.opts + h.RLock() + config := h.opts hd := h.hd - h.Unlock() + h.RUnlock() - config := h.Options() - - ln, err := net.Listen("tcp", opts.Address) + ln, err := net.Listen("tcp", config.Address) if err != nil { return err } - logger.Infof("Listening on %s", ln.Addr().String()) + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("Listening on %s", ln.Addr().String()) + } h.Lock() h.opts.Address = ln.Addr().String() @@ -271,13 +291,13 @@ func (h *httpServer) Start() error { return errors.New("Server required http.Handler") } - if err = opts.Broker.Connect(h.opts.Context); err != nil { + if err = config.Broker.Connect(h.opts.Context); err != nil { return err } - if err = h.opts.RegisterCheck(h.opts.Context); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err) + if err = config.RegisterCheck(h.opts.Context); err != nil { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server %s-%s register check error: %s", config.Name, config.Id, err) } } else { if err = h.Register(); err != nil { @@ -291,9 +311,9 @@ func (h *httpServer) Start() error { t := new(time.Ticker) // only process if it exists - if opts.RegisterInterval > time.Duration(0) { + if config.RegisterInterval > time.Duration(0) { // new ticker - t = time.NewTicker(opts.RegisterInterval) + t = time.NewTicker(config.RegisterInterval) } // return error chan @@ -307,31 +327,31 @@ func (h *httpServer) Start() error { h.RLock() registered := h.registered h.RUnlock() - rerr := h.opts.RegisterCheck(h.opts.Context) + rerr := config.RegisterCheck(h.opts.Context) if rerr != nil && registered { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr) } // deregister self in case of error if err := h.Deregister(); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server %s-%s deregister error: %s", config.Name, config.Id, err) } } } else if rerr != nil && !registered { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, rerr) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server %s-%s register check error: %s", config.Name, config.Id, rerr) } continue } if err := h.Register(); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server %s-%s register error: %s", config.Name, config.Id, err) } } if err := h.Register(); err != nil { - logger.Error("Server register error: ", err) + config.Logger.Error("Server register error: ", err) } // wait for exit case ch = <-h.exit: @@ -344,7 +364,7 @@ func (h *httpServer) Start() error { // deregister h.Deregister() - opts.Broker.Disconnect(h.opts.Context) + config.Broker.Disconnect(config.Context) }() return nil