Add router modifications

This commit is contained in:
Asim Aslam 2019-01-09 16:20:57 +00:00
parent ee380c6b7a
commit d004c9624b
6 changed files with 72 additions and 25 deletions

View File

@ -39,5 +39,8 @@ type Message struct {
Target string Target string
Method string Method string
Error string Error string
// The values read from the socket
Header map[string]string Header map[string]string
Body []byte
} }

View File

@ -73,7 +73,11 @@ func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCod
} }
func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error { func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
m := codec.Message{Header: c.req.Header} // the initieal message
m := codec.Message{
Header: c.req.Header,
Body: c.req.Body,
}
// if its a follow on request read it // if its a follow on request read it
if !c.first { if !c.first {
@ -93,6 +97,8 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
// set the message header // set the message header
m.Header = tm.Header m.Header = tm.Header
// set the message body
m.Body = tm.Body
} }
// no longer first read // no longer first read
@ -117,7 +123,7 @@ func (c *rpcCodec) ReadBody(b interface{}) error {
return c.codec.ReadBody(b) return c.codec.ReadBody(b)
} }
func (c *rpcCodec) Write(r *codec.Message, body interface{}) error { func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
c.buf.wbuf.Reset() c.buf.wbuf.Reset()
// create a new message // create a new message
@ -134,22 +140,33 @@ func (c *rpcCodec) Write(r *codec.Message, body interface{}) error {
}, },
} }
// write to the body // the body being sent
if err := c.codec.Write(m, body); err != nil { var body []byte
// if we have encoded data just send it
if len(r.Body) > 0 {
body = r.Body
// write to the body
} else if err := c.codec.Write(m, b); err != nil {
c.buf.wbuf.Reset() c.buf.wbuf.Reset()
// write an error if it failed // write an error if it failed
m.Error = errors.Wrapf(err, "Unable to encode body").Error() m.Error = errors.Wrapf(err, "Unable to encode body").Error()
m.Header["X-Micro-Error"] = m.Error m.Header["X-Micro-Error"] = m.Error
// no body to write
if err := c.codec.Write(m, nil); err != nil { if err := c.codec.Write(m, nil); err != nil {
return err return err
} }
// write the body
} else {
// set the body
body = c.buf.wbuf.Bytes()
} }
// send on the socket // send on the socket
return c.socket.Send(&transport.Message{ return c.socket.Send(&transport.Message{
Header: m.Header, Header: m.Header,
Body: c.buf.wbuf.Bytes(), Body: body,
}) })
} }

View File

