add compile-time interface compliance check
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
		
							
								
								
									
										103
									
								
								client.go
									
									
									
									
									
								
							
							
						
						
									
										103
									
								
								client.go
									
									
									
									
									
								
							| @@ -3,27 +3,34 @@ package http | |||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"net/http" | 	"net/http" | ||||||
|  | 	"os" | ||||||
| 	"strconv" | 	"strconv" | ||||||
| 	"sync" | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
|  | 	"go.unistack.org/micro-client-http/v3/status" | ||||||
|  | 	"go.unistack.org/micro/v3/broker" | ||||||
| 	"go.unistack.org/micro/v3/client" | 	"go.unistack.org/micro/v3/client" | ||||||
|  | 	"go.unistack.org/micro/v3/codec" | ||||||
| 	"go.unistack.org/micro/v3/errors" | 	"go.unistack.org/micro/v3/errors" | ||||||
|  | 	"go.unistack.org/micro/v3/metadata" | ||||||
| 	"go.unistack.org/micro/v3/options" | 	"go.unistack.org/micro/v3/options" | ||||||
| 	"go.unistack.org/micro/v3/semconv" | 	"go.unistack.org/micro/v3/semconv" | ||||||
| 	"go.unistack.org/micro/v3/tracer" | 	"go.unistack.org/micro/v3/tracer" | ||||||
|  |  | ||||||
| 	"go.unistack.org/micro-client-http/v3/status" |  | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | var _ client.Client = (*Client)(nil) | ||||||
|  |  | ||||||
| var DefaultContentType = "application/json" | var DefaultContentType = "application/json" | ||||||
|  |  | ||||||
| type Client struct { | type Client struct { | ||||||
| 	funcCall   client.FuncCall | 	funcPublish      client.FuncPublish | ||||||
| 	funcStream client.FuncStream | 	funcBatchPublish client.FuncBatchPublish | ||||||
| 	httpClient *http.Client | 	funcCall         client.FuncCall | ||||||
| 	opts       client.Options | 	funcStream       client.FuncStream | ||||||
| 	mu         sync.RWMutex | 	httpClient       *http.Client | ||||||
|  | 	opts             client.Options | ||||||
|  | 	mu               sync.RWMutex | ||||||
| } | } | ||||||
|  |  | ||||||
| func NewClient(opts ...client.Option) *Client { | func NewClient(opts ...client.Option) *Client { | ||||||
| @@ -142,3 +149,85 @@ func (c *Client) Stream(ctx context.Context, req client.Request, opts ...client. | |||||||
| func (c *Client) String() string { | func (c *Client) String() string { | ||||||
| 	return "http" | 	return "http" | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (c *Client) BatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { | ||||||
|  | 	return c.funcBatchPublish(ctx, ps, opts...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *Client) fnBatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { | ||||||
|  | 	return c.publish(ctx, ps, opts...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *Client) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { | ||||||
|  | 	return c.funcPublish(ctx, p, opts...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *Client) fnPublish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { | ||||||
|  | 	return c.publish(ctx, []client.Message{p}, opts...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *Client) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { | ||||||
|  | 	var body []byte | ||||||
|  |  | ||||||
|  | 	options := client.NewPublishOptions(opts...) | ||||||
|  |  | ||||||
|  | 	// get proxy | ||||||
|  | 	exchange := "" | ||||||
|  | 	if v, ok := os.LookupEnv("MICRO_PROXY"); ok { | ||||||
|  | 		exchange = v | ||||||
|  | 	} | ||||||
|  | 	// get the exchange | ||||||
|  | 	if len(options.Exchange) > 0 { | ||||||
|  | 		exchange = options.Exchange | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	msgs := make([]*broker.Message, 0, len(ps)) | ||||||
|  |  | ||||||
|  | 	omd, ok := metadata.FromOutgoingContext(ctx) | ||||||
|  | 	if !ok { | ||||||
|  | 		omd = metadata.New(2) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	for _, p := range ps { | ||||||
|  | 		md := metadata.Copy(omd) | ||||||
|  | 		topic := p.Topic() | ||||||
|  | 		if len(exchange) > 0 { | ||||||
|  | 			topic = exchange | ||||||
|  | 		} | ||||||
|  | 		md.Set(metadata.HeaderTopic, topic) | ||||||
|  | 		iter := p.Metadata().Iterator() | ||||||
|  | 		var k, v string | ||||||
|  | 		for iter.Next(&k, &v) { | ||||||
|  | 			md.Set(k, v) | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		md[metadata.HeaderContentType] = p.ContentType() | ||||||
|  |  | ||||||
|  | 		// passed in raw data | ||||||
|  | 		if d, ok := p.Payload().(*codec.Frame); ok { | ||||||
|  | 			body = d.Data | ||||||
|  | 		} else { | ||||||
|  | 			// use codec for payload | ||||||
|  | 			cf, err := c.newCodec(p.ContentType()) | ||||||
|  | 			if err != nil { | ||||||
|  | 				return errors.InternalServerError("go.micro.client", "%+v", err) | ||||||
|  | 			} | ||||||
|  | 			// set the body | ||||||
|  | 			b, err := cf.Marshal(p.Payload()) | ||||||
|  | 			if err != nil { | ||||||
|  | 				return errors.InternalServerError("go.micro.client", "%+v", err) | ||||||
|  | 			} | ||||||
|  | 			body = b | ||||||
|  | 		} | ||||||
|  | 		msgs = append(msgs, &broker.Message{Header: md, Body: body}) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return c.opts.Broker.BatchPublish(ctx, msgs, | ||||||
|  | 		broker.PublishContext(options.Context), | ||||||
|  | 		broker.PublishBodyOnly(options.BodyOnly), | ||||||
|  | 	) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *Client) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message { | ||||||
|  | 	return newHTTPEvent(topic, msg, c.opts.ContentType, opts...) | ||||||
|  | } | ||||||
|   | |||||||
							
								
								
									
										44
									
								
								message.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										44
									
								
								message.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,44 @@ | |||||||
|  | package http | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"go.unistack.org/micro/v3/client" | ||||||
|  | 	"go.unistack.org/micro/v3/metadata" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type httpEvent struct { | ||||||
|  | 	payload     interface{} | ||||||
|  | 	topic       string | ||||||
|  | 	contentType string | ||||||
|  | 	opts        client.MessageOptions | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func newHTTPEvent(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message { | ||||||
|  | 	options := client.NewMessageOptions(opts...) | ||||||
|  |  | ||||||
|  | 	if len(options.ContentType) > 0 { | ||||||
|  | 		contentType = options.ContentType | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return &httpEvent{ | ||||||
|  | 		payload:     payload, | ||||||
|  | 		topic:       topic, | ||||||
|  | 		contentType: contentType, | ||||||
|  | 		opts:        options, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *httpEvent) ContentType() string { | ||||||
|  | 	return c.contentType | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *httpEvent) Topic() string { | ||||||
|  | 	return c.topic | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *httpEvent) Payload() interface{} { | ||||||
|  | 	return c.payload | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *httpEvent) Metadata() metadata.Metadata { | ||||||
|  | 	return c.opts.Metadata | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user