Fixing httpBroker dead lock; If publish is called from a subscription
This commit is contained in:
parent
5372707d0e
commit
356cf82af5
@ -276,12 +276,16 @@ func (h *httpBroker) Address() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpBroker) Connect() error {
|
func (h *httpBroker) Connect() error {
|
||||||
h.Lock()
|
|
||||||
defer h.Unlock()
|
|
||||||
|
|
||||||
|
h.RLock()
|
||||||
if h.running {
|
if h.running {
|
||||||
|
h.RUnlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
h.RUnlock()
|
||||||
|
|
||||||
|
h.Lock()
|
||||||
|
defer h.Unlock()
|
||||||
|
|
||||||
var l net.Listener
|
var l net.Listener
|
||||||
var err error
|
var err error
|
||||||
@ -351,12 +355,16 @@ func (h *httpBroker) Connect() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpBroker) Disconnect() error {
|
func (h *httpBroker) Disconnect() error {
|
||||||
h.Lock()
|
|
||||||
defer h.Unlock()
|
|
||||||
|
|
||||||
|
h.RLock()
|
||||||
if !h.running {
|
if !h.running {
|
||||||
|
h.RUnlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
h.RUnlock()
|
||||||
|
|
||||||
|
h.Lock()
|
||||||
|
defer h.Unlock()
|
||||||
|
|
||||||
// stop rcache
|
// stop rcache
|
||||||
rc, ok := h.r.(rcache.Cache)
|
rc, ok := h.r.(rcache.Cache)
|
||||||
@ -375,12 +383,15 @@ func (h *httpBroker) Disconnect() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpBroker) Init(opts ...Option) error {
|
func (h *httpBroker) Init(opts ...Option) error {
|
||||||
h.Lock()
|
h.RLock()
|
||||||
defer h.Unlock()
|
|
||||||
|
|
||||||
if h.running {
|
if h.running {
|
||||||
|
h.RUnlock()
|
||||||
return errors.New("cannot init while connected")
|
return errors.New("cannot init while connected")
|
||||||
}
|
}
|
||||||
|
h.RUnlock()
|
||||||
|
|
||||||
|
h.Lock()
|
||||||
|
defer h.Unlock()
|
||||||
|
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&h.opts)
|
o(&h.opts)
|
||||||
|
Loading…
Reference in New Issue
Block a user