diff --git a/runtime/kubernetes/kubernetes.go b/runtime/kubernetes/kubernetes.go index c69cf758..afadd244 100644 --- a/runtime/kubernetes/kubernetes.go +++ b/runtime/kubernetes/kubernetes.go @@ -24,8 +24,6 @@ type kubernetes struct { options runtime.Options // indicates if we're running running bool - // used to stop the runtime - closed chan bool // client is kubernetes client client client.Client // namespaces which exist @@ -240,93 +238,6 @@ func (k *kubernetes) getService(labels map[string]string, opts ...client.GetOpti return services, nil } -// run runs the runtime management loop -func (k *kubernetes) run(events <-chan runtime.Event) { - t := time.NewTicker(time.Second * 10) - defer t.Stop() - - for { - select { - case <-t.C: - // TODO: figure out what to do here - // - do we even need the ticker for k8s services? - case event := <-events: - // NOTE: we only handle Update events for now - if log.V(log.DebugLevel, log.DefaultLogger) { - log.Debugf("Runtime received notification event: %v", event) - } - switch event.Type { - case runtime.Update: - // only process if there's an actual service - // we do not update all the things individually - if event.Service == nil { - continue - } - - // format the name - name := client.Format(event.Service.Name) - - // set the default labels - labels := map[string]string{ - "micro": k.options.Type, - "name": name, - } - - if len(event.Service.Version) > 0 { - labels["version"] = event.Service.Version - } - - // get the deployment status - deployed := new(client.DeploymentList) - - // get the existing service rather than creating a new one - err := k.client.Get(&client.Resource{ - Kind: "deployment", - Value: deployed, - }, client.GetLabels(labels)) - - if err != nil { - if log.V(log.DebugLevel, log.DefaultLogger) { - log.Debugf("Runtime update failed to get service %s: %v", event.Service, err) - } - continue - } - - // technically we should not receive multiple versions but hey ho - for _, service := range deployed.Items { - // check the name matches - if service.Metadata.Name != name { - continue - } - - // update build time annotation - if service.Spec.Template.Metadata.Annotations == nil { - service.Spec.Template.Metadata.Annotations = make(map[string]string) - } - - // update the build time - service.Spec.Template.Metadata.Annotations["updated"] = fmt.Sprintf("%d", event.Timestamp.Unix()) - - if log.V(log.DebugLevel, log.DefaultLogger) { - log.Debugf("Runtime updating service: %s deployment: %s", event.Service, service.Metadata.Name) - } - if err := k.client.Update(deploymentResource(&service)); err != nil { - if log.V(log.DebugLevel, log.DefaultLogger) { - log.Debugf("Runtime failed to update service %s: %v", event.Service, err) - } - continue - } - } - } - case <-k.closed: - if log.V(log.DebugLevel, log.DefaultLogger) { - log.Debugf("Runtime stopped") - } - return - } - } -} - // Init initializes runtime options func (k *kubernetes) Init(opts ...runtime.Option) error { k.Lock() @@ -599,22 +510,6 @@ func (k *kubernetes) Start() error { // set running k.running = true - k.closed = make(chan bool) - - var events <-chan runtime.Event - if k.options.Scheduler != nil { - var err error - events, err = k.options.Scheduler.Notify() - if err != nil { - // TODO: should we bail here? - if log.V(log.DebugLevel, log.DefaultLogger) { - log.Debugf("Runtime failed to start update notifier") - } - } - } - - go k.run(events) - return nil } @@ -627,19 +522,8 @@ func (k *kubernetes) Stop() error { return nil } - select { - case <-k.closed: - return nil - default: - close(k.closed) - // set not running - k.running = false - // stop the scheduler - if k.options.Scheduler != nil { - return k.options.Scheduler.Close() - } - } - + // set not running + k.running = false return nil } @@ -666,7 +550,6 @@ func NewRuntime(opts ...runtime.Option) runtime.Runtime { return &kubernetes{ options: options, - closed: make(chan bool), client: client, } } diff --git a/runtime/local/local.go b/runtime/local/local.go index ad24f091..05ba0d7c 100644 --- a/runtime/local/local.go +++ b/runtime/local/local.go @@ -9,7 +9,6 @@ import ( "path/filepath" "strings" "sync" - "time" "github.com/hpcloud/tail" "github.com/micro/go-micro/v3/logger" @@ -30,8 +29,6 @@ type localRuntime struct { sync.RWMutex // options configure runtime options runtime.Options - // used to stop the runtime - closed chan bool // used to start new services start chan *service // indicates if we're running @@ -56,7 +53,6 @@ func NewRuntime(opts ...runtime.Option) runtime.Runtime { return &localRuntime{ options: options, - closed: make(chan bool), start: make(chan *service, 128), namespaces: make(map[string]map[string]*service), } @@ -74,136 +70,6 @@ func (r *localRuntime) Init(opts ...runtime.Option) error { return nil } -// run runs the runtime management loop -func (r *localRuntime) run(events <-chan runtime.Event) { - t := time.NewTicker(time.Second * 5) - defer t.Stop() - - // process event processes an incoming event - processEvent := func(event runtime.Event, service *service, ns string) error { - // get current vals - r.RLock() - name := service.Name - updated := service.updated - r.RUnlock() - - // only process if the timestamp is newer - if !event.Timestamp.After(updated) { - return nil - } - - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime updating service %s in %v namespace", name, ns) - } - - // this will cause a delete followed by created - if err := r.Update(service.Service, runtime.UpdateNamespace(ns)); err != nil { - return err - } - - // update the local timestamp - r.Lock() - service.updated = updated - r.Unlock() - - return nil - } - - for { - select { - case <-t.C: - // check running services - r.RLock() - for _, sevices := range r.namespaces { - for _, service := range sevices { - if !service.ShouldStart() { - continue - } - - // TODO: check service error - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime starting %s", service.Name) - } - if err := service.Start(); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime error starting %s: %v", service.Name, err) - } - } - } - } - r.RUnlock() - case service := <-r.start: - if !service.ShouldStart() { - continue - } - // TODO: check service error - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime starting service %s", service.Name) - } - if err := service.Start(); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime error starting service %s: %v", service.Name, err) - } - } - case event := <-events: - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime received notification event: %v", event) - } - // NOTE: we only handle Update events for now - switch event.Type { - case runtime.Update: - if event.Service != nil { - ns := defaultNamespace - if event.Options != nil && len(event.Options.Namespace) > 0 { - ns = event.Options.Namespace - } - - r.RLock() - if _, ok := r.namespaces[ns]; !ok { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime unknown namespace: %s", ns) - } - r.RUnlock() - continue - } - service, ok := r.namespaces[ns][fmt.Sprintf("%v:%v", event.Service.Name, event.Service.Version)] - r.RUnlock() - if !ok { - logger.Debugf("Runtime unknown service: %s", event.Service) - } - - if err := processEvent(event, service, ns); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime error updating service %s: %v", event.Service, err) - } - } - continue - } - - r.RLock() - namespaces := r.namespaces - r.RUnlock() - - // if blank service was received we update all services - for ns, services := range namespaces { - for _, service := range services { - if err := processEvent(event, service, ns); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime error updating service %s: %v", service.Name, err) - } - } - } - } - } - case <-r.closed: - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime stopped") - } - return - } - } -} - func logFile(serviceName string) string { // make the directory name := strings.Replace(serviceName, "/", "-", -1) @@ -524,22 +390,6 @@ func (r *localRuntime) Start() error { // set running r.running = true - r.closed = make(chan bool) - - var events <-chan runtime.Event - if r.options.Scheduler != nil { - var err error - events, err = r.options.Scheduler.Notify() - if err != nil { - // TODO: should we bail here? - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime failed to start update notifier") - } - } - } - - go r.run(events) - return nil } @@ -552,28 +402,16 @@ func (r *localRuntime) Stop() error { return nil } - select { - case <-r.closed: - return nil - default: - close(r.closed) + // set not running + r.running = false - // set not running - r.running = false - - // stop all the services - for _, services := range r.namespaces { - for _, service := range services { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime stopping %s", service.Name) - } - service.Stop() + // stop all the services + for _, services := range r.namespaces { + for _, service := range services { + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime stopping %s", service.Name) } - } - - // stop the scheduler - if r.options.Scheduler != nil { - return r.options.Scheduler.Close() + service.Stop() } } diff --git a/runtime/options.go b/runtime/options.go index 661565f9..3ec72aec 100644 --- a/runtime/options.go +++ b/runtime/options.go @@ -15,8 +15,6 @@ type Options struct { Client client.Client // Base image to use Image string - // Scheduler for updates - Scheduler Scheduler // Source of the services repository Source string // Service type to manage @@ -30,13 +28,6 @@ func WithSource(src string) Option { } } -// WithScheduler specifies a scheduler for updates -func WithScheduler(n Scheduler) Option { - return func(o *Options) { - o.Scheduler = n - } -} - // WithType sets the service type to manage func WithType(t string) Option { return func(o *Options) { diff --git a/runtime/runtime.go b/runtime/runtime.go index 4e331443..61aceedb 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -50,14 +50,6 @@ type Log struct { Metadata map[string]string } -// Scheduler is a runtime service scheduler -type Scheduler interface { - // Notify publishes schedule events - Notify() (<-chan Event, error) - // Close stops the scheduler - Close() error -} - // EventType defines schedule event type EventType int