move runtime implementations (#22)

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-08-25 15:55:30 +03:00 committed by GitHub
parent 67ab44593b
commit fc3794f548
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 0 additions and 3173 deletions

4
go.mod
View File

@ -9,7 +9,6 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/ef-ds/deque v1.0.4-0.20190904040645-54cb57c252a1
github.com/evanphx/json-patch/v5 v5.0.0
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/ghodss/yaml v1.0.0
github.com/go-acme/lego/v3 v3.4.0
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee
@ -19,7 +18,6 @@ require (
github.com/google/uuid v1.1.1
github.com/gorilla/handlers v1.4.2
github.com/hashicorp/hcl v1.0.0
github.com/hpcloud/tail v1.0.0
github.com/kr/pretty v0.2.0 // indirect
github.com/miekg/dns v1.1.27
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c
@ -27,8 +25,6 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.7.0
github.com/stretchr/testify v1.5.1
github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf
github.com/xanzy/go-gitlab v0.35.1
golang.org/x/crypto v0.0.0-20200709230013-948cd5f35899
golang.org/x/net v0.0.0-20200707034311-ab3426394381
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013

19
go.sum
View File

@ -80,8 +80,6 @@ github.com/exoscale/egoscale v0.18.1/go.mod h1:Z7OOdzzTOz1Q1PjQXumlz9Wn/CddH0zSY
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-acme/lego/v3 v3.4.0 h1:deB9NkelA+TfjGHVw8J7iKl/rMtffcGMWSMmptvMv0A=
@ -155,12 +153,6 @@ github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z
github.com/grpc-ecosystem/grpc-gateway v1.8.5 h1:2+KSC78XiO6Qy0hIjfc1OD9H+hsaJdJlb8Kqsd41CTE=
github.com/grpc-ecosystem/grpc-gateway v1.8.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI=
github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI=
github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-retryablehttp v0.6.4 h1:BbgctKO892xEyOXnGiaAwIoSq1QZ/SS4AhjoAh9DnfY=
github.com/hashicorp/go-retryablehttp v0.6.4/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk=
@ -285,16 +277,11 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf h1:Z2X3Os7oRzpdJ75iPqWZc0HeJWFYNCvKsfpQwFpRNTA=
github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf/go.mod h1:M8agBzgqHIhgj7wEn9/0hJUZcrvt9VY+Ln+S1I5Mha0=
github.com/timewasted/linode v0.0.0-20160829202747-37e84520dcf7/go.mod h1:imsgLplxEC/etjIhdr3dNzV3JeT27LbVu5pYWm0JCBY=
github.com/transip/gotransip v0.0.0-20190812104329-6d8d9179b66f/go.mod h1:i0f4R4o2HM0m3DZYQWsj6/MEowD57VzoH0v3d7igeFY=
github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g=
github.com/unistack-org/micro v1.18.0 h1:EbFiII0bKV0Xcua7o6J30MFmm4/g0Hv3ECOKzsUBihU=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/vultr/govultr v0.1.4/go.mod h1:9H008Uxr/C4vFNGLqKx232C206GL0PBHzOP0809bGNA=
github.com/xanzy/go-gitlab v0.35.1 h1:jJSgT0NxjCvrSZf7Gvn2NxxV9xAYkTjYrKW8XwWhrfY=
github.com/xanzy/go-gitlab v0.35.1/go.mod h1:sPLojNBn68fMUWSxIJtdVVIP8uSBYqesTfDUseX11Ug=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.1.0/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs=
@ -340,7 +327,6 @@ golang.org/x/net v0.0.0-20180611182652-db08ff08e862/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181108082009-03003ca0c849/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -360,7 +346,6 @@ golang.org/x/net v0.0.0-20191027093000-83d349e8ac1a/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 h1:SVwTIAaPC2U/AvvLNZ2a7OVsmBpC8L5BlwK1whH3hm0=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@ -391,7 +376,6 @@ golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
@ -403,8 +387,6 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190921001708-c4c64cad1fd0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@ -436,7 +418,6 @@ google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEn
google.golang.org/api v0.9.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
google.golang.org/api v0.14.0/go.mod h1:iLdEw5Ide6rF15KTC1Kkl0iskquN2gFfn9o9XIsbkAI=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.3.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.6.1 h1:QzqyMA1tlu6CgqCDUtU9V+ZKhLFT2dkJuANu5QaxI3I=

View File

@ -1,714 +0,0 @@
// Package kubernetes implements kubernetes micro runtime
package kubernetes
import (
"encoding/base64"
"fmt"
"strings"
"sync"
"time"
"github.com/unistack-org/micro/v3/logger"
log "github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/runtime"
"github.com/unistack-org/micro/v3/util/kubernetes/client"
)
// action to take on runtime service
type action int
type kubernetes struct {
sync.RWMutex
// options configure runtime
options runtime.Options
// indicates if we're running
running bool
// used to stop the runtime
closed chan bool
// client is kubernetes client
client client.Client
// namespaces which exist
namespaces []client.Namespace
}
// namespaceExists returns a boolean indicating if a namespace exists
func (k *kubernetes) namespaceExists(name string) (bool, error) {
// populate the cache
if k.namespaces == nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Populating namespace cache")
}
namespaceList := new(client.NamespaceList)
resource := &client.Resource{Kind: "namespace", Value: namespaceList}
if err := k.client.List(resource); err != nil {
return false, err
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Popualted namespace cache successfully with %v items", len(namespaceList.Items))
}
k.namespaces = namespaceList.Items
}
// check if the namespace exists in the cache
for _, n := range k.namespaces {
if n.Metadata.Name == name {
return true, nil
}
}
return false, nil
}
// createNamespace creates a new k8s namespace
func (k *kubernetes) createNamespace(namespace string) error {
ns := client.Namespace{Metadata: &client.Metadata{Name: namespace}}
err := k.client.Create(&client.Resource{Kind: "namespace", Value: ns})
// ignore err already exists
if err != nil && strings.Contains(err.Error(), "already exists") {
logger.Debugf("Ignoring ErrAlreadyExists for namespace %v: %v", namespace, err)
err = nil
}
// add to cache
if err == nil && k.namespaces != nil {
k.namespaces = append(k.namespaces, ns)
}
return err
}
// getService queries kubernetes for micro service
// NOTE: this function is not thread-safe
func (k *kubernetes) getService(labels map[string]string, opts ...client.GetOption) ([]*service, error) {
// get the service status
serviceList := new(client.ServiceList)
r := &client.Resource{
Kind: "service",
Value: serviceList,
}
opts = append(opts, client.GetLabels(labels))
// get the service from k8s
if err := k.client.Get(r, opts...); err != nil {
return nil, err
}
// get the deployment status
depList := new(client.DeploymentList)
d := &client.Resource{
Kind: "deployment",
Value: depList,
}
if err := k.client.Get(d, opts...); err != nil {
return nil, err
}
// get the pods from k8s
podList := new(client.PodList)
p := &client.Resource{
Kind: "pod",
Value: podList,
}
if err := k.client.Get(p, opts...); err != nil {
return nil, err
}
// service map
svcMap := make(map[string]*service)
// collect info from kubernetes service
for _, kservice := range serviceList.Items {
// name of the service
name := kservice.Metadata.Labels["name"]
// version of the service
version := kservice.Metadata.Labels["version"]
srv := &service{
Service: &runtime.Service{
Name: name,
Version: version,
Metadata: make(map[string]string),
},
kservice: &kservice,
}
// set the address
address := kservice.Spec.ClusterIP
port := kservice.Spec.Ports[0]
srv.Service.Metadata["address"] = fmt.Sprintf("%s:%d", address, port.Port)
// set the type of service
srv.Service.Metadata["type"] = kservice.Metadata.Labels["micro"]
// copy annotations metadata into service metadata
for k, v := range kservice.Metadata.Annotations {
srv.Service.Metadata[k] = v
}
// save as service
svcMap[name+version] = srv
}
// collect additional info from kubernetes deployment
for _, kdep := range depList.Items {
// name of the service
name := kdep.Metadata.Labels["name"]
// versio of the service
version := kdep.Metadata.Labels["version"]
// access existing service map based on name + version
if svc, ok := svcMap[name+version]; ok {
// we're expecting our own service name in metadata
if _, ok := kdep.Metadata.Annotations["name"]; !ok {
continue
}
// set the service name, version and source
// based on existing annotations we stored
svc.Service.Name = kdep.Metadata.Annotations["name"]
svc.Service.Version = kdep.Metadata.Annotations["version"]
svc.Service.Source = kdep.Metadata.Annotations["source"]
// delete from metadata
delete(kdep.Metadata.Annotations, "name")
delete(kdep.Metadata.Annotations, "version")
delete(kdep.Metadata.Annotations, "source")
// copy all annotations metadata into service metadata
for k, v := range kdep.Metadata.Annotations {
svc.Service.Metadata[k] = v
}
// parse out deployment status and inject into service metadata
if len(kdep.Status.Conditions) > 0 {
svc.Status(kdep.Status.Conditions[0].Type, nil)
svc.Metadata["started"] = kdep.Status.Conditions[0].LastUpdateTime
} else {
svc.Status("n/a", nil)
}
// get the real status
for _, item := range podList.Items {
var status string
// check the name
if item.Metadata.Labels["name"] != name {
continue
}
// check the version
if item.Metadata.Labels["version"] != version {
continue
}
switch item.Status.Phase {
case "Failed":
status = item.Status.Reason
default:
status = item.Status.Phase
}
// skip if we can't get the container
if len(item.Status.Containers) == 0 {
continue
}
// now try get a deeper status
state := item.Status.Containers[0].State
// set start time
if state.Running != nil {
svc.Metadata["started"] = state.Running.Started
}
// set status from waiting
if v := state.Waiting; v != nil {
if len(v.Reason) > 0 {
status = v.Reason
}
}
// TODO: set from terminated
svc.Status(status, nil)
}
// save deployment
svc.kdeploy = &kdep
}
}
// collect all the services and return
services := make([]*service, 0, len(serviceList.Items))
for _, service := range svcMap {
services = append(services, service)
}
return services, nil
}
// run runs the runtime management loop
func (k *kubernetes) run(events <-chan runtime.Event) {
t := time.NewTicker(time.Second * 10)
defer t.Stop()
for {
select {
case <-t.C:
// TODO: figure out what to do here
// - do we even need the ticker for k8s services?
case event := <-events:
// NOTE: we only handle Update events for now
if log.V(log.DebugLevel, log.DefaultLogger) {
log.Debugf("Runtime received notification event: %v", event)
}
switch event.Type {
case runtime.Update:
// only process if there's an actual service
// we do not update all the things individually
if event.Service == nil {
continue
}
// format the name
name := client.Format(event.Service.Name)
// set the default labels
labels := map[string]string{
"micro": k.options.Type,
"name": name,
}
if len(event.Service.Version) > 0 {
labels["version"] = event.Service.Version
}
// get the deployment status
deployed := new(client.DeploymentList)
// get the existing service rather than creating a new one
err := k.client.Get(&client.Resource{
Kind: "deployment",
Value: deployed,
}, client.GetLabels(labels))
if err != nil {
if log.V(log.DebugLevel, log.DefaultLogger) {
log.Debugf("Runtime update failed to get service %s: %v", event.Service, err)
}
continue
}
// technically we should not receive multiple versions but hey ho
for _, service := range deployed.Items {
// check the name matches
if service.Metadata.Name != name {
continue
}
// update build time annotation
if service.Spec.Template.Metadata.Annotations == nil {
service.Spec.Template.Metadata.Annotations = make(map[string]string)
}
// update the build time
service.Spec.Template.Metadata.Annotations["updated"] = fmt.Sprintf("%d", event.Timestamp.Unix())
if log.V(log.DebugLevel, log.DefaultLogger) {
log.Debugf("Runtime updating service: %s deployment: %s", event.Service, service.Metadata.Name)
}
if err := k.client.Update(deploymentResource(&service)); err != nil {
if log.V(log.DebugLevel, log.DefaultLogger) {
log.Debugf("Runtime failed to update service %s: %v", event.Service, err)
}
continue
}
}
}
case <-k.closed:
if log.V(log.DebugLevel, log.DefaultLogger) {
log.Debugf("Runtime stopped")
}
return
}
}
}
// Init initializes runtime options
func (k *kubernetes) Init(opts ...runtime.Option) error {
k.Lock()
defer k.Unlock()
for _, o := range opts {
o(&k.options)
}
return nil
}
func (k *kubernetes) Logs(s *runtime.Service, options ...runtime.LogsOption) (runtime.Logs, error) {
klo := newLog(k.client, s.Name, options...)
if !klo.options.Stream {
records, err := klo.Read()
if err != nil {
log.Errorf("Failed to get logs for service '%v' from k8s: %v", s.Name, err)
return nil, err
}
kstream := &kubeStream{
stream: make(chan runtime.Log),
stop: make(chan bool),
}
go func() {
for _, record := range records {
kstream.Chan() <- record
}
kstream.Stop()
}()
return kstream, nil
}
stream, err := klo.Stream()
if err != nil {
return nil, err
}
return stream, nil
}
type kubeStream struct {
// the k8s log stream
stream chan runtime.Log
// the stop chan
sync.Mutex
stop chan bool
err error
}
func (k *kubeStream) Error() error {
return k.err
}
func (k *kubeStream) Chan() chan runtime.Log {
return k.stream
}
func (k *kubeStream) Stop() error {
k.Lock()
defer k.Unlock()
select {
case <-k.stop:
return nil
default:
close(k.stop)
close(k.stream)
}
return nil
}
// Creates a service
func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) error {
k.Lock()
defer k.Unlock()
options := runtime.CreateOptions{
Type: k.options.Type,
Namespace: client.DefaultNamespace,
}
for _, o := range opts {
o(&options)
}
// default type if it doesn't exist
if len(options.Type) == 0 {
options.Type = k.options.Type
}
// default the source if it doesn't exist
if len(s.Source) == 0 {
s.Source = k.options.Source
}
// ensure the namespace exists
namespace := client.SerializeResourceName(options.Namespace)
// only do this if the namespace is not default
if namespace != "default" {
if exist, err := k.namespaceExists(namespace); err == nil && !exist {
if err := k.createNamespace(namespace); err != nil {
if logger.V(logger.WarnLevel, logger.DefaultLogger) {
logger.Warnf("Error creating namespacr %v: %v", namespace, err)
}
return err
}
} else if err != nil {
if logger.V(logger.WarnLevel, logger.DefaultLogger) {
logger.Warnf("Error checking namespace %v exists: %v", namespace, err)
}
return err
}
}
// determine the image from the source and options
options.Image = k.getImage(s, options)
// create a secret for the credentials if some where provided
if len(options.Secrets) > 0 {
if err := k.createCredentials(s, options); err != nil {
if logger.V(logger.WarnLevel, logger.DefaultLogger) {
logger.Warnf("Error generating auth credentials for service: %v", err)
}
return err
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Generated auth credentials for service %v", s.Name)
}
}
// create new service
service := newService(s, options)
// start the service
return service.Start(k.client, client.CreateNamespace(options.Namespace))
}
// Read returns all instances of given service
func (k *kubernetes) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error) {
k.Lock()
defer k.Unlock()
// set the default labels
labels := map[string]string{}
options := runtime.ReadOptions{
Namespace: client.DefaultNamespace,
}
for _, o := range opts {
o(&options)
}
if len(options.Service) > 0 {
labels["name"] = client.Format(options.Service)
}
// add version to labels if a version has been supplied
if len(options.Version) > 0 {
labels["version"] = options.Version
}
if len(options.Type) > 0 {
labels["micro"] = options.Type
}
srvs, err := k.getService(labels, client.GetNamespace(options.Namespace))
if err != nil {
return nil, err
}
services := make([]*runtime.Service, 0, len(srvs))
for _, service := range srvs {
services = append(services, service.Service)
}
return services, nil
}
// Update the service in place
func (k *kubernetes) Update(s *runtime.Service, opts ...runtime.UpdateOption) error {
options := runtime.UpdateOptions{
Namespace: client.DefaultNamespace,
}
for _, o := range opts {
o(&options)
}
labels := map[string]string{}
if len(s.Name) > 0 {
labels["name"] = client.Format(s.Name)
}
if len(s.Version) > 0 {
labels["version"] = s.Version
}
// get the existing service
services, err := k.getService(labels, client.GetNamespace(options.Namespace))
if err != nil {
return err
}
// update the relevant services
for _, service := range services {
// nil check
if service.kdeploy.Metadata == nil || service.kdeploy.Metadata.Annotations == nil {
md := new(client.Metadata)
md.Annotations = make(map[string]string)
service.kdeploy.Metadata = md
}
// update metadata
for k, v := range s.Metadata {
service.kdeploy.Metadata.Annotations[k] = v
}
// update build time annotation
service.kdeploy.Spec.Template.Metadata.Annotations["updated"] = fmt.Sprintf("%d", time.Now().Unix())
// update the service
if err := service.Update(k.client, client.UpdateNamespace(options.Namespace)); err != nil {
return err
}
}
return nil
}
// Delete removes a service
func (k *kubernetes) Delete(s *runtime.Service, opts ...runtime.DeleteOption) error {
options := runtime.DeleteOptions{
Namespace: client.DefaultNamespace,
}
for _, o := range opts {
o(&options)
}
k.Lock()
defer k.Unlock()
// create new kubernetes micro service
service := newService(s, runtime.CreateOptions{
Type: k.options.Type,
Namespace: options.Namespace,
})
// delete the service credentials
ns := client.DeleteNamespace(options.Namespace)
k.client.Delete(&client.Resource{Name: credentialsName(s), Kind: "secret"}, ns)
return service.Stop(k.client, ns)
}
// Start starts the runtime
func (k *kubernetes) Start() error {
k.Lock()
defer k.Unlock()
// already running
if k.running {
return nil
}
// set running
k.running = true
k.closed = make(chan bool)
var events <-chan runtime.Event
if k.options.Scheduler != nil {
var err error
events, err = k.options.Scheduler.Notify()
if err != nil {
// TODO: should we bail here?
if log.V(log.DebugLevel, log.DefaultLogger) {
log.Debugf("Runtime failed to start update notifier")
}
}
}
go k.run(events)
return nil
}
// Stop shuts down the runtime
func (k *kubernetes) Stop() error {
k.Lock()
defer k.Unlock()
if !k.running {
return nil
}
select {
case <-k.closed:
return nil
default:
close(k.closed)
// set not running
k.running = false
// stop the scheduler
if k.options.Scheduler != nil {
return k.options.Scheduler.Close()
}
}
return nil
}
// String implements stringer interface
func (k *kubernetes) String() string {
return "kubernetes"
}
// NewRuntime creates new kubernetes runtime
func NewRuntime(opts ...runtime.Option) runtime.Runtime {
// get default options
options := runtime.Options{
// Create labels with type "micro": "service"
Type: "service",
}
// apply requested options
for _, o := range opts {
o(&options)
}
// kubernetes client
client := client.NewClusterClient()
return &kubernetes{
options: options,
closed: make(chan bool),
client: client,
}
}
func (k *kubernetes) getImage(s *runtime.Service, options runtime.CreateOptions) string {
// use the image when its specified
if len(options.Image) > 0 {
return options.Image
}
if len(k.options.Image) > 0 {
return k.options.Image
}
return ""
}
func (k *kubernetes) createCredentials(service *runtime.Service, options runtime.CreateOptions) error {
data := make(map[string]string, len(options.Secrets))
for key, value := range options.Secrets {
data[key] = base64.StdEncoding.EncodeToString([]byte(value))
}
// construct the k8s secret object
secret := &client.Secret{
Type: "Opaque",
Data: data,
Metadata: &client.Metadata{
Name: credentialsName(service),
Namespace: options.Namespace,
},
}
// crete the secret in kubernetes
name := credentialsName(service)
return k.client.Create(&client.Resource{
Kind: "secret", Name: name, Value: secret,
}, client.CreateNamespace(options.Namespace))
}
func credentialsName(service *runtime.Service) string {
name := fmt.Sprintf("%v-%v-credentials", service.Name, service.Version)
return client.SerializeResourceName(name)
}

