289 lines
		
	
	
		
			6.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			289 lines
		
	
	
		
			6.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package http
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"net"
 | 
						|
	"net/http"
 | 
						|
	"net/url"
 | 
						|
	"strings"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"go.unistack.org/micro/v4/client"
 | 
						|
	"go.unistack.org/micro/v4/codec"
 | 
						|
	"go.unistack.org/micro/v4/errors"
 | 
						|
	"go.unistack.org/micro/v4/logger"
 | 
						|
	"go.unistack.org/micro/v4/metadata"
 | 
						|
	"go.unistack.org/micro/v4/selector"
 | 
						|
 | 
						|
	"go.unistack.org/micro-client-http/v4/status"
 | 
						|
)
 | 
						|
 | 
						|
func (c *Client) fnCall(ctx context.Context, req client.Request, rsp any, opts ...client.CallOption) error {
 | 
						|
	// make a copy of call opts
 | 
						|
	callOpts := c.opts.CallOptions
 | 
						|
	for _, opt := range opts {
 | 
						|
		opt(&callOpts)
 | 
						|
	}
 | 
						|
 | 
						|
	// check if we already have a deadline
 | 
						|
	d, ok := ctx.Deadline()
 | 
						|
	if !ok {
 | 
						|
		var cancel context.CancelFunc
 | 
						|
		// no deadline so we create a new one
 | 
						|
		ctx, cancel = context.WithTimeout(ctx, callOpts.RequestTimeout)
 | 
						|
		defer cancel()
 | 
						|
	} else {
 | 
						|
		// got a deadline so no need to setup context,
 | 
						|
		// but we need to set the timeout we pass along
 | 
						|
		opt := client.WithRequestTimeout(time.Until(d))
 | 
						|
		opt(&callOpts)
 | 
						|
	}
 | 
						|
 | 
						|
	// should we noop right here?
 | 
						|
	select {
 | 
						|
	case <-ctx.Done():
 | 
						|
		return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
 | 
						|
	default:
 | 
						|
	}
 | 
						|
 | 
						|
	// make copy of call method
 | 
						|
	hcall := c.call
 | 
						|
 | 
						|
	// use the router passed as a call option, or fallback to the rpc clients router
 | 
						|
	if callOpts.Router == nil {
 | 
						|
		callOpts.Router = c.opts.Router
 | 
						|
	}
 | 
						|
 | 
						|
	if callOpts.Selector == nil {
 | 
						|
		callOpts.Selector = c.opts.Selector
 | 
						|
	}
 | 
						|
 | 
						|
	// inject proxy address
 | 
						|
	// TODO: don't even bother using Lookup/Select in this case
 | 
						|
	if len(c.opts.Proxy) > 0 {
 | 
						|
		callOpts.Address = []string{c.opts.Proxy}
 | 
						|
	}
 | 
						|
 | 
						|
	var next selector.Next
 | 
						|
 | 
						|
	call := func(i int) error {
 | 
						|
		// call backoff first. Someone may want an initial start delay
 | 
						|
		t, err := callOpts.Backoff(ctx, req, i)
 | 
						|
		if err != nil {
 | 
						|
			return errors.InternalServerError("go.micro.client", "%+v", err)
 | 
						|
		}
 | 
						|
 | 
						|
		// only sleep if greater than 0
 | 
						|
		if t.Seconds() > 0 {
 | 
						|
			time.Sleep(t)
 | 
						|
		}
 | 
						|
 | 
						|
		if next == nil {
 | 
						|
			var routes []string
 | 
						|
			// lookup the route to send the reques to
 | 
						|
			// TODO apply any filtering here
 | 
						|
			routes, err = c.opts.Lookup(ctx, req, callOpts)
 | 
						|
			if err != nil {
 | 
						|
				return errors.InternalServerError("go.micro.client", "%+v", err)
 | 
						|
			}
 | 
						|
 | 
						|
			// balance the list of nodes
 | 
						|
			next, err = callOpts.Selector.Select(routes)
 | 
						|
			if err != nil {
 | 
						|
				return errors.InternalServerError("go.micro.client", "%+v", err)
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		node := next()
 | 
						|
 | 
						|
		// make the call
 | 
						|
		err = hcall(ctx, node, req, rsp, callOpts)
 | 
						|
 | 
						|
		// record the result of the call to inform future routing decisions
 | 
						|
		if verr := c.opts.Selector.Record(node, err); verr != nil {
 | 
						|
			return errors.InternalServerError("go.micro.client", "%+v", verr)
 | 
						|
		}
 | 
						|
 | 
						|
		// try and transform the error to micro error
 | 
						|
		if verr, ok := err.(*errors.Error); ok {
 | 
						|
			return verr
 | 
						|
		}
 | 
						|
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	ch := make(chan error, callOpts.Retries)
 | 
						|
	var gerr error
 | 
						|
 | 
						|
	for i := 0; i <= callOpts.Retries; i++ {
 | 
						|
		go func() {
 | 
						|
			ch <- call(i)
 | 
						|
		}()
 | 
						|
 | 
						|
		select {
 | 
						|
		case <-ctx.Done():
 | 
						|
			return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
 | 
						|
		case err := <-ch:
 | 
						|
			// if the call succeeded lets bail early
 | 
						|
			if err == nil {
 | 
						|
				return nil
 | 
						|
			}
 | 
						|
 | 
						|
			retry, rerr := callOpts.Retry(ctx, req, i, err)
 | 
						|
			if rerr != nil {
 | 
						|
				return rerr
 | 
						|
			}
 | 
						|
 | 
						|
			if !retry {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
 | 
						|
			gerr = err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return gerr
 | 
						|
}
 | 
						|
 | 
						|
func (c *Client) call(ctx context.Context, addr string, req client.Request, rsp any, opts client.CallOptions) error {
 | 
						|
	ct := req.ContentType()
 | 
						|
	if len(opts.ContentType) > 0 {
 | 
						|
		ct = opts.ContentType
 | 
						|
	}
 | 
						|
 | 
						|
	cf, err := c.newCodec(ct)
 | 
						|
	if err != nil {
 | 
						|
		return errors.BadRequest("go.micro.client", "%+v", err)
 | 
						|
	}
 | 
						|
 | 
						|
	hreq, err := buildHTTPRequest(ctx, addr, req.Endpoint(), ct, cf, req.Body(), opts, c.opts.Logger)
 | 
						|
	if err != nil {
 | 
						|
		return errors.BadRequest("go.micro.client", "%+v", err)
 | 
						|
	}
 | 
						|
 | 
						|
	hrsp, err := c.httpClient.Do(hreq)
 | 
						|
	if err != nil {
 | 
						|
		switch err := err.(type) {
 | 
						|
		case *url.Error:
 | 
						|
			if err, ok := err.Err.(net.Error); ok && err.Timeout() {
 | 
						|
				return errors.Timeout("go.micro.client", "%+v", err)
 | 
						|
			}
 | 
						|
		case net.Error:
 | 
						|
			if err.Timeout() {
 | 
						|
				return errors.Timeout("go.micro.client", "%+v", err)
 | 
						|
			}
 | 
						|
		}
 | 
						|
		return errors.InternalServerError("go.micro.client", "%+v", err)
 | 
						|
	}
 | 
						|
 | 
						|
	defer hrsp.Body.Close()
 | 
						|
 | 
						|
	return c.parseRsp(ctx, hrsp, rsp, opts)
 | 
						|
}
 | 
						|
 | 
						|
func (c *Client) newCodec(ct string) (codec.Codec, error) {
 | 
						|
	c.mu.RLock()
 | 
						|
	defer c.mu.RUnlock()
 | 
						|
 | 
						|
	if idx := strings.IndexRune(ct, ';'); idx >= 0 {
 | 
						|
		ct = ct[:idx]
 | 
						|
	}
 | 
						|
 | 
						|
	if cf, ok := c.opts.Codecs[ct]; ok {
 | 
						|
		return cf, nil
 | 
						|
	}
 | 
						|
 | 
						|
	return nil, codec.ErrUnknownContentType
 | 
						|
}
 | 
						|
 | 
						|
func (c *Client) parseRsp(ctx context.Context, hrsp *http.Response, rsp any, opts client.CallOptions) error {
 | 
						|
	log := c.opts.Logger
 | 
						|
 | 
						|
	select {
 | 
						|
	case <-ctx.Done():
 | 
						|
		return ctx.Err()
 | 
						|
	default:
 | 
						|
	}
 | 
						|
 | 
						|
	if opts.ResponseMetadata != nil {
 | 
						|
		for k, v := range hrsp.Header {
 | 
						|
			opts.ResponseMetadata.Append(k, v...)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if hrsp.StatusCode == http.StatusNoContent {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	ct := DefaultContentType
 | 
						|
	if htype := hrsp.Header.Get(metadata.HeaderContentType); htype != "" {
 | 
						|
		ct = htype
 | 
						|
	}
 | 
						|
 | 
						|
	var buf []byte
 | 
						|
 | 
						|
	if hrsp.Body != nil {
 | 
						|
		var err error
 | 
						|
		buf, err = io.ReadAll(hrsp.Body)
 | 
						|
		if err != nil {
 | 
						|
			return errors.InternalServerError("go.micro.client", "read body: %v", err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if log.V(logger.DebugLevel) {
 | 
						|
		if shouldLogBody(ct) {
 | 
						|
			log.Debug(
 | 
						|
				ctx,
 | 
						|
				fmt.Sprintf(
 | 
						|
					"micro.client http response: status=%s headers=%v body=%s",
 | 
						|
					hrsp.Status, hrsp.Header, buf,
 | 
						|
				),
 | 
						|
			)
 | 
						|
		} else {
 | 
						|
			log.Debug(
 | 
						|
				ctx,
 | 
						|
				fmt.Sprintf(
 | 
						|
					"micro.client http response: status=%s headers=%v",
 | 
						|
					hrsp.Status, hrsp.Header,
 | 
						|
				),
 | 
						|
			)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	cf, err := c.newCodec(ct)
 | 
						|
	if err != nil {
 | 
						|
		return errors.InternalServerError("go.micro.client", "unknown content-type %s: %v", ct, err)
 | 
						|
	}
 | 
						|
 | 
						|
	if hrsp.StatusCode < http.StatusBadRequest {
 | 
						|
		if err = cf.Unmarshal(buf, rsp); err != nil {
 | 
						|
			return errors.InternalServerError("go.micro.client", "unmarshal response: %v", err)
 | 
						|
		}
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	s := status.New(hrsp.StatusCode)
 | 
						|
 | 
						|
	var mappedErr any
 | 
						|
 | 
						|
	errMap, ok := errorMapFromOpts(opts)
 | 
						|
	if ok && errMap != nil {
 | 
						|
		mappedErr, ok = errMap[fmt.Sprintf("%d", hrsp.StatusCode)]
 | 
						|
		if !ok {
 | 
						|
			mappedErr, ok = errMap["default"]
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if !ok || mappedErr == nil {
 | 
						|
		return s.Err()
 | 
						|
	}
 | 
						|
 | 
						|
	if err = cf.Unmarshal(buf, mappedErr); err != nil {
 | 
						|
		return errors.InternalServerError("go.micro.client", "unmarshal response: %v", err)
 | 
						|
	}
 | 
						|
 | 
						|
	return s.WithDetails(mappedErr).Err()
 | 
						|
}
 |