v3 refactor (#1868)

* Move to v3

Co-authored-by: Ben Toogood <bentoogood@gmail.com>
This commit is contained in:
Asim Aslam
2020-07-27 13:22:00 +01:00
committed by GitHub
parent 9dfeb98111
commit 563768b58a
424 changed files with 6383 additions and 22490 deletions

View File

@@ -2,7 +2,7 @@
package build
import (
"github.com/micro/go-micro/v2/runtime/local/source"
"github.com/micro/go-micro/v3/runtime/local/source"
)
// Builder builds binaries

View File

@@ -9,8 +9,8 @@ import (
"path/filepath"
docker "github.com/fsouza/go-dockerclient"
"github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/runtime/local/build"
"github.com/micro/go-micro/v3/logger"
"github.com/micro/go-micro/v3/runtime/local/build"
)
type Builder struct {

View File

@@ -6,7 +6,7 @@ import (
"os/exec"
"path/filepath"
"github.com/micro/go-micro/v2/runtime/local/build"
"github.com/micro/go-micro/v3/runtime/local/build"
)
type Builder struct {

View File

@@ -1,11 +1,622 @@
// Package local provides a local runtime
package local
import (
"github.com/micro/go-micro/v2/runtime"
"errors"
"fmt"
"io"
"log"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/hpcloud/tail"
"github.com/micro/go-micro/v3/logger"
"github.com/micro/go-micro/v3/runtime"
"github.com/micro/go-micro/v3/runtime/local/git"
)
// NewRuntime returns a new local runtime
func NewRuntime(opts ...runtime.Option) runtime.Runtime {
return runtime.NewRuntime(opts...)
// defaultNamespace to use if not provided as an option
const defaultNamespace = "default"
type localRuntime struct {
sync.RWMutex
// options configure runtime
options runtime.Options
// used to stop the runtime
closed chan bool
// used to start new services
start chan *service
// indicates if we're running
running bool
// namespaces stores services grouped by namespace, e.g. namespaces["foo"]["go.micro.auth:latest"]
// would return the latest version of go.micro.auth from the foo namespace
namespaces map[string]map[string]*service
}
// NewRuntime creates new local runtime and returns it
func NewRuntime(opts ...runtime.Option) runtime.Runtime {
// get default options
options := runtime.Options{}
// apply requested options
for _, o := range opts {
o(&options)
}
// make the logs directory
path := filepath.Join(os.TempDir(), "micro", "logs")
_ = os.MkdirAll(path, 0755)
return &localRuntime{
options: options,
closed: make(chan bool),
start: make(chan *service, 128),
namespaces: make(map[string]map[string]*service),
}
}
func (r *localRuntime) checkoutSourceIfNeeded(s *runtime.Service) error {
// Runtime service like config have no source.
// Skip checkout in that case
if len(s.Source) == 0 {
return nil
}
// @todo make this come from config
cpath := filepath.Join(os.TempDir(), "micro", "uploads", s.Source)
path := strings.ReplaceAll(cpath, ".tar.gz", "")
if ex, _ := exists(cpath); ex {
err := os.RemoveAll(path)
if err != nil {
return err
}
err = os.MkdirAll(path, 0777)
if err != nil {
return err
}
err = git.Uncompress(cpath, path)
if err != nil {
return err
}
s.Source = path
return nil
}
source, err := git.ParseSourceLocal("", s.Source)
if err != nil {
return err
}
source.Ref = s.Version
err = git.CheckoutSource(os.TempDir(), source)
if err != nil {
return err
}
s.Source = source.FullPath
return nil
}
// Init initializes runtime options
func (r *localRuntime) Init(opts ...runtime.Option) error {
r.Lock()
defer r.Unlock()
for _, o := range opts {
o(&r.options)
}
return nil
}
// run runs the runtime management loop
func (r *localRuntime) run(events <-chan runtime.Event) {
t := time.NewTicker(time.Second * 5)
defer t.Stop()
// process event processes an incoming event
processEvent := func(event runtime.Event, service *service, ns string) error {
// get current vals
r.RLock()
name := service.Name
updated := service.updated
r.RUnlock()
// only process if the timestamp is newer
if !event.Timestamp.After(updated) {
return nil
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime updating service %s in %v namespace", name, ns)
}
// this will cause a delete followed by created
if err := r.Update(service.Service, runtime.UpdateNamespace(ns)); err != nil {
return err
}
// update the local timestamp
r.Lock()
service.updated = updated
r.Unlock()
return nil
}
for {
select {
case <-t.C:
// check running services
r.RLock()
for _, sevices := range r.namespaces {
for _, service := range sevices {
if !service.ShouldStart() {
continue
}
// TODO: check service error
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime starting %s", service.Name)
}
if err := service.Start(); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime error starting %s: %v", service.Name, err)
}
}
}
}
r.RUnlock()
case service := <-r.start:
if !service.ShouldStart() {
continue
}
// TODO: check service error
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime starting service %s", service.Name)
}
if err := service.Start(); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime error starting service %s: %v", service.Name, err)
}
}
case event := <-events:
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime received notification event: %v", event)
}
// NOTE: we only handle Update events for now
switch event.Type {
case runtime.Update:
if event.Service != nil {
ns := defaultNamespace
if event.Options != nil && len(event.Options.Namespace) > 0 {
ns = event.Options.Namespace
}
r.RLock()
if _, ok := r.namespaces[ns]; !ok {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime unknown namespace: %s", ns)
}
r.RUnlock()
continue
}
service, ok := r.namespaces[ns][fmt.Sprintf("%v:%v", event.Service.Name, event.Service.Version)]
r.RUnlock()
if !ok {
logger.Debugf("Runtime unknown service: %s", event.Service)
}
if err := processEvent(event, service, ns); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime error updating service %s: %v", event.Service, err)
}
}
continue
}
r.RLock()
namespaces := r.namespaces
r.RUnlock()
// if blank service was received we update all services
for ns, services := range namespaces {
for _, service := range services {
if err := processEvent(event, service, ns); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime error updating service %s: %v", service.Name, err)
}
}
}
}
}
case <-r.closed:
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime stopped")
}
return
}
}
}
func logFile(serviceName string) string {
// make the directory
name := strings.Replace(serviceName, "/", "-", -1)
path := filepath.Join(os.TempDir(), "micro", "logs")
return filepath.Join(path, fmt.Sprintf("%v.log", name))
}
func serviceKey(s *runtime.Service) string {
return fmt.Sprintf("%v:%v", s.Name, s.Version)
}
// Create creates a new service which is then started by runtime
func (r *localRuntime) Create(s *runtime.Service, opts ...runtime.CreateOption) error {
err := r.checkoutSourceIfNeeded(s)
if err != nil {
return err
}
r.Lock()
defer r.Unlock()
var options runtime.CreateOptions
for _, o := range opts {
o(&options)
}
if len(options.Namespace) == 0 {
options.Namespace = defaultNamespace
}
if len(options.Command) == 0 {
options.Command = []string{"go"}
options.Args = []string{"run", "."}
}
// pass credentials as env vars
if len(options.Credentials) > 0 {
// validate the creds
comps := strings.Split(options.Credentials, ":")
if len(comps) != 2 {
return errors.New("Invalid credentials, expected format 'user:pass'")
}
options.Env = append(options.Env, "MICRO_AUTH_ID", comps[0])
options.Env = append(options.Env, "MICRO_AUTH_SECRET", comps[1])
}
if _, ok := r.namespaces[options.Namespace]; !ok {
r.namespaces[options.Namespace] = make(map[string]*service)
}
if _, ok := r.namespaces[options.Namespace][serviceKey(s)]; ok {
return errors.New("service already running")
}
// create new service
service := newService(s, options)
f, err := os.OpenFile(logFile(service.Name), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Fatal(err)
}
if service.output != nil {
service.output = io.MultiWriter(service.output, f)
} else {
service.output = f
}
// start the service
if err := service.Start(); err != nil {
return err
}
// save service
r.namespaces[options.Namespace][serviceKey(s)] = service
return nil
}
// exists returns whether the given file or directory exists
func exists(path string) (bool, error) {
_, err := os.Stat(path)
if err == nil {
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return true, err
}
// @todo: Getting existing lines is not supported yet.
// The reason for this is because it's hard to calculate line offset
// as opposed to character offset.
// This logger streams by default and only supports the `StreamCount` option.
func (r *localRuntime) Logs(s *runtime.Service, options ...runtime.LogsOption) (runtime.LogStream, error) {
lopts := runtime.LogsOptions{}
for _, o := range options {
o(&lopts)
}
ret := &logStream{
service: s.Name,
stream: make(chan runtime.LogRecord),
stop: make(chan bool),
}
fpath := logFile(s.Name)
if ex, err := exists(fpath); err != nil {
return nil, err
} else if !ex {
return nil, fmt.Errorf("Logs not found for service %s", s.Name)
}
// have to check file size to avoid too big of a seek
fi, err := os.Stat(fpath)
if err != nil {
return nil, err
}
size := fi.Size()
whence := 2
// Multiply by length of an average line of log in bytes
offset := lopts.Count * 200
if offset > size {
offset = size
}
offset *= -1
t, err := tail.TailFile(fpath, tail.Config{Follow: lopts.Stream, Location: &tail.SeekInfo{
Whence: whence,
Offset: int64(offset),
}, Logger: tail.DiscardingLogger})
if err != nil {
return nil, err
}
ret.tail = t
go func() {
for {
select {
case line, ok := <-t.Lines:
if !ok {
ret.Stop()
return
}
ret.stream <- runtime.LogRecord{Message: line.Text}
case <-ret.stop:
return
}
}
}()
return ret, nil
}
type logStream struct {
tail *tail.Tail
service string
stream chan runtime.LogRecord
sync.Mutex
stop chan bool
err error
}
func (l *logStream) Chan() chan runtime.LogRecord {
return l.stream
}
func (l *logStream) Error() error {
return l.err
}
func (l *logStream) Stop() error {
l.Lock()
defer l.Unlock()
select {
case <-l.stop:
return nil
default:
close(l.stop)
close(l.stream)
err := l.tail.Stop()
if err != nil {
logger.Errorf("Error stopping tail: %v", err)
return err
}
}
return nil
}
// Read returns all instances of requested service
// If no service name is provided we return all the track services.
func (r *localRuntime) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error) {
r.Lock()
defer r.Unlock()
gopts := runtime.ReadOptions{}
for _, o := range opts {
o(&gopts)
}
if len(gopts.Namespace) == 0 {
gopts.Namespace = defaultNamespace
}
save := func(k, v string) bool {
if len(k) == 0 {
return true
}
return k == v
}
//nolint:prealloc
var services []*runtime.Service
if _, ok := r.namespaces[gopts.Namespace]; !ok {
return make([]*runtime.Service, 0), nil
}
for _, service := range r.namespaces[gopts.Namespace] {
if !save(gopts.Service, service.Name) {
continue
}
if !save(gopts.Version, service.Version) {
continue
}
// TODO deal with service type
// no version has sbeen requested, just append the service
services = append(services, service.Service)
}
return services, nil
}
// Update attempts to update the service
func (r *localRuntime) Update(s *runtime.Service, opts ...runtime.UpdateOption) error {
var options runtime.UpdateOptions
for _, o := range opts {
o(&options)
}
if len(options.Namespace) == 0 {
options.Namespace = defaultNamespace
}
err := r.checkoutSourceIfNeeded(s)
if err != nil {
return err
}
r.Lock()
srvs, ok := r.namespaces[options.Namespace]
r.Unlock()
if !ok {
return errors.New("Service not found")
}
r.Lock()
service, ok := srvs[serviceKey(s)]
r.Unlock()
if !ok {
return errors.New("Service not found")
}
if err := service.Stop(); err != nil && err.Error() != "no such process" {
logger.Errorf("Error stopping service %s: %s", service.Name, err)
return err
}
return service.Start()
}
// Delete removes the service from the runtime and stops it
func (r *localRuntime) Delete(s *runtime.Service, opts ...runtime.DeleteOption) error {
r.Lock()
defer r.Unlock()
var options runtime.DeleteOptions
for _, o := range opts {
o(&options)
}
if len(options.Namespace) == 0 {
options.Namespace = defaultNamespace
}
srvs, ok := r.namespaces[options.Namespace]
if !ok {
return nil
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime deleting service %s", s.Name)
}
service, ok := srvs[serviceKey(s)]
if !ok {
return nil
}
// check if running
if !service.Running() {
delete(srvs, service.key())
r.namespaces[options.Namespace] = srvs
return nil
}
// otherwise stop it
if err := service.Stop(); err != nil {
return err
}
// delete it
delete(srvs, service.key())
r.namespaces[options.Namespace] = srvs
return nil
}
// Start starts the runtime
func (r *localRuntime) Start() error {
r.Lock()
defer r.Unlock()
// already running
if r.running {
return nil
}
// set running
r.running = true
r.closed = make(chan bool)
var events <-chan runtime.Event
if r.options.Scheduler != nil {
var err error
events, err = r.options.Scheduler.Notify()
if err != nil {
// TODO: should we bail here?
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime failed to start update notifier")
}
}
}
go r.run(events)
return nil
}
// Stop stops the runtime
func (r *localRuntime) Stop() error {
r.Lock()
defer r.Unlock()
if !r.running {
return nil
}
select {
case <-r.closed:
return nil
default:
close(r.closed)
// set not running
r.running = false
// stop all the services
for _, services := range r.namespaces {
for _, service := range services {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime stopping %s", service.Name)
}
service.Stop()
}
}
// stop the scheduler
if r.options.Scheduler != nil {
return r.options.Scheduler.Close()
}
}
return nil
}
// String implements stringer interface
func (r *localRuntime) String() string {
return "local"
}

