|
|
|
@@ -1,5 +1,5 @@
|
|
|
|
|
// Package http provides a http client
|
|
|
|
|
package http // import "go.unistack.org/micro-client-http/v3"
|
|
|
|
|
package http // import "go.unistack.org/micro-client-http/v4"
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"bufio"
|
|
|
|
@@ -11,22 +11,18 @@ import (
|
|
|
|
|
"net/http"
|
|
|
|
|
"net/url"
|
|
|
|
|
"os"
|
|
|
|
|
"strconv"
|
|
|
|
|
"strings"
|
|
|
|
|
"sync"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"go.unistack.org/micro/v3/broker"
|
|
|
|
|
"go.unistack.org/micro/v3/client"
|
|
|
|
|
"go.unistack.org/micro/v3/codec"
|
|
|
|
|
"go.unistack.org/micro/v3/errors"
|
|
|
|
|
"go.unistack.org/micro/v3/logger"
|
|
|
|
|
"go.unistack.org/micro/v3/metadata"
|
|
|
|
|
"go.unistack.org/micro/v3/options"
|
|
|
|
|
"go.unistack.org/micro/v3/selector"
|
|
|
|
|
"go.unistack.org/micro/v3/semconv"
|
|
|
|
|
"go.unistack.org/micro/v3/tracer"
|
|
|
|
|
rutil "go.unistack.org/micro/v3/util/reflect"
|
|
|
|
|
"go.unistack.org/micro/v4/broker"
|
|
|
|
|
"go.unistack.org/micro/v4/client"
|
|
|
|
|
"go.unistack.org/micro/v4/codec"
|
|
|
|
|
"go.unistack.org/micro/v4/errors"
|
|
|
|
|
"go.unistack.org/micro/v4/logger"
|
|
|
|
|
"go.unistack.org/micro/v4/metadata"
|
|
|
|
|
"go.unistack.org/micro/v4/selector"
|
|
|
|
|
rutil "go.unistack.org/micro/v4/util/reflect"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
var DefaultContentType = "application/json"
|
|
|
|
@@ -39,12 +35,8 @@ func filterLabel(r []router.Route) []router.Route {
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
type httpClient struct {
|
|
|
|
|
funcPublish client.FuncPublish
|
|
|
|
|
funcBatchPublish client.FuncBatchPublish
|
|
|
|
|
funcCall client.FuncCall
|
|
|
|
|
funcStream client.FuncStream
|
|
|
|
|
httpcli *http.Client
|
|
|
|
|
opts client.Options
|
|
|
|
|
httpcli *http.Client
|
|
|
|
|
opts client.Options
|
|
|
|
|
sync.RWMutex
|
|
|
|
|
init bool
|
|
|
|
|
}
|
|
|
|
@@ -155,11 +147,6 @@ func newRequest(ctx context.Context, log logger.Logger, addr string, req client.
|
|
|
|
|
if opts.AuthToken != "" {
|
|
|
|
|
header.Set(metadata.HeaderAuthorization, opts.AuthToken)
|
|
|
|
|
}
|
|
|
|
|
if opts.RequestMetadata != nil {
|
|
|
|
|
for k, v := range opts.RequestMetadata {
|
|
|
|
|
header.Set(k, v)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
|
|
|
|
for k, v := range md {
|
|
|
|
@@ -225,29 +212,29 @@ func newRequest(ctx context.Context, log logger.Logger, addr string, req client.
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if log.V(logger.DebugLevel) {
|
|
|
|
|
log.Debug(ctx, fmt.Sprintf("request %s to %s with headers %v body %s", method, u.String(), hreq.Header, b))
|
|
|
|
|
log.Debugf(ctx, "request %s to %s with headers %v body %s", method, u.String(), hreq.Header, b)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return hreq, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *httpClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
|
|
|
|
func (h *httpClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
|
|
|
|
ct := req.ContentType()
|
|
|
|
|
if len(opts.ContentType) > 0 {
|
|
|
|
|
ct = opts.ContentType
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cf, err := c.newCodec(ct)
|
|
|
|
|
cf, err := h.newCodec(ct)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.BadRequest("go.micro.client", err.Error())
|
|
|
|
|
}
|
|
|
|
|
hreq, err := newRequest(ctx, c.opts.Logger, addr, req, ct, cf, req.Body(), opts)
|
|
|
|
|
hreq, err := newRequest(ctx, h.opts.Logger, addr, req, ct, cf, req.Body(), opts)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// make the request
|
|
|
|
|
hrsp, err := c.httpcli.Do(hreq)
|
|
|
|
|
hrsp, err := h.httpcli.Do(hreq)
|
|
|
|
|
if err != nil {
|
|
|
|
|
switch err := err.(type) {
|
|
|
|
|
case *url.Error:
|
|
|
|
@@ -264,29 +251,29 @@ func (c *httpClient) call(ctx context.Context, addr string, req client.Request,
|
|
|
|
|
|
|
|
|
|
defer hrsp.Body.Close()
|
|
|
|
|
|
|
|
|
|
return c.parseRsp(ctx, hrsp, rsp, opts)
|
|
|
|
|
return h.parseRsp(ctx, hrsp, rsp, opts)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *httpClient) stream(ctx context.Context, addr string, req client.Request, opts client.CallOptions) (client.Stream, error) {
|
|
|
|
|
func (h *httpClient) stream(ctx context.Context, addr string, req client.Request, opts client.CallOptions) (client.Stream, error) {
|
|
|
|
|
ct := req.ContentType()
|
|
|
|
|
if len(opts.ContentType) > 0 {
|
|
|
|
|
ct = opts.ContentType
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// get codec
|
|
|
|
|
cf, err := c.newCodec(ct)
|
|
|
|
|
cf, err := h.newCodec(ct)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, errors.BadRequest("go.micro.client", err.Error())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cc, err := (c.httpcli.Transport).(*http.Transport).DialContext(ctx, "tcp", addr)
|
|
|
|
|
cc, err := (h.httpcli.Transport).(*http.Transport).DialContext(ctx, "tcp", addr)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error dialing: %v", err))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return &httpStream{
|
|
|
|
|
address: addr,
|
|
|
|
|
logger: c.opts.Logger,
|
|
|
|
|
logger: h.opts.Logger,
|
|
|
|
|
context: ctx,
|
|
|
|
|
closed: make(chan bool),
|
|
|
|
|
opts: opts,
|
|
|
|
@@ -298,88 +285,66 @@ func (c *httpClient) stream(ctx context.Context, addr string, req client.Request
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *httpClient) newCodec(ct string) (codec.Codec, error) {
|
|
|
|
|
c.RLock()
|
|
|
|
|
func (h *httpClient) newCodec(ct string) (codec.Codec, error) {
|
|
|
|
|
h.RLock()
|
|
|
|
|
defer h.RUnlock()
|
|
|
|
|
|
|
|
|
|
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
|
|
|
|
|
ct = ct[:idx]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if cf, ok := c.opts.Codecs[ct]; ok {
|
|
|
|
|
c.RUnlock()
|
|
|
|
|
return cf, nil
|
|
|
|
|
if c, ok := h.opts.Codecs[ct]; ok {
|
|
|
|
|
return c, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
c.RUnlock()
|
|
|
|
|
return nil, codec.ErrUnknownContentType
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *httpClient) Init(opts ...client.Option) error {
|
|
|
|
|
func (h *httpClient) Init(opts ...client.Option) error {
|
|
|
|
|
if len(opts) == 0 && h.init {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
for _, o := range opts {
|
|
|
|
|
o(&c.opts)
|
|
|
|
|
o(&h.opts)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
c.funcCall = c.fnCall
|
|
|
|
|
c.funcStream = c.fnStream
|
|
|
|
|
c.funcPublish = c.fnPublish
|
|
|
|
|
c.funcBatchPublish = c.fnBatchPublish
|
|
|
|
|
|
|
|
|
|
c.opts.Hooks.EachNext(func(hook options.Hook) {
|
|
|
|
|
switch h := hook.(type) {
|
|
|
|
|
case client.HookCall:
|
|
|
|
|
c.funcCall = h(c.funcCall)
|
|
|
|
|
case client.HookStream:
|
|
|
|
|
c.funcStream = h(c.funcStream)
|
|
|
|
|
case client.HookPublish:
|
|
|
|
|
c.funcPublish = h(c.funcPublish)
|
|
|
|
|
case client.HookBatchPublish:
|
|
|
|
|
c.funcBatchPublish = h(c.funcBatchPublish)
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
if err := h.opts.Broker.Init(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if err := h.opts.Tracer.Init(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if err := h.opts.Router.Init(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if err := h.opts.Logger.Init(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if err := h.opts.Meter.Init(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if err := h.opts.Transport.Init(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *httpClient) Options() client.Options {
|
|
|
|
|
return c.opts
|
|
|
|
|
func (h *httpClient) Options() client.Options {
|
|
|
|
|
return h.opts
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *httpClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message {
|
|
|
|
|
return newHTTPMessage(topic, msg, c.opts.ContentType, opts...)
|
|
|
|
|
func (h *httpClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message {
|
|
|
|
|
return newHTTPMessage(topic, msg, h.opts.ContentType, opts...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *httpClient) NewRequest(service, method string, req interface{}, opts ...client.RequestOption) client.Request {
|
|
|
|
|
return newHTTPRequest(service, method, req, c.opts.ContentType, opts...)
|
|
|
|
|
func (h *httpClient) NewRequest(service, method string, req interface{}, opts ...client.RequestOption) client.Request {
|
|
|
|
|
return newHTTPRequest(service, method, req, h.opts.ContentType, opts...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *httpClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
|
|
|
|
ts := time.Now()
|
|
|
|
|
c.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
|
|
|
|
|
var sp tracer.Span
|
|
|
|
|
ctx, sp = c.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
|
|
|
|
|
tracer.WithSpanKind(tracer.SpanKindClient),
|
|
|
|
|
tracer.WithSpanLabels("endpoint", req.Endpoint()),
|
|
|
|
|
)
|
|
|
|
|
err := c.funcCall(ctx, req, rsp, opts...)
|
|
|
|
|
c.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
|
|
|
|
|
te := time.Since(ts)
|
|
|
|
|
c.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
|
|
|
|
|
c.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
|
|
|
|
|
|
|
|
|
|
if me := errors.FromError(err); me == nil {
|
|
|
|
|
sp.Finish()
|
|
|
|
|
c.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc()
|
|
|
|
|
} else {
|
|
|
|
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
|
|
|
|
c.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *httpClient) fnCall(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
|
|
|
|
func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
|
|
|
|
// make a copy of call opts
|
|
|
|
|
callOpts := c.opts.CallOptions
|
|
|
|
|
callOpts := h.opts.CallOptions
|
|
|
|
|
for _, opt := range opts {
|
|
|
|
|
opt(&callOpts)
|
|
|
|
|
}
|
|
|
|
@@ -406,21 +371,26 @@ func (c *httpClient) fnCall(ctx context.Context, req client.Request, rsp interfa
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// make copy of call method
|
|
|
|
|
hcall := c.call
|
|
|
|
|
hcall := h.call
|
|
|
|
|
|
|
|
|
|
// wrap the call in reverse
|
|
|
|
|
for i := len(callOpts.CallWrappers); i > 0; i-- {
|
|
|
|
|
hcall = callOpts.CallWrappers[i-1](hcall)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// use the router passed as a call option, or fallback to the rpc clients router
|
|
|
|
|
if callOpts.Router == nil {
|
|
|
|
|
callOpts.Router = c.opts.Router
|
|
|
|
|
callOpts.Router = h.opts.Router
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if callOpts.Selector == nil {
|
|
|
|
|
callOpts.Selector = c.opts.Selector
|
|
|
|
|
callOpts.Selector = h.opts.Selector
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// inject proxy address
|
|
|
|
|
// TODO: don't even bother using Lookup/Select in this case
|
|
|
|
|
if len(c.opts.Proxy) > 0 {
|
|
|
|
|
callOpts.Address = []string{c.opts.Proxy}
|
|
|
|
|
if len(h.opts.Proxy) > 0 {
|
|
|
|
|
callOpts.Address = []string{h.opts.Proxy}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var next selector.Next
|
|
|
|
@@ -442,7 +412,7 @@ func (c *httpClient) fnCall(ctx context.Context, req client.Request, rsp interfa
|
|
|
|
|
var routes []string
|
|
|
|
|
// lookup the route to send the reques to
|
|
|
|
|
// TODO apply any filtering here
|
|
|
|
|
routes, err = c.opts.Lookup(ctx, req, callOpts)
|
|
|
|
|
routes, err = h.opts.Lookup(ctx, req, callOpts)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.InternalServerError("go.micro.client", err.Error())
|
|
|
|
|
}
|
|
|
|
@@ -459,7 +429,7 @@ func (c *httpClient) fnCall(ctx context.Context, req client.Request, rsp interfa
|
|
|
|
|
// make the call
|
|
|
|
|
err = hcall(ctx, node, req, rsp, callOpts)
|
|
|
|
|
// record the result of the call to inform future routing decisions
|
|
|
|
|
if verr := c.opts.Selector.Record(node, err); verr != nil {
|
|
|
|
|
if verr := h.opts.Selector.Record(node, err); verr != nil {
|
|
|
|
|
return verr
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -504,36 +474,11 @@ func (c *httpClient) fnCall(ctx context.Context, req client.Request, rsp interfa
|
|
|
|
|
return gerr
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *httpClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
|
|
|
|
ts := time.Now()
|
|
|
|
|
c.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
|
|
|
|
|
var sp tracer.Span
|
|
|
|
|
ctx, sp = c.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
|
|
|
|
|
tracer.WithSpanKind(tracer.SpanKindClient),
|
|
|
|
|
tracer.WithSpanLabels("endpoint", req.Endpoint()),
|
|
|
|
|
)
|
|
|
|
|
stream, err := c.funcStream(ctx, req, opts...)
|
|
|
|
|
c.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
|
|
|
|
|
te := time.Since(ts)
|
|
|
|
|
c.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
|
|
|
|
|
c.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
|
|
|
|
|
|
|
|
|
|
if me := errors.FromError(err); me == nil {
|
|
|
|
|
sp.Finish()
|
|
|
|
|
c.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc()
|
|
|
|
|
} else {
|
|
|
|
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
|
|
|
|
c.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return stream, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *httpClient) fnStream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
|
|
|
|
func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
|
|
|
|
var err error
|
|
|
|
|
|
|
|
|
|
// make a copy of call opts
|
|
|
|
|
callOpts := c.opts.CallOptions
|
|
|
|
|
callOpts := h.opts.CallOptions
|
|
|
|
|
for _, o := range opts {
|
|
|
|
|
o(&callOpts)
|
|
|
|
|
}
|
|
|
|
@@ -570,17 +515,17 @@ func (c *httpClient) fnStream(ctx context.Context, req client.Request, opts ...c
|
|
|
|
|
|
|
|
|
|
// use the router passed as a call option, or fallback to the rpc clients router
|
|
|
|
|
if callOpts.Router == nil {
|
|
|
|
|
callOpts.Router = c.opts.Router
|
|
|
|
|
callOpts.Router = h.opts.Router
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if callOpts.Selector == nil {
|
|
|
|
|
callOpts.Selector = c.opts.Selector
|
|
|
|
|
callOpts.Selector = h.opts.Selector
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// inject proxy address
|
|
|
|
|
// TODO: don't even bother using Lookup/Select in this case
|
|
|
|
|
if len(c.opts.Proxy) > 0 {
|
|
|
|
|
callOpts.Address = []string{c.opts.Proxy}
|
|
|
|
|
if len(h.opts.Proxy) > 0 {
|
|
|
|
|
callOpts.Address = []string{h.opts.Proxy}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var next selector.Next
|
|
|
|
@@ -601,7 +546,7 @@ func (c *httpClient) fnStream(ctx context.Context, req client.Request, opts ...c
|
|
|
|
|
var routes []string
|
|
|
|
|
// lookup the route to send the reques to
|
|
|
|
|
// TODO apply any filtering here
|
|
|
|
|
routes, err = c.opts.Lookup(ctx, req, callOpts)
|
|
|
|
|
routes, err = h.opts.Lookup(ctx, req, callOpts)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
|
|
|
|
}
|
|
|
|
@@ -615,10 +560,10 @@ func (c *httpClient) fnStream(ctx context.Context, req client.Request, opts ...c
|
|
|
|
|
|
|
|
|
|
node := next()
|
|
|
|
|
|
|
|
|
|
stream, cerr := c.stream(ctx, node, req, callOpts)
|
|
|
|
|
stream, cerr := h.stream(ctx, node, req, callOpts)
|
|
|
|
|
|
|
|
|
|
// record the result of the call to inform future routing decisions
|
|
|
|
|
if verr := c.opts.Selector.Record(node, cerr); verr != nil {
|
|
|
|
|
if verr := h.opts.Selector.Record(node, cerr); verr != nil {
|
|
|
|
|
return nil, verr
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -669,23 +614,15 @@ func (c *httpClient) fnStream(ctx context.Context, req client.Request, opts ...c
|
|
|
|
|
return nil, grr
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *httpClient) BatchPublish(ctx context.Context, p []client.Message, opts ...client.PublishOption) error {
|
|
|
|
|
return c.funcBatchPublish(ctx, p, opts...)
|
|
|
|
|
func (h *httpClient) BatchPublish(ctx context.Context, p []client.Message, opts ...client.PublishOption) error {
|
|
|
|
|
return h.publish(ctx, p, opts...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *httpClient) fnBatchPublish(ctx context.Context, p []client.Message, opts ...client.PublishOption) error {
|
|
|
|
|
return c.publish(ctx, p, opts...)
|
|
|
|
|
func (h *httpClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
|
|
|
|
return h.publish(ctx, []client.Message{p}, opts...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *httpClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
|
|
|
|
return c.funcPublish(ctx, p, opts...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *httpClient) fnPublish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
|
|
|
|
return c.publish(ctx, []client.Message{p}, opts...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *httpClient) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
|
|
|
|
|
func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
|
|
|
|
|
var body []byte
|
|
|
|
|
|
|
|
|
|
options := client.NewPublishOptions(opts...)
|
|
|
|
@@ -695,10 +632,6 @@ func (c *httpClient) publish(ctx context.Context, ps []client.Message, opts ...c
|
|
|
|
|
if v, ok := os.LookupEnv("MICRO_PROXY"); ok {
|
|
|
|
|
exchange = v
|
|
|
|
|
}
|
|
|
|
|
// get the exchange
|
|
|
|
|
if len(options.Exchange) > 0 {
|
|
|
|
|
exchange = options.Exchange
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
omd, ok := metadata.FromOutgoingContext(ctx)
|
|
|
|
|
if !ok {
|
|
|
|
@@ -710,23 +643,13 @@ func (c *httpClient) publish(ctx context.Context, ps []client.Message, opts ...c
|
|
|
|
|
for _, p := range ps {
|
|
|
|
|
md := metadata.Copy(omd)
|
|
|
|
|
md[metadata.HeaderContentType] = p.ContentType()
|
|
|
|
|
topic := p.Topic()
|
|
|
|
|
if len(exchange) > 0 {
|
|
|
|
|
topic = exchange
|
|
|
|
|
}
|
|
|
|
|
md.Set(metadata.HeaderTopic, topic)
|
|
|
|
|
iter := p.Metadata().Iterator()
|
|
|
|
|
var k, v string
|
|
|
|
|
for iter.Next(&k, &v) {
|
|
|
|
|
md.Set(k, v)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// passed in raw data
|
|
|
|
|
if d, ok := p.Payload().(*codec.Frame); ok {
|
|
|
|
|
body = d.Data
|
|
|
|
|
} else {
|
|
|
|
|
// use codec for payload
|
|
|
|
|
cf, err := c.newCodec(p.ContentType())
|
|
|
|
|
cf, err := h.newCodec(p.ContentType())
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.InternalServerError("go.micro.client", err.Error())
|
|
|
|
|
}
|
|
|
|
@@ -738,31 +661,40 @@ func (c *httpClient) publish(ctx context.Context, ps []client.Message, opts ...c
|
|
|
|
|
body = b
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
topic := p.Topic()
|
|
|
|
|
if len(exchange) > 0 {
|
|
|
|
|
topic = exchange
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for k, v := range p.Metadata() {
|
|
|
|
|
md.Set(k, v)
|
|
|
|
|
}
|
|
|
|
|
md.Set(metadata.HeaderTopic, topic)
|
|
|
|
|
msgs = append(msgs, &broker.Message{Header: md, Body: body})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return c.opts.Broker.BatchPublish(ctx, msgs,
|
|
|
|
|
return h.opts.Broker.BatchPublish(ctx, msgs,
|
|
|
|
|
broker.PublishContext(ctx),
|
|
|
|
|
broker.PublishBodyOnly(options.BodyOnly),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *httpClient) String() string {
|
|
|
|
|
func (h *httpClient) String() string {
|
|
|
|
|
return "http"
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *httpClient) Name() string {
|
|
|
|
|
return c.opts.Name
|
|
|
|
|
func (h *httpClient) Name() string {
|
|
|
|
|
return h.opts.Name
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewClient(opts ...client.Option) *httpClient {
|
|
|
|
|
func NewClient(opts ...client.Option) client.Client {
|
|
|
|
|
options := client.NewOptions(opts...)
|
|
|
|
|
|
|
|
|
|
if len(options.ContentType) == 0 {
|
|
|
|
|
options.ContentType = DefaultContentType
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
c := &httpClient{
|
|
|
|
|
rc := &httpClient{
|
|
|
|
|
opts: options,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -785,7 +717,7 @@ func NewClient(opts ...client.Option) *httpClient {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if httpcli, ok := options.Context.Value(httpClientKey{}).(*http.Client); ok {
|
|
|
|
|
c.httpcli = httpcli
|
|
|
|
|
rc.httpcli = httpcli
|
|
|
|
|
} else {
|
|
|
|
|
// TODO customTransport := http.DefaultTransport.(*http.Transport).Clone()
|
|
|
|
|
tr := &http.Transport{
|
|
|
|
@@ -801,13 +733,14 @@ func NewClient(opts ...client.Option) *httpClient {
|
|
|
|
|
ExpectContinueTimeout: 1 * time.Second,
|
|
|
|
|
TLSClientConfig: options.TLSConfig,
|
|
|
|
|
}
|
|
|
|
|
c.httpcli = &http.Client{Transport: tr}
|
|
|
|
|
rc.httpcli = &http.Client{Transport: tr}
|
|
|
|
|
}
|
|
|
|
|
c := client.Client(rc)
|
|
|
|
|
|
|
|
|
|
c.funcCall = c.fnCall
|
|
|
|
|
c.funcStream = c.fnStream
|
|
|
|
|
c.funcPublish = c.fnPublish
|
|
|
|
|
c.funcBatchPublish = c.fnBatchPublish
|
|
|
|
|
// wrap in reverse
|
|
|
|
|
for i := len(options.Wrappers); i > 0; i-- {
|
|
|
|
|
c = options.Wrappers[i-1](c)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return c
|
|
|
|
|
}
|
|
|
|
|