[WIP] K8s update and runtime package changes (#895)

* First commit: outline of K8s runtime package

* Added poller. Added auto-updater into default runtime

* Added build and updated Poller interface

* Added comments and NewRuntime that accepts Options

* DefaultPoller; Runtime options

* First commit to add Kubernetes cruft

* Add comments

* Add micro- prefix to K8s runtime service names

* Get rid of import cycles. Move K8s runtime into main runtime package

* Major refactoring: Poller replaced by Notifier

POller has been replaced by Notifier which returns a channel of events
that can be consumed and acted upon.

* Added runtime configuration options

* K8s runtime is now Kubernetes runtime in dedicated pkg. Naming kung-fu.

* Fix typo in command.

* Fixed typo

* Dont Delete service when runtime stops.

runtime.Stop stops services; no need to double-stop

* Track runtime services

* Parse Unix timestamps properly

* Added deployments into K8s client. Debug logging
This commit is contained in:
Milos Gajdos 2019-11-02 13:25:10 +00:00 committed by Asim Aslam
parent a94a95ab55
commit 6f7702a093
15 changed files with 1357 additions and 184 deletions

View File

@ -44,6 +44,10 @@ import (
thttp "github.com/micro/go-micro/transport/http"
tmem "github.com/micro/go-micro/transport/memory"
"github.com/micro/go-micro/transport/quic"
// runtimes
"github.com/micro/go-micro/runtime"
"github.com/micro/go-micro/runtime/kubernetes"
)
type Cmd interface {
@ -67,6 +71,12 @@ var (
DefaultCmd = newCmd()
DefaultFlags = []cli.Flag{
cli.StringFlag{
Name: "runtime",
Usage: "Micro runtime",
EnvVar: "MICRO_RUNTIME",
Value: "local",
},
cli.StringFlag{
Name: "client",
EnvVar: "MICRO_CLIENT",
@ -221,6 +231,11 @@ var (
"quic": quic.NewTransport,
}
DefaultRuntimes = map[string]func(...runtime.Option) runtime.Runtime{
"local": runtime.NewRuntime,
"kubernetes": kubernetes.NewRuntime,
}
// used for default selection as the fall back
defaultClient = "rpc"
defaultServer = "rpc"
@ -228,6 +243,7 @@ var (
defaultRegistry = "mdns"
defaultSelector = "registry"
defaultTransport = "http"
defaultRuntime = "local"
)
func init() {
@ -247,6 +263,7 @@ func newCmd(opts ...Option) Cmd {
Server: &server.DefaultServer,
Selector: &selector.DefaultSelector,
Transport: &transport.DefaultTransport,
Runtime: &runtime.DefaultRuntime,
Brokers: DefaultBrokers,
Clients: DefaultClients,
@ -254,6 +271,7 @@ func newCmd(opts ...Option) Cmd {
Selectors: DefaultSelectors,
Servers: DefaultServers,
Transports: DefaultTransports,
Runtimes: DefaultRuntimes,
}
for _, o := range opts {
@ -294,6 +312,16 @@ func (c *cmd) Before(ctx *cli.Context) error {
var serverOpts []server.Option
var clientOpts []client.Option
// Set the runtime
if name := ctx.String("runtime"); len(name) > 0 {
r, ok := c.opts.Runtimes[name]
if !ok {
return fmt.Errorf("Unsupported runtime: %s", name)
}
*c.opts.Runtime = r()
}
// Set the client
if name := ctx.String("client"); len(name) > 0 {
// only change if we have the client and type differs

View File

@ -7,6 +7,7 @@ import (
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/client/selector"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/runtime"
"github.com/micro/go-micro/server"
"github.com/micro/go-micro/transport"
)
@ -24,6 +25,7 @@ type Options struct {
Transport *transport.Transport
Client *client.Client
Server *server.Server
Runtime *runtime.Runtime
Brokers map[string]func(...broker.Option) broker.Broker
Clients map[string]func(...client.Option) client.Client
@ -31,6 +33,7 @@ type Options struct {
Selectors map[string]func(...selector.Option) selector.Selector
Servers map[string]func(...server.Option) server.Server
Transports map[string]func(...transport.Option) transport.Transport
Runtimes map[string]func(...runtime.Option) runtime.Runtime
// Other options for implementations of the interface
// can be stored in a context
@ -135,3 +138,10 @@ func NewTransport(name string, t func(...transport.Option) transport.Transport)
o.Transports[name] = t
}
}
// New runtime func
func NewRuntime(name string, r func(...runtime.Option) runtime.Runtime) Option {
return func(o *Options) {
o.Runtimes[name] = r
}
}

View File

@ -2,19 +2,18 @@ package runtime
import (
"errors"
"io"
"strings"
"fmt"
"strconv"
"sync"
"time"
"github.com/micro/go-micro/runtime/package"
"github.com/micro/go-micro/runtime/process"
proc "github.com/micro/go-micro/runtime/process/os"
"github.com/micro/go-micro/util/log"
)
type runtime struct {
sync.RWMutex
// options configure runtime
options Options
// used to stop the runtime
closed chan bool
// used to start new services
@ -25,162 +24,38 @@ type runtime struct {
services map[string]*service
}
type service struct {
sync.RWMutex
// NewRuntime creates new local runtime and returns it
func NewRuntime(opts ...Option) Runtime {
// get default options
options := Options{}
running bool
closed chan bool
err error
// apply requested options
for _, o := range opts {
o(&options)
}
// output for logs
output io.Writer
// service to manage
*Service
// process creator
Process *proc.Process
// Exec
Exec *process.Executable
// process pid
PID *process.PID
}
func newRuntime() *runtime {
return &runtime{
options: options,
closed: make(chan bool),
start: make(chan *service, 128),
services: make(map[string]*service),
}
}
func newService(s *Service, c CreateOptions) *service {
var exec string
var args []string
// Init initializes runtime options
func (r *runtime) Init(opts ...Option) error {
r.Lock()
defer r.Unlock()
if len(s.Exec) > 0 {
parts := strings.Split(s.Exec, " ")
exec = parts[0]
args = []string{}
if len(parts) > 1 {
args = parts[1:]
}
} else {
// set command
exec = c.Command[0]
// set args
if len(c.Command) > 1 {
args = c.Command[1:]
}
}
return &service{
Service: s,
Process: new(proc.Process),
Exec: &process.Executable{
Binary: &packager.Binary{
Name: s.Name,
Path: exec,
},
Env: c.Env,
Args: args,
},
closed: make(chan bool),
output: c.Output,
}
}
func (s *service) streamOutput() {
go io.Copy(s.output, s.PID.Output)
go io.Copy(s.output, s.PID.Error)
}
func (s *service) Running() bool {
s.RLock()
defer s.RUnlock()
return s.running
}
func (s *service) Start() error {
s.Lock()
defer s.Unlock()
if s.running {
return nil
}
// reset
s.err = nil
s.closed = make(chan bool)
// TODO: pull source & build binary
log.Debugf("Runtime service %s forking new process\n", s.Service.Name)
p, err := s.Process.Fork(s.Exec)
if err != nil {
return err
}
// set the pid
s.PID = p
// set to running
s.running = true
if s.output != nil {
s.streamOutput()
}
// wait and watch
go s.Wait()
return nil
}
func (s *service) Stop() error {
s.Lock()
defer s.Unlock()
select {
case <-s.closed:
return nil
default:
close(s.closed)
s.running = false
if s.PID == nil {
return nil
}
return s.Process.Kill(s.PID)
for _, o := range opts {
o(&r.options)
}
return nil
}
func (s *service) Error() error {
s.RLock()
defer s.RUnlock()
return s.err
}
func (s *service) Wait() {
// wait for process to exit
err := s.Process.Wait(s.PID)
s.Lock()
defer s.Unlock()
// save the error
if err != nil {
s.err = err
}
// no longer running
s.running = false
}
func (r *runtime) run() {
r.RLock()
closed := r.closed
r.RUnlock()
// run runs the runtime management loop
func (r *runtime) run(events <-chan Event) {
t := time.NewTicker(time.Second * 5)
defer t.Stop()
@ -205,19 +80,67 @@ func (r *runtime) run() {
if service.Running() {
continue
}
// TODO: check service error
log.Debugf("Starting %s", service.Name)
log.Debugf("Runtime starting service %s", service.Name)
if err := service.Start(); err != nil {
log.Debugf("Runtime error starting %s: %v", service.Name, err)
log.Debugf("Runtime error starting service %s: %v", service.Name, err)
}
case <-closed:
// TODO: stop all the things
case event := <-events:
log.Debugf("Runtime received notification event: %v", event)
// NOTE: we only handle Update events for now
switch event.Type {
case Update:
// parse returned response to timestamp
updateTimeStamp, err := strconv.ParseInt(event.Version, 10, 64)
if err != nil {
log.Debugf("Runtime error parsing update build time: %v", err)
continue
}
buildTime := time.Unix(updateTimeStamp, 0)
processEvent := func(event Event, service *Service) error {
buildTimeStamp, err := strconv.ParseInt(service.Version, 10, 64)
if err != nil {
return err
}
muBuild := time.Unix(buildTimeStamp, 0)
if buildTime.After(muBuild) {
if err := r.Update(service); err != nil {
return err
}
service.Version = fmt.Sprintf("%d", buildTime.Unix())
}
return nil
}
r.Lock()
if len(event.Service) > 0 {
service, ok := r.services[event.Service]
if !ok {
log.Debugf("Runtime unknown service: %s", event.Service)
r.Unlock()
continue
}
if err := processEvent(event, service.Service); err != nil {
log.Debugf("Runtime error updating service %s: %v", event.Service, err)
}
r.Unlock()
continue
}
// if blank service was received we update all services
for _, service := range r.services {
if err := processEvent(event, service.Service); err != nil {
log.Debugf("Runtime error updating service %s: %v", service.Name, err)
}
}
r.Unlock()
}
case <-r.closed:
log.Debugf("Runtime stopped.")
return
}
}
}
// Create creates a new service which is then started by runtime
func (r *runtime) Create(s *Service, opts ...CreateOption) error {
r.Lock()
defer r.Unlock()
@ -244,6 +167,7 @@ func (r *runtime) Create(s *Service, opts ...CreateOption) error {
return nil
}
// Delete removes the service from the runtime and stops it
func (r *runtime) Delete(s *Service) error {
r.Lock()
defer r.Unlock()
@ -256,6 +180,7 @@ func (r *runtime) Delete(s *Service) error {
return nil
}
// Update attemps to update the service
func (r *runtime) Update(s *Service) error {
// delete the service
if err := r.Delete(s); err != nil {
@ -266,6 +191,7 @@ func (r *runtime) Update(s *Service) error {
return r.Create(s)
}
// List returns a slice of all services tracked by the runtime
func (r *runtime) List() ([]*Service, error) {
var services []*Service
r.RLock()
@ -278,6 +204,7 @@ func (r *runtime) List() ([]*Service, error) {
return services, nil
}
// Start starts the runtime
func (r *runtime) Start() error {
r.Lock()
defer r.Unlock()
@ -291,11 +218,22 @@ func (r *runtime) Start() error {
r.running = true
r.closed = make(chan bool)
go r.run()
var events <-chan Event
if r.options.Notifier != nil {
var err error
events, err = r.options.Notifier.Notify()
if err != nil {
// TODO: should we bail here?
log.Debugf("Runtime failed to start update notifier")
}
}
go r.run(events)
return nil
}
// Stop stops the runtime
func (r *runtime) Stop() error {
r.Lock()
defer r.Unlock()
@ -318,7 +256,16 @@ func (r *runtime) Stop() error {
log.Debugf("Runtime stopping %s", service.Name)
service.Stop()
}
// stop the notifier too
if r.options.Notifier != nil {
return r.options.Notifier.Close()
}
}
return nil
}
// String implements stringer interface
func (r *runtime) String() string {
return "local"
}

View File

@ -0,0 +1,223 @@
package api
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"github.com/micro/go-micro/runtime/kubernetes/client/watch"
"github.com/micro/go-micro/util/log"
)
// Request is used to construct a http request for the k8s API.
type Request struct {
client *http.Client
header http.Header
params url.Values
method string
host string
namespace string
resource string
resourceName *string
body io.Reader
err error
}
// Params is the object to pass in to set paramaters
// on a request.
type Params struct {
LabelSelector map[string]string
Annotations map[string]string
Watch bool
}
// verb sets method
func (r *Request) verb(method string) *Request {
r.method = method
return r
}
// Get request
func (r *Request) Get() *Request {
return r.verb("GET")
}
// Post request
func (r *Request) Post() *Request {
return r.verb("POST")
}
// Put request
func (r *Request) Put() *Request {
return r.verb("PUT")
}
// Patch request
// https://github.com/kubernetes/kubernetes/blob/master/docs/devel/api-conventions.md#patch-operations
func (r *Request) Patch() *Request {
return r.verb("PATCH").SetHeader("Content-Type", "application/strategic-merge-patch+json")
}
// Delete request
func (r *Request) Delete() *Request {
return r.verb("DELETE")
}
// Namespace is to set the namespace to operate on
func (r *Request) Namespace(s string) *Request {
r.namespace = s
return r
}
// Resource is the type of resource the operation is
// for, such as "services", "endpoints" or "pods"
func (r *Request) Resource(s string) *Request {
r.resource = s
return r
}
// Name is for targeting a specific resource by id
func (r *Request) Name(s string) *Request {
r.resourceName = &s
return r
}
// Body pass in a body to set, this is for POST, PUT
// and PATCH requests
func (r *Request) Body(in interface{}) *Request {
b := new(bytes.Buffer)
if err := json.NewEncoder(b).Encode(&in); err != nil {
r.err = err
return r
}
log.Debugf("Patch body: %v", b)
r.body = b
return r
}
// Params isused to set paramters on a request
func (r *Request) Params(p *Params) *Request {
for k, v := range p.LabelSelector {
r.params.Add("labelSelectors", k+"="+v)
}
return r
}
// SetHeader sets a header on a request with
// a `key` and `value`
func (r *Request) SetHeader(key, value string) *Request {
r.header.Add(key, value)
return r
}
// request builds the http.Request from the options
func (r *Request) request() (*http.Request, error) {
var url string
switch r.resource {
case "pods":
// /api/v1/namespaces/{namespace}/pods
url = fmt.Sprintf("%s/api/v1/namespaces/%s/%s/", r.host, r.namespace, r.resource)
case "deployments":
// /apis/apps/v1/namespaces/{namespace}/deployments/{name}
url = fmt.Sprintf("%s/apis/apps/v1/namespaces/%s/%s/", r.host, r.namespace, r.resource)
}
// append resourceName if it is present
if r.resourceName != nil {
url += *r.resourceName
}
// append any query params
if len(r.params) > 0 {
url += "?" + r.params.Encode()
}
// build request
req, err := http.NewRequest(r.method, url, r.body)
if err != nil {
return nil, err
}
// set headers on request
req.Header = r.header
return req, nil
}
// Do builds and triggers the request
func (r *Request) Do() *Response {
if r.err != nil {
return &Response{
err: r.err,
}
}
req, err := r.request()
if err != nil {
return &Response{
err: err,
}
}
log.Debugf("kubernetes api request: %v", req)
res, err := r.client.Do(req)
if err != nil {
return &Response{
err: err,
}
}
log.Debugf("kubernetes api response: %v", res)
// return res, err
return newResponse(res, err)
}
// Watch builds and triggers the request, but
// will watch instead of return an object
func (r *Request) Watch() (watch.Watch, error) {
if r.err != nil {
return nil, r.err
}
r.params.Set("watch", "true")
req, err := r.request()
if err != nil {
return nil, err
}
w, err := watch.NewBodyWatcher(req, r.client)
return w, err
}
// Options ...
type Options struct {
Host string
Namespace string
BearerToken *string
Client *http.Client
}
// NewRequest creates a k8s api request
func NewRequest(opts *Options) *Request {
req := &Request{
header: make(http.Header),
params: make(url.Values),
client: opts.Client,
namespace: opts.Namespace,
host: opts.Host,
}
if opts.BearerToken != nil {
req.SetHeader("Authorization", "Bearer "+*opts.BearerToken)
}
return req
}

View File

@ -0,0 +1,94 @@
package api
import (
"encoding/json"
"errors"
"io/ioutil"
"net/http"
"github.com/micro/go-micro/util/log"
)
// Errors ...
var (
ErrNotFound = errors.New("kubernetes: not found")
ErrDecode = errors.New("kubernetes: error decoding")
ErrOther = errors.New("kubernetes: unknown error")
)
// Status is an object that is returned when a request
// failed or delete succeeded.
// type Status struct {
// Kind string `json:"kind"`
// Status string `json:"status"`
// Message string `json:"message"`
// Reason string `json:"reason"`
// Code int `json:"code"`
// }
// Response ...
type Response struct {
res *http.Response
err error
body []byte
}
// Error returns an error
func (r *Response) Error() error {
return r.err
}
// StatusCode returns status code for response
func (r *Response) StatusCode() int {
return r.res.StatusCode
}
// Into decode body into `data`
func (r *Response) Into(data interface{}) error {
if r.err != nil {
return r.err
}
defer r.res.Body.Close()
decoder := json.NewDecoder(r.res.Body)
err := decoder.Decode(&data)
if err != nil {
return ErrDecode
}
return r.err
}
func newResponse(res *http.Response, err error) *Response {
r := &Response{
res: res,
err: err,
}
if err != nil {
return r
}
if r.res.StatusCode == http.StatusOK ||
r.res.StatusCode == http.StatusCreated ||
r.res.StatusCode == http.StatusNoContent {
// Non error status code
return r
}
if r.res.StatusCode == http.StatusNotFound {
r.err = ErrNotFound
return r
}
log.Logf("kubernetes: request failed with code %v", r.res.StatusCode)
b, err := ioutil.ReadAll(r.res.Body)
if err == nil {
log.Log("kubernetes: request failed with body:")
log.Log(string(b))
}
r.err = ErrOther
return r
}

View File

@ -0,0 +1,102 @@
package client
import (
"crypto/tls"
"errors"
"io/ioutil"
"net/http"
"os"
"path"
"github.com/micro/go-micro/runtime/kubernetes/client/api"
"github.com/micro/go-micro/util/log"
)
var (
serviceAccountPath = "/var/run/secrets/kubernetes.io/serviceaccount"
// ErrReadNamespace is returned when the names could not be read from service account
ErrReadNamespace = errors.New("Could not read namespace from service account secret")
)
// Client ...
type client struct {
opts *api.Options
}
// NewClientInCluster should work similarily to the official api
// NewInClient by setting up a client configuration for use within
// a k8s pod.
func NewClientInCluster() *client {
host := "https://" + os.Getenv("KUBERNETES_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_SERVICE_PORT")
s, err := os.Stat(serviceAccountPath)
if err != nil {
log.Fatal(err)
}
if s == nil || !s.IsDir() {
log.Fatal(errors.New("no k8s service account found"))
}
token, err := ioutil.ReadFile(path.Join(serviceAccountPath, "token"))
if err != nil {
log.Fatal(err)
}
t := string(token)
ns, err := detectNamespace()
if err != nil {
log.Fatal(err)
}
crt, err := CertPoolFromFile(path.Join(serviceAccountPath, "ca.crt"))
if err != nil {
log.Fatal(err)
}
c := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: crt,
},
DisableCompression: true,
},
}
return &client{
opts: &api.Options{
Client: c,
Host: host,
Namespace: ns,
BearerToken: &t,
},
}
}
func detectNamespace() (string, error) {
nsPath := path.Join(serviceAccountPath, "namespace")
// Make sure it's a file and we can read it
if s, e := os.Stat(nsPath); e != nil {
return "", e
} else if s.IsDir() {
return "", ErrReadNamespace
}
// Read the file, and cast to a string
if ns, e := ioutil.ReadFile(nsPath); e != nil {
return string(ns), e
} else {
return string(ns), nil
}
}
// UpdateDeployment
func (c *client) UpdateDeployment(name string, body interface{}) error {
return api.NewRequest(c.opts).
Patch().
Resource("deployments").
Name(name).
Body(body).
Do().
Error()
}

View File

@ -0,0 +1,12 @@
package client
// Kubernetes client
type Kubernetes interface {
// UpdateDeployment patches deployment annotations with new metadata
UpdateDeployment(string, interface{}) error
}
// Metadata defines api request metadata
type Metadata struct {
Annotations map[string]string `json:"annotations,omitempty"`
}

View File

@ -0,0 +1,74 @@
package client
import (
"crypto/x509"
"encoding/pem"
"errors"
"fmt"
"io/ioutil"
)
// COPIED FROM
// https://github.com/kubernetes/kubernetes/blob/7a725418af4661067b56506faabc2d44c6d7703a/pkg/util/crypto/crypto.go
// CertPoolFromFile returns an x509.CertPool containing the certificates in the given PEM-encoded file.
// Returns an error if the file could not be read, a certificate could not be parsed, or if the file does not contain any certificates
func CertPoolFromFile(filename string) (*x509.CertPool, error) {
certs, err := certificatesFromFile(filename)
if err != nil {
return nil, err
}
pool := x509.NewCertPool()
for _, cert := range certs {
pool.AddCert(cert)
}
return pool, nil
}
// certificatesFromFile returns the x509.Certificates contained in the given PEM-encoded file.
// Returns an error if the file could not be read, a certificate could not be parsed, or if the file does not contain any certificates
func certificatesFromFile(file string) ([]*x509.Certificate, error) {
if len(file) == 0 {
return nil, errors.New("error reading certificates from an empty filename")
}
pemBlock, err := ioutil.ReadFile(file)
if err != nil {
return nil, err
}
certs, err := CertsFromPEM(pemBlock)
if err != nil {
return nil, fmt.Errorf("error reading %s: %s", file, err)
}
return certs, nil
}
// CertsFromPEM returns the x509.Certificates contained in the given PEM-encoded byte array
// Returns an error if a certificate could not be parsed, or if the data does not contain any certificates
func CertsFromPEM(pemCerts []byte) ([]*x509.Certificate, error) {
ok := false
certs := []*x509.Certificate{}
for len(pemCerts) > 0 {
var block *pem.Block
block, pemCerts = pem.Decode(pemCerts)
if block == nil {
break
}
// Only use PEM "CERTIFICATE" blocks without extra headers
if block.Type != "CERTIFICATE" || len(block.Headers) != 0 {
continue
}
cert, err := x509.ParseCertificate(block.Bytes)
if err != nil {
return certs, err
}
certs = append(certs, cert)
ok = true
}
if !ok {
return certs, errors.New("could not read any certificates")
}
return certs, nil
}

View File

@ -0,0 +1,92 @@
package watch
import (
"bufio"
"encoding/json"
"net/http"
"time"
)
// bodyWatcher scans the body of a request for chunks
type bodyWatcher struct {
results chan Event
stop chan struct{}
res *http.Response
req *http.Request
}
// Changes returns the results channel
func (wr *bodyWatcher) ResultChan() <-chan Event {
return wr.results
}
// Stop cancels the request
func (wr *bodyWatcher) Stop() {
select {
case <-wr.stop:
return
default:
close(wr.stop)
close(wr.results)
}
}
func (wr *bodyWatcher) stream() {
reader := bufio.NewReader(wr.res.Body)
// ignore first few messages from stream,
// as they are usually old.
ignore := true
go func() {
<-time.After(time.Second)
ignore = false
}()
go func() {
// stop the watcher
defer wr.Stop()
for {
// read a line
b, err := reader.ReadBytes('\n')
if err != nil {
return
}
// ignore for the first second
if ignore {
continue
}
// send the event
var event Event
if err := json.Unmarshal(b, &event); err != nil {
continue
}
wr.results <- event
}
}()
}
// NewBodyWatcher creates a k8s body watcher for
// a given http request
func NewBodyWatcher(req *http.Request, client *http.Client) (Watch, error) {
stop := make(chan struct{})
req.Cancel = stop
res, err := client.Do(req)
if err != nil {
return nil, err
}
wr := &bodyWatcher{
results: make(chan Event),
stop: stop,
req: req,
res: res,
}
go wr.stream()
return wr, nil
}

View File

@ -0,0 +1,26 @@
package watch
import "encoding/json"
// Watch ...
type Watch interface {
Stop()
ResultChan() <-chan Event
}
// EventType defines the possible types of events.
type EventType string
// EventTypes used
const (
Added EventType = "ADDED"
Modified EventType = "MODIFIED"
Deleted EventType = "DELETED"
Error EventType = "ERROR"
)
// Event represents a single event to a watched resource.
type Event struct {
Type EventType `json:"type"`
Object json.RawMessage `json:"object"`
}

View File

@ -0,0 +1,71 @@
package watch
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
)
var actions = []string{
`{"type": "create", "object":{"foo": "bar"}}`,
`{"type": "delete", INVALID}`,
`{"type": "update", "object":{"foo": {"foo": "bar"}}}`,
`{"type": "delete", "object":null}`,
}
func TestBodyWatcher(t *testing.T) {
// set up server with handler to flush strings from ch.
ch := make(chan string)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
t.Fatal("expected ResponseWriter to be a flusher")
}
fmt.Fprintf(w, "\n")
flusher.Flush()
for v := range ch {
fmt.Fprintf(w, "%s\n", v)
flusher.Flush()
time.Sleep(10 * time.Millisecond)
}
}))
defer ts.Close()
req, err := http.NewRequest("GET", ts.URL, nil)
if err != nil {
t.Fatalf("did not expect NewRequest to return err: %v", err)
}
// setup body watcher
w, err := NewBodyWatcher(req, http.DefaultClient)
if err != nil {
t.Fatalf("did not expect NewBodyWatcher to return %v", err)
}
<-time.After(time.Second)
// send action strings in, and expect result back
ch <- actions[0]
if r := <-w.ResultChan(); r.Type != "create" {
t.Fatalf("expected result to be create")
}
ch <- actions[1] // should be ignored as its invalid json
ch <- actions[2]
if r := <-w.ResultChan(); r.Type != "update" {
t.Fatalf("expected result to be update")
}
ch <- actions[3]
if r := <-w.ResultChan(); r.Type != "delete" {
t.Fatalf("expected result to be delete")
}
// stop should clean up all channels.
w.Stop()
close(ch)
}

View File

@ -0,0 +1,290 @@
// Package kubernetes implements kubernetes micro runtime
package kubernetes
import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/micro/go-micro/runtime"
"github.com/micro/go-micro/runtime/kubernetes/client"
"github.com/micro/go-micro/util/log"
)
type kubernetes struct {
sync.RWMutex
// options configure runtime
options runtime.Options
// indicates if we're running
running bool
// used to start new services
start chan *runtime.Service
// used to stop the runtime
closed chan bool
// service tracks deployed services
services map[string]*runtime.Service
// client is kubernetes client
client client.Kubernetes
}
// NewRuntime creates new kubernetes runtime
func NewRuntime(opts ...runtime.Option) runtime.Runtime {
// get default options
options := runtime.Options{}
// apply requested options
for _, o := range opts {
o(&options)
}
// kubernetes client
client := client.NewClientInCluster()
return &kubernetes{
options: options,
closed: make(chan bool),
start: make(chan *runtime.Service, 128),
services: make(map[string]*runtime.Service),
client: client,
}
}
// Init initializes runtime options
func (k *kubernetes) Init(opts ...runtime.Option) error {
k.Lock()
defer k.Unlock()
for _, o := range opts {
o(&k.options)
}
return nil
}
// Registers a service
func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) error {
k.Lock()
defer k.Unlock()
// TODO:
// * create service
// * create deployment
// NOTE: our services have micro- prefix
muName := strings.Split(s.Name, ".")
s.Name = "micro-" + muName[len(muName)-1]
// NOTE: we are tracking this in memory for now
if _, ok := k.services[s.Name]; ok {
return errors.New("service already registered")
}
var options runtime.CreateOptions
for _, o := range opts {
o(&options)
}
// save service
k.services[s.Name] = s
// push into start queue
k.start <- k.services[s.Name]
return nil
}
// Remove a service
func (k *kubernetes) Delete(s *runtime.Service) error {
k.Lock()
defer k.Unlock()
// TODO:
// * delete service
// * delete dpeloyment
// NOTE: we are tracking this in memory for now
if s, ok := k.services[s.Name]; ok {
delete(k.services, s.Name)
return nil
}
return nil
}
// Update the service in place
func (k *kubernetes) Update(s *runtime.Service) error {
type body struct {
Metadata *client.Metadata `json:"metadata"`
}
// parse version into human readable timestamp
updateTimeStamp, err := strconv.ParseInt(s.Version, 10, 64)
if err != nil {
return err
}
unixTimeUTC := time.Unix(updateTimeStamp, 0)
// metada which we will PATCH deployment with
reqBody := body{
Metadata: &client.Metadata{
Annotations: map[string]string{
"build": unixTimeUTC.Format(time.RFC3339),
},
},
}
return k.client.UpdateDeployment(s.Name, reqBody)
}
// List the managed services
func (k *kubernetes) List() ([]*runtime.Service, error) {
// TODO: this should list the k8s deployments
// but for now we return in-memory tracked services
var services []*runtime.Service
k.RLock()
defer k.RUnlock()
for _, service := range k.services {
services = append(services, service)
}
return services, nil
}
// run runs the runtime management loop
func (k *kubernetes) run(events <-chan runtime.Event) {
t := time.NewTicker(time.Second * 5)
defer t.Stop()
for {
select {
case <-t.C:
// TODO: noop for now
// check running services
// * deployments exist
// * service is exposed
case service := <-k.start:
// TODO: following might have to be done
// * create a deployment
// * expose a service
log.Debugf("Runtime starting service: %s", service.Name)
case event := <-events:
// NOTE: we only handle Update events for now
log.Debugf("Runtime received notification event: %v", event)
switch event.Type {
case runtime.Update:
// parse returned response to timestamp
updateTimeStamp, err := strconv.ParseInt(event.Version, 10, 64)
if err != nil {
log.Debugf("Runtime error parsing update build time: %v", err)
continue
}
buildTime := time.Unix(updateTimeStamp, 0)
processEvent := func(event runtime.Event, service *runtime.Service) error {
buildTimeStamp, err := strconv.ParseInt(service.Version, 10, 64)
if err != nil {
return err
}
muBuild := time.Unix(buildTimeStamp, 0)
if buildTime.After(muBuild) {
version := fmt.Sprintf("%d", buildTime.Unix())
muService := &runtime.Service{
Name: service.Name,
Source: service.Source,
Path: service.Path,
Exec: service.Exec,
Version: version,
}
if err := k.Update(muService); err != nil {
return err
}
service.Version = version
}
return nil
}
k.Lock()
if len(event.Service) > 0 {
service, ok := k.services[event.Service]
if !ok {
log.Debugf("Runtime unknown service: %s", event.Service)
k.Unlock()
continue
}
if err := processEvent(event, service); err != nil {
log.Debugf("Runtime error updating service %s: %v", event.Service, err)
}
k.Unlock()
continue
}
// if blank service was received we update all services
for _, service := range k.services {
if err := processEvent(event, service); err != nil {
log.Debugf("Runtime error updating service %s: %v", service.Name, err)
}
}
k.Unlock()
}
case <-k.closed:
log.Debugf("Runtime stopped")
return
}
}
}
// starts the runtime
func (k *kubernetes) Start() error {
k.Lock()
defer k.Unlock()
// already running
if k.running {
return nil
}
// set running
k.running = true
k.closed = make(chan bool)
var events <-chan runtime.Event
if k.options.Notifier != nil {
var err error
events, err = k.options.Notifier.Notify()
if err != nil {
// TODO: should we bail here?
log.Debugf("Runtime failed to start update notifier")
}
}
go k.run(events)
return nil
}
// Shutdown the runtime
func (k *kubernetes) Stop() error {
k.Lock()
defer k.Unlock()
if !k.running {
return nil
}
select {
case <-k.closed:
return nil
default:
close(k.closed)
// set not running
k.running = false
// stop the notifier too
if k.options.Notifier != nil {
return k.options.Notifier.Close()
}
}
return nil
}
// String implements stringer interface
func (k *kubernetes) String() string {
return "kubernetes"
}

View File

@ -4,8 +4,24 @@ import (
"io"
)
type Option func(o *Options)
// Options configure runtime
type Options struct {
// Notifier for updates
Notifier Notifier
}
// AutoUpdate enables micro auto-updates
func WithNotifier(n Notifier) Option {
return func(o *Options) {
o.Notifier = n
}
}
type CreateOption func(o *CreateOptions)
// CreateOptions configure runtime services
type CreateOptions struct {
// command to execute including args
Command []string
@ -25,7 +41,7 @@ func WithCommand(c string, args ...string) CreateOption {
}
}
// WithEnv sets the created service env
// WithEnv sets the created service environment
func WithEnv(env []string) CreateOption {
return func(o *CreateOptions) {
o.Env = env

View File

@ -1,8 +1,17 @@
// Package runtime is a service runtime manager
package runtime
import "time"
var (
// DefaultRuntime is default micro runtime
DefaultRuntime Runtime = NewRuntime()
)
// Runtime is a service runtime manager
type Runtime interface {
// Init initializes runtime
Init(...Option) error
// Registers a service
Create(*Service, ...CreateOption) error
// Remove a service
@ -17,41 +26,62 @@ type Runtime interface {
Stop() error
}
// Notifier is an update notifier
type Notifier interface {
// Notify publishes notification events
Notify() (<-chan Event, error)
// Close stops the notifier
Close() error
}
// EventType defines notification event
type EventType int
const (
// Create is emitted when a new build has been craeted
Create EventType = iota
// Update is emitted when a new update become available
Update
// Delete is emitted when a build has been deleted
Delete
)
// String returns human readable event type
func (t EventType) String() string {
switch t {
case Create:
return "create"
case Delete:
return "delete"
case Update:
return "update"
default:
return "unknown"
}
}
// Event is notification event
type Event struct {
// Type is event type
Type EventType
// Timestamp is event timestamp
Timestamp time.Time
// Service is the name of the service
Service string
// Version of the build
Version string
}
// Service is runtime service
type Service struct {
// name of the service
// Name of the service
Name string
// url location of source
Source string
// path to store source
// Path to store source
Path string
// exec command
// Exec command
Exec string
}
var (
DefaultRuntime = newRuntime()
)
func Create(s *Service, opts ...CreateOption) error {
return DefaultRuntime.Create(s, opts...)
}
func Delete(s *Service) error {
return DefaultRuntime.Delete(s)
}
func Update(s *Service) error {
return DefaultRuntime.Update(s)
}
func List() ([]*Service, error) {
return DefaultRuntime.List()
}
func Start() error {
return DefaultRuntime.Start()
}
func Stop() error {
return DefaultRuntime.Stop()
// Version of the service
Version string
}

158
runtime/service.go Normal file
View File

@ -0,0 +1,158 @@
package runtime
import (
"io"
"strings"
"sync"
packager "github.com/micro/go-micro/runtime/package"
"github.com/micro/go-micro/runtime/process"
proc "github.com/micro/go-micro/runtime/process/os"
"github.com/micro/go-micro/util/log"
)
type service struct {
sync.RWMutex
running bool
closed chan bool
err error
// output for logs
output io.Writer
// service to manage
*Service
// process creator
Process *proc.Process
// Exec
Exec *process.Executable
// process pid
PID *process.PID
}
func newService(s *Service, c CreateOptions) *service {
var exec string
var args []string
if len(s.Exec) > 0 {
parts := strings.Split(s.Exec, " ")
exec = parts[0]
args = []string{}
if len(parts) > 1 {
args = parts[1:]
}
} else {
// set command
exec = c.Command[0]
// set args
if len(c.Command) > 1 {
args = c.Command[1:]
}
}
return &service{
Service: s,
Process: new(proc.Process),
Exec: &process.Executable{
Binary: &packager.Binary{
Name: s.Name,
Path: exec,
},
Env: c.Env,
Args: args,
},
closed: make(chan bool),
output: c.Output,
}
}
func (s *service) streamOutput() {
go io.Copy(s.output, s.PID.Output)
go io.Copy(s.output, s.PID.Error)
}
// Running returns true is the service is running
func (s *service) Running() bool {
s.RLock()
defer s.RUnlock()
return s.running
}
// Start stars the service
func (s *service) Start() error {
s.Lock()
defer s.Unlock()
if s.running {
return nil
}
// reset
s.err = nil
s.closed = make(chan bool)
// TODO: pull source & build binary
log.Debugf("Runtime service %s forking new process", s.Service.Name)
p, err := s.Process.Fork(s.Exec)
if err != nil {
return err
}
// set the pid
s.PID = p
// set to running
s.running = true
if s.output != nil {
s.streamOutput()
}
// wait and watch
go s.Wait()
return nil
}
// Stop stops the service
func (s *service) Stop() error {
s.Lock()
defer s.Unlock()
select {
case <-s.closed:
return nil
default:
close(s.closed)
s.running = false
if s.PID == nil {
return nil
}
return s.Process.Kill(s.PID)
}
}
// Error returns the last error service has returned
func (s *service) Error() error {
s.RLock()
defer s.RUnlock()
return s.err
}
// Wait waits for the service to finish running
func (s *service) Wait() {
// wait for process to exit
err := s.Process.Wait(s.PID)
s.Lock()
defer s.Unlock()
// save the error
if err != nil {
s.err = err
}
// no longer running
s.running = false
}