Merge pull request #981 from unistack-org/subscriber

subscriber recovery
This commit is contained in:
Asim Aslam 2019-11-27 10:28:51 +00:00 committed by GitHub
commit 5932dd753c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -4,12 +4,15 @@ import (
"context" "context"
"fmt" "fmt"
"reflect" "reflect"
"runtime/debug"
"strings" "strings"
"github.com/micro/go-micro/broker" "github.com/micro/go-micro/broker"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/metadata" "github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"github.com/micro/go-micro/server" "github.com/micro/go-micro/server"
"github.com/micro/go-micro/util/log"
) )
const ( const (
@ -165,6 +168,16 @@ func validateSubscriber(sub server.Subscriber) error {
func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler { func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
return func(p broker.Event) error { return func(p broker.Event) error {
var err error
defer func() {
if r := recover(); r != nil {
log.Log("panic recovered: ", r)
log.Logf(string(debug.Stack()))
err = errors.InternalServerError("go.micro.server", "panic recovered: %v", r)
}
}()
msg := p.Message() msg := p.Message()
ct := msg.Header["Content-Type"] ct := msg.Header["Content-Type"]
if len(ct) == 0 { if len(ct) == 0 {
@ -201,7 +214,7 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke
req = req.Elem() req = req.Elem()
} }
if err := cf.Unmarshal(msg.Body, req.Interface()); err != nil { if err = cf.Unmarshal(msg.Body, req.Interface()); err != nil {
return err return err
} }
@ -217,8 +230,8 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke
vals = append(vals, reflect.ValueOf(msg.Payload())) vals = append(vals, reflect.ValueOf(msg.Payload()))
returnValues := handler.method.Call(vals) returnValues := handler.method.Call(vals)
if err := returnValues[0].Interface(); err != nil { if rerr := returnValues[0].Interface(); rerr != nil {
return err.(error) return rerr.(error)
} }
return nil return nil
} }
@ -245,14 +258,15 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke
} }
var errors []string var errors []string
for i := 0; i < len(sb.handlers); i++ { for i := 0; i < len(sb.handlers); i++ {
if err := <-results; err != nil { if rerr := <-results; err != nil {
errors = append(errors, err.Error()) errors = append(errors, rerr.Error())
} }
} }
if len(errors) > 0 { if len(errors) > 0 {
return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
} }
return nil
return err
} }
} }