integrate request builder into HTTP client for googleapis support (#159)
This commit is contained in:
		
							
								
								
									
										272
									
								
								client_unary_call.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										272
									
								
								client_unary_call.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,272 @@ | ||||
| package http | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"net" | ||||
| 	"net/http" | ||||
| 	"net/url" | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| 	"go.unistack.org/micro/v3/client" | ||||
| 	"go.unistack.org/micro/v3/codec" | ||||
| 	"go.unistack.org/micro/v3/errors" | ||||
| 	"go.unistack.org/micro/v3/logger" | ||||
| 	"go.unistack.org/micro/v3/metadata" | ||||
| 	"go.unistack.org/micro/v3/selector" | ||||
|  | ||||
| 	"go.unistack.org/micro-client-http/v3/status" | ||||
| ) | ||||
|  | ||||
| func (c *Client) fnCall(ctx context.Context, req client.Request, rsp any, opts ...client.CallOption) error { | ||||
| 	// make a copy of call opts | ||||
| 	callOpts := c.opts.CallOptions | ||||
| 	for _, opt := range opts { | ||||
| 		opt(&callOpts) | ||||
| 	} | ||||
|  | ||||
| 	// check if we already have a deadline | ||||
| 	d, ok := ctx.Deadline() | ||||
| 	if !ok { | ||||
| 		var cancel context.CancelFunc | ||||
| 		// no deadline so we create a new one | ||||
| 		ctx, cancel = context.WithTimeout(ctx, callOpts.RequestTimeout) | ||||
| 		defer cancel() | ||||
| 	} else { | ||||
| 		// got a deadline so no need to setup context, | ||||
| 		// but we need to set the timeout we pass along | ||||
| 		opt := client.WithRequestTimeout(time.Until(d)) | ||||
| 		opt(&callOpts) | ||||
| 	} | ||||
|  | ||||
| 	// should we noop right here? | ||||
| 	select { | ||||
| 	case <-ctx.Done(): | ||||
| 		return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) | ||||
| 	default: | ||||
| 	} | ||||
|  | ||||
| 	// make copy of call method | ||||
| 	hcall := c.call | ||||
|  | ||||
| 	// use the router passed as a call option, or fallback to the rpc clients router | ||||
| 	if callOpts.Router == nil { | ||||
| 		callOpts.Router = c.opts.Router | ||||
| 	} | ||||
|  | ||||
| 	if callOpts.Selector == nil { | ||||
| 		callOpts.Selector = c.opts.Selector | ||||
| 	} | ||||
|  | ||||
| 	// inject proxy address | ||||
| 	// TODO: don't even bother using Lookup/Select in this case | ||||
| 	if len(c.opts.Proxy) > 0 { | ||||
| 		callOpts.Address = []string{c.opts.Proxy} | ||||
| 	} | ||||
|  | ||||
| 	var next selector.Next | ||||
|  | ||||
| 	call := func(i int) error { | ||||
| 		// call backoff first. Someone may want an initial start delay | ||||
| 		t, err := callOpts.Backoff(ctx, req, i) | ||||
| 		if err != nil { | ||||
| 			return errors.InternalServerError("go.micro.client", "%+v", err) | ||||
| 		} | ||||
|  | ||||
| 		// only sleep if greater than 0 | ||||
| 		if t.Seconds() > 0 { | ||||
| 			time.Sleep(t) | ||||
| 		} | ||||
|  | ||||
| 		if next == nil { | ||||
| 			var routes []string | ||||
| 			// lookup the route to send the reques to | ||||
| 			// TODO apply any filtering here | ||||
| 			routes, err = c.opts.Lookup(ctx, req, callOpts) | ||||
| 			if err != nil { | ||||
| 				return errors.InternalServerError("go.micro.client", "%+v", err) | ||||
| 			} | ||||
|  | ||||
| 			// balance the list of nodes | ||||
| 			next, err = callOpts.Selector.Select(routes) | ||||
| 			if err != nil { | ||||
| 				return errors.InternalServerError("go.micro.client", "%+v", err) | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		node := next() | ||||
|  | ||||
| 		// make the call | ||||
| 		err = hcall(ctx, node, req, rsp, callOpts) | ||||
|  | ||||
| 		// record the result of the call to inform future routing decisions | ||||
| 		if verr := c.opts.Selector.Record(node, err); verr != nil { | ||||
| 			return errors.InternalServerError("go.micro.client", "%+v", verr) | ||||
| 		} | ||||
|  | ||||
| 		// try and transform the error to micro error | ||||
| 		if verr, ok := err.(*errors.Error); ok { | ||||
| 			return verr | ||||
| 		} | ||||
|  | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	ch := make(chan error, callOpts.Retries) | ||||
| 	var gerr error | ||||
|  | ||||
| 	for i := 0; i <= callOpts.Retries; i++ { | ||||
| 		go func() { | ||||
| 			ch <- call(i) | ||||
| 		}() | ||||
|  | ||||
| 		select { | ||||
| 		case <-ctx.Done(): | ||||
| 			return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) | ||||
| 		case err := <-ch: | ||||
| 			// if the call succeeded lets bail early | ||||
| 			if err == nil { | ||||
| 				return nil | ||||
| 			} | ||||
|  | ||||
| 			retry, rerr := callOpts.Retry(ctx, req, i, err) | ||||
| 			if rerr != nil { | ||||
| 				return rerr | ||||
| 			} | ||||
|  | ||||
| 			if !retry { | ||||
| 				return err | ||||
| 			} | ||||
|  | ||||
| 			gerr = err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return gerr | ||||
| } | ||||
|  | ||||
| func (c *Client) call(ctx context.Context, addr string, req client.Request, rsp any, opts client.CallOptions) error { | ||||
| 	ct := req.ContentType() | ||||
| 	if len(opts.ContentType) > 0 { | ||||
| 		ct = opts.ContentType | ||||
| 	} | ||||
|  | ||||
| 	cf, err := c.newCodec(ct) | ||||
| 	if err != nil { | ||||
| 		return errors.BadRequest("go.micro.client", "%+v", err) | ||||
| 	} | ||||
|  | ||||
| 	hreq, err := buildHTTPRequest(ctx, addr, req.Endpoint(), ct, cf, req.Body(), opts, c.opts.Logger) | ||||
| 	if err != nil { | ||||
| 		return errors.BadRequest("go.micro.client", "%+v", err) | ||||
| 	} | ||||
|  | ||||
| 	hrsp, err := c.httpClient.Do(hreq) | ||||
| 	if err != nil { | ||||
| 		switch err := err.(type) { | ||||
| 		case *url.Error: | ||||
| 			if err, ok := err.Err.(net.Error); ok && err.Timeout() { | ||||
| 				return errors.Timeout("go.micro.client", "%+v", err) | ||||
| 			} | ||||
| 		case net.Error: | ||||
| 			if err.Timeout() { | ||||
| 				return errors.Timeout("go.micro.client", "%+v", err) | ||||
| 			} | ||||
| 		} | ||||
| 		return errors.InternalServerError("go.micro.client", "%+v", err) | ||||
| 	} | ||||
|  | ||||
| 	defer hrsp.Body.Close() | ||||
|  | ||||
| 	return c.parseRsp(ctx, hrsp, rsp, opts) | ||||
| } | ||||
|  | ||||
| func (c *Client) newCodec(ct string) (codec.Codec, error) { | ||||
| 	c.mu.RLock() | ||||
| 	defer c.mu.RUnlock() | ||||
|  | ||||
| 	if idx := strings.IndexRune(ct, ';'); idx >= 0 { | ||||
| 		ct = ct[:idx] | ||||
| 	} | ||||
|  | ||||
| 	if cf, ok := c.opts.Codecs[ct]; ok { | ||||
| 		return cf, nil | ||||
| 	} | ||||
|  | ||||
| 	return nil, codec.ErrUnknownContentType | ||||
| } | ||||
|  | ||||
| func (c *Client) parseRsp(ctx context.Context, hrsp *http.Response, rsp any, opts client.CallOptions) error { | ||||
| 	log := c.opts.Logger | ||||
|  | ||||
| 	select { | ||||
| 	case <-ctx.Done(): | ||||
| 		return ctx.Err() | ||||
| 	default: | ||||
| 	} | ||||
|  | ||||
| 	var buf []byte | ||||
|  | ||||
| 	if opts.ResponseMetadata != nil { | ||||
| 		for k, v := range hrsp.Header { | ||||
| 			opts.ResponseMetadata.Set(k, strings.Join(v, ",")) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if hrsp.StatusCode == http.StatusNoContent { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	ct := DefaultContentType | ||||
| 	if htype := hrsp.Header.Get(metadata.HeaderContentType); htype != "" { | ||||
| 		ct = htype | ||||
| 	} | ||||
|  | ||||
| 	if hrsp.Body != nil { | ||||
| 		var err error | ||||
| 		buf, err = io.ReadAll(hrsp.Body) | ||||
| 		if err != nil { | ||||
| 			return errors.InternalServerError("go.micro.client", "read body: %v", err) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	cf, err := c.newCodec(ct) | ||||
| 	if err != nil { | ||||
| 		return errors.InternalServerError("go.micro.client", "unknown content-type %s: %v", ct, err) | ||||
| 	} | ||||
|  | ||||
| 	if log.V(logger.DebugLevel) { | ||||
| 		log.Debug(ctx, fmt.Sprintf("response with headers: %v and body: %s", hrsp.Header, buf)) | ||||
| 	} | ||||
|  | ||||
| 	if hrsp.StatusCode < http.StatusBadRequest { | ||||
| 		if err = cf.Unmarshal(buf, rsp); err != nil { | ||||
| 			return errors.InternalServerError("go.micro.client", "unmarshal response: %v", err) | ||||
| 		} | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	s := status.New(hrsp.StatusCode) | ||||
|  | ||||
| 	var mappedErr any | ||||
|  | ||||
| 	errMap, ok := errorMapFromOpts(opts) | ||||
| 	if ok && errMap != nil { | ||||
| 		mappedErr, ok = errMap[fmt.Sprintf("%d", hrsp.StatusCode)] | ||||
| 		if !ok { | ||||
| 			mappedErr, ok = errMap["default"] | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if !ok || mappedErr == nil { | ||||
| 		return s.Err() | ||||
| 	} | ||||
|  | ||||
| 	if err = cf.Unmarshal(buf, mappedErr); err != nil { | ||||
| 		return errors.InternalServerError("go.micro.client", "unmarshal response: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	return s.WithDetails(mappedErr).Err() | ||||
| } | ||||
		Reference in New Issue
	
	Block a user