fix logger usage

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-11-05 01:55:38 +03:00
parent 860a1929f4
commit 7d3dc63ae4
3 changed files with 85 additions and 65 deletions

2
go.mod
View File

@ -5,5 +5,5 @@ go 1.13
require ( require (
github.com/unistack-org/micro-codec-jsonrpc v0.0.0-20201102222451-ff6a69988bcd github.com/unistack-org/micro-codec-jsonrpc v0.0.0-20201102222451-ff6a69988bcd
github.com/unistack-org/micro-codec-protorpc v0.0.0-20201102222610-3a343898c077 github.com/unistack-org/micro-codec-protorpc v0.0.0-20201102222610-3a343898c077
github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201102230232-8a2b12201568 github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201104214903-1fbf8b2e209e
) )

4
go.sum
View File

@ -305,8 +305,8 @@ github.com/unistack-org/micro/v3 v3.0.0-gamma/go.mod h1:iEtpu3wTYCRs3pQ3VsFEO7JB
github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200909210629-caec730248b1/go.mod h1:mmqHR9WelHUXqg2mELjsQ+FJHcWs6mNmXg+wEYO2T3c= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200909210629-caec730248b1/go.mod h1:mmqHR9WelHUXqg2mELjsQ+FJHcWs6mNmXg+wEYO2T3c=
github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200920135754-1cbd1d2bad83/go.mod h1:HUzMG4Mcy97958VxWTg8zuazZgwQ/aoLZ8wtBVONwRE= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200920135754-1cbd1d2bad83/go.mod h1:HUzMG4Mcy97958VxWTg8zuazZgwQ/aoLZ8wtBVONwRE=
github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200922103357-4c4fa00a5d94/go.mod h1:aL+8VhSXpx0SuEeXPOWUo5BgS7kyvWYobeXFay90UUM= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200922103357-4c4fa00a5d94/go.mod h1:aL+8VhSXpx0SuEeXPOWUo5BgS7kyvWYobeXFay90UUM=
github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201102230232-8a2b12201568 h1:2h+k414Q3ABTRHByIvPJYZbi5s8qlCi9yG7x3wqaFDs= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201104214903-1fbf8b2e209e h1:v27OUgoE2UOyCe6uLksdpG6oErx62nUXWIkTPxS7yIw=
github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201102230232-8a2b12201568/go.mod h1:LFvCXGOgcLIj2k/8eL71TpIpcJBN2SXXAUx8U6dz9Rw= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201104214903-1fbf8b2e209e/go.mod h1:LFvCXGOgcLIj2k/8eL71TpIpcJBN2SXXAUx8U6dz9Rw=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/vultr/govultr v0.1.4/go.mod h1:9H008Uxr/C4vFNGLqKx232C206GL0PBHzOP0809bGNA= github.com/vultr/govultr v0.1.4/go.mod h1:9H008Uxr/C4vFNGLqKx232C206GL0PBHzOP0809bGNA=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=

136
http.go
View File

