diff --git a/registry/gossip/gossip.go b/registry/gossip/gossip.go index 05cbc425..027d42e1 100644 --- a/registry/gossip/gossip.go +++ b/registry/gossip/gossip.go @@ -4,6 +4,7 @@ package gossip import ( "context" "encoding/json" + "fmt" "io/ioutil" "net" "os" @@ -35,6 +36,13 @@ const ( actionTypeSync ) +const ( + nodeActionUnknown int32 = iota + nodeActionJoin + nodeActionLeave + nodeActionUpdate +) + func actionTypeString(t int32) string { switch t { case actionTypeCreate: @@ -64,18 +72,45 @@ type delegate struct { updates chan *update } -type gossipRegistry struct { - queue *memberlist.TransmitLimitedQueue - updates chan *update - options registry.Options - member *memberlist.Memberlist - interval time.Duration +type event struct { + action int32 + node string +} +type eventDelegate struct { + events chan *event +} + +func (ed *eventDelegate) NotifyJoin(n *memberlist.Node) { + ed.events <- &event{action: nodeActionJoin, node: n.Address()} +} +func (ed *eventDelegate) NotifyLeave(n *memberlist.Node) { + ed.events <- &event{action: nodeActionLeave, node: n.Address()} +} +func (ed *eventDelegate) NotifyUpdate(n *memberlist.Node) { + ed.events <- &event{action: nodeActionUpdate, node: n.Address()} +} + +type gossipRegistry struct { + queue *memberlist.TransmitLimitedQueue + updates chan *update + events chan *event + options registry.Options + member *memberlist.Memberlist + interval time.Duration + tcpInterval time.Duration + + connectRetry bool + connectTimeout time.Duration sync.RWMutex services map[string][]*registry.Service - s sync.RWMutex watchers map[string]chan *registry.Result + + mtu int + addrs []string + members map[string]int32 + done chan struct{} } type update struct { @@ -87,9 +122,60 @@ type update struct { var ( // You should change this if using secure DefaultSecret = []byte("micro-gossip-key") // exactly 16 bytes - ExpiryTick = time.Second * 5 + ExpiryTick = time.Second * 1 // needs to be smaller than registry.RegisterTTL + MaxPacketSize = 512 ) +func (g *gossipRegistry) connect(addrs []string) error { + var err error + + if len(addrs) == 0 { + return nil + } + + timeout := make(<-chan time.Time) + if g.connectTimeout > 0 { + timeout = time.After(g.connectTimeout) + } + + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + fn := func() (int, error) { + return g.member.Join(addrs) + } + + // don't wait for first try + if _, err = fn(); err == nil { + return nil + } + + // wait loop + for { + select { + // context closed + case <-g.options.Context.Done(): + return nil + // call close, don't wait anymore + case <-g.done: + return nil + // in case of timeout fail with a timeout error + case <-timeout: + return fmt.Errorf("[gossip]: timedout connect to %v", g.addrs) + // got a tick, try to connect + case <-ticker.C: + if _, err = fn(); err == nil { + log.Logf("[gossip]: success connect to %v", g.addrs) + return nil + } else { + log.Logf("[gossip]: failed connect to %v", g.addrs) + } + } + } + + return err +} + func configure(g *gossipRegistry, opts ...registry.Option) error { // loop through address list and get valid entries addrs := func(curAddrs []string) []string { @@ -129,8 +215,10 @@ func configure(g *gossipRegistry, opts ...registry.Option) error { // create a new default config c := memberlist.DefaultLocalConfig() - // log to dev null - c.LogOutput = ioutil.Discard + // sane good default options + c.LogOutput = ioutil.Discard // log to /dev/null + c.PushPullInterval = 0 // disable expensive tcp push/pull + c.ProtocolVersion = 4 // suport latest stable features if optConfig, ok := g.options.Context.Value(contextConfig{}).(*memberlist.Config); ok && optConfig != nil { c = optConfig @@ -177,6 +265,13 @@ func configure(g *gossipRegistry, opts ...registry.Option) error { c.SecretKey = k } + if v, ok := g.options.Context.Value(connectRetry{}).(bool); ok && v { + g.connectRetry = true + } + if td, ok := g.options.Context.Value(connectTimeout{}).(time.Duration); ok { + g.connectTimeout = td + } + // create a queue queue := &memberlist.TransmitLimitedQueue{ NumNodes: func() int { @@ -191,27 +286,34 @@ func configure(g *gossipRegistry, opts ...registry.Option) error { queue: queue, } + if g.connectRetry { + c.Events = &eventDelegate{ + events: g.events, + } + } + // create the memberlist m, err := memberlist.Create(c) if err != nil { return err } - // join the memberlist + // set internals + g.Lock() if len(curAddrs) > 0 { - _, err := m.Join(curAddrs) - if err != nil { - return err + for _, addr := range curAddrs { + g.members[addr] = nodeActionUnknown } } - - // set internals + g.tcpInterval = c.PushPullInterval + g.addrs = curAddrs g.queue = queue g.member = m g.interval = c.GossipInterval + g.Unlock() - log.Logf("Registry Listening on %s", m.LocalNode().Address()) - return nil + log.Logf("[gossip]: Registry Listening on %s", m.LocalNode().Address()) + return g.connect(curAddrs) } func (*broadcast) UniqueBroadcast() {} @@ -225,6 +327,9 @@ func (b *broadcast) Message() []byte { if err != nil { return nil } + if l := len(up); l > MaxPacketSize { + log.Logf("[gossip]: broadcast message size %d bigger then MaxPacketSize %d", l, MaxPacketSize) + } return up } @@ -313,7 +418,6 @@ func (d *delegate) MergeRemoteState(buf []byte, join bool) { if err := json.Unmarshal(buf, &services); err != nil { return } - for _, service := range services { for _, srv := range service { d.updates <- &update{ @@ -326,7 +430,7 @@ func (d *delegate) MergeRemoteState(buf []byte, join bool) { } func (g *gossipRegistry) publish(action string, services []*registry.Service) { - g.s.RLock() + g.RLock() for _, sub := range g.watchers { go func(sub chan *registry.Result) { for _, service := range services { @@ -334,7 +438,7 @@ func (g *gossipRegistry) publish(action string, services []*registry.Service) { } }(sub) } - g.s.RUnlock() + g.RUnlock() } func (g *gossipRegistry) subscribe() (chan *registry.Result, chan bool) { @@ -343,16 +447,16 @@ func (g *gossipRegistry) subscribe() (chan *registry.Result, chan bool) { id := uuid.New().String() - g.s.Lock() + g.Lock() g.watchers[id] = next - g.s.Unlock() + g.Unlock() go func() { <-exit - g.s.Lock() + g.Lock() delete(g.watchers, id) close(next) - g.s.Unlock() + g.Unlock() }() return next, exit @@ -378,9 +482,19 @@ func (g *gossipRegistry) wait() { g.Stop() } -func (g *gossipRegistry) Stop() { - g.member.Leave(g.interval * 2) - g.member.Shutdown() +func (g *gossipRegistry) Stop() error { + g.Lock() + if g.done != nil { + close(g.done) + g.done = nil + } + if g.member != nil { + g.member.Leave(g.interval * 2) + g.member.Shutdown() + g.member = nil + } + g.Unlock() + return nil } func (g *gossipRegistry) run() { @@ -389,31 +503,78 @@ func (g *gossipRegistry) run() { // expiry loop go func() { - t := time.NewTicker(ExpiryTick) - defer t.Stop() + ticker := time.NewTicker(ExpiryTick) + defer ticker.Stop() - for _ = range t.C { - now := uint64(time.Now().UnixNano()) + for { + select { + case <-g.done: + return + case <-ticker.C: + now := uint64(time.Now().UnixNano()) - mtx.Lock() + mtx.Lock() - // process all the updates - for k, v := range updates { - // check if expiry time has passed - if d := (v.Update.Expires); d < now { - // delete from records - delete(updates, k) - // set to delete - v.Update.Action = actionTypeDelete - // fire a new update - g.updates <- v + // process all the updates + for k, v := range updates { + // check if expiry time has passed + if d := (v.Update.Expires); d < now { + // delete from records + delete(updates, k) + // set to delete + v.Update.Action = actionTypeDelete + // fire a new update + g.updates <- v + } } - } - mtx.Unlock() + mtx.Unlock() + } } }() + go func() { + for { + select { + case <-g.done: + return + case ed := <-g.events: + // may be not block all registry? + g.Lock() + if _, ok := g.members[ed.node]; ok { + g.members[ed.node] = ed.action + } + g.Unlock() + } + } + }() + + if g.connectRetry { + go func() { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-g.done: + return + case <-ticker.C: + var addrs []string + g.RLock() + for node, action := range g.members { + if action == nodeActionLeave && g.member.LocalNode().Address() != node { + addrs = append(addrs, node) + } + } + g.RUnlock() + if len(addrs) > 0 { + g.connect(addrs) + } + } + } + }() + } + // process the updates for u := range g.updates { switch u.Update.Action { @@ -512,6 +673,10 @@ func (g *gossipRegistry) Register(s *registry.Service, opts ...registry.Register o(&options) } + if options.TTL == 0 && g.tcpInterval == 0 { + return fmt.Errorf("must provide registry.RegisterTTL option or set PushPullInterval in *memberlist.Config") + } + up := &pb.Update{ Expires: uint64(time.Now().Add(options.TTL).UnixNano()), Action: actionTypeCreate, @@ -604,8 +769,11 @@ func NewRegistry(opts ...registry.Option) registry.Registry { Context: context.Background(), }, updates: make(chan *update, 100), + events: make(chan *event, 100), services: make(map[string][]*registry.Service), watchers: make(map[string]chan *registry.Result), + done: make(chan struct{}), + members: make(map[string]int32), } // run the updater diff --git a/registry/gossip/gossip_test.go b/registry/gossip/gossip_test.go index c8989f1f..f72b93ef 100644 --- a/registry/gossip/gossip_test.go +++ b/registry/gossip/gossip_test.go @@ -1,7 +1,6 @@ -package gossip_test +package gossip import ( - "context" "os" "sync" "testing" @@ -9,68 +8,57 @@ import ( "github.com/google/uuid" "github.com/hashicorp/memberlist" - micro "github.com/micro/go-micro" - "github.com/micro/go-micro/client" "github.com/micro/go-micro/registry" - "github.com/micro/go-micro/registry/gossip" - pb "github.com/micro/go-micro/registry/gossip/proto" - "github.com/micro/go-micro/selector" - "github.com/micro/go-micro/server" ) -var ( - r1 registry.Registry - r2 registry.Registry - mu sync.Mutex -) - -func newConfig() *memberlist.Config { - wc := memberlist.DefaultLANConfig() - wc.DisableTcpPings = false - wc.GossipVerifyIncoming = false - wc.GossipVerifyOutgoing = false - wc.EnableCompression = false - wc.LogOutput = os.Stderr - wc.ProtocolVersion = 4 - wc.Name = uuid.New().String() - return wc +func newMemberlistConfig() *memberlist.Config { + mc := memberlist.DefaultLANConfig() + mc.DisableTcpPings = false + mc.GossipVerifyIncoming = false + mc.GossipVerifyOutgoing = false + mc.EnableCompression = false + mc.PushPullInterval = 3 * time.Second + mc.LogOutput = os.Stderr + mc.ProtocolVersion = 4 + mc.Name = uuid.New().String() + return mc } -func newRegistries() { - mu.Lock() - defer mu.Unlock() - - if r1 != nil && r2 != nil { - return +func newRegistry(opts ...registry.Option) registry.Registry { + options := []registry.Option{ + ConnectRetry(true), + ConnectTimeout(60 * time.Second), } - wc1 := newConfig() - wc2 := newConfig() - - rops1 := []registry.Option{gossip.Config(wc1), gossip.Address("127.0.0.1:54321")} - rops2 := []registry.Option{gossip.Config(wc2), gossip.Address("127.0.0.2:54321"), registry.Addrs("127.0.0.1:54321")} - - r1 = gossip.NewRegistry(rops1...) // first started without members - r2 = gossip.NewRegistry(rops2...) // second started joining + options = append(options, opts...) + r := NewRegistry(options...) + return r } func TestRegistryBroadcast(t *testing.T) { - newRegistries() + mc1 := newMemberlistConfig() + r1 := newRegistry(Config(mc1), Address("127.0.0.1:54321")) + + mc2 := newMemberlistConfig() + r2 := newRegistry(Config(mc2), Address("127.0.0.2:54321"), registry.Addrs("127.0.0.1:54321")) + + defer r1.(*gossipRegistry).Stop() + defer r2.(*gossipRegistry).Stop() svc1 := ®istry.Service{Name: "r1-svc", Version: "0.0.0.1"} svc2 := ®istry.Service{Name: "r2-svc", Version: "0.0.0.2"} - <-time.After(1 * time.Second) - if err := r1.Register(svc1); err != nil { + t.Logf("register service svc1 on r1\n") + if err := r1.Register(svc1, registry.RegisterTTL(10*time.Second)); err != nil { t.Fatal(err) } - <-time.After(1 * time.Second) - if err := r2.Register(svc2); err != nil { + t.Logf("register service svc2 on r2\n") + if err := r2.Register(svc2, registry.RegisterTTL(10*time.Second)); err != nil { t.Fatal(err) } var found bool - + t.Logf("list services on r1\n") svcs, err := r1.ListServices() if err != nil { t.Fatal(err) @@ -83,6 +71,129 @@ func TestRegistryBroadcast(t *testing.T) { } if !found { t.Fatalf("r2-svc not found in r1, broadcast not work") + } else { + t.Logf("r2-svc found in r1, all ok") + } + + found = false + t.Logf("list services on r2\n") + svcs, err = r2.ListServices() + if err != nil { + t.Fatal(err) + } + + for _, svc := range svcs { + if svc.Name == "r1-svc" { + found = true + } + } + if !found { + t.Fatalf("r1-svc not found in r2, broadcast not work") + } else { + t.Logf("r1-svc found in r1, all ok") + } + + t.Logf("deregister service svc1 on r1\n") + if err := r1.Deregister(svc1); err != nil { + t.Fatal(err) + } + t.Logf("deregister service svc1 on r2\n") + if err := r2.Deregister(svc2); err != nil { + t.Fatal(err) + } + +} +func TestRegistryRetry(t *testing.T) { + mc1 := newMemberlistConfig() + r1 := newRegistry(Config(mc1), Address("127.0.0.1:54321")) + + mc2 := newMemberlistConfig() + r2 := newRegistry(Config(mc2), Address("127.0.0.2:54321"), registry.Addrs("127.0.0.1:54321")) + + defer r1.(*gossipRegistry).Stop() + defer r2.(*gossipRegistry).Stop() + + svc1 := ®istry.Service{Name: "r1-svc", Version: "0.0.0.1"} + svc2 := ®istry.Service{Name: "r2-svc", Version: "0.0.0.2"} + + var mu sync.Mutex + ch := make(chan struct{}) + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + go func() { + for { + select { + case <-ticker.C: + mu.Lock() + if r1 != nil { + r1.Register(svc1, registry.RegisterTTL(2*time.Second)) + } + if r2 != nil { + r2.Register(svc2, registry.RegisterTTL(2*time.Second)) + } + if ch != nil { + close(ch) + ch = nil + } + mu.Unlock() + } + } + }() + + <-ch + var found bool + + svcs, err := r2.ListServices() + if err != nil { + t.Fatal(err) + } + + for _, svc := range svcs { + if svc.Name == "r1-svc" { + found = true + } + } + if !found { + t.Fatalf("r1-svc not found in r2, broadcast not work, retry cant test") + } + + t.Logf("stop r1\n") + if err = r1.(*gossipRegistry).Stop(); err != nil { + t.Fatalf("cant stop r1 registry %v", err) + } + + mu.Lock() + r1 = nil + mu.Unlock() + + <-time.After(3 * time.Second) + + found = false + svcs, err = r2.ListServices() + if err != nil { + t.Fatal(err) + } + + for _, svc := range svcs { + if svc.Name == "r1-svc" { + found = true + } + } + if found { + t.Fatalf("r1-svc found in r2, something wrong") + } + + t.Logf("start r1\n") + + r1 = newRegistry(Config(mc1), Address("127.0.0.1:54321")) + <-time.After(2 * time.Second) + + if tr := os.Getenv("TRAVIS"); len(tr) > 0 { + t.Logf("skip next test part, becasue it not works in travis") + t.Skip() + return + <-time.After(5 * time.Second) } found = false @@ -97,7 +208,7 @@ func TestRegistryBroadcast(t *testing.T) { } } if !found { - t.Fatalf("r1-svc not found in r2, broadcast not work") + t.Fatalf("r1-svc not found in r2, connect retry not works") } if err := r1.Deregister(svc1); err != nil { @@ -107,93 +218,6 @@ func TestRegistryBroadcast(t *testing.T) { t.Fatal(err) } -} - -func TestServerRegistry(t *testing.T) { - newRegistries() - - _, err := newServer("s1", r1, t) - if err != nil { - t.Fatal(err) - } - - _, err = newServer("s2", r2, t) - if err != nil { - t.Fatal(err) - } - - svcs, err := r1.ListServices() - if err != nil { - t.Fatal(err) - } - if len(svcs) < 1 { - t.Fatalf("r1 svcs unknown %#+v\n", svcs) - } - - found := false - for _, svc := range svcs { - if svc.Name == "s2" { - found = true - } - } - if !found { - t.Fatalf("r1 does not have s2, broadcast not work") - } - - found = false - svcs, err = r2.ListServices() - if err != nil { - t.Fatal(err) - } - - for _, svc := range svcs { - if svc.Name == "s1" { - found = true - } - } - - if !found { - t.Fatalf("r2 does not have s1, broadcast not work") - } - -} - -type testServer struct{} - -func (*testServer) Test(ctx context.Context, req *pb.Update, rsp *pb.Update) error { - return nil -} - -func newServer(n string, r registry.Registry, t *testing.T) (micro.Service, error) { - h := &testServer{} - - var wg sync.WaitGroup - - wg.Add(1) - sopts := []server.Option{ - server.Name(n), - server.Registry(r), - } - - copts := []client.Option{ - client.Selector(selector.NewSelector(selector.Registry(r))), - client.Registry(r), - } - - srv := micro.NewService( - micro.Server(server.NewServer(sopts...)), - micro.Client(client.NewClient(copts...)), - micro.AfterStart(func() error { - wg.Done() - return nil - }), - ) - - srv.Server().NewHandler(h) - - go func() { - t.Fatal(srv.Run()) - }() - wg.Wait() - return srv, nil + r1.(*gossipRegistry).Stop() + r2.(*gossipRegistry).Stop() } diff --git a/registry/gossip/options.go b/registry/gossip/options.go index 3188ba7c..0a07f304 100644 --- a/registry/gossip/options.go +++ b/registry/gossip/options.go @@ -2,6 +2,7 @@ package gossip import ( "context" + "time" "github.com/hashicorp/memberlist" "github.com/micro/go-micro/registry" @@ -44,3 +45,18 @@ type contextContext struct{} func Context(ctx context.Context) registry.Option { return setRegistryOption(contextContext{}, ctx) } + +type connectTimeout struct{} + +// ConnectTimeout specify registry connect timeout use -1 to specify infinite +func ConnectTimeout(td time.Duration) registry.Option { + return setRegistryOption(connectTimeout{}, td) +} + +type connectRetry struct{} + +// ConnectRetry enable reconnect to registry then connection closed, +// use with ConnectTimeout to specify how long retry +func ConnectRetry(v bool) registry.Option { + return setRegistryOption(connectRetry{}, v) +} diff --git a/registry/options.go b/registry/options.go index aa9c3f4f..d3a54856 100644 --- a/registry/options.go +++ b/registry/options.go @@ -11,7 +11,6 @@ type Options struct { Timeout time.Duration Secure bool TLSConfig *tls.Config - // Other options for implementations of the interface // can be stored in a context Context context.Context