Merge pull request #520 from xpunch/grpcSubscriberIssues
grpc server subscriber missing some bug fixings
This commit is contained in:
		| @@ -5,6 +5,7 @@ import ( | |||||||
| 	"context" | 	"context" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"reflect" | 	"reflect" | ||||||
|  | 	"strings" | ||||||
|  |  | ||||||
| 	"github.com/micro/go-micro/broker" | 	"github.com/micro/go-micro/broker" | ||||||
| 	"github.com/micro/go-micro/codec" | 	"github.com/micro/go-micro/codec" | ||||||
| @@ -169,6 +170,10 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke | |||||||
| 	return func(p broker.Publication) error { | 	return func(p broker.Publication) error { | ||||||
| 		msg := p.Message() | 		msg := p.Message() | ||||||
| 		ct := msg.Header["Content-Type"] | 		ct := msg.Header["Content-Type"] | ||||||
|  | 		if len(ct) == 0 { | ||||||
|  | 			msg.Header["Content-Type"] = defaultContentType | ||||||
|  | 			ct = defaultContentType | ||||||
|  | 		} | ||||||
| 		cf, err := g.newCodec(ct) | 		cf, err := g.newCodec(ct) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return err | 			return err | ||||||
| @@ -181,6 +186,8 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke | |||||||
| 		delete(hdr, "Content-Type") | 		delete(hdr, "Content-Type") | ||||||
| 		ctx := metadata.NewContext(context.Background(), hdr) | 		ctx := metadata.NewContext(context.Background(), hdr) | ||||||
|  |  | ||||||
|  | 		results := make(chan error, len(sb.handlers)) | ||||||
|  |  | ||||||
| 		for i := 0; i < len(sb.handlers); i++ { | 		for i := 0; i < len(sb.handlers); i++ { | ||||||
| 			handler := sb.handlers[i] | 			handler := sb.handlers[i] | ||||||
|  |  | ||||||
| @@ -238,13 +245,22 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke | |||||||
| 				if g.wg != nil { | 				if g.wg != nil { | ||||||
| 					defer g.wg.Done() | 					defer g.wg.Done() | ||||||
| 				} | 				} | ||||||
| 				fn(ctx, &rpcMessage{ | 				results <- fn(ctx, &rpcMessage{ | ||||||
| 					topic:       sb.topic, | 					topic:       sb.topic, | ||||||
| 					contentType: ct, | 					contentType: ct, | ||||||
| 					payload:     req.Interface(), | 					payload:     req.Interface(), | ||||||
| 				}) | 				}) | ||||||
| 			}() | 			}() | ||||||
| 		} | 		} | ||||||
|  | 		var errors []string | ||||||
|  | 		for i := 0; i < len(sb.handlers); i++ { | ||||||
|  | 			if err := <-results; err != nil { | ||||||
|  | 				errors = append(errors, err.Error()) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		if len(errors) > 0 { | ||||||
|  | 			return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) | ||||||
|  | 		} | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user