From 6f7702a093d9defd8b5f5710f7a511b44824cfea Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Sat, 2 Nov 2019 13:25:10 +0000 Subject: [PATCH] [WIP] K8s update and runtime package changes (#895) * First commit: outline of K8s runtime package * Added poller. Added auto-updater into default runtime * Added build and updated Poller interface * Added comments and NewRuntime that accepts Options * DefaultPoller; Runtime options * First commit to add Kubernetes cruft * Add comments * Add micro- prefix to K8s runtime service names * Get rid of import cycles. Move K8s runtime into main runtime package * Major refactoring: Poller replaced by Notifier POller has been replaced by Notifier which returns a channel of events that can be consumed and acted upon. * Added runtime configuration options * K8s runtime is now Kubernetes runtime in dedicated pkg. Naming kung-fu. * Fix typo in command. * Fixed typo * Dont Delete service when runtime stops. runtime.Stop stops services; no need to double-stop * Track runtime services * Parse Unix timestamps properly * Added deployments into K8s client. Debug logging --- config/cmd/cmd.go | 28 ++ config/cmd/options.go | 10 + runtime/default.go | 251 ++++++--------- runtime/kubernetes/client/api/request.go | 223 ++++++++++++++ runtime/kubernetes/client/api/response.go | 94 ++++++ runtime/kubernetes/client/client.go | 102 ++++++ runtime/kubernetes/client/kubernetes.go | 12 + runtime/kubernetes/client/utils.go | 74 +++++ runtime/kubernetes/client/watch/body.go | 92 ++++++ runtime/kubernetes/client/watch/watch.go | 26 ++ runtime/kubernetes/client/watch/watch_test.go | 71 +++++ runtime/kubernetes/kubernetes.go | 290 ++++++++++++++++++ runtime/options.go | 18 +- runtime/runtime.go | 92 ++++-- runtime/service.go | 158 ++++++++++ 15 files changed, 1357 insertions(+), 184 deletions(-) create mode 100644 runtime/kubernetes/client/api/request.go create mode 100644 runtime/kubernetes/client/api/response.go create mode 100644 runtime/kubernetes/client/client.go create mode 100644 runtime/kubernetes/client/kubernetes.go create mode 100644 runtime/kubernetes/client/utils.go create mode 100644 runtime/kubernetes/client/watch/body.go create mode 100644 runtime/kubernetes/client/watch/watch.go create mode 100644 runtime/kubernetes/client/watch/watch_test.go create mode 100644 runtime/kubernetes/kubernetes.go create mode 100644 runtime/service.go diff --git a/config/cmd/cmd.go b/config/cmd/cmd.go index a99557d2..e66836c1 100644 --- a/config/cmd/cmd.go +++ b/config/cmd/cmd.go @@ -44,6 +44,10 @@ import ( thttp "github.com/micro/go-micro/transport/http" tmem "github.com/micro/go-micro/transport/memory" "github.com/micro/go-micro/transport/quic" + + // runtimes + "github.com/micro/go-micro/runtime" + "github.com/micro/go-micro/runtime/kubernetes" ) type Cmd interface { @@ -67,6 +71,12 @@ var ( DefaultCmd = newCmd() DefaultFlags = []cli.Flag{ + cli.StringFlag{ + Name: "runtime", + Usage: "Micro runtime", + EnvVar: "MICRO_RUNTIME", + Value: "local", + }, cli.StringFlag{ Name: "client", EnvVar: "MICRO_CLIENT", @@ -221,6 +231,11 @@ var ( "quic": quic.NewTransport, } + DefaultRuntimes = map[string]func(...runtime.Option) runtime.Runtime{ + "local": runtime.NewRuntime, + "kubernetes": kubernetes.NewRuntime, + } + // used for default selection as the fall back defaultClient = "rpc" defaultServer = "rpc" @@ -228,6 +243,7 @@ var ( defaultRegistry = "mdns" defaultSelector = "registry" defaultTransport = "http" + defaultRuntime = "local" ) func init() { @@ -247,6 +263,7 @@ func newCmd(opts ...Option) Cmd { Server: &server.DefaultServer, Selector: &selector.DefaultSelector, Transport: &transport.DefaultTransport, + Runtime: &runtime.DefaultRuntime, Brokers: DefaultBrokers, Clients: DefaultClients, @@ -254,6 +271,7 @@ func newCmd(opts ...Option) Cmd { Selectors: DefaultSelectors, Servers: DefaultServers, Transports: DefaultTransports, + Runtimes: DefaultRuntimes, } for _, o := range opts { @@ -294,6 +312,16 @@ func (c *cmd) Before(ctx *cli.Context) error { var serverOpts []server.Option var clientOpts []client.Option + // Set the runtime + if name := ctx.String("runtime"); len(name) > 0 { + r, ok := c.opts.Runtimes[name] + if !ok { + return fmt.Errorf("Unsupported runtime: %s", name) + } + + *c.opts.Runtime = r() + } + // Set the client if name := ctx.String("client"); len(name) > 0 { // only change if we have the client and type differs diff --git a/config/cmd/options.go b/config/cmd/options.go index 1f221f35..be831fe1 100644 --- a/config/cmd/options.go +++ b/config/cmd/options.go @@ -7,6 +7,7 @@ import ( "github.com/micro/go-micro/client" "github.com/micro/go-micro/client/selector" "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/runtime" "github.com/micro/go-micro/server" "github.com/micro/go-micro/transport" ) @@ -24,6 +25,7 @@ type Options struct { Transport *transport.Transport Client *client.Client Server *server.Server + Runtime *runtime.Runtime Brokers map[string]func(...broker.Option) broker.Broker Clients map[string]func(...client.Option) client.Client @@ -31,6 +33,7 @@ type Options struct { Selectors map[string]func(...selector.Option) selector.Selector Servers map[string]func(...server.Option) server.Server Transports map[string]func(...transport.Option) transport.Transport + Runtimes map[string]func(...runtime.Option) runtime.Runtime // Other options for implementations of the interface // can be stored in a context @@ -135,3 +138,10 @@ func NewTransport(name string, t func(...transport.Option) transport.Transport) o.Transports[name] = t } } + +// New runtime func +func NewRuntime(name string, r func(...runtime.Option) runtime.Runtime) Option { + return func(o *Options) { + o.Runtimes[name] = r + } +} diff --git a/runtime/default.go b/runtime/default.go index 797ec107..1785c2f5 100644 --- a/runtime/default.go +++ b/runtime/default.go @@ -2,19 +2,18 @@ package runtime import ( "errors" - "io" - "strings" + "fmt" + "strconv" "sync" "time" - "github.com/micro/go-micro/runtime/package" - "github.com/micro/go-micro/runtime/process" - proc "github.com/micro/go-micro/runtime/process/os" "github.com/micro/go-micro/util/log" ) type runtime struct { sync.RWMutex + // options configure runtime + options Options // used to stop the runtime closed chan bool // used to start new services @@ -25,162 +24,38 @@ type runtime struct { services map[string]*service } -type service struct { - sync.RWMutex +// NewRuntime creates new local runtime and returns it +func NewRuntime(opts ...Option) Runtime { + // get default options + options := Options{} - running bool - closed chan bool - err error + // apply requested options + for _, o := range opts { + o(&options) + } - // output for logs - output io.Writer - - // service to manage - *Service - // process creator - Process *proc.Process - // Exec - Exec *process.Executable - // process pid - PID *process.PID -} - -func newRuntime() *runtime { return &runtime{ + options: options, closed: make(chan bool), start: make(chan *service, 128), services: make(map[string]*service), } } -func newService(s *Service, c CreateOptions) *service { - var exec string - var args []string +// Init initializes runtime options +func (r *runtime) Init(opts ...Option) error { + r.Lock() + defer r.Unlock() - if len(s.Exec) > 0 { - parts := strings.Split(s.Exec, " ") - exec = parts[0] - args = []string{} - - if len(parts) > 1 { - args = parts[1:] - } - } else { - // set command - exec = c.Command[0] - // set args - if len(c.Command) > 1 { - args = c.Command[1:] - } - } - - return &service{ - Service: s, - Process: new(proc.Process), - Exec: &process.Executable{ - Binary: &packager.Binary{ - Name: s.Name, - Path: exec, - }, - Env: c.Env, - Args: args, - }, - closed: make(chan bool), - output: c.Output, - } -} - -func (s *service) streamOutput() { - go io.Copy(s.output, s.PID.Output) - go io.Copy(s.output, s.PID.Error) -} - -func (s *service) Running() bool { - s.RLock() - defer s.RUnlock() - return s.running -} - -func (s *service) Start() error { - s.Lock() - defer s.Unlock() - - if s.running { - return nil - } - - // reset - s.err = nil - s.closed = make(chan bool) - - // TODO: pull source & build binary - log.Debugf("Runtime service %s forking new process\n", s.Service.Name) - p, err := s.Process.Fork(s.Exec) - if err != nil { - return err - } - - // set the pid - s.PID = p - // set to running - s.running = true - - if s.output != nil { - s.streamOutput() - } - - // wait and watch - go s.Wait() - - return nil -} - -func (s *service) Stop() error { - s.Lock() - defer s.Unlock() - - select { - case <-s.closed: - return nil - default: - close(s.closed) - s.running = false - if s.PID == nil { - return nil - } - return s.Process.Kill(s.PID) + for _, o := range opts { + o(&r.options) } return nil } -func (s *service) Error() error { - s.RLock() - defer s.RUnlock() - return s.err -} - -func (s *service) Wait() { - // wait for process to exit - err := s.Process.Wait(s.PID) - - s.Lock() - defer s.Unlock() - - // save the error - if err != nil { - s.err = err - } - - // no longer running - s.running = false -} - -func (r *runtime) run() { - r.RLock() - closed := r.closed - r.RUnlock() - +// run runs the runtime management loop +func (r *runtime) run(events <-chan Event) { t := time.NewTicker(time.Second * 5) defer t.Stop() @@ -205,19 +80,67 @@ func (r *runtime) run() { if service.Running() { continue } - // TODO: check service error - log.Debugf("Starting %s", service.Name) + log.Debugf("Runtime starting service %s", service.Name) if err := service.Start(); err != nil { - log.Debugf("Runtime error starting %s: %v", service.Name, err) + log.Debugf("Runtime error starting service %s: %v", service.Name, err) } - case <-closed: - // TODO: stop all the things + case event := <-events: + log.Debugf("Runtime received notification event: %v", event) + // NOTE: we only handle Update events for now + switch event.Type { + case Update: + // parse returned response to timestamp + updateTimeStamp, err := strconv.ParseInt(event.Version, 10, 64) + if err != nil { + log.Debugf("Runtime error parsing update build time: %v", err) + continue + } + buildTime := time.Unix(updateTimeStamp, 0) + processEvent := func(event Event, service *Service) error { + buildTimeStamp, err := strconv.ParseInt(service.Version, 10, 64) + if err != nil { + return err + } + muBuild := time.Unix(buildTimeStamp, 0) + if buildTime.After(muBuild) { + if err := r.Update(service); err != nil { + return err + } + service.Version = fmt.Sprintf("%d", buildTime.Unix()) + } + return nil + } + r.Lock() + if len(event.Service) > 0 { + service, ok := r.services[event.Service] + if !ok { + log.Debugf("Runtime unknown service: %s", event.Service) + r.Unlock() + continue + } + if err := processEvent(event, service.Service); err != nil { + log.Debugf("Runtime error updating service %s: %v", event.Service, err) + } + r.Unlock() + continue + } + // if blank service was received we update all services + for _, service := range r.services { + if err := processEvent(event, service.Service); err != nil { + log.Debugf("Runtime error updating service %s: %v", service.Name, err) + } + } + r.Unlock() + } + case <-r.closed: + log.Debugf("Runtime stopped.") return } } } +// Create creates a new service which is then started by runtime func (r *runtime) Create(s *Service, opts ...CreateOption) error { r.Lock() defer r.Unlock() @@ -244,6 +167,7 @@ func (r *runtime) Create(s *Service, opts ...CreateOption) error { return nil } +// Delete removes the service from the runtime and stops it func (r *runtime) Delete(s *Service) error { r.Lock() defer r.Unlock() @@ -256,6 +180,7 @@ func (r *runtime) Delete(s *Service) error { return nil } +// Update attemps to update the service func (r *runtime) Update(s *Service) error { // delete the service if err := r.Delete(s); err != nil { @@ -266,6 +191,7 @@ func (r *runtime) Update(s *Service) error { return r.Create(s) } +// List returns a slice of all services tracked by the runtime func (r *runtime) List() ([]*Service, error) { var services []*Service r.RLock() @@ -278,6 +204,7 @@ func (r *runtime) List() ([]*Service, error) { return services, nil } +// Start starts the runtime func (r *runtime) Start() error { r.Lock() defer r.Unlock() @@ -291,11 +218,22 @@ func (r *runtime) Start() error { r.running = true r.closed = make(chan bool) - go r.run() + var events <-chan Event + if r.options.Notifier != nil { + var err error + events, err = r.options.Notifier.Notify() + if err != nil { + // TODO: should we bail here? + log.Debugf("Runtime failed to start update notifier") + } + } + + go r.run(events) return nil } +// Stop stops the runtime func (r *runtime) Stop() error { r.Lock() defer r.Unlock() @@ -318,7 +256,16 @@ func (r *runtime) Stop() error { log.Debugf("Runtime stopping %s", service.Name) service.Stop() } + // stop the notifier too + if r.options.Notifier != nil { + return r.options.Notifier.Close() + } } return nil } + +// String implements stringer interface +func (r *runtime) String() string { + return "local" +} diff --git a/runtime/kubernetes/client/api/request.go b/runtime/kubernetes/client/api/request.go new file mode 100644 index 00000000..b40ed72a --- /dev/null +++ b/runtime/kubernetes/client/api/request.go @@ -0,0 +1,223 @@ +package api + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + + "github.com/micro/go-micro/runtime/kubernetes/client/watch" + "github.com/micro/go-micro/util/log" +) + +// Request is used to construct a http request for the k8s API. +type Request struct { + client *http.Client + header http.Header + params url.Values + method string + host string + namespace string + + resource string + resourceName *string + body io.Reader + + err error +} + +// Params is the object to pass in to set paramaters +// on a request. +type Params struct { + LabelSelector map[string]string + Annotations map[string]string + Watch bool +} + +// verb sets method +func (r *Request) verb(method string) *Request { + r.method = method + return r +} + +// Get request +func (r *Request) Get() *Request { + return r.verb("GET") +} + +// Post request +func (r *Request) Post() *Request { + return r.verb("POST") +} + +// Put request +func (r *Request) Put() *Request { + return r.verb("PUT") +} + +// Patch request +// https://github.com/kubernetes/kubernetes/blob/master/docs/devel/api-conventions.md#patch-operations +func (r *Request) Patch() *Request { + return r.verb("PATCH").SetHeader("Content-Type", "application/strategic-merge-patch+json") +} + +// Delete request +func (r *Request) Delete() *Request { + return r.verb("DELETE") +} + +// Namespace is to set the namespace to operate on +func (r *Request) Namespace(s string) *Request { + r.namespace = s + return r +} + +// Resource is the type of resource the operation is +// for, such as "services", "endpoints" or "pods" +func (r *Request) Resource(s string) *Request { + r.resource = s + return r +} + +// Name is for targeting a specific resource by id +func (r *Request) Name(s string) *Request { + r.resourceName = &s + return r +} + +// Body pass in a body to set, this is for POST, PUT +// and PATCH requests +func (r *Request) Body(in interface{}) *Request { + b := new(bytes.Buffer) + if err := json.NewEncoder(b).Encode(&in); err != nil { + r.err = err + return r + } + log.Debugf("Patch body: %v", b) + r.body = b + return r +} + +// Params isused to set paramters on a request +func (r *Request) Params(p *Params) *Request { + for k, v := range p.LabelSelector { + r.params.Add("labelSelectors", k+"="+v) + } + + return r +} + +// SetHeader sets a header on a request with +// a `key` and `value` +func (r *Request) SetHeader(key, value string) *Request { + r.header.Add(key, value) + return r +} + +// request builds the http.Request from the options +func (r *Request) request() (*http.Request, error) { + var url string + switch r.resource { + case "pods": + // /api/v1/namespaces/{namespace}/pods + url = fmt.Sprintf("%s/api/v1/namespaces/%s/%s/", r.host, r.namespace, r.resource) + case "deployments": + // /apis/apps/v1/namespaces/{namespace}/deployments/{name} + url = fmt.Sprintf("%s/apis/apps/v1/namespaces/%s/%s/", r.host, r.namespace, r.resource) + } + + // append resourceName if it is present + if r.resourceName != nil { + url += *r.resourceName + } + + // append any query params + if len(r.params) > 0 { + url += "?" + r.params.Encode() + } + + // build request + req, err := http.NewRequest(r.method, url, r.body) + if err != nil { + return nil, err + } + + // set headers on request + req.Header = r.header + return req, nil +} + +// Do builds and triggers the request +func (r *Request) Do() *Response { + if r.err != nil { + return &Response{ + err: r.err, + } + } + + req, err := r.request() + if err != nil { + return &Response{ + err: err, + } + } + + log.Debugf("kubernetes api request: %v", req) + + res, err := r.client.Do(req) + if err != nil { + return &Response{ + err: err, + } + } + + log.Debugf("kubernetes api response: %v", res) + + // return res, err + return newResponse(res, err) +} + +// Watch builds and triggers the request, but +// will watch instead of return an object +func (r *Request) Watch() (watch.Watch, error) { + if r.err != nil { + return nil, r.err + } + + r.params.Set("watch", "true") + + req, err := r.request() + if err != nil { + return nil, err + } + + w, err := watch.NewBodyWatcher(req, r.client) + return w, err +} + +// Options ... +type Options struct { + Host string + Namespace string + BearerToken *string + Client *http.Client +} + +// NewRequest creates a k8s api request +func NewRequest(opts *Options) *Request { + req := &Request{ + header: make(http.Header), + params: make(url.Values), + client: opts.Client, + namespace: opts.Namespace, + host: opts.Host, + } + + if opts.BearerToken != nil { + req.SetHeader("Authorization", "Bearer "+*opts.BearerToken) + } + + return req +} diff --git a/runtime/kubernetes/client/api/response.go b/runtime/kubernetes/client/api/response.go new file mode 100644 index 00000000..8700115f --- /dev/null +++ b/runtime/kubernetes/client/api/response.go @@ -0,0 +1,94 @@ +package api + +import ( + "encoding/json" + "errors" + "io/ioutil" + "net/http" + + "github.com/micro/go-micro/util/log" +) + +// Errors ... +var ( + ErrNotFound = errors.New("kubernetes: not found") + ErrDecode = errors.New("kubernetes: error decoding") + ErrOther = errors.New("kubernetes: unknown error") +) + +// Status is an object that is returned when a request +// failed or delete succeeded. +// type Status struct { +// Kind string `json:"kind"` +// Status string `json:"status"` +// Message string `json:"message"` +// Reason string `json:"reason"` +// Code int `json:"code"` +// } + +// Response ... +type Response struct { + res *http.Response + err error + + body []byte +} + +// Error returns an error +func (r *Response) Error() error { + return r.err +} + +// StatusCode returns status code for response +func (r *Response) StatusCode() int { + return r.res.StatusCode +} + +// Into decode body into `data` +func (r *Response) Into(data interface{}) error { + if r.err != nil { + return r.err + } + + defer r.res.Body.Close() + decoder := json.NewDecoder(r.res.Body) + err := decoder.Decode(&data) + if err != nil { + return ErrDecode + } + + return r.err +} + +func newResponse(res *http.Response, err error) *Response { + r := &Response{ + res: res, + err: err, + } + + if err != nil { + return r + } + + if r.res.StatusCode == http.StatusOK || + r.res.StatusCode == http.StatusCreated || + r.res.StatusCode == http.StatusNoContent { + // Non error status code + return r + } + + if r.res.StatusCode == http.StatusNotFound { + r.err = ErrNotFound + return r + } + + log.Logf("kubernetes: request failed with code %v", r.res.StatusCode) + + b, err := ioutil.ReadAll(r.res.Body) + if err == nil { + log.Log("kubernetes: request failed with body:") + log.Log(string(b)) + } + r.err = ErrOther + return r +} diff --git a/runtime/kubernetes/client/client.go b/runtime/kubernetes/client/client.go new file mode 100644 index 00000000..695ecd65 --- /dev/null +++ b/runtime/kubernetes/client/client.go @@ -0,0 +1,102 @@ +package client + +import ( + "crypto/tls" + "errors" + "io/ioutil" + "net/http" + "os" + "path" + + "github.com/micro/go-micro/runtime/kubernetes/client/api" + "github.com/micro/go-micro/util/log" +) + +var ( + serviceAccountPath = "/var/run/secrets/kubernetes.io/serviceaccount" + // ErrReadNamespace is returned when the names could not be read from service account + ErrReadNamespace = errors.New("Could not read namespace from service account secret") +) + +// Client ... +type client struct { + opts *api.Options +} + +// NewClientInCluster should work similarily to the official api +// NewInClient by setting up a client configuration for use within +// a k8s pod. +func NewClientInCluster() *client { + host := "https://" + os.Getenv("KUBERNETES_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_SERVICE_PORT") + + s, err := os.Stat(serviceAccountPath) + if err != nil { + log.Fatal(err) + } + if s == nil || !s.IsDir() { + log.Fatal(errors.New("no k8s service account found")) + } + + token, err := ioutil.ReadFile(path.Join(serviceAccountPath, "token")) + if err != nil { + log.Fatal(err) + } + t := string(token) + + ns, err := detectNamespace() + if err != nil { + log.Fatal(err) + } + + crt, err := CertPoolFromFile(path.Join(serviceAccountPath, "ca.crt")) + if err != nil { + log.Fatal(err) + } + + c := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + RootCAs: crt, + }, + DisableCompression: true, + }, + } + + return &client{ + opts: &api.Options{ + Client: c, + Host: host, + Namespace: ns, + BearerToken: &t, + }, + } +} + +func detectNamespace() (string, error) { + nsPath := path.Join(serviceAccountPath, "namespace") + + // Make sure it's a file and we can read it + if s, e := os.Stat(nsPath); e != nil { + return "", e + } else if s.IsDir() { + return "", ErrReadNamespace + } + + // Read the file, and cast to a string + if ns, e := ioutil.ReadFile(nsPath); e != nil { + return string(ns), e + } else { + return string(ns), nil + } +} + +// UpdateDeployment +func (c *client) UpdateDeployment(name string, body interface{}) error { + return api.NewRequest(c.opts). + Patch(). + Resource("deployments"). + Name(name). + Body(body). + Do(). + Error() +} diff --git a/runtime/kubernetes/client/kubernetes.go b/runtime/kubernetes/client/kubernetes.go new file mode 100644 index 00000000..565d2243 --- /dev/null +++ b/runtime/kubernetes/client/kubernetes.go @@ -0,0 +1,12 @@ +package client + +// Kubernetes client +type Kubernetes interface { + // UpdateDeployment patches deployment annotations with new metadata + UpdateDeployment(string, interface{}) error +} + +// Metadata defines api request metadata +type Metadata struct { + Annotations map[string]string `json:"annotations,omitempty"` +} diff --git a/runtime/kubernetes/client/utils.go b/runtime/kubernetes/client/utils.go new file mode 100644 index 00000000..ec250aca --- /dev/null +++ b/runtime/kubernetes/client/utils.go @@ -0,0 +1,74 @@ +package client + +import ( + "crypto/x509" + "encoding/pem" + "errors" + "fmt" + "io/ioutil" +) + +// COPIED FROM +// https://github.com/kubernetes/kubernetes/blob/7a725418af4661067b56506faabc2d44c6d7703a/pkg/util/crypto/crypto.go + +// CertPoolFromFile returns an x509.CertPool containing the certificates in the given PEM-encoded file. +// Returns an error if the file could not be read, a certificate could not be parsed, or if the file does not contain any certificates +func CertPoolFromFile(filename string) (*x509.CertPool, error) { + certs, err := certificatesFromFile(filename) + if err != nil { + return nil, err + } + pool := x509.NewCertPool() + for _, cert := range certs { + pool.AddCert(cert) + } + return pool, nil +} + +// certificatesFromFile returns the x509.Certificates contained in the given PEM-encoded file. +// Returns an error if the file could not be read, a certificate could not be parsed, or if the file does not contain any certificates +func certificatesFromFile(file string) ([]*x509.Certificate, error) { + if len(file) == 0 { + return nil, errors.New("error reading certificates from an empty filename") + } + pemBlock, err := ioutil.ReadFile(file) + if err != nil { + return nil, err + } + certs, err := CertsFromPEM(pemBlock) + if err != nil { + return nil, fmt.Errorf("error reading %s: %s", file, err) + } + return certs, nil +} + +// CertsFromPEM returns the x509.Certificates contained in the given PEM-encoded byte array +// Returns an error if a certificate could not be parsed, or if the data does not contain any certificates +func CertsFromPEM(pemCerts []byte) ([]*x509.Certificate, error) { + ok := false + certs := []*x509.Certificate{} + for len(pemCerts) > 0 { + var block *pem.Block + block, pemCerts = pem.Decode(pemCerts) + if block == nil { + break + } + // Only use PEM "CERTIFICATE" blocks without extra headers + if block.Type != "CERTIFICATE" || len(block.Headers) != 0 { + continue + } + + cert, err := x509.ParseCertificate(block.Bytes) + if err != nil { + return certs, err + } + + certs = append(certs, cert) + ok = true + } + + if !ok { + return certs, errors.New("could not read any certificates") + } + return certs, nil +} diff --git a/runtime/kubernetes/client/watch/body.go b/runtime/kubernetes/client/watch/body.go new file mode 100644 index 00000000..5f939655 --- /dev/null +++ b/runtime/kubernetes/client/watch/body.go @@ -0,0 +1,92 @@ +package watch + +import ( + "bufio" + "encoding/json" + "net/http" + "time" +) + +// bodyWatcher scans the body of a request for chunks +type bodyWatcher struct { + results chan Event + stop chan struct{} + res *http.Response + req *http.Request +} + +// Changes returns the results channel +func (wr *bodyWatcher) ResultChan() <-chan Event { + return wr.results +} + +// Stop cancels the request +func (wr *bodyWatcher) Stop() { + select { + case <-wr.stop: + return + default: + close(wr.stop) + close(wr.results) + } +} + +func (wr *bodyWatcher) stream() { + reader := bufio.NewReader(wr.res.Body) + + // ignore first few messages from stream, + // as they are usually old. + ignore := true + + go func() { + <-time.After(time.Second) + ignore = false + }() + + go func() { + // stop the watcher + defer wr.Stop() + + for { + // read a line + b, err := reader.ReadBytes('\n') + if err != nil { + return + } + + // ignore for the first second + if ignore { + continue + } + + // send the event + var event Event + if err := json.Unmarshal(b, &event); err != nil { + continue + } + wr.results <- event + } + }() +} + +// NewBodyWatcher creates a k8s body watcher for +// a given http request +func NewBodyWatcher(req *http.Request, client *http.Client) (Watch, error) { + stop := make(chan struct{}) + req.Cancel = stop + + res, err := client.Do(req) + if err != nil { + return nil, err + } + + wr := &bodyWatcher{ + results: make(chan Event), + stop: stop, + req: req, + res: res, + } + + go wr.stream() + return wr, nil +} diff --git a/runtime/kubernetes/client/watch/watch.go b/runtime/kubernetes/client/watch/watch.go new file mode 100644 index 00000000..f8a178b8 --- /dev/null +++ b/runtime/kubernetes/client/watch/watch.go @@ -0,0 +1,26 @@ +package watch + +import "encoding/json" + +// Watch ... +type Watch interface { + Stop() + ResultChan() <-chan Event +} + +// EventType defines the possible types of events. +type EventType string + +// EventTypes used +const ( + Added EventType = "ADDED" + Modified EventType = "MODIFIED" + Deleted EventType = "DELETED" + Error EventType = "ERROR" +) + +// Event represents a single event to a watched resource. +type Event struct { + Type EventType `json:"type"` + Object json.RawMessage `json:"object"` +} diff --git a/runtime/kubernetes/client/watch/watch_test.go b/runtime/kubernetes/client/watch/watch_test.go new file mode 100644 index 00000000..d34918c4 --- /dev/null +++ b/runtime/kubernetes/client/watch/watch_test.go @@ -0,0 +1,71 @@ +package watch + +import ( + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +var actions = []string{ + `{"type": "create", "object":{"foo": "bar"}}`, + `{"type": "delete", INVALID}`, + `{"type": "update", "object":{"foo": {"foo": "bar"}}}`, + `{"type": "delete", "object":null}`, +} + +func TestBodyWatcher(t *testing.T) { + // set up server with handler to flush strings from ch. + ch := make(chan string) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + if !ok { + t.Fatal("expected ResponseWriter to be a flusher") + } + + fmt.Fprintf(w, "\n") + flusher.Flush() + + for v := range ch { + fmt.Fprintf(w, "%s\n", v) + flusher.Flush() + time.Sleep(10 * time.Millisecond) + } + })) + defer ts.Close() + + req, err := http.NewRequest("GET", ts.URL, nil) + if err != nil { + t.Fatalf("did not expect NewRequest to return err: %v", err) + } + + // setup body watcher + w, err := NewBodyWatcher(req, http.DefaultClient) + if err != nil { + t.Fatalf("did not expect NewBodyWatcher to return %v", err) + } + + <-time.After(time.Second) + + // send action strings in, and expect result back + ch <- actions[0] + if r := <-w.ResultChan(); r.Type != "create" { + t.Fatalf("expected result to be create") + } + + ch <- actions[1] // should be ignored as its invalid json + ch <- actions[2] + if r := <-w.ResultChan(); r.Type != "update" { + t.Fatalf("expected result to be update") + } + + ch <- actions[3] + if r := <-w.ResultChan(); r.Type != "delete" { + t.Fatalf("expected result to be delete") + } + + // stop should clean up all channels. + w.Stop() + close(ch) +} diff --git a/runtime/kubernetes/kubernetes.go b/runtime/kubernetes/kubernetes.go new file mode 100644 index 00000000..49666f37 --- /dev/null +++ b/runtime/kubernetes/kubernetes.go @@ -0,0 +1,290 @@ +// Package kubernetes implements kubernetes micro runtime +package kubernetes + +import ( + "errors" + "fmt" + "strconv" + "strings" + "sync" + "time" + + "github.com/micro/go-micro/runtime" + "github.com/micro/go-micro/runtime/kubernetes/client" + "github.com/micro/go-micro/util/log" +) + +type kubernetes struct { + sync.RWMutex + // options configure runtime + options runtime.Options + // indicates if we're running + running bool + // used to start new services + start chan *runtime.Service + // used to stop the runtime + closed chan bool + // service tracks deployed services + services map[string]*runtime.Service + // client is kubernetes client + client client.Kubernetes +} + +// NewRuntime creates new kubernetes runtime +func NewRuntime(opts ...runtime.Option) runtime.Runtime { + // get default options + options := runtime.Options{} + + // apply requested options + for _, o := range opts { + o(&options) + } + + // kubernetes client + client := client.NewClientInCluster() + + return &kubernetes{ + options: options, + closed: make(chan bool), + start: make(chan *runtime.Service, 128), + services: make(map[string]*runtime.Service), + client: client, + } +} + +// Init initializes runtime options +func (k *kubernetes) Init(opts ...runtime.Option) error { + k.Lock() + defer k.Unlock() + + for _, o := range opts { + o(&k.options) + } + + return nil +} + +// Registers a service +func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) error { + k.Lock() + defer k.Unlock() + + // TODO: + // * create service + // * create deployment + + // NOTE: our services have micro- prefix + muName := strings.Split(s.Name, ".") + s.Name = "micro-" + muName[len(muName)-1] + + // NOTE: we are tracking this in memory for now + if _, ok := k.services[s.Name]; ok { + return errors.New("service already registered") + } + + var options runtime.CreateOptions + for _, o := range opts { + o(&options) + } + + // save service + k.services[s.Name] = s + // push into start queue + k.start <- k.services[s.Name] + + return nil +} + +// Remove a service +func (k *kubernetes) Delete(s *runtime.Service) error { + k.Lock() + defer k.Unlock() + + // TODO: + // * delete service + // * delete dpeloyment + + // NOTE: we are tracking this in memory for now + if s, ok := k.services[s.Name]; ok { + delete(k.services, s.Name) + return nil + } + + return nil +} + +// Update the service in place +func (k *kubernetes) Update(s *runtime.Service) error { + type body struct { + Metadata *client.Metadata `json:"metadata"` + } + // parse version into human readable timestamp + updateTimeStamp, err := strconv.ParseInt(s.Version, 10, 64) + if err != nil { + return err + } + unixTimeUTC := time.Unix(updateTimeStamp, 0) + // metada which we will PATCH deployment with + reqBody := body{ + Metadata: &client.Metadata{ + Annotations: map[string]string{ + "build": unixTimeUTC.Format(time.RFC3339), + }, + }, + } + return k.client.UpdateDeployment(s.Name, reqBody) +} + +// List the managed services +func (k *kubernetes) List() ([]*runtime.Service, error) { + // TODO: this should list the k8s deployments + // but for now we return in-memory tracked services + var services []*runtime.Service + k.RLock() + defer k.RUnlock() + + for _, service := range k.services { + services = append(services, service) + } + + return services, nil +} + +// run runs the runtime management loop +func (k *kubernetes) run(events <-chan runtime.Event) { + t := time.NewTicker(time.Second * 5) + defer t.Stop() + + for { + select { + case <-t.C: + // TODO: noop for now + // check running services + // * deployments exist + // * service is exposed + case service := <-k.start: + // TODO: following might have to be done + // * create a deployment + // * expose a service + log.Debugf("Runtime starting service: %s", service.Name) + case event := <-events: + // NOTE: we only handle Update events for now + log.Debugf("Runtime received notification event: %v", event) + switch event.Type { + case runtime.Update: + // parse returned response to timestamp + updateTimeStamp, err := strconv.ParseInt(event.Version, 10, 64) + if err != nil { + log.Debugf("Runtime error parsing update build time: %v", err) + continue + } + buildTime := time.Unix(updateTimeStamp, 0) + processEvent := func(event runtime.Event, service *runtime.Service) error { + buildTimeStamp, err := strconv.ParseInt(service.Version, 10, 64) + if err != nil { + return err + } + muBuild := time.Unix(buildTimeStamp, 0) + if buildTime.After(muBuild) { + version := fmt.Sprintf("%d", buildTime.Unix()) + muService := &runtime.Service{ + Name: service.Name, + Source: service.Source, + Path: service.Path, + Exec: service.Exec, + Version: version, + } + if err := k.Update(muService); err != nil { + return err + } + service.Version = version + } + return nil + } + k.Lock() + if len(event.Service) > 0 { + service, ok := k.services[event.Service] + if !ok { + log.Debugf("Runtime unknown service: %s", event.Service) + k.Unlock() + continue + } + if err := processEvent(event, service); err != nil { + log.Debugf("Runtime error updating service %s: %v", event.Service, err) + } + k.Unlock() + continue + } + // if blank service was received we update all services + for _, service := range k.services { + if err := processEvent(event, service); err != nil { + log.Debugf("Runtime error updating service %s: %v", service.Name, err) + } + } + k.Unlock() + } + case <-k.closed: + log.Debugf("Runtime stopped") + return + } + } +} + +// starts the runtime +func (k *kubernetes) Start() error { + k.Lock() + defer k.Unlock() + + // already running + if k.running { + return nil + } + + // set running + k.running = true + k.closed = make(chan bool) + + var events <-chan runtime.Event + if k.options.Notifier != nil { + var err error + events, err = k.options.Notifier.Notify() + if err != nil { + // TODO: should we bail here? + log.Debugf("Runtime failed to start update notifier") + } + } + + go k.run(events) + + return nil +} + +// Shutdown the runtime +func (k *kubernetes) Stop() error { + k.Lock() + defer k.Unlock() + + if !k.running { + return nil + } + + select { + case <-k.closed: + return nil + default: + close(k.closed) + // set not running + k.running = false + // stop the notifier too + if k.options.Notifier != nil { + return k.options.Notifier.Close() + } + } + + return nil +} + +// String implements stringer interface +func (k *kubernetes) String() string { + return "kubernetes" +} diff --git a/runtime/options.go b/runtime/options.go index 64221c9e..82284b36 100644 --- a/runtime/options.go +++ b/runtime/options.go @@ -4,8 +4,24 @@ import ( "io" ) +type Option func(o *Options) + +// Options configure runtime +type Options struct { + // Notifier for updates + Notifier Notifier +} + +// AutoUpdate enables micro auto-updates +func WithNotifier(n Notifier) Option { + return func(o *Options) { + o.Notifier = n + } +} + type CreateOption func(o *CreateOptions) +// CreateOptions configure runtime services type CreateOptions struct { // command to execute including args Command []string @@ -25,7 +41,7 @@ func WithCommand(c string, args ...string) CreateOption { } } -// WithEnv sets the created service env +// WithEnv sets the created service environment func WithEnv(env []string) CreateOption { return func(o *CreateOptions) { o.Env = env diff --git a/runtime/runtime.go b/runtime/runtime.go index 0cb84802..ca5762bb 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -1,8 +1,17 @@ // Package runtime is a service runtime manager package runtime +import "time" + +var ( + // DefaultRuntime is default micro runtime + DefaultRuntime Runtime = NewRuntime() +) + // Runtime is a service runtime manager type Runtime interface { + // Init initializes runtime + Init(...Option) error // Registers a service Create(*Service, ...CreateOption) error // Remove a service @@ -17,41 +26,62 @@ type Runtime interface { Stop() error } +// Notifier is an update notifier +type Notifier interface { + // Notify publishes notification events + Notify() (<-chan Event, error) + // Close stops the notifier + Close() error +} + +// EventType defines notification event +type EventType int + +const ( + // Create is emitted when a new build has been craeted + Create EventType = iota + // Update is emitted when a new update become available + Update + // Delete is emitted when a build has been deleted + Delete +) + +// String returns human readable event type +func (t EventType) String() string { + switch t { + case Create: + return "create" + case Delete: + return "delete" + case Update: + return "update" + default: + return "unknown" + } +} + +// Event is notification event +type Event struct { + // Type is event type + Type EventType + // Timestamp is event timestamp + Timestamp time.Time + // Service is the name of the service + Service string + // Version of the build + Version string +} + +// Service is runtime service type Service struct { - // name of the service + // Name of the service Name string // url location of source Source string - // path to store source + // Path to store source Path string - // exec command + // Exec command Exec string -} - -var ( - DefaultRuntime = newRuntime() -) - -func Create(s *Service, opts ...CreateOption) error { - return DefaultRuntime.Create(s, opts...) -} - -func Delete(s *Service) error { - return DefaultRuntime.Delete(s) -} - -func Update(s *Service) error { - return DefaultRuntime.Update(s) -} - -func List() ([]*Service, error) { - return DefaultRuntime.List() -} - -func Start() error { - return DefaultRuntime.Start() -} - -func Stop() error { - return DefaultRuntime.Stop() + // Version of the service + Version string } diff --git a/runtime/service.go b/runtime/service.go new file mode 100644 index 00000000..cb0d3bb9 --- /dev/null +++ b/runtime/service.go @@ -0,0 +1,158 @@ +package runtime + +import ( + "io" + "strings" + "sync" + + packager "github.com/micro/go-micro/runtime/package" + "github.com/micro/go-micro/runtime/process" + proc "github.com/micro/go-micro/runtime/process/os" + "github.com/micro/go-micro/util/log" +) + +type service struct { + sync.RWMutex + + running bool + closed chan bool + err error + + // output for logs + output io.Writer + + // service to manage + *Service + // process creator + Process *proc.Process + // Exec + Exec *process.Executable + // process pid + PID *process.PID +} + +func newService(s *Service, c CreateOptions) *service { + var exec string + var args []string + + if len(s.Exec) > 0 { + parts := strings.Split(s.Exec, " ") + exec = parts[0] + args = []string{} + + if len(parts) > 1 { + args = parts[1:] + } + } else { + // set command + exec = c.Command[0] + // set args + if len(c.Command) > 1 { + args = c.Command[1:] + } + } + + return &service{ + Service: s, + Process: new(proc.Process), + Exec: &process.Executable{ + Binary: &packager.Binary{ + Name: s.Name, + Path: exec, + }, + Env: c.Env, + Args: args, + }, + closed: make(chan bool), + output: c.Output, + } +} + +func (s *service) streamOutput() { + go io.Copy(s.output, s.PID.Output) + go io.Copy(s.output, s.PID.Error) +} + +// Running returns true is the service is running +func (s *service) Running() bool { + s.RLock() + defer s.RUnlock() + return s.running +} + +// Start stars the service +func (s *service) Start() error { + s.Lock() + defer s.Unlock() + + if s.running { + return nil + } + + // reset + s.err = nil + s.closed = make(chan bool) + + // TODO: pull source & build binary + log.Debugf("Runtime service %s forking new process", s.Service.Name) + p, err := s.Process.Fork(s.Exec) + if err != nil { + return err + } + + // set the pid + s.PID = p + // set to running + s.running = true + + if s.output != nil { + s.streamOutput() + } + + // wait and watch + go s.Wait() + + return nil +} + +// Stop stops the service +func (s *service) Stop() error { + s.Lock() + defer s.Unlock() + + select { + case <-s.closed: + return nil + default: + close(s.closed) + s.running = false + if s.PID == nil { + return nil + } + return s.Process.Kill(s.PID) + } +} + +// Error returns the last error service has returned +func (s *service) Error() error { + s.RLock() + defer s.RUnlock() + return s.err +} + +// Wait waits for the service to finish running +func (s *service) Wait() { + // wait for process to exit + err := s.Process.Wait(s.PID) + + s.Lock() + defer s.Unlock() + + // save the error + if err != nil { + s.err = err + } + + // no longer running + s.running = false +}