Start runtime services inline

This commit is contained in:
Asim Aslam 2019-12-24 17:51:30 +00:00
parent 14c9c412cd
commit 2fe64001c0
4 changed files with 18 additions and 70 deletions

View File

@ -15,7 +15,7 @@ import (
) )
type klog struct { type klog struct {
client client.Kubernetes client client.Client
log.Options log.Options
} }

View File

@ -167,12 +167,16 @@ func (r *runtime) Create(s *Service, opts ...CreateOption) error {
return errors.New("missing exec command") return errors.New("missing exec command")
} }
// save service // create new service
r.services[s.Name] = newService(s, options) service := newService(s, options)
// push into start queue // start the service
log.Debugf("Runtime creating service %s", s.Name) if err := service.Start(); err != nil {
r.start <- r.services[s.Name] return err
}
// save service
r.services[s.Name] = service
return nil return nil
} }

View File

@ -20,24 +20,16 @@ const (
stop stop
) )
// task is queued into runtime queue
type task struct {
action action
service *service
}
type kubernetes struct { type kubernetes struct {
sync.RWMutex sync.RWMutex
// options configure runtime // options configure runtime
options runtime.Options options runtime.Options
// indicates if we're running // indicates if we're running
running bool running bool
// task queue for kubernetes services
queue chan *task
// used to stop the runtime // used to stop the runtime
closed chan bool closed chan bool
// client is kubernetes client // client is kubernetes client
client client.Kubernetes client client.Client
} }
// getService queries kubernetes for micro service // getService queries kubernetes for micro service
@ -163,30 +155,6 @@ func (k *kubernetes) run(events <-chan runtime.Event) {
case <-t.C: case <-t.C:
// TODO: figure out what to do here // TODO: figure out what to do here
// - do we even need the ticker for k8s services? // - do we even need the ticker for k8s services?
case task := <-k.queue:
// The task queue is used to take actions e.g (CRUD - R)
switch task.action {
case start:
log.Debugf("Runtime starting new service: %s", task.service.Name)
if err := task.service.Start(k.client); err != nil {
log.Debugf("Runtime failed to start service %s: %v", task.service.Name, err)
continue
}
case stop:
log.Debugf("Runtime stopping service: %s", task.service.Name)
if err := task.service.Stop(k.client); err != nil {
log.Debugf("Runtime failed to stop service %s: %v", task.service.Name, err)
continue
}
case update:
log.Debugf("Runtime updating service: %s", task.service.Name)
if err := task.service.Update(k.client); err != nil {
log.Debugf("Runtime failed to update service %s: %v", task.service.Name, err)
continue
}
default:
log.Debugf("Runtime received unknown action for service: %s", task.service.Name)
}
case event := <-events: case event := <-events:
// NOTE: we only handle Update events for now // NOTE: we only handle Update events for now
log.Debugf("Runtime received notification event: %v", event) log.Debugf("Runtime received notification event: %v", event)
@ -299,15 +267,8 @@ func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) er
// create new kubernetes micro service // create new kubernetes micro service
service := newService(s, options) service := newService(s, options)
log.Debugf("Runtime queueing service %s version %s for start action", service.Name, service.Version) // start the service
return service.Start(k.client)
// push into start queue
k.queue <- &task{
action: start,
service: service,
}
return nil
} }
// Read returns all instances of given service // Read returns all instances of given service
@ -365,15 +326,7 @@ func (k *kubernetes) Update(s *runtime.Service) error {
// update build time annotation // update build time annotation
service.kdeploy.Spec.Template.Metadata.Annotations["build"] = time.Now().Format(time.RFC3339) service.kdeploy.Spec.Template.Metadata.Annotations["build"] = time.Now().Format(time.RFC3339)
log.Debugf("Runtime queueing service %s for update action", service.Name) return service.Update(k.client)
// queue service for removal
k.queue <- &task{
action: update,
service: service,
}
return nil
} }
// Delete removes a service // Delete removes a service
@ -386,15 +339,7 @@ func (k *kubernetes) Delete(s *runtime.Service) error {
Type: k.options.Type, Type: k.options.Type,
}) })
log.Debugf("Runtime queueing service %s for delete action", service.Name) return service.Stop(k.client)
// queue service for removal
k.queue <- &task{
action: stop,
service: service,
}
return nil
} }
// Start starts the runtime // Start starts the runtime
@ -475,7 +420,6 @@ func NewRuntime(opts ...runtime.Option) runtime.Runtime {
return &kubernetes{ return &kubernetes{
options: options, options: options,
closed: make(chan bool), closed: make(chan bool),
queue: make(chan *task, 128),
client: client, client: client,
} }
} }

View File

@ -85,7 +85,7 @@ func serviceResource(s *client.Service) *client.Resource {
} }
// Start starts the Kubernetes service. It creates new kubernetes deployment and service API objects // Start starts the Kubernetes service. It creates new kubernetes deployment and service API objects
func (s *service) Start(k client.Kubernetes) error { func (s *service) Start(k client.Client) error {
// create deployment first; if we fail, we dont create service // create deployment first; if we fail, we dont create service
if err := k.Create(deploymentResource(s.kdeploy)); err != nil { if err := k.Create(deploymentResource(s.kdeploy)); err != nil {
log.Debugf("Runtime failed to create deployment: %v", err) log.Debugf("Runtime failed to create deployment: %v", err)
@ -100,7 +100,7 @@ func (s *service) Start(k client.Kubernetes) error {
return nil return nil
} }
func (s *service) Stop(k client.Kubernetes) error { func (s *service) Stop(k client.Client) error {
// first attempt to delete service // first attempt to delete service
if err := k.Delete(serviceResource(s.kservice)); err != nil { if err := k.Delete(serviceResource(s.kservice)); err != nil {
log.Debugf("Runtime failed to delete service: %v", err) log.Debugf("Runtime failed to delete service: %v", err)
@ -115,7 +115,7 @@ func (s *service) Stop(k client.Kubernetes) error {
return nil return nil
} }
func (s *service) Update(k client.Kubernetes) error { func (s *service) Update(k client.Client) error {
if err := k.Update(deploymentResource(s.kdeploy)); err != nil { if err := k.Update(deploymentResource(s.kdeploy)); err != nil {
log.Debugf("Runtime failed to update deployment: %v", err) log.Debugf("Runtime failed to update deployment: %v", err)
return err return err