Merge pull request #471 from unistack-org/gossip
fix data races in gossip registry
This commit is contained in:
commit
8d21dd456c
@ -159,6 +159,10 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
|
|||||||
|
|
||||||
// shutdown old member
|
// shutdown old member
|
||||||
g.Stop()
|
g.Stop()
|
||||||
|
|
||||||
|
// lock internals
|
||||||
|
g.Lock()
|
||||||
|
|
||||||
// new done chan
|
// new done chan
|
||||||
g.done = make(chan bool)
|
g.done = make(chan bool)
|
||||||
|
|
||||||
@ -250,16 +254,12 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
|
|||||||
events: g.events,
|
events: g.events,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// create the memberlist
|
// create the memberlist
|
||||||
m, err := memberlist.Create(c)
|
m, err := memberlist.Create(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// set internals
|
|
||||||
g.Lock()
|
|
||||||
|
|
||||||
if len(curAddrs) > 0 {
|
if len(curAddrs) > 0 {
|
||||||
for _, addr := range curAddrs {
|
for _, addr := range curAddrs {
|
||||||
g.members[addr] = nodeActionUnknown
|
g.members[addr] = nodeActionUnknown
|
||||||
@ -547,9 +547,13 @@ func (g *gossipRegistry) expiryLoop(updates *updates) {
|
|||||||
ticker := time.NewTicker(ExpiryTick)
|
ticker := time.NewTicker(ExpiryTick)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
g.RLock()
|
||||||
|
done := g.done
|
||||||
|
g.RUnlock()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-g.done:
|
case <-done:
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
now := uint64(time.Now().UnixNano())
|
now := uint64(time.Now().UnixNano())
|
||||||
@ -576,10 +580,13 @@ func (g *gossipRegistry) expiryLoop(updates *updates) {
|
|||||||
|
|
||||||
// process member events
|
// process member events
|
||||||
func (g *gossipRegistry) eventLoop() {
|
func (g *gossipRegistry) eventLoop() {
|
||||||
|
g.RLock()
|
||||||
|
done := g.done
|
||||||
|
g.RUnlock()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
// return when done
|
// return when done
|
||||||
case <-g.done:
|
case <-done:
|
||||||
return
|
return
|
||||||
case ev := <-g.events:
|
case ev := <-g.events:
|
||||||
// TODO: nonblocking update
|
// TODO: nonblocking update
|
||||||
@ -603,10 +610,12 @@ func (g *gossipRegistry) run() {
|
|||||||
// event loop
|
// event loop
|
||||||
go g.eventLoop()
|
go g.eventLoop()
|
||||||
|
|
||||||
|
g.RLock()
|
||||||
// connect loop
|
// connect loop
|
||||||
if g.connectRetry {
|
if g.connectRetry {
|
||||||
go g.connectLoop()
|
go g.connectLoop()
|
||||||
}
|
}
|
||||||
|
g.RUnlock()
|
||||||
|
|
||||||
// process the updates
|
// process the updates
|
||||||
for u := range g.updates {
|
for u := range g.updates {
|
||||||
@ -808,7 +817,6 @@ func NewRegistry(opts ...registry.Option) registry.Registry {
|
|||||||
watchers: make(map[string]chan *registry.Result),
|
watchers: make(map[string]chan *registry.Result),
|
||||||
members: make(map[string]int32),
|
members: make(map[string]int32),
|
||||||
}
|
}
|
||||||
|
|
||||||
// run the updater
|
// run the updater
|
||||||
go g.run()
|
go g.run()
|
||||||
|
|
||||||
@ -816,7 +824,6 @@ func NewRegistry(opts ...registry.Option) registry.Registry {
|
|||||||
if err := configure(g, opts...); err != nil {
|
if err := configure(g, opts...); err != nil {
|
||||||
log.Fatalf("[gossip] Error configuring registry: %v", err)
|
log.Fatalf("[gossip] Error configuring registry: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait for setup
|
// wait for setup
|
||||||
<-time.After(g.interval * 2)
|
<-time.After(g.interval * 2)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user