* api: add static router and improve path parser in rpc handler Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * expose metadata context key to be able to get unmodified map keys Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * server/grpc: fix jsonpb codec for protobuf msg Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * api/handler/rpc: write 204 status code when rsp is nil Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * api/handler/rpc: add check for nil response for non javascript Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
		
			
				
	
	
		
			417 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			417 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Package registry provides a dynamic api service router
 | 
						|
package registry
 | 
						|
 | 
						|
import (
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"net/http"
 | 
						|
	"regexp"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/micro/go-micro/v2/api"
 | 
						|
	"github.com/micro/go-micro/v2/api/router"
 | 
						|
	"github.com/micro/go-micro/v2/logger"
 | 
						|
	"github.com/micro/go-micro/v2/registry"
 | 
						|
	"github.com/micro/go-micro/v2/registry/cache"
 | 
						|
)
 | 
						|
 | 
						|
// router is the default router
 | 
						|
type registryRouter struct {
 | 
						|
	exit chan bool
 | 
						|
	opts router.Options
 | 
						|
 | 
						|
	// registry cache
 | 
						|
	rc cache.Cache
 | 
						|
 | 
						|
	sync.RWMutex
 | 
						|
	eps map[string]*api.Service
 | 
						|
}
 | 
						|
 | 
						|
func setNamespace(ns, name string) string {
 | 
						|
	ns = strings.TrimSpace(ns)
 | 
						|
	name = strings.TrimSpace(name)
 | 
						|
 | 
						|
	// no namespace
 | 
						|
	if len(ns) == 0 {
 | 
						|
		return name
 | 
						|
	}
 | 
						|
 | 
						|
	switch {
 | 
						|
	// has - suffix
 | 
						|
	case strings.HasSuffix(ns, "-"):
 | 
						|
		return strings.Replace(ns+name, ".", "-", -1)
 | 
						|
	// has . suffix
 | 
						|
	case strings.HasSuffix(ns, "."):
 | 
						|
		return ns + name
 | 
						|
	}
 | 
						|
 | 
						|
	// default join .
 | 
						|
	return strings.Join([]string{ns, name}, ".")
 | 
						|
}
 | 
						|
 | 
						|
