This commit is contained in:
Asim Aslam 2019-01-07 18:20:47 +00:00
parent e8b431c5ff
commit c9963cb870
6 changed files with 44 additions and 44 deletions

View File

@ -29,6 +29,13 @@ type readWriteCloser struct {
rbuf *bytes.Buffer rbuf *bytes.Buffer
} }
type serverCodec interface {
ReadHeader(*request, bool) error
ReadBody(interface{}) error
Write(*response, interface{}, bool) error
Close() error
}
var ( var (
DefaultContentType = "application/protobuf" DefaultContentType = "application/protobuf"
@ -72,7 +79,7 @@ func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCod
return r return r
} }
func (c *rpcCodec) ReadRequestHeader(r *request, first bool) error { func (c *rpcCodec) ReadHeader(r *request, first bool) error {
m := codec.Message{Header: c.req.Header} m := codec.Message{Header: c.req.Header}
if !first { if !first {
@ -106,11 +113,11 @@ func (c *rpcCodec) ReadRequestHeader(r *request, first bool) error {
return err return err
} }
func (c *rpcCodec) ReadRequestBody(b interface{}) error { func (c *rpcCodec) ReadBody(b interface{}) error {
return c.codec.ReadBody(b) return c.codec.ReadBody(b)
} }
func (c *rpcCodec) WriteResponse(r *response, body interface{}, last bool) error { func (c *rpcCodec) Write(r *response, body interface{}, last bool) error {
c.buf.wbuf.Reset() c.buf.wbuf.Reset()
m := &codec.Message{ m := &codec.Message{
Method: r.ServiceMethod, Method: r.ServiceMethod,

View File

@ -47,7 +47,7 @@ func TestCodecWriteError(t *testing.T) {
socket: socket, socket: socket,
} }
err := c.WriteResponse(&response{ err := c.Write(&response{
ServiceMethod: "Service.Method", ServiceMethod: "Service.Method",
Seq: 0, Seq: 0,
Error: "", Error: "",
@ -55,7 +55,7 @@ func TestCodecWriteError(t *testing.T) {
}, "body", false) }, "body", false)
if err != nil { if err != nil {
t.Fatalf(`Expected WriteResponse to fail; got "%+v" instead`, err) t.Fatalf(`Expected Write to fail; got "%+v" instead`, err)
} }
const expectedError = "Unable to encode body: simulating a codec write failure" const expectedError = "Unable to encode body: simulating a codec write failure"

View File

@ -225,7 +225,7 @@ func (router *router) sendResponse(sending sync.Locker, req *request, reply inte
} }
resp.Seq = req.Seq resp.Seq = req.Seq
sending.Lock() sending.Lock()
err = codec.WriteResponse(resp, reply, last) err = codec.Write(resp, reply, last)
sending.Unlock() sending.Unlock()
router.freeResponse(resp) router.freeResponse(resp)
return err return err
@ -391,12 +391,12 @@ func (router *router) readRequest(codec serverCodec) (service *service, mtype *m
return return
} }
// discard body // discard body
codec.ReadRequestBody(nil) codec.ReadBody(nil)
return return
} }
// is it a streaming request? then we don't read the body // is it a streaming request? then we don't read the body
if mtype.stream { if mtype.stream {
codec.ReadRequestBody(nil) codec.ReadBody(nil)
return return
} }
@ -409,7 +409,7 @@ func (router *router) readRequest(codec serverCodec) (service *service, mtype *m
argIsValue = true argIsValue = true
} }
// argv guaranteed to be a pointer now. // argv guaranteed to be a pointer now.
if err = codec.ReadRequestBody(argv.Interface()); err != nil { if err = codec.ReadBody(argv.Interface()); err != nil {
return return
} }
if argIsValue { if argIsValue {
@ -425,7 +425,7 @@ func (router *router) readRequest(codec serverCodec) (service *service, mtype *m
func (router *router) readRequestHeader(codec serverCodec) (service *service, mtype *methodType, req *request, keepReading bool, err error) { func (router *router) readRequestHeader(codec serverCodec) (service *service, mtype *methodType, req *request, keepReading bool, err error) {
// Grab the request header. // Grab the request header.
req = router.getRequest() req = router.getRequest()
err = codec.ReadRequestHeader(req, true) err = codec.ReadHeader(req, true)
if err != nil { if err != nil {
req = nil req = nil
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
@ -458,11 +458,3 @@ func (router *router) readRequestHeader(codec serverCodec) (service *service, mt
} }
return return
} }
type serverCodec interface {
ReadRequestHeader(*request, bool) error
ReadRequestBody(interface{}) error
WriteResponse(*response, interface{}, bool) error
Close() error
}

View File

@ -70,6 +70,26 @@ func (s *rpcServer) accept(sock transport.Socket) {
// we use this Content-Type header to identify the codec needed // we use this Content-Type header to identify the codec needed
ct := msg.Header["Content-Type"] ct := msg.Header["Content-Type"]
// strip our headers
hdr := make(map[string]string)
for k, v := range msg.Header {
hdr[k] = v
}
// set local/remote ips
hdr["Local"] = sock.Local()
hdr["Remote"] = sock.Remote()
// create new context
ctx := metadata.NewContext(context.Background(), hdr)
// set the timeout if we have it
if len(to) > 0 {
if n, err := strconv.ParseUint(to, 10, 64); err == nil {
ctx, _ = context.WithTimeout(ctx, time.Duration(n))
}
}
// no content type // no content type
if len(ct) == 0 { if len(ct) == 0 {
ct = DefaultContentType ct = DefaultContentType
@ -88,29 +108,9 @@ func (s *rpcServer) accept(sock transport.Socket) {
return return
} }
// create the internal server codec
codec := newRpcCodec(&msg, sock, cf) codec := newRpcCodec(&msg, sock, cf)
// strip our headers
hdr := make(map[string]string)
for k, v := range msg.Header {
hdr[k] = v
}
delete(hdr, "Content-Type")
delete(hdr, "Timeout")
// set local/remote ips
hdr["Local"] = sock.Local()
hdr["Remote"] = sock.Remote()
ctx := metadata.NewContext(context.Background(), hdr)
// set the timeout if we have it
if len(to) > 0 {
if n, err := strconv.ParseUint(to, 10, 64); err == nil {
ctx, _ = context.WithTimeout(ctx, time.Duration(n))
}
}
// TODO: needs better error handling // TODO: needs better error handling
if err := s.router.ServeRequest(ctx, codec, ct); err != nil { if err := s.router.ServeRequest(ctx, codec, ct); err != nil {
s.wg.Done() s.wg.Done()

View File

@ -33,7 +33,7 @@ func (r *rpcStream) Send(msg interface{}) error {
Seq: r.seq, Seq: r.seq,
} }
return r.codec.WriteResponse(&resp, msg, false) return r.codec.Write(&resp, msg, false)
} }
func (r *rpcStream) Recv(msg interface{}) error { func (r *rpcStream) Recv(msg interface{}) error {
@ -42,15 +42,15 @@ func (r *rpcStream) Recv(msg interface{}) error {
req := request{} req := request{}
if err := r.codec.ReadRequestHeader(&req, false); err != nil { if err := r.codec.ReadHeader(&req, false); err != nil {
// discard body // discard body
r.codec.ReadRequestBody(nil) r.codec.ReadBody(nil)
return err return err
} }
// we need to stay up to date with sequence numbers // we need to stay up to date with sequence numbers
r.seq = req.Seq r.seq = req.Seq
return r.codec.ReadRequestBody(msg) return r.codec.ReadBody(msg)
} }
func (r *rpcStream) Error() error { func (r *rpcStream) Error() error {

View File

@ -9,6 +9,7 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/micro/go-log" "github.com/micro/go-log"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
) )
@ -29,7 +30,7 @@ type Server interface {
// Router handle serving messages // Router handle serving messages
type Router interface { type Router interface {
ServeRequest(context.Context, Stream) error ServeCodec(context.Context, codec.Codec) error
} }
// Message is an async message interface // Message is an async message interface