Compare commits
13 Commits
Author | SHA1 | Date | |
---|---|---|---|
520dc29f89 | |||
59d6c26003 | |||
fade40754a | |||
f39d449ca2 | |||
7cab3c18a7 | |||
7098c252dc | |||
3cbc879769 | |||
05add422d1 | |||
|
b4970ee807 | ||
|
f4c91686f4 | ||
ee9b7493e3 | |||
|
0182d6ab56 | ||
d99e97090c |
13
.github/stale.sh
vendored
13
.github/stale.sh
vendored
@@ -1,13 +0,0 @@
|
||||
#!/bin/bash -ex
|
||||
|
||||
export PATH=$PATH:$(pwd)/bin
|
||||
export GO111MODULE=on
|
||||
export GOBIN=$(pwd)/bin
|
||||
|
||||
#go get github.com/rvflash/goup@v0.4.1
|
||||
|
||||
#goup -v ./...
|
||||
#go get github.com/psampaz/go-mod-outdated@v0.6.0
|
||||
go list -u -m -mod=mod -json all | go-mod-outdated -update -direct -ci || true
|
||||
|
||||
#go list -u -m -json all | go-mod-outdated -update
|
3
.github/workflows/build.yml
vendored
3
.github/workflows/build.yml
vendored
@@ -34,10 +34,9 @@ jobs:
|
||||
uses: actions/checkout@v2
|
||||
- name: lint
|
||||
uses: golangci/golangci-lint-action@v2
|
||||
continue-on-error: true
|
||||
with:
|
||||
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
|
||||
version: v1.30
|
||||
version: v1.39
|
||||
# Optional: working directory, useful for monorepos
|
||||
# working-directory: somedir
|
||||
# Optional: golangci-lint command line arguments.
|
||||
|
3
.github/workflows/pr.yml
vendored
3
.github/workflows/pr.yml
vendored
@@ -34,10 +34,9 @@ jobs:
|
||||
uses: actions/checkout@v2
|
||||
- name: lint
|
||||
uses: golangci/golangci-lint-action@v2
|
||||
continue-on-error: true
|
||||
with:
|
||||
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
|
||||
version: v1.30
|
||||
version: v1.39
|
||||
# Optional: working directory, useful for monorepos
|
||||
# working-directory: somedir
|
||||
# Optional: golangci-lint command line arguments.
|
||||
|
44
.golangci.yml
Normal file
44
.golangci.yml
Normal file
@@ -0,0 +1,44 @@
|
||||
run:
|
||||
concurrency: 4
|
||||
deadline: 5m
|
||||
issues-exit-code: 1
|
||||
tests: true
|
||||
|
||||
linters-settings:
|
||||
govet:
|
||||
check-shadowing: true
|
||||
enable:
|
||||
- fieldalignment
|
||||
|
||||
linters:
|
||||
enable:
|
||||
- govet
|
||||
- deadcode
|
||||
- errcheck
|
||||
- govet
|
||||
- ineffassign
|
||||
- staticcheck
|
||||
- structcheck
|
||||
- typecheck
|
||||
- unused
|
||||
- varcheck
|
||||
- bodyclose
|
||||
- gci
|
||||
- goconst
|
||||
- gocritic
|
||||
- gosimple
|
||||
- gofmt
|
||||
- gofumpt
|
||||
- goimports
|
||||
- golint
|
||||
- gosec
|
||||
- makezero
|
||||
- misspell
|
||||
- nakedret
|
||||
- nestif
|
||||
- nilerr
|
||||
- noctx
|
||||
- prealloc
|
||||
- unconvert
|
||||
- unparam
|
||||
disable-all: false
|
2
go.mod
2
go.mod
@@ -2,4 +2,4 @@ module github.com/unistack-org/micro-client-http/v3
|
||||
|
||||
go 1.16
|
||||
|
||||
require github.com/unistack-org/micro/v3 v3.3.13
|
||||
require github.com/unistack-org/micro/v3 v3.4.8
|
||||
|
10
go.sum
10
go.sum
@@ -5,13 +5,13 @@ github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
|
||||
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
||||
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
|
||||
github.com/unistack-org/micro/v3 v3.3.13 h1:y4bDDkbwnjgOckrhFkC6D/o42tr75X33UbrB+Ko0M68=
|
||||
github.com/unistack-org/micro/v3 v3.3.13/go.mod h1:98hNcMXp/WyWJwLwCuwrhN1Jm7aCWaRNsMfRjK8Fq+Y=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
|
||||
github.com/unistack-org/micro/v3 v3.4.8 h1:9+qGlNHgChC3aMuFrtTFUtG55PEAjneSvplg7phwoCI=
|
||||
github.com/unistack-org/micro/v3 v3.4.8/go.mod h1:LXmPfbJnJNvL0kQs8HfnkV3Wya2Wb+C7keVq++RCZnk=
|
||||
golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
|
117
http.go
117
http.go
@@ -20,24 +20,22 @@ import (
|
||||
"github.com/unistack-org/micro/v3/codec"
|
||||
"github.com/unistack-org/micro/v3/errors"
|
||||
"github.com/unistack-org/micro/v3/metadata"
|
||||
"github.com/unistack-org/micro/v3/router"
|
||||
)
|
||||
|
||||
var (
|
||||
DefaultContentType = "application/json"
|
||||
)
|
||||
var DefaultContentType = "application/json"
|
||||
|
||||
/*
|
||||
func filterLabel(r []router.Route) []router.Route {
|
||||
// selector.FilterLabel("protocol", "http")
|
||||
return r
|
||||
}
|
||||
*/
|
||||
|
||||
type httpClient struct {
|
||||
opts client.Options
|
||||
dialer *net.Dialer
|
||||
httpcli *http.Client
|
||||
init bool
|
||||
opts client.Options
|
||||
sync.RWMutex
|
||||
init bool
|
||||
}
|
||||
|
||||
func newRequest(addr string, req client.Request, ct string, cf codec.Codec, msg interface{}, opts client.CallOptions) (*http.Request, error) {
|
||||
@@ -46,6 +44,7 @@ func newRequest(addr string, req client.Request, ct string, cf codec.Codec, msg
|
||||
|
||||
var tags []string
|
||||
var scheme string
|
||||
|
||||
u, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
hreq.URL = &url.URL{
|
||||
@@ -55,7 +54,10 @@ func newRequest(addr string, req client.Request, ct string, cf codec.Codec, msg
|
||||
}
|
||||
hreq.Host = addr
|
||||
scheme = "http"
|
||||
} else {
|
||||
}
|
||||
|
||||
// nolint: nestif
|
||||
if scheme == "" {
|
||||
ep := req.Endpoint()
|
||||
if opts.Context != nil {
|
||||
if m, ok := opts.Context.Value(methodKey{}).(string); ok {
|
||||
@@ -70,14 +72,20 @@ func newRequest(addr string, req client.Request, ct string, cf codec.Codec, msg
|
||||
if t, ok := opts.Context.Value(structTagsKey{}).([]string); ok && len(t) > 0 {
|
||||
tags = t
|
||||
}
|
||||
|
||||
if md, ok := opts.Context.Value(metadataKey{}).(metadata.Metadata); ok {
|
||||
for k, v := range md {
|
||||
hreq.Header.Set(k, v)
|
||||
}
|
||||
}
|
||||
}
|
||||
hreq.URL, err = u.Parse(ep)
|
||||
if err != nil {
|
||||
return nil, errors.BadRequest("go.micro.client", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
if opts.AuthToken != "" {
|
||||
hreq.Header.Set("Authorization", opts.AuthToken)
|
||||
}
|
||||
if len(tags) == 0 {
|
||||
switch ct {
|
||||
default:
|
||||
@@ -115,23 +123,11 @@ func newRequest(addr string, req client.Request, ct string, cf codec.Codec, msg
|
||||
}
|
||||
|
||||
func (h *httpClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
||||
header := make(http.Header, 2)
|
||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||
for k, v := range md {
|
||||
header.Set(k, v)
|
||||
}
|
||||
}
|
||||
|
||||
ct := req.ContentType()
|
||||
if len(opts.ContentType) > 0 {
|
||||
ct = opts.ContentType
|
||||
}
|
||||
|
||||
// set timeout in nanoseconds
|
||||
header.Set("Timeout", fmt.Sprintf("%d", opts.RequestTimeout))
|
||||
// set the content type for the request
|
||||
header.Set("Content-Type", ct)
|
||||
|
||||
cf, err := h.newCodec(ct)
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
@@ -141,32 +137,35 @@ func (h *httpClient) call(ctx context.Context, addr string, req client.Request,
|
||||
return err
|
||||
}
|
||||
|
||||
hreq.Header = header
|
||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||
for k, v := range md {
|
||||
hreq.Header.Set(k, v)
|
||||
}
|
||||
}
|
||||
|
||||
// set timeout in nanoseconds
|
||||
hreq.Header.Set("Timeout", fmt.Sprintf("%d", opts.RequestTimeout))
|
||||
// set the content type for the request
|
||||
hreq.Header.Set("Content-Type", ct)
|
||||
|
||||
// make the request
|
||||
hrsp, err := h.httpcli.Do(hreq.WithContext(ctx))
|
||||
if err != nil {
|
||||
switch err := err.(type) {
|
||||
case net.Error:
|
||||
if err.Timeout() {
|
||||
return errors.Timeout("go.micro.client", err.Error())
|
||||
}
|
||||
case *url.Error:
|
||||
if err, ok := err.Err.(net.Error); ok && err.Timeout() {
|
||||
return errors.Timeout("go.micro.client", err.Error())
|
||||
}
|
||||
case net.Error:
|
||||
if err.Timeout() {
|
||||
return errors.Timeout("go.micro.client", err.Error())
|
||||
}
|
||||
}
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
}
|
||||
|
||||
defer hrsp.Body.Close()
|
||||
|
||||
if ct == "application/x-www-form-urlencoded" {
|
||||
cf, err = h.newCodec(DefaultContentType)
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
}
|
||||
}
|
||||
return h.parseRsp(ctx, hrsp, rsp, opts)
|
||||
}
|
||||
|
||||
@@ -186,9 +185,10 @@ func (h *httpClient) stream(ctx context.Context, addr string, req client.Request
|
||||
if len(opts.ContentType) > 0 {
|
||||
ct = opts.ContentType
|
||||
}
|
||||
|
||||
// set timeout in nanoseconds
|
||||
header.Set("Timeout", fmt.Sprintf("%d", opts.RequestTimeout))
|
||||
if opts.StreamTimeout > time.Duration(0) {
|
||||
header.Set("Timeout", fmt.Sprintf("%d", opts.StreamTimeout))
|
||||
}
|
||||
// set the content type for the request
|
||||
header.Set("Content-Type", ct)
|
||||
|
||||
@@ -198,11 +198,6 @@ func (h *httpClient) stream(ctx context.Context, addr string, req client.Request
|
||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
||||
}
|
||||
|
||||
dialAddr := addr
|
||||
u, err := url.Parse(dialAddr)
|
||||
if err == nil && u.Scheme != "" && u.Host != "" {
|
||||
dialAddr = u.Host
|
||||
}
|
||||
cc, err := (h.httpcli.Transport).(*http.Transport).DialContext(ctx, "tcp", addr)
|
||||
if err != nil {
|
||||
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error dialing: %v", err))
|
||||
@@ -296,7 +291,7 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface
|
||||
} else {
|
||||
// got a deadline so no need to setup context
|
||||
// but we need to set the timeout we pass along
|
||||
opt := client.WithRequestTimeout(d.Sub(time.Now()))
|
||||
opt := client.WithRequestTimeout(time.Until(d))
|
||||
opt(&callOpts)
|
||||
}
|
||||
|
||||
@@ -409,22 +404,22 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface
|
||||
func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||
// make a copy of call opts
|
||||
callOpts := h.opts.CallOptions
|
||||
for _, opt := range opts {
|
||||
opt(&callOpts)
|
||||
for _, o := range opts {
|
||||
o(&callOpts)
|
||||
}
|
||||
|
||||
// check if we already have a deadline
|
||||
d, ok := ctx.Deadline()
|
||||
if !ok {
|
||||
if !ok && callOpts.StreamTimeout > time.Duration(0) {
|
||||
var cancel context.CancelFunc
|
||||
// no deadline so we create a new one
|
||||
ctx, cancel = context.WithTimeout(ctx, callOpts.RequestTimeout)
|
||||
ctx, cancel = context.WithTimeout(ctx, callOpts.StreamTimeout)
|
||||
defer cancel()
|
||||
} else {
|
||||
// got a deadline so no need to setup context
|
||||
// but we need to set the timeout we pass along
|
||||
opt := client.WithRequestTimeout(d.Sub(time.Now()))
|
||||
opt(&callOpts)
|
||||
o := client.WithStreamTimeout(time.Until(d))
|
||||
o(&callOpts)
|
||||
}
|
||||
|
||||
// should we noop right here?
|
||||
@@ -436,10 +431,7 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli
|
||||
|
||||
/*
|
||||
// make copy of call method
|
||||
hstream, err := h.stream()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
hstream := h.stream
|
||||
// wrap the call in reverse
|
||||
for i := len(callOpts.CallWrappers); i > 0; i-- {
|
||||
hstream = callOpts.CallWrappers[i-1](hstream)
|
||||
@@ -476,9 +468,9 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli
|
||||
|
||||
call := func(i int) (client.Stream, error) {
|
||||
// call backoff first. Someone may want an initial start delay
|
||||
t, err := callOpts.Backoff(ctx, req, i)
|
||||
if err != nil {
|
||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
||||
t, cerr := callOpts.Backoff(ctx, req, i)
|
||||
if cerr != nil {
|
||||
return nil, errors.InternalServerError("go.micro.client", cerr.Error())
|
||||
}
|
||||
|
||||
// only sleep if greater than 0
|
||||
@@ -488,19 +480,19 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli
|
||||
|
||||
node := next()
|
||||
|
||||
stream, err := h.stream(ctx, node, req, callOpts)
|
||||
stream, cerr := h.stream(ctx, node, req, callOpts)
|
||||
|
||||
// record the result of the call to inform future routing decisions
|
||||
if verr := h.opts.Selector.Record(node, err); verr != nil {
|
||||
if verr := h.opts.Selector.Record(node, cerr); verr != nil {
|
||||
return nil, verr
|
||||
}
|
||||
|
||||
// try and transform the error to a go-micro error
|
||||
if verr, ok := err.(*errors.Error); ok {
|
||||
if verr, ok := cerr.(*errors.Error); ok {
|
||||
return nil, verr
|
||||
}
|
||||
|
||||
return stream, err
|
||||
return stream, cerr
|
||||
}
|
||||
|
||||
type response struct {
|
||||
@@ -513,8 +505,8 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli
|
||||
|
||||
for i := 0; i <= callOpts.Retries; i++ {
|
||||
go func() {
|
||||
s, err := call(i)
|
||||
ch <- response{s, err}
|
||||
s, cerr := call(i)
|
||||
ch <- response{s, cerr}
|
||||
}()
|
||||
|
||||
select {
|
||||
@@ -585,7 +577,10 @@ func (h *httpClient) Publish(ctx context.Context, p client.Message, opts ...clie
|
||||
return h.opts.Broker.Publish(ctx, topic, &broker.Message{
|
||||
Header: md,
|
||||
Body: body,
|
||||
}, broker.PublishContext(ctx))
|
||||
},
|
||||
broker.PublishContext(ctx),
|
||||
broker.PublishBodyOnly(options.BodyOnly),
|
||||
)
|
||||
}
|
||||
|
||||
func (h *httpClient) String() string {
|
||||
|
@@ -5,9 +5,9 @@ import (
|
||||
)
|
||||
|
||||
type httpMessage struct {
|
||||
payload interface{}
|
||||
topic string
|
||||
contentType string
|
||||
payload interface{}
|
||||
}
|
||||
|
||||
func newHTTPMessage(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message {
|
||||
|
20
options.go
20
options.go
@@ -5,6 +5,7 @@ import (
|
||||
"net/http"
|
||||
|
||||
"github.com/unistack-org/micro/v3/client"
|
||||
"github.com/unistack-org/micro/v3/metadata"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -26,27 +27,28 @@ var (
|
||||
)
|
||||
|
||||
type poolMaxStreams struct{}
|
||||
type poolMaxIdle struct{}
|
||||
type codecsKey struct{}
|
||||
type tlsAuth struct{}
|
||||
type maxRecvMsgSizeKey struct{}
|
||||
type maxSendMsgSizeKey struct{}
|
||||
|
||||
// PoolMaxStreams maximum streams on a connectioin
|
||||
func PoolMaxStreams(n int) client.Option {
|
||||
return client.SetOption(poolMaxStreams{}, n)
|
||||
}
|
||||
|
||||
type poolMaxIdle struct{}
|
||||
|
||||
// PoolMaxIdle maximum idle conns of a pool
|
||||
func PoolMaxIdle(d int) client.Option {
|
||||
return client.SetOption(poolMaxIdle{}, d)
|
||||
}
|
||||
|
||||
type maxRecvMsgSizeKey struct{}
|
||||
|
||||
// MaxRecvMsgSize set the maximum size of message that client can receive.
|
||||
func MaxRecvMsgSize(s int) client.Option {
|
||||
return client.SetOption(maxRecvMsgSizeKey{}, s)
|
||||
}
|
||||
|
||||
type maxSendMsgSizeKey struct{}
|
||||
|
||||
// MaxSendMsgSize set the maximum size of message that client can send.
|
||||
func MaxSendMsgSize(s int) client.Option {
|
||||
return client.SetOption(maxSendMsgSizeKey{}, s)
|
||||
@@ -54,12 +56,14 @@ func MaxSendMsgSize(s int) client.Option {
|
||||
|
||||
type httpClientKey struct{}
|
||||
|
||||
// nolint: golint
|
||||
func HTTPClient(c *http.Client) client.Option {
|
||||
return client.SetOption(httpClientKey{}, c)
|
||||
}
|
||||
|
||||
type httpDialerKey struct{}
|
||||
|
||||
// nolint: golint
|
||||
func HTTPDialer(d *net.Dialer) client.Option {
|
||||
return client.SetOption(httpDialerKey{}, d)
|
||||
}
|
||||
@@ -93,3 +97,9 @@ type structTagsKey struct{}
|
||||
func StructTags(tags []string) client.CallOption {
|
||||
return client.SetCallOption(structTagsKey{}, tags)
|
||||
}
|
||||
|
||||
type metadataKey struct{}
|
||||
|
||||
func Metadata(md metadata.Metadata) client.CallOption {
|
||||
return client.SetCallOption(metadataKey{}, md)
|
||||
}
|
||||
|
36
stream.go
36
stream.go
@@ -16,24 +16,21 @@ import (
|
||||
|
||||
// Implements the streamer interface
|
||||
type httpStream struct {
|
||||
sync.RWMutex
|
||||
address string
|
||||
opts client.CallOptions
|
||||
ct string
|
||||
cf codec.Codec
|
||||
context context.Context
|
||||
header http.Header
|
||||
seq uint64
|
||||
closed chan bool
|
||||
err error
|
||||
conn net.Conn
|
||||
reader *bufio.Reader
|
||||
cf codec.Codec
|
||||
context context.Context
|
||||
request client.Request
|
||||
header http.Header
|
||||
closed chan bool
|
||||
reader *bufio.Reader
|
||||
address string
|
||||
ct string
|
||||
opts client.CallOptions
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
var (
|
||||
errShutdown = fmt.Errorf("connection is shut down")
|
||||
)
|
||||
var errShutdown = fmt.Errorf("connection is shut down")
|
||||
|
||||
func (h *httpStream) isClosed() bool {
|
||||
select {
|
||||
@@ -112,6 +109,10 @@ func (h *httpStream) Close() error {
|
||||
func (h *httpStream) parseRsp(ctx context.Context, hrsp *http.Response, cf codec.Codec, rsp interface{}, opts client.CallOptions) error {
|
||||
var err error
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err = ctx.Err()
|
||||
default:
|
||||
// fast path return
|
||||
if hrsp.StatusCode == http.StatusNoContent {
|
||||
return nil
|
||||
@@ -130,10 +131,10 @@ func (h *httpStream) parseRsp(ctx context.Context, hrsp *http.Response, cf codec
|
||||
err, ok = errmap["default"].(error)
|
||||
}
|
||||
}
|
||||
if err == nil {
|
||||
buf, err := io.ReadAll(hrsp.Body)
|
||||
if err != nil {
|
||||
errors.InternalServerError("go.micro.client", err.Error())
|
||||
if !ok || err == nil {
|
||||
buf, cerr := io.ReadAll(hrsp.Body)
|
||||
if cerr != nil {
|
||||
return errors.InternalServerError("go.micro.client", cerr.Error())
|
||||
}
|
||||
return errors.New("go.micro.client", string(buf), int32(hrsp.StatusCode))
|
||||
}
|
||||
@@ -141,6 +142,7 @@ func (h *httpStream) parseRsp(ctx context.Context, hrsp *http.Response, cf codec
|
||||
if cerr := cf.ReadBody(hrsp.Body, err); cerr != nil {
|
||||
err = errors.InternalServerError("go.micro.client", cerr.Error())
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
106
util.go
106
util.go
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -20,6 +21,23 @@ var (
|
||||
mu sync.RWMutex
|
||||
)
|
||||
|
||||
// Error struct holds error
|
||||
type Error struct {
|
||||
err interface{}
|
||||
}
|
||||
|
||||
// Error func for error interface
|
||||
func (err *Error) Error() string {
|
||||
return fmt.Sprintf("%v", err.err)
|
||||
}
|
||||
|
||||
func GetError(err error) interface{} {
|
||||
if rerr, ok := err.(*Error); ok {
|
||||
return rerr.err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func newPathRequest(path string, method string, body string, msg interface{}, tags []string) (string, interface{}, error) {
|
||||
// parse via https://github.com/googleapis/googleapis/blob/master/google/api/http.proto definition
|
||||
tpl, err := newTemplate(path)
|
||||
@@ -31,6 +49,7 @@ func newPathRequest(path string, method string, body string, msg interface{}, ta
|
||||
return "", nil, fmt.Errorf("nil message but path params requested: %v", path)
|
||||
}
|
||||
|
||||
fieldsmapskip := make(map[string]struct{})
|
||||
fieldsmap := make(map[string]string, len(tpl.Fields))
|
||||
for _, v := range tpl.Fields {
|
||||
fieldsmap[v] = ""
|
||||
@@ -52,7 +71,7 @@ func newPathRequest(path string, method string, body string, msg interface{}, ta
|
||||
tnmsg = tnmsg.Elem()
|
||||
}
|
||||
|
||||
values := make(map[string]string)
|
||||
values := url.Values{}
|
||||
// copy cycle
|
||||
for i := 0; i < tmsg.NumField(); i++ {
|
||||
val := tmsg.Field(i)
|
||||
@@ -72,9 +91,13 @@ func newPathRequest(path string, method string, body string, msg interface{}, ta
|
||||
// special
|
||||
switch tn {
|
||||
case "protobuf": // special
|
||||
t = &tag{key: tn, name: tp[3][5:], opts: append(tp[:3], tp[4:]...)}
|
||||
for _, p := range tp {
|
||||
if idx := strings.Index(p, "name="); idx > 0 {
|
||||
t = &tag{key: tn, name: p[idx:]}
|
||||
}
|
||||
}
|
||||
default:
|
||||
t = &tag{key: tn, name: tp[0], opts: tp[1:]}
|
||||
t = &tag{key: tn, name: tp[0]}
|
||||
}
|
||||
if t.name != "" {
|
||||
break
|
||||
@@ -90,18 +113,34 @@ func newPathRequest(path string, method string, body string, msg interface{}, ta
|
||||
continue
|
||||
}
|
||||
|
||||
// nolint: gocritic
|
||||
if _, ok := fieldsmap[t.name]; ok {
|
||||
switch val.Type().Kind() {
|
||||
case reflect.Slice:
|
||||
for idx := 0; idx < val.Len(); idx++ {
|
||||
values.Add(t.name, fmt.Sprintf("%v", val.Index(idx).Interface()))
|
||||
}
|
||||
fieldsmapskip[t.name] = struct{}{}
|
||||
default:
|
||||
fieldsmap[t.name] = fmt.Sprintf("%v", val.Interface())
|
||||
}
|
||||
} else if (body == "*" || body == t.name) && method != http.MethodGet {
|
||||
tnmsg.Field(i).Set(val)
|
||||
} else {
|
||||
values[t.name] = fmt.Sprintf("%v", val.Interface())
|
||||
if val.Type().Kind() == reflect.Slice {
|
||||
for idx := 0; idx < val.Len(); idx++ {
|
||||
values.Add(t.name, fmt.Sprintf("%v", val.Index(idx).Interface()))
|
||||
}
|
||||
} else {
|
||||
values.Add(t.name, fmt.Sprintf("%v", val.Interface()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check not filled stuff
|
||||
for k, v := range fieldsmap {
|
||||
if v == "" {
|
||||
_, ok := fieldsmapskip[k]
|
||||
if !ok && v == "" {
|
||||
return "", nil, fmt.Errorf("path param %s not filled", k)
|
||||
}
|
||||
}
|
||||
@@ -110,23 +149,17 @@ func newPathRequest(path string, method string, body string, msg interface{}, ta
|
||||
for _, fld := range tpl.Pool {
|
||||
_, _ = b.WriteRune('/')
|
||||
if v, ok := fieldsmap[fld]; ok {
|
||||
if v != "" {
|
||||
_, _ = b.WriteString(v)
|
||||
}
|
||||
} else {
|
||||
_, _ = b.WriteString(fld)
|
||||
}
|
||||
}
|
||||
|
||||
idx := 0
|
||||
for k, v := range values {
|
||||
if idx == 0 {
|
||||
if len(values) > 0 {
|
||||
_, _ = b.WriteRune('?')
|
||||
} else {
|
||||
_, _ = b.WriteRune('&')
|
||||
}
|
||||
_, _ = b.WriteString(k)
|
||||
_, _ = b.WriteRune('=')
|
||||
_, _ = b.WriteString(v)
|
||||
idx++
|
||||
_, _ = b.WriteString(values.Encode())
|
||||
}
|
||||
|
||||
if rutil.IsZero(nmsg) {
|
||||
@@ -159,45 +192,61 @@ func newTemplate(path string) (util.Template, error) {
|
||||
}
|
||||
|
||||
func (h *httpClient) parseRsp(ctx context.Context, hrsp *http.Response, rsp interface{}, opts client.CallOptions) error {
|
||||
var err error
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err = ctx.Err()
|
||||
default:
|
||||
// fast path return
|
||||
if hrsp.StatusCode == http.StatusNoContent {
|
||||
return nil
|
||||
}
|
||||
|
||||
ct := DefaultContentType
|
||||
|
||||
if htype := hrsp.Header.Get("Content-Type"); htype != "" {
|
||||
ct = htype
|
||||
}
|
||||
|
||||
cf, err := h.newCodec(ct)
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
cf, cerr := h.newCodec(ct)
|
||||
if cerr != nil {
|
||||
return errors.InternalServerError("go.micro.client", cerr.Error())
|
||||
}
|
||||
|
||||
// succeseful response
|
||||
if hrsp.StatusCode < 400 {
|
||||
if err := cf.ReadBody(hrsp.Body, rsp); err != nil {
|
||||
if err = cf.ReadBody(hrsp.Body, rsp); err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// response with error
|
||||
var rerr interface{}
|
||||
errmap, ok := opts.Context.Value(errorMapKey{}).(map[string]interface{})
|
||||
if ok && errmap != nil {
|
||||
if err, ok = errmap[fmt.Sprintf("%d", hrsp.StatusCode)].(error); !ok {
|
||||
err, ok = errmap["default"].(error)
|
||||
rerr, ok = errmap[fmt.Sprintf("%d", hrsp.StatusCode)]
|
||||
if !ok {
|
||||
rerr, ok = errmap["default"]
|
||||
}
|
||||
}
|
||||
if err == nil {
|
||||
buf, err := io.ReadAll(hrsp.Body)
|
||||
if err != nil {
|
||||
errors.InternalServerError("go.micro.client", err.Error())
|
||||
|
||||
if !ok || rerr == nil {
|
||||
buf, rerr := io.ReadAll(hrsp.Body)
|
||||
if rerr != nil {
|
||||
return errors.InternalServerError("go.micro.client", rerr.Error())
|
||||
}
|
||||
return errors.New("go.micro.client", string(buf), int32(hrsp.StatusCode))
|
||||
}
|
||||
|
||||
if cerr := cf.ReadBody(hrsp.Body, err); cerr != nil {
|
||||
err = errors.InternalServerError("go.micro.client", cerr.Error())
|
||||
if cerr := cf.ReadBody(hrsp.Body, rerr); cerr != nil {
|
||||
return errors.InternalServerError("go.micro.client", cerr.Error())
|
||||
}
|
||||
|
||||
if err, ok = rerr.(error); !ok {
|
||||
err = &Error{rerr}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return err
|
||||
@@ -206,5 +255,4 @@ func (h *httpClient) parseRsp(ctx context.Context, hrsp *http.Response, rsp inte
|
||||
type tag struct {
|
||||
key string
|
||||
name string
|
||||
opts []string
|
||||
}
|
||||
|
@@ -18,8 +18,8 @@ func TestNewPathRequest(t *testing.T) {
|
||||
type Message struct {
|
||||
Name string `json:"name"`
|
||||
Val1 string `protobuf:"bytes,1,opt,name=val1,proto3" json:"val1"`
|
||||
Val2 int64
|
||||
Val3 []string
|
||||
Val2 int64
|
||||
}
|
||||
|
||||
omsg := &Message{Name: "test_name", Val1: "test_val1", Val2: 100, Val3: []string{"slice"}}
|
||||
@@ -45,8 +45,8 @@ func TestNewPathVarRequest(t *testing.T) {
|
||||
type Message struct {
|
||||
Name string `json:"name"`
|
||||
Val1 string `protobuf:"bytes,1,opt,name=val1,proto3" json:"val1"`
|
||||
Val2 int64
|
||||
Val3 []string
|
||||
Val2 int64
|
||||
}
|
||||
|
||||
omsg := &Message{Name: "test_name", Val1: "test_val1", Val2: 100, Val3: []string{"slice"}}
|
||||
|
Reference in New Issue
Block a user