Runtime Namespace (#1547)

* Add context option to runtime; Add dynamic namespace to kubectl client

* Add namespace runtime arg

* Fixes & Debugging

* Pass options in k8s runtime

* Set namespace on k8s resources

* Additional Logging

* More debugging

* Remove Debugging

* Ensure namespace exists

* Add debugging

* Refactor namespaceExists check

* Fix

* Fix

* Fix

* Fix

* Change the way we check for namespace

* Fix

* Tidying Up

* Fix Test

* Fix merge bugs

* Serialize k8s namespaces

* Add namespace to watch

* Serialize namespace when creating k8s namespace

Co-authored-by: Ben Toogood <ben@micro.mu>
Co-authored-by: Asim Aslam <asim@aslam.me>
This commit is contained in:
ben-toogood 2020-04-23 13:53:42 +01:00 committed by GitHub
parent 7345ce9192
commit 692b27578c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 411 additions and 109 deletions

View File

@ -67,7 +67,7 @@ func (k *klog) getMatchingPods() ([]string, error) {
// TODO: specify micro:service // TODO: specify micro:service
// l["micro"] = "service" // l["micro"] = "service"
if err := k.client.Get(r, l); err != nil { if err := k.client.Get(r, client.GetLabels(l)); err != nil {
return nil, err return nil, err
} }

View File

@ -335,7 +335,7 @@ func (r *runtime) Read(opts ...ReadOption) ([]*Service, error) {
} }
// Update attemps to update the service // Update attemps to update the service
func (r *runtime) Update(s *Service) error { func (r *runtime) Update(s *Service, opts ...UpdateOption) error {
r.Lock() r.Lock()
service, ok := r.services[serviceKey(s)] service, ok := r.services[serviceKey(s)]
r.Unlock() r.Unlock()
@ -350,7 +350,7 @@ func (r *runtime) Update(s *Service) error {
} }
// Delete removes the service from the runtime and stops it // Delete removes the service from the runtime and stops it
func (r *runtime) Delete(s *Service) error { func (r *runtime) Delete(s *Service, opts ...DeleteOption) error {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()

View File

@ -6,7 +6,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/micro/go-micro/v2/logger" log "github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/runtime" "github.com/micro/go-micro/v2/runtime"
"github.com/micro/go-micro/v2/util/kubernetes/client" "github.com/micro/go-micro/v2/util/kubernetes/client"
) )
@ -24,11 +24,48 @@ type kubernetes struct {
closed chan bool closed chan bool
// client is kubernetes client // client is kubernetes client
client client.Client client client.Client
// namespaces which exist
namespaces []client.Namespace
}
// namespaceExists returns a boolean indicating if a namespace exists
func (k *kubernetes) namespaceExists(name string) (bool, error) {
// populate the cache
if k.namespaces == nil {
namespaceList := new(client.NamespaceList)
resource := &client.Resource{Kind: "namespace", Value: namespaceList}
if err := k.client.List(resource); err != nil {
return false, err
}
k.namespaces = namespaceList.Items
}
// check if the namespace exists in the cache
for _, n := range k.namespaces {
if n.Metadata.Name == name {
return true, nil
}
}
return false, nil
}
// createNamespace creates a new k8s namespace
func (k *kubernetes) createNamespace(namespace string) error {
ns := client.Namespace{Metadata: &client.Metadata{Name: namespace}}
err := k.client.Create(&client.Resource{Kind: "namespace", Value: ns})
// add to cache
if err == nil && k.namespaces != nil {
k.namespaces = append(k.namespaces, ns)
}
return err
} }
// getService queries kubernetes for micro service // getService queries kubernetes for micro service
// NOTE: this function is not thread-safe // NOTE: this function is not thread-safe
func (k *kubernetes) getService(labels map[string]string) ([]*service, error) { func (k *kubernetes) getService(labels map[string]string, opts ...client.GetOption) ([]*service, error) {
// get the service status // get the service status
serviceList := new(client.ServiceList) serviceList := new(client.ServiceList)
r := &client.Resource{ r := &client.Resource{
@ -36,8 +73,10 @@ func (k *kubernetes) getService(labels map[string]string) ([]*service, error) {
Value: serviceList, Value: serviceList,
} }
opts = append(opts, client.GetLabels(labels))
// get the service from k8s // get the service from k8s
if err := k.client.Get(r, labels); err != nil { if err := k.client.Get(r, opts...); err != nil {
return nil, err return nil, err
} }
@ -47,7 +86,7 @@ func (k *kubernetes) getService(labels map[string]string) ([]*service, error) {
Kind: "deployment", Kind: "deployment",
Value: depList, Value: depList,
} }
if err := k.client.Get(d, labels); err != nil { if err := k.client.Get(d, opts...); err != nil {
return nil, err return nil, err
} }
@ -57,7 +96,7 @@ func (k *kubernetes) getService(labels map[string]string) ([]*service, error) {
Kind: "pod", Kind: "pod",
Value: podList, Value: podList,
} }
if err := k.client.Get(p, labels); err != nil { if err := k.client.Get(p, opts...); err != nil {
return nil, err return nil, err
} }
@ -206,8 +245,8 @@ func (k *kubernetes) run(events <-chan runtime.Event) {
// - do we even need the ticker for k8s services? // - do we even need the ticker for k8s services?
case event := <-events: case event := <-events:
// NOTE: we only handle Update events for now // NOTE: we only handle Update events for now
if logger.V(logger.DebugLevel, logger.DefaultLogger) { if log.V(log.DebugLevel, log.DefaultLogger) {
logger.Debugf("Runtime received notification event: %v", event) log.Debugf("Runtime received notification event: %v", event)
} }
switch event.Type { switch event.Type {
case runtime.Update: case runtime.Update:
@ -237,11 +276,11 @@ func (k *kubernetes) run(events <-chan runtime.Event) {
err := k.client.Get(&client.Resource{ err := k.client.Get(&client.Resource{
Kind: "deployment", Kind: "deployment",
Value: deployed, Value: deployed,
}, labels) }, client.GetLabels(labels))
if err != nil { if err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) { if log.V(log.DebugLevel, log.DefaultLogger) {
logger.Debugf("Runtime update failed to get service %s: %v", event.Service, err) log.Debugf("Runtime update failed to get service %s: %v", event.Service, err)
} }
continue continue
} }
@ -262,20 +301,20 @@ func (k *kubernetes) run(events <-chan runtime.Event) {
// update the build time // update the build time
service.Spec.Template.Metadata.Annotations["updated"] = fmt.Sprintf("%d", event.Timestamp.Unix()) service.Spec.Template.Metadata.Annotations["updated"] = fmt.Sprintf("%d", event.Timestamp.Unix())
if logger.V(logger.DebugLevel, logger.DefaultLogger) { if log.V(log.DebugLevel, log.DefaultLogger) {
logger.Debugf("Runtime updating service: %s deployment: %s", event.Service, service.Metadata.Name) log.Debugf("Runtime updating service: %s deployment: %s", event.Service, service.Metadata.Name)
} }
if err := k.client.Update(deploymentResource(&service)); err != nil { if err := k.client.Update(deploymentResource(&service)); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) { if log.V(log.DebugLevel, log.DefaultLogger) {
logger.Debugf("Runtime failed to update service %s: %v", event.Service, err) log.Debugf("Runtime failed to update service %s: %v", event.Service, err)
} }
continue continue
} }
} }
} }
case <-k.closed: case <-k.closed:
if logger.V(logger.DebugLevel, logger.DefaultLogger) { if log.V(log.DebugLevel, log.DefaultLogger) {
logger.Debugf("Runtime stopped") log.Debugf("Runtime stopped")
} }
return return
} }
@ -305,7 +344,7 @@ func (k *kubernetes) Logs(s *runtime.Service, options ...runtime.LogsOption) (ru
go func() { go func() {
records, err := klo.Read() records, err := klo.Read()
if err != nil { if err != nil {
logger.Errorf("Failed to get logs for service '%v' from k8s: %v", err) log.Errorf("Failed to get logs for service '%v' from k8s: %v", err)
return return
} }
// @todo: this might actually not run before podLogStream starts // @todo: this might actually not run before podLogStream starts
@ -371,6 +410,16 @@ func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) er
s.Source = k.options.Source s.Source = k.options.Source
} }
// ensure the namespace exists
namespace := client.SerializeResourceName(options.Namespace)
if exist, err := k.namespaceExists(namespace); err == nil && !exist {
if err := k.createNamespace(namespace); err != nil {
return err
}
} else if err != nil {
return err
}
// determine the image from the source and options // determine the image from the source and options
options.Image = k.getImage(s, options) options.Image = k.getImage(s, options)
@ -378,7 +427,7 @@ func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) er
service := newService(s, options) service := newService(s, options)
// start the service // start the service
return service.Start(k.client) return service.Start(k.client, client.CreateNamespace(options.Namespace))
} }
// Read returns all instances of given service // Read returns all instances of given service
@ -423,8 +472,12 @@ func (k *kubernetes) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error
} }
// Update the service in place // Update the service in place
func (k *kubernetes) Update(s *runtime.Service) error { func (k *kubernetes) Update(s *runtime.Service, opts ...runtime.UpdateOption) error {
// TODO: set the type var options runtime.UpdateOptions
for _, o := range opts {
o(&options)
}
labels := map[string]string{} labels := map[string]string{}
if len(s.Name) > 0 { if len(s.Name) > 0 {
@ -459,7 +512,7 @@ func (k *kubernetes) Update(s *runtime.Service) error {
service.kdeploy.Spec.Template.Metadata.Annotations["updated"] = fmt.Sprintf("%d", time.Now().Unix()) service.kdeploy.Spec.Template.Metadata.Annotations["updated"] = fmt.Sprintf("%d", time.Now().Unix())
// update the service // update the service
if err := service.Update(k.client); err != nil { if err := service.Update(k.client, client.UpdateNamespace(options.Namespace)); err != nil {
return err return err
} }
} }
@ -468,16 +521,22 @@ func (k *kubernetes) Update(s *runtime.Service) error {
} }
// Delete removes a service // Delete removes a service
func (k *kubernetes) Delete(s *runtime.Service) error { func (k *kubernetes) Delete(s *runtime.Service, opts ...runtime.DeleteOption) error {
var options runtime.DeleteOptions
for _, o := range opts {
o(&options)
}
k.Lock() k.Lock()
defer k.Unlock() defer k.Unlock()
// create new kubernetes micro service // create new kubernetes micro service
service := newService(s, runtime.CreateOptions{ service := newService(s, runtime.CreateOptions{
Type: k.options.Type, Type: k.options.Type,
Namespace: options.Namespace,
}) })
return service.Stop(k.client) return service.Stop(k.client, client.DeleteNamespace(options.Namespace))
} }
// Start starts the runtime // Start starts the runtime
@ -500,8 +559,8 @@ func (k *kubernetes) Start() error {
events, err = k.options.Scheduler.Notify() events, err = k.options.Scheduler.Notify()
if err != nil { if err != nil {
// TODO: should we bail here? // TODO: should we bail here?
if logger.V(logger.DebugLevel, logger.DefaultLogger) { if log.V(log.DebugLevel, log.DefaultLogger) {
logger.Debugf("Runtime failed to start update notifier") log.Debugf("Runtime failed to start update notifier")
} }
} }
} }

View File

@ -70,7 +70,7 @@ func (k *klog) getMatchingPods() ([]string, error) {
// TODO: specify micro:service // TODO: specify micro:service
// l["micro"] = "service" // l["micro"] = "service"
if err := k.client.Get(r, l); err != nil { if err := k.client.Get(r, client.GetLabels(l)); err != nil {
return nil, err return nil, err
} }

View File

@ -30,8 +30,8 @@ func newService(s *runtime.Service, c runtime.CreateOptions) *service {
name := client.Format(s.Name) name := client.Format(s.Name)
version := client.Format(s.Version) version := client.Format(s.Version)
kservice := client.NewService(name, version, c.Type) kservice := client.NewService(name, version, c.Type, c.Namespace)
kdeploy := client.NewDeployment(name, version, c.Type) kdeploy := client.NewDeployment(name, version, c.Type, c.Namespace)
// ensure the metadata is set // ensure the metadata is set
if kdeploy.Spec.Template.Metadata.Annotations == nil { if kdeploy.Spec.Template.Metadata.Annotations == nil {
@ -112,9 +112,9 @@ func serviceResource(s *client.Service) *client.Resource {
} }
// Start starts the Kubernetes service. It creates new kubernetes deployment and service API objects // Start starts the Kubernetes service. It creates new kubernetes deployment and service API objects
func (s *service) Start(k client.Client) error { func (s *service) Start(k client.Client, opts ...client.CreateOption) error {
// create deployment first; if we fail, we dont create service // create deployment first; if we fail, we dont create service
if err := k.Create(deploymentResource(s.kdeploy)); err != nil { if err := k.Create(deploymentResource(s.kdeploy), opts...); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime failed to create deployment: %v", err) logger.Debugf("Runtime failed to create deployment: %v", err)
} }
@ -126,7 +126,7 @@ func (s *service) Start(k client.Client) error {
return err return err
} }
// create service now that the deployment has been created // create service now that the deployment has been created
if err := k.Create(serviceResource(s.kservice)); err != nil { if err := k.Create(serviceResource(s.kservice), opts...); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime failed to create service: %v", err) logger.Debugf("Runtime failed to create service: %v", err)
} }
@ -143,9 +143,9 @@ func (s *service) Start(k client.Client) error {
return nil return nil
} }
func (s *service) Stop(k client.Client) error { func (s *service) Stop(k client.Client, opts ...client.DeleteOption) error {
// first attempt to delete service // first attempt to delete service
if err := k.Delete(serviceResource(s.kservice)); err != nil { if err := k.Delete(serviceResource(s.kservice), opts...); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime failed to delete service: %v", err) logger.Debugf("Runtime failed to delete service: %v", err)
} }
@ -153,7 +153,7 @@ func (s *service) Stop(k client.Client) error {
return err return err
} }
// delete deployment once the service has been deleted // delete deployment once the service has been deleted
if err := k.Delete(deploymentResource(s.kdeploy)); err != nil { if err := k.Delete(deploymentResource(s.kdeploy), opts...); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime failed to delete deployment: %v", err) logger.Debugf("Runtime failed to delete deployment: %v", err)
} }
@ -166,15 +166,15 @@ func (s *service) Stop(k client.Client) error {
return nil return nil
} }
func (s *service) Update(k client.Client) error { func (s *service) Update(k client.Client, opts ...client.UpdateOption) error {
if err := k.Update(deploymentResource(s.kdeploy)); err != nil { if err := k.Update(deploymentResource(s.kdeploy), opts...); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime failed to update deployment: %v", err) logger.Debugf("Runtime failed to update deployment: %v", err)
} }
s.Status("error", err) s.Status("error", err)
return err return err
} }
if err := k.Update(serviceResource(s.kservice)); err != nil { if err := k.Update(serviceResource(s.kservice), opts...); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime failed to update service: %v", err) logger.Debugf("Runtime failed to update service: %v", err)
} }

