diff --git a/http.go b/http.go index 64330ac..cb37def 100644 --- a/http.go +++ b/http.go @@ -10,6 +10,7 @@ import ( "net" "net/http" "net/url" + "os" "sync" "time" @@ -17,6 +18,7 @@ import ( "github.com/micro/go-micro/client" "github.com/micro/go-micro/client/selector" "github.com/micro/go-micro/codec" + raw "github.com/micro/go-micro/codec/bytes" "github.com/micro/go-micro/config/cmd" errors "github.com/micro/go-micro/errors" "github.com/micro/go-micro/metadata" @@ -34,17 +36,37 @@ func init() { } func (h *httpClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) { + service := request.Service() + + // get proxy + if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 { + service = prx + } + + // get proxy address + if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 { + opts.Address = []string{prx} + } + // return remote address if len(opts.Address) > 0 { return func() (*registry.Node, error) { return ®istry.Node{ Address: opts.Address[0], + Metadata: map[string]string{ + "protocol": "http", + }, }, nil }, nil } + // only get the things that are of mucp protocol + selectOptions := append(opts.SelectOptions, selector.WithFilter( + selector.FilterLabel("protocol", "http"), + )) + // get next nodes from the selector - next, err := h.opts.Selector.Select(request.Service(), opts.SelectOptions...) + next, err := h.opts.Selector.Select(service, selectOptions...) if err != nil && err == selector.ErrNotFound { return nil, errors.NotFound("go.micro.client", err.Error()) } else if err != nil { @@ -388,29 +410,57 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli } func (h *httpClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { + options := client.PublishOptions{ + Context: context.Background(), + } + for _, o := range opts { + o(&options) + } + md, ok := metadata.FromContext(ctx) if !ok { md = make(map[string]string) } md["Content-Type"] = p.ContentType() + md["Micro-Topic"] = p.Topic() cf, err := h.newCodec(p.ContentType()) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } - b := &buffer{bytes.NewBuffer(nil)} - if err := cf(b).Write(&codec.Message{Type: codec.Event}, p.Payload()); err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) + var body []byte + + // passed in raw data + if d, ok := p.Payload().(*raw.Frame); ok { + body = d.Data + } else { + b := &buffer{bytes.NewBuffer(nil)} + if err := cf(b).Write(&codec.Message{Type: codec.Event}, p.Payload()); err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + body = b.Bytes() } h.once.Do(func() { h.opts.Broker.Connect() }) - return h.opts.Broker.Publish(p.Topic(), &broker.Message{ + topic := p.Topic() + + // get proxy + if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 { + options.Exchange = prx + } + + // get the exchange + if len(options.Exchange) > 0 { + topic = options.Exchange + } + + return h.opts.Broker.Publish(topic, &broker.Message{ Header: md, - Body: b.Bytes(), + Body: body, }) } diff --git a/http_test.go b/http_test.go index 51eb31a..9abe628 100644 --- a/http_test.go +++ b/http_test.go @@ -73,6 +73,9 @@ func TestHTTPClient(t *testing.T) { { Id: "test.service.1", Address: l.Addr().String(), + Metadata: map[string]string{ + "protocol": "http", + }, }, }, }); err != nil { @@ -229,6 +232,9 @@ func TestHTTPClientStream(t *testing.T) { { Id: "test.service.1", Address: l.Addr().String(), + Metadata: map[string]string{ + "protocol": "http", + }, }, }, }); err != nil {