View File

@@ -10,7 +10,7 @@ import (
"strconv"
"syscall"
"github.com/micro/go-micro/v2/runtime/local/process"
"github.com/micro/go-micro/v3/runtime/local/process"
)
func (p *Process) Exec(exe *process.Executable) error {

View File

@@ -7,7 +7,7 @@ import (
"os/exec"
"strconv"
"github.com/micro/go-micro/v2/runtime/local/process"
"github.com/micro/go-micro/v3/runtime/local/process"
)
func (p *Process) Exec(exe *process.Executable) error {

View File

@@ -2,7 +2,7 @@
package os
import (
"github.com/micro/go-micro/v2/runtime/local/process"
"github.com/micro/go-micro/v3/runtime/local/process"
)
type Process struct{}

View File

@@ -4,7 +4,7 @@ package process
import (
"io"
"github.com/micro/go-micro/v2/runtime/local/build"
"github.com/micro/go-micro/v3/runtime/local/build"
)
// Process manages a running process

253
runtime/local/service.go Normal file
View File

@@ -0,0 +1,253 @@
package local
import (
"fmt"
"io"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"github.com/micro/go-micro/v3/logger"
"github.com/micro/go-micro/v3/runtime"
"github.com/micro/go-micro/v3/runtime/local/build"
"github.com/micro/go-micro/v3/runtime/local/process"
proc "github.com/micro/go-micro/v3/runtime/local/process/os"
)
type service struct {
sync.RWMutex
running bool
closed chan bool
err error
updated time.Time
retries int
maxRetries int
// output for logs
output io.Writer
// service to manage
*runtime.Service
// process creator
Process *proc.Process
// Exec
Exec *process.Executable
// process pid
PID *process.PID
}
func newService(s *runtime.Service, c runtime.CreateOptions) *service {
var exec string
var args []string
// set command
exec = strings.Join(c.Command, " ")
args = c.Args
dir := s.Source
// For uploaded packages, we upload the whole repo
// so the correct working directory to do a `go run .`
// needs to include the relative path from the repo root
// which is the service name.
//
// Could use a better upload check.
if strings.Contains(s.Source, "uploads") {
// There are two cases to consider here:
// a., if the uploaded code comes from a repo - in this case
// the service name is the relative path.
// b., if the uploaded code comes from a non repo folder -
// in this case the service name is the folder name.
// Because of this, we only append the service name to the source in
// case `a`
if ex, err := exists(filepath.Join(s.Source, s.Name)); err == nil && ex {
dir = filepath.Join(s.Source, s.Name)
}
}
return &service{
Service: s,
Process: new(proc.Process),
Exec: &process.Executable{
Package: &build.Package{
Name: s.Name,
Path: exec,
},
Env: c.Env,
Args: args,
Dir: dir,
},
closed: make(chan bool),
output: c.Output,
updated: time.Now(),
maxRetries: c.Retries,
}
}
func (s *service) streamOutput() {
go io.Copy(s.output, s.PID.Output)
go io.Copy(s.output, s.PID.Error)
}
func (s *service) shouldStart() bool {
if s.running {
return false
}
return s.retries <= s.maxRetries
}
func (s *service) key() string {
return fmt.Sprintf("%v:%v", s.Name, s.Version)
}
func (s *service) ShouldStart() bool {
s.RLock()
defer s.RUnlock()
return s.shouldStart()
}
func (s *service) Running() bool {
s.RLock()
defer s.RUnlock()
return s.running
}
// Start starts the service
func (s *service) Start() error {
s.Lock()
defer s.Unlock()
if !s.shouldStart() {
return nil
}
// reset
s.err = nil
s.closed = make(chan bool)
s.retries = 0
if s.Metadata == nil {
s.Metadata = make(map[string]string)
}
s.Status("starting", nil)
// TODO: pull source & build binary
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime service %s forking new process", s.Service.Name)
}
p, err := s.Process.Fork(s.Exec)
if err != nil {
s.Status("error", err)
return err
}
// set the pid
s.PID = p
// set to running
s.running = true
// set status
s.Status("running", nil)
// set started
s.Metadata["started"] = time.Now().Format(time.RFC3339)
if s.output != nil {
s.streamOutput()
}
// wait and watch
go s.Wait()
return nil
}
// Status updates the status of the service. Assumes it's called under a lock as it mutates state
func (s *service) Status(status string, err error) {
s.Metadata["lastStatusUpdate"] = time.Now().Format(time.RFC3339)
s.Metadata["status"] = status
if err == nil {
delete(s.Metadata, "error")
return
}
s.Metadata["error"] = err.Error()
}
// Stop stops the service
func (s *service) Stop() error {
s.Lock()
defer s.Unlock()
select {
case <-s.closed:
return nil
default:
close(s.closed)
s.running = false
s.retries = 0
if s.PID == nil {
return nil
}
// set status
s.Status("stopping", nil)
// kill the process
err := s.Process.Kill(s.PID)
if err == nil {
// wait for it to exit
s.Process.Wait(s.PID)
}
// set status
s.Status("stopped", err)
// return the kill error
return err
}
}
// Error returns the last error service has returned
func (s *service) Error() error {
s.RLock()
defer s.RUnlock()
return s.err
}
// Wait waits for the service to finish running
func (s *service) Wait() {
// wait for process to exit
s.RLock()
thisPID := s.PID
s.RUnlock()
err := s.Process.Wait(thisPID)
s.Lock()
defer s.Unlock()
if s.PID.ID != thisPID.ID {
// trying to update when it's already been switched out, ignore
logger.Debugf("Trying to update a process status but PID doesn't match. Old %s, New %s. Skipping update.", thisPID.ID, s.PID.ID)
return
}
// save the error
if err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Service %s terminated with error %s", s.Name, err)
}
s.retries++
s.Status("error", err)
s.Metadata["retries"] = strconv.Itoa(s.retries)
s.err = err
} else {
s.Status("done", nil)
}
// no longer running
s.running = false
}

View File

@@ -7,7 +7,7 @@ import (
"strings"
"github.com/go-git/go-git/v5"
"github.com/micro/go-micro/v2/runtime/local/source"
"github.com/micro/go-micro/v3/runtime/local/source"
)
// Source retrieves source code

View File

@@ -7,7 +7,7 @@ import (
"path/filepath"
"strings"
"github.com/micro/go-micro/v2/runtime/local/source"
"github.com/micro/go-micro/v3/runtime/local/source"
)
type Source struct {