Add client response

This commit is contained in:
Asim Aslam 2019-01-14 21:30:43 +00:00
parent f2ac73eae5
commit c1d0237370
8 changed files with 105 additions and 44 deletions

View File

@ -62,11 +62,19 @@ type Response interface {
// Stream is the inteface for a bidirectional synchronous stream // Stream is the inteface for a bidirectional synchronous stream
type Stream interface { type Stream interface {
// Context for the stream
Context() context.Context Context() context.Context
// The request made
Request() Request Request() Request
// The response read
Response() Response
// Send will encode and send a request
Send(interface{}) error Send(interface{}) error
// Recv will decode and read a response
Recv(interface{}) error Recv(interface{}) error
// Error returns the stream error
Error() error Error() error
// Close closes the stream
Close() error Close() error
} }

View File

@ -92,13 +92,20 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
seq := atomic.LoadUint64(&r.seq) seq := atomic.LoadUint64(&r.seq)
atomic.AddUint64(&r.seq, 1) atomic.AddUint64(&r.seq, 1)
codec := newRpcCodec(msg, c, cf)
rsp := &rpcResponse{
socket: c,
codec: codec,
}
stream := &rpcStream{ stream := &rpcStream{
context: ctx, context: ctx,
request: req, request: req,
closed: make(chan bool), response: rsp,
codec: newRpcCodec(msg, c, cf), codec: codec,
id: fmt.Sprintf("%v", seq), closed: make(chan bool),
id: fmt.Sprintf("%v", seq),
} }
defer stream.Close() defer stream.Close()
@ -174,11 +181,19 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt
return nil, errors.InternalServerError("go.micro.client", "connection error: %v", err) return nil, errors.InternalServerError("go.micro.client", "connection error: %v", err)
} }
codec := newRpcCodec(msg, c, cf)
rsp := &rpcResponse{
socket: c,
codec: codec,
}
stream := &rpcStream{ stream := &rpcStream{
context: ctx, context: ctx,
request: req, request: req,
closed: make(chan bool), response: rsp,
codec: newRpcCodec(msg, c, cf), closed: make(chan bool),
codec: newRpcCodec(msg, c, cf),
} }
ch := make(chan error, 1) ch := make(chan error, 1)

View File

@ -88,41 +88,54 @@ func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCod
return r return r
} }
func (c *rpcCodec) Write(wm *codec.Message, body interface{}) error { func (c *rpcCodec) Write(m *codec.Message, body interface{}) error {
c.buf.wbuf.Reset() c.buf.wbuf.Reset()
m := &codec.Message{ // create header
Id: wm.Id, if m.Header == nil {
Target: wm.Target, m.Header = map[string]string{}
Endpoint: wm.Endpoint,
Type: codec.Request,
Header: map[string]string{
"X-Micro-Id": wm.Id,
"X-Micro-Service": wm.Target,
"X-Micro-Endpoint": wm.Endpoint,
},
} }
if err := c.codec.Write(m, body); err != nil { // copy original header
return errors.InternalServerError("go.micro.client.codec", err.Error()) for k, v := range c.req.Header {
m.Header[k] = v
} }
// set body // set the mucp headers
if len(wm.Body) > 0 { m.Header["X-Micro-Id"] = m.Id
c.req.Body = wm.Body m.Header["X-Micro-Service"] = m.Target
} else { m.Header["X-Micro-Endpoint"] = m.Endpoint
c.req.Body = c.buf.wbuf.Bytes()
// if body is bytes don't encode
if body != nil {
b, ok := body.([]byte)
if ok {
// set body
m.Body = b
body = nil
}
} }
// set header if len(m.Body) == 0 {
for k, v := range m.Header { // write to codec
c.req.Header[k] = v if err := c.codec.Write(m, body); err != nil {
return errors.InternalServerError("go.micro.client.codec", err.Error())
}
// set body
m.Body = c.buf.wbuf.Bytes()
}
// create new transport message
msg := transport.Message{
Header: m.Header,
Body: m.Body,
} }
// send the request // send the request
if err := c.client.Send(c.req); err != nil { if err := c.client.Send(&msg); err != nil {
return errors.InternalServerError("go.micro.client.transport", err.Error()) return errors.InternalServerError("go.micro.client.transport", err.Error())
} }
return nil return nil
} }

