Fix a runtime deadlock as well as fixing some graceful exiting issues (#945)

This commit is contained in:
Asim Aslam 2019-11-14 14:26:21 +00:00 committed by GitHub
parent 16754a7477
commit 383658edf2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 55 additions and 9 deletions

View File

@ -98,31 +98,39 @@ func (r *runtime) run(events <-chan Event) {
} }
buildTime := time.Unix(updateTimeStamp, 0) buildTime := time.Unix(updateTimeStamp, 0)
processEvent := func(event Event, service *Service) error { processEvent := func(event Event, service *Service) error {
buildTimeStamp, err := strconv.ParseInt(service.Version, 10, 64) r.RLock()
name := service.Name
version := service.Version
r.RUnlock()
buildTimeStamp, err := strconv.ParseInt(version, 10, 64)
if err != nil { if err != nil {
return err return err
} }
muBuild := time.Unix(buildTimeStamp, 0) muBuild := time.Unix(buildTimeStamp, 0)
if buildTime.After(muBuild) { if buildTime.After(muBuild) {
log.Debugf("Runtime updating service %s", name)
if err := r.Update(service); err != nil { if err := r.Update(service); err != nil {
return err return err
} }
r.Lock()
service.Version = fmt.Sprintf("%d", buildTime.Unix()) service.Version = fmt.Sprintf("%d", buildTime.Unix())
r.Unlock()
} }
return nil return nil
} }
r.Lock()
if len(event.Service) > 0 { if len(event.Service) > 0 {
r.RLock()
service, ok := r.services[event.Service] service, ok := r.services[event.Service]
r.RUnlock()
if !ok { if !ok {
log.Debugf("Runtime unknown service: %s", event.Service) log.Debugf("Runtime unknown service: %s", event.Service)
r.Unlock()
continue continue
} }
if err := processEvent(event, service.Service); err != nil { if err := processEvent(event, service.Service); err != nil {
log.Debugf("Runtime error updating service %s: %v", event.Service, err) log.Debugf("Runtime error updating service %s: %v", event.Service, err)
} }
r.Unlock()
continue continue
} }
// if blank service was received we update all services // if blank service was received we update all services
@ -131,7 +139,6 @@ func (r *runtime) run(events <-chan Event) {
log.Debugf("Runtime error updating service %s: %v", service.Name, err) log.Debugf("Runtime error updating service %s: %v", service.Name, err)
} }
} }
r.Unlock()
} }
case <-r.closed: case <-r.closed:
log.Debugf("Runtime stopped.") log.Debugf("Runtime stopped.")
@ -162,6 +169,7 @@ func (r *runtime) Create(s *Service, opts ...CreateOption) error {
r.services[s.Name] = newService(s, options) r.services[s.Name] = newService(s, options)
// push into start queue // push into start queue
log.Debugf("Runtime creating service %s", s.Name)
r.start <- r.services[s.Name] r.start <- r.services[s.Name]
return nil return nil
@ -172,9 +180,20 @@ func (r *runtime) Delete(s *Service) error {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
log.Debugf("Runtime deleting service %s", s.Name)
if s, ok := r.services[s.Name]; ok { if s, ok := r.services[s.Name]; ok {
// check if running
if !s.Running() {
delete(r.services, s.Name) delete(r.services, s.Name)
return s.Stop() return nil
}
// otherwise stop it
if err := s.Stop(); err != nil {
return err
}
// delete it
delete(r.services, s.Name)
return nil
} }
return nil return nil
@ -182,13 +201,22 @@ func (r *runtime) Delete(s *Service) error {
// Update attemps to update the service // Update attemps to update the service
func (r *runtime) Update(s *Service) error { func (r *runtime) Update(s *Service) error {
var opts []CreateOption
// check if the service already exists
r.RLock()
if service, ok := r.services[s.Name]; ok {
opts = append(opts, WithOutput(service.output))
}
r.RUnlock()
// delete the service // delete the service
if err := r.Delete(s); err != nil { if err := r.Delete(s); err != nil {
return err return err
} }
// create new service // create new service
return r.Create(s) return r.Create(s, opts...)
} }
// List returns a slice of all services tracked by the runtime // List returns a slice of all services tracked by the runtime

View File

@ -6,6 +6,7 @@ import (
"os" "os"
"os/exec" "os/exec"
"strconv" "strconv"
"syscall"
"github.com/micro/go-micro/runtime/process" "github.com/micro/go-micro/runtime/process"
) )
@ -24,6 +25,9 @@ func (p *Process) Fork(exe *process.Executable) (*process.PID, error) {
// set env vars // set env vars
cmd.Env = append(cmd.Env, exe.Env...) cmd.Env = append(cmd.Env, exe.Env...)
// create process group
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
in, err := cmd.StdinPipe() in, err := cmd.StdinPipe()
if err != nil { if err != nil {
return nil, err return nil, err
@ -61,7 +65,16 @@ func (p *Process) Kill(pid *process.PID) error {
return err return err
} }
return pr.Kill() // now kill it
err = pr.Kill()
// kill the group
if pgid, err := syscall.Getpgid(id); err == nil {
syscall.Kill(-pgid, syscall.SIGKILL)
}
// return the kill error
return err
} }
func (p *Process) Wait(pid *process.PID) error { func (p *Process) Wait(pid *process.PID) error {

View File

@ -129,7 +129,12 @@ func (s *service) Stop() error {
if s.PID == nil { if s.PID == nil {
return nil return nil
} }
return s.Process.Kill(s.PID) // kill the process
err := s.Process.Kill(s.PID)
// wait for it to exit
s.Process.Wait(s.PID)
// return the kill error
return err
} }
} }