View File

@ -1,206 +0,0 @@
// Package kubernetes taken from https://github.com/micro/go-micro/blob/master/debug/log/kubernetes/kubernetes.go
// There are some modifications compared to the other files as
// this package doesn't provide write functionality.
// With the write functionality gone, structured logs also go away.
package kubernetes
import (
"bufio"
"strconv"
"time"
"github.com/unistack-org/micro/v3/errors"
"github.com/unistack-org/micro/v3/runtime"
"github.com/unistack-org/micro/v3/util/kubernetes/client"
"github.com/unistack-org/micro/v3/util/log"
)
type klog struct {
client client.Client
serviceName string
options runtime.LogsOptions
}
func (k *klog) podLogs(podName string, stream *kubeStream) error {
p := make(map[string]string)
p["follow"] = "true"
opts := []client.LogOption{
client.LogParams(p),
client.LogNamespace(k.options.Namespace),
}
// get the logs for the pod
body, err := k.client.Log(&client.Resource{
Name: podName,
Kind: "pod",
}, opts...)
if err != nil {
stream.err = err
stream.Stop()
return err
}
s := bufio.NewScanner(body)
defer body.Close()
for {
select {
case <-stream.stop:
return stream.Error()
default:
if s.Scan() {
record := runtime.Log{
Message: s.Text(),
}
select {
case stream.stream <- record:
case <-stream.stop:
return stream.Error()
}
} else {
// TODO: is there a blocking call
// rather than a sleep loop?
time.Sleep(time.Second)
}
}
}
}
func (k *klog) getMatchingPods() ([]string, error) {
r := &client.Resource{
Kind: "pod",
Value: new(client.PodList),
}
l := make(map[string]string)
l["name"] = client.Format(k.serviceName)
// TODO: specify micro:service
// l["micro"] = "service"
opts := []client.GetOption{
client.GetLabels(l),
client.GetNamespace(k.options.Namespace),
}
if err := k.client.Get(r, opts...); err != nil {
return nil, err
}
var matches []string
for _, p := range r.Value.(*client.PodList).Items {
// find labels that match the name
if p.Metadata.Labels["name"] == client.Format(k.serviceName) {
matches = append(matches, p.Metadata.Name)
}
}
return matches, nil
}
func (k *klog) Read() ([]runtime.Log, error) {
pods, err := k.getMatchingPods()
if err != nil {
return nil, err
}
if len(pods) == 0 {
return nil, errors.NotFound("runtime.logs", "no such service")
}
var records []runtime.Log
for _, pod := range pods {
logParams := make(map[string]string)
//if !opts.Since.Equal(time.Time{}) {
// logParams["sinceSeconds"] = strconv.Itoa(int(time.Since(opts.Since).Seconds()))
//}
if k.options.Count != 0 {
logParams["tailLines"] = strconv.Itoa(int(k.options.Count))
}
if k.options.Stream {
logParams["follow"] = "true"
}
opts := []client.LogOption{
client.LogParams(logParams),
client.LogNamespace(k.options.Namespace),
}
logs, err := k.client.Log(&client.Resource{
Name: pod,
Kind: "pod",
}, opts...)
if err != nil {
return nil, err
}
defer logs.Close()
s := bufio.NewScanner(logs)
for s.Scan() {
record := runtime.Log{
Message: s.Text(),
}
// record.Metadata["pod"] = pod
records = append(records, record)
}
}
// sort the records
// sort.Slice(records, func(i, j int) bool { return records[i].Timestamp.Before(records[j].Timestamp) })
return records, nil
}
func (k *klog) Stream() (runtime.Logs, error) {
// find the matching pods
pods, err := k.getMatchingPods()
if err != nil {
return nil, err
}
if len(pods) == 0 {
return nil, errors.NotFound("runtime.logs", "no such service")
}
stream := &kubeStream{
stream: make(chan runtime.Log),
stop: make(chan bool),
}
// stream from the individual pods
for _, pod := range pods {
go func(podName string) {
err := k.podLogs(podName, stream)
if err != nil {
log.Errorf("Error streaming from pod: %v", err)
}
}(pod)
}
return stream, nil
}
// NewLog returns a configured Kubernetes logger
func newLog(c client.Client, serviceName string, opts ...runtime.LogsOption) *klog {
options := runtime.LogsOptions{
Namespace: client.DefaultNamespace,
}
for _, o := range opts {
o(&options)
}
klog := &klog{
serviceName: serviceName,
client: c,
options: options,
}
return klog
}

