From 4006d9f1023ed9fc17f6657013abd23f57543edd Mon Sep 17 00:00:00 2001 From: Mikhail Grachev Date: Tue, 13 Nov 2018 11:56:21 +0300 Subject: [PATCH] Add errors check --- broker/http_broker.go | 6 +++--- codec/jsonrpc/jsonrpc.go | 3 ++- codec/protorpc/protorpc.go | 11 ++++++++--- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/broker/http_broker.go b/broker/http_broker.go index 48a110c6..a3e814fe 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -176,7 +176,7 @@ func (h *httpBroker) unsubscribe(s *httpSubscriber) error { for _, sub := range h.subscribers[s.topic] { // deregister and skip forward if sub.id == s.id { - h.r.Deregister(sub.svc) + _ = h.r.Deregister(sub.svc) continue } // keep subscriber @@ -200,7 +200,7 @@ func (h *httpBroker) run(l net.Listener) { h.RLock() for _, subs := range h.subscribers { for _, sub := range subs { - h.r.Register(sub.svc, registry.RegisterTTL(registerTTL)) + _ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL)) } } h.RUnlock() @@ -210,7 +210,7 @@ func (h *httpBroker) run(l net.Listener) { h.RLock() for _, subs := range h.subscribers { for _, sub := range subs { - h.r.Deregister(sub.svc) + _ = h.r.Deregister(sub.svc) } } h.RUnlock() diff --git a/codec/jsonrpc/jsonrpc.go b/codec/jsonrpc/jsonrpc.go index e73c35c6..0a22aeb6 100644 --- a/codec/jsonrpc/jsonrpc.go +++ b/codec/jsonrpc/jsonrpc.go @@ -54,7 +54,8 @@ func (j *jsonCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { case codec.Response: return j.c.ReadHeader(m) case codec.Publication: - io.Copy(j.buf, j.rwc) + _, err := io.Copy(j.buf, j.rwc) + return err default: return fmt.Errorf("Unrecognised message type: %v", mt) } diff --git a/codec/protorpc/protorpc.go b/codec/protorpc/protorpc.go index 22eecd7f..c2b63433 100644 --- a/codec/protorpc/protorpc.go +++ b/codec/protorpc/protorpc.go @@ -55,7 +55,9 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error { return err } if flusher, ok := c.rwc.(flusher); ok { - err = flusher.Flush() + if err = flusher.Flush(); err != nil { + return err + } } case codec.Response: c.Lock() @@ -82,7 +84,9 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error { return err } if flusher, ok := c.rwc.(flusher); ok { - err = flusher.Flush() + if err = flusher.Flush(); err != nil { + return err + } } case codec.Publication: data, err := proto.Marshal(b.(proto.Message)) @@ -127,7 +131,8 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { m.Id = rtmp.GetSeq() m.Error = rtmp.GetError() case codec.Publication: - io.Copy(c.buf, c.rwc) + _, err := io.Copy(c.buf, c.rwc) + return err default: return fmt.Errorf("Unrecognised message type: %v", mt) }