Send Request and Publication types

This commit is contained in:
Asim 2015-12-02 20:56:50 +00:00
parent 4b18b779aa
commit a695e10d21
7 changed files with 103 additions and 21 deletions

View File

@ -10,8 +10,8 @@ import (
) )
func logWrapper(fn server.HandlerFunc) server.HandlerFunc { func logWrapper(fn server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req interface{}, rsp interface{}) error { return func(ctx context.Context, req server.Request, rsp interface{}) error {
log.Infof("[Log Wrapper] Before serving request") log.Infof("[Log Wrapper] Before serving request method: %v", req.Method())
err := fn(ctx, req, rsp) err := fn(ctx, req, rsp)
log.Infof("[Log Wrapper] After serving request") log.Infof("[Log Wrapper] After serving request")
return err return err
@ -19,8 +19,8 @@ func logWrapper(fn server.HandlerFunc) server.HandlerFunc {
} }
func logSubWrapper(fn server.SubscriberFunc) server.SubscriberFunc { func logSubWrapper(fn server.SubscriberFunc) server.SubscriberFunc {
return func(ctx context.Context, req interface{}) error { return func(ctx context.Context, req server.Publication) error {
log.Infof("[Log Sub Wrapper] Before serving publication") log.Infof("[Log Sub Wrapper] Before serving publication topic: %v", req.Topic())
err := fn(ctx, req) err := fn(ctx, req)
log.Infof("[Log Sub Wrapper] After serving publication") log.Infof("[Log Sub Wrapper] After serving publication")
return err return err

47
server/rpc_request.go Normal file
View File

@ -0,0 +1,47 @@
package server
type rpcRequest struct {
service string
method string
contentType string
request interface{}
stream bool
}
type rpcPublication struct {
topic string
contentType string
message interface{}
}
func (r *rpcRequest) ContentType() string {
return r.contentType
}
func (r *rpcRequest) Service() string {
return r.service
}
func (r *rpcRequest) Method() string {
return r.method
}
func (r *rpcRequest) Request() interface{} {
return r.request
}
func (r *rpcRequest) Stream() bool {
return r.stream
}
func (r *rpcPublication) ContentType() string {
return r.contentType
}
func (r *rpcPublication) Topic() string {
return r.topic
}
func (r *rpcPublication) Message() interface{} {
return r.message
}

View File

@ -32,6 +32,7 @@ func newRpcServer(opts ...Option) Server {
return &rpcServer{ return &rpcServer{
opts: options, opts: options,
rpc: &server{ rpc: &server{
name: options.name,
serviceMap: make(map[string]*service), serviceMap: make(map[string]*service),
wrappers: options.wrappers, wrappers: options.wrappers,
}, },
@ -47,7 +48,8 @@ func (s *rpcServer) accept(sock transport.Socket) {
return return
} }
cf, err := s.newCodec(msg.Header["Content-Type"]) ct := msg.Header["Content-Type"]
cf, err := s.newCodec(ct)
// TODO: needs better error handling // TODO: needs better error handling
if err != nil { if err != nil {
sock.Send(&transport.Message{ sock.Send(&transport.Message{
@ -70,8 +72,9 @@ func (s *rpcServer) accept(sock transport.Socket) {
delete(hdr, "Content-Type") delete(hdr, "Content-Type")
ctx := c.WithMetadata(context.Background(), hdr) ctx := c.WithMetadata(context.Background(), hdr)
// TODO: needs better error handling // TODO: needs better error handling
if err := s.rpc.serveRequest(ctx, codec); err != nil { if err := s.rpc.serveRequest(ctx, codec, ct); err != nil {
log.Errorf("Unexpected error serving request, closing socket: %v", err) log.Errorf("Unexpected error serving request, closing socket: %v", err)
sock.Close() sock.Close()
} }

View File

@ -69,6 +69,7 @@ type response struct {
// server represents an RPC Server. // server represents an RPC Server.
type server struct { type server struct {
name string
mu sync.Mutex // protects the serviceMap mu sync.Mutex // protects the serviceMap
serviceMap map[string]*service serviceMap map[string]*service
reqLock sync.Mutex // protects freeReq reqLock sync.Mutex // protects freeReq
@ -229,16 +230,23 @@ func (server *server) sendResponse(sending *sync.Mutex, req *request, reply inte
return err return err
} }
func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, codec serverCodec) { func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, codec serverCodec, ct string) {
mtype.Lock() mtype.Lock()
mtype.numCalls++ mtype.numCalls++
mtype.Unlock() mtype.Unlock()
function := mtype.method.Func function := mtype.method.Func
var returnValues []reflect.Value var returnValues []reflect.Value
r := &rpcRequest{
service: s.name,
contentType: ct,
method: req.ServiceMethod,
request: argv.Interface(),
}
if !mtype.stream { if !mtype.stream {
fn := func(ctx context.Context, req interface{}, rsp interface{}) error { fn := func(ctx context.Context, req Request, rsp interface{}) error {
returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(req), reflect.ValueOf(rsp)}) returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(req.Request()), reflect.ValueOf(rsp)})
// The return value for the method is an error. // The return value for the method is an error.
if err := returnValues[0].Interface(); err != nil { if err := returnValues[0].Interface(); err != nil {
@ -253,11 +261,12 @@ func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex,
} }
errmsg := "" errmsg := ""
err := fn(ctx, argv.Interface(), replyv.Interface()) err := fn(ctx, r, replyv.Interface())
if err != nil { if err != nil {
errmsg = err.Error() errmsg = err.Error()
} }
server.sendResponse(sending, req, replyv.Interface(), codec, errmsg, true) server.sendResponse(sending, req, replyv.Interface(), codec, errmsg, true)
server.freeRequest(req) server.freeRequest(req)
return return
@ -299,8 +308,8 @@ func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex,
} }
// Invoke the method, providing a new value for the reply. // Invoke the method, providing a new value for the reply.
fn := func(ctx context.Context, req interface{}, rspFn interface{}) error { fn := func(ctx context.Context, req Request, rspFn interface{}) error {
returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(req), reflect.ValueOf(rspFn)}) returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(req.Request()), reflect.ValueOf(rspFn)})
if err := returnValues[0].Interface(); err != nil { if err := returnValues[0].Interface(); err != nil {
// the function returned an error, we use that // the function returned an error, we use that
return err.(error) return err.(error)
@ -318,8 +327,11 @@ func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex,
fn = server.wrappers[i-1](fn) fn = server.wrappers[i-1](fn)
} }
// client.Stream request
r.stream = true
errmsg := "" errmsg := ""
if err := fn(ctx, argv.Interface(), reflect.ValueOf(sendReply).Interface()); err != nil { if err := fn(ctx, r, reflect.ValueOf(sendReply).Interface()); err != nil {
errmsg = err.Error() errmsg = err.Error()
} }
@ -337,7 +349,7 @@ func (m *methodType) prepareContext(ctx context.Context) reflect.Value {
return reflect.Zero(m.ContextType) return reflect.Zero(m.ContextType)
} }
func (server *server) serveRequest(ctx context.Context, codec serverCodec) error { func (server *server) serveRequest(ctx context.Context, codec serverCodec, ct string) error {
sending := new(sync.Mutex) sending := new(sync.Mutex)
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec) service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
if err != nil { if err != nil {
@ -351,7 +363,7 @@ func (server *server) serveRequest(ctx context.Context, codec serverCodec) error
} }
return err return err
} }
service.call(ctx, server, sending, mtype, req, argv, replyv, codec) service.call(ctx, server, sending, mtype, req, argv, replyv, codec, ct)
return nil return nil
} }

View File

@ -31,6 +31,21 @@ type Server interface {
Stop() error Stop() error
} }
type Publication interface {
Topic() string
Message() interface{}
ContentType() string
}
type Request interface {
Service() string
Method() string
ContentType() string
Request() interface{}
Stream() bool
}
type Option func(*options) type Option func(*options)
var ( var (

View File

@ -4,9 +4,9 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
) )
type HandlerFunc func(ctx context.Context, req interface{}, rsp interface{}) error type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error
type SubscriberFunc func(ctx context.Context, msg interface{}) error type SubscriberFunc func(ctx context.Context, msg Publication) error
type HandlerWrapper func(HandlerFunc) HandlerFunc type HandlerWrapper func(HandlerFunc) HandlerFunc

View File

@ -156,7 +156,8 @@ func validateSubscriber(sub Subscriber) error {
func (s *rpcServer) createSubHandler(sb *subscriber, opts options) broker.Handler { func (s *rpcServer) createSubHandler(sb *subscriber, opts options) broker.Handler {
return func(msg *broker.Message) { return func(msg *broker.Message) {
cf, err := s.newCodec(msg.Header["Content-Type"]) ct := msg.Header["Content-Type"]
cf, err := s.newCodec(ct)
if err != nil { if err != nil {
return return
} }
@ -196,7 +197,7 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts options) broker.Handle
continue continue
} }
fn := func(ctx context.Context, msg interface{}) error { fn := func(ctx context.Context, msg Publication) error {
var vals []reflect.Value var vals []reflect.Value
if sb.typ.Kind() != reflect.Func { if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr) vals = append(vals, sb.rcvr)
@ -205,7 +206,7 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts options) broker.Handle
vals = append(vals, reflect.ValueOf(ctx)) vals = append(vals, reflect.ValueOf(ctx))
} }
vals = append(vals, reflect.ValueOf(msg)) vals = append(vals, reflect.ValueOf(msg.Message()))
returnValues := handler.method.Call(vals) returnValues := handler.method.Call(vals)
if err := returnValues[0].Interface(); err != nil { if err := returnValues[0].Interface(); err != nil {
@ -218,7 +219,11 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts options) broker.Handle
fn = opts.subWrappers[i-1](fn) fn = opts.subWrappers[i-1](fn)
} }
go fn(ctx, req.Interface()) go fn(ctx, &rpcPublication{
topic: sb.topic,
contentType: ct,
message: req.Interface(),
})
} }
} }
} }