diff --git a/http.go b/http.go index 35c8495..7cfa677 100644 --- a/http.go +++ b/http.go @@ -10,12 +10,10 @@ import ( "net" "net/http" "net/url" - "os" "strings" "sync" "time" - "go.unistack.org/micro/v4/broker" "go.unistack.org/micro/v4/client" "go.unistack.org/micro/v4/codec" "go.unistack.org/micro/v4/errors" @@ -147,6 +145,11 @@ func newRequest(ctx context.Context, log logger.Logger, addr string, req client. if opts.AuthToken != "" { header.Set(metadata.HeaderAuthorization, opts.AuthToken) } + if opts.RequestMetadata != nil { + for k, v := range opts.RequestMetadata { + header.Set(k, v) + } + } if md, ok := metadata.FromOutgoingContext(ctx); ok { for k, v := range md { @@ -308,9 +311,6 @@ func (h *httpClient) Init(opts ...client.Option) error { o(&h.opts) } - if err := h.opts.Broker.Init(); err != nil { - return err - } if err := h.opts.Tracer.Init(); err != nil { return err } @@ -334,10 +334,6 @@ func (h *httpClient) Options() client.Options { return h.opts } -func (h *httpClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message { - return newHTTPMessage(topic, msg, h.opts.ContentType, opts...) -} - func (h *httpClient) NewRequest(service, method string, req interface{}, opts ...client.RequestOption) client.Request { return newHTTPRequest(service, method, req, h.opts.ContentType, opts...) } @@ -614,71 +610,6 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli return nil, grr } -func (h *httpClient) BatchPublish(ctx context.Context, p []client.Message, opts ...client.PublishOption) error { - return h.publish(ctx, p, opts...) -} - -func (h *httpClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { - return h.publish(ctx, []client.Message{p}, opts...) -} - -func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { - var body []byte - - options := client.NewPublishOptions(opts...) - - // get proxy - exchange := "" - if v, ok := os.LookupEnv("MICRO_PROXY"); ok { - exchange = v - } - - omd, ok := metadata.FromOutgoingContext(ctx) - if !ok { - omd = metadata.New(2) - } - - msgs := make([]*broker.Message, 0, len(ps)) - - for _, p := range ps { - md := metadata.Copy(omd) - md[metadata.HeaderContentType] = p.ContentType() - - // passed in raw data - if d, ok := p.Payload().(*codec.Frame); ok { - body = d.Data - } else { - // use codec for payload - cf, err := h.newCodec(p.ContentType()) - if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) - } - // set the body - b, err := cf.Marshal(p.Payload()) - if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) - } - body = b - } - - topic := p.Topic() - if len(exchange) > 0 { - topic = exchange - } - - for k, v := range p.Metadata() { - md.Set(k, v) - } - md.Set(metadata.HeaderTopic, topic) - msgs = append(msgs, &broker.Message{Header: md, Body: body}) - } - - return h.opts.Broker.BatchPublish(ctx, msgs, - broker.PublishContext(ctx), - broker.PublishBodyOnly(options.BodyOnly), - ) -} - func (h *httpClient) String() string { return "http" } diff --git a/message.go b/message.go deleted file mode 100644 index c356af0..0000000 --- a/message.go +++ /dev/null @@ -1,44 +0,0 @@ -package http - -import ( - "go.unistack.org/micro/v4/client" - "go.unistack.org/micro/v4/metadata" -) - -type httpMessage struct { - payload interface{} - topic string - contentType string - opts client.MessageOptions -} - -func newHTTPMessage(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message { - options := client.NewMessageOptions(opts...) - - if len(options.ContentType) > 0 { - contentType = options.ContentType - } - - return &httpMessage{ - payload: payload, - topic: topic, - contentType: contentType, - opts: options, - } -} - -func (h *httpMessage) ContentType() string { - return h.contentType -} - -func (h *httpMessage) Topic() string { - return h.topic -} - -func (h *httpMessage) Payload() interface{} { - return h.payload -} - -func (h *httpMessage) Metadata() metadata.Metadata { - return h.opts.Metadata -} diff --git a/util.go b/util.go index 96a0e2f..0a59021 100644 --- a/util.go +++ b/util.go @@ -13,6 +13,7 @@ import ( "go.unistack.org/micro/v4/client" "go.unistack.org/micro/v4/errors" "go.unistack.org/micro/v4/logger" + "go.unistack.org/micro/v4/metadata" rutil "go.unistack.org/micro/v4/util/reflect" ) @@ -252,6 +253,13 @@ func (h *httpClient) parseRsp(ctx context.Context, hrsp *http.Response, rsp inte return nil } + if opts.ResponseMetadata != nil { + *opts.ResponseMetadata = metadata.New(len(hrsp.Header)) + for k, v := range hrsp.Header { + opts.ResponseMetadata.Set(k, strings.Join(v, ",")) + } + } + select { case <-ctx.Done(): err = ctx.Err() @@ -275,7 +283,7 @@ func (h *httpClient) parseRsp(ctx context.Context, hrsp *http.Response, rsp inte cf, cerr := h.newCodec(ct) if cerr != nil { if h.opts.Logger.V(logger.DebugLevel) { - h.opts.Logger.Debugf(ctx, "response with %v unknown content-type %s", hrsp.Header, ct, buf) + h.opts.Logger.Debugf(ctx, "response with %v unknown content-type %s %s", hrsp.Header, ct, buf) } return errors.InternalServerError("go.micro.client", cerr.Error()) }