micro/server/rpcplus_server.go

465 lines
12 KiB
Go

package server
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//
// TODO: we need to get rid of this code.
import (
"errors"
"io"
"log"
"reflect"
"strings"
"sync"
"unicode"
"unicode/utf8"
"golang.org/x/net/context"
)
var (
// lastStreamResponseError is the sentinel "EOS" error. The streaming handler
// built in service.call returns it when the invoked method itself returned
// no error, so its text ends up in the final response of the stream.
lastStreamResponseError = errors.New("EOS")
// A value sent as a placeholder for the server's response value when the server
// receives an invalid request. It is never decoded by the client since the Response
// contains an error when it is used.
invalidRequest = struct{}{}
// Precompute the reflect type for error. Can't use error directly
// because TypeOf takes an empty interface value, so we go through a
// nil *error and take its element type.
typeOfError = reflect.TypeOf((*error)(nil)).Elem()
)
// methodType describes one registered RPC method, as produced by
// prepareMethod, together with a counter of how often it has been called.
type methodType struct {
sync.Mutex // protects counters
method reflect.Method // the reflected method; service.call invokes method.Func
ArgType reflect.Type // type of the argument parameter (the stream parameter for streaming methods)
ReplyType reflect.Type // type of the reply parameter; left nil for streaming methods
ContextType reflect.Type // type of the context parameter
stream bool // true when the method has the 3-input (streaming) signature
numCalls uint // incremented by service.call while holding the embedded Mutex
}
// NumCalls reports how many times the method has been invoked so far.
// The counter is read while holding the methodType's mutex.
func (m *methodType) NumCalls() (n uint) {
	m.Lock()
	defer m.Unlock()
	return m.numCalls
}
// service is one registered receiver together with the methods of it that
// prepareMethod accepted, keyed by method name.
type service struct {
name string // name of service
rcvr reflect.Value // receiver of methods for the service
typ reflect.Type // type of the receiver
method map[string]*methodType // registered methods
}
// request is the header of a single RPC request. Instances are recycled
// through the server's free list (see getRequest / freeRequest).
type request struct {
ServiceMethod string // format: "Service.Method"
Seq uint64 // sequence number chosen by client
next *request // for free list in Server
}
// response is the header of a single RPC response. Instances are recycled
// through the server's free list (see getResponse / freeResponse).
type response struct {
ServiceMethod string // echoes that of the Request
Seq uint64 // echoes that of the request
Error string // error, if any.
next *response // for free list in Server
}
// server represents an RPC Server.
//
// It holds the registered services plus two singly linked free lists used to
// recycle request and response headers between calls.
type server struct {
name string // NOTE(review): not referenced by the code visible in this file
mu sync.Mutex // protects the serviceMap
serviceMap map[string]*service // registered services, keyed by service name
reqLock sync.Mutex // protects freeReq
freeReq *request // head of the request free list
respLock sync.Mutex // protects freeResp
freeResp *response // head of the response free list
hdlrWrappers []HandlerWrapper // wrappers applied around every call; element 0 ends up outermost
}
// isExported reports whether name starts with an upper-case letter, i.e.
// whether it would be an exported Go identifier. An empty or invalidly
// encoded name is reported as not exported.
func isExported(name string) bool {
	first, _ := utf8.DecodeRuneInString(name)
	if unicode.IsUpper(first) {
		return true
	}
	return false
}
// isExportedOrBuiltinType reports whether t, after stripping every level of
// pointer indirection, is either a type without a package path (builtin or
// unnamed) or a type whose name is exported.
func isExportedOrBuiltinType(t reflect.Type) bool {
	base := t
	for base.Kind() == reflect.Ptr {
		base = base.Elem()
	}
	// A package path alone says nothing about visibility: exported types
	// have one too, so the name has to be inspected as well.
	if base.PkgPath() == "" {
		return true
	}
	return isExported(base.Name())
}
// prepareMethod inspects a reflected method and returns its methodType
// description, or nil (after logging the reason) when the signature is not
// usable as an RPC handler.
//
// Two shapes are accepted, counting the receiver as input 0:
//
//	3 inputs: (receiver, context, stream)     - streaming method
//	4 inputs: (receiver, context, arg, reply) - request/response method
//
// In both cases the method must be exported and return exactly one value of
// type error.
func prepareMethod(method reflect.Method) *methodType {
	// Unexported methods carry a non-empty PkgPath.
	if method.PkgPath != "" {
		return nil
	}

	name, sig := method.Name, method.Type
	var (
		argT, replyT, ctxT reflect.Type
		streaming          bool
	)

	switch n := sig.NumIn(); n {
	case 3:
		// assuming streaming
		ctxT, argT = sig.In(1), sig.In(2)
		streaming = true
	case 4:
		// method that takes a context
		ctxT, argT, replyT = sig.In(1), sig.In(2), sig.In(3)
	default:
		log.Println("method", name, "of", sig, "has wrong number of ins:", n)
		return nil
	}

	// Cases are tried top to bottom, so the non-streaming checks only run
	// when streaming is false, in the order: arg exported, reply is a
	// pointer, reply exported.
	switch {
	case streaming:
		streamer := reflect.TypeOf((*Streamer)(nil)).Elem()
		if !argT.Implements(streamer) {
			log.Println(name, "argument does not implement Streamer interface:", argT)
			return nil
		}
	case !isExportedOrBuiltinType(argT):
		// First arg need not be a pointer.
		log.Println(name, "argument type not exported:", argT)
		return nil
	case replyT.Kind() != reflect.Ptr:
		log.Println("method", name, "reply type not a pointer:", replyT)
		return nil
	case !isExportedOrBuiltinType(replyT):
		log.Println("method", name, "reply type not exported:", replyT)
		return nil
	}

	// Exactly one result, and it has to be error.
	if outs := sig.NumOut(); outs != 1 {
		log.Println("method", name, "has wrong number of outs:", outs)
		return nil
	}
	if ret := sig.Out(0); ret != typeOfError {
		log.Println("method", name, "returns", ret.String(), "not error")
		return nil
	}

	return &methodType{
		method:      method,
		ArgType:     argT,
		ReplyType:   replyT,
		ContextType: ctxT,
		stream:      streaming,
	}
}
// register installs rcvr as a service on the server. The service is named
// after rcvr's concrete type (one level of pointer is stripped), and every
// method of rcvr that prepareMethod accepts becomes callable.
//
// An error is returned, and nothing is registered, when:
//   - rcvr is nil or a nil pointer,
//   - rcvr's type has no name or the name is not exported,
//   - a service with the same name is already registered,
//   - rcvr has no method with a suitable signature.
//
// Failures never terminate the process; they are logged (except for the
// duplicate-service case) and returned to the caller.
func (server *server) register(rcvr interface{}) error {
	rv := reflect.ValueOf(rcvr)
	// Reject nil up front: reflect would otherwise panic when asked for the
	// type of the (non-existent) pointed-to value below.
	if !rv.IsValid() || (rv.Kind() == reflect.Ptr && rv.IsNil()) {
		msg := "rpc Register: cannot register a nil receiver"
		log.Print(msg)
		return errors.New(msg)
	}

	server.mu.Lock()
	defer server.mu.Unlock()
	if server.serviceMap == nil {
		server.serviceMap = make(map[string]*service)
	}

	s := &service{
		typ:  reflect.TypeOf(rcvr),
		rcvr: rv,
	}
	sname := reflect.Indirect(s.rcvr).Type().Name()
	if sname == "" {
		// Unnamed types (e.g. anonymous structs) cannot be addressed as
		// "Service.Method"; report it instead of exiting the process.
		msg := "rpc Register: no service name for type " + s.typ.String()
		log.Print(msg)
		return errors.New(msg)
	}
	if !isExported(sname) {
		msg := "rpc Register: type " + sname + " is not exported"
		log.Print(msg)
		return errors.New(msg)
	}
	if _, present := server.serviceMap[sname]; present {
		return errors.New("rpc: service already defined: " + sname)
	}
	s.name = sname
	s.method = make(map[string]*methodType)

	// Install the methods
	for m := 0; m < s.typ.NumMethod(); m++ {
		method := s.typ.Method(m)
		if mt := prepareMethod(method); mt != nil {
			s.method[method.Name] = mt
		}
	}

	if len(s.method) == 0 {
		msg := "rpc Register: type " + sname + " has no exported methods of suitable type"
		log.Print(msg)
		return errors.New(msg)
	}
	server.serviceMap[s.name] = s
	return nil
}
// sendResponse writes one response for req through codec.
//
// The header echoes req's ServiceMethod and Seq. When errmsg is non-empty it
// is placed in the header and the body is replaced by the invalidRequest
// placeholder. The write is serialized with the sending mutex; a write
// failure is logged and also returned. last is passed through to the codec
// unchanged.
func (server *server) sendResponse(sending *sync.Mutex, req *request, reply interface{}, codec serverCodec, errmsg string, last bool) (err error) {
	hdr := server.getResponse()

	// Fill in the response header.
	hdr.ServiceMethod, hdr.Seq = req.ServiceMethod, req.Seq
	if errmsg != "" {
		hdr.Error = errmsg
		reply = invalidRequest
	}

	sending.Lock()
	if err = codec.WriteResponse(hdr, reply, last); err != nil {
		log.Println("rpc: writing response:", err)
	}
	sending.Unlock()

	server.freeResponse(hdr)
	return err
}
// call invokes the method described by mtype on the service's receiver and
// sends the outcome back through codec.
//
// It first bumps mtype's call counter under its lock. The actual invocation
// is wrapped by the server's handler wrappers, applied so that
// hdlrWrappers[0] is the outermost one.
//
//   - Non-streaming methods are called with (receiver, context, request,
//     reply). Afterwards a single response carrying replyv, or the error text
//     if the handler chain failed, is sent.
//   - Streaming methods are called with (receiver, context, stream). When the
//     method returns nil the handler reports lastStreamResponseError ("EOS"),
//     so the final response normally carries either the method's error text
//     or "EOS".
//
// In both cases req is handed back to the server's free list before
// returning. Errors from sendResponse are not acted upon here; sendResponse
// logs them itself.
func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, codec serverCodec, ct string) {
	mtype.Lock()
	mtype.numCalls++
	mtype.Unlock()
	function := mtype.method.Func

	r := &rpcRequest{
		service:     s.name,
		contentType: ct,
		method:      req.ServiceMethod,
	}

	if !mtype.stream {
		r.request = argv.Interface()

		fn := func(ctx context.Context, req Request, rsp interface{}) error {
			returnValues := function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(req.Request()), reflect.ValueOf(rsp)})

			// The return value for the method is an error.
			if err := returnValues[0].Interface(); err != nil {
				return err.(error)
			}
			return nil
		}

		// Wrap from the last wrapper to the first so the first ends up outermost.
		for i := len(server.hdlrWrappers); i > 0; i-- {
			fn = server.hdlrWrappers[i-1](fn)
		}

		errmsg := ""
		if err := fn(ctx, r, replyv.Interface()); err != nil {
			errmsg = err.Error()
		}

		server.sendResponse(sending, req, replyv.Interface(), codec, errmsg, true)
		server.freeRequest(req)
		return
	}

	stream := &rpcStream{
		context: ctx,
		codec:   codec,
		request: r,
		seq:     req.Seq,
	}

	// Invoke the method, handing it the stream instead of an arg/reply pair.
	fn := func(ctx context.Context, req Request, stream interface{}) error {
		returnValues := function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(stream)})
		if err := returnValues[0].Interface(); err != nil {
			// the function returned an error, we use that
			return err.(error)
		}
		// no error, we send the special EOS error
		return lastStreamResponseError
	}

	// Wrap from the last wrapper to the first so the first ends up outermost.
	for i := len(server.hdlrWrappers); i > 0; i-- {
		fn = server.hdlrWrappers[i-1](fn)
	}

	// client.Stream request
	r.stream = true

	errmsg := ""
	if err := fn(ctx, r, stream); err != nil {
		errmsg = err.Error()
	}

	// this is the last packet, we don't do anything with
	// the error here (sendResponse logs it already)
	server.sendResponse(sending, req, nil, codec, errmsg, true)
	server.freeRequest(req)
}
// prepareContext turns ctx into a reflect.Value suitable for passing to the
// method via reflection. When ctx is a nil interface (for which reflect
// yields an invalid Value) the zero value of the method's declared context
// type is used instead.
func (m *methodType) prepareContext(ctx context.Context) reflect.Value {
	v := reflect.ValueOf(ctx)
	if !v.IsValid() {
		return reflect.Zero(m.ContextType)
	}
	return v
}
// serveRequest reads a single request from codec and dispatches it.
//
// On success the matching service method is called and nil is returned. If
// reading fails the error is returned; when a header was still read
// successfully (keepReading is true and req is non-nil) an error response is
// sent to the client first and the request is returned to the free list.
func (server *server) serveRequest(ctx context.Context, codec serverCodec, ct string) error {
	sending := new(sync.Mutex)

	svc, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
	if err == nil {
		svc.call(ctx, server, sending, mtype, req, argv, replyv, codec, ct)
		return nil
	}

	// send a response if we actually managed to read a header.
	if keepReading && req != nil {
		server.sendResponse(sending, req, invalidRequest, codec, err.Error(), true)
		server.freeRequest(req)
	}
	return err
}
// getRequest returns a zeroed request header, reusing one from the server's
// free list when available and allocating a new one otherwise.
func (server *server) getRequest() *request {
	server.reqLock.Lock()
	req := server.freeReq
	if req != nil {
		// Pop the head of the free list and wipe its previous contents.
		server.freeReq = req.next
		*req = request{}
	}
	server.reqLock.Unlock()

	if req == nil {
		req = new(request)
	}
	return req
}
// freeRequest pushes req onto the head of the server's request free list so
// that a later getRequest can reuse it.
func (server *server) freeRequest(req *request) {
	server.reqLock.Lock()
	req.next, server.freeReq = server.freeReq, req
	server.reqLock.Unlock()
}
// getResponse returns a zeroed response header, reusing one from the
// server's free list when available and allocating a new one otherwise.
func (server *server) getResponse() *response {
	server.respLock.Lock()
	resp := server.freeResp
	if resp != nil {
		// Pop the head of the free list and wipe its previous contents.
		server.freeResp = resp.next
		*resp = response{}
	}
	server.respLock.Unlock()

	if resp == nil {
		resp = new(response)
	}
	return resp
}
// freeResponse pushes resp onto the head of the server's response free list
// so that a later getResponse can reuse it.
func (server *server) freeResponse(resp *response) {
	server.respLock.Lock()
	resp.next, server.freeResp = server.freeResp, resp
	server.respLock.Unlock()
}
// readRequest reads one complete request from codec.
//
// It reads the header via readRequestHeader and then deals with the body:
//   - header error with keepReading false: returns immediately;
//   - header error with keepReading true: the body is read and discarded;
//   - streaming method: the body is read and discarded, argv and replyv stay
//     zero Values;
//   - otherwise the body is decoded into a freshly allocated argument (argv)
//     and a fresh reply value (replyv) is allocated.
//
// Errors from discarding a body are intentionally not reported.
func (server *server) readRequest(codec serverCodec) (service *service, mtype *methodType, req *request, argv, replyv reflect.Value, keepReading bool, err error) {
	service, mtype, req, keepReading, err = server.readRequestHeader(codec)
	if err != nil {
		if keepReading {
			// discard body
			codec.ReadRequestBody(nil)
		}
		return
	}

	// is it a streaming request? then we don't read the body
	if mtype.stream {
		codec.ReadRequestBody(nil)
		return
	}

	// Decode the argument value. The codec always gets a pointer; if the
	// method takes the argument by value we dereference after decoding.
	takesPointer := mtype.ArgType.Kind() == reflect.Ptr
	if takesPointer {
		argv = reflect.New(mtype.ArgType.Elem())
	} else {
		argv = reflect.New(mtype.ArgType)
	}
	if err = codec.ReadRequestBody(argv.Interface()); err != nil {
		return
	}
	if !takesPointer {
		argv = argv.Elem()
	}

	// Streaming methods returned above, so a reply value is always needed here.
	replyv = reflect.New(mtype.ReplyType.Elem())
	return
}
// readRequestHeader reads the next request header from codec and resolves
// the service and method it names.
//
// If the header cannot be read, req is nil and keepReading is false; io.EOF
// and io.ErrUnexpectedEOF are returned unchanged, any other error is wrapped
// in an "rpc: server cannot decode request" message. Once the header has
// been read keepReading is true, and err reports an ill-formed
// "Service.Method" string, an unknown service or an unknown method.
func (server *server) readRequestHeader(codec serverCodec) (service *service, mtype *methodType, req *request, keepReading bool, err error) {
	// Grab the request header.
	req = server.getRequest()
	if err = codec.ReadRequestHeader(req, true); err != nil {
		req = nil
		switch err {
		case io.EOF, io.ErrUnexpectedEOF:
			// end of input: hand the error back untouched
		default:
			err = errors.New("rpc: server cannot decode request: " + err.Error())
		}
		return
	}

	// We read the header successfully. If we see an error now,
	// we can still recover and move on to the next request.
	keepReading = true

	parts := strings.Split(req.ServiceMethod, ".")
	if len(parts) != 2 {
		err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
		return
	}
	serviceName, methodName := parts[0], parts[1]

	// Look up the request.
	server.mu.Lock()
	service = server.serviceMap[serviceName]
	server.mu.Unlock()
	if service == nil {
		err = errors.New("rpc: can't find service " + req.ServiceMethod)
		return
	}

	if mtype = service.method[methodName]; mtype == nil {
		err = errors.New("rpc: can't find method " + req.ServiceMethod)
	}
	return
}
// serverCodec is the transport/encoding abstraction the server uses to read
// requests and write responses.
type serverCodec interface {
// ReadRequestHeader fills in the given request header.
// NOTE(review): the meaning of the bool is not visible here; the server always passes true.
ReadRequestHeader(*request, bool) error
// ReadRequestBody decodes the request body into the given value; the
// server passes nil when it wants the body discarded.
ReadRequestBody(interface{}) error
// WriteResponse writes a response header and body. The bool is the
// "last" flag forwarded by sendResponse — presumably marking the final
// message of a call; confirm against the codec implementation.
WriteResponse(*response, interface{}, bool) error
// Close is not called by the code visible in this file.
Close() error
}