Compare commits
	
		
			4 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| ef36082f2e | |||
| 21c897be47 | |||
| 0b21dd6660 | |||
| 18eb0d9e5c | 
@@ -5,14 +5,14 @@ This plugin is a http client for micro.
 | 
			
		||||
## Overview
 | 
			
		||||
 | 
			
		||||
The http client wraps `net/http` to provide a robust micro client with service discovery, load balancing and streaming. 
 | 
			
		||||
It complies with the [micro.Client](https://godoc.org/go.unistack.org/micro-client-http/v3#Client) interface.
 | 
			
		||||
It complies with the [micro.Client](https://godoc.org/go.unistack.org/micro-client-http/v4#Client) interface.
 | 
			
		||||
 | 
			
		||||
## Usage
 | 
			
		||||
 | 
			
		||||
### Use directly
 | 
			
		||||
 | 
			
		||||
```go
 | 
			
		||||
import "go.unistack.org/micro-client-http/v3"
 | 
			
		||||
import "go.unistack.org/micro-client-http/v4"
 | 
			
		||||
 | 
			
		||||
service := micro.NewService(
 | 
			
		||||
	micro.Name("my.service"),
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										6
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										6
									
								
								go.mod
									
									
									
									
									
								
							@@ -1,5 +1,5 @@
 | 
			
		||||
module go.unistack.org/micro-client-http/v3
 | 
			
		||||
module go.unistack.org/micro-client-http/v4
 | 
			
		||||
 | 
			
		||||
go 1.18
 | 
			
		||||
go 1.19
 | 
			
		||||
 | 
			
		||||
require go.unistack.org/micro/v3 v3.10.16
 | 
			
		||||
require go.unistack.org/micro/v4 v4.0.1
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										10
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										10
									
								
								go.sum
									
									
									
									
									
								
							@@ -1,8 +1,2 @@
 | 
			
		||||
github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg=
 | 
			
		||||
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
 | 
			
		||||
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
 | 
			
		||||
go.unistack.org/micro/v3 v3.10.16 h1:2er/SKKYbV60M+UuJM4eYCF0MZYAIq/yNUrAbTfgq8Q=
 | 
			
		||||
go.unistack.org/micro/v3 v3.10.16/go.mod h1:uMAc0U/x7dmtICCrblGf0ZLgYegu3VwQAquu+OFCw1Q=
 | 
			
		||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 | 
			
		||||
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 | 
			
		||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 | 
			
		||||
go.unistack.org/micro/v4 v4.0.1 h1:xo1IxbVfgh8i0eY0VeYa3cbb13u5n/Mxnp3FOgWD4Jo=
 | 
			
		||||
go.unistack.org/micro/v4 v4.0.1/go.mod h1:p/J5UcSJjfHsWGT31uKoghQ5rUQZzQJBAFy+Z4+ZVMs=
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										95
									
								
								http.go
									
									
									
									
									
								
							
							
						
						
									
										95
									
								
								http.go
									
									
									
									
									
								
							@@ -1,5 +1,5 @@
 | 
			
		||||
// Package http provides a http client
 | 
			
		||||
package http // import "go.unistack.org/micro-client-http/v3"
 | 
			
		||||
package http // import "go.unistack.org/micro-client-http/v4"
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bufio"
 | 
			
		||||
@@ -10,19 +10,17 @@ import (
 | 
			
		||||
	"net"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"net/url"
 | 
			
		||||
	"os"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"go.unistack.org/micro/v3/broker"
 | 
			
		||||
	"go.unistack.org/micro/v3/client"
 | 
			
		||||
	"go.unistack.org/micro/v3/codec"
 | 
			
		||||
	"go.unistack.org/micro/v3/errors"
 | 
			
		||||
	"go.unistack.org/micro/v3/logger"
 | 
			
		||||
	"go.unistack.org/micro/v3/metadata"
 | 
			
		||||
	"go.unistack.org/micro/v3/selector"
 | 
			
		||||
	rutil "go.unistack.org/micro/v3/util/reflect"
 | 
			
		||||
	"go.unistack.org/micro/v4/client"
 | 
			
		||||
	"go.unistack.org/micro/v4/codec"
 | 
			
		||||
	"go.unistack.org/micro/v4/errors"
 | 
			
		||||
	"go.unistack.org/micro/v4/logger"
 | 
			
		||||
	"go.unistack.org/micro/v4/metadata"
 | 
			
		||||
	"go.unistack.org/micro/v4/selector"
 | 
			
		||||
	rutil "go.unistack.org/micro/v4/util/reflect"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var DefaultContentType = "application/json"
 | 
			
		||||
@@ -147,6 +145,11 @@ func newRequest(ctx context.Context, log logger.Logger, addr string, req client.
 | 
			
		||||
	if opts.AuthToken != "" {
 | 
			
		||||
		header.Set(metadata.HeaderAuthorization, opts.AuthToken)
 | 
			
		||||
	}
 | 
			
		||||
	if opts.RequestMetadata != nil {
 | 
			
		||||
		for k, v := range opts.RequestMetadata {
 | 
			
		||||
			header.Set(k, v)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if md, ok := metadata.FromOutgoingContext(ctx); ok {
 | 
			
		||||
		for k, v := range md {
 | 
			
		||||
@@ -308,9 +311,6 @@ func (h *httpClient) Init(opts ...client.Option) error {
 | 
			
		||||
		o(&h.opts)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := h.opts.Broker.Init(); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	if err := h.opts.Tracer.Init(); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
@@ -334,10 +334,6 @@ func (h *httpClient) Options() client.Options {
 | 
			
		||||
	return h.opts
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *httpClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message {
 | 
			
		||||
	return newHTTPMessage(topic, msg, h.opts.ContentType, opts...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *httpClient) NewRequest(service, method string, req interface{}, opts ...client.RequestOption) client.Request {
 | 
			
		||||
	return newHTTPRequest(service, method, req, h.opts.ContentType, opts...)
 | 
			
		||||
}
 | 
			
		||||
@@ -614,71 +610,6 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli
 | 
			
		||||
	return nil, grr
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *httpClient) BatchPublish(ctx context.Context, p []client.Message, opts ...client.PublishOption) error {
 | 
			
		||||
	return h.publish(ctx, p, opts...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *httpClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
 | 
			
		||||
	return h.publish(ctx, []client.Message{p}, opts...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
 | 
			
		||||
	var body []byte
 | 
			
		||||
 | 
			
		||||
	options := client.NewPublishOptions(opts...)
 | 
			
		||||
 | 
			
		||||
	// get proxy
 | 
			
		||||
	exchange := ""
 | 
			
		||||
	if v, ok := os.LookupEnv("MICRO_PROXY"); ok {
 | 
			
		||||
		exchange = v
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	omd, ok := metadata.FromOutgoingContext(ctx)
 | 
			
		||||
	if !ok {
 | 
			
		||||
		omd = metadata.New(2)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	msgs := make([]*broker.Message, 0, len(ps))
 | 
			
		||||
 | 
			
		||||
	for _, p := range ps {
 | 
			
		||||
		md := metadata.Copy(omd)
 | 
			
		||||
		md[metadata.HeaderContentType] = p.ContentType()
 | 
			
		||||
 | 
			
		||||
		// passed in raw data
 | 
			
		||||
		if d, ok := p.Payload().(*codec.Frame); ok {
 | 
			
		||||
			body = d.Data
 | 
			
		||||
		} else {
 | 
			
		||||
			// use codec for payload
 | 
			
		||||
			cf, err := h.newCodec(p.ContentType())
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return errors.InternalServerError("go.micro.client", err.Error())
 | 
			
		||||
			}
 | 
			
		||||
			// set the body
 | 
			
		||||
			b, err := cf.Marshal(p.Payload())
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return errors.InternalServerError("go.micro.client", err.Error())
 | 
			
		||||
			}
 | 
			
		||||
			body = b
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		topic := p.Topic()
 | 
			
		||||
		if len(exchange) > 0 {
 | 
			
		||||
			topic = exchange
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		for k, v := range p.Metadata() {
 | 
			
		||||
			md.Set(k, v)
 | 
			
		||||
		}
 | 
			
		||||
		md.Set(metadata.HeaderTopic, topic)
 | 
			
		||||
		msgs = append(msgs, &broker.Message{Header: md, Body: body})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return h.opts.Broker.BatchPublish(ctx, msgs,
 | 
			
		||||
		broker.PublishContext(ctx),
 | 
			
		||||
		broker.PublishBodyOnly(options.BodyOnly),
 | 
			
		||||
	)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *httpClient) String() string {
 | 
			
		||||
	return "http"
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										44
									
								
								message.go
									
									
									
									
									
								
							
							
						
						
									
										44
									
								
								message.go
									
									
									
									
									
								
							@@ -1,44 +0,0 @@
 | 
			
		||||
package http
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"go.unistack.org/micro/v3/client"
 | 
			
		||||
	"go.unistack.org/micro/v3/metadata"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type httpMessage struct {
 | 
			
		||||
	payload     interface{}
 | 
			
		||||
	topic       string
 | 
			
		||||
	contentType string
 | 
			
		||||
	opts        client.MessageOptions
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newHTTPMessage(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message {
 | 
			
		||||
	options := client.NewMessageOptions(opts...)
 | 
			
		||||
 | 
			
		||||
	if len(options.ContentType) > 0 {
 | 
			
		||||
		contentType = options.ContentType
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &httpMessage{
 | 
			
		||||
		payload:     payload,
 | 
			
		||||
		topic:       topic,
 | 
			
		||||
		contentType: contentType,
 | 
			
		||||
		opts:        options,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *httpMessage) ContentType() string {
 | 
			
		||||
	return h.contentType
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *httpMessage) Topic() string {
 | 
			
		||||
	return h.topic
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *httpMessage) Payload() interface{} {
 | 
			
		||||
	return h.payload
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *httpMessage) Metadata() metadata.Metadata {
 | 
			
		||||
	return h.opts.Metadata
 | 
			
		||||
}
 | 
			
		||||
@@ -4,8 +4,8 @@ import (
 | 
			
		||||
	"net"
 | 
			
		||||
	"net/http"
 | 
			
		||||
 | 
			
		||||
	"go.unistack.org/micro/v3/client"
 | 
			
		||||
	"go.unistack.org/micro/v3/metadata"
 | 
			
		||||
	"go.unistack.org/micro/v4/client"
 | 
			
		||||
	"go.unistack.org/micro/v4/metadata"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
 
 | 
			
		||||
@@ -1,8 +1,8 @@
 | 
			
		||||
package http
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"go.unistack.org/micro/v3/client"
 | 
			
		||||
	"go.unistack.org/micro/v3/codec"
 | 
			
		||||
	"go.unistack.org/micro/v4/client"
 | 
			
		||||
	"go.unistack.org/micro/v4/codec"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type httpRequest struct {
 | 
			
		||||
 
 | 
			
		||||
@@ -9,10 +9,10 @@ import (
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"sync"
 | 
			
		||||
 | 
			
		||||
	"go.unistack.org/micro/v3/client"
 | 
			
		||||
	"go.unistack.org/micro/v3/codec"
 | 
			
		||||
	"go.unistack.org/micro/v3/errors"
 | 
			
		||||
	"go.unistack.org/micro/v3/logger"
 | 
			
		||||
	"go.unistack.org/micro/v4/client"
 | 
			
		||||
	"go.unistack.org/micro/v4/codec"
 | 
			
		||||
	"go.unistack.org/micro/v4/errors"
 | 
			
		||||
	"go.unistack.org/micro/v4/logger"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Implements the streamer interface
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										18
									
								
								util.go
									
									
									
									
									
								
							
							
						
						
									
										18
									
								
								util.go
									
									
									
									
									
								
							@@ -10,10 +10,11 @@ import (
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
 | 
			
		||||
	"go.unistack.org/micro/v3/client"
 | 
			
		||||
	"go.unistack.org/micro/v3/errors"
 | 
			
		||||
	"go.unistack.org/micro/v3/logger"
 | 
			
		||||
	rutil "go.unistack.org/micro/v3/util/reflect"
 | 
			
		||||
	"go.unistack.org/micro/v4/client"
 | 
			
		||||
	"go.unistack.org/micro/v4/errors"
 | 
			
		||||
	"go.unistack.org/micro/v4/logger"
 | 
			
		||||
	"go.unistack.org/micro/v4/metadata"
 | 
			
		||||
	rutil "go.unistack.org/micro/v4/util/reflect"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
@@ -252,6 +253,13 @@ func (h *httpClient) parseRsp(ctx context.Context, hrsp *http.Response, rsp inte
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if opts.ResponseMetadata != nil {
 | 
			
		||||
		*opts.ResponseMetadata = metadata.New(len(hrsp.Header))
 | 
			
		||||
		for k, v := range hrsp.Header {
 | 
			
		||||
			opts.ResponseMetadata.Set(k, strings.Join(v, ","))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	select {
 | 
			
		||||
	case <-ctx.Done():
 | 
			
		||||
		err = ctx.Err()
 | 
			
		||||
@@ -275,7 +283,7 @@ func (h *httpClient) parseRsp(ctx context.Context, hrsp *http.Response, rsp inte
 | 
			
		||||
		cf, cerr := h.newCodec(ct)
 | 
			
		||||
		if cerr != nil {
 | 
			
		||||
			if h.opts.Logger.V(logger.DebugLevel) {
 | 
			
		||||
				h.opts.Logger.Debugf(ctx, "response with %v unknown content-type %s", hrsp.Header, ct, buf)
 | 
			
		||||
				h.opts.Logger.Debugf(ctx, "response with %v unknown content-type %s %s", hrsp.Header, ct, buf)
 | 
			
		||||
			}
 | 
			
		||||
			return errors.InternalServerError("go.micro.client", cerr.Error())
 | 
			
		||||
		}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user