remove global error tracking (#1777)
Co-authored-by: Asim Aslam <asim@aslam.me>
This commit is contained in:
parent
eaa46c2de7
commit
51caf2a24e
@ -131,8 +131,6 @@ func (s *rpcServer) HandleEvent(e broker.Event) error {
|
|||||||
|
|
||||||
// ServeConn serves a single connection
|
// ServeConn serves a single connection
|
||||||
func (s *rpcServer) ServeConn(sock transport.Socket) {
|
func (s *rpcServer) ServeConn(sock transport.Socket) {
|
||||||
// global error tracking
|
|
||||||
var gerr error
|
|
||||||
// streams are multiplexed on Micro-Stream or Micro-Id header
|
// streams are multiplexed on Micro-Stream or Micro-Id header
|
||||||
pool := socket.NewPool()
|
pool := socket.NewPool()
|
||||||
|
|
||||||
@ -147,11 +145,8 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
// only wait if there's no error
|
// wait till done
|
||||||
if gerr == nil {
|
wg.Wait()
|
||||||
// wait till done
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
// close all the sockets for this connection
|
// close all the sockets for this connection
|
||||||
pool.Close()
|
pool.Close()
|
||||||
@ -172,10 +167,6 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
|
|||||||
var msg transport.Message
|
var msg transport.Message
|
||||||
// process inbound messages one at a time
|
// process inbound messages one at a time
|
||||||
if err := sock.Recv(&msg); err != nil {
|
if err := sock.Recv(&msg); err != nil {
|
||||||
// set a global error and return
|
|
||||||
// we're saying we essentially can't
|
|
||||||
// use the socket anymore
|
|
||||||
gerr = err
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -193,7 +184,6 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
|
|||||||
if err := sock.Send(&transport.Message{
|
if err := sock.Send(&transport.Message{
|
||||||
Header: msg.Header,
|
Header: msg.Header,
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
gerr = err
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
// we're done
|
// we're done
|
||||||
@ -303,14 +293,12 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
|
|||||||
// try get a new codec
|
// try get a new codec
|
||||||
if cf, err = s.newCodec(ct); err != nil {
|
if cf, err = s.newCodec(ct); err != nil {
|
||||||
// no codec found so send back an error
|
// no codec found so send back an error
|
||||||
if err := sock.Send(&transport.Message{
|
sock.Send(&transport.Message{
|
||||||
Header: map[string]string{
|
Header: map[string]string{
|
||||||
"Content-Type": "text/plain",
|
"Content-Type": "text/plain",
|
||||||
},
|
},
|
||||||
Body: []byte(err.Error()),
|
Body: []byte(err.Error()),
|
||||||
}); err != nil {
|
})
|
||||||
gerr = err
|
|
||||||
}
|
|
||||||
|
|
||||||
// release the socket we just created
|
// release the socket we just created
|
||||||
pool.Release(psock)
|
pool.Release(psock)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user