subscriber recovery
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
		| @@ -4,12 +4,15 @@ import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"reflect" | ||||
| 	"runtime/debug" | ||||
| 	"strings" | ||||
|  | ||||
| 	"github.com/micro/go-micro/broker" | ||||
| 	"github.com/micro/go-micro/errors" | ||||
| 	"github.com/micro/go-micro/metadata" | ||||
| 	"github.com/micro/go-micro/registry" | ||||
| 	"github.com/micro/go-micro/server" | ||||
| 	"github.com/micro/go-micro/util/log" | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| @@ -165,6 +168,16 @@ func validateSubscriber(sub server.Subscriber) error { | ||||
|  | ||||
| func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler { | ||||
| 	return func(p broker.Event) error { | ||||
| 		var err error | ||||
|  | ||||
| 		defer func() { | ||||
| 			if r := recover(); r != nil { | ||||
| 				log.Log("panic recovered: ", r) | ||||
| 				log.Logf(string(debug.Stack())) | ||||
| 				err = errors.InternalServerError("go.micro.server", "panic recovered: %v", r) | ||||
| 			} | ||||
| 		}() | ||||
|  | ||||
| 		msg := p.Message() | ||||
| 		ct := msg.Header["Content-Type"] | ||||
| 		if len(ct) == 0 { | ||||
| @@ -201,7 +214,7 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke | ||||
| 				req = req.Elem() | ||||
| 			} | ||||
|  | ||||
| 			if err := cf.Unmarshal(msg.Body, req.Interface()); err != nil { | ||||
| 			if err = cf.Unmarshal(msg.Body, req.Interface()); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
|  | ||||
| @@ -217,8 +230,8 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke | ||||
| 				vals = append(vals, reflect.ValueOf(msg.Payload())) | ||||
|  | ||||
| 				returnValues := handler.method.Call(vals) | ||||
| 				if err := returnValues[0].Interface(); err != nil { | ||||
| 					return err.(error) | ||||
| 				if rerr := returnValues[0].Interface(); rerr != nil { | ||||
| 					return rerr.(error) | ||||
| 				} | ||||
| 				return nil | ||||
| 			} | ||||
| @@ -245,14 +258,15 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke | ||||
| 		} | ||||
| 		var errors []string | ||||
| 		for i := 0; i < len(sb.handlers); i++ { | ||||
| 			if err := <-results; err != nil { | ||||
| 				errors = append(errors, err.Error()) | ||||
| 			if rerr := <-results; err != nil { | ||||
| 				errors = append(errors, rerr.Error()) | ||||
| 			} | ||||
| 		} | ||||
| 		if len(errors) > 0 { | ||||
| 			return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) | ||||
| 			err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) | ||||
| 		} | ||||
| 		return nil | ||||
|  | ||||
| 		return err | ||||
| 	} | ||||
| } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user