@ -34,10 +34,11 @@ type httpServer struct {
opts server.Options opts server.Options
hd server.Handler hd server.Handler
exit chan chan error exit chan chan error
registerOnce sync.Once
subscribers map[*httpSubscriber][]broker.Subscriber subscribers map[*httpSubscriber][]broker.Subscriber
// used for first registration // used for first registration
registered bool registered bool
// registry service instance
rsvc *registry.Service
} }
func (h *httpServer) newCodec(contentType string) (codec.NewCodec, error) { func (h *httpServer) newCodec(contentType string) (codec.NewCodec, error) {
@ -131,10 +132,19 @@ func (h *httpServer) Subscribe(sb server.Subscriber) error {
} }
func (h *httpServer) Register() error { func (h *httpServer) Register() error {
h.Lock() h.RLock()
opts := h.opts
eps := h.hd.Endpoints() eps := h.hd.Endpoints()
h.Unlock() rsvc := h.rsvc
config := h.opts
h.RUnlock()
// if service already filled, reuse it and return early
if rsvc != nil {
if err := server.DefaultRegisterFunc(rsvc, config); err != nil {
return err
}
return nil
}
service, err := server.NewRegistryService(h) service, err := server.NewRegistryService(h)
if err != nil { if err != nil {
@ -154,78 +164,88 @@ func (h *httpServer) Register() error {
sort.Slice(subscriberList, func(i, j int) bool { sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic return subscriberList[i].topic > subscriberList[j].topic
}) })
for _, e := range subscriberList { for _, e := range subscriberList {
service.Endpoints = append(service.Endpoints, e.Endpoints()...) service.Endpoints = append(service.Endpoints, e.Endpoints()...)
} }
h.Unlock() h.Unlock()
rOpts := []registry.RegisterOption{ h.RLock()
registry.RegisterTTL(opts.RegisterTTL), registered := h.registered
h.RUnlock()
if !registered {
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("Registry [%s] Registering node: %s", config.Registry.String(), service.Nodes[0].Id)
}
} }
h.registerOnce.Do(func() { // register the service
logger.Infof("Registering node: %s", opts.Name+"-"+opts.Id) if err := server.DefaultRegisterFunc(service, config); err != nil {
})
if err := opts.Registry.Register(h.opts.Context, service, rOpts...); err != nil {
return err return err
} }
// already registered? don't need to register subscribers
if registered {
return nil
}
h.Lock() h.Lock()
defer h.Unlock() defer h.Unlock()
if h.registered {
return nil
}
h.registered = true
subCtx := h.opts.Context
for sb := range h.subscribers { for sb := range h.subscribers {
handler := h.createSubHandler(sb, opts) handler := h.createSubHandler(sb, config)
var subOpts []broker.SubscribeOption var opts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 { if queue := sb.Options().Queue; len(queue) > 0 {
subOpts = append(subOpts, broker.Queue(queue)) opts = append(opts, broker.SubscribeGroup(queue))
} }
subCtx := config.Context
if cx := sb.Options().Context; cx != nil { if cx := sb.Options().Context; cx != nil {
subCtx = cx subCtx = cx
} }
opts = append(opts, broker.SubscribeContext(subCtx))
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
if !sb.Options().AutoAck { sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
subOpts = append(subOpts, broker.DisableAutoAck())
}
sub, err := opts.Broker.Subscribe(subCtx, sb.Topic(), handler, subOpts...)
if err != nil { if err != nil {
return err return err
} }
h.subscribers[sb] = []broker.Subscriber{sub} h.subscribers[sb] = []broker.Subscriber{sub}
} }
h.registered = true
h.rsvc = service
return nil return nil
} }
func (h *httpServer) Deregister() error { func (h *httpServer) Deregister() error {
h.Lock() h.RLock()
opts := h.opts config := h.opts
h.Unlock() h.RUnlock()
logger.Infof("Deregistering node: %s", opts.Name+"-"+opts.Id)
service, err := server.NewRegistryService(h) service, err := server.NewRegistryService(h)
if err != nil { if err != nil {
return err return err
} }
if err := opts.Registry.Deregister(h.opts.Context, service); err != nil { if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("Deregistering node: %s", service.Nodes[0].Id)
}
if err := server.DefaultDeregisterFunc(service, config); err != nil {
return err return err
} }
h.Lock() h.Lock()
h.rsvc = nil
if !h.registered { if !h.registered {
h.Unlock() h.Unlock()
return nil return nil
} }
h.registered = false h.registered = false
subCtx := h.opts.Context subCtx := h.opts.Context
@ -235,9 +255,9 @@ func (h *httpServer) Deregister() error {
} }
for _, sub := range subs { for _, sub := range subs {
logger.Infof("Unsubscribing from topic: %s", sub.Topic()) config.Logger.Info("Unsubscribing from topic: %s", sub.Topic())
if err := sub.Unsubscribe(subCtx); err != nil { if err := sub.Unsubscribe(subCtx); err != nil {
logger.Errorf("failed to unsubscribe topic: %s, error: %v", sb.Topic(), err) config.Logger.Error("failed to unsubscribe topic: %s, error: %v", sb.Topic(), err)
return err return err
} }
} }
@ -248,19 +268,19 @@ func (h *httpServer) Deregister() error {
} }
func (h *httpServer) Start() error { func (h *httpServer) Start() error {
h.Lock() h.RLock()
opts := h.opts config := h.opts
hd := h.hd hd := h.hd
h.Unlock() h.RUnlock()
config := h.Options() ln, err := net.Listen("tcp", config.Address)
ln, err := net.Listen("tcp", opts.Address)
if err != nil { if err != nil {
return err return err
} }
logger.Infof("Listening on %s", ln.Addr().String()) if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("Listening on %s", ln.Addr().String())
}
h.Lock() h.Lock()
h.opts.Address = ln.Addr().String() h.opts.Address = ln.Addr().String()
@ -271,13 +291,13 @@ func (h *httpServer) Start() error {
return errors.New("Server required http.Handler") return errors.New("Server required http.Handler")
} }
if err = opts.Broker.Connect(h.opts.Context); err != nil { if err = config.Broker.Connect(h.opts.Context); err != nil {
return err return err
} }
if err = h.opts.RegisterCheck(h.opts.Context); err != nil { if err = config.RegisterCheck(h.opts.Context); err != nil {
if logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err) config.Logger.Error("Server %s-%s register check error: %s", config.Name, config.Id, err)
} }
} else { } else {
if err = h.Register(); err != nil { if err = h.Register(); err != nil {
@ -291,9 +311,9 @@ func (h *httpServer) Start() error {
t := new(time.Ticker) t := new(time.Ticker)
// only process if it exists // only process if it exists
if opts.RegisterInterval > time.Duration(0) { if config.RegisterInterval > time.Duration(0) {
// new ticker // new ticker
t = time.NewTicker(opts.RegisterInterval) t = time.NewTicker(config.RegisterInterval)
} }
// return error chan // return error chan
@ -307,31 +327,31 @@ func (h *httpServer) Start() error {
h.RLock() h.RLock()
registered := h.registered registered := h.registered
h.RUnlock() h.RUnlock()
rerr := h.opts.RegisterCheck(h.opts.Context) rerr := config.RegisterCheck(h.opts.Context)
if rerr != nil && registered { if rerr != nil && registered {
if logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
logger.Errorf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr) config.Logger.Error("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr)
} }
// deregister self in case of error // deregister self in case of error
if err := h.Deregister(); err != nil { if err := h.Deregister(); err != nil {
if logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
logger.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err) config.Logger.Error("Server %s-%s deregister error: %s", config.Name, config.Id, err)
} }
} }
} else if rerr != nil && !registered { } else if rerr != nil && !registered {
if logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, rerr) config.Logger.Error("Server %s-%s register check error: %s", config.Name, config.Id, rerr)
} }
continue continue
} }
if err := h.Register(); err != nil { if err := h.Register(); err != nil {
if logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
logger.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err) config.Logger.Error("Server %s-%s register error: %s", config.Name, config.Id, err)
} }
} }
if err := h.Register(); err != nil { if err := h.Register(); err != nil {
logger.Error("Server register error: ", err) config.Logger.Error("Server register error: ", err)
} }
// wait for exit // wait for exit
case ch = <-h.exit: case ch = <-h.exit:
@ -344,7 +364,7 @@ func (h *httpServer) Start() error {
// deregister // deregister
h.Deregister() h.Deregister()
opts.Broker.Disconnect(h.opts.Context) config.Broker.Disconnect(config.Context)
}() }()
return nil return nil