View File

@ -12,7 +12,7 @@ type rpcResponse struct {
codec codec.Codec codec codec.Codec
} }
func (r *rpcResponse) Codec() codec.Writer { func (r *rpcResponse) Codec() codec.Reader {
return r.codec return r.codec
} }

View File

@ -11,12 +11,13 @@ import (
// Implements the streamer interface // Implements the streamer interface
type rpcStream struct { type rpcStream struct {
sync.RWMutex sync.RWMutex
id string id string
closed chan bool closed chan bool
err error err error
request Request request Request
codec codec.Codec response Response
context context.Context codec codec.Codec
context context.Context
} }
func (r *rpcStream) isClosed() bool { func (r *rpcStream) isClosed() bool {
@ -36,6 +37,10 @@ func (r *rpcStream) Request() Request {
return r.request return r.request
} }
func (r *rpcStream) Response() Response {
return r.response
}
func (r *rpcStream) Send(msg interface{}) error { func (r *rpcStream) Send(msg interface{}) error {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()

View File

@ -35,11 +35,17 @@ func (c *Codec) ReadBody(b interface{}) error {
} }
func (c *Codec) Write(m *codec.Message, b interface{}) error { func (c *Codec) Write(m *codec.Message, b interface{}) error {
v, ok := b.(*[]byte) var v []byte
if !ok { switch b.(type) {
return fmt.Errorf("failed to write: %v is not type of *[]byte", b) case *[]byte:
ve := b.(*[]byte)
v = *ve
case []byte:
v = b.([]byte)
default:
return fmt.Errorf("failed to write: %v is not type of *[]byte or []byte", b)
} }
_, err := c.Conn.Write(*v) _, err := c.Conn.Write(v)
return err return err
} }

View File

@ -13,6 +13,9 @@ type Message struct {
func (n Marshaler) Marshal(v interface{}) ([]byte, error) { func (n Marshaler) Marshal(v interface{}) ([]byte, error) {
switch v.(type) { switch v.(type) {
case *[]byte:
ve := v.(*[]byte)
return *ve, nil
case []byte: case []byte:
return v.([]byte), nil return v.([]byte), nil
case *Message: case *Message:

View File

@ -99,6 +99,9 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
m.Header = tm.Header m.Header = tm.Header
// set the message body // set the message body
m.Body = tm.Body m.Body = tm.Body
// set req
c.req = &tm
} }
// no longer first read // no longer first read
@ -120,6 +123,10 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
} }
func (c *rpcCodec) ReadBody(b interface{}) error { func (c *rpcCodec) ReadBody(b interface{}) error {
// don't read empty body
if len(c.req.Body) == 0 {
return nil
}
return c.codec.ReadBody(b) return c.codec.ReadBody(b)
} }
@ -132,7 +139,11 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
Id: r.Id, Id: r.Id,
Error: r.Error, Error: r.Error,
Type: r.Type, Type: r.Type,
Header: map[string]string{}, Header: r.Header,
}
if m.Header == nil {
m.Header = map[string]string{}
} }
// set request id // set request id
@ -160,7 +171,7 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
// if we have encoded data just send it // if we have encoded data just send it
if len(r.Body) > 0 { if len(r.Body) > 0 {
body = r.Body body = r.Body
// write to the body // write the body to codec
} else if err := c.codec.Write(m, b); err != nil { } else if err := c.codec.Write(m, b); err != nil {
c.buf.wbuf.Reset() c.buf.wbuf.Reset()