Ensure the broker also expires registry entries

This commit is contained in:
Asim 2016-01-27 20:17:31 +00:00
parent 013d1de2c4
commit dd9067ff4e

View File

@ -62,6 +62,8 @@ type httpPublication struct {
var (
DefaultSubPath = "/_sub"
broadcastVersion = "ff.http.broadcast"
registerTTL = time.Minute
registerInterval = time.Second * 30
)
func init() {
@ -148,6 +150,44 @@ func (h *httpSubscriber) Unsubscribe() error {
return nil
}
func (h *httpBroker) run(l net.Listener) {
t := time.NewTicker(registerInterval)
defer t.Stop()
for {
select {
// heartbeat for each subscriber
case <-t.C:
h.RLock()
for _, subs := range h.subscribers {
for _, sub := range subs {
h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))
}
}
h.RUnlock()
// received exit signal
case ch := <-h.exit:
ch <- l.Close()
h.Lock()
h.running = false
h.Unlock()
return
// unsubscribe subscriber
case subscriber := <-h.unsubscribe:
h.Lock()
var subscribers []*httpSubscriber
for _, sub := range h.subscribers[subscriber.topic] {
if sub.id == subscriber.id {
h.r.Deregister(sub.svc)
}
subscribers = append(subscribers, sub)
}
h.subscribers[subscriber.topic] = subscribers
h.Unlock()
}
}
}
func (h *httpBroker) start() error {
h.Lock()
defer h.Unlock()
@ -181,39 +221,25 @@ func (h *httpBroker) start() error {
h.address = l.Addr().String()
go http.Serve(l, h)
go func() {
for {
select {
case ch := <-h.exit:
ch <- l.Close()
h.Lock()
h.running = false
h.Unlock()
return
case subscriber := <-h.unsubscribe:
h.Lock()
var subscribers []*httpSubscriber
for _, sub := range h.subscribers[subscriber.topic] {
if sub.id == subscriber.id {
h.r.Deregister(sub.svc)
}
subscribers = append(subscribers, sub)
}
h.subscribers[subscriber.topic] = subscribers
h.Unlock()
}
}
}()
go h.run(l)
h.running = true
return nil
}
func (h *httpBroker) stop() error {
h.Lock()
defer h.Unlock()
if !h.running {
return nil
}
ch := make(chan error)
h.exit <- ch
return <-ch
err := <-ch
h.running = false
return err
}
func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {