From b122fed3cc8090777103e1f21304dc28a75273f3 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 18 Oct 2025 10:40:59 +0300 Subject: [PATCH] add compile-time interface compliance check Signed-off-by: Vasiliy Tolstov --- client.go | 103 +++++++++++++++++++++++++++++++++++++++++++++++++---- message.go | 44 +++++++++++++++++++++++ 2 files changed, 140 insertions(+), 7 deletions(-) create mode 100644 message.go diff --git a/client.go b/client.go index b8767ac..ba783f6 100644 --- a/client.go +++ b/client.go @@ -3,27 +3,34 @@ package http import ( "context" "net/http" + "os" "strconv" "sync" "time" + "go.unistack.org/micro-client-http/v3/status" + "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/client" + "go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/errors" + "go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/options" "go.unistack.org/micro/v3/semconv" "go.unistack.org/micro/v3/tracer" - - "go.unistack.org/micro-client-http/v3/status" ) +var _ client.Client = (*Client)(nil) + var DefaultContentType = "application/json" type Client struct { - funcCall client.FuncCall - funcStream client.FuncStream - httpClient *http.Client - opts client.Options - mu sync.RWMutex + funcPublish client.FuncPublish + funcBatchPublish client.FuncBatchPublish + funcCall client.FuncCall + funcStream client.FuncStream + httpClient *http.Client + opts client.Options + mu sync.RWMutex } func NewClient(opts ...client.Option) *Client { @@ -142,3 +149,85 @@ func (c *Client) Stream(ctx context.Context, req client.Request, opts ...client. func (c *Client) String() string { return "http" } + +func (c *Client) BatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { + return c.funcBatchPublish(ctx, ps, opts...) +} + +func (c *Client) fnBatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { + return c.publish(ctx, ps, opts...) +} + +func (c *Client) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { + return c.funcPublish(ctx, p, opts...) +} + +func (c *Client) fnPublish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { + return c.publish(ctx, []client.Message{p}, opts...) +} + +func (c *Client) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { + var body []byte + + options := client.NewPublishOptions(opts...) + + // get proxy + exchange := "" + if v, ok := os.LookupEnv("MICRO_PROXY"); ok { + exchange = v + } + // get the exchange + if len(options.Exchange) > 0 { + exchange = options.Exchange + } + + msgs := make([]*broker.Message, 0, len(ps)) + + omd, ok := metadata.FromOutgoingContext(ctx) + if !ok { + omd = metadata.New(2) + } + + for _, p := range ps { + md := metadata.Copy(omd) + topic := p.Topic() + if len(exchange) > 0 { + topic = exchange + } + md.Set(metadata.HeaderTopic, topic) + iter := p.Metadata().Iterator() + var k, v string + for iter.Next(&k, &v) { + md.Set(k, v) + } + + md[metadata.HeaderContentType] = p.ContentType() + + // passed in raw data + if d, ok := p.Payload().(*codec.Frame); ok { + body = d.Data + } else { + // use codec for payload + cf, err := c.newCodec(p.ContentType()) + if err != nil { + return errors.InternalServerError("go.micro.client", "%+v", err) + } + // set the body + b, err := cf.Marshal(p.Payload()) + if err != nil { + return errors.InternalServerError("go.micro.client", "%+v", err) + } + body = b + } + msgs = append(msgs, &broker.Message{Header: md, Body: body}) + } + + return c.opts.Broker.BatchPublish(ctx, msgs, + broker.PublishContext(options.Context), + broker.PublishBodyOnly(options.BodyOnly), + ) +} + +func (c *Client) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message { + return newHTTPEvent(topic, msg, c.opts.ContentType, opts...) +} diff --git a/message.go b/message.go new file mode 100644 index 0000000..890f384 --- /dev/null +++ b/message.go @@ -0,0 +1,44 @@ +package http + +import ( + "go.unistack.org/micro/v3/client" + "go.unistack.org/micro/v3/metadata" +) + +type httpEvent struct { + payload interface{} + topic string + contentType string + opts client.MessageOptions +} + +func newHTTPEvent(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message { + options := client.NewMessageOptions(opts...) + + if len(options.ContentType) > 0 { + contentType = options.ContentType + } + + return &httpEvent{ + payload: payload, + topic: topic, + contentType: contentType, + opts: options, + } +} + +func (c *httpEvent) ContentType() string { + return c.contentType +} + +func (c *httpEvent) Topic() string { + return c.topic +} + +func (c *httpEvent) Payload() interface{} { + return c.payload +} + +func (c *httpEvent) Metadata() metadata.Metadata { + return c.opts.Metadata +}