The mega cruft proxy PR (#974)

* the mega cruft proxy PR

* Rename broker id

* add protocol=grpc

* fix compilation breaks

* Add the tunnel broker to the network

* fix broker id

* continue to be backwards compatible in the protocol
This commit is contained in:
Asim Aslam 2019-11-25 16:31:43 +00:00 committed by Vasiliy Tolstov
parent d0222ee239
commit 15b405b1a9
3 changed files with 24 additions and 3 deletions

View File

@ -89,6 +89,11 @@ func newGRPCServer(opts ...server.Option) server.Server {
type grpcRouter struct { type grpcRouter struct {
h func(context.Context, server.Request, interface{}) error h func(context.Context, server.Request, interface{}) error
m func(context.Context, server.Message) error
}
func (r grpcRouter) ProcessMessage(ctx context.Context, msg server.Message) error {
return r.m(ctx, msg)
} }
func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
@ -258,7 +263,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error {
handler = g.opts.HdlrWrappers[i-1](handler) handler = g.opts.HdlrWrappers[i-1](handler)
} }
r := grpcRouter{handler} r := grpcRouter{h: handler}
// serve the actual request using the request router // serve the actual request using the request router
if err := r.ServeRequest(ctx, request, response); err != nil { if err := r.ServeRequest(ctx, request, response); err != nil {
@ -564,7 +569,7 @@ func (g *grpcServer) Register() error {
node.Metadata["registry"] = config.Registry.String() node.Metadata["registry"] = config.Registry.String()
node.Metadata["server"] = g.String() node.Metadata["server"] = g.String()
node.Metadata["transport"] = g.String() node.Metadata["transport"] = g.String()
// node.Metadata["transport"] = config.Transport.String() node.Metadata["protocol"] = "grpc"
g.RLock() g.RLock()
// Maps are ordered randomly, sort the keys for consistency // Maps are ordered randomly, sort the keys for consistency

View File

@ -20,6 +20,9 @@ type rpcMessage struct {
topic string topic string
contentType string contentType string
payload interface{} payload interface{}
header map[string]string
body []byte
codec codec.Codec
} }
func (r *rpcRequest) ContentType() string { func (r *rpcRequest) ContentType() string {
@ -73,3 +76,15 @@ func (r *rpcMessage) Topic() string {
func (r *rpcMessage) Payload() interface{} { func (r *rpcMessage) Payload() interface{} {
return r.payload return r.payload
} }
func (r *rpcMessage) Header() map[string]string {
return r.header
}
func (r *rpcMessage) Body() []byte {
return r.body
}
func (r *rpcMessage) Codec() codec.Reader {
return r.codec
}

View File

@ -33,7 +33,6 @@ type subscriber struct {
} }
func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber { func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber {
options := server.SubscriberOptions{ options := server.SubscriberOptions{
AutoAck: true, AutoAck: true,
} }
@ -239,6 +238,8 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke
topic: sb.topic, topic: sb.topic,
contentType: ct, contentType: ct,
payload: req.Interface(), payload: req.Interface(),
header: msg.Header,
body: msg.Body,
}) })
}() }()
} }