From c845b3bd68a6daba4d09bbccb3dc049278371d48 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 9 Aug 2018 18:08:59 +0100 Subject: [PATCH] Add Init to registry --- gossip.go | 142 ++++++++++++++++++++++++++++++++---------------------- 1 file changed, 84 insertions(+), 58 deletions(-) diff --git a/gossip.go b/gossip.go index 358468c..2be2779 100644 --- a/gossip.go +++ b/gossip.go @@ -38,6 +38,7 @@ type gossipRegistry struct { broadcasts *memberlist.TransmitLimitedQueue updates chan *update options registry.Options + member *memberlist.Memberlist sync.RWMutex services map[string][]*registry.Service @@ -64,6 +65,80 @@ func init() { cmd.DefaultRegistries["gossip"] = NewRegistry } +func configure(g *gossipRegistry, opts ...registry.Option) error { + addrs := func() []string { + var cAddrs []string + for _, addr := range g.options.Addrs { + if len(addr) > 0 { + cAddrs = append(cAddrs, addr) + } + } + return cAddrs + } + + cAddrs := addrs() + hostname, _ := os.Hostname() + + for _, o := range opts { + o(&g.options) + } + + newAddrs := addrs() + + // no new nodes and existing member. no configure + if (len(newAddrs) == len(cAddrs)) && g.member != nil { + return nil + } + + // shutdown old member + if g.member != nil { + log.Logf("Shutdown old memberlist: %v", g.member.Shutdown()) + } + + cAddrs = newAddrs + + broadcasts := &memberlist.TransmitLimitedQueue{ + NumNodes: func() int { + return len(cAddrs) + }, + RetransmitMult: 3, + } + + c := memberlist.DefaultLocalConfig() + c.BindPort = 0 + c.Name = hostname + "-" + uuid.NewUUID().String() + c.Delegate = &delegate{ + updates: g.updates, + broadcasts: broadcasts, + } + + if g.options.Secure { + k, ok := g.options.Context.Value(contextSecretKey{}).([]byte) + if !ok { + k = DefaultKey + } + c.SecretKey = k + } + + m, err := memberlist.Create(c) + if err != nil { + log.Fatalf("Error creating memberlist: %v", err) + } + + if len(cAddrs) > 0 { + _, err := m.Join(cAddrs) + if err != nil { + log.Fatalf("Error joining members: %v", err) + } + } + + g.broadcasts = broadcasts + g.member = m + + log.Logf("Local memberlist node %s:%d\n", m.LocalNode().Addr, m.LocalNode().Port) + return nil +} + func addNodes(old, neu []*registry.Node) []*registry.Node { for _, n := range neu { var seen bool @@ -342,6 +417,10 @@ func (m *gossipRegistry) run() { } } +func (m *gossipRegistry) Init(opts ...registry.Option) error { + return configure(m, opts...) +} + func (m *gossipRegistry) Options() registry.Options { return m.options } @@ -433,66 +512,13 @@ func (m *gossipRegistry) String() string { } func NewRegistry(opts ...registry.Option) registry.Registry { - var options registry.Options - for _, o := range opts { - o(&options) - } - - cAddrs := []string{} - hostname, _ := os.Hostname() - updates := make(chan *update, 100) - - for _, addr := range options.Addrs { - if len(addr) > 0 { - cAddrs = append(cAddrs, addr) - } - } - - broadcasts := &memberlist.TransmitLimitedQueue{ - NumNodes: func() int { - return len(cAddrs) - }, - RetransmitMult: 3, - } - mr := &gossipRegistry{ - options: options, - broadcasts: broadcasts, - services: make(map[string][]*registry.Service), - updates: updates, - subs: make(map[string]chan *registry.Result), + options: registry.Options{}, + updates: make(chan *update, 100), + services: make(map[string][]*registry.Service), + subs: make(map[string]chan *registry.Result), } - + configure(mr, opts...) go mr.run() - - c := memberlist.DefaultLocalConfig() - c.BindPort = 0 - c.Name = hostname + "-" + uuid.NewUUID().String() - c.Delegate = &delegate{ - updates: updates, - broadcasts: broadcasts, - } - - if options.Secure { - k, ok := options.Context.Value(contextSecretKey{}).([]byte) - if !ok { - k = DefaultKey - } - c.SecretKey = k - } - - m, err := memberlist.Create(c) - if err != nil { - log.Fatalf("Error creating memberlist: %v", err) - } - - if len(cAddrs) > 0 { - _, err := m.Join(cAddrs) - if err != nil { - log.Fatalf("Error joining members: %v", err) - } - } - - log.Logf("Local memberlist node %s:%d\n", m.LocalNode().Addr, m.LocalNode().Port) return mr }