From 86a63282547936d96a0dd7ec0209ba56fa7bb084 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Mon, 25 Nov 2019 22:50:59 +0300 Subject: [PATCH] subscriber recovery Signed-off-by: Vasiliy Tolstov --- server/grpc/subscriber.go | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/server/grpc/subscriber.go b/server/grpc/subscriber.go index 0e4f4f4f..6d987974 100644 --- a/server/grpc/subscriber.go +++ b/server/grpc/subscriber.go @@ -4,12 +4,15 @@ import ( "context" "fmt" "reflect" + "runtime/debug" "strings" "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/errors" "github.com/micro/go-micro/metadata" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/server" + "github.com/micro/go-micro/util/log" ) const ( @@ -165,6 +168,16 @@ func validateSubscriber(sub server.Subscriber) error { func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler { return func(p broker.Event) error { + var err error + + defer func() { + if r := recover(); r != nil { + log.Log("panic recovered: ", r) + log.Logf(string(debug.Stack())) + err = errors.InternalServerError("go.micro.server", "panic recovered: %v", r) + } + }() + msg := p.Message() ct := msg.Header["Content-Type"] if len(ct) == 0 { @@ -201,7 +214,7 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke req = req.Elem() } - if err := cf.Unmarshal(msg.Body, req.Interface()); err != nil { + if err = cf.Unmarshal(msg.Body, req.Interface()); err != nil { return err } @@ -217,8 +230,8 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke vals = append(vals, reflect.ValueOf(msg.Payload())) returnValues := handler.method.Call(vals) - if err := returnValues[0].Interface(); err != nil { - return err.(error) + if rerr := returnValues[0].Interface(); rerr != nil { + return rerr.(error) } return nil } @@ -245,14 +258,15 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke } var errors []string for i := 0; i < len(sb.handlers); i++ { - if err := <-results; err != nil { - errors = append(errors, err.Error()) + if rerr := <-results; err != nil { + errors = append(errors, rerr.Error()) } } if len(errors) > 0 { - return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) + err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) } - return nil + + return err } }