From 4495ca383968021631506d4e8fc9570089936346 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Fri, 16 Aug 2019 17:24:17 +0100 Subject: [PATCH] Use client.Call for non streaming requests --- client/options.go | 10 ---------- client/rpc_client.go | 4 ++-- client/rpc_codec.go | 6 ++++++ proxy/mucp/mucp.go | 25 +++++++++++++++---------- 4 files changed, 23 insertions(+), 22 deletions(-) diff --git a/client/options.go b/client/options.go index 0ba57ad7..9f363d74 100644 --- a/client/options.go +++ b/client/options.go @@ -59,9 +59,6 @@ type CallOptions struct { // Middleware for low level call func CallWrappers []CallWrapper - // SendEOS specifies whether to send EOS - SendEOS bool - // Other options for implementations of the interface // can be stored in a context Context context.Context @@ -308,13 +305,6 @@ func WithDialTimeout(d time.Duration) CallOption { } } -// SendEOS specifies whether to send the end of stream message -func SendEOS(b bool) CallOption { - return func(o *CallOptions) { - o.SendEOS = b - } -} - // Request Options func WithContentType(ct string) RequestOption { diff --git a/client/rpc_client.go b/client/rpc_client.go index 5449dcd9..754e4329 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -118,7 +118,7 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, codec: codec, closed: make(chan bool), release: func(err error) { r.pool.Release(c, err) }, - sendEOS: opts.SendEOS, + sendEOS: false, } // close the stream on exiting this function defer stream.Close() @@ -244,7 +244,7 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request // used to close the stream closed: make(chan bool), // signal the end of stream, - sendEOS: opts.SendEOS, + sendEOS: true, // release func release: func(err error) { r.pool.Release(c, err) }, } diff --git a/client/rpc_codec.go b/client/rpc_codec.go index 37987496..c20537ea 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -249,6 +249,12 @@ func (c *rpcCodec) ReadHeader(m *codec.Message, r codec.MessageType) error { func (c *rpcCodec) ReadBody(b interface{}) error { // read body + // read raw data + if v, ok := b.(*raw.Frame); ok { + v.Data = c.buf.rbuf.Bytes() + return nil + } + if err := c.codec.ReadBody(b); err != nil { return errors.InternalServerError("go.micro.client.codec", err.Error()) } diff --git a/proxy/mucp/mucp.go b/proxy/mucp/mucp.go index e4303d16..095c10d8 100644 --- a/proxy/mucp/mucp.go +++ b/proxy/mucp/mucp.go @@ -220,9 +220,21 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server // create new request with raw bytes body creq := p.Client.NewRequest(service, endpoint, &bytes.Frame{body}, client.WithContentType(req.ContentType())) + // not a stream so make a client.Call request if !req.Stream() { - // specify not to send eos - opts = append(opts, client.SendEOS(false)) + crsp := new(bytes.Frame) + + // make a call to the backend + if err := p.Client.Call(ctx, creq, crsp, opts...); err != nil { + return err + } + + // write the response + if err := rsp.Write(crsp.Data); err != nil { + return err + } + + return nil } // create new stream @@ -233,9 +245,7 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server defer stream.Close() // create client request read loop if streaming - if req.Stream() { - go readLoop(req, stream) - } + go readLoop(req, stream) // get raw response resp := stream.Response() @@ -273,11 +283,6 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server } else if err != nil { return err } - - // we don't continue unless its a stream - if !req.Stream() { - return nil - } } }