rename service to router

This commit is contained in:
Asim Aslam 2019-01-07 14:44:40 +00:00
parent fcc730931c
commit 46ece968d4
3 changed files with 81 additions and 71 deletions

View File

@ -60,8 +60,8 @@ type response struct {
next *response // for free list in Server next *response // for free list in Server
} }
// server represents an RPC Server. // router represents an RPC router.
type server struct { type router struct {
name string name string
mu sync.Mutex // protects the serviceMap mu sync.Mutex // protects the serviceMap
serviceMap map[string]*service serviceMap map[string]*service
@ -72,6 +72,14 @@ type server struct {
hdlrWrappers []HandlerWrapper hdlrWrappers []HandlerWrapper
} }
func newRpcRouter(opts Options) *router {
return &router{
name: opts.Name,
hdlrWrappers: opts.HdlrWrappers,
serviceMap: make(map[string]*service),
}
}
// Is this an exported - upper case - name? // Is this an exported - upper case - name?
func isExported(name string) bool { func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name) rune, _ := utf8.DecodeRuneInString(name)
@ -158,11 +166,11 @@ func prepareMethod(method reflect.Method) *methodType {
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream} return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}
} }
func (server *server) register(rcvr interface{}) error { func (router *router) register(rcvr interface{}) error {
server.mu.Lock() router.mu.Lock()
defer server.mu.Unlock() defer router.mu.Unlock()
if server.serviceMap == nil { if router.serviceMap == nil {
server.serviceMap = make(map[string]*service) router.serviceMap = make(map[string]*service)
} }
s := new(service) s := new(service)
s.typ = reflect.TypeOf(rcvr) s.typ = reflect.TypeOf(rcvr)
@ -176,7 +184,7 @@ func (server *server) register(rcvr interface{}) error {
log.Log(s) log.Log(s)
return errors.New(s) return errors.New(s)
} }
if _, present := server.serviceMap[sname]; present { if _, present := router.serviceMap[sname]; present {
return errors.New("rpc: service already defined: " + sname) return errors.New("rpc: service already defined: " + sname)
} }
s.name = sname s.name = sname
@ -195,12 +203,12 @@ func (server *server) register(rcvr interface{}) error {
log.Log(s) log.Log(s)
return errors.New(s) return errors.New(s)
} }
server.serviceMap[s.name] = s router.serviceMap[s.name] = s
return nil return nil
} }
func (server *server) sendResponse(sending sync.Locker, req *request, reply interface{}, codec serverCodec, errmsg string, last bool) (err error) { func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, codec serverCodec, errmsg string, last bool) (err error) {
resp := server.getResponse() resp := router.getResponse()
// Encode the response header // Encode the response header
resp.ServiceMethod = req.ServiceMethod resp.ServiceMethod = req.ServiceMethod
if errmsg != "" { if errmsg != "" {
@ -211,16 +219,16 @@ func (server *server) sendResponse(sending sync.Locker, req *request, reply inte
sending.Lock() sending.Lock()
err = codec.WriteResponse(resp, reply, last) err = codec.WriteResponse(resp, reply, last)
sending.Unlock() sending.Unlock()
server.freeResponse(resp) router.freeResponse(resp)
return err return err
} }
func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, codec serverCodec, ct string) { func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, codec serverCodec, ct string) {
function := mtype.method.Func function := mtype.method.Func
var returnValues []reflect.Value var returnValues []reflect.Value
r := &rpcRequest{ r := &rpcRequest{
service: server.name, service: router.name,
contentType: ct, contentType: ct,
method: req.ServiceMethod, method: req.ServiceMethod,
} }
@ -239,8 +247,8 @@ func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex,
return nil return nil
} }
for i := len(server.hdlrWrappers); i > 0; i-- { for i := len(router.hdlrWrappers); i > 0; i-- {
fn = server.hdlrWrappers[i-1](fn) fn = router.hdlrWrappers[i-1](fn)
} }
errmsg := "" errmsg := ""
@ -249,11 +257,11 @@ func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex,
errmsg = err.Error() errmsg = err.Error()
} }
err = server.sendResponse(sending, req, replyv.Interface(), codec, errmsg, true) err = router.sendResponse(sending, req, replyv.Interface(), codec, errmsg, true)
if err != nil { if err != nil {
log.Log("rpc call: unable to send response: ", err) log.Log("rpc call: unable to send response: ", err)
} }
server.freeRequest(req) router.freeRequest(req)
return return
} }
@ -284,8 +292,8 @@ func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex,
} }
} }
for i := len(server.hdlrWrappers); i > 0; i-- { for i := len(router.hdlrWrappers); i > 0; i-- {
fn = server.hdlrWrappers[i-1](fn) fn = router.hdlrWrappers[i-1](fn)
} }
// client.Stream request // client.Stream request
@ -299,8 +307,8 @@ func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex,
// this is the last packet, we don't do anything with // this is the last packet, we don't do anything with
// the error here (well sendStreamResponse will log it // the error here (well sendStreamResponse will log it
// already) // already)
server.sendResponse(sending, req, nil, codec, errmsg, true) router.sendResponse(sending, req, nil, codec, errmsg, true)
server.freeRequest(req) router.freeRequest(req)
} }
func (m *methodType) prepareContext(ctx context.Context) reflect.Value { func (m *methodType) prepareContext(ctx context.Context) reflect.Value {
@ -310,66 +318,66 @@ func (m *methodType) prepareContext(ctx context.Context) reflect.Value {
return reflect.Zero(m.ContextType) return reflect.Zero(m.ContextType)
} }
func (server *server) serveRequest(ctx context.Context, codec serverCodec, ct string) error { func (router *router) serveRequest(ctx context.Context, codec serverCodec, ct string) error {
sending := new(sync.Mutex) sending := new(sync.Mutex)
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec) service, mtype, req, argv, replyv, keepReading, err := router.readRequest(codec)
if err != nil { if err != nil {
if !keepReading { if !keepReading {
return err return err
} }
// send a response if we actually managed to read a header. // send a response if we actually managed to read a header.
if req != nil { if req != nil {
server.sendResponse(sending, req, invalidRequest, codec, err.Error(), true) router.sendResponse(sending, req, invalidRequest, codec, err.Error(), true)
server.freeRequest(req) router.freeRequest(req)
} }
return err return err
} }
service.call(ctx, server, sending, mtype, req, argv, replyv, codec, ct) service.call(ctx, router, sending, mtype, req, argv, replyv, codec, ct)
return nil return nil
} }
func (server *server) getRequest() *request { func (router *router) getRequest() *request {
server.reqLock.Lock() router.reqLock.Lock()
req := server.freeReq req := router.freeReq
if req == nil { if req == nil {
req = new(request) req = new(request)
} else { } else {
server.freeReq = req.next router.freeReq = req.next
*req = request{} *req = request{}
} }
server.reqLock.Unlock() router.reqLock.Unlock()
return req return req
} }
func (server *server) freeRequest(req *request) { func (router *router) freeRequest(req *request) {
server.reqLock.Lock() router.reqLock.Lock()
req.next = server.freeReq req.next = router.freeReq
server.freeReq = req router.freeReq = req
server.reqLock.Unlock() router.reqLock.Unlock()
} }
func (server *server) getResponse() *response { func (router *router) getResponse() *response {
server.respLock.Lock() router.respLock.Lock()
resp := server.freeResp resp := router.freeResp
if resp == nil { if resp == nil {
resp = new(response) resp = new(response)
} else { } else {
server.freeResp = resp.next router.freeResp = resp.next
*resp = response{} *resp = response{}
} }
server.respLock.Unlock() router.respLock.Unlock()
return resp return resp
} }
func (server *server) freeResponse(resp *response) { func (router *router) freeResponse(resp *response) {
server.respLock.Lock() router.respLock.Lock()
resp.next = server.freeResp resp.next = router.freeResp
server.freeResp = resp router.freeResp = resp
server.respLock.Unlock() router.respLock.Unlock()
} }
func (server *server) readRequest(codec serverCodec) (service *service, mtype *methodType, req *request, argv, replyv reflect.Value, keepReading bool, err error) { func (router *router) readRequest(codec serverCodec) (service *service, mtype *methodType, req *request, argv, replyv reflect.Value, keepReading bool, err error) {
service, mtype, req, keepReading, err = server.readRequestHeader(codec) service, mtype, req, keepReading, err = router.readRequestHeader(codec)
if err != nil { if err != nil {
if !keepReading { if !keepReading {
return return
@ -406,16 +414,16 @@ func (server *server) readRequest(codec serverCodec) (service *service, mtype *m
return return
} }
func (server *server) readRequestHeader(codec serverCodec) (service *service, mtype *methodType, req *request, keepReading bool, err error) { func (router *router) readRequestHeader(codec serverCodec) (service *service, mtype *methodType, req *request, keepReading bool, err error) {
// Grab the request header. // Grab the request header.
req = server.getRequest() req = router.getRequest()
err = codec.ReadRequestHeader(req, true) err = codec.ReadRequestHeader(req, true)
if err != nil { if err != nil {
req = nil req = nil
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
return return
} }
err = errors.New("rpc: server cannot decode request: " + err.Error()) err = errors.New("rpc: router cannot decode request: " + err.Error())
return return
} }
@ -429,9 +437,9 @@ func (server *server) readRequestHeader(codec serverCodec) (service *service, mt
return return
} }
// Look up the request. // Look up the request.
server.mu.Lock() router.mu.Lock()
service = server.serviceMap[serviceMethod[0]] service = router.serviceMap[serviceMethod[0]]
server.mu.Unlock() router.mu.Unlock()
if service == nil { if service == nil {
err = errors.New("rpc: can't find service " + req.ServiceMethod) err = errors.New("rpc: can't find service " + req.ServiceMethod)
return return

View File

@ -21,8 +21,8 @@ import (
) )
type rpcServer struct { type rpcServer struct {
rpc *server router *router
exit chan chan error exit chan chan error
sync.RWMutex sync.RWMutex
opts Options opts Options
@ -37,12 +37,8 @@ type rpcServer struct {
func newRpcServer(opts ...Option) Server { func newRpcServer(opts ...Option) Server {
options := newOptions(opts...) options := newOptions(opts...)
return &rpcServer{ return &rpcServer{
opts: options, opts: options,
rpc: &server{ router: newRpcRouter(options),
name: options.Name,
serviceMap: make(map[string]*service),
hdlrWrappers: options.HdlrWrappers,
},
handlers: make(map[string]Handler), handlers: make(map[string]Handler),
subscribers: make(map[*subscriber][]broker.Subscriber), subscribers: make(map[*subscriber][]broker.Subscriber),
exit: make(chan chan error), exit: make(chan chan error),
@ -111,7 +107,7 @@ func (s *rpcServer) accept(sock transport.Socket) {
} }
// TODO: needs better error handling // TODO: needs better error handling
if err := s.rpc.serveRequest(ctx, codec, ct); err != nil { if err := s.router.serveRequest(ctx, codec, ct); err != nil {
s.wg.Done() s.wg.Done()
log.Logf("Unexpected error serving request, closing socket: %v", err) log.Logf("Unexpected error serving request, closing socket: %v", err)
return return
@ -142,12 +138,12 @@ func (s *rpcServer) Init(opts ...Option) error {
for _, opt := range opts { for _, opt := range opts {
opt(&s.opts) opt(&s.opts)
} }
// update internal server
s.rpc = &server{ // update router
name: s.opts.Name, r := newRpcRouter(s.opts)
serviceMap: s.rpc.serviceMap, r.serviceMap = s.router.serviceMap
hdlrWrappers: s.opts.HdlrWrappers, s.router = r
}
s.Unlock() s.Unlock()
return nil return nil
} }
@ -160,7 +156,7 @@ func (s *rpcServer) Handle(h Handler) error {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
if err := s.rpc.register(h.Handler()); err != nil { if err := s.router.register(h.Handler()); err != nil {
return err return err
} }

View File

@ -27,6 +27,11 @@ type Server interface {
String() string String() string
} }
// Router handle serving messages
type Router interface {
ServeRequest(context.Context, Stream) error
}
// Message is an async message interface // Message is an async message interface
type Message interface { type Message interface {
Topic() string Topic() string
@ -97,6 +102,7 @@ var (
DefaultVersion = "1.0.0" DefaultVersion = "1.0.0"
DefaultId = uuid.New().String() DefaultId = uuid.New().String()
DefaultServer Server = newRpcServer() DefaultServer Server = newRpcServer()
DefaultRouter = newRpcRouter(newOptions())
) )
// DefaultOptions returns config options for the default service // DefaultOptions returns config options for the default service