Merge pull request #1645 from micro/runtime-multitenancy

Runtime multi-tenancy
This commit is contained in:
ben-toogood 2020-05-19 17:06:11 +01:00 committed by GitHub
commit e61edf6280
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 135 additions and 67 deletions

View File

@ -18,6 +18,9 @@ import (
"github.com/micro/go-micro/v2/runtime/local/git" "github.com/micro/go-micro/v2/runtime/local/git"
) )
// defaultNamespace to use if not provided as an option
const defaultNamespace = "default"
type runtime struct { type runtime struct {
sync.RWMutex sync.RWMutex
// options configure runtime // options configure runtime
@ -28,9 +31,9 @@ type runtime struct {
start chan *service start chan *service
// indicates if we're running // indicates if we're running
running bool running bool
// the service map // namespaces stores services grouped by namespace, e.g. namespaces["foo"]["go.micro.auth:latest"]
// TODO: track different versions of the same service // would return the latest version of go.micro.auth from the foo namespace
services map[string]*service namespaces map[string]map[string]*service
} }
// NewRuntime creates new local runtime and returns it // NewRuntime creates new local runtime and returns it
@ -51,7 +54,7 @@ func NewRuntime(opts ...Option) Runtime {
options: options, options: options,
closed: make(chan bool), closed: make(chan bool),
start: make(chan *service, 128), start: make(chan *service, 128),
services: make(map[string]*service), namespaces: make(map[string]map[string]*service),
} }
} }
@ -190,7 +193,7 @@ func (r *runtime) run(events <-chan Event) {
defer t.Stop() defer t.Stop()
// process event processes an incoming event // process event processes an incoming event
processEvent := func(event Event, service *service) error { processEvent := func(event Event, service *service, ns string) error {
// get current vals // get current vals
r.RLock() r.RLock()
name := service.Name name := service.Name
@ -203,11 +206,11 @@ func (r *runtime) run(events <-chan Event) {
} }
if logger.V(logger.DebugLevel, logger.DefaultLogger) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime updating service %s", name) logger.Debugf("Runtime updating service %s in %v namespace", name, ns)
} }
// this will cause a delete followed by created // this will cause a delete followed by created
if err := r.Update(service.Service); err != nil { if err := r.Update(service.Service, UpdateNamespace(ns)); err != nil {
return err return err
} }
@ -224,7 +227,8 @@ func (r *runtime) run(events <-chan Event) {
case <-t.C: case <-t.C:
// check running services // check running services
r.RLock() r.RLock()
for _, service := range r.services { for _, sevices := range r.namespaces {
for _, service := range sevices {
if !service.ShouldStart() { if !service.ShouldStart() {
continue continue
} }
@ -239,6 +243,7 @@ func (r *runtime) run(events <-chan Event) {
} }
} }
} }
}
r.RUnlock() r.RUnlock()
case service := <-r.start: case service := <-r.start:
if !service.ShouldStart() { if !service.ShouldStart() {
@ -260,17 +265,27 @@ func (r *runtime) run(events <-chan Event) {
// NOTE: we only handle Update events for now // NOTE: we only handle Update events for now
switch event.Type { switch event.Type {
case Update: case Update:
if len(event.Service) > 0 { if event.Service != nil {
r.RLock() ns := defaultNamespace
service, ok := r.services[fmt.Sprintf("%v:%v", event.Service, event.Version)] if event.Options != nil && len(event.Options.Namespace) > 0 {
r.RUnlock() ns = event.Options.Namespace
if !ok {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime unknown service: %s", event.Service)
} }
r.RLock()
if _, ok := r.namespaces[ns]; !ok {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime unknown namespace: %s", ns)
}
r.RUnlock()
continue continue
} }
if err := processEvent(event, service); err != nil { service, ok := r.namespaces[ns][fmt.Sprintf("%v:%v", event.Service.Name, event.Service.Version)]
r.RUnlock()
if !ok {
logger.Debugf("Runtime unknown service: %s", event.Service)
}
if err := processEvent(event, service, ns); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime error updating service %s: %v", event.Service, err) logger.Debugf("Runtime error updating service %s: %v", event.Service, err)
} }
@ -279,18 +294,20 @@ func (r *runtime) run(events <-chan Event) {
} }
r.RLock() r.RLock()
services := r.services namespaces := r.namespaces
r.RUnlock() r.RUnlock()
// if blank service was received we update all services // if blank service was received we update all services
for ns, services := range namespaces {
for _, service := range services { for _, service := range services {
if err := processEvent(event, service); err != nil { if err := processEvent(event, service, ns); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime error updating service %s: %v", service.Name, err) logger.Debugf("Runtime error updating service %s: %v", service.Name, err)
} }
} }
} }
} }
}
case <-r.closed: case <-r.closed:
if logger.V(logger.DebugLevel, logger.DefaultLogger) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime stopped") logger.Debugf("Runtime stopped")
@ -320,20 +337,25 @@ func (r *runtime) Create(s *Service, opts ...CreateOption) error {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
if _, ok := r.services[serviceKey(s)]; ok {
return errors.New("service already running")
}
var options CreateOptions var options CreateOptions
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
if len(options.Namespace) == 0 {
options.Namespace = defaultNamespace
}
if len(options.Command) == 0 { if len(options.Command) == 0 {
options.Command = []string{"go"} options.Command = []string{"go"}
options.Args = []string{"run", "."} options.Args = []string{"run", "."}
} }
if _, ok := r.namespaces[options.Namespace]; !ok {
r.namespaces[options.Namespace] = make(map[string]*service)
}
if _, ok := r.namespaces[options.Namespace][serviceKey(s)]; ok {
return errors.New("service already running")
}
// create new service // create new service
service := newService(s, options) service := newService(s, options)
@ -353,7 +375,7 @@ func (r *runtime) Create(s *Service, opts ...CreateOption) error {
} }
// save service // save service
r.services[serviceKey(s)] = service r.namespaces[options.Namespace][serviceKey(s)] = service
return nil return nil
} }
@ -481,6 +503,9 @@ func (r *runtime) Read(opts ...ReadOption) ([]*Service, error) {
for _, o := range opts { for _, o := range opts {
o(&gopts) o(&gopts)
} }
if len(gopts.Namespace) == 0 {
gopts.Namespace = defaultNamespace
}
save := func(k, v string) bool { save := func(k, v string) bool {
if len(k) == 0 { if len(k) == 0 {
@ -492,7 +517,11 @@ func (r *runtime) Read(opts ...ReadOption) ([]*Service, error) {
//nolint:prealloc //nolint:prealloc
var services []*Service var services []*Service
for _, service := range r.services { if _, ok := r.namespaces[gopts.Namespace]; !ok {
return make([]*Service, 0), nil
}
for _, service := range r.namespaces[gopts.Namespace] {
if !save(gopts.Service, service.Name) { if !save(gopts.Service, service.Name) {
continue continue
} }
@ -509,20 +538,37 @@ func (r *runtime) Read(opts ...ReadOption) ([]*Service, error) {
// Update attemps to update the service // Update attemps to update the service
func (r *runtime) Update(s *Service, opts ...UpdateOption) error { func (r *runtime) Update(s *Service, opts ...UpdateOption) error {
var options UpdateOptions
for _, o := range opts {
o(&options)
}
if len(options.Namespace) == 0 {
options.Namespace = defaultNamespace
}
err := r.checkoutSourceIfNeeded(s) err := r.checkoutSourceIfNeeded(s)
if err != nil { if err != nil {
return err return err
} }
r.Lock() r.Lock()
service, ok := r.services[serviceKey(s)] srvs, ok := r.namespaces[options.Namespace]
r.Unlock() r.Unlock()
if !ok { if !ok {
return errors.New("Service not found") return errors.New("Service not found")
} }
err = service.Stop()
if err != nil { r.Lock()
service, ok := srvs[serviceKey(s)]
r.Unlock()
if !ok {
return errors.New("Service not found")
}
if err := service.Stop(); err != nil {
return err return err
} }
return service.Start() return service.Start()
} }
@ -531,24 +577,41 @@ func (r *runtime) Delete(s *Service, opts ...DeleteOption) error {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
if logger.V(logger.DebugLevel, logger.DefaultLogger) { var options DeleteOptions
logger.Debugf("Runtime deleting service %s", s.Name) for _, o := range opts {
o(&options)
} }
if s, ok := r.services[serviceKey(s)]; ok { if len(options.Namespace) == 0 {
// check if running options.Namespace = defaultNamespace
if !s.Running() {
delete(r.services, s.key())
return nil
} }
// otherwise stop it
if err := s.Stop(); err != nil { srvs, ok := r.namespaces[options.Namespace]
return err if !ok {
}
// delete it
delete(r.services, s.key())
return nil return nil
} }
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime deleting service %s", s.Name)
}
service, ok := srvs[serviceKey(s)]
if !ok {
return nil
}
// check if running
if !service.Running() {
delete(srvs, service.key())
r.namespaces[options.Namespace] = srvs
return nil
}
// otherwise stop it
if err := service.Stop(); err != nil {
return err
}
// delete it
delete(srvs, service.key())
r.namespaces[options.Namespace] = srvs
return nil return nil
} }
@ -602,12 +665,15 @@ func (r *runtime) Stop() error {
r.running = false r.running = false
// stop all the services // stop all the services
for _, service := range r.services { for _, services := range r.namespaces {
for _, service := range services {
if logger.V(logger.DebugLevel, logger.DefaultLogger) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime stopping %s", service.Name) logger.Debugf("Runtime stopping %s", service.Name)
} }
service.Stop() service.Stop()
} }
}
// stop the scheduler // stop the scheduler
if r.options.Scheduler != nil { if r.options.Scheduler != nil {
return r.options.Scheduler.Close() return r.options.Scheduler.Close()

View File

@ -252,12 +252,12 @@ func (k *kubernetes) run(events <-chan runtime.Event) {
case runtime.Update: case runtime.Update:
// only process if there's an actual service // only process if there's an actual service
// we do not update all the things individually // we do not update all the things individually
if len(event.Service) == 0 { if event.Service == nil {
continue continue
} }
// format the name // format the name
name := client.Format(event.Service) name := client.Format(event.Service.Name)
// set the default labels // set the default labels
labels := map[string]string{ labels := map[string]string{
@ -265,8 +265,8 @@ func (k *kubernetes) run(events <-chan runtime.Event) {
"name": name, "name": name,
} }
if len(event.Version) > 0 { if len(event.Service.Version) > 0 {
labels["version"] = event.Version labels["version"] = event.Service.Version
} }
// get the deployment status // get the deployment status

View File

@ -85,14 +85,16 @@ func (t EventType) String() string {
// Event is notification event // Event is notification event
type Event struct { type Event struct {
// ID of the event
ID string
// Type is event type // Type is event type
Type EventType Type EventType
// Timestamp is event timestamp // Timestamp is event timestamp
Timestamp time.Time Timestamp time.Time
// Service is the name of the service // Service the event relates to
Service string Service *Service
// Version of the build // Options to use when processing the event
Version string Options *CreateOptions
} }
// Service is runtime service // Service is runtime service