diff --git a/agent/input/discord/conn.go b/agent/input/discord/conn.go index 8d5d70d1..ac9af037 100644 --- a/agent/input/discord/conn.go +++ b/agent/input/discord/conn.go @@ -7,7 +7,7 @@ import ( "github.com/bwmarrin/discordgo" "github.com/micro/go-micro/v2/agent/input" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" ) type discordConn struct { @@ -74,7 +74,9 @@ func (dc *discordConn) Send(e *input.Event) error { fields := strings.Split(e.To, ":") _, err := dc.master.session.ChannelMessageSend(fields[0], string(e.Data)) if err != nil { - log.Error("[bot][loop][send]", err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error("[bot][loop][send]", err) + } } return nil } diff --git a/agent/input/telegram/conn.go b/agent/input/telegram/conn.go index 41ecae72..b0cb4f33 100644 --- a/agent/input/telegram/conn.go +++ b/agent/input/telegram/conn.go @@ -7,7 +7,7 @@ import ( "github.com/forestgiant/sliceutil" "github.com/micro/go-micro/v2/agent/input" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" tgbotapi "gopkg.in/telegram-bot-api.v4" ) @@ -104,7 +104,9 @@ func (tc *telegramConn) Send(event *input.Event) error { if err != nil { // probably it could be because of nested HTML tags -- telegram doesn't allow nested tags - log.Error("[telegram][Send] error:", err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error("[telegram][Send] error:", err) + } msgConfig.Text = "This bot couldn't send the response (Internal error)" tc.input.api.Send(msgConfig) } diff --git a/api/handler/broker/broker.go b/api/handler/broker/broker.go index 6a503e37..8f82446b 100644 --- a/api/handler/broker/broker.go +++ b/api/handler/broker/broker.go @@ -14,7 +14,7 @@ import ( "github.com/gorilla/websocket" "github.com/micro/go-micro/v2/api/handler" "github.com/micro/go-micro/v2/broker" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" ) const ( @@ -136,7 +136,9 @@ func (c *conn) writeLoop() { }() if err != nil { - log.Error(err.Error()) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error(err.Error()) + } return } @@ -214,7 +216,9 @@ func (b *brokerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ws, err := b.u.Upgrade(w, r, nil) if err != nil { - log.Error(err.Error()) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error(err.Error()) + } return } diff --git a/api/server/http/http.go b/api/server/http/http.go index 1fed42b1..0af91256 100644 --- a/api/server/http/http.go +++ b/api/server/http/http.go @@ -13,7 +13,7 @@ import ( "github.com/gorilla/handlers" "github.com/micro/go-micro/v2/api/server" "github.com/micro/go-micro/v2/api/server/cors" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" ) type httpServer struct { @@ -75,7 +75,9 @@ func (s *httpServer) Start() error { return err } - log.Infof("HTTP API Listening on %s", l.Addr().String()) + if logger.V(logger.InfoLevel, logger.DefaultLogger) { + logger.Infof("HTTP API Listening on %s", l.Addr().String()) + } s.mtx.Lock() s.address = l.Addr().String() @@ -84,7 +86,7 @@ func (s *httpServer) Start() error { go func() { if err := http.Serve(l, s.mux); err != nil { // temporary fix - //log.Fatal(err) + //logger.Fatal(err) } }() diff --git a/broker/default.go b/broker/default.go index 05eef916..ae6562ce 100644 --- a/broker/default.go +++ b/broker/default.go @@ -11,7 +11,7 @@ import ( "time" "github.com/micro/go-micro/v2/codec/json" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/util/addr" "github.com/nats-io/nats-server/v2/server" @@ -172,7 +172,9 @@ func (n *natsBroker) serve(exit chan bool) error { for _, node := range service.Nodes { u, err := url.Parse("nats://" + node.Address) if err != nil { - log.Info(err) + if logger.V(logger.InfoLevel, logger.DefaultLogger) { + logger.Info(err) + } continue } // append to the cluster routes @@ -247,7 +249,9 @@ func (n *natsBroker) serve(exit chan bool) error { select { case err := <-n.closeCh: if err != nil { - log.Info(err) + if logger.V(logger.InfoLevel, logger.DefaultLogger) { + logger.Info(err) + } } case <-exit: // deregister on exit @@ -402,7 +406,9 @@ func (n *natsBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO pub.m = &m if err != nil { m.Body = msg.Data - log.Error(err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error(err) + } if eh != nil { eh(pub) } @@ -410,7 +416,9 @@ func (n *natsBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO } if err := handler(pub); err != nil { pub.err = err - log.Error(err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error(err) + } if eh != nil { eh(pub) } diff --git a/broker/memory/memory.go b/broker/memory/memory.go index 2a449467..5424f378 100644 --- a/broker/memory/memory.go +++ b/broker/memory/memory.go @@ -10,7 +10,7 @@ import ( "github.com/google/uuid" "github.com/micro/go-micro/v2/broker" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" maddr "github.com/micro/go-micro/v2/util/addr" mnet "github.com/micro/go-micro/v2/util/net" ) @@ -190,7 +190,9 @@ func (m *memoryEvent) Message() *broker.Message { case []byte: msg := &broker.Message{} if err := m.opts.Codec.Unmarshal(v, msg); err != nil { - log.Errorf("[memory]: failed to unmarshal: %v\n", err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("[memory]: failed to unmarshal: %v\n", err) + } return nil } return msg diff --git a/broker/nats/nats.go b/broker/nats/nats.go index 1af597eb..01b59d18 100644 --- a/broker/nats/nats.go +++ b/broker/nats/nats.go @@ -13,7 +13,7 @@ import ( "github.com/micro/go-micro/v2/broker" "github.com/micro/go-micro/v2/codec/json" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/util/addr" "github.com/nats-io/nats-server/v2/server" @@ -169,7 +169,9 @@ func (n *natsBroker) serve(exit chan bool) error { for _, node := range service.Nodes { u, err := url.Parse("nats://" + node.Address) if err != nil { - log.Error(err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error(err) + } continue } // append to the cluster routes @@ -387,7 +389,9 @@ func (n *natsBroker) Subscribe(topic string, handler broker.Handler, opts ...bro pub.m = &m if err != nil { m.Body = msg.Data - log.Error(err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error(err) + } if eh != nil { eh(pub) } @@ -395,7 +399,9 @@ func (n *natsBroker) Subscribe(topic string, handler broker.Handler, opts ...bro } if err := handler(pub); err != nil { pub.err = err - log.Error(err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error(err) + } if eh != nil { eh(pub) } diff --git a/broker/service/service.go b/broker/service/service.go index a52400ba..0ce2f00e 100644 --- a/broker/service/service.go +++ b/broker/service/service.go @@ -8,7 +8,7 @@ import ( "github.com/micro/go-micro/v2/broker" pb "github.com/micro/go-micro/v2/broker/service/proto" "github.com/micro/go-micro/v2/client" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" ) type serviceBroker struct { @@ -45,7 +45,9 @@ func (b *serviceBroker) Options() broker.Options { } func (b *serviceBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error { - log.Debugf("Publishing to topic %s broker %v", topic, b.Addrs) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Publishing to topic %s broker %v", topic, b.Addrs) + } _, err := b.Client.Publish(context.TODO(), &pb.PublishRequest{ Topic: topic, Message: &pb.Message{ @@ -61,7 +63,9 @@ func (b *serviceBroker) Subscribe(topic string, handler broker.Handler, opts ... for _, o := range opts { o(&options) } - log.Debugf("Subscribing to topic %s queue %s broker %v", topic, options.Queue, b.Addrs) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Subscribing to topic %s queue %s broker %v", topic, options.Queue, b.Addrs) + } stream, err := b.Client.Subscribe(context.TODO(), &pb.SubscribeRequest{ Topic: topic, Queue: options.Queue, @@ -83,19 +87,27 @@ func (b *serviceBroker) Subscribe(topic string, handler broker.Handler, opts ... for { select { case <-sub.closed: - log.Debugf("Unsubscribed from topic %s", topic) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Unsubscribed from topic %s", topic) + } return default: - // run the subscriber - log.Debugf("Streaming from broker %v to topic [%s] queue [%s]", b.Addrs, topic, options.Queue) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + // run the subscriber + logger.Debugf("Streaming from broker %v to topic [%s] queue [%s]", b.Addrs, topic, options.Queue) + } if err := sub.run(); err != nil { - log.Debugf("Resubscribing to topic %s broker %v", topic, b.Addrs) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Resubscribing to topic %s broker %v", topic, b.Addrs) + } stream, err := b.Client.Subscribe(context.TODO(), &pb.SubscribeRequest{ Topic: topic, Queue: options.Queue, }, client.WithAddress(b.Addrs...), client.WithRequestTimeout(time.Hour)) if err != nil { - log.Debugf("Failed to resubscribe to topic %s: %v", topic, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Failed to resubscribe to topic %s: %v", topic, err) + } time.Sleep(time.Second) continue } diff --git a/broker/service/subscriber.go b/broker/service/subscriber.go index 662d6d50..c11ec962 100644 --- a/broker/service/subscriber.go +++ b/broker/service/subscriber.go @@ -3,7 +3,7 @@ package service import ( "github.com/micro/go-micro/v2/broker" pb "github.com/micro/go-micro/v2/broker/service/proto" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" ) type serviceSub struct { @@ -62,7 +62,9 @@ func (s *serviceSub) run() error { // TODO: do not fail silently msg, err := s.stream.Recv() if err != nil { - log.Debugf("Streaming error for subcription to topic %s: %v", s.Topic(), err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Streaming error for subcription to topic %s: %v", s.Topic(), err) + } // close the exit channel close(exit) diff --git a/config/cmd/cmd.go b/config/cmd/cmd.go index 9e1d0039..5daad43f 100644 --- a/config/cmd/cmd.go +++ b/config/cmd/cmd.go @@ -17,7 +17,7 @@ import ( "github.com/micro/go-micro/v2/debug/profile/http" "github.com/micro/go-micro/v2/debug/profile/pprof" "github.com/micro/go-micro/v2/debug/trace" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/runtime" "github.com/micro/go-micro/v2/server" @@ -537,13 +537,13 @@ func (c *cmd) Before(ctx *cli.Context) error { clientOpts = append(clientOpts, client.Registry(*c.opts.Registry)) if err := (*c.opts.Selector).Init(selector.Registry(*c.opts.Registry)); err != nil { - log.Fatalf("Error configuring registry: %v", err) + logger.Fatalf("Error configuring registry: %v", err) } clientOpts = append(clientOpts, client.Selector(*c.opts.Selector)) if err := (*c.opts.Broker).Init(broker.Registry(*c.opts.Registry)); err != nil { - log.Fatalf("Error configuring broker: %v", err) + logger.Fatalf("Error configuring broker: %v", err) } } @@ -590,31 +590,31 @@ func (c *cmd) Before(ctx *cli.Context) error { if len(ctx.String("broker_address")) > 0 { if err := (*c.opts.Broker).Init(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...)); err != nil { - log.Fatalf("Error configuring broker: %v", err) + logger.Fatalf("Error configuring broker: %v", err) } } if len(ctx.String("registry_address")) > 0 { if err := (*c.opts.Registry).Init(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...)); err != nil { - log.Fatalf("Error configuring registry: %v", err) + logger.Fatalf("Error configuring registry: %v", err) } } if len(ctx.String("transport_address")) > 0 { if err := (*c.opts.Transport).Init(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...)); err != nil { - log.Fatalf("Error configuring transport: %v", err) + logger.Fatalf("Error configuring transport: %v", err) } } if len(ctx.String("store_address")) > 0 { if err := (*c.opts.Store).Init(store.Nodes(strings.Split(ctx.String("store_address"), ",")...)); err != nil { - log.Fatalf("Error configuring store: %v", err) + logger.Fatalf("Error configuring store: %v", err) } } if len(ctx.String("store_namespace")) > 0 { if err := (*c.opts.Store).Init(store.Namespace(ctx.String("store_address"))); err != nil { - log.Fatalf("Error configuring store: %v", err) + logger.Fatalf("Error configuring store: %v", err) } } @@ -648,7 +648,7 @@ func (c *cmd) Before(ctx *cli.Context) error { if len(ctx.String("runtime_source")) > 0 { if err := (*c.opts.Runtime).Init(runtime.WithSource(ctx.String("runtime_source"))); err != nil { - log.Fatalf("Error configuring runtime: %v", err) + logger.Fatalf("Error configuring runtime: %v", err) } } @@ -696,7 +696,7 @@ func (c *cmd) Before(ctx *cli.Context) error { if len(authOpts) > 0 { if err := (*c.opts.Auth).Init(authOpts...); err != nil { - log.Fatalf("Error configuring auth: %v", err) + logger.Fatalf("Error configuring auth: %v", err) } } @@ -729,14 +729,14 @@ func (c *cmd) Before(ctx *cli.Context) error { // Lets set it up if len(serverOpts) > 0 { if err := (*c.opts.Server).Init(serverOpts...); err != nil { - log.Fatalf("Error configuring server: %v", err) + logger.Fatalf("Error configuring server: %v", err) } } // Use an init option? if len(clientOpts) > 0 { if err := (*c.opts.Client).Init(clientOpts...); err != nil { - log.Fatalf("Error configuring client: %v", err) + logger.Fatalf("Error configuring client: %v", err) } } diff --git a/config/source/service/service.go b/config/source/service/service.go index 5e71ab17..52dfe223 100644 --- a/config/source/service/service.go +++ b/config/source/service/service.go @@ -6,7 +6,7 @@ import ( "github.com/micro/go-micro/v2/client" "github.com/micro/go-micro/v2/config/source" proto "github.com/micro/go-micro/v2/config/source/service/proto" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" ) var ( @@ -36,7 +36,9 @@ func (m *service) Read() (set *source.ChangeSet, err error) { func (m *service) Watch() (w source.Watcher, err error) { stream, err := m.client.Watch(context.Background(), &proto.WatchRequest{Key: m.key, Path: m.path}) if err != nil { - log.Error("watch err: ", err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error("watch err: ", err) + } return } return newWatcher(stream) diff --git a/logger/helper.go b/logger/helper.go index 00ac1304..a78f907d 100644 --- a/logger/helper.go +++ b/logger/helper.go @@ -12,52 +12,88 @@ func NewHelper(log Logger) *Helper { } func (h *Helper) Info(args ...interface{}) { + if !h.Logger.Options().Level.Enabled(InfoLevel) { + return + } h.Logger.Fields(h.fields).Log(InfoLevel, args...) } func (h *Helper) Infof(template string, args ...interface{}) { + if !h.Logger.Options().Level.Enabled(InfoLevel) { + return + } h.Logger.Fields(h.fields).Logf(InfoLevel, template, args...) } func (h *Helper) Trace(args ...interface{}) { + if !h.Logger.Options().Level.Enabled(TraceLevel) { + return + } h.Logger.Fields(h.fields).Log(TraceLevel, args...) } func (h *Helper) Tracef(template string, args ...interface{}) { + if !h.Logger.Options().Level.Enabled(TraceLevel) { + return + } h.Logger.Fields(h.fields).Logf(TraceLevel, template, args...) } func (h *Helper) Debug(args ...interface{}) { + if !h.Logger.Options().Level.Enabled(DebugLevel) { + return + } h.Logger.Fields(h.fields).Log(DebugLevel, args...) } func (h *Helper) Debugf(template string, args ...interface{}) { + if !h.Logger.Options().Level.Enabled(DebugLevel) { + return + } h.Logger.Fields(h.fields).Logf(DebugLevel, template, args...) } func (h *Helper) Warn(args ...interface{}) { + if !h.Logger.Options().Level.Enabled(WarnLevel) { + return + } h.Logger.Fields(h.fields).Log(WarnLevel, args...) } func (h *Helper) Warnf(template string, args ...interface{}) { + if !h.Logger.Options().Level.Enabled(WarnLevel) { + return + } h.Logger.Fields(h.fields).Logf(WarnLevel, template, args...) } func (h *Helper) Error(args ...interface{}) { + if !h.Logger.Options().Level.Enabled(ErrorLevel) { + return + } h.Logger.Fields(h.fields).Log(ErrorLevel, args...) } func (h *Helper) Errorf(template string, args ...interface{}) { + if !h.Logger.Options().Level.Enabled(ErrorLevel) { + return + } h.Logger.Fields(h.fields).Logf(ErrorLevel, template, args...) } func (h *Helper) Fatal(args ...interface{}) { - h.Logger.Fields(h.fields).Log(ErrorLevel, args...) + if !h.Logger.Options().Level.Enabled(FatalLevel) { + return + } + h.Logger.Fields(h.fields).Log(FatalLevel, args...) os.Exit(1) } func (h *Helper) Fatalf(template string, args ...interface{}) { - h.Logger.Fields(h.fields).Logf(ErrorLevel, template, args...) + if !h.Logger.Options().Level.Enabled(FatalLevel) { + return + } + h.Logger.Fields(h.fields).Logf(FatalLevel, template, args...) os.Exit(1) } diff --git a/network/default.go b/network/default.go index 5e116b85..f777b8c4 100644 --- a/network/default.go +++ b/network/default.go @@ -15,7 +15,7 @@ import ( "github.com/micro/go-micro/v2/client" cmucp "github.com/micro/go-micro/v2/client/mucp" rtr "github.com/micro/go-micro/v2/client/selector/router" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/network/resolver/dns" pbNet "github.com/micro/go-micro/v2/network/service/proto" "github.com/micro/go-micro/v2/proxy" @@ -223,7 +223,7 @@ func (n *network) acceptNetConn(l tunnel.Listener, recv chan *message) { conn, err := l.Accept() if err != nil { sleep := backoff.Do(i) - log.Debugf("Network tunnel [%s] accept error: %v, backing off for %v", ControlChannel, err, sleep) + logger.Debugf("Network tunnel [%s] accept error: %v, backing off for %v", ControlChannel, err, sleep) time.Sleep(sleep) i++ continue @@ -232,7 +232,7 @@ func (n *network) acceptNetConn(l tunnel.Listener, recv chan *message) { select { case <-n.closed: if err := conn.Close(); err != nil { - log.Debugf("Network tunnel [%s] failed to close connection: %v", NetworkChannel, err) + logger.Debugf("Network tunnel [%s] failed to close connection: %v", NetworkChannel, err) } return default: @@ -250,7 +250,9 @@ func (n *network) acceptCtrlConn(l tunnel.Listener, recv chan *message) { conn, err := l.Accept() if err != nil { sleep := backoff.Do(i) - log.Debugf("Network tunnel [%s] accept error: %v, backing off for %v", ControlChannel, err, sleep) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network tunnel [%s] accept error: %v, backing off for %v", ControlChannel, err, sleep) + } time.Sleep(sleep) i++ continue @@ -259,7 +261,9 @@ func (n *network) acceptCtrlConn(l tunnel.Listener, recv chan *message) { select { case <-n.closed: if err := conn.Close(); err != nil { - log.Debugf("Network tunnel [%s] failed to close connection: %v", ControlChannel, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network tunnel [%s] failed to close connection: %v", ControlChannel, err) + } } return default: @@ -355,7 +359,9 @@ func (n *network) advertise(advertChan <-chan *router.Advert) { for i := 0; i < max; i++ { if peer := n.node.GetPeerNode(peers[rnd.Intn(len(peers))].Id()); peer != nil { if err := n.sendTo("advert", ControlChannel, peer, msg); err != nil { - log.Debugf("Network failed to advertise routes to %s: %v", peer.Id(), err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network failed to advertise routes to %s: %v", peer.Id(), err) + } } } } @@ -371,7 +377,9 @@ func (n *network) initNodes(startup bool) { // NOTE: this condition never fires // as resolveNodes() never returns error if err != nil && !startup { - log.Debugf("Network failed to init nodes: %v", err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network failed to init nodes: %v", err) + } return } @@ -390,8 +398,10 @@ func (n *network) initNodes(startup bool) { init = append(init, node) } - // initialize the tunnel - log.Tracef("Network initialising nodes %+v\n", init) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + // initialize the tunnel + logger.Tracef("Network initialising nodes %+v\n", init) + } n.tunnel.Init( tunnel.Nodes(nodes...), @@ -403,7 +413,9 @@ func (n *network) resolveNodes() ([]string, error) { // resolve the network address to network nodes records, err := n.options.Resolver.Resolve(n.options.Name) if err != nil { - log.Debugf("Network failed to resolve nodes: %v", err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network failed to resolve nodes: %v", err) + } } // sort by lowest priority @@ -444,7 +456,9 @@ func (n *network) resolveNodes() ([]string, error) { // resolve anything that looks like a host name records, err := dns.Resolve(node) if err != nil { - log.Debugf("Failed to resolve %v %v", node, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Failed to resolve %v %v", node, err) + } continue } @@ -464,7 +478,9 @@ func (n *network) handleNetConn(s tunnel.Session, msg chan *message) { for { m := new(transport.Message) if err := s.Recv(m); err != nil { - log.Debugf("Network tunnel [%s] receive error: %v", NetworkChannel, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network tunnel [%s] receive error: %v", NetworkChannel, err) + } switch err { case io.EOF, tunnel.ErrReadTimeout: s.Close() @@ -497,7 +513,9 @@ func (n *network) handleCtrlConn(s tunnel.Session, msg chan *message) { for { m := new(transport.Message) if err := s.Recv(m); err != nil { - log.Debugf("Network tunnel [%s] receive error: %v", ControlChannel, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network tunnel [%s] receive error: %v", ControlChannel, err) + } switch err { case io.EOF, tunnel.ErrReadTimeout: s.Close() @@ -575,12 +593,15 @@ func (n *network) getRouteMetric(router string, gateway string, link string) int return 2 } - log.Tracef("Network looking up %s link to gateway: %s", link, gateway) - + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("Network looking up %s link to gateway: %s", link, gateway) + } // attempt to find link based on gateway address lnk, ok := n.peerLinks[gateway] if !ok { - log.Debugf("Network failed to find a link to gateway: %s", gateway) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network failed to find a link to gateway: %s", gateway) + } // no link found so infinite metric returned return math.MaxInt64 } @@ -598,11 +619,15 @@ func (n *network) getRouteMetric(router string, gateway string, link string) int // make sure length is non-zero if length == 0 { - log.Debugf("Link length is 0 %v %v", link, lnk.Length()) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Link length is 0 %v %v", link, lnk.Length()) + } length = 10e9 } - log.Tracef("Network calculated metric %v delay %v length %v distance %v", (delay*length*int64(hops))/10e6, delay, length, hops) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("Network calculated metric %v delay %v length %v distance %v", (delay*length*int64(hops))/10e6, delay, length, hops) + } return (delay * length * int64(hops)) / 10e6 } @@ -626,7 +651,9 @@ func (n *network) processCtrlChan(listener tunnel.Listener) { pbRtrAdvert := &pbRtr.Advert{} if err := proto.Unmarshal(m.msg.Body, pbRtrAdvert); err != nil { - log.Debugf("Network fail to unmarshal advert message: %v", err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network fail to unmarshal advert message: %v", err) + } continue } @@ -634,14 +661,17 @@ func (n *network) processCtrlChan(listener tunnel.Listener) { if pbRtrAdvert.Id == n.options.Id { continue } - - log.Debugf("Network received advert message from: %s", pbRtrAdvert.Id) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network received advert message from: %s", pbRtrAdvert.Id) + } // loookup advertising node in our peer topology advertNode := n.node.GetPeerNode(pbRtrAdvert.Id) if advertNode == nil { // if we can't find the node in our topology (MaxDepth) we skipp prcessing adverts - log.Debugf("Network skipping advert message from unknown peer: %s", pbRtrAdvert.Id) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network skipping advert message from unknown peer: %s", pbRtrAdvert.Id) + } continue } @@ -658,7 +688,9 @@ func (n *network) processCtrlChan(listener tunnel.Listener) { // if the origin router is not the advertising node peer // we can't rule out potential routing loops so we bail here if peer := advertNode.GetPeerNode(event.Route.Router); peer == nil { - log.Debugf("Network skipping advert message from peer: %s", pbRtrAdvert.Id) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network skipping advert message from peer: %s", pbRtrAdvert.Id) + } continue } } @@ -676,7 +708,9 @@ func (n *network) processCtrlChan(listener tunnel.Listener) { // calculate route metric and add to the advertised metric // we need to make sure we do not overflow math.MaxInt64 metric := n.getRouteMetric(event.Route.Router, event.Route.Gateway, event.Route.Link) - log.Tracef("Network metric for router %s and gateway %s: %v", event.Route.Router, event.Route.Gateway, metric) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("Network metric for router %s and gateway %s: %v", event.Route.Router, event.Route.Gateway, metric) + } // check we don't overflow max int 64 if d := route.Metric + metric; d <= 0 { @@ -698,7 +732,9 @@ func (n *network) processCtrlChan(listener tunnel.Listener) { // if no events are eligible for processing continue if len(events) == 0 { - log.Tracef("Network no events to be processed by router: %s", n.options.Id) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("Network no events to be processed by router: %s", n.options.Id) + } continue } @@ -711,9 +747,13 @@ func (n *network) processCtrlChan(listener tunnel.Listener) { Events: events, } - log.Tracef("Network router %s processing advert: %s", n.Id(), advert.Id) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("Network router %s processing advert: %s", n.Id(), advert.Id) + } if err := n.router.Process(advert); err != nil { - log.Debugf("Network failed to process advert %s: %v", advert.Id, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network failed to process advert %s: %v", advert.Id, err) + } } } case <-n.closed: @@ -743,7 +783,7 @@ func (n *network) processNetChan(listener tunnel.Listener) { pbNetConnect := &pbNet.Connect{} if err := proto.Unmarshal(m.msg.Body, pbNetConnect); err != nil { - log.Debugf("Network tunnel [%s] connect unmarshal error: %v", NetworkChannel, err) + logger.Debugf("Network tunnel [%s] connect unmarshal error: %v", NetworkChannel, err) continue } @@ -752,7 +792,7 @@ func (n *network) processNetChan(listener tunnel.Listener) { continue } - log.Debugf("Network received connect message from: %s", pbNetConnect.Node.Id) + logger.Debugf("Network received connect message from: %s", pbNetConnect.Node.Id) peer := &node{ id: pbNetConnect.Node.Id, @@ -768,15 +808,15 @@ func (n *network) processNetChan(listener tunnel.Listener) { // TODO: should we do this only if we manage to add a peer // What should we do if the peer links failed to be updated? if err := n.updatePeerLinks(peer); err != nil { - log.Debugf("Network failed updating peer links: %s", err) + logger.Debugf("Network failed updating peer links: %s", err) } // add peer to the list of node peers if err := n.AddPeer(peer); err == ErrPeerExists { - log.Tracef("Network peer exists, refreshing: %s", peer.id) + logger.Tracef("Network peer exists, refreshing: %s", peer.id) // update lastSeen time for the peer if err := n.RefreshPeer(peer.id, peer.link, now); err != nil { - log.Debugf("Network failed refreshing peer %s: %v", peer.id, err) + logger.Debugf("Network failed refreshing peer %s: %v", peer.id, err) } } @@ -796,14 +836,14 @@ func (n *network) processNetChan(listener tunnel.Listener) { // get a list of the best routes for each service in our routing table routes, err := n.getProtoRoutes() if err != nil { - log.Debugf("Network node %s failed listing routes: %v", n.id, err) + logger.Debugf("Network node %s failed listing routes: %v", n.id, err) } // attached the routes to the message msg.Routes = routes // send sync message to the newly connected peer if err := n.sendTo("sync", NetworkChannel, peer, msg); err != nil { - log.Debugf("Network failed to send sync message: %v", err) + logger.Debugf("Network failed to send sync message: %v", err) } }() case "peer": @@ -812,7 +852,7 @@ func (n *network) processNetChan(listener tunnel.Listener) { pbNetPeer := &pbNet.Peer{} if err := proto.Unmarshal(m.msg.Body, pbNetPeer); err != nil { - log.Debugf("Network tunnel [%s] peer unmarshal error: %v", NetworkChannel, err) + logger.Debugf("Network tunnel [%s] peer unmarshal error: %v", NetworkChannel, err) continue } @@ -821,7 +861,7 @@ func (n *network) processNetChan(listener tunnel.Listener) { continue } - log.Debugf("Network received peer message from: %s %s", pbNetPeer.Node.Id, pbNetPeer.Node.Address) + logger.Debugf("Network received peer message from: %s %s", pbNetPeer.Node.Id, pbNetPeer.Node.Address) peer := &node{ id: pbNetPeer.Node.Id, @@ -837,7 +877,7 @@ func (n *network) processNetChan(listener tunnel.Listener) { // TODO: should we do this only if we manage to add a peer // What should we do if the peer links failed to be updated? if err := n.updatePeerLinks(peer); err != nil { - log.Debugf("Network failed updating peer links: %s", err) + logger.Debugf("Network failed updating peer links: %s", err) } // if it's a new peer i.e. we do not have it in our graph, we request full sync @@ -853,29 +893,29 @@ func (n *network) processNetChan(listener tunnel.Listener) { // get a list of the best routes for each service in our routing table routes, err := n.getProtoRoutes() if err != nil { - log.Debugf("Network node %s failed listing routes: %v", n.id, err) + logger.Debugf("Network node %s failed listing routes: %v", n.id, err) } // attached the routes to the message msg.Routes = routes // send sync message to the newly connected peer if err := n.sendTo("sync", NetworkChannel, peer, msg); err != nil { - log.Debugf("Network failed to send sync message: %v", err) + logger.Debugf("Network failed to send sync message: %v", err) } }() continue // if we already have the peer in our graph, skip further steps } else if err != ErrPeerExists { - log.Debugf("Network got error adding peer %v", err) + logger.Debugf("Network got error adding peer %v", err) continue } - log.Tracef("Network peer exists, refreshing: %s", pbNetPeer.Node.Id) + logger.Tracef("Network peer exists, refreshing: %s", pbNetPeer.Node.Id) // update lastSeen time for the peer if err := n.RefreshPeer(peer.id, peer.link, now); err != nil { - log.Debugf("Network failed refreshing peer %s: %v", pbNetPeer.Node.Id, err) + logger.Debugf("Network failed refreshing peer %s: %v", pbNetPeer.Node.Id, err) } // NOTE: we don't unpack MaxDepth toplogy @@ -883,9 +923,9 @@ func (n *network) processNetChan(listener tunnel.Listener) { // update the link peer.link = m.msg.Header["Micro-Link"] - log.Tracef("Network updating topology of node: %s", n.node.id) + logger.Tracef("Network updating topology of node: %s", n.node.id) if err := n.node.UpdatePeer(peer); err != nil { - log.Debugf("Network failed to update peers: %v", err) + logger.Debugf("Network failed to update peers: %v", err) } // tell the connect loop that we've been discovered @@ -901,7 +941,7 @@ func (n *network) processNetChan(listener tunnel.Listener) { pbNetSync := &pbNet.Sync{} if err := proto.Unmarshal(m.msg.Body, pbNetSync); err != nil { - log.Debugf("Network tunnel [%s] sync unmarshal error: %v", NetworkChannel, err) + logger.Debugf("Network tunnel [%s] sync unmarshal error: %v", NetworkChannel, err) continue } @@ -910,7 +950,7 @@ func (n *network) processNetChan(listener tunnel.Listener) { continue } - log.Debugf("Network received sync message from: %s", pbNetSync.Peer.Node.Id) + logger.Debugf("Network received sync message from: %s", pbNetSync.Peer.Node.Id) peer := &node{ id: pbNetSync.Peer.Node.Id, @@ -926,15 +966,21 @@ func (n *network) processNetChan(listener tunnel.Listener) { // TODO: should we do this only if we manage to add a peer // What should we do if the peer links failed to be updated? if err := n.updatePeerLinks(peer); err != nil { - log.Debugf("Network failed updating peer links: %s", err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network failed updating peer links: %s", err) + } } // add peer to the list of node peers if err := n.node.AddPeer(peer); err == ErrPeerExists { - log.Tracef("Network peer exists, refreshing: %s", peer.id) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("Network peer exists, refreshing: %s", peer.id) + } // update lastSeen time for the existing node if err := n.RefreshPeer(peer.id, peer.link, now); err != nil { - log.Debugf("Network failed refreshing peer %s: %v", peer.id, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network failed refreshing peer %s: %v", peer.id, err) + } } } @@ -947,7 +993,9 @@ func (n *network) processNetChan(listener tunnel.Listener) { route := pbUtil.ProtoToRoute(pbRoute) // continue if we are the originator of the route if route.Router == n.router.Options().Id { - log.Debugf("Network node %s skipping route addition: route already present", n.id) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network node %s skipping route addition: route already present", n.id) + } continue } @@ -972,7 +1020,9 @@ func (n *network) processNetChan(listener tunnel.Listener) { routes, err := n.router.Table().Query(q...) if err != nil && err != router.ErrRouteNotFound { - log.Debugf("Network node %s failed listing best routes for %s: %v", n.id, route.Service, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network node %s failed listing best routes for %s: %v", n.id, route.Service, err) + } continue } @@ -980,7 +1030,9 @@ func (n *network) processNetChan(listener tunnel.Listener) { // create the new route we have just received if len(routes) == 0 { if err := n.router.Table().Create(route); err != nil && err != router.ErrDuplicateRoute { - log.Debugf("Network node %s failed to add route: %v", n.id, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network node %s failed to add route: %v", n.id, err) + } } continue } @@ -1006,14 +1058,18 @@ func (n *network) processNetChan(listener tunnel.Listener) { // add route to the routing table if err := n.router.Table().Create(route); err != nil && err != router.ErrDuplicateRoute { - log.Debugf("Network node %s failed to add route: %v", n.id, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network node %s failed to add route: %v", n.id, err) + } } } // update your sync timestamp // NOTE: this might go away as we will be doing full table advert to random peer if err := n.RefreshSync(now); err != nil { - log.Debugf("Network failed refreshing sync time: %v", err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network failed refreshing sync time: %v", err) + } } go func() { @@ -1022,13 +1078,17 @@ func (n *network) processNetChan(listener tunnel.Listener) { // advertise yourself to the new node if err := n.sendTo("peer", NetworkChannel, peer, msg); err != nil { - log.Debugf("Network failed to advertise peers: %v", err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network failed to advertise peers: %v", err) + } } }() case "close": pbNetClose := &pbNet.Close{} if err := proto.Unmarshal(m.msg.Body, pbNetClose); err != nil { - log.Debugf("Network tunnel [%s] close unmarshal error: %v", NetworkChannel, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network tunnel [%s] close unmarshal error: %v", NetworkChannel, err) + } continue } @@ -1037,7 +1097,9 @@ func (n *network) processNetChan(listener tunnel.Listener) { continue } - log.Debugf("Network received close message from: %s", pbNetClose.Node.Id) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network received close message from: %s", pbNetClose.Node.Id) + } peer := &node{ id: pbNetClose.Node.Id, @@ -1045,11 +1107,15 @@ func (n *network) processNetChan(listener tunnel.Listener) { } if err := n.DeletePeerNode(peer.id); err != nil { - log.Debugf("Network failed to delete node %s routes: %v", peer.id, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network failed to delete node %s routes: %v", peer.id, err) + } } if err := n.prunePeerRoutes(peer); err != nil { - log.Debugf("Network failed pruning peer %s routes: %v", peer.id, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network failed pruning peer %s routes: %v", peer.id, err) + } } // NOTE: we should maybe advertise this to the network so we converge faster on closed nodes @@ -1166,7 +1232,9 @@ func (n *network) manage() { // set the link via peer links l, ok := n.peerLinks[peer.address] if ok { - log.Debugf("Network link not found for peer %s cannot announce", peer.id) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network link not found for peer %s cannot announce", peer.id) + } continue } link = l.Id() @@ -1192,7 +1260,9 @@ func (n *network) manage() { for _, peer := range peers { // advertise yourself to the network if err := n.sendTo("peer", NetworkChannel, peer, msg); err != nil { - log.Debugf("Network failed to advertise peer %s: %v", peer.id, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network failed to advertise peer %s: %v", peer.id, err) + } continue } @@ -1214,32 +1284,41 @@ func (n *network) manage() { // unknown link and peer so lets do the connect flow if err := n.sendTo("connect", NetworkChannel, peer, msg); err != nil { - log.Debugf("Network failed to connect %s: %v", peer.id, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network failed to connect %s: %v", peer.id, err) + } continue } links[peer.link] = time.Now() } case <-prune.C: - log.Debugf("Network node %s pruning stale peers", n.id) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network node %s pruning stale peers", n.id) + } pruned := n.PruneStalePeers(PruneTime) for id, peer := range pruned { - log.Debugf("Network peer exceeded prune time: %s", id) - + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network peer exceeded prune time: %s", id) + } n.Lock() delete(n.peerLinks, peer.address) n.Unlock() if err := n.prunePeerRoutes(peer); err != nil { - log.Debugf("Network failed pruning peer %s routes: %v", id, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network failed pruning peer %s routes: %v", id, err) + } } } // get a list of all routes routes, err := n.options.Router.Table().List() if err != nil { - log.Debugf("Network failed listing routes when pruning peers: %v", err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network failed listing routes when pruning peers: %v", err) + } continue } @@ -1261,7 +1340,9 @@ func (n *network) manage() { } // otherwise delete all the routes originated by it if err := n.pruneRoutes(router.QueryRouter(route.Router)); err != nil { - log.Debugf("Network failed deleting routes by %s: %v", route.Router, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network failed deleting routes by %s: %v", route.Router, err) + } } } case <-netsync.C: @@ -1291,14 +1372,18 @@ func (n *network) manage() { // get a list of the best routes for each service in our routing table routes, err := n.getProtoRoutes() if err != nil { - log.Debugf("Network node %s failed listing routes: %v", n.id, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network node %s failed listing routes: %v", n.id, err) + } } // attached the routes to the message msg.Routes = routes // send sync message to the newly connected peer if err := n.sendTo("sync", NetworkChannel, peer, msg); err != nil { - log.Debugf("Network failed to send sync message: %v", err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network failed to send sync message: %v", err) + } } }() case <-resolve.C: @@ -1348,7 +1433,9 @@ func (n *network) sendConnect() { } if err := n.sendMsg("connect", NetworkChannel, msg); err != nil { - log.Debugf("Network failed to send connect message: %s", err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network failed to send connect message: %s", err) + } } } @@ -1367,9 +1454,13 @@ func (n *network) sendTo(method, channel string, peer *node, msg proto.Message) if peerNode := n.GetPeerNode(peer.id); peerNode != nil { // update node status when error happens peerNode.status.err.Update(err) - log.Debugf("Network increment peer %v error count to: %d", peerNode, peerNode, peerNode.status.Error().Count()) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network increment peer %v error count to: %d", peerNode, peerNode, peerNode.status.Error().Count()) + } if count := peerNode.status.Error().Count(); count == MaxPeerErrors { - log.Debugf("Network peer %v error count exceeded %d. Prunning.", peerNode, MaxPeerErrors) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network peer %v error count exceeded %d. Prunning.", peerNode, MaxPeerErrors) + } n.PrunePeer(peerNode.id) } } @@ -1383,8 +1474,9 @@ func (n *network) sendTo(method, channel string, peer *node, msg proto.Message) id = peer.link } - log.Debugf("Network sending %s message from: %s to %s", method, n.options.Id, id) - + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network sending %s message from: %s to %s", method, n.options.Id, id) + } tmsg := &transport.Message{ Header: map[string]string{ "Micro-Method": method, @@ -1400,12 +1492,18 @@ func (n *network) sendTo(method, channel string, peer *node, msg proto.Message) if err := c.Send(tmsg); err != nil { // TODO: Lookup peer in our graph if peerNode := n.GetPeerNode(peer.id); peerNode != nil { - log.Debugf("Network found peer %s: %v", peer.id, peerNode) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network found peer %s: %v", peer.id, peerNode) + } // update node status when error happens peerNode.status.err.Update(err) - log.Debugf("Network increment node peer %p %v count to: %d", peerNode, peerNode, peerNode.status.Error().Count()) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network increment node peer %p %v count to: %d", peerNode, peerNode, peerNode.status.Error().Count()) + } if count := peerNode.status.Error().Count(); count == MaxPeerErrors { - log.Debugf("Network node peer %v count exceeded %d: %d", peerNode, MaxPeerErrors, peerNode.status.Error().Count()) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network node peer %v count exceeded %d: %d", peerNode, MaxPeerErrors, peerNode.status.Error().Count()) + } n.PrunePeer(peerNode.id) } } @@ -1431,7 +1529,9 @@ func (n *network) sendMsg(method, channel string, msg proto.Message) error { } n.RUnlock() - log.Debugf("Network sending %s message from: %s", method, n.options.Id) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network sending %s message from: %s", method, n.options.Id) + } return client.Send(&transport.Message{ Header: map[string]string{ @@ -1448,7 +1548,9 @@ func (n *network) updatePeerLinks(peer *node) error { linkId := peer.link - log.Tracef("Network looking up link %s in the peer links", linkId) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("Network looking up link %s in the peer links", linkId) + } // lookup the peer link var peerLink tunnel.Link @@ -1464,8 +1566,10 @@ func (n *network) updatePeerLinks(peer *node) error { return ErrPeerLinkNotFound } - // if the peerLink is found in the returned links update peerLinks - log.Tracef("Network updating peer links for peer %s", peer.address) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + // if the peerLink is found in the returned links update peerLinks + logger.Tracef("Network updating peer links for peer %s", peer.address) + } // lookup a link and update it if better link is available if link, ok := n.peerLinks[peer.address]; ok { @@ -1547,7 +1651,9 @@ func (n *network) connect() { // well functioning tunnel clients as "discovered" will be false until the // n.discovered channel is read at some point later on. if err := n.createClients(); err != nil { - log.Debugf("Failed to recreate network/control clients: %v", err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Failed to recreate network/control clients: %v", err) + } continue } @@ -1756,7 +1862,9 @@ func (n *network) Close() error { } if err := n.sendMsg("close", NetworkChannel, msg); err != nil { - log.Debugf("Network failed to send close message: %s", err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Network failed to send close message: %s", err) + } } <-time.After(time.Millisecond * 100) } diff --git a/proxy/mucp/mucp.go b/proxy/mucp/mucp.go index d9edf14a..f2acdc4c 100644 --- a/proxy/mucp/mucp.go +++ b/proxy/mucp/mucp.go @@ -15,7 +15,7 @@ import ( "github.com/micro/go-micro/v2/codec" "github.com/micro/go-micro/v2/codec/bytes" "github.com/micro/go-micro/v2/errors" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/metadata" "github.com/micro/go-micro/v2/proxy" "github.com/micro/go-micro/v2/router" @@ -163,7 +163,9 @@ func (p *Proxy) filterRoutes(ctx context.Context, routes []router.Route) []route filteredRoutes = append(filteredRoutes, route) } - log.Tracef("Proxy filtered routes %+v\n", filteredRoutes) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("Proxy filtered routes %+v\n", filteredRoutes) + } return filteredRoutes } @@ -259,7 +261,9 @@ func (p *Proxy) manageRoutes(route router.Route, action string) error { p.Lock() defer p.Unlock() - log.Tracef("Proxy taking route action %v %+v\n", action, route) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("Proxy taking route action %v %+v\n", action, route) + } switch action { case "create", "update": @@ -309,7 +313,9 @@ func (p *Proxy) ProcessMessage(ctx context.Context, msg server.Message) error { // TODO: check that we're not broadcast storming by sending to the same topic // that we're actually subscribed to - log.Tracef("Proxy received message for %s", msg.Topic()) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("Proxy received message for %s", msg.Topic()) + } var errors []string @@ -350,7 +356,9 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server return errors.BadRequest("go.micro.proxy", "service name is blank") } - log.Tracef("Proxy received request for %s", service) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("Proxy received request for %s", service) + } // are we network routing or local routing if len(p.Links) == 0 { @@ -410,7 +418,9 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server opts = append(opts, client.WithAddress(addresses...)) } - log.Tracef("Proxy calling %+v\n", addresses) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("Proxy calling %+v\n", addresses) + } // serve the normal way return p.serveRequest(ctx, p.Client, service, endpoint, req, rsp, opts...) } @@ -433,7 +443,9 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server continue } - log.Tracef("Proxy using route %+v\n", route) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("Proxy using route %+v\n", route) + } // set the address to call addresses := toNodes([]router.Route{route}) diff --git a/registry/cache/cache.go b/registry/cache/cache.go index 689c6e35..a68e32ef 100644 --- a/registry/cache/cache.go +++ b/registry/cache/cache.go @@ -7,7 +7,7 @@ import ( "sync" "time" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/registry" ) @@ -339,7 +339,9 @@ func (c *cache) run() { c.setStatus(err) if a > 3 { - log.Info("rcache: ", err, " backing off ", d) + if logger.V(logger.InfoLevel, logger.DefaultLogger) { + logger.Info("rcache: ", err, " backing off ", d) + } a = 0 } @@ -362,7 +364,9 @@ func (c *cache) run() { c.setStatus(err) if b > 3 { - log.Info("rcache: ", err, " backing off ", d) + if logger.V(logger.InfoLevel, logger.DefaultLogger) { + logger.Info("rcache: ", err, " backing off ", d) + } b = 0 } diff --git a/registry/etcd/etcd.go b/registry/etcd/etcd.go index 40ba3b03..630fdbaf 100644 --- a/registry/etcd/etcd.go +++ b/registry/etcd/etcd.go @@ -15,7 +15,7 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/registry" hash "github.com/mitchellh/hashstructure" "go.uber.org/zap" @@ -191,13 +191,17 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op // renew the lease if it exists if leaseID > 0 { - log.Tracef("Renewing existing lease for %s %d", s.Name, leaseID) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("Renewing existing lease for %s %d", s.Name, leaseID) + } if _, err := e.client.KeepAliveOnce(context.TODO(), leaseID); err != nil { if err != rpctypes.ErrLeaseNotFound { return err } - log.Tracef("Lease not found for %s %d", s.Name, leaseID) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("Lease not found for %s %d", s.Name, leaseID) + } // lease not found do register leaseNotFound = true } @@ -216,7 +220,9 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op // the service is unchanged, skip registering if ok && v == h && !leaseNotFound { - log.Tracef("Service %s node %s unchanged skipping registration", s.Name, node.Id) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("Service %s node %s unchanged skipping registration", s.Name, node.Id) + } return nil } @@ -245,7 +251,9 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op } } - log.Tracef("Registering %s id %s with lease %v and ttl %v", service.Name, node.Id, lgr, options.TTL) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("Registering %s id %s with lease %v and ttl %v", service.Name, node.Id, lgr, options.TTL) + } // create an entry for the node if lgr != nil { _, err = e.client.Put(ctx, nodePath(service.Name, node.Id), encode(service), clientv3.WithLease(lgr.ID)) @@ -284,7 +292,9 @@ func (e *etcdRegistry) Deregister(s *registry.Service) error { ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) defer cancel() - log.Tracef("Deregistering %s id %s", s.Name, node.Id) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("Deregistering %s id %s", s.Name, node.Id) + } _, err := e.client.Delete(ctx, nodePath(s.Name, node.Id)) if err != nil { return err diff --git a/registry/kubernetes/watcher.go b/registry/kubernetes/watcher.go index 5c54a1b6..89bffed6 100644 --- a/registry/kubernetes/watcher.go +++ b/registry/kubernetes/watcher.go @@ -6,7 +6,7 @@ import ( "strings" "sync" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/util/kubernetes/client" ) @@ -132,7 +132,9 @@ func (k *k8sWatcher) buildPodResults(pod *client.Pod, cache *client.Pod) []*regi func (k *k8sWatcher) handleEvent(event client.Event) { var pod client.Pod if err := json.Unmarshal([]byte(event.Object), &pod); err != nil { - log.Info("K8s Watcher: Couldnt unmarshal event object from pod") + if logger.V(logger.InfoLevel, logger.DefaultLogger) { + logger.Info("K8s Watcher: Couldnt unmarshal event object from pod") + } return } diff --git a/registry/mdns_registry.go b/registry/mdns_registry.go index 8563ecac..7be88162 100644 --- a/registry/mdns_registry.go +++ b/registry/mdns_registry.go @@ -11,7 +11,7 @@ import ( "time" "github.com/google/uuid" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/mdns" ) @@ -278,7 +278,9 @@ func (m *mdnsRegistry) GetService(service string) ([]*Service, error) { } else if e.AddrV6 != nil { addr = "[" + e.AddrV6.String() + "]" } else { - log.Infof("[mdns]: invalid endpoint received: %v", e) + if logger.V(logger.InfoLevel, logger.DefaultLogger) { + logger.Infof("[mdns]: invalid endpoint received: %v", e) + } continue } s.Nodes = append(s.Nodes, &Node{ diff --git a/registry/memory/memory.go b/registry/memory/memory.go index 3dc96f46..cde49b96 100644 --- a/registry/memory/memory.go +++ b/registry/memory/memory.go @@ -7,7 +7,7 @@ import ( "time" "github.com/google/uuid" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/registry" ) @@ -75,7 +75,9 @@ func (m *Registry) ttlPrune() { for version, record := range records { for id, n := range record.Nodes { if n.TTL != 0 && time.Since(n.LastSeen) > n.TTL { - log.Debugf("Registry TTL expired for node %s of service %s", n.Id, name) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Registry TTL expired for node %s of service %s", n.Id, name) + } delete(m.records[name][version].Nodes, id) } } @@ -158,7 +160,9 @@ func (m *Registry) Register(s *registry.Service, opts ...registry.RegisterOption if _, ok := m.records[s.Name][s.Version]; !ok { m.records[s.Name][s.Version] = r - log.Debugf("Registry added new service: %s, version: %s", s.Name, s.Version) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Registry added new service: %s, version: %s", s.Name, s.Version) + } go m.sendEvent(®istry.Result{Action: "update", Service: s}) return nil } @@ -184,14 +188,18 @@ func (m *Registry) Register(s *registry.Service, opts ...registry.RegisterOption } if addedNodes { - log.Debugf("Registry added new node to service: %s, version: %s", s.Name, s.Version) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Registry added new node to service: %s, version: %s", s.Name, s.Version) + } go m.sendEvent(®istry.Result{Action: "update", Service: s}) return nil } // refresh TTL and timestamp for _, n := range s.Nodes { - log.Debugf("Updated registration for service: %s, version: %s", s.Name, s.Version) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Updated registration for service: %s, version: %s", s.Name, s.Version) + } m.records[s.Name][s.Version].Nodes[n.Id].TTL = options.TTL m.records[s.Name][s.Version].Nodes[n.Id].LastSeen = time.Now() } @@ -207,18 +215,24 @@ func (m *Registry) Deregister(s *registry.Service) error { if _, ok := m.records[s.Name][s.Version]; ok { for _, n := range s.Nodes { if _, ok := m.records[s.Name][s.Version].Nodes[n.Id]; ok { - log.Debugf("Registry removed node from service: %s, version: %s", s.Name, s.Version) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Registry removed node from service: %s, version: %s", s.Name, s.Version) + } delete(m.records[s.Name][s.Version].Nodes, n.Id) } } if len(m.records[s.Name][s.Version].Nodes) == 0 { delete(m.records[s.Name], s.Version) - log.Debugf("Registry removed service: %s, version: %s", s.Name, s.Version) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Registry removed service: %s, version: %s", s.Name, s.Version) + } } } if len(m.records[s.Name]) == 0 { delete(m.records, s.Name) - log.Debugf("Registry removed service: %s", s.Name) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Registry removed service: %s", s.Name) + } } go m.sendEvent(®istry.Result{Action: "delete", Service: s}) } diff --git a/router/default.go b/router/default.go index 47da6ba0..9026886b 100644 --- a/router/default.go +++ b/router/default.go @@ -10,7 +10,7 @@ import ( "time" "github.com/google/uuid" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/registry" ) @@ -308,11 +308,15 @@ func (m adverts) process(a *advert) error { // suppress/recover the event based on its penalty level switch { case a.penalty > AdvertSuppress && !a.isSuppressed: - log.Debugf("Router suppressing advert %d %.2f for route %s %s", hash, a.penalty, service, address) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Router suppressing advert %d %.2f for route %s %s", hash, a.penalty, service, address) + } a.isSuppressed = true a.suppressTime = time.Now() case a.penalty < AdvertRecover && a.isSuppressed: - log.Debugf("Router recovering advert %d %.2f for route %s %s", hash, a.penalty, service, address) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Router recovering advert %d %.2f for route %s %s", hash, a.penalty, service, address) + } a.isSuppressed = false } @@ -357,14 +361,18 @@ func (r *router) advertiseEvents() error { // routing table watcher w, err = r.Watch() if err != nil { - log.Errorf("Error creating watcher: %v", err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Error creating watcher: %v", err) + } time.Sleep(time.Second) continue } } if err := r.watchTable(w); err != nil { - log.Errorf("Error watching table: %v", err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Error watching table: %v", err) + } time.Sleep(time.Second) } @@ -391,7 +399,9 @@ func (r *router) advertiseEvents() error { for key, advert := range adverts { // process the advert if err := adverts.process(advert); err != nil { - log.Debugf("Router failed processing advert %d: %v", key, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Router failed processing advert %d: %v", key, err) + } continue } // if suppressed go to the next advert @@ -416,7 +426,9 @@ func (r *router) advertiseEvents() error { // advertise events to subscribers if len(events) > 0 { - log.Debugf("Router publishing %d events", len(events)) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Router publishing %d events", len(events)) + } go r.publishAdvert(RouteUpdate, events) } case e := <-r.eventChan: @@ -437,7 +449,9 @@ func (r *router) advertiseEvents() error { now := time.Now() - log.Debugf("Router processing table event %s for service %s %s", e.Type, e.Route.Service, e.Route.Address) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Router processing table event %s for service %s %s", e.Type, e.Route.Service, e.Route.Address) + } // check if we have already registered the route hash := e.Route.Hash() @@ -459,7 +473,9 @@ func (r *router) advertiseEvents() error { // process the advert if err := adverts.process(a); err != nil { - log.Debugf("Router error processing advert %d: %v", hash, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Router error processing advert %d: %v", hash, err) + } continue } @@ -467,7 +483,9 @@ func (r *router) advertiseEvents() error { a.lastSeen = now // increment the penalty a.penalty += Penalty - log.Debugf("Router advert %d for route %s %s event penalty: %f", hash, a.event.Route.Service, a.event.Route.Address, a.penalty) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Router advert %d for route %s %s event penalty: %f", hash, a.event.Route.Service, a.event.Route.Address, a.penalty) + } case <-r.exit: if w != nil { w.Stop() @@ -542,14 +560,18 @@ func (r *router) Start() error { if w == nil { w, err = r.options.Registry.Watch() if err != nil { - log.Errorf("failed creating registry watcher: %v", err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("failed creating registry watcher: %v", err) + } time.Sleep(time.Second) continue } } if err := r.watchRegistry(w); err != nil { - log.Errorf("Error watching the registry: %v", err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Error watching the registry: %v", err) + } time.Sleep(time.Second) } @@ -606,7 +628,9 @@ func (r *router) Advertise() (<-chan *Advert, error) { return default: if err := r.advertiseEvents(); err != nil { - log.Errorf("Error adveritising events: %v", err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Error adveritising events: %v", err) + } } } }() @@ -626,19 +650,25 @@ func (r *router) Process(a *Advert) error { return events[i].Timestamp.Before(events[j].Timestamp) }) - log.Tracef("Router %s processing advert from: %s", r.options.Id, a.Id) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("Router %s processing advert from: %s", r.options.Id, a.Id) + } for _, event := range events { // skip if the router is the origin of this route if event.Route.Router == r.options.Id { - log.Tracef("Router skipping processing its own route: %s", r.options.Id) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("Router skipping processing its own route: %s", r.options.Id) + } continue } // create a copy of the route route := event.Route action := event.Type - log.Tracef("Router %s applying %s from router %s for service %s %s", r.options.Id, action, route.Router, route.Service, route.Address) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("Router %s applying %s from router %s for service %s %s", r.options.Id, action, route.Router, route.Service, route.Address) + } if err := r.manageRoute(route, action.String()); err != nil { return fmt.Errorf("failed applying action %s to routing table: %s", action, err) @@ -661,7 +691,9 @@ func (r *router) flushRouteEvents(evType EventType) ([]*Event, error) { return nil, err } - log.Debugf("Router advertising %d routes with strategy %s", len(routes), r.options.Advertise) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Router advertising %d routes with strategy %s", len(routes), r.options.Advertise) + } // build a list of events to advertise events := make([]*Event, len(routes)) diff --git a/router/default_test.go b/router/default_test.go index 42a79595..19d36423 100644 --- a/router/default_test.go +++ b/router/default_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - log "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/registry/memory" ) @@ -30,7 +29,7 @@ func TestRouterStartStop(t *testing.T) { if err := r.Stop(); err != nil { t.Errorf("failed to stop router: %v", err) } - log.Debugf("TestRouterStartStop STOPPED") + t.Logf("TestRouterStartStop STOPPED") } func TestRouterAdvertise(t *testing.T) { @@ -50,7 +49,7 @@ func TestRouterAdvertise(t *testing.T) { // receive announce event ann := <-ch - log.Debugf("received announce advert: %v", ann) + t.Logf("received announce advert: %v", ann) // Generate random unique routes nrRoutes := 5 @@ -82,9 +81,9 @@ func TestRouterAdvertise(t *testing.T) { wg.Done() defer close(createDone) for _, route := range routes { - log.Debugf("Creating route %v", route) + t.Logf("Creating route %v", route) if err := r.Table().Create(route); err != nil { - log.Debugf("Failed to create route: %v", err) + t.Logf("Failed to create route: %v", err) errChan <- err return } @@ -106,7 +105,7 @@ func TestRouterAdvertise(t *testing.T) { t.Errorf("failed advertising events: %v", advertErr) default: // do nothing for now - log.Debugf("Router advert received: %v", advert) + t.Logf("Router advert received: %v", advert) adverts += len(advert.Events) } return diff --git a/router/table.go b/router/table.go index 649cea65..cd5e5815 100644 --- a/router/table.go +++ b/router/table.go @@ -6,7 +6,7 @@ import ( "time" "github.com/google/uuid" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" ) var ( @@ -68,7 +68,9 @@ func (t *table) Create(r Route) error { // add new route to the table for the route destination if _, ok := t.routes[service][sum]; !ok { t.routes[service][sum] = r - log.Debugf("Router emitting %s for route: %s", Create, r.Address) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Router emitting %s for route: %s", Create, r.Address) + } go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r}) return nil } @@ -93,7 +95,9 @@ func (t *table) Delete(r Route) error { } delete(t.routes[service], sum) - log.Debugf("Router emitting %s for route: %s", Delete, r.Address) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Router emitting %s for route: %s", Delete, r.Address) + } go t.sendEvent(&Event{Type: Delete, Timestamp: time.Now(), Route: r}) return nil @@ -114,7 +118,9 @@ func (t *table) Update(r Route) error { if _, ok := t.routes[service][sum]; !ok { t.routes[service][sum] = r - log.Debugf("Router emitting %s for route: %s", Update, r.Address) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Router emitting %s for route: %s", Update, r.Address) + } go t.sendEvent(&Event{Type: Update, Timestamp: time.Now(), Route: r}) return nil } diff --git a/runtime/default.go b/runtime/default.go index fdd1ed25..8230e8df 100644 --- a/runtime/default.go +++ b/runtime/default.go @@ -5,7 +5,7 @@ import ( "sync" "time" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" ) type runtime struct { @@ -71,7 +71,9 @@ func (r *runtime) run(events <-chan Event) { return nil } - log.Debugf("Runtime updating service %s", name) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime updating service %s", name) + } // this will cause a delete followed by created if err := r.Update(service.Service); err != nil { @@ -97,9 +99,13 @@ func (r *runtime) run(events <-chan Event) { } // TODO: check service error - log.Debugf("Runtime starting %s", service.Name) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime starting %s", service.Name) + } if err := service.Start(); err != nil { - log.Debugf("Runtime error starting %s: %v", service.Name, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime error starting %s: %v", service.Name, err) + } } } r.RUnlock() @@ -108,12 +114,18 @@ func (r *runtime) run(events <-chan Event) { continue } // TODO: check service error - log.Debugf("Runtime starting service %s", service.Name) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime starting service %s", service.Name) + } if err := service.Start(); err != nil { - log.Debugf("Runtime error starting service %s: %v", service.Name, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime error starting service %s: %v", service.Name, err) + } } case event := <-events: - log.Debugf("Runtime received notification event: %v", event) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime received notification event: %v", event) + } // NOTE: we only handle Update events for now switch event.Type { case Update: @@ -122,11 +134,15 @@ func (r *runtime) run(events <-chan Event) { service, ok := r.services[event.Service] r.RUnlock() if !ok { - log.Debugf("Runtime unknown service: %s", event.Service) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime unknown service: %s", event.Service) + } continue } if err := processEvent(event, service); err != nil { - log.Debugf("Runtime error updating service %s: %v", event.Service, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime error updating service %s: %v", event.Service, err) + } } continue } @@ -138,12 +154,16 @@ func (r *runtime) run(events <-chan Event) { // if blank service was received we update all services for _, service := range services { if err := processEvent(event, service); err != nil { - log.Debugf("Runtime error updating service %s: %v", service.Name, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime error updating service %s: %v", service.Name, err) + } } } } case <-r.closed: - log.Debugf("Runtime stopped") + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime stopped") + } return } } @@ -242,7 +262,9 @@ func (r *runtime) Delete(s *Service) error { r.Lock() defer r.Unlock() - log.Debugf("Runtime deleting service %s", s.Name) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime deleting service %s", s.Name) + } if s, ok := r.services[s.Name]; ok { // check if running if s.Running() { @@ -295,7 +317,9 @@ func (r *runtime) Start() error { events, err = r.options.Scheduler.Notify() if err != nil { // TODO: should we bail here? - log.Debugf("Runtime failed to start update notifier") + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime failed to start update notifier") + } } } @@ -324,7 +348,9 @@ func (r *runtime) Stop() error { // stop all the services for _, service := range r.services { - log.Debugf("Runtime stopping %s", service.Name) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime stopping %s", service.Name) + } service.Stop() } // stop the scheduler diff --git a/runtime/kubernetes/kubernetes.go b/runtime/kubernetes/kubernetes.go index 760a824a..a6590547 100644 --- a/runtime/kubernetes/kubernetes.go +++ b/runtime/kubernetes/kubernetes.go @@ -7,7 +7,7 @@ import ( "sync" "time" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/runtime" "github.com/micro/go-micro/v2/util/kubernetes/client" ) @@ -129,7 +129,9 @@ func (k *kubernetes) getService(labels map[string]string) ([]*runtime.Service, e status = "ready" } } - log.Debugf("Runtime setting %s service deployment status: %v", name, status) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime setting %s service deployment status: %v", name, status) + } svc.Metadata["status"] = status } } @@ -156,7 +158,9 @@ func (k *kubernetes) run(events <-chan runtime.Event) { // - do we even need the ticker for k8s services? case event := <-events: // NOTE: we only handle Update events for now - log.Debugf("Runtime received notification event: %v", event) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime received notification event: %v", event) + } switch event.Type { case runtime.Update: // only process if there's an actual service @@ -188,7 +192,9 @@ func (k *kubernetes) run(events <-chan runtime.Event) { }, labels) if err != nil { - log.Debugf("Runtime update failed to get service %s: %v", event.Service, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime update failed to get service %s: %v", event.Service, err) + } continue } @@ -215,16 +221,21 @@ func (k *kubernetes) run(events <-chan runtime.Event) { // update the build time service.Spec.Template.Metadata.Annotations["build"] = event.Timestamp.Format(time.RFC3339) - - log.Debugf("Runtime updating service: %s deployment: %s", event.Service, service.Metadata.Name) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime updating service: %s deployment: %s", event.Service, service.Metadata.Name) + } if err := k.client.Update(deploymentResource(&service)); err != nil { - log.Debugf("Runtime failed to update service %s: %v", event.Service, err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime failed to update service %s: %v", event.Service, err) + } continue } } } case <-k.closed: - log.Debugf("Runtime stopped") + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime stopped") + } return } } @@ -313,7 +324,9 @@ func (k *kubernetes) List() ([]*runtime.Service, error) { "micro": k.options.Type, } - log.Debugf("Runtime listing all micro services") + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime listing all micro services") + } return k.getService(labels) } @@ -365,7 +378,9 @@ func (k *kubernetes) Start() error { events, err = k.options.Scheduler.Notify() if err != nil { // TODO: should we bail here? - log.Debugf("Runtime failed to start update notifier") + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime failed to start update notifier") + } } } diff --git a/runtime/kubernetes/service.go b/runtime/kubernetes/service.go index 15d05bcb..73a9099d 100644 --- a/runtime/kubernetes/service.go +++ b/runtime/kubernetes/service.go @@ -4,7 +4,7 @@ import ( "encoding/json" "strings" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/runtime" "github.com/micro/go-micro/v2/util/kubernetes/api" "github.com/micro/go-micro/v2/util/kubernetes/client" @@ -108,7 +108,9 @@ func serviceResource(s *client.Service) *client.Resource { func (s *service) Start(k client.Client) error { // create deployment first; if we fail, we dont create service if err := k.Create(deploymentResource(s.kdeploy)); err != nil { - log.Debugf("Runtime failed to create deployment: %v", err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime failed to create deployment: %v", err) + } s.Status("error", err) v := parseError(err) if v.Reason == "AlreadyExists" { @@ -118,7 +120,9 @@ func (s *service) Start(k client.Client) error { } // create service now that the deployment has been created if err := k.Create(serviceResource(s.kservice)); err != nil { - log.Debugf("Runtime failed to create service: %v", err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime failed to create service: %v", err) + } s.Status("error", err) v := parseError(err) if v.Reason == "AlreadyExists" { @@ -135,13 +139,17 @@ func (s *service) Start(k client.Client) error { func (s *service) Stop(k client.Client) error { // first attempt to delete service if err := k.Delete(serviceResource(s.kservice)); err != nil { - log.Debugf("Runtime failed to delete service: %v", err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime failed to delete service: %v", err) + } s.Status("error", err) return err } // delete deployment once the service has been deleted if err := k.Delete(deploymentResource(s.kdeploy)); err != nil { - log.Debugf("Runtime failed to delete deployment: %v", err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime failed to delete deployment: %v", err) + } s.Status("error", err) return err } @@ -153,12 +161,16 @@ func (s *service) Stop(k client.Client) error { func (s *service) Update(k client.Client) error { if err := k.Update(deploymentResource(s.kdeploy)); err != nil { - log.Debugf("Runtime failed to update deployment: %v", err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime failed to update deployment: %v", err) + } s.Status("error", err) return err } if err := k.Update(serviceResource(s.kservice)); err != nil { - log.Debugf("Runtime failed to update service: %v", err) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime failed to update service: %v", err) + } return err } diff --git a/runtime/local/build/docker/docker.go b/runtime/local/build/docker/docker.go index 01bc18a4..e164df55 100644 --- a/runtime/local/build/docker/docker.go +++ b/runtime/local/build/docker/docker.go @@ -9,7 +9,7 @@ import ( "path/filepath" docker "github.com/fsouza/go-dockerclient" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/runtime/local/build" ) @@ -84,7 +84,7 @@ func NewBuilder(opts ...build.Option) build.Builder { endpoint := "unix:///var/run/docker.sock" client, err := docker.NewClient(endpoint) if err != nil { - log.Fatal(err) + logger.Fatal(err) } return &Builder{ Options: options, diff --git a/runtime/service.go b/runtime/service.go index c93bec73..c211d91b 100644 --- a/runtime/service.go +++ b/runtime/service.go @@ -6,7 +6,7 @@ import ( "sync" "time" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/runtime/local/build" "github.com/micro/go-micro/v2/runtime/local/process" proc "github.com/micro/go-micro/v2/runtime/local/process/os" @@ -111,7 +111,9 @@ func (s *service) Start() error { delete(s.Metadata, "error") // TODO: pull source & build binary - log.Debugf("Runtime service %s forking new process", s.Service.Name) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime service %s forking new process", s.Service.Name) + } p, err := s.Process.Fork(s.Exec) if err != nil { s.Metadata["status"] = "error" diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index 66539132..4015780d 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -17,7 +17,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/micro/go-micro/v2/broker" "github.com/micro/go-micro/v2/errors" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" meta "github.com/micro/go-micro/v2/metadata" "github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/server" @@ -358,8 +358,10 @@ func (g *grpcServer) processRequest(stream grpc.ServerStream, service *service, fn := func(ctx context.Context, req server.Request, rsp interface{}) (err error) { defer func() { if r := recover(); r != nil { - log.Error("panic recovered: ", r) - log.Error(string(debug.Stack())) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error("panic recovered: ", r) + logger.Error(string(debug.Stack())) + } err = errors.InternalServerError("go.micro.server", "panic recovered: %v", r) } }() @@ -658,7 +660,9 @@ func (g *grpcServer) Register() error { g.Unlock() if !registered { - log.Infof("Registry [%s] Registering node: %s", config.Registry.String(), node.Id) + if logger.V(logger.InfoLevel, logger.DefaultLogger) { + logger.Infof("Registry [%s] Registering node: %s", config.Registry.String(), node.Id) + } } // create registry options @@ -693,7 +697,9 @@ func (g *grpcServer) Register() error { opts = append(opts, broker.DisableAutoAck()) } - log.Infof("Subscribing to topic: %s", sb.Topic()) + if logger.V(logger.InfoLevel, logger.DefaultLogger) { + logger.Infof("Subscribing to topic: %s", sb.Topic()) + } sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...) if err != nil { return err @@ -745,7 +751,9 @@ func (g *grpcServer) Deregister() error { Nodes: []*registry.Node{node}, } - log.Infof("Deregistering node: %s", node.Id) + if logger.V(logger.InfoLevel, logger.DefaultLogger) { + logger.Infof("Deregistering node: %s", node.Id) + } if err := config.Registry.Deregister(service); err != nil { return err } @@ -761,7 +769,9 @@ func (g *grpcServer) Deregister() error { for sb, subs := range g.subscribers { for _, sub := range subs { - log.Infof("Unsubscribing from topic: %s", sub.Topic()) + if logger.V(logger.InfoLevel, logger.DefaultLogger) { + logger.Infof("Unsubscribing from topic: %s", sub.Topic()) + } sub.Unsubscribe() } g.subscribers[sb] = nil @@ -807,7 +817,9 @@ func (g *grpcServer) Start() error { } } - log.Infof("Server [grpc] Listening on %s", ts.Addr().String()) + if logger.V(logger.InfoLevel, logger.DefaultLogger) { + logger.Infof("Server [grpc] Listening on %s", ts.Addr().String()) + } g.Lock() g.opts.Address = ts.Addr().String() g.Unlock() @@ -819,18 +831,24 @@ func (g *grpcServer) Start() error { return err } - log.Infof("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) + if logger.V(logger.InfoLevel, logger.DefaultLogger) { + logger.Infof("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) + } } // announce self to the world if err := g.Register(); err != nil { - log.Errorf("Server register error: ", err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Server register error: ", err) + } } // micro: go ts.Accept(s.accept) go func() { if err := g.srv.Serve(ts); err != nil { - log.Errorf("gRPC Server start error: ", err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("gRPC Server start error: ", err) + } } }() @@ -852,7 +870,9 @@ func (g *grpcServer) Start() error { // register self on interval case <-t.C: if err := g.Register(); err != nil { - log.Error("Server register error: ", err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error("Server register error: ", err) + } } // wait for exit case ch = <-g.exit: @@ -862,7 +882,9 @@ func (g *grpcServer) Start() error { // deregister self if err := g.Deregister(); err != nil { - log.Error("Server deregister error: ", err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error("Server deregister error: ", err) + } } // wait for waitgroup @@ -887,7 +909,9 @@ func (g *grpcServer) Start() error { // close transport ch <- nil - log.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) + if logger.V(logger.InfoLevel, logger.DefaultLogger) { + logger.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) + } // disconnect broker config.Broker.Disconnect() }() diff --git a/server/grpc/server.go b/server/grpc/server.go index 81bf9f7c..024a90f1 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -14,7 +14,7 @@ import ( "unicode" "unicode/utf8" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/server" ) @@ -86,7 +86,9 @@ func prepareEndpoint(method reflect.Method) *methodType { replyType = mtype.In(3) contextType = mtype.In(1) default: - log.Error("method", mname, "of", mtype, "has wrong number of ins:", mtype.NumIn()) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error("method", mname, "of", mtype, "has wrong number of ins:", mtype.NumIn()) + } return nil } @@ -94,7 +96,9 @@ func prepareEndpoint(method reflect.Method) *methodType { // check stream type streamType := reflect.TypeOf((*server.Stream)(nil)).Elem() if !argType.Implements(streamType) { - log.Error(mname, "argument does not implement Streamer interface:", argType) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error(mname, "argument does not implement Streamer interface:", argType) + } return nil } } else { @@ -102,30 +106,40 @@ func prepareEndpoint(method reflect.Method) *methodType { // First arg need not be a pointer. if !isExportedOrBuiltinType(argType) { - log.Error(mname, "argument type not exported:", argType) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error(mname, "argument type not exported:", argType) + } return nil } if replyType.Kind() != reflect.Ptr { - log.Error("method", mname, "reply type not a pointer:", replyType) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error("method", mname, "reply type not a pointer:", replyType) + } return nil } // Reply type must be exported. if !isExportedOrBuiltinType(replyType) { - log.Error("method", mname, "reply type not exported:", replyType) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error("method", mname, "reply type not exported:", replyType) + } return nil } } // Endpoint() needs one out. if mtype.NumOut() != 1 { - log.Error("method", mname, "has wrong number of outs:", mtype.NumOut()) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error("method", mname, "has wrong number of outs:", mtype.NumOut()) + } return nil } // The return type of the method must be error. if returnType := mtype.Out(0); returnType != typeOfError { - log.Error("method", mname, "returns", returnType.String(), "not error") + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error("method", mname, "returns", returnType.String(), "not error") + } return nil } return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream} @@ -142,11 +156,13 @@ func (server *rServer) register(rcvr interface{}) error { s.rcvr = reflect.ValueOf(rcvr) sname := reflect.Indirect(s.rcvr).Type().Name() if sname == "" { - log.Fatal("rpc: no service name for type", s.typ.String()) + logger.Fatal("rpc: no service name for type", s.typ.String()) } if !isExported(sname) { s := "rpc Register: type " + sname + " is not exported" - log.Error(s) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error(s) + } return errors.New(s) } if _, present := server.serviceMap[sname]; present { @@ -165,7 +181,9 @@ func (server *rServer) register(rcvr interface{}) error { if len(s.method) == 0 { s := "rpc Register: type " + sname + " has no exported methods of suitable type" - log.Error(s) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error(s) + } return errors.New(s) } server.serviceMap[s.name] = s diff --git a/server/grpc/subscriber.go b/server/grpc/subscriber.go index c36071fd..5ac957c2 100644 --- a/server/grpc/subscriber.go +++ b/server/grpc/subscriber.go @@ -9,7 +9,7 @@ import ( "github.com/micro/go-micro/v2/broker" "github.com/micro/go-micro/v2/errors" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/metadata" "github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/server" @@ -171,8 +171,10 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke defer func() { if r := recover(); r != nil { - log.Error("panic recovered: ", r) - log.Error(string(debug.Stack())) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error("panic recovered: ", r) + logger.Error(string(debug.Stack())) + } err = errors.InternalServerError("go.micro.server", "panic recovered: %v", r) } }() diff --git a/server/rpc_router.go b/server/rpc_router.go index 82b2c6b9..33561449 100644 --- a/server/rpc_router.go +++ b/server/rpc_router.go @@ -20,7 +20,6 @@ import ( "github.com/micro/go-micro/v2/codec" merrors "github.com/micro/go-micro/v2/errors" - log "github.com/micro/go-micro/v2/logger" ) var ( diff --git a/server/rpc_server.go b/server/rpc_server.go index 173f4f66..4619fa64 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -15,7 +15,7 @@ import ( "github.com/micro/go-micro/v2/broker" "github.com/micro/go-micro/v2/codec" raw "github.com/micro/go-micro/v2/codec/bytes" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/metadata" "github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/transport" @@ -158,8 +158,10 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { // recover any panics if r := recover(); r != nil { - log.Error("panic recovered: ", r) - log.Error(string(debug.Stack())) + if logger.V(logger.ErrorLevel, log) { + log.Error("panic recovered: ", r) + log.Error(string(debug.Stack())) + } } }() @@ -377,8 +379,10 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { // recover any panics for outbound process if r := recover(); r != nil { - log.Error("panic recovered: ", r) - log.Error(string(debug.Stack())) + if logger.V(logger.ErrorLevel, log) { + log.Error("panic recovered: ", r) + log.Error(string(debug.Stack())) + } } }() diff --git a/server/server.go b/server/server.go index e11bc14d..7b121b70 100644 --- a/server/server.go +++ b/server/server.go @@ -10,7 +10,7 @@ import ( "github.com/google/uuid" "github.com/micro/go-micro/v2/codec" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/registry" ) @@ -139,6 +139,7 @@ var ( // NewServer creates a new server NewServer func(...Option) Server = newRpcServer + log = logger.NewHelper(logger.DefaultLogger).WithFields(map[string]interface{}{"service": "server"}) ) // DefaultOptions returns config options for the default service @@ -200,21 +201,26 @@ func Run() error { ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT) - log.Infof("Received signal %s", <-ch) - + if logger.V(logger.InfoLevel, log) { + log.Infof("Received signal %s", <-ch) + } return Stop() } // Start starts the default server func Start() error { config := DefaultServer.Options() - log.Infof("Starting server %s id %s", config.Name, config.Id) + if logger.V(logger.InfoLevel, log) { + log.Infof("Starting server %s id %s", config.Name, config.Id) + } return DefaultServer.Start() } // Stop stops the default server func Stop() error { - log.Infof("Stopping server") + if logger.V(logger.InfoLevel, log) { + log.Infof("Stopping server") + } return DefaultServer.Stop() } diff --git a/service.go b/service.go index 3ce750ae..a6c77469 100644 --- a/service.go +++ b/service.go @@ -14,7 +14,7 @@ import ( "github.com/micro/go-micro/v2/debug/service/handler" "github.com/micro/go-micro/v2/debug/stats" "github.com/micro/go-micro/v2/debug/trace" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/plugin" "github.com/micro/go-micro/v2/server" "github.com/micro/go-micro/v2/util/config" @@ -77,12 +77,12 @@ func (s *service) Init(opts ...Option) { // load the plugin c, err := plugin.Load(p) if err != nil { - log.Fatal(err) + logger.Fatal(err) } // initialise the plugin if err := plugin.Init(c); err != nil { - log.Fatal(err) + logger.Fatal(err) } } @@ -101,7 +101,7 @@ func (s *service) Init(opts ...Option) { cmd.Server(&s.opts.Server), cmd.Profile(&s.opts.Profile), ); err != nil { - log.Fatal(err) + logger.Fatal(err) } // TODO: replace Cmd.Init with config.Load @@ -194,7 +194,9 @@ func (s *service) Run() error { defer s.opts.Profile.Stop() } - log.Infof("Starting [service] %s", s.Name()) + if logger.V(logger.InfoLevel, logger.DefaultLogger) { + logger.Infof("Starting [service] %s", s.Name()) + } if err := s.Start(); err != nil { return err diff --git a/store/cockroach/cockroach.go b/store/cockroach/cockroach.go index 63e16f15..fcda1d6f 100644 --- a/store/cockroach/cockroach.go +++ b/store/cockroach/cockroach.go @@ -10,7 +10,7 @@ import ( "time" "github.com/lib/pq" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/store" "github.com/pkg/errors" ) @@ -264,7 +264,7 @@ func NewStore(opts ...store.Option) store.Store { // configure the store if err := s.configure(); err != nil { - log.Fatal(err) + logger.Fatal(err) } // return store diff --git a/sync/cron.go b/sync/cron.go index caf51e71..fddb7e49 100644 --- a/sync/cron.go +++ b/sync/cron.go @@ -5,7 +5,7 @@ import ( "math" "time" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/sync/leader/etcd" "github.com/micro/go-micro/v2/sync/task" "github.com/micro/go-micro/v2/sync/task/local" @@ -35,7 +35,9 @@ func (c *syncCron) Schedule(s task.Schedule, t task.Command) error { // leader election e, err := c.opts.Leader.Elect(id) if err != nil { - log.Errorf("[cron] leader election error: %v", err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("[cron] leader election error: %v", err) + } time.Sleep(backoff(i)) i++ continue @@ -55,9 +57,13 @@ func (c *syncCron) Schedule(s task.Schedule, t task.Command) error { break Tick } - log.Infof("[cron] executing command %s", t.Name) + if logger.V(logger.InfoLevel, logger.DefaultLogger) { + logger.Infof("[cron] executing command %s", t.Name) + } if err := c.opts.Task.Run(t); err != nil { - log.Errorf("[cron] error executing command %s: %v", t.Name, err) + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("[cron] error executing command %s: %v", t.Name, err) + } } // leader revoked case <-r: diff --git a/transport/grpc/handler.go b/transport/grpc/handler.go index d5a626a0..01ab2049 100644 --- a/transport/grpc/handler.go +++ b/transport/grpc/handler.go @@ -4,7 +4,7 @@ import ( "runtime/debug" "github.com/micro/go-micro/v2/errors" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/transport" pb "github.com/micro/go-micro/v2/transport/grpc/proto" "google.golang.org/grpc/peer" @@ -30,7 +30,7 @@ func (m *microTransport) Stream(ts pb.Transport_StreamServer) (err error) { defer func() { if r := recover(); r != nil { - log.Error(r, string(debug.Stack())) + logger.Error(r, string(debug.Stack())) sock.Close() err = errors.InternalServerError("go.micro.transport", "panic recovered: %v", r) } diff --git a/tunnel/crypto.go b/tunnel/crypto.go index fd76eb95..9f70c444 100644 --- a/tunnel/crypto.go +++ b/tunnel/crypto.go @@ -24,20 +24,8 @@ func hash(key []byte) []byte { } // Encrypt encrypts data and returns the encrypted data -func Encrypt(data []byte, key []byte) ([]byte, error) { - // generate a new AES cipher using our 32 byte key - c, err := aes.NewCipher(hash(key)) - if err != nil { - return nil, err - } - - // gcm or Galois/Counter Mode, is a mode of operation - // for symmetric key cryptographic block ciphers - // - https://en.wikipedia.org/wiki/Galois/Counter_Mode - gcm, err := cipher.NewGCM(c) - if err != nil { - return nil, err - } +func Encrypt(gcm cipher.AEAD, data []byte) ([]byte, error) { + var err error // get new byte array the size of the nonce from pool // NOTE: we might use smaller nonce size in the future @@ -54,19 +42,29 @@ func Encrypt(data []byte, key []byte) ([]byte, error) { } // Decrypt decrypts the payload and returns the decrypted data -func Decrypt(data []byte, key []byte) ([]byte, error) { - // generate AES cipher for decrypting the message +func newCipher(key []byte) (cipher.AEAD, error) { + var err error + + // generate a new AES cipher using our 32 byte key for decrypting the message c, err := aes.NewCipher(hash(key)) if err != nil { return nil, err } - // we use GCM to encrypt the payload + // gcm or Galois/Counter Mode, is a mode of operation + // for symmetric key cryptographic block ciphers + // - https://en.wikipedia.org/wiki/Galois/Counter_Mode gcm, err := cipher.NewGCM(c) if err != nil { return nil, err } + return gcm, nil +} + +func Decrypt(gcm cipher.AEAD, data []byte) ([]byte, error) { + var err error + nonceSize := gcm.NonceSize() if len(data) < nonceSize { diff --git a/tunnel/crypto_test.go b/tunnel/crypto_test.go index 4faac2cf..af0e6a28 100644 --- a/tunnel/crypto_test.go +++ b/tunnel/crypto_test.go @@ -7,9 +7,14 @@ import ( func TestEncrypt(t *testing.T) { key := []byte("tokenpassphrase") + gcm, err := newCipher(key) + if err != nil { + t.Fatal(err) + } + data := []byte("supersecret") - cipherText, err := Encrypt(data, key) + cipherText, err := Encrypt(gcm, data) if err != nil { t.Errorf("failed to encrypt data: %v", err) } @@ -22,14 +27,19 @@ func TestEncrypt(t *testing.T) { func TestDecrypt(t *testing.T) { key := []byte("tokenpassphrase") + gcm, err := newCipher(key) + if err != nil { + t.Fatal(err) + } + data := []byte("supersecret") - cipherText, err := Encrypt(data, key) + cipherText, err := Encrypt(gcm, data) if err != nil { t.Errorf("failed to encrypt data: %v", err) } - plainText, err := Decrypt(cipherText, key) + plainText, err := Decrypt(gcm, cipherText) if err != nil { t.Errorf("failed to decrypt data: %v", err) } diff --git a/tunnel/default.go b/tunnel/default.go index 7cb636dd..b573b4e9 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -8,7 +8,7 @@ import ( "time" "github.com/google/uuid" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/transport" ) @@ -120,7 +120,8 @@ func (t *tun) listChannels() []string { } // newSession creates a new session and saves it -func (t *tun) newSession(channel, sessionId string) (*session, bool) { +func (t *tun) newSession(channel, sessionId string) (*session, bool, error) { + // new session s := &session{ tunnel: t.id, @@ -133,6 +134,11 @@ func (t *tun) newSession(channel, sessionId string) (*session, bool) { errChan: make(chan error, 1), key: []byte(t.token + channel + sessionId), } + gcm, err := newCipher(s.key) + if err != nil { + return nil, false, err + } + s.gcm = gcm // save session t.Lock() @@ -140,14 +146,14 @@ func (t *tun) newSession(channel, sessionId string) (*session, bool) { if ok { // session already exists t.Unlock() - return nil, false + return nil, false, nil } t.sessions[channel+sessionId] = s t.Unlock() // return session - return s, true + return s, true, nil } // TODO: use tunnel id as part of the session @@ -193,11 +199,14 @@ func (t *tun) announce(channel, session string, link *link) { } } - log.Debugf("Tunnel sending announce for discovery of channel(s) %s", channel) - + if logger.V(logger.TraceLevel, log) { + log.Debugf("Tunnel sending announce for discovery of channel(s) %s", channel) + } // send back the announcement if err := link.Send(msg); err != nil { - log.Debugf("Tunnel failed to send announcement for channel(s) %s message: %v", channel, err) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel failed to send announcement for channel(s) %s message: %v", channel, err) + } } } @@ -241,18 +250,26 @@ func (t *tun) manageLink(link *link) { wait(DiscoverTime) // send a discovery message to the link - log.Debugf("Tunnel sending discover to link: %v", link.Remote()) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel sending discover to link: %v", link.Remote()) + } if err := t.sendMsg("discover", link); err != nil { - log.Debugf("Tunnel failed to send discover to link %s: %v", link.Remote(), err) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel failed to send discover to link %s: %v", link.Remote(), err) + } } case <-keepalive.C: // wait half the keepalive time wait(KeepAliveTime) // send keepalive message - log.Debugf("Tunnel sending keepalive to link: %v", link.Remote()) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel sending keepalive to link: %v", link.Remote()) + } if err := t.sendMsg("keepalive", link); err != nil { - log.Debugf("Tunnel error sending keepalive to link %v: %v", link.Remote(), err) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel error sending keepalive to link %v: %v", link.Remote(), err) + } t.delLink(link.Remote()) return } @@ -301,8 +318,9 @@ func (t *tun) manageLinks() { t.Lock() for link, node := range delLinks { - log.Debugf("Tunnel deleting dead link for %s", node) - + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel deleting dead link for %s", node) + } // check if the link exists l, ok := t.links[node] if ok { @@ -335,7 +353,9 @@ func (t *tun) manageLinks() { // if we're using quic it should be a max 10 second handshake period link, err := t.setupLink(node) if err != nil { - log.Debugf("Tunnel failed to setup node link to %s: %v", node, err) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel failed to setup node link to %s: %v", node, err) + } return } @@ -384,7 +404,9 @@ func (t *tun) process() { // if the link is not connected skip it if !connected { - log.Debugf("Link for node %s not connected", id) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Link for node %s not connected", id) + } err = ErrLinkDisconnected continue } @@ -428,7 +450,9 @@ func (t *tun) process() { // no links to send to if len(sendTo) == 0 { - log.Debugf("No links to send message type: %s channel: %s", msg.typ, msg.channel) + if logger.V(logger.DebugLevel, log) { + log.Debugf("No links to send message type: %s channel: %s", msg.typ, msg.channel) + } t.respond(msg, err) continue } @@ -454,7 +478,9 @@ func (t *tun) sendTo(links []*link, msg *message) error { // the function that sends the actual message send := func(link *link, msg *transport.Message) error { if err := link.Send(msg); err != nil { - log.Debugf("Tunnel error sending %+v to %s: %v", msg.Header, link.Remote(), err) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel error sending %+v to %s: %v", msg.Header, link.Remote(), err) + } t.delLink(link.Remote()) return err } @@ -493,7 +519,9 @@ func (t *tun) sendTo(links []*link, msg *message) error { // send the message for _, link := range links { // send the message via the current link - log.Tracef("Tunnel sending %+v to %s", newMsg.Header, link.Remote()) + if logger.V(logger.TraceLevel, log) { + log.Tracef("Tunnel sending %+v to %s", newMsg.Header, link.Remote()) + } // blast it in a go routine since its multicast/broadcast if msg.mode > Unicast { @@ -552,7 +580,9 @@ func (t *tun) delLink(remote string) { continue } // close and delete - log.Debugf("Tunnel deleting link node: %s remote: %s", id, link.Remote()) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel deleting link node: %s remote: %s", id, link.Remote()) + } link.Close() delete(t.links, id) } @@ -602,7 +632,9 @@ func (t *tun) listen(link *link) { // if its not connected throw away the link // the first message we process needs to be connect if !connected && mtype != "connect" { - log.Debugf("Tunnel link %s not connected", link.id) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel link %s not connected", link.id) + } return } @@ -611,7 +643,9 @@ func (t *tun) listen(link *link) { // discover, announce, session, keepalive switch mtype { case "connect": - log.Debugf("Tunnel link %s received connect message", link.Remote()) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel link %s received connect message", link.Remote()) + } link.Lock() @@ -644,11 +678,15 @@ func (t *tun) listen(link *link) { // if there is no channel then we close the link // as its a signal from the other side to close the connection if len(channel) == 0 { - log.Debugf("Tunnel link %s received close message", link.Remote()) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel link %s received close message", link.Remote()) + } return } - log.Debugf("Tunnel link %s received close message for %s", link.Remote(), channel) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel link %s received close message for %s", link.Remote(), channel) + } // the entire listener was closed by the remote side so we need to // remove the channel mapping for it. should we also close sessions? if sessionId == "listener" { @@ -673,13 +711,17 @@ func (t *tun) listen(link *link) { } // otherwise its a session mapping of sorts case "keepalive": - log.Debugf("Tunnel link %s received keepalive", link.Remote()) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel link %s received keepalive", link.Remote()) + } // save the keepalive link.keepalive() continue // a new connection dialled outbound case "open": - log.Debugf("Tunnel link %s received open %s %s", link.id, channel, sessionId) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel link %s received open %s %s", link.id, channel, sessionId) + } // we just let it pass through to be processed // an accept returned by the listener case "accept": @@ -697,11 +739,14 @@ func (t *tun) listen(link *link) { // a continued session case "session": // process message - log.Tracef("Tunnel received %+v from %s", msg.Header, link.Remote()) + if logger.V(logger.TraceLevel, log) { + log.Tracef("Tunnel received %+v from %s", msg.Header, link.Remote()) + } // an announcement of a channel listener case "announce": - log.Tracef("Tunnel received %+v from %s", msg.Header, link.Remote()) - + if logger.V(logger.TraceLevel, log) { + log.Tracef("Tunnel received %+v from %s", msg.Header, link.Remote()) + } // process the announcement channels := strings.Split(channel, ",") @@ -773,7 +818,9 @@ func (t *tun) listen(link *link) { s, exists = t.getSession(channel, "listener") // only return accept to the session case mtype == "accept": - log.Debugf("Tunnel received accept message for channel: %s session: %s", channel, sessionId) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel received accept message for channel: %s session: %s", channel, sessionId) + } s, exists = t.getSession(channel, sessionId) if exists && s.accepted { continue @@ -793,7 +840,9 @@ func (t *tun) listen(link *link) { // bail if no session or listener has been found if !exists { - log.Tracef("Tunnel skipping no channel: %s session: %s exists", channel, sessionId) + if logger.V(logger.TraceLevel, log) { + log.Tracef("Tunnel skipping no channel: %s session: %s exists", channel, sessionId) + } // drop it, we don't care about // messages we don't know about continue @@ -808,9 +857,9 @@ func (t *tun) listen(link *link) { default: // otherwise process } - - log.Tracef("Tunnel using channel: %s session: %s type: %s", s.channel, s.session, mtype) - + if logger.V(logger.TraceLevel, log) { + log.Tracef("Tunnel using channel: %s session: %s type: %s", s.channel, s.session, mtype) + } // construct a new transport message tmsg := &transport.Message{ Header: msg.Header, @@ -851,16 +900,19 @@ func (t *tun) sendMsg(method string, link *link) error { // setupLink connects to node and returns link if successful // It returns error if the link failed to be established func (t *tun) setupLink(node string) (*link, error) { - log.Debugf("Tunnel setting up link: %s", node) - + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel setting up link: %s", node) + } c, err := t.options.Transport.Dial(node) if err != nil { - log.Debugf("Tunnel failed to connect to %s: %v", node, err) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel failed to connect to %s: %v", node, err) + } return nil, err } - - log.Debugf("Tunnel connected to %s", node) - + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel connected to %s", node) + } // create a new link link := newLink(c) @@ -905,7 +957,9 @@ func (t *tun) setupLinks() { // create new link link, err := t.setupLink(node) if err != nil { - log.Debugf("Tunnel failed to setup node link to %s: %v", node, err) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel failed to setup node link to %s: %v", node, err) + } return } @@ -931,8 +985,9 @@ func (t *tun) connect() error { go func() { // accept inbound connections err := l.Accept(func(sock transport.Socket) { - log.Debugf("Tunnel accepted connection from %s", sock.Remote()) - + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel accepted connection from %s", sock.Remote()) + } // create a new link link := newLink(sock) @@ -1089,7 +1144,9 @@ func (t *tun) Close() error { return nil } - log.Debug("Tunnel closing") + if logger.V(logger.DebugLevel, log) { + log.Debug("Tunnel closing") + } select { case <-t.closed: @@ -1117,11 +1174,18 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { o(&options) } - log.Debugf("Tunnel dialing %s", channel) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel dialing %s", channel) + } // create a new session - c, ok := t.newSession(channel, t.newSessionId()) - if !ok { + c, ok, err := t.newSession(channel, t.newSessionId()) + if err != nil { + if logger.V(logger.DebugLevel, log) { + log.Error(err) + } + return nil, err + } else if !ok { return nil, errors.New("error dialing " + channel) } @@ -1171,7 +1235,9 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { if len(links) == 0 { // delete session and return error t.delSession(c.channel, c.session) - log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, ErrLinkNotFound) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, ErrLinkNotFound) + } return nil, ErrLinkNotFound } @@ -1206,7 +1272,9 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { err := c.Discover() if err != nil { t.delSession(c.channel, c.session) - log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, err) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, err) + } return nil, err } @@ -1244,7 +1312,9 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { if err := c.Open(); err != nil { // delete the session t.delSession(c.channel, c.session) - log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, err) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, err) + } return nil, err } @@ -1269,8 +1339,9 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { // Accept a connection on the address func (t *tun) Listen(channel string, opts ...ListenOption) (Listener, error) { - log.Debugf("Tunnel listening on %s", channel) - + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel listening on %s", channel) + } options := ListenOptions{ // Read timeout defaults to never Timeout: time.Duration(-1), @@ -1281,8 +1352,13 @@ func (t *tun) Listen(channel string, opts ...ListenOption) (Listener, error) { } // create a new session by hashing the address - c, ok := t.newSession(channel, "listener") - if !ok { + c, ok, err := t.newSession(channel, "listener") + if err != nil { + if logger.V(logger.ErrorLevel, log) { + log.Error(err) + } + return nil, err + } else if !ok { return nil, errors.New("already listening on " + channel) } diff --git a/tunnel/link.go b/tunnel/link.go index e698a280..5872d9df 100644 --- a/tunnel/link.go +++ b/tunnel/link.go @@ -8,7 +8,7 @@ import ( "time" "github.com/google/uuid" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/transport" ) @@ -268,8 +268,9 @@ func (l *link) manage() { // check the type of message switch { case bytes.Equal(p.message.Body, linkRequest): - log.Tracef("Link %s received link request", linkId) - + if logger.V(logger.TraceLevel, log) { + log.Tracef("Link %s received link request", linkId) + } // send response if err := send(linkResponse); err != nil { l.Lock() @@ -279,7 +280,9 @@ func (l *link) manage() { case bytes.Equal(p.message.Body, linkResponse): // set round trip time d := time.Since(now) - log.Tracef("Link %s received link response in %v", linkId, d) + if logger.V(logger.TraceLevel, log) { + log.Tracef("Link %s received link response in %v", linkId, d) + } // set the RTT l.setRTT(d) } diff --git a/tunnel/listener.go b/tunnel/listener.go index dedde945..d775804e 100644 --- a/tunnel/listener.go +++ b/tunnel/listener.go @@ -4,7 +4,7 @@ import ( "io" "sync" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" ) type tunListener struct { @@ -66,7 +66,9 @@ func (t *tunListener) process() { // get a session sess, ok := conns[sessionId] - log.Tracef("Tunnel listener received channel %s session %s type %s exists: %t", m.channel, m.session, m.typ, ok) + if logger.V(logger.TraceLevel, log) { + log.Tracef("Tunnel listener received channel %s session %s type %s exists: %t", m.channel, m.session, m.typ, ok) + } if !ok { // we only process open and session types switch m.typ { @@ -152,7 +154,9 @@ func (t *tunListener) process() { case <-sess.closed: delete(conns, sessionId) case sess.recv <- m: - log.Tracef("Tunnel listener sent to recv chan channel %s session %s type %s", m.channel, sessionId, m.typ) + if logger.V(logger.TraceLevel, log) { + log.Tracef("Tunnel listener sent to recv chan channel %s session %s type %s", m.channel, sessionId, m.typ) + } } } } diff --git a/tunnel/options.go b/tunnel/options.go index 166680d9..954ccd50 100644 --- a/tunnel/options.go +++ b/tunnel/options.go @@ -4,6 +4,7 @@ import ( "time" "github.com/google/uuid" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/transport" "github.com/micro/go-micro/v2/transport/quic" ) @@ -13,6 +14,7 @@ var ( DefaultAddress = ":0" // The shared default token DefaultToken = "go.micro.tunnel" + log = logger.NewHelper(logger.DefaultLogger).WithFields(map[string]interface{}{"service": "tunnel"}) ) type Option func(*Options) diff --git a/tunnel/session.go b/tunnel/session.go index be34e55f..8037546e 100644 --- a/tunnel/session.go +++ b/tunnel/session.go @@ -1,11 +1,13 @@ package tunnel import ( + "crypto/cipher" "encoding/base32" "io" + "sync" "time" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/transport" ) @@ -49,6 +51,9 @@ type session struct { errChan chan error // key for session encryption key []byte + // cipher for session + gcm cipher.AEAD + sync.RWMutex } // message is sent over the send channel @@ -166,7 +171,9 @@ func (s *session) waitFor(msgType string, timeout time.Duration) (*message, erro // ignore what we don't want if msg.typ != msgType { - log.Debugf("Tunnel received non %s message in waiting for %s", msg.typ, msgType) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel received non %s message in waiting for %s", msg.typ, msgType) + } continue } @@ -185,7 +192,9 @@ func (s *session) waitFor(msgType string, timeout time.Duration) (*message, erro // ignore what we don't want if msg.typ != msgType { - log.Debugf("Tunnel received non %s message in waiting for %s", msg.typ, msgType) + if logger.V(logger.DebugLevel, log) { + log.Debugf("Tunnel received non %s message in waiting for %s", msg.typ, msgType) + } continue } @@ -327,8 +336,23 @@ func (s *session) Announce() error { // Send is used to send a message func (s *session) Send(m *transport.Message) error { + var err error + + s.RLock() + gcm := s.gcm + s.RUnlock() + + if gcm == nil { + gcm, err = newCipher(s.key) + if err != nil { + return err + } + s.Lock() + s.gcm = gcm + s.Unlock() + } // encrypt the transport message payload - body, err := Encrypt(m.Body, s.key) + body, err := Encrypt(gcm, m.Body) if err != nil { log.Debugf("failed to encrypt message body: %v", err) return err @@ -343,7 +367,7 @@ func (s *session) Send(m *transport.Message) error { // encrypt all the headers for k, v := range m.Header { // encrypt the transport message payload - val, err := Encrypt([]byte(v), s.key) + val, err := Encrypt(s.gcm, []byte(v)) if err != nil { log.Debugf("failed to encrypt message header %s: %v", k, err) return err @@ -362,8 +386,9 @@ func (s *session) Send(m *transport.Message) error { msg.link = "" } - log.Tracef("Appending %+v to send backlog", msg) - + if logger.V(logger.TraceLevel, log) { + log.Tracef("Appending to send backlog: %v", msg) + } // send the actual message if err := s.sendMsg(msg); err != nil { return err @@ -389,16 +414,27 @@ func (s *session) Recv(m *transport.Message) error { default: } - log.Tracef("Received %+v from recv backlog", msg) + if logger.V(logger.TraceLevel, log) { + log.Tracef("Received from recv backlog: %v", msg) + } + + gcm, err := newCipher([]byte(s.token + s.channel + msg.session)) + if err != nil { + if logger.V(logger.ErrorLevel, log) { + log.Errorf("unable to create cipher: %v", err) + } + return err + } - key := []byte(s.token + s.channel + msg.session) // decrypt the received payload using the token // we have to used msg.session because multicast has a shared // session id of "multicast" in this session struct on // the listener side - msg.data.Body, err = Decrypt(msg.data.Body, key) + msg.data.Body, err = Decrypt(gcm, msg.data.Body) if err != nil { - log.Debugf("failed to decrypt message body: %v", err) + if logger.V(logger.DebugLevel, log) { + log.Debugf("failed to decrypt message body: %v", err) + } return err } @@ -407,14 +443,18 @@ func (s *session) Recv(m *transport.Message) error { // decode the header values h, err := base32.StdEncoding.DecodeString(v) if err != nil { - log.Debugf("failed to decode message header %s: %v", k, err) + if logger.V(logger.DebugLevel, log) { + log.Debugf("failed to decode message header %s: %v", k, err) + } return err } // dencrypt the transport message payload - val, err := Decrypt(h, key) + val, err := Decrypt(gcm, h) if err != nil { - log.Debugf("failed to decrypt message header %s: %v", k, err) + if logger.V(logger.DebugLevel, log) { + log.Debugf("failed to decrypt message header %s: %v", k, err) + } return err } // add decrypted header value diff --git a/util/kubernetes/client/client.go b/util/kubernetes/client/client.go index 1e702a32..2b9d7db0 100644 --- a/util/kubernetes/client/client.go +++ b/util/kubernetes/client/client.go @@ -12,7 +12,7 @@ import ( "path" "strings" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/util/kubernetes/api" ) @@ -190,7 +190,9 @@ func (c *client) Watch(r *Resource, opts ...WatchOption) (Watcher, error) { // NewService returns default micro kubernetes service definition func NewService(name, version, typ string) *Service { - log.Tracef("kubernetes default service: name: %s, version: %s", name, version) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("kubernetes default service: name: %s, version: %s", name, version) + } Labels := map[string]string{ "name": name, @@ -227,7 +229,9 @@ func NewService(name, version, typ string) *Service { // NewService returns default micro kubernetes deployment definition func NewDeployment(name, version, typ string) *Deployment { - log.Tracef("kubernetes default deployment: name: %s, version: %s", name, version) + if logger.V(logger.TraceLevel, logger.DefaultLogger) { + logger.Tracef("kubernetes default deployment: name: %s, version: %s", name, version) + } Labels := map[string]string{ "name": name, @@ -303,26 +307,26 @@ func NewClusterClient() *client { s, err := os.Stat(serviceAccountPath) if err != nil { - log.Fatal(err) + logger.Fatal(err) } if s == nil || !s.IsDir() { - log.Fatal(errors.New("service account not found")) + logger.Fatal(errors.New("service account not found")) } token, err := ioutil.ReadFile(path.Join(serviceAccountPath, "token")) if err != nil { - log.Fatal(err) + logger.Fatal(err) } t := string(token) ns, err := detectNamespace() if err != nil { - log.Fatal(err) + logger.Fatal(err) } crt, err := CertPoolFromFile(path.Join(serviceAccountPath, "ca.crt")) if err != nil { - log.Fatal(err) + logger.Fatal(err) } c := &http.Client{ diff --git a/web/service.go b/web/service.go index b133539d..0cd683a4 100644 --- a/web/service.go +++ b/web/service.go @@ -15,7 +15,7 @@ import ( "github.com/micro/cli/v2" "github.com/micro/go-micro/v2" - log "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/registry" maddr "github.com/micro/go-micro/v2/util/addr" mhttp "github.com/micro/go-micro/v2/util/http" @@ -125,7 +125,9 @@ func (s *service) register() error { // use RegisterCheck func before register if err := s.opts.RegisterCheck(s.opts.Context); err != nil { - log.Errorf("Server %s-%s register check error: %s", s.opts.Name, s.opts.Id, err) + if logger.V(logger.ErrorLevel, log) { + log.Errorf("Server %s-%s register check error: %s", s.opts.Name, s.opts.Id, err) + } return err } @@ -190,7 +192,9 @@ func (s *service) start() error { if s.static { _, err := os.Stat(static) if err == nil { - log.Infof("Enabling static file serving from %s", static) + if logger.V(logger.InfoLevel, log) { + log.Infof("Enabling static file serving from %s", static) + } s.mux.Handle("/", http.FileServer(http.Dir(static))) } } @@ -222,7 +226,9 @@ func (s *service) start() error { ch <- l.Close() }() - log.Infof("Listening on %v", l.Addr().String()) + if logger.V(logger.InfoLevel, log) { + log.Infof("Listening on %v", l.Addr().String()) + } return nil } @@ -244,7 +250,9 @@ func (s *service) stop() error { s.exit <- ch s.running = false - log.Info("Stopping") + if logger.V(logger.InfoLevel, log) { + log.Info("Stopping") + } for _, fn := range s.opts.AfterStop { if err := fn(); err != nil { @@ -391,10 +399,14 @@ func (s *service) Run() error { select { // wait on kill signal case sig := <-ch: - log.Infof("Received signal %s", sig) + if logger.V(logger.InfoLevel, log) { + log.Infof("Received signal %s", sig) + } // wait on context cancel case <-s.opts.Context.Done(): - log.Info("Received context shutdown") + if logger.V(logger.InfoLevel, log) { + log.Info("Received context shutdown") + } } // exit reg loop diff --git a/web/web.go b/web/web.go index 5c06c2ae..ae3eb7f1 100644 --- a/web/web.go +++ b/web/web.go @@ -7,6 +7,7 @@ import ( "time" "github.com/google/uuid" + "github.com/micro/go-micro/v2/logger" ) // Service is a web service with service discovery built in @@ -35,6 +36,8 @@ var ( // static directory DefaultStaticDir = "html" DefaultRegisterCheck = func(context.Context) error { return nil } + + log = logger.NewHelper(logger.DefaultLogger).WithFields(map[string]interface{}{"service": "web"}) ) // NewService returns a new web.Service