diff --git a/server/rpc_router.go b/server/rpc_router.go index 4d27fee6..dee976e8 100644 --- a/server/rpc_router.go +++ b/server/rpc_router.go @@ -70,11 +70,9 @@ type router struct { hdlrWrappers []HandlerWrapper } -func newRpcRouter(opts Options) *router { +func newRpcRouter() *router { return &router{ - name: opts.Name, - hdlrWrappers: opts.HdlrWrappers, - serviceMap: make(map[string]*service), + serviceMap: make(map[string]*service), } } @@ -207,10 +205,6 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, return nil } - for i := len(router.hdlrWrappers); i > 0; i-- { - fn = router.hdlrWrappers[i-1](fn) - } - errmsg := "" err := fn(ctx, r, replyv.Interface()) if err != nil { @@ -252,10 +246,6 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, } } - for i := len(router.hdlrWrappers); i > 0; i-- { - fn = router.hdlrWrappers[i-1](fn) - } - // client.Stream request r.stream = true diff --git a/server/rpc_server.go b/server/rpc_server.go index c1510905..cd7a5edb 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -38,7 +38,7 @@ func newRpcServer(opts ...Option) Server { options := newOptions(opts...) return &rpcServer{ opts: options, - router: newRpcRouter(options), + router: newRpcRouter(), handlers: make(map[string]Handler), subscribers: make(map[*subscriber][]broker.Subscriber), exit: make(chan chan error), @@ -133,16 +133,29 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { // set router r := s.opts.Router + + // if nil use default router if s.opts.Router == nil { r = s.router } - // TODO: needs better error handling - if err := r.ServeRequest(ctx, request, response); err != nil { + // create a wrapped function + handler := func(ctx context.Context, req Request, rsp interface{}) error { + return r.ServeRequest(ctx, req, rsp.(Response)) + } + + for i := len(s.opts.HdlrWrappers); i > 0; i-- { + handler = s.opts.HdlrWrappers[i-1](handler) + } + + // TODO: handle error better + if err := handler(ctx, request, response); err != nil { s.wg.Done() log.Logf("Unexpected error serving request, closing socket: %v", err) return } + + // done s.wg.Done() } } @@ -171,7 +184,7 @@ func (s *rpcServer) Init(opts ...Option) error { } // update router - r := newRpcRouter(s.opts) + r := newRpcRouter() r.serviceMap = s.router.serviceMap s.router = r diff --git a/server/server.go b/server/server.go index b4ddcaa9..6f7c2712 100644 --- a/server/server.go +++ b/server/server.go @@ -122,7 +122,7 @@ var ( DefaultVersion = "1.0.0" DefaultId = uuid.New().String() DefaultServer Server = newRpcServer() - DefaultRouter = newRpcRouter(newOptions()) + DefaultRouter = newRpcRouter() ) // DefaultOptions returns config options for the default service diff --git a/server/subscriber.go b/server/subscriber.go index 9f2100c5..a02bc222 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -164,17 +164,29 @@ func validateSubscriber(sub Subscriber) error { func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handler { return func(p broker.Publication) error { msg := p.Message() + + // get codec ct := msg.Header["Content-Type"] + + // default content type + if len(ct) == 0 { + msg.Header["Content-Type"] = DefaultContentType + ct = DefaultContentType + } + + // get codec cf, err := s.newCodec(ct) if err != nil { return err } + // copy headers hdr := make(map[string]string) for k, v := range msg.Header { hdr[k] = v } - delete(hdr, "Content-Type") + + // create context ctx := metadata.NewContext(context.Background(), hdr) results := make(chan error, len(sb.handlers))