From 692b27578cbaa3c84162bfd25a61ff415c550ec3 Mon Sep 17 00:00:00 2001 From: ben-toogood Date: Thu, 23 Apr 2020 13:53:42 +0100 Subject: [PATCH] Runtime Namespace (#1547) * Add context option to runtime; Add dynamic namespace to kubectl client * Add namespace runtime arg * Fixes & Debugging * Pass options in k8s runtime * Set namespace on k8s resources * Additional Logging * More debugging * Remove Debugging * Ensure namespace exists * Add debugging * Refactor namespaceExists check * Fix * Fix * Fix * Fix * Change the way we check for namespace * Fix * Tidying Up * Fix Test * Fix merge bugs * Serialize k8s namespaces * Add namespace to watch * Serialize namespace when creating k8s namespace Co-authored-by: Ben Toogood Co-authored-by: Asim Aslam --- debug/log/kubernetes/kubernetes.go | 2 +- runtime/default.go | 4 +- runtime/kubernetes/kubernetes.go | 111 ++++++++++++++++++++------ runtime/kubernetes/kubernetes_logs.go | 2 +- runtime/kubernetes/service.go | 22 ++--- runtime/options.go | 101 +++++++++++++++++++++++ runtime/runtime.go | 4 +- runtime/service/proto/runtime.proto | 14 ++-- runtime/service/service.go | 52 +++++++++--- util/kubernetes/api/request.go | 7 +- util/kubernetes/client/client.go | 106 ++++++++++++++---------- util/kubernetes/client/options.go | 85 +++++++++++++++++++- util/kubernetes/client/types.go | 5 ++ util/kubernetes/client/util_test.go | 5 +- 14 files changed, 411 insertions(+), 109 deletions(-) diff --git a/debug/log/kubernetes/kubernetes.go b/debug/log/kubernetes/kubernetes.go index d81aba23..cf886d06 100644 --- a/debug/log/kubernetes/kubernetes.go +++ b/debug/log/kubernetes/kubernetes.go @@ -67,7 +67,7 @@ func (k *klog) getMatchingPods() ([]string, error) { // TODO: specify micro:service // l["micro"] = "service" - if err := k.client.Get(r, l); err != nil { + if err := k.client.Get(r, client.GetLabels(l)); err != nil { return nil, err } diff --git a/runtime/default.go b/runtime/default.go index dff3a235..8ac00bb9 100644 --- a/runtime/default.go +++ b/runtime/default.go @@ -335,7 +335,7 @@ func (r *runtime) Read(opts ...ReadOption) ([]*Service, error) { } // Update attemps to update the service -func (r *runtime) Update(s *Service) error { +func (r *runtime) Update(s *Service, opts ...UpdateOption) error { r.Lock() service, ok := r.services[serviceKey(s)] r.Unlock() @@ -350,7 +350,7 @@ func (r *runtime) Update(s *Service) error { } // Delete removes the service from the runtime and stops it -func (r *runtime) Delete(s *Service) error { +func (r *runtime) Delete(s *Service, opts ...DeleteOption) error { r.Lock() defer r.Unlock() diff --git a/runtime/kubernetes/kubernetes.go b/runtime/kubernetes/kubernetes.go index 7adbd301..d4e5eaee 100644 --- a/runtime/kubernetes/kubernetes.go +++ b/runtime/kubernetes/kubernetes.go @@ -6,7 +6,7 @@ import ( "sync" "time" - "github.com/micro/go-micro/v2/logger" + log "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/runtime" "github.com/micro/go-micro/v2/util/kubernetes/client" ) @@ -24,11 +24,48 @@ type kubernetes struct { closed chan bool // client is kubernetes client client client.Client + // namespaces which exist + namespaces []client.Namespace +} + +// namespaceExists returns a boolean indicating if a namespace exists +func (k *kubernetes) namespaceExists(name string) (bool, error) { + // populate the cache + if k.namespaces == nil { + namespaceList := new(client.NamespaceList) + resource := &client.Resource{Kind: "namespace", Value: namespaceList} + if err := k.client.List(resource); err != nil { + return false, err + } + k.namespaces = namespaceList.Items + } + + // check if the namespace exists in the cache + for _, n := range k.namespaces { + if n.Metadata.Name == name { + return true, nil + } + } + + return false, nil +} + +// createNamespace creates a new k8s namespace +func (k *kubernetes) createNamespace(namespace string) error { + ns := client.Namespace{Metadata: &client.Metadata{Name: namespace}} + err := k.client.Create(&client.Resource{Kind: "namespace", Value: ns}) + + // add to cache + if err == nil && k.namespaces != nil { + k.namespaces = append(k.namespaces, ns) + } + + return err } // getService queries kubernetes for micro service // NOTE: this function is not thread-safe -func (k *kubernetes) getService(labels map[string]string) ([]*service, error) { +func (k *kubernetes) getService(labels map[string]string, opts ...client.GetOption) ([]*service, error) { // get the service status serviceList := new(client.ServiceList) r := &client.Resource{ @@ -36,8 +73,10 @@ func (k *kubernetes) getService(labels map[string]string) ([]*service, error) { Value: serviceList, } + opts = append(opts, client.GetLabels(labels)) + // get the service from k8s - if err := k.client.Get(r, labels); err != nil { + if err := k.client.Get(r, opts...); err != nil { return nil, err } @@ -47,7 +86,7 @@ func (k *kubernetes) getService(labels map[string]string) ([]*service, error) { Kind: "deployment", Value: depList, } - if err := k.client.Get(d, labels); err != nil { + if err := k.client.Get(d, opts...); err != nil { return nil, err } @@ -57,7 +96,7 @@ func (k *kubernetes) getService(labels map[string]string) ([]*service, error) { Kind: "pod", Value: podList, } - if err := k.client.Get(p, labels); err != nil { + if err := k.client.Get(p, opts...); err != nil { return nil, err } @@ -206,8 +245,8 @@ func (k *kubernetes) run(events <-chan runtime.Event) { // - do we even need the ticker for k8s services? case event := <-events: // NOTE: we only handle Update events for now - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime received notification event: %v", event) + if log.V(log.DebugLevel, log.DefaultLogger) { + log.Debugf("Runtime received notification event: %v", event) } switch event.Type { case runtime.Update: @@ -237,11 +276,11 @@ func (k *kubernetes) run(events <-chan runtime.Event) { err := k.client.Get(&client.Resource{ Kind: "deployment", Value: deployed, - }, labels) + }, client.GetLabels(labels)) if err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime update failed to get service %s: %v", event.Service, err) + if log.V(log.DebugLevel, log.DefaultLogger) { + log.Debugf("Runtime update failed to get service %s: %v", event.Service, err) } continue } @@ -262,20 +301,20 @@ func (k *kubernetes) run(events <-chan runtime.Event) { // update the build time service.Spec.Template.Metadata.Annotations["updated"] = fmt.Sprintf("%d", event.Timestamp.Unix()) - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime updating service: %s deployment: %s", event.Service, service.Metadata.Name) + if log.V(log.DebugLevel, log.DefaultLogger) { + log.Debugf("Runtime updating service: %s deployment: %s", event.Service, service.Metadata.Name) } if err := k.client.Update(deploymentResource(&service)); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime failed to update service %s: %v", event.Service, err) + if log.V(log.DebugLevel, log.DefaultLogger) { + log.Debugf("Runtime failed to update service %s: %v", event.Service, err) } continue } } } case <-k.closed: - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime stopped") + if log.V(log.DebugLevel, log.DefaultLogger) { + log.Debugf("Runtime stopped") } return } @@ -305,7 +344,7 @@ func (k *kubernetes) Logs(s *runtime.Service, options ...runtime.LogsOption) (ru go func() { records, err := klo.Read() if err != nil { - logger.Errorf("Failed to get logs for service '%v' from k8s: %v", err) + log.Errorf("Failed to get logs for service '%v' from k8s: %v", err) return } // @todo: this might actually not run before podLogStream starts @@ -371,6 +410,16 @@ func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) er s.Source = k.options.Source } + // ensure the namespace exists + namespace := client.SerializeResourceName(options.Namespace) + if exist, err := k.namespaceExists(namespace); err == nil && !exist { + if err := k.createNamespace(namespace); err != nil { + return err + } + } else if err != nil { + return err + } + // determine the image from the source and options options.Image = k.getImage(s, options) @@ -378,7 +427,7 @@ func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) er service := newService(s, options) // start the service - return service.Start(k.client) + return service.Start(k.client, client.CreateNamespace(options.Namespace)) } // Read returns all instances of given service @@ -423,8 +472,12 @@ func (k *kubernetes) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error } // Update the service in place -func (k *kubernetes) Update(s *runtime.Service) error { - // TODO: set the type +func (k *kubernetes) Update(s *runtime.Service, opts ...runtime.UpdateOption) error { + var options runtime.UpdateOptions + for _, o := range opts { + o(&options) + } + labels := map[string]string{} if len(s.Name) > 0 { @@ -459,7 +512,7 @@ func (k *kubernetes) Update(s *runtime.Service) error { service.kdeploy.Spec.Template.Metadata.Annotations["updated"] = fmt.Sprintf("%d", time.Now().Unix()) // update the service - if err := service.Update(k.client); err != nil { + if err := service.Update(k.client, client.UpdateNamespace(options.Namespace)); err != nil { return err } } @@ -468,16 +521,22 @@ func (k *kubernetes) Update(s *runtime.Service) error { } // Delete removes a service -func (k *kubernetes) Delete(s *runtime.Service) error { +func (k *kubernetes) Delete(s *runtime.Service, opts ...runtime.DeleteOption) error { + var options runtime.DeleteOptions + for _, o := range opts { + o(&options) + } + k.Lock() defer k.Unlock() // create new kubernetes micro service service := newService(s, runtime.CreateOptions{ - Type: k.options.Type, + Type: k.options.Type, + Namespace: options.Namespace, }) - return service.Stop(k.client) + return service.Stop(k.client, client.DeleteNamespace(options.Namespace)) } // Start starts the runtime @@ -500,8 +559,8 @@ func (k *kubernetes) Start() error { events, err = k.options.Scheduler.Notify() if err != nil { // TODO: should we bail here? - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime failed to start update notifier") + if log.V(log.DebugLevel, log.DefaultLogger) { + log.Debugf("Runtime failed to start update notifier") } } } diff --git a/runtime/kubernetes/kubernetes_logs.go b/runtime/kubernetes/kubernetes_logs.go index 3600170b..0c113f4e 100644 --- a/runtime/kubernetes/kubernetes_logs.go +++ b/runtime/kubernetes/kubernetes_logs.go @@ -70,7 +70,7 @@ func (k *klog) getMatchingPods() ([]string, error) { // TODO: specify micro:service // l["micro"] = "service" - if err := k.client.Get(r, l); err != nil { + if err := k.client.Get(r, client.GetLabels(l)); err != nil { return nil, err } diff --git a/runtime/kubernetes/service.go b/runtime/kubernetes/service.go index f04f9a14..b3c02395 100644 --- a/runtime/kubernetes/service.go +++ b/runtime/kubernetes/service.go @@ -30,8 +30,8 @@ func newService(s *runtime.Service, c runtime.CreateOptions) *service { name := client.Format(s.Name) version := client.Format(s.Version) - kservice := client.NewService(name, version, c.Type) - kdeploy := client.NewDeployment(name, version, c.Type) + kservice := client.NewService(name, version, c.Type, c.Namespace) + kdeploy := client.NewDeployment(name, version, c.Type, c.Namespace) // ensure the metadata is set if kdeploy.Spec.Template.Metadata.Annotations == nil { @@ -112,9 +112,9 @@ func serviceResource(s *client.Service) *client.Resource { } // Start starts the Kubernetes service. It creates new kubernetes deployment and service API objects -func (s *service) Start(k client.Client) error { +func (s *service) Start(k client.Client, opts ...client.CreateOption) error { // create deployment first; if we fail, we dont create service - if err := k.Create(deploymentResource(s.kdeploy)); err != nil { + if err := k.Create(deploymentResource(s.kdeploy), opts...); err != nil { if logger.V(logger.DebugLevel, logger.DefaultLogger) { logger.Debugf("Runtime failed to create deployment: %v", err) } @@ -126,7 +126,7 @@ func (s *service) Start(k client.Client) error { return err } // create service now that the deployment has been created - if err := k.Create(serviceResource(s.kservice)); err != nil { + if err := k.Create(serviceResource(s.kservice), opts...); err != nil { if logger.V(logger.DebugLevel, logger.DefaultLogger) { logger.Debugf("Runtime failed to create service: %v", err) } @@ -143,9 +143,9 @@ func (s *service) Start(k client.Client) error { return nil } -func (s *service) Stop(k client.Client) error { +func (s *service) Stop(k client.Client, opts ...client.DeleteOption) error { // first attempt to delete service - if err := k.Delete(serviceResource(s.kservice)); err != nil { + if err := k.Delete(serviceResource(s.kservice), opts...); err != nil { if logger.V(logger.DebugLevel, logger.DefaultLogger) { logger.Debugf("Runtime failed to delete service: %v", err) } @@ -153,7 +153,7 @@ func (s *service) Stop(k client.Client) error { return err } // delete deployment once the service has been deleted - if err := k.Delete(deploymentResource(s.kdeploy)); err != nil { + if err := k.Delete(deploymentResource(s.kdeploy), opts...); err != nil { if logger.V(logger.DebugLevel, logger.DefaultLogger) { logger.Debugf("Runtime failed to delete deployment: %v", err) } @@ -166,15 +166,15 @@ func (s *service) Stop(k client.Client) error { return nil } -func (s *service) Update(k client.Client) error { - if err := k.Update(deploymentResource(s.kdeploy)); err != nil { +func (s *service) Update(k client.Client, opts ...client.UpdateOption) error { + if err := k.Update(deploymentResource(s.kdeploy), opts...); err != nil { if logger.V(logger.DebugLevel, logger.DefaultLogger) { logger.Debugf("Runtime failed to update deployment: %v", err) } s.Status("error", err) return err } - if err := k.Update(serviceResource(s.kservice)); err != nil { + if err := k.Update(serviceResource(s.kservice), opts...); err != nil { if logger.V(logger.DebugLevel, logger.DefaultLogger) { logger.Debugf("Runtime failed to update service: %v", err) } diff --git a/runtime/options.go b/runtime/options.go index 3ae7f506..d7d9fe90 100644 --- a/runtime/options.go +++ b/runtime/options.go @@ -1,6 +1,7 @@ package runtime import ( + "context" "io" ) @@ -66,6 +67,10 @@ type CreateOptions struct { Retries int // Specify the image to use Image string + // Namespace to create the service in + Namespace string + // Specify the context to use + Context context.Context } // ReadOptions queries runtime services @@ -76,6 +81,10 @@ type ReadOptions struct { Version string // Type of service Type string + // Namespace the service is running in + Namespace string + // Specify the context to use + Context context.Context } // CreateType sets the type of service to create @@ -92,6 +101,20 @@ func CreateImage(img string) CreateOption { } } +// CreateNamespace sets the namespace +func CreateNamespace(ns string) CreateOption { + return func(o *CreateOptions) { + o.Namespace = ns + } +} + +// CreateContext sets the context +func CreateContext(ctx context.Context) CreateOption { + return func(o *CreateOptions) { + o.Context = ctx + } +} + // WithCommand specifies the command to execute func WithCommand(cmd ...string) CreateOption { return func(o *CreateOptions) { @@ -150,6 +173,66 @@ func ReadType(t string) ReadOption { } } +// ReadNamespace sets the namespace +func ReadNamespace(ns string) ReadOption { + return func(o *ReadOptions) { + o.Namespace = ns + } +} + +// ReadContext sets the context +func ReadContext(ctx context.Context) ReadOption { + return func(o *ReadOptions) { + o.Context = ctx + } +} + +type UpdateOption func(o *UpdateOptions) + +type UpdateOptions struct { + // Namespace the service is running in + Namespace string + // Specify the context to use + Context context.Context +} + +// UpdateNamespace sets the namespace +func UpdateNamespace(ns string) UpdateOption { + return func(o *UpdateOptions) { + o.Namespace = ns + } +} + +// UpdateContext sets the context +func UpdateContext(ctx context.Context) UpdateOption { + return func(o *UpdateOptions) { + o.Context = ctx + } +} + +type DeleteOption func(o *DeleteOptions) + +type DeleteOptions struct { + // Namespace the service is running in + Namespace string + // Specify the context to use + Context context.Context +} + +// DeleteNamespace sets the namespace +func DeleteNamespace(ns string) DeleteOption { + return func(o *DeleteOptions) { + o.Namespace = ns + } +} + +// DeleteContext sets the context +func DeleteContext(ctx context.Context) DeleteOption { + return func(o *DeleteOptions) { + o.Context = ctx + } +} + // LogsOption configures runtime logging type LogsOption func(o *LogsOptions) @@ -159,6 +242,10 @@ type LogsOptions struct { Count int64 // Stream new lines? Stream bool + // Namespace the service is running in + Namespace string + // Specify the context to use + Context context.Context } // LogsExistingCount confiures how many existing lines to show @@ -174,3 +261,17 @@ func LogsStream(stream bool) LogsOption { l.Stream = stream } } + +// LogsNamespace sets the namespace +func LogsNamespace(ns string) LogsOption { + return func(o *LogsOptions) { + o.Namespace = ns + } +} + +// LogsContext sets the context +func LogsContext(ctx context.Context) LogsOption { + return func(o *LogsOptions) { + o.Context = ctx + } +} diff --git a/runtime/runtime.go b/runtime/runtime.go index 2df1530c..567e8c3b 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -24,9 +24,9 @@ type Runtime interface { // Read returns the service Read(...ReadOption) ([]*Service, error) // Update the service in place - Update(*Service) error + Update(*Service, ...UpdateOption) error // Remove a service - Delete(*Service) error + Delete(*Service, ...DeleteOption) error // Logs returns the logs for a service Logs(*Service, ...LogsOption) (LogStream, error) // Start starts the runtime diff --git a/runtime/service/proto/runtime.proto b/runtime/service/proto/runtime.proto index 65c2c497..6c7033c9 100644 --- a/runtime/service/proto/runtime.proto +++ b/runtime/service/proto/runtime.proto @@ -60,11 +60,11 @@ message ReadOptions { } message ReadRequest { - ReadOptions options = 1; + ReadOptions options = 1; } message ReadResponse { - repeated Service services = 1; + repeated Service services = 1; } message DeleteRequest { @@ -100,10 +100,10 @@ message LogsRequest{ message LogRecord { // timestamp of log record - int64 timestamp = 1; - // record metadata - map metadata = 2; - // message - string message = 3; + int64 timestamp = 1; + // record metadata + map metadata = 2; + // message + string message = 3; } diff --git a/runtime/service/service.go b/runtime/service/service.go index d065038c..06cfc2ad 100644 --- a/runtime/service/service.go +++ b/runtime/service/service.go @@ -30,11 +30,13 @@ func (s *svc) Init(opts ...runtime.Option) error { // Create registers a service in the runtime func (s *svc) Create(svc *runtime.Service, opts ...runtime.CreateOption) error { - options := runtime.CreateOptions{} - // apply requested options + var options runtime.CreateOptions for _, o := range opts { o(&options) } + if options.Context == nil { + options.Context = context.Background() + } // set the default source from MICRO_RUNTIME_SOURCE if len(svc.Source) == 0 { @@ -58,15 +60,23 @@ func (s *svc) Create(svc *runtime.Service, opts ...runtime.CreateOption) error { }, } - if _, err := s.runtime.Create(context.Background(), req); err != nil { + if _, err := s.runtime.Create(options.Context, req); err != nil { return err } return nil } -func (s *svc) Logs(service *runtime.Service, options ...runtime.LogsOption) (runtime.LogStream, error) { - ls, err := s.runtime.Logs(context.Background(), &pb.LogsRequest{ +func (s *svc) Logs(service *runtime.Service, opts ...runtime.LogsOption) (runtime.LogStream, error) { + var options runtime.LogsOptions + for _, o := range opts { + o(&options) + } + if options.Context == nil { + options.Context = context.Background() + } + + ls, err := s.runtime.Logs(options.Context, &pb.LogsRequest{ Service: service.Name, Stream: true, Count: 10, // @todo pass in actual options @@ -122,11 +132,13 @@ func (l *serviceLogStream) Stop() error { // Read returns the service with the given name from the runtime func (s *svc) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error) { - options := runtime.ReadOptions{} - // apply requested options + var options runtime.ReadOptions for _, o := range opts { o(&options) } + if options.Context == nil { + options.Context = context.Background() + } // runtime service create request req := &pb.ReadRequest{ @@ -137,7 +149,7 @@ func (s *svc) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error) { }, } - resp, err := s.runtime.Read(context.Background(), req) + resp, err := s.runtime.Read(options.Context, req) if err != nil { return nil, err } @@ -157,7 +169,15 @@ func (s *svc) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error) { } // Update updates the running service -func (s *svc) Update(svc *runtime.Service) error { +func (s *svc) Update(svc *runtime.Service, opts ...runtime.UpdateOption) error { + var options runtime.UpdateOptions + for _, o := range opts { + o(&options) + } + if options.Context == nil { + options.Context = context.Background() + } + // runtime service create request req := &pb.UpdateRequest{ Service: &pb.Service{ @@ -168,7 +188,7 @@ func (s *svc) Update(svc *runtime.Service) error { }, } - if _, err := s.runtime.Update(context.Background(), req); err != nil { + if _, err := s.runtime.Update(options.Context, req); err != nil { return err } @@ -176,7 +196,15 @@ func (s *svc) Update(svc *runtime.Service) error { } // Delete stops and removes the service from the runtime -func (s *svc) Delete(svc *runtime.Service) error { +func (s *svc) Delete(svc *runtime.Service, opts ...runtime.DeleteOption) error { + var options runtime.DeleteOptions + for _, o := range opts { + o(&options) + } + if options.Context == nil { + options.Context = context.Background() + } + // runtime service create request req := &pb.DeleteRequest{ Service: &pb.Service{ @@ -187,7 +215,7 @@ func (s *svc) Delete(svc *runtime.Service) error { }, } - if _, err := s.runtime.Delete(context.Background(), req); err != nil { + if _, err := s.runtime.Delete(options.Context, req); err != nil { return err } diff --git a/util/kubernetes/api/request.go b/util/kubernetes/api/request.go index 89b429b7..e5a96509 100644 --- a/util/kubernetes/api/request.go +++ b/util/kubernetes/api/request.go @@ -75,7 +75,9 @@ func (r *Request) Delete() *Request { // Namespace is to set the namespace to operate on func (r *Request) Namespace(s string) *Request { - r.namespace = s + if len(s) > 0 { + r.namespace = s + } return r } @@ -158,6 +160,9 @@ func (r *Request) SetHeader(key, value string) *Request { func (r *Request) request() (*http.Request, error) { var url string switch r.resource { + case "namespace": + // /api/v1/namespaces/ + url = fmt.Sprintf("%s/api/v1/namespaces/", r.host) case "pod", "service", "endpoint": // /api/v1/namespaces/{namespace}/pods url = fmt.Sprintf("%s/api/v1/namespaces/%s/%ss/", r.host, r.namespace, r.resource) diff --git a/util/kubernetes/client/client.go b/util/kubernetes/client/client.go index e3cfc110..6b579777 100644 --- a/util/kubernetes/client/client.go +++ b/util/kubernetes/client/client.go @@ -10,6 +10,7 @@ import ( "net/http" "os" "path" + "regexp" "strings" "github.com/micro/go-micro/v2/logger" @@ -23,6 +24,8 @@ var ( ErrReadNamespace = errors.New("Could not read namespace from service account secret") // DefaultImage is default micro image DefaultImage = "micro/go-micro" + // DefaultNamespace is the default k8s namespace + DefaultNamespace = "default" ) // Client ... @@ -33,41 +36,28 @@ type client struct { // Kubernetes client type Client interface { // Create creates new API resource - Create(*Resource) error + Create(*Resource, ...CreateOption) error // Get queries API resrouces - Get(*Resource, map[string]string) error + Get(*Resource, ...GetOption) error // Update patches existing API object - Update(*Resource) error + Update(*Resource, ...UpdateOption) error // Delete deletes API resource - Delete(*Resource) error + Delete(*Resource, ...DeleteOption) error // List lists API resources - List(*Resource) error + List(*Resource, ...ListOption) error // Log gets log for a pod Log(*Resource, ...LogOption) (io.ReadCloser, error) // Watch for events Watch(*Resource, ...WatchOption) (Watcher, error) } -func detectNamespace() (string, error) { - nsPath := path.Join(serviceAccountPath, "namespace") - - // Make sure it's a file and we can read it - if s, e := os.Stat(nsPath); e != nil { - return "", e - } else if s.IsDir() { - return "", ErrReadNamespace - } - - // Read the file, and cast to a string - if ns, e := ioutil.ReadFile(nsPath); e != nil { - return string(ns), e - } else { - return string(ns), nil - } -} - // Create creates new API object -func (c *client) Create(r *Resource) error { +func (c *client) Create(r *Resource, opts ...CreateOption) error { + var options CreateOptions + for _, o := range opts { + o(&options) + } + b := new(bytes.Buffer) if err := renderTemplate(r.Kind, b, r.Value); err != nil { return err @@ -76,18 +66,35 @@ func (c *client) Create(r *Resource) error { return api.NewRequest(c.opts). Post(). SetHeader("Content-Type", "application/yaml"). + Namespace(options.Namespace). Resource(r.Kind). Body(b). Do(). Error() } +var ( + nameRegex = regexp.MustCompile("[^a-zA-Z0-9]+") +) + +// SerializeResourceName removes all spacial chars from a string so it +// can be used as a k8s resource name +func SerializeResourceName(ns string) string { + return nameRegex.ReplaceAllString(ns, "-") +} + // Get queries API objects and stores the result in r -func (c *client) Get(r *Resource, labels map[string]string) error { +func (c *client) Get(r *Resource, opts ...GetOption) error { + var options GetOptions + for _, o := range opts { + o(&options) + } + return api.NewRequest(c.opts). Get(). Resource(r.Kind). - Params(&api.Params{LabelSelector: labels}). + Namespace(options.Namespace). + Params(&api.Params{LabelSelector: options.Labels}). Do(). Into(r.Value) } @@ -103,7 +110,8 @@ func (c *client) Log(r *Resource, opts ...LogOption) (io.ReadCloser, error) { Get(). Resource(r.Kind). SubResource("log"). - Name(r.Name) + Name(r.Name). + Namespace(options.Namespace) if options.Params != nil { req.Params(&api.Params{Additional: options.Params}) @@ -121,12 +129,18 @@ func (c *client) Log(r *Resource, opts ...LogOption) (io.ReadCloser, error) { } // Update updates API object -func (c *client) Update(r *Resource) error { +func (c *client) Update(r *Resource, opts ...UpdateOption) error { + var options UpdateOptions + for _, o := range opts { + o(&options) + } + req := api.NewRequest(c.opts). Patch(). SetHeader("Content-Type", "application/strategic-merge-patch+json"). Resource(r.Kind). - Name(r.Name) + Name(r.Name). + Namespace(options.Namespace) switch r.Kind { case "service": @@ -143,21 +157,33 @@ func (c *client) Update(r *Resource) error { } // Delete removes API object -func (c *client) Delete(r *Resource) error { +func (c *client) Delete(r *Resource, opts ...DeleteOption) error { + var options DeleteOptions + for _, o := range opts { + o(&options) + } + return api.NewRequest(c.opts). Delete(). Resource(r.Kind). Name(r.Name). + Namespace(options.Namespace). Do(). Error() } // List lists API objects and stores the result in r -func (c *client) List(r *Resource) error { +func (c *client) List(r *Resource, opts ...ListOption) error { + var options ListOptions + for _, o := range opts { + o(&options) + } + labels := map[string]string{ "micro": "service", } - return c.Get(r, labels) + + return c.Get(r, GetLabels(labels), GetNamespace(options.Namespace)) } // Watch returns an event stream @@ -183,13 +209,14 @@ func (c *client) Watch(r *Resource, opts ...WatchOption) (Watcher, error) { Get(). Resource(r.Kind). Name(r.Name). + Namespace(options.Namespace). Params(params) return newWatcher(req) } // NewService returns default micro kubernetes service definition -func NewService(name, version, typ string) *Service { +func NewService(name, version, typ, namespace string) *Service { if logger.V(logger.TraceLevel, logger.DefaultLogger) { logger.Tracef("kubernetes default service: name: %s, version: %s", name, version) } @@ -208,7 +235,7 @@ func NewService(name, version, typ string) *Service { Metadata := &Metadata{ Name: svcName, - Namespace: "default", + Namespace: SerializeResourceName(namespace), Version: version, Labels: Labels, } @@ -228,7 +255,7 @@ func NewService(name, version, typ string) *Service { } // NewService returns default micro kubernetes deployment definition -func NewDeployment(name, version, typ string) *Deployment { +func NewDeployment(name, version, typ, namespace string) *Deployment { if logger.V(logger.TraceLevel, logger.DefaultLogger) { logger.Tracef("kubernetes default deployment: name: %s, version: %s", name, version) } @@ -247,7 +274,7 @@ func NewDeployment(name, version, typ string) *Deployment { Metadata := &Metadata{ Name: depName, - Namespace: "default", + Namespace: SerializeResourceName(namespace), Version: version, Labels: Labels, Annotations: map[string]string{}, @@ -319,11 +346,6 @@ func NewClusterClient() *client { } t := string(token) - ns, err := detectNamespace() - if err != nil { - logger.Fatal(err) - } - crt, err := CertPoolFromFile(path.Join(serviceAccountPath, "ca.crt")) if err != nil { logger.Fatal(err) @@ -342,8 +364,8 @@ func NewClusterClient() *client { opts: &api.Options{ Client: c, Host: host, - Namespace: ns, BearerToken: &t, + Namespace: DefaultNamespace, }, } } diff --git a/util/kubernetes/client/options.go b/util/kubernetes/client/options.go index 0e293522..dd6b32a5 100644 --- a/util/kubernetes/client/options.go +++ b/util/kubernetes/client/options.go @@ -1,13 +1,38 @@ package client +type CreateOptions struct { + Namespace string +} + +type GetOptions struct { + Namespace string + Labels map[string]string +} +type UpdateOptions struct { + Namespace string +} +type DeleteOptions struct { + Namespace string +} +type ListOptions struct { + Namespace string +} + type LogOptions struct { - Params map[string]string + Namespace string + Params map[string]string } type WatchOptions struct { - Params map[string]string + Namespace string + Params map[string]string } +type CreateOption func(*CreateOptions) +type GetOption func(*GetOptions) +type UpdateOption func(*UpdateOptions) +type DeleteOption func(*DeleteOptions) +type ListOption func(*ListOptions) type LogOption func(*LogOptions) type WatchOption func(*WatchOptions) @@ -24,3 +49,59 @@ func WatchParams(p map[string]string) WatchOption { w.Params = p } } + +// CreateNamespace sets the namespace for creating a resource +func CreateNamespace(ns string) CreateOption { + return func(o *CreateOptions) { + o.Namespace = SerializeResourceName(ns) + } +} + +// GetNamespace sets the namespace for getting a resource +func GetNamespace(ns string) GetOption { + return func(o *GetOptions) { + o.Namespace = SerializeResourceName(ns) + } +} + +// GetLabels sets the labels for when getting a resource +func GetLabels(ls map[string]string) GetOption { + return func(o *GetOptions) { + o.Labels = ls + } +} + +// UpdateNamespace sets the namespace for updating a resource +func UpdateNamespace(ns string) UpdateOption { + return func(o *UpdateOptions) { + o.Namespace = SerializeResourceName(ns) + } +} + +// DeleteNamespace sets the namespace for deleting a resource +func DeleteNamespace(ns string) DeleteOption { + return func(o *DeleteOptions) { + o.Namespace = SerializeResourceName(ns) + } +} + +// ListNamespace sets the namespace for listing resources +func ListNamespace(ns string) ListOption { + return func(o *ListOptions) { + o.Namespace = SerializeResourceName(ns) + } +} + +// LogNamespace sets the namespace for logging a resource +func LogNamespace(ns string) LogOption { + return func(o *LogOptions) { + o.Namespace = SerializeResourceName(ns) + } +} + +// WatchNamespace sets the namespace for watching a resource +func WatchNamespace(ns string) WatchOption { + return func(o *WatchOptions) { + o.Namespace = SerializeResourceName(ns) + } +} diff --git a/util/kubernetes/client/types.go b/util/kubernetes/client/types.go index c19fff1c..5b58ab9f 100644 --- a/util/kubernetes/client/types.go +++ b/util/kubernetes/client/types.go @@ -183,3 +183,8 @@ type Template struct { type Namespace struct { Metadata *Metadata `json:"metadata,omitempty"` } + +// NamespaceList +type NamespaceList struct { + Items []Namespace `json:"items"` +} diff --git a/util/kubernetes/client/util_test.go b/util/kubernetes/client/util_test.go index 236f3185..8073a049 100644 --- a/util/kubernetes/client/util_test.go +++ b/util/kubernetes/client/util_test.go @@ -9,16 +9,17 @@ func TestTemplates(t *testing.T) { name := "foo" version := "123" typ := "service" + namespace := "default" // Render default service - s := NewService(name, version, typ) + s := NewService(name, version, typ, namespace) bs := new(bytes.Buffer) if err := renderTemplate(templates["service"], bs, s); err != nil { t.Errorf("Failed to render kubernetes service: %v", err) } // Render default deployment - d := NewDeployment(name, version, typ) + d := NewDeployment(name, version, typ, namespace) bd := new(bytes.Buffer) if err := renderTemplate(templates["deployment"], bd, d); err != nil { t.Errorf("Failed to render kubernetes deployment: %v", err)