a. add default context type when header not found
b. return subscribe error after handler finished
This commit is contained in:
		| @@ -5,6 +5,7 @@ import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"reflect" | ||||
| 	"strings" | ||||
|  | ||||
| 	"github.com/micro/go-micro/broker" | ||||
| 	"github.com/micro/go-micro/codec" | ||||
| @@ -169,6 +170,10 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke | ||||
| 	return func(p broker.Publication) error { | ||||
| 		msg := p.Message() | ||||
| 		ct := msg.Header["Content-Type"] | ||||
| 		if len(ct) == 0 { | ||||
| 			msg.Header["Content-Type"] = defaultContentType | ||||
| 			ct = defaultContentType | ||||
| 		} | ||||
| 		cf, err := g.newCodec(ct) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| @@ -181,6 +186,8 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke | ||||
| 		delete(hdr, "Content-Type") | ||||
| 		ctx := metadata.NewContext(context.Background(), hdr) | ||||
|  | ||||
| 		results := make(chan error, len(sb.handlers)) | ||||
|  | ||||
| 		for i := 0; i < len(sb.handlers); i++ { | ||||
| 			handler := sb.handlers[i] | ||||
|  | ||||
| @@ -238,13 +245,22 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke | ||||
| 				if g.wg != nil { | ||||
| 					defer g.wg.Done() | ||||
| 				} | ||||
| 				fn(ctx, &rpcMessage{ | ||||
| 				results <- fn(ctx, &rpcMessage{ | ||||
| 					topic:       sb.topic, | ||||
| 					contentType: ct, | ||||
| 					payload:     req.Interface(), | ||||
| 				}) | ||||
| 			}() | ||||
| 		} | ||||
| 		var errors []string | ||||
| 		for i := 0; i < len(sb.handlers); i++ { | ||||
| 			if err := <-results; err != nil { | ||||
| 				errors = append(errors, err.Error()) | ||||
| 			} | ||||
| 		} | ||||
| 		if len(errors) > 0 { | ||||
| 			return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) | ||||
| 		} | ||||
| 		return nil | ||||
| 	} | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user