Compare commits

...

5 Commits

Author SHA1 Message Date
fade40754a update deps
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-05 16:23:00 +03:00
f39d449ca2 lint
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-05 16:10:38 +03:00
7cab3c18a7 add ability to wrap any struct to error interface
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-05 16:02:35 +03:00
7098c252dc allow to publish only body
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-06-11 15:21:36 +03:00
3cbc879769 fix stream timeout
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-04-29 22:38:38 +03:00
4 changed files with 53 additions and 26 deletions

2
go.mod
View File

@@ -2,4 +2,4 @@ module github.com/unistack-org/micro-client-http/v3
go 1.16 go 1.16
require github.com/unistack-org/micro/v3 v3.3.16 require github.com/unistack-org/micro/v3 v3.4.5

8
go.sum
View File

@@ -5,11 +5,11 @@ github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
github.com/unistack-org/micro/v3 v3.3.16 h1:v0h/oC0TO2n1djQJeOjD2jNEqKkiykwI6cpflEVTlQE= github.com/unistack-org/micro/v3 v3.4.5 h1:4Rn6EkJz/web43XGZAPt4NtqDLV2SrYXZxdAfBKF9Ic=
github.com/unistack-org/micro/v3 v3.3.16/go.mod h1:ETGcQQUcjxGaD44LUMX+0fgo8Loh7ExldfIPLvfUmDo= github.com/unistack-org/micro/v3 v3.4.5/go.mod h1:LXmPfbJnJNvL0kQs8HfnkV3Wya2Wb+C7keVq++RCZnk=
golang.org/x/net v0.0.0-20210415231046-e915ea6b2b7d/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8= golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

31
http.go
View File

