add router selector and network defaults

This commit is contained in:
Asim Aslam 2019-06-26 16:12:57 +01:00
parent 1a62c11166
commit ac098e4d78
6 changed files with 292 additions and 2 deletions

View File

@ -0,0 +1,153 @@
// Package router is a network/router selector
package router
import (
"context"
"net"
"sort"
"strconv"
"sync"
"github.com/micro/go-micro/client/selector"
"github.com/micro/go-micro/network/router"
"github.com/micro/go-micro/registry"
)
type routerSelector struct {
opts selector.Options
// the router
r router.Router
}
type routerKey struct{}
func (r *routerSelector) Init(opts ...selector.Option) error {
// no op
return nil
}
func (r *routerSelector) Options() selector.Options {
return r.opts
}
func (r *routerSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
// lookup router for routes for the service
routes, err := r.r.Table().Lookup(router.NewQuery(
router.QueryDestination(service),
))
if err != nil {
return nil, err
}
// no routes return not found error
if len(routes) == 0 {
return nil, selector.ErrNotFound
}
// TODO: apply filters by pseudo constructing service
// sort the routes based on metric
sort.Slice(routes, func(i, j int) bool {
return routes[i].Metric < routes[j].Metric
})
// roundrobin assuming routes are in metric preference order
var i int
var mtx sync.Mutex
return func() (*registry.Node, error) {
// get index and increment counter with every call to next
mtx.Lock()
idx := i
i++
mtx.Unlock()
// get route based on idx
route := routes[idx%len(routes)]
// defaults to gateway and no port
address := route.Gateway
port := 0
// check if its host:port
host, pr, err := net.SplitHostPort(address)
if err == nil {
pp, _ := strconv.Atoi(pr)
// set port
port = pp
// set address
address = host
}
// return as a node
return &registry.Node{
// TODO: add id and metadata if we can
Address: address,
Port: port,
}, nil
}, nil
}
func (r *routerSelector) Mark(service string, node *registry.Node, err error) {
// TODO: pass back metrics or information to the router
return
}
func (r *routerSelector) Reset(service string) {
// TODO: reset the metrics or information at the router
return
}
func (r *routerSelector) Close() error {
// stop the router advertisements
return r.r.Stop()
}
func (r *routerSelector) String() string {
return "router"
}
// NewSelector returns a new router based selector
func NewSelector(opts ...selector.Option) selector.Selector {
options := selector.Options{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
// set default registry if not set
if options.Registry == nil {
options.Registry = registry.DefaultRegistry
}
// try get from the context
r, ok := options.Context.Value(routerKey{}).(router.Router)
if !ok {
// TODO: Use router.DefaultRouter?
r = router.NewRouter(
router.Registry(options.Registry),
)
}
// start the router advertisements
r.Advertise()
return &routerSelector{
opts: options,
r: r,
}
}
// WithRouter sets the router as an option
func WithRouter(r router.Router) selector.Option {
return func(o *selector.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, routerKey{}, r)
}
}

View File

@ -34,6 +34,7 @@ import (
// selectors // selectors
"github.com/micro/go-micro/client/selector" "github.com/micro/go-micro/client/selector"
"github.com/micro/go-micro/client/selector/dns" "github.com/micro/go-micro/client/selector/dns"
"github.com/micro/go-micro/client/selector/router"
"github.com/micro/go-micro/client/selector/static" "github.com/micro/go-micro/client/selector/static"
// transports // transports
@ -196,6 +197,7 @@ var (
"default": selector.NewSelector, "default": selector.NewSelector,
"dns": dns.NewSelector, "dns": dns.NewSelector,
"cache": selector.NewSelector, "cache": selector.NewSelector,
"router": router.NewSelector,
"static": static.NewSelector, "static": static.NewSelector,
} }

95
network/default.go Normal file
View File

@ -0,0 +1,95 @@
package network
import (
"sync"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/network/router"
"github.com/micro/go-micro/network/proxy"
)
type network struct {
options.Options
// router
r router.Router
// proxy
p proxy.Proxy
// id of this network
id string
// links maintained for this network
mtx sync.RWMutex
links []Link
}
type node struct {
*network
// address of this node
address string
}
type link struct {
// the embedded node
*node
// length and weight of the link
mtx sync.RWMutex
length, weight int
}
// network methods
func (n *network) Id() string {
return n.id
}
func (n *network) Connect() (Node, error) {
return nil, nil
}
func (n *network) Peer(Network) (Link, error) {
return nil, nil
}
func (n *network) Links() ([]Link, error) {
n.mtx.RLock()
defer n.mtx.RUnlock()
return n.links, nil
}
// node methods
func (n *node) Address() string {
return n.address
}
func (n *node) Close() error {
return nil
}
func (n *node) Accept() (*Message, error) {
return nil, nil
}
func (n *node) Send(*Message) error {
return nil
}
// link methods
func (l *link) Length() int {
l.mtx.RLock()
defer l.mtx.RUnlock()
return l.length
}
func (l *link) Weight() int {
l.mtx.RLock()
defer l.mtx.RUnlock()
return l.weight
}

View File

@ -51,6 +51,25 @@ type Message struct {
} }
var ( var (
// TODO: set default network // The default network ID is local
DefaultNetwork Network DefaultNetworkId = "local"
// just the standard network element
DefaultNetwork = NewNetwork()
) )
// NewNetwork returns a new network
func NewNetwork(opts ...options.Option) Network {
options := options.NewOptions(opts...)
// get router
// get proxy
return &network{
Options: options,
// fill the blanks
// router: r,
// proxy: p,
}
}

View File

@ -7,10 +7,12 @@ import (
"strings" "strings"
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
rselect "github.com/micro/go-micro/client/selector/router"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/bytes" "github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/config/options" "github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/network/proxy" "github.com/micro/go-micro/network/proxy"
"github.com/micro/go-micro/network/router"
"github.com/micro/go-micro/server" "github.com/micro/go-micro/server"
) )
@ -162,5 +164,18 @@ func NewProxy(opts ...options.Option) proxy.Proxy {
p.Client = c.(client.Client) p.Client = c.(client.Client)
} }
// get router
r, ok := p.Options.Values().Get("proxy.router")
if ok {
// set the router in the client
p.Client.Init(
// pass new selector as an option to the client
client.Selector(rselect.NewSelector(
// set the router in the selector
rselect.WithRouter(r.(router.Router)),
)),
)
}
return p return p
} }

View File

@ -6,6 +6,7 @@ import (
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
"github.com/micro/go-micro/config/options" "github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/network/router"
"github.com/micro/go-micro/server" "github.com/micro/go-micro/server"
) )
@ -29,3 +30,8 @@ func WithEndpoint(e string) options.Option {
func WithClient(c client.Client) options.Option { func WithClient(c client.Client) options.Option {
return options.WithValue("proxy.client", c) return options.WithValue("proxy.client", c)
} }
// WithRouter specifies the router to use
func WithRouter(r router.Router) options.Option {
return options.WithValue("proxy.router", r)
}