Merge pull request #795 from milosgajdos83/advert-events

Rather than append to list of events just keep the last event
This commit is contained in:
Asim Aslam 2019-09-26 18:13:28 +01:00 committed by GitHub
commit e1bb4d7379
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -347,9 +347,10 @@ func (r *router) advertiseTable() error {
} }
} }
// routeAdvert contains a list of route events to be advertised // routeAdvert contains a route event to be advertised
type routeAdvert struct { type routeAdvert struct {
events []*Event // event received from routing table
event *Event
// lastUpdate records the time of the last advert update // lastUpdate records the time of the last advert update
lastUpdate time.Time lastUpdate time.Time
// penalty is current advert penalty // penalty is current advert penalty
@ -398,9 +399,11 @@ func (r *router) advertiseEvents() error {
// suppress/recover the event based on its penalty level // suppress/recover the event based on its penalty level
switch { switch {
case advert.penalty > AdvertSuppress && !advert.isSuppressed: case advert.penalty > AdvertSuppress && !advert.isSuppressed:
log.Debugf("Router supressing advert %d for route %s", key, advert.event.Route.Address)
advert.isSuppressed = true advert.isSuppressed = true
advert.suppressTime = time.Now() advert.suppressTime = time.Now()
case advert.penalty < AdvertRecover && advert.isSuppressed: case advert.penalty < AdvertRecover && advert.isSuppressed:
log.Debugf("Router recovering advert %d for route %s", key, advert.event.Route.Address)
advert.isSuppressed = false advert.isSuppressed = false
} }
@ -413,15 +416,13 @@ func (r *router) advertiseEvents() error {
} }
if !advert.isSuppressed { if !advert.isSuppressed {
for _, event := range advert.events {
e := new(Event) e := new(Event)
*e = *event *e = *(advert.event)
events = append(events, e) events = append(events, e)
// delete the advert from the advertMap // delete the advert from the advertMap
delete(advertMap, key) delete(advertMap, key)
} }
} }
}
// advertise all Update events to subscribers // advertise all Update events to subscribers
if len(events) > 0 { if len(events) > 0 {
@ -445,13 +446,11 @@ func (r *router) advertiseEvents() error {
} }
// check if we have already registered the route // check if we have already registered the route
// we use the route hash as advertMap key
hash := e.Route.Hash() hash := e.Route.Hash()
advert, ok := advertMap[hash] advert, ok := advertMap[hash]
if !ok { if !ok {
events := []*Event{e}
advert = &routeAdvert{ advert = &routeAdvert{
events: events, event: e,
penalty: penalty, penalty: penalty,
lastUpdate: time.Now(), lastUpdate: time.Now(),
} }
@ -459,18 +458,15 @@ func (r *router) advertiseEvents() error {
continue continue
} }
// attempt to squash last two events if possible // override the route event only if the last event was different
lastEvent := advert.events[len(advert.events)-1] if advert.event.Type != e.Type {
if lastEvent.Type == e.Type && lastEvent.Route.Hash() == hash { advert.event = e
log.Debugf("Router squashing event %s with hash %d for service %s", e.Type, hash, e.Route.Address)
advert.events[len(advert.events)-1] = e
} else {
advert.events = append(advert.events, e)
} }
// update event penalty and recorded timestamp // update event penalty and timestamp
advert.lastUpdate = time.Now() advert.lastUpdate = time.Now()
advert.penalty += penalty advert.penalty += penalty
log.Debugf("Router advert %d for route %s event penalty: %f", hash, advert.event.Route.Address, advert.penalty)
case <-r.exit: case <-r.exit:
// first wait for the advertiser to finish // first wait for the advertiser to finish
r.advertWg.Wait() r.advertWg.Wait()