diff --git a/runtime/kubernetes/kubernetes.go b/runtime/kubernetes/kubernetes.go index dc5f13cb..76775cfe 100644 --- a/runtime/kubernetes/kubernetes.go +++ b/runtime/kubernetes/kubernetes.go @@ -34,32 +34,52 @@ func (k *kubernetes) Init(opts ...runtime.Option) error { return nil } -func (k *kubernetes) Logs(s *runtime.Service, options ...runtime.LogsOption) (runtime.Logs, error) { - klo := newLog(k.client, s.Name, options...) +func (k *kubernetes) Logs(resource runtime.Resource, options ...runtime.LogsOption) (runtime.Logs, error) { - if !klo.options.Stream { - records, err := klo.Read() + // Handle the various different types of resources: + switch resource.Type() { + case runtime.TypeNamespace: + // noop (Namespace is not supported by *kubernetes.Logs()) + return nil, nil + case runtime.TypeNetworkPolicy: + // noop (NetworkPolicy is not supported by *kubernetes.Logs())) + return nil, nil + case runtime.TypeService: + + // Assert the resource back into a *runtime.Service + s, ok := resource.(*runtime.Service) + if !ok { + return nil, runtime.ErrInvalidResource + } + + klo := newLog(k.client, s.Name, options...) + + if !klo.options.Stream { + records, err := klo.Read() + if err != nil { + log.Errorf("Failed to get logs for service '%v' from k8s: %v", s.Name, err) + return nil, err + } + kstream := &kubeStream{ + stream: make(chan runtime.Log), + stop: make(chan bool), + } + go func() { + for _, record := range records { + kstream.Chan() <- record + } + kstream.Stop() + }() + return kstream, nil + } + stream, err := klo.Stream() if err != nil { - log.Errorf("Failed to get logs for service '%v' from k8s: %v", s.Name, err) return nil, err } - kstream := &kubeStream{ - stream: make(chan runtime.Log), - stop: make(chan bool), - } - go func() { - for _, record := range records { - kstream.Chan() <- record - } - kstream.Stop() - }() - return kstream, nil + return stream, nil + default: + return nil, runtime.ErrInvalidResource } - stream, err := klo.Stream() - if err != nil { - return nil, err - } - return stream, nil } type kubeStream struct { @@ -92,11 +112,14 @@ func (k *kubeStream) Stop() error { return nil } -// Creates a service -func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) error { +// Create a resource +func (k *kubernetes) Create(resource runtime.Resource, opts ...runtime.CreateOption) error { k.Lock() defer k.Unlock() + return k.create(resource, opts...) +} +func (k *kubernetes) create(resource runtime.Resource, opts ...runtime.CreateOption) error { // parse the options options := &runtime.CreateOptions{ Type: k.options.Type, @@ -107,47 +130,74 @@ func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) er o(options) } - // default the service's source and version - if len(s.Source) == 0 { - s.Source = k.options.Source - } - if len(s.Version) == 0 { - s.Version = "latest" - } - - // ensure the namespace exists - if err := k.ensureNamepaceExists(options.Namespace); err != nil { - return nil - } - - // create a secret for the deployment - if err := k.createCredentials(s, options); err != nil { - return err - } - - // create the deployment - if err := k.client.Create(client.NewDeployment(s, options), client.CreateNamespace(options.Namespace)); err != nil { - if parseError(err).Reason == "AlreadyExists" { - return runtime.ErrAlreadyExists + // Handle the various different types of resources: + switch resource.Type() { + case runtime.TypeNamespace: + // Assert the resource back into a *runtime.Namespace + namespace, ok := resource.(*runtime.Namespace) + if !ok { + return runtime.ErrInvalidResource } - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Errorf("Runtime failed to create deployment: %v", err) + return k.createNamespace(namespace) + case runtime.TypeNetworkPolicy: + // Assert the resource back into a *runtime.NetworkPolicy + networkPolicy, ok := resource.(*runtime.NetworkPolicy) + if !ok { + return runtime.ErrInvalidResource } - return err - } + return k.createNetworkPolicy(networkPolicy) + case runtime.TypeService: - // create the service, one could already exist for another version so ignore ErrAlreadyExists - if err := k.client.Create(client.NewService(s, options), client.CreateNamespace(options.Namespace)); err != nil { - if parseError(err).Reason == "AlreadyExists" { + // Assert the resource back into a *runtime.Service + s, ok := resource.(*runtime.Service) + if !ok { + return runtime.ErrInvalidResource + } + + // default the service's source and version + if len(s.Source) == 0 { + s.Source = k.options.Source + } + if len(s.Version) == 0 { + s.Version = "latest" + } + + // ensure the namespace exists + if err := k.ensureNamepaceExists(options.Namespace); err != nil { return nil } - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Errorf("Runtime failed to create service: %v", err) - } - return err - } - return nil + // create a secret for the deployment + if err := k.createCredentials(s, options); err != nil { + return err + } + + // create the deployment + if err := k.client.Create(client.NewDeployment(s, options), client.CreateNamespace(options.Namespace)); err != nil { + if parseError(err).Reason == "AlreadyExists" { + return runtime.ErrAlreadyExists + } + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Runtime failed to create deployment: %v", err) + } + return err + } + + // create the service, one could already exist for another version so ignore ErrAlreadyExists + if err := k.client.Create(client.NewService(s, options), client.CreateNamespace(options.Namespace)); err != nil { + if parseError(err).Reason == "AlreadyExists" { + return nil + } + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Runtime failed to create service: %v", err) + } + return err + } + + return nil + default: + return runtime.ErrInvalidResource + } } // Read returns all instances of given service @@ -180,8 +230,8 @@ func (k *kubernetes) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error return k.getServices(client.GetNamespace(options.Namespace), client.GetLabels(labels)) } -// Update the service in place -func (k *kubernetes) Update(s *runtime.Service, opts ...runtime.UpdateOption) error { +// Update a resource in place +func (k *kubernetes) Update(resource runtime.Resource, opts ...runtime.UpdateOption) error { k.Lock() defer k.Unlock() @@ -193,69 +243,91 @@ func (k *kubernetes) Update(s *runtime.Service, opts ...runtime.UpdateOption) er o(&options) } - // construct the query - labels := map[string]string{} - if len(s.Name) > 0 { - labels["name"] = client.Format(s.Name) - } - if len(s.Version) > 0 { - labels["version"] = client.Format(s.Version) - } + // Handle the various different types of resources: + switch resource.Type() { + case runtime.TypeNamespace: + // noop (Namespace is not supported by *kubernetes.Update()) + return nil + case runtime.TypeNetworkPolicy: + // Assert the resource back into a *runtime.NetworkPolicy + networkPolicy, ok := resource.(*runtime.NetworkPolicy) + if !ok { + return runtime.ErrInvalidResource + } + return k.updateNetworkPolicy(networkPolicy) + case runtime.TypeService: - // get the existing deployments - depList := new(client.DeploymentList) - d := &client.Resource{ - Kind: "deployment", - Value: depList, - } - depOpts := []client.GetOption{ - client.GetNamespace(options.Namespace), - client.GetLabels(labels), - } - if err := k.client.Get(d, depOpts...); err != nil { - return err - } else if len(depList.Items) == 0 { - return runtime.ErrNotFound - } - - // update the deployments which match the query - for _, dep := range depList.Items { - // the service wan't created by the k8s runtime - if dep.Metadata == nil || dep.Metadata.Annotations == nil { - continue + // Assert the resource back into a *runtime.Service + s, ok := resource.(*runtime.Service) + if !ok { + return runtime.ErrInvalidResource } - // update metadata - for k, v := range s.Metadata { - dep.Metadata.Annotations[k] = v + // construct the query + labels := map[string]string{} + if len(s.Name) > 0 { + labels["name"] = client.Format(s.Name) + } + if len(s.Version) > 0 { + labels["version"] = client.Format(s.Version) } - // update build time annotation - dep.Spec.Template.Metadata.Annotations["updated"] = fmt.Sprintf("%d", time.Now().Unix()) - - // update the deployment - res := &client.Resource{ + // get the existing deployments + depList := new(client.DeploymentList) + d := &client.Resource{ Kind: "deployment", - Name: resourceName(s), - Value: &dep, + Value: depList, } - if err := k.client.Update(res, client.UpdateNamespace(options.Namespace)); err != nil { - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Errorf("Runtime failed to update deployment: %v", err) - } + depOpts := []client.GetOption{ + client.GetNamespace(options.Namespace), + client.GetLabels(labels), + } + if err := k.client.Get(d, depOpts...); err != nil { return err + } else if len(depList.Items) == 0 { + return runtime.ErrNotFound } - } - return nil + // update the deployments which match the query + for _, dep := range depList.Items { + // the service wan't created by the k8s runtime + if dep.Metadata == nil || dep.Metadata.Annotations == nil { + continue + } + + // update metadata + for k, v := range s.Metadata { + dep.Metadata.Annotations[k] = v + } + + // update build time annotation + dep.Spec.Template.Metadata.Annotations["updated"] = fmt.Sprintf("%d", time.Now().Unix()) + + // update the deployment + res := &client.Resource{ + Kind: "deployment", + Name: resourceName(s), + Value: &dep, + } + if err := k.client.Update(res, client.UpdateNamespace(options.Namespace)); err != nil { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Runtime failed to update deployment: %v", err) + } + return err + } + } + + return nil + default: + return runtime.ErrInvalidResource + } } -// Delete removes a service -func (k *kubernetes) Delete(s *runtime.Service, opts ...runtime.DeleteOption) error { +// Delete removes a resource +func (k *kubernetes) Delete(resource runtime.Resource, opts ...runtime.DeleteOption) error { k.Lock() defer k.Unlock() - // parse the options options := runtime.DeleteOptions{ Namespace: client.DefaultNamespace, } @@ -263,51 +335,78 @@ func (k *kubernetes) Delete(s *runtime.Service, opts ...runtime.DeleteOption) er o(&options) } - // delete the deployment - dep := client.NewDeployment(s, &runtime.CreateOptions{ - Type: k.options.Type, - Namespace: options.Namespace, - }) - if err := k.client.Delete(dep, client.DeleteNamespace(options.Namespace)); err != nil { - if err == api.ErrNotFound { - return runtime.ErrNotFound + // Handle the various different types of resources: + switch resource.Type() { + case runtime.TypeNamespace: + // Assert the resource back into a *runtime.Namespace + namespace, ok := resource.(*runtime.Namespace) + if !ok { + return runtime.ErrInvalidResource } - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Errorf("Runtime failed to delete deployment: %v", err) + return k.deleteNamespace(namespace) + case runtime.TypeNetworkPolicy: + // Assert the resource back into a *runtime.NetworkPolicy + networkPolicy, ok := resource.(*runtime.NetworkPolicy) + if !ok { + return runtime.ErrInvalidResource } - return err - } + return k.deleteNetworkPolicy(networkPolicy) + case runtime.TypeService: - // delete the credentials - if err := k.deleteCredentials(s, &runtime.CreateOptions{Namespace: options.Namespace}); err != nil { - return err - } - - // if there are more deployments for this service, then don't delete it - labels := map[string]string{} - if len(s.Name) > 0 { - labels["name"] = client.Format(s.Name) - } - - // get the existing services. todo: refactor to just get the deployments - services, err := k.getServices(client.GetNamespace(options.Namespace), client.GetLabels(labels)) - if err != nil || len(services) > 0 { - return err - } - - // delete the service - srv := client.NewService(s, &runtime.CreateOptions{ - Type: k.options.Type, - Namespace: options.Namespace, - }) - if err := k.client.Delete(srv, client.DeleteNamespace(options.Namespace)); err != nil { - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Errorf("Runtime failed to delete service: %v", err) + // Assert the resource back into a *runtime.Service + s, ok := resource.(*runtime.Service) + if !ok { + return runtime.ErrInvalidResource } - return err - } - return nil + // delete the deployment + dep := client.NewDeployment(s, &runtime.CreateOptions{ + Type: k.options.Type, + Namespace: options.Namespace, + }) + if err := k.client.Delete(dep, client.DeleteNamespace(options.Namespace)); err != nil { + if err == api.ErrNotFound { + return runtime.ErrNotFound + } + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Runtime failed to delete deployment: %v", err) + } + return err + } + + // delete the credentials + if err := k.deleteCredentials(s, &runtime.CreateOptions{Namespace: options.Namespace}); err != nil { + return err + } + + // if there are more deployments for this service, then don't delete it + labels := map[string]string{} + if len(s.Name) > 0 { + labels["name"] = client.Format(s.Name) + } + + // get the existing services. todo: refactor to just get the deployments + services, err := k.getServices(client.GetNamespace(options.Namespace), client.GetLabels(labels)) + if err != nil || len(services) > 0 { + return err + } + + // delete the service + srv := client.NewService(s, &runtime.CreateOptions{ + Type: k.options.Type, + Namespace: options.Namespace, + }) + if err := k.client.Delete(srv, client.DeleteNamespace(options.Namespace)); err != nil { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Runtime failed to delete service: %v", err) + } + return err + } + + return nil + default: + return runtime.ErrInvalidResource + } } // Start starts the runtime diff --git a/runtime/kubernetes/kubernetes_test.go b/runtime/kubernetes/kubernetes_test.go index 131935da..763aba6c 100644 --- a/runtime/kubernetes/kubernetes_test.go +++ b/runtime/kubernetes/kubernetes_test.go @@ -11,6 +11,9 @@ import ( "regexp" "strings" "testing" + + "github.com/micro/go-micro/v3/runtime" + "github.com/stretchr/testify/assert" ) func setupClient(t *testing.T) { @@ -47,19 +50,45 @@ func setupClient(t *testing.T) { func TestNamespaceCreateDelete(t *testing.T) { defer func() { + exec.Command("kubectl", "-n", "foobar", "delete", "networkpolicy", "baz").Run() exec.Command("kubectl", "delete", "namespace", "foobar").Run() }() setupClient(t) r := NewRuntime() - if err := r.CreateNamespace("foobar"); err != nil { - t.Fatalf("Unexpected error creating namespace %s", err) + + // Create a namespace + testNamespace, err := runtime.NewNamespace("foobar") + assert.NoError(t, err) + if err := r.Create(testNamespace); err != nil { + t.Fatalf("Unexpected error creating namespace: %v", err) } + // Check that the namespace exists if !namespaceExists(t, "foobar") { t.Fatalf("Namespace foobar not found") } - if err := r.DeleteNamespace("foobar"); err != nil { - t.Fatalf("Unexpected error deleting namespace %s", err) + + // Create a networkpolicy: + testNetworkPolicy, err := runtime.NewNetworkPolicy("baz", "foobar", nil) + assert.NoError(t, err) + if err := r.Create(testNetworkPolicy); err != nil { + t.Fatalf("Unexpected error creating networkpolicy: %v", err) + } + + // Check that the networkpolicy exists: + if !networkPolicyExists(t, "foobar", "baz") { + t.Fatalf("NetworkPolicy foobar.baz not found") + } + + // Tidy up + if err := r.Delete(testNetworkPolicy); err != nil { + t.Fatalf("Unexpected error deleting networkpolicy: %v", err) + } + if networkPolicyExists(t, "foobar", "baz") { + t.Fatalf("NetworkPolicy foobar.baz still exists") + } + if err := r.Delete(testNamespace); err != nil { + t.Fatalf("Unexpected error deleting namespace: %v", err) } if namespaceExists(t, "foobar") { t.Fatalf("Namespace foobar still exists") @@ -77,5 +106,17 @@ func namespaceExists(t *testing.T, ns string) bool { t.Fatalf("Error listing namespaces %s", err) } return exists - +} + +func networkPolicyExists(t *testing.T, ns, np string) bool { + cmd := exec.Command("kubectl", "-n", ns, "get", "networkpolicy") + outp, err := cmd.Output() + if err != nil { + t.Fatalf("Unexpected error listing networkpolicies %s", err) + } + exists, err := regexp.Match(np, outp) + if err != nil { + t.Fatalf("Error listing networkpolicies %s", err) + } + return exists } diff --git a/runtime/kubernetes/namespace.go b/runtime/kubernetes/namespace.go index 9a57a633..43cfa742 100644 --- a/runtime/kubernetes/namespace.go +++ b/runtime/kubernetes/namespace.go @@ -4,6 +4,7 @@ import ( "strings" "github.com/micro/go-micro/v3/logger" + "github.com/micro/go-micro/v3/runtime" "github.com/micro/go-micro/v3/util/kubernetes/client" ) @@ -24,7 +25,7 @@ func (k *kubernetes) ensureNamepaceExists(ns string) error { return err } - if err := k.createNamespace(namespace); err != nil { + if err := k.autoCreateNamespace(namespace); err != nil { if logger.V(logger.WarnLevel, logger.DefaultLogger) { logger.Warnf("Error creating namespace %v: %v", namespace, err) } @@ -64,8 +65,8 @@ func (k *kubernetes) namespaceExists(name string) (bool, error) { return false, nil } -// createNamespace creates a new k8s namespace -func (k *kubernetes) createNamespace(namespace string) error { +// autoCreateNamespace creates a new k8s namespace +func (k *kubernetes) autoCreateNamespace(namespace string) error { ns := client.Namespace{Metadata: &client.Metadata{Name: namespace}} err := k.client.Create(&client.Resource{Kind: "namespace", Value: ns}) @@ -75,38 +76,54 @@ func (k *kubernetes) createNamespace(namespace string) error { err = nil } - // add to cache + // add to cache and create networkpolicy if err == nil && k.namespaces != nil { k.namespaces = append(k.namespaces, ns) + + if networkPolicy, err := runtime.NewNetworkPolicy("ingress", namespace, map[string]string{"owner": "micro"}); err != nil { + return err + } else { + return k.create(networkPolicy) + } } return err } -func (k *kubernetes) CreateNamespace(ns string) error { +// createNamespace creates a namespace resource +func (k *kubernetes) createNamespace(namespace *runtime.Namespace) error { err := k.client.Create(&client.Resource{ Kind: "namespace", + Name: namespace.Name, Value: client.Namespace{ Metadata: &client.Metadata{ - Name: ns, + Name: namespace.Name, }, }, - }) + }, client.CreateNamespace(namespace.Name)) if err != nil { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Errorf("Error creating namespace %v: %v", ns, err) + logger.Errorf("Error creating namespace %s: %v", namespace.String(), err) } } return err } -func (k *kubernetes) DeleteNamespace(ns string) error { +// deleteNamespace deletes a namespace resource +func (k *kubernetes) deleteNamespace(namespace *runtime.Namespace) error { err := k.client.Delete(&client.Resource{ Kind: "namespace", - Name: ns, - }) - if err != nil && logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Errorf("Error deleting namespace %v: %v", ns, err) + Name: namespace.Name, + Value: client.Namespace{ + Metadata: &client.Metadata{ + Name: namespace.Name, + }, + }, + }, client.DeleteNamespace(namespace.Name)) + if err != nil { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Error deleting namespace %s: %v", namespace.String(), err) + } } return err } diff --git a/runtime/kubernetes/networkpolicy.go b/runtime/kubernetes/networkpolicy.go new file mode 100644 index 00000000..85af938e --- /dev/null +++ b/runtime/kubernetes/networkpolicy.go @@ -0,0 +1,67 @@ +package kubernetes + +import ( + "github.com/micro/go-micro/v3/logger" + "github.com/micro/go-micro/v3/runtime" + "github.com/micro/go-micro/v3/util/kubernetes/client" +) + +// createNetworkPolicy creates a networkpolicy resource +func (k *kubernetes) createNetworkPolicy(networkPolicy *runtime.NetworkPolicy) error { + err := k.client.Create(&client.Resource{ + Kind: "networkpolicy", + Value: client.NetworkPolicy{ + AllowedLabels: networkPolicy.AllowedLabels, + Metadata: &client.Metadata{ + Name: networkPolicy.Name, + Namespace: networkPolicy.Namespace, + }, + }, + }, client.CreateNamespace(networkPolicy.Namespace)) + if err != nil { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Error creating resource %s: %v", networkPolicy.String(), err) + } + } + return err +} + +// updateNetworkPolicy updates a networkpolicy resource in-place +func (k *kubernetes) updateNetworkPolicy(networkPolicy *runtime.NetworkPolicy) error { + err := k.client.Update(&client.Resource{ + Kind: "networkpolicy", + Value: client.NetworkPolicy{ + AllowedLabels: networkPolicy.AllowedLabels, + Metadata: &client.Metadata{ + Name: networkPolicy.Name, + Namespace: networkPolicy.Namespace, + }, + }, + }, client.UpdateNamespace(networkPolicy.Namespace)) + if err != nil { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Error updating resource %s: %v", networkPolicy.String(), err) + } + } + return err +} + +// deleteNetworkPolicy deletes a networkpolicy resource +func (k *kubernetes) deleteNetworkPolicy(networkPolicy *runtime.NetworkPolicy) error { + err := k.client.Delete(&client.Resource{ + Kind: "networkpolicy", + Value: client.NetworkPolicy{ + AllowedLabels: networkPolicy.AllowedLabels, + Metadata: &client.Metadata{ + Name: networkPolicy.Name, + Namespace: networkPolicy.Namespace, + }, + }, + }, client.DeleteNamespace(networkPolicy.Namespace)) + if err != nil { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Error deleting resource %s: %v", networkPolicy.String(), err) + } + } + return err +} diff --git a/runtime/kubernetes/test/test.yaml b/runtime/kubernetes/test/test.yaml index bbd2f3c6..b1551f58 100644 --- a/runtime/kubernetes/test/test.yaml +++ b/runtime/kubernetes/test/test.yaml @@ -45,6 +45,20 @@ rules: - get - watch - list + - apiGroups: + - "networking.k8s.io" + resources: + - networkpolicy + - networkpolicies + verbs: + - get + - create + - update + - delete + - deletecollection + - list + - patch + - watch --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/runtime/local/local.go b/runtime/local/local.go index 05ba0d7c..0af2b356 100644 --- a/runtime/local/local.go +++ b/runtime/local/local.go @@ -81,7 +81,7 @@ func serviceKey(s *runtime.Service) string { } // Create creates a new service which is then started by runtime -func (r *localRuntime) Create(s *runtime.Service, opts ...runtime.CreateOption) error { +func (r *localRuntime) Create(resource runtime.Resource, opts ...runtime.CreateOption) error { var options runtime.CreateOptions for _, o := range opts { o(&options) @@ -90,55 +90,74 @@ func (r *localRuntime) Create(s *runtime.Service, opts ...runtime.CreateOption) r.Lock() defer r.Unlock() - if len(options.Namespace) == 0 { - options.Namespace = defaultNamespace - } - if len(options.Entrypoint) > 0 { - s.Source = filepath.Join(s.Source, options.Entrypoint) - } - if len(options.Command) == 0 { - ep, err := Entrypoint(s.Source) - if err != nil { - return err + // Handle the various different types of resources: + switch resource.Type() { + case runtime.TypeNamespace: + // noop (Namespace is not supported by local) + return nil + case runtime.TypeNetworkPolicy: + // noop (NetworkPolicy is not supported by local) + return nil + case runtime.TypeService: + + // Assert the resource back into a *runtime.Service + s, ok := resource.(*runtime.Service) + if !ok { + return runtime.ErrInvalidResource } - options.Command = []string{"go"} - options.Args = []string{"run", ep} - } + if len(options.Namespace) == 0 { + options.Namespace = defaultNamespace + } + if len(options.Entrypoint) > 0 { + s.Source = filepath.Join(s.Source, options.Entrypoint) + } + if len(options.Command) == 0 { + ep, err := Entrypoint(s.Source) + if err != nil { + return err + } - // pass secrets as env vars - for key, value := range options.Secrets { - options.Env = append(options.Env, fmt.Sprintf("%v=%v", key, value)) - } + options.Command = []string{"go"} + options.Args = []string{"run", ep} + } - if _, ok := r.namespaces[options.Namespace]; !ok { - r.namespaces[options.Namespace] = make(map[string]*service) - } - if _, ok := r.namespaces[options.Namespace][serviceKey(s)]; ok { - return runtime.ErrAlreadyExists - } + // pass secrets as env vars + for key, value := range options.Secrets { + options.Env = append(options.Env, fmt.Sprintf("%v=%v", key, value)) + } - // create new service - service := newService(s, options) + if _, ok := r.namespaces[options.Namespace]; !ok { + r.namespaces[options.Namespace] = make(map[string]*service) + } + if _, ok := r.namespaces[options.Namespace][serviceKey(s)]; ok { + return runtime.ErrAlreadyExists + } - f, err := os.OpenFile(logFile(service.Name), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - log.Fatal(err) - } + // create new service + service := newService(s, options) - if service.output != nil { - service.output = io.MultiWriter(service.output, f) - } else { - service.output = f - } - // start the service - if err := service.Start(); err != nil { - return err - } - // save service - r.namespaces[options.Namespace][serviceKey(s)] = service + f, err := os.OpenFile(logFile(service.Name), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + log.Fatal(err) + } - return nil + if service.output != nil { + service.output = io.MultiWriter(service.output, f) + } else { + service.output = f + } + // start the service + if err := service.Start(); err != nil { + return err + } + // save service + r.namespaces[options.Namespace][serviceKey(s)] = service + + return nil + default: + return runtime.ErrInvalidResource + } } // exists returns whether the given file or directory exists @@ -157,65 +176,85 @@ func exists(path string) (bool, error) { // The reason for this is because it's hard to calculate line offset // as opposed to character offset. // This logger streams by default and only supports the `StreamCount` option. -func (r *localRuntime) Logs(s *runtime.Service, options ...runtime.LogsOption) (runtime.Logs, error) { +func (r *localRuntime) Logs(resource runtime.Resource, options ...runtime.LogsOption) (runtime.Logs, error) { lopts := runtime.LogsOptions{} for _, o := range options { o(&lopts) } - ret := &logStream{ - service: s.Name, - stream: make(chan runtime.Log), - stop: make(chan bool), - } - fpath := logFile(s.Name) - if ex, err := exists(fpath); err != nil { - return nil, err - } else if !ex { - return nil, fmt.Errorf("Logs not found for service %s", s.Name) - } + // Handle the various different types of resources: + switch resource.Type() { + case runtime.TypeNamespace: + // noop (Namespace is not supported by local) + return nil, nil + case runtime.TypeNetworkPolicy: + // noop (NetworkPolicy is not supported by local) + return nil, nil + case runtime.TypeService: - // have to check file size to avoid too big of a seek - fi, err := os.Stat(fpath) - if err != nil { - return nil, err - } - size := fi.Size() - - whence := 2 - // Multiply by length of an average line of log in bytes - offset := lopts.Count * 200 - - if offset > size { - offset = size - } - offset *= -1 - - t, err := tail.TailFile(fpath, tail.Config{Follow: lopts.Stream, Location: &tail.SeekInfo{ - Whence: whence, - Offset: int64(offset), - }, Logger: tail.DiscardingLogger}) - if err != nil { - return nil, err - } - - ret.tail = t - go func() { - for { - select { - case line, ok := <-t.Lines: - if !ok { - ret.Stop() - return - } - ret.stream <- runtime.Log{Message: line.Text} - case <-ret.stop: - return - } + // Assert the resource back into a *runtime.Service + s, ok := resource.(*runtime.Service) + if !ok { + return nil, runtime.ErrInvalidResource } - }() - return ret, nil + ret := &logStream{ + service: s.Name, + stream: make(chan runtime.Log), + stop: make(chan bool), + } + + fpath := logFile(s.Name) + if ex, err := exists(fpath); err != nil { + return nil, err + } else if !ex { + return nil, fmt.Errorf("Logs not found for service %s", s.Name) + } + + // have to check file size to avoid too big of a seek + fi, err := os.Stat(fpath) + if err != nil { + return nil, err + } + size := fi.Size() + + whence := 2 + // Multiply by length of an average line of log in bytes + offset := lopts.Count * 200 + + if offset > size { + offset = size + } + offset *= -1 + + t, err := tail.TailFile(fpath, tail.Config{Follow: lopts.Stream, Location: &tail.SeekInfo{ + Whence: whence, + Offset: int64(offset), + }, Logger: tail.DiscardingLogger}) + if err != nil { + return nil, err + } + + ret.tail = t + go func() { + for { + select { + case line, ok := <-t.Lines: + if !ok { + ret.Stop() + return + } + ret.stream <- runtime.Log{Message: line.Text} + case <-ret.stop: + return + } + } + + }() + return ret, nil + default: + return nil, runtime.ErrInvalidResource + } } type logStream struct { @@ -298,84 +337,126 @@ func (r *localRuntime) Read(opts ...runtime.ReadOption) ([]*runtime.Service, err } // Update attempts to update the service -func (r *localRuntime) Update(s *runtime.Service, opts ...runtime.UpdateOption) error { +func (r *localRuntime) Update(resource runtime.Resource, opts ...runtime.UpdateOption) error { var options runtime.UpdateOptions for _, o := range opts { o(&options) } - if len(options.Namespace) == 0 { - options.Namespace = defaultNamespace - } - if len(options.Entrypoint) > 0 { - s.Source = filepath.Join(s.Source, options.Entrypoint) - } - r.Lock() - srvs, ok := r.namespaces[options.Namespace] - r.Unlock() - if !ok { - return errors.New("Service not found") - } + // Handle the various different types of resources: + switch resource.Type() { + case runtime.TypeNamespace: + // noop (Namespace is not supported by local) + return nil + case runtime.TypeNetworkPolicy: + // noop (NetworkPolicy is not supported by local) + return nil + case runtime.TypeService: - r.Lock() - service, ok := srvs[serviceKey(s)] - r.Unlock() - if !ok { - return errors.New("Service not found") - } + // Assert the resource back into a *runtime.Service + s, ok := resource.(*runtime.Service) + if !ok { + return runtime.ErrInvalidResource + } - if err := service.Stop(); err != nil && err.Error() != "no such process" { - logger.Errorf("Error stopping service %s: %s", service.Name, err) - return err - } + if len(options.Entrypoint) > 0 { + s.Source = filepath.Join(s.Source, options.Entrypoint) + } - // update the source to the new location and restart the service - service.Source = s.Source - service.Exec.Dir = s.Source - return service.Start() + if len(options.Namespace) == 0 { + options.Namespace = defaultNamespace + } + + r.Lock() + srvs, ok := r.namespaces[options.Namespace] + r.Unlock() + if !ok { + return errors.New("Service not found") + } + + r.Lock() + service, ok := srvs[serviceKey(s)] + r.Unlock() + if !ok { + return errors.New("Service not found") + } + + if err := service.Stop(); err != nil && err.Error() != "no such process" { + logger.Errorf("Error stopping service %s: %s", service.Name, err) + return err + } + + // update the source to the new location and restart the service + service.Source = s.Source + service.Exec.Dir = s.Source + return service.Start() + + default: + return runtime.ErrInvalidResource + } } // Delete removes the service from the runtime and stops it -func (r *localRuntime) Delete(s *runtime.Service, opts ...runtime.DeleteOption) error { - r.Lock() - defer r.Unlock() +func (r *localRuntime) Delete(resource runtime.Resource, opts ...runtime.DeleteOption) error { - var options runtime.DeleteOptions - for _, o := range opts { - o(&options) - } - if len(options.Namespace) == 0 { - options.Namespace = defaultNamespace - } - - srvs, ok := r.namespaces[options.Namespace] - if !ok { + // Handle the various different types of resources: + switch resource.Type() { + case runtime.TypeNamespace: + // noop (Namespace is not supported by local) return nil - } - - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime deleting service %s", s.Name) - } - - service, ok := srvs[serviceKey(s)] - if !ok { + case runtime.TypeNetworkPolicy: + // noop (NetworkPolicy is not supported by local) return nil - } + case runtime.TypeService: - // check if running - if !service.Running() { + // Assert the resource back into a *runtime.Service + s, ok := resource.(*runtime.Service) + if !ok { + return runtime.ErrInvalidResource + } + + r.Lock() + defer r.Unlock() + + var options runtime.DeleteOptions + for _, o := range opts { + o(&options) + } + if len(options.Namespace) == 0 { + options.Namespace = defaultNamespace + } + + srvs, ok := r.namespaces[options.Namespace] + if !ok { + return nil + } + + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Runtime deleting service %s", s.Name) + } + + service, ok := srvs[serviceKey(s)] + if !ok { + return nil + } + + // check if running + if !service.Running() { + delete(srvs, service.key()) + r.namespaces[options.Namespace] = srvs + return nil + } + // otherwise stop it + if err := service.Stop(); err != nil { + return err + } + // delete it delete(srvs, service.key()) r.namespaces[options.Namespace] = srvs return nil + default: + return runtime.ErrInvalidResource } - // otherwise stop it - if err := service.Stop(); err != nil { - return err - } - // delete it - delete(srvs, service.key()) - r.namespaces[options.Namespace] = srvs - return nil } // Start starts the runtime @@ -461,13 +542,3 @@ func Entrypoint(dir string) (string, error) { return "", errors.New("More than one entrypoint found") } } - -func (r *localRuntime) CreateNamespace(ns string) error { - // noop - return nil -} - -func (r *localRuntime) DeleteNamespace(ns string) error { - // noop - return nil -} diff --git a/runtime/resource.go b/runtime/resource.go new file mode 100644 index 00000000..5250b7a7 --- /dev/null +++ b/runtime/resource.go @@ -0,0 +1,117 @@ +// Package runtime is a service runtime manager +package runtime + +import "fmt" + +const ( + TypeNamespace = "namespace" + TypeNetworkPolicy = "networkpolicy" + TypeService = "service" +) + +// Resource represents any resource handled by runtime +type Resource interface { + String() string + Type() string +} + +// Namespace represents a logical namespace for organising resources +type Namespace struct { + // Name of the namespace + Name string +} + +// NewNamespace mints a new namespace +func NewNamespace(name string) (*Namespace, error) { + if name == "" { + return nil, ErrInvalidResource + } + return &Namespace{ + Name: name, + }, nil +} + +// String implements Resource +func (r *Namespace) String() string { + return r.Name +} + +// Type implements Resource +func (*Namespace) Type() string { + return TypeNamespace +} + +// NetworkPolicy represents an ACL of label pairs allowing ignress to a namespace +type NetworkPolicy struct { + // The labels allowed ingress by this policy + AllowedLabels map[string]string + // Name of the network policy + Name string + // Namespace the network policy belongs to + Namespace string +} + +// NewNetworkPolicy mints a new networkpolicy +func NewNetworkPolicy(name, namespace string, allowedLabels map[string]string) (*NetworkPolicy, error) { + if name == "" || namespace == "" { + return nil, ErrInvalidResource + } + if allowedLabels == nil { + allowedLabels = map[string]string{ + "origin": "micro", + } + } + return &NetworkPolicy{ + AllowedLabels: allowedLabels, + Name: name, + Namespace: namespace, + }, nil +} + +// String implements Resource +func (r *NetworkPolicy) String() string { + return fmt.Sprintf("%s.%s", r.Namespace, r.Name) +} + +// Type implements Resource +func (*NetworkPolicy) Type() string { + return TypeNetworkPolicy +} + +// Service represents a Micro service running within a namespace +type Service struct { + // Name of the service + Name string + // Version of the service + Version string + // url location of source + Source string + // Metadata stores metadata + Metadata map[string]string + // Status of the service + Status ServiceStatus +} + +// NewService mints a new service +func NewService(name, version string) (*Service, error) { + if name == "" { + return nil, ErrInvalidResource + } + if version == "" { + version = "latest" + } + return &Service{ + Name: name, + Version: version, + }, nil +} + +// String implements Resource +func (r *Service) String() string { + return fmt.Sprintf("service://%s@%s:%s", r.Metadata["namespace"], r.Name, r.Version) +} + +// Type implements Resource +func (*Service) Type() string { + return TypeService +} diff --git a/runtime/resource_test.go b/runtime/resource_test.go new file mode 100644 index 00000000..cf09b996 --- /dev/null +++ b/runtime/resource_test.go @@ -0,0 +1,65 @@ +package runtime + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestResources(t *testing.T) { + + // Namespace: + assert.Equal(t, TypeNamespace, new(Namespace).Type()) + namespace, err := NewNamespace("") + assert.Error(t, err) + assert.Equal(t, ErrInvalidResource, err) + assert.Nil(t, namespace) + + namespace, err = NewNamespace("test-namespace") + assert.NoError(t, err) + assert.NotNil(t, namespace) + assert.Equal(t, TypeNamespace, namespace.Type()) + assert.Equal(t, "test-namespace", namespace.String()) + + // NetworkPolicy: + assert.Equal(t, TypeNetworkPolicy, new(NetworkPolicy).Type()) + networkPolicy, err := NewNetworkPolicy("", "", nil) + assert.Error(t, err) + assert.Equal(t, ErrInvalidResource, err) + assert.Nil(t, networkPolicy) + + networkPolicy, err = NewNetworkPolicy("test", "", nil) + assert.Error(t, err) + assert.Equal(t, ErrInvalidResource, err) + assert.Nil(t, networkPolicy) + + networkPolicy, err = NewNetworkPolicy("", "test", nil) + assert.Error(t, err) + assert.Equal(t, ErrInvalidResource, err) + assert.Nil(t, networkPolicy) + + networkPolicy, err = NewNetworkPolicy("ingress", "test", nil) + assert.NoError(t, err) + assert.NotNil(t, networkPolicy) + assert.Equal(t, TypeNetworkPolicy, networkPolicy.Type()) + assert.Equal(t, "test.ingress", networkPolicy.String()) + assert.Len(t, networkPolicy.AllowedLabels, 1) + + networkPolicy, err = NewNetworkPolicy("ingress", "test", map[string]string{"foo": "bar", "bar": "foo"}) + assert.Len(t, networkPolicy.AllowedLabels, 2) + + // Service: + assert.Equal(t, TypeService, new(Service).Type()) + service, err := NewService("", "") + assert.Error(t, err) + assert.Equal(t, ErrInvalidResource, err) + assert.Nil(t, service) + + service, err = NewService("test-service", "oldest") + service.Metadata = map[string]string{"namespace": "testing"} + assert.NoError(t, err) + assert.NotNil(t, service) + assert.Equal(t, TypeService, service.Type()) + assert.Equal(t, "service://testing@test-service:oldest", service.String()) + assert.Equal(t, "oldest", service.Version) +} diff --git a/runtime/runtime.go b/runtime/runtime.go index 61aceedb..107ca4ea 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -7,28 +7,25 @@ import ( ) var ( - ErrAlreadyExists = errors.New("already exists") - ErrNotFound = errors.New("not found") + ErrAlreadyExists = errors.New("already exists") + ErrInvalidResource = errors.New("invalid resource") + ErrNotFound = errors.New("not found") ) // Runtime is a service runtime manager type Runtime interface { // Init initializes runtime Init(...Option) error - // CreateNamespace creates a new namespace in the runtime - CreateNamespace(string) error - // DeleteNamespace deletes a namespace in the runtime - DeleteNamespace(string) error - // Create registers a service - Create(*Service, ...CreateOption) error - // Read returns the service + // Create a resource + Create(Resource, ...CreateOption) error + // Read a resource Read(...ReadOption) ([]*Service, error) - // Update the service in place - Update(*Service, ...UpdateOption) error - // Remove a service - Delete(*Service, ...DeleteOption) error - // Logs returns the logs for a service - Logs(*Service, ...LogsOption) (Logs, error) + // Update a resource + Update(Resource, ...UpdateOption) error + // Delete a resource + Delete(Resource, ...DeleteOption) error + // Logs returns the logs for a resource + Logs(Resource, ...LogsOption) (Logs, error) // Start starts the runtime Start() error // Stop shuts down the runtime @@ -113,20 +110,6 @@ const ( Error ) -// Service is runtime service -type Service struct { - // Name of the service - Name string - // Version of the service - Version string - // url location of source - Source string - // Metadata stores metadata - Metadata map[string]string - // Status of the service - Status ServiceStatus -} - // Resources which are allocated to a serivce type Resources struct { // CPU is the maximum amount of CPU the service will be allocated (unit millicpu) diff --git a/util/kubernetes/api/request.go b/util/kubernetes/api/request.go index db73f615..5f9fbb3f 100644 --- a/util/kubernetes/api/request.go +++ b/util/kubernetes/api/request.go @@ -168,6 +168,9 @@ func (r *Request) request() (*http.Request, error) { case "deployment": // /apis/apps/v1/namespaces/{namespace}/deployments/{name} url = fmt.Sprintf("%s/apis/apps/v1/namespaces/%s/%ss/", r.host, r.namespace, r.resource) + case "networkpolicy", "networkpolicies": + // /apis/networking.k8s.io/v1/namespaces/{namespace}/networkpolicies + url = fmt.Sprintf("%s/apis/networking.k8s.io/v1/namespaces/%s/networkpolicies/", r.host, r.namespace) default: // /api/v1/namespaces/{namespace}/{resource} url = fmt.Sprintf("%s/api/v1/namespaces/%s/%ss/", r.host, r.namespace, r.resource) diff --git a/util/kubernetes/client/client.go b/util/kubernetes/client/client.go index 261f90fc..9214e766 100644 --- a/util/kubernetes/client/client.go +++ b/util/kubernetes/client/client.go @@ -156,6 +156,8 @@ func (c *client) Update(r *Resource, opts ...UpdateOption) error { req.Body(r.Value.(*Deployment)) case "pod": req.Body(r.Value.(*Pod)) + case "networkpolicy", "networkpolicies": + req.Body(r.Value.(*NetworkPolicy)) default: return errors.New("unsupported resource") } diff --git a/util/kubernetes/client/templates.go b/util/kubernetes/client/templates.go index aeb48618..8d79e8b4 100644 --- a/util/kubernetes/client/templates.go +++ b/util/kubernetes/client/templates.go @@ -1,11 +1,13 @@ package client var templates = map[string]string{ - "deployment": deploymentTmpl, - "service": serviceTmpl, - "namespace": namespaceTmpl, - "secret": secretTmpl, - "serviceaccount": serviceAccountTmpl, + "deployment": deploymentTmpl, + "service": serviceTmpl, + "namespace": namespaceTmpl, + "secret": secretTmpl, + "serviceaccount": serviceAccountTmpl, + "networkpolicies": networkPolicyTmpl, + "networkpolicy": networkPolicyTmpl, } var deploymentTmpl = ` @@ -239,3 +241,31 @@ imagePullSecrets: {{- end }} {{- end }} ` + +var networkPolicyTmpl = ` +apiVersion: networking.k8s.io/v1 +kind: NetworkPolicy +metadata: + name: "{{ .Metadata.Name }}" + namespace: "{{ .Metadata.Namespace }}" + labels: + {{- with .Metadata.Labels }} + {{- range $key, $value := . }} + {{ $key }}: "{{ $value }}" + {{- end }} + {{- end }} +spec: + podSelector: + matchLabels: + ingress: + - from: # Allow pods in this namespace to talk to each other + - podSelector: {} + - from: # Allow pods in the namespaces bearing the specified labels to talk to pods in this namespace: + - namespaceSelector: + matchLabels: + {{- with .AllowedLabels }} + {{- range $key, $value := . }} + {{ $key }}: "{{ $value }}" + {{- end }} + {{- end }} +` diff --git a/util/kubernetes/client/types.go b/util/kubernetes/client/types.go index 66c18882..a5fd77ad 100644 --- a/util/kubernetes/client/types.go +++ b/util/kubernetes/client/types.go @@ -267,3 +267,9 @@ type VolumeMount struct { Name string `json:"name"` MountPath string `json:"mountPath"` } + +// NetworkPolicy is a Kubernetes Namespace +type NetworkPolicy struct { + AllowedLabels map[string]string `json:"allowedLabels,omitempty"` + Metadata *Metadata `json:"metadata,omitempty"` +}