Further crufting

This commit is contained in:
Asim Aslam 2019-01-09 19:28:13 +00:00
parent 873fc6d663
commit 6e0e4a684c
6 changed files with 32 additions and 14 deletions

View File

@ -23,13 +23,21 @@ type NewCodec func(io.ReadWriteCloser) Codec
// connection. ReadBody may be called with a nil argument to force the // connection. ReadBody may be called with a nil argument to force the
// body to be read and discarded. // body to be read and discarded.
type Codec interface { type Codec interface {
ReadHeader(*Message, MessageType) error Reader
ReadBody(interface{}) error Writer
Write(*Message, interface{}) error
Close() error Close() error
String() string String() string
} }
type Reader interface {
ReadHeader(*Message, MessageType) error
ReadBody(interface{}) error
}
type Writer interface {
Write(*Message, interface{}) error
}
// Message represents detailed information about // Message represents detailed information about
// the communication, likely followed by the body. // the communication, likely followed by the body.
// In the case of an error, body may be nil. // In the case of an error, body may be nil.

View File

@ -22,7 +22,7 @@ type rpcMessage struct {
payload interface{} payload interface{}
} }
func (r *rpcRequest) Codec() codec.Codec { func (r *rpcRequest) Codec() codec.Reader {
return r.codec return r.codec
} }

View File

@ -3,12 +3,18 @@ package server
import ( import (
"net/http" "net/http"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
) )
type rpcResponse struct { type rpcResponse struct {
header map[string]string header map[string]string
socket transport.Socket socket transport.Socket
codec codec.Codec
}
func (r *rpcResponse) Codec() codec.Writer {
return r.codec
} }
func (r *rpcResponse) WriteHeader(hdr map[string]string) { func (r *rpcResponse) WriteHeader(hdr map[string]string) {

View File

@ -164,7 +164,7 @@ func prepareMethod(method reflect.Method) *methodType {
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream} return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}
} }
func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Codec, errmsg string, last bool) (err error) { func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Writer, errmsg string, last bool) (err error) {
msg := new(codec.Message) msg := new(codec.Message)
msg.Type = codec.Response msg.Type = codec.Response
resp := router.getResponse() resp := router.getResponse()
@ -184,7 +184,7 @@ func (router *router) sendResponse(sending sync.Locker, req *request, reply inte
return err return err
} }
func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Codec) { func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Writer) {
function := mtype.method.Func function := mtype.method.Func
var returnValues []reflect.Value var returnValues []reflect.Value
@ -232,7 +232,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
stream := &rpcStream{ stream := &rpcStream{
context: ctx, context: ctx,
codec: cc, codec: cc.(codec.Codec),
request: r, request: r,
id: req.msg.Id, id: req.msg.Id,
} }
@ -358,7 +358,7 @@ func (router *router) readRequest(r Request) (service *service, mtype *methodTyp
return return
} }
func (router *router) readHeader(cc codec.Codec) (service *service, mtype *methodType, req *request, keepReading bool, err error) { func (router *router) readHeader(cc codec.Reader) (service *service, mtype *methodType, req *request, keepReading bool, err error) {
// Grab the request header. // Grab the request header.
msg := new(codec.Message) msg := new(codec.Message)
msg.Type = codec.Request msg.Type = codec.Request
@ -449,7 +449,6 @@ func (router *router) Handle(h Handler) error {
} }
func (router *router) ServeRequest(ctx context.Context, r Request, rsp Response) error { func (router *router) ServeRequest(ctx context.Context, r Request, rsp Response) error {
cc := r.Codec()
sending := new(sync.Mutex) sending := new(sync.Mutex)
service, mtype, req, argv, replyv, keepReading, err := router.readRequest(r) service, mtype, req, argv, replyv, keepReading, err := router.readRequest(r)
if err != nil { if err != nil {
@ -458,11 +457,11 @@ func (router *router) ServeRequest(ctx context.Context, r Request, rsp Response)
} }
// send a response if we actually managed to read a header. // send a response if we actually managed to read a header.
if req != nil { if req != nil {
router.sendResponse(sending, req, invalidRequest, cc, err.Error(), true) router.sendResponse(sending, req, invalidRequest, rsp.Codec(), err.Error(), true)
router.freeRequest(req) router.freeRequest(req)
} }
return err return err
} }
service.call(ctx, router, sending, mtype, req, argv, replyv, cc) service.call(ctx, router, sending, mtype, req, argv, replyv, rsp.Codec())
return nil return nil
} }

View File

@ -110,12 +110,14 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
return return
} }
codec := newRpcCodec(&msg, sock, cf)
// internal request // internal request
request := &rpcRequest{ request := &rpcRequest{
service: msg.Header["X-Micro-Service"], service: msg.Header["X-Micro-Service"],
method: msg.Header["X-Micro-Method"], method: msg.Header["X-Micro-Method"],
contentType: ct, contentType: ct,
codec: newRpcCodec(&msg, sock, cf), codec: codec,
header: msg.Header, header: msg.Header,
body: msg.Body, body: msg.Body,
socket: sock, socket: sock,
@ -126,6 +128,7 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
response := &rpcResponse{ response := &rpcResponse{
header: make(map[string]string), header: make(map[string]string),
socket: sock, socket: sock,
codec: codec,
} }
// set router // set router

View File

@ -54,13 +54,15 @@ type Request interface {
// Read the undecoded request body // Read the undecoded request body
Read() ([]byte, error) Read() ([]byte, error)
// The encoded message stream // The encoded message stream
Codec() codec.Codec Codec() codec.Reader
// Indicates whether its a stream // Indicates whether its a stream
Stream() bool Stream() bool
} }
// Response is the response writer for unencoded messages // Response is the response writer for unencoded messages
type Response interface { type Response interface {
// Encoded writer
Codec() codec.Writer
// Write the header // Write the header
WriteHeader(map[string]string) WriteHeader(map[string]string)
// write a response directly to the client // write a response directly to the client