commit
1fbf8b2e20
@ -224,7 +224,7 @@ func requestPayload(r *http.Request) ([]byte, error) {
|
|||||||
case strings.Contains(ct, "application/json-rpc"):
|
case strings.Contains(ct, "application/json-rpc"):
|
||||||
msg := codec.Message{
|
msg := codec.Message{
|
||||||
Type: codec.Request,
|
Type: codec.Request,
|
||||||
Header: make(map[string]string),
|
Header: metadata.New(0),
|
||||||
}
|
}
|
||||||
c := jsonrpc.NewCodec(&buffer{r.Body})
|
c := jsonrpc.NewCodec(&buffer{r.Body})
|
||||||
if err = c.ReadHeader(&msg, codec.Request); err != nil {
|
if err = c.ReadHeader(&msg, codec.Request); err != nil {
|
||||||
@ -238,7 +238,7 @@ func requestPayload(r *http.Request) ([]byte, error) {
|
|||||||
case strings.Contains(ct, "application/proto-rpc"), strings.Contains(ct, "application/octet-stream"):
|
case strings.Contains(ct, "application/proto-rpc"), strings.Contains(ct, "application/octet-stream"):
|
||||||
msg := codec.Message{
|
msg := codec.Message{
|
||||||
Type: codec.Request,
|
Type: codec.Request,
|
||||||
Header: make(map[string]string),
|
Header: metadata.New(0),
|
||||||
}
|
}
|
||||||
c := protorpc.NewCodec(&buffer{r.Body})
|
c := protorpc.NewCodec(&buffer{r.Body})
|
||||||
if err = c.ReadHeader(&msg, codec.Request); err != nil {
|
if err = c.ReadHeader(&msg, codec.Request); err != nil {
|
||||||
@ -253,7 +253,7 @@ func requestPayload(r *http.Request) ([]byte, error) {
|
|||||||
r.ParseForm()
|
r.ParseForm()
|
||||||
|
|
||||||
// generate a new set of values from the form
|
// generate a new set of values from the form
|
||||||
vals := make(map[string]string)
|
vals := make(map[string]string, len(r.Form))
|
||||||
for k, v := range r.Form {
|
for k, v := range r.Form {
|
||||||
vals[k] = strings.Join(v, ",")
|
vals[k] = strings.Join(v, ",")
|
||||||
}
|
}
|
||||||
@ -268,7 +268,7 @@ func requestPayload(r *http.Request) ([]byte, error) {
|
|||||||
// dont user metadata.FromContext as it mangles names
|
// dont user metadata.FromContext as it mangles names
|
||||||
md, ok := metadata.FromContext(ctx)
|
md, ok := metadata.FromContext(ctx)
|
||||||
if !ok {
|
if !ok {
|
||||||
md = make(map[string]string)
|
md = metadata.New(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// allocate maximum
|
// allocate maximum
|
||||||
@ -445,7 +445,7 @@ func writeError(w http.ResponseWriter, r *http.Request, err error) {
|
|||||||
_, werr := w.Write([]byte(ce.Error()))
|
_, werr := w.Write([]byte(ce.Error()))
|
||||||
if werr != nil {
|
if werr != nil {
|
||||||
if logger.V(logger.ErrorLevel) {
|
if logger.V(logger.ErrorLevel) {
|
||||||
logger.Error(werr)
|
logger.Error(werr.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -471,7 +471,7 @@ func writeResponse(w http.ResponseWriter, r *http.Request, rsp []byte) {
|
|||||||
_, err := w.Write(rsp)
|
_, err := w.Write(rsp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if logger.V(logger.ErrorLevel) {
|
if logger.V(logger.ErrorLevel) {
|
||||||
logger.Error(err)
|
logger.Error(err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,7 +50,7 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request,
|
|||||||
payload, err := requestPayload(r)
|
payload, err := requestPayload(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if logger.V(logger.ErrorLevel) {
|
if logger.V(logger.ErrorLevel) {
|
||||||
logger.Error(err)
|
logger.Error(err.Error())
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -73,7 +73,7 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request,
|
|||||||
conn, rw, _, err := upgrader.Upgrade(r, w)
|
conn, rw, _, err := upgrader.Upgrade(r, w)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if logger.V(logger.ErrorLevel) {
|
if logger.V(logger.ErrorLevel) {
|
||||||
logger.Error(err)
|
logger.Error(err.Error())
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -81,7 +81,7 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request,
|
|||||||
defer func() {
|
defer func() {
|
||||||
if err := conn.Close(); err != nil {
|
if err := conn.Close(); err != nil {
|
||||||
if logger.V(logger.ErrorLevel) {
|
if logger.V(logger.ErrorLevel) {
|
||||||
logger.Error(err)
|
logger.Error(err.Error())
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -117,7 +117,7 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request,
|
|||||||
stream, err := c.Stream(ctx, req, callOpt)
|
stream, err := c.Stream(ctx, req, callOpt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if logger.V(logger.ErrorLevel) {
|
if logger.V(logger.ErrorLevel) {
|
||||||
logger.Error(err)
|
logger.Error(err.Error())
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -125,7 +125,7 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request,
|
|||||||
if request != nil {
|
if request != nil {
|
||||||
if err = stream.Send(request); err != nil {
|
if err = stream.Send(request); err != nil {
|
||||||
if logger.V(logger.ErrorLevel) {
|
if logger.V(logger.ErrorLevel) {
|
||||||
logger.Error(err)
|
logger.Error(err.Error())
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -151,7 +151,7 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request,
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if logger.V(logger.ErrorLevel) {
|
if logger.V(logger.ErrorLevel) {
|
||||||
logger.Error(err)
|
logger.Error(err.Error())
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -159,13 +159,13 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request,
|
|||||||
// write the response
|
// write the response
|
||||||
if err := wsutil.WriteServerMessage(rw, op, buf); err != nil {
|
if err := wsutil.WriteServerMessage(rw, op, buf); err != nil {
|
||||||
if logger.V(logger.ErrorLevel) {
|
if logger.V(logger.ErrorLevel) {
|
||||||
logger.Error(err)
|
logger.Error(err.Error())
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err = rw.Flush(); err != nil {
|
if err = rw.Flush(); err != nil {
|
||||||
if logger.V(logger.ErrorLevel) {
|
if logger.V(logger.ErrorLevel) {
|
||||||
logger.Error(err)
|
logger.Error(err.Error())
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -196,7 +196,7 @@ func writeLoop(rw io.ReadWriter, stream client.Stream) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if logger.V(logger.ErrorLevel) {
|
if logger.V(logger.ErrorLevel) {
|
||||||
logger.Error(err)
|
logger.Error(err.Error())
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -213,7 +213,7 @@ func writeLoop(rw io.ReadWriter, stream client.Stream) {
|
|||||||
request := &raw.Frame{Data: buf}
|
request := &raw.Frame{Data: buf}
|
||||||
if err := stream.Send(request); err != nil {
|
if err := stream.Send(request); err != nil {
|
||||||
if logger.V(logger.ErrorLevel) {
|
if logger.V(logger.ErrorLevel) {
|
||||||
logger.Error(err)
|
logger.Error(err.Error())
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -55,7 +55,7 @@ func (r *Resolver) Domain(req *http.Request) string {
|
|||||||
domain, err := publicsuffix.EffectiveTLDPlusOne(host)
|
domain, err := publicsuffix.EffectiveTLDPlusOne(host)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if logger.V(logger.DebugLevel) {
|
if logger.V(logger.DebugLevel) {
|
||||||
logger.Debugf("Unable to extract domain from %v", host)
|
logger.Debug("Unable to extract domain from %v", host)
|
||||||
}
|
}
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
@ -1,498 +0,0 @@
|
|||||||
// Package registry provides a dynamic api service router
|
|
||||||
package registry
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"net/http"
|
|
||||||
"regexp"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/unistack-org/micro/v3/api"
|
|
||||||
"github.com/unistack-org/micro/v3/api/router"
|
|
||||||
"github.com/unistack-org/micro/v3/logger"
|
|
||||||
"github.com/unistack-org/micro/v3/metadata"
|
|
||||||
"github.com/unistack-org/micro/v3/registry"
|
|
||||||
util "github.com/unistack-org/micro/v3/util/router"
|
|
||||||
)
|
|
||||||
|
|
||||||
// endpoint struct, that holds compiled pcre
|
|
||||||
type endpoint struct {
|
|
||||||
hostregs []*regexp.Regexp
|
|
||||||
pathregs []util.Pattern
|
|
||||||
pcreregs []*regexp.Regexp
|
|
||||||
}
|
|
||||||
|
|
||||||
// router is the default router
|
|
||||||
type registryRouter struct {
|
|
||||||
exit chan bool
|
|
||||||
opts router.Options
|
|
||||||
|
|
||||||
sync.RWMutex
|
|
||||||
eps map[string]*api.Service
|
|
||||||
// compiled regexp for host and path
|
|
||||||
ceps map[string]*endpoint
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *registryRouter) isClosed() bool {
|
|
||||||
select {
|
|
||||||
case <-r.exit:
|
|
||||||
return true
|
|
||||||
default:
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// refresh list of api services
|
|
||||||
func (r *registryRouter) refresh() {
|
|
||||||
var attempts int
|
|
||||||
|
|
||||||
for {
|
|
||||||
services, err := r.opts.Registry.ListServices(r.opts.Context)
|
|
||||||
if err != nil {
|
|
||||||
attempts++
|
|
||||||
if logger.V(logger.ErrorLevel) {
|
|
||||||
logger.Errorf("unable to list services: %v", err)
|
|
||||||
}
|
|
||||||
time.Sleep(time.Duration(attempts) * time.Second)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
attempts = 0
|
|
||||||
|
|
||||||
// for each service, get service and store endpoints
|
|
||||||
for _, s := range services {
|
|
||||||
service, err := r.opts.Registry.GetService(r.opts.Context, s.Name)
|
|
||||||
if err != nil {
|
|
||||||
if logger.V(logger.ErrorLevel) {
|
|
||||||
logger.Errorf("unable to get service: %v", err)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
r.store(service)
|
|
||||||
}
|
|
||||||
|
|
||||||
// refresh list in 10 minutes... cruft
|
|
||||||
// use registry watching
|
|
||||||
select {
|
|
||||||
case <-time.After(time.Minute * 10):
|
|
||||||
case <-r.exit:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// process watch event
|
|
||||||
func (r *registryRouter) process(res *registry.Result) {
|
|
||||||
// skip these things
|
|
||||||
if res == nil || res.Service == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// get entry from cache
|
|
||||||
service, err := r.opts.Registry.GetService(r.opts.Context, res.Service.Name)
|
|
||||||
if err != nil {
|
|
||||||
if logger.V(logger.ErrorLevel) {
|
|
||||||
logger.Errorf("unable to get %v service: %v", res.Service.Name, err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// update our local endpoints
|
|
||||||
r.store(service)
|
|
||||||
}
|
|
||||||
|
|
||||||
// store local endpoint cache
|
|
||||||
func (r *registryRouter) store(services []*registry.Service) {
|
|
||||||
// endpoints
|
|
||||||
eps := map[string]*api.Service{}
|
|
||||||
|
|
||||||
// services
|
|
||||||
names := map[string]bool{}
|
|
||||||
|
|
||||||
// create a new endpoint mapping
|
|
||||||
for _, service := range services {
|
|
||||||
// set names we need later
|
|
||||||
names[service.Name] = true
|
|
||||||
|
|
||||||
// map per endpoint
|
|
||||||
for _, sep := range service.Endpoints {
|
|
||||||
// create a key service:endpoint_name
|
|
||||||
key := fmt.Sprintf("%s.%s", service.Name, sep.Name)
|
|
||||||
// decode endpoint
|
|
||||||
end := api.Decode(sep.Metadata)
|
|
||||||
// no endpoint or no name
|
|
||||||
if end == nil || len(end.Name) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// if we got nothing skip
|
|
||||||
if err := api.Validate(end); err != nil {
|
|
||||||
if logger.V(logger.TraceLevel) {
|
|
||||||
logger.Tracef("endpoint validation failed: %v", err)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// try get endpoint
|
|
||||||
ep, ok := eps[key]
|
|
||||||
if !ok {
|
|
||||||
ep = &api.Service{Name: service.Name}
|
|
||||||
}
|
|
||||||
|
|
||||||
// overwrite the endpoint
|
|
||||||
ep.Endpoint = end
|
|
||||||
// append services
|
|
||||||
ep.Services = append(ep.Services, service)
|
|
||||||
// store it
|
|
||||||
eps[key] = ep
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
r.Lock()
|
|
||||||
defer r.Unlock()
|
|
||||||
|
|
||||||
// delete any existing eps for services we know
|
|
||||||
for key, service := range r.eps {
|
|
||||||
// skip what we don't care about
|
|
||||||
if !names[service.Name] {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// ok we know this thing
|
|
||||||
// delete delete delete
|
|
||||||
delete(r.eps, key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// now set the eps we have
|
|
||||||
for name, ep := range eps {
|
|
||||||
r.eps[name] = ep
|
|
||||||
cep := &endpoint{}
|
|
||||||
|
|
||||||
for _, h := range ep.Endpoint.Host {
|
|
||||||
if h == "" || h == "*" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
hostreg, err := regexp.CompilePOSIX(h)
|
|
||||||
if err != nil {
|
|
||||||
if logger.V(logger.TraceLevel) {
|
|
||||||
logger.Tracef("endpoint have invalid host regexp: %v", err)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
cep.hostregs = append(cep.hostregs, hostreg)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, p := range ep.Endpoint.Path {
|
|
||||||
var pcreok bool
|
|
||||||
|
|
||||||
if p[0] == '^' && p[len(p)-1] == '$' {
|
|
||||||
pcrereg, err := regexp.CompilePOSIX(p)
|
|
||||||
if err == nil {
|
|
||||||
cep.pcreregs = append(cep.pcreregs, pcrereg)
|
|
||||||
pcreok = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
rule, err := util.Parse(p)
|
|
||||||
if err != nil && !pcreok {
|
|
||||||
if logger.V(logger.TraceLevel) {
|
|
||||||
logger.Tracef("endpoint have invalid path pattern: %v", err)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
} else if err != nil && pcreok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
tpl := rule.Compile()
|
|
||||||
pathreg, err := util.NewPattern(tpl.Version, tpl.OpCodes, tpl.Pool, "")
|
|
||||||
if err != nil {
|
|
||||||
if logger.V(logger.TraceLevel) {
|
|
||||||
logger.Tracef("endpoint have invalid path pattern: %v", err)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
cep.pathregs = append(cep.pathregs, pathreg)
|
|
||||||
}
|
|
||||||
|
|
||||||
r.ceps[name] = cep
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// watch for endpoint changes
|
|
||||||
func (r *registryRouter) watch() {
|
|
||||||
var attempts int
|
|
||||||
|
|
||||||
for {
|
|
||||||
if r.isClosed() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// watch for changes
|
|
||||||
w, err := r.opts.Registry.Watch(r.opts.Context)
|
|
||||||
if err != nil {
|
|
||||||
attempts++
|
|
||||||
if logger.V(logger.ErrorLevel) {
|
|
||||||
logger.Errorf("error watching endpoints: %v", err)
|
|
||||||
}
|
|
||||||
time.Sleep(time.Duration(attempts) * time.Second)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
ch := make(chan bool)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
select {
|
|
||||||
case <-ch:
|
|
||||||
w.Stop()
|
|
||||||
case <-r.exit:
|
|
||||||
w.Stop()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// reset if we get here
|
|
||||||
attempts = 0
|
|
||||||
|
|
||||||
for {
|
|
||||||
// process next event
|
|
||||||
res, err := w.Next()
|
|
||||||
if err != nil {
|
|
||||||
if logger.V(logger.ErrorLevel) {
|
|
||||||
logger.Errorf("error getting next endpoint: %v", err)
|
|
||||||
}
|
|
||||||
close(ch)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
r.process(res)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *registryRouter) Options() router.Options {
|
|
||||||
return r.opts
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *registryRouter) Close() error {
|
|
||||||
select {
|
|
||||||
case <-r.exit:
|
|
||||||
return nil
|
|
||||||
default:
|
|
||||||
close(r.exit)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *registryRouter) Register(ep *api.Endpoint) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *registryRouter) Deregister(ep *api.Endpoint) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *registryRouter) Endpoint(req *http.Request) (*api.Service, error) {
|
|
||||||
if r.isClosed() {
|
|
||||||
return nil, errors.New("router closed")
|
|
||||||
}
|
|
||||||
|
|
||||||
r.RLock()
|
|
||||||
defer r.RUnlock()
|
|
||||||
|
|
||||||
var idx int
|
|
||||||
if len(req.URL.Path) > 0 && req.URL.Path != "/" {
|
|
||||||
idx = 1
|
|
||||||
}
|
|
||||||
path := strings.Split(req.URL.Path[idx:], "/")
|
|
||||||
|
|
||||||
// use the first match
|
|
||||||
// TODO: weighted matching
|
|
||||||
for n, e := range r.eps {
|
|
||||||
cep, ok := r.ceps[n]
|
|
||||||
if !ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
ep := e.Endpoint
|
|
||||||
var mMatch, hMatch, pMatch bool
|
|
||||||
// 1. try method
|
|
||||||
for _, m := range ep.Method {
|
|
||||||
if m == req.Method {
|
|
||||||
mMatch = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !mMatch {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if logger.V(logger.TraceLevel) {
|
|
||||||
logger.Tracef("api method match %s", req.Method)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2. try host
|
|
||||||
if len(ep.Host) == 0 {
|
|
||||||
hMatch = true
|
|
||||||
} else {
|
|
||||||
for idx, h := range ep.Host {
|
|
||||||
if h == "" || h == "*" {
|
|
||||||
hMatch = true
|
|
||||||
break
|
|
||||||
} else {
|
|
||||||
if cep.hostregs[idx].MatchString(req.URL.Host) {
|
|
||||||
hMatch = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !hMatch {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if logger.V(logger.TraceLevel) {
|
|
||||||
logger.Tracef("api host match %s", req.URL.Host)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3. try path via google.api path matching
|
|
||||||
for _, pathreg := range cep.pathregs {
|
|
||||||
matches, err := pathreg.Match(path, "")
|
|
||||||
if err != nil {
|
|
||||||
if logger.V(logger.TraceLevel) {
|
|
||||||
logger.Tracef("api gpath not match %s != %v", path, pathreg)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if logger.V(logger.TraceLevel) {
|
|
||||||
logger.Tracef("api gpath match %s = %v", path, pathreg)
|
|
||||||
}
|
|
||||||
pMatch = true
|
|
||||||
ctx := req.Context()
|
|
||||||
md, ok := metadata.FromContext(ctx)
|
|
||||||
if !ok {
|
|
||||||
md = make(metadata.Metadata)
|
|
||||||
}
|
|
||||||
for k, v := range matches {
|
|
||||||
md[fmt.Sprintf("x-api-field-%s", k)] = v
|
|
||||||
}
|
|
||||||
md["x-api-body"] = ep.Body
|
|
||||||
*req = *req.Clone(metadata.NewContext(ctx, md))
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if !pMatch {
|
|
||||||
// 4. try path via pcre path matching
|
|
||||||
for _, pathreg := range cep.pcreregs {
|
|
||||||
if !pathreg.MatchString(req.URL.Path) {
|
|
||||||
if logger.V(logger.TraceLevel) {
|
|
||||||
logger.Tracef("api pcre path not match %s != %v", path, pathreg)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if logger.V(logger.TraceLevel) {
|
|
||||||
logger.Tracef("api pcre path match %s != %v", path, pathreg)
|
|
||||||
}
|
|
||||||
pMatch = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !pMatch {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Percentage traffic
|
|
||||||
// we got here, so its a match
|
|
||||||
return e, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// no match
|
|
||||||
return nil, errors.New("not found")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *registryRouter) Route(req *http.Request) (*api.Service, error) {
|
|
||||||
if r.isClosed() {
|
|
||||||
return nil, errors.New("router closed")
|
|
||||||
}
|
|
||||||
|
|
||||||
// try get an endpoint
|
|
||||||
ep, err := r.Endpoint(req)
|
|
||||||
if err == nil {
|
|
||||||
return ep, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// error not nil
|
|
||||||
// ignore that shit
|
|
||||||
// TODO: don't ignore that shit
|
|
||||||
|
|
||||||
// get the service name
|
|
||||||
rp, err := r.opts.Resolver.Resolve(req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// service name
|
|
||||||
name := rp.Name
|
|
||||||
|
|
||||||
// get service
|
|
||||||
services, err := r.opts.Registry.GetService(r.opts.Context, name, registry.GetDomain(rp.Domain))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// only use endpoint matching when the meta handler is set aka api.Default
|
|
||||||
switch r.opts.Handler {
|
|
||||||
// rpc handlers
|
|
||||||
case "meta", "api", "rpc":
|
|
||||||
handler := r.opts.Handler
|
|
||||||
|
|
||||||
// set default handler to api
|
|
||||||
if r.opts.Handler == "meta" {
|
|
||||||
handler = "rpc"
|
|
||||||
}
|
|
||||||
|
|
||||||
// construct api service
|
|
||||||
return &api.Service{
|
|
||||||
Name: name,
|
|
||||||
Endpoint: &api.Endpoint{
|
|
||||||
Name: rp.Method,
|
|
||||||
Handler: handler,
|
|
||||||
},
|
|
||||||
Services: services,
|
|
||||||
}, nil
|
|
||||||
// http handler
|
|
||||||
case "http", "proxy", "web":
|
|
||||||
// construct api service
|
|
||||||
return &api.Service{
|
|
||||||
Name: name,
|
|
||||||
Endpoint: &api.Endpoint{
|
|
||||||
Name: req.URL.String(),
|
|
||||||
Handler: r.opts.Handler,
|
|
||||||
Host: []string{req.Host},
|
|
||||||
Method: []string{req.Method},
|
|
||||||
Path: []string{req.URL.Path},
|
|
||||||
},
|
|
||||||
Services: services,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, errors.New("unknown handler")
|
|
||||||
}
|
|
||||||
|
|
||||||
func newRouter(opts ...router.Option) (*registryRouter, error) {
|
|
||||||
options := router.NewOptions(opts...)
|
|
||||||
if options.Registry == nil {
|
|
||||||
return nil, fmt.Errorf("registry is not set")
|
|
||||||
}
|
|
||||||
r := ®istryRouter{
|
|
||||||
exit: make(chan bool),
|
|
||||||
opts: options,
|
|
||||||
eps: make(map[string]*api.Service),
|
|
||||||
ceps: make(map[string]*endpoint),
|
|
||||||
}
|
|
||||||
go r.watch()
|
|
||||||
go r.refresh()
|
|
||||||
return r, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewRouter returns the default router
|
|
||||||
func NewRouter(opts ...router.Option) (router.Router, error) {
|
|
||||||
return newRouter(opts...)
|
|
||||||
}
|
|
@ -1,38 +0,0 @@
|
|||||||
package registry
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/unistack-org/micro/v3/registry"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestStoreRegex(t *testing.T) {
|
|
||||||
t.Skip()
|
|
||||||
router, err := newRouter()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
router.store([]*registry.Service{
|
|
||||||
{
|
|
||||||
Name: "Foobar",
|
|
||||||
Version: "latest",
|
|
||||||
Endpoints: []*registry.Endpoint{
|
|
||||||
{
|
|
||||||
Name: "foo",
|
|
||||||
Metadata: map[string]string{
|
|
||||||
"endpoint": "FooEndpoint",
|
|
||||||
"description": "Some description",
|
|
||||||
"method": "POST",
|
|
||||||
"path": "^/foo/$",
|
|
||||||
"handler": "rpc",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Metadata: map[string]string{},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
assert.Len(t, router.ceps["Foobar.foo"].pcreregs, 1)
|
|
||||||
}
|
|
@ -1,257 +0,0 @@
|
|||||||
// +build ignore
|
|
||||||
|
|
||||||
package router_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"io/ioutil"
|
|
||||||
"log"
|
|
||||||
"net/http"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/unistack-org/micro/v3/api"
|
|
||||||
"github.com/unistack-org/micro/v3/api/handler"
|
|
||||||
"github.com/unistack-org/micro/v3/api/handler/rpc"
|
|
||||||
"github.com/unistack-org/micro/v3/api/router"
|
|
||||||
rregistry "github.com/unistack-org/micro/v3/api/router/registry"
|
|
||||||
rstatic "github.com/unistack-org/micro/v3/api/router/static"
|
|
||||||
"github.com/unistack-org/micro/v3/broker"
|
|
||||||
bmemory "github.com/unistack-org/micro/v3/broker/memory"
|
|
||||||
"github.com/unistack-org/micro/v3/client"
|
|
||||||
gcli "github.com/unistack-org/micro/v3/client/grpc"
|
|
||||||
rmemory "github.com/unistack-org/micro/v3/registry/memory"
|
|
||||||
rt "github.com/unistack-org/micro/v3/router"
|
|
||||||
regRouter "github.com/unistack-org/micro/v3/router/registry"
|
|
||||||
"github.com/unistack-org/micro/v3/server"
|
|
||||||
gsrv "github.com/unistack-org/micro/v3/server/grpc"
|
|
||||||
pb "github.com/unistack-org/micro/v3/server/grpc/proto"
|
|
||||||
)
|
|
||||||
|
|
||||||
// server is used to implement helloworld.GreeterServer.
|
|
||||||
type testServer struct {
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestHello implements helloworld.GreeterServer
|
|
||||||
func (s *testServer) Call(ctx context.Context, req *pb.Request, rsp *pb.Response) error {
|
|
||||||
rsp.Msg = "Hello " + req.Uuid
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestHello implements helloworld.GreeterServer
|
|
||||||
func (s *testServer) CallPcre(ctx context.Context, req *pb.Request, rsp *pb.Response) error {
|
|
||||||
rsp.Msg = "Hello " + req.Uuid
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestHello implements helloworld.GreeterServer
|
|
||||||
func (s *testServer) CallPcreInvalid(ctx context.Context, req *pb.Request, rsp *pb.Response) error {
|
|
||||||
rsp.Msg = "Hello " + req.Uuid
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func initial(t *testing.T) (server.Server, client.Client) {
|
|
||||||
r := rmemory.NewRegistry()
|
|
||||||
b := bmemory.NewBroker(broker.Registry(r))
|
|
||||||
|
|
||||||
// create a new client
|
|
||||||
s := gsrv.NewServer(
|
|
||||||
server.Name("foo"),
|
|
||||||
server.Broker(b),
|
|
||||||
server.Registry(r),
|
|
||||||
)
|
|
||||||
|
|
||||||
rtr := regRouter.NewRouter(
|
|
||||||
rt.Registry(r),
|
|
||||||
)
|
|
||||||
|
|
||||||
// create a new server
|
|
||||||
c := gcli.NewClient(
|
|
||||||
client.Router(rtr),
|
|
||||||
client.Broker(b),
|
|
||||||
)
|
|
||||||
|
|
||||||
h := &testServer{}
|
|
||||||
pb.RegisterTestHandler(s, h)
|
|
||||||
|
|
||||||
if err := s.Start(); err != nil {
|
|
||||||
t.Fatalf("failed to start: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return s, c
|
|
||||||
}
|
|
||||||
|
|
||||||
func check(t *testing.T, addr string, path string, expected string) {
|
|
||||||
req, err := http.NewRequest("POST", fmt.Sprintf(path, addr), nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to created http.Request: %v", err)
|
|
||||||
}
|
|
||||||
req.Header.Set("Content-Type", "application/json")
|
|
||||||
rsp, err := (&http.Client{}).Do(req)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to created http.Request: %v", err)
|
|
||||||
}
|
|
||||||
defer rsp.Body.Close()
|
|
||||||
|
|
||||||
buf, err := ioutil.ReadAll(rsp.Body)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
jsonMsg := expected
|
|
||||||
if string(buf) != jsonMsg {
|
|
||||||
t.Fatalf("invalid message received, parsing error %s != %s", buf, jsonMsg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRouterRegistryPcre(t *testing.T) {
|
|
||||||
s, c := initial(t)
|
|
||||||
defer s.Stop()
|
|
||||||
|
|
||||||
router := rregistry.NewRouter(
|
|
||||||
router.WithHandler(rpc.Handler),
|
|
||||||
router.WithRegistry(s.Options().Registry),
|
|
||||||
)
|
|
||||||
hrpc := rpc.NewHandler(
|
|
||||||
handler.WithClient(c),
|
|
||||||
handler.WithRouter(router),
|
|
||||||
)
|
|
||||||
hsrv := &http.Server{
|
|
||||||
Handler: hrpc,
|
|
||||||
Addr: "127.0.0.1:6543",
|
|
||||||
WriteTimeout: 15 * time.Second,
|
|
||||||
ReadTimeout: 15 * time.Second,
|
|
||||||
IdleTimeout: 20 * time.Second,
|
|
||||||
MaxHeaderBytes: 1024 * 1024 * 1, // 1Mb
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
log.Println(hsrv.ListenAndServe())
|
|
||||||
}()
|
|
||||||
|
|
||||||
defer hsrv.Close()
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
check(t, hsrv.Addr, "http://%s/api/v0/test/call/TEST", `{"msg":"Hello TEST"}`)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRouterStaticPcre(t *testing.T) {
|
|
||||||
s, c := initial(t)
|
|
||||||
defer s.Stop()
|
|
||||||
|
|
||||||
router := rstatic.NewRouter(
|
|
||||||
router.WithHandler(rpc.Handler),
|
|
||||||
router.WithRegistry(s.Options().Registry),
|
|
||||||
)
|
|
||||||
|
|
||||||
err := router.Register(&api.Endpoint{
|
|
||||||
Name: "foo.Test.Call",
|
|
||||||
Method: []string{"POST"},
|
|
||||||
Path: []string{"^/api/v0/test/call/?$"},
|
|
||||||
Handler: "rpc",
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
hrpc := rpc.NewHandler(
|
|
||||||
handler.WithClient(c),
|
|
||||||
handler.WithRouter(router),
|
|
||||||
)
|
|
||||||
hsrv := &http.Server{
|
|
||||||
Handler: hrpc,
|
|
||||||
Addr: "127.0.0.1:6543",
|
|
||||||
WriteTimeout: 15 * time.Second,
|
|
||||||
ReadTimeout: 15 * time.Second,
|
|
||||||
IdleTimeout: 20 * time.Second,
|
|
||||||
MaxHeaderBytes: 1024 * 1024 * 1, // 1Mb
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
log.Println(hsrv.ListenAndServe())
|
|
||||||
}()
|
|
||||||
defer hsrv.Close()
|
|
||||||
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
check(t, hsrv.Addr, "http://%s/api/v0/test/call", `{"msg":"Hello "}`)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRouterStaticGpath(t *testing.T) {
|
|
||||||
s, c := initial(t)
|
|
||||||
defer s.Stop()
|
|
||||||
|
|
||||||
router := rstatic.NewRouter(
|
|
||||||
router.WithHandler(rpc.Handler),
|
|
||||||
router.WithRegistry(s.Options().Registry),
|
|
||||||
)
|
|
||||||
|
|
||||||
err := router.Register(&api.Endpoint{
|
|
||||||
Name: "foo.Test.Call",
|
|
||||||
Method: []string{"POST"},
|
|
||||||
Path: []string{"/api/v0/test/call/{uuid}"},
|
|
||||||
Handler: "rpc",
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
hrpc := rpc.NewHandler(
|
|
||||||
handler.WithClient(c),
|
|
||||||
handler.WithRouter(router),
|
|
||||||
)
|
|
||||||
hsrv := &http.Server{
|
|
||||||
Handler: hrpc,
|
|
||||||
Addr: "127.0.0.1:6543",
|
|
||||||
WriteTimeout: 15 * time.Second,
|
|
||||||
ReadTimeout: 15 * time.Second,
|
|
||||||
IdleTimeout: 20 * time.Second,
|
|
||||||
MaxHeaderBytes: 1024 * 1024 * 1, // 1Mb
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
log.Println(hsrv.ListenAndServe())
|
|
||||||
}()
|
|
||||||
defer hsrv.Close()
|
|
||||||
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
check(t, hsrv.Addr, "http://%s/api/v0/test/call/TEST", `{"msg":"Hello TEST"}`)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRouterStaticPcreInvalid(t *testing.T) {
|
|
||||||
var ep *api.Endpoint
|
|
||||||
var err error
|
|
||||||
|
|
||||||
s, c := initial(t)
|
|
||||||
defer s.Stop()
|
|
||||||
|
|
||||||
router := rstatic.NewRouter(
|
|
||||||
router.WithHandler(rpc.Handler),
|
|
||||||
router.WithRegistry(s.Options().Registry),
|
|
||||||
)
|
|
||||||
|
|
||||||
ep = &api.Endpoint{
|
|
||||||
Name: "foo.Test.Call",
|
|
||||||
Method: []string{"POST"},
|
|
||||||
Path: []string{"^/api/v0/test/call/?"},
|
|
||||||
Handler: "rpc",
|
|
||||||
}
|
|
||||||
|
|
||||||
err = router.Register(ep)
|
|
||||||
if err == nil {
|
|
||||||
t.Fatalf("invalid endpoint %v", ep)
|
|
||||||
}
|
|
||||||
|
|
||||||
ep = &api.Endpoint{
|
|
||||||
Name: "foo.Test.Call",
|
|
||||||
Method: []string{"POST"},
|
|
||||||
Path: []string{"/api/v0/test/call/?$"},
|
|
||||||
Handler: "rpc",
|
|
||||||
}
|
|
||||||
|
|
||||||
err = router.Register(ep)
|
|
||||||
if err == nil {
|
|
||||||
t.Fatalf("invalid endpoint %v", ep)
|
|
||||||
}
|
|
||||||
|
|
||||||
_ = c
|
|
||||||
}
|
|
@ -1,356 +0,0 @@
|
|||||||
package static
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"net/http"
|
|
||||||
"regexp"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/unistack-org/micro/v3/api"
|
|
||||||
"github.com/unistack-org/micro/v3/api/router"
|
|
||||||
"github.com/unistack-org/micro/v3/logger"
|
|
||||||
"github.com/unistack-org/micro/v3/metadata"
|
|
||||||
"github.com/unistack-org/micro/v3/registry"
|
|
||||||
rutil "github.com/unistack-org/micro/v3/util/registry"
|
|
||||||
util "github.com/unistack-org/micro/v3/util/router"
|
|
||||||
)
|
|
||||||
|
|
||||||
type endpoint struct {
|
|
||||||
apiep *api.Endpoint
|
|
||||||
hostregs []*regexp.Regexp
|
|
||||||
pathregs []util.Pattern
|
|
||||||
pcreregs []*regexp.Regexp
|
|
||||||
}
|
|
||||||
|
|
||||||
// router is the default router
|
|
||||||
type staticRouter struct {
|
|
||||||
exit chan bool
|
|
||||||
opts router.Options
|
|
||||||
sync.RWMutex
|
|
||||||
eps map[string]*endpoint
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *staticRouter) isClosed() bool {
|
|
||||||
select {
|
|
||||||
case <-r.exit:
|
|
||||||
return true
|
|
||||||
default:
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
// watch for endpoint changes
|
|
||||||
func (r *staticRouter) watch() {
|
|
||||||
var attempts int
|
|
||||||
|
|
||||||
for {
|
|
||||||
if r.isClosed() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// watch for changes
|
|
||||||
w, err := r.opts.Registry.Watch()
|
|
||||||
if err != nil {
|
|
||||||
attempts++
|
|
||||||
log.Println("Error watching endpoints", err)
|
|
||||||
time.Sleep(time.Duration(attempts) * time.Second)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
ch := make(chan bool)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
select {
|
|
||||||
case <-ch:
|
|
||||||
w.Stop()
|
|
||||||
case <-r.exit:
|
|
||||||
w.Stop()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// reset if we get here
|
|
||||||
attempts = 0
|
|
||||||
|
|
||||||
for {
|
|
||||||
// process next event
|
|
||||||
res, err := w.Next()
|
|
||||||
if err != nil {
|
|
||||||
log.Println("Error getting next endpoint", err)
|
|
||||||
close(ch)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
r.process(res)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
func (r *staticRouter) Register(ep *api.Endpoint) error {
|
|
||||||
if err := api.Validate(ep); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var pathregs []util.Pattern
|
|
||||||
var hostregs []*regexp.Regexp
|
|
||||||
var pcreregs []*regexp.Regexp
|
|
||||||
|
|
||||||
for _, h := range ep.Host {
|
|
||||||
if h == "" || h == "*" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
hostreg, err := regexp.CompilePOSIX(h)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
hostregs = append(hostregs, hostreg)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, p := range ep.Path {
|
|
||||||
var pcreok bool
|
|
||||||
|
|
||||||
// pcre only when we have start and end markers
|
|
||||||
if p[0] == '^' && p[len(p)-1] == '$' {
|
|
||||||
pcrereg, err := regexp.CompilePOSIX(p)
|
|
||||||
if err == nil {
|
|
||||||
pcreregs = append(pcreregs, pcrereg)
|
|
||||||
pcreok = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
rule, err := util.Parse(p)
|
|
||||||
if err != nil && !pcreok {
|
|
||||||
return err
|
|
||||||
} else if err != nil && pcreok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
tpl := rule.Compile()
|
|
||||||
pathreg, err := util.NewPattern(tpl.Version, tpl.OpCodes, tpl.Pool, "")
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
pathregs = append(pathregs, pathreg)
|
|
||||||
}
|
|
||||||
|
|
||||||
r.Lock()
|
|
||||||
r.eps[ep.Name] = &endpoint{
|
|
||||||
apiep: ep,
|
|
||||||
pcreregs: pcreregs,
|
|
||||||
pathregs: pathregs,
|
|
||||||
hostregs: hostregs,
|
|
||||||
}
|
|
||||||
r.Unlock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *staticRouter) Deregister(ep *api.Endpoint) error {
|
|
||||||
if err := api.Validate(ep); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
r.Lock()
|
|
||||||
delete(r.eps, ep.Name)
|
|
||||||
r.Unlock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *staticRouter) Options() router.Options {
|
|
||||||
return r.opts
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *staticRouter) Close() error {
|
|
||||||
select {
|
|
||||||
case <-r.exit:
|
|
||||||
return nil
|
|
||||||
default:
|
|
||||||
close(r.exit)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *staticRouter) Endpoint(req *http.Request) (*api.Service, error) {
|
|
||||||
ep, err := r.endpoint(req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
epf := strings.Split(ep.apiep.Name, ".")
|
|
||||||
services, err := r.opts.Registry.GetService(r.opts.Context, epf[0])
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// hack for stream endpoint
|
|
||||||
if ep.apiep.Stream {
|
|
||||||
svcs := rutil.Copy(services)
|
|
||||||
for _, svc := range svcs {
|
|
||||||
if len(svc.Endpoints) == 0 {
|
|
||||||
e := ®istry.Endpoint{}
|
|
||||||
e.Name = strings.Join(epf[1:], ".")
|
|
||||||
e.Metadata = make(map[string]string)
|
|
||||||
e.Metadata["stream"] = "true"
|
|
||||||
svc.Endpoints = append(svc.Endpoints, e)
|
|
||||||
}
|
|
||||||
for _, e := range svc.Endpoints {
|
|
||||||
e.Name = strings.Join(epf[1:], ".")
|
|
||||||
e.Metadata = make(map[string]string)
|
|
||||||
e.Metadata["stream"] = "true"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
services = svcs
|
|
||||||
}
|
|
||||||
|
|
||||||
svc := &api.Service{
|
|
||||||
Name: epf[0],
|
|
||||||
Endpoint: &api.Endpoint{
|
|
||||||
Name: strings.Join(epf[1:], "."),
|
|
||||||
Handler: "rpc",
|
|
||||||
Host: ep.apiep.Host,
|
|
||||||
Method: ep.apiep.Method,
|
|
||||||
Path: ep.apiep.Path,
|
|
||||||
Body: ep.apiep.Body,
|
|
||||||
Stream: ep.apiep.Stream,
|
|
||||||
},
|
|
||||||
Services: services,
|
|
||||||
}
|
|
||||||
|
|
||||||
return svc, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *staticRouter) endpoint(req *http.Request) (*endpoint, error) {
|
|
||||||
if r.isClosed() {
|
|
||||||
return nil, errors.New("router closed")
|
|
||||||
}
|
|
||||||
|
|
||||||
r.RLock()
|
|
||||||
defer r.RUnlock()
|
|
||||||
|
|
||||||
var idx int
|
|
||||||
if len(req.URL.Path) > 0 && req.URL.Path != "/" {
|
|
||||||
idx = 1
|
|
||||||
}
|
|
||||||
path := strings.Split(req.URL.Path[idx:], "/")
|
|
||||||
// use the first match
|
|
||||||
// TODO: weighted matching
|
|
||||||
|
|
||||||
for _, ep := range r.eps {
|
|
||||||
var mMatch, hMatch, pMatch bool
|
|
||||||
|
|
||||||
// 1. try method
|
|
||||||
for _, m := range ep.apiep.Method {
|
|
||||||
if m == req.Method {
|
|
||||||
mMatch = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !mMatch {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if logger.V(logger.TraceLevel) {
|
|
||||||
logger.Tracef("api method match %s", req.Method)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2. try host
|
|
||||||
if len(ep.apiep.Host) == 0 {
|
|
||||||
hMatch = true
|
|
||||||
} else {
|
|
||||||
for idx, h := range ep.apiep.Host {
|
|
||||||
if h == "" || h == "*" {
|
|
||||||
hMatch = true
|
|
||||||
break
|
|
||||||
} else {
|
|
||||||
if ep.hostregs[idx].MatchString(req.URL.Host) {
|
|
||||||
hMatch = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !hMatch {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if logger.V(logger.TraceLevel) {
|
|
||||||
logger.Tracef("api host match %s", req.URL.Host)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3. try google.api path
|
|
||||||
for _, pathreg := range ep.pathregs {
|
|
||||||
matches, err := pathreg.Match(path, "")
|
|
||||||
if err != nil {
|
|
||||||
if logger.V(logger.TraceLevel) {
|
|
||||||
logger.Tracef("api gpath not match %s != %v", path, pathreg)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if logger.V(logger.TraceLevel) {
|
|
||||||
logger.Tracef("api gpath match %s = %v", path, pathreg)
|
|
||||||
}
|
|
||||||
pMatch = true
|
|
||||||
ctx := req.Context()
|
|
||||||
md, ok := metadata.FromContext(ctx)
|
|
||||||
if !ok {
|
|
||||||
md = make(metadata.Metadata)
|
|
||||||
}
|
|
||||||
for k, v := range matches {
|
|
||||||
md[fmt.Sprintf("x-api-field-%s", k)] = v
|
|
||||||
}
|
|
||||||
md["x-api-body"] = ep.apiep.Body
|
|
||||||
*req = *req.Clone(metadata.NewContext(ctx, md))
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if !pMatch {
|
|
||||||
// 4. try path via pcre path matching
|
|
||||||
for _, pathreg := range ep.pcreregs {
|
|
||||||
if !pathreg.MatchString(req.URL.Path) {
|
|
||||||
if logger.V(logger.TraceLevel) {
|
|
||||||
logger.Tracef("api pcre path not match %s != %v", req.URL.Path, pathreg)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
pMatch = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !pMatch {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// TODO: Percentage traffic
|
|
||||||
|
|
||||||
// we got here, so its a match
|
|
||||||
return ep, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// no match
|
|
||||||
return nil, fmt.Errorf("endpoint not found for %v", req.URL)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *staticRouter) Route(req *http.Request) (*api.Service, error) {
|
|
||||||
if r.isClosed() {
|
|
||||||
return nil, errors.New("router closed")
|
|
||||||
}
|
|
||||||
|
|
||||||
// try get an endpoint
|
|
||||||
ep, err := r.Endpoint(req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return ep, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewRouter(opts ...router.Option) *staticRouter {
|
|
||||||
options := router.NewOptions(opts...)
|
|
||||||
r := &staticRouter{
|
|
||||||
exit: make(chan bool),
|
|
||||||
opts: options,
|
|
||||||
eps: make(map[string]*endpoint),
|
|
||||||
}
|
|
||||||
//go r.watch()
|
|
||||||
//go r.refresh()
|
|
||||||
return r
|
|
||||||
}
|
|
@ -36,7 +36,7 @@ func (a *autocertProvider) TLSConfig(hosts ...string) (*tls.Config, error) {
|
|||||||
dir := cacheDir()
|
dir := cacheDir()
|
||||||
if err := os.MkdirAll(dir, 0700); err != nil {
|
if err := os.MkdirAll(dir, 0700); err != nil {
|
||||||
if logger.V(logger.InfoLevel) {
|
if logger.V(logger.InfoLevel) {
|
||||||
logger.Infof("warning: autocert not using a cache: %v", err)
|
logger.Info("warning: autocert not using a cache: %v", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
m.Cache = autocert.DirCache(dir)
|
m.Cache = autocert.DirCache(dir)
|
||||||
|
@ -15,7 +15,7 @@ type httpServer struct {
|
|||||||
mux *http.ServeMux
|
mux *http.ServeMux
|
||||||
opts server.Options
|
opts server.Options
|
||||||
|
|
||||||
mtx sync.RWMutex
|
sync.RWMutex
|
||||||
address string
|
address string
|
||||||
exit chan chan error
|
exit chan chan error
|
||||||
}
|
}
|
||||||
@ -30,8 +30,8 @@ func NewServer(address string, opts ...server.Option) server.Server {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *httpServer) Address() string {
|
func (s *httpServer) Address() string {
|
||||||
s.mtx.RLock()
|
s.RLock()
|
||||||
defer s.mtx.RUnlock()
|
defer s.RUnlock()
|
||||||
return s.address
|
return s.address
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -57,6 +57,9 @@ func (s *httpServer) Start() error {
|
|||||||
var l net.Listener
|
var l net.Listener
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
s.RLock()
|
||||||
|
config := s.opts
|
||||||
|
s.RUnlock()
|
||||||
if s.opts.EnableACME && s.opts.ACMEProvider != nil {
|
if s.opts.EnableACME && s.opts.ACMEProvider != nil {
|
||||||
// should we check the address to make sure its using :443?
|
// should we check the address to make sure its using :443?
|
||||||
l, err = s.opts.ACMEProvider.Listen(s.opts.ACMEHosts...)
|
l, err = s.opts.ACMEProvider.Listen(s.opts.ACMEHosts...)
|
||||||
@ -70,19 +73,19 @@ func (s *httpServer) Start() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if logger.V(logger.InfoLevel) {
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
logger.Infof("HTTP API Listening on %s", l.Addr().String())
|
config.Logger.Info("HTTP API Listening on %s", l.Addr().String())
|
||||||
}
|
}
|
||||||
|
|
||||||
s.mtx.Lock()
|
s.Lock()
|
||||||
s.address = l.Addr().String()
|
s.address = l.Addr().String()
|
||||||
s.mtx.Unlock()
|
s.Unlock()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
if err := http.Serve(l, s.mux); err != nil {
|
if err := http.Serve(l, s.mux); err != nil {
|
||||||
// temporary fix
|
// temporary fix
|
||||||
if logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
logger.Errorf("serve err: %v", err)
|
config.Logger.Error("serve err: %v", err)
|
||||||
}
|
}
|
||||||
s.Stop()
|
s.Stop()
|
||||||
}
|
}
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
|
|
||||||
"github.com/unistack-org/micro/v3/api/resolver"
|
"github.com/unistack-org/micro/v3/api/resolver"
|
||||||
"github.com/unistack-org/micro/v3/api/server/acme"
|
"github.com/unistack-org/micro/v3/api/server/acme"
|
||||||
|
"github.com/unistack-org/micro/v3/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Option func
|
// Option func
|
||||||
@ -21,11 +22,14 @@ type Options struct {
|
|||||||
TLSConfig *tls.Config
|
TLSConfig *tls.Config
|
||||||
Resolver resolver.Resolver
|
Resolver resolver.Resolver
|
||||||
Wrappers []Wrapper
|
Wrappers []Wrapper
|
||||||
|
Logger logger.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOptions returns new Options
|
// NewOptions returns new Options
|
||||||
func NewOptions(opts ...Option) Options {
|
func NewOptions(opts ...Option) Options {
|
||||||
options := Options{}
|
options := Options{
|
||||||
|
Logger: logger.DefaultLogger,
|
||||||
|
}
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&options)
|
o(&options)
|
||||||
}
|
}
|
||||||
@ -81,3 +85,9 @@ func Resolver(r resolver.Resolver) Option {
|
|||||||
o.Resolver = r
|
o.Resolver = r
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Logger(l logger.Logger) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.Logger = l
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -162,7 +162,7 @@ func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOpti
|
|||||||
|
|
||||||
md, ok := metadata.FromContext(ctx)
|
md, ok := metadata.FromContext(ctx)
|
||||||
if !ok {
|
if !ok {
|
||||||
md = make(map[string]string)
|
md = metadata.New(0)
|
||||||
}
|
}
|
||||||
md["Content-Type"] = p.ContentType()
|
md["Content-Type"] = p.ContentType()
|
||||||
md["Micro-Topic"] = p.Topic()
|
md["Micro-Topic"] = p.Topic()
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/unistack-org/micro/v3/debug/log"
|
"github.com/unistack-org/micro/v3/debug/log"
|
||||||
|
"github.com/unistack-org/micro/v3/metadata"
|
||||||
"github.com/unistack-org/micro/v3/util/ring"
|
"github.com/unistack-org/micro/v3/util/ring"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -94,7 +95,7 @@ func (l *memoryLog) Stream() (log.Stream, error) {
|
|||||||
records <- log.Record{
|
records <- log.Record{
|
||||||
Timestamp: entry.Timestamp,
|
Timestamp: entry.Timestamp,
|
||||||
Message: entry.Value,
|
Message: entry.Value,
|
||||||
Metadata: make(map[string]string),
|
Metadata: metadata.New(0),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// now stream continuously
|
// now stream continuously
|
||||||
@ -102,7 +103,7 @@ func (l *memoryLog) Stream() (log.Stream, error) {
|
|||||||
records <- log.Record{
|
records <- log.Record{
|
||||||
Timestamp: entry.Timestamp,
|
Timestamp: entry.Timestamp,
|
||||||
Message: entry.Value,
|
Message: entry.Value,
|
||||||
Metadata: make(map[string]string),
|
Metadata: metadata.New(0),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
114
logger/helper.go
114
logger/helper.go
@ -1,114 +0,0 @@
|
|||||||
package logger
|
|
||||||
|
|
||||||
import (
|
|
||||||
"os"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Helper struct {
|
|
||||||
Logger
|
|
||||||
fields map[string]interface{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewHelper(log Logger) *Helper {
|
|
||||||
return &Helper{Logger: log}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Helper) Info(args ...interface{}) {
|
|
||||||
if !h.Logger.Options().Level.Enabled(InfoLevel) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
h.Logger.Fields(h.fields).Log(InfoLevel, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Helper) Infof(template string, args ...interface{}) {
|
|
||||||
if !h.Logger.Options().Level.Enabled(InfoLevel) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
h.Logger.Fields(h.fields).Logf(InfoLevel, template, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Helper) Trace(args ...interface{}) {
|
|
||||||
if !h.Logger.Options().Level.Enabled(TraceLevel) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
h.Logger.Fields(h.fields).Log(TraceLevel, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Helper) Tracef(template string, args ...interface{}) {
|
|
||||||
if !h.Logger.Options().Level.Enabled(TraceLevel) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
h.Logger.Fields(h.fields).Logf(TraceLevel, template, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Helper) Debug(args ...interface{}) {
|
|
||||||
if !h.Logger.Options().Level.Enabled(DebugLevel) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
h.Logger.Fields(h.fields).Log(DebugLevel, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Helper) Debugf(template string, args ...interface{}) {
|
|
||||||
if !h.Logger.Options().Level.Enabled(DebugLevel) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
h.Logger.Fields(h.fields).Logf(DebugLevel, template, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Helper) Warn(args ...interface{}) {
|
|
||||||
if !h.Logger.Options().Level.Enabled(WarnLevel) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
h.Logger.Fields(h.fields).Log(WarnLevel, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Helper) Warnf(template string, args ...interface{}) {
|
|
||||||
if !h.Logger.Options().Level.Enabled(WarnLevel) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
h.Logger.Fields(h.fields).Logf(WarnLevel, template, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Helper) Error(args ...interface{}) {
|
|
||||||
if !h.Logger.Options().Level.Enabled(ErrorLevel) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
h.Logger.Fields(h.fields).Log(ErrorLevel, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Helper) Errorf(template string, args ...interface{}) {
|
|
||||||
if !h.Logger.Options().Level.Enabled(ErrorLevel) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
h.Logger.Fields(h.fields).Logf(ErrorLevel, template, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Helper) Fatal(args ...interface{}) {
|
|
||||||
if !h.Logger.Options().Level.Enabled(FatalLevel) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
h.Logger.Fields(h.fields).Log(FatalLevel, args...)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Helper) Fatalf(template string, args ...interface{}) {
|
|
||||||
if !h.Logger.Options().Level.Enabled(FatalLevel) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
h.Logger.Fields(h.fields).Logf(FatalLevel, template, args...)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Helper) WithError(err error) *Helper {
|
|
||||||
fields := copyFields(h.fields)
|
|
||||||
fields["error"] = err
|
|
||||||
return &Helper{Logger: h.Logger, fields: fields}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Helper) WithFields(fields map[string]interface{}) *Helper {
|
|
||||||
nfields := copyFields(fields)
|
|
||||||
for k, v := range h.fields {
|
|
||||||
nfields[k] = v
|
|
||||||
}
|
|
||||||
return &Helper{Logger: h.Logger, fields: nfields}
|
|
||||||
}
|
|
@ -2,7 +2,6 @@ package logger
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Level int8
|
type Level int8
|
||||||
@ -19,7 +18,7 @@ const (
|
|||||||
WarnLevel
|
WarnLevel
|
||||||
// ErrorLevel level. Logs. Used for errors that should definitely be noted.
|
// ErrorLevel level. Logs. Used for errors that should definitely be noted.
|
||||||
ErrorLevel
|
ErrorLevel
|
||||||
// FatalLevel level. Logs and then calls `logger.Exit(1)`. highest level of severity.
|
// FatalLevel level. Logs and then calls `os.Exit(1)`. highest level of severity.
|
||||||
FatalLevel
|
FatalLevel
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -63,60 +62,5 @@ func GetLevel(levelStr string) (Level, error) {
|
|||||||
case FatalLevel.String():
|
case FatalLevel.String():
|
||||||
return FatalLevel, nil
|
return FatalLevel, nil
|
||||||
}
|
}
|
||||||
return InfoLevel, fmt.Errorf("Unknown Level String: '%s', defaulting to InfoLevel", levelStr)
|
return InfoLevel, fmt.Errorf("unknown Level String: '%s', use InfoLevel", levelStr)
|
||||||
}
|
|
||||||
|
|
||||||
func Info(args ...interface{}) {
|
|
||||||
DefaultLogger.Log(InfoLevel, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Infof(template string, args ...interface{}) {
|
|
||||||
DefaultLogger.Logf(InfoLevel, template, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Trace(args ...interface{}) {
|
|
||||||
DefaultLogger.Log(TraceLevel, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Tracef(template string, args ...interface{}) {
|
|
||||||
DefaultLogger.Logf(TraceLevel, template, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Debug(args ...interface{}) {
|
|
||||||
DefaultLogger.Log(DebugLevel, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Debugf(template string, args ...interface{}) {
|
|
||||||
DefaultLogger.Logf(DebugLevel, template, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Warn(args ...interface{}) {
|
|
||||||
DefaultLogger.Log(WarnLevel, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Warnf(template string, args ...interface{}) {
|
|
||||||
DefaultLogger.Logf(WarnLevel, template, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Error(args ...interface{}) {
|
|
||||||
DefaultLogger.Log(ErrorLevel, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Errorf(template string, args ...interface{}) {
|
|
||||||
DefaultLogger.Logf(ErrorLevel, template, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Fatal(args ...interface{}) {
|
|
||||||
DefaultLogger.Log(FatalLevel, args...)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Fatalf(template string, args ...interface{}) {
|
|
||||||
DefaultLogger.Logf(FatalLevel, template, args...)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Returns true if the given level is at or lower the current logger level
|
|
||||||
func V(lvl Level) bool {
|
|
||||||
return DefaultLogger.Options().Level <= lvl
|
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,7 @@ package logger
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
// DefaultLogger variable
|
// DefaultLogger variable
|
||||||
DefaultLogger Logger = NewHelper(NewLogger())
|
DefaultLogger Logger = NewLogger()
|
||||||
)
|
)
|
||||||
|
|
||||||
// Logger is a generic logging interface
|
// Logger is a generic logging interface
|
||||||
@ -16,14 +16,50 @@ type Logger interface {
|
|||||||
Options() Options
|
Options() Options
|
||||||
// Fields set fields to always be logged
|
// Fields set fields to always be logged
|
||||||
Fields(fields map[string]interface{}) Logger
|
Fields(fields map[string]interface{}) Logger
|
||||||
// Log writes a log entry
|
// Info level message
|
||||||
Log(level Level, v ...interface{})
|
Info(msg string, args ...interface{})
|
||||||
// Logf writes a formatted log entry
|
// Trace level message
|
||||||
Logf(level Level, format string, v ...interface{})
|
Trace(msg string, args ...interface{})
|
||||||
|
// Debug level message
|
||||||
|
Debug(msg string, args ...interface{})
|
||||||
|
// Warn level message
|
||||||
|
Warn(msg string, args ...interface{})
|
||||||
|
// Error level message
|
||||||
|
Error(msg string, args ...interface{})
|
||||||
|
// Fatal level message
|
||||||
|
Fatal(msg string, args ...interface{})
|
||||||
// String returns the name of logger
|
// String returns the name of logger
|
||||||
String() string
|
String() string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Info(msg string, args ...interface{}) {
|
||||||
|
DefaultLogger.Info(msg, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Error(msg string, args ...interface{}) {
|
||||||
|
DefaultLogger.Error(msg, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Debug(msg string, args ...interface{}) {
|
||||||
|
DefaultLogger.Debug(msg, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Warn(msg string, args ...interface{}) {
|
||||||
|
DefaultLogger.Warn(msg, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Trace(msg string, args ...interface{}) {
|
||||||
|
DefaultLogger.Trace(msg, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Fatal(msg string, args ...interface{}) {
|
||||||
|
DefaultLogger.Fatal(msg, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func V(level Level) bool {
|
||||||
|
return DefaultLogger.V(level)
|
||||||
|
}
|
||||||
|
|
||||||
// Init initialize logger
|
// Init initialize logger
|
||||||
func Init(opts ...Option) error {
|
func Init(opts ...Option) error {
|
||||||
return DefaultLogger.Init(opts...)
|
return DefaultLogger.Init(opts...)
|
||||||
@ -33,18 +69,3 @@ func Init(opts ...Option) error {
|
|||||||
func Fields(fields map[string]interface{}) Logger {
|
func Fields(fields map[string]interface{}) Logger {
|
||||||
return DefaultLogger.Fields(fields)
|
return DefaultLogger.Fields(fields)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Log writes log with specific level
|
|
||||||
func Log(level Level, v ...interface{}) {
|
|
||||||
DefaultLogger.Log(level, v...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Logf writes formatted log with specific level
|
|
||||||
func Logf(level Level, format string, v ...interface{}) {
|
|
||||||
DefaultLogger.Logf(level, format, v...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// String return logger name
|
|
||||||
func String() string {
|
|
||||||
return DefaultLogger.String()
|
|
||||||
}
|
|
||||||
|
@ -6,13 +6,10 @@ import (
|
|||||||
|
|
||||||
func TestLogger(t *testing.T) {
|
func TestLogger(t *testing.T) {
|
||||||
l := NewLogger(WithLevel(TraceLevel))
|
l := NewLogger(WithLevel(TraceLevel))
|
||||||
h1 := NewHelper(l).WithFields(map[string]interface{}{"key1": "val1"})
|
if err := l.Init(); err != nil {
|
||||||
h1.Trace("trace_msg1")
|
t.Fatal(err)
|
||||||
h1.Warn("warn_msg1")
|
}
|
||||||
|
l.Trace("trace_msg1")
|
||||||
h2 := NewHelper(l).WithFields(map[string]interface{}{"key2": "val2"})
|
l.Warn("warn_msg1")
|
||||||
h2.Trace("trace_msg2")
|
l.Fields(map[string]interface{}{"error": "test"}).Info("error message")
|
||||||
h2.Warn("warn_msg2")
|
|
||||||
|
|
||||||
l.Fields(map[string]interface{}{"key3": "val4"}).Log(InfoLevel, "test_msg")
|
|
||||||
}
|
}
|
||||||
|
@ -1,16 +1,13 @@
|
|||||||
package logger
|
package logger
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sort"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
dlog "github.com/unistack-org/micro/v3/debug/log"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -19,31 +16,33 @@ func init() {
|
|||||||
lvl = InfoLevel
|
lvl = InfoLevel
|
||||||
}
|
}
|
||||||
|
|
||||||
DefaultLogger = NewHelper(NewLogger(WithLevel(lvl)))
|
DefaultLogger = NewLogger(WithLevel(lvl))
|
||||||
}
|
}
|
||||||
|
|
||||||
type defaultLogger struct {
|
type defaultLogger struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
opts Options
|
opts Options
|
||||||
|
enc *json.Encoder
|
||||||
}
|
}
|
||||||
|
|
||||||
// Init(opts...) should only overwrite provided options
|
// Init(opts...) should only overwrite provided options
|
||||||
func (l *defaultLogger) Init(opts ...Option) error {
|
func (l *defaultLogger) Init(opts ...Option) error {
|
||||||
|
l.Lock()
|
||||||
|
defer l.Unlock()
|
||||||
|
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&l.opts)
|
o(&l.opts)
|
||||||
}
|
}
|
||||||
|
l.enc = json.NewEncoder(l.opts.Out)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *defaultLogger) String() string {
|
func (l *defaultLogger) String() string {
|
||||||
return "default"
|
return "micro"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *defaultLogger) V(level Level) bool {
|
func (l *defaultLogger) V(level Level) bool {
|
||||||
if l.opts.Level.Enabled(level) {
|
return l.opts.Level.Enabled(level)
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *defaultLogger) Fields(fields map[string]interface{}) Logger {
|
func (l *defaultLogger) Fields(fields map[string]interface{}) Logger {
|
||||||
@ -85,45 +84,32 @@ func logCallerfilePath(loggingFilePath string) string {
|
|||||||
return loggingFilePath[idx+1:]
|
return loggingFilePath[idx+1:]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *defaultLogger) Log(level Level, v ...interface{}) {
|
func (l *defaultLogger) Info(msg string, args ...interface{}) {
|
||||||
if !l.V(level) {
|
l.log(InfoLevel, msg, args...)
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
l.RLock()
|
|
||||||
fields := copyFields(l.opts.Fields)
|
|
||||||
l.RUnlock()
|
|
||||||
|
|
||||||
fields["level"] = level.String()
|
|
||||||
|
|
||||||
if _, file, line, ok := runtime.Caller(l.opts.CallerSkipCount); ok {
|
|
||||||
fields["file"] = fmt.Sprintf("%s:%d", logCallerfilePath(file), line)
|
|
||||||
}
|
|
||||||
|
|
||||||
rec := dlog.Record{
|
|
||||||
Timestamp: time.Now(),
|
|
||||||
Message: fmt.Sprint(v...),
|
|
||||||
Metadata: make(map[string]string, len(fields)),
|
|
||||||
}
|
|
||||||
|
|
||||||
keys := make([]string, 0, len(fields))
|
|
||||||
for k, v := range fields {
|
|
||||||
keys = append(keys, k)
|
|
||||||
rec.Metadata[k] = fmt.Sprintf("%v", v)
|
|
||||||
}
|
|
||||||
|
|
||||||
sort.Strings(keys)
|
|
||||||
metadata := ""
|
|
||||||
|
|
||||||
for _, k := range keys {
|
|
||||||
metadata += fmt.Sprintf(" %s=%v", k, fields[k])
|
|
||||||
}
|
|
||||||
|
|
||||||
t := rec.Timestamp.Format("2006-01-02 15:04:05")
|
|
||||||
fmt.Printf("%s %s %v\n", t, metadata, rec.Message)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *defaultLogger) Logf(level Level, format string, v ...interface{}) {
|
func (l *defaultLogger) Error(msg string, args ...interface{}) {
|
||||||
|
l.log(ErrorLevel, msg, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *defaultLogger) Debug(msg string, args ...interface{}) {
|
||||||
|
l.log(DebugLevel, msg, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *defaultLogger) Warn(msg string, args ...interface{}) {
|
||||||
|
l.log(WarnLevel, msg, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *defaultLogger) Trace(msg string, args ...interface{}) {
|
||||||
|
l.log(TraceLevel, msg, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *defaultLogger) Fatal(msg string, args ...interface{}) {
|
||||||
|
l.log(FatalLevel, msg, args...)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *defaultLogger) log(level Level, msg string, args ...interface{}) {
|
||||||
if !l.V(level) {
|
if !l.V(level) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -135,30 +121,20 @@ func (l *defaultLogger) Logf(level Level, format string, v ...interface{}) {
|
|||||||
fields["level"] = level.String()
|
fields["level"] = level.String()
|
||||||
|
|
||||||
if _, file, line, ok := runtime.Caller(l.opts.CallerSkipCount); ok {
|
if _, file, line, ok := runtime.Caller(l.opts.CallerSkipCount); ok {
|
||||||
fields["file"] = fmt.Sprintf("%s:%d", logCallerfilePath(file), line)
|
fields["caller"] = fmt.Sprintf("%s:%d", logCallerfilePath(file), line)
|
||||||
}
|
}
|
||||||
|
|
||||||
rec := dlog.Record{
|
fields["timestamp"] = time.Now().Format("2006-01-02 15:04:05")
|
||||||
Timestamp: time.Now(),
|
if len(msg) > 0 {
|
||||||
Message: fmt.Sprintf(format, v...),
|
if len(args) > 0 {
|
||||||
Metadata: make(map[string]string, len(fields)),
|
fields["msg"] = fmt.Sprintf(msg, args...)
|
||||||
|
} else {
|
||||||
|
fields["msg"] = msg
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
l.RLock()
|
||||||
keys := make([]string, 0, len(fields))
|
_ = l.enc.Encode(fields)
|
||||||
for k, v := range fields {
|
l.RUnlock()
|
||||||
keys = append(keys, k)
|
|
||||||
rec.Metadata[k] = fmt.Sprintf("%v", v)
|
|
||||||
}
|
|
||||||
|
|
||||||
sort.Strings(keys)
|
|
||||||
metadata := ""
|
|
||||||
|
|
||||||
for _, k := range keys {
|
|
||||||
metadata += fmt.Sprintf(" %s=%v", k, fields[k])
|
|
||||||
}
|
|
||||||
|
|
||||||
t := rec.Timestamp.Format("2006-01-02 15:04:05")
|
|
||||||
fmt.Printf("%s %s %v\n", t, metadata, rec.Message)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *defaultLogger) Options() Options {
|
func (l *defaultLogger) Options() Options {
|
||||||
@ -172,19 +148,7 @@ func (l *defaultLogger) Options() Options {
|
|||||||
|
|
||||||
// NewLogger builds a new logger based on options
|
// NewLogger builds a new logger based on options
|
||||||
func NewLogger(opts ...Option) Logger {
|
func NewLogger(opts ...Option) Logger {
|
||||||
// Default options
|
l := &defaultLogger{opts: NewOptions(opts...)}
|
||||||
options := Options{
|
l.enc = json.NewEncoder(l.opts.Out)
|
||||||
Level: InfoLevel,
|
|
||||||
Fields: make(map[string]interface{}),
|
|
||||||
Out: os.Stderr,
|
|
||||||
CallerSkipCount: 2,
|
|
||||||
Context: context.Background(),
|
|
||||||
}
|
|
||||||
|
|
||||||
l := &defaultLogger{opts: options}
|
|
||||||
if err := l.Init(opts...); err != nil {
|
|
||||||
l.Log(FatalLevel, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return l
|
return l
|
||||||
}
|
}
|
@ -3,6 +3,7 @@ package logger
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
|
"os"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Option func(*Options)
|
type Option func(*Options)
|
||||||
@ -21,7 +22,13 @@ type Options struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewOptions(opts ...Option) Options {
|
func NewOptions(opts ...Option) Options {
|
||||||
options := Options{}
|
options := Options{
|
||||||
|
Level: InfoLevel,
|
||||||
|
Fields: make(map[string]interface{}),
|
||||||
|
Out: os.Stderr,
|
||||||
|
CallerSkipCount: 2,
|
||||||
|
Context: context.Background(),
|
||||||
|
}
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&options)
|
o(&options)
|
||||||
}
|
}
|
||||||
|
@ -113,11 +113,11 @@ func (t *tunSubscriber) run() {
|
|||||||
m := new(transport.Message)
|
m := new(transport.Message)
|
||||||
if err := c.Recv(m); err != nil {
|
if err := c.Recv(m); err != nil {
|
||||||
if logger.V(logger.ErrorLevel) {
|
if logger.V(logger.ErrorLevel) {
|
||||||
logger.Error(err)
|
logger.Error(err.Error())
|
||||||
}
|
}
|
||||||
if err = c.Close(); err != nil {
|
if err = c.Close(); err != nil {
|
||||||
if logger.V(logger.ErrorLevel) {
|
if logger.V(logger.ErrorLevel) {
|
||||||
logger.Error(err)
|
logger.Error(err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
|
@ -4,6 +4,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/unistack-org/micro/v3/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Extract *Value from reflect.Type
|
// Extract *Value from reflect.Type
|
||||||
@ -94,7 +96,7 @@ func ExtractEndpoint(method reflect.Method) *Endpoint {
|
|||||||
Name: method.Name,
|
Name: method.Name,
|
||||||
Request: request,
|
Request: request,
|
||||||
Response: response,
|
Response: response,
|
||||||
Metadata: make(map[string]string),
|
Metadata: metadata.New(0),
|
||||||
}
|
}
|
||||||
|
|
||||||
if stream {
|
if stream {
|
||||||
|
@ -187,8 +187,8 @@ func (n *noopServer) Register() error {
|
|||||||
n.RUnlock()
|
n.RUnlock()
|
||||||
|
|
||||||
if !registered {
|
if !registered {
|
||||||
if logger.V(logger.InfoLevel) {
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
logger.Infof("Registry [%s] Registering node: %s", config.Registry.String(), service.Nodes[0].Id)
|
config.Logger.Info("Registry [%s] Registering node: %s", config.Registry.String(), service.Nodes[0].Id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -220,8 +220,8 @@ func (n *noopServer) Register() error {
|
|||||||
|
|
||||||
opts = append(opts, broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck))
|
opts = append(opts, broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck))
|
||||||
|
|
||||||
if logger.V(logger.InfoLevel) {
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
logger.Infof("Subscribing to topic: %s", sb.Topic())
|
config.Logger.Info("Subscribing to topic: %s", sb.Topic())
|
||||||
}
|
}
|
||||||
sub, err := config.Broker.Subscribe(cx, sb.Topic(), handler, opts...)
|
sub, err := config.Broker.Subscribe(cx, sb.Topic(), handler, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -250,8 +250,8 @@ func (n *noopServer) Deregister() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if logger.V(logger.InfoLevel) {
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
logger.Infof("Deregistering node: %s", service.Nodes[0].Id)
|
config.Logger.Info("deregistering node: %s", service.Nodes[0].Id)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := DefaultDeregisterFunc(service, config); err != nil {
|
if err := DefaultDeregisterFunc(service, config); err != nil {
|
||||||
@ -280,12 +280,12 @@ func (n *noopServer) Deregister() error {
|
|||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(s broker.Subscriber) {
|
go func(s broker.Subscriber) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
if logger.V(logger.InfoLevel) {
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
logger.Infof("Unsubscribing from topic: %s", s.Topic())
|
config.Logger.Info("unsubscribing from topic: %s", s.Topic())
|
||||||
}
|
}
|
||||||
if err := s.Unsubscribe(cx); err != nil {
|
if err := s.Unsubscribe(cx); err != nil {
|
||||||
if logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
logger.Errorf("Unsubscribing from topic: %s err: %v", s.Topic(), err)
|
config.Logger.Error("unsubscribing from topic: %s err: %v", s.Topic(), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}(sub)
|
}(sub)
|
||||||
@ -307,8 +307,8 @@ func (n *noopServer) Start() error {
|
|||||||
config := n.Options()
|
config := n.Options()
|
||||||
n.RUnlock()
|
n.RUnlock()
|
||||||
|
|
||||||
if logger.V(logger.InfoLevel) {
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
logger.Infof("Server [noop] Listening on %s", config.Address)
|
config.Logger.Info("Server [noop] Listening on %s", config.Address)
|
||||||
}
|
}
|
||||||
n.Lock()
|
n.Lock()
|
||||||
if len(config.Advertise) == 0 {
|
if len(config.Advertise) == 0 {
|
||||||
@ -320,27 +320,27 @@ func (n *noopServer) Start() error {
|
|||||||
if len(n.subscribers) > 0 {
|
if len(n.subscribers) > 0 {
|
||||||
// connect to the broker
|
// connect to the broker
|
||||||
if err := config.Broker.Connect(config.Context); err != nil {
|
if err := config.Broker.Connect(config.Context); err != nil {
|
||||||
if logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
logger.Errorf("Broker [%s] connect error: %v", config.Broker.String(), err)
|
config.Logger.Error("Broker [%s] connect error: %v", config.Broker.String(), err)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if logger.V(logger.InfoLevel) {
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
logger.Infof("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
|
config.Logger.Info("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// use RegisterCheck func before register
|
// use RegisterCheck func before register
|
||||||
if err := config.RegisterCheck(config.Context); err != nil {
|
if err := config.RegisterCheck(config.Context); err != nil {
|
||||||
if logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err)
|
config.Logger.Error("Server %s-%s register check error: %s", config.Name, config.Id, err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// announce self to the world
|
// announce self to the world
|
||||||
if err := n.Register(); err != nil {
|
if err := n.Register(); err != nil {
|
||||||
if logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
logger.Errorf("Server register error: %v", err)
|
config.Logger.Error("Server register error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -367,24 +367,24 @@ func (n *noopServer) Start() error {
|
|||||||
n.RUnlock()
|
n.RUnlock()
|
||||||
rerr := config.RegisterCheck(config.Context)
|
rerr := config.RegisterCheck(config.Context)
|
||||||
if rerr != nil && registered {
|
if rerr != nil && registered {
|
||||||
if logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
logger.Errorf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr)
|
config.Logger.Error("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr)
|
||||||
}
|
}
|
||||||
// deregister self in case of error
|
// deregister self in case of error
|
||||||
if err := n.Deregister(); err != nil {
|
if err := n.Deregister(); err != nil {
|
||||||
if logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
logger.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
|
config.Logger.Error("Server %s-%s deregister error: %s", config.Name, config.Id, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if rerr != nil && !registered {
|
} else if rerr != nil && !registered {
|
||||||
if logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, rerr)
|
config.Logger.Error("Server %s-%s register check error: %s", config.Name, config.Id, rerr)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := n.Register(); err != nil {
|
if err := n.Register(); err != nil {
|
||||||
if logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
logger.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err)
|
config.Logger.Error("Server %s-%s register error: %s", config.Name, config.Id, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// wait for exit
|
// wait for exit
|
||||||
@ -395,8 +395,8 @@ func (n *noopServer) Start() error {
|
|||||||
|
|
||||||
// deregister self
|
// deregister self
|
||||||
if err := n.Deregister(); err != nil {
|
if err := n.Deregister(); err != nil {
|
||||||
if logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
logger.Error("Server deregister error: ", err)
|
config.Logger.Error("Server deregister error: ", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -419,13 +419,13 @@ func (n *noopServer) Start() error {
|
|||||||
// close transport
|
// close transport
|
||||||
ch <- nil
|
ch <- nil
|
||||||
|
|
||||||
if logger.V(logger.InfoLevel) {
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
logger.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
|
config.Logger.Info("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
|
||||||
}
|
}
|
||||||
// disconnect broker
|
// disconnect broker
|
||||||
if err := config.Broker.Disconnect(config.Context); err != nil {
|
if err := config.Broker.Disconnect(config.Context); err != nil {
|
||||||
if logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
logger.Errorf("Broker [%s] disconnect error: %v", config.Broker.String(), err)
|
config.Logger.Error("Broker [%s] disconnect error: %v", config.Broker.String(), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -189,9 +189,12 @@ func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handl
|
|||||||
return func(p broker.Event) (err error) {
|
return func(p broker.Event) (err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
if logger.V(logger.ErrorLevel) {
|
n.RLock()
|
||||||
logger.Error("panic recovered: ", r)
|
config := n.opts
|
||||||
logger.Error(string(debug.Stack()))
|
n.RUnlock()
|
||||||
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
|
config.Logger.Error("panic recovered: ", r)
|
||||||
|
config.Logger.Error(string(debug.Stack()))
|
||||||
}
|
}
|
||||||
err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r)
|
err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r)
|
||||||
}
|
}
|
||||||
|
18
service.go
18
service.go
@ -17,6 +17,7 @@ import (
|
|||||||
type service struct {
|
type service struct {
|
||||||
opts Options
|
opts Options
|
||||||
|
|
||||||
|
sync.RWMutex
|
||||||
once sync.Once
|
once sync.Once
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -128,8 +129,13 @@ func (s *service) String() string {
|
|||||||
|
|
||||||
func (s *service) Start() error {
|
func (s *service) Start() error {
|
||||||
var err error
|
var err error
|
||||||
if logger.V(logger.InfoLevel) {
|
|
||||||
logger.Infof("Starting [service] %s", s.Name())
|
s.RLock()
|
||||||
|
config := s.opts
|
||||||
|
s.RUnlock()
|
||||||
|
|
||||||
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
|
config.Logger.Info("Starting [service] %s", s.Name())
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, fn := range s.opts.BeforeStart {
|
for _, fn := range s.opts.BeforeStart {
|
||||||
@ -174,8 +180,12 @@ func (s *service) Start() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) Stop() error {
|
func (s *service) Stop() error {
|
||||||
if logger.V(logger.InfoLevel) {
|
s.RLock()
|
||||||
logger.Infof("Stoppping [service] %s", s.Name())
|
config := s.opts
|
||||||
|
s.RUnlock()
|
||||||
|
|
||||||
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
|
config.Logger.Info("Stoppping [service] %s", s.Name())
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
@ -26,7 +26,7 @@ func Verify(a auth.Auth) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if logger.V(logger.DebugLevel) {
|
if logger.V(logger.DebugLevel) {
|
||||||
logger.Debugf("Auth [%v] Generated an auth account", a.String())
|
logger.Debug("Auth [%v] Generated an auth account: %s", a.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
accID = acc.ID
|
accID = acc.ID
|
||||||
@ -68,7 +68,7 @@ func Verify(a auth.Auth) error {
|
|||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if logger.V(logger.WarnLevel) {
|
if logger.V(logger.WarnLevel) {
|
||||||
logger.Warnf("[Auth] Error refreshing token: %v", err)
|
logger.Warn("[Auth] Error refreshing token: %v", err)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -219,7 +219,7 @@ func (r *Request) Do() *Response {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Debugf("[Kubernetes] %v %v", req.Method, req.URL.String())
|
logger.Debug("[Kubernetes] %v %v", req.Method, req.URL.String())
|
||||||
res, err := r.client.Do(req)
|
res, err := r.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &Response{
|
return &Response{
|
||||||
|
@ -228,7 +228,7 @@ func (c *client) Watch(r *Resource, opts ...WatchOption) (Watcher, error) {
|
|||||||
// NewService returns default micro kubernetes service definition
|
// NewService returns default micro kubernetes service definition
|
||||||
func NewService(name, version, typ, namespace string) *Service {
|
func NewService(name, version, typ, namespace string) *Service {
|
||||||
if logger.V(logger.TraceLevel) {
|
if logger.V(logger.TraceLevel) {
|
||||||
logger.Tracef("kubernetes default service: name: %s, version: %s", name, version)
|
logger.Trace("kubernetes default service: name: %s, version: %s", name, version)
|
||||||
}
|
}
|
||||||
|
|
||||||
Labels := map[string]string{
|
Labels := map[string]string{
|
||||||
@ -271,7 +271,7 @@ func NewService(name, version, typ, namespace string) *Service {
|
|||||||
// NewService returns default micro kubernetes deployment definition
|
// NewService returns default micro kubernetes deployment definition
|
||||||
func NewDeployment(name, version, typ, namespace string) *Deployment {
|
func NewDeployment(name, version, typ, namespace string) *Deployment {
|
||||||
if logger.V(logger.TraceLevel) {
|
if logger.V(logger.TraceLevel) {
|
||||||
logger.Tracef("kubernetes default deployment: name: %s, version: %s", name, version)
|
logger.Trace("kubernetes default deployment: name: %s, version: %s", name, version)
|
||||||
}
|
}
|
||||||
|
|
||||||
Labels := map[string]string{
|
Labels := map[string]string{
|
||||||
@ -363,21 +363,21 @@ func NewClusterClient() *client {
|
|||||||
|
|
||||||
s, err := os.Stat(serviceAccountPath)
|
s, err := os.Stat(serviceAccountPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatal(err)
|
logger.Fatal(err.Error())
|
||||||
}
|
}
|
||||||
if s == nil || !s.IsDir() {
|
if s == nil || !s.IsDir() {
|
||||||
logger.Fatal(errors.New("service account not found"))
|
logger.Fatal("service account not found")
|
||||||
}
|
}
|
||||||
|
|
||||||
token, err := ioutil.ReadFile(path.Join(serviceAccountPath, "token"))
|
token, err := ioutil.ReadFile(path.Join(serviceAccountPath, "token"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatal(err)
|
logger.Fatal(err.Error())
|
||||||
}
|
}
|
||||||
t := string(token)
|
t := string(token)
|
||||||
|
|
||||||
crt, err := CertPoolFromFile(path.Join(serviceAccountPath, "ca.crt"))
|
crt, err := CertPoolFromFile(path.Join(serviceAccountPath, "ca.crt"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatal(err)
|
logger.Fatal(err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
c := &http.Client{
|
c := &http.Client{
|
||||||
|
@ -9,7 +9,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/miekg/dns"
|
"github.com/miekg/dns"
|
||||||
log "github.com/unistack-org/micro/v3/logger"
|
"github.com/unistack-org/micro/v3/logger"
|
||||||
"golang.org/x/net/ipv4"
|
"golang.org/x/net/ipv4"
|
||||||
"golang.org/x/net/ipv6"
|
"golang.org/x/net/ipv6"
|
||||||
)
|
)
|
||||||
@ -196,7 +196,7 @@ func (s *Server) recv(c *net.UDPConn) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := s.parsePacket(buf[:n], from); err != nil {
|
if err := s.parsePacket(buf[:n], from); err != nil {
|
||||||
log.Errorf("[ERR] mdns: Failed to handle query: %v", err)
|
logger.Error("[ERR] mdns: Failed to handle query: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -205,7 +205,7 @@ func (s *Server) recv(c *net.UDPConn) {
|
|||||||
func (s *Server) parsePacket(packet []byte, from net.Addr) error {
|
func (s *Server) parsePacket(packet []byte, from net.Addr) error {
|
||||||
var msg dns.Msg
|
var msg dns.Msg
|
||||||
if err := msg.Unpack(packet); err != nil {
|
if err := msg.Unpack(packet); err != nil {
|
||||||
log.Errorf("[ERR] mdns: Failed to unpack packet: %v", err)
|
logger.Error("[ERR] mdns: Failed to unpack packet: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// TODO: This is a bit of a hack
|
// TODO: This is a bit of a hack
|
||||||
@ -384,7 +384,7 @@ func (s *Server) probe() {
|
|||||||
|
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
if err := s.SendMulticast(q); err != nil {
|
if err := s.SendMulticast(q); err != nil {
|
||||||
log.Errorf("[ERR] mdns: failed to send probe:", err.Error())
|
logger.Error("[ERR] mdns: failed to send probe: %v", err)
|
||||||
}
|
}
|
||||||
time.Sleep(time.Duration(randomizer.Intn(250)) * time.Millisecond)
|
time.Sleep(time.Duration(randomizer.Intn(250)) * time.Millisecond)
|
||||||
}
|
}
|
||||||
@ -410,7 +410,7 @@ func (s *Server) probe() {
|
|||||||
timer := time.NewTimer(timeout)
|
timer := time.NewTimer(timeout)
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
if err := s.SendMulticast(resp); err != nil {
|
if err := s.SendMulticast(resp); err != nil {
|
||||||
log.Errorf("[ERR] mdns: failed to send announcement:", err.Error())
|
logger.Error("[ERR] mdns: failed to send announcement:", err.Error())
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
|
@ -103,20 +103,20 @@ type parser struct {
|
|||||||
// topLevelSegments is the target of this parser.
|
// topLevelSegments is the target of this parser.
|
||||||
func (p *parser) topLevelSegments() ([]segment, error) {
|
func (p *parser) topLevelSegments() ([]segment, error) {
|
||||||
if logger.V(logger.TraceLevel) {
|
if logger.V(logger.TraceLevel) {
|
||||||
logger.Debugf("Parsing %q", p.tokens)
|
logger.Debug("Parsing %q", p.tokens)
|
||||||
}
|
}
|
||||||
segs, err := p.segments()
|
segs, err := p.segments()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if logger.V(logger.TraceLevel) {
|
if logger.V(logger.TraceLevel) {
|
||||||
logger.Tracef("accept segments: %q; %q", p.accepted, p.tokens)
|
logger.Trace("accept segments: %q; %q", p.accepted, p.tokens)
|
||||||
}
|
}
|
||||||
if _, err := p.accept(typeEOF); err != nil {
|
if _, err := p.accept(typeEOF); err != nil {
|
||||||
return nil, fmt.Errorf("unexpected token %q after segments %q", p.tokens[0], strings.Join(p.accepted, ""))
|
return nil, fmt.Errorf("unexpected token %q after segments %q", p.tokens[0], strings.Join(p.accepted, ""))
|
||||||
}
|
}
|
||||||
if logger.V(logger.TraceLevel) {
|
if logger.V(logger.TraceLevel) {
|
||||||
logger.Tracef("accept eof: %q; %q", p.accepted, p.tokens)
|
logger.Trace("accept eof: %q; %q", p.accepted, p.tokens)
|
||||||
}
|
}
|
||||||
return segs, nil
|
return segs, nil
|
||||||
}
|
}
|
||||||
@ -128,7 +128,7 @@ func (p *parser) segments() ([]segment, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if logger.V(logger.TraceLevel) {
|
if logger.V(logger.TraceLevel) {
|
||||||
logger.Tracef("accept segment: %q; %q", p.accepted, p.tokens)
|
logger.Trace("accept segment: %q; %q", p.accepted, p.tokens)
|
||||||
}
|
}
|
||||||
segs := []segment{s}
|
segs := []segment{s}
|
||||||
for {
|
for {
|
||||||
@ -141,7 +141,7 @@ func (p *parser) segments() ([]segment, error) {
|
|||||||
}
|
}
|
||||||
segs = append(segs, s)
|
segs = append(segs, s)
|
||||||
if logger.V(logger.TraceLevel) {
|
if logger.V(logger.TraceLevel) {
|
||||||
logger.Tracef("accept segment: %q; %q", p.accepted, p.tokens)
|
logger.Trace("accept segment: %q; %q", p.accepted, p.tokens)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -316,6 +316,6 @@ func TestParseSegmentsWithErrors(t *testing.T) {
|
|||||||
t.Errorf("parser{%q}.segments() succeeded; want InvalidTemplateError; accepted %#v", spec.tokens, segs)
|
t.Errorf("parser{%q}.segments() succeeded; want InvalidTemplateError; accepted %#v", spec.tokens, segs)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
logger.Info(err)
|
logger.Info(err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -63,7 +63,7 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt
|
|||||||
|
|
||||||
if version != 1 {
|
if version != 1 {
|
||||||
if logger.V(logger.DebugLevel) {
|
if logger.V(logger.DebugLevel) {
|
||||||
logger.Debugf("unsupported version: %d", version)
|
logger.Debug("unsupported version: %d", version)
|
||||||
}
|
}
|
||||||
return Pattern{}, ErrInvalidPattern
|
return Pattern{}, ErrInvalidPattern
|
||||||
}
|
}
|
||||||
@ -71,7 +71,7 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt
|
|||||||
l := len(ops)
|
l := len(ops)
|
||||||
if l%2 != 0 {
|
if l%2 != 0 {
|
||||||
if logger.V(logger.DebugLevel) {
|
if logger.V(logger.DebugLevel) {
|
||||||
logger.Debugf("odd number of ops codes: %d", l)
|
logger.Debug("odd number of ops codes: %d", l)
|
||||||
}
|
}
|
||||||
return Pattern{}, ErrInvalidPattern
|
return Pattern{}, ErrInvalidPattern
|
||||||
}
|
}
|
||||||
@ -105,7 +105,7 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt
|
|||||||
case OpLitPush:
|
case OpLitPush:
|
||||||
if op.operand < 0 || len(pool) <= op.operand {
|
if op.operand < 0 || len(pool) <= op.operand {
|
||||||
if logger.V(logger.TraceLevel) {
|
if logger.V(logger.TraceLevel) {
|
||||||
logger.Tracef("negative literal index: %d", op.operand)
|
logger.Trace("negative literal index: %d", op.operand)
|
||||||
}
|
}
|
||||||
return Pattern{}, ErrInvalidPattern
|
return Pattern{}, ErrInvalidPattern
|
||||||
}
|
}
|
||||||
@ -116,7 +116,7 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt
|
|||||||
case OpConcatN:
|
case OpConcatN:
|
||||||
if op.operand <= 0 {
|
if op.operand <= 0 {
|
||||||
if logger.V(logger.TraceLevel) {
|
if logger.V(logger.TraceLevel) {
|
||||||
logger.Tracef("negative concat size: %d", op.operand)
|
logger.Trace("negative concat size: %d", op.operand)
|
||||||
}
|
}
|
||||||
return Pattern{}, ErrInvalidPattern
|
return Pattern{}, ErrInvalidPattern
|
||||||
}
|
}
|
||||||
@ -131,7 +131,7 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt
|
|||||||
case OpCapture:
|
case OpCapture:
|
||||||
if op.operand < 0 || len(pool) <= op.operand {
|
if op.operand < 0 || len(pool) <= op.operand {
|
||||||
if logger.V(logger.TraceLevel) {
|
if logger.V(logger.TraceLevel) {
|
||||||
logger.Tracef("variable name index out of bound: %d", op.operand)
|
logger.Trace("variable name index out of bound: %d", op.operand)
|
||||||
}
|
}
|
||||||
return Pattern{}, ErrInvalidPattern
|
return Pattern{}, ErrInvalidPattern
|
||||||
}
|
}
|
||||||
@ -147,7 +147,7 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt
|
|||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
if logger.V(logger.DebugLevel) {
|
if logger.V(logger.DebugLevel) {
|
||||||
logger.Tracef("invalid opcode: %d", op.code)
|
logger.Trace("invalid opcode: %d", op.code)
|
||||||
}
|
}
|
||||||
return Pattern{}, ErrInvalidPattern
|
return Pattern{}, ErrInvalidPattern
|
||||||
}
|
}
|
||||||
@ -172,7 +172,7 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt
|
|||||||
func MustPattern(p Pattern, err error) Pattern {
|
func MustPattern(p Pattern, err error) Pattern {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if logger.V(logger.FatalLevel) {
|
if logger.V(logger.FatalLevel) {
|
||||||
logger.Fatalf("Pattern initialization failed: %v", err)
|
logger.Fatal("Pattern initialization failed: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return p
|
return p
|
||||||
@ -235,7 +235,7 @@ func (p Pattern) Match(components []string, verb string) (map[string]string, err
|
|||||||
if pos < l {
|
if pos < l {
|
||||||
return nil, ErrNotMatch
|
return nil, ErrNotMatch
|
||||||
}
|
}
|
||||||
bindings := make(map[string]string)
|
bindings := make(map[string]string, len(captured))
|
||||||
for i, val := range captured {
|
for i, val := range captured {
|
||||||
bindings[p.vars[i]] = val
|
bindings[p.vars[i]] = val
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user