diff --git a/subscriber.go b/subscriber.go index bac4c20..7a9f19a 100644 --- a/subscriber.go +++ b/subscriber.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "reflect" + "strings" "github.com/micro/go-micro/broker" "github.com/micro/go-micro/codec" @@ -169,6 +170,10 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke return func(p broker.Publication) error { msg := p.Message() ct := msg.Header["Content-Type"] + if len(ct) == 0 { + msg.Header["Content-Type"] = defaultContentType + ct = defaultContentType + } cf, err := g.newCodec(ct) if err != nil { return err @@ -181,6 +186,8 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke delete(hdr, "Content-Type") ctx := metadata.NewContext(context.Background(), hdr) + results := make(chan error, len(sb.handlers)) + for i := 0; i < len(sb.handlers); i++ { handler := sb.handlers[i] @@ -238,13 +245,22 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke if g.wg != nil { defer g.wg.Done() } - fn(ctx, &rpcMessage{ + results <- fn(ctx, &rpcMessage{ topic: sb.topic, contentType: ct, payload: req.Interface(), }) }() } + var errors []string + for i := 0; i < len(sb.handlers); i++ { + if err := <-results; err != nil { + errors = append(errors, err.Error()) + } + } + if len(errors) > 0 { + return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) + } return nil } }