From 3f3c3a447161dd5a8e4cb6937aafe531bf9511bb Mon Sep 17 00:00:00 2001 From: pugnack Date: Wed, 29 Oct 2025 11:20:04 +0500 Subject: [PATCH] [v3] fix panic on publish methods (#167) * move publish methods to a separate file * reorder client methods * fix panic for publish * refactoring * go mod tidy * rename go.micro => micro * add comment to README about Set-Cookie headers --- README.md | 1 + client.go | 109 +++++++++++----------------------------------- client_publish.go | 84 +++++++++++++++++++++++++++++++++++ go.mod | 2 +- go.sum | 4 +- message.go | 40 +++++------------ 6 files changed, 126 insertions(+), 114 deletions(-) create mode 100644 client_publish.go diff --git a/README.md b/README.md index c51a82c..e335cc8 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ implements HTTP rules defined in the [google/api/http.proto](https://github.com/ * Streaming is not yet implemented. * Only protobuf-generated messages are supported. +* In `micro/v3`, metadata is implemented as `map[string]string`, which works for most headers but not for multiple `Set-Cookie` headers. The HTTP specification forbids the use of commas in `Set-Cookie` headers; therefore, their values cannot be parsed reliably. In `micro/v4`, metadata uses `map[string][]string`, resolving this issue. ## Usage diff --git a/client.go b/client.go index ba783f6..7c19667 100644 --- a/client.go +++ b/client.go @@ -3,20 +3,17 @@ package http import ( "context" "net/http" - "os" "strconv" "sync" "time" - "go.unistack.org/micro-client-http/v3/status" - "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/client" - "go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/errors" - "go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/options" "go.unistack.org/micro/v3/semconv" "go.unistack.org/micro/v3/tracer" + + "go.unistack.org/micro-client-http/v3/status" ) var _ client.Client = (*Client)(nil) @@ -52,6 +49,8 @@ func NewClient(opts ...client.Option) *Client { c.httpClient = defaultHTTPClient(dialer, clientOpts.TLSConfig) } + c.funcPublish = c.fnPublish + c.funcBatchPublish = c.fnBatchPublish c.funcCall = c.fnCall c.funcStream = c.fnStream @@ -73,6 +72,10 @@ func (c *Client) Init(opts ...client.Option) error { c.funcCall = h(c.funcCall) case client.HookStream: c.funcStream = h(c.funcStream) + case client.HookPublish: + c.funcPublish = h(c.funcPublish) + case client.HookBatchPublish: + c.funcBatchPublish = h(c.funcBatchPublish) } }) @@ -83,6 +86,20 @@ func (c *Client) Options() client.Options { return c.opts } +func (c *Client) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message { + msgOpts := client.NewMessageOptions(opts...) + if msgOpts.ContentType == "" { + msgOpts.ContentType = c.opts.ContentType + } + + return &httpMessage{ + topic: topic, + payload: msg, + opts: msgOpts, + } + +} + func (c *Client) NewRequest(service, method string, req any, opts ...client.RequestOption) client.Request { reqOpts := client.NewRequestOptions(opts...) if reqOpts.ContentType == "" { @@ -146,88 +163,14 @@ func (c *Client) Stream(ctx context.Context, req client.Request, opts ...client. return c.funcStream(ctx, req, opts...) } -func (c *Client) String() string { - return "http" +func (c *Client) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { + return c.funcPublish(ctx, p, opts...) } func (c *Client) BatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { return c.funcBatchPublish(ctx, ps, opts...) } -func (c *Client) fnBatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { - return c.publish(ctx, ps, opts...) -} - -func (c *Client) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { - return c.funcPublish(ctx, p, opts...) -} - -func (c *Client) fnPublish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { - return c.publish(ctx, []client.Message{p}, opts...) -} - -func (c *Client) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { - var body []byte - - options := client.NewPublishOptions(opts...) - - // get proxy - exchange := "" - if v, ok := os.LookupEnv("MICRO_PROXY"); ok { - exchange = v - } - // get the exchange - if len(options.Exchange) > 0 { - exchange = options.Exchange - } - - msgs := make([]*broker.Message, 0, len(ps)) - - omd, ok := metadata.FromOutgoingContext(ctx) - if !ok { - omd = metadata.New(2) - } - - for _, p := range ps { - md := metadata.Copy(omd) - topic := p.Topic() - if len(exchange) > 0 { - topic = exchange - } - md.Set(metadata.HeaderTopic, topic) - iter := p.Metadata().Iterator() - var k, v string - for iter.Next(&k, &v) { - md.Set(k, v) - } - - md[metadata.HeaderContentType] = p.ContentType() - - // passed in raw data - if d, ok := p.Payload().(*codec.Frame); ok { - body = d.Data - } else { - // use codec for payload - cf, err := c.newCodec(p.ContentType()) - if err != nil { - return errors.InternalServerError("go.micro.client", "%+v", err) - } - // set the body - b, err := cf.Marshal(p.Payload()) - if err != nil { - return errors.InternalServerError("go.micro.client", "%+v", err) - } - body = b - } - msgs = append(msgs, &broker.Message{Header: md, Body: body}) - } - - return c.opts.Broker.BatchPublish(ctx, msgs, - broker.PublishContext(options.Context), - broker.PublishBodyOnly(options.BodyOnly), - ) -} - -func (c *Client) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message { - return newHTTPEvent(topic, msg, c.opts.ContentType, opts...) +func (c *Client) String() string { + return "http" } diff --git a/client_publish.go b/client_publish.go new file mode 100644 index 0000000..dcfaed2 --- /dev/null +++ b/client_publish.go @@ -0,0 +1,84 @@ +package http + +import ( + "context" + "os" + + "go.unistack.org/micro/v3/broker" + "go.unistack.org/micro/v3/client" + "go.unistack.org/micro/v3/codec" + "go.unistack.org/micro/v3/errors" + "go.unistack.org/micro/v3/metadata" +) + +func (c *Client) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { + var body []byte + + options := client.NewPublishOptions(opts...) + + // get proxy + exchange := "" + if v, ok := os.LookupEnv("MICRO_PROXY"); ok { + exchange = v + } + // get the exchange + if len(options.Exchange) > 0 { + exchange = options.Exchange + } + + msgs := make([]*broker.Message, 0, len(ps)) + + omd, ok := metadata.FromOutgoingContext(ctx) + if !ok { + omd = metadata.New(2) + } + + for _, p := range ps { + md := metadata.Copy(omd) + topic := p.Topic() + if len(exchange) > 0 { + topic = exchange + } + md.Set(metadata.HeaderTopic, topic) + iter := p.Metadata().Iterator() + var k, v string + for iter.Next(&k, &v) { + md.Set(k, v) + } + + md[metadata.HeaderContentType] = p.ContentType() + + // passed in raw data + if d, ok := p.Payload().(*codec.Frame); ok { + body = d.Data + } else { + // use codec for payload + cf, err := c.newCodec(p.ContentType()) + if err != nil { + return errors.InternalServerError("go.micro.client", "%+v", err) + } + // set the body + b, err := cf.Marshal(p.Payload()) + if err != nil { + return errors.InternalServerError("go.micro.client", "%+v", err) + } + body = b + } + msgs = append(msgs, &broker.Message{Header: md, Body: body}) + } + + return c.opts.Broker.BatchPublish( + ctx, + msgs, + broker.PublishContext(options.Context), + broker.PublishBodyOnly(options.BodyOnly), + ) +} + +func (c *Client) fnPublish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { + return c.publish(ctx, []client.Message{p}, opts...) +} + +func (c *Client) fnBatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { + return c.publish(ctx, ps, opts...) +} diff --git a/go.mod b/go.mod index 2d81922..84458aa 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ toolchain go1.24.3 require ( github.com/stretchr/testify v1.11.1 go.unistack.org/micro-codec-json/v3 v3.10.3 - go.unistack.org/micro/v3 v3.11.45 + go.unistack.org/micro/v3 v3.11.48 google.golang.org/protobuf v1.36.10 ) diff --git a/go.sum b/go.sum index dd81597..2c55eba 100644 --- a/go.sum +++ b/go.sum @@ -29,8 +29,8 @@ go.unistack.org/micro-codec-json/v3 v3.10.3 h1:FwSBfJswov30Dyqxp1XfQW1EG4h77uTEe go.unistack.org/micro-codec-json/v3 v3.10.3/go.mod h1:26OK5MizMNKhspGC6PRVwpDIp5w1GmRb0nE5eRWWDxA= go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= -go.unistack.org/micro/v3 v3.11.45 h1:fjTLZYWgsVf9FIMZBxOg8ios2/tmyimnjZrsrxEUeXU= -go.unistack.org/micro/v3 v3.11.45/go.mod h1:fDQ8Mu9wubaFP0L8hNQlpzHiEnWN0wbOlawN9HYo0N4= +go.unistack.org/micro/v3 v3.11.48 h1:lHJYSHU2z1TTcuswItGwG7cZXN6n04EFqY7lk/0gA7w= +go.unistack.org/micro/v3 v3.11.48/go.mod h1:fDQ8Mu9wubaFP0L8hNQlpzHiEnWN0wbOlawN9HYo0N4= golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= diff --git a/message.go b/message.go index 890f384..2ac777e 100644 --- a/message.go +++ b/message.go @@ -5,40 +5,24 @@ import ( "go.unistack.org/micro/v3/metadata" ) -type httpEvent struct { - payload interface{} - topic string - contentType string - opts client.MessageOptions +type httpMessage struct { + topic string + payload interface{} + opts client.MessageOptions } -func newHTTPEvent(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message { - options := client.NewMessageOptions(opts...) - - if len(options.ContentType) > 0 { - contentType = options.ContentType - } - - return &httpEvent{ - payload: payload, - topic: topic, - contentType: contentType, - opts: options, - } +func (m *httpMessage) Topic() string { + return m.topic } -func (c *httpEvent) ContentType() string { - return c.contentType +func (m *httpMessage) Payload() interface{} { + return m.payload } -func (c *httpEvent) Topic() string { - return c.topic +func (m *httpMessage) ContentType() string { + return m.opts.ContentType } -func (c *httpEvent) Payload() interface{} { - return c.payload -} - -func (c *httpEvent) Metadata() metadata.Metadata { - return c.opts.Metadata +func (m *httpMessage) Metadata() metadata.Metadata { + return m.opts.Metadata }