diff --git a/go.mod b/go.mod index 2c64390..6e0478c 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,11 @@ module go.unistack.org/micro-client-http/v3 go 1.18 -require go.unistack.org/micro/v3 v3.10.28 +require go.unistack.org/micro/v3 v3.10.64 + +require ( + golang.org/x/sys v0.19.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect + google.golang.org/grpc v1.63.2 // indirect + google.golang.org/protobuf v1.33.0 // indirect +) diff --git a/go.sum b/go.sum index 7e2c165..3a2248e 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,13 @@ -go.unistack.org/micro/v3 v3.10.28 h1:/87lGekrmi0/66pioy+Nh8lVUBBYnVqKoHiNYX5OmMI= -go.unistack.org/micro/v3 v3.10.28/go.mod h1:eUgtvbtiiz6te93m0ZdmoecbitWwjdBmmr84srmEIKA= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +go.unistack.org/micro/v3 v3.10.64 h1:hTRW/4krLjdGb1gDwN8UDub0RmUqrn/oYclL4RXZTOw= +go.unistack.org/micro/v3 v3.10.64/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be h1:LG9vZxsWGOmUKieR8wPAUR3u3MpnYFQZROPIMaXh7/A= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= +google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= +google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= diff --git a/http.go b/http.go index 2b7c0f3..29ff3fa 100644 --- a/http.go +++ b/http.go @@ -11,6 +11,7 @@ import ( "net/http" "net/url" "os" + "strconv" "strings" "sync" "time" @@ -21,7 +22,10 @@ import ( "go.unistack.org/micro/v3/errors" "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/metadata" + "go.unistack.org/micro/v3/options" "go.unistack.org/micro/v3/selector" + "go.unistack.org/micro/v3/semconv" + "go.unistack.org/micro/v3/tracer" rutil "go.unistack.org/micro/v3/util/reflect" ) @@ -35,8 +39,12 @@ func filterLabel(r []router.Route) []router.Route { */ type httpClient struct { - httpcli *http.Client - opts client.Options + funcPublish client.FuncPublish + funcBatchPublish client.FuncBatchPublish + funcCall client.FuncCall + funcStream client.FuncStream + httpcli *http.Client + opts client.Options sync.RWMutex init bool } @@ -223,23 +231,23 @@ func newRequest(ctx context.Context, log logger.Logger, addr string, req client. return hreq, nil } -func (h *httpClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { +func (c *httpClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { ct := req.ContentType() if len(opts.ContentType) > 0 { ct = opts.ContentType } - cf, err := h.newCodec(ct) + cf, err := c.newCodec(ct) if err != nil { return errors.BadRequest("go.micro.client", err.Error()) } - hreq, err := newRequest(ctx, h.opts.Logger, addr, req, ct, cf, req.Body(), opts) + hreq, err := newRequest(ctx, c.opts.Logger, addr, req, ct, cf, req.Body(), opts) if err != nil { return err } // make the request - hrsp, err := h.httpcli.Do(hreq) + hrsp, err := c.httpcli.Do(hreq) if err != nil { switch err := err.(type) { case *url.Error: @@ -256,29 +264,29 @@ func (h *httpClient) call(ctx context.Context, addr string, req client.Request, defer hrsp.Body.Close() - return h.parseRsp(ctx, hrsp, rsp, opts) + return c.parseRsp(ctx, hrsp, rsp, opts) } -func (h *httpClient) stream(ctx context.Context, addr string, req client.Request, opts client.CallOptions) (client.Stream, error) { +func (c *httpClient) stream(ctx context.Context, addr string, req client.Request, opts client.CallOptions) (client.Stream, error) { ct := req.ContentType() if len(opts.ContentType) > 0 { ct = opts.ContentType } // get codec - cf, err := h.newCodec(ct) + cf, err := c.newCodec(ct) if err != nil { return nil, errors.BadRequest("go.micro.client", err.Error()) } - cc, err := (h.httpcli.Transport).(*http.Transport).DialContext(ctx, "tcp", addr) + cc, err := (c.httpcli.Transport).(*http.Transport).DialContext(ctx, "tcp", addr) if err != nil { return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error dialing: %v", err)) } return &httpStream{ address: addr, - logger: h.opts.Logger, + logger: c.opts.Logger, context: ctx, closed: make(chan bool), opts: opts, @@ -290,66 +298,88 @@ func (h *httpClient) stream(ctx context.Context, addr string, req client.Request }, nil } -func (h *httpClient) newCodec(ct string) (codec.Codec, error) { - h.RLock() - defer h.RUnlock() +func (c *httpClient) newCodec(ct string) (codec.Codec, error) { + c.RLock() if idx := strings.IndexRune(ct, ';'); idx >= 0 { ct = ct[:idx] } - if c, ok := h.opts.Codecs[ct]; ok { - return c, nil + if cf, ok := c.opts.Codecs[ct]; ok { + c.RUnlock() + return cf, nil } + c.RUnlock() return nil, codec.ErrUnknownContentType } -func (h *httpClient) Init(opts ...client.Option) error { - if len(opts) == 0 && h.init { - return nil - } +func (c *httpClient) Init(opts ...client.Option) error { for _, o := range opts { - o(&h.opts) + o(&c.opts) } - if err := h.opts.Broker.Init(); err != nil { - return err - } - if err := h.opts.Tracer.Init(); err != nil { - return err - } - if err := h.opts.Router.Init(); err != nil { - return err - } - if err := h.opts.Logger.Init(); err != nil { - return err - } - if err := h.opts.Meter.Init(); err != nil { - return err - } - if err := h.opts.Transport.Init(); err != nil { - return err - } + c.funcCall = c.fnCall + c.funcStream = c.fnStream + c.funcPublish = c.fnPublish + c.funcBatchPublish = c.fnBatchPublish + + c.opts.Hooks.EachNext(func(hook options.Hook) { + switch h := hook.(type) { + case client.HookCall: + c.funcCall = h(c.funcCall) + case client.HookStream: + c.funcStream = h(c.funcStream) + case client.HookPublish: + c.funcPublish = h(c.funcPublish) + case client.HookBatchPublish: + c.funcBatchPublish = h(c.funcBatchPublish) + } + }) return nil } -func (h *httpClient) Options() client.Options { - return h.opts +func (c *httpClient) Options() client.Options { + return c.opts } -func (h *httpClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message { - return newHTTPMessage(topic, msg, h.opts.ContentType, opts...) +func (c *httpClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message { + return newHTTPMessage(topic, msg, c.opts.ContentType, opts...) } -func (h *httpClient) NewRequest(service, method string, req interface{}, opts ...client.RequestOption) client.Request { - return newHTTPRequest(service, method, req, h.opts.ContentType, opts...) +func (c *httpClient) NewRequest(service, method string, req interface{}, opts ...client.RequestOption) client.Request { + return newHTTPRequest(service, method, req, c.opts.ContentType, opts...) } -func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { +func (c *httpClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { + ts := time.Now() + c.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc() + var sp tracer.Span + ctx, sp = c.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client", + tracer.WithSpanKind(tracer.SpanKindClient), + tracer.WithSpanLabels("endpoint", req.Endpoint()), + ) + err := c.funcCall(ctx, req, rsp, opts...) + c.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec() + te := time.Since(ts) + c.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds()) + c.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds()) + + if me := errors.FromError(err); me == nil { + sp.Finish() + c.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc() + } else { + sp.SetStatus(tracer.SpanStatusError, err.Error()) + c.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc() + } + + return err +} + +func (c *httpClient) fnCall(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { // make a copy of call opts - callOpts := h.opts.CallOptions + callOpts := c.opts.CallOptions for _, opt := range opts { opt(&callOpts) } @@ -376,26 +406,21 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface } // make copy of call method - hcall := h.call - - // wrap the call in reverse - for i := len(callOpts.CallWrappers); i > 0; i-- { - hcall = callOpts.CallWrappers[i-1](hcall) - } + hcall := c.call // use the router passed as a call option, or fallback to the rpc clients router if callOpts.Router == nil { - callOpts.Router = h.opts.Router + callOpts.Router = c.opts.Router } if callOpts.Selector == nil { - callOpts.Selector = h.opts.Selector + callOpts.Selector = c.opts.Selector } // inject proxy address // TODO: don't even bother using Lookup/Select in this case - if len(h.opts.Proxy) > 0 { - callOpts.Address = []string{h.opts.Proxy} + if len(c.opts.Proxy) > 0 { + callOpts.Address = []string{c.opts.Proxy} } var next selector.Next @@ -417,7 +442,7 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface var routes []string // lookup the route to send the reques to // TODO apply any filtering here - routes, err = h.opts.Lookup(ctx, req, callOpts) + routes, err = c.opts.Lookup(ctx, req, callOpts) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } @@ -434,7 +459,7 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface // make the call err = hcall(ctx, node, req, rsp, callOpts) // record the result of the call to inform future routing decisions - if verr := h.opts.Selector.Record(node, err); verr != nil { + if verr := c.opts.Selector.Record(node, err); verr != nil { return verr } @@ -479,11 +504,36 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface return gerr } -func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { +func (c *httpClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { + ts := time.Now() + c.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc() + var sp tracer.Span + ctx, sp = c.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client", + tracer.WithSpanKind(tracer.SpanKindClient), + tracer.WithSpanLabels("endpoint", req.Endpoint()), + ) + stream, err := c.funcStream(ctx, req, opts...) + c.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec() + te := time.Since(ts) + c.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds()) + c.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds()) + + if me := errors.FromError(err); me == nil { + sp.Finish() + c.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc() + } else { + sp.SetStatus(tracer.SpanStatusError, err.Error()) + c.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc() + } + + return stream, err +} + +func (c *httpClient) fnStream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { var err error // make a copy of call opts - callOpts := h.opts.CallOptions + callOpts := c.opts.CallOptions for _, o := range opts { o(&callOpts) } @@ -520,17 +570,17 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli // use the router passed as a call option, or fallback to the rpc clients router if callOpts.Router == nil { - callOpts.Router = h.opts.Router + callOpts.Router = c.opts.Router } if callOpts.Selector == nil { - callOpts.Selector = h.opts.Selector + callOpts.Selector = c.opts.Selector } // inject proxy address // TODO: don't even bother using Lookup/Select in this case - if len(h.opts.Proxy) > 0 { - callOpts.Address = []string{h.opts.Proxy} + if len(c.opts.Proxy) > 0 { + callOpts.Address = []string{c.opts.Proxy} } var next selector.Next @@ -551,7 +601,7 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli var routes []string // lookup the route to send the reques to // TODO apply any filtering here - routes, err = h.opts.Lookup(ctx, req, callOpts) + routes, err = c.opts.Lookup(ctx, req, callOpts) if err != nil { return nil, errors.InternalServerError("go.micro.client", err.Error()) } @@ -565,10 +615,10 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli node := next() - stream, cerr := h.stream(ctx, node, req, callOpts) + stream, cerr := c.stream(ctx, node, req, callOpts) // record the result of the call to inform future routing decisions - if verr := h.opts.Selector.Record(node, cerr); verr != nil { + if verr := c.opts.Selector.Record(node, cerr); verr != nil { return nil, verr } @@ -619,15 +669,23 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli return nil, grr } -func (h *httpClient) BatchPublish(ctx context.Context, p []client.Message, opts ...client.PublishOption) error { - return h.publish(ctx, p, opts...) +func (c *httpClient) BatchPublish(ctx context.Context, p []client.Message, opts ...client.PublishOption) error { + return c.funcBatchPublish(ctx, p, opts...) } -func (h *httpClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { - return h.publish(ctx, []client.Message{p}, opts...) +func (c *httpClient) fnBatchPublish(ctx context.Context, p []client.Message, opts ...client.PublishOption) error { + return c.publish(ctx, p, opts...) } -func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { +func (c *httpClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { + return c.funcPublish(ctx, p, opts...) +} + +func (c *httpClient) fnPublish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { + return c.publish(ctx, []client.Message{p}, opts...) +} + +func (c *httpClient) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { var body []byte options := client.NewPublishOptions(opts...) @@ -668,7 +726,7 @@ func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...c body = d.Data } else { // use codec for payload - cf, err := h.newCodec(p.ContentType()) + cf, err := c.newCodec(p.ContentType()) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } @@ -683,28 +741,28 @@ func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...c msgs = append(msgs, &broker.Message{Header: md, Body: body}) } - return h.opts.Broker.BatchPublish(ctx, msgs, + return c.opts.Broker.BatchPublish(ctx, msgs, broker.PublishContext(ctx), broker.PublishBodyOnly(options.BodyOnly), ) } -func (h *httpClient) String() string { +func (c *httpClient) String() string { return "http" } -func (h *httpClient) Name() string { - return h.opts.Name +func (c *httpClient) Name() string { + return c.opts.Name } -func NewClient(opts ...client.Option) client.Client { +func NewClient(opts ...client.Option) *httpClient { options := client.NewOptions(opts...) if len(options.ContentType) == 0 { options.ContentType = DefaultContentType } - rc := &httpClient{ + c := &httpClient{ opts: options, } @@ -727,7 +785,7 @@ func NewClient(opts ...client.Option) client.Client { } if httpcli, ok := options.Context.Value(httpClientKey{}).(*http.Client); ok { - rc.httpcli = httpcli + c.httpcli = httpcli } else { // TODO customTransport := http.DefaultTransport.(*http.Transport).Clone() tr := &http.Transport{ @@ -743,14 +801,13 @@ func NewClient(opts ...client.Option) client.Client { ExpectContinueTimeout: 1 * time.Second, TLSClientConfig: options.TLSConfig, } - rc.httpcli = &http.Client{Transport: tr} + c.httpcli = &http.Client{Transport: tr} } - c := client.Client(rc) - // wrap in reverse - for i := len(options.Wrappers); i > 0; i-- { - c = options.Wrappers[i-1](c) - } + c.funcCall = c.fnCall + c.funcStream = c.fnStream + c.funcPublish = c.fnPublish + c.funcBatchPublish = c.fnBatchPublish return c }