Add errors check

This commit is contained in:
Mikhail Grachev 2018-11-13 11:56:21 +03:00
parent 4c821baab4
commit 4006d9f102
3 changed files with 13 additions and 7 deletions

View File

@ -176,7 +176,7 @@ func (h *httpBroker) unsubscribe(s *httpSubscriber) error {
for _, sub := range h.subscribers[s.topic] { for _, sub := range h.subscribers[s.topic] {
// deregister and skip forward // deregister and skip forward
if sub.id == s.id { if sub.id == s.id {
h.r.Deregister(sub.svc) _ = h.r.Deregister(sub.svc)
continue continue
} }
// keep subscriber // keep subscriber
@ -200,7 +200,7 @@ func (h *httpBroker) run(l net.Listener) {
h.RLock() h.RLock()
for _, subs := range h.subscribers { for _, subs := range h.subscribers {
for _, sub := range subs { for _, sub := range subs {
h.r.Register(sub.svc, registry.RegisterTTL(registerTTL)) _ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))
} }
} }
h.RUnlock() h.RUnlock()
@ -210,7 +210,7 @@ func (h *httpBroker) run(l net.Listener) {
h.RLock() h.RLock()
for _, subs := range h.subscribers { for _, subs := range h.subscribers {
for _, sub := range subs { for _, sub := range subs {
h.r.Deregister(sub.svc) _ = h.r.Deregister(sub.svc)
} }
} }
h.RUnlock() h.RUnlock()

View File

@ -54,7 +54,8 @@ func (j *jsonCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
case codec.Response: case codec.Response:
return j.c.ReadHeader(m) return j.c.ReadHeader(m)
case codec.Publication: case codec.Publication:
io.Copy(j.buf, j.rwc) _, err := io.Copy(j.buf, j.rwc)
return err
default: default:
return fmt.Errorf("Unrecognised message type: %v", mt) return fmt.Errorf("Unrecognised message type: %v", mt)
} }

View File

@ -55,7 +55,9 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
return err return err
} }
if flusher, ok := c.rwc.(flusher); ok { if flusher, ok := c.rwc.(flusher); ok {
err = flusher.Flush() if err = flusher.Flush(); err != nil {
return err
}
} }
case codec.Response: case codec.Response:
c.Lock() c.Lock()
@ -82,7 +84,9 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
return err return err
} }
if flusher, ok := c.rwc.(flusher); ok { if flusher, ok := c.rwc.(flusher); ok {
err = flusher.Flush() if err = flusher.Flush(); err != nil {
return err
}
} }
case codec.Publication: case codec.Publication:
data, err := proto.Marshal(b.(proto.Message)) data, err := proto.Marshal(b.(proto.Message))
@ -127,7 +131,8 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
m.Id = rtmp.GetSeq() m.Id = rtmp.GetSeq()
m.Error = rtmp.GetError() m.Error = rtmp.GetError()
case codec.Publication: case codec.Publication:
io.Copy(c.buf, c.rwc) _, err := io.Copy(c.buf, c.rwc)
return err
default: default:
return fmt.Errorf("Unrecognised message type: %v", mt) return fmt.Errorf("Unrecognised message type: %v", mt)
} }