checkpoint

This commit is contained in:
Asim 2015-12-17 20:37:35 +00:00
parent c73a88e801
commit 6ae48c9f29
11 changed files with 284 additions and 100 deletions

View File

@ -27,13 +27,13 @@ import (
type Client interface { type Client interface {
NewPublication(topic string, msg interface{}) Publication NewPublication(topic string, msg interface{}) Publication
NewRequest(service, method string, req interface{}) Request NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
NewProtoRequest(service, method string, req interface{}) Request NewProtoRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
NewJsonRequest(service, method string, req interface{}) Request NewJsonRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
CallRemote(ctx context.Context, addr string, req Request, rsp interface{}, opts ...CallOption) error CallRemote(ctx context.Context, addr string, req Request, rsp interface{}, opts ...CallOption) error
Stream(ctx context.Context, req Request, rspChan interface{}, opts ...CallOption) (Streamer, error) Stream(ctx context.Context, req Request, opts ...CallOption) (Streamer, error)
StreamRemote(ctx context.Context, addr string, req Request, rspChan interface{}, opts ...CallOption) (Streamer, error) StreamRemote(ctx context.Context, addr string, req Request, opts ...CallOption) (Streamer, error)
Publish(ctx context.Context, p Publication, opts ...PublishOption) error Publish(ctx context.Context, p Publication, opts ...PublishOption) error
} }
@ -48,10 +48,15 @@ type Request interface {
Method() string Method() string
ContentType() string ContentType() string
Request() interface{} Request() interface{}
// indicates whether the request will be a streaming one rather than unary
Stream() bool
} }
type Streamer interface { type Streamer interface {
Context() context.Context
Request() Request Request() Request
Send(interface{}) error
Recv(interface{}) error
Error() error Error() error
Close() error Close() error
} }
@ -59,6 +64,7 @@ type Streamer interface {
type Option func(*options) type Option func(*options)
type CallOption func(*callOptions) type CallOption func(*callOptions)
type PublishOption func(*publishOptions) type PublishOption func(*publishOptions)
type RequestOption func(*requestOptions)
var ( var (
DefaultClient Client = newRpcClient() DefaultClient Client = newRpcClient()
@ -76,13 +82,13 @@ func CallRemote(ctx context.Context, address string, request Request, response i
// Creates a streaming connection with a service and returns responses on the // Creates a streaming connection with a service and returns responses on the
// channel passed in. It's upto the user to close the streamer. // channel passed in. It's upto the user to close the streamer.
func Stream(ctx context.Context, request Request, responseChan interface{}, opts ...CallOption) (Streamer, error) { func Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) {
return DefaultClient.Stream(ctx, request, responseChan, opts...) return DefaultClient.Stream(ctx, request, opts...)
} }
// Creates a streaming connection to the address specified. // Creates a streaming connection to the address specified.
func StreamRemote(ctx context.Context, address string, request Request, responseChan interface{}, opts ...CallOption) (Streamer, error) { func StreamRemote(ctx context.Context, address string, request Request, opts ...CallOption) (Streamer, error) {
return DefaultClient.StreamRemote(ctx, address, request, responseChan, opts...) return DefaultClient.StreamRemote(ctx, address, request, opts...)
} }
// Publishes a publication using the default client. Using the underlying broker // Publishes a publication using the default client. Using the underlying broker
@ -103,16 +109,16 @@ func NewPublication(topic string, message interface{}) Publication {
// Creates a new request using the default client. Content Type will // Creates a new request using the default client. Content Type will
// be set to the default within options and use the appropriate codec // be set to the default within options and use the appropriate codec
func NewRequest(service, method string, request interface{}) Request { func NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return DefaultClient.NewRequest(service, method, request) return DefaultClient.NewRequest(service, method, request, reqOpts...)
} }
// Creates a new protobuf request using the default client // Creates a new protobuf request using the default client
func NewProtoRequest(service, method string, request interface{}) Request { func NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return DefaultClient.NewProtoRequest(service, method, request) return DefaultClient.NewProtoRequest(service, method, request, reqOpts...)
} }
// Creates a new json request using the default client // Creates a new json request using the default client
func NewJsonRequest(service, method string, request interface{}) Request { func NewJsonRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return DefaultClient.NewJsonRequest(service, method, request) return DefaultClient.NewJsonRequest(service, method, request, reqOpts...)
} }

View File

@ -24,6 +24,10 @@ type callOptions struct {
type publishOptions struct{} type publishOptions struct{}
type requestOptions struct {
stream bool
}
// Broker to be used for pub/sub // Broker to be used for pub/sub
func Broker(b broker.Broker) Option { func Broker(b broker.Broker) Option {
return func(o *options) { return func(o *options) {
@ -80,3 +84,11 @@ func WithSelectOption(so selector.SelectOption) CallOption {
o.selectOptions = append(o.selectOptions, so) o.selectOptions = append(o.selectOptions, so)
} }
} }
// Request Options
func StreamingRequest() RequestOption {
return func(o *requestOptions) {
o.stream = true
}
}

View File

@ -112,7 +112,7 @@ func (r *rpcClient) call(ctx context.Context, address string, request Request, r
return client.Close() return client.Close()
} }
func (r *rpcClient) stream(ctx context.Context, address string, request Request, responseChan interface{}) (Streamer, error) { func (r *rpcClient) stream(ctx context.Context, address string, req Request) (Streamer, error) {
msg := &transport.Message{ msg := &transport.Message{
Header: make(map[string]string), Header: make(map[string]string),
} }
@ -124,9 +124,9 @@ func (r *rpcClient) stream(ctx context.Context, address string, request Request,
} }
} }
msg.Header["Content-Type"] = request.ContentType() msg.Header["Content-Type"] = req.ContentType()
cf, err := r.newCodec(request.ContentType()) cf, err := r.newCodec(req.ContentType())
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error()) return nil, errors.InternalServerError("go.micro.client", err.Error())
} }
@ -136,13 +136,22 @@ func (r *rpcClient) stream(ctx context.Context, address string, request Request,
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
client := newClientWithCodec(newRpcPlusCodec(msg, c, cf)) codec := newRpcPlusCodec(msg, c, cf)
call := client.StreamGo(request.Service(), request.Method(), request.Request(), responseChan)
err = codec.WriteRequest(&request{
Service: req.Service(),
ServiceMethod: req.Method(),
Seq: 0,
}, req.Request())
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
return &rpcStream{ return &rpcStream{
request: request, context: ctx,
call: call, request: req,
client: client, codec: codec,
}, nil }, nil
} }
@ -180,11 +189,11 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
return err return err
} }
func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Request, responseChan interface{}, opts ...CallOption) (Streamer, error) { func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Request, opts ...CallOption) (Streamer, error) {
return r.stream(ctx, address, request, responseChan) return r.stream(ctx, address, request)
} }
func (r *rpcClient) Stream(ctx context.Context, request Request, responseChan interface{}, opts ...CallOption) (Streamer, error) { func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) {
var copts callOptions var copts callOptions
for _, opt := range opts { for _, opt := range opts {
opt(&copts) opt(&copts)
@ -209,7 +218,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, responseChan in
address = fmt.Sprintf("%s:%d", address, node.Port) address = fmt.Sprintf("%s:%d", address, node.Port)
} }
stream, err := r.stream(ctx, address, request, responseChan) stream, err := r.stream(ctx, address, request)
r.opts.selector.Mark(request.Service(), node, err) r.opts.selector.Mark(request.Service(), node, err)
return stream, err return stream, err
} }
@ -247,14 +256,14 @@ func (r *rpcClient) NewPublication(topic string, message interface{}) Publicatio
func (r *rpcClient) NewProtoPublication(topic string, message interface{}) Publication { func (r *rpcClient) NewProtoPublication(topic string, message interface{}) Publication {
return newRpcPublication(topic, message, "application/octet-stream") return newRpcPublication(topic, message, "application/octet-stream")
} }
func (r *rpcClient) NewRequest(service, method string, request interface{}) Request { func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, r.opts.contentType) return newRpcRequest(service, method, request, r.opts.contentType, reqOpts...)
} }
func (r *rpcClient) NewProtoRequest(service, method string, request interface{}) Request { func (r *rpcClient) NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, "application/octet-stream") return newRpcRequest(service, method, request, "application/octet-stream", reqOpts...)
} }
func (r *rpcClient) NewJsonRequest(service, method string, request interface{}) Request { func (r *rpcClient) NewJsonRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, "application/json") return newRpcRequest(service, method, request, "application/json", reqOpts...)
} }

