further codec changes

This commit is contained in:
Asim Aslam
2019-01-08 15:38:25 +00:00
parent 216dbb771a
commit 4cb41721f1
13 changed files with 139 additions and 121 deletions

View File

@@ -2,8 +2,6 @@ package server
import (
"bytes"
"fmt"
"strconv"
"github.com/micro/go-micro/codec"
raw "github.com/micro/go-micro/codec/bytes"
@@ -19,6 +17,7 @@ import (
type rpcCodec struct {
socket transport.Socket
codec codec.Codec
first bool
req *transport.Message
buf *readWriteCloser
@@ -65,12 +64,13 @@ func (rwc *readWriteCloser) Close() error {
return nil
}
func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) serverCodec {
func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) codec.Codec {
rwc := &readWriteCloser{
rbuf: bytes.NewBuffer(req.Body),
wbuf: bytes.NewBuffer(nil),
}
r := &rpcCodec{
first: true,
buf: rwc,
codec: c(rwc),
req: req,
@@ -79,36 +79,43 @@ func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCod
return r
}
func (c *rpcCodec) ReadHeader(r *request, first bool) error {
func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
m := codec.Message{Header: c.req.Header}
if !first {
// if its a follow on request read it
if !c.first {
var tm transport.Message
// read off the socket
if err := c.socket.Recv(&tm); err != nil {
return err
}
// reset the read buffer
c.buf.rbuf.Reset()
// write the body to the buffer
if _, err := c.buf.rbuf.Write(tm.Body); err != nil {
return err
}
// set the message header
m.Header = tm.Header
}
// no longer first read
c.first = false
// set some internal things
m.Target = m.Header["X-Micro-Service"]
m.Method = m.Header["X-Micro-Method"]
// set id
if len(m.Header["X-Micro-Id"]) > 0 {
id, _ := strconv.ParseInt(m.Header["X-Micro-Id"], 10, 64)
m.Id = uint64(id)
}
m.Id = m.Header["X-Micro-Id"]
// read header via codec
err := c.codec.ReadHeader(&m, codec.Request)
r.ServiceMethod = m.Method
r.Seq = m.Id
// set the method/id
r.Method = m.Method
r.Id = m.Id
return err
}
@@ -117,21 +124,28 @@ func (c *rpcCodec) ReadBody(b interface{}) error {
return c.codec.ReadBody(b)
}
func (c *rpcCodec) Write(r *response, body interface{}, last bool) error {
func (c *rpcCodec) Write(r *codec.Message, body interface{}) error {
c.buf.wbuf.Reset()
// create a new message
m := &codec.Message{
Method: r.ServiceMethod,
Id: r.Seq,
Method: r.Method,
Id: r.Id,
Error: r.Error,
Type: codec.Response,
Type: r.Type,
Header: map[string]string{
"X-Micro-Id": fmt.Sprintf("%d", r.Seq),
"X-Micro-Method": r.ServiceMethod,
"X-Micro-Id": r.Id,
"X-Micro-Method": r.Method,
"X-Micro-Error": r.Error,
"Content-Type": c.req.Header["Content-Type"],
},
}
// write to the body
if err := c.codec.Write(m, body); err != nil {
c.buf.wbuf.Reset()
// write an error if it failed
m.Error = errors.Wrapf(err, "Unable to encode body").Error()
m.Header["X-Micro-Error"] = m.Error
if err := c.codec.Write(m, nil); err != nil {
@@ -139,7 +153,7 @@ func (c *rpcCodec) Write(r *response, body interface{}, last bool) error {
}
}
m.Header["Content-Type"] = c.req.Header["Content-Type"]
// send on the socket
return c.socket.Send(&transport.Message{
Header: m.Header,
Body: c.buf.wbuf.Bytes(),
@@ -151,3 +165,7 @@ func (c *rpcCodec) Close() error {
c.codec.Close()
return c.socket.Close()
}
func (c *rpcCodec) String() string {
return "rpc"
}

View File

@@ -47,12 +47,11 @@ func TestCodecWriteError(t *testing.T) {
socket: socket,
}
err := c.Write(&response{
ServiceMethod: "Service.Method",
Seq: 0,
Error: "",
next: nil,
}, "body", false)
err := c.Write(&codec.Message{
Method: "Service.Method",
Id: "0",
Error: "",
}, "body")
if err != nil {
t.Fatalf(`Expected Write to fail; got "%+v" instead`, err)

View File

@@ -17,6 +17,7 @@ import (
"unicode/utf8"
"github.com/micro/go-log"
"github.com/micro/go-micro/codec"
)
var (
@@ -48,16 +49,13 @@ type service struct {
}
type request struct {
ServiceMethod string // format: "Service.Method"
Seq uint64 // sequence number chosen by client
next *request // for free list in Server
msg *codec.Message
next *request // for free list in Server
}
type response struct {
ServiceMethod string // echoes that of the Request
Seq uint64 // echoes that of the request
Error string // error, if any.
next *response // for free list in Server
msg *codec.Message
next *response // for free list in Server
}
// router represents an RPC router.
@@ -215,30 +213,34 @@ func (router *router) Handle(h Handler) error {
return nil
}
func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, codec serverCodec, errmsg string, last bool) (err error) {
func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Codec, errmsg string, last bool) (err error) {
msg := new(codec.Message)
msg.Type = codec.Response
resp := router.getResponse()
resp.msg = msg
// Encode the response header
resp.ServiceMethod = req.ServiceMethod
resp.msg.Method = req.msg.Method
if errmsg != "" {
resp.Error = errmsg
resp.msg.Error = errmsg
reply = invalidRequest
}
resp.Seq = req.Seq
resp.msg.Id = req.msg.Id
sending.Lock()
err = codec.Write(resp, reply, last)
err = cc.Write(resp.msg, reply)
sending.Unlock()
router.freeResponse(resp)
return err
}
func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, codec serverCodec, ct string) {
func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, codec codec.Codec, ct string) {
function := mtype.method.Func
var returnValues []reflect.Value
r := &rpcRequest{
service: router.name,
contentType: ct,
method: req.ServiceMethod,
method: req.msg.Method,
}
if !mtype.stream {
@@ -282,7 +284,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
context: ctx,
codec: codec,
request: r,
seq: req.Seq,
id: req.msg.Id,
}
// Invoke the method, providing a new value for the reply.
@@ -326,21 +328,21 @@ func (m *methodType) prepareContext(ctx context.Context) reflect.Value {
return reflect.Zero(m.ContextType)
}
func (router *router) ServeRequest(ctx context.Context, codec serverCodec, ct string) error {
func (router *router) ServeRequest(ctx context.Context, cc codec.Codec, ct string) error {
sending := new(sync.Mutex)
service, mtype, req, argv, replyv, keepReading, err := router.readRequest(codec)
service, mtype, req, argv, replyv, keepReading, err := router.readRequest(cc)
if err != nil {
if !keepReading {
return err
}
// send a response if we actually managed to read a header.
if req != nil {
router.sendResponse(sending, req, invalidRequest, codec, err.Error(), true)
router.sendResponse(sending, req, invalidRequest, cc, err.Error(), true)
router.freeRequest(req)
}
return err
}
service.call(ctx, router, sending, mtype, req, argv, replyv, codec, ct)
service.call(ctx, router, sending, mtype, req, argv, replyv, cc, ct)
return nil
}
@@ -384,19 +386,19 @@ func (router *router) freeResponse(resp *response) {
router.respLock.Unlock()
}
func (router *router) readRequest(codec serverCodec) (service *service, mtype *methodType, req *request, argv, replyv reflect.Value, keepReading bool, err error) {
service, mtype, req, keepReading, err = router.readHeader(codec)
func (router *router) readRequest(cc codec.Codec) (service *service, mtype *methodType, req *request, argv, replyv reflect.Value, keepReading bool, err error) {
service, mtype, req, keepReading, err = router.readHeader(cc)
if err != nil {
if !keepReading {
return
}
// discard body
codec.ReadBody(nil)
cc.ReadBody(nil)
return
}
// is it a streaming request? then we don't read the body
if mtype.stream {
codec.ReadBody(nil)
cc.ReadBody(nil)
return
}
@@ -409,7 +411,7 @@ func (router *router) readRequest(codec serverCodec) (service *service, mtype *m
argIsValue = true
}
// argv guaranteed to be a pointer now.
if err = codec.ReadBody(argv.Interface()); err != nil {
if err = cc.ReadBody(argv.Interface()); err != nil {
return
}
if argIsValue {
@@ -422,10 +424,14 @@ func (router *router) readRequest(codec serverCodec) (service *service, mtype *m
return
}
func (router *router) readHeader(codec serverCodec) (service *service, mtype *methodType, req *request, keepReading bool, err error) {
func (router *router) readHeader(cc codec.Codec) (service *service, mtype *methodType, req *request, keepReading bool, err error) {
// Grab the request header.
msg := new(codec.Message)
msg.Type = codec.Request
req = router.getRequest()
err = codec.ReadHeader(req, true)
req.msg = msg
err = cc.ReadHeader(msg, msg.Type)
if err != nil {
req = nil
if err == io.EOF || err == io.ErrUnexpectedEOF {
@@ -439,9 +445,9 @@ func (router *router) readHeader(codec serverCodec) (service *service, mtype *me
// we can still recover and move on to the next request.
keepReading = true
serviceMethod := strings.Split(req.ServiceMethod, ".")
serviceMethod := strings.Split(req.msg.Method, ".")
if len(serviceMethod) != 2 {
err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
err = errors.New("rpc: service/method request ill-formed: " + req.msg.Method)
return
}
// Look up the request.
@@ -449,12 +455,12 @@ func (router *router) readHeader(codec serverCodec) (service *service, mtype *me
service = router.serviceMap[serviceMethod[0]]
router.mu.Unlock()
if service == nil {
err = errors.New("rpc: can't find service " + req.ServiceMethod)
err = errors.New("rpc: can't find service " + req.msg.Method)
return
}
mtype = service.method[serviceMethod[1]]
if mtype == nil {
err = errors.New("rpc: can't find method " + req.ServiceMethod)
err = errors.New("rpc: can't find method " + req.msg.Method)
}
return
}

View File

@@ -3,16 +3,18 @@ package server
import (
"context"
"sync"
"github.com/micro/go-micro/codec"
)
// Implements the Streamer interface
type rpcStream struct {
sync.RWMutex
seq uint64
id string
closed bool
err error
request Request
codec serverCodec
codec codec.Codec
context context.Context
}
@@ -28,28 +30,30 @@ func (r *rpcStream) Send(msg interface{}) error {
r.Lock()
defer r.Unlock()
resp := response{
ServiceMethod: r.request.Method(),
Seq: r.seq,
resp := codec.Message{
Method: r.request.Method(),
Id: r.id,
Type: codec.Response,
}
return r.codec.Write(&resp, msg, false)
return r.codec.Write(&resp, msg)
}
func (r *rpcStream) Recv(msg interface{}) error {
r.Lock()
defer r.Unlock()
req := request{}
req := new(codec.Message)
req.Type = codec.Request
if err := r.codec.ReadHeader(&req, false); err != nil {
if err := r.codec.ReadHeader(req, req.Type); err != nil {
// discard body
r.codec.ReadBody(nil)
return err
}
// we need to stay up to date with sequence numbers
r.seq = req.Seq
r.id = req.Id
return r.codec.ReadBody(msg)
}