448 lines
11 KiB
Go
448 lines
11 KiB
Go
// Package kubernetes implements the Kubernetes micro runtime.
|
|
package kubernetes
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/micro/go-micro/v3/logger"
|
|
log "github.com/micro/go-micro/v3/logger"
|
|
"github.com/micro/go-micro/v3/runtime"
|
|
"github.com/micro/go-micro/v3/util/kubernetes/api"
|
|
"github.com/micro/go-micro/v3/util/kubernetes/client"
|
|
)
|
|
|
|
// action to take on runtime service
|
|
// action is an integer-backed type naming an action to take on a runtime service.
// NOTE(review): no values of this type are declared or used in the visible
// part of the file — confirm whether it is still needed.
type action int
|
|
|
|
type kubernetes struct {
|
|
sync.Mutex
|
|
// options configure runtime
|
|
options runtime.Options
|
|
// client is kubernetes client
|
|
client client.Client
|
|
// namespaces which exist
|
|
namespaces []client.Namespace
|
|
}
|
|
|
|
// Init initializes runtime options
|
|
func (k *kubernetes) Init(opts ...runtime.Option) error {
|
|
for _, o := range opts {
|
|
o(&k.options)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (k *kubernetes) Logs(resource runtime.Resource, options ...runtime.LogsOption) (runtime.Logs, error) {
|
|
|
|
// Handle the various different types of resources:
|
|
switch resource.Type() {
|
|
case runtime.TypeNamespace:
|
|
// noop (Namespace is not supported by *kubernetes.Logs())
|
|
return nil, nil
|
|
case runtime.TypeNetworkPolicy:
|
|
// noop (NetworkPolicy is not supported by *kubernetes.Logs()))
|
|
return nil, nil
|
|
case runtime.TypeService:
|
|
|
|
// Assert the resource back into a *runtime.Service
|
|
s, ok := resource.(*runtime.Service)
|
|
if !ok {
|
|
return nil, runtime.ErrInvalidResource
|
|
}
|
|
|
|
klo := newLog(k.client, s.Name, options...)
|
|
|
|
if !klo.options.Stream {
|
|
records, err := klo.Read()
|
|
if err != nil {
|
|
log.Errorf("Failed to get logs for service '%v' from k8s: %v", s.Name, err)
|
|
return nil, err
|
|
}
|
|
kstream := &kubeStream{
|
|
stream: make(chan runtime.Log),
|
|
stop: make(chan bool),
|
|
}
|
|
go func() {
|
|
for _, record := range records {
|
|
kstream.Chan() <- record
|
|
}
|
|
kstream.Stop()
|
|
}()
|
|
return kstream, nil
|
|
}
|
|
stream, err := klo.Stream()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return stream, nil
|
|
default:
|
|
return nil, runtime.ErrInvalidResource
|
|
}
|
|
}
|
|
|
|
type kubeStream struct {
|
|
// the k8s log stream
|
|
stream chan runtime.Log
|
|
// the stop chan
|
|
sync.Mutex
|
|
stop chan bool
|
|
err error
|
|
}
|
|
|
|
func (k *kubeStream) Error() error {
|
|
return k.err
|
|
}
|
|
|
|
func (k *kubeStream) Chan() chan runtime.Log {
|
|
return k.stream
|
|
}
|
|
|
|
func (k *kubeStream) Stop() error {
|
|
k.Lock()
|
|
defer k.Unlock()
|
|
select {
|
|
case <-k.stop:
|
|
return nil
|
|
default:
|
|
close(k.stop)
|
|
close(k.stream)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Create a resource
|
|
func (k *kubernetes) Create(resource runtime.Resource, opts ...runtime.CreateOption) error {
|
|
k.Lock()
|
|
defer k.Unlock()
|
|
return k.create(resource, opts...)
|
|
}
|
|
|
|
func (k *kubernetes) create(resource runtime.Resource, opts ...runtime.CreateOption) error {
|
|
// parse the options
|
|
options := &runtime.CreateOptions{
|
|
Type: k.options.Type,
|
|
Image: k.options.Image,
|
|
Namespace: client.DefaultNamespace,
|
|
}
|
|
for _, o := range opts {
|
|
o(options)
|
|
}
|
|
|
|
// Handle the various different types of resources:
|
|
switch resource.Type() {
|
|
case runtime.TypeNamespace:
|
|
// Assert the resource back into a *runtime.Namespace
|
|
namespace, ok := resource.(*runtime.Namespace)
|
|
if !ok {
|
|
return runtime.ErrInvalidResource
|
|
}
|
|
return k.createNamespace(namespace)
|
|
case runtime.TypeNetworkPolicy:
|
|
// Assert the resource back into a *runtime.NetworkPolicy
|
|
networkPolicy, ok := resource.(*runtime.NetworkPolicy)
|
|
if !ok {
|
|
return runtime.ErrInvalidResource
|
|
}
|
|
return k.createNetworkPolicy(networkPolicy)
|
|
case runtime.TypeService:
|
|
|
|
// Assert the resource back into a *runtime.Service
|
|
s, ok := resource.(*runtime.Service)
|
|
if !ok {
|
|
return runtime.ErrInvalidResource
|
|
}
|
|
|
|
// default the service's source and version
|
|
if len(s.Source) == 0 {
|
|
s.Source = k.options.Source
|
|
}
|
|
if len(s.Version) == 0 {
|
|
s.Version = "latest"
|
|
}
|
|
|
|
// ensure the namespace exists
|
|
if err := k.ensureNamepaceExists(options.Namespace); err != nil {
|
|
return nil
|
|
}
|
|
|
|
// create a secret for the deployment
|
|
if err := k.createCredentials(s, options); err != nil {
|
|
return err
|
|
}
|
|
|
|
// create the deployment
|
|
if err := k.client.Create(client.NewDeployment(s, options), client.CreateNamespace(options.Namespace)); err != nil {
|
|
if parseError(err).Reason == "AlreadyExists" {
|
|
return runtime.ErrAlreadyExists
|
|
}
|
|
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
|
logger.Errorf("Runtime failed to create deployment: %v", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// create the service, one could already exist for another version so ignore ErrAlreadyExists
|
|
if err := k.client.Create(client.NewService(s, options), client.CreateNamespace(options.Namespace)); err != nil {
|
|
if parseError(err).Reason == "AlreadyExists" {
|
|
return nil
|
|
}
|
|
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
|
logger.Errorf("Runtime failed to create service: %v", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
default:
|
|
return runtime.ErrInvalidResource
|
|
}
|
|
}
|
|
|
|
// Read returns all instances of given service
|
|
func (k *kubernetes) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error) {
|
|
k.Lock()
|
|
defer k.Unlock()
|
|
|
|
// parse the options
|
|
options := runtime.ReadOptions{
|
|
Namespace: client.DefaultNamespace,
|
|
}
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
// construct the query
|
|
labels := map[string]string{}
|
|
if len(options.Service) > 0 {
|
|
labels["name"] = client.Format(options.Service)
|
|
}
|
|
if len(options.Version) > 0 {
|
|
labels["version"] = client.Format(options.Version)
|
|
}
|
|
if len(options.Type) > 0 {
|
|
labels["micro"] = client.Format(options.Type)
|
|
}
|
|
|
|
// lookup all the serivces which match this query, if one service has two different versions,
|
|
// they'll be returned as two seperate resullts
|
|
return k.getServices(client.GetNamespace(options.Namespace), client.GetLabels(labels))
|
|
}
|
|
|
|
// Update a resource in place
|
|
func (k *kubernetes) Update(resource runtime.Resource, opts ...runtime.UpdateOption) error {
|
|
k.Lock()
|
|
defer k.Unlock()
|
|
|
|
// parse the options
|
|
options := runtime.UpdateOptions{
|
|
Namespace: client.DefaultNamespace,
|
|
}
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
// Handle the various different types of resources:
|
|
switch resource.Type() {
|
|
case runtime.TypeNamespace:
|
|
// noop (Namespace is not supported by *kubernetes.Update())
|
|
return nil
|
|
case runtime.TypeNetworkPolicy:
|
|
// Assert the resource back into a *runtime.NetworkPolicy
|
|
networkPolicy, ok := resource.(*runtime.NetworkPolicy)
|
|
if !ok {
|
|
return runtime.ErrInvalidResource
|
|
}
|
|
return k.updateNetworkPolicy(networkPolicy)
|
|
case runtime.TypeService:
|
|
|
|
// Assert the resource back into a *runtime.Service
|
|
s, ok := resource.(*runtime.Service)
|
|
if !ok {
|
|
return runtime.ErrInvalidResource
|
|
}
|
|
|
|
// construct the query
|
|
labels := map[string]string{}
|
|
if len(s.Name) > 0 {
|
|
labels["name"] = client.Format(s.Name)
|
|
}
|
|
if len(s.Version) > 0 {
|
|
labels["version"] = client.Format(s.Version)
|
|
}
|
|
|
|
// get the existing deployments
|
|
depList := new(client.DeploymentList)
|
|
d := &client.Resource{
|
|
Kind: "deployment",
|
|
Value: depList,
|
|
}
|
|
depOpts := []client.GetOption{
|
|
client.GetNamespace(options.Namespace),
|
|
client.GetLabels(labels),
|
|
}
|
|
if err := k.client.Get(d, depOpts...); err != nil {
|
|
return err
|
|
} else if len(depList.Items) == 0 {
|
|
return runtime.ErrNotFound
|
|
}
|
|
|
|
// update the deployments which match the query
|
|
for _, dep := range depList.Items {
|
|
// the service wan't created by the k8s runtime
|
|
if dep.Metadata == nil || dep.Metadata.Annotations == nil {
|
|
continue
|
|
}
|
|
|
|
// update metadata
|
|
for k, v := range s.Metadata {
|
|
dep.Metadata.Annotations[k] = v
|
|
}
|
|
|
|
// update build time annotation
|
|
dep.Spec.Template.Metadata.Annotations["updated"] = fmt.Sprintf("%d", time.Now().Unix())
|
|
|
|
// update the deployment
|
|
res := &client.Resource{
|
|
Kind: "deployment",
|
|
Name: resourceName(s),
|
|
Value: &dep,
|
|
}
|
|
if err := k.client.Update(res, client.UpdateNamespace(options.Namespace)); err != nil {
|
|
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
|
logger.Errorf("Runtime failed to update deployment: %v", err)
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
default:
|
|
return runtime.ErrInvalidResource
|
|
}
|
|
}
|
|
|
|
// Delete removes a resource
|
|
func (k *kubernetes) Delete(resource runtime.Resource, opts ...runtime.DeleteOption) error {
|
|
k.Lock()
|
|
defer k.Unlock()
|
|
|
|
options := runtime.DeleteOptions{
|
|
Namespace: client.DefaultNamespace,
|
|
}
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
// Handle the various different types of resources:
|
|
switch resource.Type() {
|
|
case runtime.TypeNamespace:
|
|
// Assert the resource back into a *runtime.Namespace
|
|
namespace, ok := resource.(*runtime.Namespace)
|
|
if !ok {
|
|
return runtime.ErrInvalidResource
|
|
}
|
|
return k.deleteNamespace(namespace)
|
|
case runtime.TypeNetworkPolicy:
|
|
// Assert the resource back into a *runtime.NetworkPolicy
|
|
networkPolicy, ok := resource.(*runtime.NetworkPolicy)
|
|
if !ok {
|
|
return runtime.ErrInvalidResource
|
|
}
|
|
return k.deleteNetworkPolicy(networkPolicy)
|
|
case runtime.TypeService:
|
|
|
|
// Assert the resource back into a *runtime.Service
|
|
s, ok := resource.(*runtime.Service)
|
|
if !ok {
|
|
return runtime.ErrInvalidResource
|
|
}
|
|
|
|
// delete the deployment
|
|
dep := client.NewDeployment(s, &runtime.CreateOptions{
|
|
Type: k.options.Type,
|
|
Namespace: options.Namespace,
|
|
})
|
|
if err := k.client.Delete(dep, client.DeleteNamespace(options.Namespace)); err != nil {
|
|
if err == api.ErrNotFound {
|
|
return runtime.ErrNotFound
|
|
}
|
|
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
|
logger.Errorf("Runtime failed to delete deployment: %v", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// delete the credentials
|
|
if err := k.deleteCredentials(s, &runtime.CreateOptions{Namespace: options.Namespace}); err != nil {
|
|
return err
|
|
}
|
|
|
|
// if there are more deployments for this service, then don't delete it
|
|
labels := map[string]string{}
|
|
if len(s.Name) > 0 {
|
|
labels["name"] = client.Format(s.Name)
|
|
}
|
|
|
|
// get the existing services. todo: refactor to just get the deployments
|
|
services, err := k.getServices(client.GetNamespace(options.Namespace), client.GetLabels(labels))
|
|
if err != nil || len(services) > 0 {
|
|
return err
|
|
}
|
|
|
|
// delete the service
|
|
srv := client.NewService(s, &runtime.CreateOptions{
|
|
Type: k.options.Type,
|
|
Namespace: options.Namespace,
|
|
})
|
|
if err := k.client.Delete(srv, client.DeleteNamespace(options.Namespace)); err != nil {
|
|
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
|
logger.Errorf("Runtime failed to delete service: %v", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
default:
|
|
return runtime.ErrInvalidResource
|
|
}
|
|
}
|
|
|
|
// Start starts the runtime
|
|
func (k *kubernetes) Start() error {
|
|
return nil
|
|
}
|
|
|
|
// Stop shuts down the runtime
|
|
func (k *kubernetes) Stop() error {
|
|
return nil
|
|
}
|
|
|
|
// String implements stringer interface
|
|
func (k *kubernetes) String() string {
|
|
return "kubernetes"
|
|
}
|
|
|
|
// NewRuntime creates new kubernetes runtime
|
|
func NewRuntime(opts ...runtime.Option) runtime.Runtime {
|
|
// get default options
|
|
options := runtime.Options{
|
|
// Create labels with type "micro": "service"
|
|
Type: "service",
|
|
}
|
|
|
|
// apply requested options
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
// kubernetes client
|
|
client := client.NewClusterClient()
|
|
|
|
return &kubernetes{
|
|
options: options,
|
|
client: client,
|
|
}
|
|
}
|