Merge pull request #268 from jasimmk/fix-connect-lock
Fixing httpBroker dead lock; If publish is called from a subscription #267
This commit is contained in:
commit
cd9441fafb
@ -276,12 +276,16 @@ func (h *httpBroker) Address() string {
|
||||
}
|
||||
|
||||
func (h *httpBroker) Connect() error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
h.RLock()
|
||||
if h.running {
|
||||
h.RUnlock()
|
||||
return nil
|
||||
}
|
||||
h.RUnlock()
|
||||
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
var l net.Listener
|
||||
var err error
|
||||
@ -351,12 +355,16 @@ func (h *httpBroker) Connect() error {
|
||||
}
|
||||
|
||||
func (h *httpBroker) Disconnect() error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
h.RLock()
|
||||
if !h.running {
|
||||
h.RUnlock()
|
||||
return nil
|
||||
}
|
||||
h.RUnlock()
|
||||
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
// stop rcache
|
||||
rc, ok := h.r.(rcache.Cache)
|
||||
@ -375,12 +383,15 @@ func (h *httpBroker) Disconnect() error {
|
||||
}
|
||||
|
||||
func (h *httpBroker) Init(opts ...Option) error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
h.RLock()
|
||||
if h.running {
|
||||
h.RUnlock()
|
||||
return errors.New("cannot init while connected")
|
||||
}
|
||||
h.RUnlock()
|
||||
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
for _, o := range opts {
|
||||
o(&h.opts)
|
||||
|
Loading…
x
Reference in New Issue
Block a user