diff --git a/api/resolver/options.go b/api/resolver/options.go index 87e99e29..07ec21a6 100644 --- a/api/resolver/options.go +++ b/api/resolver/options.go @@ -1,6 +1,8 @@ package resolver import ( + "context" + "github.com/unistack-org/micro/v3/registry" ) @@ -8,6 +10,7 @@ import ( type Options struct { Handler string ServicePrefix string + Context context.Context } // Option func @@ -29,7 +32,9 @@ func WithServicePrefix(p string) Option { // NewOptions returns new initialised options func NewOptions(opts ...Option) Options { - options := Options{} + options := Options{ + Context: context.Background(), + } for _, o := range opts { o(&options) } diff --git a/api/resolver/subdomain/subdomain.go b/api/resolver/subdomain/subdomain.go index 1cec59c1..ed2332a5 100644 --- a/api/resolver/subdomain/subdomain.go +++ b/api/resolver/subdomain/subdomain.go @@ -55,7 +55,7 @@ func (r *Resolver) Domain(req *http.Request) string { domain, err := publicsuffix.EffectiveTLDPlusOne(host) if err != nil { if logger.V(logger.DebugLevel) { - logger.Debug("Unable to extract domain from %v", host) + logger.Debug(r.opts.Context, "Unable to extract domain from %v", host) } return "" } diff --git a/api/server/acme/autocert/autocert.go b/api/server/acme/autocert/autocert.go index 38438fa6..88d265be 100644 --- a/api/server/acme/autocert/autocert.go +++ b/api/server/acme/autocert/autocert.go @@ -7,13 +7,16 @@ import ( "net" "os" + "github.com/unistack-org/micro/v3/api/server" "github.com/unistack-org/micro/v3/api/server/acme" "github.com/unistack-org/micro/v3/logger" "golang.org/x/crypto/acme/autocert" ) // autoCertACME is the ACME provider from golang.org/x/crypto/acme/autocert -type autocertProvider struct{} +type autocertProvider struct { + opts server.Options +} func (a *autocertProvider) Init(opts ...acme.Option) error { return nil @@ -36,7 +39,7 @@ func (a *autocertProvider) TLSConfig(hosts ...string) (*tls.Config, error) { dir := cacheDir() if err := os.MkdirAll(dir, 0700); err != nil { if logger.V(logger.InfoLevel) { - logger.Info("warning: autocert not using a cache: %v", err) + logger.Info(a.opts.Context, "warning: autocert not using a cache: %v", err) } } else { m.Cache = autocert.DirCache(dir) diff --git a/api/server/http/http.go b/api/server/http/http.go index 86c6b112..65688a9f 100644 --- a/api/server/http/http.go +++ b/api/server/http/http.go @@ -74,7 +74,7 @@ func (s *httpServer) Start() error { } if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof("HTTP API Listening on %s", l.Addr().String()) + config.Logger.Infof(s.opts.Context, "HTTP API Listening on %s", l.Addr().String()) } s.Lock() @@ -85,7 +85,7 @@ func (s *httpServer) Start() error { if err := http.Serve(l, s.mux); err != nil { // temporary fix if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf("serve err: %v", err) + config.Logger.Errorf(s.opts.Context, "serve err: %v", err) } s.Stop() } diff --git a/api/server/options.go b/api/server/options.go index be1e723f..5b016764 100644 --- a/api/server/options.go +++ b/api/server/options.go @@ -1,6 +1,7 @@ package server import ( + "context" "crypto/tls" "net/http" @@ -23,12 +24,14 @@ type Options struct { Resolver resolver.Resolver Wrappers []Wrapper Logger logger.Logger + Context context.Context } // NewOptions returns new Options func NewOptions(opts ...Option) Options { options := Options{ - Logger: logger.DefaultLogger, + Logger: logger.DefaultLogger, + Context: context.Background(), } for _, o := range opts { o(&options) diff --git a/network/tunnel/broker/broker.go b/network/tunnel/broker/broker.go index b13f5cff..00981d13 100644 --- a/network/tunnel/broker/broker.go +++ b/network/tunnel/broker/broker.go @@ -113,11 +113,11 @@ func (t *tunSubscriber) run() { m := new(transport.Message) if err := c.Recv(m); err != nil { if logger.V(logger.ErrorLevel) { - logger.Error(err.Error()) + logger.Error(t.opts.Context, err.Error()) } if err = c.Close(); err != nil { if logger.V(logger.ErrorLevel) { - logger.Error(err.Error()) + logger.Error(t.opts.Context, err.Error()) } } continue diff --git a/server/noop.go b/server/noop.go index 1721f5e5..b288d55f 100644 --- a/server/noop.go +++ b/server/noop.go @@ -187,7 +187,7 @@ func (n *noopServer) Register() error { if !registered { if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof("registry [%s] Registering node: %s", config.Registry.String(), service.Nodes[0].Id) + config.Logger.Infof(n.opts.Context, "registry [%s] Registering node: %s", config.Registry.String(), service.Nodes[0].Id) } } @@ -220,7 +220,7 @@ func (n *noopServer) Register() error { opts = append(opts, broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)) if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof("subscribing to topic: %s", sb.Topic()) + config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic()) } sub, err := config.Broker.Subscribe(cx, sb.Topic(), handler, opts...) if err != nil { @@ -250,7 +250,7 @@ func (n *noopServer) Deregister() error { } if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof("deregistering node: %s", service.Nodes[0].Id) + config.Logger.Infof(n.opts.Context, "deregistering node: %s", service.Nodes[0].Id) } if err := DefaultDeregisterFunc(service, config); err != nil { @@ -280,11 +280,11 @@ func (n *noopServer) Deregister() error { go func(s broker.Subscriber) { defer wg.Done() if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof("unsubscribing from topic: %s", s.Topic()) + config.Logger.Infof(n.opts.Context, "unsubscribing from topic: %s", s.Topic()) } if err := s.Unsubscribe(cx); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf("unsubscribing from topic: %s err: %v", s.Topic(), err) + config.Logger.Errorf(n.opts.Context, "unsubscribing from topic: %s err: %v", s.Topic(), err) } } }(sub) @@ -307,7 +307,7 @@ func (n *noopServer) Start() error { n.RUnlock() if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof("server [noop] Listening on %s", config.Address) + config.Logger.Infof(n.opts.Context, "server [noop] Listening on %s", config.Address) } n.Lock() if len(config.Advertise) == 0 { @@ -320,26 +320,26 @@ func (n *noopServer) Start() error { // connect to the broker if err := config.Broker.Connect(config.Context); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf("broker [%s] connect error: %v", config.Broker.String(), err) + config.Logger.Errorf(n.opts.Context, "broker [%s] connect error: %v", config.Broker.String(), err) } return err } if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof("broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) + config.Logger.Infof(n.opts.Context, "broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) } } // use RegisterCheck func before register if err := config.RegisterCheck(config.Context); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf("server %s-%s register check error: %s", config.Name, config.Id, err) + config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.Id, err) } } else { // announce self to the world if err := n.Register(); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf("server register error: %v", err) + config.Logger.Errorf(n.opts.Context, "server register error: %v", err) } } } @@ -367,23 +367,23 @@ func (n *noopServer) Start() error { rerr := config.RegisterCheck(config.Context) if rerr != nil && registered { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf("server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr) + config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr) } // deregister self in case of error if err := n.Deregister(); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf("server %s-%s deregister error: %s", config.Name, config.Id, err) + config.Logger.Errorf(n.opts.Context, "server %s-%s deregister error: %s", config.Name, config.Id, err) } } } else if rerr != nil && !registered { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf("server %s-%s register check error: %s", config.Name, config.Id, rerr) + config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.Id, rerr) } continue } if err := n.Register(); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf("server %s-%s register error: %s", config.Name, config.Id, err) + config.Logger.Errorf(n.opts.Context, "server %s-%s register error: %s", config.Name, config.Id, err) } } // wait for exit @@ -395,7 +395,7 @@ func (n *noopServer) Start() error { // deregister self if err := n.Deregister(); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf("server deregister error: ", err) + config.Logger.Errorf(n.opts.Context, "server deregister error: ", err) } } @@ -408,12 +408,12 @@ func (n *noopServer) Start() error { ch <- nil if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof("broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) + config.Logger.Infof(n.opts.Context, "broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) } // disconnect broker if err := config.Broker.Disconnect(config.Context); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf("broker [%s] disconnect error: %v", config.Broker.String(), err) + config.Logger.Errorf(n.opts.Context, "broker [%s] disconnect error: %v", config.Broker.String(), err) } } }() diff --git a/server/subscriber.go b/server/subscriber.go index 9d3575b5..3bebc732 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -191,8 +191,8 @@ func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handl config := n.opts n.RUnlock() if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error("panic recovered: ", r) - config.Logger.Error(string(debug.Stack())) + config.Logger.Error(n.opts.Context, "panic recovered: ", r) + config.Logger.Error(n.opts.Context, string(debug.Stack())) } err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r) } diff --git a/service.go b/service.go index 67e95538..cca23d04 100644 --- a/service.go +++ b/service.go @@ -154,7 +154,7 @@ func (s *service) Start() error { s.RUnlock() if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof("starting [service] %s", s.Name()) + config.Logger.Infof(s.opts.Context, "starting [service] %s", s.Name()) } for _, fn := range s.opts.BeforeStart { @@ -215,7 +215,7 @@ func (s *service) Stop() error { s.RUnlock() if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof("stoppping [service] %s", s.Name()) + config.Logger.Infof(s.opts.Context, "stoppping [service] %s", s.Name()) } var err error diff --git a/util/auth/auth.go b/util/auth/auth.go index ccd75c4c..b185de4e 100644 --- a/util/auth/auth.go +++ b/util/auth/auth.go @@ -1,6 +1,7 @@ package auth import ( + "context" "time" "github.com/google/uuid" @@ -26,7 +27,7 @@ func Verify(a auth.Auth) error { return err } if logger.V(logger.DebugLevel) { - logger.Debug("Auth [%v] Generated an auth account: %s", a.String()) + logger.Debug(context.TODO(), "Auth [%v] Generated an auth account: %s", a.String()) } accID = acc.ID @@ -68,7 +69,7 @@ func Verify(a auth.Auth) error { ) if err != nil { if logger.V(logger.WarnLevel) { - logger.Warn("[Auth] Error refreshing token: %v", err) + logger.Warn(context.TODO(), "[Auth] Error refreshing token: %v", err) } continue } diff --git a/util/kubernetes/api/request.go b/util/kubernetes/api/request.go index 6aff945a..cc11b438 100644 --- a/util/kubernetes/api/request.go +++ b/util/kubernetes/api/request.go @@ -219,7 +219,7 @@ func (r *Request) Do() *Response { } } - logger.Debug("[Kubernetes] %v %v", req.Method, req.URL.String()) + logger.Debug(context.TODO(), "[Kubernetes] %v %v", req.Method, req.URL.String()) res, err := r.client.Do(req) if err != nil { return &Response{ diff --git a/util/kubernetes/client/client.go b/util/kubernetes/client/client.go index 39214384..2635db93 100644 --- a/util/kubernetes/client/client.go +++ b/util/kubernetes/client/client.go @@ -3,6 +3,7 @@ package client import ( "bytes" + "context" "crypto/tls" "errors" "io" @@ -228,7 +229,7 @@ func (c *client) Watch(r *Resource, opts ...WatchOption) (Watcher, error) { // NewService returns default micro kubernetes service definition func NewService(name, version, typ, namespace string) *Service { if logger.V(logger.TraceLevel) { - logger.Trace("kubernetes default service: name: %s, version: %s", name, version) + logger.Trace(context.TODO(), "kubernetes default service: name: %s, version: %s", name, version) } Labels := map[string]string{ @@ -271,7 +272,7 @@ func NewService(name, version, typ, namespace string) *Service { // NewService returns default micro kubernetes deployment definition func NewDeployment(name, version, typ, namespace string) *Deployment { if logger.V(logger.TraceLevel) { - logger.Trace("kubernetes default deployment: name: %s, version: %s", name, version) + logger.Trace(context.TODO(), "kubernetes default deployment: name: %s, version: %s", name, version) } Labels := map[string]string{ @@ -363,21 +364,21 @@ func NewClusterClient() *client { s, err := os.Stat(serviceAccountPath) if err != nil { - logger.Fatal(err.Error()) + logger.Fatal(context.TODO(), err.Error()) } if s == nil || !s.IsDir() { - logger.Fatal("service account not found") + logger.Fatal(context.TODO(), "service account not found") } token, err := ioutil.ReadFile(path.Join(serviceAccountPath, "token")) if err != nil { - logger.Fatal(err.Error()) + logger.Fatal(context.TODO(), err.Error()) } t := string(token) crt, err := CertPoolFromFile(path.Join(serviceAccountPath, "ca.crt")) if err != nil { - logger.Fatal(err.Error()) + logger.Fatal(context.TODO(), err.Error()) } c := &http.Client{ diff --git a/util/mdns/server.go b/util/mdns/server.go index 27b32ba1..7cdfdd43 100644 --- a/util/mdns/server.go +++ b/util/mdns/server.go @@ -1,6 +1,7 @@ package mdns import ( + "context" "fmt" "math/rand" "net" @@ -61,6 +62,8 @@ type Config struct { // LocalhostChecking if enabled asks the server to also send responses to 0.0.0.0 if the target IP // is this host (as defined by GetMachineIP). Useful in case machine is on a VPN which blocks comms on non standard ports LocalhostChecking bool + + Context context.Context } // Server is an mDNS server used to listen for mDNS queries and respond if we @@ -143,6 +146,10 @@ func NewServer(config *Config) (*Server, error) { outboundIP: ipFunc(), } + if s.config.Context == nil { + s.config.Context = context.Background() + } + go s.recv(s.ipv4List) go s.recv(s.ipv6List) @@ -196,7 +203,7 @@ func (s *Server) recv(c *net.UDPConn) { continue } if err := s.parsePacket(buf[:n], from); err != nil { - logger.Error("[ERR] mdns: Failed to handle query: %v", err) + logger.Errorf(s.config.Context, "[ERR] mdns: Failed to handle query: %v", err) } } } @@ -205,7 +212,7 @@ func (s *Server) recv(c *net.UDPConn) { func (s *Server) parsePacket(packet []byte, from net.Addr) error { var msg dns.Msg if err := msg.Unpack(packet); err != nil { - logger.Error("[ERR] mdns: Failed to unpack packet: %v", err) + logger.Errorf(s.config.Context, "[ERR] mdns: Failed to unpack packet: %v", err) return err } // TODO: This is a bit of a hack @@ -384,7 +391,7 @@ func (s *Server) probe() { for i := 0; i < 3; i++ { if err := s.SendMulticast(q); err != nil { - logger.Error("[ERR] mdns: failed to send probe: %v", err) + logger.Errorf(s.config.Context, "[ERR] mdns: failed to send probe: %v", err) } time.Sleep(time.Duration(randomizer.Intn(250)) * time.Millisecond) } @@ -410,7 +417,7 @@ func (s *Server) probe() { timer := time.NewTimer(timeout) for i := 0; i < 3; i++ { if err := s.SendMulticast(resp); err != nil { - logger.Error("[ERR] mdns: failed to send announcement:", err.Error()) + logger.Errorf(s.config.Context, "[ERR] mdns: failed to send announcement: %v", err) } select { case <-timer.C: diff --git a/util/router/parse.go b/util/router/parse.go index c736d453..ec78084f 100644 --- a/util/router/parse.go +++ b/util/router/parse.go @@ -3,6 +3,7 @@ package router // download from https://raw.githubusercontent.com/grpc-ecosystem/grpc-gateway/master/protoc-gen-grpc-gateway/httprule/parse.go import ( + "context" "fmt" "strings" @@ -103,20 +104,20 @@ type parser struct { // topLevelSegments is the target of this parser. func (p *parser) topLevelSegments() ([]segment, error) { if logger.V(logger.TraceLevel) { - logger.Debug("Parsing %q", p.tokens) + logger.Debug(context.TODO(), "Parsing %q", p.tokens) } segs, err := p.segments() if err != nil { return nil, err } if logger.V(logger.TraceLevel) { - logger.Trace("accept segments: %q; %q", p.accepted, p.tokens) + logger.Trace(context.TODO(), "accept segments: %q; %q", p.accepted, p.tokens) } if _, err := p.accept(typeEOF); err != nil { return nil, fmt.Errorf("unexpected token %q after segments %q", p.tokens[0], strings.Join(p.accepted, "")) } if logger.V(logger.TraceLevel) { - logger.Trace("accept eof: %q; %q", p.accepted, p.tokens) + logger.Trace(context.TODO(), "accept eof: %q; %q", p.accepted, p.tokens) } return segs, nil } @@ -128,7 +129,7 @@ func (p *parser) segments() ([]segment, error) { } if logger.V(logger.TraceLevel) { - logger.Trace("accept segment: %q; %q", p.accepted, p.tokens) + logger.Trace(context.TODO(), "accept segment: %q; %q", p.accepted, p.tokens) } segs := []segment{s} for { @@ -141,7 +142,7 @@ func (p *parser) segments() ([]segment, error) { } segs = append(segs, s) if logger.V(logger.TraceLevel) { - logger.Trace("accept segment: %q; %q", p.accepted, p.tokens) + logger.Trace(context.TODO(), "accept segment: %q; %q", p.accepted, p.tokens) } } } diff --git a/util/router/parse_test.go b/util/router/parse_test.go index 9806d1b3..696c94b9 100644 --- a/util/router/parse_test.go +++ b/util/router/parse_test.go @@ -3,6 +3,7 @@ package router // download from https://raw.githubusercontent.com/grpc-ecosystem/grpc-gateway/master/protoc-gen-grpc-gateway/httprule/parse_test.go import ( + "context" "flag" "fmt" "reflect" @@ -316,6 +317,6 @@ func TestParseSegmentsWithErrors(t *testing.T) { t.Errorf("parser{%q}.segments() succeeded; want InvalidTemplateError; accepted %#v", spec.tokens, segs) continue } - logger.Info(err.Error()) + logger.Info(context.TODO(), err.Error()) } } diff --git a/util/router/runtime.go b/util/router/runtime.go index 0d46cb68..23f3d6a7 100644 --- a/util/router/runtime.go +++ b/util/router/runtime.go @@ -3,6 +3,7 @@ package router // download from https://raw.githubusercontent.com/grpc-ecosystem/grpc-gateway/master/runtime/pattern.go import ( + "context" "errors" "fmt" "strings" @@ -63,7 +64,7 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt if version != 1 { if logger.V(logger.DebugLevel) { - logger.Debug("unsupported version: %d", version) + logger.Debug(context.TODO(), "unsupported version: %d", version) } return Pattern{}, ErrInvalidPattern } @@ -71,7 +72,7 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt l := len(ops) if l%2 != 0 { if logger.V(logger.DebugLevel) { - logger.Debug("odd number of ops codes: %d", l) + logger.Debug(context.TODO(), "odd number of ops codes: %d", l) } return Pattern{}, ErrInvalidPattern } @@ -96,7 +97,7 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt case OpPushM: if pushMSeen { if logger.V(logger.TraceLevel) { - logger.Trace("pushM appears twice") + logger.Trace(context.TODO(), "pushM appears twice") } return Pattern{}, ErrInvalidPattern } @@ -105,7 +106,7 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt case OpLitPush: if op.operand < 0 || len(pool) <= op.operand { if logger.V(logger.TraceLevel) { - logger.Trace("negative literal index: %d", op.operand) + logger.Trace(context.TODO(), "negative literal index: %d", op.operand) } return Pattern{}, ErrInvalidPattern } @@ -116,14 +117,14 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt case OpConcatN: if op.operand <= 0 { if logger.V(logger.TraceLevel) { - logger.Trace("negative concat size: %d", op.operand) + logger.Trace(context.TODO(), "negative concat size: %d", op.operand) } return Pattern{}, ErrInvalidPattern } stack -= op.operand if stack < 0 { if logger.V(logger.TraceLevel) { - logger.Trace("stack underflow") + logger.Trace(context.TODO(), "stack underflow") } return Pattern{}, ErrInvalidPattern } @@ -131,7 +132,7 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt case OpCapture: if op.operand < 0 || len(pool) <= op.operand { if logger.V(logger.TraceLevel) { - logger.Trace("variable name index out of bound: %d", op.operand) + logger.Trace(context.TODO(), "variable name index out of bound: %d", op.operand) } return Pattern{}, ErrInvalidPattern } @@ -141,13 +142,13 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt stack-- if stack < 0 { if logger.V(logger.DebugLevel) { - logger.Trace("stack underflow") + logger.Trace(context.TODO(), "stack underflow") } return Pattern{}, ErrInvalidPattern } default: if logger.V(logger.DebugLevel) { - logger.Trace("invalid opcode: %d", op.code) + logger.Trace(context.TODO(), "invalid opcode: %d", op.code) } return Pattern{}, ErrInvalidPattern } @@ -172,7 +173,7 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt func MustPattern(p Pattern, err error) Pattern { if err != nil { if logger.V(logger.FatalLevel) { - logger.Fatal("Pattern initialization failed: %v", err) + logger.Fatal(context.TODO(), "Pattern initialization failed: %v", err) } } return p