add the various updates for proxy in http client/server

This commit is contained in:
Asim Aslam 2019-11-22 22:09:18 +00:00 committed by Vasiliy Tolstov
parent da340f01bc
commit ce4c0458bc
2 changed files with 62 additions and 6 deletions

62
http.go
View File

@ -10,6 +10,7 @@ import (
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
"os"
"sync" "sync"
"time" "time"
@ -17,6 +18,7 @@ import (
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
"github.com/micro/go-micro/client/selector" "github.com/micro/go-micro/client/selector"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
raw "github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/config/cmd" "github.com/micro/go-micro/config/cmd"
errors "github.com/micro/go-micro/errors" errors "github.com/micro/go-micro/errors"
"github.com/micro/go-micro/metadata" "github.com/micro/go-micro/metadata"
@ -34,17 +36,37 @@ func init() {
} }
func (h *httpClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) { func (h *httpClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) {
service := request.Service()
// get proxy
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
service = prx
}
// get proxy address
if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 {
opts.Address = []string{prx}
}
// return remote address // return remote address
if len(opts.Address) > 0 { if len(opts.Address) > 0 {
return func() (*registry.Node, error) { return func() (*registry.Node, error) {
return &registry.Node{ return &registry.Node{
Address: opts.Address[0], Address: opts.Address[0],
Metadata: map[string]string{
"protocol": "http",
},
}, nil }, nil
}, nil }, nil
} }
// only get the things that are of mucp protocol
selectOptions := append(opts.SelectOptions, selector.WithFilter(
selector.FilterLabel("protocol", "http"),
))
// get next nodes from the selector // get next nodes from the selector
next, err := h.opts.Selector.Select(request.Service(), opts.SelectOptions...) next, err := h.opts.Selector.Select(service, selectOptions...)
if err != nil && err == selector.ErrNotFound { if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error()) return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil { } else if err != nil {
@ -388,29 +410,57 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli
} }
func (h *httpClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { func (h *httpClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
options := client.PublishOptions{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
md, ok := metadata.FromContext(ctx) md, ok := metadata.FromContext(ctx)
if !ok { if !ok {
md = make(map[string]string) md = make(map[string]string)
} }
md["Content-Type"] = p.ContentType() md["Content-Type"] = p.ContentType()
md["Micro-Topic"] = p.Topic()
cf, err := h.newCodec(p.ContentType()) cf, err := h.newCodec(p.ContentType())
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", err.Error())
} }
b := &buffer{bytes.NewBuffer(nil)} var body []byte
if err := cf(b).Write(&codec.Message{Type: codec.Event}, p.Payload()); err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) // passed in raw data
if d, ok := p.Payload().(*raw.Frame); ok {
body = d.Data
} else {
b := &buffer{bytes.NewBuffer(nil)}
if err := cf(b).Write(&codec.Message{Type: codec.Event}, p.Payload()); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
body = b.Bytes()
} }
h.once.Do(func() { h.once.Do(func() {
h.opts.Broker.Connect() h.opts.Broker.Connect()
}) })
return h.opts.Broker.Publish(p.Topic(), &broker.Message{ topic := p.Topic()
// get proxy
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
options.Exchange = prx
}
// get the exchange
if len(options.Exchange) > 0 {
topic = options.Exchange
}
return h.opts.Broker.Publish(topic, &broker.Message{
Header: md, Header: md,
Body: b.Bytes(), Body: body,
}) })
} }

View File

@ -73,6 +73,9 @@ func TestHTTPClient(t *testing.T) {
{ {
Id: "test.service.1", Id: "test.service.1",
Address: l.Addr().String(), Address: l.Addr().String(),
Metadata: map[string]string{
"protocol": "http",
},
}, },
}, },
}); err != nil { }); err != nil {
@ -229,6 +232,9 @@ func TestHTTPClientStream(t *testing.T) {
{ {
Id: "test.service.1", Id: "test.service.1",
Address: l.Addr().String(), Address: l.Addr().String(),
Metadata: map[string]string{
"protocol": "http",
},
}, },
}, },
}); err != nil { }); err != nil {