diff --git a/go.mod b/go.mod index 5b3c0c97..a740171b 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/ef-ds/deque v1.0.4-0.20190904040645-54cb57c252a1 github.com/evanphx/json-patch/v5 v5.0.0 - github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/ghodss/yaml v1.0.0 github.com/go-acme/lego/v3 v3.4.0 github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee @@ -19,7 +18,6 @@ require ( github.com/google/uuid v1.1.1 github.com/gorilla/handlers v1.4.2 github.com/hashicorp/hcl v1.0.0 - github.com/hpcloud/tail v1.0.0 github.com/kr/pretty v0.2.0 // indirect github.com/miekg/dns v1.1.27 github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c @@ -27,8 +25,6 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.7.0 github.com/stretchr/testify v1.5.1 - github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf - github.com/xanzy/go-gitlab v0.35.1 golang.org/x/crypto v0.0.0-20200709230013-948cd5f35899 golang.org/x/net v0.0.0-20200707034311-ab3426394381 google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 diff --git a/go.sum b/go.sum index 4e562439..59fe4779 100644 --- a/go.sum +++ b/go.sum @@ -80,8 +80,6 @@ github.com/exoscale/egoscale v0.18.1/go.mod h1:Z7OOdzzTOz1Q1PjQXumlz9Wn/CddH0zSY github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= -github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-acme/lego/v3 v3.4.0 h1:deB9NkelA+TfjGHVw8J7iKl/rMtffcGMWSMmptvMv0A= @@ -155,12 +153,6 @@ github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z github.com/grpc-ecosystem/grpc-gateway v1.8.5 h1:2+KSC78XiO6Qy0hIjfc1OD9H+hsaJdJlb8Kqsd41CTE= github.com/grpc-ecosystem/grpc-gateway v1.8.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI= -github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM= -github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= -github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI= -github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= -github.com/hashicorp/go-retryablehttp v0.6.4 h1:BbgctKO892xEyOXnGiaAwIoSq1QZ/SS4AhjoAh9DnfY= -github.com/hashicorp/go-retryablehttp v0.6.4/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk= @@ -285,16 +277,11 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf h1:Z2X3Os7oRzpdJ75iPqWZc0HeJWFYNCvKsfpQwFpRNTA= -github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf/go.mod h1:M8agBzgqHIhgj7wEn9/0hJUZcrvt9VY+Ln+S1I5Mha0= github.com/timewasted/linode v0.0.0-20160829202747-37e84520dcf7/go.mod h1:imsgLplxEC/etjIhdr3dNzV3JeT27LbVu5pYWm0JCBY= github.com/transip/gotransip v0.0.0-20190812104329-6d8d9179b66f/go.mod h1:i0f4R4o2HM0m3DZYQWsj6/MEowD57VzoH0v3d7igeFY= github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g= -github.com/unistack-org/micro v1.18.0 h1:EbFiII0bKV0Xcua7o6J30MFmm4/g0Hv3ECOKzsUBihU= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/vultr/govultr v0.1.4/go.mod h1:9H008Uxr/C4vFNGLqKx232C206GL0PBHzOP0809bGNA= -github.com/xanzy/go-gitlab v0.35.1 h1:jJSgT0NxjCvrSZf7Gvn2NxxV9xAYkTjYrKW8XwWhrfY= -github.com/xanzy/go-gitlab v0.35.1/go.mod h1:sPLojNBn68fMUWSxIJtdVVIP8uSBYqesTfDUseX11Ug= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v1.1.0/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs= @@ -340,7 +327,6 @@ golang.org/x/net v0.0.0-20180611182652-db08ff08e862/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20181108082009-03003ca0c849/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -360,7 +346,6 @@ golang.org/x/net v0.0.0-20191027093000-83d349e8ac1a/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= -golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 h1:SVwTIAaPC2U/AvvLNZ2a7OVsmBpC8L5BlwK1whH3hm0= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -391,7 +376,6 @@ golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80= @@ -403,8 +387,6 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190921001708-c4c64cad1fd0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs= -golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -436,7 +418,6 @@ google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEn google.golang.org/api v0.9.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg= google.golang.org/api v0.14.0/go.mod h1:iLdEw5Ide6rF15KTC1Kkl0iskquN2gFfn9o9XIsbkAI= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= -google.golang.org/appengine v1.3.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.6.1 h1:QzqyMA1tlu6CgqCDUtU9V+ZKhLFT2dkJuANu5QaxI3I= diff --git a/runtime/kubernetes/kubernetes.go b/runtime/kubernetes/kubernetes.go deleted file mode 100644 index 2f381c7a..00000000 --- a/runtime/kubernetes/kubernetes.go +++ /dev/null @@ -1,714 +0,0 @@ -// Package kubernetes implements kubernetes micro runtime -package kubernetes - -import ( - "encoding/base64" - "fmt" - "strings" - "sync" - "time" - - "github.com/unistack-org/micro/v3/logger" - log "github.com/unistack-org/micro/v3/logger" - "github.com/unistack-org/micro/v3/runtime" - "github.com/unistack-org/micro/v3/util/kubernetes/client" -) - -// action to take on runtime service -type action int - -type kubernetes struct { - sync.RWMutex - // options configure runtime - options runtime.Options - // indicates if we're running - running bool - // used to stop the runtime - closed chan bool - // client is kubernetes client - client client.Client - // namespaces which exist - namespaces []client.Namespace -} - -// namespaceExists returns a boolean indicating if a namespace exists -func (k *kubernetes) namespaceExists(name string) (bool, error) { - // populate the cache - if k.namespaces == nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Populating namespace cache") - } - - namespaceList := new(client.NamespaceList) - resource := &client.Resource{Kind: "namespace", Value: namespaceList} - if err := k.client.List(resource); err != nil { - return false, err - } - - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Popualted namespace cache successfully with %v items", len(namespaceList.Items)) - } - k.namespaces = namespaceList.Items - } - - // check if the namespace exists in the cache - for _, n := range k.namespaces { - if n.Metadata.Name == name { - return true, nil - } - } - - return false, nil -} - -// createNamespace creates a new k8s namespace -func (k *kubernetes) createNamespace(namespace string) error { - ns := client.Namespace{Metadata: &client.Metadata{Name: namespace}} - err := k.client.Create(&client.Resource{Kind: "namespace", Value: ns}) - - // ignore err already exists - if err != nil && strings.Contains(err.Error(), "already exists") { - logger.Debugf("Ignoring ErrAlreadyExists for namespace %v: %v", namespace, err) - err = nil - } - - // add to cache - if err == nil && k.namespaces != nil { - k.namespaces = append(k.namespaces, ns) - } - - return err -} - -// getService queries kubernetes for micro service -// NOTE: this function is not thread-safe -func (k *kubernetes) getService(labels map[string]string, opts ...client.GetOption) ([]*service, error) { - // get the service status - serviceList := new(client.ServiceList) - r := &client.Resource{ - Kind: "service", - Value: serviceList, - } - - opts = append(opts, client.GetLabels(labels)) - - // get the service from k8s - if err := k.client.Get(r, opts...); err != nil { - return nil, err - } - - // get the deployment status - depList := new(client.DeploymentList) - d := &client.Resource{ - Kind: "deployment", - Value: depList, - } - if err := k.client.Get(d, opts...); err != nil { - return nil, err - } - - // get the pods from k8s - podList := new(client.PodList) - p := &client.Resource{ - Kind: "pod", - Value: podList, - } - if err := k.client.Get(p, opts...); err != nil { - return nil, err - } - - // service map - svcMap := make(map[string]*service) - - // collect info from kubernetes service - for _, kservice := range serviceList.Items { - // name of the service - name := kservice.Metadata.Labels["name"] - // version of the service - version := kservice.Metadata.Labels["version"] - - srv := &service{ - Service: &runtime.Service{ - Name: name, - Version: version, - Metadata: make(map[string]string), - }, - kservice: &kservice, - } - - // set the address - address := kservice.Spec.ClusterIP - port := kservice.Spec.Ports[0] - srv.Service.Metadata["address"] = fmt.Sprintf("%s:%d", address, port.Port) - // set the type of service - srv.Service.Metadata["type"] = kservice.Metadata.Labels["micro"] - - // copy annotations metadata into service metadata - for k, v := range kservice.Metadata.Annotations { - srv.Service.Metadata[k] = v - } - - // save as service - svcMap[name+version] = srv - } - - // collect additional info from kubernetes deployment - for _, kdep := range depList.Items { - // name of the service - name := kdep.Metadata.Labels["name"] - // versio of the service - version := kdep.Metadata.Labels["version"] - - // access existing service map based on name + version - if svc, ok := svcMap[name+version]; ok { - // we're expecting our own service name in metadata - if _, ok := kdep.Metadata.Annotations["name"]; !ok { - continue - } - - // set the service name, version and source - // based on existing annotations we stored - svc.Service.Name = kdep.Metadata.Annotations["name"] - svc.Service.Version = kdep.Metadata.Annotations["version"] - svc.Service.Source = kdep.Metadata.Annotations["source"] - - // delete from metadata - delete(kdep.Metadata.Annotations, "name") - delete(kdep.Metadata.Annotations, "version") - delete(kdep.Metadata.Annotations, "source") - - // copy all annotations metadata into service metadata - for k, v := range kdep.Metadata.Annotations { - svc.Service.Metadata[k] = v - } - - // parse out deployment status and inject into service metadata - if len(kdep.Status.Conditions) > 0 { - svc.Status(kdep.Status.Conditions[0].Type, nil) - svc.Metadata["started"] = kdep.Status.Conditions[0].LastUpdateTime - } else { - svc.Status("n/a", nil) - } - - // get the real status - for _, item := range podList.Items { - var status string - - // check the name - if item.Metadata.Labels["name"] != name { - continue - } - // check the version - if item.Metadata.Labels["version"] != version { - continue - } - - switch item.Status.Phase { - case "Failed": - status = item.Status.Reason - default: - status = item.Status.Phase - } - - // skip if we can't get the container - if len(item.Status.Containers) == 0 { - continue - } - - // now try get a deeper status - state := item.Status.Containers[0].State - - // set start time - if state.Running != nil { - svc.Metadata["started"] = state.Running.Started - } - - // set status from waiting - if v := state.Waiting; v != nil { - if len(v.Reason) > 0 { - status = v.Reason - } - } - // TODO: set from terminated - svc.Status(status, nil) - } - - // save deployment - svc.kdeploy = &kdep - } - } - - // collect all the services and return - services := make([]*service, 0, len(serviceList.Items)) - - for _, service := range svcMap { - services = append(services, service) - } - - return services, nil -} - -// run runs the runtime management loop -func (k *kubernetes) run(events <-chan runtime.Event) { - t := time.NewTicker(time.Second * 10) - defer t.Stop() - - for { - select { - case <-t.C: - // TODO: figure out what to do here - // - do we even need the ticker for k8s services? - case event := <-events: - // NOTE: we only handle Update events for now - if log.V(log.DebugLevel, log.DefaultLogger) { - log.Debugf("Runtime received notification event: %v", event) - } - switch event.Type { - case runtime.Update: - // only process if there's an actual service - // we do not update all the things individually - if event.Service == nil { - continue - } - - // format the name - name := client.Format(event.Service.Name) - - // set the default labels - labels := map[string]string{ - "micro": k.options.Type, - "name": name, - } - - if len(event.Service.Version) > 0 { - labels["version"] = event.Service.Version - } - - // get the deployment status - deployed := new(client.DeploymentList) - - // get the existing service rather than creating a new one - err := k.client.Get(&client.Resource{ - Kind: "deployment", - Value: deployed, - }, client.GetLabels(labels)) - - if err != nil { - if log.V(log.DebugLevel, log.DefaultLogger) { - log.Debugf("Runtime update failed to get service %s: %v", event.Service, err) - } - continue - } - - // technically we should not receive multiple versions but hey ho - for _, service := range deployed.Items { - // check the name matches - if service.Metadata.Name != name { - continue - } - - // update build time annotation - if service.Spec.Template.Metadata.Annotations == nil { - service.Spec.Template.Metadata.Annotations = make(map[string]string) - } - - // update the build time - service.Spec.Template.Metadata.Annotations["updated"] = fmt.Sprintf("%d", event.Timestamp.Unix()) - - if log.V(log.DebugLevel, log.DefaultLogger) { - log.Debugf("Runtime updating service: %s deployment: %s", event.Service, service.Metadata.Name) - } - if err := k.client.Update(deploymentResource(&service)); err != nil { - if log.V(log.DebugLevel, log.DefaultLogger) { - log.Debugf("Runtime failed to update service %s: %v", event.Service, err) - } - continue - } - } - } - case <-k.closed: - if log.V(log.DebugLevel, log.DefaultLogger) { - log.Debugf("Runtime stopped") - } - return - } - } -} - -// Init initializes runtime options -func (k *kubernetes) Init(opts ...runtime.Option) error { - k.Lock() - defer k.Unlock() - - for _, o := range opts { - o(&k.options) - } - - return nil -} - -func (k *kubernetes) Logs(s *runtime.Service, options ...runtime.LogsOption) (runtime.Logs, error) { - klo := newLog(k.client, s.Name, options...) - - if !klo.options.Stream { - records, err := klo.Read() - if err != nil { - log.Errorf("Failed to get logs for service '%v' from k8s: %v", s.Name, err) - return nil, err - } - kstream := &kubeStream{ - stream: make(chan runtime.Log), - stop: make(chan bool), - } - go func() { - for _, record := range records { - kstream.Chan() <- record - } - kstream.Stop() - }() - return kstream, nil - } - stream, err := klo.Stream() - if err != nil { - return nil, err - } - return stream, nil -} - -type kubeStream struct { - // the k8s log stream - stream chan runtime.Log - // the stop chan - sync.Mutex - stop chan bool - err error -} - -func (k *kubeStream) Error() error { - return k.err -} - -func (k *kubeStream) Chan() chan runtime.Log { - return k.stream -} - -func (k *kubeStream) Stop() error { - k.Lock() - defer k.Unlock() - select { - case <-k.stop: - return nil - default: - close(k.stop) - close(k.stream) - } - return nil -} - -// Creates a service -func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) error { - k.Lock() - defer k.Unlock() - - options := runtime.CreateOptions{ - Type: k.options.Type, - Namespace: client.DefaultNamespace, - } - for _, o := range opts { - o(&options) - } - - // default type if it doesn't exist - if len(options.Type) == 0 { - options.Type = k.options.Type - } - - // default the source if it doesn't exist - if len(s.Source) == 0 { - s.Source = k.options.Source - } - - // ensure the namespace exists - namespace := client.SerializeResourceName(options.Namespace) - // only do this if the namespace is not default - if namespace != "default" { - if exist, err := k.namespaceExists(namespace); err == nil && !exist { - if err := k.createNamespace(namespace); err != nil { - if logger.V(logger.WarnLevel, logger.DefaultLogger) { - logger.Warnf("Error creating namespacr %v: %v", namespace, err) - } - return err - } - } else if err != nil { - if logger.V(logger.WarnLevel, logger.DefaultLogger) { - logger.Warnf("Error checking namespace %v exists: %v", namespace, err) - } - return err - } - } - // determine the image from the source and options - options.Image = k.getImage(s, options) - - // create a secret for the credentials if some where provided - if len(options.Secrets) > 0 { - if err := k.createCredentials(s, options); err != nil { - if logger.V(logger.WarnLevel, logger.DefaultLogger) { - logger.Warnf("Error generating auth credentials for service: %v", err) - } - return err - } - - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Generated auth credentials for service %v", s.Name) - } - } - - // create new service - service := newService(s, options) - - // start the service - return service.Start(k.client, client.CreateNamespace(options.Namespace)) -} - -// Read returns all instances of given service -func (k *kubernetes) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error) { - k.Lock() - defer k.Unlock() - - // set the default labels - labels := map[string]string{} - - options := runtime.ReadOptions{ - Namespace: client.DefaultNamespace, - } - - for _, o := range opts { - o(&options) - } - - if len(options.Service) > 0 { - labels["name"] = client.Format(options.Service) - } - - // add version to labels if a version has been supplied - if len(options.Version) > 0 { - labels["version"] = options.Version - } - - if len(options.Type) > 0 { - labels["micro"] = options.Type - } - - srvs, err := k.getService(labels, client.GetNamespace(options.Namespace)) - if err != nil { - return nil, err - } - - services := make([]*runtime.Service, 0, len(srvs)) - for _, service := range srvs { - services = append(services, service.Service) - } - - return services, nil -} - -// Update the service in place -func (k *kubernetes) Update(s *runtime.Service, opts ...runtime.UpdateOption) error { - options := runtime.UpdateOptions{ - Namespace: client.DefaultNamespace, - } - - for _, o := range opts { - o(&options) - } - - labels := map[string]string{} - - if len(s.Name) > 0 { - labels["name"] = client.Format(s.Name) - } - - if len(s.Version) > 0 { - labels["version"] = s.Version - } - - // get the existing service - services, err := k.getService(labels, client.GetNamespace(options.Namespace)) - if err != nil { - return err - } - - // update the relevant services - for _, service := range services { - // nil check - if service.kdeploy.Metadata == nil || service.kdeploy.Metadata.Annotations == nil { - md := new(client.Metadata) - md.Annotations = make(map[string]string) - service.kdeploy.Metadata = md - } - - // update metadata - for k, v := range s.Metadata { - service.kdeploy.Metadata.Annotations[k] = v - } - - // update build time annotation - service.kdeploy.Spec.Template.Metadata.Annotations["updated"] = fmt.Sprintf("%d", time.Now().Unix()) - - // update the service - if err := service.Update(k.client, client.UpdateNamespace(options.Namespace)); err != nil { - return err - } - } - - return nil -} - -// Delete removes a service -func (k *kubernetes) Delete(s *runtime.Service, opts ...runtime.DeleteOption) error { - options := runtime.DeleteOptions{ - Namespace: client.DefaultNamespace, - } - for _, o := range opts { - o(&options) - } - - k.Lock() - defer k.Unlock() - - // create new kubernetes micro service - service := newService(s, runtime.CreateOptions{ - Type: k.options.Type, - Namespace: options.Namespace, - }) - - // delete the service credentials - ns := client.DeleteNamespace(options.Namespace) - k.client.Delete(&client.Resource{Name: credentialsName(s), Kind: "secret"}, ns) - - return service.Stop(k.client, ns) -} - -// Start starts the runtime -func (k *kubernetes) Start() error { - k.Lock() - defer k.Unlock() - - // already running - if k.running { - return nil - } - - // set running - k.running = true - k.closed = make(chan bool) - - var events <-chan runtime.Event - if k.options.Scheduler != nil { - var err error - events, err = k.options.Scheduler.Notify() - if err != nil { - // TODO: should we bail here? - if log.V(log.DebugLevel, log.DefaultLogger) { - log.Debugf("Runtime failed to start update notifier") - } - } - } - - go k.run(events) - - return nil -} - -// Stop shuts down the runtime -func (k *kubernetes) Stop() error { - k.Lock() - defer k.Unlock() - - if !k.running { - return nil - } - - select { - case <-k.closed: - return nil - default: - close(k.closed) - // set not running - k.running = false - // stop the scheduler - if k.options.Scheduler != nil { - return k.options.Scheduler.Close() - } - } - - return nil -} - -// String implements stringer interface -func (k *kubernetes) String() string { - return "kubernetes" -} - -// NewRuntime creates new kubernetes runtime -func NewRuntime(opts ...runtime.Option) runtime.Runtime { - // get default options - options := runtime.Options{ - // Create labels with type "micro": "service" - Type: "service", - } - - // apply requested options - for _, o := range opts { - o(&options) - } - - // kubernetes client - client := client.NewClusterClient() - - return &kubernetes{ - options: options, - closed: make(chan bool), - client: client, - } -} - -func (k *kubernetes) getImage(s *runtime.Service, options runtime.CreateOptions) string { - // use the image when its specified - if len(options.Image) > 0 { - return options.Image - } - - if len(k.options.Image) > 0 { - return k.options.Image - } - - return "" -} -func (k *kubernetes) createCredentials(service *runtime.Service, options runtime.CreateOptions) error { - data := make(map[string]string, len(options.Secrets)) - for key, value := range options.Secrets { - data[key] = base64.StdEncoding.EncodeToString([]byte(value)) - } - - // construct the k8s secret object - secret := &client.Secret{ - Type: "Opaque", - Data: data, - Metadata: &client.Metadata{ - Name: credentialsName(service), - Namespace: options.Namespace, - }, - } - - // crete the secret in kubernetes - name := credentialsName(service) - return k.client.Create(&client.Resource{ - Kind: "secret", Name: name, Value: secret, - }, client.CreateNamespace(options.Namespace)) -} - -func credentialsName(service *runtime.Service) string { - name := fmt.Sprintf("%v-%v-credentials", service.Name, service.Version) - return client.SerializeResourceName(name) -} diff --git a/runtime/kubernetes/logs.go b/runtime/kubernetes/logs.go deleted file mode 100644 index 4aaa6c01..00000000 --- a/runtime/kubernetes/logs.go +++ /dev/null @@ -1,206 +0,0 @@ -// Package kubernetes taken from https://github.com/micro/go-micro/blob/master/debug/log/kubernetes/kubernetes.go -// There are some modifications compared to the other files as -// this package doesn't provide write functionality. -// With the write functionality gone, structured logs also go away. -package kubernetes - -import ( - "bufio" - "strconv" - "time" - - "github.com/unistack-org/micro/v3/errors" - "github.com/unistack-org/micro/v3/runtime" - "github.com/unistack-org/micro/v3/util/kubernetes/client" - "github.com/unistack-org/micro/v3/util/log" -) - -type klog struct { - client client.Client - serviceName string - options runtime.LogsOptions -} - -func (k *klog) podLogs(podName string, stream *kubeStream) error { - p := make(map[string]string) - p["follow"] = "true" - - opts := []client.LogOption{ - client.LogParams(p), - client.LogNamespace(k.options.Namespace), - } - - // get the logs for the pod - body, err := k.client.Log(&client.Resource{ - Name: podName, - Kind: "pod", - }, opts...) - - if err != nil { - stream.err = err - stream.Stop() - return err - } - - s := bufio.NewScanner(body) - defer body.Close() - - for { - select { - case <-stream.stop: - return stream.Error() - default: - if s.Scan() { - record := runtime.Log{ - Message: s.Text(), - } - select { - case stream.stream <- record: - case <-stream.stop: - return stream.Error() - } - } else { - // TODO: is there a blocking call - // rather than a sleep loop? - time.Sleep(time.Second) - } - } - } -} - -func (k *klog) getMatchingPods() ([]string, error) { - r := &client.Resource{ - Kind: "pod", - Value: new(client.PodList), - } - - l := make(map[string]string) - - l["name"] = client.Format(k.serviceName) - // TODO: specify micro:service - // l["micro"] = "service" - - opts := []client.GetOption{ - client.GetLabels(l), - client.GetNamespace(k.options.Namespace), - } - - if err := k.client.Get(r, opts...); err != nil { - return nil, err - } - - var matches []string - - for _, p := range r.Value.(*client.PodList).Items { - // find labels that match the name - if p.Metadata.Labels["name"] == client.Format(k.serviceName) { - matches = append(matches, p.Metadata.Name) - } - } - - return matches, nil -} - -func (k *klog) Read() ([]runtime.Log, error) { - pods, err := k.getMatchingPods() - if err != nil { - return nil, err - } - if len(pods) == 0 { - return nil, errors.NotFound("runtime.logs", "no such service") - } - - var records []runtime.Log - - for _, pod := range pods { - logParams := make(map[string]string) - - //if !opts.Since.Equal(time.Time{}) { - // logParams["sinceSeconds"] = strconv.Itoa(int(time.Since(opts.Since).Seconds())) - //} - - if k.options.Count != 0 { - logParams["tailLines"] = strconv.Itoa(int(k.options.Count)) - } - - if k.options.Stream { - logParams["follow"] = "true" - } - - opts := []client.LogOption{ - client.LogParams(logParams), - client.LogNamespace(k.options.Namespace), - } - - logs, err := k.client.Log(&client.Resource{ - Name: pod, - Kind: "pod", - }, opts...) - - if err != nil { - return nil, err - } - defer logs.Close() - - s := bufio.NewScanner(logs) - - for s.Scan() { - record := runtime.Log{ - Message: s.Text(), - } - // record.Metadata["pod"] = pod - records = append(records, record) - } - } - - // sort the records - // sort.Slice(records, func(i, j int) bool { return records[i].Timestamp.Before(records[j].Timestamp) }) - - return records, nil -} - -func (k *klog) Stream() (runtime.Logs, error) { - // find the matching pods - pods, err := k.getMatchingPods() - if err != nil { - return nil, err - } - if len(pods) == 0 { - return nil, errors.NotFound("runtime.logs", "no such service") - } - - stream := &kubeStream{ - stream: make(chan runtime.Log), - stop: make(chan bool), - } - - // stream from the individual pods - for _, pod := range pods { - go func(podName string) { - err := k.podLogs(podName, stream) - if err != nil { - log.Errorf("Error streaming from pod: %v", err) - } - }(pod) - } - - return stream, nil -} - -// NewLog returns a configured Kubernetes logger -func newLog(c client.Client, serviceName string, opts ...runtime.LogsOption) *klog { - options := runtime.LogsOptions{ - Namespace: client.DefaultNamespace, - } - for _, o := range opts { - o(&options) - } - - klog := &klog{ - serviceName: serviceName, - client: c, - options: options, - } - - return klog -} diff --git a/runtime/kubernetes/service.go b/runtime/kubernetes/service.go deleted file mode 100644 index d7c1c50e..00000000 --- a/runtime/kubernetes/service.go +++ /dev/null @@ -1,227 +0,0 @@ -package kubernetes - -import ( - "encoding/json" - "fmt" - "strings" - "time" - - "github.com/unistack-org/micro/v3/logger" - "github.com/unistack-org/micro/v3/runtime" - "github.com/unistack-org/micro/v3/util/kubernetes/api" - "github.com/unistack-org/micro/v3/util/kubernetes/client" -) - -type service struct { - // service to manage - *runtime.Service - // Kubernetes service - kservice *client.Service - // Kubernetes deployment - kdeploy *client.Deployment -} - -func parseError(err error) *api.Status { - status := new(api.Status) - json.Unmarshal([]byte(err.Error()), &status) - return status -} - -func newService(s *runtime.Service, c runtime.CreateOptions) *service { - // use pre-formatted name/version - name := client.Format(s.Name) - version := client.Format(s.Version) - - kservice := client.NewService(name, version, c.Type, c.Namespace) - kdeploy := client.NewDeployment(name, version, c.Type, c.Namespace) - - // ensure the metadata is set - if kdeploy.Spec.Template.Metadata.Annotations == nil { - kdeploy.Spec.Template.Metadata.Annotations = make(map[string]string) - } - - // create if non existent - if s.Metadata == nil { - s.Metadata = make(map[string]string) - } - - // add the service metadata to the k8s labels, do this first so we - // don't override any labels used by the runtime, e.g. name - for k, v := range s.Metadata { - kdeploy.Metadata.Annotations[k] = v - } - - // attach our values to the deployment; name, version, source - kdeploy.Metadata.Annotations["name"] = s.Name - kdeploy.Metadata.Annotations["version"] = s.Version - kdeploy.Metadata.Annotations["source"] = s.Source - - // associate owner:group to be later augmented - kdeploy.Metadata.Annotations["owner"] = "micro" - kdeploy.Metadata.Annotations["group"] = "micro" - - // update the deployment is a custom source is provided - if len(c.Image) > 0 { - for i := range kdeploy.Spec.Template.PodSpec.Containers { - kdeploy.Spec.Template.PodSpec.Containers[i].Image = c.Image - kdeploy.Spec.Template.PodSpec.Containers[i].Command = []string{} - kdeploy.Spec.Template.PodSpec.Containers[i].Args = []string{} - } - } - - // define the environment values used by the container - env := make([]client.EnvVar, 0, len(c.Env)) - for _, evar := range c.Env { - evarPair := strings.Split(evar, "=") - env = append(env, client.EnvVar{Name: evarPair[0], Value: evarPair[1]}) - } - - // if secrets were provided, pass them to the service - for key := range c.Secrets { - env = append(env, client.EnvVar{ - Name: key, - ValueFrom: &client.EnvVarSource{ - SecretKeyRef: &client.SecretKeySelector{ - Name: credentialsName(s), - Key: key, - }, - }, - }) - } - - // if environment has been supplied update deployment default environment - if len(env) > 0 { - kdeploy.Spec.Template.PodSpec.Containers[0].Env = append(kdeploy.Spec.Template.PodSpec.Containers[0].Env, env...) - } - - // set the command if specified - if len(c.Command) > 0 { - kdeploy.Spec.Template.PodSpec.Containers[0].Command = c.Command - } - - if len(c.Args) > 0 { - kdeploy.Spec.Template.PodSpec.Containers[0].Args = c.Args - } - - // apply resource limits - if c.Resources != nil { - resLimits := &client.ResourceLimits{} - if c.Resources.CPU > 0 { - resLimits.CPU = fmt.Sprintf("%vm", c.Resources.CPU) - } - if c.Resources.Mem > 0 { - resLimits.Memory = fmt.Sprintf("%vMi", c.Resources.Mem) - } - if c.Resources.Disk > 0 { - resLimits.EphemeralStorage = fmt.Sprintf("%vMi", c.Resources.Disk) - } - - kdeploy.Spec.Template.PodSpec.Containers[0].Resources = &client.ResourceRequirements{Limits: resLimits} - } - - return &service{ - Service: s, - kservice: kservice, - kdeploy: kdeploy, - } -} - -func deploymentResource(d *client.Deployment) *client.Resource { - return &client.Resource{ - Name: d.Metadata.Name, - Kind: "deployment", - Value: d, - } -} - -func serviceResource(s *client.Service) *client.Resource { - return &client.Resource{ - Name: s.Metadata.Name, - Kind: "service", - Value: s, - } -} - -// Start starts the Kubernetes service. It creates new kubernetes deployment and service API objects -func (s *service) Start(k client.Client, opts ...client.CreateOption) error { - // create deployment first; if we fail, we dont create service - if err := k.Create(deploymentResource(s.kdeploy), opts...); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime failed to create deployment: %v", err) - } - s.Status("error", err) - v := parseError(err) - if v.Reason == "AlreadyExists" { - return runtime.ErrAlreadyExists - } - return err - } - // create service now that the deployment has been created - if err := k.Create(serviceResource(s.kservice), opts...); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime failed to create service: %v", err) - } - s.Status("error", err) - v := parseError(err) - if v.Reason == "AlreadyExists" { - return runtime.ErrAlreadyExists - } - return err - } - - s.Status("started", nil) - - return nil -} - -func (s *service) Stop(k client.Client, opts ...client.DeleteOption) error { - // first attempt to delete service - if err := k.Delete(serviceResource(s.kservice), opts...); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime failed to delete service: %v", err) - } - s.Status("error", err) - return err - } - // delete deployment once the service has been deleted - if err := k.Delete(deploymentResource(s.kdeploy), opts...); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime failed to delete deployment: %v", err) - } - s.Status("error", err) - return err - } - - s.Status("stopped", nil) - - return nil -} - -func (s *service) Update(k client.Client, opts ...client.UpdateOption) error { - if err := k.Update(deploymentResource(s.kdeploy), opts...); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime failed to update deployment: %v", err) - } - s.Status("error", err) - return err - } - if err := k.Update(serviceResource(s.kservice), opts...); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime failed to update service: %v", err) - } - return err - } - - return nil -} - -func (s *service) Status(status string, err error) { - s.Metadata["lastStatusUpdate"] = time.Now().Format(time.RFC3339) - if err == nil { - s.Metadata["status"] = status - delete(s.Metadata, "error") - return - } - s.Metadata["status"] = "error" - s.Metadata["error"] = err.Error() -} diff --git a/runtime/local/git/git.go b/runtime/local/git/git.go deleted file mode 100644 index 9c07a56e..00000000 --- a/runtime/local/git/git.go +++ /dev/null @@ -1,609 +0,0 @@ -package git - -import ( - "archive/tar" - "archive/zip" - "compress/gzip" - "fmt" - "io" - "io/ioutil" - "net/http" - "os" - "os/exec" - "path" - "path/filepath" - "regexp" - "strings" - - "github.com/teris-io/shortid" - "github.com/unistack-org/micro/v3/logger" - "github.com/xanzy/go-gitlab" -) - -const credentialsKey = "GIT_CREDENTIALS" - -type Gitter interface { - Checkout(repo, branchOrCommit string) error - RepoDir() string -} - -type binaryGitter struct { - folder string - secrets map[string]string -} - -func (g *binaryGitter) Checkout(repo, branchOrCommit string) error { - // The implementation of this method is questionable. - // We use archives from github/gitlab etc which doesnt require the user to have got - // and probably is faster than downloading the whole repo history, - // but it comes with a bit of custom code for EACH host. - // @todo probably we should fall back to git in case the archives are not available. - - if branchOrCommit == "latest" { - branchOrCommit = "master" - } - if strings.Contains(repo, "github") { - return g.checkoutGithub(repo, branchOrCommit) - } else if strings.Contains(repo, "gitlab") { - err := g.checkoutGitLabPublic(repo, branchOrCommit) - if err != nil && len(g.secrets[credentialsKey]) > 0 { - // If the public download fails, try getting it with tokens. - // Private downloads needs a token for api project listing, hence - // the weird structure of this code. - return g.checkoutGitLabPrivate(repo, branchOrCommit) - } - return err - } - if len(g.secrets[credentialsKey]) > 0 { - return g.checkoutAnyRemote(repo, branchOrCommit, true) - } - return g.checkoutAnyRemote(repo, branchOrCommit, false) -} - -// This aims to be a generic checkout method. Currently only tested for bitbucket, -// see tests -func (g *binaryGitter) checkoutAnyRemote(repo, branchOrCommit string, useCredentials bool) error { - repoFolder := strings.ReplaceAll(strings.ReplaceAll(repo, "/", "-"), "https://", "") - g.folder = filepath.Join(os.TempDir(), - repoFolder+"-"+shortid.MustGenerate()) - err := os.MkdirAll(g.folder, 0755) - if err != nil { - return err - } - - // Assumes remote address format is git@gitlab.com:micro-test/monorepo-test.git - remoteAddr := fmt.Sprintf("https://%v", repo) - if useCredentials { - remoteAddr = fmt.Sprintf("https://%v@%v", g.secrets[credentialsKey], repo) - } - - cmd := exec.Command("git", "clone", remoteAddr, "--depth=1", ".") - cmd.Dir = g.folder - outp, err := cmd.CombinedOutput() - if err != nil { - return fmt.Errorf("Git clone failed: %v", string(outp)) - } - - cmd = exec.Command("git", "fetch", "origin", branchOrCommit, "--depth=1") - cmd.Dir = g.folder - outp, err = cmd.CombinedOutput() - if err != nil { - return fmt.Errorf("Git fetch failed: %v", string(outp)) - } - - cmd = exec.Command("git", "checkout", branchOrCommit) - cmd.Dir = g.folder - outp, err = cmd.CombinedOutput() - if err != nil { - return fmt.Errorf("Git checkout failed: %v", string(outp)) - } - return nil -} - -func (g *binaryGitter) checkoutGithub(repo, branchOrCommit string) error { - // @todo if it's a commit it must not be checked out all the time - repoFolder := strings.ReplaceAll(strings.ReplaceAll(repo, "/", "-"), "https://", "") - g.folder = filepath.Join(os.TempDir(), - repoFolder+"-"+shortid.MustGenerate()) - - url := fmt.Sprintf("%v/archive/%v.zip", repo, branchOrCommit) - if !strings.HasPrefix(url, "https://") { - url = "https://" + url - } - client := &http.Client{} - req, _ := http.NewRequest("GET", url, nil) - if len(g.secrets[credentialsKey]) > 0 { - req.Header.Set("Authorization", "token "+g.secrets[credentialsKey]) - } - resp, err := client.Do(req) - if err != nil { - return fmt.Errorf("Can't get zip: %v", err) - } - - defer resp.Body.Close() - // Github returns 404 for tar.gz files... - // but still gives back a proper file so ignoring status code - // for now. - //if resp.StatusCode != 200 { - // return errors.New("Status code was not 200") - //} - - src := g.folder + ".zip" - // Create the file - out, err := os.Create(src) - if err != nil { - return fmt.Errorf("Can't create source file %v src: %v", src, err) - } - defer out.Close() - - // Write the body to file - _, err = io.Copy(out, resp.Body) - if err != nil { - return err - } - return unzip(src, g.folder, true) -} - -func (g *binaryGitter) checkoutGitLabPublic(repo, branchOrCommit string) error { - // Example: https://gitlab.com/micro-test/basic-micro-service/-/archive/master/basic-micro-service-master.tar.gz - // @todo if it's a commit it must not be checked out all the time - repoFolder := strings.ReplaceAll(strings.ReplaceAll(repo, "/", "-"), "https://", "") - g.folder = filepath.Join(os.TempDir(), - repoFolder+"-"+shortid.MustGenerate()) - - tarName := strings.ReplaceAll(strings.ReplaceAll(repo, "gitlab.com/", ""), "/", "-") - url := fmt.Sprintf("%v/-/archive/%v/%v.tar.gz", repo, branchOrCommit, tarName) - if !strings.HasPrefix(url, "https://") { - url = "https://" + url - } - client := &http.Client{} - req, _ := http.NewRequest("GET", url, nil) - resp, err := client.Do(req) - if err != nil { - return fmt.Errorf("Can't get zip: %v", err) - } - - defer resp.Body.Close() - - src := g.folder + ".tar.gz" - // Create the file - out, err := os.Create(src) - if err != nil { - return fmt.Errorf("Can't create source file %v src: %v", src, err) - } - defer out.Close() - - // Write the body to file - _, err = io.Copy(out, resp.Body) - if err != nil { - return err - } - err = Uncompress(src, g.folder) - if err != nil { - return err - } - // Gitlab zip/tar has contents inside a folder - // It has the format of eg. basic-micro-service-master-314b4a494ed472793e0a8bce8babbc69359aed7b - // Since we don't have the commit at this point we must list the dir - files, err := ioutil.ReadDir(g.folder) - if err != nil { - return err - } - if len(files) == 0 { - return fmt.Errorf("No contents in dir downloaded from gitlab: %v", g.folder) - } - g.folder = filepath.Join(g.folder, files[0].Name()) - return nil -} - -func (g *binaryGitter) checkoutGitLabPrivate(repo, branchOrCommit string) error { - git, err := gitlab.NewClient(g.secrets[credentialsKey]) - if err != nil { - return err - } - owned := true - projects, _, err := git.Projects.ListProjects(&gitlab.ListProjectsOptions{ - Owned: &owned, - }) - if err != nil { - return err - } - projectID := "" - for _, project := range projects { - if strings.Contains(repo, project.Name) { - projectID = fmt.Sprintf("%v", project.ID) - } - } - if len(projectID) == 0 { - return fmt.Errorf("Project id not found for repo %v", repo) - } - // Example URL: - // https://gitlab.com/api/v3/projects/0000000/repository/archive?private_token=XXXXXXXXXXXXXXXXXXXX - url := fmt.Sprintf("https://gitlab.com/api/v4/projects/%v/repository/archive?private_token=%v", projectID, g.secrets[credentialsKey]) - - client := &http.Client{} - req, _ := http.NewRequest("GET", url, nil) - resp, err := client.Do(req) - if err != nil { - return fmt.Errorf("Can't get zip: %v", err) - } - - defer resp.Body.Close() - - src := g.folder + ".tar.gz" - // Create the file - out, err := os.Create(src) - if err != nil { - return fmt.Errorf("Can't create source file %v src: %v", src, err) - } - defer out.Close() - - // Write the body to file - _, err = io.Copy(out, resp.Body) - if err != nil { - return err - } - err = Uncompress(src, g.folder) - if err != nil { - return err - } - // Gitlab zip/tar has contents inside a folder - // It has the format of eg. basic-micro-service-master-314b4a494ed472793e0a8bce8babbc69359aed7b - // Since we don't have the commit at this point we must list the dir - files, err := ioutil.ReadDir(g.folder) - if err != nil { - return err - } - if len(files) == 0 { - return fmt.Errorf("No contents in dir downloaded from gitlab: %v", g.folder) - } - g.folder = filepath.Join(g.folder, files[0].Name()) - return nil -} - -func (g *binaryGitter) RepoDir() string { - return g.folder -} - -func NewGitter(folder string, secrets map[string]string) Gitter { - return &binaryGitter{folder, secrets} - -} - -func commandExists(cmd string) bool { - _, err := exec.LookPath(cmd) - return err == nil -} - -func dirifyRepo(s string) string { - s = strings.ReplaceAll(s, "https://", "") - s = strings.ReplaceAll(s, "/", "-") - return s -} - -// exists returns whether the given file or directory exists -func pathExists(path string) (bool, error) { - _, err := os.Stat(path) - if err == nil { - return true, nil - } - if os.IsNotExist(err) { - return false, nil - } - return true, err -} - -// GetRepoRoot determines the repo root from a full path. -// Returns empty string and no error if not found -func GetRepoRoot(fullPath string) (string, error) { - // traverse parent directories - prev := fullPath - for { - current := prev - exists, err := pathExists(filepath.Join(current, ".git")) - if err != nil { - return "", err - } - if exists { - return current, nil - } - prev = filepath.Dir(current) - // reached top level, see: - // https://play.golang.org/p/rDgVdk3suzb - if current == prev { - break - } - } - return "", nil -} - -// Source is not just git related @todo move -type Source struct { - // is it a local folder intended for a local runtime? - Local bool - // absolute path to service folder in local mode - FullPath string - // path of folder to repo root - // be it local or github repo - Folder string - // github ref - Ref string - // for cloning purposes - // blank for local - Repo string - // dir to repo root - // blank for non local - LocalRepoRoot string -} - -// Name to be passed to RPC call runtime.Create Update Delete -// eg: `helloworld/api`, `crufter/myrepo/helloworld/api`, `localfolder` -func (s *Source) RuntimeName() string { - if len(s.Folder) == 0 { - // This is the case for top level url source ie. gitlab.com/micro-test/basic-micro-service - return path.Base(s.Repo) - } - return path.Base(s.Folder) -} - -// Source to be passed to RPC call runtime.Create Update Delete -// eg: `helloworld`, `github.com/crufter/myrepo/helloworld`, `/path/to/localrepo/localfolder` -func (s *Source) RuntimeSource() string { - if s.Local { - return s.FullPath - } - if len(s.Folder) == 0 { - return s.Repo - } - return fmt.Sprintf("%v/%v", s.Repo, s.Folder) -} - -// ParseSource parses a `micro run/update/kill` source. -func ParseSource(source string) (*Source, error) { - if !strings.Contains(source, "@") { - source += "@latest" - } - ret := &Source{} - refs := strings.Split(source, "@") - ret.Ref = refs[1] - parts := strings.Split(refs[0], "/") - ret.Repo = strings.Join(parts[0:3], "/") - if len(parts) > 1 { - ret.Folder = strings.Join(parts[3:], "/") - } - - return ret, nil -} - -// ParseSourceLocal a version of ParseSource that detects and handles local paths. -// Workdir should be used only from the CLI @todo better interface for this function. -// PathExistsFunc exists only for testing purposes, to make the function side effect free. -func ParseSourceLocal(workDir, source string, pathExistsFunc ...func(path string) (bool, error)) (*Source, error) { - var pexists func(string) (bool, error) - if len(pathExistsFunc) == 0 { - pexists = pathExists - } else { - pexists = pathExistsFunc[0] - } - isLocal, localFullPath := IsLocal(workDir, source, pexists) - if isLocal { - localRepoRoot, err := GetRepoRoot(localFullPath) - if err != nil { - return nil, err - } - var folder string - // If the local repo root is a top level folder, we are not in a git repo. - // In this case, we should take the last folder as folder name. - if localRepoRoot == "" { - folder = filepath.Base(localFullPath) - } else { - folder = strings.ReplaceAll(localFullPath, localRepoRoot+string(filepath.Separator), "") - } - - return &Source{ - Local: true, - Folder: folder, - FullPath: localFullPath, - LocalRepoRoot: localRepoRoot, - Ref: "latest", // @todo consider extracting branch from git here - }, nil - } - return ParseSource(source) -} - -// IsLocal tries returns true and full path of directory if the path is a local one, and -// false and empty string if not. -func IsLocal(workDir, source string, pathExistsFunc ...func(path string) (bool, error)) (bool, string) { - var pexists func(string) (bool, error) - if len(pathExistsFunc) == 0 { - pexists = pathExists - } else { - pexists = pathExistsFunc[0] - } - // Check for absolute path - // @todo "/" won't work for Windows - if exists, err := pexists(source); strings.HasPrefix(source, "/") && err == nil && exists { - return true, source - // Check for path relative to workdir - } else if exists, err := pexists(filepath.Join(workDir, source)); err == nil && exists { - return true, filepath.Join(workDir, source) - } - return false, "" -} - -// CheckoutSource for the local runtime server -// folder is the folder to check out the source code to -// Modifies source path to set it to checked out repo absolute path locally. -func CheckoutSource(folder string, source *Source, secrets map[string]string) error { - // if it's a local folder, do nothing - if exists, err := pathExists(source.FullPath); err == nil && exists { - return nil - } - gitter := NewGitter(folder, secrets) - repo := source.Repo - if !strings.Contains(repo, "https://") { - repo = "https://" + repo - } - err := gitter.Checkout(repo, source.Ref) - if err != nil { - return err - } - source.FullPath = filepath.Join(gitter.RepoDir(), source.Folder) - return nil -} - -// code below is not used yet - -var nameExtractRegexp = regexp.MustCompile(`((micro|web)\.Name\(")(.*)("\))`) - -func extractServiceName(fileContent []byte) string { - hits := nameExtractRegexp.FindAll(fileContent, 1) - if len(hits) == 0 { - return "" - } - hit := string(hits[0]) - return strings.Split(hit, "\"")[1] -} - -// Uncompress is a modified version of: https://gist.github.com/mimoo/25fc9716e0f1353791f5908f94d6e726 -func Uncompress(src string, dst string) error { - file, err := os.OpenFile(src, os.O_RDWR|os.O_CREATE, 0666) - if err != nil { - return err - } - defer file.Close() - - // ungzip - zr, err := gzip.NewReader(file) - if err != nil { - return err - } - // untar - tr := tar.NewReader(zr) - - // uncompress each element - for { - header, err := tr.Next() - if err == io.EOF { - break // End of archive - } - if err != nil { - return err - } - target := header.Name - - // validate name against path traversal - if !validRelPath(header.Name) { - return fmt.Errorf("tar contained invalid name error %q\n", target) - } - - // add dst + re-format slashes according to system - target = filepath.Join(dst, header.Name) - // if no join is needed, replace with ToSlash: - // target = filepath.ToSlash(header.Name) - - // check the type - switch header.Typeflag { - - // if its a dir and it doesn't exist create it (with 0755 permission) - case tar.TypeDir: - if _, err := os.Stat(target); err != nil { - // @todo think about this: - // if we don't nuke the folder, we might end up with files from - // the previous decompress. - if err := os.MkdirAll(target, 0755); err != nil { - return err - } - } - // if it's a file create it (with same permission) - case tar.TypeReg: - // the truncating is probably unnecessary due to the `RemoveAll` of folders - // above - fileToWrite, err := os.OpenFile(target, os.O_TRUNC|os.O_CREATE|os.O_RDWR, os.FileMode(header.Mode)) - if err != nil { - return err - } - // copy over contents - if _, err := io.Copy(fileToWrite, tr); err != nil { - return err - } - // manually close here after each file operation; defering would cause each file close - // to wait until all operations have completed. - fileToWrite.Close() - } - } - return nil -} - -// check for path traversal and correct forward slashes -func validRelPath(p string) bool { - if p == "" || strings.Contains(p, `\`) || strings.HasPrefix(p, "/") || strings.Contains(p, "../") { - return false - } - return true -} - -// taken from https://stackoverflow.com/questions/20357223/easy-way-to-unzip-file-with-golang -func unzip(src, dest string, skipTopFolder bool) error { - r, err := zip.OpenReader(src) - if err != nil { - return err - } - defer func() { - if err = r.Close(); err != nil { - logger.Errorf("failed to close reader: %v", err) - } - }() - - if err = os.MkdirAll(dest, 0755); err != nil { - return err - } - - // Closure to address file descriptors issue with all the deferred .Close() methods - extractAndWriteFile := func(f *zip.File) error { - rc, err := f.Open() - if err != nil { - return err - } - defer func() { - rc.Close() - }() - if skipTopFolder { - f.Name = strings.Join(strings.Split(f.Name, string(filepath.Separator))[1:], string(filepath.Separator)) - } - path := filepath.Join(dest, f.Name) - if f.FileInfo().IsDir() { - if err = os.MkdirAll(path, f.Mode()); err != nil { - return err - } - } else { - if err = os.MkdirAll(filepath.Dir(path), f.Mode()); err != nil { - return err - } - f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode()) - if err != nil { - return err - } - defer func() { - if err = f.Close(); err != nil { - logger.Errorf("failed to close file: %v", err) - } - }() - - _, err = io.Copy(f, rc) - if err != nil { - return err - } - } - return nil - } - - for _, f := range r.File { - err := extractAndWriteFile(f) - if err != nil { - return err - } - } - - return nil -} diff --git a/runtime/local/git/git_test.go b/runtime/local/git/git_test.go deleted file mode 100644 index 3ac011f2..00000000 --- a/runtime/local/git/git_test.go +++ /dev/null @@ -1,133 +0,0 @@ -package git - -import ( - "testing" -) - -type parseCase struct { - source string - expected *Source -} - -func TestParseSource(t *testing.T) { - cases := []parseCase{ - { - source: "github.com/micro/services/helloworld", - expected: &Source{ - Repo: "github.com/micro/services", - Folder: "helloworld", - Ref: "latest", - }, - }, - { - source: "github.com/micro/services/helloworld", - expected: &Source{ - Repo: "github.com/micro/services", - Folder: "helloworld", - Ref: "latest", - }, - }, - { - source: "github.com/micro/services/helloworld@v1.12.1", - expected: &Source{ - Repo: "github.com/micro/services", - Folder: "helloworld", - Ref: "v1.12.1", - }, - }, - { - source: "github.com/micro/services/helloworld@branchname", - expected: &Source{ - Repo: "github.com/micro/services", - Folder: "helloworld", - Ref: "branchname", - }, - }, - { - source: "github.com/crufter/reponame/helloworld@branchname", - expected: &Source{ - Repo: "github.com/crufter/reponame", - Folder: "helloworld", - Ref: "branchname", - }, - }, - } - for i, c := range cases { - result, err := ParseSource(c.source) - if err != nil { - t.Fatalf("Failed case %v: %v", i, err) - } - if result.Folder != c.expected.Folder { - t.Fatalf("Folder does not match for '%v', expected '%v', got '%v'", i, c.expected.Folder, result.Folder) - } - if result.Repo != c.expected.Repo { - t.Fatalf("Repo address does not match for '%v', expected '%v', got '%v'", i, c.expected.Repo, result.Repo) - } - if result.Ref != c.expected.Ref { - t.Fatalf("Ref does not match for '%v', expected '%v', got '%v'", i, c.expected.Ref, result.Ref) - } - } -} - -type localParseCase struct { - source string - expected *Source - workDir string - pathExists bool -} - -func TestLocalParseSource(t *testing.T) { - cases := []localParseCase{ - { - source: ".", - expected: &Source{ - Folder: "folder2", - Ref: "latest", - }, - workDir: "/folder1/folder2", - pathExists: true, - }, - } - for i, c := range cases { - result, err := ParseSourceLocal(c.workDir, c.source, func(s string) (bool, error) { - return c.pathExists, nil - }) - if err != nil { - t.Fatalf("Failed case %v: %v", i, err) - } - if result.Folder != c.expected.Folder { - t.Fatalf("Folder does not match for '%v', expected '%v', got '%v'", i, c.expected.Folder, result.Folder) - } - if result.Repo != c.expected.Repo { - t.Fatalf("Repo address does not match for '%v', expected '%v', got '%v'", i, c.expected.Repo, result.Repo) - } - if result.Ref != c.expected.Ref { - t.Fatalf("Ref does not match for '%v', expected '%v', got '%v'", i, c.expected.Ref, result.Ref) - } - } -} - -type nameCase struct { - fileContent string - expected string -} - -func TestServiceNameExtract(t *testing.T) { - cases := []nameCase{ - { - fileContent: `func main() { - // New Service - service := micro.NewService( - micro.Name("go.micro.service.helloworld"), - micro.Version("latest"), - )`, - expected: "go.micro.service.helloworld", - }, - } - for i, c := range cases { - result := extractServiceName([]byte(c.fileContent)) - if result != c.expected { - t.Fatalf("Case %v, expected: %v, got: %v", i, c.expected, result) - } - } -} diff --git a/runtime/local/local.go b/runtime/local/local.go deleted file mode 100644 index 86150730..00000000 --- a/runtime/local/local.go +++ /dev/null @@ -1,635 +0,0 @@ -package local - -import ( - "errors" - "fmt" - "io" - "log" - "os" - "path/filepath" - "strings" - "sync" - "time" - - "github.com/hpcloud/tail" - "github.com/unistack-org/micro/v3/logger" - "github.com/unistack-org/micro/v3/runtime" - "github.com/unistack-org/micro/v3/runtime/local/git" -) - -// defaultNamespace to use if not provided as an option -const defaultNamespace = "default" - -var ( - // The directory for logs to be output - LogDir = filepath.Join(os.TempDir(), "micro", "logs") - // The source directory where code lives - SourceDir = filepath.Join(os.TempDir(), "micro", "uploads") -) - -type localRuntime struct { - sync.RWMutex - // options configure runtime - options runtime.Options - // used to stop the runtime - closed chan bool - // used to start new services - start chan *service - // indicates if we're running - running bool - // namespaces stores services grouped by namespace, e.g. namespaces["foo"]["go.micro.auth:latest"] - // would return the latest version of go.micro.auth from the foo namespace - namespaces map[string]map[string]*service -} - -// NewRuntime creates new local runtime and returns it -func NewRuntime(opts ...runtime.Option) runtime.Runtime { - // get default options - options := runtime.Options{} - - // apply requested options - for _, o := range opts { - o(&options) - } - - // make the logs directory - os.MkdirAll(LogDir, 0755) - - return &localRuntime{ - options: options, - closed: make(chan bool), - start: make(chan *service, 128), - namespaces: make(map[string]map[string]*service), - } -} - -func (r *localRuntime) checkoutSourceIfNeeded(s *runtime.Service, secrets map[string]string) error { - // Runtime service like config have no source. - // Skip checkout in that case - if len(s.Source) == 0 { - return nil - } - - // Incoming uploaded files have format lastfolder.tar.gz or - // lastfolder.tar.gz/relative/path - sourceParts := strings.Split(s.Source, "/") - compressedFilepath := filepath.Join(SourceDir, sourceParts[0]) - uncompressPath := strings.ReplaceAll(compressedFilepath, ".tar.gz", "") - if len(sourceParts) > 1 { - uncompressPath = filepath.Join(SourceDir, strings.ReplaceAll(sourceParts[0], ".tar.gz", "")) - } - - // check if the directory already exists - if ex, _ := exists(compressedFilepath); ex { - err := os.RemoveAll(uncompressPath) - if err != nil { - return err - } - err = os.MkdirAll(uncompressPath, 0777) - if err != nil { - return err - } - err = git.Uncompress(compressedFilepath, uncompressPath) - if err != nil { - return err - } - if len(sourceParts) > 1 { - lastFolderPart := s.Name - fullp := append([]string{uncompressPath}, sourceParts[1:]...) - s.Source = filepath.Join(append(fullp, lastFolderPart)...) - } else { - s.Source = uncompressPath - } - return nil - } - - source, err := git.ParseSourceLocal("", s.Source) - if err != nil { - return err - } - source.Ref = s.Version - - err = git.CheckoutSource(os.TempDir(), source, secrets) - if err != nil { - return err - } - s.Source = source.FullPath - return nil -} - -// Init initializes runtime options -func (r *localRuntime) Init(opts ...runtime.Option) error { - r.Lock() - defer r.Unlock() - - for _, o := range opts { - o(&r.options) - } - - return nil -} - -// run runs the runtime management loop -func (r *localRuntime) run(events <-chan runtime.Event) { - t := time.NewTicker(time.Second * 5) - defer t.Stop() - - // process event processes an incoming event - processEvent := func(event runtime.Event, service *service, ns string) error { - // get current vals - r.RLock() - name := service.Name - updated := service.updated - r.RUnlock() - - // only process if the timestamp is newer - if !event.Timestamp.After(updated) { - return nil - } - - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime updating service %s in %v namespace", name, ns) - } - - // this will cause a delete followed by created - if err := r.Update(service.Service, runtime.UpdateNamespace(ns)); err != nil { - return err - } - - // update the local timestamp - r.Lock() - service.updated = updated - r.Unlock() - - return nil - } - - for { - select { - case <-t.C: - // check running services - r.RLock() - for _, sevices := range r.namespaces { - for _, service := range sevices { - if !service.ShouldStart() { - continue - } - - // TODO: check service error - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime starting %s", service.Name) - } - if err := service.Start(); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime error starting %s: %v", service.Name, err) - } - } - } - } - r.RUnlock() - case service := <-r.start: - if !service.ShouldStart() { - continue - } - // TODO: check service error - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime starting service %s", service.Name) - } - if err := service.Start(); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime error starting service %s: %v", service.Name, err) - } - } - case event := <-events: - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime received notification event: %v", event) - } - // NOTE: we only handle Update events for now - switch event.Type { - case runtime.Update: - if event.Service != nil { - ns := defaultNamespace - if event.Options != nil && len(event.Options.Namespace) > 0 { - ns = event.Options.Namespace - } - - r.RLock() - if _, ok := r.namespaces[ns]; !ok { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime unknown namespace: %s", ns) - } - r.RUnlock() - continue - } - service, ok := r.namespaces[ns][fmt.Sprintf("%v:%v", event.Service.Name, event.Service.Version)] - r.RUnlock() - if !ok { - logger.Debugf("Runtime unknown service: %s", event.Service) - } - - if err := processEvent(event, service, ns); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime error updating service %s: %v", event.Service, err) - } - } - continue - } - - r.RLock() - namespaces := r.namespaces - r.RUnlock() - - // if blank service was received we update all services - for ns, services := range namespaces { - for _, service := range services { - if err := processEvent(event, service, ns); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime error updating service %s: %v", service.Name, err) - } - } - } - } - } - case <-r.closed: - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime stopped") - } - return - } - } -} - -func logFile(serviceName string) string { - // make the directory - name := strings.Replace(serviceName, "/", "-", -1) - return filepath.Join(LogDir, fmt.Sprintf("%v.log", name)) -} - -func serviceKey(s *runtime.Service) string { - return fmt.Sprintf("%v:%v", s.Name, s.Version) -} - -// Create creates a new service which is then started by runtime -func (r *localRuntime) Create(s *runtime.Service, opts ...runtime.CreateOption) error { - var options runtime.CreateOptions - for _, o := range opts { - o(&options) - } - err := r.checkoutSourceIfNeeded(s, options.Secrets) - if err != nil { - return err - } - r.Lock() - defer r.Unlock() - - if len(options.Namespace) == 0 { - options.Namespace = defaultNamespace - } - if len(options.Command) == 0 { - options.Command = []string{"go"} - options.Args = []string{"run", "."} - } - - // pass secrets as env vars - for key, value := range options.Secrets { - options.Env = append(options.Env, fmt.Sprintf("%v=%v", key, value)) - } - - if _, ok := r.namespaces[options.Namespace]; !ok { - r.namespaces[options.Namespace] = make(map[string]*service) - } - if _, ok := r.namespaces[options.Namespace][serviceKey(s)]; ok { - return errors.New("service already running") - } - - // create new service - service := newService(s, options) - - f, err := os.OpenFile(logFile(service.Name), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - log.Fatal(err) - } - - if service.output != nil { - service.output = io.MultiWriter(service.output, f) - } else { - service.output = f - } - // start the service - if err := service.Start(); err != nil { - return err - } - // save service - r.namespaces[options.Namespace][serviceKey(s)] = service - - return nil -} - -// exists returns whether the given file or directory exists -func exists(path string) (bool, error) { - _, err := os.Stat(path) - if err == nil { - return true, nil - } - if os.IsNotExist(err) { - return false, nil - } - return true, err -} - -// @todo: Getting existing lines is not supported yet. -// The reason for this is because it's hard to calculate line offset -// as opposed to character offset. -// This logger streams by default and only supports the `StreamCount` option. -func (r *localRuntime) Logs(s *runtime.Service, options ...runtime.LogsOption) (runtime.Logs, error) { - lopts := runtime.LogsOptions{} - for _, o := range options { - o(&lopts) - } - ret := &logStream{ - service: s.Name, - stream: make(chan runtime.Log), - stop: make(chan bool), - } - - fpath := logFile(s.Name) - if ex, err := exists(fpath); err != nil { - return nil, err - } else if !ex { - return nil, fmt.Errorf("Logs not found for service %s", s.Name) - } - - // have to check file size to avoid too big of a seek - fi, err := os.Stat(fpath) - if err != nil { - return nil, err - } - size := fi.Size() - - whence := 2 - // Multiply by length of an average line of log in bytes - offset := lopts.Count * 200 - - if offset > size { - offset = size - } - offset *= -1 - - t, err := tail.TailFile(fpath, tail.Config{Follow: lopts.Stream, Location: &tail.SeekInfo{ - Whence: whence, - Offset: offset, - }, Logger: tail.DiscardingLogger}) - if err != nil { - return nil, err - } - - ret.tail = t - go func() { - for { - select { - case line, ok := <-t.Lines: - if !ok { - ret.Stop() - return - } - ret.stream <- runtime.Log{Message: line.Text} - case <-ret.stop: - return - } - } - - }() - return ret, nil -} - -type logStream struct { - tail *tail.Tail - service string - stream chan runtime.Log - sync.Mutex - stop chan bool - err error -} - -func (l *logStream) Chan() chan runtime.Log { - return l.stream -} - -func (l *logStream) Error() error { - return l.err -} - -func (l *logStream) Stop() error { - l.Lock() - defer l.Unlock() - - select { - case <-l.stop: - return nil - default: - close(l.stop) - close(l.stream) - err := l.tail.Stop() - if err != nil { - logger.Errorf("Error stopping tail: %v", err) - return err - } - } - return nil -} - -// Read returns all instances of requested service -// If no service name is provided we return all the track services. -func (r *localRuntime) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error) { - r.Lock() - defer r.Unlock() - - gopts := runtime.ReadOptions{} - for _, o := range opts { - o(&gopts) - } - if len(gopts.Namespace) == 0 { - gopts.Namespace = defaultNamespace - } - - save := func(k, v string) bool { - if len(k) == 0 { - return true - } - return k == v - } - - //nolint:prealloc - var services []*runtime.Service - - if _, ok := r.namespaces[gopts.Namespace]; !ok { - return make([]*runtime.Service, 0), nil - } - - for _, service := range r.namespaces[gopts.Namespace] { - if !save(gopts.Service, service.Name) { - continue - } - if !save(gopts.Version, service.Version) { - continue - } - // TODO deal with service type - // no version has sbeen requested, just append the service - services = append(services, service.Service) - } - - return services, nil -} - -// Update attempts to update the service -func (r *localRuntime) Update(s *runtime.Service, opts ...runtime.UpdateOption) error { - var options runtime.UpdateOptions - for _, o := range opts { - o(&options) - } - err := r.checkoutSourceIfNeeded(s, options.Secrets) - if err != nil { - return err - } - - if len(options.Namespace) == 0 { - options.Namespace = defaultNamespace - } - - r.Lock() - srvs, ok := r.namespaces[options.Namespace] - r.Unlock() - if !ok { - return errors.New("Service not found") - } - - r.Lock() - service, ok := srvs[serviceKey(s)] - r.Unlock() - if !ok { - return errors.New("Service not found") - } - - if err := service.Stop(); err != nil && err.Error() != "no such process" { - logger.Errorf("Error stopping service %s: %s", service.Name, err) - return err - } - - return service.Start() -} - -// Delete removes the service from the runtime and stops it -func (r *localRuntime) Delete(s *runtime.Service, opts ...runtime.DeleteOption) error { - r.Lock() - defer r.Unlock() - - var options runtime.DeleteOptions - for _, o := range opts { - o(&options) - } - if len(options.Namespace) == 0 { - options.Namespace = defaultNamespace - } - - srvs, ok := r.namespaces[options.Namespace] - if !ok { - return nil - } - - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime deleting service %s", s.Name) - } - - service, ok := srvs[serviceKey(s)] - if !ok { - return nil - } - - // check if running - if !service.Running() { - delete(srvs, service.key()) - r.namespaces[options.Namespace] = srvs - return nil - } - // otherwise stop it - if err := service.Stop(); err != nil { - return err - } - // delete it - delete(srvs, service.key()) - r.namespaces[options.Namespace] = srvs - return nil -} - -// Start starts the runtime -func (r *localRuntime) Start() error { - r.Lock() - defer r.Unlock() - - // already running - if r.running { - return nil - } - - // set running - r.running = true - r.closed = make(chan bool) - - var events <-chan runtime.Event - if r.options.Scheduler != nil { - var err error - events, err = r.options.Scheduler.Notify() - if err != nil { - // TODO: should we bail here? - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime failed to start update notifier") - } - } - } - - go r.run(events) - - return nil -} - -// Stop stops the runtime -func (r *localRuntime) Stop() error { - r.Lock() - defer r.Unlock() - - if !r.running { - return nil - } - - select { - case <-r.closed: - return nil - default: - close(r.closed) - - // set not running - r.running = false - - // stop all the services - for _, services := range r.namespaces { - for _, service := range services { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime stopping %s", service.Name) - } - service.Stop() - } - } - - // stop the scheduler - if r.options.Scheduler != nil { - return r.options.Scheduler.Close() - } - } - - return nil -} - -// String implements stringer interface -func (r *localRuntime) String() string { - return "local" -} diff --git a/runtime/local/process/options.go b/runtime/local/process/options.go deleted file mode 100644 index 0dcc6882..00000000 --- a/runtime/local/process/options.go +++ /dev/null @@ -1,5 +0,0 @@ -package process - -type Options struct{} - -type Option func(o *Options) diff --git a/runtime/local/process/os/os.go b/runtime/local/process/os/os.go deleted file mode 100644 index 3294a3c9..00000000 --- a/runtime/local/process/os/os.go +++ /dev/null @@ -1,94 +0,0 @@ -// +build !windows - -// Package os runs processes locally -package os - -import ( - "fmt" - "os" - "os/exec" - "strconv" - "syscall" - - "github.com/unistack-org/micro/v3/runtime/local/process" -) - -func (p *Process) Exec(exe *process.Binary) error { - cmd := exec.Command(exe.Package.Path) - cmd.Dir = exe.Dir - return cmd.Run() -} - -func (p *Process) Fork(exe *process.Binary) (*process.PID, error) { - // create command - cmd := exec.Command(exe.Package.Path, exe.Args...) - - cmd.Dir = exe.Dir - // set env vars - cmd.Env = append(cmd.Env, os.Environ()...) - cmd.Env = append(cmd.Env, exe.Env...) - - // create process group - cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - - in, err := cmd.StdinPipe() - if err != nil { - return nil, err - } - out, err := cmd.StdoutPipe() - if err != nil { - return nil, err - } - er, err := cmd.StderrPipe() - if err != nil { - return nil, err - } - // start the process - if err := cmd.Start(); err != nil { - return nil, err - } - - return &process.PID{ - ID: fmt.Sprintf("%d", cmd.Process.Pid), - Input: in, - Output: out, - Error: er, - }, nil -} - -func (p *Process) Kill(pid *process.PID) error { - id, err := strconv.Atoi(pid.ID) - if err != nil { - return err - } - if _, err := os.FindProcess(id); err != nil { - return err - } - - // now kill it - // using -ve PID kills the process group which we created in Fork() - return syscall.Kill(-id, syscall.SIGTERM) -} - -func (p *Process) Wait(pid *process.PID) error { - id, err := strconv.Atoi(pid.ID) - if err != nil { - return err - } - - pr, err := os.FindProcess(id) - if err != nil { - return err - } - - ps, err := pr.Wait() - if err != nil { - return err - } - - if ps.Success() { - return nil - } - - return fmt.Errorf(ps.String()) -} diff --git a/runtime/local/process/os/os_windows.go b/runtime/local/process/os/os_windows.go deleted file mode 100644 index fb6f2132..00000000 --- a/runtime/local/process/os/os_windows.go +++ /dev/null @@ -1,89 +0,0 @@ -// Package os runs processes locally -package os - -import ( - "fmt" - "os" - "os/exec" - "strconv" - - "github.com/unistack-org/micro/v3/runtime/local/process" -) - -func (p *Process) Exec(exe *process.Binary) error { - cmd := exec.Command(exe.Package.Path) - return cmd.Run() -} - -func (p *Process) Fork(exe *process.Binary) (*process.PID, error) { - // create command - cmd := exec.Command(exe.Package.Path, exe.Args...) - // set env vars - cmd.Env = append(cmd.Env, exe.Env...) - - in, err := cmd.StdinPipe() - if err != nil { - return nil, err - } - out, err := cmd.StdoutPipe() - if err != nil { - return nil, err - } - er, err := cmd.StderrPipe() - if err != nil { - return nil, err - } - - // start the process - if err := cmd.Start(); err != nil { - return nil, err - } - - return &process.PID{ - ID: fmt.Sprintf("%d", cmd.Process.Pid), - Input: in, - Output: out, - Error: er, - }, nil -} - -func (p *Process) Kill(pid *process.PID) error { - id, err := strconv.Atoi(pid.ID) - if err != nil { - return err - } - - pr, err := os.FindProcess(id) - if err != nil { - return err - } - - // now kill it - err = pr.Kill() - - // return the kill error - return err -} - -func (p *Process) Wait(pid *process.PID) error { - id, err := strconv.Atoi(pid.ID) - if err != nil { - return err - } - - pr, err := os.FindProcess(id) - if err != nil { - return err - } - - ps, err := pr.Wait() - if err != nil { - return err - } - - if ps.Success() { - return nil - } - - return fmt.Errorf(ps.String()) -} diff --git a/runtime/local/process/os/process.go b/runtime/local/process/os/process.go deleted file mode 100644 index 84f9ffb2..00000000 --- a/runtime/local/process/os/process.go +++ /dev/null @@ -1,12 +0,0 @@ -// Package os runs processes locally -package os - -import ( - "github.com/unistack-org/micro/v3/runtime/local/process" -) - -type Process struct{} - -func NewProcess(opts ...process.Option) process.Process { - return &Process{} -} diff --git a/runtime/local/process/process.go b/runtime/local/process/process.go deleted file mode 100644 index 08d2385b..00000000 --- a/runtime/local/process/process.go +++ /dev/null @@ -1,43 +0,0 @@ -// Package process executes a binary -package process - -import ( - "io" - - "github.com/unistack-org/micro/v3/build" -) - -// Process manages a running process -type Process interface { - // Executes a process to completion - Exec(*Binary) error - // Creates a new process - Fork(*Binary) (*PID, error) - // Kills the process - Kill(*PID) error - // Waits for a process to exit - Wait(*PID) error -} - -type Binary struct { - // Package containing executable - Package *build.Package - // The env variables - Env []string - // Args to pass - Args []string - // Initial working directory - Dir string -} - -// PID is the running process -type PID struct { - // ID of the process - ID string - // Stdin - Input io.Writer - // Stdout - Output io.Reader - // Stderr - Error io.Reader -} diff --git a/runtime/local/service.go b/runtime/local/service.go deleted file mode 100644 index d23d80fb..00000000 --- a/runtime/local/service.go +++ /dev/null @@ -1,253 +0,0 @@ -package local - -import ( - "fmt" - "io" - "path/filepath" - "strconv" - "strings" - "sync" - "time" - - "github.com/unistack-org/micro/v3/build" - "github.com/unistack-org/micro/v3/logger" - "github.com/unistack-org/micro/v3/runtime" - "github.com/unistack-org/micro/v3/runtime/local/process" - proc "github.com/unistack-org/micro/v3/runtime/local/process/os" -) - -type service struct { - sync.RWMutex - - running bool - closed chan bool - err error - updated time.Time - - retries int - maxRetries int - - // output for logs - output io.Writer - - // service to manage - *runtime.Service - // process creator - Process *proc.Process - // Exec - Exec *process.Binary - // process pid - PID *process.PID -} - -func newService(s *runtime.Service, c runtime.CreateOptions) *service { - var exec string - var args []string - - // set command - exec = strings.Join(c.Command, " ") - args = c.Args - - dir := s.Source - - // For uploaded packages, we upload the whole repo - // so the correct working directory to do a `go run .` - // needs to include the relative path from the repo root - // which is the service name. - // - // Could use a better upload check. - if strings.Contains(s.Source, "uploads") { - // There are two cases to consider here: - // a., if the uploaded code comes from a repo - in this case - // the service name is the relative path. - // b., if the uploaded code comes from a non repo folder - - // in this case the service name is the folder name. - // Because of this, we only append the service name to the source in - // case `a` - if ex, err := exists(filepath.Join(s.Source, s.Name)); err == nil && ex { - dir = filepath.Join(s.Source, s.Name) - } - } - - return &service{ - Service: s, - Process: new(proc.Process), - Exec: &process.Binary{ - Package: &build.Package{ - Name: s.Name, - Path: exec, - }, - Env: c.Env, - Args: args, - Dir: dir, - }, - closed: make(chan bool), - output: c.Output, - updated: time.Now(), - maxRetries: c.Retries, - } -} - -func (s *service) streamOutput() { - go io.Copy(s.output, s.PID.Output) - go io.Copy(s.output, s.PID.Error) -} - -func (s *service) shouldStart() bool { - if s.running { - return false - } - return s.retries <= s.maxRetries -} - -func (s *service) key() string { - return fmt.Sprintf("%v:%v", s.Name, s.Version) -} - -func (s *service) ShouldStart() bool { - s.RLock() - defer s.RUnlock() - return s.shouldStart() -} - -func (s *service) Running() bool { - s.RLock() - defer s.RUnlock() - return s.running -} - -// Start starts the service -func (s *service) Start() error { - s.Lock() - defer s.Unlock() - - if !s.shouldStart() { - return nil - } - - // reset - s.err = nil - s.closed = make(chan bool) - s.retries = 0 - - if s.Metadata == nil { - s.Metadata = make(map[string]string) - } - s.Status("starting", nil) - - // TODO: pull source & build binary - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime service %s forking new process", s.Service.Name) - } - - p, err := s.Process.Fork(s.Exec) - if err != nil { - s.Status("error", err) - return err - } - // set the pid - s.PID = p - // set to running - s.running = true - // set status - s.Status("running", nil) - // set started - s.Metadata["started"] = time.Now().Format(time.RFC3339) - - if s.output != nil { - s.streamOutput() - } - - // wait and watch - go s.Wait() - - return nil -} - -// Status updates the status of the service. Assumes it's called under a lock as it mutates state -func (s *service) Status(status string, err error) { - s.Metadata["lastStatusUpdate"] = time.Now().Format(time.RFC3339) - s.Metadata["status"] = status - if err == nil { - delete(s.Metadata, "error") - return - } - s.Metadata["error"] = err.Error() - -} - -// Stop stops the service -func (s *service) Stop() error { - s.Lock() - defer s.Unlock() - - select { - case <-s.closed: - return nil - default: - close(s.closed) - s.running = false - s.retries = 0 - if s.PID == nil { - return nil - } - - // set status - s.Status("stopping", nil) - - // kill the process - err := s.Process.Kill(s.PID) - if err == nil { - // wait for it to exit - s.Process.Wait(s.PID) - } - - // set status - s.Status("stopped", err) - - // return the kill error - return err - } -} - -// Error returns the last error service has returned -func (s *service) Error() error { - s.RLock() - defer s.RUnlock() - return s.err -} - -// Wait waits for the service to finish running -func (s *service) Wait() { - // wait for process to exit - s.RLock() - thisPID := s.PID - s.RUnlock() - err := s.Process.Wait(thisPID) - - s.Lock() - defer s.Unlock() - - if s.PID.ID != thisPID.ID { - // trying to update when it's already been switched out, ignore - logger.Debugf("Trying to update a process status but PID doesn't match. Old %s, New %s. Skipping update.", thisPID.ID, s.PID.ID) - return - } - - // save the error - if err != nil { - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Errorf("Service %s terminated with error %s", s.Name, err) - } - s.retries++ - s.Status("error", err) - s.Metadata["retries"] = strconv.Itoa(s.retries) - - s.err = err - } else { - s.Status("done", nil) - } - - // no longer running - s.running = false -} diff --git a/runtime/local/source/go/golang.go b/runtime/local/source/go/golang.go deleted file mode 100644 index 1687b541..00000000 --- a/runtime/local/source/go/golang.go +++ /dev/null @@ -1,93 +0,0 @@ -// Package golang is a source for Go -package golang - -import ( - "os" - "os/exec" - "path/filepath" - "strings" - - "github.com/unistack-org/micro/v3/runtime/local/source" -) - -type Source struct { - Options source.Options - // Go Command - Cmd string - Path string -} - -func (g *Source) Fetch(url string) (*source.Repository, error) { - purl := url - - if parts := strings.Split(url, "://"); len(parts) > 1 { - purl = parts[len(parts)-1] - } - - // name of repo - name := filepath.Base(url) - // local path of repo - path := filepath.Join(g.Path, purl) - args := []string{"get", "-d", url, path} - - cmd := exec.Command(g.Cmd, args...) - if err := cmd.Run(); err != nil { - return nil, err - } - return &source.Repository{ - Name: name, - Path: path, - URL: url, - }, nil -} - -// Commit is not yet supported -func (g *Source) Commit(r *source.Repository) error { - return nil -} - -func (g *Source) String() string { - return "golang" -} - -// whichGo locates the go command -func whichGo() string { - // check GOROOT - if gr := os.Getenv("GOROOT"); len(gr) > 0 { - return filepath.Join(gr, "bin", "go") - } - - // check path - for _, p := range filepath.SplitList(os.Getenv("PATH")) { - bin := filepath.Join(p, "go") - if _, err := os.Stat(bin); err == nil { - return bin - } - } - - // best effort - return "go" -} - -func NewSource(opts ...source.Option) source.Source { - options := source.Options{ - Path: os.TempDir(), - } - for _, o := range opts { - o(&options) - } - - cmd := whichGo() - path := options.Path - - // point of no return - if len(cmd) == 0 { - panic("Could not find Go executable") - } - - return &Source{ - Options: options, - Cmd: cmd, - Path: path, - } -} diff --git a/runtime/local/source/options.go b/runtime/local/source/options.go deleted file mode 100644 index 2e842f33..00000000 --- a/runtime/local/source/options.go +++ /dev/null @@ -1,15 +0,0 @@ -package source - -type Options struct { - // local path to download source - Path string -} - -type Option func(o *Options) - -// Local path for repository -func Path(p string) Option { - return func(o *Options) { - o.Path = p - } -} diff --git a/runtime/local/source/source.go b/runtime/local/source/source.go deleted file mode 100644 index 2afa92e3..00000000 --- a/runtime/local/source/source.go +++ /dev/null @@ -1,22 +0,0 @@ -// Package source retrieves source code -package source - -// Source retrieves source code -type Source interface { - // Fetch repo from a url - Fetch(url string) (*Repository, error) - // Commit and upload repo - Commit(*Repository) error - // The sourcerer - String() string -} - -// Repository is the source repository -type Repository struct { - // Name or repo - Name string - // Local path where repo is stored - Path string - // URL from which repo was retrieved - URL string -}