Working bidirectional streaming example

This commit is contained in:
Asim 2015-12-18 20:28:50 +00:00
parent a5be9ca585
commit 3b295b16e7
10 changed files with 220 additions and 49 deletions

View File

@ -63,6 +63,7 @@ func newRpcPlusCodec(req *transport.Message, client transport.Client, c codec.Ne
}
func (c *rpcPlusCodec) WriteRequest(req *request, body interface{}) error {
c.buf.wbuf.Reset()
m := &codec.Message{
Id: req.Seq,
Target: req.Service,

View File

@ -50,7 +50,6 @@ func (r *rpcStream) Send(msg interface{}) error {
r.err = err
return err
}
return nil
}

View File

@ -54,8 +54,9 @@ func call(i int) {
fmt.Println("Call:", i, "rsp:", rsp.Msg)
}
func stream() {
func stream(i int) {
// Create new request to service go.micro.srv.example, method Example.Call
// Request can be empty as its actually ignored and merely used to call the handler
req := client.NewRequest("go.micro.srv.example", "Example.Stream", &example.StreamingRequest{})
stream, err := client.Stream(context.Background(), req)
@ -63,7 +64,10 @@ func stream() {
fmt.Println("err:", err)
return
}
if err := stream.Send(&example.StreamingRequest{Count: int64(i)}); err != nil {
fmt.Println("err:", err)
return
}
for stream.Error() == nil {
rsp := &example.StreamingResponse{}
err := stream.Recv(rsp)
@ -84,6 +88,41 @@ func stream() {
}
}
func pingPong(i int) {
// Create new request to service go.micro.srv.example, method Example.Call
// Request can be empty as its actually ignored and merely used to call the handler
req := client.NewRequest("go.micro.srv.example", "Example.PingPong", &example.StreamingRequest{})
stream, err := client.Stream(context.Background(), req)
if err != nil {
fmt.Println("err:", err)
return
}
for j := 0; j < i; j++ {
if err := stream.Send(&example.Ping{Stroke: int64(j)}); err != nil {
fmt.Println("err:", err)
return
}
rsp := &example.Pong{}
err := stream.Recv(rsp)
if err != nil {
fmt.Println("recv err", err)
break
}
fmt.Printf("Sent ping %v got pong %v\n", j, rsp.Stroke)
}
if stream.Error() != nil {
fmt.Println("stream err:", err)
return
}
if err := stream.Close(); err != nil {
fmt.Println("stream close err:", err)
}
}
func main() {
cmd.Init()
// fmt.Println("\n--- Call example ---\n")
@ -91,8 +130,11 @@ func main() {
// call(i)
// }
fmt.Println("\n--- Streamer example ---\n")
stream()
// fmt.Println("\n--- Streamer example ---\n")
// stream(10)
fmt.Println("\n--- Ping Pong example ---\n")
pingPong(10)
// fmt.Println("\n--- Publisher example ---\n")
// pub()

View File

@ -42,3 +42,17 @@ func (e *Example) Stream(ctx context.Context, stream server.Streamer) error {
return nil
}
func (e *Example) PingPong(ctx context.Context, stream server.Streamer) error {
for {
req := &example.Ping{}
if err := stream.Recv(req); err != nil {
return err
}
log.Infof("Got ping %v", req.Stroke)
if err := stream.Send(&example.Pong{Stroke: req.Stroke}); err != nil {
return err
}
}
return nil
}

View File

@ -14,6 +14,8 @@ It has these top-level messages:
Response
StreamingRequest
StreamingResponse
Ping
Pong
*/
package go_micro_srv_example
@ -77,12 +79,32 @@ func (m *StreamingResponse) String() string { return proto.CompactTex
func (*StreamingResponse) ProtoMessage() {}
func (*StreamingResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
type Ping struct {
Stroke int64 `protobuf:"varint,1,opt,name=stroke" json:"stroke,omitempty"`
}
func (m *Ping) Reset() { *m = Ping{} }
func (m *Ping) String() string { return proto.CompactTextString(m) }
func (*Ping) ProtoMessage() {}
func (*Ping) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }
type Pong struct {
Stroke int64 `protobuf:"varint,1,opt,name=stroke" json:"stroke,omitempty"`
}
func (m *Pong) Reset() { *m = Pong{} }
func (m *Pong) String() string { return proto.CompactTextString(m) }
func (*Pong) ProtoMessage() {}
func (*Pong) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
func init() {
proto.RegisterType((*Message)(nil), "go.micro.srv.example.Message")
proto.RegisterType((*Request)(nil), "go.micro.srv.example.Request")
proto.RegisterType((*Response)(nil), "go.micro.srv.example.Response")
proto.RegisterType((*StreamingRequest)(nil), "go.micro.srv.example.StreamingRequest")
proto.RegisterType((*StreamingResponse)(nil), "go.micro.srv.example.StreamingResponse")
proto.RegisterType((*Ping)(nil), "go.micro.srv.example.Ping")
proto.RegisterType((*Pong)(nil), "go.micro.srv.example.Pong")
}
// Reference imports to suppress errors if they are not otherwise used.
@ -95,6 +117,7 @@ var _ server.Option
type ExampleClient interface {
Call(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error)
Stream(ctx context.Context, in *StreamingRequest, opts ...client.CallOption) (Example_StreamClient, error)
PingPong(ctx context.Context, in *Ping, opts ...client.CallOption) (Example_PingPongClient, error)
}
type exampleClient struct {
@ -152,11 +175,44 @@ func (x *exampleStreamClient) RecvMsg() (*StreamingResponse, error) {
return m, nil
}
func (c *exampleClient) PingPong(ctx context.Context, in *Ping, opts ...client.CallOption) (Example_PingPongClient, error) {
req := c.c.NewRequest(c.serviceName, "Example.PingPong", in)
stream, err := c.c.Stream(ctx, req, opts...)
if err != nil {
return nil, err
}
return &examplePingPongClient{stream}, nil
}
type Example_PingPongClient interface {
SendMsg(*Ping) error
RecvMsg() (*Pong, error)
client.Streamer
}
type examplePingPongClient struct {
client.Streamer
}
func (x *examplePingPongClient) SendMsg(m *Ping) error {
return x.Send(m)
}
func (x *examplePingPongClient) RecvMsg() (*Pong, error) {
m := new(Pong)
err := x.Recv(m)
if err != nil {
return nil, err
}
return m, nil
}
// Server API for Example service
type ExampleHandler interface {
Call(context.Context, *Request, *Response) error
Stream(context.Context, func(*StreamingResponse) error) error
PingPong(context.Context, func(*Pong) error) error
}
func RegisterExampleHandler(s server.Server, hdlr ExampleHandler) {
@ -164,20 +220,22 @@ func RegisterExampleHandler(s server.Server, hdlr ExampleHandler) {
}
var fileDescriptor0 = []byte{
// 230 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x84, 0x90, 0xcd, 0x4a, 0xc5, 0x30,
0x10, 0x85, 0x0d, 0xf7, 0x7a, 0xab, 0xa3, 0x82, 0x06, 0x51, 0x29, 0x28, 0x9a, 0x85, 0xba, 0x31,
0x15, 0xf5, 0x0d, 0x44, 0x5c, 0xb9, 0xa9, 0x6b, 0x17, 0xb1, 0x0c, 0xa1, 0xd0, 0x24, 0x35, 0x93,
0x16, 0x7d, 0x2c, 0xdf, 0x50, 0x48, 0xd3, 0xa2, 0x52, 0x71, 0x15, 0x98, 0xf3, 0x9d, 0x1f, 0x02,
0x77, 0xda, 0x5d, 0x99, 0xba, 0xf2, 0xae, 0xc0, 0x77, 0x65, 0xda, 0x06, 0xa9, 0x20, 0xf4, 0x3d,
0xfa, 0xa2, 0xf5, 0x2e, 0x4c, 0xd7, 0xf1, 0x95, 0xf1, 0xca, 0xf7, 0xb5, 0x93, 0xd1, 0x25, 0xc9,
0xf7, 0x32, 0x69, 0xe2, 0x00, 0xb2, 0x27, 0x24, 0x52, 0x1a, 0xf9, 0x16, 0x2c, 0x48, 0x7d, 0x1c,
0xb1, 0x53, 0x76, 0xb9, 0x29, 0x0e, 0x21, 0x2b, 0xf1, 0xad, 0x43, 0x0a, 0x7c, 0x1b, 0x96, 0x56,
0x19, 0x9c, 0x84, 0x8d, 0x12, 0xa9, 0x75, 0x96, 0xa2, 0xc3, 0x90, 0x4e, 0xc2, 0x19, 0xec, 0x3e,
0x07, 0x8f, 0xca, 0xd4, 0x56, 0x8f, 0xd6, 0x1d, 0x58, 0xaf, 0x5c, 0x67, 0x43, 0x44, 0x16, 0x42,
0xc0, 0xde, 0x37, 0x24, 0x85, 0xfc, 0x64, 0x6e, 0x3e, 0x19, 0x64, 0x0f, 0xc3, 0x38, 0xfe, 0x08,
0xcb, 0x7b, 0xd5, 0x34, 0xfc, 0x58, 0xce, 0x6d, 0x97, 0xa9, 0x25, 0x3f, 0xf9, 0x4b, 0x1e, 0x1a,
0xc4, 0x1a, 0x7f, 0x81, 0xd5, 0x50, 0xcc, 0xcf, 0xe7, 0xd9, 0xdf, 0xcb, 0xf3, 0x8b, 0x7f, 0xb9,
0x31, 0xfc, 0x9a, 0xbd, 0xae, 0xe2, 0x0f, 0xdf, 0x7e, 0x05, 0x00, 0x00, 0xff, 0xff, 0x63, 0x02,
0xbf, 0x5f, 0x99, 0x01, 0x00, 0x00,
// 270 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x84, 0x91, 0x5f, 0x4b, 0xc3, 0x30,
0x14, 0xc5, 0x17, 0x56, 0xdb, 0x79, 0xfd, 0x83, 0x06, 0x99, 0x52, 0x50, 0x34, 0x0f, 0xba, 0x17,
0xd3, 0xa1, 0x7e, 0x03, 0x11, 0x7d, 0x11, 0x64, 0x3e, 0xfb, 0x10, 0xc7, 0x25, 0x0c, 0x9b, 0xa6,
0xe6, 0x66, 0x43, 0x3f, 0xbb, 0x2f, 0x6e, 0x69, 0x3b, 0xc6, 0xec, 0xf0, 0x29, 0x70, 0x7e, 0xe7,
0x5c, 0xce, 0x21, 0x70, 0xa7, 0xed, 0xb5, 0x99, 0x8c, 0x9d, 0xcd, 0xf0, 0x4b, 0x99, 0x32, 0x47,
0xca, 0x08, 0xdd, 0x0c, 0x5d, 0x56, 0x3a, 0xeb, 0x97, 0x6a, 0xf3, 0xca, 0xa0, 0xf2, 0x23, 0x6d,
0x65, 0x48, 0x49, 0x72, 0x33, 0x59, 0x33, 0xd1, 0x87, 0xe4, 0x19, 0x89, 0x94, 0x46, 0xbe, 0x03,
0x5d, 0x52, 0xdf, 0x27, 0xec, 0x9c, 0x0d, 0xb6, 0xc5, 0x31, 0x24, 0x23, 0xfc, 0x9c, 0x22, 0x79,
0xbe, 0x0b, 0x51, 0xa1, 0x0c, 0x2e, 0x41, 0x6f, 0x84, 0x54, 0xda, 0x82, 0x42, 0xc2, 0x90, 0xae,
0xc1, 0x05, 0x1c, 0xbc, 0x7a, 0x87, 0xca, 0x4c, 0x0a, 0xdd, 0x44, 0xf7, 0x60, 0x6b, 0x6c, 0xa7,
0x85, 0x0f, 0x96, 0xae, 0x10, 0x70, 0xb8, 0x62, 0xa9, 0x8f, 0xac, 0x79, 0xfa, 0x10, 0xbd, 0xcc,
0x31, 0xdf, 0x87, 0x98, 0xbc, 0xb3, 0x1f, 0xb8, 0xa2, 0xdb, 0xbf, 0xfa, 0xcd, 0x0f, 0x83, 0xe4,
0xa1, 0x1a, 0xc3, 0x1f, 0x21, 0xba, 0x57, 0x79, 0xce, 0x4f, 0x65, 0xdb, 0x56, 0x59, 0xb7, 0x4a,
0xcf, 0x36, 0xe1, 0xaa, 0x91, 0xe8, 0xf0, 0x37, 0x88, 0xab, 0xa2, 0xfc, 0xb2, 0xdd, 0xbb, 0xbe,
0x34, 0xbd, 0xfa, 0xd7, 0xd7, 0x1c, 0x1f, 0x32, 0xfe, 0x04, 0xbd, 0xc5, 0xc6, 0xb0, 0x27, 0x6d,
0x0f, 0x2e, 0x78, 0xba, 0x89, 0xcd, 0x73, 0xa2, 0x33, 0x60, 0x43, 0xf6, 0x1e, 0x87, 0xbf, 0xbd,
0xfd, 0x0d, 0x00, 0x00, 0xff, 0xff, 0x53, 0xb5, 0xeb, 0x31, 0x13, 0x02, 0x00, 0x00,
}

View File

@ -5,6 +5,7 @@ package go.micro.srv.example;
service Example {
rpc Call(Request) returns (Response) {}
rpc Stream(StreamingRequest) returns (stream StreamingResponse) {}
rpc PingPong(stream Ping) returns (stream Pong) {}
}
message Message {
@ -26,3 +27,11 @@ message StreamingRequest {
message StreamingResponse {
int64 count = 1;
}
message Ping {
int64 stroke = 1;
}
message Pong {
int64 stroke = 1;
}

View File

@ -60,9 +60,20 @@ func newRpcPlusCodec(req *transport.Message, socket transport.Socket, c codec.Ne
return r
}
func (c *rpcPlusCodec) ReadRequestHeader(r *request) error {
m := codec.Message{
Headers: c.req.Header,
func (c *rpcPlusCodec) ReadRequestHeader(r *request, first bool) error {
m := codec.Message{Headers: c.req.Header}
if !first {
var tm transport.Message
if err := c.socket.Recv(&tm); err != nil {
return err
}
c.buf.rbuf.Reset()
if _, err := c.buf.rbuf.Write(tm.Body); err != nil {
return err
}
m.Headers = tm.Header
}
err := c.codec.ReadHeader(&m, codec.Request)

View File

@ -51,7 +51,7 @@ func (r *rpcStream) Recv(msg interface{}) error {
req := request{}
if err := r.codec.ReadRequestHeader(&req); err != nil {
if err := r.codec.ReadRequestHeader(&req, false); err != nil {
// discard body
r.codec.ReadRequestBody(nil)
return err

View File

@ -389,7 +389,6 @@ func (server *server) readRequest(codec serverCodec) (service *service, mtype *m
codec.ReadRequestBody(nil)
return
}
// is it a streaming request? then we don't read the body
if mtype.stream {
codec.ReadRequestBody(nil)
@ -421,7 +420,7 @@ func (server *server) readRequest(codec serverCodec) (service *service, mtype *m
func (server *server) readRequestHeader(codec serverCodec) (service *service, mtype *methodType, req *request, keepReading bool, err error) {
// Grab the request header.
req = server.getRequest()
err = codec.ReadRequestHeader(req)
err = codec.ReadRequestHeader(req, true)
if err != nil {
req = nil
if err == io.EOF || err == io.ErrUnexpectedEOF {
@ -456,7 +455,7 @@ func (server *server) readRequestHeader(codec serverCodec) (service *service, mt
}
type serverCodec interface {
ReadRequestHeader(*request) error
ReadRequestHeader(*request, bool) error
ReadRequestBody(interface{}) error
WriteResponse(*response, interface{}, bool) error

View File

@ -23,16 +23,21 @@ type httpTransportClient struct {
addr string
conn net.Conn
dialOpts dialOptions
r chan *http.Request
once sync.Once
sync.Mutex
r chan *http.Request
bl []*http.Request
buff *bufio.Reader
}
type httpTransportSocket struct {
r *http.Request
r chan *http.Request
conn net.Conn
once sync.Once
sync.Mutex
buff *bufio.Reader
}
type httpTransportListener struct {
@ -68,7 +73,14 @@ func (h *httpTransportClient) Send(m *Message) error {
Host: h.addr,
}
h.r <- req
h.Lock()
h.bl = append(h.bl, req)
select {
case h.r <- h.bl[0]:
h.bl = h.bl[1:]
default:
}
h.Unlock()
return req.Write(h.conn)
}
@ -134,17 +146,23 @@ func (h *httpTransportSocket) Recv(m *Message) error {
return errors.New("message passed in is nil")
}
b, err := ioutil.ReadAll(h.r.Body)
r, err := http.ReadRequest(h.buff)
if err != nil {
return err
}
h.r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return err
}
r.Body.Close()
mr := &Message{
Header: make(map[string]string),
Body: b,
}
for k, v := range h.r.Header {
for k, v := range r.Header {
if len(v) > 0 {
mr.Header[k] = v[0]
} else {
@ -152,6 +170,11 @@ func (h *httpTransportSocket) Recv(m *Message) error {
}
}
select {
case h.r <- r:
default:
}
*m = *mr
return nil
}
@ -159,8 +182,11 @@ func (h *httpTransportSocket) Recv(m *Message) error {
func (h *httpTransportSocket) Send(m *Message) error {
b := bytes.NewBuffer(m.Body)
defer b.Reset()
r := <-h.r
rsp := &http.Response{
Header: h.r.Header,
Header: r.Header,
Body: &buffer{b},
Status: "200 OK",
StatusCode: 200,
@ -174,6 +200,11 @@ func (h *httpTransportSocket) Send(m *Message) error {
rsp.Header.Set(k, v)
}
select {
case h.r <- r:
default:
}
return rsp.Write(h.conn)
}
@ -199,7 +230,14 @@ func (h *httpTransportSocket) error(m *Message) error {
}
func (h *httpTransportSocket) Close() error {
return h.conn.Close()
err := h.conn.Close()
h.once.Do(func() {
h.Lock()
h.buff.Reset(nil)
h.buff = nil
h.Unlock()
})
return err
}
func (h *httpTransportListener) Addr() string {
@ -211,18 +249,19 @@ func (h *httpTransportListener) Close() error {
}
func (h *httpTransportListener) Accept(fn func(Socket)) error {
srv := &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
conn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
return
}
for {
c, err := h.listener.Accept()
if err != nil {
return err
}
sock := &httpTransportSocket{
conn: conn,
r: r,
}
sock := &httpTransportSocket{
conn: c,
buff: bufio.NewReader(c),
r: make(chan *http.Request, 1),
}
go func() {
// TODO: think of a better error response strategy
defer func() {
if r := recover(); r != nil {
@ -231,10 +270,9 @@ func (h *httpTransportListener) Accept(fn func(Socket)) error {
}()
fn(sock)
}),
}()
}
return srv.Serve(h.listener)
return nil
}
func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) {