From c1d02373709705b4251475b200424c4fafc9789b Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Mon, 14 Jan 2019 21:30:43 +0000 Subject: [PATCH] Add client response --- client/client.go | 8 ++++++ client/rpc_client.go | 33 ++++++++++++++++------- client/rpc_codec.go | 57 ++++++++++++++++++++++++---------------- client/rpc_response.go | 2 +- client/rpc_stream.go | 17 +++++++----- codec/bytes/bytes.go | 14 +++++++--- codec/bytes/marshaler.go | 3 +++ server/rpc_codec.go | 15 +++++++++-- 8 files changed, 105 insertions(+), 44 deletions(-) diff --git a/client/client.go b/client/client.go index 4cfe0bc8..5703fd31 100644 --- a/client/client.go +++ b/client/client.go @@ -62,11 +62,19 @@ type Response interface { // Stream is the inteface for a bidirectional synchronous stream type Stream interface { + // Context for the stream Context() context.Context + // The request made Request() Request + // The response read + Response() Response + // Send will encode and send a request Send(interface{}) error + // Recv will decode and read a response Recv(interface{}) error + // Error returns the stream error Error() error + // Close closes the stream Close() error } diff --git a/client/rpc_client.go b/client/rpc_client.go index 34fd8671..16db46cf 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -92,13 +92,20 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp seq := atomic.LoadUint64(&r.seq) atomic.AddUint64(&r.seq, 1) + codec := newRpcCodec(msg, c, cf) + + rsp := &rpcResponse{ + socket: c, + codec: codec, + } stream := &rpcStream{ - context: ctx, - request: req, - closed: make(chan bool), - codec: newRpcCodec(msg, c, cf), - id: fmt.Sprintf("%v", seq), + context: ctx, + request: req, + response: rsp, + codec: codec, + closed: make(chan bool), + id: fmt.Sprintf("%v", seq), } defer stream.Close() @@ -174,11 +181,19 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt return nil, errors.InternalServerError("go.micro.client", "connection error: %v", err) } + codec := newRpcCodec(msg, c, cf) + + rsp := &rpcResponse{ + socket: c, + codec: codec, + } + stream := &rpcStream{ - context: ctx, - request: req, - closed: make(chan bool), - codec: newRpcCodec(msg, c, cf), + context: ctx, + request: req, + response: rsp, + closed: make(chan bool), + codec: newRpcCodec(msg, c, cf), } ch := make(chan error, 1) diff --git a/client/rpc_codec.go b/client/rpc_codec.go index a7e9dada..84be5b77 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -88,41 +88,54 @@ func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCod return r } -func (c *rpcCodec) Write(wm *codec.Message, body interface{}) error { +func (c *rpcCodec) Write(m *codec.Message, body interface{}) error { c.buf.wbuf.Reset() - m := &codec.Message{ - Id: wm.Id, - Target: wm.Target, - Endpoint: wm.Endpoint, - Type: codec.Request, - Header: map[string]string{ - "X-Micro-Id": wm.Id, - "X-Micro-Service": wm.Target, - "X-Micro-Endpoint": wm.Endpoint, - }, + // create header + if m.Header == nil { + m.Header = map[string]string{} } - if err := c.codec.Write(m, body); err != nil { - return errors.InternalServerError("go.micro.client.codec", err.Error()) + // copy original header + for k, v := range c.req.Header { + m.Header[k] = v } - // set body - if len(wm.Body) > 0 { - c.req.Body = wm.Body - } else { - c.req.Body = c.buf.wbuf.Bytes() + // set the mucp headers + m.Header["X-Micro-Id"] = m.Id + m.Header["X-Micro-Service"] = m.Target + m.Header["X-Micro-Endpoint"] = m.Endpoint + + // if body is bytes don't encode + if body != nil { + b, ok := body.([]byte) + if ok { + // set body + m.Body = b + body = nil + } } - // set header - for k, v := range m.Header { - c.req.Header[k] = v + if len(m.Body) == 0 { + // write to codec + if err := c.codec.Write(m, body); err != nil { + return errors.InternalServerError("go.micro.client.codec", err.Error()) + } + // set body + m.Body = c.buf.wbuf.Bytes() + } + + // create new transport message + msg := transport.Message{ + Header: m.Header, + Body: m.Body, } // send the request - if err := c.client.Send(c.req); err != nil { + if err := c.client.Send(&msg); err != nil { return errors.InternalServerError("go.micro.client.transport", err.Error()) } + return nil } diff --git a/client/rpc_response.go b/client/rpc_response.go index d5c4e894..08aaa84d 100644 --- a/client/rpc_response.go +++ b/client/rpc_response.go @@ -12,7 +12,7 @@ type rpcResponse struct { codec codec.Codec } -func (r *rpcResponse) Codec() codec.Writer { +func (r *rpcResponse) Codec() codec.Reader { return r.codec } diff --git a/client/rpc_stream.go b/client/rpc_stream.go index 627d2099..39e82fe4 100644 --- a/client/rpc_stream.go +++ b/client/rpc_stream.go @@ -11,12 +11,13 @@ import ( // Implements the streamer interface type rpcStream struct { sync.RWMutex - id string - closed chan bool - err error - request Request - codec codec.Codec - context context.Context + id string + closed chan bool + err error + request Request + response Response + codec codec.Codec + context context.Context } func (r *rpcStream) isClosed() bool { @@ -36,6 +37,10 @@ func (r *rpcStream) Request() Request { return r.request } +func (r *rpcStream) Response() Response { + return r.response +} + func (r *rpcStream) Send(msg interface{}) error { r.Lock() defer r.Unlock() diff --git a/codec/bytes/bytes.go b/codec/bytes/bytes.go index b2604df2..f912fe34 100644 --- a/codec/bytes/bytes.go +++ b/codec/bytes/bytes.go @@ -35,11 +35,17 @@ func (c *Codec) ReadBody(b interface{}) error { } func (c *Codec) Write(m *codec.Message, b interface{}) error { - v, ok := b.(*[]byte) - if !ok { - return fmt.Errorf("failed to write: %v is not type of *[]byte", b) + var v []byte + switch b.(type) { + case *[]byte: + ve := b.(*[]byte) + v = *ve + case []byte: + v = b.([]byte) + default: + return fmt.Errorf("failed to write: %v is not type of *[]byte or []byte", b) } - _, err := c.Conn.Write(*v) + _, err := c.Conn.Write(v) return err } diff --git a/codec/bytes/marshaler.go b/codec/bytes/marshaler.go index 8f8d188f..76599b65 100644 --- a/codec/bytes/marshaler.go +++ b/codec/bytes/marshaler.go @@ -13,6 +13,9 @@ type Message struct { func (n Marshaler) Marshal(v interface{}) ([]byte, error) { switch v.(type) { + case *[]byte: + ve := v.(*[]byte) + return *ve, nil case []byte: return v.([]byte), nil case *Message: diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 82a8a45f..da2e35ce 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -99,6 +99,9 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error { m.Header = tm.Header // set the message body m.Body = tm.Body + + // set req + c.req = &tm } // no longer first read @@ -120,6 +123,10 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error { } func (c *rpcCodec) ReadBody(b interface{}) error { + // don't read empty body + if len(c.req.Body) == 0 { + return nil + } return c.codec.ReadBody(b) } @@ -132,7 +139,11 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error { Id: r.Id, Error: r.Error, Type: r.Type, - Header: map[string]string{}, + Header: r.Header, + } + + if m.Header == nil { + m.Header = map[string]string{} } // set request id @@ -160,7 +171,7 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error { // if we have encoded data just send it if len(r.Body) > 0 { body = r.Body - // write to the body + // write the body to codec } else if err := c.codec.Write(m, b); err != nil { c.buf.wbuf.Reset()