From 14155c7e02ab1b1a34e7353dfcebd9de80ab3947 Mon Sep 17 00:00:00 2001 From: Ben Toogood Date: Tue, 19 May 2020 09:28:00 +0100 Subject: [PATCH 1/3] Add runtime ErrNotFound --- runtime/runtime.go | 1 + 1 file changed, 1 insertion(+) diff --git a/runtime/runtime.go b/runtime/runtime.go index 567e8c3b..62813973 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -12,6 +12,7 @@ var ( // DefaultName is default runtime service name DefaultName = "go.micro.runtime" + ErrNotFound = errors.New("not found") ErrAlreadyExists = errors.New("already exists") ) From c19b349e961366de7495931c548bd120828c235b Mon Sep 17 00:00:00 2001 From: Ben Toogood Date: Tue, 19 May 2020 10:14:07 +0100 Subject: [PATCH 2/3] Update runtime.Event struct --- runtime/default.go | 4 ++-- runtime/kubernetes/kubernetes.go | 8 ++++---- runtime/runtime.go | 10 ++++++---- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/runtime/default.go b/runtime/default.go index 869070fa..1be03e2f 100644 --- a/runtime/default.go +++ b/runtime/default.go @@ -260,9 +260,9 @@ func (r *runtime) run(events <-chan Event) { // NOTE: we only handle Update events for now switch event.Type { case Update: - if len(event.Service) > 0 { + if event.Service != nil { r.RLock() - service, ok := r.services[fmt.Sprintf("%v:%v", event.Service, event.Version)] + service, ok := r.services[fmt.Sprintf("%v:%v", event.Service.Name, event.Service.Version)] r.RUnlock() if !ok { if logger.V(logger.DebugLevel, logger.DefaultLogger) { diff --git a/runtime/kubernetes/kubernetes.go b/runtime/kubernetes/kubernetes.go index 01f61769..5ce110c6 100644 --- a/runtime/kubernetes/kubernetes.go +++ b/runtime/kubernetes/kubernetes.go @@ -252,12 +252,12 @@ func (k *kubernetes) run(events <-chan runtime.Event) { case runtime.Update: // only process if there's an actual service // we do not update all the things individually - if len(event.Service) == 0 { + if event.Service == nil { continue } // format the name - name := client.Format(event.Service) + name := client.Format(event.Service.Name) // set the default labels labels := map[string]string{ @@ -265,8 +265,8 @@ func (k *kubernetes) run(events <-chan runtime.Event) { "name": name, } - if len(event.Version) > 0 { - labels["version"] = event.Version + if len(event.Service.Version) > 0 { + labels["version"] = event.Service.Version } // get the deployment status diff --git a/runtime/runtime.go b/runtime/runtime.go index 62813973..288604ba 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -86,14 +86,16 @@ func (t EventType) String() string { // Event is notification event type Event struct { + // ID of the event + ID string // Type is event type Type EventType // Timestamp is event timestamp Timestamp time.Time - // Service is the name of the service - Service string - // Version of the build - Version string + // Service the event relates to + Service *Service + // Options to use when processing the event + Options *CreateOptions } // Service is runtime service From 8875719619808b626df005f7a156c04b2807cb91 Mon Sep 17 00:00:00 2001 From: Ben Toogood Date: Tue, 19 May 2020 11:01:06 +0100 Subject: [PATCH 3/3] Default Runtime multi-tenancy --- runtime/default.go | 182 ++++++++++++++++++++++++++++++--------------- runtime/runtime.go | 1 - 2 files changed, 124 insertions(+), 59 deletions(-) diff --git a/runtime/default.go b/runtime/default.go index 1be03e2f..61a1f562 100644 --- a/runtime/default.go +++ b/runtime/default.go @@ -18,6 +18,9 @@ import ( "github.com/micro/go-micro/v2/runtime/local/git" ) +// defaultNamespace to use if not provided as an option +const defaultNamespace = "default" + type runtime struct { sync.RWMutex // options configure runtime @@ -28,9 +31,9 @@ type runtime struct { start chan *service // indicates if we're running running bool - // the service map - // TODO: track different versions of the same service - services map[string]*service + // namespaces stores services grouped by namespace, e.g. namespaces["foo"]["go.micro.auth:latest"] + // would return the latest version of go.micro.auth from the foo namespace + namespaces map[string]map[string]*service } // NewRuntime creates new local runtime and returns it @@ -48,10 +51,10 @@ func NewRuntime(opts ...Option) Runtime { _ = os.MkdirAll(path, 0755) return &runtime{ - options: options, - closed: make(chan bool), - start: make(chan *service, 128), - services: make(map[string]*service), + options: options, + closed: make(chan bool), + start: make(chan *service, 128), + namespaces: make(map[string]map[string]*service), } } @@ -190,7 +193,7 @@ func (r *runtime) run(events <-chan Event) { defer t.Stop() // process event processes an incoming event - processEvent := func(event Event, service *service) error { + processEvent := func(event Event, service *service, ns string) error { // get current vals r.RLock() name := service.Name @@ -203,11 +206,11 @@ func (r *runtime) run(events <-chan Event) { } if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime updating service %s", name) + logger.Debugf("Runtime updating service %s in %v namespace", name, ns) } // this will cause a delete followed by created - if err := r.Update(service.Service); err != nil { + if err := r.Update(service.Service, UpdateNamespace(ns)); err != nil { return err } @@ -224,18 +227,20 @@ func (r *runtime) run(events <-chan Event) { case <-t.C: // check running services r.RLock() - for _, service := range r.services { - if !service.ShouldStart() { - continue - } + for _, sevices := range r.namespaces { + for _, service := range sevices { + if !service.ShouldStart() { + continue + } - // TODO: check service error - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime starting %s", service.Name) - } - if err := service.Start(); err != nil { + // TODO: check service error if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime error starting %s: %v", service.Name, err) + logger.Debugf("Runtime starting %s", service.Name) + } + if err := service.Start(); err != nil { + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime error starting %s: %v", service.Name, err) + } } } } @@ -261,16 +266,26 @@ func (r *runtime) run(events <-chan Event) { switch event.Type { case Update: if event.Service != nil { + ns := defaultNamespace + if event.Options != nil && len(event.Options.Namespace) > 0 { + ns = event.Options.Namespace + } + r.RLock() - service, ok := r.services[fmt.Sprintf("%v:%v", event.Service.Name, event.Service.Version)] - r.RUnlock() - if !ok { + if _, ok := r.namespaces[ns]; !ok { if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime unknown service: %s", event.Service) + logger.Debugf("Runtime unknown namespace: %s", ns) } + r.RUnlock() continue } - if err := processEvent(event, service); err != nil { + service, ok := r.namespaces[ns][fmt.Sprintf("%v:%v", event.Service.Name, event.Service.Version)] + r.RUnlock() + if !ok { + logger.Debugf("Runtime unknown service: %s", event.Service) + } + + if err := processEvent(event, service, ns); err != nil { if logger.V(logger.DebugLevel, logger.DefaultLogger) { logger.Debugf("Runtime error updating service %s: %v", event.Service, err) } @@ -279,14 +294,16 @@ func (r *runtime) run(events <-chan Event) { } r.RLock() - services := r.services + namespaces := r.namespaces r.RUnlock() // if blank service was received we update all services - for _, service := range services { - if err := processEvent(event, service); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime error updating service %s: %v", service.Name, err) + for ns, services := range namespaces { + for _, service := range services { + if err := processEvent(event, service, ns); err != nil { + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime error updating service %s: %v", service.Name, err) + } } } } @@ -320,20 +337,25 @@ func (r *runtime) Create(s *Service, opts ...CreateOption) error { r.Lock() defer r.Unlock() - if _, ok := r.services[serviceKey(s)]; ok { - return errors.New("service already running") - } - var options CreateOptions for _, o := range opts { o(&options) } - + if len(options.Namespace) == 0 { + options.Namespace = defaultNamespace + } if len(options.Command) == 0 { options.Command = []string{"go"} options.Args = []string{"run", "."} } + if _, ok := r.namespaces[options.Namespace]; !ok { + r.namespaces[options.Namespace] = make(map[string]*service) + } + if _, ok := r.namespaces[options.Namespace][serviceKey(s)]; ok { + return errors.New("service already running") + } + // create new service service := newService(s, options) @@ -353,7 +375,7 @@ func (r *runtime) Create(s *Service, opts ...CreateOption) error { } // save service - r.services[serviceKey(s)] = service + r.namespaces[options.Namespace][serviceKey(s)] = service return nil } @@ -481,6 +503,9 @@ func (r *runtime) Read(opts ...ReadOption) ([]*Service, error) { for _, o := range opts { o(&gopts) } + if len(gopts.Namespace) == 0 { + gopts.Namespace = defaultNamespace + } save := func(k, v string) bool { if len(k) == 0 { @@ -492,7 +517,11 @@ func (r *runtime) Read(opts ...ReadOption) ([]*Service, error) { //nolint:prealloc var services []*Service - for _, service := range r.services { + if _, ok := r.namespaces[gopts.Namespace]; !ok { + return make([]*Service, 0), nil + } + + for _, service := range r.namespaces[gopts.Namespace] { if !save(gopts.Service, service.Name) { continue } @@ -509,20 +538,37 @@ func (r *runtime) Read(opts ...ReadOption) ([]*Service, error) { // Update attemps to update the service func (r *runtime) Update(s *Service, opts ...UpdateOption) error { + var options UpdateOptions + for _, o := range opts { + o(&options) + } + if len(options.Namespace) == 0 { + options.Namespace = defaultNamespace + } + err := r.checkoutSourceIfNeeded(s) if err != nil { return err } + r.Lock() - service, ok := r.services[serviceKey(s)] + srvs, ok := r.namespaces[options.Namespace] r.Unlock() if !ok { return errors.New("Service not found") } - err = service.Stop() - if err != nil { + + r.Lock() + service, ok := srvs[serviceKey(s)] + r.Unlock() + if !ok { + return errors.New("Service not found") + } + + if err := service.Stop(); err != nil { return err } + return service.Start() } @@ -531,24 +577,41 @@ func (r *runtime) Delete(s *Service, opts ...DeleteOption) error { r.Lock() defer r.Unlock() - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime deleting service %s", s.Name) + var options DeleteOptions + for _, o := range opts { + o(&options) } - if s, ok := r.services[serviceKey(s)]; ok { - // check if running - if !s.Running() { - delete(r.services, s.key()) - return nil - } - // otherwise stop it - if err := s.Stop(); err != nil { - return err - } - // delete it - delete(r.services, s.key()) + if len(options.Namespace) == 0 { + options.Namespace = defaultNamespace + } + + srvs, ok := r.namespaces[options.Namespace] + if !ok { return nil } + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime deleting service %s", s.Name) + } + + service, ok := srvs[serviceKey(s)] + if !ok { + return nil + } + + // check if running + if !service.Running() { + delete(srvs, service.key()) + r.namespaces[options.Namespace] = srvs + return nil + } + // otherwise stop it + if err := service.Stop(); err != nil { + return err + } + // delete it + delete(srvs, service.key()) + r.namespaces[options.Namespace] = srvs return nil } @@ -602,12 +665,15 @@ func (r *runtime) Stop() error { r.running = false // stop all the services - for _, service := range r.services { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime stopping %s", service.Name) + for _, services := range r.namespaces { + for _, service := range services { + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime stopping %s", service.Name) + } + service.Stop() } - service.Stop() } + // stop the scheduler if r.options.Scheduler != nil { return r.options.Scheduler.Close() diff --git a/runtime/runtime.go b/runtime/runtime.go index 288604ba..1299baf5 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -12,7 +12,6 @@ var ( // DefaultName is default runtime service name DefaultName = "go.micro.runtime" - ErrNotFound = errors.New("not found") ErrAlreadyExists = errors.New("already exists") )