Merge pull request #759 from micro/runtime

update runtime to function
This commit is contained in:
Asim Aslam 2019-09-13 22:01:52 -07:00 committed by GitHub
commit c8a675249d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 298 additions and 4 deletions

239
runtime/default.go Normal file
View File

@ -0,0 +1,239 @@
package runtime
import (
"errors"
"os"
"strings"
"sync"
"time"
"github.com/micro/go-micro/runtime/package"
"github.com/micro/go-micro/runtime/process"
proc "github.com/micro/go-micro/runtime/process/os"
"github.com/micro/go-micro/util/log"
)
type runtime struct {
sync.RWMutex
closed chan bool
running bool
services map[string]*service
}
type service struct {
sync.RWMutex
running bool
closed chan bool
err error
// service to manage
*Service
// process creator
Process *proc.Process
// Exec
Exec *process.Executable
// process pid
PID *process.PID
}
func newRuntime() *runtime {
return &runtime{
closed: make(chan bool),
services: make(map[string]*service),
}
}
func newService(s *Service) *service {
parts := strings.Split(s.Exec, " ")
exec := parts[0]
args := []string{}
if len(parts) > 1 {
args = parts[1:]
}
return &service{
Service: s,
Process: new(proc.Process),
Exec: &process.Executable{
Binary: &packager.Binary{
Name: s.Name,
Path: exec,
},
Env: os.Environ(),
Args: args,
},
}
}
func (s *service) Running() bool {
s.RLock()
defer s.RUnlock()
return s.running
}
func (s *service) Start() error {
s.Lock()
defer s.Unlock()
if s.running {
return nil
}
// reset
s.err = nil
s.closed = make(chan bool)
// TODO: pull source & build binary
p, err := s.Process.Fork(s.Exec)
if err != nil {
return err
}
// set the pid
s.PID = p
// set to running
s.running = true
// wait and watch
go s.Wait()
return nil
}
func (s *service) Stop() error {
s.Lock()
defer s.Unlock()
select {
case <-s.closed:
return nil
default:
close(s.closed)
s.running = false
return s.Process.Kill(s.PID)
}
return nil
}
func (s *service) Error() error {
s.RLock()
defer s.RUnlock()
return s.err
}
func (s *service) Wait() {
// wait for process to exit
err := s.Process.Wait(s.PID)
s.Lock()
defer s.Unlock()
// save the error
if err != nil {
s.err = err
}
// no longer running
s.running = false
}
func (r *runtime) Create(s *Service) error {
r.Lock()
defer r.Unlock()
if _, ok := r.services[s.Name]; ok {
return errors.New("service already registered")
}
// save service
r.services[s.Name] = newService(s)
return nil
}
func (r *runtime) Delete(s *Service) error {
r.Lock()
defer r.Unlock()
if s, ok := r.services[s.Name]; ok {
delete(r.services, s.Name)
return s.Stop()
}
return nil
}
func (r *runtime) Start() error {
r.Lock()
// already running
if r.running {
r.Unlock()
return nil
}
// set running
r.running = true
r.closed = make(chan bool)
closed := r.closed
r.Unlock()
t := time.NewTicker(time.Second * 5)
defer t.Stop()
for {
select {
case <-t.C:
// check running services
r.RLock()
for _, service := range r.services {
if service.Running() {
continue
}
// TODO: check service error
log.Debugf("Starting %s", service.Name)
if err := service.Start(); err != nil {
log.Debugf("Error starting %s: %v", service.Name, err)
}
}
r.RUnlock()
case <-closed:
// TODO: stop all the things
return nil
}
}
return nil
}
func (r *runtime) Stop() error {
r.Lock()
defer r.Unlock()
if !r.running {
return nil
}
select {
case <-r.closed:
return nil
default:
close(r.closed)
// set not running
r.running = false
// stop all the services
for _, service := range r.services {
service.Stop()
}
}
return nil
}

View File

@ -19,10 +19,11 @@ func (p *Process) Exec(exe *process.Executable) error {
} }
func (p *Process) Fork(exe *process.Executable) (*process.PID, error) { func (p *Process) Fork(exe *process.Executable) (*process.PID, error) {
cmd := exec.Command(exe.Binary.Path) // create command
if err := cmd.Start(); err != nil { cmd := exec.Command(exe.Binary.Path, exe.Args...)
return nil, err // set env vars
} cmd.Env = append(cmd.Env, exe.Env...)
in, err := cmd.StdinPipe() in, err := cmd.StdinPipe()
if err != nil { if err != nil {
return nil, err return nil, err
@ -36,6 +37,11 @@ func (p *Process) Fork(exe *process.Executable) (*process.PID, error) {
return nil, err return nil, err
} }
// start the process
if err := cmd.Start(); err != nil {
return nil, err
}
return &process.PID{ return &process.PID{
ID: fmt.Sprintf("%d", cmd.Process.Pid), ID: fmt.Sprintf("%d", cmd.Process.Pid),
Input: in, Input: in,

View File

@ -22,6 +22,10 @@ type Process interface {
type Executable struct { type Executable struct {
// The executable binary // The executable binary
Binary *packager.Binary Binary *packager.Binary
// The env variables
Env []string
// Args to pass
Args []string
} }
// PID is the running process // PID is the running process

45
runtime/runtime.go Normal file
View File

@ -0,0 +1,45 @@
// Package runtime is a service runtime manager
package runtime
// Runtime is a service runtime manager
type Runtime interface {
// Registers a service
Create(*Service) error
// Remove a service
Delete(*Service) error
// starts the runtime
Start() error
// Shutdown the runtime
Stop() error
}
type Service struct {
// name of the service
Name string
// url location of source
Source string
// path to store source
Path string
// exec command
Exec string
}
var (
DefaultRuntime = newRuntime()
)
func Create(s *Service) error {
return DefaultRuntime.Create(s)
}
func Delete(s *Service) error {
return DefaultRuntime.Delete(s)
}
func Start() error {
return DefaultRuntime.Start()
}
func Stop() error {
return DefaultRuntime.Stop()
}