Merge pull request #35 from micro/codec

Reworking codec interface
This commit is contained in:
Asim 2016-01-06 00:50:56 +00:00
commit dee9cbb763
3 changed files with 68 additions and 266 deletions

View File

@ -51,7 +51,7 @@ func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) {
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
} }
func (r *rpcClient) call(ctx context.Context, address string, request Request, response interface{}) error { func (r *rpcClient) call(ctx context.Context, address string, req Request, resp interface{}) error {
msg := &transport.Message{ msg := &transport.Message{
Header: make(map[string]string), Header: make(map[string]string),
} }
@ -63,9 +63,9 @@ func (r *rpcClient) call(ctx context.Context, address string, request Request, r
} }
} }
msg.Header["Content-Type"] = request.ContentType() msg.Header["Content-Type"] = req.ContentType()
cf, err := r.newCodec(request.ContentType()) cf, err := r.newCodec(req.ContentType())
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", err.Error())
} }
@ -74,18 +74,36 @@ func (r *rpcClient) call(ctx context.Context, address string, request Request, r
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
defer c.Close()
client := newClientWithCodec(newRpcPlusCodec(msg, c, cf)) var once sync.Once
defer client.Close() stream := &rpcStream{
context: ctx,
request: req,
once: once,
closed: make(chan bool),
codec: newRpcPlusCodec(msg, c, cf),
}
ch := make(chan error, 1) ch := make(chan error, 1)
go func() { go func() {
select { // defer stream close
case ch <- client.Call(ctx, request.Service(), request.Method(), request.Request(), response): defer stream.Close()
default:
// send request
if err := stream.Send(req.Request()); err != nil {
ch <- err
return
} }
// recv request
if err := stream.Recv(resp); err != nil {
ch <- err
return
}
// success
ch <- nil
}() }()
select { select {
@ -133,10 +151,7 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request) (St
ch := make(chan error, 1) ch := make(chan error, 1)
go func() { go func() {
select { ch <- stream.Send(req.Request())
case ch <- stream.Send(req.Request()):
default:
}
}() }()
select { select {

View File

@ -2,6 +2,7 @@ package client
import ( import (
"bytes" "bytes"
"errors"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/jsonrpc" "github.com/micro/go-micro/codec/jsonrpc"
@ -9,6 +10,23 @@ import (
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
) )
const (
lastStreamResponseError = "EOS"
)
// serverError represents an error that has been returned from
// the remote side of the RPC connection.
type serverError string
func (e serverError) Error() string {
return string(e)
}
// errShutdown holds the specific error for closing/closed connections
var (
errShutdown = errors.New("connection is shut down")
)
type rpcPlusCodec struct { type rpcPlusCodec struct {
client transport.Client client transport.Client
codec codec.Codec codec codec.Codec
@ -22,6 +40,28 @@ type readWriteCloser struct {
rbuf *bytes.Buffer rbuf *bytes.Buffer
} }
type clientCodec interface {
WriteRequest(*request, interface{}) error
ReadResponseHeader(*response) error
ReadResponseBody(interface{}) error
Close() error
}
type request struct {
Service string
ServiceMethod string // format: "Service.Method"
Seq uint64 // sequence number chosen by client
next *request // for free list in Server
}
type response struct {
ServiceMethod string // echoes that of the Request
Seq uint64 // echoes that of the request
Error string // error, if any.
next *response // for free list in Server
}
var ( var (
defaultContentType = "application/octet-stream" defaultContentType = "application/octet-stream"

View File

@ -1,253 +0,0 @@
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package client
import (
"errors"
"io"
"log"
"sync"
"github.com/youtube/vitess/go/trace"
"golang.org/x/net/context"
)
const (
lastStreamResponseError = "EOS"
)
// serverError represents an error that has been returned from
// the remote side of the RPC connection.
type serverError string
func (e serverError) Error() string {
return string(e)
}
// errShutdown holds the specific error for closing/closed connections
var errShutdown = errors.New("connection is shut down")
// call represents an active RPC.
type call struct {
Service string
ServiceMethod string // The name of the service and method to call.
Args interface{} // The argument to the function (*struct).
Reply interface{} // The reply from the function (*struct for single, chan * struct for streaming).
Error error // After completion, the error status.
Done chan *call // Strobes when call is complete (nil for streaming RPCs)
Subseq uint64 // The next expected subseq in the packets
}
// client represents an RPC client.
// There may be multiple outstanding calls associated
// with a single client, and a client may be used by
// multiple goroutines simultaneously.
type client struct {
mutex sync.Mutex // protects pending, seq, request
sending sync.Mutex
request request
seq uint64
codec clientCodec
pending map[uint64]*call
closing bool
shutdown bool
}
type clientCodec interface {
WriteRequest(*request, interface{}) error
ReadResponseHeader(*response) error
ReadResponseBody(interface{}) error
Close() error
}
type request struct {
Service string
ServiceMethod string // format: "Service.Method"
Seq uint64 // sequence number chosen by client
next *request // for free list in Server
}
type response struct {
ServiceMethod string // echoes that of the Request
Seq uint64 // echoes that of the request
Error string // error, if any.
next *response // for free list in Server
}
func (client *client) send(call *call) {
client.sending.Lock()
defer client.sending.Unlock()
// Register this call.
client.mutex.Lock()
if client.shutdown {
call.Error = errShutdown
client.mutex.Unlock()
call.done()
return
}
seq := client.seq
client.seq++
client.pending[seq] = call
client.mutex.Unlock()
// Encode and send the request.
client.request.Service = call.Service
client.request.Seq = seq
client.request.ServiceMethod = call.ServiceMethod
err := client.codec.WriteRequest(&client.request, call.Args)
if err != nil {
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
if call != nil {
call.Error = err
call.done()
}
}
}
func (client *client) input() {
var err error
var resp response
for err == nil {
resp = response{}
err = client.codec.ReadResponseHeader(&resp)
if err != nil {
if err == io.EOF && !client.closing {
err = io.ErrUnexpectedEOF
}
break
}
seq := resp.Seq
client.mutex.Lock()
call := client.pending[seq]
client.mutex.Unlock()
switch {
case call == nil:
// We've got no pending call. That usually means that
// WriteRequest partially failed, and call was already
// removed; response is a server telling us about an
// error reading request body. We should still attempt
// to read error body, but there's no one to give it to.
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
case resp.Error != "":
// We've got an error response. Give this to the request;
// any subsequent requests will get the ReadResponseBody
// error if there is one.
call.Error = serverError(resp.Error)
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error payload: " + err.Error())
}
client.done(seq)
default:
err = client.codec.ReadResponseBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
client.done(seq)
}
}
// Terminate pending calls.
client.sending.Lock()
client.mutex.Lock()
client.shutdown = true
closing := client.closing
for _, call := range client.pending {
call.Error = err
call.done()
}
client.mutex.Unlock()
client.sending.Unlock()
if err != io.EOF && !closing {
log.Println("rpc: client protocol error:", err)
}
}
func (client *client) done(seq uint64) {
client.mutex.Lock()
call := client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
if call != nil {
call.done()
}
}
func (call *call) done() {
select {
case call.Done <- call:
// ok
default:
// We don't want to block here. It is the caller's responsibility to make
// sure the channel has enough buffer space. See comment in Go().
log.Println("rpc: discarding call reply due to insufficient Done chan capacity")
}
}
func newClientWithCodec(codec clientCodec) *client {
client := &client{
codec: codec,
pending: make(map[uint64]*call),
}
go client.input()
return client
}
// Close closes the client connection
func (client *client) Close() error {
client.mutex.Lock()
if client.shutdown || client.closing {
client.mutex.Unlock()
return errShutdown
}
client.closing = true
client.mutex.Unlock()
return client.codec.Close()
}
// Go invokes the function asynchronously. It returns the call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (client *client) Go(ctx context.Context, service, serviceMethod string, args interface{}, reply interface{}, done chan *call) *call {
span := trace.NewSpanFromContext(ctx)
span.StartClient(serviceMethod)
defer span.Finish()
cal := new(call)
cal.Service = service
cal.ServiceMethod = serviceMethod
cal.Args = args
cal.Reply = reply
if done == nil {
done = make(chan *call, 10) // buffered.
} else {
// If caller passes done != nil, it must arrange that
// done has enough buffer for the number of simultaneous
// RPCs that will be using that channel. If the channel
// is totally unbuffered, it's best not to run at all.
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
}
}
cal.Done = done
client.send(cal)
return cal
}
// call invokes the named function, waits for it to complete, and returns its error status.
func (client *client) Call(ctx context.Context, service string, serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(ctx, service, serviceMethod, args, reply, make(chan *call, 1)).Done
return call.Error
}