func (r *registryRouter) isClosed() bool {
 | 
						|
	select {
 | 
						|
	case <-r.exit:
 | 
						|
		return true
 | 
						|
	default:
 | 
						|
		return false
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// refresh list of api services
 | 
						|
func (r *registryRouter) refresh() {
 | 
						|
	var attempts int
 | 
						|
 | 
						|
	for {
 | 
						|
		services, err := r.opts.Registry.ListServices()
 | 
						|
		if err != nil {
 | 
						|
			attempts++
 | 
						|
			if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
 | 
						|
				logger.Errorf("unable to list services: %v", err)
 | 
						|
			}
 | 
						|
			time.Sleep(time.Duration(attempts) * time.Second)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		attempts = 0
 | 
						|
 | 
						|
		// for each service, get service and store endpoints
 | 
						|
		for _, s := range services {
 | 
						|
			// only get services for this namespace
 | 
						|
			if !strings.HasPrefix(s.Name, r.opts.Namespace) {
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			service, err := r.rc.GetService(s.Name)
 | 
						|
			if err != nil {
 | 
						|
				if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
 | 
						|
					logger.Errorf("unable to get service: %v", err)
 | 
						|
				}
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			r.store(service)
 | 
						|
		}
 | 
						|
 | 
						|
		// refresh list in 10 minutes... cruft
 | 
						|
		select {
 | 
						|
		case <-time.After(time.Minute * 10):
 | 
						|
		case <-r.exit:
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// process watch event
 | 
						|
func (r *registryRouter) process(res *registry.Result) {
 | 
						|
	// skip these things
 | 
						|
	if res == nil || res.Service == nil || !strings.HasPrefix(res.Service.Name, r.opts.Namespace) {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	// get entry from cache
 | 
						|
	service, err := r.rc.GetService(res.Service.Name)
 | 
						|
	if err != nil {
 | 
						|
		if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
 | 
						|
			logger.Errorf("unable to get service: %v", err)
 | 
						|
		}
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	// update our local endpoints
 | 
						|
	r.store(service)
 | 
						|
}
 | 
						|
 | 
						|
// store local endpoint cache
 | 
						|
func (r *registryRouter) store(services []*registry.Service) {
 | 
						|
	// endpoints
 | 
						|
	eps := map[string]*api.Service{}
 | 
						|
 | 
						|
	// services
 | 
						|
	names := map[string]bool{}
 | 
						|
 | 
						|
	// create a new endpoint mapping
 | 
						|
	for _, service := range services {
 | 
						|
		// set names we need later
 | 
						|
		names[service.Name] = true
 | 
						|
 | 
						|
		// map per endpoint
 | 
						|
		for _, endpoint := range service.Endpoints {
 | 
						|
			// create a key service:endpoint_name
 | 
						|
			key := fmt.Sprintf("%s:%s", service.Name, endpoint.Name)
 | 
						|
			// decode endpoint
 | 
						|
			end := api.Decode(endpoint.Metadata)
 | 
						|
 | 
						|
			// if we got nothing skip
 | 
						|
			if err := api.Validate(end); err != nil {
 | 
						|
				if logger.V(logger.TraceLevel, logger.DefaultLogger) {
 | 
						|
					logger.Tracef("endpoint validation failed: %v", err)
 | 
						|
				}
 | 
						|
				continue
 | 
						|
			}
 | 
						|
 | 
						|
			// try get endpoint
 | 
						|
			ep, ok := eps[key]
 | 
						|
			if !ok {
 | 
						|
				ep = &api.Service{Name: service.Name}
 | 
						|
			}
 | 
						|
 | 
						|
			// overwrite the endpoint
 | 
						|
			ep.Endpoint = end
 | 
						|
			// append services
 | 
						|
			ep.Services = append(ep.Services, service)
 | 
						|
			// store it
 | 
						|
			eps[key] = ep
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	r.Lock()
 | 
						|
	defer r.Unlock()
 | 
						|
 | 
						|
	// delete any existing eps for services we know
 | 
						|
	for key, service := range r.eps {
 | 
						|
		// skip what we don't care about
 | 
						|
		if !names[service.Name] {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		// ok we know this thing
 | 
						|
		// delete delete delete
 | 
						|
		delete(r.eps, key)
 | 
						|
	}
 | 
						|
 | 
						|
	// now set the eps we have
 | 
						|
	for name, endpoint := range eps {
 | 
						|
		r.eps[name] = endpoint
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// watch for endpoint changes
 | 
						|
func (r *registryRouter) watch() {
 | 
						|
	var attempts int
 | 
						|
 | 
						|
	for {
 | 
						|
		if r.isClosed() {
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		// watch for changes
 | 
						|
		w, err := r.opts.Registry.Watch()
 | 
						|
		if err != nil {
 | 
						|
			attempts++
 | 
						|
			if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
 | 
						|
				logger.Errorf("error watching endpoints: %v", err)
 | 
						|
			}
 | 
						|
			time.Sleep(time.Duration(attempts) * time.Second)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		ch := make(chan bool)
 | 
						|
 | 
						|
		go func() {
 | 
						|
			select {
 | 
						|
			case <-ch:
 | 
						|
				w.Stop()
 | 
						|
			case <-r.exit:
 | 
						|
				w.Stop()
 | 
						|
			}
 | 
						|
		}()
 | 
						|
 | 
						|
		// reset if we get here
 | 
						|
		attempts = 0
 | 
						|
 | 
						|
		for {
 | 
						|
			// process next event
 | 
						|
			res, err := w.Next()
 | 
						|
			if err != nil {
 | 
						|
				if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
 | 
						|
					logger.Errorf("error getting next endoint: %v", err)
 | 
						|
				}
 | 
						|
				close(ch)
 | 
						|
				break
 | 
						|
			}
 | 
						|
			r.process(res)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (r *registryRouter) Options() router.Options {
 | 
						|
	return r.opts
 | 
						|
}
 | 
						|
 | 
						|
func (r *registryRouter) Close() error {
 | 
						|
	select {
 | 
						|
	case <-r.exit:
 | 
						|
		return nil
 | 
						|
	default:
 | 
						|
		close(r.exit)
 | 
						|
		r.rc.Stop()
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (r *registryRouter) Register(ep *api.Endpoint) error {
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (r *registryRouter) Deregister(ep *api.Endpoint) error {
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (r *registryRouter) Endpoint(req *http.Request) (*api.Service, error) {
 | 
						|
	if r.isClosed() {
 | 
						|
		return nil, errors.New("router closed")
 | 
						|
	}
 | 
						|
 | 
						|
	r.RLock()
 | 
						|
	defer r.RUnlock()
 | 
						|
 | 
						|
	// use the first match
 | 
						|
	// TODO: weighted matching
 | 
						|
	for _, e := range r.eps {
 | 
						|
		ep := e.Endpoint
 | 
						|
 | 
						|
		// match
 | 
						|
		var pathMatch, hostMatch, methodMatch bool
 | 
						|
 | 
						|
		// 1. try method GET, POST, PUT, etc
 | 
						|
		// 2. try host example.com, foobar.com, etc
 | 
						|
		// 3. try path /foo/bar, /bar/baz, etc
 | 
						|
 | 
						|
		// 1. try match method
 | 
						|
		for _, m := range ep.Method {
 | 
						|
			if req.Method == m {
 | 
						|
				methodMatch = true
 | 
						|
				break
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		// no match on method pass
 | 
						|
		if len(ep.Method) > 0 && !methodMatch {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		// 2. try match host
 | 
						|
		for _, h := range ep.Host {
 | 
						|
			if req.Host == h {
 | 
						|
				hostMatch = true
 | 
						|
				break
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		// no match on host pass
 | 
						|
		if len(ep.Host) > 0 && !hostMatch {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		// 3. try match paths
 | 
						|
		for _, p := range ep.Path {
 | 
						|
			re, err := regexp.CompilePOSIX(p)
 | 
						|
			if err == nil && re.MatchString(req.URL.Path) {
 | 
						|
				pathMatch = true
 | 
						|
				break
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		// no match pass
 | 
						|
		if len(ep.Path) > 0 && !pathMatch {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		// TODO: Percentage traffic
 | 
						|
 | 
						|
		// we got here, so its a match
 | 
						|
		return e, nil
 | 
						|
	}
 | 
						|
 | 
						|
	// no match
 | 
						|
	return nil, errors.New("not found")
 | 
						|
}
 | 
						|
 | 
						|
func (r *registryRouter) Route(req *http.Request) (*api.Service, error) {
 | 
						|
	if r.isClosed() {
 | 
						|
		return nil, errors.New("router closed")
 | 
						|
	}
 | 
						|
 | 
						|
	// try get an endpoint
 | 
						|
	ep, err := r.Endpoint(req)
 | 
						|
	if err == nil {
 | 
						|
		return ep, nil
 | 
						|
	}
 | 
						|
 | 
						|
	// error not nil
 | 
						|
	// ignore that shit
 | 
						|
	// TODO: don't ignore that shit
 | 
						|
 | 
						|
	// get the service name
 | 
						|
	rp, err := r.opts.Resolver.Resolve(req)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	// service name
 | 
						|
	name := setNamespace(r.opts.Namespace, rp.Name)
 | 
						|
 | 
						|
	// get service
 | 
						|
	services, err := r.rc.GetService(name)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	// only use endpoint matching when the meta handler is set aka api.Default
 | 
						|
	switch r.opts.Handler {
 | 
						|
	// rpc handlers
 | 
						|
	case "meta", "api", "rpc":
 | 
						|
		handler := r.opts.Handler
 | 
						|
 | 
						|
		// set default handler to api
 | 
						|
		if r.opts.Handler == "meta" {
 | 
						|
			handler = "rpc"
 | 
						|
		}
 | 
						|
 | 
						|
		// construct api service
 | 
						|
		return &api.Service{
 | 
						|
			Name: name,
 | 
						|
			Endpoint: &api.Endpoint{
 | 
						|
				Name:    rp.Method,
 | 
						|
				Handler: handler,
 | 
						|
			},
 | 
						|
			Services: services,
 | 
						|
		}, nil
 | 
						|
	// http handler
 | 
						|
	case "http", "proxy", "web":
 | 
						|
		// construct api service
 | 
						|
		return &api.Service{
 | 
						|
			Name: name,
 | 
						|
			Endpoint: &api.Endpoint{
 | 
						|
				Name:    req.URL.String(),
 | 
						|
				Handler: r.opts.Handler,
 | 
						|
				Host:    []string{req.Host},
 | 
						|
				Method:  []string{req.Method},
 | 
						|
				Path:    []string{req.URL.Path},
 | 
						|
			},
 | 
						|
			Services: services,
 | 
						|
		}, nil
 | 
						|
	}
 | 
						|
 | 
						|
	return nil, errors.New("unknown handler")
 | 
						|
}
 | 
						|
 | 
						|
func newRouter(opts ...router.Option) *registryRouter {
 | 
						|
	options := router.NewOptions(opts...)
 | 
						|
	r := ®istryRouter{
 | 
						|
		exit: make(chan bool),
 | 
						|
		opts: options,
 | 
						|
		rc:   cache.New(options.Registry),
 | 
						|
		eps:  make(map[string]*api.Service),
 | 
						|
	}
 | 
						|
	go r.watch()
 | 
						|
	go r.refresh()
 | 
						|
	return r
 | 
						|
}
 | 
						|
 | 
						|
// NewRouter returns the default router
 | 
						|
func NewRouter(opts ...router.Option) router.Router {
 | 
						|
	return newRouter(opts...)
 | 
						|
}
 |