Experimental server side wrappers for handlers

This commit is contained in:
Asim 2015-12-02 00:47:52 +00:00
parent 0c9f8411bb
commit dae745f30f
4 changed files with 61 additions and 42 deletions

View File

@ -18,6 +18,7 @@ type options struct {
advertise string advertise string
id string id string
version string version string
wrappers []Wrapper
} }
func newOptions(opt ...Option) options { func newOptions(opt ...Option) options {
@ -153,3 +154,10 @@ func Metadata(md map[string]string) Option {
o.metadata = md o.metadata = md
} }
} }
// Adds a handler Wrapper to a list of options passed into the server
func Wrap(w Wrapper) Option {
return func(o *options) {
o.wrappers = append(o.wrappers, w)
}
}

View File

@ -28,9 +28,13 @@ type rpcServer struct {
} }
func newRpcServer(opts ...Option) Server { func newRpcServer(opts ...Option) Server {
options := newOptions(opts...)
return &rpcServer{ return &rpcServer{
opts: newOptions(opts...), opts: options,
rpc: newServer(), rpc: &server{
serviceMap: make(map[string]*service),
wrappers: options.wrappers,
},
handlers: make(map[string]Handler), handlers: make(map[string]Handler),
subscribers: make(map[*subscriber][]broker.Subscriber), subscribers: make(map[*subscriber][]broker.Subscriber),
exit: make(chan chan error), exit: make(chan chan error),

View File

@ -18,11 +18,8 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
) )
const (
lastStreamResponseError = "EOS"
)
var ( var (
lastStreamResponseError = errors.New("EOS")
// A value sent as a placeholder for the server's response value when the server // A value sent as a placeholder for the server's response value when the server
// receives an invalid request. It is never decoded by the client since the Response // receives an invalid request. It is never decoded by the client since the Response
// contains an error when it is used. // contains an error when it is used.
@ -43,10 +40,6 @@ type methodType struct {
numCalls uint numCalls uint
} }
func (m *methodType) TakesContext() bool {
return m.ContextType != nil
}
func (m *methodType) NumCalls() (n uint) { func (m *methodType) NumCalls() (n uint) {
m.Lock() m.Lock()
n = m.numCalls n = m.numCalls
@ -82,10 +75,7 @@ type server struct {
freeReq *request freeReq *request
respLock sync.Mutex // protects freeResp respLock sync.Mutex // protects freeResp
freeResp *response freeResp *response
} wrappers []Wrapper
func newServer() *server {
return &server{serviceMap: make(map[string]*service)}
} }
// Is this an exported - upper case - name? // Is this an exported - upper case - name?
@ -122,11 +112,6 @@ func prepareMethod(method reflect.Method) *methodType {
} }
switch mtype.NumIn() { switch mtype.NumIn() {
case 3:
// normal method
argType = mtype.In(1)
replyType = mtype.In(2)
contextType = nil
case 4: case 4:
// method that takes a context // method that takes a context
argType = mtype.In(2) argType = mtype.In(2)
@ -259,20 +244,27 @@ func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex,
var returnValues []reflect.Value var returnValues []reflect.Value
if !mtype.stream { if !mtype.stream {
fn := func(ctx context.Context, req interface{}, rsp interface{}) error {
// Invoke the method, providing a new value for the reply.
if mtype.TakesContext() {
returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), argv, replyv}) returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), argv, replyv})
} else {
returnValues = function.Call([]reflect.Value{s.rcvr, argv, replyv}) // The return value for the method is an error.
if err := returnValues[0].Interface(); err != nil {
return err.(error)
}
return nil
} }
// The return value for the method is an error. for i := len(server.wrappers); i > 0; i-- {
errInter := returnValues[0].Interface() fn = server.wrappers[i-1](fn)
errmsg := ""
if errInter != nil {
errmsg = errInter.(error).Error()
} }
errmsg := ""
err := fn(ctx, argv.Interface(), replyv.Interface())
if err != nil {
errmsg = err.Error()
}
server.sendResponse(sending, req, replyv.Interface(), codec, errmsg, true) server.sendResponse(sending, req, replyv.Interface(), codec, errmsg, true)
server.freeRequest(req) server.freeRequest(req)
return return
@ -314,22 +306,28 @@ func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex,
} }
// Invoke the method, providing a new value for the reply. // Invoke the method, providing a new value for the reply.
if mtype.TakesContext() { fn := func(ctx context.Context, req interface{}, rspFn interface{}) error {
returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), argv, reflect.ValueOf(sendReply)}) returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), argv, reflect.ValueOf(sendReply)})
} else { if err := returnValues[0].Interface(); err != nil {
returnValues = function.Call([]reflect.Value{s.rcvr, argv, reflect.ValueOf(sendReply)}) // the function returned an error, we use that
return err.(error)
} else if lastError != nil {
// we had an error inside sendReply, we use that
return lastError
} else {
// no error, we send the special EOS error
return lastStreamResponseError
}
return nil
} }
errInter := returnValues[0].Interface()
for i := len(server.wrappers); i > 0; i-- {
fn = server.wrappers[i-1](fn)
}
errmsg := "" errmsg := ""
if errInter != nil { if err := fn(ctx, argv.Interface(), reflect.ValueOf(sendReply).Interface()); err != nil {
// the function returned an error, we use that errmsg = err.Error()
errmsg = errInter.(error).Error()
} else if lastError != nil {
// we had an error inside sendReply, we use that
errmsg = lastError.Error()
} else {
// no error, we send the special EOS error
errmsg = lastStreamResponseError
} }
// this is the last packet, we don't do anything with // this is the last packet, we don't do anything with

9
server/server_wrapper.go Normal file
View File

@ -0,0 +1,9 @@
package server
import (
"golang.org/x/net/context"
)
type HandlerFunc func(ctx context.Context, req interface{}, rsp interface{}) error
type Wrapper func(HandlerFunc) HandlerFunc