Merge pull request #501 from micro/wrap

add the wrappers back into the core router
This commit is contained in:
Asim Aslam 2019-06-07 15:22:16 +01:00 committed by GitHub
commit 95b8147fa1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 50 additions and 17 deletions

View File

@ -14,6 +14,7 @@ type rpcRequest struct {
codec codec.Codec codec codec.Codec
header map[string]string header map[string]string
body []byte body []byte
rawBody interface{}
stream bool stream bool
} }
@ -48,8 +49,7 @@ func (r *rpcRequest) Header() map[string]string {
} }
func (r *rpcRequest) Body() interface{} { func (r *rpcRequest) Body() interface{} {
// TODO: convert to interface value return r.rawBody
return r.body
} }
func (r *rpcRequest) Read() ([]byte, error) { func (r *rpcRequest) Read() ([]byte, error) {

View File

@ -188,6 +188,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
method: req.msg.Method, method: req.msg.Method,
endpoint: req.msg.Endpoint, endpoint: req.msg.Endpoint,
body: req.msg.Body, body: req.msg.Body,
rawBody: argv.Interface(),
} }
if !mtype.stream { if !mtype.stream {
@ -202,6 +203,11 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
return nil return nil
} }
// wrap the handler
for i := len(router.hdlrWrappers); i > 0; i-- {
fn = router.hdlrWrappers[i-1](fn)
}
// execute handler // execute handler
if err := fn(ctx, r, replyv.Interface()); err != nil { if err := fn(ctx, r, replyv.Interface()); err != nil {
return err return err
@ -235,6 +241,11 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
} }
} }
// wrap the handler
for i := len(router.hdlrWrappers); i > 0; i-- {
fn = router.hdlrWrappers[i-1](fn)
}
// client.Stream request // client.Stream request
r.stream = true r.stream = true

View File

@ -35,6 +35,9 @@ type rpcServer struct {
func newRpcServer(opts ...Option) Server { func newRpcServer(opts ...Option) Server {
options := newOptions(opts...) options := newOptions(opts...)
router := newRpcRouter()
router.hdlrWrappers = options.HdlrWrappers
return &rpcServer{ return &rpcServer{
opts: options, opts: options,
router: DefaultRouter, router: DefaultRouter,
@ -45,6 +48,14 @@ func newRpcServer(opts ...Option) Server {
} }
} }
type rpcRouter struct {
h func(context.Context, Request, interface{}) error
}
func (r rpcRouter) ServeRequest(ctx context.Context, req Request, rsp Response) error {
return r.h(ctx, req, rsp)
}
// ServeConn serves a single connection // ServeConn serves a single connection
func (s *rpcServer) ServeConn(sock transport.Socket) { func (s *rpcServer) ServeConn(sock transport.Socket) {
defer func() { defer func() {
@ -143,24 +154,26 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
} }
// set router // set router
r := s.opts.Router r := Router(s.router)
// if nil use default router // if not nil use the router specified
if s.opts.Router == nil { if s.opts.Router != nil {
r = s.router // create a wrapped function
handler := func(ctx context.Context, req Request, rsp interface{}) error {
return s.opts.Router.ServeRequest(ctx, req, rsp.(Response))
}
// execute the wrapper for it
for i := len(s.opts.HdlrWrappers); i > 0; i-- {
handler = s.opts.HdlrWrappers[i-1](handler)
}
// set the router
r = rpcRouter{handler}
} }
// create a wrapped function // serve the actual request using the request router
handler := func(ctx context.Context, req Request, rsp interface{}) error { if err := r.ServeRequest(ctx, request, response); err != nil {
return r.ServeRequest(ctx, req, rsp.(Response))
}
for i := len(s.opts.HdlrWrappers); i > 0; i-- {
handler = s.opts.HdlrWrappers[i-1](handler)
}
// TODO: handle error better
if err := handler(ctx, request, response); err != nil {
// write an error response // write an error response
err = rcodec.Write(&codec.Message{ err = rcodec.Write(&codec.Message{
Header: msg.Header, Header: msg.Header,
@ -206,6 +219,15 @@ func (s *rpcServer) Init(opts ...Option) error {
for _, opt := range opts { for _, opt := range opts {
opt(&s.opts) opt(&s.opts)
} }
// update router if its the default
if s.opts.Router == nil {
r := newRpcRouter()
r.hdlrWrappers = s.opts.HdlrWrappers
r.serviceMap = s.router.serviceMap
s.router = r
}
s.Unlock() s.Unlock()
return nil return nil
} }