From 544b4d37573c555605e2d18bf2888de27657daad Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 6 Dec 2018 18:30:26 +0000 Subject: [PATCH] Gossip registry now a default registry --- README.md | 25 ++- gossip.go | 518 +------------------------------------------------ gossip_test.go | 78 -------- options.go | 15 -- watch.go | 60 ------ 5 files changed, 14 insertions(+), 682 deletions(-) delete mode 100644 gossip_test.go delete mode 100644 options.go delete mode 100644 watch.go diff --git a/README.md b/README.md index e0a2119..f81aa3a 100644 --- a/README.md +++ b/README.md @@ -1,26 +1,23 @@ # Gossip Registry -Gossip is a registry plugin for go-micro which uses hashicorp/memberlist to broadcast registry information -via the SWIM protocol. +Gossip is a zero dependency registry which uses hashicorp/memberlist to broadcast registry information via the SWIM protocol. ## Usage -Import the plugin as per usual +Start with the registry flag or env var -```go -import _ "github.com/micro/go-plugins/registry/gossip" -``` - -Start with the registry flag - -```go -go run service.go --registry=gossip +```bash +MICRO_REGISTRY=gossip go run service.go ``` On startup you'll see something like -```go -2016/06/19 14:05:43 Local memberlist node 127.0.0.1:45465 +```bash +2018/12/06 18:17:48 Registry Listening on 192.168.1.65:56390 ``` -To join this gossip ring use `--registry=gossip --registry_address 127.0.0.1:45465` when starting other nodes +To join this gossip ring set the registry address using flag or env var + +```bash +MICRO_REGISTRY_ADDRESS= 192.168.1.65:56390 +``` diff --git a/gossip.go b/gossip.go index e73ddb8..e2ba0e0 100644 --- a/gossip.go +++ b/gossip.go @@ -2,523 +2,11 @@ package gossip import ( - "encoding/json" - "fmt" - "os" - "sync" - "time" - - "github.com/google/uuid" - "github.com/hashicorp/memberlist" - "github.com/micro/go-log" - "github.com/micro/go-micro/cmd" "github.com/micro/go-micro/registry" - "github.com/mitchellh/hashstructure" + "github.com/micro/go-micro/registry/gossip" ) -type action int - -const ( - addAction action = iota - delAction - syncAction -) - -type broadcast struct { - msg []byte - notify chan<- struct{} -} - -type delegate struct { - broadcasts *memberlist.TransmitLimitedQueue - updates chan *update -} - -type gossipRegistry struct { - broadcasts *memberlist.TransmitLimitedQueue - updates chan *update - options registry.Options - member *memberlist.Memberlist - - sync.RWMutex - services map[string][]*registry.Service - - s sync.RWMutex - subs map[string]chan *registry.Result -} - -type update struct { - Action action - Service *registry.Service - Timestamp int64 - Expires int64 - sync chan *registry.Service -} - -var ( - // You should change this if using secure - DefaultKey = []byte("gossipKey") - ExpiryTick = time.Second * 10 -) - -func init() { - cmd.DefaultRegistries["gossip"] = NewRegistry -} - -func configure(g *gossipRegistry, opts ...registry.Option) error { - addrs := func() []string { - var cAddrs []string - for _, addr := range g.options.Addrs { - if len(addr) > 0 { - cAddrs = append(cAddrs, addr) - } - } - return cAddrs - } - - cAddrs := addrs() - hostname, _ := os.Hostname() - - for _, o := range opts { - o(&g.options) - } - - newAddrs := addrs() - - // no new nodes and existing member. no configure - if (len(newAddrs) == len(cAddrs)) && g.member != nil { - return nil - } - - // shutdown old member - if g.member != nil { - log.Logf("Shutdown old memberlist: %v", g.member.Shutdown()) - } - - cAddrs = newAddrs - - broadcasts := &memberlist.TransmitLimitedQueue{ - NumNodes: func() int { - return len(cAddrs) - }, - RetransmitMult: 3, - } - - c := memberlist.DefaultLocalConfig() - c.BindPort = 0 - c.Name = hostname + "-" + uuid.New().String() - c.Delegate = &delegate{ - updates: g.updates, - broadcasts: broadcasts, - } - - if g.options.Secure { - k, ok := g.options.Context.Value(contextSecretKey{}).([]byte) - if !ok { - k = DefaultKey - } - c.SecretKey = k - } - - m, err := memberlist.Create(c) - if err != nil { - log.Fatalf("Error creating memberlist: %v", err) - } - - if len(cAddrs) > 0 { - _, err := m.Join(cAddrs) - if err != nil { - log.Fatalf("Error joining members: %v", err) - } - } - - g.broadcasts = broadcasts - g.member = m - - log.Logf("Local memberlist node %s:%d\n", m.LocalNode().Addr, m.LocalNode().Port) - return nil -} - -func addNodes(old, neu []*registry.Node) []*registry.Node { - for _, n := range neu { - var seen bool - for i, o := range old { - if o.Id == n.Id { - seen = true - old[i] = n - break - } - } - if !seen { - old = append(old, n) - } - } - return old -} - -func addServices(old, neu []*registry.Service) []*registry.Service { - for _, s := range neu { - var seen bool - for i, o := range old { - if o.Version == s.Version { - s.Nodes = addNodes(o.Nodes, s.Nodes) - seen = true - old[i] = s - break - } - } - if !seen { - old = append(old, s) - } - } - return old -} - -func delNodes(old, del []*registry.Node) []*registry.Node { - var nodes []*registry.Node - for _, o := range old { - var rem bool - for _, n := range del { - if o.Id == n.Id { - rem = true - break - } - } - if !rem { - nodes = append(nodes, o) - } - } - return nodes -} - -func delServices(old, del []*registry.Service) []*registry.Service { - var services []*registry.Service - for i, o := range old { - var rem bool - for _, s := range del { - if o.Version == s.Version { - old[i].Nodes = delNodes(o.Nodes, s.Nodes) - if len(old[i].Nodes) == 0 { - rem = true - } - } - } - if !rem { - services = append(services, o) - } - } - return services -} - -func (b *broadcast) Invalidates(other memberlist.Broadcast) bool { - return false -} - -func (b *broadcast) Message() []byte { - return b.msg -} - -func (b *broadcast) Finished() { - if b.notify != nil { - close(b.notify) - } -} - -func (d *delegate) NodeMeta(limit int) []byte { - return []byte{} -} - -func (d *delegate) NotifyMsg(b []byte) { - if len(b) == 0 { - return - } - - buf := make([]byte, len(b)) - copy(buf, b) - - go func() { - switch buf[0] { - case 'd': // data - var updates []*update - if err := json.Unmarshal(buf[1:], &updates); err != nil { - return - } - for _, u := range updates { - d.updates <- u - } - } - }() -} - -func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { - return d.broadcasts.GetBroadcasts(overhead, limit) -} - -func (d *delegate) LocalState(join bool) []byte { - if !join { - return []byte{} - } - - syncCh := make(chan *registry.Service, 1) - m := map[string][]*registry.Service{} - - d.updates <- &update{ - Action: syncAction, - sync: syncCh, - } - - for s := range syncCh { - m[s.Name] = append(m[s.Name], s) - } - - b, _ := json.Marshal(m) - return b -} - -func (d *delegate) MergeRemoteState(buf []byte, join bool) { - if len(buf) == 0 { - return - } - if !join { - return - } - - var m map[string][]*registry.Service - if err := json.Unmarshal(buf, &m); err != nil { - return - } - - for _, services := range m { - for _, service := range services { - d.updates <- &update{ - Action: addAction, - Service: service, - sync: nil, - } - } - } -} - -func (m *gossipRegistry) publish(action string, services []*registry.Service) { - m.s.RLock() - for _, sub := range m.subs { - go func(sub chan *registry.Result) { - for _, service := range services { - sub <- ®istry.Result{Action: action, Service: service} - } - }(sub) - } - m.s.RUnlock() -} - -func (m *gossipRegistry) subscribe() (chan *registry.Result, chan bool) { - next := make(chan *registry.Result, 10) - exit := make(chan bool) - - id := uuid.New().String() - - m.s.Lock() - m.subs[id] = next - m.s.Unlock() - - go func() { - <-exit - m.s.Lock() - delete(m.subs, id) - close(next) - m.s.Unlock() - }() - - return next, exit -} - -func (m *gossipRegistry) run() { - var mtx sync.Mutex - updates := map[uint64]*update{} - - // expiry loop - go func() { - t := time.NewTicker(ExpiryTick) - defer t.Stop() - - for _ = range t.C { - now := time.Now().Unix() - - mtx.Lock() - for k, v := range updates { - // check if expiry time has passed - if d := (v.Timestamp + v.Expires) - now; d < 0 { - // delete from records - delete(updates, k) - // set to delete - v.Action = delAction - // fire a new update - m.updates <- v - } - } - mtx.Unlock() - } - }() - - for u := range m.updates { - switch u.Action { - case addAction: - m.Lock() - if service, ok := m.services[u.Service.Name]; !ok { - m.services[u.Service.Name] = []*registry.Service{u.Service} - - } else { - m.services[u.Service.Name] = addServices(service, []*registry.Service{u.Service}) - } - m.Unlock() - go m.publish("add", []*registry.Service{u.Service}) - - // we need to expire the node at some point in the future - if u.Expires > 0 { - // create a hash of this service - if hash, err := hashstructure.Hash(u.Service, nil); err == nil { - mtx.Lock() - updates[hash] = u - mtx.Unlock() - } - } - case delAction: - m.Lock() - if service, ok := m.services[u.Service.Name]; ok { - if services := delServices(service, []*registry.Service{u.Service}); len(services) == 0 { - delete(m.services, u.Service.Name) - } else { - m.services[u.Service.Name] = services - } - } - m.Unlock() - go m.publish("delete", []*registry.Service{u.Service}) - - // delete from expiry checks - if hash, err := hashstructure.Hash(u.Service, nil); err == nil { - mtx.Lock() - delete(updates, hash) - mtx.Unlock() - } - case syncAction: - if u.sync == nil { - continue - } - m.RLock() - for _, services := range m.services { - for _, service := range services { - u.sync <- service - } - go m.publish("add", services) - } - m.RUnlock() - close(u.sync) - } - } -} - -func (m *gossipRegistry) Init(opts ...registry.Option) error { - return configure(m, opts...) -} - -func (m *gossipRegistry) Options() registry.Options { - return m.options -} - -func (m *gossipRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error { - m.Lock() - if service, ok := m.services[s.Name]; !ok { - m.services[s.Name] = []*registry.Service{s} - } else { - m.services[s.Name] = addServices(service, []*registry.Service{s}) - } - m.Unlock() - - var options registry.RegisterOptions - for _, o := range opts { - o(&options) - } - - b, _ := json.Marshal([]*update{ - &update{ - Action: addAction, - Service: s, - Timestamp: time.Now().Unix(), - Expires: int64(options.TTL.Seconds()), - }, - }) - - m.broadcasts.QueueBroadcast(&broadcast{ - msg: append([]byte("d"), b...), - notify: nil, - }) - - return nil -} - -func (m *gossipRegistry) Deregister(s *registry.Service) error { - m.Lock() - if service, ok := m.services[s.Name]; ok { - if services := delServices(service, []*registry.Service{s}); len(services) == 0 { - delete(m.services, s.Name) - } else { - m.services[s.Name] = services - } - } - m.Unlock() - - b, _ := json.Marshal([]*update{ - &update{ - Action: delAction, - Service: s, - }, - }) - - m.broadcasts.QueueBroadcast(&broadcast{ - msg: append([]byte("d"), b...), - notify: nil, - }) - - return nil -} - -func (m *gossipRegistry) GetService(name string) ([]*registry.Service, error) { - m.RLock() - service, ok := m.services[name] - m.RUnlock() - if !ok { - return nil, fmt.Errorf("Service %s not found", name) - } - return service, nil -} - -func (m *gossipRegistry) ListServices() ([]*registry.Service, error) { - var services []*registry.Service - m.RLock() - for _, service := range m.services { - services = append(services, service...) - } - m.RUnlock() - return services, nil -} - -func (m *gossipRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) { - n, e := m.subscribe() - return newGossipWatcher(n, e, opts...) -} - -func (m *gossipRegistry) String() string { - return "gossip" -} - +// NewRegistry returns a new gossip registry func NewRegistry(opts ...registry.Option) registry.Registry { - mr := &gossipRegistry{ - options: registry.Options{}, - updates: make(chan *update, 100), - services: make(map[string][]*registry.Service), - subs: make(map[string]chan *registry.Result), - } - configure(mr, opts...) - go mr.run() - return mr + return gossip.NewRegistry(opts...) } diff --git a/gossip_test.go b/gossip_test.go deleted file mode 100644 index a77dda5..0000000 --- a/gossip_test.go +++ /dev/null @@ -1,78 +0,0 @@ -package gossip - -import ( - "testing" - - "github.com/micro/go-micro/registry" -) - -func TestDelServices(t *testing.T) { - services := []*registry.Service{ - { - Name: "foo", - Version: "1.0.0", - Nodes: []*registry.Node{ - { - Id: "foo-123", - Address: "localhost", - Port: 9999, - }, - }, - }, - { - Name: "foo", - Version: "1.0.0", - Nodes: []*registry.Node{ - { - Id: "foo-123", - Address: "localhost", - Port: 6666, - }, - }, - }, - } - - servs := delServices([]*registry.Service{services[0]}, []*registry.Service{services[1]}) - if i := len(servs); i > 0 { - t.Errorf("Expected 0 nodes, got %d: %+v", i, servs) - } - t.Logf("Services %+v", servs) -} - -func TestDelNodes(t *testing.T) { - services := []*registry.Service{ - { - Name: "foo", - Version: "1.0.0", - Nodes: []*registry.Node{ - { - Id: "foo-123", - Address: "localhost", - Port: 9999, - }, - { - Id: "foo-321", - Address: "localhost", - Port: 6666, - }, - }, - }, - { - Name: "foo", - Version: "1.0.0", - Nodes: []*registry.Node{ - { - Id: "foo-123", - Address: "localhost", - Port: 6666, - }, - }, - }, - } - - nodes := delNodes(services[0].Nodes, services[1].Nodes) - if i := len(nodes); i != 1 { - t.Errorf("Expected only 1 node, got %d: %+v", i, nodes) - } - t.Logf("Nodes %+v", nodes) -} diff --git a/options.go b/options.go deleted file mode 100644 index 3c721e9..0000000 --- a/options.go +++ /dev/null @@ -1,15 +0,0 @@ -package gossip - -import ( - "context" - - "github.com/micro/go-micro/registry" -) - -type contextSecretKey struct{} - -func SecretKey(k []byte) registry.Option { - return func(o *registry.Options) { - o.Context = context.WithValue(o.Context, contextSecretKey{}, k) - } -} diff --git a/watch.go b/watch.go deleted file mode 100644 index f893aa6..0000000 --- a/watch.go +++ /dev/null @@ -1,60 +0,0 @@ -package gossip - -import ( - "errors" - - "github.com/micro/go-micro/registry" -) - -type gossipWatcher struct { - wo registry.WatchOptions - next chan *registry.Result - stop chan bool -} - -func newGossipWatcher(ch chan *registry.Result, exit chan bool, opts ...registry.WatchOption) (registry.Watcher, error) { - var wo registry.WatchOptions - for _, o := range opts { - o(&wo) - } - - stop := make(chan bool) - - go func() { - <-stop - close(exit) - }() - - return &gossipWatcher{ - wo: wo, - next: ch, - stop: stop, - }, nil -} - -func (m *gossipWatcher) Next() (*registry.Result, error) { - for { - select { - case r, ok := <-m.next: - if !ok { - return nil, errors.New("result chan closed") - } - // check watch options - if len(m.wo.Service) > 0 && r.Service.Name != m.wo.Service { - continue - } - return r, nil - case <-m.stop: - return nil, errors.New("watcher stopped") - } - } -} - -func (m *gossipWatcher) Stop() { - select { - case <-m.stop: - return - default: - close(m.stop) - } -}