View File

@ -1,227 +0,0 @@
package kubernetes
import (
"encoding/json"
"fmt"
"strings"
"time"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/runtime"
"github.com/unistack-org/micro/v3/util/kubernetes/api"
"github.com/unistack-org/micro/v3/util/kubernetes/client"
)
type service struct {
// service to manage
*runtime.Service
// Kubernetes service
kservice *client.Service
// Kubernetes deployment
kdeploy *client.Deployment
}
func parseError(err error) *api.Status {
status := new(api.Status)
json.Unmarshal([]byte(err.Error()), &status)
return status
}
func newService(s *runtime.Service, c runtime.CreateOptions) *service {
// use pre-formatted name/version
name := client.Format(s.Name)
version := client.Format(s.Version)
kservice := client.NewService(name, version, c.Type, c.Namespace)
kdeploy := client.NewDeployment(name, version, c.Type, c.Namespace)
// ensure the metadata is set
if kdeploy.Spec.Template.Metadata.Annotations == nil {
kdeploy.Spec.Template.Metadata.Annotations = make(map[string]string)
}
// create if non existent
if s.Metadata == nil {
s.Metadata = make(map[string]string)
}
// add the service metadata to the k8s labels, do this first so we
// don't override any labels used by the runtime, e.g. name
for k, v := range s.Metadata {
kdeploy.Metadata.Annotations[k] = v
}
// attach our values to the deployment; name, version, source
kdeploy.Metadata.Annotations["name"] = s.Name
kdeploy.Metadata.Annotations["version"] = s.Version
kdeploy.Metadata.Annotations["source"] = s.Source
// associate owner:group to be later augmented
kdeploy.Metadata.Annotations["owner"] = "micro"
kdeploy.Metadata.Annotations["group"] = "micro"
// update the deployment is a custom source is provided
if len(c.Image) > 0 {
for i := range kdeploy.Spec.Template.PodSpec.Containers {
kdeploy.Spec.Template.PodSpec.Containers[i].Image = c.Image
kdeploy.Spec.Template.PodSpec.Containers[i].Command = []string{}
kdeploy.Spec.Template.PodSpec.Containers[i].Args = []string{}
}
}
// define the environment values used by the container
env := make([]client.EnvVar, 0, len(c.Env))
for _, evar := range c.Env {
evarPair := strings.Split(evar, "=")
env = append(env, client.EnvVar{Name: evarPair[0], Value: evarPair[1]})
}
// if secrets were provided, pass them to the service
for key := range c.Secrets {
env = append(env, client.EnvVar{
Name: key,
ValueFrom: &client.EnvVarSource{
SecretKeyRef: &client.SecretKeySelector{
Name: credentialsName(s),
Key: key,
},
},
})
}
// if environment has been supplied update deployment default environment
if len(env) > 0 {
kdeploy.Spec.Template.PodSpec.Containers[0].Env = append(kdeploy.Spec.Template.PodSpec.Containers[0].Env, env...)
}
// set the command if specified
if len(c.Command) > 0 {
kdeploy.Spec.Template.PodSpec.Containers[0].Command = c.Command
}
if len(c.Args) > 0 {
kdeploy.Spec.Template.PodSpec.Containers[0].Args = c.Args
}
// apply resource limits
if c.Resources != nil {
resLimits := &client.ResourceLimits{}
if c.Resources.CPU > 0 {
resLimits.CPU = fmt.Sprintf("%vm", c.Resources.CPU)
}
if c.Resources.Mem > 0 {
resLimits.Memory = fmt.Sprintf("%vMi", c.Resources.Mem)
}
if c.Resources.Disk > 0 {
resLimits.EphemeralStorage = fmt.Sprintf("%vMi", c.Resources.Disk)
}
kdeploy.Spec.Template.PodSpec.Containers[0].Resources = &client.ResourceRequirements{Limits: resLimits}
}
return &service{
Service: s,
kservice: kservice,
kdeploy: kdeploy,
}
}
func deploymentResource(d *client.Deployment) *client.Resource {
return &client.Resource{
Name: d.Metadata.Name,
Kind: "deployment",
Value: d,
}
}
func serviceResource(s *client.Service) *client.Resource {
return &client.Resource{
Name: s.Metadata.Name,
Kind: "service",
Value: s,
}
}
// Start starts the Kubernetes service. It creates new kubernetes deployment and service API objects
func (s *service) Start(k client.Client, opts ...client.CreateOption) error {
// create deployment first; if we fail, we dont create service
if err := k.Create(deploymentResource(s.kdeploy), opts...); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime failed to create deployment: %v", err)
}
s.Status("error", err)
v := parseError(err)
if v.Reason == "AlreadyExists" {
return runtime.ErrAlreadyExists
}
return err
}
// create service now that the deployment has been created
if err := k.Create(serviceResource(s.kservice), opts...); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime failed to create service: %v", err)
}
s.Status("error", err)
v := parseError(err)
if v.Reason == "AlreadyExists" {
return runtime.ErrAlreadyExists
}
return err
}
s.Status("started", nil)
return nil
}
func (s *service) Stop(k client.Client, opts ...client.DeleteOption) error {
// first attempt to delete service
if err := k.Delete(serviceResource(s.kservice), opts...); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime failed to delete service: %v", err)
}
s.Status("error", err)
return err
}
// delete deployment once the service has been deleted
if err := k.Delete(deploymentResource(s.kdeploy), opts...); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime failed to delete deployment: %v", err)
}
s.Status("error", err)
return err
}
s.Status("stopped", nil)
return nil
}
func (s *service) Update(k client.Client, opts ...client.UpdateOption) error {
if err := k.Update(deploymentResource(s.kdeploy), opts...); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime failed to update deployment: %v", err)
}
s.Status("error", err)
return err
}
if err := k.Update(serviceResource(s.kservice), opts...); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime failed to update service: %v", err)
}
return err
}
return nil
}
func (s *service) Status(status string, err error) {
s.Metadata["lastStatusUpdate"] = time.Now().Format(time.RFC3339)
if err == nil {
s.Metadata["status"] = status
delete(s.Metadata, "error")
return
}
s.Metadata["status"] = "error"
s.Metadata["error"] = err.Error()
}

