add recovery in case of panics
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
		| @@ -12,12 +12,14 @@ import ( | |||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"io" | 	"io" | ||||||
| 	"reflect" | 	"reflect" | ||||||
|  | 	"runtime/debug" | ||||||
| 	"strings" | 	"strings" | ||||||
| 	"sync" | 	"sync" | ||||||
| 	"unicode" | 	"unicode" | ||||||
| 	"unicode/utf8" | 	"unicode/utf8" | ||||||
|  |  | ||||||
| 	"github.com/micro/go-micro/codec" | 	"github.com/micro/go-micro/codec" | ||||||
|  | 	merrors "github.com/micro/go-micro/errors" | ||||||
| 	"github.com/micro/go-micro/util/log" | 	"github.com/micro/go-micro/util/log" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -505,6 +507,17 @@ func (router *router) Subscribe(s Subscriber) error { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (router *router) ProcessMessage(ctx context.Context, msg Message) error { | func (router *router) ProcessMessage(ctx context.Context, msg Message) error { | ||||||
|  | 	var err error | ||||||
|  |  | ||||||
|  | 	defer func() { | ||||||
|  | 		// recover any panics | ||||||
|  | 		if r := recover(); r != nil { | ||||||
|  | 			log.Log("panic recovered: ", r) | ||||||
|  | 			log.Log(string(debug.Stack())) | ||||||
|  | 			err = merrors.InternalServerError("go.micro.server", "panic recovered: %v", r) | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  |  | ||||||
| 	router.su.RLock() | 	router.su.RLock() | ||||||
|  |  | ||||||
| 	// get the subscribers by topic | 	// get the subscribers by topic | ||||||
| @@ -517,7 +530,7 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) error { | |||||||
| 	// unlock since we only need to get the subs | 	// unlock since we only need to get the subs | ||||||
| 	router.su.RUnlock() | 	router.su.RUnlock() | ||||||
|  |  | ||||||
| 	var results []string | 	var errResults []string | ||||||
|  |  | ||||||
| 	// we may have multiple subscribers for the topic | 	// we may have multiple subscribers for the topic | ||||||
| 	for _, sub := range subs { | 	for _, sub := range subs { | ||||||
| @@ -557,12 +570,12 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) error { | |||||||
| 			cc := msg.Codec() | 			cc := msg.Codec() | ||||||
|  |  | ||||||
| 			// read the header. mostly a noop | 			// read the header. mostly a noop | ||||||
| 			if err := cc.ReadHeader(&codec.Message{}, codec.Event); err != nil { | 			if err = cc.ReadHeader(&codec.Message{}, codec.Event); err != nil { | ||||||
| 				return err | 				return err | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			// read the body into the handler request value | 			// read the body into the handler request value | ||||||
| 			if err := cc.ReadBody(req.Interface()); err != nil { | 			if err = cc.ReadBody(req.Interface()); err != nil { | ||||||
| 				return err | 				return err | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| @@ -581,10 +594,10 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) error { | |||||||
|  |  | ||||||
| 				// execute the actuall call of the handler | 				// execute the actuall call of the handler | ||||||
| 				returnValues := handler.method.Call(vals) | 				returnValues := handler.method.Call(vals) | ||||||
| 				if err := returnValues[0].Interface(); err != nil { | 				if rerr := returnValues[0].Interface(); rerr != nil { | ||||||
| 					return err.(error) | 					err = rerr.(error) | ||||||
| 				} | 				} | ||||||
| 				return nil | 				return err | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			// wrap with subscriber wrappers | 			// wrap with subscriber wrappers | ||||||
| @@ -603,16 +616,16 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) error { | |||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			// execute the message handler | 			// execute the message handler | ||||||
| 			if err := fn(ctx, rpcMsg); err != nil { | 			if err = fn(ctx, rpcMsg); err != nil { | ||||||
| 				results = append(results, err.Error()) | 				errResults = append(errResults, err.Error()) | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// if no errors just return | 	// if no errors just return | ||||||
| 	if len(results) == 0 { | 	if len(errResults) > 0 { | ||||||
| 		return nil | 		err = merrors.InternalServerError("go.micro.server", "subscriber error: %v", strings.Join(errResults, "\n")) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return errors.New("subscriber error: " + strings.Join(results, "\n")) | 	return err | ||||||
| } | } | ||||||
|   | |||||||
| @@ -3,6 +3,7 @@ package grpc | |||||||
| import ( | import ( | ||||||
| 	"runtime/debug" | 	"runtime/debug" | ||||||
|  |  | ||||||
|  | 	"github.com/micro/go-micro/errors" | ||||||
| 	"github.com/micro/go-micro/transport" | 	"github.com/micro/go-micro/transport" | ||||||
| 	pb "github.com/micro/go-micro/transport/grpc/proto" | 	pb "github.com/micro/go-micro/transport/grpc/proto" | ||||||
| 	"github.com/micro/go-micro/util/log" | 	"github.com/micro/go-micro/util/log" | ||||||
| @@ -16,6 +17,8 @@ type microTransport struct { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (m *microTransport) Stream(ts pb.Transport_StreamServer) error { | func (m *microTransport) Stream(ts pb.Transport_StreamServer) error { | ||||||
|  | 	var err error | ||||||
|  |  | ||||||
| 	sock := &grpcTransportSocket{ | 	sock := &grpcTransportSocket{ | ||||||
| 		stream: ts, | 		stream: ts, | ||||||
| 		local:  m.addr, | 		local:  m.addr, | ||||||
| @@ -30,10 +33,12 @@ func (m *microTransport) Stream(ts pb.Transport_StreamServer) error { | |||||||
| 		if r := recover(); r != nil { | 		if r := recover(); r != nil { | ||||||
| 			log.Log(r, string(debug.Stack())) | 			log.Log(r, string(debug.Stack())) | ||||||
| 			sock.Close() | 			sock.Close() | ||||||
|  | 			err = errors.InternalServerError("go.micro.transport", "panic recovered: %v", r) | ||||||
| 		} | 		} | ||||||
| 	}() | 	}() | ||||||
|  |  | ||||||
| 	// execute socket func | 	// execute socket func | ||||||
| 	m.fn(sock) | 	m.fn(sock) | ||||||
| 	return nil |  | ||||||
|  | 	return err | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user