diff --git a/runtime/default.go b/runtime/default.go index 832558f5..6fdb2953 100644 --- a/runtime/default.go +++ b/runtime/default.go @@ -98,31 +98,39 @@ func (r *runtime) run(events <-chan Event) { } buildTime := time.Unix(updateTimeStamp, 0) processEvent := func(event Event, service *Service) error { - buildTimeStamp, err := strconv.ParseInt(service.Version, 10, 64) + r.RLock() + name := service.Name + version := service.Version + r.RUnlock() + + buildTimeStamp, err := strconv.ParseInt(version, 10, 64) if err != nil { return err } muBuild := time.Unix(buildTimeStamp, 0) if buildTime.After(muBuild) { + log.Debugf("Runtime updating service %s", name) if err := r.Update(service); err != nil { return err } + r.Lock() service.Version = fmt.Sprintf("%d", buildTime.Unix()) + r.Unlock() } return nil } - r.Lock() + if len(event.Service) > 0 { + r.RLock() service, ok := r.services[event.Service] + r.RUnlock() if !ok { log.Debugf("Runtime unknown service: %s", event.Service) - r.Unlock() continue } if err := processEvent(event, service.Service); err != nil { log.Debugf("Runtime error updating service %s: %v", event.Service, err) } - r.Unlock() continue } // if blank service was received we update all services @@ -131,7 +139,6 @@ func (r *runtime) run(events <-chan Event) { log.Debugf("Runtime error updating service %s: %v", service.Name, err) } } - r.Unlock() } case <-r.closed: log.Debugf("Runtime stopped.") @@ -162,6 +169,7 @@ func (r *runtime) Create(s *Service, opts ...CreateOption) error { r.services[s.Name] = newService(s, options) // push into start queue + log.Debugf("Runtime creating service %s", s.Name) r.start <- r.services[s.Name] return nil @@ -172,9 +180,20 @@ func (r *runtime) Delete(s *Service) error { r.Lock() defer r.Unlock() + log.Debugf("Runtime deleting service %s", s.Name) if s, ok := r.services[s.Name]; ok { + // check if running + if !s.Running() { + delete(r.services, s.Name) + return nil + } + // otherwise stop it + if err := s.Stop(); err != nil { + return err + } + // delete it delete(r.services, s.Name) - return s.Stop() + return nil } return nil @@ -182,13 +201,22 @@ func (r *runtime) Delete(s *Service) error { // Update attemps to update the service func (r *runtime) Update(s *Service) error { + var opts []CreateOption + + // check if the service already exists + r.RLock() + if service, ok := r.services[s.Name]; ok { + opts = append(opts, WithOutput(service.output)) + } + r.RUnlock() + // delete the service if err := r.Delete(s); err != nil { return err } // create new service - return r.Create(s) + return r.Create(s, opts...) } // List returns a slice of all services tracked by the runtime diff --git a/runtime/process/os/os.go b/runtime/process/os/os.go index ec7d09f3..83eca1f3 100644 --- a/runtime/process/os/os.go +++ b/runtime/process/os/os.go @@ -6,6 +6,7 @@ import ( "os" "os/exec" "strconv" + "syscall" "github.com/micro/go-micro/runtime/process" ) @@ -24,6 +25,9 @@ func (p *Process) Fork(exe *process.Executable) (*process.PID, error) { // set env vars cmd.Env = append(cmd.Env, exe.Env...) + // create process group + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + in, err := cmd.StdinPipe() if err != nil { return nil, err @@ -61,7 +65,16 @@ func (p *Process) Kill(pid *process.PID) error { return err } - return pr.Kill() + // now kill it + err = pr.Kill() + + // kill the group + if pgid, err := syscall.Getpgid(id); err == nil { + syscall.Kill(-pgid, syscall.SIGKILL) + } + + // return the kill error + return err } func (p *Process) Wait(pid *process.PID) error { diff --git a/runtime/service.go b/runtime/service.go index cb0d3bb9..d83d7a5f 100644 --- a/runtime/service.go +++ b/runtime/service.go @@ -129,7 +129,12 @@ func (s *service) Stop() error { if s.PID == nil { return nil } - return s.Process.Kill(s.PID) + // kill the process + err := s.Process.Kill(s.PID) + // wait for it to exit + s.Process.Wait(s.PID) + // return the kill error + return err } }