From b8e96f45d48d4fcaf8fc00638834db69eb2a2802 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 3 Dec 2019 01:36:14 +0300 Subject: [PATCH] add recovery in case of panics Signed-off-by: Vasiliy Tolstov --- server/rpc_router.go | 35 ++++++++++++++++++++++++----------- transport/grpc/handler.go | 7 ++++++- 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/server/rpc_router.go b/server/rpc_router.go index fddce6fd..19f672f4 100644 --- a/server/rpc_router.go +++ b/server/rpc_router.go @@ -12,12 +12,14 @@ import ( "fmt" "io" "reflect" + "runtime/debug" "strings" "sync" "unicode" "unicode/utf8" "github.com/micro/go-micro/codec" + merrors "github.com/micro/go-micro/errors" "github.com/micro/go-micro/util/log" ) @@ -505,6 +507,17 @@ func (router *router) Subscribe(s Subscriber) error { } func (router *router) ProcessMessage(ctx context.Context, msg Message) error { + var err error + + defer func() { + // recover any panics + if r := recover(); r != nil { + log.Log("panic recovered: ", r) + log.Log(string(debug.Stack())) + err = merrors.InternalServerError("go.micro.server", "panic recovered: %v", r) + } + }() + router.su.RLock() // get the subscribers by topic @@ -517,7 +530,7 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) error { // unlock since we only need to get the subs router.su.RUnlock() - var results []string + var errResults []string // we may have multiple subscribers for the topic for _, sub := range subs { @@ -557,12 +570,12 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) error { cc := msg.Codec() // read the header. mostly a noop - if err := cc.ReadHeader(&codec.Message{}, codec.Event); err != nil { + if err = cc.ReadHeader(&codec.Message{}, codec.Event); err != nil { return err } // read the body into the handler request value - if err := cc.ReadBody(req.Interface()); err != nil { + if err = cc.ReadBody(req.Interface()); err != nil { return err } @@ -581,10 +594,10 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) error { // execute the actuall call of the handler returnValues := handler.method.Call(vals) - if err := returnValues[0].Interface(); err != nil { - return err.(error) + if rerr := returnValues[0].Interface(); rerr != nil { + err = rerr.(error) } - return nil + return err } // wrap with subscriber wrappers @@ -603,16 +616,16 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) error { } // execute the message handler - if err := fn(ctx, rpcMsg); err != nil { - results = append(results, err.Error()) + if err = fn(ctx, rpcMsg); err != nil { + errResults = append(errResults, err.Error()) } } } // if no errors just return - if len(results) == 0 { - return nil + if len(errResults) > 0 { + err = merrors.InternalServerError("go.micro.server", "subscriber error: %v", strings.Join(errResults, "\n")) } - return errors.New("subscriber error: " + strings.Join(results, "\n")) + return err } diff --git a/transport/grpc/handler.go b/transport/grpc/handler.go index 3220f7dd..e02e1445 100644 --- a/transport/grpc/handler.go +++ b/transport/grpc/handler.go @@ -3,6 +3,7 @@ package grpc import ( "runtime/debug" + "github.com/micro/go-micro/errors" "github.com/micro/go-micro/transport" pb "github.com/micro/go-micro/transport/grpc/proto" "github.com/micro/go-micro/util/log" @@ -16,6 +17,8 @@ type microTransport struct { } func (m *microTransport) Stream(ts pb.Transport_StreamServer) error { + var err error + sock := &grpcTransportSocket{ stream: ts, local: m.addr, @@ -30,10 +33,12 @@ func (m *microTransport) Stream(ts pb.Transport_StreamServer) error { if r := recover(); r != nil { log.Log(r, string(debug.Stack())) sock.Close() + err = errors.InternalServerError("go.micro.transport", "panic recovered: %v", r) } }() // execute socket func m.fn(sock) - return nil + + return err }