Patch spec template annotations and use event timestmap
This commit is contained in:
@@ -2,8 +2,6 @@ package runtime
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -60,6 +58,34 @@ func (r *runtime) run(events <-chan Event) {
|
||||
t := time.NewTicker(time.Second * 5)
|
||||
defer t.Stop()
|
||||
|
||||
// process event processes an incoming event
|
||||
processEvent := func(event Event, service *service) error {
|
||||
// get current vals
|
||||
r.RLock()
|
||||
name := service.Name
|
||||
updated := service.updated
|
||||
r.RUnlock()
|
||||
|
||||
// only process if the timestamp is newer
|
||||
if !event.Timestamp.After(updated) {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Debugf("Runtime updating service %s", name)
|
||||
|
||||
// this will cause a delete followed by created
|
||||
if err := r.Update(service.Service); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// update the local timestamp
|
||||
r.Lock()
|
||||
service.updated = updated
|
||||
r.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
@@ -91,36 +117,6 @@ func (r *runtime) run(events <-chan Event) {
|
||||
// NOTE: we only handle Update events for now
|
||||
switch event.Type {
|
||||
case Update:
|
||||
// parse returned response to timestamp
|
||||
updateTimeStamp, err := strconv.ParseInt(event.Version, 10, 64)
|
||||
if err != nil {
|
||||
log.Debugf("Runtime error parsing build time for %s: %v", event.Service, err)
|
||||
continue
|
||||
}
|
||||
buildTime := time.Unix(updateTimeStamp, 0)
|
||||
processEvent := func(event Event, service *Service) error {
|
||||
r.RLock()
|
||||
name := service.Name
|
||||
version := service.Version
|
||||
r.RUnlock()
|
||||
|
||||
buildTimeStamp, err := strconv.ParseInt(version, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
muBuild := time.Unix(buildTimeStamp, 0)
|
||||
if buildTime.After(muBuild) {
|
||||
log.Debugf("Runtime updating service %s", name)
|
||||
if err := r.Update(service); err != nil {
|
||||
return err
|
||||
}
|
||||
r.Lock()
|
||||
service.Version = fmt.Sprintf("%d", buildTime.Unix())
|
||||
r.Unlock()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(event.Service) > 0 {
|
||||
r.RLock()
|
||||
service, ok := r.services[event.Service]
|
||||
@@ -129,14 +125,19 @@ func (r *runtime) run(events <-chan Event) {
|
||||
log.Debugf("Runtime unknown service: %s", event.Service)
|
||||
continue
|
||||
}
|
||||
if err := processEvent(event, service.Service); err != nil {
|
||||
if err := processEvent(event, service); err != nil {
|
||||
log.Debugf("Runtime error updating service %s: %v", event.Service, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
r.RLock()
|
||||
services := r.services
|
||||
r.RUnlock()
|
||||
|
||||
// if blank service was received we update all services
|
||||
for _, service := range r.services {
|
||||
if err := processEvent(event, service.Service); err != nil {
|
||||
for _, service := range services {
|
||||
if err := processEvent(event, service); err != nil {
|
||||
log.Debugf("Runtime error updating service %s: %v", service.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user