Merge pull request #375 from micro/codec

further codec changes
This commit is contained in:
Asim Aslam 2019-01-08 21:04:22 +00:00 committed by GitHub
commit d5df31eeb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 165 additions and 126 deletions

View File

@ -97,7 +97,7 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
request: req, request: req,
closed: make(chan bool), closed: make(chan bool),
codec: newRpcCodec(msg, c, cf), codec: newRpcCodec(msg, c, cf),
seq: seq, seq: fmt.Sprintf("%v", seq),
} }
defer stream.Close() defer stream.Close()

View File

@ -4,7 +4,6 @@ import (
"bytes" "bytes"
errs "errors" errs "errors"
"fmt" "fmt"
"strconv"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
raw "github.com/micro/go-micro/codec/bytes" raw "github.com/micro/go-micro/codec/bytes"
@ -55,13 +54,13 @@ type clientCodec interface {
type request struct { type request struct {
Service string Service string
ServiceMethod string // format: "Service.Method" ServiceMethod string // format: "Service.Method"
Seq uint64 // sequence number chosen by client Seq string // sequence number chosen by client
next *request // for free list in Server next *request // for free list in Server
} }
type response struct { type response struct {
ServiceMethod string // echoes that of the Request ServiceMethod string // echoes that of the Request
Seq uint64 // echoes that of the request Seq string // echoes that of the request
Error string // error, if any. Error string // error, if any.
next *response // for free list in Server next *response // for free list in Server
} }
@ -115,7 +114,7 @@ func (c *rpcCodec) Write(req *request, body interface{}) error {
Method: req.ServiceMethod, Method: req.ServiceMethod,
Type: codec.Request, Type: codec.Request,
Header: map[string]string{ Header: map[string]string{
"X-Micro-Id": fmt.Sprintf("%d", req.Seq), "X-Micro-Id": fmt.Sprintf("%v", req.Seq),
"X-Micro-Service": req.Service, "X-Micro-Service": req.Service,
"X-Micro-Method": req.ServiceMethod, "X-Micro-Method": req.ServiceMethod,
}, },
@ -161,9 +160,8 @@ func (c *rpcCodec) Read(r *response, b interface{}) error {
r.ServiceMethod = me.Header["X-Micro-Method"] r.ServiceMethod = me.Header["X-Micro-Method"]
} }
if me.Id == 0 && len(me.Header["X-Micro-Id"]) > 0 { if len(me.Id) == 0 {
id, _ := strconv.ParseInt(me.Header["X-Micro-Id"], 10, 64) r.Seq = me.Header["X-Micro-Id"]
r.Seq = uint64(id)
} }
if err != nil { if err != nil {

View File

@ -9,7 +9,7 @@ import (
// Implements the streamer interface // Implements the streamer interface
type rpcStream struct { type rpcStream struct {
sync.RWMutex sync.RWMutex
seq uint64 seq string
closed chan bool closed chan bool
err error err error
request Request request Request

View File

@ -34,7 +34,7 @@ type Codec interface {
// the communication, likely followed by the body. // the communication, likely followed by the body.
// In the case of an error, body may be nil. // In the case of an error, body may be nil.
type Message struct { type Message struct {
Id uint64 Id string
Type MessageType Type MessageType
Target string Target string
Method string Method string

View File

@ -19,17 +19,17 @@ type clientCodec struct {
resp clientResponse resp clientResponse
sync.Mutex sync.Mutex
pending map[uint64]string pending map[interface{}]string
} }
type clientRequest struct { type clientRequest struct {
Method string `json:"method"` Method string `json:"method"`
Params [1]interface{} `json:"params"` Params [1]interface{} `json:"params"`
ID uint64 `json:"id"` ID interface{} `json:"id"`
} }
type clientResponse struct { type clientResponse struct {
ID uint64 `json:"id"` ID interface{} `json:"id"`
Result *json.RawMessage `json:"result"` Result *json.RawMessage `json:"result"`
Error interface{} `json:"error"` Error interface{} `json:"error"`
} }
@ -39,7 +39,7 @@ func newClientCodec(conn io.ReadWriteCloser) *clientCodec {
dec: json.NewDecoder(conn), dec: json.NewDecoder(conn),
enc: json.NewEncoder(conn), enc: json.NewEncoder(conn),
c: conn, c: conn,
pending: make(map[uint64]string), pending: make(map[interface{}]string),
} }
} }
@ -71,7 +71,7 @@ func (c *clientCodec) ReadHeader(m *codec.Message) error {
c.Unlock() c.Unlock()
m.Error = "" m.Error = ""
m.Id = c.resp.ID m.Id = fmt.Sprintf("%v", c.resp.ID)
if c.resp.Error != nil { if c.resp.Error != nil {
x, ok := c.resp.Error.(string) x, ok := c.resp.Error.(string)
if !ok { if !ok {

View File

@ -2,9 +2,8 @@ package jsonrpc
import ( import (
"encoding/json" "encoding/json"
"errors" "fmt"
"io" "io"
"sync"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
) )
@ -17,20 +16,16 @@ type serverCodec struct {
// temporary work space // temporary work space
req serverRequest req serverRequest
resp serverResponse resp serverResponse
sync.Mutex
seq uint64
pending map[uint64]*json.RawMessage
} }
type serverRequest struct { type serverRequest struct {
Method string `json:"method"` Method string `json:"method"`
Params *json.RawMessage `json:"params"` Params *json.RawMessage `json:"params"`
ID *json.RawMessage `json:"id"` ID interface{} `json:"id"`
} }
type serverResponse struct { type serverResponse struct {
ID *json.RawMessage `json:"id"` ID interface{} `json:"id"`
Result interface{} `json:"result"` Result interface{} `json:"result"`
Error interface{} `json:"error"` Error interface{} `json:"error"`
} }
@ -40,7 +35,6 @@ func newServerCodec(conn io.ReadWriteCloser) *serverCodec {
dec: json.NewDecoder(conn), dec: json.NewDecoder(conn),
enc: json.NewEncoder(conn), enc: json.NewEncoder(conn),
c: conn, c: conn,
pending: make(map[uint64]*json.RawMessage),
} }
} }
@ -50,7 +44,7 @@ func (r *serverRequest) reset() {
*r.Params = (*r.Params)[0:0] *r.Params = (*r.Params)[0:0]
} }
if r.ID != nil { if r.ID != nil {
*r.ID = (*r.ID)[0:0] r.ID = nil
} }
} }
@ -60,14 +54,8 @@ func (c *serverCodec) ReadHeader(m *codec.Message) error {
return err return err
} }
m.Method = c.req.Method m.Method = c.req.Method
m.Id = fmt.Sprintf("%v", c.req.ID)
c.Lock()
c.seq++
c.pending[c.seq] = c.req.ID
c.req.ID = nil c.req.ID = nil
m.Id = c.seq
c.Unlock()
return nil return nil
} }
@ -84,19 +72,7 @@ var null = json.RawMessage([]byte("null"))
func (c *serverCodec) Write(m *codec.Message, x interface{}) error { func (c *serverCodec) Write(m *codec.Message, x interface{}) error {
var resp serverResponse var resp serverResponse
c.Lock() resp.ID = m.Id
b, ok := c.pending[m.Id]
if !ok {
c.Unlock()
return errors.New("invalid sequence number in response")
}
c.Unlock()
if b == nil {
// Invalid request so no id. Use JSON null.
b = &null
}
resp.ID = b
resp.Result = x resp.Result = x
if m.Error == "" { if m.Error == "" {
resp.Error = nil resp.Error = nil

View File

@ -22,11 +22,18 @@ func (c *Codec) ReadBody(b interface{}) error {
if err != nil { if err != nil {
return err return err
} }
if b == nil {
return nil
}
return proto.Unmarshal(buf, b.(proto.Message)) return proto.Unmarshal(buf, b.(proto.Message))
} }
func (c *Codec) Write(m *codec.Message, b interface{}) error { func (c *Codec) Write(m *codec.Message, b interface{}) error {
buf, err := proto.Marshal(b.(proto.Message)) p, ok := b.(proto.Message)
if !ok {
return nil
}
buf, err := proto.Marshal(p)
if err != nil { if err != nil {
return err return err
} }

View File

@ -5,6 +5,7 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"io" "io"
"strconv"
"sync" "sync"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@ -31,13 +32,22 @@ func (c *protoCodec) String() string {
return "proto-rpc" return "proto-rpc"
} }
func id(id string) *uint64 {
p, err := strconv.ParseInt(id, 10, 64)
if err != nil {
p = 0
}
i := uint64(p)
return &i
}
func (c *protoCodec) Write(m *codec.Message, b interface{}) error { func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
switch m.Type { switch m.Type {
case codec.Request: case codec.Request:
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
// This is protobuf, of course we copy it. // This is protobuf, of course we copy it.
pbr := &Request{ServiceMethod: &m.Method, Seq: &m.Id} pbr := &Request{ServiceMethod: &m.Method, Seq: id(m.Id)}
data, err := proto.Marshal(pbr) data, err := proto.Marshal(pbr)
if err != nil { if err != nil {
return err return err
@ -63,7 +73,7 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
case codec.Response: case codec.Response:
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
rtmp := &Response{ServiceMethod: &m.Method, Seq: &m.Id, Error: &m.Error} rtmp := &Response{ServiceMethod: &m.Method, Seq: id(m.Id), Error: &m.Error}
data, err := proto.Marshal(rtmp) data, err := proto.Marshal(rtmp)
if err != nil { if err != nil {
return err return err
@ -117,7 +127,7 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
return err return err
} }
m.Method = rtmp.GetServiceMethod() m.Method = rtmp.GetServiceMethod()
m.Id = rtmp.GetSeq() m.Id = fmt.Sprintf("%d", rtmp.GetSeq())
case codec.Response: case codec.Response:
data, err := ReadNetString(c.rwc) data, err := ReadNetString(c.rwc)
if err != nil { if err != nil {
@ -129,7 +139,7 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
return err return err
} }
m.Method = rtmp.GetServiceMethod() m.Method = rtmp.GetServiceMethod()
m.Id = rtmp.GetSeq() m.Id = fmt.Sprintf("%d", rtmp.GetSeq())
m.Error = rtmp.GetError() m.Error = rtmp.GetError()
case codec.Publication: case codec.Publication:
_, err := io.Copy(c.buf, c.rwc) _, err := io.Copy(c.buf, c.rwc)

View File

@ -25,8 +25,12 @@ type Options struct {
HdlrWrappers []HandlerWrapper HdlrWrappers []HandlerWrapper
SubWrappers []SubscriberWrapper SubWrappers []SubscriberWrapper
// The register expiry time
RegisterTTL time.Duration RegisterTTL time.Duration
// The router for requests
Router Router
// Debug Handler which can be set by a user // Debug Handler which can be set by a user
DebugHandler debug.DebugHandler DebugHandler debug.DebugHandler
@ -164,6 +168,13 @@ func RegisterTTL(t time.Duration) Option {
} }
} }
// WithRouter sets the request router
func WithRouter(r Router) Option {
return func(o *Options) {
o.Router = r
}
}
// Wait tells the server to wait for requests to finish before exiting // Wait tells the server to wait for requests to finish before exiting
func Wait(b bool) Option { func Wait(b bool) Option {
return func(o *Options) { return func(o *Options) {

View File

@ -2,8 +2,6 @@ package server
import ( import (
"bytes" "bytes"
"fmt"
"strconv"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
raw "github.com/micro/go-micro/codec/bytes" raw "github.com/micro/go-micro/codec/bytes"
@ -19,6 +17,7 @@ import (
type rpcCodec struct { type rpcCodec struct {
socket transport.Socket socket transport.Socket
codec codec.Codec codec codec.Codec
first bool
req *transport.Message req *transport.Message
buf *readWriteCloser buf *readWriteCloser
@ -65,12 +64,13 @@ func (rwc *readWriteCloser) Close() error {
return nil return nil
} }
func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) serverCodec { func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) codec.Codec {
rwc := &readWriteCloser{ rwc := &readWriteCloser{
rbuf: bytes.NewBuffer(req.Body), rbuf: bytes.NewBuffer(req.Body),
wbuf: bytes.NewBuffer(nil), wbuf: bytes.NewBuffer(nil),
} }
r := &rpcCodec{ r := &rpcCodec{
first: true,
buf: rwc, buf: rwc,
codec: c(rwc), codec: c(rwc),
req: req, req: req,
@ -79,36 +79,43 @@ func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCod
return r return r
} }
func (c *rpcCodec) ReadHeader(r *request, first bool) error { func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
m := codec.Message{Header: c.req.Header} m := codec.Message{Header: c.req.Header}
if !first { // if its a follow on request read it
if !c.first {
var tm transport.Message var tm transport.Message
// read off the socket
if err := c.socket.Recv(&tm); err != nil { if err := c.socket.Recv(&tm); err != nil {
return err return err
} }
// reset the read buffer
c.buf.rbuf.Reset() c.buf.rbuf.Reset()
// write the body to the buffer
if _, err := c.buf.rbuf.Write(tm.Body); err != nil { if _, err := c.buf.rbuf.Write(tm.Body); err != nil {
return err return err
} }
// set the message header
m.Header = tm.Header m.Header = tm.Header
} }
// no longer first read
c.first = false
// set some internal things // set some internal things
m.Target = m.Header["X-Micro-Service"] m.Target = m.Header["X-Micro-Service"]
m.Method = m.Header["X-Micro-Method"] m.Method = m.Header["X-Micro-Method"]
m.Id = m.Header["X-Micro-Id"]
// set id
if len(m.Header["X-Micro-Id"]) > 0 {
id, _ := strconv.ParseInt(m.Header["X-Micro-Id"], 10, 64)
m.Id = uint64(id)
}
// read header via codec // read header via codec
err := c.codec.ReadHeader(&m, codec.Request) err := c.codec.ReadHeader(&m, codec.Request)
r.ServiceMethod = m.Method
r.Seq = m.Id // set the method/id
r.Method = m.Method
r.Id = m.Id
return err return err
} }
@ -117,21 +124,28 @@ func (c *rpcCodec) ReadBody(b interface{}) error {
return c.codec.ReadBody(b) return c.codec.ReadBody(b)
} }
func (c *rpcCodec) Write(r *response, body interface{}, last bool) error { func (c *rpcCodec) Write(r *codec.Message, body interface{}) error {
c.buf.wbuf.Reset() c.buf.wbuf.Reset()
// create a new message
m := &codec.Message{ m := &codec.Message{
Method: r.ServiceMethod, Method: r.Method,
Id: r.Seq, Id: r.Id,
Error: r.Error, Error: r.Error,
Type: codec.Response, Type: r.Type,
Header: map[string]string{ Header: map[string]string{
"X-Micro-Id": fmt.Sprintf("%d", r.Seq), "X-Micro-Id": r.Id,
"X-Micro-Method": r.ServiceMethod, "X-Micro-Method": r.Method,
"X-Micro-Error": r.Error, "X-Micro-Error": r.Error,
"Content-Type": c.req.Header["Content-Type"],
}, },
} }
// write to the body
if err := c.codec.Write(m, body); err != nil { if err := c.codec.Write(m, body); err != nil {
c.buf.wbuf.Reset() c.buf.wbuf.Reset()
// write an error if it failed
m.Error = errors.Wrapf(err, "Unable to encode body").Error() m.Error = errors.Wrapf(err, "Unable to encode body").Error()
m.Header["X-Micro-Error"] = m.Error m.Header["X-Micro-Error"] = m.Error
if err := c.codec.Write(m, nil); err != nil { if err := c.codec.Write(m, nil); err != nil {
@ -139,7 +153,7 @@ func (c *rpcCodec) Write(r *response, body interface{}, last bool) error {
} }
} }
m.Header["Content-Type"] = c.req.Header["Content-Type"] // send on the socket
return c.socket.Send(&transport.Message{ return c.socket.Send(&transport.Message{
Header: m.Header, Header: m.Header,
Body: c.buf.wbuf.Bytes(), Body: c.buf.wbuf.Bytes(),
@ -151,3 +165,7 @@ func (c *rpcCodec) Close() error {
c.codec.Close() c.codec.Close()
return c.socket.Close() return c.socket.Close()
} }
func (c *rpcCodec) String() string {
return "rpc"
}

View File

@ -47,12 +47,11 @@ func TestCodecWriteError(t *testing.T) {
socket: socket, socket: socket,
} }
err := c.Write(&response{ err := c.Write(&codec.Message{
ServiceMethod: "Service.Method", Method: "Service.Method",
Seq: 0, Id: "0",
Error: "", Error: "",
next: nil, }, "body")
}, "body", false)
if err != nil { if err != nil {
t.Fatalf(`Expected Write to fail; got "%+v" instead`, err) t.Fatalf(`Expected Write to fail; got "%+v" instead`, err)

View File

@ -17,6 +17,7 @@ import (
"unicode/utf8" "unicode/utf8"
"github.com/micro/go-log" "github.com/micro/go-log"
"github.com/micro/go-micro/codec"
) )
var ( var (
@ -48,15 +49,12 @@ type service struct {
} }
type request struct { type request struct {
ServiceMethod string // format: "Service.Method" msg *codec.Message
Seq uint64 // sequence number chosen by client
next *request // for free list in Server next *request // for free list in Server
} }
type response struct { type response struct {
ServiceMethod string // echoes that of the Request msg *codec.Message
Seq uint64 // echoes that of the request
Error string // error, if any.
next *response // for free list in Server next *response // for free list in Server
} }
@ -215,30 +213,34 @@ func (router *router) Handle(h Handler) error {
return nil return nil
} }
func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, codec serverCodec, errmsg string, last bool) (err error) { func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Codec, errmsg string, last bool) (err error) {
msg := new(codec.Message)
msg.Type = codec.Response
resp := router.getResponse() resp := router.getResponse()
resp.msg = msg
// Encode the response header // Encode the response header
resp.ServiceMethod = req.ServiceMethod resp.msg.Method = req.msg.Method
if errmsg != "" { if errmsg != "" {
resp.Error = errmsg resp.msg.Error = errmsg
reply = invalidRequest reply = invalidRequest
} }
resp.Seq = req.Seq resp.msg.Id = req.msg.Id
sending.Lock() sending.Lock()
err = codec.Write(resp, reply, last) err = cc.Write(resp.msg, reply)
sending.Unlock() sending.Unlock()
router.freeResponse(resp) router.freeResponse(resp)
return err return err
} }
func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, codec serverCodec, ct string) { func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, codec codec.Codec) {
function := mtype.method.Func function := mtype.method.Func
var returnValues []reflect.Value var returnValues []reflect.Value
r := &rpcRequest{ r := &rpcRequest{
service: router.name, service: router.name,
contentType: ct, contentType: req.msg.Header["Content-Type"],
method: req.ServiceMethod, method: req.msg.Method,
} }
if !mtype.stream { if !mtype.stream {
@ -282,7 +284,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
context: ctx, context: ctx,
codec: codec, codec: codec,
request: r, request: r,
seq: req.Seq, id: req.msg.Id,
} }
// Invoke the method, providing a new value for the reply. // Invoke the method, providing a new value for the reply.
@ -326,21 +328,21 @@ func (m *methodType) prepareContext(ctx context.Context) reflect.Value {
return reflect.Zero(m.ContextType) return reflect.Zero(m.ContextType)
} }
func (router *router) ServeRequest(ctx context.Context, codec serverCodec, ct string) error { func (router *router) ServeRequest(ctx context.Context, cc codec.Codec) error {
sending := new(sync.Mutex) sending := new(sync.Mutex)
service, mtype, req, argv, replyv, keepReading, err := router.readRequest(codec) service, mtype, req, argv, replyv, keepReading, err := router.readRequest(cc)
if err != nil { if err != nil {
if !keepReading { if !keepReading {
return err return err
} }
// send a response if we actually managed to read a header. // send a response if we actually managed to read a header.
if req != nil { if req != nil {
router.sendResponse(sending, req, invalidRequest, codec, err.Error(), true) router.sendResponse(sending, req, invalidRequest, cc, err.Error(), true)
router.freeRequest(req) router.freeRequest(req)
} }
return err return err
} }
service.call(ctx, router, sending, mtype, req, argv, replyv, codec, ct) service.call(ctx, router, sending, mtype, req, argv, replyv, cc)
return nil return nil
} }
@ -384,19 +386,19 @@ func (router *router) freeResponse(resp *response) {
router.respLock.Unlock() router.respLock.Unlock()
} }
func (router *router) readRequest(codec serverCodec) (service *service, mtype *methodType, req *request, argv, replyv reflect.Value, keepReading bool, err error) { func (router *router) readRequest(cc codec.Codec) (service *service, mtype *methodType, req *request, argv, replyv reflect.Value, keepReading bool, err error) {
service, mtype, req, keepReading, err = router.readHeader(codec) service, mtype, req, keepReading, err = router.readHeader(cc)
if err != nil { if err != nil {
if !keepReading { if !keepReading {
return return
} }
// discard body // discard body
codec.ReadBody(nil) cc.ReadBody(nil)
return return
} }
// is it a streaming request? then we don't read the body // is it a streaming request? then we don't read the body
if mtype.stream { if mtype.stream {
codec.ReadBody(nil) cc.ReadBody(nil)
return return
} }
@ -409,7 +411,7 @@ func (router *router) readRequest(codec serverCodec) (service *service, mtype *m
argIsValue = true argIsValue = true
} }
// argv guaranteed to be a pointer now. // argv guaranteed to be a pointer now.
if err = codec.ReadBody(argv.Interface()); err != nil { if err = cc.ReadBody(argv.Interface()); err != nil {
return return
} }
if argIsValue { if argIsValue {
@ -422,10 +424,14 @@ func (router *router) readRequest(codec serverCodec) (service *service, mtype *m
return return
} }
func (router *router) readHeader(codec serverCodec) (service *service, mtype *methodType, req *request, keepReading bool, err error) { func (router *router) readHeader(cc codec.Codec) (service *service, mtype *methodType, req *request, keepReading bool, err error) {
// Grab the request header. // Grab the request header.
msg := new(codec.Message)
msg.Type = codec.Request
req = router.getRequest() req = router.getRequest()
err = codec.ReadHeader(req, true) req.msg = msg
err = cc.ReadHeader(msg, msg.Type)
if err != nil { if err != nil {
req = nil req = nil
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
@ -439,9 +445,9 @@ func (router *router) readHeader(codec serverCodec) (service *service, mtype *me
// we can still recover and move on to the next request. // we can still recover and move on to the next request.
keepReading = true keepReading = true
serviceMethod := strings.Split(req.ServiceMethod, ".") serviceMethod := strings.Split(req.msg.Method, ".")
if len(serviceMethod) != 2 { if len(serviceMethod) != 2 {
err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod) err = errors.New("rpc: service/method request ill-formed: " + req.msg.Method)
return return
} }
// Look up the request. // Look up the request.
@ -449,12 +455,12 @@ func (router *router) readHeader(codec serverCodec) (service *service, mtype *me
service = router.serviceMap[serviceMethod[0]] service = router.serviceMap[serviceMethod[0]]
router.mu.Unlock() router.mu.Unlock()
if service == nil { if service == nil {
err = errors.New("rpc: can't find service " + req.ServiceMethod) err = errors.New("rpc: can't find service " + req.msg.Method)
return return
} }
mtype = service.method[serviceMethod[1]] mtype = service.method[serviceMethod[1]]
if mtype == nil { if mtype == nil {
err = errors.New("rpc: can't find method " + req.ServiceMethod) err = errors.New("rpc: can't find method " + req.msg.Method)
} }
return return
} }

View File

@ -45,7 +45,8 @@ func newRpcServer(opts ...Option) Server {
} }
} }
func (s *rpcServer) accept(sock transport.Socket) { // ServeConn serves a single connection
func (s *rpcServer) ServeConn(sock transport.Socket) {
defer func() { defer func() {
// close socket // close socket
sock.Close() sock.Close()
@ -92,6 +93,7 @@ func (s *rpcServer) accept(sock transport.Socket) {
// no content type // no content type
if len(ct) == 0 { if len(ct) == 0 {
msg.Header["Content-Type"] = DefaultContentType
ct = DefaultContentType ct = DefaultContentType
} }
@ -111,8 +113,16 @@ func (s *rpcServer) accept(sock transport.Socket) {
// create the internal server codec // create the internal server codec
codec := newRpcCodec(&msg, sock, cf) codec := newRpcCodec(&msg, sock, cf)
// set router
var r Router
r = s.router
if s.opts.Router != nil {
r = s.opts.Router
}
// TODO: needs better error handling // TODO: needs better error handling
if err := s.router.ServeRequest(ctx, codec, ct); err != nil { if err := r.ServeRequest(ctx, codec); err != nil {
s.wg.Done() s.wg.Done()
log.Logf("Unexpected error serving request, closing socket: %v", err) log.Logf("Unexpected error serving request, closing socket: %v", err)
return return
@ -402,7 +412,7 @@ func (s *rpcServer) Start() error {
go func() { go func() {
for { for {
err := ts.Accept(s.accept) err := ts.Accept(s.ServeConn)
// check if we're supposed to exit // check if we're supposed to exit
select { select {

View File

@ -3,16 +3,18 @@ package server
import ( import (
"context" "context"
"sync" "sync"
"github.com/micro/go-micro/codec"
) )
// Implements the Streamer interface // Implements the Streamer interface
type rpcStream struct { type rpcStream struct {
sync.RWMutex sync.RWMutex
seq uint64 id string
closed bool closed bool
err error err error
request Request request Request
codec serverCodec codec codec.Codec
context context.Context context context.Context
} }
@ -28,28 +30,30 @@ func (r *rpcStream) Send(msg interface{}) error {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
resp := response{ resp := codec.Message{
ServiceMethod: r.request.Method(), Method: r.request.Method(),
Seq: r.seq, Id: r.id,
Type: codec.Response,
} }
return r.codec.Write(&resp, msg, false) return r.codec.Write(&resp, msg)
} }
func (r *rpcStream) Recv(msg interface{}) error { func (r *rpcStream) Recv(msg interface{}) error {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
req := request{} req := new(codec.Message)
req.Type = codec.Request
if err := r.codec.ReadHeader(&req, false); err != nil { if err := r.codec.ReadHeader(req, req.Type); err != nil {
// discard body // discard body
r.codec.ReadBody(nil) r.codec.ReadBody(nil)
return err return err
} }
// we need to stay up to date with sequence numbers // we need to stay up to date with sequence numbers
r.seq = req.Seq r.id = req.Id
return r.codec.ReadBody(msg) return r.codec.ReadBody(msg)
} }

View File

@ -30,7 +30,7 @@ type Server interface {
// Router handle serving messages // Router handle serving messages
type Router interface { type Router interface {
ServeCodec(context.Context, codec.Codec) error ServeRequest(context.Context, codec.Codec) error
} }
// Message is an async message interface // Message is an async message interface