291 lines
6.4 KiB
Go
291 lines
6.4 KiB
Go
// Package kubernetes implements kubernetes micro runtime
|
|
package kubernetes
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/micro/go-micro/runtime"
|
|
"github.com/micro/go-micro/runtime/kubernetes/client"
|
|
"github.com/micro/go-micro/util/log"
|
|
)
|
|
|
|
type kubernetes struct {
|
|
sync.RWMutex
|
|
// options configure runtime
|
|
options runtime.Options
|
|
// indicates if we're running
|
|
running bool
|
|
// used to start new services
|
|
start chan *runtime.Service
|
|
// used to stop the runtime
|
|
closed chan bool
|
|
// service tracks deployed services
|
|
services map[string]*runtime.Service
|
|
// client is kubernetes client
|
|
client client.Kubernetes
|
|
}
|
|
|
|
// NewRuntime creates new kubernetes runtime
|
|
func NewRuntime(opts ...runtime.Option) runtime.Runtime {
|
|
// get default options
|
|
options := runtime.Options{}
|
|
|
|
// apply requested options
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
// kubernetes client
|
|
client := client.NewClientInCluster()
|
|
|
|
return &kubernetes{
|
|
options: options,
|
|
closed: make(chan bool),
|
|
start: make(chan *runtime.Service, 128),
|
|
services: make(map[string]*runtime.Service),
|
|
client: client,
|
|
}
|
|
}
|
|
|
|
// Init initializes runtime options
|
|
func (k *kubernetes) Init(opts ...runtime.Option) error {
|
|
k.Lock()
|
|
defer k.Unlock()
|
|
|
|
for _, o := range opts {
|
|
o(&k.options)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Registers a service
|
|
func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) error {
|
|
k.Lock()
|
|
defer k.Unlock()
|
|
|
|
// TODO:
|
|
// * create service
|
|
// * create deployment
|
|
|
|
// NOTE: our services have micro- prefix
|
|
muName := strings.Split(s.Name, ".")
|
|
s.Name = "micro-" + muName[len(muName)-1]
|
|
|
|
// NOTE: we are tracking this in memory for now
|
|
if _, ok := k.services[s.Name]; ok {
|
|
return errors.New("service already registered")
|
|
}
|
|
|
|
var options runtime.CreateOptions
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
// save service
|
|
k.services[s.Name] = s
|
|
// push into start queue
|
|
k.start <- k.services[s.Name]
|
|
|
|
return nil
|
|
}
|
|
|
|
// Remove a service
|
|
func (k *kubernetes) Delete(s *runtime.Service) error {
|
|
k.Lock()
|
|
defer k.Unlock()
|
|
|
|
// TODO:
|
|
// * delete service
|
|
// * delete dpeloyment
|
|
|
|
// NOTE: we are tracking this in memory for now
|
|
if s, ok := k.services[s.Name]; ok {
|
|
delete(k.services, s.Name)
|
|
return nil
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Update the service in place
|
|
func (k *kubernetes) Update(s *runtime.Service) error {
|
|
type body struct {
|
|
Metadata *client.Metadata `json:"metadata"`
|
|
}
|
|
// parse version into human readable timestamp
|
|
updateTimeStamp, err := strconv.ParseInt(s.Version, 10, 64)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
unixTimeUTC := time.Unix(updateTimeStamp, 0)
|
|
// metada which we will PATCH deployment with
|
|
reqBody := body{
|
|
Metadata: &client.Metadata{
|
|
Annotations: map[string]string{
|
|
"build": unixTimeUTC.Format(time.RFC3339),
|
|
},
|
|
},
|
|
}
|
|
return k.client.UpdateDeployment(s.Name, reqBody)
|
|
}
|
|
|
|
// List the managed services
|
|
func (k *kubernetes) List() ([]*runtime.Service, error) {
|
|
// TODO: this should list the k8s deployments
|
|
// but for now we return in-memory tracked services
|
|
services := make([]*runtime.Service, 0, len(k.services))
|
|
k.RLock()
|
|
defer k.RUnlock()
|
|
|
|
for _, service := range k.services {
|
|
services = append(services, service)
|
|
}
|
|
|
|
return services, nil
|
|
}
|
|
|
|
// run runs the runtime management loop
|
|
func (k *kubernetes) run(events <-chan runtime.Event) {
|
|
t := time.NewTicker(time.Second * 5)
|
|
defer t.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-t.C:
|
|
// TODO: noop for now
|
|
// check running services
|
|
// * deployments exist
|
|
// * service is exposed
|
|
case service := <-k.start:
|
|
// TODO: following might have to be done
|
|
// * create a deployment
|
|
// * expose a service
|
|
log.Debugf("Runtime starting service: %s", service.Name)
|
|
case event := <-events:
|
|
// NOTE: we only handle Update events for now
|
|
log.Debugf("Runtime received notification event: %v", event)
|
|
switch event.Type {
|
|
case runtime.Update:
|
|
// parse returned response to timestamp
|
|
updateTimeStamp, err := strconv.ParseInt(event.Version, 10, 64)
|
|
if err != nil {
|
|
log.Debugf("Runtime error parsing update build time: %v", err)
|
|
continue
|
|
}
|
|
buildTime := time.Unix(updateTimeStamp, 0)
|
|
processEvent := func(event runtime.Event, service *runtime.Service) error {
|
|
buildTimeStamp, err := strconv.ParseInt(service.Version, 10, 64)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
muBuild := time.Unix(buildTimeStamp, 0)
|
|
if buildTime.After(muBuild) {
|
|
version := fmt.Sprintf("%d", buildTime.Unix())
|
|
muService := &runtime.Service{
|
|
Name: service.Name,
|
|
Source: service.Source,
|
|
Path: service.Path,
|
|
Exec: service.Exec,
|
|
Version: version,
|
|
}
|
|
if err := k.Update(muService); err != nil {
|
|
return err
|
|
}
|
|
service.Version = version
|
|
}
|
|
return nil
|
|
}
|
|
k.Lock()
|
|
if len(event.Service) > 0 {
|
|
service, ok := k.services[event.Service]
|
|
if !ok {
|
|
log.Debugf("Runtime unknown service: %s", event.Service)
|
|
k.Unlock()
|
|
continue
|
|
}
|
|
if err := processEvent(event, service); err != nil {
|
|
log.Debugf("Runtime error updating service %s: %v", event.Service, err)
|
|
}
|
|
k.Unlock()
|
|
continue
|
|
}
|
|
// if blank service was received we update all services
|
|
for _, service := range k.services {
|
|
if err := processEvent(event, service); err != nil {
|
|
log.Debugf("Runtime error updating service %s: %v", service.Name, err)
|
|
}
|
|
}
|
|
k.Unlock()
|
|
}
|
|
case <-k.closed:
|
|
log.Debugf("Runtime stopped")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// starts the runtime
|
|
func (k *kubernetes) Start() error {
|
|
k.Lock()
|
|
defer k.Unlock()
|
|
|
|
// already running
|
|
if k.running {
|
|
return nil
|
|
}
|
|
|
|
// set running
|
|
k.running = true
|
|
k.closed = make(chan bool)
|
|
|
|
var events <-chan runtime.Event
|
|
if k.options.Notifier != nil {
|
|
var err error
|
|
events, err = k.options.Notifier.Notify()
|
|
if err != nil {
|
|
// TODO: should we bail here?
|
|
log.Debugf("Runtime failed to start update notifier")
|
|
}
|
|
}
|
|
|
|
go k.run(events)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Shutdown the runtime
|
|
func (k *kubernetes) Stop() error {
|
|
k.Lock()
|
|
defer k.Unlock()
|
|
|
|
if !k.running {
|
|
return nil
|
|
}
|
|
|
|
select {
|
|
case <-k.closed:
|
|
return nil
|
|
default:
|
|
close(k.closed)
|
|
// set not running
|
|
k.running = false
|
|
// stop the notifier too
|
|
if k.options.Notifier != nil {
|
|
return k.options.Notifier.Close()
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// String implements stringer interface
|
|
func (k *kubernetes) String() string {
|
|
return "kubernetes"
|
|
}
|