Merge pull request #991 from micro/k8s-update
Patch spec template annotations and use event timestmap
This commit is contained in:
commit
44dd0b1302
@ -2,8 +2,6 @@ package runtime
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -60,6 +58,34 @@ func (r *runtime) run(events <-chan Event) {
|
||||
t := time.NewTicker(time.Second * 5)
|
||||
defer t.Stop()
|
||||
|
||||
// process event processes an incoming event
|
||||
processEvent := func(event Event, service *service) error {
|
||||
// get current vals
|
||||
r.RLock()
|
||||
name := service.Name
|
||||
updated := service.updated
|
||||
r.RUnlock()
|
||||
|
||||
// only process if the timestamp is newer
|
||||
if !event.Timestamp.After(updated) {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Debugf("Runtime updating service %s", name)
|
||||
|
||||
// this will cause a delete followed by created
|
||||
if err := r.Update(service.Service); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// update the local timestamp
|
||||
r.Lock()
|
||||
service.updated = updated
|
||||
r.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
@ -91,36 +117,6 @@ func (r *runtime) run(events <-chan Event) {
|
||||
// NOTE: we only handle Update events for now
|
||||
switch event.Type {
|
||||
case Update:
|
||||
// parse returned response to timestamp
|
||||
updateTimeStamp, err := strconv.ParseInt(event.Version, 10, 64)
|
||||
if err != nil {
|
||||
log.Debugf("Runtime error parsing build time for %s: %v", event.Service, err)
|
||||
continue
|
||||
}
|
||||
buildTime := time.Unix(updateTimeStamp, 0)
|
||||
processEvent := func(event Event, service *Service) error {
|
||||
r.RLock()
|
||||
name := service.Name
|
||||
version := service.Version
|
||||
r.RUnlock()
|
||||
|
||||
buildTimeStamp, err := strconv.ParseInt(version, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
muBuild := time.Unix(buildTimeStamp, 0)
|
||||
if buildTime.After(muBuild) {
|
||||
log.Debugf("Runtime updating service %s", name)
|
||||
if err := r.Update(service); err != nil {
|
||||
return err
|
||||
}
|
||||
r.Lock()
|
||||
service.Version = fmt.Sprintf("%d", buildTime.Unix())
|
||||
r.Unlock()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(event.Service) > 0 {
|
||||
r.RLock()
|
||||
service, ok := r.services[event.Service]
|
||||
@ -129,14 +125,19 @@ func (r *runtime) run(events <-chan Event) {
|
||||
log.Debugf("Runtime unknown service: %s", event.Service)
|
||||
continue
|
||||
}
|
||||
if err := processEvent(event, service.Service); err != nil {
|
||||
if err := processEvent(event, service); err != nil {
|
||||
log.Debugf("Runtime error updating service %s: %v", event.Service, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
r.RLock()
|
||||
services := r.services
|
||||
r.RUnlock()
|
||||
|
||||
// if blank service was received we update all services
|
||||
for _, service := range r.services {
|
||||
if err := processEvent(event, service.Service); err != nil {
|
||||
for _, service := range services {
|
||||
if err := processEvent(event, service); err != nil {
|
||||
log.Debugf("Runtime error updating service %s: %v", service.Name, err)
|
||||
}
|
||||
}
|
||||
|
@ -130,7 +130,7 @@ func (k *kubernetes) getService(labels map[string]string) ([]*runtime.Service, e
|
||||
}
|
||||
|
||||
// parse out deployment build
|
||||
if build, ok := kdep.Metadata.Annotations["build"]; ok {
|
||||
if build, ok := kdep.Spec.Template.Metadata.Annotations["build"]; ok {
|
||||
buildTime, err := time.Parse(time.RFC3339, build)
|
||||
if err != nil {
|
||||
log.Debugf("Runtime failed parsing build time for %s: %v", name, err)
|
||||
@ -228,7 +228,18 @@ func (k *kubernetes) run(events <-chan runtime.Event) {
|
||||
// update build time annotation
|
||||
if service.Spec.Template.Metadata.Annotations == nil {
|
||||
service.Spec.Template.Metadata.Annotations = make(map[string]string)
|
||||
|
||||
}
|
||||
|
||||
// check the existing build timestamp
|
||||
if build, ok := service.Spec.Template.Metadata.Annotations["build"]; ok {
|
||||
buildTime, err := time.Parse(time.RFC3339, build)
|
||||
if err == nil && !event.Timestamp.After(buildTime) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// update the build time
|
||||
service.Spec.Template.Metadata.Annotations["build"] = event.Timestamp.Format(time.RFC3339)
|
||||
|
||||
log.Debugf("Runtime updating service: %s", event.Service)
|
||||
|
@ -36,7 +36,10 @@ func newService(s *runtime.Service, c runtime.CreateOptions) *service {
|
||||
kdeploy.Metadata.Annotations["group"] = "micro"
|
||||
|
||||
// set a build timestamp to the current time
|
||||
kdeploy.Metadata.Annotations["build"] = time.Now().Format(time.RFC3339)
|
||||
if kdeploy.Spec.Template.Metadata.Annotations == nil {
|
||||
kdeploy.Spec.Template.Metadata.Annotations = make(map[string]string)
|
||||
}
|
||||
kdeploy.Spec.Template.Metadata.Annotations["build"] = time.Now().Format(time.RFC3339)
|
||||
|
||||
// define the environment values used by the container
|
||||
env := make([]client.EnvVar, 0, len(c.Env))
|
||||
|
@ -3,6 +3,7 @@ package runtime
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/runtime/build"
|
||||
|
||||
@ -17,6 +18,7 @@ type service struct {
|
||||
running bool
|
||||
closed chan bool
|
||||
err error
|
||||
updated time.Time
|
||||
|
||||
// output for logs
|
||||
output io.Writer
|
||||
@ -64,8 +66,9 @@ func newService(s *Service, c CreateOptions) *service {
|
||||
Env: c.Env,
|
||||
Args: args,
|
||||
},
|
||||
closed: make(chan bool),
|
||||
output: c.Output,
|
||||
closed: make(chan bool),
|
||||
output: c.Output,
|
||||
updated: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user