Merge pull request 'fix request/response md handling' (#112) from request-respone-md into master
Reviewed-on: #112
This commit was merged in pull request #112.
	This commit is contained in:
		
							
								
								
									
										79
									
								
								http.go
									
									
									
									
									
								
							
							
						
						
									
										79
									
								
								http.go
									
									
									
									
									
								
							| @@ -10,12 +10,10 @@ import ( | ||||
| 	"net" | ||||
| 	"net/http" | ||||
| 	"net/url" | ||||
| 	"os" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"go.unistack.org/micro/v4/broker" | ||||
| 	"go.unistack.org/micro/v4/client" | ||||
| 	"go.unistack.org/micro/v4/codec" | ||||
| 	"go.unistack.org/micro/v4/errors" | ||||
| @@ -147,6 +145,11 @@ func newRequest(ctx context.Context, log logger.Logger, addr string, req client. | ||||
| 	if opts.AuthToken != "" { | ||||
| 		header.Set(metadata.HeaderAuthorization, opts.AuthToken) | ||||
| 	} | ||||
| 	if opts.RequestMetadata != nil { | ||||
| 		for k, v := range opts.RequestMetadata { | ||||
| 			header.Set(k, v) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if md, ok := metadata.FromOutgoingContext(ctx); ok { | ||||
| 		for k, v := range md { | ||||
| @@ -308,9 +311,6 @@ func (h *httpClient) Init(opts ...client.Option) error { | ||||
| 		o(&h.opts) | ||||
| 	} | ||||
|  | ||||
| 	if err := h.opts.Broker.Init(); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if err := h.opts.Tracer.Init(); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| @@ -334,10 +334,6 @@ func (h *httpClient) Options() client.Options { | ||||
| 	return h.opts | ||||
| } | ||||
|  | ||||
| func (h *httpClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message { | ||||
| 	return newHTTPMessage(topic, msg, h.opts.ContentType, opts...) | ||||
| } | ||||
|  | ||||
| func (h *httpClient) NewRequest(service, method string, req interface{}, opts ...client.RequestOption) client.Request { | ||||
| 	return newHTTPRequest(service, method, req, h.opts.ContentType, opts...) | ||||
| } | ||||
| @@ -614,71 +610,6 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli | ||||
| 	return nil, grr | ||||
| } | ||||
|  | ||||
| func (h *httpClient) BatchPublish(ctx context.Context, p []client.Message, opts ...client.PublishOption) error { | ||||
| 	return h.publish(ctx, p, opts...) | ||||
| } | ||||
|  | ||||
| func (h *httpClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { | ||||
| 	return h.publish(ctx, []client.Message{p}, opts...) | ||||
| } | ||||
|  | ||||
| func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { | ||||
| 	var body []byte | ||||
|  | ||||
| 	options := client.NewPublishOptions(opts...) | ||||
|  | ||||
| 	// get proxy | ||||
| 	exchange := "" | ||||
| 	if v, ok := os.LookupEnv("MICRO_PROXY"); ok { | ||||
| 		exchange = v | ||||
| 	} | ||||
|  | ||||
| 	omd, ok := metadata.FromOutgoingContext(ctx) | ||||
| 	if !ok { | ||||
| 		omd = metadata.New(2) | ||||
| 	} | ||||
|  | ||||
| 	msgs := make([]*broker.Message, 0, len(ps)) | ||||
|  | ||||
| 	for _, p := range ps { | ||||
| 		md := metadata.Copy(omd) | ||||
| 		md[metadata.HeaderContentType] = p.ContentType() | ||||
|  | ||||
| 		// passed in raw data | ||||
| 		if d, ok := p.Payload().(*codec.Frame); ok { | ||||
| 			body = d.Data | ||||
| 		} else { | ||||
| 			// use codec for payload | ||||
| 			cf, err := h.newCodec(p.ContentType()) | ||||
| 			if err != nil { | ||||
| 				return errors.InternalServerError("go.micro.client", err.Error()) | ||||
| 			} | ||||
| 			// set the body | ||||
| 			b, err := cf.Marshal(p.Payload()) | ||||
| 			if err != nil { | ||||
| 				return errors.InternalServerError("go.micro.client", err.Error()) | ||||
| 			} | ||||
| 			body = b | ||||
| 		} | ||||
|  | ||||
| 		topic := p.Topic() | ||||
| 		if len(exchange) > 0 { | ||||
| 			topic = exchange | ||||
| 		} | ||||
|  | ||||
| 		for k, v := range p.Metadata() { | ||||
| 			md.Set(k, v) | ||||
| 		} | ||||
| 		md.Set(metadata.HeaderTopic, topic) | ||||
| 		msgs = append(msgs, &broker.Message{Header: md, Body: body}) | ||||
| 	} | ||||
|  | ||||
| 	return h.opts.Broker.BatchPublish(ctx, msgs, | ||||
| 		broker.PublishContext(ctx), | ||||
| 		broker.PublishBodyOnly(options.BodyOnly), | ||||
| 	) | ||||
| } | ||||
|  | ||||
| func (h *httpClient) String() string { | ||||
| 	return "http" | ||||
| } | ||||
|   | ||||
							
								
								
									
										44
									
								
								message.go
									
									
									
									
									
								
							
							
						
						
									
										44
									
								
								message.go
									
									
									
									
									
								
							| @@ -1,44 +0,0 @@ | ||||
| package http | ||||
|  | ||||
| import ( | ||||
| 	"go.unistack.org/micro/v4/client" | ||||
| 	"go.unistack.org/micro/v4/metadata" | ||||
| ) | ||||
|  | ||||
| type httpMessage struct { | ||||
| 	payload     interface{} | ||||
| 	topic       string | ||||
| 	contentType string | ||||
| 	opts        client.MessageOptions | ||||
| } | ||||
|  | ||||
| func newHTTPMessage(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message { | ||||
| 	options := client.NewMessageOptions(opts...) | ||||
|  | ||||
| 	if len(options.ContentType) > 0 { | ||||
| 		contentType = options.ContentType | ||||
| 	} | ||||
|  | ||||
| 	return &httpMessage{ | ||||
| 		payload:     payload, | ||||
| 		topic:       topic, | ||||
| 		contentType: contentType, | ||||
| 		opts:        options, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (h *httpMessage) ContentType() string { | ||||
| 	return h.contentType | ||||
| } | ||||
|  | ||||
| func (h *httpMessage) Topic() string { | ||||
| 	return h.topic | ||||
| } | ||||
|  | ||||
| func (h *httpMessage) Payload() interface{} { | ||||
| 	return h.payload | ||||
| } | ||||
|  | ||||
| func (h *httpMessage) Metadata() metadata.Metadata { | ||||
| 	return h.opts.Metadata | ||||
| } | ||||
							
								
								
									
										10
									
								
								util.go
									
									
									
									
									
								
							
							
						
						
									
										10
									
								
								util.go
									
									
									
									
									
								
							| @@ -13,6 +13,7 @@ import ( | ||||
| 	"go.unistack.org/micro/v4/client" | ||||
| 	"go.unistack.org/micro/v4/errors" | ||||
| 	"go.unistack.org/micro/v4/logger" | ||||
| 	"go.unistack.org/micro/v4/metadata" | ||||
| 	rutil "go.unistack.org/micro/v4/util/reflect" | ||||
| ) | ||||
|  | ||||
| @@ -252,6 +253,13 @@ func (h *httpClient) parseRsp(ctx context.Context, hrsp *http.Response, rsp inte | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	if opts.ResponseMetadata != nil { | ||||
| 		*opts.ResponseMetadata = metadata.New(len(hrsp.Header)) | ||||
| 		for k, v := range hrsp.Header { | ||||
| 			opts.ResponseMetadata.Set(k, strings.Join(v, ",")) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	select { | ||||
| 	case <-ctx.Done(): | ||||
| 		err = ctx.Err() | ||||
| @@ -275,7 +283,7 @@ func (h *httpClient) parseRsp(ctx context.Context, hrsp *http.Response, rsp inte | ||||
| 		cf, cerr := h.newCodec(ct) | ||||
| 		if cerr != nil { | ||||
| 			if h.opts.Logger.V(logger.DebugLevel) { | ||||
| 				h.opts.Logger.Debugf(ctx, "response with %v unknown content-type %s", hrsp.Header, ct, buf) | ||||
| 				h.opts.Logger.Debugf(ctx, "response with %v unknown content-type %s %s", hrsp.Header, ct, buf) | ||||
| 			} | ||||
| 			return errors.InternalServerError("go.micro.client", cerr.Error()) | ||||
| 		} | ||||
|   | ||||
		Reference in New Issue
	
	Block a user