@ -1,10 +1,15 @@
package server package server
import (
"github.com/micro/go-micro/codec"
)
type rpcRequest struct { type rpcRequest struct {
service string service string
method string method string
contentType string contentType string
request interface{} codec codec.Codec
body []byte
stream bool stream bool
} }
@ -14,6 +19,10 @@ type rpcMessage struct {
payload interface{} payload interface{}
} }
func (r *rpcRequest) Codec() codec.Codec {
return r.codec
}
func (r *rpcRequest) ContentType() string { func (r *rpcRequest) ContentType() string {
return r.contentType return r.contentType
} }
@ -26,8 +35,8 @@ func (r *rpcRequest) Method() string {
return r.method return r.method
} }
func (r *rpcRequest) Request() interface{} { func (r *rpcRequest) Body() []byte {
return r.request return r.body
} }
func (r *rpcRequest) Stream() bool { func (r *rpcRequest) Stream() bool {

View File

@ -18,6 +18,7 @@ import (
"github.com/micro/go-log" "github.com/micro/go-log"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/transport"
) )
var ( var (
@ -184,21 +185,20 @@ func (router *router) sendResponse(sending sync.Locker, req *request, reply inte
return err return err
} }
func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, codec codec.Codec) { func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Codec) {
function := mtype.method.Func function := mtype.method.Func
var returnValues []reflect.Value var returnValues []reflect.Value
r := &rpcRequest{ r := &rpcRequest{
service: router.name, service: req.msg.Target,
contentType: req.msg.Header["Content-Type"], contentType: req.msg.Header["Content-Type"],
method: req.msg.Method, method: req.msg.Method,
body: req.msg.Body,
} }
if !mtype.stream { if !mtype.stream {
r.request = argv.Interface()
fn := func(ctx context.Context, req Request, rsp interface{}) error { fn := func(ctx context.Context, req Request, rsp interface{}) error {
returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(req.Request()), reflect.ValueOf(rsp)}) returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)})
// The return value for the method is an error. // The return value for the method is an error.
if err := returnValues[0].Interface(); err != nil { if err := returnValues[0].Interface(); err != nil {
@ -218,7 +218,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
errmsg = err.Error() errmsg = err.Error()
} }
err = router.sendResponse(sending, req, replyv.Interface(), codec, errmsg, true) err = router.sendResponse(sending, req, replyv.Interface(), cc, errmsg, true)
if err != nil { if err != nil {
log.Log("rpc call: unable to send response: ", err) log.Log("rpc call: unable to send response: ", err)
} }
@ -233,7 +233,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
stream := &rpcStream{ stream := &rpcStream{
context: ctx, context: ctx,
codec: codec, codec: cc,
request: r, request: r,
id: req.msg.Id, id: req.msg.Id,
} }
@ -268,7 +268,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
// this is the last packet, we don't do anything with // this is the last packet, we don't do anything with
// the error here (well sendStreamResponse will log it // the error here (well sendStreamResponse will log it
// already) // already)
router.sendResponse(sending, req, nil, codec, errmsg, true) router.sendResponse(sending, req, nil, cc, errmsg, true)
router.freeRequest(req) router.freeRequest(req)
} }
@ -319,7 +319,9 @@ func (router *router) freeResponse(resp *response) {
router.respLock.Unlock() router.respLock.Unlock()
} }
func (router *router) readRequest(cc codec.Codec) (service *service, mtype *methodType, req *request, argv, replyv reflect.Value, keepReading bool, err error) { func (router *router) readRequest(r Request) (service *service, mtype *methodType, req *request, argv, replyv reflect.Value, keepReading bool, err error) {
cc := r.Codec()
service, mtype, req, keepReading, err = router.readHeader(cc) service, mtype, req, keepReading, err = router.readHeader(cc)
if err != nil { if err != nil {
if !keepReading { if !keepReading {
@ -447,9 +449,10 @@ func (router *router) Handle(h Handler) error {
return nil return nil
} }
func (router *router) ServeRequest(ctx context.Context, cc codec.Codec) error { func (router *router) ServeRequest(ctx context.Context, r Request, s transport.Socket) error {
cc := r.Codec()
sending := new(sync.Mutex) sending := new(sync.Mutex)
service, mtype, req, argv, replyv, keepReading, err := router.readRequest(cc) service, mtype, req, argv, replyv, keepReading, err := router.readRequest(r)
if err != nil { if err != nil {
if !keepReading { if !keepReading {
return err return err

View File

@ -110,8 +110,15 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
return return
} }
// create the internal server codec // internal request
codec := newRpcCodec(&msg, sock, cf) request := &rpcRequest{
service: msg.Header["X-Micro-Service"],
method: msg.Header["X-Micro-Method"],
contentType: ct,
codec: newRpcCodec(&msg, sock, cf),
body: msg.Body,
stream: true,
}
// set router // set router
var r Router var r Router
@ -122,7 +129,7 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
} }
// TODO: needs better error handling // TODO: needs better error handling
if err := r.ServeRequest(ctx, codec); err != nil { if err := r.ServeRequest(ctx, request, sock); err != nil {
s.wg.Done() s.wg.Done()
log.Logf("Unexpected error serving request, closing socket: %v", err) log.Logf("Unexpected error serving request, closing socket: %v", err)
return return

View File

@ -11,6 +11,7 @@ import (
"github.com/micro/go-log" "github.com/micro/go-log"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"github.com/micro/go-micro/transport"
) )
// Server is a simple micro server abstraction // Server is a simple micro server abstraction
@ -30,7 +31,8 @@ type Server interface {
// Router handle serving messages // Router handle serving messages
type Router interface { type Router interface {
ServeRequest(context.Context, codec.Codec) error // ServeRequest processes a request to completion
ServeRequest(context.Context, Request, transport.Socket) error
} }
// Message is an async message interface // Message is an async message interface
@ -42,11 +44,17 @@ type Message interface {
// Request is a synchronous request interface // Request is a synchronous request interface
type Request interface { type Request interface {
// Service name requested
Service() string Service() string
// Method name requested
Method() string Method() string
// The initial request body
Body() []byte
// Content type provided
ContentType() string ContentType() string
Request() interface{} // The codec for encoding/decoding messages
// indicates whether the request will be streamed Codec() codec.Codec
// Indicates whether its a stream
Stream() bool Stream() bool
} }