diff --git a/debug/log/kubernetes/stream.go b/debug/log/kubernetes/stream.go index e8fd3e98..50d18100 100644 --- a/debug/log/kubernetes/stream.go +++ b/debug/log/kubernetes/stream.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "os" + "sync" "github.com/micro/go-micro/v2/debug/log" ) @@ -20,6 +21,7 @@ func write(l log.Record) error { type kubeStream struct { // the k8s log stream stream chan log.Record + sync.Mutex // the stop chan stop chan bool } @@ -29,6 +31,8 @@ func (k *kubeStream) Chan() <-chan log.Record { } func (k *kubeStream) Stop() error { + k.Lock() + defer k.Unlock() select { case <-k.stop: return nil diff --git a/monitor/default.go b/monitor/default.go deleted file mode 100644 index 380c036b..00000000 --- a/monitor/default.go +++ /dev/null @@ -1,329 +0,0 @@ -package monitor - -import ( - "context" - "errors" - "sync" - "time" - - "github.com/micro/go-micro/v2/client" - pb "github.com/micro/go-micro/v2/debug/service/proto" - "github.com/micro/go-micro/v2/registry" - "github.com/micro/go-micro/v2/registry/cache" -) - -type monitor struct { - options Options - - exit chan bool - registry cache.Cache - client client.Client - - sync.RWMutex - running bool - services map[string]*Status -} - -func (m *monitor) Check(service string) error { - status, err := m.check(service) - if err != nil { - return err - } - m.Lock() - m.services[service] = status - m.Unlock() - - if status.Code != StatusRunning { - return errors.New(status.Info) - } - - return nil -} - -// check provides binary running/failed status. -// In the event Debug.Health cannot be called on a service we reap the node. -func (m *monitor) check(service string) (*Status, error) { - services, err := m.registry.GetService(service) - if err != nil { - return nil, err - } - - // create debug client - debug := pb.NewDebugService(service, m.client) - - var status *Status - var gerr error - - // iterate through multiple versions of a service - for _, service := range services { - for _, node := range service.Nodes { - // TODO: checks that are not just RPC based - // TODO: better matching of the protocol - // TODO: maybe everything has to be a go-micro service? - if node.Metadata["server"] != m.client.String() { - continue - } - // check the transport matches - if node.Metadata["transport"] != m.client.Options().Transport.String() { - continue - } - - rsp, err := debug.Health( - context.Background(), - // empty health request - &pb.HealthRequest{}, - // call this specific node - client.WithAddress(node.Address), - // retry in the event of failure - client.WithRetries(3), - ) - if err != nil { - // save the error - gerr = err - continue - } - - // expecting ok response status - if rsp.Status != "ok" { - gerr = errors.New(rsp.Status) - continue - } - - // no error set status - status = &Status{ - Code: StatusRunning, - Info: "running", - } - } - } - - // if we got the success case return it - if status != nil { - return status, nil - } - - // if gerr is not nil return it - if gerr != nil { - return &Status{ - Code: StatusFailed, - Info: "not running", - Error: gerr.Error(), - }, nil - } - - // otherwise unknown status - return &Status{ - Code: StatusUnknown, - Info: "unknown status", - }, nil -} - -func (m *monitor) reap() { - services, err := m.registry.ListServices() - if err != nil { - return - } - - serviceMap := make(map[string]bool) - for _, service := range services { - serviceMap[service.Name] = true - } - - m.Lock() - defer m.Unlock() - - // range over our watched services - for service := range m.services { - // check if the service exists in the registry - if !serviceMap[service] { - // if not, delete it in our status map - delete(m.services, service) - } - } -} - -func (m *monitor) run() { - // check the status every tick - t := time.NewTicker(time.Minute) - defer t.Stop() - - // reap dead services - t2 := time.NewTicker(time.Hour) - defer t2.Stop() - - // list the known services - services, _ := m.registry.ListServices() - - // create a check chan of same length - check := make(chan string, len(services)) - - // front-load the services to watch - for _, service := range services { - check <- service.Name - } - - for { - select { - // exit if we're told to - case <-m.exit: - return - // check a service when told to - case service := <-check: - // check the status - status, err := m.check(service) - if err != nil { - status = &Status{ - Code: StatusUnknown, - Info: "unknown status", - } - } - - // save the status - m.Lock() - m.services[service] = status - m.Unlock() - // on the tick interval get all services and issue a check - case <-t.C: - // create a list of services - serviceMap := make(map[string]bool) - - m.RLock() - for service := range m.services { - serviceMap[service] = true - } - m.RUnlock() - - go func() { - // check the status of all watched services - for service := range serviceMap { - select { - case <-m.exit: - return - case check <- service: - default: - // barf if we block - } - } - - // list services - services, _ := m.registry.ListServices() - - for _, service := range services { - // start watching the service - if ok := serviceMap[service.Name]; !ok { - m.Watch(service.Name) - } - } - }() - case <-t2.C: - // reap any dead/non-existent services - m.reap() - } - } -} - -func (m *monitor) Reap(service string) error { - services, err := m.registry.GetService(service) - if err != nil { - return nil - } - m.Lock() - defer m.Unlock() - delete(m.services, service) - for _, service := range services { - m.registry.Deregister(service) - } - return nil -} - -func (m *monitor) Status(service string) (Status, error) { - m.RLock() - defer m.RUnlock() - if status, ok := m.services[service]; ok { - return *status, nil - } - return Status{}, ErrNotWatching -} - -func (m *monitor) Watch(service string) error { - m.Lock() - defer m.Unlock() - - // check if we're watching - if _, ok := m.services[service]; ok { - return nil - } - - // get the status - status, err := m.check(service) - if err != nil { - return err - } - - // set the status - m.services[service] = status - return nil -} - -func (m *monitor) Run() error { - m.Lock() - defer m.Unlock() - - if m.running { - return nil - } - - // reset the exit channel - m.exit = make(chan bool) - // setup a new cache - m.registry = cache.New(m.options.Registry) - - // start running - go m.run() - - // set to running - m.running = true - - return nil -} - -func (m *monitor) Stop() error { - m.Lock() - defer m.Unlock() - - if !m.running { - return nil - } - - select { - case <-m.exit: - return nil - default: - close(m.exit) - for s := range m.services { - delete(m.services, s) - } - m.registry.Stop() - m.running = false - return nil - } -} - -func newMonitor(opts ...Option) Monitor { - options := Options{ - Client: client.DefaultClient, - Registry: registry.DefaultRegistry, - } - - for _, o := range opts { - o(&options) - } - - return &monitor{ - options: options, - exit: make(chan bool), - client: options.Client, - registry: cache.New(options.Registry), - services: make(map[string]*Status), - } -} diff --git a/monitor/default_test.go b/monitor/default_test.go deleted file mode 100644 index 335df305..00000000 --- a/monitor/default_test.go +++ /dev/null @@ -1,37 +0,0 @@ -package monitor - -import ( - "testing" -) - -func TestMonitor(t *testing.T) { - // create new monitor - m := NewMonitor() - - if err := m.Run(); err != nil { - t.Fatalf("failed to stop monitor: %v", err) - } - - services := []string{"foo", "bar", "baz"} - - for _, service := range services { - _, err := m.Status(service) - if err == nil { - t.Fatal("expected status error for unknown service") - } - - if err := m.Watch(service); err == nil { - t.Fatal("expected watch error for unknown service") - } - - // TODO: - // 1. start a service - // 2. watch service - // 3. get service status - } - - // stop monitor - if err := m.Stop(); err != nil { - t.Fatalf("failed to stop monitor: %v", err) - } -} diff --git a/monitor/monitor.go b/monitor/monitor.go deleted file mode 100644 index c5f5033d..00000000 --- a/monitor/monitor.go +++ /dev/null @@ -1,45 +0,0 @@ -// Package monitor monitors service health -package monitor - -import ( - "errors" -) - -const ( - StatusUnknown StatusCode = iota - StatusRunning - StatusFailed -) - -type StatusCode int - -// Monitor monitors a service and reaps dead instances -type Monitor interface { - // Reap a service and stop monitoring - Reap(service string) error - // Check the status of the service now - Check(service string) error - // Status of the service - Status(service string) (Status, error) - // Watch starts watching the service - Watch(service string) error - // Run the monitor to watch all services - Run() error - // Stop monitoring - Stop() error -} - -type Status struct { - Code StatusCode - Info string - Error string -} - -var ( - ErrNotWatching = errors.New("not watching") -) - -// NewMonitor returns a new monitor -func NewMonitor(opts ...Option) Monitor { - return newMonitor(opts...) -} diff --git a/monitor/options.go b/monitor/options.go deleted file mode 100644 index 6bb74a02..00000000 --- a/monitor/options.go +++ /dev/null @@ -1,25 +0,0 @@ -package monitor - -import ( - "github.com/micro/go-micro/v2/client" - "github.com/micro/go-micro/v2/registry" -) - -type Options struct { - Client client.Client - Registry registry.Registry -} - -type Option func(*Options) - -func Client(c client.Client) Option { - return func(o *Options) { - o.Client = c - } -} - -func Registry(r registry.Registry) Option { - return func(o *Options) { - o.Registry = r - } -} diff --git a/runtime/default.go b/runtime/default.go index c0452bc8..614319d3 100644 --- a/runtime/default.go +++ b/runtime/default.go @@ -259,8 +259,9 @@ type logStream struct { tail *tail.Tail service string stream chan LogRecord - stop chan bool - err error + sync.Mutex + stop chan bool + err error } func (l *logStream) Chan() chan LogRecord { @@ -272,6 +273,8 @@ func (l *logStream) Error() error { } func (l *logStream) Stop() error { + l.Lock() + defer l.Unlock() // @todo seems like this is causing a hangup //err := l.tail.Stop() //if err != nil { diff --git a/runtime/kubernetes/kubernetes.go b/runtime/kubernetes/kubernetes.go index e381721e..12733bdb 100644 --- a/runtime/kubernetes/kubernetes.go +++ b/runtime/kubernetes/kubernetes.go @@ -330,6 +330,7 @@ type kubeStream struct { // the k8s log stream stream chan runtime.LogRecord // the stop chan + sync.Mutex stop chan bool err error } @@ -343,6 +344,8 @@ func (k *kubeStream) Chan() chan runtime.LogRecord { } func (k *kubeStream) Stop() error { + k.Lock() + defer k.Unlock() select { case <-k.stop: return nil diff --git a/runtime/kubernetes/kubernetes_logs.go b/runtime/kubernetes/kubernetes_logs.go index 45e928a1..3600170b 100644 --- a/runtime/kubernetes/kubernetes_logs.go +++ b/runtime/kubernetes/kubernetes_logs.go @@ -32,6 +32,7 @@ func (k *klog) podLogStream(podName string, stream *kubeStream) error { if err != nil { stream.err = err + stream.Stop() return err } diff --git a/runtime/service/service.go b/runtime/service/service.go index 4f7dd6e6..8cadfcdf 100644 --- a/runtime/service/service.go +++ b/runtime/service/service.go @@ -95,8 +95,9 @@ func (s *svc) Logs(service *runtime.Service, options ...runtime.LogsOption) (run type serviceLogStream struct { service string stream chan runtime.LogRecord - stop chan bool - err error + sync.Mutex + stop chan bool + err error } func (l *serviceLogStream) Error() error { @@ -108,6 +109,8 @@ func (l *serviceLogStream) Chan() chan runtime.LogRecord { } func (l *serviceLogStream) Stop() error { + l.Lock() + defer l.Unlock() select { case <-l.stop: return nil