diff --git a/debug/log/kubernetes/kubernetes.go b/debug/log/kubernetes/kubernetes.go index 317d0952..6fd8aa7d 100644 --- a/debug/log/kubernetes/kubernetes.go +++ b/debug/log/kubernetes/kubernetes.go @@ -15,7 +15,7 @@ import ( ) type klog struct { - client client.Kubernetes + client client.Client log.Options } diff --git a/runtime/default.go b/runtime/default.go index e9318d46..1f40380e 100644 --- a/runtime/default.go +++ b/runtime/default.go @@ -167,12 +167,16 @@ func (r *runtime) Create(s *Service, opts ...CreateOption) error { return errors.New("missing exec command") } - // save service - r.services[s.Name] = newService(s, options) + // create new service + service := newService(s, options) - // push into start queue - log.Debugf("Runtime creating service %s", s.Name) - r.start <- r.services[s.Name] + // start the service + if err := service.Start(); err != nil { + return err + } + + // save service + r.services[s.Name] = service return nil } diff --git a/runtime/kubernetes/kubernetes.go b/runtime/kubernetes/kubernetes.go index 9c5d9089..6ba29df8 100644 --- a/runtime/kubernetes/kubernetes.go +++ b/runtime/kubernetes/kubernetes.go @@ -20,24 +20,16 @@ const ( stop ) -// task is queued into runtime queue -type task struct { - action action - service *service -} - type kubernetes struct { sync.RWMutex // options configure runtime options runtime.Options // indicates if we're running running bool - // task queue for kubernetes services - queue chan *task // used to stop the runtime closed chan bool // client is kubernetes client - client client.Kubernetes + client client.Client } // getService queries kubernetes for micro service @@ -163,30 +155,6 @@ func (k *kubernetes) run(events <-chan runtime.Event) { case <-t.C: // TODO: figure out what to do here // - do we even need the ticker for k8s services? - case task := <-k.queue: - // The task queue is used to take actions e.g (CRUD - R) - switch task.action { - case start: - log.Debugf("Runtime starting new service: %s", task.service.Name) - if err := task.service.Start(k.client); err != nil { - log.Debugf("Runtime failed to start service %s: %v", task.service.Name, err) - continue - } - case stop: - log.Debugf("Runtime stopping service: %s", task.service.Name) - if err := task.service.Stop(k.client); err != nil { - log.Debugf("Runtime failed to stop service %s: %v", task.service.Name, err) - continue - } - case update: - log.Debugf("Runtime updating service: %s", task.service.Name) - if err := task.service.Update(k.client); err != nil { - log.Debugf("Runtime failed to update service %s: %v", task.service.Name, err) - continue - } - default: - log.Debugf("Runtime received unknown action for service: %s", task.service.Name) - } case event := <-events: // NOTE: we only handle Update events for now log.Debugf("Runtime received notification event: %v", event) @@ -299,15 +267,8 @@ func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) er // create new kubernetes micro service service := newService(s, options) - log.Debugf("Runtime queueing service %s version %s for start action", service.Name, service.Version) - - // push into start queue - k.queue <- &task{ - action: start, - service: service, - } - - return nil + // start the service + return service.Start(k.client) } // Read returns all instances of given service @@ -365,15 +326,7 @@ func (k *kubernetes) Update(s *runtime.Service) error { // update build time annotation service.kdeploy.Spec.Template.Metadata.Annotations["build"] = time.Now().Format(time.RFC3339) - log.Debugf("Runtime queueing service %s for update action", service.Name) - - // queue service for removal - k.queue <- &task{ - action: update, - service: service, - } - - return nil + return service.Update(k.client) } // Delete removes a service @@ -386,15 +339,7 @@ func (k *kubernetes) Delete(s *runtime.Service) error { Type: k.options.Type, }) - log.Debugf("Runtime queueing service %s for delete action", service.Name) - - // queue service for removal - k.queue <- &task{ - action: stop, - service: service, - } - - return nil + return service.Stop(k.client) } // Start starts the runtime @@ -475,7 +420,6 @@ func NewRuntime(opts ...runtime.Option) runtime.Runtime { return &kubernetes{ options: options, closed: make(chan bool), - queue: make(chan *task, 128), client: client, } } diff --git a/runtime/kubernetes/service.go b/runtime/kubernetes/service.go index 704aafcf..b74eb7f2 100644 --- a/runtime/kubernetes/service.go +++ b/runtime/kubernetes/service.go @@ -85,7 +85,7 @@ func serviceResource(s *client.Service) *client.Resource { } // Start starts the Kubernetes service. It creates new kubernetes deployment and service API objects -func (s *service) Start(k client.Kubernetes) error { +func (s *service) Start(k client.Client) error { // create deployment first; if we fail, we dont create service if err := k.Create(deploymentResource(s.kdeploy)); err != nil { log.Debugf("Runtime failed to create deployment: %v", err) @@ -100,7 +100,7 @@ func (s *service) Start(k client.Kubernetes) error { return nil } -func (s *service) Stop(k client.Kubernetes) error { +func (s *service) Stop(k client.Client) error { // first attempt to delete service if err := k.Delete(serviceResource(s.kservice)); err != nil { log.Debugf("Runtime failed to delete service: %v", err) @@ -115,7 +115,7 @@ func (s *service) Stop(k client.Kubernetes) error { return nil } -func (s *service) Update(k client.Kubernetes) error { +func (s *service) Update(k client.Client) error { if err := k.Update(deploymentResource(s.kdeploy)); err != nil { log.Debugf("Runtime failed to update deployment: %v", err) return err