selector: new selector interface with random & roundrobin implementation (#1761)
* selector: implement new selector interface plus random & roundrobin implementations * selector/roundrobin: remove unused consts * router: add close method to interface * selector/roundrobin: fix concurrent map iteration and map write * selector: replace variadic argument on Select
This commit is contained in:
parent
a95accad56
commit
6337c92cd0
48
selector/random.go
Normal file
48
selector/random.go
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
package selector
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/rand"
|
||||||
|
|
||||||
|
"github.com/micro/go-micro/v2/router"
|
||||||
|
)
|
||||||
|
|
||||||
|
type random struct{}
|
||||||
|
|
||||||
|
func (r *random) Init(opts ...Option) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *random) Options() Options {
|
||||||
|
return Options{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *random) Select(routes []*router.Route) (*router.Route, error) {
|
||||||
|
// we can't select from an empty pool of routes
|
||||||
|
if len(routes) == 0 {
|
||||||
|
return nil, ErrNoneAvailable
|
||||||
|
}
|
||||||
|
|
||||||
|
// if there is only one route provided we'll select it
|
||||||
|
if len(routes) == 1 {
|
||||||
|
return routes[0], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// select a random route from the slice
|
||||||
|
return routes[rand.Intn(len(routes)-1)], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *random) Record(route *router.Route, err error) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *random) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *random) String() string {
|
||||||
|
return "random"
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSelector(...Option) Selector {
|
||||||
|
return &random{}
|
||||||
|
}
|
10
selector/random/random.go
Normal file
10
selector/random/random.go
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
package random
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/micro/go-micro/v2/selector"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewSelector returns a random selector
|
||||||
|
func NewSelector(opts ...selector.Option) selector.Selector {
|
||||||
|
return selector.DefaultSelector
|
||||||
|
}
|
11
selector/random/random_test.go
Normal file
11
selector/random/random_test.go
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
package random
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/micro/go-micro/v2/selector"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRandom(t *testing.T) {
|
||||||
|
selector.Tests(t, NewSelector())
|
||||||
|
}
|
112
selector/roundrobin/roundrobin.go
Normal file
112
selector/roundrobin/roundrobin.go
Normal file
@ -0,0 +1,112 @@
|
|||||||
|
package roundrobin
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sort"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/micro/go-micro/v2/router"
|
||||||
|
"github.com/micro/go-micro/v2/selector"
|
||||||
|
)
|
||||||
|
|
||||||
|
var routeTTL = time.Minute * 15
|
||||||
|
|
||||||
|
// NewSelector returns an initalised round robin selector
|
||||||
|
func NewSelector(opts ...selector.Option) selector.Selector {
|
||||||
|
r := &roundrobin{
|
||||||
|
routes: make(map[uint64]time.Time),
|
||||||
|
ticker: time.NewTicker(time.Minute),
|
||||||
|
}
|
||||||
|
go r.cleanRoutes()
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
|
type roundrobin struct {
|
||||||
|
ticker *time.Ticker
|
||||||
|
|
||||||
|
// routes is a map with the key being a route's hash and the value being the last time it
|
||||||
|
// was used to perform a request
|
||||||
|
routes map[uint64]time.Time
|
||||||
|
sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *roundrobin) Init(opts ...selector.Option) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *roundrobin) Options() selector.Options {
|
||||||
|
return selector.Options{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *roundrobin) Select(routes []*router.Route) (*router.Route, error) {
|
||||||
|
if len(routes) == 0 {
|
||||||
|
return nil, selector.ErrNoneAvailable
|
||||||
|
}
|
||||||
|
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
|
||||||
|
// setLastUsed will update the last used time for a route
|
||||||
|
setLastUsed := func(hash uint64) {
|
||||||
|
r.routes[hash] = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
// calculate the route hashes once
|
||||||
|
hashes := make(map[*router.Route]uint64, len(routes))
|
||||||
|
for _, s := range routes {
|
||||||
|
hashes[s] = s.Hash()
|
||||||
|
}
|
||||||
|
|
||||||
|
// if a route hasn't yet been seen, prioritise it
|
||||||
|
for srv, hash := range hashes {
|
||||||
|
if _, ok := r.routes[hash]; !ok {
|
||||||
|
setLastUsed(hash)
|
||||||
|
return srv, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// sort the services by the time they were last used
|
||||||
|
sort.SliceStable(routes, func(i, j int) bool {
|
||||||
|
iLastSeen := r.routes[hashes[routes[i]]]
|
||||||
|
jLastSeen := r.routes[hashes[routes[j]]]
|
||||||
|
return iLastSeen.UnixNano() < jLastSeen.UnixNano()
|
||||||
|
})
|
||||||
|
|
||||||
|
// return the route which was last used
|
||||||
|
setLastUsed(hashes[routes[0]])
|
||||||
|
return routes[0], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *roundrobin) Record(srv *router.Route, err error) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *roundrobin) Close() error {
|
||||||
|
r.ticker.Stop()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *roundrobin) String() string {
|
||||||
|
return "roundrobin"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *roundrobin) cleanRoutes() {
|
||||||
|
for {
|
||||||
|
// watch for ticks until the ticker is closed
|
||||||
|
if _, ok := <-r.ticker.C; !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
r.Lock()
|
||||||
|
|
||||||
|
// copy the slice to prevent concurrent map iteration and map write
|
||||||
|
rts := r.routes
|
||||||
|
|
||||||
|
for hash, t := range rts {
|
||||||
|
if t.Unix() < time.Now().Add(-routeTTL).Unix() {
|
||||||
|
delete(r.routes, hash)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
r.Unlock()
|
||||||
|
}
|
||||||
|
}
|
49
selector/roundrobin/roundrobin_test.go
Normal file
49
selector/roundrobin/roundrobin_test.go
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
package roundrobin
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/micro/go-micro/v2/router"
|
||||||
|
"github.com/micro/go-micro/v2/selector"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRoundRobin(t *testing.T) {
|
||||||
|
selector.Tests(t, NewSelector())
|
||||||
|
|
||||||
|
r1 := &router.Route{Service: "go.micro.service.foo", Address: "127.0.0.1:8000"}
|
||||||
|
r2 := &router.Route{Service: "go.micro.service.foo", Address: "127.0.0.1:8001"}
|
||||||
|
r3 := &router.Route{Service: "go.micro.service.foo", Address: "127.0.0.1:8002"}
|
||||||
|
|
||||||
|
sel := NewSelector()
|
||||||
|
|
||||||
|
// By passing r1 and r2 first, it forces a set sequence of (r1 => r2 => r3 => r1)
|
||||||
|
|
||||||
|
r, err := sel.Select([]*router.Route{r1})
|
||||||
|
assert.Nil(t, err, "Error should be nil")
|
||||||
|
assert.Equal(t, r1, r, "Expected route to be r1")
|
||||||
|
|
||||||
|
r, err = sel.Select([]*router.Route{r2})
|
||||||
|
assert.Nil(t, err, "Error should be nil")
|
||||||
|
assert.Equal(t, r2, r, "Expected route to be r2")
|
||||||
|
|
||||||
|
// Because r1 and r2 have been recently called, r3 should be chosen
|
||||||
|
|
||||||
|
r, err = sel.Select([]*router.Route{r1, r2, r3})
|
||||||
|
assert.Nil(t, err, "Error should be nil")
|
||||||
|
assert.Equal(t, r3, r, "Expected route to be r3")
|
||||||
|
|
||||||
|
// r1 was called longest ago, so it should be prioritised
|
||||||
|
|
||||||
|
r, err = sel.Select([]*router.Route{r1, r2, r3})
|
||||||
|
assert.Nil(t, err, "Error should be nil")
|
||||||
|
assert.Equal(t, r1, r, "Expected route to be r1")
|
||||||
|
|
||||||
|
r, err = sel.Select([]*router.Route{r1, r2, r3})
|
||||||
|
assert.Nil(t, err, "Error should be nil")
|
||||||
|
assert.Equal(t, r2, r, "Expected route to be r2")
|
||||||
|
|
||||||
|
r, err = sel.Select([]*router.Route{r1, r2, r3})
|
||||||
|
assert.Nil(t, err, "Error should be nil")
|
||||||
|
assert.Equal(t, r3, r, "Expected route to be r3")
|
||||||
|
}
|
42
selector/selector.go
Normal file
42
selector/selector.go
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
package selector
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"github.com/micro/go-micro/v2/router"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// DefaultSelector is the default selector
|
||||||
|
DefaultSelector = NewSelector()
|
||||||
|
|
||||||
|
// ErrNoneAvailable is returned by select when no routes were provided to select from
|
||||||
|
ErrNoneAvailable = errors.New("none available")
|
||||||
|
)
|
||||||
|
|
||||||
|
// Selector selects a route from a pool
|
||||||
|
type Selector interface {
|
||||||
|
// Init a selector with options
|
||||||
|
Init(...Option) error
|
||||||
|
// Options the selector is using
|
||||||
|
Options() Options
|
||||||
|
// Select a route from the pool using the strategy
|
||||||
|
Select([]*router.Route) (*router.Route, error)
|
||||||
|
// Record the error returned from a route to inform future selection
|
||||||
|
Record(*router.Route, error) error
|
||||||
|
// Close the selector
|
||||||
|
Close() error
|
||||||
|
// String returns the name of the selector
|
||||||
|
String() string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Options used to configure a selector
|
||||||
|
type Options struct{}
|
||||||
|
|
||||||
|
// Option updates the options
|
||||||
|
type Option func(*Options)
|
||||||
|
|
||||||
|
// NewSelector creates new selector and returns it
|
||||||
|
func NewSelector(opts ...Option) Selector {
|
||||||
|
return newSelector(opts...)
|
||||||
|
}
|
45
selector/tests.go
Normal file
45
selector/tests.go
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
package selector
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/micro/go-micro/v2/router"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Tests runs all the tests against a selector to ensure the implementations are consistent
|
||||||
|
func Tests(t *testing.T, s Selector) {
|
||||||
|
r1 := &router.Route{Service: "go.micro.service.foo", Address: "127.0.0.1:8000"}
|
||||||
|
r2 := &router.Route{Service: "go.micro.service.foo", Address: "127.0.0.1:8001"}
|
||||||
|
|
||||||
|
t.Run("Select", func(t *testing.T) {
|
||||||
|
t.Run("NoRoutes", func(t *testing.T) {
|
||||||
|
srv, err := s.Select([]*router.Route{})
|
||||||
|
assert.Nil(t, srv, "Route should be nil")
|
||||||
|
assert.Equal(t, ErrNoneAvailable, err, "Expected error to be none available")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("OneRoute", func(t *testing.T) {
|
||||||
|
srv, err := s.Select([]*router.Route{r1})
|
||||||
|
assert.Nil(t, err, "Error should be nil")
|
||||||
|
assert.Equal(t, r1, srv, "Expected the route to be returned")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("MultipleRoutes", func(t *testing.T) {
|
||||||
|
srv, err := s.Select([]*router.Route{r1, r2})
|
||||||
|
assert.Nil(t, err, "Error should be nil")
|
||||||
|
if srv.Address != r1.Address && srv.Address != r2.Address {
|
||||||
|
t.Errorf("Expected the route to be one of the inputs")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Record", func(t *testing.T) {
|
||||||
|
err := s.Record(r1, nil)
|
||||||
|
assert.Nil(t, err, "Expected the error to be nil")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("String", func(t *testing.T) {
|
||||||
|
assert.NotEmpty(t, s.String(), "String returned a blank string")
|
||||||
|
})
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user