fix logger usage

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2020-11-05 02:12:36 +03:00
parent 4585a4fb9a
commit b12d45f45c
3 changed files with 99 additions and 58 deletions

151
tcp.go
View File

@@ -38,6 +38,8 @@ type tcpServer struct {
subscribers map[*tcpSubscriber][]broker.Subscriber
// used for first registration
registered bool
// registry service instance
rsvc *registry.Service
}
func (h *tcpServer) newCodec(contentType string) (codec.NewCodec, error) {
@@ -134,10 +136,19 @@ func (h *tcpServer) Subscribe(sb server.Subscriber) error {
func (h *tcpServer) Register() error {
h.Lock()
opts := h.opts
config := h.opts
rsvc := h.rsvc
eps := h.hd.Endpoints()
h.Unlock()
// if service already filled, reuse it and return early
if rsvc != nil {
if err := server.DefaultRegisterFunc(rsvc, config); err != nil {
return err
}
return nil
}
service, err := server.NewRegistryService(h)
if err != nil {
return err
@@ -162,62 +173,79 @@ func (h *tcpServer) Register() error {
}
h.Unlock()
rOpts := []registry.RegisterOption{
registry.RegisterTTL(opts.RegisterTTL),
h.RLock()
registered := h.registered
h.RUnlock()
if !registered {
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("Registry [%s] Registering node: %s", config.Registry.String(), service.Nodes[0].Id)
}
}
h.registerOnce.Do(func() {
logger.Infof("Registering node: %s", opts.Name+"-"+opts.Id)
})
if err := opts.Registry.Register(opts.Context, service, rOpts...); err != nil {
// register the service
if err := server.DefaultRegisterFunc(service, config); err != nil {
return err
}
// already registered? don't need to register subscribers
if registered {
return nil
}
h.Lock()
defer h.Unlock()
if h.registered {
return nil
}
h.registered = true
subCtx := h.opts.Context
for sb := range h.subscribers {
handler := h.createSubHandler(sb, opts)
var subOpts []broker.SubscribeOption
handler := h.createSubHandler(sb, config)
var opts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
subOpts = append(subOpts, broker.Queue(queue))
opts = append(opts, broker.SubscribeGroup(queue))
}
subCtx := config.Context
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
if !sb.Options().AutoAck {
subOpts = append(subOpts, broker.DisableAutoAck())
opts = append(opts, broker.SubscribeContext(subCtx))
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("Subscribing to topic: %s", sb.Topic())
}
sub, err := opts.Broker.Subscribe(subCtx, sb.Topic(), handler, subOpts...)
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
if err != nil {
return err
}
h.subscribers[sb] = []broker.Subscriber{sub}
}
h.registered = true
h.rsvc = service
return nil
}
func (h *tcpServer) Deregister() error {
h.Lock()
opts := h.opts
config := h.opts
h.Unlock()
logger.Infof("Deregistering node: %s", opts.Name+"-"+opts.Id)
service, err := server.NewRegistryService(h)
if err != nil {
return err
}
if err := opts.Registry.Deregister(opts.Context, service); err != nil {
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("Deregistering node: %s", service.Nodes[0].Id)
}
if err := server.DefaultDeregisterFunc(service, config); err != nil {
return err
}
@@ -228,21 +256,32 @@ func (h *tcpServer) Deregister() error {
}
h.registered = false
wg := sync.WaitGroup{}
subCtx := h.opts.Context
for sb, subs := range h.subscribers {
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
for _, sub := range subs {
logger.Infof("Unsubscribing from topic: %s", sub.Topic())
if err := sub.Unsubscribe(subCtx); err != nil {
logger.Errorf("failed to unsubscribe topic: %s error: %v", sb.Topic(), err)
return err
}
wg.Add(1)
go func(s broker.Subscriber) {
defer wg.Done()
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info("Unsubscribing from topic: %s", s.Topic())
}
if err := s.Unsubscribe(subCtx); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Unsubscribing from topic: %s err: %v", s.Topic(), err)
}
}
}(sub)
}
h.subscribers[sb] = nil
}
wg.Wait()
h.Unlock()
return nil
}
@@ -261,10 +300,10 @@ func (h *tcpServer) getListener() net.Listener {
}
func (h *tcpServer) Start() error {
h.Lock()
opts := h.opts
h.RLock()
config := h.opts
hd := h.hd.Handler()
h.Unlock()
h.RUnlock()
var err error
var ts net.Listener
@@ -273,35 +312,35 @@ func (h *tcpServer) Start() error {
ts = l
} else {
// check the tls config for secure connect
if tc := opts.TLSConfig; tc != nil {
ts, err = tls.Listen("tcp", opts.Address, tc)
if tc := config.TLSConfig; tc != nil {
ts, err = tls.Listen("tcp", config.Address, tc)
// otherwise just plain tcp listener
} else {
ts, err = net.Listen("tcp", opts.Address)
ts, err = net.Listen("tcp", config.Address)
}
if err != nil {
return err
}
if opts.Context != nil {
if c, ok := opts.Context.Value(maxConnKey{}).(int); ok && c > 0 {
if config.Context != nil {
if c, ok := config.Context.Value(maxConnKey{}).(int); ok && c > 0 {
ts = netutil.LimitListener(ts, c)
}
}
}
logger.Infof("Listening on %s", ts.Addr().String())
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Info("Listening on %s", ts.Addr().String())
}
h.Lock()
h.opts.Address = ts.Addr().String()
h.Unlock()
if err = opts.Broker.Connect(h.opts.Context); err != nil {
if err = config.Broker.Connect(config.Context); err != nil {
return err
}
config := h.Options()
// register
if err = h.Register(); err != nil {
return err
@@ -317,9 +356,9 @@ func (h *tcpServer) Start() error {
t := new(time.Ticker)
// only process if it exists
if opts.RegisterInterval > time.Duration(0) {
if config.RegisterInterval > time.Duration(0) {
// new ticker
t = time.NewTicker(opts.RegisterInterval)
t = time.NewTicker(config.RegisterInterval)
}
// return error chan
@@ -335,24 +374,24 @@ func (h *tcpServer) Start() error {
h.RUnlock()
rerr := h.opts.RegisterCheck(h.opts.Context)
if rerr != nil && registered {
if logger.V(logger.ErrorLevel) {
logger.Errorf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr)
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr)
}
// deregister self in case of error
if err := h.Deregister(); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Server %s-%s deregister error: %s", config.Name, config.Id, err)
}
}
} else if rerr != nil && !registered {
if logger.V(logger.ErrorLevel) {
logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, rerr)
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Server %s-%s register check error: %s", config.Name, config.Id, rerr)
}
continue
}
if err := h.Register(); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err)
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("Server %s-%s register error: %s", config.Name, config.Id, err)
}
}
// wait for exit
@@ -366,7 +405,7 @@ func (h *tcpServer) Start() error {
// deregister
h.Deregister()
opts.Broker.Disconnect(h.opts.Context)
config.Broker.Disconnect(config.Context)
}()
return nil
@@ -384,7 +423,9 @@ func (h *tcpServer) String() string {
func (s *tcpServer) serve(ln net.Listener, h Handler) {
var tempDelay time.Duration // how long to sleep on accept failure
s.RLock()
config := s.opts
s.RUnlock()
for {
c, err := ln.Accept()
if err != nil {
@@ -402,20 +443,20 @@ func (s *tcpServer) serve(ln net.Listener, h Handler) {
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
if logger.V(logger.ErrorLevel) {
logger.Errorf("tcp: Accept error: %v; retrying in %v", err, tempDelay)
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("tcp: Accept error: %v; retrying in %v", err, tempDelay)
}
time.Sleep(tempDelay)
continue
}
if logger.V(logger.ErrorLevel) {
logger.Errorf("tcp: Accept error: %v", err)
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error("tcp: Accept error: %v", err)
}
return
}
if err != nil {
logger.Errorf("tcp: accept err: %v", err)
config.Logger.Error("tcp: accept err: %v", err)
return
}
go h.Serve(c)