diff --git a/debug/log/kubernetes/kubernetes.go b/debug/log/kubernetes/kubernetes.go deleted file mode 100644 index 9fc17b10..00000000 --- a/debug/log/kubernetes/kubernetes.go +++ /dev/null @@ -1,190 +0,0 @@ -// Package kubernetes is a logger implementing (github.com/unistack-org/micro/v3/debug/log).Log -package kubernetes - -import ( - "bufio" - "encoding/json" - "fmt" - "os" - "sort" - "strconv" - "time" - - "github.com/unistack-org/micro/v3/debug/log" - "github.com/unistack-org/micro/v3/metadata" - "github.com/unistack-org/micro/v3/util/kubernetes/client" -) - -type klog struct { - client client.Client - - log.Options -} - -func (k *klog) podLogStream(podName string, stream *kubeStream) { - p := make(map[string]string) - p["follow"] = "true" - - // get the logs for the pod - body, err := k.client.Log(&client.Resource{ - Name: podName, - Kind: "pod", - }, client.LogParams(p)) - - if err != nil { - fmt.Fprintf(os.Stderr, "%v", err) - return - } - - s := bufio.NewScanner(body) - defer body.Close() - - for { - select { - case <-stream.stop: - return - default: - if s.Scan() { - record := k.parse(s.Text()) - stream.stream <- record - } else { - // TODO: is there a blocking call - // rather than a sleep loop? - time.Sleep(time.Second) - } - } - } -} - -func (k *klog) getMatchingPods() ([]string, error) { - r := &client.Resource{ - Kind: "pod", - Value: new(client.PodList), - } - - l := make(map[string]string) - - l["name"] = client.Format(k.Options.Name) - // TODO: specify micro:service - // l["micro"] = "service" - - if err := k.client.Get(r, client.GetLabels(l)); err != nil { - return nil, err - } - - var matches []string - - for _, p := range r.Value.(*client.PodList).Items { - // find labels that match the name - if p.Metadata.Labels["name"] == client.Format(k.Options.Name) { - matches = append(matches, p.Metadata.Name) - } - } - - return matches, nil -} - -func (k *klog) parse(line string) log.Record { - record := log.Record{} - - if err := json.Unmarshal([]byte(line), &record); err != nil { - record.Timestamp = time.Now().UTC() - record.Message = line - record.Metadata = metadata.New(1) - } - - record.Metadata["service"] = k.Options.Name - - return record -} - -func (k *klog) Read(options ...log.ReadOption) ([]log.Record, error) { - opts := &log.ReadOptions{} - for _, o := range options { - o(opts) - } - pods, err := k.getMatchingPods() - if err != nil { - return nil, err - } - - var records []log.Record - - for _, pod := range pods { - logParams := make(map[string]string) - - if !opts.Since.Equal(time.Time{}) { - logParams["sinceSeconds"] = strconv.Itoa(int(time.Since(opts.Since).Seconds())) - } - - if opts.Count != 0 { - logParams["tailLines"] = strconv.Itoa(opts.Count) - } - - if opts.Stream { - logParams["follow"] = "true" - } - - logs, err := k.client.Log(&client.Resource{ - Name: pod, - Kind: "pod", - }, client.LogParams(logParams)) - - if err != nil { - return nil, err - } - defer logs.Close() - - s := bufio.NewScanner(logs) - - for s.Scan() { - record := k.parse(s.Text()) - record.Metadata["pod"] = pod - records = append(records, record) - } - } - - // sort the records - sort.Slice(records, func(i, j int) bool { return records[i].Timestamp.Before(records[j].Timestamp) }) - - return records, nil -} - -func (k *klog) Write(l log.Record) error { - return write(l) -} - -func (k *klog) Stream() (log.Stream, error) { - // find the matching pods - pods, err := k.getMatchingPods() - if err != nil { - return nil, err - } - - stream := &kubeStream{ - stream: make(chan log.Record), - stop: make(chan bool), - } - - // stream from the individual pods - for _, pod := range pods { - go k.podLogStream(pod, stream) - } - - return stream, nil -} - -// NewLog returns a configured Kubernetes logger -func NewLog(opts ...log.Option) log.Log { - klog := &klog{} - for _, o := range opts { - o(&klog.Options) - } - - if len(os.Getenv("KUBERNETES_SERVICE_HOST")) > 0 { - klog.client = client.NewClusterClient() - } else { - klog.client = client.NewLocalClient() - } - return klog -} diff --git a/debug/log/kubernetes/kubernetes_test.go b/debug/log/kubernetes/kubernetes_test.go deleted file mode 100644 index 178de391..00000000 --- a/debug/log/kubernetes/kubernetes_test.go +++ /dev/null @@ -1,71 +0,0 @@ -package kubernetes - -import ( - "bytes" - "encoding/json" - "io" - "os" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/unistack-org/micro/v3/debug/log" -) - -func TestKubernetes(t *testing.T) { - if len(os.Getenv("INTEGRATION_TESTS")) > 0 { - t.Skip() - } - - k := NewLog(log.Name("micro-network")) - - r, w, err := os.Pipe() - if err != nil { - t.Fatal(err) - } - - s := os.Stderr - os.Stderr = w - meta := make(map[string]string) - - write := log.Record{ - Timestamp: time.Unix(0, 0).UTC(), - Message: "Test log entry", - Metadata: meta, - } - - meta["foo"] = "bar" - - k.Write(write) - b := &bytes.Buffer{} - w.Close() - io.Copy(b, r) - os.Stderr = s - - var read log.Record - - if err := json.Unmarshal(b.Bytes(), &read); err != nil { - t.Fatalf("json.Unmarshal failed: %s", err.Error()) - } - - assert.Equal(t, write, read, "Write was not equal") - - records, err := k.Read() - assert.Nil(t, err, "Read should not error") - assert.NotNil(t, records, "Read should return records") - - stream, err := k.Stream() - if err != nil { - t.Fatal(err) - } - - records = nil - - go stream.Stop() - - for s := range stream.Chan() { - records = append(records, s) - } - - assert.Equal(t, 0, len(records), "Stream should return nothing") -} diff --git a/debug/log/kubernetes/stream.go b/debug/log/kubernetes/stream.go deleted file mode 100644 index 957964ca..00000000 --- a/debug/log/kubernetes/stream.go +++ /dev/null @@ -1,44 +0,0 @@ -package kubernetes - -import ( - "encoding/json" - "fmt" - "os" - "sync" - - "github.com/unistack-org/micro/v3/debug/log" -) - -func write(l log.Record) error { - m, err := json.Marshal(l) - if err == nil { - _, err := fmt.Fprintf(os.Stderr, "%s", m) - return err - } - return err -} - -type kubeStream struct { - // the k8s log stream - stream chan log.Record - sync.Mutex - // the stop chan - stop chan bool -} - -func (k *kubeStream) Chan() <-chan log.Record { - return k.stream -} - -func (k *kubeStream) Stop() error { - k.Lock() - defer k.Unlock() - select { - case <-k.stop: - return nil - default: - close(k.stop) - close(k.stream) - } - return nil -} diff --git a/util/kubernetes/api/api_test.go b/util/kubernetes/api/api_test.go deleted file mode 100644 index 26474b17..00000000 --- a/util/kubernetes/api/api_test.go +++ /dev/null @@ -1,178 +0,0 @@ -package api - -import ( - "encoding/json" - "net/http" - "net/http/httptest" - "reflect" - "testing" -) - -type testcase struct { - Token string - ReqFn func(opts *Options) *Request - Method string - URI string - Body interface{} - Header map[string]string - Assert func(req *http.Request) bool -} - -var tests = []testcase{ - { - ReqFn: func(opts *Options) *Request { - return NewRequest(opts).Get().Resource("service") - }, - Method: "GET", - URI: "/api/v1/namespaces/default/services/", - }, - { - ReqFn: func(opts *Options) *Request { - return NewRequest(opts).Get().Resource("service").Name("foo") - }, - Method: "GET", - URI: "/api/v1/namespaces/default/services/foo", - }, - { - ReqFn: func(opts *Options) *Request { - return NewRequest(opts).Get().Resource("service").Namespace("test").Name("bar") - }, - Method: "GET", - URI: "/api/v1/namespaces/test/services/bar", - }, - { - ReqFn: func(opts *Options) *Request { - return NewRequest(opts).Get().Resource("deployment").Name("foo") - }, - Method: "GET", - URI: "/apis/apps/v1/namespaces/default/deployments/foo", - }, - { - ReqFn: func(opts *Options) *Request { - return NewRequest(opts).Get().Resource("deployment").Namespace("test").Name("foo") - }, - Method: "GET", - URI: "/apis/apps/v1/namespaces/test/deployments/foo", - }, - { - ReqFn: func(opts *Options) *Request { - return NewRequest(opts).Get().Resource("pod").Params(&Params{LabelSelector: map[string]string{"foo": "bar"}}) - }, - Method: "GET", - URI: "/api/v1/namespaces/default/pods/?labelSelector=foo%3Dbar", - }, - { - ReqFn: func(opts *Options) *Request { - return NewRequest(opts).Post().Resource("service").Name("foo").Body(map[string]string{"foo": "bar"}) - }, - Method: "POST", - URI: "/api/v1/namespaces/default/services/foo", - Body: map[string]string{"foo": "bar"}, - }, - { - ReqFn: func(opts *Options) *Request { - return NewRequest(opts).Post().Resource("deployment").Namespace("test").Name("foo").Body(map[string]string{"foo": "bar"}) - }, - Method: "POST", - URI: "/apis/apps/v1/namespaces/test/deployments/foo", - Body: map[string]string{"foo": "bar"}, - }, - { - ReqFn: func(opts *Options) *Request { - return NewRequest(opts).Put().Resource("endpoint").Name("baz").Body(map[string]string{"bam": "bar"}) - }, - Method: "PUT", - URI: "/api/v1/namespaces/default/endpoints/baz", - Body: map[string]string{"bam": "bar"}, - }, - { - ReqFn: func(opts *Options) *Request { - return NewRequest(opts).Patch().Resource("endpoint").Name("baz").Body(map[string]string{"bam": "bar"}) - }, - Method: "PATCH", - URI: "/api/v1/namespaces/default/endpoints/baz", - Body: map[string]string{"bam": "bar"}, - }, - { - ReqFn: func(opts *Options) *Request { - return NewRequest(opts).Patch().Resource("endpoint").Name("baz").SetHeader("foo", "bar") - }, - Method: "PATCH", - URI: "/api/v1/namespaces/default/endpoints/baz", - Header: map[string]string{"foo": "bar"}, - }, - { - ReqFn: func(opts *Options) *Request { - return NewRequest(opts).Patch().Resource("deployment").Name("baz").SetHeader("foo", "bar") - }, - Method: "PATCH", - URI: "/apis/apps/v1/namespaces/default/deployments/baz", - Header: map[string]string{"foo": "bar"}, - }, - { - ReqFn: func(opts *Options) *Request { - return NewRequest(opts). - Get(). - Resource("pod"). - SubResource("log"). - Name("foolog") - }, - Method: "GET", - URI: "/api/v1/namespaces/default/pods/foolog/log", - }, -} - -var wrappedHandler = func(test *testcase, t *testing.T) http.HandlerFunc { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - auth := r.Header.Get("Authorization") - if len(test.Token) > 0 && (len(auth) == 0 || auth != "Bearer "+test.Token) { - t.Errorf("test case token (%s) did not match expected token (%s)", "Bearer "+test.Token, auth) - } - - if len(test.Method) > 0 && test.Method != r.Method { - t.Errorf("test case Method (%s) did not match expected Method (%s)", test.Method, r.Method) - } - - if len(test.URI) > 0 && test.URI != r.URL.RequestURI() { - t.Errorf("test case URI (%s) did not match expected URI (%s)", test.URI, r.URL.RequestURI()) - } - - if test.Body != nil { - var res map[string]string - decoder := json.NewDecoder(r.Body) - if err := decoder.Decode(&res); err != nil { - t.Errorf("decoding body failed: %v", err) - } - if !reflect.DeepEqual(res, test.Body) { - t.Error("body did not match") - } - } - - if test.Header != nil { - for k, v := range test.Header { - if r.Header.Get(k) != v { - t.Error("header did not exist") - } - } - } - - w.WriteHeader(http.StatusOK) - }) -} - -func TestRequest(t *testing.T) { - for _, test := range tests { - ts := httptest.NewServer(wrappedHandler(&test, t)) - req := test.ReqFn(&Options{ - Host: ts.URL, - Client: &http.Client{}, - BearerToken: &test.Token, - Namespace: "default", - }) - res := req.Do() - if res.Error() != nil { - t.Errorf("request failed with %v", res.Error()) - } - ts.Close() - } -} diff --git a/util/kubernetes/api/request.go b/util/kubernetes/api/request.go deleted file mode 100644 index cc11b438..00000000 --- a/util/kubernetes/api/request.go +++ /dev/null @@ -1,271 +0,0 @@ -package api - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "io" - "net/http" - "net/url" - - "github.com/unistack-org/micro/v3/logger" -) - -// Request is used to construct a http request for the k8s API. -type Request struct { - // the request context - context context.Context - client *http.Client - header http.Header - params url.Values - method string - host string - namespace string - - resource string - resourceName *string - subResource *string - body io.Reader - - err error -} - -// Params is the object to pass in to set parameters -// on a request. -type Params struct { - LabelSelector map[string]string - Annotations map[string]string - Additional map[string]string -} - -// verb sets method -func (r *Request) verb(method string) *Request { - r.method = method - return r -} - -func (r *Request) Context(ctx context.Context) { - r.context = ctx -} - -// Get request -func (r *Request) Get() *Request { - return r.verb("GET") -} - -// Post request -func (r *Request) Post() *Request { - return r.verb("POST") -} - -// Put request -func (r *Request) Put() *Request { - return r.verb("PUT") -} - -// Patch request -func (r *Request) Patch() *Request { - return r.verb("PATCH") -} - -// Delete request -func (r *Request) Delete() *Request { - return r.verb("DELETE") -} - -// Namespace is to set the namespace to operate on -func (r *Request) Namespace(s string) *Request { - if len(s) > 0 { - r.namespace = s - } - return r -} - -// Resource is the type of resource the operation is -// for, such as "services", "endpoints" or "pods" -func (r *Request) Resource(s string) *Request { - r.resource = s - return r -} - -// SubResource sets a sub resource on a resource, -// e.g. pods/log for pod logs -func (r *Request) SubResource(s string) *Request { - r.subResource = &s - return r -} - -// Name is for targeting a specific resource by id -func (r *Request) Name(s string) *Request { - r.resourceName = &s - return r -} - -// Body pass in a body to set, this is for POST, PUT and PATCH requests -func (r *Request) Body(in interface{}) *Request { - b := new(bytes.Buffer) - // if we're not sending YAML request, we encode to JSON - if r.header.Get("Content-Type") != "application/yaml" { - if err := json.NewEncoder(b).Encode(&in); err != nil { - r.err = err - return r - } - r.body = b - return r - } - - // if application/yaml is set, we assume we get a raw bytes so we just copy over - body, ok := in.(io.Reader) - if !ok { - r.err = errors.New("invalid data") - return r - } - // copy over data to the bytes buffer - if _, err := io.Copy(b, body); err != nil { - r.err = err - return r - } - - r.body = b - return r -} - -// Params is used to set parameters on a request -func (r *Request) Params(p *Params) *Request { - for k, v := range p.LabelSelector { - // create new key=value pair - value := fmt.Sprintf("%s=%s", k, v) - // check if there's an existing value - if label := r.params.Get("labelSelector"); len(label) > 0 { - value = fmt.Sprintf("%s,%s", label, value) - } - // set and overwrite the value - r.params.Set("labelSelector", value) - } - for k, v := range p.Additional { - r.params.Set(k, v) - } - - return r -} - -// SetHeader sets a header on a request with -// a `key` and `value` -func (r *Request) SetHeader(key, value string) *Request { - r.header.Add(key, value) - return r -} - -// request builds the http.Request from the options -func (r *Request) request() (*http.Request, error) { - var url string - switch r.resource { - case "namespace": - // /api/v1/namespaces/ - url = fmt.Sprintf("%s/api/v1/namespaces/", r.host) - case "deployment": - // /apis/apps/v1/namespaces/{namespace}/deployments/{name} - url = fmt.Sprintf("%s/apis/apps/v1/namespaces/%s/%ss/", r.host, r.namespace, r.resource) - default: - // /api/v1/namespaces/{namespace}/{resource} - url = fmt.Sprintf("%s/api/v1/namespaces/%s/%ss/", r.host, r.namespace, r.resource) - } - - // append resourceName if it is present - if r.resourceName != nil { - url += *r.resourceName - if r.subResource != nil { - url += "/" + *r.subResource - } - } - - // append any query params - if len(r.params) > 0 { - url += "?" + r.params.Encode() - } - - var req *http.Request - var err error - - // build request - if r.context != nil { - req, err = http.NewRequestWithContext(r.context, r.method, url, r.body) - } else { - req, err = http.NewRequest(r.method, url, r.body) - } - if err != nil { - return nil, err - } - - // set headers on request - req.Header = r.header - return req, nil -} - -// Do builds and triggers the request -func (r *Request) Do() *Response { - if r.err != nil { - return &Response{ - err: r.err, - } - } - - req, err := r.request() - if err != nil { - return &Response{ - err: err, - } - } - - logger.Debug(context.TODO(), "[Kubernetes] %v %v", req.Method, req.URL.String()) - res, err := r.client.Do(req) - if err != nil { - return &Response{ - err: err, - } - } - - // return res, err - return newResponse(res, err) -} - -// Raw performs a Raw HTTP request to the Kubernetes API -func (r *Request) Raw() (*http.Response, error) { - req, err := r.request() - if err != nil { - return nil, err - } - - res, err := r.client.Do(req) - if err != nil { - return nil, err - } - return res, nil -} - -// Options ... -type Options struct { - Host string - Namespace string - BearerToken *string - Client *http.Client -} - -// NewRequest creates a k8s api request -func NewRequest(opts *Options) *Request { - req := &Request{ - header: make(http.Header), - params: make(url.Values), - client: opts.Client, - namespace: opts.Namespace, - host: opts.Host, - } - - if opts.BearerToken != nil { - req.SetHeader("Authorization", "Bearer "+*opts.BearerToken) - } - - return req -} diff --git a/util/kubernetes/api/response.go b/util/kubernetes/api/response.go deleted file mode 100644 index 14de9f28..00000000 --- a/util/kubernetes/api/response.go +++ /dev/null @@ -1,94 +0,0 @@ -package api - -import ( - "encoding/json" - "errors" - "fmt" - "io/ioutil" - "net/http" -) - -// Errors ... -var ( - ErrNotFound = errors.New("kubernetes: resource not found") - ErrDecode = errors.New("kubernetes: error decoding") - ErrUnknown = errors.New("kubernetes: unknown error") -) - -// Status is an object that is returned when a request -// failed or delete succeeded. -type Status struct { - Kind string `json:"kind"` - Status string `json:"status"` - Message string `json:"message"` - Reason string `json:"reason"` - Code int `json:"code"` -} - -// Response ... -type Response struct { - res *http.Response - err error -} - -// Error returns an error -func (r *Response) Error() error { - return r.err -} - -// StatusCode returns status code for response -func (r *Response) StatusCode() int { - return r.res.StatusCode -} - -// Into decode body into `data` -func (r *Response) Into(data interface{}) error { - if r.err != nil { - return r.err - } - - defer r.res.Body.Close() - decoder := json.NewDecoder(r.res.Body) - if err := decoder.Decode(&data); err != nil { - return fmt.Errorf("%v: %v", ErrDecode, err) - } - - return r.err -} - -func (r *Response) Close() error { - return r.res.Body.Close() -} - -func newResponse(res *http.Response, err error) *Response { - r := &Response{ - res: res, - err: err, - } - - if err != nil { - return r - } - - if r.res.StatusCode == http.StatusOK || - r.res.StatusCode == http.StatusCreated || - r.res.StatusCode == http.StatusNoContent { - // Non error status code - return r - } - - if r.res.StatusCode == http.StatusNotFound { - r.err = ErrNotFound - return r - } - - b, err := ioutil.ReadAll(r.res.Body) - if err == nil { - r.err = errors.New(string(b)) - return r - } - - r.err = ErrUnknown - - return r -} diff --git a/util/kubernetes/client/client.go b/util/kubernetes/client/client.go deleted file mode 100644 index 2635db93..00000000 --- a/util/kubernetes/client/client.go +++ /dev/null @@ -1,401 +0,0 @@ -// Package client provides an implementation of a restricted subset of kubernetes API client -package client - -import ( - "bytes" - "context" - "crypto/tls" - "errors" - "io" - "io/ioutil" - "net/http" - "os" - "path" - "regexp" - "strings" - - "github.com/unistack-org/micro/v3/logger" - "github.com/unistack-org/micro/v3/util/kubernetes/api" -) - -var ( - // path to kubernetes service account token - serviceAccountPath = "/var/run/secrets/kubernetes.io/serviceaccount" - // ErrReadNamespace is returned when the names could not be read from service account - ErrReadNamespace = errors.New("Could not read namespace from service account secret") - // DefaultImage is default micro image - DefaultImage = "micro/micro" - // DefaultNamespace is the default k8s namespace - DefaultNamespace = "default" -) - -// Client ... -type client struct { - opts *api.Options -} - -// Kubernetes client -type Client interface { - // Create creates new API resource - Create(*Resource, ...CreateOption) error - // Get queries API resources - Get(*Resource, ...GetOption) error - // Update patches existing API object - Update(*Resource, ...UpdateOption) error - // Delete deletes API resource - Delete(*Resource, ...DeleteOption) error - // List lists API resources - List(*Resource, ...ListOption) error - // Log gets log for a pod - Log(*Resource, ...LogOption) (io.ReadCloser, error) - // Watch for events - Watch(*Resource, ...WatchOption) (Watcher, error) -} - -// Create creates new API object -func (c *client) Create(r *Resource, opts ...CreateOption) error { - options := CreateOptions{ - Namespace: c.opts.Namespace, - } - for _, o := range opts { - o(&options) - } - - b := new(bytes.Buffer) - if err := renderTemplate(r.Kind, b, r.Value); err != nil { - return err - } - - return api.NewRequest(c.opts). - Post(). - SetHeader("Content-Type", "application/yaml"). - Namespace(options.Namespace). - Resource(r.Kind). - Body(b). - Do(). - Error() -} - -var ( - nameRegex = regexp.MustCompile("[^a-zA-Z0-9]+") -) - -// SerializeResourceName removes all spacial chars from a string so it -// can be used as a k8s resource name -func SerializeResourceName(ns string) string { - return nameRegex.ReplaceAllString(ns, "-") -} - -// Get queries API objects and stores the result in r -func (c *client) Get(r *Resource, opts ...GetOption) error { - options := GetOptions{ - Namespace: c.opts.Namespace, - } - for _, o := range opts { - o(&options) - } - - return api.NewRequest(c.opts). - Get(). - Resource(r.Kind). - Namespace(options.Namespace). - Params(&api.Params{LabelSelector: options.Labels}). - Do(). - Into(r.Value) -} - -// Log returns logs for a pod -func (c *client) Log(r *Resource, opts ...LogOption) (io.ReadCloser, error) { - options := LogOptions{ - Namespace: c.opts.Namespace, - } - for _, o := range opts { - o(&options) - } - - req := api.NewRequest(c.opts). - Get(). - Resource(r.Kind). - SubResource("log"). - Name(r.Name). - Namespace(options.Namespace) - - if options.Params != nil { - req.Params(&api.Params{Additional: options.Params}) - } - - resp, err := req.Raw() - if err != nil { - return nil, err - } - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - resp.Body.Close() - return nil, errors.New(resp.Request.URL.String() + ": " + resp.Status) - } - return resp.Body, nil -} - -// Update updates API object -func (c *client) Update(r *Resource, opts ...UpdateOption) error { - options := UpdateOptions{ - Namespace: c.opts.Namespace, - } - for _, o := range opts { - o(&options) - } - - req := api.NewRequest(c.opts). - Patch(). - SetHeader("Content-Type", "application/strategic-merge-patch+json"). - Resource(r.Kind). - Name(r.Name). - Namespace(options.Namespace) - - switch r.Kind { - case "service": - req.Body(r.Value.(*Service)) - case "deployment": - req.Body(r.Value.(*Deployment)) - case "pod": - req.Body(r.Value.(*Pod)) - default: - return errors.New("unsupported resource") - } - - return req.Do().Error() -} - -// Delete removes API object -func (c *client) Delete(r *Resource, opts ...DeleteOption) error { - options := DeleteOptions{ - Namespace: c.opts.Namespace, - } - for _, o := range opts { - o(&options) - } - - return api.NewRequest(c.opts). - Delete(). - Resource(r.Kind). - Name(r.Name). - Namespace(options.Namespace). - Do(). - Error() -} - -// List lists API objects and stores the result in r -func (c *client) List(r *Resource, opts ...ListOption) error { - options := ListOptions{ - Namespace: c.opts.Namespace, - } - for _, o := range opts { - o(&options) - } - - return c.Get(r, GetNamespace(options.Namespace)) -} - -// Watch returns an event stream -func (c *client) Watch(r *Resource, opts ...WatchOption) (Watcher, error) { - options := WatchOptions{ - Namespace: c.opts.Namespace, - } - for _, o := range opts { - o(&options) - } - - // set the watch param - params := &api.Params{Additional: map[string]string{ - "watch": "true", - }} - - // get options params - if options.Params != nil { - for k, v := range options.Params { - params.Additional[k] = v - } - } - - req := api.NewRequest(c.opts). - Get(). - Resource(r.Kind). - Name(r.Name). - Namespace(options.Namespace). - Params(params) - - return newWatcher(req) -} - -// NewService returns default micro kubernetes service definition -func NewService(name, version, typ, namespace string) *Service { - if logger.V(logger.TraceLevel) { - logger.Trace(context.TODO(), "kubernetes default service: name: %s, version: %s", name, version) - } - - Labels := map[string]string{ - "name": name, - "version": version, - "micro": typ, - } - - svcName := name - if len(version) > 0 { - // API service object name joins name and version over "-" - svcName = strings.Join([]string{name, version}, "-") - } - - if len(namespace) == 0 { - namespace = DefaultNamespace - } - - Metadata := &Metadata{ - Name: svcName, - Namespace: SerializeResourceName(namespace), - Version: version, - Labels: Labels, - } - - Spec := &ServiceSpec{ - Type: "ClusterIP", - Selector: Labels, - Ports: []ServicePort{{ - "service-port", 8080, "", - }}, - } - - return &Service{ - Metadata: Metadata, - Spec: Spec, - } -} - -// NewService returns default micro kubernetes deployment definition -func NewDeployment(name, version, typ, namespace string) *Deployment { - if logger.V(logger.TraceLevel) { - logger.Trace(context.TODO(), "kubernetes default deployment: name: %s, version: %s", name, version) - } - - Labels := map[string]string{ - "name": name, - "version": version, - "micro": typ, - } - - depName := name - if len(version) > 0 { - // API deployment object name joins name and version over "-" - depName = strings.Join([]string{name, version}, "-") - } - - if len(namespace) == 0 { - namespace = DefaultNamespace - } - - Metadata := &Metadata{ - Name: depName, - Namespace: SerializeResourceName(namespace), - Version: version, - Labels: Labels, - Annotations: map[string]string{}, - } - - // enable go modules by default - env := EnvVar{ - Name: "GO111MODULE", - Value: "on", - } - - Spec := &DeploymentSpec{ - Replicas: 1, - Selector: &LabelSelector{ - MatchLabels: Labels, - }, - Template: &Template{ - Metadata: Metadata, - PodSpec: &PodSpec{ - Containers: []Container{{ - Name: name, - Image: DefaultImage, - Env: []EnvVar{env}, - Command: []string{}, - Ports: []ContainerPort{{ - Name: "service-port", - ContainerPort: 8080, - }}, - ReadinessProbe: &Probe{ - TCPSocket: TCPSocketAction{ - Port: 8080, - }, - PeriodSeconds: 10, - InitialDelaySeconds: 10, - }, - }}, - }, - }, - } - - return &Deployment{ - Metadata: Metadata, - Spec: Spec, - } -} - -// NewLocalClient returns a client that can be used with `kubectl proxy` -func NewLocalClient(hosts ...string) *client { - c := &client{ - opts: &api.Options{ - Client: http.DefaultClient, - Namespace: "default", - }, - } - - if len(hosts) == 0 { - c.opts.Host = "http://localhost:8001" - } else { - c.opts.Host = hosts[0] - } - - return c -} - -// NewClusterClient creates a Kubernetes client for use from within a k8s pod. -func NewClusterClient() *client { - host := "https://" + os.Getenv("KUBERNETES_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_SERVICE_PORT") - - s, err := os.Stat(serviceAccountPath) - if err != nil { - logger.Fatal(context.TODO(), err.Error()) - } - if s == nil || !s.IsDir() { - logger.Fatal(context.TODO(), "service account not found") - } - - token, err := ioutil.ReadFile(path.Join(serviceAccountPath, "token")) - if err != nil { - logger.Fatal(context.TODO(), err.Error()) - } - t := string(token) - - crt, err := CertPoolFromFile(path.Join(serviceAccountPath, "ca.crt")) - if err != nil { - logger.Fatal(context.TODO(), err.Error()) - } - - c := &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - RootCAs: crt, - }, - DisableCompression: true, - }, - } - - return &client{ - opts: &api.Options{ - Client: c, - Host: host, - BearerToken: &t, - Namespace: DefaultNamespace, - }, - } -} diff --git a/util/kubernetes/client/options.go b/util/kubernetes/client/options.go deleted file mode 100644 index dd6b32a5..00000000 --- a/util/kubernetes/client/options.go +++ /dev/null @@ -1,107 +0,0 @@ -package client - -type CreateOptions struct { - Namespace string -} - -type GetOptions struct { - Namespace string - Labels map[string]string -} -type UpdateOptions struct { - Namespace string -} -type DeleteOptions struct { - Namespace string -} -type ListOptions struct { - Namespace string -} - -type LogOptions struct { - Namespace string - Params map[string]string -} - -type WatchOptions struct { - Namespace string - Params map[string]string -} - -type CreateOption func(*CreateOptions) -type GetOption func(*GetOptions) -type UpdateOption func(*UpdateOptions) -type DeleteOption func(*DeleteOptions) -type ListOption func(*ListOptions) -type LogOption func(*LogOptions) -type WatchOption func(*WatchOptions) - -// LogParams provides additional params for logs -func LogParams(p map[string]string) LogOption { - return func(l *LogOptions) { - l.Params = p - } -} - -// WatchParams used for watch params -func WatchParams(p map[string]string) WatchOption { - return func(w *WatchOptions) { - w.Params = p - } -} - -// CreateNamespace sets the namespace for creating a resource -func CreateNamespace(ns string) CreateOption { - return func(o *CreateOptions) { - o.Namespace = SerializeResourceName(ns) - } -} - -// GetNamespace sets the namespace for getting a resource -func GetNamespace(ns string) GetOption { - return func(o *GetOptions) { - o.Namespace = SerializeResourceName(ns) - } -} - -// GetLabels sets the labels for when getting a resource -func GetLabels(ls map[string]string) GetOption { - return func(o *GetOptions) { - o.Labels = ls - } -} - -// UpdateNamespace sets the namespace for updating a resource -func UpdateNamespace(ns string) UpdateOption { - return func(o *UpdateOptions) { - o.Namespace = SerializeResourceName(ns) - } -} - -// DeleteNamespace sets the namespace for deleting a resource -func DeleteNamespace(ns string) DeleteOption { - return func(o *DeleteOptions) { - o.Namespace = SerializeResourceName(ns) - } -} - -// ListNamespace sets the namespace for listing resources -func ListNamespace(ns string) ListOption { - return func(o *ListOptions) { - o.Namespace = SerializeResourceName(ns) - } -} - -// LogNamespace sets the namespace for logging a resource -func LogNamespace(ns string) LogOption { - return func(o *LogOptions) { - o.Namespace = SerializeResourceName(ns) - } -} - -// WatchNamespace sets the namespace for watching a resource -func WatchNamespace(ns string) WatchOption { - return func(o *WatchOptions) { - o.Namespace = SerializeResourceName(ns) - } -} diff --git a/util/kubernetes/client/templates.go b/util/kubernetes/client/templates.go deleted file mode 100644 index ee35b483..00000000 --- a/util/kubernetes/client/templates.go +++ /dev/null @@ -1,227 +0,0 @@ -package client - -var templates = map[string]string{ - "deployment": deploymentTmpl, - "service": serviceTmpl, - "namespace": namespaceTmpl, - "secret": secretTmpl, - "serviceaccount": serviceAccountTmpl, -} - -var deploymentTmpl = ` -apiVersion: apps/v1 -kind: Deployment -metadata: - name: "{{ .Metadata.Name }}" - namespace: "{{ .Metadata.Namespace }}" - labels: - {{- with .Metadata.Labels }} - {{- range $key, $value := . }} - {{ $key }}: "{{ $value }}" - {{- end }} - {{- end }} - annotations: - {{- with .Metadata.Annotations }} - {{- range $key, $value := . }} - {{ $key }}: "{{ $value }}" - {{- end }} - {{- end }} -spec: - replicas: {{ .Spec.Replicas }} - selector: - matchLabels: - {{- with .Spec.Selector.MatchLabels }} - {{- range $key, $value := . }} - {{ $key }}: "{{ $value }}" - {{- end }} - {{- end }} - template: - metadata: - labels: - {{- with .Spec.Template.Metadata.Labels }} - {{- range $key, $value := . }} - {{ $key }}: "{{ $value }}" - {{- end }} - {{- end }} - annotations: - {{- with .Spec.Template.Metadata.Annotations }} - {{- range $key, $value := . }} - {{ $key }}: "{{ $value }}" - {{- end }} - {{- end }} - spec: - serviceAccountName: {{ .Spec.Template.PodSpec.ServiceAccountName }} - containers: - {{- with .Spec.Template.PodSpec.Containers }} - {{- range . }} - - name: {{ .Name }} - env: - {{- with .Env }} - {{- range . }} - - name: "{{ .Name }}" - value: "{{ .Value }}" - {{- if .ValueFrom }} - {{- with .ValueFrom }} - valueFrom: - {{- if .SecretKeyRef }} - {{- with .SecretKeyRef }} - secretKeyRef: - key: {{ .Key }} - name: {{ .Name }} - optional: {{ .Optional }} - {{- end }} - {{- end }} - {{- end }} - {{- end }} - {{- end }} - {{- end }} - args: - {{- range .Args }} - - {{.}} - {{- end }} - command: - {{- range .Command }} - - {{.}} - {{- end }} - image: {{ .Image }} - imagePullPolicy: Always - ports: - {{- with .Ports }} - {{- range . }} - - containerPort: {{ .ContainerPort }} - name: {{ .Name }} - {{- end }} - {{- end }} - {{- if .ReadinessProbe }} - {{- with .ReadinessProbe }} - readinessProbe: - {{- with .TCPSocket }} - tcpSocket: - {{- if .Host }} - host: {{ .Host }} - {{- end }} - port: {{ .Port }} - {{- end }} - initialDelaySeconds: {{ .InitialDelaySeconds }} - periodSeconds: {{ .PeriodSeconds }} - {{- end }} - {{- end }} - {{- if .Resources }} - {{- with .Resources }} - resources: - {{- if .Limits }} - {{- with .Limits }} - limits: - {{- if .Memory }} - memory: {{ .Memory }} - {{- end }} - {{- if .CPU }} - cpu: {{ .CPU }} - {{- end }} - {{- if .EphemeralStorage }} - ephemeral-storage: {{ .EphemeralStorage }} - {{- end }} - {{- end }} - {{- end }} - {{- if .Requests }} - {{- with .Requests }} - requests: - {{- if .Memory }} - memory: {{ .Memory }} - {{- end }} - {{- if .CPU }} - cpu: {{ .CPU }} - {{- end }} - {{- if .EphemeralStorage }} - ephemeral-storage: {{ .EphemeralStorage }} - {{- end }} - {{- end }} - {{- end }} - {{- end }} - {{- end }} - {{- end }} - {{- end }} -` - -var serviceTmpl = ` -apiVersion: v1 -kind: Service -metadata: - name: "{{ .Metadata.Name }}" - namespace: "{{ .Metadata.Namespace }}" - labels: - {{- with .Metadata.Labels }} - {{- range $key, $value := . }} - {{ $key }}: "{{ $value }}" - {{- end }} - {{- end }} -spec: - selector: - {{- with .Spec.Selector }} - {{- range $key, $value := . }} - {{ $key }}: "{{ $value }}" - {{- end }} - {{- end }} - ports: - {{- with .Spec.Ports }} - {{- range . }} - - name: "{{ .Name }}" - port: {{ .Port }} - protocol: {{ .Protocol }} - {{- end }} - {{- end }} -` - -var namespaceTmpl = ` -apiVersion: v1 -kind: Namespace -metadata: - name: "{{ .Metadata.Name }}" - labels: - {{- with .Metadata.Labels }} - {{- range $key, $value := . }} - {{ $key }}: "{{ $value }}" - {{- end }} - {{- end }} -` - -//nolint:gosec -var secretTmpl = ` -apiVersion: v1 -kind: Secret -type: "{{ .Type }}" -metadata: - name: "{{ .Metadata.Name }}" - namespace: "{{ .Metadata.Namespace }}" - labels: - {{- with .Metadata.Labels }} - {{- range $key, $value := . }} - {{ $key }}: "{{ $value }}" - {{- end }} - {{- end }} -data: - {{- with .Data }} - {{- range $key, $value := . }} - {{ $key }}: "{{ $value }}" - {{- end }} - {{- end }} -` - -var serviceAccountTmpl = ` -apiVersion: v1 -kind: ServiceAccount -metadata: - name: "{{ .Metadata.Name }}" - labels: - {{- with .Metadata.Labels }} - {{- range $key, $value := . }} - {{ $key }}: "{{ $value }}" - {{- end }} - {{- end }} -imagePullSecrets: -{{- with .ImagePullSecrets }} -{{- range . }} -- name: "{{ .Name }}" -{{- end }} -{{- end }} -` diff --git a/util/kubernetes/client/types.go b/util/kubernetes/client/types.go deleted file mode 100644 index da78d1dd..00000000 --- a/util/kubernetes/client/types.go +++ /dev/null @@ -1,250 +0,0 @@ -package client - -// ContainerPort -type ContainerPort struct { - Name string `json:"name,omitempty"` - HostPort int `json:"hostPort,omitempty"` - ContainerPort int `json:"containerPort"` - Protocol string `json:"protocol,omitempty"` -} - -// EnvVar is environment variable -type EnvVar struct { - Name string `json:"name"` - Value string `json:"value,omitempty"` - ValueFrom *EnvVarSource `json:"valueFrom,omitempty"` -} - -// EnvVarSource represents a source for the value of an EnvVar. -type EnvVarSource struct { - SecretKeyRef *SecretKeySelector `json:"secretKeyRef,omitempty"` -} - -// SecretKeySelector selects a key of a Secret. -type SecretKeySelector struct { - Key string `json:"key"` - Name string `json:"name"` - Optional bool `json:"optional,omitempty"` -} - -type Condition struct { - Started string `json:"startedAt,omitempty"` - Reason string `json:"reason,omitempty"` - Message string `json:"message,omitempty"` -} - -// Container defined container runtime values -type Container struct { - Name string `json:"name"` - Image string `json:"image"` - Env []EnvVar `json:"env,omitempty"` - Command []string `json:"command,omitempty"` - Args []string `json:"args,omitempty"` - Ports []ContainerPort `json:"ports,omitempty"` - ReadinessProbe *Probe `json:"readinessProbe,omitempty"` - Resources *ResourceRequirements `json:"resources,omitempty"` -} - -// DeploymentSpec defines micro deployment spec -type DeploymentSpec struct { - Replicas int `json:"replicas,omitempty"` - Selector *LabelSelector `json:"selector"` - Template *Template `json:"template,omitempty"` -} - -// DeploymentCondition describes the state of deployment -type DeploymentCondition struct { - LastUpdateTime string `json:"lastUpdateTime"` - Type string `json:"type"` - Reason string `json:"reason,omitempty"` - Message string `json:"message,omitempty"` -} - -// DeploymentStatus is returned when querying deployment -type DeploymentStatus struct { - Replicas int `json:"replicas,omitempty"` - UpdatedReplicas int `json:"updatedReplicas,omitempty"` - ReadyReplicas int `json:"readyReplicas,omitempty"` - AvailableReplicas int `json:"availableReplicas,omitempty"` - UnavailableReplicas int `json:"unavailableReplicas,omitempty"` - Conditions []DeploymentCondition `json:"conditions,omitempty"` -} - -// Deployment is Kubernetes deployment -type Deployment struct { - Metadata *Metadata `json:"metadata"` - Spec *DeploymentSpec `json:"spec,omitempty"` - Status *DeploymentStatus `json:"status,omitempty"` -} - -// DeploymentList -type DeploymentList struct { - Items []Deployment `json:"items"` -} - -// LabelSelector is a label query over a set of resources -// NOTE: we do not support MatchExpressions at the moment -type LabelSelector struct { - MatchLabels map[string]string `json:"matchLabels,omitempty"` -} - -type LoadBalancerIngress struct { - IP string `json:"ip,omitempty"` - Hostname string `json:"hostname,omitempty"` -} - -type LoadBalancerStatus struct { - Ingress []LoadBalancerIngress `json:"ingress,omitempty"` -} - -// Metadata defines api object metadata -type Metadata struct { - Name string `json:"name,omitempty"` - Namespace string `json:"namespace,omitempty"` - Version string `json:"version,omitempty"` - Labels map[string]string `json:"labels,omitempty"` - Annotations map[string]string `json:"annotations,omitempty"` -} - -// PodSpec is a pod -type PodSpec struct { - Containers []Container `json:"containers"` - ServiceAccountName string `json:"serviceAccountName"` -} - -// PodList -type PodList struct { - Items []Pod `json:"items"` -} - -// Pod is the top level item for a pod -type Pod struct { - Metadata *Metadata `json:"metadata"` - Spec *PodSpec `json:"spec,omitempty"` - Status *PodStatus `json:"status"` -} - -// PodStatus -type PodStatus struct { - Conditions []PodCondition `json:"conditions,omitempty"` - Containers []ContainerStatus `json:"containerStatuses"` - PodIP string `json:"podIP"` - Phase string `json:"phase"` - Reason string `json:"reason"` -} - -// PodCondition describes the state of pod -type PodCondition struct { - Type string `json:"type"` - Reason string `json:"reason,omitempty"` - Message string `json:"message,omitempty"` -} - -type ContainerStatus struct { - State ContainerState `json:"state"` -} - -type ContainerState struct { - Running *Condition `json:"running"` - Terminated *Condition `json:"terminated"` - Waiting *Condition `json:"waiting"` -} - -// Resource is API resource -type Resource struct { - Name string - Kind string - Value interface{} -} - -// ServicePort configures service ports -type ServicePort struct { - Name string `json:"name,omitempty"` - Port int `json:"port"` - Protocol string `json:"protocol,omitempty"` -} - -// ServiceSpec provides service configuration -type ServiceSpec struct { - ClusterIP string `json:"clusterIP"` - Type string `json:"type,omitempty"` - Selector map[string]string `json:"selector,omitempty"` - Ports []ServicePort `json:"ports,omitempty"` -} - -// ServiceStatus -type ServiceStatus struct { - LoadBalancer LoadBalancerStatus `json:"loadBalancer,omitempty"` -} - -// Service is kubernetes service -type Service struct { - Metadata *Metadata `json:"metadata"` - Spec *ServiceSpec `json:"spec,omitempty"` - Status *ServiceStatus `json:"status,omitempty"` -} - -// ServiceList -type ServiceList struct { - Items []Service `json:"items"` -} - -// Template is micro deployment template -type Template struct { - Metadata *Metadata `json:"metadata,omitempty"` - PodSpec *PodSpec `json:"spec,omitempty"` -} - -// Namespace is a Kubernetes Namespace -type Namespace struct { - Metadata *Metadata `json:"metadata,omitempty"` -} - -// NamespaceList -type NamespaceList struct { - Items []Namespace `json:"items"` -} - -// ImagePullSecret -type ImagePullSecret struct { - Name string `json:"name"` -} - -// Secret -type Secret struct { - Type string `json:"type,omitempty"` - Data map[string]string `json:"data"` - Metadata *Metadata `json:"metadata,omitempty"` -} - -// ServiceAccount -type ServiceAccount struct { - Metadata *Metadata `json:"metadata,omitempty"` - ImagePullSecrets []ImagePullSecret `json:"imagePullSecrets,omitempty"` -} - -// Probe describes a health check to be performed against a container to determine whether it is alive or ready to receive traffic. -type Probe struct { - TCPSocket TCPSocketAction `json:"tcpSocket,omitempty"` - PeriodSeconds int `json:"periodSeconds"` - InitialDelaySeconds int `json:"initialDelaySeconds"` -} - -// TCPSocketAction describes an action based on opening a socket -type TCPSocketAction struct { - Host string `json:"host,omitempty"` - Port int `json:"port,omitempty"` -} - -// ResourceRequirements describes the compute resource requirements. -type ResourceRequirements struct { - Limits *ResourceLimits `json:"limits,omitempty"` - Requests *ResourceLimits `json:"requests,omitempty"` -} - -// ResourceLimits describes the limits for a service -type ResourceLimits struct { - Memory string `json:"memory,omitempty"` - CPU string `json:"cpu,omitempty"` - EphemeralStorage string `json:"ephemeral-storage,omitempty"` -} diff --git a/util/kubernetes/client/util.go b/util/kubernetes/client/util.go deleted file mode 100644 index 747ac9cb..00000000 --- a/util/kubernetes/client/util.go +++ /dev/null @@ -1,104 +0,0 @@ -package client - -import ( - "crypto/x509" - "encoding/pem" - "errors" - "fmt" - "io" - "io/ioutil" - "strings" - "text/template" -) - -// renderTemplateFile renders template for a given resource into writer w -func renderTemplate(resource string, w io.Writer, data interface{}) error { - t := template.Must(template.New("kubernetes").Parse(templates[resource])) - - if err := t.Execute(w, data); err != nil { - return err - } - - return nil -} - -// COPIED FROM -// https://github.com/kubernetes/kubernetes/blob/7a725418af4661067b56506faabc2d44c6d7703a/pkg/util/crypto/crypto.go - -// CertPoolFromFile returns an x509.CertPool containing the certificates in the given PEM-encoded file. -// Returns an error if the file could not be read, a certificate could not be parsed, or if the file does not contain any certificates -func CertPoolFromFile(filename string) (*x509.CertPool, error) { - certs, err := certificatesFromFile(filename) - if err != nil { - return nil, err - } - pool := x509.NewCertPool() - for _, cert := range certs { - pool.AddCert(cert) - } - return pool, nil -} - -// certificatesFromFile returns the x509.Certificates contained in the given PEM-encoded file. -// Returns an error if the file could not be read, a certificate could not be parsed, or if the file does not contain any certificates -func certificatesFromFile(file string) ([]*x509.Certificate, error) { - if len(file) == 0 { - return nil, errors.New("error reading certificates from an empty filename") - } - pemBlock, err := ioutil.ReadFile(file) - if err != nil { - return nil, err - } - certs, err := CertsFromPEM(pemBlock) - if err != nil { - return nil, fmt.Errorf("error reading %s: %s", file, err) - } - return certs, nil -} - -// CertsFromPEM returns the x509.Certificates contained in the given PEM-encoded byte array -// Returns an error if a certificate could not be parsed, or if the data does not contain any certificates -func CertsFromPEM(pemCerts []byte) ([]*x509.Certificate, error) { - ok := false - certs := []*x509.Certificate{} - for len(pemCerts) > 0 { - var block *pem.Block - block, pemCerts = pem.Decode(pemCerts) - if block == nil { - break - } - // Only use PEM "CERTIFICATE" blocks without extra headers - if block.Type != "CERTIFICATE" || len(block.Headers) != 0 { - continue - } - - cert, err := x509.ParseCertificate(block.Bytes) - if err != nil { - return certs, err - } - - certs = append(certs, cert) - ok = true - } - - if !ok { - return certs, errors.New("could not read any certificates") - } - return certs, nil -} - -// Format is used to format a string value into a k8s valid name -func Format(v string) string { - // to lower case - v = strings.ToLower(v) - // / to dashes - v = strings.ReplaceAll(v, "/", "-") - // dots to dashes - v = strings.ReplaceAll(v, ".", "-") - // limit to 253 chars - if len(v) > 253 { - v = v[:253] - } - // return new name - return v -} diff --git a/util/kubernetes/client/util_test.go b/util/kubernetes/client/util_test.go deleted file mode 100644 index 54a6139c..00000000 --- a/util/kubernetes/client/util_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package client - -import ( - "bytes" - "testing" -) - -func TestTemplates(t *testing.T) { - name := "foo" - version := "123" - typ := "service" - namespace := "default" - - // Render default service - s := NewService(name, version, typ, namespace) - bs := new(bytes.Buffer) - if err := renderTemplate(templates["service"], bs, s); err != nil { - t.Errorf("Failed to render kubernetes service: %v", err) - } - - // Render default deployment - d := NewDeployment(name, version, typ, namespace) - bd := new(bytes.Buffer) - if err := renderTemplate(templates["deployment"], bd, d); err != nil { - t.Errorf("Failed to render kubernetes deployment: %v", err) - } -} - -func TestFormatName(t *testing.T) { - testCases := []struct { - name string - expect string - }{ - {"foobar", "foobar"}, - {"foo-bar", "foo-bar"}, - {"foo.bar", "foo-bar"}, - {"Foo.Bar", "foo-bar"}, - {"micro.foo.bar", "micro-foo-bar"}, - } - - for _, test := range testCases { - v := Format(test.name) - if v != test.expect { - t.Fatalf("Expected name %s for %s got: %s", test.expect, test.name, v) - } - } -} diff --git a/util/kubernetes/client/watch.go b/util/kubernetes/client/watch.go deleted file mode 100644 index dc799c43..00000000 --- a/util/kubernetes/client/watch.go +++ /dev/null @@ -1,124 +0,0 @@ -package client - -import ( - "bufio" - "context" - "encoding/json" - "errors" - "net/http" - - "github.com/unistack-org/micro/v3/util/kubernetes/api" -) - -const ( - // EventTypes used - Added EventType = "ADDED" - Modified EventType = "MODIFIED" - Deleted EventType = "DELETED" - Error EventType = "ERROR" -) - -// Watcher is used to watch for events -type Watcher interface { - // A channel of events - Chan() <-chan Event - // Stop the watcher - Stop() -} - -// EventType defines the possible types of events. -type EventType string - -// Event represents a single event to a watched resource. -type Event struct { - Type EventType `json:"type"` - Object json.RawMessage `json:"object"` -} - -// bodyWatcher scans the body of a request for chunks -type bodyWatcher struct { - results chan Event - cancel func() - stop chan bool - res *http.Response - req *api.Request -} - -// Changes returns the results channel -func (wr *bodyWatcher) Chan() <-chan Event { - return wr.results -} - -// Stop cancels the request -func (wr *bodyWatcher) Stop() { - select { - case <-wr.stop: - return - default: - // cancel the request - wr.cancel() - // stop the watcher - close(wr.stop) - } -} - -func (wr *bodyWatcher) stream() { - reader := bufio.NewReader(wr.res.Body) - - go func() { - for { - // read a line - b, err := reader.ReadBytes('\n') - if err != nil { - return - } - - // send the event - var event Event - if err := json.Unmarshal(b, &event); err != nil { - continue - } - - select { - case <-wr.stop: - return - case wr.results <- event: - } - } - }() -} - -// newWatcher creates a k8s body watcher for -// a given http request -func newWatcher(req *api.Request) (Watcher, error) { - // set request context so we can cancel the request - ctx, cancel := context.WithCancel(context.Background()) - req.Context(ctx) - - // do the raw request - res, err := req.Raw() - if err != nil { - cancel() - return nil, err - } - - if res.StatusCode < 200 || res.StatusCode >= 300 { - cancel() - // close the response body - res.Body.Close() - // return an error - return nil, errors.New(res.Request.URL.String() + ": " + res.Status) - } - - wr := &bodyWatcher{ - results: make(chan Event), - stop: make(chan bool), - cancel: cancel, - req: req, - res: res, - } - - go wr.stream() - - return wr, nil -}