View File

@ -1,6 +1,7 @@
package runtime package runtime
import ( import (
"context"
"io" "io"
) )
@ -66,6 +67,10 @@ type CreateOptions struct {
Retries int Retries int
// Specify the image to use // Specify the image to use
Image string Image string
// Namespace to create the service in
Namespace string
// Specify the context to use
Context context.Context
} }
// ReadOptions queries runtime services // ReadOptions queries runtime services
@ -76,6 +81,10 @@ type ReadOptions struct {
Version string Version string
// Type of service // Type of service
Type string Type string
// Namespace the service is running in
Namespace string
// Specify the context to use
Context context.Context
} }
// CreateType sets the type of service to create // CreateType sets the type of service to create
@ -92,6 +101,20 @@ func CreateImage(img string) CreateOption {
} }
} }
// CreateNamespace sets the namespace
func CreateNamespace(ns string) CreateOption {
return func(o *CreateOptions) {
o.Namespace = ns
}
}
// CreateContext sets the context
func CreateContext(ctx context.Context) CreateOption {
return func(o *CreateOptions) {
o.Context = ctx
}
}
// WithCommand specifies the command to execute // WithCommand specifies the command to execute
func WithCommand(cmd ...string) CreateOption { func WithCommand(cmd ...string) CreateOption {
return func(o *CreateOptions) { return func(o *CreateOptions) {
@ -150,6 +173,66 @@ func ReadType(t string) ReadOption {
} }
} }
// ReadNamespace sets the namespace
func ReadNamespace(ns string) ReadOption {
return func(o *ReadOptions) {
o.Namespace = ns
}
}
// ReadContext sets the context
func ReadContext(ctx context.Context) ReadOption {
return func(o *ReadOptions) {
o.Context = ctx
}
}
type UpdateOption func(o *UpdateOptions)
type UpdateOptions struct {
// Namespace the service is running in
Namespace string
// Specify the context to use
Context context.Context
}
// UpdateNamespace sets the namespace
func UpdateNamespace(ns string) UpdateOption {
return func(o *UpdateOptions) {
o.Namespace = ns
}
}
// UpdateContext sets the context
func UpdateContext(ctx context.Context) UpdateOption {
return func(o *UpdateOptions) {
o.Context = ctx
}
}
type DeleteOption func(o *DeleteOptions)
type DeleteOptions struct {
// Namespace the service is running in
Namespace string
// Specify the context to use
Context context.Context
}
// DeleteNamespace sets the namespace
func DeleteNamespace(ns string) DeleteOption {
return func(o *DeleteOptions) {
o.Namespace = ns
}
}
// DeleteContext sets the context
func DeleteContext(ctx context.Context) DeleteOption {
return func(o *DeleteOptions) {
o.Context = ctx
}
}
// LogsOption configures runtime logging // LogsOption configures runtime logging
type LogsOption func(o *LogsOptions) type LogsOption func(o *LogsOptions)
@ -159,6 +242,10 @@ type LogsOptions struct {
Count int64 Count int64
// Stream new lines? // Stream new lines?
Stream bool Stream bool
// Namespace the service is running in
Namespace string
// Specify the context to use
Context context.Context
} }
// LogsExistingCount confiures how many existing lines to show // LogsExistingCount confiures how many existing lines to show
@ -174,3 +261,17 @@ func LogsStream(stream bool) LogsOption {
l.Stream = stream l.Stream = stream
} }
} }
// LogsNamespace sets the namespace
func LogsNamespace(ns string) LogsOption {
return func(o *LogsOptions) {
o.Namespace = ns
}
}
// LogsContext sets the context
func LogsContext(ctx context.Context) LogsOption {
return func(o *LogsOptions) {
o.Context = ctx
}
}

