add changes for mercury

This commit is contained in:
Asim 2015-12-16 01:18:05 +00:00
parent ed75f84584
commit 271b827aa9
3 changed files with 19 additions and 9 deletions

View File

@ -105,7 +105,7 @@ func (r *rpcClient) call(ctx context.Context, address string, request Request, r
defer c.Close() defer c.Close()
client := newClientWithCodec(newRpcPlusCodec(msg, c, cf)) client := newClientWithCodec(newRpcPlusCodec(msg, c, cf))
err = client.Call(ctx, request.Method(), request.Request(), response) err = client.Call(ctx, request.Service(), request.Method(), request.Request(), response)
if err != nil { if err != nil {
return err return err
} }
@ -137,7 +137,7 @@ func (r *rpcClient) stream(ctx context.Context, address string, request Request,
} }
client := newClientWithCodec(newRpcPlusCodec(msg, c, cf)) client := newClientWithCodec(newRpcPlusCodec(msg, c, cf))
call := client.StreamGo(request.Method(), request.Request(), responseChan) call := client.StreamGo(request.Service(), request.Method(), request.Request(), responseChan)
return &rpcStream{ return &rpcStream{
request: request, request: request,

View File

@ -65,13 +65,18 @@ func newRpcPlusCodec(req *transport.Message, client transport.Client, c codec.Ne
func (c *rpcPlusCodec) WriteRequest(req *request, body interface{}) error { func (c *rpcPlusCodec) WriteRequest(req *request, body interface{}) error {
m := &codec.Message{ m := &codec.Message{
Id: req.Seq, Id: req.Seq,
Target: req.Service,
Method: req.ServiceMethod, Method: req.ServiceMethod,
Type: codec.Request, Type: codec.Request,
Headers: map[string]string{},
} }
if err := c.codec.Write(m, body); err != nil { if err := c.codec.Write(m, body); err != nil {
return err return err
} }
c.req.Body = c.buf.wbuf.Bytes() c.req.Body = c.buf.wbuf.Bytes()
for k, v := range m.Headers {
c.req.Header[k] = v
}
return c.client.Send(c.req) return c.client.Send(c.req)
} }

View File

@ -32,6 +32,7 @@ var errShutdown = errors.New("connection is shut down")
// call represents an active RPC. // call represents an active RPC.
type call struct { type call struct {
Service string
ServiceMethod string // The name of the service and method to call. ServiceMethod string // The name of the service and method to call.
Args interface{} // The argument to the function (*struct). Args interface{} // The argument to the function (*struct).
Reply interface{} // The reply from the function (*struct for single, chan * struct for streaming). Reply interface{} // The reply from the function (*struct for single, chan * struct for streaming).
@ -65,6 +66,7 @@ type clientCodec interface {
} }
type request struct { type request struct {
Service string
ServiceMethod string // format: "Service.Method" ServiceMethod string // format: "Service.Method"
Seq uint64 // sequence number chosen by client Seq uint64 // sequence number chosen by client
next *request // for free list in Server next *request // for free list in Server
@ -95,6 +97,7 @@ func (client *client) send(call *call) {
client.mutex.Unlock() client.mutex.Unlock()
// Encode and send the request. // Encode and send the request.
client.request.Service = call.Service
client.request.Seq = seq client.request.Seq = seq
client.request.ServiceMethod = call.ServiceMethod client.request.ServiceMethod = call.ServiceMethod
err := client.codec.WriteRequest(&client.request, call.Args) err := client.codec.WriteRequest(&client.request, call.Args)
@ -241,12 +244,13 @@ func (client *client) Close() error {
// the invocation. The done channel will signal when the call is complete by returning // the invocation. The done channel will signal when the call is complete by returning
// the same call object. If done is nil, Go will allocate a new channel. // the same call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash. // If non-nil, done must be buffered or Go will deliberately crash.
func (client *client) Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *call) *call { func (client *client) Go(ctx context.Context, service, serviceMethod string, args interface{}, reply interface{}, done chan *call) *call {
span := trace.NewSpanFromContext(ctx) span := trace.NewSpanFromContext(ctx)
span.StartClient(serviceMethod) span.StartClient(serviceMethod)
defer span.Finish() defer span.Finish()
cal := new(call) cal := new(call)
cal.Service = service
cal.ServiceMethod = serviceMethod cal.ServiceMethod = serviceMethod
cal.Args = args cal.Args = args
cal.Reply = reply cal.Reply = reply
@ -268,7 +272,7 @@ func (client *client) Go(ctx context.Context, serviceMethod string, args interfa
// StreamGo invokes the streaming function asynchronously. It returns the call structure representing // StreamGo invokes the streaming function asynchronously. It returns the call structure representing
// the invocation. // the invocation.
func (client *client) StreamGo(serviceMethod string, args interface{}, replyStream interface{}) *call { func (client *client) StreamGo(service string, serviceMethod string, args interface{}, replyStream interface{}) *call {
// first check the replyStream object is a stream of pointers to a data structure // first check the replyStream object is a stream of pointers to a data structure
typ := reflect.TypeOf(replyStream) typ := reflect.TypeOf(replyStream)
// FIXME: check the direction of the channel, maybe? // FIXME: check the direction of the channel, maybe?
@ -278,6 +282,7 @@ func (client *client) StreamGo(serviceMethod string, args interface{}, replyStre
} }
call := new(call) call := new(call)
call.Service = service
call.ServiceMethod = serviceMethod call.ServiceMethod = serviceMethod
call.Args = args call.Args = args
call.Reply = replyStream call.Reply = replyStream
@ -288,7 +293,7 @@ func (client *client) StreamGo(serviceMethod string, args interface{}, replyStre
} }
// call invokes the named function, waits for it to complete, and returns its error status. // call invokes the named function, waits for it to complete, and returns its error status.
func (client *client) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error { func (client *client) Call(ctx context.Context, service string, serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(ctx, serviceMethod, args, reply, make(chan *call, 1)).Done call := <-client.Go(ctx, service, serviceMethod, args, reply, make(chan *call, 1)).Done
return call.Error return call.Error
} }