add message metadata support
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
		
							
								
								
									
										26
									
								
								drpc.go
									
									
									
									
									
								
							
							
						
						
									
										26
									
								
								drpc.go
									
									
									
									
									
								
							| @@ -2,7 +2,6 @@ | ||||
| package drpc // import "go.unistack.org/micro-client-drpc/v3" | ||||
|  | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"net" | ||||
| @@ -615,6 +614,8 @@ func (c *drpcClient) Publish(ctx context.Context, msg client.Message, opts ...cl | ||||
| } | ||||
|  | ||||
| func (c *drpcClient) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { | ||||
| 	var body []byte | ||||
|  | ||||
| 	options := client.NewPublishOptions(opts...) | ||||
|  | ||||
| 	// get proxy | ||||
| @@ -633,24 +634,22 @@ func (c *drpcClient) publish(ctx context.Context, ps []client.Message, opts ...c | ||||
| 	for _, p := range ps { | ||||
| 		md := metadata.Copy(omd) | ||||
| 		md[metadata.HeaderContentType] = p.ContentType() | ||||
| 		md[metadata.HeaderTopic] = p.Topic() | ||||
|  | ||||
| 		cf, err := c.newCodec(p.ContentType()) | ||||
| 		if err != nil { | ||||
| 			return errors.InternalServerError("go.micro.client", err.Error()) | ||||
| 		} | ||||
|  | ||||
| 		var body []byte | ||||
|  | ||||
| 		// passed in raw data | ||||
| 		if d, ok := p.Payload().(*codec.Frame); ok { | ||||
| 			body = d.Data | ||||
| 		} else { | ||||
| 			b := bytes.NewBuffer(nil) | ||||
| 			if err := cf.Write(b, &codec.Message{Type: codec.Event}, p.Payload()); err != nil { | ||||
| 			// use codec for payload | ||||
| 			cf, err := c.newCodec(p.ContentType()) | ||||
| 			if err != nil { | ||||
| 				return errors.InternalServerError("go.micro.client", err.Error()) | ||||
| 			} | ||||
| 			body = b.Bytes() | ||||
| 			// set the body | ||||
| 			b, err := cf.Marshal(p.Payload()) | ||||
| 			if err != nil { | ||||
| 				return errors.InternalServerError("go.micro.client", err.Error()) | ||||
| 			} | ||||
| 			body = b | ||||
| 		} | ||||
|  | ||||
| 		topic := p.Topic() | ||||
| @@ -658,6 +657,9 @@ func (c *drpcClient) publish(ctx context.Context, ps []client.Message, opts ...c | ||||
| 			topic = exchange | ||||
| 		} | ||||
|  | ||||
| 		for k, v := range p.Metadata() { | ||||
| 			md.Set(k, v) | ||||
| 		} | ||||
| 		md.Set(metadata.HeaderTopic, topic) | ||||
| 		msgs = append(msgs, &broker.Message{Header: md, Body: body}) | ||||
| 	} | ||||
|   | ||||
		Reference in New Issue
	
	Block a user