View File

@ -24,9 +24,9 @@ type Runtime interface {
// Read returns the service // Read returns the service
Read(...ReadOption) ([]*Service, error) Read(...ReadOption) ([]*Service, error)
// Update the service in place // Update the service in place
Update(*Service) error Update(*Service, ...UpdateOption) error
// Remove a service // Remove a service
Delete(*Service) error Delete(*Service, ...DeleteOption) error
// Logs returns the logs for a service // Logs returns the logs for a service
Logs(*Service, ...LogsOption) (LogStream, error) Logs(*Service, ...LogsOption) (LogStream, error)
// Start starts the runtime // Start starts the runtime

View File

@ -60,11 +60,11 @@ message ReadOptions {
} }
message ReadRequest { message ReadRequest {
ReadOptions options = 1; ReadOptions options = 1;
} }
message ReadResponse { message ReadResponse {
repeated Service services = 1; repeated Service services = 1;
} }
message DeleteRequest { message DeleteRequest {
@ -100,10 +100,10 @@ message LogsRequest{
message LogRecord { message LogRecord {
// timestamp of log record // timestamp of log record
int64 timestamp = 1; int64 timestamp = 1;
// record metadata // record metadata
map<string,string> metadata = 2; map<string,string> metadata = 2;
// message // message
string message = 3; string message = 3;
} }

View File

@ -30,11 +30,13 @@ func (s *svc) Init(opts ...runtime.Option) error {
// Create registers a service in the runtime // Create registers a service in the runtime
func (s *svc) Create(svc *runtime.Service, opts ...runtime.CreateOption) error { func (s *svc) Create(svc *runtime.Service, opts ...runtime.CreateOption) error {
options := runtime.CreateOptions{} var options runtime.CreateOptions
// apply requested options
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
if options.Context == nil {
options.Context = context.Background()
}
// set the default source from MICRO_RUNTIME_SOURCE // set the default source from MICRO_RUNTIME_SOURCE
if len(svc.Source) == 0 { if len(svc.Source) == 0 {
@ -58,15 +60,23 @@ func (s *svc) Create(svc *runtime.Service, opts ...runtime.CreateOption) error {
}, },
} }
if _, err := s.runtime.Create(context.Background(), req); err != nil { if _, err := s.runtime.Create(options.Context, req); err != nil {
return err return err
} }
return nil return nil
} }
func (s *svc) Logs(service *runtime.Service, options ...runtime.LogsOption) (runtime.LogStream, error) { func (s *svc) Logs(service *runtime.Service, opts ...runtime.LogsOption) (runtime.LogStream, error) {
ls, err := s.runtime.Logs(context.Background(), &pb.LogsRequest{ var options runtime.LogsOptions
for _, o := range opts {
o(&options)
}
if options.Context == nil {
options.Context = context.Background()
}
ls, err := s.runtime.Logs(options.Context, &pb.LogsRequest{
Service: service.Name, Service: service.Name,
Stream: true, Stream: true,
Count: 10, // @todo pass in actual options Count: 10, // @todo pass in actual options
@ -122,11 +132,13 @@ func (l *serviceLogStream) Stop() error {
// Read returns the service with the given name from the runtime // Read returns the service with the given name from the runtime
func (s *svc) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error) { func (s *svc) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error) {
options := runtime.ReadOptions{} var options runtime.ReadOptions
// apply requested options
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
if options.Context == nil {
options.Context = context.Background()
}
// runtime service create request // runtime service create request
req := &pb.ReadRequest{ req := &pb.ReadRequest{
@ -137,7 +149,7 @@ func (s *svc) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error) {
}, },
} }
resp, err := s.runtime.Read(context.Background(), req) resp, err := s.runtime.Read(options.Context, req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -157,7 +169,15 @@ func (s *svc) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error) {
} }
// Update updates the running service // Update updates the running service
func (s *svc) Update(svc *runtime.Service) error { func (s *svc) Update(svc *runtime.Service, opts ...runtime.UpdateOption) error {
var options runtime.UpdateOptions
for _, o := range opts {
o(&options)
}
if options.Context == nil {
options.Context = context.Background()
}
// runtime service create request // runtime service create request
req := &pb.UpdateRequest{ req := &pb.UpdateRequest{
Service: &pb.Service{ Service: &pb.Service{
@ -168,7 +188,7 @@ func (s *svc) Update(svc *runtime.Service) error {
}, },
} }
if _, err := s.runtime.Update(context.Background(), req); err != nil { if _, err := s.runtime.Update(options.Context, req); err != nil {
return err return err
} }
@ -176,7 +196,15 @@ func (s *svc) Update(svc *runtime.Service) error {
} }
// Delete stops and removes the service from the runtime // Delete stops and removes the service from the runtime
func (s *svc) Delete(svc *runtime.Service) error { func (s *svc) Delete(svc *runtime.Service, opts ...runtime.DeleteOption) error {
var options runtime.DeleteOptions
for _, o := range opts {
o(&options)
}
if options.Context == nil {
options.Context = context.Background()
}
// runtime service create request // runtime service create request
req := &pb.DeleteRequest{ req := &pb.DeleteRequest{
Service: &pb.Service{ Service: &pb.Service{
@ -187,7 +215,7 @@ func (s *svc) Delete(svc *runtime.Service) error {
}, },
} }
if _, err := s.runtime.Delete(context.Background(), req); err != nil { if _, err := s.runtime.Delete(options.Context, req); err != nil {
return err return err
} }

View File

@ -75,7 +75,9 @@ func (r *Request) Delete() *Request {
// Namespace is to set the namespace to operate on // Namespace is to set the namespace to operate on
func (r *Request) Namespace(s string) *Request { func (r *Request) Namespace(s string) *Request {
r.namespace = s if len(s) > 0 {
r.namespace = s
}
return r return r
} }
@ -158,6 +160,9 @@ func (r *Request) SetHeader(key, value string) *Request {
func (r *Request) request() (*http.Request, error) { func (r *Request) request() (*http.Request, error) {
var url string var url string
switch r.resource { switch r.resource {
case "namespace":
// /api/v1/namespaces/
url = fmt.Sprintf("%s/api/v1/namespaces/", r.host)
case "pod", "service", "endpoint": case "pod", "service", "endpoint":
// /api/v1/namespaces/{namespace}/pods // /api/v1/namespaces/{namespace}/pods
url = fmt.Sprintf("%s/api/v1/namespaces/%s/%ss/", r.host, r.namespace, r.resource) url = fmt.Sprintf("%s/api/v1/namespaces/%s/%ss/", r.host, r.namespace, r.resource)

View File

@ -10,6 +10,7 @@ import (
"net/http" "net/http"
"os" "os"
"path" "path"
"regexp"
"strings" "strings"
"github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/logger"
@ -23,6 +24,8 @@ var (
ErrReadNamespace = errors.New("Could not read namespace from service account secret") ErrReadNamespace = errors.New("Could not read namespace from service account secret")
// DefaultImage is default micro image // DefaultImage is default micro image
DefaultImage = "micro/go-micro" DefaultImage = "micro/go-micro"
// DefaultNamespace is the default k8s namespace
DefaultNamespace = "default"
) )
// Client ... // Client ...
@ -33,41 +36,28 @@ type client struct {
// Kubernetes client // Kubernetes client
type Client interface { type Client interface {
// Create creates new API resource // Create creates new API resource
Create(*Resource) error Create(*Resource, ...CreateOption) error
// Get queries API resrouces // Get queries API resrouces
Get(*Resource, map[string]string) error Get(*Resource, ...GetOption) error
// Update patches existing API object // Update patches existing API object
Update(*Resource) error Update(*Resource, ...UpdateOption) error
// Delete deletes API resource // Delete deletes API resource
Delete(*Resource) error Delete(*Resource, ...DeleteOption) error
// List lists API resources // List lists API resources
List(*Resource) error List(*Resource, ...ListOption) error
// Log gets log for a pod // Log gets log for a pod
Log(*Resource, ...LogOption) (io.ReadCloser, error) Log(*Resource, ...LogOption) (io.ReadCloser, error)
// Watch for events // Watch for events
Watch(*Resource, ...WatchOption) (Watcher, error) Watch(*Resource, ...WatchOption) (Watcher, error)
} }
func detectNamespace() (string, error) {
nsPath := path.Join(serviceAccountPath, "namespace")
// Make sure it's a file and we can read it
if s, e := os.Stat(nsPath); e != nil {
return "", e
} else if s.IsDir() {
return "", ErrReadNamespace
}
// Read the file, and cast to a string
if ns, e := ioutil.ReadFile(nsPath); e != nil {
return string(ns), e
} else {
return string(ns), nil
}
}
// Create creates new API object // Create creates new API object
func (c *client) Create(r *Resource) error { func (c *client) Create(r *Resource, opts ...CreateOption) error {
var options CreateOptions
for _, o := range opts {
o(&options)
}
b := new(bytes.Buffer) b := new(bytes.Buffer)
if err := renderTemplate(r.Kind, b, r.Value); err != nil { if err := renderTemplate(r.Kind, b, r.Value); err != nil {
return err return err
@ -76,18 +66,35 @@ func (c *client) Create(r *Resource) error {
return api.NewRequest(c.opts). return api.NewRequest(c.opts).
Post(). Post().
SetHeader("Content-Type", "application/yaml"). SetHeader("Content-Type", "application/yaml").
Namespace(options.Namespace).
Resource(r.Kind). Resource(r.Kind).
Body(b). Body(b).
Do(). Do().
Error() Error()
} }
var (
nameRegex = regexp.MustCompile("[^a-zA-Z0-9]+")
)
// SerializeResourceName removes all spacial chars from a string so it
// can be used as a k8s resource name
func SerializeResourceName(ns string) string {
return nameRegex.ReplaceAllString(ns, "-")
}
// Get queries API objects and stores the result in r // Get queries API objects and stores the result in r
func (c *client) Get(r *Resource, labels map[string]string) error { func (c *client) Get(r *Resource, opts ...GetOption) error {
var options GetOptions
for _, o := range opts {
o(&options)
}
return api.NewRequest(c.opts). return api.NewRequest(c.opts).
Get(). Get().
Resource(r.Kind). Resource(r.Kind).
Params(&api.Params{LabelSelector: labels}). Namespace(options.Namespace).
Params(&api.Params{LabelSelector: options.Labels}).
Do(). Do().
Into(r.Value) Into(r.Value)
} }
@ -103,7 +110,8 @@ func (c *client) Log(r *Resource, opts ...LogOption) (io.ReadCloser, error) {
Get(). Get().
Resource(r.Kind). Resource(r.Kind).
SubResource("log"). SubResource("log").
Name(r.Name) Name(r.Name).
Namespace(options.Namespace)
if options.Params != nil { if options.Params != nil {
req.Params(&api.Params{Additional: options.Params}) req.Params(&api.Params{Additional: options.Params})
@ -121,12 +129,18 @@ func (c *client) Log(r *Resource, opts ...LogOption) (io.ReadCloser, error) {
} }
// Update updates API object // Update updates API object
func (c *client) Update(r *Resource) error { func (c *client) Update(r *Resource, opts ...UpdateOption) error {
var options UpdateOptions
for _, o := range opts {
o(&options)
}
req := api.NewRequest(c.opts). req := api.NewRequest(c.opts).
Patch(). Patch().
SetHeader("Content-Type", "application/strategic-merge-patch+json"). SetHeader("Content-Type", "application/strategic-merge-patch+json").
Resource(r.Kind). Resource(r.Kind).
Name(r.Name) Name(r.Name).
Namespace(options.Namespace)
switch r.Kind { switch r.Kind {
case "service": case "service":
@ -143,21 +157,33 @@ func (c *client) Update(r *Resource) error {
} }
// Delete removes API object // Delete removes API object
func (c *client) Delete(r *Resource) error { func (c *client) Delete(r *Resource, opts ...DeleteOption) error {
var options DeleteOptions
for _, o := range opts {
o(&options)
}
return api.NewRequest(c.opts). return api.NewRequest(c.opts).
Delete(). Delete().
Resource(r.Kind). Resource(r.Kind).
Name(r.Name). Name(r.Name).
Namespace(options.Namespace).
Do(). Do().
Error() Error()
} }
// List lists API objects and stores the result in r // List lists API objects and stores the result in r
func (c *client) List(r *Resource) error { func (c *client) List(r *Resource, opts ...ListOption) error {
var options ListOptions
for _, o := range opts {
o(&options)
}
labels := map[string]string{ labels := map[string]string{
"micro": "service", "micro": "service",
} }
return c.Get(r, labels)
return c.Get(r, GetLabels(labels), GetNamespace(options.Namespace))
} }
// Watch returns an event stream // Watch returns an event stream
@ -183,13 +209,14 @@ func (c *client) Watch(r *Resource, opts ...WatchOption) (Watcher, error) {
Get(). Get().
Resource(r.Kind). Resource(r.Kind).
Name(r.Name). Name(r.Name).
Namespace(options.Namespace).
Params(params) Params(params)
return newWatcher(req) return newWatcher(req)
} }
// NewService returns default micro kubernetes service definition // NewService returns default micro kubernetes service definition
func NewService(name, version, typ string) *Service { func NewService(name, version, typ, namespace string) *Service {
if logger.V(logger.TraceLevel, logger.DefaultLogger) { if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Tracef("kubernetes default service: name: %s, version: %s", name, version) logger.Tracef("kubernetes default service: name: %s, version: %s", name, version)
} }
@ -208,7 +235,7 @@ func NewService(name, version, typ string) *Service {
Metadata := &Metadata{ Metadata := &Metadata{
Name: svcName, Name: svcName,
Namespace: "default", Namespace: SerializeResourceName(namespace),
Version: version, Version: version,
Labels: Labels, Labels: Labels,
} }
@ -228,7 +255,7 @@ func NewService(name, version, typ string) *Service {
} }
// NewService returns default micro kubernetes deployment definition // NewService returns default micro kubernetes deployment definition
func NewDeployment(name, version, typ string) *Deployment { func NewDeployment(name, version, typ, namespace string) *Deployment {
if logger.V(logger.TraceLevel, logger.DefaultLogger) { if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Tracef("kubernetes default deployment: name: %s, version: %s", name, version) logger.Tracef("kubernetes default deployment: name: %s, version: %s", name, version)
} }
@ -247,7 +274,7 @@ func NewDeployment(name, version, typ string) *Deployment {
Metadata := &Metadata{ Metadata := &Metadata{
Name: depName, Name: depName,
Namespace: "default", Namespace: SerializeResourceName(namespace),
Version: version, Version: version,
Labels: Labels, Labels: Labels,
Annotations: map[string]string{}, Annotations: map[string]string{},
@ -319,11 +346,6 @@ func NewClusterClient() *client {
} }
t := string(token) t := string(token)
ns, err := detectNamespace()
if err != nil {
logger.Fatal(err)
}
crt, err := CertPoolFromFile(path.Join(serviceAccountPath, "ca.crt")) crt, err := CertPoolFromFile(path.Join(serviceAccountPath, "ca.crt"))
if err != nil { if err != nil {
logger.Fatal(err) logger.Fatal(err)
@ -342,8 +364,8 @@ func NewClusterClient() *client {
opts: &api.Options{ opts: &api.Options{
Client: c, Client: c,
Host: host, Host: host,
Namespace: ns,
BearerToken: &t, BearerToken: &t,
Namespace: DefaultNamespace,
}, },
} }
} }

View File

@ -1,13 +1,38 @@
package client package client
type CreateOptions struct {
Namespace string
}
type GetOptions struct {
Namespace string
Labels map[string]string
}
type UpdateOptions struct {
Namespace string
}
type DeleteOptions struct {
Namespace string
}
type ListOptions struct {
Namespace string
}
type LogOptions struct { type LogOptions struct {
Params map[string]string Namespace string
Params map[string]string
} }
type WatchOptions struct { type WatchOptions struct {
Params map[string]string Namespace string
Params map[string]string
} }
type CreateOption func(*CreateOptions)
type GetOption func(*GetOptions)
type UpdateOption func(*UpdateOptions)
type DeleteOption func(*DeleteOptions)
type ListOption func(*ListOptions)
type LogOption func(*LogOptions) type LogOption func(*LogOptions)
type WatchOption func(*WatchOptions) type WatchOption func(*WatchOptions)
@ -24,3 +49,59 @@ func WatchParams(p map[string]string) WatchOption {
w.Params = p w.Params = p
} }
} }
// CreateNamespace sets the namespace for creating a resource
func CreateNamespace(ns string) CreateOption {
return func(o *CreateOptions) {
o.Namespace = SerializeResourceName(ns)
}
}
// GetNamespace sets the namespace for getting a resource
func GetNamespace(ns string) GetOption {
return func(o *GetOptions) {
o.Namespace = SerializeResourceName(ns)
}
}
// GetLabels sets the labels for when getting a resource
func GetLabels(ls map[string]string) GetOption {
return func(o *GetOptions) {
o.Labels = ls
}
}
// UpdateNamespace sets the namespace for updating a resource
func UpdateNamespace(ns string) UpdateOption {
return func(o *UpdateOptions) {
o.Namespace = SerializeResourceName(ns)
}
}
// DeleteNamespace sets the namespace for deleting a resource
func DeleteNamespace(ns string) DeleteOption {
return func(o *DeleteOptions) {
o.Namespace = SerializeResourceName(ns)
}
}
// ListNamespace sets the namespace for listing resources
func ListNamespace(ns string) ListOption {
return func(o *ListOptions) {
o.Namespace = SerializeResourceName(ns)
}
}
// LogNamespace sets the namespace for logging a resource
func LogNamespace(ns string) LogOption {
return func(o *LogOptions) {
o.Namespace = SerializeResourceName(ns)
}
}
// WatchNamespace sets the namespace for watching a resource
func WatchNamespace(ns string) WatchOption {
return func(o *WatchOptions) {
o.Namespace = SerializeResourceName(ns)
}
}

View File

@ -183,3 +183,8 @@ type Template struct {
type Namespace struct { type Namespace struct {
Metadata *Metadata `json:"metadata,omitempty"` Metadata *Metadata `json:"metadata,omitempty"`
} }
// NamespaceList
type NamespaceList struct {
Items []Namespace `json:"items"`
}

View File

@ -9,16 +9,17 @@ func TestTemplates(t *testing.T) {
name := "foo" name := "foo"
version := "123" version := "123"
typ := "service" typ := "service"
namespace := "default"
// Render default service // Render default service
s := NewService(name, version, typ) s := NewService(name, version, typ, namespace)
bs := new(bytes.Buffer) bs := new(bytes.Buffer)
if err := renderTemplate(templates["service"], bs, s); err != nil { if err := renderTemplate(templates["service"], bs, s); err != nil {
t.Errorf("Failed to render kubernetes service: %v", err) t.Errorf("Failed to render kubernetes service: %v", err)
} }
// Render default deployment // Render default deployment
d := NewDeployment(name, version, typ) d := NewDeployment(name, version, typ, namespace)
bd := new(bytes.Buffer) bd := new(bytes.Buffer)
if err := renderTemplate(templates["deployment"], bd, d); err != nil { if err := renderTemplate(templates["deployment"], bd, d); err != nil {
t.Errorf("Failed to render kubernetes deployment: %v", err) t.Errorf("Failed to render kubernetes deployment: %v", err)