Execute wrappers before router

This commit is contained in:
Asim Aslam 2019-01-11 15:49:54 +00:00
parent 9897c630ae
commit bfd341a269
4 changed files with 33 additions and 18 deletions

View File

@ -70,10 +70,8 @@ type router struct {
hdlrWrappers []HandlerWrapper hdlrWrappers []HandlerWrapper
} }
func newRpcRouter(opts Options) *router { func newRpcRouter() *router {
return &router{ return &router{
name: opts.Name,
hdlrWrappers: opts.HdlrWrappers,
serviceMap: make(map[string]*service), serviceMap: make(map[string]*service),
} }
} }
@ -207,10 +205,6 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
return nil return nil
} }
for i := len(router.hdlrWrappers); i > 0; i-- {
fn = router.hdlrWrappers[i-1](fn)
}
errmsg := "" errmsg := ""
err := fn(ctx, r, replyv.Interface()) err := fn(ctx, r, replyv.Interface())
if err != nil { if err != nil {
@ -252,10 +246,6 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
} }
} }
for i := len(router.hdlrWrappers); i > 0; i-- {
fn = router.hdlrWrappers[i-1](fn)
}
// client.Stream request // client.Stream request
r.stream = true r.stream = true

View File

@ -38,7 +38,7 @@ func newRpcServer(opts ...Option) Server {
options := newOptions(opts...) options := newOptions(opts...)
return &rpcServer{ return &rpcServer{
opts: options, opts: options,
router: newRpcRouter(options), router: newRpcRouter(),
handlers: make(map[string]Handler), handlers: make(map[string]Handler),
subscribers: make(map[*subscriber][]broker.Subscriber), subscribers: make(map[*subscriber][]broker.Subscriber),
exit: make(chan chan error), exit: make(chan chan error),
@ -133,16 +133,29 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// set router // set router
r := s.opts.Router r := s.opts.Router
// if nil use default router
if s.opts.Router == nil { if s.opts.Router == nil {
r = s.router r = s.router
} }
// TODO: needs better error handling // create a wrapped function
if err := r.ServeRequest(ctx, request, response); err != nil { handler := func(ctx context.Context, req Request, rsp interface{}) error {
return r.ServeRequest(ctx, req, rsp.(Response))
}
for i := len(s.opts.HdlrWrappers); i > 0; i-- {
handler = s.opts.HdlrWrappers[i-1](handler)
}
// TODO: handle error better
if err := handler(ctx, request, response); err != nil {
s.wg.Done() s.wg.Done()
log.Logf("Unexpected error serving request, closing socket: %v", err) log.Logf("Unexpected error serving request, closing socket: %v", err)
return return
} }
// done
s.wg.Done() s.wg.Done()
} }
} }
@ -171,7 +184,7 @@ func (s *rpcServer) Init(opts ...Option) error {
} }
// update router // update router
r := newRpcRouter(s.opts) r := newRpcRouter()
r.serviceMap = s.router.serviceMap r.serviceMap = s.router.serviceMap
s.router = r s.router = r

View File

@ -122,7 +122,7 @@ var (
DefaultVersion = "1.0.0" DefaultVersion = "1.0.0"
DefaultId = uuid.New().String() DefaultId = uuid.New().String()
DefaultServer Server = newRpcServer() DefaultServer Server = newRpcServer()
DefaultRouter = newRpcRouter(newOptions()) DefaultRouter = newRpcRouter()
) )
// DefaultOptions returns config options for the default service // DefaultOptions returns config options for the default service

View File

@ -164,17 +164,29 @@ func validateSubscriber(sub Subscriber) error {
func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handler { func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
return func(p broker.Publication) error { return func(p broker.Publication) error {
msg := p.Message() msg := p.Message()
// get codec
ct := msg.Header["Content-Type"] ct := msg.Header["Content-Type"]
// default content type
if len(ct) == 0 {
msg.Header["Content-Type"] = DefaultContentType
ct = DefaultContentType
}
// get codec
cf, err := s.newCodec(ct) cf, err := s.newCodec(ct)
if err != nil { if err != nil {
return err return err
} }
// copy headers
hdr := make(map[string]string) hdr := make(map[string]string)
for k, v := range msg.Header { for k, v := range msg.Header {
hdr[k] = v hdr[k] = v
} }
delete(hdr, "Content-Type")
// create context
ctx := metadata.NewContext(context.Background(), hdr) ctx := metadata.NewContext(context.Background(), hdr)
results := make(chan error, len(sb.handlers)) results := make(chan error, len(sb.handlers))