add metrics and tracing
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
a648bf77f4
commit
d4a07099a2
9
go.mod
9
go.mod
@ -2,4 +2,11 @@ module go.unistack.org/micro-client-http/v3
|
|||||||
|
|
||||||
go 1.18
|
go 1.18
|
||||||
|
|
||||||
require go.unistack.org/micro/v3 v3.10.28
|
require go.unistack.org/micro/v3 v3.10.64
|
||||||
|
|
||||||
|
require (
|
||||||
|
golang.org/x/sys v0.19.0 // indirect
|
||||||
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect
|
||||||
|
google.golang.org/grpc v1.63.2 // indirect
|
||||||
|
google.golang.org/protobuf v1.33.0 // indirect
|
||||||
|
)
|
||||||
|
15
go.sum
15
go.sum
@ -1,2 +1,13 @@
|
|||||||
go.unistack.org/micro/v3 v3.10.28 h1:/87lGekrmi0/66pioy+Nh8lVUBBYnVqKoHiNYX5OmMI=
|
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||||
go.unistack.org/micro/v3 v3.10.28/go.mod h1:eUgtvbtiiz6te93m0ZdmoecbitWwjdBmmr84srmEIKA=
|
go.unistack.org/micro/v3 v3.10.64 h1:hTRW/4krLjdGb1gDwN8UDub0RmUqrn/oYclL4RXZTOw=
|
||||||
|
go.unistack.org/micro/v3 v3.10.64/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg=
|
||||||
|
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
|
||||||
|
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
|
||||||
|
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||||
|
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
|
||||||
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be h1:LG9vZxsWGOmUKieR8wPAUR3u3MpnYFQZROPIMaXh7/A=
|
||||||
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
|
||||||
|
google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM=
|
||||||
|
google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA=
|
||||||
|
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
|
||||||
|
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
|
||||||
|
231
http.go
231
http.go
@ -11,6 +11,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -21,7 +22,10 @@ import (
|
|||||||
"go.unistack.org/micro/v3/errors"
|
"go.unistack.org/micro/v3/errors"
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v3/logger"
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v3/metadata"
|
||||||
|
"go.unistack.org/micro/v3/options"
|
||||||
"go.unistack.org/micro/v3/selector"
|
"go.unistack.org/micro/v3/selector"
|
||||||
|
"go.unistack.org/micro/v3/semconv"
|
||||||
|
"go.unistack.org/micro/v3/tracer"
|
||||||
rutil "go.unistack.org/micro/v3/util/reflect"
|
rutil "go.unistack.org/micro/v3/util/reflect"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -35,8 +39,12 @@ func filterLabel(r []router.Route) []router.Route {
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
type httpClient struct {
|
type httpClient struct {
|
||||||
httpcli *http.Client
|
funcPublish client.FuncPublish
|
||||||
opts client.Options
|
funcBatchPublish client.FuncBatchPublish
|
||||||
|
funcCall client.FuncCall
|
||||||
|
funcStream client.FuncStream
|
||||||
|
httpcli *http.Client
|
||||||
|
opts client.Options
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
init bool
|
init bool
|
||||||
}
|
}
|
||||||
@ -223,23 +231,23 @@ func newRequest(ctx context.Context, log logger.Logger, addr string, req client.
|
|||||||
return hreq, nil
|
return hreq, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
func (c *httpClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
||||||
ct := req.ContentType()
|
ct := req.ContentType()
|
||||||
if len(opts.ContentType) > 0 {
|
if len(opts.ContentType) > 0 {
|
||||||
ct = opts.ContentType
|
ct = opts.ContentType
|
||||||
}
|
}
|
||||||
|
|
||||||
cf, err := h.newCodec(ct)
|
cf, err := c.newCodec(ct)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.BadRequest("go.micro.client", err.Error())
|
return errors.BadRequest("go.micro.client", err.Error())
|
||||||
}
|
}
|
||||||
hreq, err := newRequest(ctx, h.opts.Logger, addr, req, ct, cf, req.Body(), opts)
|
hreq, err := newRequest(ctx, c.opts.Logger, addr, req, ct, cf, req.Body(), opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// make the request
|
// make the request
|
||||||
hrsp, err := h.httpcli.Do(hreq)
|
hrsp, err := c.httpcli.Do(hreq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
switch err := err.(type) {
|
switch err := err.(type) {
|
||||||
case *url.Error:
|
case *url.Error:
|
||||||
@ -256,29 +264,29 @@ func (h *httpClient) call(ctx context.Context, addr string, req client.Request,
|
|||||||
|
|
||||||
defer hrsp.Body.Close()
|
defer hrsp.Body.Close()
|
||||||
|
|
||||||
return h.parseRsp(ctx, hrsp, rsp, opts)
|
return c.parseRsp(ctx, hrsp, rsp, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpClient) stream(ctx context.Context, addr string, req client.Request, opts client.CallOptions) (client.Stream, error) {
|
func (c *httpClient) stream(ctx context.Context, addr string, req client.Request, opts client.CallOptions) (client.Stream, error) {
|
||||||
ct := req.ContentType()
|
ct := req.ContentType()
|
||||||
if len(opts.ContentType) > 0 {
|
if len(opts.ContentType) > 0 {
|
||||||
ct = opts.ContentType
|
ct = opts.ContentType
|
||||||
}
|
}
|
||||||
|
|
||||||
// get codec
|
// get codec
|
||||||
cf, err := h.newCodec(ct)
|
cf, err := c.newCodec(ct)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.BadRequest("go.micro.client", err.Error())
|
return nil, errors.BadRequest("go.micro.client", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
cc, err := (h.httpcli.Transport).(*http.Transport).DialContext(ctx, "tcp", addr)
|
cc, err := (c.httpcli.Transport).(*http.Transport).DialContext(ctx, "tcp", addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error dialing: %v", err))
|
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error dialing: %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
return &httpStream{
|
return &httpStream{
|
||||||
address: addr,
|
address: addr,
|
||||||
logger: h.opts.Logger,
|
logger: c.opts.Logger,
|
||||||
context: ctx,
|
context: ctx,
|
||||||
closed: make(chan bool),
|
closed: make(chan bool),
|
||||||
opts: opts,
|
opts: opts,
|
||||||
@ -290,66 +298,88 @@ func (h *httpClient) stream(ctx context.Context, addr string, req client.Request
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpClient) newCodec(ct string) (codec.Codec, error) {
|
func (c *httpClient) newCodec(ct string) (codec.Codec, error) {
|
||||||
h.RLock()
|
c.RLock()
|
||||||
defer h.RUnlock()
|
|
||||||
|
|
||||||
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
|
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
|
||||||
ct = ct[:idx]
|
ct = ct[:idx]
|
||||||
}
|
}
|
||||||
|
|
||||||
if c, ok := h.opts.Codecs[ct]; ok {
|
if cf, ok := c.opts.Codecs[ct]; ok {
|
||||||
return c, nil
|
c.RUnlock()
|
||||||
|
return cf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.RUnlock()
|
||||||
return nil, codec.ErrUnknownContentType
|
return nil, codec.ErrUnknownContentType
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpClient) Init(opts ...client.Option) error {
|
func (c *httpClient) Init(opts ...client.Option) error {
|
||||||
if len(opts) == 0 && h.init {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&h.opts)
|
o(&c.opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := h.opts.Broker.Init(); err != nil {
|
c.funcCall = c.fnCall
|
||||||
return err
|
c.funcStream = c.fnStream
|
||||||
}
|
c.funcPublish = c.fnPublish
|
||||||
if err := h.opts.Tracer.Init(); err != nil {
|
c.funcBatchPublish = c.fnBatchPublish
|
||||||
return err
|
|
||||||
}
|
c.opts.Hooks.EachNext(func(hook options.Hook) {
|
||||||
if err := h.opts.Router.Init(); err != nil {
|
switch h := hook.(type) {
|
||||||
return err
|
case client.HookCall:
|
||||||
}
|
c.funcCall = h(c.funcCall)
|
||||||
if err := h.opts.Logger.Init(); err != nil {
|
case client.HookStream:
|
||||||
return err
|
c.funcStream = h(c.funcStream)
|
||||||
}
|
case client.HookPublish:
|
||||||
if err := h.opts.Meter.Init(); err != nil {
|
c.funcPublish = h(c.funcPublish)
|
||||||
return err
|
case client.HookBatchPublish:
|
||||||
}
|
c.funcBatchPublish = h(c.funcBatchPublish)
|
||||||
if err := h.opts.Transport.Init(); err != nil {
|
}
|
||||||
return err
|
})
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpClient) Options() client.Options {
|
func (c *httpClient) Options() client.Options {
|
||||||
return h.opts
|
return c.opts
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message {
|
func (c *httpClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message {
|
||||||
return newHTTPMessage(topic, msg, h.opts.ContentType, opts...)
|
return newHTTPMessage(topic, msg, c.opts.ContentType, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpClient) NewRequest(service, method string, req interface{}, opts ...client.RequestOption) client.Request {
|
func (c *httpClient) NewRequest(service, method string, req interface{}, opts ...client.RequestOption) client.Request {
|
||||||
return newHTTPRequest(service, method, req, h.opts.ContentType, opts...)
|
return newHTTPRequest(service, method, req, c.opts.ContentType, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
func (c *httpClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||||
|
ts := time.Now()
|
||||||
|
c.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
|
||||||
|
var sp tracer.Span
|
||||||
|
ctx, sp = c.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
|
||||||
|
tracer.WithSpanKind(tracer.SpanKindClient),
|
||||||
|
tracer.WithSpanLabels("endpoint", req.Endpoint()),
|
||||||
|
)
|
||||||
|
err := c.funcCall(ctx, req, rsp, opts...)
|
||||||
|
c.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
|
||||||
|
te := time.Since(ts)
|
||||||
|
c.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
|
||||||
|
c.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
|
||||||
|
|
||||||
|
if me := errors.FromError(err); me == nil {
|
||||||
|
sp.Finish()
|
||||||
|
c.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc()
|
||||||
|
} else {
|
||||||
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
|
c.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *httpClient) fnCall(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||||
// make a copy of call opts
|
// make a copy of call opts
|
||||||
callOpts := h.opts.CallOptions
|
callOpts := c.opts.CallOptions
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
opt(&callOpts)
|
opt(&callOpts)
|
||||||
}
|
}
|
||||||
@ -376,26 +406,21 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface
|
|||||||
}
|
}
|
||||||
|
|
||||||
// make copy of call method
|
// make copy of call method
|
||||||
hcall := h.call
|
hcall := c.call
|
||||||
|
|
||||||
// wrap the call in reverse
|
|
||||||
for i := len(callOpts.CallWrappers); i > 0; i-- {
|
|
||||||
hcall = callOpts.CallWrappers[i-1](hcall)
|
|
||||||
}
|
|
||||||
|
|
||||||
// use the router passed as a call option, or fallback to the rpc clients router
|
// use the router passed as a call option, or fallback to the rpc clients router
|
||||||
if callOpts.Router == nil {
|
if callOpts.Router == nil {
|
||||||
callOpts.Router = h.opts.Router
|
callOpts.Router = c.opts.Router
|
||||||
}
|
}
|
||||||
|
|
||||||
if callOpts.Selector == nil {
|
if callOpts.Selector == nil {
|
||||||
callOpts.Selector = h.opts.Selector
|
callOpts.Selector = c.opts.Selector
|
||||||
}
|
}
|
||||||
|
|
||||||
// inject proxy address
|
// inject proxy address
|
||||||
// TODO: don't even bother using Lookup/Select in this case
|
// TODO: don't even bother using Lookup/Select in this case
|
||||||
if len(h.opts.Proxy) > 0 {
|
if len(c.opts.Proxy) > 0 {
|
||||||
callOpts.Address = []string{h.opts.Proxy}
|
callOpts.Address = []string{c.opts.Proxy}
|
||||||
}
|
}
|
||||||
|
|
||||||
var next selector.Next
|
var next selector.Next
|
||||||
@ -417,7 +442,7 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface
|
|||||||
var routes []string
|
var routes []string
|
||||||
// lookup the route to send the reques to
|
// lookup the route to send the reques to
|
||||||
// TODO apply any filtering here
|
// TODO apply any filtering here
|
||||||
routes, err = h.opts.Lookup(ctx, req, callOpts)
|
routes, err = c.opts.Lookup(ctx, req, callOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", err.Error())
|
||||||
}
|
}
|
||||||
@ -434,7 +459,7 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface
|
|||||||
// make the call
|
// make the call
|
||||||
err = hcall(ctx, node, req, rsp, callOpts)
|
err = hcall(ctx, node, req, rsp, callOpts)
|
||||||
// record the result of the call to inform future routing decisions
|
// record the result of the call to inform future routing decisions
|
||||||
if verr := h.opts.Selector.Record(node, err); verr != nil {
|
if verr := c.opts.Selector.Record(node, err); verr != nil {
|
||||||
return verr
|
return verr
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -479,11 +504,36 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface
|
|||||||
return gerr
|
return gerr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
func (c *httpClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||||
|
ts := time.Now()
|
||||||
|
c.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
|
||||||
|
var sp tracer.Span
|
||||||
|
ctx, sp = c.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
|
||||||
|
tracer.WithSpanKind(tracer.SpanKindClient),
|
||||||
|
tracer.WithSpanLabels("endpoint", req.Endpoint()),
|
||||||
|
)
|
||||||
|
stream, err := c.funcStream(ctx, req, opts...)
|
||||||
|
c.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
|
||||||
|
te := time.Since(ts)
|
||||||
|
c.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
|
||||||
|
c.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
|
||||||
|
|
||||||
|
if me := errors.FromError(err); me == nil {
|
||||||
|
sp.Finish()
|
||||||
|
c.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc()
|
||||||
|
} else {
|
||||||
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
|
c.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
return stream, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *httpClient) fnStream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
// make a copy of call opts
|
// make a copy of call opts
|
||||||
callOpts := h.opts.CallOptions
|
callOpts := c.opts.CallOptions
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&callOpts)
|
o(&callOpts)
|
||||||
}
|
}
|
||||||
@ -520,17 +570,17 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli
|
|||||||
|
|
||||||
// use the router passed as a call option, or fallback to the rpc clients router
|
// use the router passed as a call option, or fallback to the rpc clients router
|
||||||
if callOpts.Router == nil {
|
if callOpts.Router == nil {
|
||||||
callOpts.Router = h.opts.Router
|
callOpts.Router = c.opts.Router
|
||||||
}
|
}
|
||||||
|
|
||||||
if callOpts.Selector == nil {
|
if callOpts.Selector == nil {
|
||||||
callOpts.Selector = h.opts.Selector
|
callOpts.Selector = c.opts.Selector
|
||||||
}
|
}
|
||||||
|
|
||||||
// inject proxy address
|
// inject proxy address
|
||||||
// TODO: don't even bother using Lookup/Select in this case
|
// TODO: don't even bother using Lookup/Select in this case
|
||||||
if len(h.opts.Proxy) > 0 {
|
if len(c.opts.Proxy) > 0 {
|
||||||
callOpts.Address = []string{h.opts.Proxy}
|
callOpts.Address = []string{c.opts.Proxy}
|
||||||
}
|
}
|
||||||
|
|
||||||
var next selector.Next
|
var next selector.Next
|
||||||
@ -551,7 +601,7 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli
|
|||||||
var routes []string
|
var routes []string
|
||||||
// lookup the route to send the reques to
|
// lookup the route to send the reques to
|
||||||
// TODO apply any filtering here
|
// TODO apply any filtering here
|
||||||
routes, err = h.opts.Lookup(ctx, req, callOpts)
|
routes, err = c.opts.Lookup(ctx, req, callOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
||||||
}
|
}
|
||||||
@ -565,10 +615,10 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli
|
|||||||
|
|
||||||
node := next()
|
node := next()
|
||||||
|
|
||||||
stream, cerr := h.stream(ctx, node, req, callOpts)
|
stream, cerr := c.stream(ctx, node, req, callOpts)
|
||||||
|
|
||||||
// record the result of the call to inform future routing decisions
|
// record the result of the call to inform future routing decisions
|
||||||
if verr := h.opts.Selector.Record(node, cerr); verr != nil {
|
if verr := c.opts.Selector.Record(node, cerr); verr != nil {
|
||||||
return nil, verr
|
return nil, verr
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -619,15 +669,23 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli
|
|||||||
return nil, grr
|
return nil, grr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpClient) BatchPublish(ctx context.Context, p []client.Message, opts ...client.PublishOption) error {
|
func (c *httpClient) BatchPublish(ctx context.Context, p []client.Message, opts ...client.PublishOption) error {
|
||||||
return h.publish(ctx, p, opts...)
|
return c.funcBatchPublish(ctx, p, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
func (c *httpClient) fnBatchPublish(ctx context.Context, p []client.Message, opts ...client.PublishOption) error {
|
||||||
return h.publish(ctx, []client.Message{p}, opts...)
|
return c.publish(ctx, p, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
|
func (c *httpClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
||||||
|
return c.funcPublish(ctx, p, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *httpClient) fnPublish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
||||||
|
return c.publish(ctx, []client.Message{p}, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *httpClient) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
|
||||||
var body []byte
|
var body []byte
|
||||||
|
|
||||||
options := client.NewPublishOptions(opts...)
|
options := client.NewPublishOptions(opts...)
|
||||||
@ -668,7 +726,7 @@ func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...c
|
|||||||
body = d.Data
|
body = d.Data
|
||||||
} else {
|
} else {
|
||||||
// use codec for payload
|
// use codec for payload
|
||||||
cf, err := h.newCodec(p.ContentType())
|
cf, err := c.newCodec(p.ContentType())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", err.Error())
|
||||||
}
|
}
|
||||||
@ -683,28 +741,28 @@ func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...c
|
|||||||
msgs = append(msgs, &broker.Message{Header: md, Body: body})
|
msgs = append(msgs, &broker.Message{Header: md, Body: body})
|
||||||
}
|
}
|
||||||
|
|
||||||
return h.opts.Broker.BatchPublish(ctx, msgs,
|
return c.opts.Broker.BatchPublish(ctx, msgs,
|
||||||
broker.PublishContext(ctx),
|
broker.PublishContext(ctx),
|
||||||
broker.PublishBodyOnly(options.BodyOnly),
|
broker.PublishBodyOnly(options.BodyOnly),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpClient) String() string {
|
func (c *httpClient) String() string {
|
||||||
return "http"
|
return "http"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpClient) Name() string {
|
func (c *httpClient) Name() string {
|
||||||
return h.opts.Name
|
return c.opts.Name
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClient(opts ...client.Option) client.Client {
|
func NewClient(opts ...client.Option) *httpClient {
|
||||||
options := client.NewOptions(opts...)
|
options := client.NewOptions(opts...)
|
||||||
|
|
||||||
if len(options.ContentType) == 0 {
|
if len(options.ContentType) == 0 {
|
||||||
options.ContentType = DefaultContentType
|
options.ContentType = DefaultContentType
|
||||||
}
|
}
|
||||||
|
|
||||||
rc := &httpClient{
|
c := &httpClient{
|
||||||
opts: options,
|
opts: options,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -727,7 +785,7 @@ func NewClient(opts ...client.Option) client.Client {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if httpcli, ok := options.Context.Value(httpClientKey{}).(*http.Client); ok {
|
if httpcli, ok := options.Context.Value(httpClientKey{}).(*http.Client); ok {
|
||||||
rc.httpcli = httpcli
|
c.httpcli = httpcli
|
||||||
} else {
|
} else {
|
||||||
// TODO customTransport := http.DefaultTransport.(*http.Transport).Clone()
|
// TODO customTransport := http.DefaultTransport.(*http.Transport).Clone()
|
||||||
tr := &http.Transport{
|
tr := &http.Transport{
|
||||||
@ -743,14 +801,13 @@ func NewClient(opts ...client.Option) client.Client {
|
|||||||
ExpectContinueTimeout: 1 * time.Second,
|
ExpectContinueTimeout: 1 * time.Second,
|
||||||
TLSClientConfig: options.TLSConfig,
|
TLSClientConfig: options.TLSConfig,
|
||||||
}
|
}
|
||||||
rc.httpcli = &http.Client{Transport: tr}
|
c.httpcli = &http.Client{Transport: tr}
|
||||||
}
|
}
|
||||||
c := client.Client(rc)
|
|
||||||
|
|
||||||
// wrap in reverse
|
c.funcCall = c.fnCall
|
||||||
for i := len(options.Wrappers); i > 0; i-- {
|
c.funcStream = c.fnStream
|
||||||
c = options.Wrappers[i-1](c)
|
c.funcPublish = c.fnPublish
|
||||||
}
|
c.funcBatchPublish = c.fnBatchPublish
|
||||||
|
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user