diff --git a/runtime/default.go b/runtime/default.go index c9c198e1..4ec72929 100644 --- a/runtime/default.go +++ b/runtime/default.go @@ -2,8 +2,6 @@ package runtime import ( "errors" - "fmt" - "strconv" "sync" "time" @@ -60,6 +58,34 @@ func (r *runtime) run(events <-chan Event) { t := time.NewTicker(time.Second * 5) defer t.Stop() + // process event processes an incoming event + processEvent := func(event Event, service *service) error { + // get current vals + r.RLock() + name := service.Name + updated := service.updated + r.RUnlock() + + // only process if the timestamp is newer + if !event.Timestamp.After(updated) { + return nil + } + + log.Debugf("Runtime updating service %s", name) + + // this will cause a delete followed by created + if err := r.Update(service.Service); err != nil { + return err + } + + // update the local timestamp + r.Lock() + service.updated = updated + r.Unlock() + + return nil + } + for { select { case <-t.C: @@ -91,36 +117,6 @@ func (r *runtime) run(events <-chan Event) { // NOTE: we only handle Update events for now switch event.Type { case Update: - // parse returned response to timestamp - updateTimeStamp, err := strconv.ParseInt(event.Version, 10, 64) - if err != nil { - log.Debugf("Runtime error parsing build time for %s: %v", event.Service, err) - continue - } - buildTime := time.Unix(updateTimeStamp, 0) - processEvent := func(event Event, service *Service) error { - r.RLock() - name := service.Name - version := service.Version - r.RUnlock() - - buildTimeStamp, err := strconv.ParseInt(version, 10, 64) - if err != nil { - return err - } - muBuild := time.Unix(buildTimeStamp, 0) - if buildTime.After(muBuild) { - log.Debugf("Runtime updating service %s", name) - if err := r.Update(service); err != nil { - return err - } - r.Lock() - service.Version = fmt.Sprintf("%d", buildTime.Unix()) - r.Unlock() - } - return nil - } - if len(event.Service) > 0 { r.RLock() service, ok := r.services[event.Service] @@ -129,14 +125,19 @@ func (r *runtime) run(events <-chan Event) { log.Debugf("Runtime unknown service: %s", event.Service) continue } - if err := processEvent(event, service.Service); err != nil { + if err := processEvent(event, service); err != nil { log.Debugf("Runtime error updating service %s: %v", event.Service, err) } continue } + + r.RLock() + services := r.services + r.RUnlock() + // if blank service was received we update all services - for _, service := range r.services { - if err := processEvent(event, service.Service); err != nil { + for _, service := range services { + if err := processEvent(event, service); err != nil { log.Debugf("Runtime error updating service %s: %v", service.Name, err) } } diff --git a/runtime/kubernetes/kubernetes.go b/runtime/kubernetes/kubernetes.go index 69b032b7..29c90925 100644 --- a/runtime/kubernetes/kubernetes.go +++ b/runtime/kubernetes/kubernetes.go @@ -130,7 +130,7 @@ func (k *kubernetes) getService(labels map[string]string) ([]*runtime.Service, e } // parse out deployment build - if build, ok := kdep.Metadata.Annotations["build"]; ok { + if build, ok := kdep.Spec.Template.Metadata.Annotations["build"]; ok { buildTime, err := time.Parse(time.RFC3339, build) if err != nil { log.Debugf("Runtime failed parsing build time for %s: %v", name, err) @@ -228,7 +228,18 @@ func (k *kubernetes) run(events <-chan runtime.Event) { // update build time annotation if service.Spec.Template.Metadata.Annotations == nil { service.Spec.Template.Metadata.Annotations = make(map[string]string) + } + + // check the existing build timestamp + if build, ok := service.Spec.Template.Metadata.Annotations["build"]; ok { + buildTime, err := time.Parse(time.RFC3339, build) + if err == nil && !event.Timestamp.After(buildTime) { + continue + } + } + + // update the build time service.Spec.Template.Metadata.Annotations["build"] = event.Timestamp.Format(time.RFC3339) log.Debugf("Runtime updating service: %s", event.Service) diff --git a/runtime/kubernetes/service.go b/runtime/kubernetes/service.go index 1fcb4029..e4a5c6ad 100644 --- a/runtime/kubernetes/service.go +++ b/runtime/kubernetes/service.go @@ -36,7 +36,10 @@ func newService(s *runtime.Service, c runtime.CreateOptions) *service { kdeploy.Metadata.Annotations["group"] = "micro" // set a build timestamp to the current time - kdeploy.Metadata.Annotations["build"] = time.Now().Format(time.RFC3339) + if kdeploy.Spec.Template.Metadata.Annotations == nil { + kdeploy.Spec.Template.Metadata.Annotations = make(map[string]string) + } + kdeploy.Spec.Template.Metadata.Annotations["build"] = time.Now().Format(time.RFC3339) // define the environment values used by the container env := make([]client.EnvVar, 0, len(c.Env)) diff --git a/runtime/service.go b/runtime/service.go index cf142c16..a3945d64 100644 --- a/runtime/service.go +++ b/runtime/service.go @@ -3,6 +3,7 @@ package runtime import ( "io" "sync" + "time" "github.com/micro/go-micro/runtime/build" @@ -17,6 +18,7 @@ type service struct { running bool closed chan bool err error + updated time.Time // output for logs output io.Writer @@ -64,8 +66,9 @@ func newService(s *Service, c CreateOptions) *service { Env: c.Env, Args: args, }, - closed: make(chan bool), - output: c.Output, + closed: make(chan bool), + output: c.Output, + updated: time.Now(), } }