97c1300f53
* Add Get() and GetOptions. * Removed watcher. Outline of client. YAML templates * Added default service and deployment templates and types * Added API tests and cleaned up errors. * Small refactoring. Template package is no more. * Ripped out existing code in preparation to small rework * Reshuffled the source code to make it organized better * Create service and deployment in kubernetes runtime * Major cleanup and refactoring of Kubernetes runtime * Service now handles low level K8s API calls across both K8s deployment an service API objects * Runtime has a task queue that serves for queueing runtime action requests * General refactoring * No need for Lock in k8s service * Added kubernetes runtime env var to default deployment * Enable running different versions of the same service * Can't delete services through labels * Proto cruft. Added runtime.CreateOptions implementation in proto * Removed proxy service from default env variables * Make service name mandatory param to Get method * Get Delete changes from https://github.com/micro/go-micro/pull/945 * Replaced template files with global variables * Validate service names before sending K8s API request * Refactored Kubernetes API client. Fixed typos. * Added client.Resource to make API resources more explicit in code
365 lines
8.1 KiB
Go
365 lines
8.1 KiB
Go
// Package kubernetes implements kubernetes micro runtime
|
|
package kubernetes
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/micro/go-micro/runtime"
|
|
"github.com/micro/go-micro/runtime/kubernetes/client"
|
|
"github.com/micro/go-micro/util/log"
|
|
)
|
|
|
|
// action to take on runtime service
|
|
type action int
|
|
|
|
const (
|
|
start action = iota
|
|
update
|
|
stop
|
|
)
|
|
|
|
// task is queued into runtime queue
|
|
type task struct {
|
|
action action
|
|
service *service
|
|
}
|
|
|
|
type kubernetes struct {
|
|
sync.RWMutex
|
|
// options configure runtime
|
|
options runtime.Options
|
|
// indicates if we're running
|
|
running bool
|
|
// task queue for kubernetes services
|
|
queue chan *task
|
|
// used to stop the runtime
|
|
closed chan bool
|
|
// client is kubernetes client
|
|
client client.Kubernetes
|
|
}
|
|
|
|
// NewRuntime creates new kubernetes runtime
|
|
func NewRuntime(opts ...runtime.Option) runtime.Runtime {
|
|
// get default options
|
|
options := runtime.Options{}
|
|
|
|
// apply requested options
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
// kubernetes client
|
|
client := client.NewClientInCluster()
|
|
|
|
return &kubernetes{
|
|
options: options,
|
|
closed: make(chan bool),
|
|
queue: make(chan *task, 128),
|
|
client: client,
|
|
}
|
|
}
|
|
|
|
// Init initializes runtime options
|
|
func (k *kubernetes) Init(opts ...runtime.Option) error {
|
|
k.Lock()
|
|
defer k.Unlock()
|
|
|
|
for _, o := range opts {
|
|
o(&k.options)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Creates a service
|
|
func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) error {
|
|
k.Lock()
|
|
defer k.Unlock()
|
|
|
|
var options runtime.CreateOptions
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
svcName := s.Name
|
|
if len(s.Version) > 0 {
|
|
svcName = strings.Join([]string{s.Name, s.Version}, "-")
|
|
}
|
|
|
|
if !client.ServiceRegexp.MatchString(svcName) {
|
|
return fmt.Errorf("invalid service name: %s", svcName)
|
|
}
|
|
|
|
// create new kubernetes micro service
|
|
service := newService(s, options)
|
|
|
|
log.Debugf("Runtime queueing service %s for start action", service.Name)
|
|
|
|
// push into start queue
|
|
k.queue <- &task{
|
|
action: start,
|
|
service: service,
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Get returns all instances of given service
|
|
func (k *kubernetes) Get(name string, opts ...runtime.GetOption) ([]*runtime.Service, error) {
|
|
k.Lock()
|
|
defer k.Unlock()
|
|
|
|
// if no name has been passed in, return error
|
|
if len(name) == 0 {
|
|
return nil, errors.New("missing service name")
|
|
}
|
|
|
|
// set the default label
|
|
labels := map[string]string{
|
|
"micro": "service",
|
|
"name": name,
|
|
}
|
|
var options runtime.GetOptions
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
// add version to labels if a version has been supplied
|
|
if len(options.Version) > 0 {
|
|
labels["version"] = options.Version
|
|
}
|
|
|
|
log.Debugf("Runtime querying service %s", name)
|
|
|
|
serviceList := new(client.ServiceList)
|
|
r := &client.Resource{
|
|
Kind: "service",
|
|
Value: serviceList,
|
|
}
|
|
if err := k.client.Get(r, labels); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
services := make([]*runtime.Service, 0, len(serviceList.Items))
|
|
for _, kservice := range serviceList.Items {
|
|
service := &runtime.Service{
|
|
Name: kservice.Metadata.Name,
|
|
Version: kservice.Metadata.Version,
|
|
}
|
|
services = append(services, service)
|
|
}
|
|
|
|
return services, nil
|
|
}
|
|
|
|
// Update the service in place
|
|
func (k *kubernetes) Update(s *runtime.Service) error {
|
|
// parse version into human readable timestamp
|
|
updateTimeStamp, err := strconv.ParseInt(s.Version, 10, 64)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
unixTimeUTC := time.Unix(updateTimeStamp, 0)
|
|
|
|
// create new kubernetes micro service
|
|
service := newService(s, runtime.CreateOptions{})
|
|
|
|
// update build time annotation
|
|
service.kdeploy.Spec.Template.Metadata.Annotations["build"] = unixTimeUTC.Format(time.RFC3339)
|
|
|
|
log.Debugf("Runtime queueing service %s for update action", service.Name)
|
|
|
|
// queue service for removal
|
|
k.queue <- &task{
|
|
action: update,
|
|
service: service,
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Remove a service
|
|
func (k *kubernetes) Delete(s *runtime.Service) error {
|
|
k.Lock()
|
|
defer k.Unlock()
|
|
|
|
// create new kubernetes micro service
|
|
service := newService(s, runtime.CreateOptions{})
|
|
|
|
log.Debugf("Runtime queueing service %s for delete action", service.Name)
|
|
|
|
// queue service for removal
|
|
k.queue <- &task{
|
|
action: stop,
|
|
service: service,
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// List the managed services
|
|
func (k *kubernetes) List() ([]*runtime.Service, error) {
|
|
serviceList := new(client.ServiceList)
|
|
r := &client.Resource{
|
|
Kind: "service",
|
|
Value: serviceList,
|
|
}
|
|
|
|
if err := k.client.List(r); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
log.Debugf("Runtime found %d micro services", len(serviceList.Items))
|
|
|
|
services := make([]*runtime.Service, 0, len(serviceList.Items))
|
|
|
|
for _, service := range serviceList.Items {
|
|
buildTime, err := time.Parse(time.RFC3339, service.Metadata.Annotations["build"])
|
|
if err != nil {
|
|
log.Debugf("Runtime error parsing build time for %s: %v", service.Metadata.Name, err)
|
|
continue
|
|
}
|
|
// add the service to the list of services
|
|
svc := &runtime.Service{
|
|
Name: service.Metadata.Name,
|
|
Version: fmt.Sprintf("%d", buildTime.Unix()),
|
|
}
|
|
services = append(services, svc)
|
|
}
|
|
|
|
return services, nil
|
|
}
|
|
|
|
// run runs the runtime management loop
|
|
func (k *kubernetes) run(events <-chan runtime.Event) {
|
|
t := time.NewTicker(time.Second * 10)
|
|
defer t.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-t.C:
|
|
// TODO: figure out what to do here
|
|
// - do we even need the ticker for k8s services?
|
|
case task := <-k.queue:
|
|
switch task.action {
|
|
case start:
|
|
log.Debugf("Runtime starting new service: %s", task.service.Name)
|
|
if err := task.service.Start(k.client); err != nil {
|
|
log.Debugf("Runtime failed to start service %s: %v", task.service.Name, err)
|
|
continue
|
|
}
|
|
case stop:
|
|
log.Debugf("Runtime stopping service: %s", task.service.Name)
|
|
if err := task.service.Stop(k.client); err != nil {
|
|
log.Debugf("Runtime failed to stop service %s: %v", task.service.Name, err)
|
|
continue
|
|
}
|
|
case update:
|
|
log.Debugf("Runtime updating service: %s", task.service.Name)
|
|
if err := task.service.Update(k.client); err != nil {
|
|
log.Debugf("Runtime failed to update service %s: %v", task.service.Name, err)
|
|
continue
|
|
}
|
|
default:
|
|
log.Debugf("Runtime received unknown action for service: %s", task.service.Name)
|
|
}
|
|
case event := <-events:
|
|
// NOTE: we only handle Update events for now
|
|
log.Debugf("Runtime received notification event: %v", event)
|
|
switch event.Type {
|
|
case runtime.Update:
|
|
// parse returned response to timestamp
|
|
updateTimeStamp, err := strconv.ParseInt(event.Version, 10, 64)
|
|
if err != nil {
|
|
log.Debugf("Runtime error parsing update build time: %v", err)
|
|
continue
|
|
}
|
|
unixTimeUTC := time.Unix(updateTimeStamp, 0)
|
|
if len(event.Service) > 0 {
|
|
s := &runtime.Service{
|
|
Name: event.Service,
|
|
Version: event.Version,
|
|
}
|
|
// create new kubernetes micro service
|
|
service := newService(s, runtime.CreateOptions{})
|
|
// update build time annotation
|
|
service.kdeploy.Spec.Template.Metadata.Annotations["build"] = unixTimeUTC.Format(time.RFC3339)
|
|
|
|
log.Debugf("Runtime updating service: %s", service.Name)
|
|
if err := service.Update(k.client); err != nil {
|
|
log.Debugf("Runtime failed to update service %s: %v", service.Name, err)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
case <-k.closed:
|
|
log.Debugf("Runtime stopped")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// starts the runtime
|
|
func (k *kubernetes) Start() error {
|
|
k.Lock()
|
|
defer k.Unlock()
|
|
|
|
// already running
|
|
if k.running {
|
|
return nil
|
|
}
|
|
|
|
// set running
|
|
k.running = true
|
|
k.closed = make(chan bool)
|
|
|
|
var events <-chan runtime.Event
|
|
if k.options.Notifier != nil {
|
|
var err error
|
|
events, err = k.options.Notifier.Notify()
|
|
if err != nil {
|
|
// TODO: should we bail here?
|
|
log.Debugf("Runtime failed to start update notifier")
|
|
}
|
|
}
|
|
|
|
go k.run(events)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Shutdown the runtime
|
|
func (k *kubernetes) Stop() error {
|
|
k.Lock()
|
|
defer k.Unlock()
|
|
|
|
if !k.running {
|
|
return nil
|
|
}
|
|
|
|
select {
|
|
case <-k.closed:
|
|
return nil
|
|
default:
|
|
close(k.closed)
|
|
// set not running
|
|
k.running = false
|
|
// stop the notifier too
|
|
if k.options.Notifier != nil {
|
|
return k.options.Notifier.Close()
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// String implements stringer interface
|
|
func (k *kubernetes) String() string {
|
|
return "kubernetes"
|
|
}
|