diff --git a/client/rpc_codec.go b/client/rpc_codec.go index fd5df68b..74f6c4f2 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -63,6 +63,7 @@ func newRpcPlusCodec(req *transport.Message, client transport.Client, c codec.Ne } func (c *rpcPlusCodec) WriteRequest(req *request, body interface{}) error { + c.buf.wbuf.Reset() m := &codec.Message{ Id: req.Seq, Target: req.Service, diff --git a/client/rpc_stream.go b/client/rpc_stream.go index 0932539a..837b97d2 100644 --- a/client/rpc_stream.go +++ b/client/rpc_stream.go @@ -50,7 +50,6 @@ func (r *rpcStream) Send(msg interface{}) error { r.err = err return err } - return nil } diff --git a/examples/client/main.go b/examples/client/main.go index 5fea7647..c57de979 100644 --- a/examples/client/main.go +++ b/examples/client/main.go @@ -54,8 +54,9 @@ func call(i int) { fmt.Println("Call:", i, "rsp:", rsp.Msg) } -func stream() { +func stream(i int) { // Create new request to service go.micro.srv.example, method Example.Call + // Request can be empty as its actually ignored and merely used to call the handler req := client.NewRequest("go.micro.srv.example", "Example.Stream", &example.StreamingRequest{}) stream, err := client.Stream(context.Background(), req) @@ -63,7 +64,10 @@ func stream() { fmt.Println("err:", err) return } - + if err := stream.Send(&example.StreamingRequest{Count: int64(i)}); err != nil { + fmt.Println("err:", err) + return + } for stream.Error() == nil { rsp := &example.StreamingResponse{} err := stream.Recv(rsp) @@ -84,6 +88,41 @@ func stream() { } } +func pingPong(i int) { + // Create new request to service go.micro.srv.example, method Example.Call + // Request can be empty as its actually ignored and merely used to call the handler + req := client.NewRequest("go.micro.srv.example", "Example.PingPong", &example.StreamingRequest{}) + + stream, err := client.Stream(context.Background(), req) + if err != nil { + fmt.Println("err:", err) + return + } + + for j := 0; j < i; j++ { + if err := stream.Send(&example.Ping{Stroke: int64(j)}); err != nil { + fmt.Println("err:", err) + return + } + rsp := &example.Pong{} + err := stream.Recv(rsp) + if err != nil { + fmt.Println("recv err", err) + break + } + fmt.Printf("Sent ping %v got pong %v\n", j, rsp.Stroke) + } + + if stream.Error() != nil { + fmt.Println("stream err:", err) + return + } + + if err := stream.Close(); err != nil { + fmt.Println("stream close err:", err) + } +} + func main() { cmd.Init() // fmt.Println("\n--- Call example ---\n") @@ -91,8 +130,11 @@ func main() { // call(i) // } - fmt.Println("\n--- Streamer example ---\n") - stream() + // fmt.Println("\n--- Streamer example ---\n") + // stream(10) + + fmt.Println("\n--- Ping Pong example ---\n") + pingPong(10) // fmt.Println("\n--- Publisher example ---\n") // pub() diff --git a/examples/server/handler/example.go b/examples/server/handler/example.go index 05ebbff5..d109ac60 100644 --- a/examples/server/handler/example.go +++ b/examples/server/handler/example.go @@ -42,3 +42,17 @@ func (e *Example) Stream(ctx context.Context, stream server.Streamer) error { return nil } + +func (e *Example) PingPong(ctx context.Context, stream server.Streamer) error { + for { + req := &example.Ping{} + if err := stream.Recv(req); err != nil { + return err + } + log.Infof("Got ping %v", req.Stroke) + if err := stream.Send(&example.Pong{Stroke: req.Stroke}); err != nil { + return err + } + } + return nil +} diff --git a/examples/server/proto/example/example.pb.go b/examples/server/proto/example/example.pb.go index 5c7faae9..eed8ea2c 100644 --- a/examples/server/proto/example/example.pb.go +++ b/examples/server/proto/example/example.pb.go @@ -14,6 +14,8 @@ It has these top-level messages: Response StreamingRequest StreamingResponse + Ping + Pong */ package go_micro_srv_example @@ -77,12 +79,32 @@ func (m *StreamingResponse) String() string { return proto.CompactTex func (*StreamingResponse) ProtoMessage() {} func (*StreamingResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } +type Ping struct { + Stroke int64 `protobuf:"varint,1,opt,name=stroke" json:"stroke,omitempty"` +} + +func (m *Ping) Reset() { *m = Ping{} } +func (m *Ping) String() string { return proto.CompactTextString(m) } +func (*Ping) ProtoMessage() {} +func (*Ping) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } + +type Pong struct { + Stroke int64 `protobuf:"varint,1,opt,name=stroke" json:"stroke,omitempty"` +} + +func (m *Pong) Reset() { *m = Pong{} } +func (m *Pong) String() string { return proto.CompactTextString(m) } +func (*Pong) ProtoMessage() {} +func (*Pong) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } + func init() { proto.RegisterType((*Message)(nil), "go.micro.srv.example.Message") proto.RegisterType((*Request)(nil), "go.micro.srv.example.Request") proto.RegisterType((*Response)(nil), "go.micro.srv.example.Response") proto.RegisterType((*StreamingRequest)(nil), "go.micro.srv.example.StreamingRequest") proto.RegisterType((*StreamingResponse)(nil), "go.micro.srv.example.StreamingResponse") + proto.RegisterType((*Ping)(nil), "go.micro.srv.example.Ping") + proto.RegisterType((*Pong)(nil), "go.micro.srv.example.Pong") } // Reference imports to suppress errors if they are not otherwise used. @@ -95,6 +117,7 @@ var _ server.Option type ExampleClient interface { Call(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error) Stream(ctx context.Context, in *StreamingRequest, opts ...client.CallOption) (Example_StreamClient, error) + PingPong(ctx context.Context, in *Ping, opts ...client.CallOption) (Example_PingPongClient, error) } type exampleClient struct { @@ -152,11 +175,44 @@ func (x *exampleStreamClient) RecvMsg() (*StreamingResponse, error) { return m, nil } +func (c *exampleClient) PingPong(ctx context.Context, in *Ping, opts ...client.CallOption) (Example_PingPongClient, error) { + req := c.c.NewRequest(c.serviceName, "Example.PingPong", in) + stream, err := c.c.Stream(ctx, req, opts...) + if err != nil { + return nil, err + } + return &examplePingPongClient{stream}, nil +} + +type Example_PingPongClient interface { + SendMsg(*Ping) error + RecvMsg() (*Pong, error) + client.Streamer +} + +type examplePingPongClient struct { + client.Streamer +} + +func (x *examplePingPongClient) SendMsg(m *Ping) error { + return x.Send(m) +} + +func (x *examplePingPongClient) RecvMsg() (*Pong, error) { + m := new(Pong) + err := x.Recv(m) + if err != nil { + return nil, err + } + return m, nil +} + // Server API for Example service type ExampleHandler interface { Call(context.Context, *Request, *Response) error Stream(context.Context, func(*StreamingResponse) error) error + PingPong(context.Context, func(*Pong) error) error } func RegisterExampleHandler(s server.Server, hdlr ExampleHandler) { @@ -164,20 +220,22 @@ func RegisterExampleHandler(s server.Server, hdlr ExampleHandler) { } var fileDescriptor0 = []byte{ - // 230 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x84, 0x90, 0xcd, 0x4a, 0xc5, 0x30, - 0x10, 0x85, 0x0d, 0xf7, 0x7a, 0xab, 0xa3, 0x82, 0x06, 0x51, 0x29, 0x28, 0x9a, 0x85, 0xba, 0x31, - 0x15, 0xf5, 0x0d, 0x44, 0x5c, 0xb9, 0xa9, 0x6b, 0x17, 0xb1, 0x0c, 0xa1, 0xd0, 0x24, 0x35, 0x93, - 0x16, 0x7d, 0x2c, 0xdf, 0x50, 0x48, 0xd3, 0xa2, 0x52, 0x71, 0x15, 0x98, 0xf3, 0x9d, 0x1f, 0x02, - 0x77, 0xda, 0x5d, 0x99, 0xba, 0xf2, 0xae, 0xc0, 0x77, 0x65, 0xda, 0x06, 0xa9, 0x20, 0xf4, 0x3d, - 0xfa, 0xa2, 0xf5, 0x2e, 0x4c, 0xd7, 0xf1, 0x95, 0xf1, 0xca, 0xf7, 0xb5, 0x93, 0xd1, 0x25, 0xc9, - 0xf7, 0x32, 0x69, 0xe2, 0x00, 0xb2, 0x27, 0x24, 0x52, 0x1a, 0xf9, 0x16, 0x2c, 0x48, 0x7d, 0x1c, - 0xb1, 0x53, 0x76, 0xb9, 0x29, 0x0e, 0x21, 0x2b, 0xf1, 0xad, 0x43, 0x0a, 0x7c, 0x1b, 0x96, 0x56, - 0x19, 0x9c, 0x84, 0x8d, 0x12, 0xa9, 0x75, 0x96, 0xa2, 0xc3, 0x90, 0x4e, 0xc2, 0x19, 0xec, 0x3e, - 0x07, 0x8f, 0xca, 0xd4, 0x56, 0x8f, 0xd6, 0x1d, 0x58, 0xaf, 0x5c, 0x67, 0x43, 0x44, 0x16, 0x42, - 0xc0, 0xde, 0x37, 0x24, 0x85, 0xfc, 0x64, 0x6e, 0x3e, 0x19, 0x64, 0x0f, 0xc3, 0x38, 0xfe, 0x08, - 0xcb, 0x7b, 0xd5, 0x34, 0xfc, 0x58, 0xce, 0x6d, 0x97, 0xa9, 0x25, 0x3f, 0xf9, 0x4b, 0x1e, 0x1a, - 0xc4, 0x1a, 0x7f, 0x81, 0xd5, 0x50, 0xcc, 0xcf, 0xe7, 0xd9, 0xdf, 0xcb, 0xf3, 0x8b, 0x7f, 0xb9, - 0x31, 0xfc, 0x9a, 0xbd, 0xae, 0xe2, 0x0f, 0xdf, 0x7e, 0x05, 0x00, 0x00, 0xff, 0xff, 0x63, 0x02, - 0xbf, 0x5f, 0x99, 0x01, 0x00, 0x00, + // 270 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x84, 0x91, 0x5f, 0x4b, 0xc3, 0x30, + 0x14, 0xc5, 0x17, 0x56, 0xdb, 0x79, 0xfd, 0x83, 0x06, 0x99, 0x52, 0x50, 0x34, 0x0f, 0xba, 0x17, + 0xd3, 0xa1, 0x7e, 0x03, 0x11, 0x7d, 0x11, 0x64, 0x3e, 0xfb, 0x10, 0xc7, 0x25, 0x0c, 0x9b, 0xa6, + 0xe6, 0x66, 0x43, 0x3f, 0xbb, 0x2f, 0x6e, 0x69, 0x3b, 0xc6, 0xec, 0xf0, 0x29, 0x70, 0x7e, 0xe7, + 0x5c, 0xce, 0x21, 0x70, 0xa7, 0xed, 0xb5, 0x99, 0x8c, 0x9d, 0xcd, 0xf0, 0x4b, 0x99, 0x32, 0x47, + 0xca, 0x08, 0xdd, 0x0c, 0x5d, 0x56, 0x3a, 0xeb, 0x97, 0x6a, 0xf3, 0xca, 0xa0, 0xf2, 0x23, 0x6d, + 0x65, 0x48, 0x49, 0x72, 0x33, 0x59, 0x33, 0xd1, 0x87, 0xe4, 0x19, 0x89, 0x94, 0x46, 0xbe, 0x03, + 0x5d, 0x52, 0xdf, 0x27, 0xec, 0x9c, 0x0d, 0xb6, 0xc5, 0x31, 0x24, 0x23, 0xfc, 0x9c, 0x22, 0x79, + 0xbe, 0x0b, 0x51, 0xa1, 0x0c, 0x2e, 0x41, 0x6f, 0x84, 0x54, 0xda, 0x82, 0x42, 0xc2, 0x90, 0xae, + 0xc1, 0x05, 0x1c, 0xbc, 0x7a, 0x87, 0xca, 0x4c, 0x0a, 0xdd, 0x44, 0xf7, 0x60, 0x6b, 0x6c, 0xa7, + 0x85, 0x0f, 0x96, 0xae, 0x10, 0x70, 0xb8, 0x62, 0xa9, 0x8f, 0xac, 0x79, 0xfa, 0x10, 0xbd, 0xcc, + 0x31, 0xdf, 0x87, 0x98, 0xbc, 0xb3, 0x1f, 0xb8, 0xa2, 0xdb, 0xbf, 0xfa, 0xcd, 0x0f, 0x83, 0xe4, + 0xa1, 0x1a, 0xc3, 0x1f, 0x21, 0xba, 0x57, 0x79, 0xce, 0x4f, 0x65, 0xdb, 0x56, 0x59, 0xb7, 0x4a, + 0xcf, 0x36, 0xe1, 0xaa, 0x91, 0xe8, 0xf0, 0x37, 0x88, 0xab, 0xa2, 0xfc, 0xb2, 0xdd, 0xbb, 0xbe, + 0x34, 0xbd, 0xfa, 0xd7, 0xd7, 0x1c, 0x1f, 0x32, 0xfe, 0x04, 0xbd, 0xc5, 0xc6, 0xb0, 0x27, 0x6d, + 0x0f, 0x2e, 0x78, 0xba, 0x89, 0xcd, 0x73, 0xa2, 0x33, 0x60, 0x43, 0xf6, 0x1e, 0x87, 0xbf, 0xbd, + 0xfd, 0x0d, 0x00, 0x00, 0xff, 0xff, 0x53, 0xb5, 0xeb, 0x31, 0x13, 0x02, 0x00, 0x00, } diff --git a/examples/server/proto/example/example.proto b/examples/server/proto/example/example.proto index b7dd8748..48c687e4 100644 --- a/examples/server/proto/example/example.proto +++ b/examples/server/proto/example/example.proto @@ -5,6 +5,7 @@ package go.micro.srv.example; service Example { rpc Call(Request) returns (Response) {} rpc Stream(StreamingRequest) returns (stream StreamingResponse) {} + rpc PingPong(stream Ping) returns (stream Pong) {} } message Message { @@ -26,3 +27,11 @@ message StreamingRequest { message StreamingResponse { int64 count = 1; } + +message Ping { + int64 stroke = 1; +} + +message Pong { + int64 stroke = 1; +} diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 3aeb963f..69987924 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -60,9 +60,20 @@ func newRpcPlusCodec(req *transport.Message, socket transport.Socket, c codec.Ne return r } -func (c *rpcPlusCodec) ReadRequestHeader(r *request) error { - m := codec.Message{ - Headers: c.req.Header, +func (c *rpcPlusCodec) ReadRequestHeader(r *request, first bool) error { + m := codec.Message{Headers: c.req.Header} + + if !first { + var tm transport.Message + if err := c.socket.Recv(&tm); err != nil { + return err + } + c.buf.rbuf.Reset() + if _, err := c.buf.rbuf.Write(tm.Body); err != nil { + return err + } + + m.Headers = tm.Header } err := c.codec.ReadHeader(&m, codec.Request) diff --git a/server/rpc_stream.go b/server/rpc_stream.go index 060585a6..1819c011 100644 --- a/server/rpc_stream.go +++ b/server/rpc_stream.go @@ -51,7 +51,7 @@ func (r *rpcStream) Recv(msg interface{}) error { req := request{} - if err := r.codec.ReadRequestHeader(&req); err != nil { + if err := r.codec.ReadRequestHeader(&req, false); err != nil { // discard body r.codec.ReadRequestBody(nil) return err diff --git a/server/rpcplus_server.go b/server/rpcplus_server.go index ff808444..2a270a7a 100644 --- a/server/rpcplus_server.go +++ b/server/rpcplus_server.go @@ -389,7 +389,6 @@ func (server *server) readRequest(codec serverCodec) (service *service, mtype *m codec.ReadRequestBody(nil) return } - // is it a streaming request? then we don't read the body if mtype.stream { codec.ReadRequestBody(nil) @@ -421,7 +420,7 @@ func (server *server) readRequest(codec serverCodec) (service *service, mtype *m func (server *server) readRequestHeader(codec serverCodec) (service *service, mtype *methodType, req *request, keepReading bool, err error) { // Grab the request header. req = server.getRequest() - err = codec.ReadRequestHeader(req) + err = codec.ReadRequestHeader(req, true) if err != nil { req = nil if err == io.EOF || err == io.ErrUnexpectedEOF { @@ -456,7 +455,7 @@ func (server *server) readRequestHeader(codec serverCodec) (service *service, mt } type serverCodec interface { - ReadRequestHeader(*request) error + ReadRequestHeader(*request, bool) error ReadRequestBody(interface{}) error WriteResponse(*response, interface{}, bool) error diff --git a/transport/http_transport.go b/transport/http_transport.go index 91527e57..c2a2d770 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -23,16 +23,21 @@ type httpTransportClient struct { addr string conn net.Conn dialOpts dialOptions - r chan *http.Request once sync.Once sync.Mutex + r chan *http.Request + bl []*http.Request buff *bufio.Reader } type httpTransportSocket struct { - r *http.Request + r chan *http.Request conn net.Conn + once sync.Once + + sync.Mutex + buff *bufio.Reader } type httpTransportListener struct { @@ -68,7 +73,14 @@ func (h *httpTransportClient) Send(m *Message) error { Host: h.addr, } - h.r <- req + h.Lock() + h.bl = append(h.bl, req) + select { + case h.r <- h.bl[0]: + h.bl = h.bl[1:] + default: + } + h.Unlock() return req.Write(h.conn) } @@ -134,17 +146,23 @@ func (h *httpTransportSocket) Recv(m *Message) error { return errors.New("message passed in is nil") } - b, err := ioutil.ReadAll(h.r.Body) + r, err := http.ReadRequest(h.buff) if err != nil { return err } - h.r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return err + } + r.Body.Close() + mr := &Message{ Header: make(map[string]string), Body: b, } - for k, v := range h.r.Header { + for k, v := range r.Header { if len(v) > 0 { mr.Header[k] = v[0] } else { @@ -152,6 +170,11 @@ func (h *httpTransportSocket) Recv(m *Message) error { } } + select { + case h.r <- r: + default: + } + *m = *mr return nil } @@ -159,8 +182,11 @@ func (h *httpTransportSocket) Recv(m *Message) error { func (h *httpTransportSocket) Send(m *Message) error { b := bytes.NewBuffer(m.Body) defer b.Reset() + + r := <-h.r + rsp := &http.Response{ - Header: h.r.Header, + Header: r.Header, Body: &buffer{b}, Status: "200 OK", StatusCode: 200, @@ -174,6 +200,11 @@ func (h *httpTransportSocket) Send(m *Message) error { rsp.Header.Set(k, v) } + select { + case h.r <- r: + default: + } + return rsp.Write(h.conn) } @@ -199,7 +230,14 @@ func (h *httpTransportSocket) error(m *Message) error { } func (h *httpTransportSocket) Close() error { - return h.conn.Close() + err := h.conn.Close() + h.once.Do(func() { + h.Lock() + h.buff.Reset(nil) + h.buff = nil + h.Unlock() + }) + return err } func (h *httpTransportListener) Addr() string { @@ -211,18 +249,19 @@ func (h *httpTransportListener) Close() error { } func (h *httpTransportListener) Accept(fn func(Socket)) error { - srv := &http.Server{ - Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - conn, _, err := w.(http.Hijacker).Hijack() - if err != nil { - return - } + for { + c, err := h.listener.Accept() + if err != nil { + return err + } - sock := &httpTransportSocket{ - conn: conn, - r: r, - } + sock := &httpTransportSocket{ + conn: c, + buff: bufio.NewReader(c), + r: make(chan *http.Request, 1), + } + go func() { // TODO: think of a better error response strategy defer func() { if r := recover(); r != nil { @@ -231,10 +270,9 @@ func (h *httpTransportListener) Accept(fn func(Socket)) error { }() fn(sock) - }), + }() } - - return srv.Serve(h.listener) + return nil } func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) {