@@ -32,10 +32,10 @@ func filterLabel(r []router.Route) []router.Route {
*/ */
type httpClient struct { type httpClient struct {
opts client.Options
httpcli *http.Client httpcli *http.Client
init bool opts client.Options
sync.RWMutex sync.RWMutex
init bool
} }
func newRequest(addr string, req client.Request, ct string, cf codec.Codec, msg interface{}, opts client.CallOptions) (*http.Request, error) { func newRequest(addr string, req client.Request, ct string, cf codec.Codec, msg interface{}, opts client.CallOptions) (*http.Request, error) {
@@ -181,9 +181,10 @@ func (h *httpClient) stream(ctx context.Context, addr string, req client.Request
if len(opts.ContentType) > 0 { if len(opts.ContentType) > 0 {
ct = opts.ContentType ct = opts.ContentType
} }
// set timeout in nanoseconds // set timeout in nanoseconds
header.Set("Timeout", fmt.Sprintf("%d", opts.RequestTimeout)) if opts.StreamTimeout > time.Duration(0) {
header.Set("Timeout", fmt.Sprintf("%d", opts.StreamTimeout))
}
// set the content type for the request // set the content type for the request
header.Set("Content-Type", ct) header.Set("Content-Type", ct)
@@ -399,22 +400,22 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface
func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
// make a copy of call opts // make a copy of call opts
callOpts := h.opts.CallOptions callOpts := h.opts.CallOptions
for _, opt := range opts { for _, o := range opts {
opt(&callOpts) o(&callOpts)
} }
// check if we already have a deadline // check if we already have a deadline
d, ok := ctx.Deadline() d, ok := ctx.Deadline()
if !ok { if !ok && callOpts.StreamTimeout > time.Duration(0) {
var cancel context.CancelFunc var cancel context.CancelFunc
// no deadline so we create a new one // no deadline so we create a new one
ctx, cancel = context.WithTimeout(ctx, callOpts.RequestTimeout) ctx, cancel = context.WithTimeout(ctx, callOpts.StreamTimeout)
defer cancel() defer cancel()
} else { } else {
// got a deadline so no need to setup context // got a deadline so no need to setup context
// but we need to set the timeout we pass along // but we need to set the timeout we pass along
opt := client.WithRequestTimeout(time.Until(d)) o := client.WithStreamTimeout(time.Until(d))
opt(&callOpts) o(&callOpts)
} }
// should we noop right here? // should we noop right here?
@@ -426,10 +427,7 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli
/* /*
// make copy of call method // make copy of call method
hstream, err := h.stream() hstream := h.stream
if err != nil {
return nil, err
}
// wrap the call in reverse // wrap the call in reverse
for i := len(callOpts.CallWrappers); i > 0; i-- { for i := len(callOpts.CallWrappers); i > 0; i-- {
hstream = callOpts.CallWrappers[i-1](hstream) hstream = callOpts.CallWrappers[i-1](hstream)
@@ -575,7 +573,10 @@ func (h *httpClient) Publish(ctx context.Context, p client.Message, opts ...clie
return h.opts.Broker.Publish(ctx, topic, &broker.Message{ return h.opts.Broker.Publish(ctx, topic, &broker.Message{
Header: md, Header: md,
Body: body, Body: body,
}, broker.PublishContext(ctx)) },
broker.PublishContext(ctx),
broker.PublishBodyOnly(options.BodyOnly),
)
} }
func (h *httpClient) String() string { func (h *httpClient) String() string {

38
util.go
View File

@@ -21,6 +21,23 @@ var (
mu sync.RWMutex mu sync.RWMutex
) )
// Error struct holds error
type Error struct {
err interface{}
}
// Error func for error interface
func (err *Error) Error() string {
return fmt.Sprintf("%v", err.err)
}
func GetError(err error) interface{} {
if rerr, ok := err.(*Error); ok {
return rerr.err
}
return err
}
func newPathRequest(path string, method string, body string, msg interface{}, tags []string) (string, interface{}, error) { func newPathRequest(path string, method string, body string, msg interface{}, tags []string) (string, interface{}, error) {
// parse via https://github.com/googleapis/googleapis/blob/master/google/api/http.proto definition // parse via https://github.com/googleapis/googleapis/blob/master/google/api/http.proto definition
tpl, err := newTemplate(path) tpl, err := newTemplate(path)
@@ -185,7 +202,6 @@ func (h *httpClient) parseRsp(ctx context.Context, hrsp *http.Response, rsp inte
if hrsp.StatusCode == http.StatusNoContent { if hrsp.StatusCode == http.StatusNoContent {
return nil return nil
} }
ct := DefaultContentType ct := DefaultContentType
if htype := hrsp.Header.Get("Content-Type"); htype != "" { if htype := hrsp.Header.Get("Content-Type"); htype != "" {
@@ -197,6 +213,7 @@ func (h *httpClient) parseRsp(ctx context.Context, hrsp *http.Response, rsp inte
return errors.InternalServerError("go.micro.client", cerr.Error()) return errors.InternalServerError("go.micro.client", cerr.Error())
} }
// succeseful response
if hrsp.StatusCode < 400 { if hrsp.StatusCode < 400 {
if err = cf.ReadBody(hrsp.Body, rsp); err != nil { if err = cf.ReadBody(hrsp.Body, rsp); err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", err.Error())
@@ -204,13 +221,17 @@ func (h *httpClient) parseRsp(ctx context.Context, hrsp *http.Response, rsp inte
return nil return nil
} }
// response with error
var rerr interface{}
errmap, ok := opts.Context.Value(errorMapKey{}).(map[string]interface{}) errmap, ok := opts.Context.Value(errorMapKey{}).(map[string]interface{})
if ok && errmap != nil { if ok && errmap != nil {
if err, ok = errmap[fmt.Sprintf("%d", hrsp.StatusCode)].(error); !ok { rerr, ok = errmap[fmt.Sprintf("%d", hrsp.StatusCode)]
err, ok = errmap["default"].(error) if !ok {
rerr, ok = errmap["default"]
} }
} }
if !ok || err == nil {
if !ok || rerr == nil {
buf, rerr := io.ReadAll(hrsp.Body) buf, rerr := io.ReadAll(hrsp.Body)
if rerr != nil { if rerr != nil {
return errors.InternalServerError("go.micro.client", rerr.Error()) return errors.InternalServerError("go.micro.client", rerr.Error())
@@ -218,9 +239,14 @@ func (h *httpClient) parseRsp(ctx context.Context, hrsp *http.Response, rsp inte
return errors.New("go.micro.client", string(buf), int32(hrsp.StatusCode)) return errors.New("go.micro.client", string(buf), int32(hrsp.StatusCode))
} }
if cerr := cf.ReadBody(hrsp.Body, err); cerr != nil { if cerr := cf.ReadBody(hrsp.Body, rerr); cerr != nil {
err = errors.InternalServerError("go.micro.client", cerr.Error()) return errors.InternalServerError("go.micro.client", cerr.Error())
} }
if err, ok = rerr.(error); !ok {
err = &Error{rerr}
}
} }
return err return err