diff --git a/runtime/kubernetes/client/kubernetes.go b/runtime/kubernetes/client/kubernetes.go index e75862f8..677a2214 100644 --- a/runtime/kubernetes/client/kubernetes.go +++ b/runtime/kubernetes/client/kubernetes.go @@ -2,9 +2,7 @@ package client import ( - "strconv" "strings" - "time" "github.com/micro/go-micro/util/log" ) @@ -94,16 +92,6 @@ func NewDeployment(name, version string) *Deployment { Annotations: map[string]string{}, } - // TODO: we need to figure out this version stuff - // might have to add Build to runtime.Service - buildTime, err := strconv.ParseInt(version, 10, 64) - if err == nil { - buildUnixTimeUTC := time.Unix(buildTime, 0) - Metadata.Annotations["build"] = buildUnixTimeUTC.Format(time.RFC3339) - } else { - log.Tracef("could not parse build: %v", err) - } - // enable go modules by default env := EnvVar{ Name: "GO111MODULE", diff --git a/runtime/kubernetes/kubernetes.go b/runtime/kubernetes/kubernetes.go index 91b224bc..ccac8ed4 100644 --- a/runtime/kubernetes/kubernetes.go +++ b/runtime/kubernetes/kubernetes.go @@ -4,7 +4,6 @@ package kubernetes import ( "errors" "fmt" - "strconv" "sync" "time" @@ -42,72 +41,6 @@ type kubernetes struct { client client.Kubernetes } -// NewRuntime creates new kubernetes runtime -func NewRuntime(opts ...runtime.Option) runtime.Runtime { - // get default options - options := runtime.Options{} - - // apply requested options - for _, o := range opts { - o(&options) - } - - // kubernetes client - client := client.NewClientInCluster() - - return &kubernetes{ - options: options, - closed: make(chan bool), - queue: make(chan *task, 128), - client: client, - } -} - -// Init initializes runtime options -func (k *kubernetes) Init(opts ...runtime.Option) error { - k.Lock() - defer k.Unlock() - - for _, o := range opts { - o(&k.options) - } - - return nil -} - -// Creates a service -func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) error { - k.Lock() - defer k.Unlock() - - var options runtime.CreateOptions - for _, o := range opts { - o(&options) - } - - // quickly prevalidate the name and version - name := s.Name - if len(s.Version) > 0 { - name = name + "-" + s.Version - } - - // format as we'll format in the deployment - name = client.Format(name) - - // create new kubernetes micro service - service := newService(s, options) - - log.Debugf("Runtime queueing service %s version %s for start action", service.Name, service.Version) - - // push into start queue - k.queue <- &task{ - action: start, - service: service, - } - - return nil -} - // getService queries kubernetes for micro service // NOTE: this function is not thread-safe func (k *kubernetes) getService(labels map[string]string) ([]*runtime.Service, error) { @@ -117,6 +50,8 @@ func (k *kubernetes) getService(labels map[string]string) ([]*runtime.Service, e Kind: "service", Value: serviceList, } + + // get the service from k8s if err := k.client.Get(r, labels); err != nil { return nil, err } @@ -127,6 +62,8 @@ func (k *kubernetes) getService(labels map[string]string) ([]*runtime.Service, e Kind: "deployment", Value: depList, } + + // get the deployment from k8s if err := k.client.Get(d, labels); err != nil { return nil, err } @@ -217,6 +154,139 @@ func (k *kubernetes) getService(labels map[string]string) ([]*runtime.Service, e return services, nil } +// run runs the runtime management loop +func (k *kubernetes) run(events <-chan runtime.Event) { + t := time.NewTicker(time.Second * 10) + defer t.Stop() + + for { + select { + case <-t.C: + // TODO: figure out what to do here + // - do we even need the ticker for k8s services? + case task := <-k.queue: + // The task queue is used to take actions e.g (CRUD - R) + switch task.action { + case start: + log.Debugf("Runtime starting new service: %s", task.service.Name) + if err := task.service.Start(k.client); err != nil { + log.Debugf("Runtime failed to start service %s: %v", task.service.Name, err) + continue + } + case stop: + log.Debugf("Runtime stopping service: %s", task.service.Name) + if err := task.service.Stop(k.client); err != nil { + log.Debugf("Runtime failed to stop service %s: %v", task.service.Name, err) + continue + } + case update: + log.Debugf("Runtime updating service: %s", task.service.Name) + if err := task.service.Update(k.client); err != nil { + log.Debugf("Runtime failed to update service %s: %v", task.service.Name, err) + continue + } + default: + log.Debugf("Runtime received unknown action for service: %s", task.service.Name) + } + case event := <-events: + // NOTE: we only handle Update events for now + log.Debugf("Runtime received notification event: %v", event) + switch event.Type { + case runtime.Update: + // only process if there's an actual service + // we do not update all the things individually + if len(event.Service) == 0 { + continue + } + + // set the default labels + labels := map[string]string{ + "micro": "service", + "name": event.Service, + } + + if len(event.Version) > 0 { + labels["version"] = event.Version + } + + // get the deployment status + deployed := new(client.DeploymentList) + + // get the existing service rather than creating a new one + err := k.client.Get(&client.Resource{ + Kind: "deployment", + Value: deployed, + }, labels) + + if err != nil { + log.Debugf("Runtime update failed to get service %s: %v", event.Service, err) + continue + } + + // technically we should not receive multiple versions but hey ho + for _, service := range deployed.Items { + // update build time annotation + service.Spec.Template.Metadata.Annotations["build"] = event.Timestamp.Format(time.RFC3339) + + log.Debugf("Runtime updating service: %s", event.Service) + if err := k.client.Update(deploymentResource(&service)); err != nil { + log.Debugf("Runtime failed to update service %s: %v", event.Service, err) + continue + } + } + } + case <-k.closed: + log.Debugf("Runtime stopped") + return + } + } +} + +// Init initializes runtime options +func (k *kubernetes) Init(opts ...runtime.Option) error { + k.Lock() + defer k.Unlock() + + for _, o := range opts { + o(&k.options) + } + + return nil +} + +// Creates a service +func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) error { + k.Lock() + defer k.Unlock() + + var options runtime.CreateOptions + for _, o := range opts { + o(&options) + } + + // quickly prevalidate the name and version + name := s.Name + if len(s.Version) > 0 { + name = name + "-" + s.Version + } + + // format as we'll format in the deployment + name = client.Format(name) + + // create new kubernetes micro service + service := newService(s, options) + + log.Debugf("Runtime queueing service %s version %s for start action", service.Name, service.Version) + + // push into start queue + k.queue <- &task{ + action: start, + service: service, + } + + return nil +} + // Read returns all instances of given service func (k *kubernetes) Read(name string, opts ...runtime.ReadOption) ([]*runtime.Service, error) { k.Lock() @@ -267,18 +337,11 @@ func (k *kubernetes) List() ([]*runtime.Service, error) { // Update the service in place func (k *kubernetes) Update(s *runtime.Service) error { - // parse version into human readable timestamp - updateTimeStamp, err := strconv.ParseInt(s.Version, 10, 64) - if err != nil { - return err - } - unixTimeUTC := time.Unix(updateTimeStamp, 0) - // create new kubernetes micro service service := newService(s, runtime.CreateOptions{}) // update build time annotation - service.kdeploy.Spec.Template.Metadata.Annotations["build"] = unixTimeUTC.Format(time.RFC3339) + service.kdeploy.Spec.Template.Metadata.Annotations["build"] = time.Now().Format(time.RFC3339) log.Debugf("Runtime queueing service %s for update action", service.Name) @@ -291,7 +354,7 @@ func (k *kubernetes) Update(s *runtime.Service) error { return nil } -// Remove a service +// Delete removes a service func (k *kubernetes) Delete(s *runtime.Service) error { k.Lock() defer k.Unlock() @@ -310,76 +373,7 @@ func (k *kubernetes) Delete(s *runtime.Service) error { return nil } -// run runs the runtime management loop -func (k *kubernetes) run(events <-chan runtime.Event) { - t := time.NewTicker(time.Second * 10) - defer t.Stop() - - for { - select { - case <-t.C: - // TODO: figure out what to do here - // - do we even need the ticker for k8s services? - case task := <-k.queue: - switch task.action { - case start: - log.Debugf("Runtime starting new service: %s", task.service.Name) - if err := task.service.Start(k.client); err != nil { - log.Debugf("Runtime failed to start service %s: %v", task.service.Name, err) - continue - } - case stop: - log.Debugf("Runtime stopping service: %s", task.service.Name) - if err := task.service.Stop(k.client); err != nil { - log.Debugf("Runtime failed to stop service %s: %v", task.service.Name, err) - continue - } - case update: - log.Debugf("Runtime updating service: %s", task.service.Name) - if err := task.service.Update(k.client); err != nil { - log.Debugf("Runtime failed to update service %s: %v", task.service.Name, err) - continue - } - default: - log.Debugf("Runtime received unknown action for service: %s", task.service.Name) - } - case event := <-events: - // NOTE: we only handle Update events for now - log.Debugf("Runtime received notification event: %v", event) - switch event.Type { - case runtime.Update: - // parse returned response to timestamp - updateTimeStamp, err := strconv.ParseInt(event.Version, 10, 64) - if err != nil { - log.Debugf("Runtime error parsing update build time: %v", err) - continue - } - unixTimeUTC := time.Unix(updateTimeStamp, 0) - if len(event.Service) > 0 { - s := &runtime.Service{ - Name: event.Service, - Version: event.Version, - } - // create new kubernetes micro service - service := newService(s, runtime.CreateOptions{}) - // update build time annotation - service.kdeploy.Spec.Template.Metadata.Annotations["build"] = unixTimeUTC.Format(time.RFC3339) - - log.Debugf("Runtime updating service: %s", service.Name) - if err := service.Update(k.client); err != nil { - log.Debugf("Runtime failed to update service %s: %v", service.Name, err) - continue - } - } - } - case <-k.closed: - log.Debugf("Runtime stopped") - return - } - } -} - -// starts the runtime +// Start starts the runtime func (k *kubernetes) Start() error { k.Lock() defer k.Unlock() @@ -408,7 +402,7 @@ func (k *kubernetes) Start() error { return nil } -// Shutdown the runtime +// Stop shuts down the runtime func (k *kubernetes) Stop() error { k.Lock() defer k.Unlock() @@ -437,3 +431,24 @@ func (k *kubernetes) Stop() error { func (k *kubernetes) String() string { return "kubernetes" } + +// NewRuntime creates new kubernetes runtime +func NewRuntime(opts ...runtime.Option) runtime.Runtime { + // get default options + options := runtime.Options{} + + // apply requested options + for _, o := range opts { + o(&options) + } + + // kubernetes client + client := client.NewClientInCluster() + + return &kubernetes{ + options: options, + closed: make(chan bool), + queue: make(chan *task, 128), + client: client, + } +} diff --git a/runtime/kubernetes/service.go b/runtime/kubernetes/service.go index 2a73071c..1fcb4029 100644 --- a/runtime/kubernetes/service.go +++ b/runtime/kubernetes/service.go @@ -2,6 +2,7 @@ package kubernetes import ( "strings" + "time" "github.com/micro/go-micro/runtime" "github.com/micro/go-micro/runtime/kubernetes/client" @@ -34,6 +35,9 @@ func newService(s *runtime.Service, c runtime.CreateOptions) *service { kdeploy.Metadata.Annotations["owner"] = "micro" kdeploy.Metadata.Annotations["group"] = "micro" + // set a build timestamp to the current time + kdeploy.Metadata.Annotations["build"] = time.Now().Format(time.RFC3339) + // define the environment values used by the container env := make([]client.EnvVar, 0, len(c.Env)) for _, evar := range c.Env {