View File

@ -1,609 +0,0 @@
package git
import (
"archive/tar"
"archive/zip"
"compress/gzip"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"os/exec"
"path"
"path/filepath"
"regexp"
"strings"
"github.com/teris-io/shortid"
"github.com/unistack-org/micro/v3/logger"
"github.com/xanzy/go-gitlab"
)
const credentialsKey = "GIT_CREDENTIALS"
type Gitter interface {
Checkout(repo, branchOrCommit string) error
RepoDir() string
}
type binaryGitter struct {
folder string
secrets map[string]string
}
func (g *binaryGitter) Checkout(repo, branchOrCommit string) error {
// The implementation of this method is questionable.
// We use archives from github/gitlab etc which doesnt require the user to have got
// and probably is faster than downloading the whole repo history,
// but it comes with a bit of custom code for EACH host.
// @todo probably we should fall back to git in case the archives are not available.
if branchOrCommit == "latest" {
branchOrCommit = "master"
}
if strings.Contains(repo, "github") {
return g.checkoutGithub(repo, branchOrCommit)
} else if strings.Contains(repo, "gitlab") {
err := g.checkoutGitLabPublic(repo, branchOrCommit)
if err != nil && len(g.secrets[credentialsKey]) > 0 {
// If the public download fails, try getting it with tokens.
// Private downloads needs a token for api project listing, hence
// the weird structure of this code.
return g.checkoutGitLabPrivate(repo, branchOrCommit)
}
return err
}
if len(g.secrets[credentialsKey]) > 0 {
return g.checkoutAnyRemote(repo, branchOrCommit, true)
}
return g.checkoutAnyRemote(repo, branchOrCommit, false)
}
// This aims to be a generic checkout method. Currently only tested for bitbucket,
// see tests
func (g *binaryGitter) checkoutAnyRemote(repo, branchOrCommit string, useCredentials bool) error {
repoFolder := strings.ReplaceAll(strings.ReplaceAll(repo, "/", "-"), "https://", "")
g.folder = filepath.Join(os.TempDir(),
repoFolder+"-"+shortid.MustGenerate())
err := os.MkdirAll(g.folder, 0755)
if err != nil {
return err
}
// Assumes remote address format is git@gitlab.com:micro-test/monorepo-test.git
remoteAddr := fmt.Sprintf("https://%v", repo)
if useCredentials {
remoteAddr = fmt.Sprintf("https://%v@%v", g.secrets[credentialsKey], repo)
}
cmd := exec.Command("git", "clone", remoteAddr, "--depth=1", ".")
cmd.Dir = g.folder
outp, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("Git clone failed: %v", string(outp))
}
cmd = exec.Command("git", "fetch", "origin", branchOrCommit, "--depth=1")
cmd.Dir = g.folder
outp, err = cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("Git fetch failed: %v", string(outp))
}
cmd = exec.Command("git", "checkout", branchOrCommit)
cmd.Dir = g.folder
outp, err = cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("Git checkout failed: %v", string(outp))
}
return nil
}
func (g *binaryGitter) checkoutGithub(repo, branchOrCommit string) error {
// @todo if it's a commit it must not be checked out all the time
repoFolder := strings.ReplaceAll(strings.ReplaceAll(repo, "/", "-"), "https://", "")
g.folder = filepath.Join(os.TempDir(),
repoFolder+"-"+shortid.MustGenerate())
url := fmt.Sprintf("%v/archive/%v.zip", repo, branchOrCommit)
if !strings.HasPrefix(url, "https://") {
url = "https://" + url
}
client := &http.Client{}
req, _ := http.NewRequest("GET", url, nil)
if len(g.secrets[credentialsKey]) > 0 {
req.Header.Set("Authorization", "token "+g.secrets[credentialsKey])
}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("Can't get zip: %v", err)
}
defer resp.Body.Close()
// Github returns 404 for tar.gz files...
// but still gives back a proper file so ignoring status code
// for now.
//if resp.StatusCode != 200 {
// return errors.New("Status code was not 200")
//}
src := g.folder + ".zip"
// Create the file
out, err := os.Create(src)
if err != nil {
return fmt.Errorf("Can't create source file %v src: %v", src, err)
}
defer out.Close()
// Write the body to file
_, err = io.Copy(out, resp.Body)
if err != nil {
return err
}
return unzip(src, g.folder, true)
}
func (g *binaryGitter) checkoutGitLabPublic(repo, branchOrCommit string) error {
// Example: https://gitlab.com/micro-test/basic-micro-service/-/archive/master/basic-micro-service-master.tar.gz
// @todo if it's a commit it must not be checked out all the time
repoFolder := strings.ReplaceAll(strings.ReplaceAll(repo, "/", "-"), "https://", "")
g.folder = filepath.Join(os.TempDir(),
repoFolder+"-"+shortid.MustGenerate())
tarName := strings.ReplaceAll(strings.ReplaceAll(repo, "gitlab.com/", ""), "/", "-")
url := fmt.Sprintf("%v/-/archive/%v/%v.tar.gz", repo, branchOrCommit, tarName)
if !strings.HasPrefix(url, "https://") {
url = "https://" + url
}
client := &http.Client{}
req, _ := http.NewRequest("GET", url, nil)
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("Can't get zip: %v", err)
}
defer resp.Body.Close()
src := g.folder + ".tar.gz"
// Create the file
out, err := os.Create(src)
if err != nil {
return fmt.Errorf("Can't create source file %v src: %v", src, err)
}
defer out.Close()
// Write the body to file
_, err = io.Copy(out, resp.Body)
if err != nil {
return err
}
err = Uncompress(src, g.folder)
if err != nil {
return err
}
// Gitlab zip/tar has contents inside a folder
// It has the format of eg. basic-micro-service-master-314b4a494ed472793e0a8bce8babbc69359aed7b
// Since we don't have the commit at this point we must list the dir
files, err := ioutil.ReadDir(g.folder)
if err != nil {
return err
}
if len(files) == 0 {
return fmt.Errorf("No contents in dir downloaded from gitlab: %v", g.folder)
}
g.folder = filepath.Join(g.folder, files[0].Name())
return nil
}
func (g *binaryGitter) checkoutGitLabPrivate(repo, branchOrCommit string) error {
git, err := gitlab.NewClient(g.secrets[credentialsKey])
if err != nil {
return err
}
owned := true
projects, _, err := git.Projects.ListProjects(&gitlab.ListProjectsOptions{
Owned: &owned,
})
if err != nil {
return err
}
projectID := ""
for _, project := range projects {
if strings.Contains(repo, project.Name) {
projectID = fmt.Sprintf("%v", project.ID)
}
}
if len(projectID) == 0 {
return fmt.Errorf("Project id not found for repo %v", repo)
}
// Example URL:
// https://gitlab.com/api/v3/projects/0000000/repository/archive?private_token=XXXXXXXXXXXXXXXXXXXX
url := fmt.Sprintf("https://gitlab.com/api/v4/projects/%v/repository/archive?private_token=%v", projectID, g.secrets[credentialsKey])
client := &http.Client{}
req, _ := http.NewRequest("GET", url, nil)
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("Can't get zip: %v", err)
}
defer resp.Body.Close()
src := g.folder + ".tar.gz"
// Create the file
out, err := os.Create(src)
if err != nil {
return fmt.Errorf("Can't create source file %v src: %v", src, err)
}
defer out.Close()
// Write the body to file
_, err = io.Copy(out, resp.Body)
if err != nil {
return err
}
err = Uncompress(src, g.folder)
if err != nil {
return err
}
// Gitlab zip/tar has contents inside a folder
// It has the format of eg. basic-micro-service-master-314b4a494ed472793e0a8bce8babbc69359aed7b
// Since we don't have the commit at this point we must list the dir
files, err := ioutil.ReadDir(g.folder)
if err != nil {
return err
}
if len(files) == 0 {
return fmt.Errorf("No contents in dir downloaded from gitlab: %v", g.folder)
}
g.folder = filepath.Join(g.folder, files[0].Name())
return nil
}
func (g *binaryGitter) RepoDir() string {
return g.folder
}
func NewGitter(folder string, secrets map[string]string) Gitter {
return &binaryGitter{folder, secrets}
}
func commandExists(cmd string) bool {
_, err := exec.LookPath(cmd)
return err == nil
}
func dirifyRepo(s string) string {
s = strings.ReplaceAll(s, "https://", "")
s = strings.ReplaceAll(s, "/", "-")
return s
}
// exists returns whether the given file or directory exists
func pathExists(path string) (bool, error) {
_, err := os.Stat(path)
if err == nil {
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return true, err
}
// GetRepoRoot determines the repo root from a full path.
// Returns empty string and no error if not found
func GetRepoRoot(fullPath string) (string, error) {
// traverse parent directories
prev := fullPath
for {
current := prev
exists, err := pathExists(filepath.Join(current, ".git"))
if err != nil {
return "", err
}
if exists {
return current, nil
}
prev = filepath.Dir(current)
// reached top level, see:
// https://play.golang.org/p/rDgVdk3suzb
if current == prev {
break
}
}
return "", nil
}
// Source is not just git related @todo move
type Source struct {
// is it a local folder intended for a local runtime?
Local bool
// absolute path to service folder in local mode
FullPath string
// path of folder to repo root
// be it local or github repo
Folder string
// github ref
Ref string
// for cloning purposes
// blank for local
Repo string
// dir to repo root
// blank for non local
LocalRepoRoot string
}
// Name to be passed to RPC call runtime.Create Update Delete
// eg: `helloworld/api`, `crufter/myrepo/helloworld/api`, `localfolder`
func (s *Source) RuntimeName() string {
if len(s.Folder) == 0 {
// This is the case for top level url source ie. gitlab.com/micro-test/basic-micro-service
return path.Base(s.Repo)
}
return path.Base(s.Folder)
}
// Source to be passed to RPC call runtime.Create Update Delete
// eg: `helloworld`, `github.com/crufter/myrepo/helloworld`, `/path/to/localrepo/localfolder`
func (s *Source) RuntimeSource() string {
if s.Local {
return s.FullPath
}
if len(s.Folder) == 0 {
return s.Repo
}
return fmt.Sprintf("%v/%v", s.Repo, s.Folder)
}
// ParseSource parses a `micro run/update/kill` source.
func ParseSource(source string) (*Source, error) {
if !strings.Contains(source, "@") {
source += "@latest"
}
ret := &Source{}
refs := strings.Split(source, "@")
ret.Ref = refs[1]
parts := strings.Split(refs[0], "/")
ret.Repo = strings.Join(parts[0:3], "/")
if len(parts) > 1 {
ret.Folder = strings.Join(parts[3:], "/")
}
return ret, nil
}
// ParseSourceLocal a version of ParseSource that detects and handles local paths.
// Workdir should be used only from the CLI @todo better interface for this function.
// PathExistsFunc exists only for testing purposes, to make the function side effect free.
func ParseSourceLocal(workDir, source string, pathExistsFunc ...func(path string) (bool, error)) (*Source, error) {
var pexists func(string) (bool, error)
if len(pathExistsFunc) == 0 {
pexists = pathExists
} else {
pexists = pathExistsFunc[0]
}
isLocal, localFullPath := IsLocal(workDir, source, pexists)
if isLocal {
localRepoRoot, err := GetRepoRoot(localFullPath)
if err != nil {
return nil, err
}
var folder string
// If the local repo root is a top level folder, we are not in a git repo.
// In this case, we should take the last folder as folder name.
if localRepoRoot == "" {
folder = filepath.Base(localFullPath)
} else {
folder = strings.ReplaceAll(localFullPath, localRepoRoot+string(filepath.Separator), "")
}
return &Source{
Local: true,
Folder: folder,
FullPath: localFullPath,
LocalRepoRoot: localRepoRoot,
Ref: "latest", // @todo consider extracting branch from git here
}, nil
}
return ParseSource(source)
}
// IsLocal tries returns true and full path of directory if the path is a local one, and
// false and empty string if not.
func IsLocal(workDir, source string, pathExistsFunc ...func(path string) (bool, error)) (bool, string) {
var pexists func(string) (bool, error)
if len(pathExistsFunc) == 0 {
pexists = pathExists
} else {
pexists = pathExistsFunc[0]
}
// Check for absolute path
// @todo "/" won't work for Windows
if exists, err := pexists(source); strings.HasPrefix(source, "/") && err == nil && exists {
return true, source
// Check for path relative to workdir
} else if exists, err := pexists(filepath.Join(workDir, source)); err == nil && exists {
return true, filepath.Join(workDir, source)
}
return false, ""
}
// CheckoutSource for the local runtime server
// folder is the folder to check out the source code to
// Modifies source path to set it to checked out repo absolute path locally.
func CheckoutSource(folder string, source *Source, secrets map[string]string) error {
// if it's a local folder, do nothing
if exists, err := pathExists(source.FullPath); err == nil && exists {
return nil
}
gitter := NewGitter(folder, secrets)
repo := source.Repo
if !strings.Contains(repo, "https://") {
repo = "https://" + repo
}
err := gitter.Checkout(repo, source.Ref)
if err != nil {
return err
}
source.FullPath = filepath.Join(gitter.RepoDir(), source.Folder)
return nil
}
// code below is not used yet
var nameExtractRegexp = regexp.MustCompile(`((micro|web)\.Name\(")(.*)("\))`)
func extractServiceName(fileContent []byte) string {
hits := nameExtractRegexp.FindAll(fileContent, 1)
if len(hits) == 0 {
return ""
}
hit := string(hits[0])
return strings.Split(hit, "\"")[1]
}
// Uncompress is a modified version of: https://gist.github.com/mimoo/25fc9716e0f1353791f5908f94d6e726
func Uncompress(src string, dst string) error {
file, err := os.OpenFile(src, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
return err
}
defer file.Close()
// ungzip
zr, err := gzip.NewReader(file)
if err != nil {
return err
}
// untar
tr := tar.NewReader(zr)
// uncompress each element
for {
header, err := tr.Next()
if err == io.EOF {
break // End of archive
}
if err != nil {
return err
}
target := header.Name
// validate name against path traversal
if !validRelPath(header.Name) {
return fmt.Errorf("tar contained invalid name error %q\n", target)
}
// add dst + re-format slashes according to system
target = filepath.Join(dst, header.Name)
// if no join is needed, replace with ToSlash:
// target = filepath.ToSlash(header.Name)
// check the type
switch header.Typeflag {
// if its a dir and it doesn't exist create it (with 0755 permission)
case tar.TypeDir:
if _, err := os.Stat(target); err != nil {
// @todo think about this:
// if we don't nuke the folder, we might end up with files from
// the previous decompress.
if err := os.MkdirAll(target, 0755); err != nil {
return err
}
}
// if it's a file create it (with same permission)
case tar.TypeReg:
// the truncating is probably unnecessary due to the `RemoveAll` of folders
// above
fileToWrite, err := os.OpenFile(target, os.O_TRUNC|os.O_CREATE|os.O_RDWR, os.FileMode(header.Mode))
if err != nil {
return err
}
// copy over contents
if _, err := io.Copy(fileToWrite, tr); err != nil {
return err
}
// manually close here after each file operation; defering would cause each file close
// to wait until all operations have completed.
fileToWrite.Close()
}
}
return nil
}
// check for path traversal and correct forward slashes
func validRelPath(p string) bool {
if p == "" || strings.Contains(p, `\`) || strings.HasPrefix(p, "/") || strings.Contains(p, "../") {
return false
}
return true
}
// taken from https://stackoverflow.com/questions/20357223/easy-way-to-unzip-file-with-golang
func unzip(src, dest string, skipTopFolder bool) error {
r, err := zip.OpenReader(src)
if err != nil {
return err
}
defer func() {
if err = r.Close(); err != nil {
logger.Errorf("failed to close reader: %v", err)
}
}()
if err = os.MkdirAll(dest, 0755); err != nil {
return err
}
// Closure to address file descriptors issue with all the deferred .Close() methods
extractAndWriteFile := func(f *zip.File) error {
rc, err := f.Open()
if err != nil {
return err
}
defer func() {
rc.Close()
}()
if skipTopFolder {
f.Name = strings.Join(strings.Split(f.Name, string(filepath.Separator))[1:], string(filepath.Separator))
}
path := filepath.Join(dest, f.Name)
if f.FileInfo().IsDir() {
if err = os.MkdirAll(path, f.Mode()); err != nil {
return err
}
} else {
if err = os.MkdirAll(filepath.Dir(path), f.Mode()); err != nil {
return err
}
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
if err != nil {
return err
}
defer func() {
if err = f.Close(); err != nil {
logger.Errorf("failed to close file: %v", err)
}
}()
_, err = io.Copy(f, rc)
if err != nil {
return err
}
}
return nil
}
for _, f := range r.File {
err := extractAndWriteFile(f)
if err != nil {
return err
}
}
return nil
}

View File

@ -1,133 +0,0 @@
package git
import (
"testing"
)
type parseCase struct {
source string
expected *Source
}
func TestParseSource(t *testing.T) {
cases := []parseCase{
{
source: "github.com/micro/services/helloworld",
expected: &Source{
Repo: "github.com/micro/services",
Folder: "helloworld",
Ref: "latest",
},
},
{
source: "github.com/micro/services/helloworld",
expected: &Source{
Repo: "github.com/micro/services",
Folder: "helloworld",
Ref: "latest",
},
},
{
source: "github.com/micro/services/helloworld@v1.12.1",
expected: &Source{
Repo: "github.com/micro/services",
Folder: "helloworld",
Ref: "v1.12.1",
},
},
{
source: "github.com/micro/services/helloworld@branchname",
expected: &Source{
Repo: "github.com/micro/services",
Folder: "helloworld",
Ref: "branchname",
},
},
{
source: "github.com/crufter/reponame/helloworld@branchname",
expected: &Source{
Repo: "github.com/crufter/reponame",
Folder: "helloworld",
Ref: "branchname",
},
},
}
for i, c := range cases {
result, err := ParseSource(c.source)
if err != nil {
t.Fatalf("Failed case %v: %v", i, err)
}
if result.Folder != c.expected.Folder {
t.Fatalf("Folder does not match for '%v', expected '%v', got '%v'", i, c.expected.Folder, result.Folder)
}
if result.Repo != c.expected.Repo {
t.Fatalf("Repo address does not match for '%v', expected '%v', got '%v'", i, c.expected.Repo, result.Repo)
}
if result.Ref != c.expected.Ref {
t.Fatalf("Ref does not match for '%v', expected '%v', got '%v'", i, c.expected.Ref, result.Ref)
}
}
}
type localParseCase struct {
source string
expected *Source
workDir string
pathExists bool
}
func TestLocalParseSource(t *testing.T) {
cases := []localParseCase{
{
source: ".",
expected: &Source{
Folder: "folder2",
Ref: "latest",
},
workDir: "/folder1/folder2",
pathExists: true,
},
}
for i, c := range cases {
result, err := ParseSourceLocal(c.workDir, c.source, func(s string) (bool, error) {
return c.pathExists, nil
})
if err != nil {
t.Fatalf("Failed case %v: %v", i, err)
}
if result.Folder != c.expected.Folder {
t.Fatalf("Folder does not match for '%v', expected '%v', got '%v'", i, c.expected.Folder, result.Folder)
}
if result.Repo != c.expected.Repo {
t.Fatalf("Repo address does not match for '%v', expected '%v', got '%v'", i, c.expected.Repo, result.Repo)
}
if result.Ref != c.expected.Ref {
t.Fatalf("Ref does not match for '%v', expected '%v', got '%v'", i, c.expected.Ref, result.Ref)
}
}
}
type nameCase struct {
fileContent string
expected string
}
func TestServiceNameExtract(t *testing.T) {
cases := []nameCase{
{
fileContent: `func main() {
// New Service
service := micro.NewService(
micro.Name("go.micro.service.helloworld"),
micro.Version("latest"),
)`,
expected: "go.micro.service.helloworld",
},
}
for i, c := range cases {
result := extractServiceName([]byte(c.fileContent))
if result != c.expected {
t.Fatalf("Case %v, expected: %v, got: %v", i, c.expected, result)
}
}
}

View File

@ -1,635 +0,0 @@
package local
import (
"errors"
"fmt"
"io"
"log"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/hpcloud/tail"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/runtime"
"github.com/unistack-org/micro/v3/runtime/local/git"
)
// defaultNamespace to use if not provided as an option
const defaultNamespace = "default"
var (
// The directory for logs to be output
LogDir = filepath.Join(os.TempDir(), "micro", "logs")
// The source directory where code lives
SourceDir = filepath.Join(os.TempDir(), "micro", "uploads")
)
type localRuntime struct {
sync.RWMutex
// options configure runtime
options runtime.Options
// used to stop the runtime
closed chan bool
// used to start new services
start chan *service
// indicates if we're running
running bool
// namespaces stores services grouped by namespace, e.g. namespaces["foo"]["go.micro.auth:latest"]
// would return the latest version of go.micro.auth from the foo namespace
namespaces map[string]map[string]*service
}
// NewRuntime creates new local runtime and returns it
func NewRuntime(opts ...runtime.Option) runtime.Runtime {
// get default options
options := runtime.Options{}
// apply requested options
for _, o := range opts {
o(&options)
}
// make the logs directory
os.MkdirAll(LogDir, 0755)
return &localRuntime{
options: options,
closed: make(chan bool),
start: make(chan *service, 128),
namespaces: make(map[string]map[string]*service),
}
}
func (r *localRuntime) checkoutSourceIfNeeded(s *runtime.Service, secrets map[string]string) error {
// Runtime service like config have no source.
// Skip checkout in that case
if len(s.Source) == 0 {
return nil
}
// Incoming uploaded files have format lastfolder.tar.gz or
// lastfolder.tar.gz/relative/path
sourceParts := strings.Split(s.Source, "/")
compressedFilepath := filepath.Join(SourceDir, sourceParts[0])
uncompressPath := strings.ReplaceAll(compressedFilepath, ".tar.gz", "")
if len(sourceParts) > 1 {
uncompressPath = filepath.Join(SourceDir, strings.ReplaceAll(sourceParts[0], ".tar.gz", ""))
}
// check if the directory already exists
if ex, _ := exists(compressedFilepath); ex {
err := os.RemoveAll(uncompressPath)
if err != nil {
return err
}
err = os.MkdirAll(uncompressPath, 0777)
if err != nil {
return err
}
err = git.Uncompress(compressedFilepath, uncompressPath)
if err != nil {
return err
}
if len(sourceParts) > 1 {
lastFolderPart := s.Name
fullp := append([]string{uncompressPath}, sourceParts[1:]...)
s.Source = filepath.Join(append(fullp, lastFolderPart)...)
} else {
s.Source = uncompressPath
}
return nil
}
source, err := git.ParseSourceLocal("", s.Source)
if err != nil {
return err
}
source.Ref = s.Version
err = git.CheckoutSource(os.TempDir(), source, secrets)
if err != nil {
return err
}
s.Source = source.FullPath
return nil
}
// Init initializes runtime options
func (r *localRuntime) Init(opts ...runtime.Option) error {
r.Lock()
defer r.Unlock()
for _, o := range opts {
o(&r.options)
}
return nil
}
// run runs the runtime management loop
func (r *localRuntime) run(events <-chan runtime.Event) {
t := time.NewTicker(time.Second * 5)
defer t.Stop()
// process event processes an incoming event
processEvent := func(event runtime.Event, service *service, ns string) error {
// get current vals
r.RLock()
name := service.Name
updated := service.updated
r.RUnlock()
// only process if the timestamp is newer
if !event.Timestamp.After(updated) {
return nil
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime updating service %s in %v namespace", name, ns)
}
// this will cause a delete followed by created
if err := r.Update(service.Service, runtime.UpdateNamespace(ns)); err != nil {
return err
}
// update the local timestamp
r.Lock()
service.updated = updated
r.Unlock()
return nil
}
for {
select {
case <-t.C:
// check running services
r.RLock()
for _, sevices := range r.namespaces {
for _, service := range sevices {
if !service.ShouldStart() {
continue
}
// TODO: check service error
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime starting %s", service.Name)
}
if err := service.Start(); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime error starting %s: %v", service.Name, err)
}
}
}
}
r.RUnlock()
case service := <-r.start:
if !service.ShouldStart() {
continue
}
// TODO: check service error
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime starting service %s", service.Name)
}
if err := service.Start(); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime error starting service %s: %v", service.Name, err)
}
}
case event := <-events:
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime received notification event: %v", event)
}
// NOTE: we only handle Update events for now
switch event.Type {
case runtime.Update:
if event.Service != nil {
ns := defaultNamespace
if event.Options != nil && len(event.Options.Namespace) > 0 {
ns = event.Options.Namespace
}
r.RLock()
if _, ok := r.namespaces[ns]; !ok {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime unknown namespace: %s", ns)
}
r.RUnlock()
continue
}
service, ok := r.namespaces[ns][fmt.Sprintf("%v:%v", event.Service.Name, event.Service.Version)]
r.RUnlock()
if !ok {
logger.Debugf("Runtime unknown service: %s", event.Service)
}
if err := processEvent(event, service, ns); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime error updating service %s: %v", event.Service, err)
}
}
continue
}
r.RLock()
namespaces := r.namespaces
r.RUnlock()
// if blank service was received we update all services
for ns, services := range namespaces {
for _, service := range services {
if err := processEvent(event, service, ns); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime error updating service %s: %v", service.Name, err)
}
}
}
}
}
case <-r.closed:
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime stopped")
}
return
}
}
}
func logFile(serviceName string) string {
// make the directory
name := strings.Replace(serviceName, "/", "-", -1)
return filepath.Join(LogDir, fmt.Sprintf("%v.log", name))
}
func serviceKey(s *runtime.Service) string {
return fmt.Sprintf("%v:%v", s.Name, s.Version)
}
// Create creates a new service which is then started by runtime
func (r *localRuntime) Create(s *runtime.Service, opts ...runtime.CreateOption) error {
var options runtime.CreateOptions
for _, o := range opts {
o(&options)
}
err := r.checkoutSourceIfNeeded(s, options.Secrets)
if err != nil {
return err
}
r.Lock()
defer r.Unlock()
if len(options.Namespace) == 0 {
options.Namespace = defaultNamespace
}
if len(options.Command) == 0 {
options.Command = []string{"go"}
options.Args = []string{"run", "."}
}
// pass secrets as env vars
for key, value := range options.Secrets {
options.Env = append(options.Env, fmt.Sprintf("%v=%v", key, value))
}
if _, ok := r.namespaces[options.Namespace]; !ok {
r.namespaces[options.Namespace] = make(map[string]*service)
}
if _, ok := r.namespaces[options.Namespace][serviceKey(s)]; ok {
return errors.New("service already running")
}
// create new service
service := newService(s, options)
f, err := os.OpenFile(logFile(service.Name), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Fatal(err)
}
if service.output != nil {
service.output = io.MultiWriter(service.output, f)
} else {
service.output = f
}
// start the service
if err := service.Start(); err != nil {
return err
}
// save service
r.namespaces[options.Namespace][serviceKey(s)] = service
return nil
}
// exists returns whether the given file or directory exists
func exists(path string) (bool, error) {
_, err := os.Stat(path)
if err == nil {
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return true, err
}
// @todo: Getting existing lines is not supported yet.
// The reason for this is because it's hard to calculate line offset
// as opposed to character offset.
// This logger streams by default and only supports the `StreamCount` option.
func (r *localRuntime) Logs(s *runtime.Service, options ...runtime.LogsOption) (runtime.Logs, error) {
lopts := runtime.LogsOptions{}
for _, o := range options {
o(&lopts)
}
ret := &logStream{
service: s.Name,
stream: make(chan runtime.Log),
stop: make(chan bool),
}
fpath := logFile(s.Name)
if ex, err := exists(fpath); err != nil {
return nil, err
} else if !ex {
return nil, fmt.Errorf("Logs not found for service %s", s.Name)
}
// have to check file size to avoid too big of a seek
fi, err := os.Stat(fpath)
if err != nil {
return nil, err
}
size := fi.Size()
whence := 2
// Multiply by length of an average line of log in bytes
offset := lopts.Count * 200
if offset > size {
offset = size
}
offset *= -1
t, err := tail.TailFile(fpath, tail.Config{Follow: lopts.Stream, Location: &tail.SeekInfo{
Whence: whence,
Offset: offset,
}, Logger: tail.DiscardingLogger})
if err != nil {
return nil, err
}
ret.tail = t
go func() {
for {
select {
case line, ok := <-t.Lines:
if !ok {
ret.Stop()
return
}
ret.stream <- runtime.Log{Message: line.Text}
case <-ret.stop:
return
}
}
}()
return ret, nil
}
type logStream struct {
tail *tail.Tail
service string
stream chan runtime.Log
sync.Mutex
stop chan bool
err error
}
func (l *logStream) Chan() chan runtime.Log {
return l.stream
}
func (l *logStream) Error() error {
return l.err
}
func (l *logStream) Stop() error {
l.Lock()
defer l.Unlock()
select {
case <-l.stop:
return nil
default:
close(l.stop)
close(l.stream)
err := l.tail.Stop()
if err != nil {
logger.Errorf("Error stopping tail: %v", err)
return err
}
}
return nil
}
// Read returns all instances of requested service
// If no service name is provided we return all the track services.
func (r *localRuntime) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error) {
r.Lock()
defer r.Unlock()
gopts := runtime.ReadOptions{}
for _, o := range opts {
o(&gopts)
}
if len(gopts.Namespace) == 0 {
gopts.Namespace = defaultNamespace
}
save := func(k, v string) bool {
if len(k) == 0 {
return true
}
return k == v
}
//nolint:prealloc
var services []*runtime.Service
if _, ok := r.namespaces[gopts.Namespace]; !ok {
return make([]*runtime.Service, 0), nil
}
for _, service := range r.namespaces[gopts.Namespace] {
if !save(gopts.Service, service.Name) {
continue
}
if !save(gopts.Version, service.Version) {
continue
}
// TODO deal with service type
// no version has sbeen requested, just append the service
services = append(services, service.Service)
}
return services, nil
}
// Update attempts to update the service
func (r *localRuntime) Update(s *runtime.Service, opts ...runtime.UpdateOption) error {
var options runtime.UpdateOptions
for _, o := range opts {
o(&options)
}
err := r.checkoutSourceIfNeeded(s, options.Secrets)
if err != nil {
return err
}
if len(options.Namespace) == 0 {
options.Namespace = defaultNamespace
}
r.Lock()
srvs, ok := r.namespaces[options.Namespace]
r.Unlock()
if !ok {
return errors.New("Service not found")
}
r.Lock()
service, ok := srvs[serviceKey(s)]
r.Unlock()
if !ok {
return errors.New("Service not found")
}
if err := service.Stop(); err != nil && err.Error() != "no such process" {
logger.Errorf("Error stopping service %s: %s", service.Name, err)
return err
}
return service.Start()
}
// Delete removes the service from the runtime and stops it
func (r *localRuntime) Delete(s *runtime.Service, opts ...runtime.DeleteOption) error {
r.Lock()
defer r.Unlock()
var options runtime.DeleteOptions
for _, o := range opts {
o(&options)
}
if len(options.Namespace) == 0 {
options.Namespace = defaultNamespace
}
srvs, ok := r.namespaces[options.Namespace]
if !ok {
return nil
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime deleting service %s", s.Name)
}
service, ok := srvs[serviceKey(s)]
if !ok {
return nil
}
// check if running
if !service.Running() {
delete(srvs, service.key())
r.namespaces[options.Namespace] = srvs
return nil
}
// otherwise stop it
if err := service.Stop(); err != nil {
return err
}
// delete it
delete(srvs, service.key())
r.namespaces[options.Namespace] = srvs
return nil
}
// Start starts the runtime
func (r *localRuntime) Start() error {
r.Lock()
defer r.Unlock()
// already running
if r.running {
return nil
}
// set running
r.running = true
r.closed = make(chan bool)
var events <-chan runtime.Event
if r.options.Scheduler != nil {
var err error
events, err = r.options.Scheduler.Notify()
if err != nil {
// TODO: should we bail here?
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime failed to start update notifier")
}
}
}
go r.run(events)
return nil
}
// Stop stops the runtime
func (r *localRuntime) Stop() error {
r.Lock()
defer r.Unlock()
if !r.running {
return nil
}
select {
case <-r.closed:
return nil
default:
close(r.closed)
// set not running
r.running = false
// stop all the services
for _, services := range r.namespaces {
for _, service := range services {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime stopping %s", service.Name)
}
service.Stop()
}
}
// stop the scheduler
if r.options.Scheduler != nil {
return r.options.Scheduler.Close()
}
}
return nil
}
// String implements stringer interface
func (r *localRuntime) String() string {
return "local"
}

View File

@ -1,5 +0,0 @@
package process
type Options struct{}
type Option func(o *Options)

View File

@ -1,94 +0,0 @@
// +build !windows
// Package os runs processes locally
package os
import (
"fmt"
"os"
"os/exec"
"strconv"
"syscall"
"github.com/unistack-org/micro/v3/runtime/local/process"
)
func (p *Process) Exec(exe *process.Binary) error {
cmd := exec.Command(exe.Package.Path)
cmd.Dir = exe.Dir
return cmd.Run()
}
func (p *Process) Fork(exe *process.Binary) (*process.PID, error) {
// create command
cmd := exec.Command(exe.Package.Path, exe.Args...)
cmd.Dir = exe.Dir
// set env vars
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Env = append(cmd.Env, exe.Env...)
// create process group
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
in, err := cmd.StdinPipe()
if err != nil {
return nil, err
}
out, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
er, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
// start the process
if err := cmd.Start(); err != nil {
return nil, err
}
return &process.PID{
ID: fmt.Sprintf("%d", cmd.Process.Pid),
Input: in,
Output: out,
Error: er,
}, nil
}
func (p *Process) Kill(pid *process.PID) error {
id, err := strconv.Atoi(pid.ID)
if err != nil {
return err
}
if _, err := os.FindProcess(id); err != nil {
return err
}
// now kill it
// using -ve PID kills the process group which we created in Fork()
return syscall.Kill(-id, syscall.SIGTERM)
}
func (p *Process) Wait(pid *process.PID) error {
id, err := strconv.Atoi(pid.ID)
if err != nil {
return err
}
pr, err := os.FindProcess(id)
if err != nil {
return err
}
ps, err := pr.Wait()
if err != nil {
return err
}
if ps.Success() {
return nil
}
return fmt.Errorf(ps.String())
}

View File

@ -1,89 +0,0 @@
// Package os runs processes locally
package os
import (
"fmt"
"os"
"os/exec"
"strconv"
"github.com/unistack-org/micro/v3/runtime/local/process"
)
func (p *Process) Exec(exe *process.Binary) error {
cmd := exec.Command(exe.Package.Path)
return cmd.Run()
}
func (p *Process) Fork(exe *process.Binary) (*process.PID, error) {
// create command
cmd := exec.Command(exe.Package.Path, exe.Args...)
// set env vars
cmd.Env = append(cmd.Env, exe.Env...)
in, err := cmd.StdinPipe()
if err != nil {
return nil, err
}
out, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
er, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
// start the process
if err := cmd.Start(); err != nil {
return nil, err
}
return &process.PID{
ID: fmt.Sprintf("%d", cmd.Process.Pid),
Input: in,
Output: out,
Error: er,
}, nil
}
func (p *Process) Kill(pid *process.PID) error {
id, err := strconv.Atoi(pid.ID)
if err != nil {
return err
}
pr, err := os.FindProcess(id)
if err != nil {
return err
}
// now kill it
err = pr.Kill()
// return the kill error
return err
}
func (p *Process) Wait(pid *process.PID) error {
id, err := strconv.Atoi(pid.ID)
if err != nil {
return err
}
pr, err := os.FindProcess(id)
if err != nil {
return err
}
ps, err := pr.Wait()
if err != nil {
return err
}
if ps.Success() {
return nil
}
return fmt.Errorf(ps.String())
}

View File

@ -1,12 +0,0 @@
// Package os runs processes locally
package os
import (
"github.com/unistack-org/micro/v3/runtime/local/process"
)
type Process struct{}
func NewProcess(opts ...process.Option) process.Process {
return &Process{}
}

View File

@ -1,43 +0,0 @@
// Package process executes a binary
package process
import (
"io"
"github.com/unistack-org/micro/v3/build"
)
// Process manages a running process
type Process interface {
// Executes a process to completion
Exec(*Binary) error
// Creates a new process
Fork(*Binary) (*PID, error)
// Kills the process
Kill(*PID) error
// Waits for a process to exit
Wait(*PID) error
}
type Binary struct {
// Package containing executable
Package *build.Package
// The env variables
Env []string
// Args to pass
Args []string
// Initial working directory
Dir string
}
// PID is the running process
type PID struct {
// ID of the process
ID string
// Stdin
Input io.Writer
// Stdout
Output io.Reader
// Stderr
Error io.Reader
}

View File

@ -1,253 +0,0 @@
package local
import (
"fmt"
"io"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"github.com/unistack-org/micro/v3/build"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/runtime"
"github.com/unistack-org/micro/v3/runtime/local/process"
proc "github.com/unistack-org/micro/v3/runtime/local/process/os"
)
type service struct {
sync.RWMutex
running bool
closed chan bool
err error
updated time.Time
retries int
maxRetries int
// output for logs
output io.Writer
// service to manage
*runtime.Service
// process creator
Process *proc.Process
// Exec
Exec *process.Binary
// process pid
PID *process.PID
}
func newService(s *runtime.Service, c runtime.CreateOptions) *service {
var exec string
var args []string
// set command
exec = strings.Join(c.Command, " ")
args = c.Args
dir := s.Source
// For uploaded packages, we upload the whole repo
// so the correct working directory to do a `go run .`
// needs to include the relative path from the repo root
// which is the service name.
//
// Could use a better upload check.
if strings.Contains(s.Source, "uploads") {
// There are two cases to consider here:
// a., if the uploaded code comes from a repo - in this case
// the service name is the relative path.
// b., if the uploaded code comes from a non repo folder -
// in this case the service name is the folder name.
// Because of this, we only append the service name to the source in
// case `a`
if ex, err := exists(filepath.Join(s.Source, s.Name)); err == nil && ex {
dir = filepath.Join(s.Source, s.Name)
}
}
return &service{
Service: s,
Process: new(proc.Process),
Exec: &process.Binary{
Package: &build.Package{
Name: s.Name,
Path: exec,
},
Env: c.Env,
Args: args,
Dir: dir,
},
closed: make(chan bool),
output: c.Output,
updated: time.Now(),
maxRetries: c.Retries,
}
}
func (s *service) streamOutput() {
go io.Copy(s.output, s.PID.Output)
go io.Copy(s.output, s.PID.Error)
}
func (s *service) shouldStart() bool {
if s.running {
return false
}
return s.retries <= s.maxRetries
}
func (s *service) key() string {
return fmt.Sprintf("%v:%v", s.Name, s.Version)
}
func (s *service) ShouldStart() bool {
s.RLock()
defer s.RUnlock()
return s.shouldStart()
}
func (s *service) Running() bool {
s.RLock()
defer s.RUnlock()
return s.running
}
// Start starts the service
func (s *service) Start() error {
s.Lock()
defer s.Unlock()
if !s.shouldStart() {
return nil
}
// reset
s.err = nil
s.closed = make(chan bool)
s.retries = 0
if s.Metadata == nil {
s.Metadata = make(map[string]string)
}
s.Status("starting", nil)
// TODO: pull source & build binary
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime service %s forking new process", s.Service.Name)
}
p, err := s.Process.Fork(s.Exec)
if err != nil {
s.Status("error", err)
return err
}
// set the pid
s.PID = p
// set to running
s.running = true
// set status
s.Status("running", nil)
// set started
s.Metadata["started"] = time.Now().Format(time.RFC3339)
if s.output != nil {
s.streamOutput()
}
// wait and watch
go s.Wait()
return nil
}
// Status updates the status of the service. Assumes it's called under a lock as it mutates state
func (s *service) Status(status string, err error) {
s.Metadata["lastStatusUpdate"] = time.Now().Format(time.RFC3339)
s.Metadata["status"] = status
if err == nil {
delete(s.Metadata, "error")
return
}
s.Metadata["error"] = err.Error()
}
// Stop stops the service
func (s *service) Stop() error {
s.Lock()
defer s.Unlock()
select {
case <-s.closed:
return nil
default:
close(s.closed)
s.running = false
s.retries = 0
if s.PID == nil {
return nil
}
// set status
s.Status("stopping", nil)
// kill the process
err := s.Process.Kill(s.PID)
if err == nil {
// wait for it to exit
s.Process.Wait(s.PID)
}
// set status
s.Status("stopped", err)
// return the kill error
return err
}
}
// Error returns the last error service has returned
func (s *service) Error() error {
s.RLock()
defer s.RUnlock()
return s.err
}
// Wait waits for the service to finish running
func (s *service) Wait() {
// wait for process to exit
s.RLock()
thisPID := s.PID
s.RUnlock()
err := s.Process.Wait(thisPID)
s.Lock()
defer s.Unlock()
if s.PID.ID != thisPID.ID {
// trying to update when it's already been switched out, ignore
logger.Debugf("Trying to update a process status but PID doesn't match. Old %s, New %s. Skipping update.", thisPID.ID, s.PID.ID)
return
}
// save the error
if err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Service %s terminated with error %s", s.Name, err)
}
s.retries++
s.Status("error", err)
s.Metadata["retries"] = strconv.Itoa(s.retries)
s.err = err
} else {
s.Status("done", nil)
}
// no longer running
s.running = false
}

View File

@ -1,93 +0,0 @@
// Package golang is a source for Go
package golang
import (
"os"
"os/exec"
"path/filepath"
"strings"
"github.com/unistack-org/micro/v3/runtime/local/source"
)
type Source struct {
Options source.Options
// Go Command
Cmd string
Path string
}
func (g *Source) Fetch(url string) (*source.Repository, error) {
purl := url
if parts := strings.Split(url, "://"); len(parts) > 1 {
purl = parts[len(parts)-1]
}
// name of repo
name := filepath.Base(url)
// local path of repo
path := filepath.Join(g.Path, purl)
args := []string{"get", "-d", url, path}
cmd := exec.Command(g.Cmd, args...)
if err := cmd.Run(); err != nil {
return nil, err
}
return &source.Repository{
Name: name,
Path: path,
URL: url,
}, nil
}
// Commit is not yet supported
func (g *Source) Commit(r *source.Repository) error {
return nil
}
func (g *Source) String() string {
return "golang"
}
// whichGo locates the go command
func whichGo() string {
// check GOROOT
if gr := os.Getenv("GOROOT"); len(gr) > 0 {
return filepath.Join(gr, "bin", "go")
}
// check path
for _, p := range filepath.SplitList(os.Getenv("PATH")) {
bin := filepath.Join(p, "go")
if _, err := os.Stat(bin); err == nil {
return bin
}
}
// best effort
return "go"
}
func NewSource(opts ...source.Option) source.Source {
options := source.Options{
Path: os.TempDir(),
}
for _, o := range opts {
o(&options)
}
cmd := whichGo()
path := options.Path
// point of no return
if len(cmd) == 0 {
panic("Could not find Go executable")
}
return &Source{
Options: options,
Cmd: cmd,
Path: path,
}
}

View File

@ -1,15 +0,0 @@
package source
type Options struct {
// local path to download source
Path string
}
type Option func(o *Options)
// Local path for repository
func Path(p string) Option {
return func(o *Options) {
o.Path = p
}
}

View File

@ -1,22 +0,0 @@
// Package source retrieves source code
package source
// Source retrieves source code
type Source interface {
// Fetch repo from a url
Fetch(url string) (*Repository, error)
// Commit and upload repo
Commit(*Repository) error
// The sourcerer
String() string
}
// Repository is the source repository
type Repository struct {
// Name or repo
Name string
// Local path where repo is stored
Path string
// URL from which repo was retrieved
URL string
}