micro/runtime/default.go

325 lines
5.1 KiB
Go

package runtime
import (
"errors"
"io"
"strings"
"sync"
"time"
"github.com/micro/go-micro/runtime/package"
"github.com/micro/go-micro/runtime/process"
proc "github.com/micro/go-micro/runtime/process/os"
"github.com/micro/go-micro/util/log"
)
type runtime struct {
sync.RWMutex
// used to stop the runtime
closed chan bool
// used to start new services
start chan *service
// indicates if we're running
running bool
// the service map
services map[string]*service
}
type service struct {
sync.RWMutex
running bool
closed chan bool
err error
// output for logs
output io.Writer
// service to manage
*Service
// process creator
Process *proc.Process
// Exec
Exec *process.Executable
// process pid
PID *process.PID
}
func newRuntime() *runtime {
return &runtime{
closed: make(chan bool),
start: make(chan *service, 128),
services: make(map[string]*service),
}
}
func newService(s *Service, c CreateOptions) *service {
var exec string
var args []string
if len(s.Exec) > 0 {
parts := strings.Split(s.Exec, " ")
exec = parts[0]
args = []string{}
if len(parts) > 1 {
args = parts[1:]
}
} else {
// set command
exec = c.Command[0]
// set args
if len(c.Command) > 1 {
args = c.Command[1:]
}
}
return &service{
Service: s,
Process: new(proc.Process),
Exec: &process.Executable{
Binary: &packager.Binary{
Name: s.Name,
Path: exec,
},
Env: c.Env,
Args: args,
},
closed: make(chan bool),
output: c.Output,
}
}
func (s *service) streamOutput() {
go io.Copy(s.output, s.PID.Output)
go io.Copy(s.output, s.PID.Error)
}
func (s *service) Running() bool {
s.RLock()
defer s.RUnlock()
return s.running
}
func (s *service) Start() error {
s.Lock()
defer s.Unlock()
if s.running {
return nil
}
// reset
s.err = nil
s.closed = make(chan bool)
// TODO: pull source & build binary
log.Debugf("Runtime service %s forking new process\n", s.Service.Name)
p, err := s.Process.Fork(s.Exec)
if err != nil {
return err
}
// set the pid
s.PID = p
// set to running
s.running = true
if s.output != nil {
s.streamOutput()
}
// wait and watch
go s.Wait()
return nil
}
func (s *service) Stop() error {
s.Lock()
defer s.Unlock()
select {
case <-s.closed:
return nil
default:
close(s.closed)
s.running = false
if s.PID == nil {
return nil
}
return s.Process.Kill(s.PID)
}
return nil
}
func (s *service) Error() error {
s.RLock()
defer s.RUnlock()
return s.err
}
func (s *service) Wait() {
// wait for process to exit
err := s.Process.Wait(s.PID)
s.Lock()
defer s.Unlock()
// save the error
if err != nil {
s.err = err
}
// no longer running
s.running = false
}
func (r *runtime) run() {
r.RLock()
closed := r.closed
r.RUnlock()
t := time.NewTicker(time.Second * 5)
defer t.Stop()
for {
select {
case <-t.C:
// check running services
r.RLock()
for _, service := range r.services {
if service.Running() {
continue
}
// TODO: check service error
log.Debugf("Runtime starting %s", service.Name)
if err := service.Start(); err != nil {
log.Debugf("Runtime error starting %s: %v", service.Name, err)
}
}
r.RUnlock()
case service := <-r.start:
if service.Running() {
continue
}
// TODO: check service error
log.Debugf("Starting %s", service.Name)
if err := service.Start(); err != nil {
log.Debugf("Runtime error starting %s: %v", service.Name, err)
}
case <-closed:
// TODO: stop all the things
return
}
}
}
func (r *runtime) Create(s *Service, opts ...CreateOption) error {
r.Lock()
defer r.Unlock()
if _, ok := r.services[s.Name]; ok {
return errors.New("service already registered")
}
var options CreateOptions
for _, o := range opts {
o(&options)
}
if len(s.Exec) == 0 && len(options.Command) == 0 {
return errors.New("missing exec command")
}
// save service
r.services[s.Name] = newService(s, options)
// push into start queue
r.start <- r.services[s.Name]
return nil
}
func (r *runtime) Delete(s *Service) error {
r.Lock()
defer r.Unlock()
if s, ok := r.services[s.Name]; ok {
delete(r.services, s.Name)
return s.Stop()
}
return nil
}
func (r *runtime) Update(s *Service) error {
// delete the service
if err := r.Delete(s); err != nil {
return err
}
// create new service
return r.Create(s)
}
func (r *runtime) List() ([]*Service, error) {
var services []*Service
r.RLock()
defer r.RUnlock()
for _, service := range r.services {
services = append(services, service.Service)
}
return services, nil
}
func (r *runtime) Start() error {
r.Lock()
defer r.Unlock()
// already running
if r.running {
return nil
}
// set running
r.running = true
r.closed = make(chan bool)
go r.run()
return nil
}
func (r *runtime) Stop() error {
r.Lock()
defer r.Unlock()
if !r.running {
return nil
}
select {
case <-r.closed:
return nil
default:
close(r.closed)
// set not running
r.running = false
// stop all the services
for _, service := range r.services {
log.Debugf("Runtime stopping %s", service.Name)
service.Stop()
}
}
return nil
}