The mega cruft proxy PR (#974)

* the mega cruft proxy PR

* Rename broker id

* add protocol=grpc

* fix compilation breaks

* Add the tunnel broker to the network

* fix broker id

* continue to be backwards compatible in the protocol
This commit is contained in:
Asim Aslam 2019-11-25 16:31:43 +00:00 committed by Vasiliy Tolstov
parent b18c0c8bb4
commit 9d31078176
2 changed files with 42 additions and 6 deletions

45
grpc.go
View File

@ -13,6 +13,7 @@ import (
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
"github.com/micro/go-micro/client/selector" "github.com/micro/go-micro/client/selector"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
raw "github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/errors" "github.com/micro/go-micro/errors"
"github.com/micro/go-micro/metadata" "github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
@ -70,8 +71,13 @@ func (g *grpcClient) next(request client.Request, opts client.CallOptions) (sele
}, nil }, nil
} }
// only get the things that are of grpc protocol
selectOptions := append(opts.SelectOptions, selector.WithFilter(
selector.FilterLabel("protocol", "grpc"),
))
// get next nodes from the selector // get next nodes from the selector
next, err := g.opts.Selector.Select(service, opts.SelectOptions...) next, err := g.opts.Selector.Select(service, selectOptions...)
if err != nil { if err != nil {
if err == selector.ErrNotFound { if err == selector.ErrNotFound {
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
@ -510,29 +516,56 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
} }
func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
var options client.PublishOptions
for _, o := range opts {
o(&options)
}
md, ok := metadata.FromContext(ctx) md, ok := metadata.FromContext(ctx)
if !ok { if !ok {
md = make(map[string]string) md = make(map[string]string)
} }
md["Content-Type"] = p.ContentType() md["Content-Type"] = p.ContentType()
md["Micro-Topic"] = p.Topic()
cf, err := g.newGRPCCodec(p.ContentType()) cf, err := g.newGRPCCodec(p.ContentType())
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", err.Error())
} }
b, err := cf.Marshal(p.Payload()) var body []byte
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) // passed in raw data
if d, ok := p.Payload().(*raw.Frame); ok {
body = d.Data
} else {
// set the body
b, err := cf.Marshal(p.Payload())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
body = b
} }
g.once.Do(func() { g.once.Do(func() {
g.opts.Broker.Connect() g.opts.Broker.Connect()
}) })
return g.opts.Broker.Publish(p.Topic(), &broker.Message{ topic := p.Topic()
// get proxy topic
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
options.Exchange = prx
}
// get the exchange
if len(options.Exchange) > 0 {
topic = options.Exchange
}
return g.opts.Broker.Publish(topic, &broker.Message{
Header: md, Header: md,
Body: b, Body: body,
}) })
} }

View File

@ -45,6 +45,9 @@ func TestGRPCClient(t *testing.T) {
{ {
Id: "test-1", Id: "test-1",
Address: l.Addr().String(), Address: l.Addr().String(),
Metadata: map[string]string{
"protocol": "grpc",
},
}, },
}, },
}) })