commit
1c401a852e
@ -176,7 +176,7 @@ func (h *httpBroker) unsubscribe(s *httpSubscriber) error {
|
|||||||
for _, sub := range h.subscribers[s.topic] {
|
for _, sub := range h.subscribers[s.topic] {
|
||||||
// deregister and skip forward
|
// deregister and skip forward
|
||||||
if sub.id == s.id {
|
if sub.id == s.id {
|
||||||
h.r.Deregister(sub.svc)
|
_ = h.r.Deregister(sub.svc)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// keep subscriber
|
// keep subscriber
|
||||||
@ -200,7 +200,7 @@ func (h *httpBroker) run(l net.Listener) {
|
|||||||
h.RLock()
|
h.RLock()
|
||||||
for _, subs := range h.subscribers {
|
for _, subs := range h.subscribers {
|
||||||
for _, sub := range subs {
|
for _, sub := range subs {
|
||||||
h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))
|
_ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
h.RUnlock()
|
h.RUnlock()
|
||||||
@ -210,7 +210,7 @@ func (h *httpBroker) run(l net.Listener) {
|
|||||||
h.RLock()
|
h.RLock()
|
||||||
for _, subs := range h.subscribers {
|
for _, subs := range h.subscribers {
|
||||||
for _, sub := range subs {
|
for _, sub := range subs {
|
||||||
h.r.Deregister(sub.svc)
|
_ = h.r.Deregister(sub.svc)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
h.RUnlock()
|
h.RUnlock()
|
||||||
|
@ -54,7 +54,8 @@ func (j *jsonCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
|
|||||||
case codec.Response:
|
case codec.Response:
|
||||||
return j.c.ReadHeader(m)
|
return j.c.ReadHeader(m)
|
||||||
case codec.Publication:
|
case codec.Publication:
|
||||||
io.Copy(j.buf, j.rwc)
|
_, err := io.Copy(j.buf, j.rwc)
|
||||||
|
return err
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("Unrecognised message type: %v", mt)
|
return fmt.Errorf("Unrecognised message type: %v", mt)
|
||||||
}
|
}
|
||||||
|
@ -55,7 +55,9 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if flusher, ok := c.rwc.(flusher); ok {
|
if flusher, ok := c.rwc.(flusher); ok {
|
||||||
err = flusher.Flush()
|
if err = flusher.Flush(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
case codec.Response:
|
case codec.Response:
|
||||||
c.Lock()
|
c.Lock()
|
||||||
@ -82,7 +84,9 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if flusher, ok := c.rwc.(flusher); ok {
|
if flusher, ok := c.rwc.(flusher); ok {
|
||||||
err = flusher.Flush()
|
if err = flusher.Flush(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
case codec.Publication:
|
case codec.Publication:
|
||||||
data, err := proto.Marshal(b.(proto.Message))
|
data, err := proto.Marshal(b.(proto.Message))
|
||||||
@ -127,7 +131,8 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
|
|||||||
m.Id = rtmp.GetSeq()
|
m.Id = rtmp.GetSeq()
|
||||||
m.Error = rtmp.GetError()
|
m.Error = rtmp.GetError()
|
||||||
case codec.Publication:
|
case codec.Publication:
|
||||||
io.Copy(c.buf, c.rwc)
|
_, err := io.Copy(c.buf, c.rwc)
|
||||||
|
return err
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("Unrecognised message type: %v", mt)
|
return fmt.Errorf("Unrecognised message type: %v", mt)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user