Compare commits
6 Commits
Author | SHA1 | Date | |
---|---|---|---|
ef36082f2e | |||
21c897be47 | |||
0b21dd6660 | |||
18eb0d9e5c | |||
3f44a41d30 | |||
090100e522 |
@@ -5,14 +5,14 @@ This plugin is a http client for micro.
|
||||
## Overview
|
||||
|
||||
The http client wraps `net/http` to provide a robust micro client with service discovery, load balancing and streaming.
|
||||
It complies with the [micro.Client](https://godoc.org/go.unistack.org/micro-client-http/v3#Client) interface.
|
||||
It complies with the [micro.Client](https://godoc.org/go.unistack.org/micro-client-http/v4#Client) interface.
|
||||
|
||||
## Usage
|
||||
|
||||
### Use directly
|
||||
|
||||
```go
|
||||
import "go.unistack.org/micro-client-http/v3"
|
||||
import "go.unistack.org/micro-client-http/v4"
|
||||
|
||||
service := micro.NewService(
|
||||
micro.Name("my.service"),
|
||||
|
6
go.mod
6
go.mod
@@ -1,5 +1,5 @@
|
||||
module go.unistack.org/micro-client-http/v3
|
||||
module go.unistack.org/micro-client-http/v4
|
||||
|
||||
go 1.18
|
||||
go 1.19
|
||||
|
||||
require go.unistack.org/micro/v3 v3.10.16
|
||||
require go.unistack.org/micro/v4 v4.0.1
|
||||
|
10
go.sum
10
go.sum
@@ -1,8 +1,2 @@
|
||||
github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg=
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
||||
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
|
||||
go.unistack.org/micro/v3 v3.10.16 h1:2er/SKKYbV60M+UuJM4eYCF0MZYAIq/yNUrAbTfgq8Q=
|
||||
go.unistack.org/micro/v3 v3.10.16/go.mod h1:uMAc0U/x7dmtICCrblGf0ZLgYegu3VwQAquu+OFCw1Q=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
go.unistack.org/micro/v4 v4.0.1 h1:xo1IxbVfgh8i0eY0VeYa3cbb13u5n/Mxnp3FOgWD4Jo=
|
||||
go.unistack.org/micro/v4 v4.0.1/go.mod h1:p/J5UcSJjfHsWGT31uKoghQ5rUQZzQJBAFy+Z4+ZVMs=
|
||||
|
95
http.go
95
http.go
@@ -1,5 +1,5 @@
|
||||
// Package http provides a http client
|
||||
package http // import "go.unistack.org/micro-client-http/v3"
|
||||
package http // import "go.unistack.org/micro-client-http/v4"
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
@@ -10,19 +10,17 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.unistack.org/micro/v3/broker"
|
||||
"go.unistack.org/micro/v3/client"
|
||||
"go.unistack.org/micro/v3/codec"
|
||||
"go.unistack.org/micro/v3/errors"
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
"go.unistack.org/micro/v3/selector"
|
||||
rutil "go.unistack.org/micro/v3/util/reflect"
|
||||
"go.unistack.org/micro/v4/client"
|
||||
"go.unistack.org/micro/v4/codec"
|
||||
"go.unistack.org/micro/v4/errors"
|
||||
"go.unistack.org/micro/v4/logger"
|
||||
"go.unistack.org/micro/v4/metadata"
|
||||
"go.unistack.org/micro/v4/selector"
|
||||
rutil "go.unistack.org/micro/v4/util/reflect"
|
||||
)
|
||||
|
||||
var DefaultContentType = "application/json"
|
||||
@@ -147,6 +145,11 @@ func newRequest(ctx context.Context, log logger.Logger, addr string, req client.
|
||||
if opts.AuthToken != "" {
|
||||
header.Set(metadata.HeaderAuthorization, opts.AuthToken)
|
||||
}
|
||||
if opts.RequestMetadata != nil {
|
||||
for k, v := range opts.RequestMetadata {
|
||||
header.Set(k, v)
|
||||
}
|
||||
}
|
||||
|
||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||
for k, v := range md {
|
||||
@@ -308,9 +311,6 @@ func (h *httpClient) Init(opts ...client.Option) error {
|
||||
o(&h.opts)
|
||||
}
|
||||
|
||||
if err := h.opts.Broker.Init(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := h.opts.Tracer.Init(); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -334,10 +334,6 @@ func (h *httpClient) Options() client.Options {
|
||||
return h.opts
|
||||
}
|
||||
|
||||
func (h *httpClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message {
|
||||
return newHTTPMessage(topic, msg, h.opts.ContentType, opts...)
|
||||
}
|
||||
|
||||
func (h *httpClient) NewRequest(service, method string, req interface{}, opts ...client.RequestOption) client.Request {
|
||||
return newHTTPRequest(service, method, req, h.opts.ContentType, opts...)
|
||||
}
|
||||
@@ -614,71 +610,6 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli
|
||||
return nil, grr
|
||||
}
|
||||
|
||||
func (h *httpClient) BatchPublish(ctx context.Context, p []client.Message, opts ...client.PublishOption) error {
|
||||
return h.publish(ctx, p, opts...)
|
||||
}
|
||||
|
||||
func (h *httpClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
||||
return h.publish(ctx, []client.Message{p}, opts...)
|
||||
}
|
||||
|
||||
func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
|
||||
var body []byte
|
||||
|
||||
options := client.NewPublishOptions(opts...)
|
||||
|
||||
// get proxy
|
||||
exchange := ""
|
||||
if v, ok := os.LookupEnv("MICRO_PROXY"); ok {
|
||||
exchange = v
|
||||
}
|
||||
|
||||
omd, ok := metadata.FromOutgoingContext(ctx)
|
||||
if !ok {
|
||||
omd = metadata.New(2)
|
||||
}
|
||||
|
||||
msgs := make([]*broker.Message, 0, len(ps))
|
||||
|
||||
for _, p := range ps {
|
||||
md := metadata.Copy(omd)
|
||||
md[metadata.HeaderContentType] = p.ContentType()
|
||||
|
||||
// passed in raw data
|
||||
if d, ok := p.Payload().(*codec.Frame); ok {
|
||||
body = d.Data
|
||||
} else {
|
||||
// use codec for payload
|
||||
cf, err := h.newCodec(p.ContentType())
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
}
|
||||
// set the body
|
||||
b, err := cf.Marshal(p.Payload())
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
}
|
||||
body = b
|
||||
}
|
||||
|
||||
topic := p.Topic()
|
||||
if len(exchange) > 0 {
|
||||
topic = exchange
|
||||
}
|
||||
|
||||
for k, v := range p.Metadata() {
|
||||
md.Set(k, v)
|
||||
}
|
||||
md.Set(metadata.HeaderTopic, topic)
|
||||
msgs = append(msgs, &broker.Message{Header: md, Body: body})
|
||||
}
|
||||
|
||||
return h.opts.Broker.BatchPublish(ctx, msgs,
|
||||
broker.PublishContext(ctx),
|
||||
broker.PublishBodyOnly(options.BodyOnly),
|
||||
)
|
||||
}
|
||||
|
||||
func (h *httpClient) String() string {
|
||||
return "http"
|
||||
}
|
||||
|
44
message.go
44
message.go
@@ -1,44 +0,0 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"go.unistack.org/micro/v3/client"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
)
|
||||
|
||||
type httpMessage struct {
|
||||
payload interface{}
|
||||
topic string
|
||||
contentType string
|
||||
opts client.MessageOptions
|
||||
}
|
||||
|
||||
func newHTTPMessage(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message {
|
||||
options := client.NewMessageOptions(opts...)
|
||||
|
||||
if len(options.ContentType) > 0 {
|
||||
contentType = options.ContentType
|
||||
}
|
||||
|
||||
return &httpMessage{
|
||||
payload: payload,
|
||||
topic: topic,
|
||||
contentType: contentType,
|
||||
opts: options,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *httpMessage) ContentType() string {
|
||||
return h.contentType
|
||||
}
|
||||
|
||||
func (h *httpMessage) Topic() string {
|
||||
return h.topic
|
||||
}
|
||||
|
||||
func (h *httpMessage) Payload() interface{} {
|
||||
return h.payload
|
||||
}
|
||||
|
||||
func (h *httpMessage) Metadata() metadata.Metadata {
|
||||
return h.opts.Metadata
|
||||
}
|
@@ -4,8 +4,8 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
|
||||
"go.unistack.org/micro/v3/client"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
"go.unistack.org/micro/v4/client"
|
||||
"go.unistack.org/micro/v4/metadata"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@@ -1,8 +1,8 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"go.unistack.org/micro/v3/client"
|
||||
"go.unistack.org/micro/v3/codec"
|
||||
"go.unistack.org/micro/v4/client"
|
||||
"go.unistack.org/micro/v4/codec"
|
||||
)
|
||||
|
||||
type httpRequest struct {
|
||||
|
66
stream.go
66
stream.go
@@ -2,19 +2,17 @@ package http
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"go.unistack.org/micro/v3/client"
|
||||
"go.unistack.org/micro/v3/codec"
|
||||
"go.unistack.org/micro/v3/errors"
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
"go.unistack.org/micro/v4/client"
|
||||
"go.unistack.org/micro/v4/codec"
|
||||
"go.unistack.org/micro/v4/errors"
|
||||
"go.unistack.org/micro/v4/logger"
|
||||
)
|
||||
|
||||
// Implements the streamer interface
|
||||
@@ -121,59 +119,55 @@ func (h *httpStream) Close() error {
|
||||
|
||||
func (h *httpStream) parseRsp(ctx context.Context, log logger.Logger, hrsp *http.Response, cf codec.Codec, rsp interface{}, opts client.CallOptions) error {
|
||||
var err error
|
||||
var buf []byte
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err = ctx.Err()
|
||||
default:
|
||||
// fast path return
|
||||
if hrsp.StatusCode == http.StatusNoContent {
|
||||
return nil
|
||||
}
|
||||
|
||||
if hrsp.StatusCode < 400 {
|
||||
if log.V(logger.DebugLevel) {
|
||||
buf, rerr := io.ReadAll(hrsp.Body)
|
||||
log.Debugf(ctx, "response %s with %v", buf, hrsp.Header)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err = ctx.Err()
|
||||
default:
|
||||
if hrsp.Body != nil {
|
||||
buf, err = io.ReadAll(hrsp.Body)
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", rerr.Error())
|
||||
if log.V(logger.ErrorLevel) {
|
||||
log.Errorf(ctx, "failed to read body: %v", err)
|
||||
}
|
||||
hrsp.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
|
||||
return errors.InternalServerError("go.micro.client", string(buf))
|
||||
}
|
||||
if err = cf.ReadBody(hrsp.Body, rsp); err != nil {
|
||||
}
|
||||
|
||||
if log.V(logger.DebugLevel) {
|
||||
log.Debugf(ctx, "response %s with %v", buf, hrsp.Header)
|
||||
}
|
||||
|
||||
if hrsp.StatusCode < 400 {
|
||||
if err = cf.Unmarshal(buf, rsp); err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var rerr interface{}
|
||||
errmap, ok := opts.Context.Value(errorMapKey{}).(map[string]interface{})
|
||||
if ok && errmap != nil {
|
||||
if err, ok = errmap[fmt.Sprintf("%d", hrsp.StatusCode)].(error); !ok {
|
||||
err, ok = errmap["default"].(error)
|
||||
if rerr, ok = errmap[fmt.Sprintf("%d", hrsp.StatusCode)].(error); !ok {
|
||||
rerr, ok = errmap["default"].(error)
|
||||
}
|
||||
}
|
||||
if !ok || err == nil {
|
||||
buf, cerr := io.ReadAll(hrsp.Body)
|
||||
if log.V(logger.DebugLevel) {
|
||||
log.Debugf(ctx, "response %s with %v", buf, hrsp.Header)
|
||||
}
|
||||
if cerr != nil {
|
||||
return errors.InternalServerError("go.micro.client", cerr.Error())
|
||||
}
|
||||
if !ok || rerr == nil {
|
||||
return errors.New("go.micro.client", string(buf), int32(hrsp.StatusCode))
|
||||
}
|
||||
|
||||
if log.V(logger.DebugLevel) {
|
||||
buf, rerr := io.ReadAll(hrsp.Body)
|
||||
log.Debugf(ctx, "response %s with %v", buf, hrsp.Header)
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", rerr.Error())
|
||||
}
|
||||
hrsp.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
|
||||
if cerr := cf.Unmarshal(buf, rerr); cerr != nil {
|
||||
return errors.InternalServerError("go.micro.client", cerr.Error())
|
||||
}
|
||||
|
||||
if cerr := cf.ReadBody(hrsp.Body, err); cerr != nil {
|
||||
err = errors.InternalServerError("go.micro.client", cerr.Error())
|
||||
if err, ok = rerr.(error); !ok {
|
||||
err = &Error{rerr}
|
||||
}
|
||||
}
|
||||
|
||||
|
69
util.go
69
util.go
@@ -1,21 +1,20 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"go.unistack.org/micro/v3/client"
|
||||
"go.unistack.org/micro/v3/errors"
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
rutil "go.unistack.org/micro/v3/util/reflect"
|
||||
"go.unistack.org/micro/v4/client"
|
||||
"go.unistack.org/micro/v4/errors"
|
||||
"go.unistack.org/micro/v4/logger"
|
||||
"go.unistack.org/micro/v4/metadata"
|
||||
rutil "go.unistack.org/micro/v4/util/reflect"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -247,45 +246,55 @@ func newTemplate(path string) ([]string, error) {
|
||||
|
||||
func (h *httpClient) parseRsp(ctx context.Context, hrsp *http.Response, rsp interface{}, opts client.CallOptions) error {
|
||||
var err error
|
||||
var buf []byte
|
||||
|
||||
// fast path return
|
||||
if hrsp.StatusCode == http.StatusNoContent {
|
||||
return nil
|
||||
}
|
||||
|
||||
if opts.ResponseMetadata != nil {
|
||||
*opts.ResponseMetadata = metadata.New(len(hrsp.Header))
|
||||
for k, v := range hrsp.Header {
|
||||
opts.ResponseMetadata.Set(k, strings.Join(v, ","))
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err = ctx.Err()
|
||||
default:
|
||||
// fast path return
|
||||
if hrsp.StatusCode == http.StatusNoContent {
|
||||
return nil
|
||||
}
|
||||
ct := DefaultContentType
|
||||
|
||||
if htype := hrsp.Header.Get("Content-Type"); htype != "" {
|
||||
ct = htype
|
||||
}
|
||||
|
||||
cf, cerr := h.newCodec(ct)
|
||||
if hrsp.StatusCode >= 400 && cerr != nil {
|
||||
var buf []byte
|
||||
if hrsp.Body != nil {
|
||||
buf, err = io.ReadAll(hrsp.Body)
|
||||
if err != nil && h.opts.Logger.V(logger.ErrorLevel) {
|
||||
if err != nil {
|
||||
if h.opts.Logger.V(logger.ErrorLevel) {
|
||||
h.opts.Logger.Errorf(ctx, "failed to read body: %v", err)
|
||||
}
|
||||
return errors.InternalServerError("go.micro.client", string(buf))
|
||||
}
|
||||
if h.opts.Logger.V(logger.DebugLevel) {
|
||||
h.opts.Logger.Debugf(ctx, "response %s with %v", buf, hrsp.Header)
|
||||
}
|
||||
// response like text/plain or something else, return original error
|
||||
return errors.New("go.micro.client", string(buf), int32(hrsp.StatusCode))
|
||||
} else if cerr != nil {
|
||||
|
||||
cf, cerr := h.newCodec(ct)
|
||||
if cerr != nil {
|
||||
if h.opts.Logger.V(logger.DebugLevel) {
|
||||
h.opts.Logger.Debugf(ctx, "response with %v unknown content-type", hrsp.Header, ct)
|
||||
h.opts.Logger.Debugf(ctx, "response with %v unknown content-type %s %s", hrsp.Header, ct, buf)
|
||||
}
|
||||
return errors.InternalServerError("go.micro.client", cerr.Error())
|
||||
}
|
||||
|
||||
if h.opts.Logger.V(logger.DebugLevel) {
|
||||
h.opts.Logger.Debugf(ctx, "response %s with %v", buf, hrsp.Header)
|
||||
}
|
||||
|
||||
// succeseful response
|
||||
if hrsp.StatusCode < 400 {
|
||||
if err = cf.ReadBody(hrsp.Body, rsp); err != nil {
|
||||
if err = cf.Unmarshal(buf, rsp); err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
}
|
||||
return nil
|
||||
@@ -302,26 +311,10 @@ func (h *httpClient) parseRsp(ctx context.Context, hrsp *http.Response, rsp inte
|
||||
}
|
||||
|
||||
if !ok || rerr == nil {
|
||||
buf, rerr := io.ReadAll(hrsp.Body)
|
||||
if h.opts.Logger.V(logger.DebugLevel) {
|
||||
h.opts.Logger.Debugf(ctx, "response %s with %v", buf, hrsp.Header)
|
||||
}
|
||||
if rerr != nil {
|
||||
return errors.InternalServerError("go.micro.client", rerr.Error())
|
||||
}
|
||||
return errors.New("go.micro.client", string(buf), int32(hrsp.StatusCode))
|
||||
}
|
||||
|
||||
if h.opts.Logger.V(logger.DebugLevel) {
|
||||
buf, rerr := io.ReadAll(hrsp.Body)
|
||||
h.opts.Logger.Debugf(ctx, "response %s with %v", buf, hrsp.Header)
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", rerr.Error())
|
||||
}
|
||||
hrsp.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
|
||||
}
|
||||
|
||||
if cerr := cf.ReadBody(hrsp.Body, rerr); cerr != nil {
|
||||
if cerr := cf.Unmarshal(buf, rerr); cerr != nil {
|
||||
return errors.InternalServerError("go.micro.client", cerr.Error())
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user