* First commit: outline of K8s runtime package
* Added poller. Added auto-updater into default runtime
* Added build and updated Poller interface
* Added comments and NewRuntime that accepts Options
* DefaultPoller; Runtime options
* First commit to add Kubernetes cruft
* Add comments
* Add micro- prefix to K8s runtime service names
* Get rid of import cycles. Move K8s runtime into main runtime package
* Major refactoring: Poller replaced by Notifier. Poller has been replaced by Notifier, which returns a channel of events that can be consumed and acted upon.
* Added runtime configuration options
* K8s runtime is now Kubernetes runtime in a dedicated pkg. Naming kung-fu.
* Fix typo in command.
* Fixed typo
* Don't delete services when the runtime stops. runtime.Stop stops services; no need to double-stop
* Track runtime services
* Parse Unix timestamps properly
* Added deployments into K8s client. Debug logging
// Package kubernetes implements kubernetes micro runtime
package kubernetes

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/micro/go-micro/runtime"
	"github.com/micro/go-micro/runtime/kubernetes/client"
	"github.com/micro/go-micro/util/log"
)

type kubernetes struct {
	sync.RWMutex
	// options configure the runtime
	options runtime.Options
	// indicates if we're running
	running bool
	// used to start new services
	start chan *runtime.Service
	// used to stop the runtime
	closed chan bool
	// services tracks deployed services
	services map[string]*runtime.Service
	// client is the kubernetes client
	client client.Kubernetes
}

// NewRuntime creates a new kubernetes runtime
func NewRuntime(opts ...runtime.Option) runtime.Runtime {
	// get default options
	options := runtime.Options{}

	// apply requested options
	for _, o := range opts {
		o(&options)
	}

	// kubernetes client
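	// NOTE: the in-cluster client implies the runtime itself is expected
	// to be running inside a kubernetes pod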
	client := client.NewClientInCluster()

	return &kubernetes{
		options:  options,
		closed:   make(chan bool),
		start:    make(chan *runtime.Service, 128),
		services: make(map[string]*runtime.Service),
		client:   client,
	}
}
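
// A minimal usage sketch (illustrative only: the service name is made up,
// and the version is a Unix timestamp string, which is what Update parses):
//
//	r := NewRuntime()
//	svc := &runtime.Service{
//		Name:    "go.micro.srv.greeter",
//		Version: fmt.Sprintf("%d", time.Now().Unix()),
//	}
//	if err := r.Create(svc); err != nil {
//		// handle error
//	}
//	r.Start()
//	defer r.Stop()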

// Init initializes runtime options
func (k *kubernetes) Init(opts ...runtime.Option) error {
	k.Lock()
	defer k.Unlock()

	for _, o := range opts {
		o(&k.options)
	}

	return nil
}

// Create registers a service
func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) error {
	k.Lock()
	defer k.Unlock()

	// TODO:
	// * create service
	// * create deployment

	// NOTE: our services have a micro- prefix
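	// e.g. "go.micro.srv.greeter" becomes "micro-greeter"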
	muName := strings.Split(s.Name, ".")
	s.Name = "micro-" + muName[len(muName)-1]

	// NOTE: we are tracking this in memory for now
	if _, ok := k.services[s.Name]; ok {
		return errors.New("service already registered")
	}

	var options runtime.CreateOptions
	for _, o := range opts {
		o(&options)
	}

	// save service
	k.services[s.Name] = s
	// push into start queue
	k.start <- k.services[s.Name]

	return nil
}

// Delete removes a service
func (k *kubernetes) Delete(s *runtime.Service) error {
	k.Lock()
	defer k.Unlock()

	// TODO:
	// * delete service
	// * delete deployment

	// NOTE: we are tracking this in memory for now
	if _, ok := k.services[s.Name]; ok {
		delete(k.services, s.Name)
		return nil
	}

	return nil
}

// Update updates the service in place
func (k *kubernetes) Update(s *runtime.Service) error {
	type body struct {
		Metadata *client.Metadata `json:"metadata"`
	}
	// parse the version into a human-readable timestamp
	updateTimeStamp, err := strconv.ParseInt(s.Version, 10, 64)
	if err != nil {
		return err
	}
	unixTimeUTC := time.Unix(updateTimeStamp, 0)
	// metadata with which we will PATCH the deployment
	reqBody := body{
		Metadata: &client.Metadata{
			Annotations: map[string]string{
				"build": unixTimeUTC.Format(time.RFC3339),
			},
		},
	}
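	// when marshalled, the PATCH request body looks like
	// (timestamp value illustrative):
	//
	//	{"metadata":{"annotations":{"build":"2019-09-10T12:00:00Z"}}}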
	return k.client.UpdateDeployment(s.Name, reqBody)
}

// List returns the managed services
func (k *kubernetes) List() ([]*runtime.Service, error) {
	// TODO: this should list the k8s deployments
	// but for now we return in-memory tracked services
	var services []*runtime.Service
	k.RLock()
	defer k.RUnlock()

	for _, service := range k.services {
		services = append(services, service)
	}

	return services, nil
}

// run runs the runtime management loop
func (k *kubernetes) run(events <-chan runtime.Event) {
	t := time.NewTicker(time.Second * 5)
	defer t.Stop()

	for {
		select {
		case <-t.C:
			// TODO: noop for now
			// check running services
			// * deployments exist
			// * service is exposed
		case service := <-k.start:
			// TODO: the following might have to be done
			// * create a deployment
			// * expose a service
			log.Debugf("Runtime starting service: %s", service.Name)
		case event := <-events:
			// NOTE: we only handle Update events for now
			log.Debugf("Runtime received notification event: %v", event)
			switch event.Type {
			case runtime.Update:
				// parse the event version into a build timestamp
				updateTimeStamp, err := strconv.ParseInt(event.Version, 10, 64)
				if err != nil {
					log.Debugf("Runtime error parsing update build time: %v", err)
					continue
				}
				buildTime := time.Unix(updateTimeStamp, 0)
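				// processEvent updates a single service, but only when the
				// event build time is newer than the service's current build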
				processEvent := func(event runtime.Event, service *runtime.Service) error {
					buildTimeStamp, err := strconv.ParseInt(service.Version, 10, 64)
					if err != nil {
						return err
					}
					muBuild := time.Unix(buildTimeStamp, 0)
					if buildTime.After(muBuild) {
						version := fmt.Sprintf("%d", buildTime.Unix())
						muService := &runtime.Service{
							Name:    service.Name,
							Source:  service.Source,
							Path:    service.Path,
							Exec:    service.Exec,
							Version: version,
						}
						if err := k.Update(muService); err != nil {
							return err
						}
						service.Version = version
					}
					return nil
				}
				k.Lock()
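				// the lock protects the services map while updates are applied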
				if len(event.Service) > 0 {
					service, ok := k.services[event.Service]
					if !ok {
						log.Debugf("Runtime unknown service: %s", event.Service)
						k.Unlock()
						continue
					}
					if err := processEvent(event, service); err != nil {
						log.Debugf("Runtime error updating service %s: %v", event.Service, err)
					}
					k.Unlock()
					continue
				}
				// if a blank service was received we update all services
				for _, service := range k.services {
					if err := processEvent(event, service); err != nil {
						log.Debugf("Runtime error updating service %s: %v", service.Name, err)
					}
				}
				k.Unlock()
			}
		case <-k.closed:
			log.Debugf("Runtime stopped")
			return
		}
	}
}

// Start starts the runtime
func (k *kubernetes) Start() error {
	k.Lock()
	defer k.Unlock()

	// already running
	if k.running {
		return nil
	}

	// set running
	k.running = true
	k.closed = make(chan bool)

	var events <-chan runtime.Event
	if k.options.Notifier != nil {
		var err error
		events, err = k.options.Notifier.Notify()
		if err != nil {
			// TODO: should we bail here?
			log.Debugf("Runtime failed to start update notifier: %v", err)
		}
	}

	go k.run(events)

	return nil
}

// Stop shuts down the runtime
func (k *kubernetes) Stop() error {
	k.Lock()
	defer k.Unlock()

	if !k.running {
		return nil
	}

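	// the select guards against closing an already-closed channel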
	select {
	case <-k.closed:
		return nil
	default:
		close(k.closed)
		// set not running
		k.running = false
		// stop the notifier too
		if k.options.Notifier != nil {
			return k.options.Notifier.Close()
		}
	}

	return nil
}

// String implements the stringer interface
func (k *kubernetes) String() string {
	return "kubernetes"
}
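
// For reference, a minimal Notifier sketch matching the shape Start consumes
// above (illustrative only; the concrete runtime.Notifier interface is
// defined in the parent runtime package):
//
//	type chanNotifier struct {
//		events chan runtime.Event
//	}
//
//	func (n *chanNotifier) Notify() (<-chan runtime.Event, error) {
//		return n.events, nil
//	}
//
//	func (n *chanNotifier) Close() error {
//		close(n.events)
//		return nil
//	}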