Merge pull request #64 from unistack-org/message
add message metadata support
This commit was merged in pull request #64.
	This commit is contained in:
		
							
								
								
									
										25
									
								
								http.go
									
									
									
									
									
								
							
							
						
						
									
										25
									
								
								http.go
									
									
									
									
									
								
							| @@ -604,6 +604,8 @@ func (h *httpClient) Publish(ctx context.Context, p client.Message, opts ...clie | ||||
| } | ||||
|  | ||||
| func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { | ||||
| 	var body []byte | ||||
|  | ||||
| 	options := client.NewPublishOptions(opts...) | ||||
|  | ||||
| 	// get proxy | ||||
| @@ -622,24 +624,22 @@ func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...c | ||||
| 	for _, p := range ps { | ||||
| 		md := metadata.Copy(omd) | ||||
| 		md[metadata.HeaderContentType] = p.ContentType() | ||||
| 		md[metadata.HeaderTopic] = p.Topic() | ||||
|  | ||||
| 		cf, err := h.newCodec(p.ContentType()) | ||||
| 		if err != nil { | ||||
| 			return errors.InternalServerError("go.micro.client", err.Error()) | ||||
| 		} | ||||
|  | ||||
| 		var body []byte | ||||
|  | ||||
| 		// passed in raw data | ||||
| 		if d, ok := p.Payload().(*codec.Frame); ok { | ||||
| 			body = d.Data | ||||
| 		} else { | ||||
| 			b := bytes.NewBuffer(nil) | ||||
| 			if err := cf.Write(b, &codec.Message{Type: codec.Event}, p.Payload()); err != nil { | ||||
| 			// use codec for payload | ||||
| 			cf, err := h.newCodec(p.ContentType()) | ||||
| 			if err != nil { | ||||
| 				return errors.InternalServerError("go.micro.client", err.Error()) | ||||
| 			} | ||||
| 			body = b.Bytes() | ||||
| 			// set the body | ||||
| 			b, err := cf.Marshal(p.Payload()) | ||||
| 			if err != nil { | ||||
| 				return errors.InternalServerError("go.micro.client", err.Error()) | ||||
| 			} | ||||
| 			body = b | ||||
| 		} | ||||
|  | ||||
| 		topic := p.Topic() | ||||
| @@ -647,6 +647,9 @@ func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...c | ||||
| 			topic = exchange | ||||
| 		} | ||||
|  | ||||
| 		for k, v := range p.Metadata() { | ||||
| 			md.Set(k, v) | ||||
| 		} | ||||
| 		md.Set(metadata.HeaderTopic, topic) | ||||
| 		msgs = append(msgs, &broker.Message{Header: md, Body: body}) | ||||
| 	} | ||||
|   | ||||
		Reference in New Issue
	
	Block a user