View File

@ -5,14 +5,22 @@ type rpcRequest struct {
method string method string
contentType string contentType string
request interface{} request interface{}
opts requestOptions
}
func newRpcRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request {
var opts requestOptions
for _, o := range reqOpts {
o(&opts)
} }
func newRpcRequest(service, method string, request interface{}, contentType string) Request {
return &rpcRequest{ return &rpcRequest{
service: service, service: service,
method: method, method: method,
request: request, request: request,
contentType: contentType, contentType: contentType,
opts: opts,
} }
} }
@ -31,3 +39,7 @@ func (r *rpcRequest) Method() string {
func (r *rpcRequest) Request() interface{} { func (r *rpcRequest) Request() interface{} {
return r.request return r.request
} }
func (r *rpcRequest) Stream() bool {
return r.opts.stream
}

View File

@ -1,19 +1,112 @@
package client package client
import (
"errors"
"io"
"log"
"sync"
"golang.org/x/net/context"
)
type rpcStream struct { type rpcStream struct {
sync.RWMutex
seq uint64
closed bool
err error
request Request request Request
call *call codec clientCodec
client *client context context.Context
}
func (r *rpcStream) Context() context.Context {
return r.context
} }
func (r *rpcStream) Request() Request { func (r *rpcStream) Request() Request {
return r.request return r.request
} }
func (r *rpcStream) Send(msg interface{}) error {
r.Lock()
defer r.Unlock()
if r.closed {
r.err = errShutdown
return errShutdown
}
seq := r.seq
r.seq++
req := request{
Service: r.request.Service(),
Seq: seq,
ServiceMethod: r.request.Method(),
}
if err := r.codec.WriteRequest(&req, msg); err != nil {
r.err = err
return err
}
return nil
}
func (r *rpcStream) Recv(msg interface{}) error {
r.Lock()
defer r.Unlock()
if r.closed {
r.err = errShutdown
return errShutdown
}
var resp response
if err := r.codec.ReadResponseHeader(&resp); err != nil {
if err == io.EOF && !r.closed {
r.err = io.ErrUnexpectedEOF
return io.ErrUnexpectedEOF
}
r.err = err
return err
}
switch {
case len(resp.Error) > 0:
// We've got an error response. Give this to the request;
// any subsequent requests will get the ReadResponseBody
// error if there is one.
if resp.Error != lastStreamResponseError {
r.err = serverError(resp.Error)
} else {
r.err = io.EOF
}
if err := r.codec.ReadResponseBody(nil); err != nil {
r.err = errors.New("reading error payload: " + err.Error())
}
default:
if err := r.codec.ReadResponseBody(msg); err != nil {
r.err = errors.New("reading body " + err.Error())
}
}
if r.err != nil && r.err != io.EOF && !r.closed {
log.Println("rpc: client protocol error:", r.err)
}
return r.err
}
func (r *rpcStream) Error() error { func (r *rpcStream) Error() error {
return r.call.Error r.RLock()
defer r.RUnlock()
return r.err
} }
func (r *rpcStream) Close() error { func (r *rpcStream) Close() error {
return r.client.Close() r.Lock()
defer r.Unlock()
r.closed = true
return r.codec.Close()
} }

View File

@ -8,7 +8,6 @@ import (
"errors" "errors"
"io" "io"
"log" "log"
"reflect"
"sync" "sync"
"github.com/youtube/vitess/go/trace" "github.com/youtube/vitess/go/trace"
@ -38,7 +37,6 @@ type call struct {
Reply interface{} // The reply from the function (*struct for single, chan * struct for streaming). Reply interface{} // The reply from the function (*struct for single, chan * struct for streaming).
Error error // After completion, the error status. Error error // After completion, the error status.
Done chan *call // Strobes when call is complete (nil for streaming RPCs) Done chan *call // Strobes when call is complete (nil for streaming RPCs)
Stream bool // True for a streaming RPC call, false otherwise
Subseq uint64 // The next expected subseq in the packets Subseq uint64 // The next expected subseq in the packets
} }
@ -145,28 +143,12 @@ func (client *client) input() {
// We've got an error response. Give this to the request; // We've got an error response. Give this to the request;
// any subsequent requests will get the ReadResponseBody // any subsequent requests will get the ReadResponseBody
// error if there is one. // error if there is one.
if !(call.Stream && resp.Error == lastStreamResponseError) {
call.Error = serverError(resp.Error) call.Error = serverError(resp.Error)
}
err = client.codec.ReadResponseBody(nil) err = client.codec.ReadResponseBody(nil)
if err != nil { if err != nil {
err = errors.New("reading error payload: " + err.Error()) err = errors.New("reading error payload: " + err.Error())
} }
client.done(seq) client.done(seq)
case call.Stream:
// call.Reply is a chan *T2
// we need to create a T2 and get a *T2 back
value := reflect.New(reflect.TypeOf(call.Reply).Elem().Elem()).Interface()
err = client.codec.ReadResponseBody(value)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
} else {
// writing on the channel could block forever. For
// instance, if a client calls 'close', this might block
// forever. the current suggestion is for the
// client to drain the receiving channel in that case
reflect.ValueOf(call.Reply).Send(reflect.ValueOf(value))
}
default: default:
err = client.codec.ReadResponseBody(call.Reply) err = client.codec.ReadResponseBody(call.Reply)
if err != nil { if err != nil {
@ -203,12 +185,6 @@ func (client *client) done(seq uint64) {
} }
func (call *call) done() { func (call *call) done() {
if call.Stream {
// need to close the channel. client won't be able to read any more.
reflect.ValueOf(call.Reply).Close()
return
}
select { select {
case call.Done <- call: case call.Done <- call:
// ok // ok
@ -270,28 +246,6 @@ func (client *client) Go(ctx context.Context, service, serviceMethod string, arg
return cal return cal
} }
// StreamGo invokes the streaming function asynchronously. It returns the call structure representing
// the invocation.
func (client *client) StreamGo(service string, serviceMethod string, args interface{}, replyStream interface{}) *call {
// first check the replyStream object is a stream of pointers to a data structure
typ := reflect.TypeOf(replyStream)
// FIXME: check the direction of the channel, maybe?
if typ.Kind() != reflect.Chan || typ.Elem().Kind() != reflect.Ptr {
log.Panic("rpc: replyStream is not a channel of pointers")
return nil
}
call := new(call)
call.Service = service
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = replyStream
call.Stream = true
call.Subseq = 0
client.send(call)
return call
}
// call invokes the named function, waits for it to complete, and returns its error status. // call invokes the named function, waits for it to complete, and returns its error status.
func (client *client) Call(ctx context.Context, service string, serviceMethod string, args interface{}, reply interface{}) error { func (client *client) Call(ctx context.Context, service string, serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(ctx, service, serviceMethod, args, reply, make(chan *call, 1)).Done call := <-client.Go(ctx, service, serviceMethod, args, reply, make(chan *call, 1)).Done

View File

@ -60,15 +60,19 @@ func stream() {
Count: int64(10), Count: int64(10),
}) })
rspChan := make(chan *example.StreamingResponse, 10) stream, err := client.Stream(context.Background(), req)
stream, err := client.Stream(context.Background(), req, rspChan)
if err != nil { if err != nil {
fmt.Println("err:", err) fmt.Println("err:", err)
return return
} }
for rsp := range rspChan { for stream.Error() == nil {
rsp := &example.StreamingResponse{}
err := stream.Recv(rsp)
if err != nil {
fmt.Println(err)
break
}
fmt.Println("Stream: rsp:", rsp.Count) fmt.Println("Stream: rsp:", rsp.Count)
} }

View File

@ -127,30 +127,29 @@ func (c *exampleClient) Call(ctx context.Context, in *Request, opts ...client.Ca
func (c *exampleClient) Stream(ctx context.Context, in *StreamingRequest, opts ...client.CallOption) (Example_StreamClient, error) { func (c *exampleClient) Stream(ctx context.Context, in *StreamingRequest, opts ...client.CallOption) (Example_StreamClient, error) {
req := c.c.NewRequest(c.serviceName, "Example.Stream", in) req := c.c.NewRequest(c.serviceName, "Example.Stream", in)
outCh := make(chan *StreamingResponse) stream, err := c.c.Stream(ctx, req, opts...)
stream, err := c.c.Stream(ctx, req, outCh, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &exampleStreamClient{stream, outCh}, nil return &exampleStreamClient{stream}, nil
} }
type Example_StreamClient interface { type Example_StreamClient interface {
Next() (*StreamingResponse, error) RecvMsg() (*StreamingResponse, error)
client.Streamer client.Streamer
} }
type exampleStreamClient struct { type exampleStreamClient struct {
client.Streamer client.Streamer
next chan *StreamingResponse
} }
func (x *exampleStreamClient) Next() (*StreamingResponse, error) { func (x *exampleStreamClient) RecvMsg() (*StreamingResponse, error) {
out, ok := <-x.next m := new(StreamingResponse)
if !ok { err := x.Recv(m)
return nil, fmt.Errorf(`chan closed`) if err != nil {
return nil, err
} }
return out, nil return m, nil
} }
// Server API for Example service // Server API for Example service

79
server/rpc_stream.go Normal file
View File

@ -0,0 +1,79 @@
package server
import (
"errors"
"io"
"log"
"sync"
"golang.org/x/net/context"
)
type rpcStream struct {
sync.RWMutex
seq uint64
closed bool
err error
request Request
codec serverCodec
context context.Context
}
func (r *rpcStream) Context() context.Context {
return r.context
}
func (r *rpcStream) Request() Request {
return r.request
}
func (r *rpcStream) Send(msg interface{}) error {
r.Lock()
defer r.Unlock()
seq := r.seq
r.seq++
resp := response{
ServiceMethod: r.request.Method(),
Seq: seq,
}
err := codec.WriteResponse(&resp, msg, false)
if err != nil {
log.Println("rpc: writing response:", err)
}
return err
}
func (r *rpcStream) Recv(msg interface{}) error {
r.Lock()
defer r.Unlock()
req := request{}
if err := codec.ReadRequestHeader(&req); err != nil {
// discard body
codec.ReadRequestBody(nil)
return err
}
if err = codec.ReadRequestBody(msg); err != nil {
return err
}
return nil
}
func (r *rpcStream) Error() error {
r.RLock()
defer r.RUnlock()
return r.err
}
func (r *rpcStream) Close() error {
r.Lock()
defer r.Unlock()
r.closed = true
return r.codec.Close()
}

View File

@ -35,6 +35,7 @@ import (
log "github.com/golang/glog" log "github.com/golang/glog"
"github.com/pborman/uuid" "github.com/pborman/uuid"
"golang.org/x/net/context"
) )
type Server interface { type Server interface {
@ -61,10 +62,23 @@ type Request interface {
Method() string Method() string
ContentType() string ContentType() string
Request() interface{} Request() interface{}
// indicates whether the response should be streaming // indicates whether the request will be streamed
Stream() bool Stream() bool
} }
// Streamer represents a stream established with a client.
// A stream can be bidirectional which is indicated by the request.
// The last error will be left in Error().
// EOF indicated end of the stream.
type Streamer interface {
Context() context.Context
Request() Request
Send(interface{}) error
Recv(interface{}) error
Error() error
Close() error
}
type Option func(*options) type Option func(*options)
var ( var (

View File

@ -19,3 +19,5 @@ type HandlerWrapper func(HandlerFunc) HandlerFunc
// SubscriberWrapper wraps the SubscriberFunc and returns the equivalent // SubscriberWrapper wraps the SubscriberFunc and returns the equivalent
type SubscriberWrapper func(SubscriberFunc) SubscriberFunc type SubscriberWrapper func(SubscriberFunc) SubscriberFunc
type StreamWrapper func(Streamer) Streamer