diff --git a/client/selector/router/router.go b/client/selector/router/router.go new file mode 100644 index 00000000..d67b8767 --- /dev/null +++ b/client/selector/router/router.go @@ -0,0 +1,153 @@ +// Package router is a network/router selector +package router + +import ( + "context" + "net" + "sort" + "strconv" + "sync" + + "github.com/micro/go-micro/client/selector" + "github.com/micro/go-micro/network/router" + "github.com/micro/go-micro/registry" +) + +type routerSelector struct { + opts selector.Options + + // the router + r router.Router +} + +type routerKey struct{} + +func (r *routerSelector) Init(opts ...selector.Option) error { + // no op + return nil +} + +func (r *routerSelector) Options() selector.Options { + return r.opts +} + +func (r *routerSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) { + // lookup router for routes for the service + routes, err := r.r.Table().Lookup(router.NewQuery( + router.QueryDestination(service), + )) + + if err != nil { + return nil, err + } + + // no routes return not found error + if len(routes) == 0 { + return nil, selector.ErrNotFound + } + + // TODO: apply filters by pseudo constructing service + + // sort the routes based on metric + sort.Slice(routes, func(i, j int) bool { + return routes[i].Metric < routes[j].Metric + }) + + // roundrobin assuming routes are in metric preference order + var i int + var mtx sync.Mutex + + return func() (*registry.Node, error) { + // get index and increment counter with every call to next + mtx.Lock() + idx := i + i++ + mtx.Unlock() + + // get route based on idx + route := routes[idx%len(routes)] + + // defaults to gateway and no port + address := route.Gateway + port := 0 + + // check if its host:port + host, pr, err := net.SplitHostPort(address) + if err == nil { + pp, _ := strconv.Atoi(pr) + // set port + port = pp + // set address + address = host + } + + // return as a node + return ®istry.Node{ + // TODO: add id and metadata if we can + Address: address, + Port: port, + }, nil + }, nil +} + +func (r *routerSelector) Mark(service string, node *registry.Node, err error) { + // TODO: pass back metrics or information to the router + return +} + +func (r *routerSelector) Reset(service string) { + // TODO: reset the metrics or information at the router + return +} + +func (r *routerSelector) Close() error { + // stop the router advertisements + return r.r.Stop() +} + +func (r *routerSelector) String() string { + return "router" +} + +// NewSelector returns a new router based selector +func NewSelector(opts ...selector.Option) selector.Selector { + options := selector.Options{ + Context: context.Background(), + } + + for _, o := range opts { + o(&options) + } + + // set default registry if not set + if options.Registry == nil { + options.Registry = registry.DefaultRegistry + } + + // try get from the context + r, ok := options.Context.Value(routerKey{}).(router.Router) + if !ok { + // TODO: Use router.DefaultRouter? + r = router.NewRouter( + router.Registry(options.Registry), + ) + } + + // start the router advertisements + r.Advertise() + + return &routerSelector{ + opts: options, + r: r, + } +} + +// WithRouter sets the router as an option +func WithRouter(r router.Router) selector.Option { + return func(o *selector.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, routerKey{}, r) + } +} diff --git a/config/cmd/cmd.go b/config/cmd/cmd.go index 2e7e2a6e..8d7c3c03 100644 --- a/config/cmd/cmd.go +++ b/config/cmd/cmd.go @@ -34,6 +34,7 @@ import ( // selectors "github.com/micro/go-micro/client/selector" "github.com/micro/go-micro/client/selector/dns" + "github.com/micro/go-micro/client/selector/router" "github.com/micro/go-micro/client/selector/static" // transports @@ -196,6 +197,7 @@ var ( "default": selector.NewSelector, "dns": dns.NewSelector, "cache": selector.NewSelector, + "router": router.NewSelector, "static": static.NewSelector, } diff --git a/network/default.go b/network/default.go new file mode 100644 index 00000000..90164d33 --- /dev/null +++ b/network/default.go @@ -0,0 +1,95 @@ +package network + +import ( + "sync" + + "github.com/micro/go-micro/config/options" + "github.com/micro/go-micro/network/router" + "github.com/micro/go-micro/network/proxy" +) + +type network struct { + options.Options + + // router + r router.Router + + // proxy + p proxy.Proxy + + // id of this network + id string + + // links maintained for this network + mtx sync.RWMutex + links []Link +} + +type node struct { + *network + + // address of this node + address string +} + + +type link struct { + // the embedded node + *node + + // length and weight of the link + mtx sync.RWMutex + length, weight int +} + +// network methods + +func (n *network) Id() string { + return n.id +} + +func (n *network) Connect() (Node, error) { + return nil, nil +} + +func (n *network) Peer(Network) (Link, error) { + return nil, nil +} + +func (n *network) Links() ([]Link, error) { + n.mtx.RLock() + defer n.mtx.RUnlock() + return n.links, nil +} + +// node methods + +func (n *node) Address() string { + return n.address +} + +func (n *node) Close() error { + return nil +} + +func (n *node) Accept() (*Message, error) { + return nil, nil +} + +func (n *node) Send(*Message) error { + return nil +} + +// link methods + +func (l *link) Length() int { + l.mtx.RLock() + defer l.mtx.RUnlock() + return l.length +} + +func (l *link) Weight() int { + l.mtx.RLock() + defer l.mtx.RUnlock() + return l.weight +} diff --git a/network/network.go b/network/network.go index 29d6653d..098c0807 100644 --- a/network/network.go +++ b/network/network.go @@ -51,6 +51,25 @@ type Message struct { } var ( - // TODO: set default network - DefaultNetwork Network + // The default network ID is local + DefaultNetworkId = "local" + + // just the standard network element + DefaultNetwork = NewNetwork() ) + +// NewNetwork returns a new network +func NewNetwork(opts ...options.Option) Network { + options := options.NewOptions(opts...) + + // get router + + // get proxy + + return &network{ + Options: options, + // fill the blanks + // router: r, + // proxy: p, + } +} diff --git a/network/proxy/mucp/mucp.go b/network/proxy/mucp/mucp.go index 9e8afd39..2d52058a 100644 --- a/network/proxy/mucp/mucp.go +++ b/network/proxy/mucp/mucp.go @@ -7,10 +7,12 @@ import ( "strings" "github.com/micro/go-micro/client" + rselect "github.com/micro/go-micro/client/selector/router" "github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec/bytes" "github.com/micro/go-micro/config/options" "github.com/micro/go-micro/network/proxy" + "github.com/micro/go-micro/network/router" "github.com/micro/go-micro/server" ) @@ -162,5 +164,18 @@ func NewProxy(opts ...options.Option) proxy.Proxy { p.Client = c.(client.Client) } + // get router + r, ok := p.Options.Values().Get("proxy.router") + if ok { + // set the router in the client + p.Client.Init( + // pass new selector as an option to the client + client.Selector(rselect.NewSelector( + // set the router in the selector + rselect.WithRouter(r.(router.Router)), + )), + ) + } + return p } diff --git a/network/proxy/proxy.go b/network/proxy/proxy.go index bb8fcc3b..22d485e8 100644 --- a/network/proxy/proxy.go +++ b/network/proxy/proxy.go @@ -6,6 +6,7 @@ import ( "github.com/micro/go-micro/client" "github.com/micro/go-micro/config/options" + "github.com/micro/go-micro/network/router" "github.com/micro/go-micro/server" ) @@ -29,3 +30,8 @@ func WithEndpoint(e string) options.Option { func WithClient(c client.Client) options.Option { return options.WithValue("proxy.client", c) } + +// WithRouter specifies the router to use +func WithRouter(r router.Router) options.Option { + return options.WithValue("proxy.router", r) +}