Further consolidate the libraries
This commit is contained in:
		
							
								
								
									
										393
									
								
								registry.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										393
									
								
								registry.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,393 @@ | ||||
| // Package registry provides a dynamic api service router | ||||
| package registry | ||||
|  | ||||
| import ( | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"log" | ||||
| 	"net/http" | ||||
| 	"regexp" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/micro/go-micro/api" | ||||
| 	"github.com/micro/go-micro/api/router" | ||||
| 	"github.com/micro/go-micro/registry" | ||||
| 	"github.com/micro/go-micro/registry/cache" | ||||
| ) | ||||
|  | ||||
| // router is the default router | ||||
| type registryRouter struct { | ||||
| 	exit chan bool | ||||
| 	opts router.Options | ||||
|  | ||||
| 	// registry cache | ||||
| 	rc cache.Cache | ||||
|  | ||||
| 	sync.RWMutex | ||||
| 	eps map[string]*api.Service | ||||
| } | ||||
|  | ||||
| func setNamespace(ns, name string) string { | ||||
| 	ns = strings.TrimSpace(ns) | ||||
| 	name = strings.TrimSpace(name) | ||||
|  | ||||
| 	// no namespace | ||||
| 	if len(ns) == 0 { | ||||
| 		return name | ||||
| 	} | ||||
|  | ||||
| 	switch { | ||||
| 	// has - suffix | ||||
| 	case strings.HasSuffix(ns, "-"): | ||||
| 		return strings.Replace(ns+name, ".", "-", -1) | ||||
| 	// has . suffix | ||||
| 	case strings.HasSuffix(ns, "."): | ||||
| 		return ns + name | ||||
| 	} | ||||
|  | ||||
| 	// default join . | ||||
| 	return strings.Join([]string{ns, name}, ".") | ||||
| } | ||||
|  | ||||
| func (r *registryRouter) isClosed() bool { | ||||
| 	select { | ||||
| 	case <-r.exit: | ||||
| 		return true | ||||
| 	default: | ||||
| 		return false | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // refresh list of api services | ||||
| func (r *registryRouter) refresh() { | ||||
| 	var attempts int | ||||
|  | ||||
| 	for { | ||||
| 		services, err := r.opts.Registry.ListServices() | ||||
| 		if err != nil { | ||||
| 			attempts++ | ||||
| 			log.Println("Error listing endpoints", err) | ||||
| 			time.Sleep(time.Duration(attempts) * time.Second) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		attempts = 0 | ||||
|  | ||||
| 		// for each service, get service and store endpoints | ||||
| 		for _, s := range services { | ||||
| 			// only get services for this namespace | ||||
| 			if !strings.HasPrefix(s.Name, r.opts.Namespace) { | ||||
| 				continue | ||||
| 			} | ||||
| 			service, err := r.rc.GetService(s.Name) | ||||
| 			if err != nil { | ||||
| 				continue | ||||
| 			} | ||||
| 			r.store(service) | ||||
| 		} | ||||
|  | ||||
| 		// refresh list in 10 minutes... cruft | ||||
| 		select { | ||||
| 		case <-time.After(time.Minute * 10): | ||||
| 		case <-r.exit: | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // process watch event | ||||
| func (r *registryRouter) process(res *registry.Result) { | ||||
| 	// skip these things | ||||
| 	if res == nil || res.Service == nil || !strings.HasPrefix(res.Service.Name, r.opts.Namespace) { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// get entry from cache | ||||
| 	service, err := r.rc.GetService(res.Service.Name) | ||||
| 	if err != nil { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// update our local endpoints | ||||
| 	r.store(service) | ||||
| } | ||||
|  | ||||
| // store local endpoint cache | ||||
| func (r *registryRouter) store(services []*registry.Service) { | ||||
| 	// endpoints | ||||
| 	eps := map[string]*api.Service{} | ||||
|  | ||||
| 	// services | ||||
| 	names := map[string]bool{} | ||||
|  | ||||
| 	// create a new endpoint mapping | ||||
| 	for _, service := range services { | ||||
| 		// set names we need later | ||||
| 		names[service.Name] = true | ||||
|  | ||||
| 		// map per endpoint | ||||
| 		for _, endpoint := range service.Endpoints { | ||||
| 			// create a key service:endpoint_name | ||||
| 			key := fmt.Sprintf("%s:%s", service.Name, endpoint.Name) | ||||
| 			// decode endpoint | ||||
| 			end := api.Decode(endpoint.Metadata) | ||||
|  | ||||
| 			// if we got nothing skip | ||||
| 			if err := api.Validate(end); err != nil { | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
| 			// try get endpoint | ||||
| 			ep, ok := eps[key] | ||||
| 			if !ok { | ||||
| 				ep = &api.Service{Name: service.Name} | ||||
| 			} | ||||
|  | ||||
| 			// overwrite the endpoint | ||||
| 			ep.Endpoint = end | ||||
| 			// append services | ||||
| 			ep.Services = append(ep.Services, service) | ||||
| 			// store it | ||||
| 			eps[key] = ep | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	r.Lock() | ||||
| 	defer r.Unlock() | ||||
|  | ||||
| 	// delete any existing eps for services we know | ||||
| 	for key, service := range r.eps { | ||||
| 		// skip what we don't care about | ||||
| 		if !names[service.Name] { | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		// ok we know this thing | ||||
| 		// delete delete delete | ||||
| 		delete(r.eps, key) | ||||
| 	} | ||||
|  | ||||
| 	// now set the eps we have | ||||
| 	for name, endpoint := range eps { | ||||
| 		r.eps[name] = endpoint | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // watch for endpoint changes | ||||
| func (r *registryRouter) watch() { | ||||
| 	var attempts int | ||||
|  | ||||
| 	for { | ||||
| 		if r.isClosed() { | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		// watch for changes | ||||
| 		w, err := r.opts.Registry.Watch() | ||||
| 		if err != nil { | ||||
| 			attempts++ | ||||
| 			log.Println("Error watching endpoints", err) | ||||
| 			time.Sleep(time.Duration(attempts) * time.Second) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		ch := make(chan bool) | ||||
|  | ||||
| 		go func() { | ||||
| 			select { | ||||
| 			case <-ch: | ||||
| 				w.Stop() | ||||
| 			case <-r.exit: | ||||
| 				w.Stop() | ||||
| 			} | ||||
| 		}() | ||||
|  | ||||
| 		// reset if we get here | ||||
| 		attempts = 0 | ||||
|  | ||||
| 		for { | ||||
| 			// process next event | ||||
| 			res, err := w.Next() | ||||
| 			if err != nil { | ||||
| 				log.Println("Error getting next endpoint", err) | ||||
| 				close(ch) | ||||
| 				break | ||||
| 			} | ||||
| 			r.process(res) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (r *registryRouter) Options() router.Options { | ||||
| 	return r.opts | ||||
| } | ||||
|  | ||||
| func (r *registryRouter) Close() error { | ||||
| 	select { | ||||
| 	case <-r.exit: | ||||
| 		return nil | ||||
| 	default: | ||||
| 		close(r.exit) | ||||
| 		r.rc.Stop() | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (r *registryRouter) Endpoint(req *http.Request) (*api.Service, error) { | ||||
| 	if r.isClosed() { | ||||
| 		return nil, errors.New("router closed") | ||||
| 	} | ||||
|  | ||||
| 	r.RLock() | ||||
| 	defer r.RUnlock() | ||||
|  | ||||
| 	// use the first match | ||||
| 	// TODO: weighted matching | ||||
| 	for _, e := range r.eps { | ||||
| 		ep := e.Endpoint | ||||
|  | ||||
| 		// match | ||||
| 		var pathMatch, hostMatch, methodMatch bool | ||||
|  | ||||
| 		// 1. try method GET, POST, PUT, etc | ||||
| 		// 2. try host example.com, foobar.com, etc | ||||
| 		// 3. try path /foo/bar, /bar/baz, etc | ||||
|  | ||||
| 		// 1. try match method | ||||
| 		for _, m := range ep.Method { | ||||
| 			if req.Method == m { | ||||
| 				methodMatch = true | ||||
| 				break | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		// no match on method pass | ||||
| 		if len(ep.Method) > 0 && !methodMatch { | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		// 2. try match host | ||||
| 		for _, h := range ep.Host { | ||||
| 			if req.Host == h { | ||||
| 				hostMatch = true | ||||
| 				break | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		// no match on host pass | ||||
| 		if len(ep.Host) > 0 && !hostMatch { | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		// 3. try match paths | ||||
| 		for _, p := range ep.Path { | ||||
| 			re, err := regexp.CompilePOSIX(p) | ||||
| 			if err == nil && re.MatchString(req.URL.Path) { | ||||
| 				pathMatch = true | ||||
| 				break | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		// no match pass | ||||
| 		if len(ep.Path) > 0 && !pathMatch { | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		// TODO: Percentage traffic | ||||
|  | ||||
| 		// we got here, so its a match | ||||
| 		return e, nil | ||||
| 	} | ||||
|  | ||||
| 	// no match | ||||
| 	return nil, errors.New("not found") | ||||
| } | ||||
|  | ||||
| func (r *registryRouter) Route(req *http.Request) (*api.Service, error) { | ||||
| 	if r.isClosed() { | ||||
| 		return nil, errors.New("router closed") | ||||
| 	} | ||||
|  | ||||
| 	// try get an endpoint | ||||
| 	ep, err := r.Endpoint(req) | ||||
| 	if err == nil { | ||||
| 		return ep, nil | ||||
| 	} | ||||
|  | ||||
| 	// error not nil | ||||
| 	// ignore that shit | ||||
| 	// TODO: don't ignore that shit | ||||
|  | ||||
| 	// get the service name | ||||
| 	rp, err := r.opts.Resolver.Resolve(req) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	// service name | ||||
| 	name := setNamespace(r.opts.Namespace, rp.Name) | ||||
|  | ||||
| 	// get service | ||||
| 	services, err := r.rc.GetService(name) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	// only use endpoint matching when the meta handler is set aka api.Default | ||||
| 	switch r.opts.Handler { | ||||
| 	// rpc handlers | ||||
| 	case "meta", "api", "rpc": | ||||
| 		handler := r.opts.Handler | ||||
|  | ||||
| 		// set default handler to api | ||||
| 		if r.opts.Handler == "meta" { | ||||
| 			handler = "rpc" | ||||
| 		} | ||||
|  | ||||
| 		// construct api service | ||||
| 		return &api.Service{ | ||||
| 			Name: name, | ||||
| 			Endpoint: &api.Endpoint{ | ||||
| 				Name:    rp.Method, | ||||
| 				Handler: handler, | ||||
| 			}, | ||||
| 			Services: services, | ||||
| 		}, nil | ||||
| 	// http handler | ||||
| 	case "http", "proxy", "web": | ||||
| 		// construct api service | ||||
| 		return &api.Service{ | ||||
| 			Name: name, | ||||
| 			Endpoint: &api.Endpoint{ | ||||
| 				Name:    req.URL.String(), | ||||
| 				Handler: r.opts.Handler, | ||||
| 				Host:    []string{req.Host}, | ||||
| 				Method:  []string{req.Method}, | ||||
| 				Path:    []string{req.URL.Path}, | ||||
| 			}, | ||||
| 			Services: services, | ||||
| 		}, nil | ||||
| 	} | ||||
|  | ||||
| 	return nil, errors.New("unknown handler") | ||||
| } | ||||
|  | ||||
| func newRouter(opts ...router.Option) *registryRouter { | ||||
| 	options := router.NewOptions(opts...) | ||||
| 	r := ®istryRouter{ | ||||
| 		exit: make(chan bool), | ||||
| 		opts: options, | ||||
| 		rc:   cache.New(options.Registry), | ||||
| 		eps:  make(map[string]*api.Service), | ||||
| 	} | ||||
| 	go r.watch() | ||||
| 	go r.refresh() | ||||
| 	return r | ||||
| } | ||||
|  | ||||
| // NewRouter returns the default router | ||||
| func NewRouter(opts ...router.Option) router.Router { | ||||
| 	return newRouter(opts...) | ||||
| } | ||||
							
								
								
									
										181
									
								
								registry_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										181
									
								
								registry_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,181 @@ | ||||
| package registry | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"net/http" | ||||
| 	"net/url" | ||||
| 	"testing" | ||||
|  | ||||
| 	"github.com/micro/go-micro/api" | ||||
| ) | ||||
|  | ||||
| func TestSetNamespace(t *testing.T) { | ||||
| 	testCases := []struct { | ||||
| 		namespace string | ||||
| 		name      string | ||||
| 		expected  string | ||||
| 	}{ | ||||
| 		// default dotted path | ||||
| 		{ | ||||
| 			"go.micro.api", | ||||
| 			"foo", | ||||
| 			"go.micro.api.foo", | ||||
| 		}, | ||||
| 		// dotted end | ||||
| 		{ | ||||
| 			"go.micro.api.", | ||||
| 			"foo", | ||||
| 			"go.micro.api.foo", | ||||
| 		}, | ||||
| 		// dashed end | ||||
| 		{ | ||||
| 			"go-micro-api-", | ||||
| 			"foo", | ||||
| 			"go-micro-api-foo", | ||||
| 		}, | ||||
| 		// no namespace | ||||
| 		{ | ||||
| 			"", | ||||
| 			"foo", | ||||
| 			"foo", | ||||
| 		}, | ||||
| 		{ | ||||
| 			"go-micro-api-", | ||||
| 			"v2.foo", | ||||
| 			"go-micro-api-v2-foo", | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for _, test := range testCases { | ||||
| 		name := setNamespace(test.namespace, test.name) | ||||
| 		if name != test.expected { | ||||
| 			t.Fatalf("expected name %s got %s", test.expected, name) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestRouter(t *testing.T) { | ||||
| 	r := newRouter() | ||||
|  | ||||
| 	compare := func(expect, got []string) bool { | ||||
| 		// no data to compare, return true | ||||
| 		if len(expect) == 0 && len(got) == 0 { | ||||
| 			return true | ||||
| 		} | ||||
| 		// no data expected but got some return false | ||||
| 		if len(expect) == 0 && len(got) > 0 { | ||||
| 			return false | ||||
| 		} | ||||
|  | ||||
| 		// compare expected with what we got | ||||
| 		for _, e := range expect { | ||||
| 			var seen bool | ||||
| 			for _, g := range got { | ||||
| 				if e == g { | ||||
| 					seen = true | ||||
| 					break | ||||
| 				} | ||||
| 			} | ||||
| 			if !seen { | ||||
| 				return false | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		// we're done, return true | ||||
| 		return true | ||||
| 	} | ||||
|  | ||||
| 	testData := []struct { | ||||
| 		e *api.Endpoint | ||||
| 		r *http.Request | ||||
| 		m bool | ||||
| 	}{ | ||||
| 		{ | ||||
| 			e: &api.Endpoint{ | ||||
| 				Name:   "Foo.Bar", | ||||
| 				Host:   []string{"example.com"}, | ||||
| 				Method: []string{"GET"}, | ||||
| 				Path:   []string{"/foo"}, | ||||
| 			}, | ||||
| 			r: &http.Request{ | ||||
| 				Host:   "example.com", | ||||
| 				Method: "GET", | ||||
| 				URL: &url.URL{ | ||||
| 					Path: "/foo", | ||||
| 				}, | ||||
| 			}, | ||||
| 			m: true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			e: &api.Endpoint{ | ||||
| 				Name:   "Bar.Baz", | ||||
| 				Host:   []string{"example.com", "foo.com"}, | ||||
| 				Method: []string{"GET", "POST"}, | ||||
| 				Path:   []string{"/foo/bar"}, | ||||
| 			}, | ||||
| 			r: &http.Request{ | ||||
| 				Host:   "foo.com", | ||||
| 				Method: "POST", | ||||
| 				URL: &url.URL{ | ||||
| 					Path: "/foo/bar", | ||||
| 				}, | ||||
| 			}, | ||||
| 			m: true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			e: &api.Endpoint{ | ||||
| 				Name:   "Test.Cruft", | ||||
| 				Host:   []string{"example.com", "foo.com"}, | ||||
| 				Method: []string{"GET", "POST"}, | ||||
| 				Path:   []string{"/xyz"}, | ||||
| 			}, | ||||
| 			r: &http.Request{ | ||||
| 				Host:   "fail.com", | ||||
| 				Method: "DELETE", | ||||
| 				URL: &url.URL{ | ||||
| 					Path: "/test/fail", | ||||
| 				}, | ||||
| 			}, | ||||
| 			m: false, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for _, d := range testData { | ||||
| 		key := fmt.Sprintf("%s:%s", "test.service", d.e.Name) | ||||
| 		r.eps[key] = &api.Service{ | ||||
| 			Endpoint: d.e, | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	for _, d := range testData { | ||||
| 		e, err := r.Endpoint(d.r) | ||||
| 		if d.m && err != nil { | ||||
| 			t.Fatalf("expected match, got %v", err) | ||||
| 		} | ||||
| 		if !d.m && err == nil { | ||||
| 			t.Fatal("expected error got match") | ||||
| 		} | ||||
| 		// skip testing the non match | ||||
| 		if !d.m { | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		ep := e.Endpoint | ||||
|  | ||||
| 		// test the match | ||||
| 		if d.e.Name != ep.Name { | ||||
| 			t.Fatalf("expected %v got %v", d.e.Name, ep.Name) | ||||
| 		} | ||||
| 		if ok := compare(d.e.Method, ep.Method); !ok { | ||||
| 			t.Fatalf("expected %v got %v", d.e.Method, ep.Method) | ||||
| 		} | ||||
| 		if ok := compare(d.e.Path, ep.Path); !ok { | ||||
| 			t.Fatalf("expected %v got %v", d.e.Path, ep.Path) | ||||
| 		} | ||||
| 		if ok := compare(d.e.Host, ep.Host); !ok { | ||||
| 			t.Fatalf("expected %v got %v", d.e.Host, ep.Host) | ||||
| 		} | ||||
|  | ||||
| 	} | ||||
|  | ||||
| } | ||||
		Reference in New Issue
	
	Block a user