Merge pull request #1009 from unistack-org/panics

recover panics
This commit is contained in:
Asim Aslam 2019-12-02 23:30:17 +00:00 committed by GitHub
commit b5d65305db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 34 additions and 12 deletions

View File

@ -125,6 +125,10 @@ func newOptions(options ...Option) Options {
opts.Transport = transport.DefaultTransport opts.Transport = transport.DefaultTransport
} }
if opts.Context == nil {
opts.Context = context.Background()
}
return opts return opts
} }

View File

@ -12,12 +12,14 @@ import (
"fmt" "fmt"
"io" "io"
"reflect" "reflect"
"runtime/debug"
"strings" "strings"
"sync" "sync"
"unicode" "unicode"
"unicode/utf8" "unicode/utf8"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
merrors "github.com/micro/go-micro/errors"
"github.com/micro/go-micro/util/log" "github.com/micro/go-micro/util/log"
) )
@ -505,6 +507,17 @@ func (router *router) Subscribe(s Subscriber) error {
} }
func (router *router) ProcessMessage(ctx context.Context, msg Message) error { func (router *router) ProcessMessage(ctx context.Context, msg Message) error {
var err error
defer func() {
// recover any panics
if r := recover(); r != nil {
log.Log("panic recovered: ", r)
log.Log(string(debug.Stack()))
err = merrors.InternalServerError("go.micro.server", "panic recovered: %v", r)
}
}()
router.su.RLock() router.su.RLock()
// get the subscribers by topic // get the subscribers by topic
@ -517,7 +530,7 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) error {
// unlock since we only need to get the subs // unlock since we only need to get the subs
router.su.RUnlock() router.su.RUnlock()
var results []string var errResults []string
// we may have multiple subscribers for the topic // we may have multiple subscribers for the topic
for _, sub := range subs { for _, sub := range subs {
@ -557,12 +570,12 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) error {
cc := msg.Codec() cc := msg.Codec()
// read the header. mostly a noop // read the header. mostly a noop
if err := cc.ReadHeader(&codec.Message{}, codec.Event); err != nil { if err = cc.ReadHeader(&codec.Message{}, codec.Event); err != nil {
return err return err
} }
// read the body into the handler request value // read the body into the handler request value
if err := cc.ReadBody(req.Interface()); err != nil { if err = cc.ReadBody(req.Interface()); err != nil {
return err return err
} }
@ -581,10 +594,10 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) error {
// execute the actuall call of the handler // execute the actuall call of the handler
returnValues := handler.method.Call(vals) returnValues := handler.method.Call(vals)
if err := returnValues[0].Interface(); err != nil { if rerr := returnValues[0].Interface(); rerr != nil {
return err.(error) err = rerr.(error)
} }
return nil return err
} }
// wrap with subscriber wrappers // wrap with subscriber wrappers
@ -603,16 +616,16 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) error {
} }
// execute the message handler // execute the message handler
if err := fn(ctx, rpcMsg); err != nil { if err = fn(ctx, rpcMsg); err != nil {
results = append(results, err.Error()) errResults = append(errResults, err.Error())
} }
} }
} }
// if no errors just return // if no errors just return
if len(results) == 0 { if len(errResults) > 0 {
return nil err = merrors.InternalServerError("go.micro.server", "subscriber error: %v", strings.Join(errResults, "\n"))
} }
return errors.New("subscriber error: " + strings.Join(results, "\n")) return err
} }

View File

@ -3,6 +3,7 @@ package grpc
import ( import (
"runtime/debug" "runtime/debug"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
pb "github.com/micro/go-micro/transport/grpc/proto" pb "github.com/micro/go-micro/transport/grpc/proto"
"github.com/micro/go-micro/util/log" "github.com/micro/go-micro/util/log"
@ -16,6 +17,8 @@ type microTransport struct {
} }
func (m *microTransport) Stream(ts pb.Transport_StreamServer) error { func (m *microTransport) Stream(ts pb.Transport_StreamServer) error {
var err error
sock := &grpcTransportSocket{ sock := &grpcTransportSocket{
stream: ts, stream: ts,
local: m.addr, local: m.addr,
@ -30,10 +33,12 @@ func (m *microTransport) Stream(ts pb.Transport_StreamServer) error {
if r := recover(); r != nil { if r := recover(); r != nil {
log.Log(r, string(debug.Stack())) log.Log(r, string(debug.Stack()))
sock.Close() sock.Close()
err = errors.InternalServerError("go.micro.transport", "panic recovered: %v", r)
} }
}() }()
// execute socket func // execute socket func
m.fn(sock) m.fn(sock)
return nil
return err
} }