Removed event from eventMap once sent to be advertised

This commit is contained in:
Milos Gajdos 2019-07-06 00:36:15 +01:00
parent 72ef032162
commit b68f0e237f
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F

View File

@ -107,6 +107,7 @@ func (r *router) addServiceRoutes(reg registry.Registry, network string, metric
// get the service to retrieve all its info // get the service to retrieve all its info
srvs, err := reg.GetService(service.Name) srvs, err := reg.GetService(service.Name)
if err != nil { if err != nil {
log.Logf("r.addServiceRoutes() GetService() error: %v", err)
continue continue
} }
@ -157,6 +158,8 @@ func (r *router) watchServices(w registry.Watcher) error {
break break
} }
log.Logf("r.watchServices() new service event: %s", res.Service.Name)
route := Route{ route := Route{
Destination: res.Service.Name, Destination: res.Service.Name,
Router: r.opts.Address, Router: r.opts.Address,
@ -238,9 +241,6 @@ func (r *router) processEvents() error {
// ticker to periodically scan event for advertising // ticker to periodically scan event for advertising
ticker := time.NewTicker(AdvertiseTick) ticker := time.NewTicker(AdvertiseTick)
// TODO: Need to flag already advertised events otherwise we'll keep on advertising them
// as they keep getting advertised unless deleted and are only deleted when received by upstream
// advertEvent is a table event enriched with advert data // advertEvent is a table event enriched with advert data
type advertEvent struct { type advertEvent struct {
*Event *Event
@ -263,8 +263,8 @@ process:
case <-ticker.C: case <-ticker.C:
var events []*Event var events []*Event
// decay the penalties of existing events // decay the penalties of existing events
mu.RLock() mu.Lock()
for _, event := range eventMap { for advert, event := range eventMap {
delta := time.Since(event.timestamp).Seconds() delta := time.Since(event.timestamp).Seconds()
event.penalty = event.penalty * math.Exp(delta) event.penalty = event.penalty * math.Exp(delta)
// suppress or recover the event based on its current penalty // suppress or recover the event based on its current penalty
@ -278,9 +278,11 @@ process:
e := new(Event) e := new(Event)
*e = *event.Event *e = *event.Event
events = append(events, e) events = append(events, e)
// this deletes the advertised event from the map
delete(eventMap, advert)
} }
} }
mu.RUnlock() mu.Unlock()
if len(events) > 0 { if len(events) > 0 {
wg.Add(1) wg.Add(1)
@ -356,7 +358,9 @@ process:
} }
} }
// first wait for the advertiser to finish
wg.Wait() wg.Wait()
// close the advert channel
close(r.advertChan) close(r.advertChan)
log.Logf("r.processEvents(): event processor stopped") log.Logf("r.processEvents(): event processor stopped")
@ -390,8 +394,6 @@ func (r *router) manage(errChan <-chan error) {
} }
r.status = status r.status = status
log.Logf("r.manage(): router status: %v", r.status)
// stop the router if some error happened // stop the router if some error happened
if err != nil && code != Stopped { if err != nil && code != Stopped {
// this will stop watchers which will close r.advertChan // this will stop watchers which will close r.advertChan