Add Init to registry

This commit is contained in:
Asim Aslam 2018-08-09 18:08:59 +01:00 committed by Vasiliy Tolstov
parent 9009937453
commit c845b3bd68

138
gossip.go
View File

@ -38,6 +38,7 @@ type gossipRegistry struct {
broadcasts *memberlist.TransmitLimitedQueue broadcasts *memberlist.TransmitLimitedQueue
updates chan *update updates chan *update
options registry.Options options registry.Options
member *memberlist.Memberlist
sync.RWMutex sync.RWMutex
services map[string][]*registry.Service services map[string][]*registry.Service
@ -64,6 +65,80 @@ func init() {
cmd.DefaultRegistries["gossip"] = NewRegistry cmd.DefaultRegistries["gossip"] = NewRegistry
} }
func configure(g *gossipRegistry, opts ...registry.Option) error {
addrs := func() []string {
var cAddrs []string
for _, addr := range g.options.Addrs {
if len(addr) > 0 {
cAddrs = append(cAddrs, addr)
}
}
return cAddrs
}
cAddrs := addrs()
hostname, _ := os.Hostname()
for _, o := range opts {
o(&g.options)
}
newAddrs := addrs()
// no new nodes and existing member. no configure
if (len(newAddrs) == len(cAddrs)) && g.member != nil {
return nil
}
// shutdown old member
if g.member != nil {
log.Logf("Shutdown old memberlist: %v", g.member.Shutdown())
}
cAddrs = newAddrs
broadcasts := &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(cAddrs)
},
RetransmitMult: 3,
}
c := memberlist.DefaultLocalConfig()
c.BindPort = 0
c.Name = hostname + "-" + uuid.NewUUID().String()
c.Delegate = &delegate{
updates: g.updates,
broadcasts: broadcasts,
}
if g.options.Secure {
k, ok := g.options.Context.Value(contextSecretKey{}).([]byte)
if !ok {
k = DefaultKey
}
c.SecretKey = k
}
m, err := memberlist.Create(c)
if err != nil {
log.Fatalf("Error creating memberlist: %v", err)
}
if len(cAddrs) > 0 {
_, err := m.Join(cAddrs)
if err != nil {
log.Fatalf("Error joining members: %v", err)
}
}
g.broadcasts = broadcasts
g.member = m
log.Logf("Local memberlist node %s:%d\n", m.LocalNode().Addr, m.LocalNode().Port)
return nil
}
func addNodes(old, neu []*registry.Node) []*registry.Node { func addNodes(old, neu []*registry.Node) []*registry.Node {
for _, n := range neu { for _, n := range neu {
var seen bool var seen bool
@ -342,6 +417,10 @@ func (m *gossipRegistry) run() {
} }
} }
func (m *gossipRegistry) Init(opts ...registry.Option) error {
return configure(m, opts...)
}
func (m *gossipRegistry) Options() registry.Options { func (m *gossipRegistry) Options() registry.Options {
return m.options return m.options
} }
@ -433,66 +512,13 @@ func (m *gossipRegistry) String() string {
} }
func NewRegistry(opts ...registry.Option) registry.Registry { func NewRegistry(opts ...registry.Option) registry.Registry {
var options registry.Options
for _, o := range opts {
o(&options)
}
cAddrs := []string{}
hostname, _ := os.Hostname()
updates := make(chan *update, 100)
for _, addr := range options.Addrs {
if len(addr) > 0 {
cAddrs = append(cAddrs, addr)
}
}
broadcasts := &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(cAddrs)
},
RetransmitMult: 3,
}
mr := &gossipRegistry{ mr := &gossipRegistry{
options: options, options: registry.Options{},
broadcasts: broadcasts, updates: make(chan *update, 100),
services: make(map[string][]*registry.Service), services: make(map[string][]*registry.Service),
updates: updates,
subs: make(map[string]chan *registry.Result), subs: make(map[string]chan *registry.Result),
} }
configure(mr, opts...)
go mr.run() go mr.run()
c := memberlist.DefaultLocalConfig()
c.BindPort = 0
c.Name = hostname + "-" + uuid.NewUUID().String()
c.Delegate = &delegate{
updates: updates,
broadcasts: broadcasts,
}
if options.Secure {
k, ok := options.Context.Value(contextSecretKey{}).([]byte)
if !ok {
k = DefaultKey
}
c.SecretKey = k
}
m, err := memberlist.Create(c)
if err != nil {
log.Fatalf("Error creating memberlist: %v", err)
}
if len(cAddrs) > 0 {
_, err := m.Join(cAddrs)
if err != nil {
log.Fatalf("Error joining members: %v", err)
}
}
log.Logf("Local memberlist node %s:%d\n", m.LocalNode().Addr, m.LocalNode().Port)
return mr return mr
} }