diff --git a/monitor/default.go b/monitor/default.go
index f534ad53..ac89d2e3 100644
--- a/monitor/default.go
+++ b/monitor/default.go
@@ -20,6 +20,7 @@ type monitor struct {
 	client   client.Client
 
 	sync.RWMutex
+	running  bool
 	services map[string]*Status
 }
 
@@ -97,6 +98,124 @@ func (m *monitor) check(service string) (*Status, error) {
 	}, nil
 }
 
+func (m *monitor) reap() {
+	services, err := m.registry.ListServices()
+	if err != nil {
+		return
+	}
+
+	serviceMap := make(map[string]bool)
+	for _, service := range services {
+		serviceMap[service.Name] = true
+	}
+
+	m.Lock()
+	defer m.Unlock()
+
+	// range over our watched services
+	for service := range m.services {
+		// check if the service exists in the registry
+		if !serviceMap[service] {
+			// if not, delete it in our status map
+			delete(m.services, service)
+		}
+	}
+}
+
+func (m *monitor) run() {
+	// check the status every tick
+	t := time.NewTicker(time.Minute)
+	defer t.Stop()
+
+	// reap dead services
+	t2 := time.NewTicker(time.Hour)
+	defer t2.Stop()
+
+	// list the known services
+	services, _ := m.registry.ListServices()
+
+	// create a check chan of same length
+	check := make(chan string, len(services))
+
+	// front-load the services to watch
+	for _, service := range services {
+		check <- service.Name
+	}
+
+	for {
+		select {
+		// exit if we're told to
+		case <-m.exit:
+			return
+		// check a service when told to
+		case service := <-check:
+			// check the status
+			status, err := m.check(service)
+			if err != nil {
+				status = &Status{
+					Code: StatusUnknown,
+					Info: "unknown status",
+				}
+			}
+
+			// save the status
+			m.Lock()
+			m.services[service] = status
+			m.Unlock()
+		// on the tick interval get all services and issue a check
+		case <-t.C:
+			// create a list of services
+			serviceMap := make(map[string]bool)
+
+			m.RLock()
+			for service := range m.services {
+				serviceMap[service] = true
+			}
+			m.RUnlock()
+
+			go func() {
+				// check the status of all watched services
+				for service := range serviceMap {
+					select {
+					case <-m.exit:
+						return
+					case check <- service:
+					default:
+						// barf if we block
+					}
+				}
+
+				// list services
+				services, _ := m.registry.ListServices()
+
+				for _, service := range services {
+					// start watching the service
+					if ok := serviceMap[service.Name]; !ok {
+						m.Watch(service.Name)
+					}
+				}
+			}()
+		case <-t2.C:
+			// reap any dead/non-existent services
+			m.reap()
+		}
+	}
+}
+
+func (m *monitor) Reap(service string) error {
+	services, err := m.registry.GetService(service)
+	if err != nil {
+		return err
+	}
+	m.Lock()
+	defer m.Unlock()
+	delete(m.services, service)
+	for _, service := range services {
+		m.registry.Deregister(service)
+	}
+	return nil
+}
+
 func (m *monitor) Status(service string) (Status, error) {
 	m.RLock()
 	defer m.RUnlock()
@@ -126,10 +245,36 @@ func (m *monitor) Watch(service string) error {
 	return nil
 }
 
+func (m *monitor) Run() error {
+	m.Lock()
+	defer m.Unlock()
+
+	if m.running {
+		return nil
+	}
+
+	// reset the exit channel
+	m.exit = make(chan bool)
+	// setup a new cache
+	m.registry = cache.New(m.options.Registry)
+
+	// start running
+	go m.run()
+
+	// set to running
+	m.running = true
+
+	return nil
+}
+
 func (m *monitor) Stop() error {
 	m.Lock()
 	defer m.Unlock()
 
+	if !m.running {
+		return nil
+	}
+
 	select {
 	case <-m.exit:
 		return nil
@@ -139,58 +284,13 @@ func (m *monitor) Stop() error {
 			delete(m.services, s)
 		}
 		m.registry.Stop()
+		m.running = false
 		return nil
 	}
 
 	return nil
 }
 
-func (m *monitor) run() {
-	// check the status every tick
-	t := time.NewTicker(time.Minute)
-	defer t.Stop()
-
-	check := make(chan string)
-
-	for {
-		select {
-		case <-m.exit:
-			return
-		case service := <-check:
-			// check the status
-			status, err := m.check(service)
-			if err != nil {
-				status = &Status{
-					Code: StatusUnknown,
-					Info: "unknown status",
-				}
-			}
-
-			// save the status
-			m.Lock()
-			m.services[service] = status
-			m.Unlock()
-		case <-t.C:
-			// create a list of services
-			var services []string
-			m.RLock()
-			for service, _ := range m.services {
-				services = append(services, service)
-			}
-			m.RUnlock()
-
-			// check the status of all watched services
-			for _, service := range services {
-				select {
-				case <-m.exit:
-					return
-				case check <- service:
-				}
-			}
-		}
-	}
-}
-
 func newMonitor(opts ...Option) Monitor {
 	options := Options{
 		Client:   client.DefaultClient,
@@ -201,14 +301,11 @@ func newMonitor(opts ...Option) Monitor {
 		o(&options)
 	}
 
-	m := &monitor{
+	return &monitor{
 		options:  options,
 		exit:     make(chan bool),
 		client:   options.Client,
 		registry: cache.New(options.Registry),
 		services: make(map[string]*Status),
 	}
-
-	go m.run()
-	return m
 }
diff --git a/monitor/default_test.go b/monitor/default_test.go
index da8ff1f3..335df305 100644
--- a/monitor/default_test.go
+++ b/monitor/default_test.go
@@ -8,6 +8,10 @@ func TestMonitor(t *testing.T) {
 	// create new monitor
 	m := NewMonitor()
 
+	if err := m.Run(); err != nil {
+		t.Fatalf("failed to run monitor: %v", err)
+	}
+
 	services := []string{"foo", "bar", "baz"}
 
 	for _, service := range services {
@@ -27,5 +31,7 @@ func TestMonitor(t *testing.T) {
 	}
 
 	// stop monitor
-	m.Stop()
+	if err := m.Stop(); err != nil {
+		t.Fatalf("failed to stop monitor: %v", err)
+	}
 }
diff --git a/monitor/monitor.go b/monitor/monitor.go
index 41fd17cd..711d7d03 100644
--- a/monitor/monitor.go
+++ b/monitor/monitor.go
@@ -15,10 +15,14 @@ type StatusCode int
 
 // Monitor monitors a service and reaps dead instances
 type Monitor interface {
+	// Reap a service and stop monitoring
+	Reap(service string) error
 	// Status of the service
 	Status(service string) (Status, error)
 	// Watch starts watching the service
 	Watch(service string) error
+	// Run the monitor to watch all services
+	Run() error
 	// Stop monitoring
 	Stop() error
 }