router/registry: fix initialization bug (#1893)
This commit is contained in:
parent
1106f1d996
commit
07fef9fd33
@ -28,6 +28,7 @@ type rtr struct {
|
|||||||
table *table
|
table *table
|
||||||
options router.Options
|
options router.Options
|
||||||
exit chan bool
|
exit chan bool
|
||||||
|
initChan chan bool
|
||||||
eventChan chan *router.Event
|
eventChan chan *router.Event
|
||||||
|
|
||||||
// advert subscribers
|
// advert subscribers
|
||||||
@ -48,6 +49,7 @@ func NewRouter(opts ...router.Option) router.Router {
|
|||||||
// construct the router
|
// construct the router
|
||||||
r := &rtr{
|
r := &rtr{
|
||||||
options: options,
|
options: options,
|
||||||
|
initChan: make(chan bool),
|
||||||
subscribers: make(map[string]chan *router.Advert),
|
subscribers: make(map[string]chan *router.Advert),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -62,17 +64,17 @@ func NewRouter(opts ...router.Option) router.Router {
|
|||||||
|
|
||||||
// Init initializes router with given options
|
// Init initializes router with given options
|
||||||
func (r *rtr) Init(opts ...router.Option) error {
|
func (r *rtr) Init(opts ...router.Option) error {
|
||||||
// stop the router before we initialize
|
|
||||||
if err := r.Close(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
r.Lock()
|
r.Lock()
|
||||||
defer r.Unlock()
|
|
||||||
|
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&r.options)
|
o(&r.options)
|
||||||
}
|
}
|
||||||
|
r.Unlock()
|
||||||
|
|
||||||
|
// push a message to the init chan so the watchers
|
||||||
|
// can reset in the case the registry was changed
|
||||||
|
go func() {
|
||||||
|
r.initChan <- true
|
||||||
|
}()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -498,14 +500,9 @@ func (r *rtr) start() error {
|
|||||||
// create error and exit channels
|
// create error and exit channels
|
||||||
r.exit = make(chan bool)
|
r.exit = make(chan bool)
|
||||||
|
|
||||||
// registry watcher
|
|
||||||
w, err := r.options.Registry.Watch(registry.WatchDomain(registry.WildcardDomain))
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed creating registry watcher: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
var err error
|
var err error
|
||||||
|
var w registry.Watcher
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@ -514,9 +511,17 @@ func (r *rtr) start() error {
|
|||||||
w.Stop()
|
w.Stop()
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
case <-r.initChan:
|
||||||
|
// the registry could have changed during initialization
|
||||||
|
// so if there was a watcher setup, stop it and create a
|
||||||
|
// new one
|
||||||
|
if w != nil {
|
||||||
|
w.Stop()
|
||||||
|
w = nil
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
if w == nil {
|
if w == nil {
|
||||||
w, err = r.options.Registry.Watch()
|
w, err = r.options.Registry.Watch(registry.WatchDomain(registry.WildcardDomain))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if logger.V(logger.WarnLevel, logger.DefaultLogger) {
|
if logger.V(logger.WarnLevel, logger.DefaultLogger) {
|
||||||
logger.Warnf("failed creating registry watcher: %v", err)
|
logger.Warnf("failed creating registry watcher: %v", err)
|
||||||
|
Loading…
Reference in New Issue
Block a user