Set TTL on first registration of http broker. Also dont resubscribe
This commit is contained in:
parent
f4bd7f707c
commit
8cd906e75e
@ -432,7 +432,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
|
|||||||
svc: service,
|
svc: service,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := h.r.Register(service); err != nil {
|
if err := h.r.Register(service, registry.RegisterTTL(registerTTL)); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,6 +26,8 @@ type rpcServer struct {
|
|||||||
opts Options
|
opts Options
|
||||||
handlers map[string]Handler
|
handlers map[string]Handler
|
||||||
subscribers map[*subscriber][]broker.Subscriber
|
subscribers map[*subscriber][]broker.Subscriber
|
||||||
|
// used for first registration
|
||||||
|
registered bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newRpcServer(opts ...Option) Server {
|
func newRpcServer(opts ...Option) Server {
|
||||||
@ -230,6 +232,12 @@ func (s *rpcServer) Register() error {
|
|||||||
s.Lock()
|
s.Lock()
|
||||||
defer s.Unlock()
|
defer s.Unlock()
|
||||||
|
|
||||||
|
if s.registered {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
s.registered = true
|
||||||
|
|
||||||
for sb, _ := range s.subscribers {
|
for sb, _ := range s.subscribers {
|
||||||
handler := s.createSubHandler(sb, s.opts)
|
handler := s.createSubHandler(sb, s.opts)
|
||||||
var opts []broker.SubscribeOption
|
var opts []broker.SubscribeOption
|
||||||
@ -291,6 +299,14 @@ func (s *rpcServer) Deregister() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
s.Lock()
|
s.Lock()
|
||||||
|
|
||||||
|
if !s.registered {
|
||||||
|
s.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
s.registered = false
|
||||||
|
|
||||||
for sb, subs := range s.subscribers {
|
for sb, subs := range s.subscribers {
|
||||||
for _, sub := range subs {
|
for _, sub := range subs {
|
||||||
log.Infof("Unsubscribing from topic: %s", sub.Topic())
|
log.Infof("Unsubscribing from topic: %s", sub.Topic())
|
||||||
@ -298,6 +314,7 @@ func (s *rpcServer) Deregister() error {
|
|||||||
}
|
}
|
||||||
s.subscribers[sb] = nil
|
s.subscribers[sb] = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
s.Unlock()
|
s.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user