diff --git a/grpc.go b/grpc.go index a09dd59..942d1a8 100644 --- a/grpc.go +++ b/grpc.go @@ -89,6 +89,11 @@ func newGRPCServer(opts ...server.Option) server.Server { type grpcRouter struct { h func(context.Context, server.Request, interface{}) error + m func(context.Context, server.Message) error +} + +func (r grpcRouter) ProcessMessage(ctx context.Context, msg server.Message) error { + return r.m(ctx, msg) } func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { @@ -258,7 +263,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error { handler = g.opts.HdlrWrappers[i-1](handler) } - r := grpcRouter{handler} + r := grpcRouter{h: handler} // serve the actual request using the request router if err := r.ServeRequest(ctx, request, response); err != nil { @@ -564,7 +569,7 @@ func (g *grpcServer) Register() error { node.Metadata["registry"] = config.Registry.String() node.Metadata["server"] = g.String() node.Metadata["transport"] = g.String() - // node.Metadata["transport"] = config.Transport.String() + node.Metadata["protocol"] = "grpc" g.RLock() // Maps are ordered randomly, sort the keys for consistency diff --git a/request.go b/request.go index 617b9a7..adfed48 100644 --- a/request.go +++ b/request.go @@ -20,6 +20,9 @@ type rpcMessage struct { topic string contentType string payload interface{} + header map[string]string + body []byte + codec codec.Codec } func (r *rpcRequest) ContentType() string { @@ -73,3 +76,15 @@ func (r *rpcMessage) Topic() string { func (r *rpcMessage) Payload() interface{} { return r.payload } + +func (r *rpcMessage) Header() map[string]string { + return r.header +} + +func (r *rpcMessage) Body() []byte { + return r.body +} + +func (r *rpcMessage) Codec() codec.Reader { + return r.codec +} diff --git a/subscriber.go b/subscriber.go index 1f885ca..0e4f4f4 100644 --- a/subscriber.go +++ b/subscriber.go @@ -33,7 +33,6 @@ type subscriber struct { } func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber { - options := server.SubscriberOptions{ AutoAck: true, } @@ -239,6 +238,8 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke topic: sb.topic, contentType: ct, payload: req.Interface(), + header: msg.Header, + body: msg.Body, }) }() }