Merge branch 'master' of ssh://github.com/micro/go-micro
This commit is contained in:
		| @@ -125,6 +125,10 @@ func newOptions(options ...Option) Options { | ||||
| 		opts.Transport = transport.DefaultTransport | ||||
| 	} | ||||
|  | ||||
| 	if opts.Context == nil { | ||||
| 		opts.Context = context.Background() | ||||
| 	} | ||||
|  | ||||
| 	return opts | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -343,27 +343,22 @@ func (g *grpcServer) processRequest(stream grpc.ServerStream, service *service, | ||||
| 		} | ||||
|  | ||||
| 		// define the handler func | ||||
| 		fn := func(ctx context.Context, req server.Request, rsp interface{}) error { | ||||
| 			ch := make(chan error, 1) | ||||
| 			defer close(ch) | ||||
|  | ||||
| 		fn := func(ctx context.Context, req server.Request, rsp interface{}) (err error) { | ||||
| 			defer func() { | ||||
| 				if r := recover(); r != nil { | ||||
| 					log.Log("panic recovered: ", r) | ||||
| 					log.Logf(string(debug.Stack())) | ||||
| 					ch <- errors.InternalServerError("go.micro.server", "panic recovered: %v", r) | ||||
| 					err = errors.InternalServerError("go.micro.server", "panic recovered: %v", r) | ||||
| 				} | ||||
| 			}() | ||||
| 			returnValues = function.Call([]reflect.Value{service.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)}) | ||||
|  | ||||
| 			// The return value for the method is an error. | ||||
| 			if err := returnValues[0].Interface(); err != nil { | ||||
| 				ch <- err.(error) | ||||
| 			if rerr := returnValues[0].Interface(); rerr != nil { | ||||
| 				err = rerr.(error) | ||||
| 			} | ||||
|  | ||||
| 			ch <- nil | ||||
|  | ||||
| 			return <-ch | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		// wrap the handler func | ||||
|   | ||||
| @@ -12,12 +12,14 @@ import ( | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"reflect" | ||||
| 	"runtime/debug" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"unicode" | ||||
| 	"unicode/utf8" | ||||
|  | ||||
| 	"github.com/micro/go-micro/codec" | ||||
| 	merrors "github.com/micro/go-micro/errors" | ||||
| 	"github.com/micro/go-micro/util/log" | ||||
| ) | ||||
|  | ||||
| @@ -505,6 +507,17 @@ func (router *router) Subscribe(s Subscriber) error { | ||||
| } | ||||
|  | ||||
| func (router *router) ProcessMessage(ctx context.Context, msg Message) error { | ||||
| 	var err error | ||||
|  | ||||
| 	defer func() { | ||||
| 		// recover any panics | ||||
| 		if r := recover(); r != nil { | ||||
| 			log.Log("panic recovered: ", r) | ||||
| 			log.Log(string(debug.Stack())) | ||||
| 			err = merrors.InternalServerError("go.micro.server", "panic recovered: %v", r) | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	router.su.RLock() | ||||
|  | ||||
| 	// get the subscribers by topic | ||||
| @@ -517,7 +530,7 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) error { | ||||
| 	// unlock since we only need to get the subs | ||||
| 	router.su.RUnlock() | ||||
|  | ||||
| 	var results []string | ||||
| 	var errResults []string | ||||
|  | ||||
| 	// we may have multiple subscribers for the topic | ||||
| 	for _, sub := range subs { | ||||
| @@ -557,12 +570,12 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) error { | ||||
| 			cc := msg.Codec() | ||||
|  | ||||
| 			// read the header. mostly a noop | ||||
| 			if err := cc.ReadHeader(&codec.Message{}, codec.Event); err != nil { | ||||
| 			if err = cc.ReadHeader(&codec.Message{}, codec.Event); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
|  | ||||
| 			// read the body into the handler request value | ||||
| 			if err := cc.ReadBody(req.Interface()); err != nil { | ||||
| 			if err = cc.ReadBody(req.Interface()); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
|  | ||||
| @@ -581,10 +594,10 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) error { | ||||
|  | ||||
| 				// execute the actuall call of the handler | ||||
| 				returnValues := handler.method.Call(vals) | ||||
| 				if err := returnValues[0].Interface(); err != nil { | ||||
| 					return err.(error) | ||||
| 				if rerr := returnValues[0].Interface(); rerr != nil { | ||||
| 					err = rerr.(error) | ||||
| 				} | ||||
| 				return nil | ||||
| 				return err | ||||
| 			} | ||||
|  | ||||
| 			// wrap with subscriber wrappers | ||||
| @@ -603,16 +616,16 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) error { | ||||
| 			} | ||||
|  | ||||
| 			// execute the message handler | ||||
| 			if err := fn(ctx, rpcMsg); err != nil { | ||||
| 				results = append(results, err.Error()) | ||||
| 			if err = fn(ctx, rpcMsg); err != nil { | ||||
| 				errResults = append(errResults, err.Error()) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// if no errors just return | ||||
| 	if len(results) == 0 { | ||||
| 		return nil | ||||
| 	if len(errResults) > 0 { | ||||
| 		err = merrors.InternalServerError("go.micro.server", "subscriber error: %v", strings.Join(errResults, "\n")) | ||||
| 	} | ||||
|  | ||||
| 	return errors.New("subscriber error: " + strings.Join(results, "\n")) | ||||
| 	return err | ||||
| } | ||||
|   | ||||
| @@ -3,6 +3,7 @@ package grpc | ||||
| import ( | ||||
| 	"runtime/debug" | ||||
|  | ||||
| 	"github.com/micro/go-micro/errors" | ||||
| 	"github.com/micro/go-micro/transport" | ||||
| 	pb "github.com/micro/go-micro/transport/grpc/proto" | ||||
| 	"github.com/micro/go-micro/util/log" | ||||
| @@ -16,6 +17,8 @@ type microTransport struct { | ||||
| } | ||||
|  | ||||
| func (m *microTransport) Stream(ts pb.Transport_StreamServer) error { | ||||
| 	var err error | ||||
|  | ||||
| 	sock := &grpcTransportSocket{ | ||||
| 		stream: ts, | ||||
| 		local:  m.addr, | ||||
| @@ -30,10 +33,12 @@ func (m *microTransport) Stream(ts pb.Transport_StreamServer) error { | ||||
| 		if r := recover(); r != nil { | ||||
| 			log.Log(r, string(debug.Stack())) | ||||
| 			sock.Close() | ||||
| 			err = errors.InternalServerError("go.micro.transport", "panic recovered: %v", r) | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	// execute socket func | ||||
| 	m.fn(sock) | ||||
| 	return nil | ||||
|  | ||||
| 	return err | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user