From 0fc4c180eeffe019e3c63f8ac099ecdb8e4b6e88 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Fri, 13 Sep 2019 21:33:14 -0700 Subject: [PATCH] update runtime to function --- runtime/default.go | 227 +++++++++++++++++++++++++++++++++++++ runtime/process/os/os.go | 14 ++- runtime/process/process.go | 4 + runtime/runtime.go | 39 +++++++ 4 files changed, 280 insertions(+), 4 deletions(-) create mode 100644 runtime/default.go create mode 100644 runtime/runtime.go diff --git a/runtime/default.go b/runtime/default.go new file mode 100644 index 00000000..b03521d0 --- /dev/null +++ b/runtime/default.go @@ -0,0 +1,227 @@ +package runtime + +import ( + "errors" + "os" + "strings" + "sync" + "time" + + "github.com/micro/go-micro/runtime/package" + "github.com/micro/go-micro/runtime/process" + proc "github.com/micro/go-micro/runtime/process/os" + "github.com/micro/go-micro/util/log" +) + +type runtime struct { + sync.RWMutex + closed chan bool + running bool + services map[string]*service +} + +type service struct { + sync.RWMutex + + running bool + closed chan bool + err error + + // service to manage + *Service + // process creator + Process *proc.Process + // Exec + Exec *process.Executable + // process pid + PID *process.PID +} + +func newRuntime() *runtime { + return &runtime{ + closed: make(chan bool), + services: make(map[string]*service), + } +} + +func newService(s *Service) *service { + parts := strings.Split(s.Exec, " ") + exec := parts[0] + args := []string{} + + if len(parts) > 1 { + args = parts[1:] + } + + return &service{ + Service: s, + Process: new(proc.Process), + Exec: &process.Executable{ + Binary: &packager.Binary{ + Name: s.Name, + Path: exec, + }, + Env: os.Environ(), + Args: args, + }, + } +} + +func (s *service) Running() bool { + s.RLock() + defer s.RUnlock() + return s.running +} + +func (s *service) Start() error { + s.Lock() + defer s.Unlock() + + if s.running { + return nil + } + + // reset + s.err = nil + s.closed = make(chan bool) + + // TODO: pull source & build binary + + p, err := s.Process.Fork(s.Exec) + if err != nil { + return err + } + + // set the pid + s.PID = p + // set to running + s.running = true + + // wait and watch + go s.Wait() + + return nil +} + +func (s *service) Stop() error { + s.Lock() + defer s.Unlock() + + select { + case <-s.closed: + return nil + default: + close(s.closed) + s.running = false + return s.Process.Kill(s.PID) + } + + return nil +} + +func (s *service) Error() error { + s.RLock() + defer s.RUnlock() + return s.err +} + +func (s *service) Wait() { + // wait for process to exit + err := s.Process.Wait(s.PID) + + s.Lock() + defer s.Unlock() + + // save the error + if err != nil { + s.err = err + } + + // no longer running + s.running = false +} + +func (r *runtime) Register(s *Service) error { + r.Lock() + defer r.Unlock() + + if _, ok := r.services[s.Name]; ok { + return errors.New("service already registered") + } + + // save service + r.services[s.Name] = newService(s) + + return nil +} + +func (r *runtime) Run() error { + r.Lock() + + // already running + if r.running { + r.Unlock() + return nil + } + + // set running + r.running = true + r.closed = make(chan bool) + closed := r.closed + + r.Unlock() + + t := time.NewTicker(time.Second * 5) + defer t.Stop() + + for { + select { + case <-t.C: + // check running services + r.RLock() + for _, service := range r.services { + if service.Running() { + continue + } + + // TODO: check service error + log.Debugf("Starting %s", service.Name) + if err := service.Start(); err != nil { + log.Debugf("Error starting %s: %v", service.Name, err) + } + } + r.RUnlock() + case <-closed: + // TODO: stop all the things + return nil + } + } + + return nil +} + +func (r *runtime) Stop() error { + r.Lock() + defer r.Unlock() + + if !r.running { + return nil + } + + select { + case <-r.closed: + return nil + default: + close(r.closed) + + // set not running + r.running = false + + // stop all the services + for _, service := range r.services { + service.Stop() + } + } + + return nil +} diff --git a/runtime/process/os/os.go b/runtime/process/os/os.go index 30d69680..ec7d09f3 100644 --- a/runtime/process/os/os.go +++ b/runtime/process/os/os.go @@ -19,10 +19,11 @@ func (p *Process) Exec(exe *process.Executable) error { } func (p *Process) Fork(exe *process.Executable) (*process.PID, error) { - cmd := exec.Command(exe.Binary.Path) - if err := cmd.Start(); err != nil { - return nil, err - } + // create command + cmd := exec.Command(exe.Binary.Path, exe.Args...) + // set env vars + cmd.Env = append(cmd.Env, exe.Env...) + in, err := cmd.StdinPipe() if err != nil { return nil, err @@ -36,6 +37,11 @@ func (p *Process) Fork(exe *process.Executable) (*process.PID, error) { return nil, err } + // start the process + if err := cmd.Start(); err != nil { + return nil, err + } + return &process.PID{ ID: fmt.Sprintf("%d", cmd.Process.Pid), Input: in, diff --git a/runtime/process/process.go b/runtime/process/process.go index 0e47e88d..b5b302d8 100644 --- a/runtime/process/process.go +++ b/runtime/process/process.go @@ -22,6 +22,10 @@ type Process interface { type Executable struct { // The executable binary Binary *packager.Binary + // The env variables + Env []string + // Args to pass + Args []string } // PID is the running process diff --git a/runtime/runtime.go b/runtime/runtime.go new file mode 100644 index 00000000..0fc9d549 --- /dev/null +++ b/runtime/runtime.go @@ -0,0 +1,39 @@ +// Package runtime is a service runtime manager +package runtime + +// Runtime is a service runtime manager +type Runtime interface { + // Registers a service + Register(*Service) error + // starts the runtime + Run() error + // Shutdown the runtime + Stop() error +} + +type Service struct { + // name of the service + Name string + // url location of source + Source string + // path to store source + Path string + // exec command + Exec string +} + +var ( + DefaultRuntime = newRuntime() +) + +func Register(s *Service) error { + return DefaultRuntime.Register(s) +} + +func Run() error { + return DefaultRuntime.Run() +} + +func Stop() error { + return DefaultRuntime.Stop() +}