From 7c311aea19463c05b80aaa7a470fd28266289588 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 4 Nov 2020 23:48:24 +0300 Subject: [PATCH 1/2] api: extract routers to external repos Signed-off-by: Vasiliy Tolstov --- api/router/registry/registry.go | 498 --------------------------- api/router/registry/registry_test.go | 38 -- api/router/router_test.go | 257 -------------- api/router/static/static.go | 356 ------------------- 4 files changed, 1149 deletions(-) delete mode 100644 api/router/registry/registry.go delete mode 100644 api/router/registry/registry_test.go delete mode 100644 api/router/router_test.go delete mode 100644 api/router/static/static.go diff --git a/api/router/registry/registry.go b/api/router/registry/registry.go deleted file mode 100644 index e6844464..00000000 --- a/api/router/registry/registry.go +++ /dev/null @@ -1,498 +0,0 @@ -// Package registry provides a dynamic api service router -package registry - -import ( - "errors" - "fmt" - "net/http" - "regexp" - "strings" - "sync" - "time" - - "github.com/unistack-org/micro/v3/api" - "github.com/unistack-org/micro/v3/api/router" - "github.com/unistack-org/micro/v3/logger" - "github.com/unistack-org/micro/v3/metadata" - "github.com/unistack-org/micro/v3/registry" - util "github.com/unistack-org/micro/v3/util/router" -) - -// endpoint struct, that holds compiled pcre -type endpoint struct { - hostregs []*regexp.Regexp - pathregs []util.Pattern - pcreregs []*regexp.Regexp -} - -// router is the default router -type registryRouter struct { - exit chan bool - opts router.Options - - sync.RWMutex - eps map[string]*api.Service - // compiled regexp for host and path - ceps map[string]*endpoint -} - -func (r *registryRouter) isClosed() bool { - select { - case <-r.exit: - return true - default: - return false - } -} - -// refresh list of api services -func (r *registryRouter) refresh() { - var attempts int - - for { - services, err := r.opts.Registry.ListServices(r.opts.Context) - if err != nil { - attempts++ - if logger.V(logger.ErrorLevel) { - logger.Errorf("unable to list services: %v", err) - } - time.Sleep(time.Duration(attempts) * time.Second) - continue - } - - attempts = 0 - - // for each service, get service and store endpoints - for _, s := range services { - service, err := r.opts.Registry.GetService(r.opts.Context, s.Name) - if err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("unable to get service: %v", err) - } - continue - } - r.store(service) - } - - // refresh list in 10 minutes... cruft - // use registry watching - select { - case <-time.After(time.Minute * 10): - case <-r.exit: - return - } - } -} - -// process watch event -func (r *registryRouter) process(res *registry.Result) { - // skip these things - if res == nil || res.Service == nil { - return - } - - // get entry from cache - service, err := r.opts.Registry.GetService(r.opts.Context, res.Service.Name) - if err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("unable to get %v service: %v", res.Service.Name, err) - } - return - } - - // update our local endpoints - r.store(service) -} - -// store local endpoint cache -func (r *registryRouter) store(services []*registry.Service) { - // endpoints - eps := map[string]*api.Service{} - - // services - names := map[string]bool{} - - // create a new endpoint mapping - for _, service := range services { - // set names we need later - names[service.Name] = true - - // map per endpoint - for _, sep := range service.Endpoints { - // create a key service:endpoint_name - key := fmt.Sprintf("%s.%s", service.Name, sep.Name) - // decode endpoint - end := api.Decode(sep.Metadata) - // no endpoint or no name - if end == nil || len(end.Name) == 0 { - continue - } - // if we got nothing skip - if err := api.Validate(end); err != nil { - if logger.V(logger.TraceLevel) { - logger.Tracef("endpoint validation failed: %v", err) - } - continue - } - - // try get endpoint - ep, ok := eps[key] - if !ok { - ep = &api.Service{Name: service.Name} - } - - // overwrite the endpoint - ep.Endpoint = end - // append services - ep.Services = append(ep.Services, service) - // store it - eps[key] = ep - } - } - - r.Lock() - defer r.Unlock() - - // delete any existing eps for services we know - for key, service := range r.eps { - // skip what we don't care about - if !names[service.Name] { - continue - } - - // ok we know this thing - // delete delete delete - delete(r.eps, key) - } - - // now set the eps we have - for name, ep := range eps { - r.eps[name] = ep - cep := &endpoint{} - - for _, h := range ep.Endpoint.Host { - if h == "" || h == "*" { - continue - } - hostreg, err := regexp.CompilePOSIX(h) - if err != nil { - if logger.V(logger.TraceLevel) { - logger.Tracef("endpoint have invalid host regexp: %v", err) - } - continue - } - cep.hostregs = append(cep.hostregs, hostreg) - } - - for _, p := range ep.Endpoint.Path { - var pcreok bool - - if p[0] == '^' && p[len(p)-1] == '$' { - pcrereg, err := regexp.CompilePOSIX(p) - if err == nil { - cep.pcreregs = append(cep.pcreregs, pcrereg) - pcreok = true - } - } - - rule, err := util.Parse(p) - if err != nil && !pcreok { - if logger.V(logger.TraceLevel) { - logger.Tracef("endpoint have invalid path pattern: %v", err) - } - continue - } else if err != nil && pcreok { - continue - } - - tpl := rule.Compile() - pathreg, err := util.NewPattern(tpl.Version, tpl.OpCodes, tpl.Pool, "") - if err != nil { - if logger.V(logger.TraceLevel) { - logger.Tracef("endpoint have invalid path pattern: %v", err) - } - continue - } - cep.pathregs = append(cep.pathregs, pathreg) - } - - r.ceps[name] = cep - } -} - -// watch for endpoint changes -func (r *registryRouter) watch() { - var attempts int - - for { - if r.isClosed() { - return - } - - // watch for changes - w, err := r.opts.Registry.Watch(r.opts.Context) - if err != nil { - attempts++ - if logger.V(logger.ErrorLevel) { - logger.Errorf("error watching endpoints: %v", err) - } - time.Sleep(time.Duration(attempts) * time.Second) - continue - } - - ch := make(chan bool) - - go func() { - select { - case <-ch: - w.Stop() - case <-r.exit: - w.Stop() - } - }() - - // reset if we get here - attempts = 0 - - for { - // process next event - res, err := w.Next() - if err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("error getting next endpoint: %v", err) - } - close(ch) - break - } - r.process(res) - } - } -} - -func (r *registryRouter) Options() router.Options { - return r.opts -} - -func (r *registryRouter) Close() error { - select { - case <-r.exit: - return nil - default: - close(r.exit) - } - return nil -} - -func (r *registryRouter) Register(ep *api.Endpoint) error { - return nil -} - -func (r *registryRouter) Deregister(ep *api.Endpoint) error { - return nil -} - -func (r *registryRouter) Endpoint(req *http.Request) (*api.Service, error) { - if r.isClosed() { - return nil, errors.New("router closed") - } - - r.RLock() - defer r.RUnlock() - - var idx int - if len(req.URL.Path) > 0 && req.URL.Path != "/" { - idx = 1 - } - path := strings.Split(req.URL.Path[idx:], "/") - - // use the first match - // TODO: weighted matching - for n, e := range r.eps { - cep, ok := r.ceps[n] - if !ok { - continue - } - ep := e.Endpoint - var mMatch, hMatch, pMatch bool - // 1. try method - for _, m := range ep.Method { - if m == req.Method { - mMatch = true - break - } - } - if !mMatch { - continue - } - if logger.V(logger.TraceLevel) { - logger.Tracef("api method match %s", req.Method) - } - - // 2. try host - if len(ep.Host) == 0 { - hMatch = true - } else { - for idx, h := range ep.Host { - if h == "" || h == "*" { - hMatch = true - break - } else { - if cep.hostregs[idx].MatchString(req.URL.Host) { - hMatch = true - break - } - } - } - } - if !hMatch { - continue - } - if logger.V(logger.TraceLevel) { - logger.Tracef("api host match %s", req.URL.Host) - } - - // 3. try path via google.api path matching - for _, pathreg := range cep.pathregs { - matches, err := pathreg.Match(path, "") - if err != nil { - if logger.V(logger.TraceLevel) { - logger.Tracef("api gpath not match %s != %v", path, pathreg) - } - continue - } - if logger.V(logger.TraceLevel) { - logger.Tracef("api gpath match %s = %v", path, pathreg) - } - pMatch = true - ctx := req.Context() - md, ok := metadata.FromContext(ctx) - if !ok { - md = make(metadata.Metadata) - } - for k, v := range matches { - md[fmt.Sprintf("x-api-field-%s", k)] = v - } - md["x-api-body"] = ep.Body - *req = *req.Clone(metadata.NewContext(ctx, md)) - break - } - - if !pMatch { - // 4. try path via pcre path matching - for _, pathreg := range cep.pcreregs { - if !pathreg.MatchString(req.URL.Path) { - if logger.V(logger.TraceLevel) { - logger.Tracef("api pcre path not match %s != %v", path, pathreg) - } - continue - } - if logger.V(logger.TraceLevel) { - logger.Tracef("api pcre path match %s != %v", path, pathreg) - } - pMatch = true - break - } - } - - if !pMatch { - continue - } - - // TODO: Percentage traffic - // we got here, so its a match - return e, nil - } - - // no match - return nil, errors.New("not found") -} - -func (r *registryRouter) Route(req *http.Request) (*api.Service, error) { - if r.isClosed() { - return nil, errors.New("router closed") - } - - // try get an endpoint - ep, err := r.Endpoint(req) - if err == nil { - return ep, nil - } - - // error not nil - // ignore that shit - // TODO: don't ignore that shit - - // get the service name - rp, err := r.opts.Resolver.Resolve(req) - if err != nil { - return nil, err - } - - // service name - name := rp.Name - - // get service - services, err := r.opts.Registry.GetService(r.opts.Context, name, registry.GetDomain(rp.Domain)) - if err != nil { - return nil, err - } - - // only use endpoint matching when the meta handler is set aka api.Default - switch r.opts.Handler { - // rpc handlers - case "meta", "api", "rpc": - handler := r.opts.Handler - - // set default handler to api - if r.opts.Handler == "meta" { - handler = "rpc" - } - - // construct api service - return &api.Service{ - Name: name, - Endpoint: &api.Endpoint{ - Name: rp.Method, - Handler: handler, - }, - Services: services, - }, nil - // http handler - case "http", "proxy", "web": - // construct api service - return &api.Service{ - Name: name, - Endpoint: &api.Endpoint{ - Name: req.URL.String(), - Handler: r.opts.Handler, - Host: []string{req.Host}, - Method: []string{req.Method}, - Path: []string{req.URL.Path}, - }, - Services: services, - }, nil - } - - return nil, errors.New("unknown handler") -} - -func newRouter(opts ...router.Option) (*registryRouter, error) { - options := router.NewOptions(opts...) - if options.Registry == nil { - return nil, fmt.Errorf("registry is not set") - } - r := ®istryRouter{ - exit: make(chan bool), - opts: options, - eps: make(map[string]*api.Service), - ceps: make(map[string]*endpoint), - } - go r.watch() - go r.refresh() - return r, nil -} - -// NewRouter returns the default router -func NewRouter(opts ...router.Option) (router.Router, error) { - return newRouter(opts...) -} diff --git a/api/router/registry/registry_test.go b/api/router/registry/registry_test.go deleted file mode 100644 index 21f8d761..00000000 --- a/api/router/registry/registry_test.go +++ /dev/null @@ -1,38 +0,0 @@ -package registry - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/unistack-org/micro/v3/registry" -) - -func TestStoreRegex(t *testing.T) { - t.Skip() - router, err := newRouter() - if err != nil { - t.Fatal(err) - } - router.store([]*registry.Service{ - { - Name: "Foobar", - Version: "latest", - Endpoints: []*registry.Endpoint{ - { - Name: "foo", - Metadata: map[string]string{ - "endpoint": "FooEndpoint", - "description": "Some description", - "method": "POST", - "path": "^/foo/$", - "handler": "rpc", - }, - }, - }, - Metadata: map[string]string{}, - }, - }, - ) - - assert.Len(t, router.ceps["Foobar.foo"].pcreregs, 1) -} diff --git a/api/router/router_test.go b/api/router/router_test.go deleted file mode 100644 index 408330b7..00000000 --- a/api/router/router_test.go +++ /dev/null @@ -1,257 +0,0 @@ -// +build ignore - -package router_test - -import ( - "context" - "fmt" - "io/ioutil" - "log" - "net/http" - "testing" - "time" - - "github.com/unistack-org/micro/v3/api" - "github.com/unistack-org/micro/v3/api/handler" - "github.com/unistack-org/micro/v3/api/handler/rpc" - "github.com/unistack-org/micro/v3/api/router" - rregistry "github.com/unistack-org/micro/v3/api/router/registry" - rstatic "github.com/unistack-org/micro/v3/api/router/static" - "github.com/unistack-org/micro/v3/broker" - bmemory "github.com/unistack-org/micro/v3/broker/memory" - "github.com/unistack-org/micro/v3/client" - gcli "github.com/unistack-org/micro/v3/client/grpc" - rmemory "github.com/unistack-org/micro/v3/registry/memory" - rt "github.com/unistack-org/micro/v3/router" - regRouter "github.com/unistack-org/micro/v3/router/registry" - "github.com/unistack-org/micro/v3/server" - gsrv "github.com/unistack-org/micro/v3/server/grpc" - pb "github.com/unistack-org/micro/v3/server/grpc/proto" -) - -// server is used to implement helloworld.GreeterServer. -type testServer struct { -} - -// TestHello implements helloworld.GreeterServer -func (s *testServer) Call(ctx context.Context, req *pb.Request, rsp *pb.Response) error { - rsp.Msg = "Hello " + req.Uuid - return nil -} - -// TestHello implements helloworld.GreeterServer -func (s *testServer) CallPcre(ctx context.Context, req *pb.Request, rsp *pb.Response) error { - rsp.Msg = "Hello " + req.Uuid - return nil -} - -// TestHello implements helloworld.GreeterServer -func (s *testServer) CallPcreInvalid(ctx context.Context, req *pb.Request, rsp *pb.Response) error { - rsp.Msg = "Hello " + req.Uuid - return nil -} - -func initial(t *testing.T) (server.Server, client.Client) { - r := rmemory.NewRegistry() - b := bmemory.NewBroker(broker.Registry(r)) - - // create a new client - s := gsrv.NewServer( - server.Name("foo"), - server.Broker(b), - server.Registry(r), - ) - - rtr := regRouter.NewRouter( - rt.Registry(r), - ) - - // create a new server - c := gcli.NewClient( - client.Router(rtr), - client.Broker(b), - ) - - h := &testServer{} - pb.RegisterTestHandler(s, h) - - if err := s.Start(); err != nil { - t.Fatalf("failed to start: %v", err) - } - - return s, c -} - -func check(t *testing.T, addr string, path string, expected string) { - req, err := http.NewRequest("POST", fmt.Sprintf(path, addr), nil) - if err != nil { - t.Fatalf("Failed to created http.Request: %v", err) - } - req.Header.Set("Content-Type", "application/json") - rsp, err := (&http.Client{}).Do(req) - if err != nil { - t.Fatalf("Failed to created http.Request: %v", err) - } - defer rsp.Body.Close() - - buf, err := ioutil.ReadAll(rsp.Body) - if err != nil { - t.Fatal(err) - } - - jsonMsg := expected - if string(buf) != jsonMsg { - t.Fatalf("invalid message received, parsing error %s != %s", buf, jsonMsg) - } -} - -func TestRouterRegistryPcre(t *testing.T) { - s, c := initial(t) - defer s.Stop() - - router := rregistry.NewRouter( - router.WithHandler(rpc.Handler), - router.WithRegistry(s.Options().Registry), - ) - hrpc := rpc.NewHandler( - handler.WithClient(c), - handler.WithRouter(router), - ) - hsrv := &http.Server{ - Handler: hrpc, - Addr: "127.0.0.1:6543", - WriteTimeout: 15 * time.Second, - ReadTimeout: 15 * time.Second, - IdleTimeout: 20 * time.Second, - MaxHeaderBytes: 1024 * 1024 * 1, // 1Mb - } - - go func() { - log.Println(hsrv.ListenAndServe()) - }() - - defer hsrv.Close() - time.Sleep(1 * time.Second) - check(t, hsrv.Addr, "http://%s/api/v0/test/call/TEST", `{"msg":"Hello TEST"}`) -} - -func TestRouterStaticPcre(t *testing.T) { - s, c := initial(t) - defer s.Stop() - - router := rstatic.NewRouter( - router.WithHandler(rpc.Handler), - router.WithRegistry(s.Options().Registry), - ) - - err := router.Register(&api.Endpoint{ - Name: "foo.Test.Call", - Method: []string{"POST"}, - Path: []string{"^/api/v0/test/call/?$"}, - Handler: "rpc", - }) - if err != nil { - t.Fatal(err) - } - - hrpc := rpc.NewHandler( - handler.WithClient(c), - handler.WithRouter(router), - ) - hsrv := &http.Server{ - Handler: hrpc, - Addr: "127.0.0.1:6543", - WriteTimeout: 15 * time.Second, - ReadTimeout: 15 * time.Second, - IdleTimeout: 20 * time.Second, - MaxHeaderBytes: 1024 * 1024 * 1, // 1Mb - } - - go func() { - log.Println(hsrv.ListenAndServe()) - }() - defer hsrv.Close() - - time.Sleep(1 * time.Second) - check(t, hsrv.Addr, "http://%s/api/v0/test/call", `{"msg":"Hello "}`) -} - -func TestRouterStaticGpath(t *testing.T) { - s, c := initial(t) - defer s.Stop() - - router := rstatic.NewRouter( - router.WithHandler(rpc.Handler), - router.WithRegistry(s.Options().Registry), - ) - - err := router.Register(&api.Endpoint{ - Name: "foo.Test.Call", - Method: []string{"POST"}, - Path: []string{"/api/v0/test/call/{uuid}"}, - Handler: "rpc", - }) - if err != nil { - t.Fatal(err) - } - - hrpc := rpc.NewHandler( - handler.WithClient(c), - handler.WithRouter(router), - ) - hsrv := &http.Server{ - Handler: hrpc, - Addr: "127.0.0.1:6543", - WriteTimeout: 15 * time.Second, - ReadTimeout: 15 * time.Second, - IdleTimeout: 20 * time.Second, - MaxHeaderBytes: 1024 * 1024 * 1, // 1Mb - } - - go func() { - log.Println(hsrv.ListenAndServe()) - }() - defer hsrv.Close() - - time.Sleep(1 * time.Second) - check(t, hsrv.Addr, "http://%s/api/v0/test/call/TEST", `{"msg":"Hello TEST"}`) -} - -func TestRouterStaticPcreInvalid(t *testing.T) { - var ep *api.Endpoint - var err error - - s, c := initial(t) - defer s.Stop() - - router := rstatic.NewRouter( - router.WithHandler(rpc.Handler), - router.WithRegistry(s.Options().Registry), - ) - - ep = &api.Endpoint{ - Name: "foo.Test.Call", - Method: []string{"POST"}, - Path: []string{"^/api/v0/test/call/?"}, - Handler: "rpc", - } - - err = router.Register(ep) - if err == nil { - t.Fatalf("invalid endpoint %v", ep) - } - - ep = &api.Endpoint{ - Name: "foo.Test.Call", - Method: []string{"POST"}, - Path: []string{"/api/v0/test/call/?$"}, - Handler: "rpc", - } - - err = router.Register(ep) - if err == nil { - t.Fatalf("invalid endpoint %v", ep) - } - - _ = c -} diff --git a/api/router/static/static.go b/api/router/static/static.go deleted file mode 100644 index 8969ab59..00000000 --- a/api/router/static/static.go +++ /dev/null @@ -1,356 +0,0 @@ -package static - -import ( - "errors" - "fmt" - "net/http" - "regexp" - "strings" - "sync" - - "github.com/unistack-org/micro/v3/api" - "github.com/unistack-org/micro/v3/api/router" - "github.com/unistack-org/micro/v3/logger" - "github.com/unistack-org/micro/v3/metadata" - "github.com/unistack-org/micro/v3/registry" - rutil "github.com/unistack-org/micro/v3/util/registry" - util "github.com/unistack-org/micro/v3/util/router" -) - -type endpoint struct { - apiep *api.Endpoint - hostregs []*regexp.Regexp - pathregs []util.Pattern - pcreregs []*regexp.Regexp -} - -// router is the default router -type staticRouter struct { - exit chan bool - opts router.Options - sync.RWMutex - eps map[string]*endpoint -} - -func (r *staticRouter) isClosed() bool { - select { - case <-r.exit: - return true - default: - return false - } -} - -/* -// watch for endpoint changes -func (r *staticRouter) watch() { - var attempts int - - for { - if r.isClosed() { - return - } - - // watch for changes - w, err := r.opts.Registry.Watch() - if err != nil { - attempts++ - log.Println("Error watching endpoints", err) - time.Sleep(time.Duration(attempts) * time.Second) - continue - } - - ch := make(chan bool) - - go func() { - select { - case <-ch: - w.Stop() - case <-r.exit: - w.Stop() - } - }() - - // reset if we get here - attempts = 0 - - for { - // process next event - res, err := w.Next() - if err != nil { - log.Println("Error getting next endpoint", err) - close(ch) - break - } - r.process(res) - } - } -} -*/ - -func (r *staticRouter) Register(ep *api.Endpoint) error { - if err := api.Validate(ep); err != nil { - return err - } - - var pathregs []util.Pattern - var hostregs []*regexp.Regexp - var pcreregs []*regexp.Regexp - - for _, h := range ep.Host { - if h == "" || h == "*" { - continue - } - hostreg, err := regexp.CompilePOSIX(h) - if err != nil { - return err - } - hostregs = append(hostregs, hostreg) - } - - for _, p := range ep.Path { - var pcreok bool - - // pcre only when we have start and end markers - if p[0] == '^' && p[len(p)-1] == '$' { - pcrereg, err := regexp.CompilePOSIX(p) - if err == nil { - pcreregs = append(pcreregs, pcrereg) - pcreok = true - } - } - - rule, err := util.Parse(p) - if err != nil && !pcreok { - return err - } else if err != nil && pcreok { - continue - } - - tpl := rule.Compile() - pathreg, err := util.NewPattern(tpl.Version, tpl.OpCodes, tpl.Pool, "") - if err != nil { - return err - } - pathregs = append(pathregs, pathreg) - } - - r.Lock() - r.eps[ep.Name] = &endpoint{ - apiep: ep, - pcreregs: pcreregs, - pathregs: pathregs, - hostregs: hostregs, - } - r.Unlock() - return nil -} - -func (r *staticRouter) Deregister(ep *api.Endpoint) error { - if err := api.Validate(ep); err != nil { - return err - } - r.Lock() - delete(r.eps, ep.Name) - r.Unlock() - return nil -} - -func (r *staticRouter) Options() router.Options { - return r.opts -} - -func (r *staticRouter) Close() error { - select { - case <-r.exit: - return nil - default: - close(r.exit) - } - return nil -} - -func (r *staticRouter) Endpoint(req *http.Request) (*api.Service, error) { - ep, err := r.endpoint(req) - if err != nil { - return nil, err - } - - epf := strings.Split(ep.apiep.Name, ".") - services, err := r.opts.Registry.GetService(r.opts.Context, epf[0]) - if err != nil { - return nil, err - } - - // hack for stream endpoint - if ep.apiep.Stream { - svcs := rutil.Copy(services) - for _, svc := range svcs { - if len(svc.Endpoints) == 0 { - e := ®istry.Endpoint{} - e.Name = strings.Join(epf[1:], ".") - e.Metadata = make(map[string]string) - e.Metadata["stream"] = "true" - svc.Endpoints = append(svc.Endpoints, e) - } - for _, e := range svc.Endpoints { - e.Name = strings.Join(epf[1:], ".") - e.Metadata = make(map[string]string) - e.Metadata["stream"] = "true" - } - } - - services = svcs - } - - svc := &api.Service{ - Name: epf[0], - Endpoint: &api.Endpoint{ - Name: strings.Join(epf[1:], "."), - Handler: "rpc", - Host: ep.apiep.Host, - Method: ep.apiep.Method, - Path: ep.apiep.Path, - Body: ep.apiep.Body, - Stream: ep.apiep.Stream, - }, - Services: services, - } - - return svc, nil -} - -func (r *staticRouter) endpoint(req *http.Request) (*endpoint, error) { - if r.isClosed() { - return nil, errors.New("router closed") - } - - r.RLock() - defer r.RUnlock() - - var idx int - if len(req.URL.Path) > 0 && req.URL.Path != "/" { - idx = 1 - } - path := strings.Split(req.URL.Path[idx:], "/") - // use the first match - // TODO: weighted matching - - for _, ep := range r.eps { - var mMatch, hMatch, pMatch bool - - // 1. try method - for _, m := range ep.apiep.Method { - if m == req.Method { - mMatch = true - break - } - } - if !mMatch { - continue - } - if logger.V(logger.TraceLevel) { - logger.Tracef("api method match %s", req.Method) - } - - // 2. try host - if len(ep.apiep.Host) == 0 { - hMatch = true - } else { - for idx, h := range ep.apiep.Host { - if h == "" || h == "*" { - hMatch = true - break - } else { - if ep.hostregs[idx].MatchString(req.URL.Host) { - hMatch = true - break - } - } - } - } - if !hMatch { - continue - } - if logger.V(logger.TraceLevel) { - logger.Tracef("api host match %s", req.URL.Host) - } - - // 3. try google.api path - for _, pathreg := range ep.pathregs { - matches, err := pathreg.Match(path, "") - if err != nil { - if logger.V(logger.TraceLevel) { - logger.Tracef("api gpath not match %s != %v", path, pathreg) - } - continue - } - if logger.V(logger.TraceLevel) { - logger.Tracef("api gpath match %s = %v", path, pathreg) - } - pMatch = true - ctx := req.Context() - md, ok := metadata.FromContext(ctx) - if !ok { - md = make(metadata.Metadata) - } - for k, v := range matches { - md[fmt.Sprintf("x-api-field-%s", k)] = v - } - md["x-api-body"] = ep.apiep.Body - *req = *req.Clone(metadata.NewContext(ctx, md)) - break - } - - if !pMatch { - // 4. try path via pcre path matching - for _, pathreg := range ep.pcreregs { - if !pathreg.MatchString(req.URL.Path) { - if logger.V(logger.TraceLevel) { - logger.Tracef("api pcre path not match %s != %v", req.URL.Path, pathreg) - } - continue - } - pMatch = true - break - } - } - - if !pMatch { - continue - } - // TODO: Percentage traffic - - // we got here, so its a match - return ep, nil - } - - // no match - return nil, fmt.Errorf("endpoint not found for %v", req.URL) -} - -func (r *staticRouter) Route(req *http.Request) (*api.Service, error) { - if r.isClosed() { - return nil, errors.New("router closed") - } - - // try get an endpoint - ep, err := r.Endpoint(req) - if err != nil { - return nil, err - } - - return ep, nil -} - -func NewRouter(opts ...router.Option) *staticRouter { - options := router.NewOptions(opts...) - r := &staticRouter{ - exit: make(chan bool), - opts: options, - eps: make(map[string]*endpoint), - } - //go r.watch() - //go r.refresh() - return r -} -- 2.45.2 From e41bb5ebc57df5f33577e7d83b62b877d108b039 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 4 Nov 2020 00:38:12 +0300 Subject: [PATCH 2/2] rewrite logger Signed-off-by: Vasiliy Tolstov --- api/handler/rpc/rpc.go | 12 +-- api/handler/rpc/stream.go | 20 ++--- api/resolver/subdomain/subdomain.go | 2 +- api/server/acme/autocert/autocert.go | 2 +- api/server/http/http.go | 21 +++-- api/server/options.go | 12 ++- client/noop.go | 2 +- debug/log/memory/memory.go | 5 +- logger/helper.go | 114 ------------------------ logger/level.go | 60 +------------ logger/logger.go | 61 ++++++++----- logger/logger_test.go | 15 ++-- logger/{default.go => micro.go} | 128 ++++++++++----------------- logger/options.go | 9 +- network/tunnel/broker/broker.go | 4 +- registry/extractor.go | 4 +- server/noop.go | 68 +++++++------- server/subscriber.go | 9 +- service.go | 18 +++- util/auth/auth.go | 4 +- util/kubernetes/api/request.go | 2 +- util/kubernetes/client/client.go | 12 +-- util/mdns/server.go | 10 +-- util/router/parse.go | 10 +-- util/router/parse_test.go | 2 +- util/router/runtime.go | 16 ++-- 26 files changed, 235 insertions(+), 387 deletions(-) delete mode 100644 logger/helper.go rename logger/{default.go => micro.go} (53%) diff --git a/api/handler/rpc/rpc.go b/api/handler/rpc/rpc.go index 07950363..127e2dfa 100644 --- a/api/handler/rpc/rpc.go +++ b/api/handler/rpc/rpc.go @@ -224,7 +224,7 @@ func requestPayload(r *http.Request) ([]byte, error) { case strings.Contains(ct, "application/json-rpc"): msg := codec.Message{ Type: codec.Request, - Header: make(map[string]string), + Header: metadata.New(0), } c := jsonrpc.NewCodec(&buffer{r.Body}) if err = c.ReadHeader(&msg, codec.Request); err != nil { @@ -238,7 +238,7 @@ func requestPayload(r *http.Request) ([]byte, error) { case strings.Contains(ct, "application/proto-rpc"), strings.Contains(ct, "application/octet-stream"): msg := codec.Message{ Type: codec.Request, - Header: make(map[string]string), + Header: metadata.New(0), } c := protorpc.NewCodec(&buffer{r.Body}) if err = c.ReadHeader(&msg, codec.Request); err != nil { @@ -253,7 +253,7 @@ func requestPayload(r *http.Request) ([]byte, error) { r.ParseForm() // generate a new set of values from the form - vals := make(map[string]string) + vals := make(map[string]string, len(r.Form)) for k, v := range r.Form { vals[k] = strings.Join(v, ",") } @@ -268,7 +268,7 @@ func requestPayload(r *http.Request) ([]byte, error) { // dont user metadata.FromContext as it mangles names md, ok := metadata.FromContext(ctx) if !ok { - md = make(map[string]string) + md = metadata.New(0) } // allocate maximum @@ -445,7 +445,7 @@ func writeError(w http.ResponseWriter, r *http.Request, err error) { _, werr := w.Write([]byte(ce.Error())) if werr != nil { if logger.V(logger.ErrorLevel) { - logger.Error(werr) + logger.Error(werr.Error()) } } } @@ -471,7 +471,7 @@ func writeResponse(w http.ResponseWriter, r *http.Request, rsp []byte) { _, err := w.Write(rsp) if err != nil { if logger.V(logger.ErrorLevel) { - logger.Error(err) + logger.Error(err.Error()) } } diff --git a/api/handler/rpc/stream.go b/api/handler/rpc/stream.go index 541b8df6..067c890e 100644 --- a/api/handler/rpc/stream.go +++ b/api/handler/rpc/stream.go @@ -50,7 +50,7 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request, payload, err := requestPayload(r) if err != nil { if logger.V(logger.ErrorLevel) { - logger.Error(err) + logger.Error(err.Error()) } return } @@ -73,7 +73,7 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request, conn, rw, _, err := upgrader.Upgrade(r, w) if err != nil { if logger.V(logger.ErrorLevel) { - logger.Error(err) + logger.Error(err.Error()) } return } @@ -81,7 +81,7 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request, defer func() { if err := conn.Close(); err != nil { if logger.V(logger.ErrorLevel) { - logger.Error(err) + logger.Error(err.Error()) } return } @@ -117,7 +117,7 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request, stream, err := c.Stream(ctx, req, callOpt) if err != nil { if logger.V(logger.ErrorLevel) { - logger.Error(err) + logger.Error(err.Error()) } return } @@ -125,7 +125,7 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request, if request != nil { if err = stream.Send(request); err != nil { if logger.V(logger.ErrorLevel) { - logger.Error(err) + logger.Error(err.Error()) } return } @@ -151,7 +151,7 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request, return } if logger.V(logger.ErrorLevel) { - logger.Error(err) + logger.Error(err.Error()) } return } @@ -159,13 +159,13 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request, // write the response if err := wsutil.WriteServerMessage(rw, op, buf); err != nil { if logger.V(logger.ErrorLevel) { - logger.Error(err) + logger.Error(err.Error()) } return } if err = rw.Flush(); err != nil { if logger.V(logger.ErrorLevel) { - logger.Error(err) + logger.Error(err.Error()) } return } @@ -196,7 +196,7 @@ func writeLoop(rw io.ReadWriter, stream client.Stream) { } } if logger.V(logger.ErrorLevel) { - logger.Error(err) + logger.Error(err.Error()) } return } @@ -213,7 +213,7 @@ func writeLoop(rw io.ReadWriter, stream client.Stream) { request := &raw.Frame{Data: buf} if err := stream.Send(request); err != nil { if logger.V(logger.ErrorLevel) { - logger.Error(err) + logger.Error(err.Error()) } return } diff --git a/api/resolver/subdomain/subdomain.go b/api/resolver/subdomain/subdomain.go index e4a1e0db..1cec59c1 100644 --- a/api/resolver/subdomain/subdomain.go +++ b/api/resolver/subdomain/subdomain.go @@ -55,7 +55,7 @@ func (r *Resolver) Domain(req *http.Request) string { domain, err := publicsuffix.EffectiveTLDPlusOne(host) if err != nil { if logger.V(logger.DebugLevel) { - logger.Debugf("Unable to extract domain from %v", host) + logger.Debug("Unable to extract domain from %v", host) } return "" } diff --git a/api/server/acme/autocert/autocert.go b/api/server/acme/autocert/autocert.go index da3625a3..38438fa6 100644 --- a/api/server/acme/autocert/autocert.go +++ b/api/server/acme/autocert/autocert.go @@ -36,7 +36,7 @@ func (a *autocertProvider) TLSConfig(hosts ...string) (*tls.Config, error) { dir := cacheDir() if err := os.MkdirAll(dir, 0700); err != nil { if logger.V(logger.InfoLevel) { - logger.Infof("warning: autocert not using a cache: %v", err) + logger.Info("warning: autocert not using a cache: %v", err) } } else { m.Cache = autocert.DirCache(dir) diff --git a/api/server/http/http.go b/api/server/http/http.go index 7486d1be..1366dde3 100644 --- a/api/server/http/http.go +++ b/api/server/http/http.go @@ -15,7 +15,7 @@ type httpServer struct { mux *http.ServeMux opts server.Options - mtx sync.RWMutex + sync.RWMutex address string exit chan chan error } @@ -30,8 +30,8 @@ func NewServer(address string, opts ...server.Option) server.Server { } func (s *httpServer) Address() string { - s.mtx.RLock() - defer s.mtx.RUnlock() + s.RLock() + defer s.RUnlock() return s.address } @@ -57,6 +57,9 @@ func (s *httpServer) Start() error { var l net.Listener var err error + s.RLock() + config := s.opts + s.RUnlock() if s.opts.EnableACME && s.opts.ACMEProvider != nil { // should we check the address to make sure its using :443? l, err = s.opts.ACMEProvider.Listen(s.opts.ACMEHosts...) @@ -70,19 +73,19 @@ func (s *httpServer) Start() error { return err } - if logger.V(logger.InfoLevel) { - logger.Infof("HTTP API Listening on %s", l.Addr().String()) + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("HTTP API Listening on %s", l.Addr().String()) } - s.mtx.Lock() + s.Lock() s.address = l.Addr().String() - s.mtx.Unlock() + s.Unlock() go func() { if err := http.Serve(l, s.mux); err != nil { // temporary fix - if logger.V(logger.ErrorLevel) { - logger.Errorf("serve err: %v", err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("serve err: %v", err) } s.Stop() } diff --git a/api/server/options.go b/api/server/options.go index f58d2b08..be1e723f 100644 --- a/api/server/options.go +++ b/api/server/options.go @@ -6,6 +6,7 @@ import ( "github.com/unistack-org/micro/v3/api/resolver" "github.com/unistack-org/micro/v3/api/server/acme" + "github.com/unistack-org/micro/v3/logger" ) // Option func @@ -21,11 +22,14 @@ type Options struct { TLSConfig *tls.Config Resolver resolver.Resolver Wrappers []Wrapper + Logger logger.Logger } // NewOptions returns new Options func NewOptions(opts ...Option) Options { - options := Options{} + options := Options{ + Logger: logger.DefaultLogger, + } for _, o := range opts { o(&options) } @@ -81,3 +85,9 @@ func Resolver(r resolver.Resolver) Option { o.Resolver = r } } + +func Logger(l logger.Logger) Option { + return func(o *Options) { + o.Logger = l + } +} diff --git a/client/noop.go b/client/noop.go index cee3c396..91349a9a 100644 --- a/client/noop.go +++ b/client/noop.go @@ -162,7 +162,7 @@ func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOpti md, ok := metadata.FromContext(ctx) if !ok { - md = make(map[string]string) + md = metadata.New(0) } md["Content-Type"] = p.ContentType() md["Micro-Topic"] = p.Topic() diff --git a/debug/log/memory/memory.go b/debug/log/memory/memory.go index d17e0da9..95a1401f 100644 --- a/debug/log/memory/memory.go +++ b/debug/log/memory/memory.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/unistack-org/micro/v3/debug/log" + "github.com/unistack-org/micro/v3/metadata" "github.com/unistack-org/micro/v3/util/ring" ) @@ -94,7 +95,7 @@ func (l *memoryLog) Stream() (log.Stream, error) { records <- log.Record{ Timestamp: entry.Timestamp, Message: entry.Value, - Metadata: make(map[string]string), + Metadata: metadata.New(0), } } // now stream continuously @@ -102,7 +103,7 @@ func (l *memoryLog) Stream() (log.Stream, error) { records <- log.Record{ Timestamp: entry.Timestamp, Message: entry.Value, - Metadata: make(map[string]string), + Metadata: metadata.New(0), } } }() diff --git a/logger/helper.go b/logger/helper.go deleted file mode 100644 index 94c2f364..00000000 --- a/logger/helper.go +++ /dev/null @@ -1,114 +0,0 @@ -package logger - -import ( - "os" -) - -type Helper struct { - Logger - fields map[string]interface{} -} - -func NewHelper(log Logger) *Helper { - return &Helper{Logger: log} -} - -func (h *Helper) Info(args ...interface{}) { - if !h.Logger.Options().Level.Enabled(InfoLevel) { - return - } - h.Logger.Fields(h.fields).Log(InfoLevel, args...) -} - -func (h *Helper) Infof(template string, args ...interface{}) { - if !h.Logger.Options().Level.Enabled(InfoLevel) { - return - } - h.Logger.Fields(h.fields).Logf(InfoLevel, template, args...) -} - -func (h *Helper) Trace(args ...interface{}) { - if !h.Logger.Options().Level.Enabled(TraceLevel) { - return - } - h.Logger.Fields(h.fields).Log(TraceLevel, args...) -} - -func (h *Helper) Tracef(template string, args ...interface{}) { - if !h.Logger.Options().Level.Enabled(TraceLevel) { - return - } - h.Logger.Fields(h.fields).Logf(TraceLevel, template, args...) -} - -func (h *Helper) Debug(args ...interface{}) { - if !h.Logger.Options().Level.Enabled(DebugLevel) { - return - } - h.Logger.Fields(h.fields).Log(DebugLevel, args...) -} - -func (h *Helper) Debugf(template string, args ...interface{}) { - if !h.Logger.Options().Level.Enabled(DebugLevel) { - return - } - h.Logger.Fields(h.fields).Logf(DebugLevel, template, args...) -} - -func (h *Helper) Warn(args ...interface{}) { - if !h.Logger.Options().Level.Enabled(WarnLevel) { - return - } - h.Logger.Fields(h.fields).Log(WarnLevel, args...) -} - -func (h *Helper) Warnf(template string, args ...interface{}) { - if !h.Logger.Options().Level.Enabled(WarnLevel) { - return - } - h.Logger.Fields(h.fields).Logf(WarnLevel, template, args...) -} - -func (h *Helper) Error(args ...interface{}) { - if !h.Logger.Options().Level.Enabled(ErrorLevel) { - return - } - h.Logger.Fields(h.fields).Log(ErrorLevel, args...) -} - -func (h *Helper) Errorf(template string, args ...interface{}) { - if !h.Logger.Options().Level.Enabled(ErrorLevel) { - return - } - h.Logger.Fields(h.fields).Logf(ErrorLevel, template, args...) -} - -func (h *Helper) Fatal(args ...interface{}) { - if !h.Logger.Options().Level.Enabled(FatalLevel) { - return - } - h.Logger.Fields(h.fields).Log(FatalLevel, args...) - os.Exit(1) -} - -func (h *Helper) Fatalf(template string, args ...interface{}) { - if !h.Logger.Options().Level.Enabled(FatalLevel) { - return - } - h.Logger.Fields(h.fields).Logf(FatalLevel, template, args...) - os.Exit(1) -} - -func (h *Helper) WithError(err error) *Helper { - fields := copyFields(h.fields) - fields["error"] = err - return &Helper{Logger: h.Logger, fields: fields} -} - -func (h *Helper) WithFields(fields map[string]interface{}) *Helper { - nfields := copyFields(fields) - for k, v := range h.fields { - nfields[k] = v - } - return &Helper{Logger: h.Logger, fields: nfields} -} diff --git a/logger/level.go b/logger/level.go index 4e27e204..6f07e2ff 100644 --- a/logger/level.go +++ b/logger/level.go @@ -2,7 +2,6 @@ package logger import ( "fmt" - "os" ) type Level int8 @@ -19,7 +18,7 @@ const ( WarnLevel // ErrorLevel level. Logs. Used for errors that should definitely be noted. ErrorLevel - // FatalLevel level. Logs and then calls `logger.Exit(1)`. highest level of severity. + // FatalLevel level. Logs and then calls `os.Exit(1)`. highest level of severity. FatalLevel ) @@ -63,60 +62,5 @@ func GetLevel(levelStr string) (Level, error) { case FatalLevel.String(): return FatalLevel, nil } - return InfoLevel, fmt.Errorf("Unknown Level String: '%s', defaulting to InfoLevel", levelStr) -} - -func Info(args ...interface{}) { - DefaultLogger.Log(InfoLevel, args...) -} - -func Infof(template string, args ...interface{}) { - DefaultLogger.Logf(InfoLevel, template, args...) -} - -func Trace(args ...interface{}) { - DefaultLogger.Log(TraceLevel, args...) -} - -func Tracef(template string, args ...interface{}) { - DefaultLogger.Logf(TraceLevel, template, args...) -} - -func Debug(args ...interface{}) { - DefaultLogger.Log(DebugLevel, args...) -} - -func Debugf(template string, args ...interface{}) { - DefaultLogger.Logf(DebugLevel, template, args...) -} - -func Warn(args ...interface{}) { - DefaultLogger.Log(WarnLevel, args...) -} - -func Warnf(template string, args ...interface{}) { - DefaultLogger.Logf(WarnLevel, template, args...) -} - -func Error(args ...interface{}) { - DefaultLogger.Log(ErrorLevel, args...) -} - -func Errorf(template string, args ...interface{}) { - DefaultLogger.Logf(ErrorLevel, template, args...) -} - -func Fatal(args ...interface{}) { - DefaultLogger.Log(FatalLevel, args...) - os.Exit(1) -} - -func Fatalf(template string, args ...interface{}) { - DefaultLogger.Logf(FatalLevel, template, args...) - os.Exit(1) -} - -// Returns true if the given level is at or lower the current logger level -func V(lvl Level) bool { - return DefaultLogger.Options().Level <= lvl + return InfoLevel, fmt.Errorf("unknown Level String: '%s', use InfoLevel", levelStr) } diff --git a/logger/logger.go b/logger/logger.go index 6af487a2..e2d5711f 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -3,7 +3,7 @@ package logger var ( // DefaultLogger variable - DefaultLogger Logger = NewHelper(NewLogger()) + DefaultLogger Logger = NewLogger() ) // Logger is a generic logging interface @@ -16,14 +16,50 @@ type Logger interface { Options() Options // Fields set fields to always be logged Fields(fields map[string]interface{}) Logger - // Log writes a log entry - Log(level Level, v ...interface{}) - // Logf writes a formatted log entry - Logf(level Level, format string, v ...interface{}) + // Info level message + Info(msg string, args ...interface{}) + // Trace level message + Trace(msg string, args ...interface{}) + // Debug level message + Debug(msg string, args ...interface{}) + // Warn level message + Warn(msg string, args ...interface{}) + // Error level message + Error(msg string, args ...interface{}) + // Fatal level message + Fatal(msg string, args ...interface{}) // String returns the name of logger String() string } +func Info(msg string, args ...interface{}) { + DefaultLogger.Info(msg, args...) +} + +func Error(msg string, args ...interface{}) { + DefaultLogger.Error(msg, args...) +} + +func Debug(msg string, args ...interface{}) { + DefaultLogger.Debug(msg, args...) +} + +func Warn(msg string, args ...interface{}) { + DefaultLogger.Warn(msg, args...) +} + +func Trace(msg string, args ...interface{}) { + DefaultLogger.Trace(msg, args...) +} + +func Fatal(msg string, args ...interface{}) { + DefaultLogger.Fatal(msg, args...) +} + +func V(level Level) bool { + return DefaultLogger.V(level) +} + // Init initialize logger func Init(opts ...Option) error { return DefaultLogger.Init(opts...) @@ -33,18 +69,3 @@ func Init(opts ...Option) error { func Fields(fields map[string]interface{}) Logger { return DefaultLogger.Fields(fields) } - -// Log writes log with specific level -func Log(level Level, v ...interface{}) { - DefaultLogger.Log(level, v...) -} - -// Logf writes formatted log with specific level -func Logf(level Level, format string, v ...interface{}) { - DefaultLogger.Logf(level, format, v...) -} - -// String return logger name -func String() string { - return DefaultLogger.String() -} diff --git a/logger/logger_test.go b/logger/logger_test.go index f36a3a3b..10f0b0c2 100644 --- a/logger/logger_test.go +++ b/logger/logger_test.go @@ -6,13 +6,10 @@ import ( func TestLogger(t *testing.T) { l := NewLogger(WithLevel(TraceLevel)) - h1 := NewHelper(l).WithFields(map[string]interface{}{"key1": "val1"}) - h1.Trace("trace_msg1") - h1.Warn("warn_msg1") - - h2 := NewHelper(l).WithFields(map[string]interface{}{"key2": "val2"}) - h2.Trace("trace_msg2") - h2.Warn("warn_msg2") - - l.Fields(map[string]interface{}{"key3": "val4"}).Log(InfoLevel, "test_msg") + if err := l.Init(); err != nil { + t.Fatal(err) + } + l.Trace("trace_msg1") + l.Warn("warn_msg1") + l.Fields(map[string]interface{}{"error": "test"}).Info("error message") } diff --git a/logger/default.go b/logger/micro.go similarity index 53% rename from logger/default.go rename to logger/micro.go index 47c0ca56..d4b12c81 100644 --- a/logger/default.go +++ b/logger/micro.go @@ -1,16 +1,13 @@ package logger import ( - "context" + "encoding/json" "fmt" "os" "runtime" - "sort" "strings" "sync" "time" - - dlog "github.com/unistack-org/micro/v3/debug/log" ) func init() { @@ -19,31 +16,33 @@ func init() { lvl = InfoLevel } - DefaultLogger = NewHelper(NewLogger(WithLevel(lvl))) + DefaultLogger = NewLogger(WithLevel(lvl)) } type defaultLogger struct { sync.RWMutex opts Options + enc *json.Encoder } // Init(opts...) should only overwrite provided options func (l *defaultLogger) Init(opts ...Option) error { + l.Lock() + defer l.Unlock() + for _, o := range opts { o(&l.opts) } + l.enc = json.NewEncoder(l.opts.Out) return nil } func (l *defaultLogger) String() string { - return "default" + return "micro" } func (l *defaultLogger) V(level Level) bool { - if l.opts.Level.Enabled(level) { - return true - } - return false + return l.opts.Level.Enabled(level) } func (l *defaultLogger) Fields(fields map[string]interface{}) Logger { @@ -85,45 +84,32 @@ func logCallerfilePath(loggingFilePath string) string { return loggingFilePath[idx+1:] } -func (l *defaultLogger) Log(level Level, v ...interface{}) { - if !l.V(level) { - return - } - - l.RLock() - fields := copyFields(l.opts.Fields) - l.RUnlock() - - fields["level"] = level.String() - - if _, file, line, ok := runtime.Caller(l.opts.CallerSkipCount); ok { - fields["file"] = fmt.Sprintf("%s:%d", logCallerfilePath(file), line) - } - - rec := dlog.Record{ - Timestamp: time.Now(), - Message: fmt.Sprint(v...), - Metadata: make(map[string]string, len(fields)), - } - - keys := make([]string, 0, len(fields)) - for k, v := range fields { - keys = append(keys, k) - rec.Metadata[k] = fmt.Sprintf("%v", v) - } - - sort.Strings(keys) - metadata := "" - - for _, k := range keys { - metadata += fmt.Sprintf(" %s=%v", k, fields[k]) - } - - t := rec.Timestamp.Format("2006-01-02 15:04:05") - fmt.Printf("%s %s %v\n", t, metadata, rec.Message) +func (l *defaultLogger) Info(msg string, args ...interface{}) { + l.log(InfoLevel, msg, args...) } -func (l *defaultLogger) Logf(level Level, format string, v ...interface{}) { +func (l *defaultLogger) Error(msg string, args ...interface{}) { + l.log(ErrorLevel, msg, args...) +} + +func (l *defaultLogger) Debug(msg string, args ...interface{}) { + l.log(DebugLevel, msg, args...) +} + +func (l *defaultLogger) Warn(msg string, args ...interface{}) { + l.log(WarnLevel, msg, args...) +} + +func (l *defaultLogger) Trace(msg string, args ...interface{}) { + l.log(TraceLevel, msg, args...) +} + +func (l *defaultLogger) Fatal(msg string, args ...interface{}) { + l.log(FatalLevel, msg, args...) + os.Exit(1) +} + +func (l *defaultLogger) log(level Level, msg string, args ...interface{}) { if !l.V(level) { return } @@ -135,30 +121,20 @@ func (l *defaultLogger) Logf(level Level, format string, v ...interface{}) { fields["level"] = level.String() if _, file, line, ok := runtime.Caller(l.opts.CallerSkipCount); ok { - fields["file"] = fmt.Sprintf("%s:%d", logCallerfilePath(file), line) + fields["caller"] = fmt.Sprintf("%s:%d", logCallerfilePath(file), line) } - rec := dlog.Record{ - Timestamp: time.Now(), - Message: fmt.Sprintf(format, v...), - Metadata: make(map[string]string, len(fields)), + fields["timestamp"] = time.Now().Format("2006-01-02 15:04:05") + if len(msg) > 0 { + if len(args) > 0 { + fields["msg"] = fmt.Sprintf(msg, args...) + } else { + fields["msg"] = msg + } } - - keys := make([]string, 0, len(fields)) - for k, v := range fields { - keys = append(keys, k) - rec.Metadata[k] = fmt.Sprintf("%v", v) - } - - sort.Strings(keys) - metadata := "" - - for _, k := range keys { - metadata += fmt.Sprintf(" %s=%v", k, fields[k]) - } - - t := rec.Timestamp.Format("2006-01-02 15:04:05") - fmt.Printf("%s %s %v\n", t, metadata, rec.Message) + l.RLock() + _ = l.enc.Encode(fields) + l.RUnlock() } func (l *defaultLogger) Options() Options { @@ -172,19 +148,7 @@ func (l *defaultLogger) Options() Options { // NewLogger builds a new logger based on options func NewLogger(opts ...Option) Logger { - // Default options - options := Options{ - Level: InfoLevel, - Fields: make(map[string]interface{}), - Out: os.Stderr, - CallerSkipCount: 2, - Context: context.Background(), - } - - l := &defaultLogger{opts: options} - if err := l.Init(opts...); err != nil { - l.Log(FatalLevel, err) - } - + l := &defaultLogger{opts: NewOptions(opts...)} + l.enc = json.NewEncoder(l.opts.Out) return l } diff --git a/logger/options.go b/logger/options.go index c0a94604..791de0e3 100644 --- a/logger/options.go +++ b/logger/options.go @@ -3,6 +3,7 @@ package logger import ( "context" "io" + "os" ) type Option func(*Options) @@ -21,7 +22,13 @@ type Options struct { } func NewOptions(opts ...Option) Options { - options := Options{} + options := Options{ + Level: InfoLevel, + Fields: make(map[string]interface{}), + Out: os.Stderr, + CallerSkipCount: 2, + Context: context.Background(), + } for _, o := range opts { o(&options) } diff --git a/network/tunnel/broker/broker.go b/network/tunnel/broker/broker.go index f798f2a3..b13f5cff 100644 --- a/network/tunnel/broker/broker.go +++ b/network/tunnel/broker/broker.go @@ -113,11 +113,11 @@ func (t *tunSubscriber) run() { m := new(transport.Message) if err := c.Recv(m); err != nil { if logger.V(logger.ErrorLevel) { - logger.Error(err) + logger.Error(err.Error()) } if err = c.Close(); err != nil { if logger.V(logger.ErrorLevel) { - logger.Error(err) + logger.Error(err.Error()) } } continue diff --git a/registry/extractor.go b/registry/extractor.go index 64ca9458..602eb615 100644 --- a/registry/extractor.go +++ b/registry/extractor.go @@ -4,6 +4,8 @@ import ( "fmt" "reflect" "strings" + + "github.com/unistack-org/micro/v3/metadata" ) // Extract *Value from reflect.Type @@ -94,7 +96,7 @@ func ExtractEndpoint(method reflect.Method) *Endpoint { Name: method.Name, Request: request, Response: response, - Metadata: make(map[string]string), + Metadata: metadata.New(0), } if stream { diff --git a/server/noop.go b/server/noop.go index f3466253..8eb13954 100644 --- a/server/noop.go +++ b/server/noop.go @@ -187,8 +187,8 @@ func (n *noopServer) Register() error { n.RUnlock() if !registered { - if logger.V(logger.InfoLevel) { - logger.Infof("Registry [%s] Registering node: %s", config.Registry.String(), service.Nodes[0].Id) + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("Registry [%s] Registering node: %s", config.Registry.String(), service.Nodes[0].Id) } } @@ -220,8 +220,8 @@ func (n *noopServer) Register() error { opts = append(opts, broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)) - if logger.V(logger.InfoLevel) { - logger.Infof("Subscribing to topic: %s", sb.Topic()) + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("Subscribing to topic: %s", sb.Topic()) } sub, err := config.Broker.Subscribe(cx, sb.Topic(), handler, opts...) if err != nil { @@ -250,8 +250,8 @@ func (n *noopServer) Deregister() error { return err } - if logger.V(logger.InfoLevel) { - logger.Infof("Deregistering node: %s", service.Nodes[0].Id) + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("deregistering node: %s", service.Nodes[0].Id) } if err := DefaultDeregisterFunc(service, config); err != nil { @@ -280,12 +280,12 @@ func (n *noopServer) Deregister() error { wg.Add(1) go func(s broker.Subscriber) { defer wg.Done() - if logger.V(logger.InfoLevel) { - logger.Infof("Unsubscribing from topic: %s", s.Topic()) + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("unsubscribing from topic: %s", s.Topic()) } if err := s.Unsubscribe(cx); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Unsubscribing from topic: %s err: %v", s.Topic(), err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("unsubscribing from topic: %s err: %v", s.Topic(), err) } } }(sub) @@ -307,8 +307,8 @@ func (n *noopServer) Start() error { config := n.Options() n.RUnlock() - if logger.V(logger.InfoLevel) { - logger.Infof("Server [noop] Listening on %s", config.Address) + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("Server [noop] Listening on %s", config.Address) } n.Lock() if len(config.Advertise) == 0 { @@ -320,27 +320,27 @@ func (n *noopServer) Start() error { if len(n.subscribers) > 0 { // connect to the broker if err := config.Broker.Connect(config.Context); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Broker [%s] connect error: %v", config.Broker.String(), err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Broker [%s] connect error: %v", config.Broker.String(), err) } return err } - if logger.V(logger.InfoLevel) { - logger.Infof("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) } } // use RegisterCheck func before register if err := config.RegisterCheck(config.Context); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server %s-%s register check error: %s", config.Name, config.Id, err) } } else { // announce self to the world if err := n.Register(); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Server register error: %v", err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server register error: %v", err) } } } @@ -367,24 +367,24 @@ func (n *noopServer) Start() error { n.RUnlock() rerr := config.RegisterCheck(config.Context) if rerr != nil && registered { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr) } // deregister self in case of error if err := n.Deregister(); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server %s-%s deregister error: %s", config.Name, config.Id, err) } } } else if rerr != nil && !registered { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, rerr) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server %s-%s register check error: %s", config.Name, config.Id, rerr) } continue } if err := n.Register(); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server %s-%s register error: %s", config.Name, config.Id, err) } } // wait for exit @@ -395,8 +395,8 @@ func (n *noopServer) Start() error { // deregister self if err := n.Deregister(); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Error("Server deregister error: ", err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server deregister error: ", err) } } @@ -419,13 +419,13 @@ func (n *noopServer) Start() error { // close transport ch <- nil - if logger.V(logger.InfoLevel) { - logger.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) } // disconnect broker if err := config.Broker.Disconnect(config.Context); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Broker [%s] disconnect error: %v", config.Broker.String(), err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Broker [%s] disconnect error: %v", config.Broker.String(), err) } } }() diff --git a/server/subscriber.go b/server/subscriber.go index fa12664f..87e7c899 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -189,9 +189,12 @@ func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handl return func(p broker.Event) (err error) { defer func() { if r := recover(); r != nil { - if logger.V(logger.ErrorLevel) { - logger.Error("panic recovered: ", r) - logger.Error(string(debug.Stack())) + n.RLock() + config := n.opts + n.RUnlock() + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("panic recovered: ", r) + config.Logger.Error(string(debug.Stack())) } err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r) } diff --git a/service.go b/service.go index ed9deae3..a77f3d62 100644 --- a/service.go +++ b/service.go @@ -17,6 +17,7 @@ import ( type service struct { opts Options + sync.RWMutex once sync.Once } @@ -128,8 +129,13 @@ func (s *service) String() string { func (s *service) Start() error { var err error - if logger.V(logger.InfoLevel) { - logger.Infof("Starting [service] %s", s.Name()) + + s.RLock() + config := s.opts + s.RUnlock() + + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("Starting [service] %s", s.Name()) } for _, fn := range s.opts.BeforeStart { @@ -174,8 +180,12 @@ func (s *service) Start() error { } func (s *service) Stop() error { - if logger.V(logger.InfoLevel) { - logger.Infof("Stoppping [service] %s", s.Name()) + s.RLock() + config := s.opts + s.RUnlock() + + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("Stoppping [service] %s", s.Name()) } var err error diff --git a/util/auth/auth.go b/util/auth/auth.go index 5cf52aa2..ccd75c4c 100644 --- a/util/auth/auth.go +++ b/util/auth/auth.go @@ -26,7 +26,7 @@ func Verify(a auth.Auth) error { return err } if logger.V(logger.DebugLevel) { - logger.Debugf("Auth [%v] Generated an auth account", a.String()) + logger.Debug("Auth [%v] Generated an auth account: %s", a.String()) } accID = acc.ID @@ -68,7 +68,7 @@ func Verify(a auth.Auth) error { ) if err != nil { if logger.V(logger.WarnLevel) { - logger.Warnf("[Auth] Error refreshing token: %v", err) + logger.Warn("[Auth] Error refreshing token: %v", err) } continue } diff --git a/util/kubernetes/api/request.go b/util/kubernetes/api/request.go index 81b7441a..6aff945a 100644 --- a/util/kubernetes/api/request.go +++ b/util/kubernetes/api/request.go @@ -219,7 +219,7 @@ func (r *Request) Do() *Response { } } - logger.Debugf("[Kubernetes] %v %v", req.Method, req.URL.String()) + logger.Debug("[Kubernetes] %v %v", req.Method, req.URL.String()) res, err := r.client.Do(req) if err != nil { return &Response{ diff --git a/util/kubernetes/client/client.go b/util/kubernetes/client/client.go index e390e25a..d37a2bd1 100644 --- a/util/kubernetes/client/client.go +++ b/util/kubernetes/client/client.go @@ -228,7 +228,7 @@ func (c *client) Watch(r *Resource, opts ...WatchOption) (Watcher, error) { // NewService returns default micro kubernetes service definition func NewService(name, version, typ, namespace string) *Service { if logger.V(logger.TraceLevel) { - logger.Tracef("kubernetes default service: name: %s, version: %s", name, version) + logger.Trace("kubernetes default service: name: %s, version: %s", name, version) } Labels := map[string]string{ @@ -271,7 +271,7 @@ func NewService(name, version, typ, namespace string) *Service { // NewService returns default micro kubernetes deployment definition func NewDeployment(name, version, typ, namespace string) *Deployment { if logger.V(logger.TraceLevel) { - logger.Tracef("kubernetes default deployment: name: %s, version: %s", name, version) + logger.Trace("kubernetes default deployment: name: %s, version: %s", name, version) } Labels := map[string]string{ @@ -363,21 +363,21 @@ func NewClusterClient() *client { s, err := os.Stat(serviceAccountPath) if err != nil { - logger.Fatal(err) + logger.Fatal(err.Error()) } if s == nil || !s.IsDir() { - logger.Fatal(errors.New("service account not found")) + logger.Fatal("service account not found") } token, err := ioutil.ReadFile(path.Join(serviceAccountPath, "token")) if err != nil { - logger.Fatal(err) + logger.Fatal(err.Error()) } t := string(token) crt, err := CertPoolFromFile(path.Join(serviceAccountPath, "ca.crt")) if err != nil { - logger.Fatal(err) + logger.Fatal(err.Error()) } c := &http.Client{ diff --git a/util/mdns/server.go b/util/mdns/server.go index 44ae8d6b..27b32ba1 100644 --- a/util/mdns/server.go +++ b/util/mdns/server.go @@ -9,7 +9,7 @@ import ( "time" "github.com/miekg/dns" - log "github.com/unistack-org/micro/v3/logger" + "github.com/unistack-org/micro/v3/logger" "golang.org/x/net/ipv4" "golang.org/x/net/ipv6" ) @@ -196,7 +196,7 @@ func (s *Server) recv(c *net.UDPConn) { continue } if err := s.parsePacket(buf[:n], from); err != nil { - log.Errorf("[ERR] mdns: Failed to handle query: %v", err) + logger.Error("[ERR] mdns: Failed to handle query: %v", err) } } } @@ -205,7 +205,7 @@ func (s *Server) recv(c *net.UDPConn) { func (s *Server) parsePacket(packet []byte, from net.Addr) error { var msg dns.Msg if err := msg.Unpack(packet); err != nil { - log.Errorf("[ERR] mdns: Failed to unpack packet: %v", err) + logger.Error("[ERR] mdns: Failed to unpack packet: %v", err) return err } // TODO: This is a bit of a hack @@ -384,7 +384,7 @@ func (s *Server) probe() { for i := 0; i < 3; i++ { if err := s.SendMulticast(q); err != nil { - log.Errorf("[ERR] mdns: failed to send probe:", err.Error()) + logger.Error("[ERR] mdns: failed to send probe: %v", err) } time.Sleep(time.Duration(randomizer.Intn(250)) * time.Millisecond) } @@ -410,7 +410,7 @@ func (s *Server) probe() { timer := time.NewTimer(timeout) for i := 0; i < 3; i++ { if err := s.SendMulticast(resp); err != nil { - log.Errorf("[ERR] mdns: failed to send announcement:", err.Error()) + logger.Error("[ERR] mdns: failed to send announcement:", err.Error()) } select { case <-timer.C: diff --git a/util/router/parse.go b/util/router/parse.go index 9ba06101..c736d453 100644 --- a/util/router/parse.go +++ b/util/router/parse.go @@ -103,20 +103,20 @@ type parser struct { // topLevelSegments is the target of this parser. func (p *parser) topLevelSegments() ([]segment, error) { if logger.V(logger.TraceLevel) { - logger.Debugf("Parsing %q", p.tokens) + logger.Debug("Parsing %q", p.tokens) } segs, err := p.segments() if err != nil { return nil, err } if logger.V(logger.TraceLevel) { - logger.Tracef("accept segments: %q; %q", p.accepted, p.tokens) + logger.Trace("accept segments: %q; %q", p.accepted, p.tokens) } if _, err := p.accept(typeEOF); err != nil { return nil, fmt.Errorf("unexpected token %q after segments %q", p.tokens[0], strings.Join(p.accepted, "")) } if logger.V(logger.TraceLevel) { - logger.Tracef("accept eof: %q; %q", p.accepted, p.tokens) + logger.Trace("accept eof: %q; %q", p.accepted, p.tokens) } return segs, nil } @@ -128,7 +128,7 @@ func (p *parser) segments() ([]segment, error) { } if logger.V(logger.TraceLevel) { - logger.Tracef("accept segment: %q; %q", p.accepted, p.tokens) + logger.Trace("accept segment: %q; %q", p.accepted, p.tokens) } segs := []segment{s} for { @@ -141,7 +141,7 @@ func (p *parser) segments() ([]segment, error) { } segs = append(segs, s) if logger.V(logger.TraceLevel) { - logger.Tracef("accept segment: %q; %q", p.accepted, p.tokens) + logger.Trace("accept segment: %q; %q", p.accepted, p.tokens) } } } diff --git a/util/router/parse_test.go b/util/router/parse_test.go index 526d2f37..9806d1b3 100644 --- a/util/router/parse_test.go +++ b/util/router/parse_test.go @@ -316,6 +316,6 @@ func TestParseSegmentsWithErrors(t *testing.T) { t.Errorf("parser{%q}.segments() succeeded; want InvalidTemplateError; accepted %#v", spec.tokens, segs) continue } - logger.Info(err) + logger.Info(err.Error()) } } diff --git a/util/router/runtime.go b/util/router/runtime.go index bd17445e..0d46cb68 100644 --- a/util/router/runtime.go +++ b/util/router/runtime.go @@ -63,7 +63,7 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt if version != 1 { if logger.V(logger.DebugLevel) { - logger.Debugf("unsupported version: %d", version) + logger.Debug("unsupported version: %d", version) } return Pattern{}, ErrInvalidPattern } @@ -71,7 +71,7 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt l := len(ops) if l%2 != 0 { if logger.V(logger.DebugLevel) { - logger.Debugf("odd number of ops codes: %d", l) + logger.Debug("odd number of ops codes: %d", l) } return Pattern{}, ErrInvalidPattern } @@ -105,7 +105,7 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt case OpLitPush: if op.operand < 0 || len(pool) <= op.operand { if logger.V(logger.TraceLevel) { - logger.Tracef("negative literal index: %d", op.operand) + logger.Trace("negative literal index: %d", op.operand) } return Pattern{}, ErrInvalidPattern } @@ -116,7 +116,7 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt case OpConcatN: if op.operand <= 0 { if logger.V(logger.TraceLevel) { - logger.Tracef("negative concat size: %d", op.operand) + logger.Trace("negative concat size: %d", op.operand) } return Pattern{}, ErrInvalidPattern } @@ -131,7 +131,7 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt case OpCapture: if op.operand < 0 || len(pool) <= op.operand { if logger.V(logger.TraceLevel) { - logger.Tracef("variable name index out of bound: %d", op.operand) + logger.Trace("variable name index out of bound: %d", op.operand) } return Pattern{}, ErrInvalidPattern } @@ -147,7 +147,7 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt } default: if logger.V(logger.DebugLevel) { - logger.Tracef("invalid opcode: %d", op.code) + logger.Trace("invalid opcode: %d", op.code) } return Pattern{}, ErrInvalidPattern } @@ -172,7 +172,7 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt func MustPattern(p Pattern, err error) Pattern { if err != nil { if logger.V(logger.FatalLevel) { - logger.Fatalf("Pattern initialization failed: %v", err) + logger.Fatal("Pattern initialization failed: %v", err) } } return p @@ -235,7 +235,7 @@ func (p Pattern) Match(components []string, verb string) (map[string]string, err if pos < l { return nil, ErrNotMatch } - bindings := make(map[string]string) + bindings := make(map[string]string, len(captured)) for i, val := range captured { bindings[p.vars[i]] = val } -- 2.45.2