diff --git a/client/rpc_client.go b/client/rpc_client.go index e8e0bbeb..bf2abacf 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -105,7 +105,7 @@ func (r *rpcClient) call(ctx context.Context, address string, request Request, r defer c.Close() client := newClientWithCodec(newRpcPlusCodec(msg, c, cf)) - err = client.Call(ctx, request.Method(), request.Request(), response) + err = client.Call(ctx, request.Service(), request.Method(), request.Request(), response) if err != nil { return err } @@ -137,7 +137,7 @@ func (r *rpcClient) stream(ctx context.Context, address string, request Request, } client := newClientWithCodec(newRpcPlusCodec(msg, c, cf)) - call := client.StreamGo(request.Method(), request.Request(), responseChan) + call := client.StreamGo(request.Service(), request.Method(), request.Request(), responseChan) return &rpcStream{ request: request, diff --git a/client/rpc_codec.go b/client/rpc_codec.go index f2fd28e3..fd5df68b 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -64,14 +64,19 @@ func newRpcPlusCodec(req *transport.Message, client transport.Client, c codec.Ne func (c *rpcPlusCodec) WriteRequest(req *request, body interface{}) error { m := &codec.Message{ - Id: req.Seq, - Method: req.ServiceMethod, - Type: codec.Request, + Id: req.Seq, + Target: req.Service, + Method: req.ServiceMethod, + Type: codec.Request, + Headers: map[string]string{}, } if err := c.codec.Write(m, body); err != nil { return err } c.req.Body = c.buf.wbuf.Bytes() + for k, v := range m.Headers { + c.req.Header[k] = v + } return c.client.Send(c.req) } diff --git a/client/rpcplus_client.go b/client/rpcplus_client.go index 4fb2d400..f119cc3e 100644 --- a/client/rpcplus_client.go +++ b/client/rpcplus_client.go @@ -32,6 +32,7 @@ var errShutdown = errors.New("connection is shut down") // call represents an active RPC. type call struct { + Service string ServiceMethod string // The name of the service and method to call. Args interface{} // The argument to the function (*struct). Reply interface{} // The reply from the function (*struct for single, chan * struct for streaming). @@ -65,6 +66,7 @@ type clientCodec interface { } type request struct { + Service string ServiceMethod string // format: "Service.Method" Seq uint64 // sequence number chosen by client next *request // for free list in Server @@ -95,6 +97,7 @@ func (client *client) send(call *call) { client.mutex.Unlock() // Encode and send the request. + client.request.Service = call.Service client.request.Seq = seq client.request.ServiceMethod = call.ServiceMethod err := client.codec.WriteRequest(&client.request, call.Args) @@ -241,12 +244,13 @@ func (client *client) Close() error { // the invocation. The done channel will signal when the call is complete by returning // the same call object. If done is nil, Go will allocate a new channel. // If non-nil, done must be buffered or Go will deliberately crash. -func (client *client) Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *call) *call { +func (client *client) Go(ctx context.Context, service, serviceMethod string, args interface{}, reply interface{}, done chan *call) *call { span := trace.NewSpanFromContext(ctx) span.StartClient(serviceMethod) defer span.Finish() cal := new(call) + cal.Service = service cal.ServiceMethod = serviceMethod cal.Args = args cal.Reply = reply @@ -268,7 +272,7 @@ func (client *client) Go(ctx context.Context, serviceMethod string, args interfa // StreamGo invokes the streaming function asynchronously. It returns the call structure representing // the invocation. -func (client *client) StreamGo(serviceMethod string, args interface{}, replyStream interface{}) *call { +func (client *client) StreamGo(service string, serviceMethod string, args interface{}, replyStream interface{}) *call { // first check the replyStream object is a stream of pointers to a data structure typ := reflect.TypeOf(replyStream) // FIXME: check the direction of the channel, maybe? @@ -278,6 +282,7 @@ func (client *client) StreamGo(serviceMethod string, args interface{}, replyStre } call := new(call) + call.Service = service call.ServiceMethod = serviceMethod call.Args = args call.Reply = replyStream @@ -288,7 +293,7 @@ func (client *client) StreamGo(serviceMethod string, args interface{}, replyStre } // call invokes the named function, waits for it to complete, and returns its error status. -func (client *client) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error { - call := <-client.Go(ctx, serviceMethod, args, reply, make(chan *call, 1)).Done +func (client *client) Call(ctx context.Context, service string, serviceMethod string, args interface{}, reply interface{}) error { + call := <-client.Go(ctx, service, serviceMethod, args, reply, make(chan *call, 1)).Done return call.Error }