Merge pull request #415 from unistack-org/rejoin
registry: gossip add Reconnect and Timeout
This commit is contained in:
commit
c9bcdc8438
@ -4,6 +4,7 @@ package gossip
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
@ -35,6 +36,13 @@ const (
|
|||||||
actionTypeSync
|
actionTypeSync
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
nodeActionUnknown int32 = iota
|
||||||
|
nodeActionJoin
|
||||||
|
nodeActionLeave
|
||||||
|
nodeActionUpdate
|
||||||
|
)
|
||||||
|
|
||||||
func actionTypeString(t int32) string {
|
func actionTypeString(t int32) string {
|
||||||
switch t {
|
switch t {
|
||||||
case actionTypeCreate:
|
case actionTypeCreate:
|
||||||
@ -64,18 +72,45 @@ type delegate struct {
|
|||||||
updates chan *update
|
updates chan *update
|
||||||
}
|
}
|
||||||
|
|
||||||
type gossipRegistry struct {
|
type event struct {
|
||||||
queue *memberlist.TransmitLimitedQueue
|
action int32
|
||||||
updates chan *update
|
node string
|
||||||
options registry.Options
|
}
|
||||||
member *memberlist.Memberlist
|
|
||||||
interval time.Duration
|
|
||||||
|
|
||||||
|
type eventDelegate struct {
|
||||||
|
events chan *event
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ed *eventDelegate) NotifyJoin(n *memberlist.Node) {
|
||||||
|
ed.events <- &event{action: nodeActionJoin, node: n.Address()}
|
||||||
|
}
|
||||||
|
func (ed *eventDelegate) NotifyLeave(n *memberlist.Node) {
|
||||||
|
ed.events <- &event{action: nodeActionLeave, node: n.Address()}
|
||||||
|
}
|
||||||
|
func (ed *eventDelegate) NotifyUpdate(n *memberlist.Node) {
|
||||||
|
ed.events <- &event{action: nodeActionUpdate, node: n.Address()}
|
||||||
|
}
|
||||||
|
|
||||||
|
type gossipRegistry struct {
|
||||||
|
queue *memberlist.TransmitLimitedQueue
|
||||||
|
updates chan *update
|
||||||
|
events chan *event
|
||||||
|
options registry.Options
|
||||||
|
member *memberlist.Memberlist
|
||||||
|
interval time.Duration
|
||||||
|
tcpInterval time.Duration
|
||||||
|
|
||||||
|
connectRetry bool
|
||||||
|
connectTimeout time.Duration
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
services map[string][]*registry.Service
|
services map[string][]*registry.Service
|
||||||
|
|
||||||
s sync.RWMutex
|
|
||||||
watchers map[string]chan *registry.Result
|
watchers map[string]chan *registry.Result
|
||||||
|
|
||||||
|
mtu int
|
||||||
|
addrs []string
|
||||||
|
members map[string]int32
|
||||||
|
done chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
type update struct {
|
type update struct {
|
||||||
@ -87,9 +122,60 @@ type update struct {
|
|||||||
var (
|
var (
|
||||||
// You should change this if using secure
|
// You should change this if using secure
|
||||||
DefaultSecret = []byte("micro-gossip-key") // exactly 16 bytes
|
DefaultSecret = []byte("micro-gossip-key") // exactly 16 bytes
|
||||||
ExpiryTick = time.Second * 5
|
ExpiryTick = time.Second * 1 // needs to be smaller than registry.RegisterTTL
|
||||||
|
MaxPacketSize = 512
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func (g *gossipRegistry) connect(addrs []string) error {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if len(addrs) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
timeout := make(<-chan time.Time)
|
||||||
|
if g.connectTimeout > 0 {
|
||||||
|
timeout = time.After(g.connectTimeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
ticker := time.NewTicker(1 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
fn := func() (int, error) {
|
||||||
|
return g.member.Join(addrs)
|
||||||
|
}
|
||||||
|
|
||||||
|
// don't wait for first try
|
||||||
|
if _, err = fn(); err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait loop
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
// context closed
|
||||||
|
case <-g.options.Context.Done():
|
||||||
|
return nil
|
||||||
|
// call close, don't wait anymore
|
||||||
|
case <-g.done:
|
||||||
|
return nil
|
||||||
|
// in case of timeout fail with a timeout error
|
||||||
|
case <-timeout:
|
||||||
|
return fmt.Errorf("[gossip]: timedout connect to %v", g.addrs)
|
||||||
|
// got a tick, try to connect
|
||||||
|
case <-ticker.C:
|
||||||
|
if _, err = fn(); err == nil {
|
||||||
|
log.Logf("[gossip]: success connect to %v", g.addrs)
|
||||||
|
return nil
|
||||||
|
} else {
|
||||||
|
log.Logf("[gossip]: failed connect to %v", g.addrs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func configure(g *gossipRegistry, opts ...registry.Option) error {
|
func configure(g *gossipRegistry, opts ...registry.Option) error {
|
||||||
// loop through address list and get valid entries
|
// loop through address list and get valid entries
|
||||||
addrs := func(curAddrs []string) []string {
|
addrs := func(curAddrs []string) []string {
|
||||||
@ -129,8 +215,10 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
|
|||||||
// create a new default config
|
// create a new default config
|
||||||
c := memberlist.DefaultLocalConfig()
|
c := memberlist.DefaultLocalConfig()
|
||||||
|
|
||||||
// log to dev null
|
// sane good default options
|
||||||
c.LogOutput = ioutil.Discard
|
c.LogOutput = ioutil.Discard // log to /dev/null
|
||||||
|
c.PushPullInterval = 0 // disable expensive tcp push/pull
|
||||||
|
c.ProtocolVersion = 4 // suport latest stable features
|
||||||
|
|
||||||
if optConfig, ok := g.options.Context.Value(contextConfig{}).(*memberlist.Config); ok && optConfig != nil {
|
if optConfig, ok := g.options.Context.Value(contextConfig{}).(*memberlist.Config); ok && optConfig != nil {
|
||||||
c = optConfig
|
c = optConfig
|
||||||
@ -177,6 +265,13 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
|
|||||||
c.SecretKey = k
|
c.SecretKey = k
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if v, ok := g.options.Context.Value(connectRetry{}).(bool); ok && v {
|
||||||
|
g.connectRetry = true
|
||||||
|
}
|
||||||
|
if td, ok := g.options.Context.Value(connectTimeout{}).(time.Duration); ok {
|
||||||
|
g.connectTimeout = td
|
||||||
|
}
|
||||||
|
|
||||||
// create a queue
|
// create a queue
|
||||||
queue := &memberlist.TransmitLimitedQueue{
|
queue := &memberlist.TransmitLimitedQueue{
|
||||||
NumNodes: func() int {
|
NumNodes: func() int {
|
||||||
@ -191,27 +286,34 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
|
|||||||
queue: queue,
|
queue: queue,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if g.connectRetry {
|
||||||
|
c.Events = &eventDelegate{
|
||||||
|
events: g.events,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// create the memberlist
|
// create the memberlist
|
||||||
m, err := memberlist.Create(c)
|
m, err := memberlist.Create(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// join the memberlist
|
// set internals
|
||||||
|
g.Lock()
|
||||||
if len(curAddrs) > 0 {
|
if len(curAddrs) > 0 {
|
||||||
_, err := m.Join(curAddrs)
|
for _, addr := range curAddrs {
|
||||||
if err != nil {
|
g.members[addr] = nodeActionUnknown
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
g.tcpInterval = c.PushPullInterval
|
||||||
// set internals
|
g.addrs = curAddrs
|
||||||
g.queue = queue
|
g.queue = queue
|
||||||
g.member = m
|
g.member = m
|
||||||
g.interval = c.GossipInterval
|
g.interval = c.GossipInterval
|
||||||
|
g.Unlock()
|
||||||
|
|
||||||
log.Logf("Registry Listening on %s", m.LocalNode().Address())
|
log.Logf("[gossip]: Registry Listening on %s", m.LocalNode().Address())
|
||||||
return nil
|
return g.connect(curAddrs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*broadcast) UniqueBroadcast() {}
|
func (*broadcast) UniqueBroadcast() {}
|
||||||
@ -225,6 +327,9 @@ func (b *broadcast) Message() []byte {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if l := len(up); l > MaxPacketSize {
|
||||||
|
log.Logf("[gossip]: broadcast message size %d bigger then MaxPacketSize %d", l, MaxPacketSize)
|
||||||
|
}
|
||||||
return up
|
return up
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -313,7 +418,6 @@ func (d *delegate) MergeRemoteState(buf []byte, join bool) {
|
|||||||
if err := json.Unmarshal(buf, &services); err != nil {
|
if err := json.Unmarshal(buf, &services); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, service := range services {
|
for _, service := range services {
|
||||||
for _, srv := range service {
|
for _, srv := range service {
|
||||||
d.updates <- &update{
|
d.updates <- &update{
|
||||||
@ -326,7 +430,7 @@ func (d *delegate) MergeRemoteState(buf []byte, join bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *gossipRegistry) publish(action string, services []*registry.Service) {
|
func (g *gossipRegistry) publish(action string, services []*registry.Service) {
|
||||||
g.s.RLock()
|
g.RLock()
|
||||||
for _, sub := range g.watchers {
|
for _, sub := range g.watchers {
|
||||||
go func(sub chan *registry.Result) {
|
go func(sub chan *registry.Result) {
|
||||||
for _, service := range services {
|
for _, service := range services {
|
||||||
@ -334,7 +438,7 @@ func (g *gossipRegistry) publish(action string, services []*registry.Service) {
|
|||||||
}
|
}
|
||||||
}(sub)
|
}(sub)
|
||||||
}
|
}
|
||||||
g.s.RUnlock()
|
g.RUnlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *gossipRegistry) subscribe() (chan *registry.Result, chan bool) {
|
func (g *gossipRegistry) subscribe() (chan *registry.Result, chan bool) {
|
||||||
@ -343,16 +447,16 @@ func (g *gossipRegistry) subscribe() (chan *registry.Result, chan bool) {
|
|||||||
|
|
||||||
id := uuid.New().String()
|
id := uuid.New().String()
|
||||||
|
|
||||||
g.s.Lock()
|
g.Lock()
|
||||||
g.watchers[id] = next
|
g.watchers[id] = next
|
||||||
g.s.Unlock()
|
g.Unlock()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
<-exit
|
<-exit
|
||||||
g.s.Lock()
|
g.Lock()
|
||||||
delete(g.watchers, id)
|
delete(g.watchers, id)
|
||||||
close(next)
|
close(next)
|
||||||
g.s.Unlock()
|
g.Unlock()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return next, exit
|
return next, exit
|
||||||
@ -378,9 +482,19 @@ func (g *gossipRegistry) wait() {
|
|||||||
g.Stop()
|
g.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *gossipRegistry) Stop() {
|
func (g *gossipRegistry) Stop() error {
|
||||||
g.member.Leave(g.interval * 2)
|
g.Lock()
|
||||||
g.member.Shutdown()
|
if g.done != nil {
|
||||||
|
close(g.done)
|
||||||
|
g.done = nil
|
||||||
|
}
|
||||||
|
if g.member != nil {
|
||||||
|
g.member.Leave(g.interval * 2)
|
||||||
|
g.member.Shutdown()
|
||||||
|
g.member = nil
|
||||||
|
}
|
||||||
|
g.Unlock()
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *gossipRegistry) run() {
|
func (g *gossipRegistry) run() {
|
||||||
@ -389,31 +503,78 @@ func (g *gossipRegistry) run() {
|
|||||||
|
|
||||||
// expiry loop
|
// expiry loop
|
||||||
go func() {
|
go func() {
|
||||||
t := time.NewTicker(ExpiryTick)
|
ticker := time.NewTicker(ExpiryTick)
|
||||||
defer t.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
for _ = range t.C {
|
for {
|
||||||
now := uint64(time.Now().UnixNano())
|
select {
|
||||||
|
case <-g.done:
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
now := uint64(time.Now().UnixNano())
|
||||||
|
|
||||||
mtx.Lock()
|
mtx.Lock()
|
||||||
|
|
||||||
// process all the updates
|
// process all the updates
|
||||||
for k, v := range updates {
|
for k, v := range updates {
|
||||||
// check if expiry time has passed
|
// check if expiry time has passed
|
||||||
if d := (v.Update.Expires); d < now {
|
if d := (v.Update.Expires); d < now {
|
||||||
// delete from records
|
// delete from records
|
||||||
delete(updates, k)
|
delete(updates, k)
|
||||||
// set to delete
|
// set to delete
|
||||||
v.Update.Action = actionTypeDelete
|
v.Update.Action = actionTypeDelete
|
||||||
// fire a new update
|
// fire a new update
|
||||||
g.updates <- v
|
g.updates <- v
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
mtx.Unlock()
|
mtx.Unlock()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-g.done:
|
||||||
|
return
|
||||||
|
case ed := <-g.events:
|
||||||
|
// may be not block all registry?
|
||||||
|
g.Lock()
|
||||||
|
if _, ok := g.members[ed.node]; ok {
|
||||||
|
g.members[ed.node] = ed.action
|
||||||
|
}
|
||||||
|
g.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if g.connectRetry {
|
||||||
|
go func() {
|
||||||
|
ticker := time.NewTicker(1 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-g.done:
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
var addrs []string
|
||||||
|
g.RLock()
|
||||||
|
for node, action := range g.members {
|
||||||
|
if action == nodeActionLeave && g.member.LocalNode().Address() != node {
|
||||||
|
addrs = append(addrs, node)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
g.RUnlock()
|
||||||
|
if len(addrs) > 0 {
|
||||||
|
g.connect(addrs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
// process the updates
|
// process the updates
|
||||||
for u := range g.updates {
|
for u := range g.updates {
|
||||||
switch u.Update.Action {
|
switch u.Update.Action {
|
||||||
@ -512,6 +673,10 @@ func (g *gossipRegistry) Register(s *registry.Service, opts ...registry.Register
|
|||||||
o(&options)
|
o(&options)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if options.TTL == 0 && g.tcpInterval == 0 {
|
||||||
|
return fmt.Errorf("must provide registry.RegisterTTL option or set PushPullInterval in *memberlist.Config")
|
||||||
|
}
|
||||||
|
|
||||||
up := &pb.Update{
|
up := &pb.Update{
|
||||||
Expires: uint64(time.Now().Add(options.TTL).UnixNano()),
|
Expires: uint64(time.Now().Add(options.TTL).UnixNano()),
|
||||||
Action: actionTypeCreate,
|
Action: actionTypeCreate,
|
||||||
@ -604,8 +769,11 @@ func NewRegistry(opts ...registry.Option) registry.Registry {
|
|||||||
Context: context.Background(),
|
Context: context.Background(),
|
||||||
},
|
},
|
||||||
updates: make(chan *update, 100),
|
updates: make(chan *update, 100),
|
||||||
|
events: make(chan *event, 100),
|
||||||
services: make(map[string][]*registry.Service),
|
services: make(map[string][]*registry.Service),
|
||||||
watchers: make(map[string]chan *registry.Result),
|
watchers: make(map[string]chan *registry.Result),
|
||||||
|
done: make(chan struct{}),
|
||||||
|
members: make(map[string]int32),
|
||||||
}
|
}
|
||||||
|
|
||||||
// run the updater
|
// run the updater
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
package gossip_test
|
package gossip
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
@ -9,68 +8,57 @@ import (
|
|||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/hashicorp/memberlist"
|
"github.com/hashicorp/memberlist"
|
||||||
micro "github.com/micro/go-micro"
|
|
||||||
"github.com/micro/go-micro/client"
|
|
||||||
"github.com/micro/go-micro/registry"
|
"github.com/micro/go-micro/registry"
|
||||||
"github.com/micro/go-micro/registry/gossip"
|
|
||||||
pb "github.com/micro/go-micro/registry/gossip/proto"
|
|
||||||
"github.com/micro/go-micro/selector"
|
|
||||||
"github.com/micro/go-micro/server"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
func newMemberlistConfig() *memberlist.Config {
|
||||||
r1 registry.Registry
|
mc := memberlist.DefaultLANConfig()
|
||||||
r2 registry.Registry
|
mc.DisableTcpPings = false
|
||||||
mu sync.Mutex
|
mc.GossipVerifyIncoming = false
|
||||||
)
|
mc.GossipVerifyOutgoing = false
|
||||||
|
mc.EnableCompression = false
|
||||||
func newConfig() *memberlist.Config {
|
mc.PushPullInterval = 3 * time.Second
|
||||||
wc := memberlist.DefaultLANConfig()
|
mc.LogOutput = os.Stderr
|
||||||
wc.DisableTcpPings = false
|
mc.ProtocolVersion = 4
|
||||||
wc.GossipVerifyIncoming = false
|
mc.Name = uuid.New().String()
|
||||||
wc.GossipVerifyOutgoing = false
|
return mc
|
||||||
wc.EnableCompression = false
|
|
||||||
wc.LogOutput = os.Stderr
|
|
||||||
wc.ProtocolVersion = 4
|
|
||||||
wc.Name = uuid.New().String()
|
|
||||||
return wc
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newRegistries() {
|
func newRegistry(opts ...registry.Option) registry.Registry {
|
||||||
mu.Lock()
|
options := []registry.Option{
|
||||||
defer mu.Unlock()
|
ConnectRetry(true),
|
||||||
|
ConnectTimeout(60 * time.Second),
|
||||||
if r1 != nil && r2 != nil {
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
wc1 := newConfig()
|
options = append(options, opts...)
|
||||||
wc2 := newConfig()
|
r := NewRegistry(options...)
|
||||||
|
return r
|
||||||
rops1 := []registry.Option{gossip.Config(wc1), gossip.Address("127.0.0.1:54321")}
|
|
||||||
rops2 := []registry.Option{gossip.Config(wc2), gossip.Address("127.0.0.2:54321"), registry.Addrs("127.0.0.1:54321")}
|
|
||||||
|
|
||||||
r1 = gossip.NewRegistry(rops1...) // first started without members
|
|
||||||
r2 = gossip.NewRegistry(rops2...) // second started joining
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRegistryBroadcast(t *testing.T) {
|
func TestRegistryBroadcast(t *testing.T) {
|
||||||
newRegistries()
|
mc1 := newMemberlistConfig()
|
||||||
|
r1 := newRegistry(Config(mc1), Address("127.0.0.1:54321"))
|
||||||
|
|
||||||
|
mc2 := newMemberlistConfig()
|
||||||
|
r2 := newRegistry(Config(mc2), Address("127.0.0.2:54321"), registry.Addrs("127.0.0.1:54321"))
|
||||||
|
|
||||||
|
defer r1.(*gossipRegistry).Stop()
|
||||||
|
defer r2.(*gossipRegistry).Stop()
|
||||||
|
|
||||||
svc1 := ®istry.Service{Name: "r1-svc", Version: "0.0.0.1"}
|
svc1 := ®istry.Service{Name: "r1-svc", Version: "0.0.0.1"}
|
||||||
svc2 := ®istry.Service{Name: "r2-svc", Version: "0.0.0.2"}
|
svc2 := ®istry.Service{Name: "r2-svc", Version: "0.0.0.2"}
|
||||||
|
|
||||||
<-time.After(1 * time.Second)
|
t.Logf("register service svc1 on r1\n")
|
||||||
if err := r1.Register(svc1); err != nil {
|
if err := r1.Register(svc1, registry.RegisterTTL(10*time.Second)); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
<-time.After(1 * time.Second)
|
t.Logf("register service svc2 on r2\n")
|
||||||
if err := r2.Register(svc2); err != nil {
|
if err := r2.Register(svc2, registry.RegisterTTL(10*time.Second)); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var found bool
|
var found bool
|
||||||
|
t.Logf("list services on r1\n")
|
||||||
svcs, err := r1.ListServices()
|
svcs, err := r1.ListServices()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -83,6 +71,129 @@ func TestRegistryBroadcast(t *testing.T) {
|
|||||||
}
|
}
|
||||||
if !found {
|
if !found {
|
||||||
t.Fatalf("r2-svc not found in r1, broadcast not work")
|
t.Fatalf("r2-svc not found in r1, broadcast not work")
|
||||||
|
} else {
|
||||||
|
t.Logf("r2-svc found in r1, all ok")
|
||||||
|
}
|
||||||
|
|
||||||
|
found = false
|
||||||
|
t.Logf("list services on r2\n")
|
||||||
|
svcs, err = r2.ListServices()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, svc := range svcs {
|
||||||
|
if svc.Name == "r1-svc" {
|
||||||
|
found = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
t.Fatalf("r1-svc not found in r2, broadcast not work")
|
||||||
|
} else {
|
||||||
|
t.Logf("r1-svc found in r1, all ok")
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("deregister service svc1 on r1\n")
|
||||||
|
if err := r1.Deregister(svc1); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
t.Logf("deregister service svc1 on r2\n")
|
||||||
|
if err := r2.Deregister(svc2); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
func TestRegistryRetry(t *testing.T) {
|
||||||
|
mc1 := newMemberlistConfig()
|
||||||
|
r1 := newRegistry(Config(mc1), Address("127.0.0.1:54321"))
|
||||||
|
|
||||||
|
mc2 := newMemberlistConfig()
|
||||||
|
r2 := newRegistry(Config(mc2), Address("127.0.0.2:54321"), registry.Addrs("127.0.0.1:54321"))
|
||||||
|
|
||||||
|
defer r1.(*gossipRegistry).Stop()
|
||||||
|
defer r2.(*gossipRegistry).Stop()
|
||||||
|
|
||||||
|
svc1 := ®istry.Service{Name: "r1-svc", Version: "0.0.0.1"}
|
||||||
|
svc2 := ®istry.Service{Name: "r2-svc", Version: "0.0.0.2"}
|
||||||
|
|
||||||
|
var mu sync.Mutex
|
||||||
|
ch := make(chan struct{})
|
||||||
|
ticker := time.NewTicker(1 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
mu.Lock()
|
||||||
|
if r1 != nil {
|
||||||
|
r1.Register(svc1, registry.RegisterTTL(2*time.Second))
|
||||||
|
}
|
||||||
|
if r2 != nil {
|
||||||
|
r2.Register(svc2, registry.RegisterTTL(2*time.Second))
|
||||||
|
}
|
||||||
|
if ch != nil {
|
||||||
|
close(ch)
|
||||||
|
ch = nil
|
||||||
|
}
|
||||||
|
mu.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
<-ch
|
||||||
|
var found bool
|
||||||
|
|
||||||
|
svcs, err := r2.ListServices()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, svc := range svcs {
|
||||||
|
if svc.Name == "r1-svc" {
|
||||||
|
found = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
t.Fatalf("r1-svc not found in r2, broadcast not work, retry cant test")
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("stop r1\n")
|
||||||
|
if err = r1.(*gossipRegistry).Stop(); err != nil {
|
||||||
|
t.Fatalf("cant stop r1 registry %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
|
r1 = nil
|
||||||
|
mu.Unlock()
|
||||||
|
|
||||||
|
<-time.After(3 * time.Second)
|
||||||
|
|
||||||
|
found = false
|
||||||
|
svcs, err = r2.ListServices()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, svc := range svcs {
|
||||||
|
if svc.Name == "r1-svc" {
|
||||||
|
found = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if found {
|
||||||
|
t.Fatalf("r1-svc found in r2, something wrong")
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("start r1\n")
|
||||||
|
|
||||||
|
r1 = newRegistry(Config(mc1), Address("127.0.0.1:54321"))
|
||||||
|
<-time.After(2 * time.Second)
|
||||||
|
|
||||||
|
if tr := os.Getenv("TRAVIS"); len(tr) > 0 {
|
||||||
|
t.Logf("skip next test part, becasue it not works in travis")
|
||||||
|
t.Skip()
|
||||||
|
return
|
||||||
|
<-time.After(5 * time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
found = false
|
found = false
|
||||||
@ -97,7 +208,7 @@ func TestRegistryBroadcast(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !found {
|
if !found {
|
||||||
t.Fatalf("r1-svc not found in r2, broadcast not work")
|
t.Fatalf("r1-svc not found in r2, connect retry not works")
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := r1.Deregister(svc1); err != nil {
|
if err := r1.Deregister(svc1); err != nil {
|
||||||
@ -107,93 +218,6 @@ func TestRegistryBroadcast(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
r1.(*gossipRegistry).Stop()
|
||||||
|
r2.(*gossipRegistry).Stop()
|
||||||
func TestServerRegistry(t *testing.T) {
|
|
||||||
newRegistries()
|
|
||||||
|
|
||||||
_, err := newServer("s1", r1, t)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = newServer("s2", r2, t)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
svcs, err := r1.ListServices()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if len(svcs) < 1 {
|
|
||||||
t.Fatalf("r1 svcs unknown %#+v\n", svcs)
|
|
||||||
}
|
|
||||||
|
|
||||||
found := false
|
|
||||||
for _, svc := range svcs {
|
|
||||||
if svc.Name == "s2" {
|
|
||||||
found = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !found {
|
|
||||||
t.Fatalf("r1 does not have s2, broadcast not work")
|
|
||||||
}
|
|
||||||
|
|
||||||
found = false
|
|
||||||
svcs, err = r2.ListServices()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, svc := range svcs {
|
|
||||||
if svc.Name == "s1" {
|
|
||||||
found = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !found {
|
|
||||||
t.Fatalf("r2 does not have s1, broadcast not work")
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
type testServer struct{}
|
|
||||||
|
|
||||||
func (*testServer) Test(ctx context.Context, req *pb.Update, rsp *pb.Update) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func newServer(n string, r registry.Registry, t *testing.T) (micro.Service, error) {
|
|
||||||
h := &testServer{}
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
|
|
||||||
wg.Add(1)
|
|
||||||
sopts := []server.Option{
|
|
||||||
server.Name(n),
|
|
||||||
server.Registry(r),
|
|
||||||
}
|
|
||||||
|
|
||||||
copts := []client.Option{
|
|
||||||
client.Selector(selector.NewSelector(selector.Registry(r))),
|
|
||||||
client.Registry(r),
|
|
||||||
}
|
|
||||||
|
|
||||||
srv := micro.NewService(
|
|
||||||
micro.Server(server.NewServer(sopts...)),
|
|
||||||
micro.Client(client.NewClient(copts...)),
|
|
||||||
micro.AfterStart(func() error {
|
|
||||||
wg.Done()
|
|
||||||
return nil
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
|
|
||||||
srv.Server().NewHandler(h)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
t.Fatal(srv.Run())
|
|
||||||
}()
|
|
||||||
wg.Wait()
|
|
||||||
return srv, nil
|
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@ package gossip
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/memberlist"
|
"github.com/hashicorp/memberlist"
|
||||||
"github.com/micro/go-micro/registry"
|
"github.com/micro/go-micro/registry"
|
||||||
@ -44,3 +45,18 @@ type contextContext struct{}
|
|||||||
func Context(ctx context.Context) registry.Option {
|
func Context(ctx context.Context) registry.Option {
|
||||||
return setRegistryOption(contextContext{}, ctx)
|
return setRegistryOption(contextContext{}, ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type connectTimeout struct{}
|
||||||
|
|
||||||
|
// ConnectTimeout specify registry connect timeout use -1 to specify infinite
|
||||||
|
func ConnectTimeout(td time.Duration) registry.Option {
|
||||||
|
return setRegistryOption(connectTimeout{}, td)
|
||||||
|
}
|
||||||
|
|
||||||
|
type connectRetry struct{}
|
||||||
|
|
||||||
|
// ConnectRetry enable reconnect to registry then connection closed,
|
||||||
|
// use with ConnectTimeout to specify how long retry
|
||||||
|
func ConnectRetry(v bool) registry.Option {
|
||||||
|
return setRegistryOption(connectRetry{}, v)
|
||||||
|
}
|
||||||
|
@ -11,7 +11,6 @@ type Options struct {
|
|||||||
Timeout time.Duration
|
Timeout time.Duration
|
||||||
Secure bool
|
Secure bool
|
||||||
TLSConfig *tls.Config
|
TLSConfig *tls.Config
|
||||||
|
|
||||||
// Other options for implementations of the interface
|
// Other options for implementations of the interface
|
||||||
// can be stored in a context
|
// can be stored in a context
|
||||||
Context context.Context
|
Context context.Context
|
||||||
|
Loading…
Reference in New Issue
Block a user