add needed files
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
e265a6f83f
commit
d797edb9a7
15
LICENSE
15
LICENSE
@ -1,3 +1,4 @@
|
|||||||
|
|
||||||
Apache License
|
Apache License
|
||||||
Version 2.0, January 2004
|
Version 2.0, January 2004
|
||||||
http://www.apache.org/licenses/
|
http://www.apache.org/licenses/
|
||||||
@ -175,18 +176,8 @@
|
|||||||
|
|
||||||
END OF TERMS AND CONDITIONS
|
END OF TERMS AND CONDITIONS
|
||||||
|
|
||||||
APPENDIX: How to apply the Apache License to your work.
|
Copyright 2015-2020 Asim Aslam.
|
||||||
|
Copyright 2019-2020 Unistack LLC.
|
||||||
To apply the Apache License to your work, attach the following
|
|
||||||
boilerplate notice, with the fields enclosed by brackets "[]"
|
|
||||||
replaced with your own identifying information. (Don't include
|
|
||||||
the brackets!) The text should be enclosed in the appropriate
|
|
||||||
comment syntax for the file format. We also recommend that a
|
|
||||||
file or class name and description of purpose be included on the
|
|
||||||
same "printed page" as the copyright notice for easier
|
|
||||||
identification within third-party archives.
|
|
||||||
|
|
||||||
Copyright [yyyy] [name of copyright owner]
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
you may not use this file except in compliance with the License.
|
you may not use this file except in compliance with the License.
|
||||||
|
54
codec.go
Normal file
54
codec.go
Normal file
@ -0,0 +1,54 @@
|
|||||||
|
package drpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/unistack-org/micro/v3/codec"
|
||||||
|
"storj.io/drpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
type wrapMicroCodec struct{ codec.Codec }
|
||||||
|
|
||||||
|
func (w *wrapMicroCodec) Marshal(v drpc.Message) ([]byte, error) {
|
||||||
|
if m, ok := v.(*codec.Frame); ok {
|
||||||
|
return m.Data, nil
|
||||||
|
}
|
||||||
|
return w.Codec.Marshal(v.(interface{}))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *wrapMicroCodec) Unmarshal(d []byte, v drpc.Message) error {
|
||||||
|
if d == nil || v == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if m, ok := v.(*codec.Frame); ok {
|
||||||
|
m.Data = d
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return w.Codec.Unmarshal(d, v.(interface{}))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *wrapMicroCodec) ReadHeader(conn io.Reader, m *codec.Message, mt codec.MessageType) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *wrapMicroCodec) ReadBody(conn io.Reader, v interface{}) error {
|
||||||
|
if m, ok := v.(*codec.Frame); ok {
|
||||||
|
_, err := conn.Read(m.Data)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return codec.ErrInvalidMessage
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *wrapMicroCodec) Write(conn io.Writer, m *codec.Message, v interface{}) error {
|
||||||
|
// if we don't have a body
|
||||||
|
if v != nil {
|
||||||
|
b, err := w.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
m.Body = b
|
||||||
|
}
|
||||||
|
// write the body using the framing codec
|
||||||
|
_, err := conn.Write(m.Body)
|
||||||
|
return err
|
||||||
|
}
|
961
drpc.go
Normal file
961
drpc.go
Normal file
@ -0,0 +1,961 @@
|
|||||||
|
// Package drpc provides a drpc server
|
||||||
|
package drpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"reflect"
|
||||||
|
"runtime/debug"
|
||||||
|
"sort"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
// nolint: staticcheck
|
||||||
|
|
||||||
|
"github.com/unistack-org/micro/v3/broker"
|
||||||
|
"github.com/unistack-org/micro/v3/codec"
|
||||||
|
"github.com/unistack-org/micro/v3/errors"
|
||||||
|
"github.com/unistack-org/micro/v3/logger"
|
||||||
|
metadata "github.com/unistack-org/micro/v3/metadata"
|
||||||
|
"github.com/unistack-org/micro/v3/register"
|
||||||
|
"github.com/unistack-org/micro/v3/server"
|
||||||
|
"golang.org/x/net/netutil"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/peer"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
|
"google.golang.org/protobuf/proto"
|
||||||
|
"storj.io/drpc/drpcerr"
|
||||||
|
gmetadata "storj.io/drpc/drpcmetadata"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultContentType = "application/grpc+proto"
|
||||||
|
)
|
||||||
|
|
||||||
|
/*
|
||||||
|
type grpcServerReflection struct {
|
||||||
|
srv *grpc.Server
|
||||||
|
s *serverReflectionServer
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
type grpcServer struct {
|
||||||
|
handlers map[string]server.Handler
|
||||||
|
srv *grpc.Server
|
||||||
|
exit chan chan error
|
||||||
|
wg *sync.WaitGroup
|
||||||
|
rsvc *register.Service
|
||||||
|
subscribers map[*subscriber][]broker.Subscriber
|
||||||
|
rpc *rServer
|
||||||
|
opts server.Options
|
||||||
|
sync.RWMutex
|
||||||
|
init bool
|
||||||
|
started bool
|
||||||
|
registered bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func newGRPCServer(opts ...server.Option) server.Server {
|
||||||
|
// create a grpc server
|
||||||
|
g := &grpcServer{
|
||||||
|
opts: server.NewOptions(opts...),
|
||||||
|
rpc: &rServer{
|
||||||
|
serviceMap: make(map[string]*service),
|
||||||
|
},
|
||||||
|
handlers: make(map[string]server.Handler),
|
||||||
|
subscribers: make(map[*subscriber][]broker.Subscriber),
|
||||||
|
exit: make(chan chan error),
|
||||||
|
}
|
||||||
|
|
||||||
|
return g
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
type grpcRouter struct {
|
||||||
|
h func(context.Context, server.Request, interface{}) error
|
||||||
|
m func(context.Context, server.Message) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r grpcRouter) ProcessMessage(ctx context.Context, msg server.Message) error {
|
||||||
|
return r.m(ctx, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
|
||||||
|
return r.h(ctx, req, rsp)
|
||||||
|
}
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
func (g *grpcServer) configure(opts ...server.Option) error {
|
||||||
|
g.Lock()
|
||||||
|
defer g.Unlock()
|
||||||
|
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&g.opts)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := g.opts.Register.Init(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := g.opts.Broker.Init(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := g.opts.Tracer.Init(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := g.opts.Auth.Init(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := g.opts.Logger.Init(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := g.opts.Meter.Init(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := g.opts.Transport.Init(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
g.wg = g.opts.Wait
|
||||||
|
|
||||||
|
maxMsgSize := g.getMaxMsgSize()
|
||||||
|
|
||||||
|
gopts := []grpc.ServerOption{
|
||||||
|
grpc.MaxRecvMsgSize(maxMsgSize),
|
||||||
|
grpc.MaxSendMsgSize(maxMsgSize),
|
||||||
|
grpc.UnknownServiceHandler(g.handler),
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
if creds := g.getCredentials(); creds != nil {
|
||||||
|
gopts = append(gopts, grpc.Creds(creds))
|
||||||
|
}
|
||||||
|
|
||||||
|
if opts := g.getGrpcOptions(); opts != nil {
|
||||||
|
gopts = append(gopts, opts...)
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
g.rsvc = nil
|
||||||
|
restart := false
|
||||||
|
if g.started {
|
||||||
|
restart = true
|
||||||
|
if err := g.Stop(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
g.srv = grpc.NewServer(gopts...)
|
||||||
|
|
||||||
|
if restart {
|
||||||
|
return g.Start()
|
||||||
|
}
|
||||||
|
|
||||||
|
g.init = true
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcServer) getMaxMsgSize() int {
|
||||||
|
if g.opts.Context == nil {
|
||||||
|
return codec.DefaultMaxMsgSize
|
||||||
|
}
|
||||||
|
s, ok := g.opts.Context.Value(maxMsgSizeKey{}).(int)
|
||||||
|
if !ok {
|
||||||
|
return codec.DefaultMaxMsgSize
|
||||||
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
func (g *grpcServer) getCredentials() credentials.TransportCredentials {
|
||||||
|
if g.opts.TLSConfig != nil {
|
||||||
|
return credentials.NewTLS(g.opts.TLSConfig)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
/*
|
||||||
|
func (g *grpcServer) getGrpcOptions() []grpc.ServerOption {
|
||||||
|
if g.opts.Context == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
opts, ok := g.opts.Context.Value(grpcOptions{}).([]grpc.ServerOption)
|
||||||
|
if !ok || opts == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return opts
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err error) {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
g.RLock()
|
||||||
|
config := g.opts
|
||||||
|
g.RUnlock()
|
||||||
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
|
config.Logger.Error(config.Context, "panic recovered: ", r)
|
||||||
|
config.Logger.Error(config.Context, string(debug.Stack()))
|
||||||
|
}
|
||||||
|
err = errors.InternalServerError(g.opts.Name, "panic recovered: %v", r)
|
||||||
|
} else if err != nil {
|
||||||
|
g.RLock()
|
||||||
|
config := g.opts
|
||||||
|
g.RUnlock()
|
||||||
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
|
config.Logger.Errorf(config.Context, "grpc handler got error: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if g.wg != nil {
|
||||||
|
g.wg.Add(1)
|
||||||
|
defer g.wg.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
fullMethod, ok := grpc.MethodFromServerStream(stream)
|
||||||
|
if !ok {
|
||||||
|
return status.Errorf(codes.Internal, "method does not exist in context")
|
||||||
|
}
|
||||||
|
|
||||||
|
serviceName, methodName, err := serviceMethod(fullMethod)
|
||||||
|
if err != nil {
|
||||||
|
return status.New(StatusInvalidArgument, err.Error()).Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// get drpc metadata
|
||||||
|
dmd, _ := gmetadata.Get(stream.Context())
|
||||||
|
|
||||||
|
md := metadata.New(len(dmd))
|
||||||
|
for k, v := range dmd {
|
||||||
|
md.Set(k, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
// timeout for server deadline
|
||||||
|
to, ok := md.Get("timeout")
|
||||||
|
if ok {
|
||||||
|
md.Del("timeout")
|
||||||
|
}
|
||||||
|
|
||||||
|
// get content type
|
||||||
|
ct := defaultContentType
|
||||||
|
|
||||||
|
if ctype, ok := md.Get("content-type"); ok {
|
||||||
|
ct = ctype
|
||||||
|
} else if ctype, ok := md.Get("x-content-type"); ok {
|
||||||
|
ct = ctype
|
||||||
|
md.Del("x-content-type")
|
||||||
|
}
|
||||||
|
|
||||||
|
// create new context
|
||||||
|
ctx := metadata.NewIncomingContext(stream.Context(), md)
|
||||||
|
|
||||||
|
// get peer from context
|
||||||
|
if p, ok := peer.FromContext(stream.Context()); ok {
|
||||||
|
md["Remote"] = p.Addr.String()
|
||||||
|
ctx = peer.NewContext(ctx, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
// set the timeout if we have it
|
||||||
|
if len(to) > 0 {
|
||||||
|
if n, err := strconv.ParseUint(to, 10, 64); err == nil {
|
||||||
|
var cancel context.CancelFunc
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, time.Duration(n))
|
||||||
|
defer cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
g.rpc.mu.RLock()
|
||||||
|
svc := g.rpc.serviceMap[serviceName]
|
||||||
|
g.rpc.mu.RUnlock()
|
||||||
|
|
||||||
|
/*
|
||||||
|
if svc == nil && g.reflection && methodName == "ServerReflectionInfo" {
|
||||||
|
rfl := &grpcServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}}
|
||||||
|
svc = &service{}
|
||||||
|
svc.typ = reflect.TypeOf(rfl)
|
||||||
|
svc.rcvr = reflect.ValueOf(rfl)
|
||||||
|
svc.name = reflect.Indirect(svc.rcvr).Type().Name()
|
||||||
|
svc.method = make(map[string]*methodType)
|
||||||
|
typ := reflect.TypeOf(rfl)
|
||||||
|
if me, ok := typ.MethodByName("ServerReflectionInfo"); ok {
|
||||||
|
g.rpc.mu.Lock()
|
||||||
|
ep, err := prepareEndpoint(me)
|
||||||
|
if ep != nil && err != nil {
|
||||||
|
svc.method["ServerReflectionInfo"] = ep
|
||||||
|
} else if err != nil {
|
||||||
|
return status.New(codes.Unimplemented, err.Error()).Err()
|
||||||
|
}
|
||||||
|
g.rpc.mu.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
if svc == nil {
|
||||||
|
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
mtype := svc.method[methodName]
|
||||||
|
if mtype == nil {
|
||||||
|
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s.%s", serviceName, methodName)).Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// process unary
|
||||||
|
if !mtype.stream {
|
||||||
|
return g.processRequest(ctx, stream, svc, mtype, ct)
|
||||||
|
}
|
||||||
|
|
||||||
|
// process stream
|
||||||
|
return g.processStream(ctx, stream, svc, mtype, ct)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcServer) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
|
||||||
|
// for {
|
||||||
|
var argv, replyv reflect.Value
|
||||||
|
|
||||||
|
// Decode the argument value.
|
||||||
|
argIsValue := false // if true, need to indirect before calling.
|
||||||
|
if mtype.ArgType.Kind() == reflect.Ptr {
|
||||||
|
argv = reflect.New(mtype.ArgType.Elem())
|
||||||
|
} else {
|
||||||
|
argv = reflect.New(mtype.ArgType)
|
||||||
|
argIsValue = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unmarshal request
|
||||||
|
if err := stream.RecvMsg(argv.Interface()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if argIsValue {
|
||||||
|
argv = argv.Elem()
|
||||||
|
}
|
||||||
|
|
||||||
|
// reply value
|
||||||
|
replyv = reflect.New(mtype.ReplyType.Elem())
|
||||||
|
|
||||||
|
function := mtype.method.Func
|
||||||
|
var returnValues []reflect.Value
|
||||||
|
|
||||||
|
cf, err := g.newCodec(ct)
|
||||||
|
if err != nil {
|
||||||
|
return errors.InternalServerError(g.opts.Name, err.Error())
|
||||||
|
}
|
||||||
|
b, err := cf.Marshal(argv.Interface())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// create a client.Request
|
||||||
|
r := &rpcRequest{
|
||||||
|
service: g.opts.Name,
|
||||||
|
contentType: ct,
|
||||||
|
method: fmt.Sprintf("%s.%s", service.name, mtype.method.Name),
|
||||||
|
body: b,
|
||||||
|
payload: argv.Interface(),
|
||||||
|
}
|
||||||
|
// define the handler func
|
||||||
|
fn := func(ctx context.Context, req server.Request, rsp interface{}) (err error) {
|
||||||
|
returnValues = function.Call([]reflect.Value{service.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)})
|
||||||
|
|
||||||
|
// The return value for the method is an error.
|
||||||
|
if rerr := returnValues[0].Interface(); rerr != nil {
|
||||||
|
err = rerr.(error)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// wrap the handler func
|
||||||
|
for i := len(g.opts.HdlrWrappers); i > 0; i-- {
|
||||||
|
fn = g.opts.HdlrWrappers[i-1](fn)
|
||||||
|
}
|
||||||
|
|
||||||
|
statusCode := StatusOK
|
||||||
|
statusDesc := ""
|
||||||
|
|
||||||
|
// execute the handler
|
||||||
|
if appErr := fn(ctx, r, replyv.Interface()); appErr != nil {
|
||||||
|
switch verr := appErr.(type) {
|
||||||
|
case *errors.Error:
|
||||||
|
statusCode = microError(verr)
|
||||||
|
statusDesc = verr.Error()
|
||||||
|
case proto.Message:
|
||||||
|
// user defined error that proto based we can attach it to grpc status
|
||||||
|
statusCode = convertCode(appErr)
|
||||||
|
statusDesc = appErr.Error()
|
||||||
|
default:
|
||||||
|
g.RLock()
|
||||||
|
config := g.opts
|
||||||
|
g.RUnlock()
|
||||||
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
|
config.Logger.Warn(config.Context, "handler error will not be transferred properly, must return *errors.Error or drpcerr.Error")
|
||||||
|
}
|
||||||
|
// default case user pass own error type that not proto based
|
||||||
|
statusCode = convertCode(verr)
|
||||||
|
statusDesc = verr.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
return drpcerr.WithCode(fmt.Errorf(statusDesc), uint64(statusCode))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := stream.SendMsg(replyv.Interface()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
type reflectStream struct {
|
||||||
|
stream server.Stream
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *reflectStream) Send(rsp *grpcreflect.ServerReflectionResponse) error {
|
||||||
|
return s.stream.Send(rsp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *reflectStream) Recv() (*grpcreflect.ServerReflectionRequest, error) {
|
||||||
|
req := &grpcreflect.ServerReflectionRequest{}
|
||||||
|
err := s.stream.Recv(req)
|
||||||
|
return req, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *reflectStream) SetHeader(gmetadata.MD) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *reflectStream) SendHeader(gmetadata.MD) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *reflectStream) SetTrailer(gmetadata.MD) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *reflectStream) Context() context.Context {
|
||||||
|
return s.stream.Context()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *reflectStream) SendMsg(m interface{}) error {
|
||||||
|
return s.stream.Send(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *reflectStream) RecvMsg(m interface{}) error {
|
||||||
|
return s.stream.Recv(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error {
|
||||||
|
return g.s.ServerReflectionInfo(&reflectStream{stream})
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
func (g *grpcServer) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
|
||||||
|
opts := g.opts
|
||||||
|
|
||||||
|
r := &rpcRequest{
|
||||||
|
service: opts.Name,
|
||||||
|
contentType: ct,
|
||||||
|
method: fmt.Sprintf("%s.%s", service.name, mtype.method.Name),
|
||||||
|
stream: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
ss := &rpcStream{
|
||||||
|
ServerStream: stream,
|
||||||
|
request: r,
|
||||||
|
}
|
||||||
|
|
||||||
|
function := mtype.method.Func
|
||||||
|
var returnValues []reflect.Value
|
||||||
|
|
||||||
|
// Invoke the method, providing a new value for the reply.
|
||||||
|
fn := func(ctx context.Context, req server.Request, stream interface{}) error {
|
||||||
|
returnValues = function.Call([]reflect.Value{service.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(stream)})
|
||||||
|
if err := returnValues[0].Interface(); err != nil {
|
||||||
|
return err.(error)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := len(opts.HdlrWrappers); i > 0; i-- {
|
||||||
|
fn = opts.HdlrWrappers[i-1](fn)
|
||||||
|
}
|
||||||
|
|
||||||
|
statusCode := StatusOK
|
||||||
|
statusDesc := ""
|
||||||
|
|
||||||
|
if appErr := fn(ctx, r, ss); appErr != nil {
|
||||||
|
switch verr := appErr.(type) {
|
||||||
|
case *errors.Error:
|
||||||
|
statusCode = microError(verr)
|
||||||
|
statusDesc = verr.Error()
|
||||||
|
case proto.Message:
|
||||||
|
// user defined error that proto based we can attach it to grpc status
|
||||||
|
statusCode = convertCode(appErr)
|
||||||
|
statusDesc = appErr.Error()
|
||||||
|
default:
|
||||||
|
if g.opts.Logger.V(logger.ErrorLevel) {
|
||||||
|
g.opts.Logger.Error(g.opts.Context, "handler error will not be transferred properly, must return *errors.Error or proto.Message")
|
||||||
|
}
|
||||||
|
// default case user pass own error type that not proto based
|
||||||
|
statusCode = convertCode(verr)
|
||||||
|
statusDesc = verr.Error()
|
||||||
|
}
|
||||||
|
return drpcerr.WithCode(fmt.Errorf(statusDesc), uint64(statusCode))
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcServer) newCodec(ct string) (codec.Codec, error) {
|
||||||
|
g.RLock()
|
||||||
|
defer g.RUnlock()
|
||||||
|
|
||||||
|
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
|
||||||
|
ct = ct[:idx]
|
||||||
|
}
|
||||||
|
|
||||||
|
if c, ok := g.opts.Codecs[ct]; ok {
|
||||||
|
return c, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, codec.ErrUnknownContentType
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcServer) Options() server.Options {
|
||||||
|
g.RLock()
|
||||||
|
opts := g.opts
|
||||||
|
g.RUnlock()
|
||||||
|
|
||||||
|
return opts
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcServer) Init(opts ...server.Option) error {
|
||||||
|
if len(opts) == 0 && g.init {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return g.configure(opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcServer) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler {
|
||||||
|
return newRPCHandler(h, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcServer) Handle(h server.Handler) error {
|
||||||
|
if err := g.rpc.register(h.Handler()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
g.handlers[h.Name()] = h
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcServer) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber {
|
||||||
|
return newSubscriber(topic, sb, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcServer) Subscribe(sb server.Subscriber) error {
|
||||||
|
sub, ok := sb.(*subscriber)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("invalid subscriber: expected *subscriber")
|
||||||
|
}
|
||||||
|
if len(sub.handlers) == 0 {
|
||||||
|
return fmt.Errorf("invalid subscriber: no handler functions")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := server.ValidateSubscriber(sb); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
g.Lock()
|
||||||
|
if _, ok = g.subscribers[sub]; ok {
|
||||||
|
g.Unlock()
|
||||||
|
return fmt.Errorf("subscriber %v already exists", sub)
|
||||||
|
}
|
||||||
|
|
||||||
|
g.subscribers[sub] = nil
|
||||||
|
g.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcServer) Register() error {
|
||||||
|
g.RLock()
|
||||||
|
rsvc := g.rsvc
|
||||||
|
config := g.opts
|
||||||
|
g.RUnlock()
|
||||||
|
|
||||||
|
// if service already filled, reuse it and return early
|
||||||
|
if rsvc != nil {
|
||||||
|
if err := server.DefaultRegisterFunc(rsvc, config); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
service, err := server.NewRegisterService(g)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
g.RLock()
|
||||||
|
// Maps are ordered randomly, sort the keys for consistency
|
||||||
|
handlerList := make([]string, 0, len(g.handlers))
|
||||||
|
for n := range g.handlers {
|
||||||
|
// Only advertise non internal handlers
|
||||||
|
handlerList = append(handlerList, n)
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Strings(handlerList)
|
||||||
|
|
||||||
|
subscriberList := make([]*subscriber, 0, len(g.subscribers))
|
||||||
|
for e := range g.subscribers {
|
||||||
|
// Only advertise non internal subscribers
|
||||||
|
subscriberList = append(subscriberList, e)
|
||||||
|
}
|
||||||
|
sort.Slice(subscriberList, func(i, j int) bool {
|
||||||
|
return subscriberList[i].topic > subscriberList[j].topic
|
||||||
|
})
|
||||||
|
|
||||||
|
endpoints := make([]*register.Endpoint, 0, len(handlerList)+len(subscriberList))
|
||||||
|
for _, n := range handlerList {
|
||||||
|
endpoints = append(endpoints, g.handlers[n].Endpoints()...)
|
||||||
|
}
|
||||||
|
for _, e := range subscriberList {
|
||||||
|
endpoints = append(endpoints, e.Endpoints()...)
|
||||||
|
}
|
||||||
|
g.RUnlock()
|
||||||
|
|
||||||
|
service.Nodes[0].Metadata["protocol"] = "grpc"
|
||||||
|
service.Nodes[0].Metadata["transport"] = service.Nodes[0].Metadata["protocol"]
|
||||||
|
service.Endpoints = endpoints
|
||||||
|
|
||||||
|
g.RLock()
|
||||||
|
registered := g.registered
|
||||||
|
g.RUnlock()
|
||||||
|
|
||||||
|
if !registered {
|
||||||
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
|
config.Logger.Infof(config.Context, "Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// register the service
|
||||||
|
if err := server.DefaultRegisterFunc(service, config); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// already registered? don't need to register subscribers
|
||||||
|
if registered {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
g.Lock()
|
||||||
|
defer g.Unlock()
|
||||||
|
|
||||||
|
for sb := range g.subscribers {
|
||||||
|
handler := g.createSubHandler(sb, config)
|
||||||
|
var opts []broker.SubscribeOption
|
||||||
|
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||||
|
opts = append(opts, broker.SubscribeGroup(queue))
|
||||||
|
}
|
||||||
|
|
||||||
|
subCtx := config.Context
|
||||||
|
if cx := sb.Options().Context; cx != nil {
|
||||||
|
subCtx = cx
|
||||||
|
}
|
||||||
|
opts = append(opts, broker.SubscribeContext(subCtx))
|
||||||
|
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
|
||||||
|
|
||||||
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
|
config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic())
|
||||||
|
}
|
||||||
|
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
g.subscribers[sb] = []broker.Subscriber{sub}
|
||||||
|
}
|
||||||
|
|
||||||
|
g.registered = true
|
||||||
|
g.rsvc = service
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcServer) Deregister() error {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
g.RLock()
|
||||||
|
config := g.opts
|
||||||
|
g.RUnlock()
|
||||||
|
|
||||||
|
service, err := server.NewRegisterService(g)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
|
config.Logger.Infof(config.Context, "Deregistering node: %s", service.Nodes[0].ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := server.DefaultDeregisterFunc(service, config); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
g.Lock()
|
||||||
|
g.rsvc = nil
|
||||||
|
|
||||||
|
if !g.registered {
|
||||||
|
g.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
g.registered = false
|
||||||
|
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
for sb, subs := range g.subscribers {
|
||||||
|
for _, sub := range subs {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(s broker.Subscriber) {
|
||||||
|
defer wg.Done()
|
||||||
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
|
config.Logger.Infof(config.Context, "Unsubscribing from topic: %s", s.Topic())
|
||||||
|
}
|
||||||
|
if err := s.Unsubscribe(g.opts.Context); err != nil {
|
||||||
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
|
config.Logger.Errorf(config.Context, "Unsubscribing from topic: %s err: %v", s.Topic(), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(sub)
|
||||||
|
}
|
||||||
|
g.subscribers[sb] = nil
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
g.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcServer) Start() error {
|
||||||
|
g.RLock()
|
||||||
|
if g.started {
|
||||||
|
g.RUnlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
g.RUnlock()
|
||||||
|
|
||||||
|
config := g.Options()
|
||||||
|
|
||||||
|
// micro: config.Transport.Listen(config.Address)
|
||||||
|
var ts net.Listener
|
||||||
|
|
||||||
|
if l := config.Listener; l != nil {
|
||||||
|
ts = l
|
||||||
|
} else {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
// check the tls config for secure connect
|
||||||
|
if tc := config.TLSConfig; tc != nil {
|
||||||
|
ts, err = tls.Listen("tcp", config.Address, tc)
|
||||||
|
// otherwise just plain tcp listener
|
||||||
|
} else {
|
||||||
|
ts, err = net.Listen("tcp", config.Address)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.MaxConn > 0 {
|
||||||
|
ts = netutil.LimitListener(ts, config.MaxConn)
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
|
config.Logger.Infof(config.Context, "Server [grpc] Listening on %s", ts.Addr().String())
|
||||||
|
}
|
||||||
|
g.Lock()
|
||||||
|
g.opts.Address = ts.Addr().String()
|
||||||
|
if len(g.opts.Advertise) == 0 {
|
||||||
|
g.opts.Advertise = ts.Addr().String()
|
||||||
|
}
|
||||||
|
g.Unlock()
|
||||||
|
|
||||||
|
// only connect if we're subscribed
|
||||||
|
if len(g.subscribers) > 0 {
|
||||||
|
// connect to the broker
|
||||||
|
if err := config.Broker.Connect(config.Context); err != nil {
|
||||||
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
|
config.Logger.Errorf(config.Context, "Broker [%s] connect error: %v", config.Broker.String(), err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
|
config.Logger.Infof(config.Context, "Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// use RegisterCheck func before register
|
||||||
|
// nolint: nestif
|
||||||
|
if err := g.opts.RegisterCheck(config.Context); err != nil {
|
||||||
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
|
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// announce self to the world
|
||||||
|
if err := g.Register(); err != nil {
|
||||||
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
|
config.Logger.Errorf(config.Context, "Server register error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// micro: go ts.Accept(s.accept)
|
||||||
|
go func() {
|
||||||
|
if err := g.srv.Serve(ts); err != nil {
|
||||||
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
|
config.Logger.Errorf(config.Context, "gRPC Server start error: %v", err)
|
||||||
|
}
|
||||||
|
if err := g.Stop(); err != nil {
|
||||||
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
|
config.Logger.Errorf(config.Context, "gRPC Server stop error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
t := new(time.Ticker)
|
||||||
|
|
||||||
|
// only process if it exists
|
||||||
|
if g.opts.RegisterInterval > time.Duration(0) {
|
||||||
|
// new ticker
|
||||||
|
t = time.NewTicker(g.opts.RegisterInterval)
|
||||||
|
}
|
||||||
|
|
||||||
|
// return error chan
|
||||||
|
var ch chan error
|
||||||
|
|
||||||
|
Loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
// register self on interval
|
||||||
|
case <-t.C:
|
||||||
|
g.RLock()
|
||||||
|
registered := g.registered
|
||||||
|
g.RUnlock()
|
||||||
|
rerr := g.opts.RegisterCheck(g.opts.Context)
|
||||||
|
// nolint: nestif
|
||||||
|
if rerr != nil && registered {
|
||||||
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
|
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr)
|
||||||
|
}
|
||||||
|
// deregister self in case of error
|
||||||
|
if err := g.Deregister(); err != nil {
|
||||||
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
|
config.Logger.Errorf(config.Context, "Server %s-%s deregister error: %s", config.Name, config.ID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if rerr != nil && !registered {
|
||||||
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
|
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, rerr)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := g.Register(); err != nil {
|
||||||
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
|
config.Logger.Errorf(config.Context, "Server %s-%s register error: %s", config.Name, config.ID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// wait for exit
|
||||||
|
case ch = <-g.exit:
|
||||||
|
break Loop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// deregister self
|
||||||
|
if err := g.Deregister(); err != nil {
|
||||||
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
|
config.Logger.Errorf(config.Context, "Server deregister error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for waitgroup
|
||||||
|
if g.wg != nil {
|
||||||
|
g.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// stop the grpc server
|
||||||
|
exit := make(chan bool)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
g.srv.GracefulStop()
|
||||||
|
close(exit)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-exit:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
g.srv.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
// close transport
|
||||||
|
ch <- nil
|
||||||
|
|
||||||
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
|
config.Logger.Infof(config.Context, "Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
|
||||||
|
}
|
||||||
|
// disconnect broker
|
||||||
|
if err := config.Broker.Disconnect(config.Context); err != nil {
|
||||||
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
|
config.Logger.Errorf(config.Context, "Broker [%s] disconnect error: %v", config.Broker.String(), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// mark the server as started
|
||||||
|
g.Lock()
|
||||||
|
g.started = true
|
||||||
|
g.Unlock()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcServer) Stop() error {
|
||||||
|
g.RLock()
|
||||||
|
if !g.started {
|
||||||
|
g.RUnlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
g.RUnlock()
|
||||||
|
|
||||||
|
ch := make(chan error)
|
||||||
|
g.exit <- ch
|
||||||
|
|
||||||
|
err := <-ch
|
||||||
|
g.Lock()
|
||||||
|
g.rsvc = nil
|
||||||
|
g.started = false
|
||||||
|
g.Unlock()
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcServer) String() string {
|
||||||
|
return "grpc"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcServer) Name() string {
|
||||||
|
return g.opts.Name
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewServer(opts ...server.Option) server.Server {
|
||||||
|
return newGRPCServer(opts...)
|
||||||
|
}
|
236
error.go
Normal file
236
error.go
Normal file
@ -0,0 +1,236 @@
|
|||||||
|
package drpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/unistack-org/micro/v3/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type StatusCode uint64
|
||||||
|
|
||||||
|
const (
|
||||||
|
// OK is returned on success.
|
||||||
|
StatusOK StatusCode = 0
|
||||||
|
|
||||||
|
// Canceled indicates the operation was canceled (typically by the caller).
|
||||||
|
//
|
||||||
|
// The gRPC framework will generate this error code when cancellation
|
||||||
|
// is requested.
|
||||||
|
StatusCanceled StatusCode = 1
|
||||||
|
|
||||||
|
// Unknown error. An example of where this error may be returned is
|
||||||
|
// if a Status value received from another address space belongs to
|
||||||
|
// an error-space that is not known in this address space. Also
|
||||||
|
// errors raised by APIs that do not return enough error information
|
||||||
|
// may be converted to this error.
|
||||||
|
//
|
||||||
|
// The gRPC framework will generate this error code in the above two
|
||||||
|
// mentioned cases.
|
||||||
|
StatusUnknown StatusCode = 2
|
||||||
|
|
||||||
|
// InvalidArgument indicates client specified an invalid argument.
|
||||||
|
// Note that this differs from FailedPrecondition. It indicates arguments
|
||||||
|
// that are problematic regardless of the state of the system
|
||||||
|
// (e.g., a malformed file name).
|
||||||
|
//
|
||||||
|
// This error code will not be generated by the gRPC framework.
|
||||||
|
StatusInvalidArgument StatusCode = 3
|
||||||
|
|
||||||
|
// DeadlineExceeded means operation expired before completion.
|
||||||
|
// For operations that change the state of the system, this error may be
|
||||||
|
// returned even if the operation has completed successfully. For
|
||||||
|
// example, a successful response from a server could have been delayed
|
||||||
|
// long enough for the deadline to expire.
|
||||||
|
//
|
||||||
|
// The gRPC framework will generate this error code when the deadline is
|
||||||
|
// exceeded.
|
||||||
|
StatusDeadlineExceeded StatusCode = 4
|
||||||
|
|
||||||
|
// NotFound means some requested entity (e.g., file or directory) was
|
||||||
|
// not found.
|
||||||
|
//
|
||||||
|
// This error code will not be generated by the gRPC framework.
|
||||||
|
StatusNotFound StatusCode = 5
|
||||||
|
|
||||||
|
// AlreadyExists means an attempt to create an entity failed because one
|
||||||
|
// already exists.
|
||||||
|
//
|
||||||
|
// This error code will not be generated by the gRPC framework.
|
||||||
|
StatusAlreadyExists StatusCode = 6
|
||||||
|
|
||||||
|
// PermissionDenied indicates the caller does not have permission to
|
||||||
|
// execute the specified operation. It must not be used for rejections
|
||||||
|
// caused by exhausting some resource (use ResourceExhausted
|
||||||
|
// instead for those errors). It must not be
|
||||||
|
// used if the caller cannot be identified (use Unauthenticated
|
||||||
|
// instead for those errors).
|
||||||
|
//
|
||||||
|
// This error code will not be generated by the gRPC core framework,
|
||||||
|
// but expect authentication middleware to use it.
|
||||||
|
StatusPermissionDenied StatusCode = 7
|
||||||
|
|
||||||
|
// ResourceExhausted indicates some resource has been exhausted, perhaps
|
||||||
|
// a per-user quota, or perhaps the entire file system is out of space.
|
||||||
|
//
|
||||||
|
// This error code will be generated by the gRPC framework in
|
||||||
|
// out-of-memory and server overload situations, or when a message is
|
||||||
|
// larger than the configured maximum size.
|
||||||
|
StatusResourceExhausted StatusCode = 8
|
||||||
|
|
||||||
|
// FailedPrecondition indicates operation was rejected because the
|
||||||
|
// system is not in a state required for the operation's execution.
|
||||||
|
// For example, directory to be deleted may be non-empty, an rmdir
|
||||||
|
// operation is applied to a non-directory, etc.
|
||||||
|
//
|
||||||
|
// A litmus test that may help a service implementor in deciding
|
||||||
|
// between FailedPrecondition, Aborted, and Unavailable:
|
||||||
|
// (a) Use Unavailable if the client can retry just the failing call.
|
||||||
|
// (b) Use Aborted if the client should retry at a higher-level
|
||||||
|
// (e.g., restarting a read-modify-write sequence).
|
||||||
|
// (c) Use FailedPrecondition if the client should not retry until
|
||||||
|
// the system state has been explicitly fixed. E.g., if an "rmdir"
|
||||||
|
// fails because the directory is non-empty, FailedPrecondition
|
||||||
|
// should be returned since the client should not retry unless
|
||||||
|
// they have first fixed up the directory by deleting files from it.
|
||||||
|
// (d) Use FailedPrecondition if the client performs conditional
|
||||||
|
// REST Get/Update/Delete on a resource and the resource on the
|
||||||
|
// server does not match the condition. E.g., conflicting
|
||||||
|
// read-modify-write on the same resource.
|
||||||
|
//
|
||||||
|
// This error code will not be generated by the gRPC framework.
|
||||||
|
StatusFailedPrecondition StatusCode = 9
|
||||||
|
|
||||||
|
// Aborted indicates the operation was aborted, typically due to a
|
||||||
|
// concurrency issue like sequencer check failures, transaction aborts,
|
||||||
|
// etc.
|
||||||
|
//
|
||||||
|
// See litmus test above for deciding between FailedPrecondition,
|
||||||
|
// Aborted, and Unavailable.
|
||||||
|
//
|
||||||
|
// This error code will not be generated by the gRPC framework.
|
||||||
|
StatusAborted StatusCode = 10
|
||||||
|
|
||||||
|
// OutOfRange means operation was attempted past the valid range.
|
||||||
|
// E.g., seeking or reading past end of file.
|
||||||
|
//
|
||||||
|
// Unlike InvalidArgument, this error indicates a problem that may
|
||||||
|
// be fixed if the system state changes. For example, a 32-bit file
|
||||||
|
// system will generate InvalidArgument if asked to read at an
|
||||||
|
// offset that is not in the range [0,2^32-1], but it will generate
|
||||||
|
// OutOfRange if asked to read from an offset past the current
|
||||||
|
// file size.
|
||||||
|
//
|
||||||
|
// There is a fair bit of overlap between FailedPrecondition and
|
||||||
|
// OutOfRange. We recommend using OutOfRange (the more specific
|
||||||
|
// error) when it applies so that callers who are iterating through
|
||||||
|
// a space can easily look for an OutOfRange error to detect when
|
||||||
|
// they are done.
|
||||||
|
//
|
||||||
|
// This error code will not be generated by the gRPC framework.
|
||||||
|
StatusOutOfRange StatusCode = 11
|
||||||
|
|
||||||
|
// Unimplemented indicates operation is not implemented or not
|
||||||
|
// supported/enabled in this service.
|
||||||
|
//
|
||||||
|
// This error code will be generated by the gRPC framework. Most
|
||||||
|
// commonly, you will see this error code when a method implementation
|
||||||
|
// is missing on the server. It can also be generated for unknown
|
||||||
|
// compression algorithms or a disagreement as to whether an RPC should
|
||||||
|
// be streaming.
|
||||||
|
StatusUnimplemented StatusCode = 12
|
||||||
|
|
||||||
|
// Internal errors. Means some invariants expected by underlying
|
||||||
|
// system has been broken. If you see one of these errors,
|
||||||
|
// something is very broken.
|
||||||
|
//
|
||||||
|
// This error code will be generated by the gRPC framework in several
|
||||||
|
// internal error conditions.
|
||||||
|
StatusInternal StatusCode = 13
|
||||||
|
|
||||||
|
// Unavailable indicates the service is currently unavailable.
|
||||||
|
// This is a most likely a transient condition and may be corrected
|
||||||
|
// by retrying with a backoff. Note that it is not always safe to retry
|
||||||
|
// non-idempotent operations.
|
||||||
|
//
|
||||||
|
// See litmus test above for deciding between FailedPrecondition,
|
||||||
|
// Aborted, and Unavailable.
|
||||||
|
//
|
||||||
|
// This error code will be generated by the gRPC framework during
|
||||||
|
// abrupt shutdown of a server process or network connection.
|
||||||
|
StatusUnavailable StatusCode = 14
|
||||||
|
|
||||||
|
// DataLoss indicates unrecoverable data loss or corruption.
|
||||||
|
//
|
||||||
|
// This error code will not be generated by the gRPC framework.
|
||||||
|
StatusDataLoss StatusCode = 15
|
||||||
|
|
||||||
|
// Unauthenticated indicates the request does not have valid
|
||||||
|
// authentication credentials for the operation.
|
||||||
|
//
|
||||||
|
// The gRPC framework will generate this error code when the
|
||||||
|
// authentication metadata is invalid or a Credentials callback fails,
|
||||||
|
// but also expect authentication middleware to generate it.
|
||||||
|
StatusUnauthenticated StatusCode = 16
|
||||||
|
)
|
||||||
|
|
||||||
|
var errMapping = map[int32]StatusCode{
|
||||||
|
http.StatusOK: StatusOK,
|
||||||
|
http.StatusBadRequest: StatusInvalidArgument,
|
||||||
|
http.StatusRequestTimeout: StatusDeadlineExceeded,
|
||||||
|
http.StatusNotFound: StatusNotFound,
|
||||||
|
http.StatusConflict: StatusAlreadyExists,
|
||||||
|
http.StatusForbidden: StatusPermissionDenied,
|
||||||
|
http.StatusUnauthorized: StatusUnauthenticated,
|
||||||
|
http.StatusPreconditionFailed: StatusFailedPrecondition,
|
||||||
|
http.StatusNotImplemented: StatusUnimplemented,
|
||||||
|
http.StatusInternalServerError: StatusInternal,
|
||||||
|
http.StatusServiceUnavailable: StatusUnavailable,
|
||||||
|
}
|
||||||
|
|
||||||
|
// convertCode converts a standard Go error into its canonical code. Note that
|
||||||
|
// this is only used to translate the error returned by the server applications.
|
||||||
|
func convertCode(err error) StatusCode {
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
return StatusOK
|
||||||
|
case io.EOF:
|
||||||
|
return StatusOutOfRange
|
||||||
|
case io.ErrClosedPipe, io.ErrNoProgress, io.ErrShortBuffer, io.ErrShortWrite, io.ErrUnexpectedEOF:
|
||||||
|
return StatusFailedPrecondition
|
||||||
|
case os.ErrInvalid:
|
||||||
|
return StatusInvalidArgument
|
||||||
|
case context.Canceled:
|
||||||
|
return StatusCanceled
|
||||||
|
case context.DeadlineExceeded:
|
||||||
|
return StatusDeadlineExceeded
|
||||||
|
}
|
||||||
|
switch {
|
||||||
|
case os.IsExist(err):
|
||||||
|
return StatusAlreadyExists
|
||||||
|
case os.IsNotExist(err):
|
||||||
|
return StatusNotFound
|
||||||
|
case os.IsPermission(err):
|
||||||
|
return StatusPermissionDenied
|
||||||
|
}
|
||||||
|
return StatusUnknown
|
||||||
|
}
|
||||||
|
|
||||||
|
func microError(err error) StatusCode {
|
||||||
|
if err == nil {
|
||||||
|
return StatusOK
|
||||||
|
}
|
||||||
|
|
||||||
|
var ec int32
|
||||||
|
if verr, ok := err.(*errors.Error); ok {
|
||||||
|
ec = verr.Code
|
||||||
|
}
|
||||||
|
|
||||||
|
if code, ok := errMapping[ec]; ok {
|
||||||
|
return code
|
||||||
|
}
|
||||||
|
|
||||||
|
return StatusUnknown
|
||||||
|
}
|
12
go.mod
12
go.mod
@ -1,3 +1,13 @@
|
|||||||
module github.com/unistack-org/micro-server-drpc/v3
|
module github.com/unistack-org/micro-server-grpc/v3
|
||||||
|
|
||||||
go 1.16
|
go 1.16
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/golang/protobuf v1.5.2 // indirect
|
||||||
|
github.com/unistack-org/micro/v3 v3.3.17
|
||||||
|
golang.org/x/net v0.0.0-20210510120150-4163338589ed
|
||||||
|
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
|
||||||
|
google.golang.org/grpc v1.37.1
|
||||||
|
google.golang.org/protobuf v1.26.0
|
||||||
|
storj.io/drpc v0.0.23
|
||||||
|
)
|
||||||
|
115
go.sum
Normal file
115
go.sum
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||||
|
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||||
|
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||||
|
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||||
|
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
|
||||||
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
|
||||||
|
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
|
||||||
|
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||||
|
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||||
|
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
|
||||||
|
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||||
|
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||||
|
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
|
||||||
|
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||||
|
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||||
|
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
|
||||||
|
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
|
||||||
|
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
|
||||||
|
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
|
||||||
|
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
|
||||||
|
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
|
||||||
|
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
|
||||||
|
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||||
|
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
|
||||||
|
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||||
|
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
|
||||||
|
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||||
|
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||||
|
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
|
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
|
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
|
||||||
|
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
|
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
|
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
|
||||||
|
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
|
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
|
||||||
|
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
|
||||||
|
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||||
|
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
|
||||||
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
|
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
|
||||||
|
github.com/unistack-org/micro/v3 v3.3.17 h1:WcyS7InP0DlS/JpRQGLh5sG6VstkdHJbgpMp+gmHmwg=
|
||||||
|
github.com/unistack-org/micro/v3 v3.3.17/go.mod h1:022EOEZZ789hZY3yB5ZSMXU6jLiadBgcNB/cpediV3c=
|
||||||
|
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
|
||||||
|
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
|
||||||
|
github.com/zeebo/errs v1.2.2 h1:5NFypMTuSdoySVTqlNs1dEoU21QVamMQJxW/Fii5O7g=
|
||||||
|
github.com/zeebo/errs v1.2.2/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
|
||||||
|
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||||
|
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||||
|
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||||
|
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
|
||||||
|
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
||||||
|
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||||
|
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||||
|
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||||
|
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||||
|
golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
|
||||||
|
golang.org/x/net v0.0.0-20210510120150-4163338589ed h1:p9UgmWI9wKpfYmgaV/IZKGdXc5qEK45tDwwwDyjS26I=
|
||||||
|
golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||||
|
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||||
|
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
|
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
|
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
|
golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c=
|
||||||
|
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
|
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||||
|
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||||
|
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
|
||||||
|
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||||
|
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||||
|
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||||
|
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
|
||||||
|
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||||
|
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
|
||||||
|
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
|
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
|
||||||
|
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
|
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
|
||||||
|
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
|
||||||
|
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
|
||||||
|
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
|
||||||
|
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY=
|
||||||
|
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
|
||||||
|
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
|
||||||
|
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
|
||||||
|
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
|
||||||
|
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
|
||||||
|
google.golang.org/grpc v1.37.1 h1:ARnQJNWxGyYJpdf/JXscNlQr/uv607ZPU9Z7ogHi+iI=
|
||||||
|
google.golang.org/grpc v1.37.1/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
|
||||||
|
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||||
|
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||||
|
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
|
||||||
|
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
|
||||||
|
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
|
||||||
|
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
|
||||||
|
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
|
||||||
|
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
|
||||||
|
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
|
||||||
|
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||||
|
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
|
||||||
|
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||||
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
|
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
|
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
|
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||||
|
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||||
|
storj.io/drpc v0.0.23 h1:MHVOMVoBbQkPuL17GerlGTA9PLwE1HF6zAOYFvWwYG4=
|
||||||
|
storj.io/drpc v0.0.23/go.mod h1:OSJH7wvH3yKlhnMHwblKJioaaeyI6X8xbXT1SG9woe8=
|
60
handler.go
Normal file
60
handler.go
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
package drpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"reflect"
|
||||||
|
|
||||||
|
"github.com/unistack-org/micro/v3/register"
|
||||||
|
"github.com/unistack-org/micro/v3/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
type rpcHandler struct {
|
||||||
|
opts server.HandlerOptions
|
||||||
|
handler interface{}
|
||||||
|
name string
|
||||||
|
endpoints []*register.Endpoint
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRPCHandler(handler interface{}, opts ...server.HandlerOption) server.Handler {
|
||||||
|
options := server.NewHandlerOptions(opts...)
|
||||||
|
|
||||||
|
typ := reflect.TypeOf(handler)
|
||||||
|
hdlr := reflect.ValueOf(handler)
|
||||||
|
name := reflect.Indirect(hdlr).Type().Name()
|
||||||
|
|
||||||
|
var endpoints []*register.Endpoint
|
||||||
|
|
||||||
|
for m := 0; m < typ.NumMethod(); m++ {
|
||||||
|
if e := register.ExtractEndpoint(typ.Method(m)); e != nil {
|
||||||
|
e.Name = name + "." + e.Name
|
||||||
|
|
||||||
|
for k, v := range options.Metadata[e.Name] {
|
||||||
|
e.Metadata[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
endpoints = append(endpoints, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &rpcHandler{
|
||||||
|
name: name,
|
||||||
|
handler: handler,
|
||||||
|
endpoints: endpoints,
|
||||||
|
opts: options,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcHandler) Name() string {
|
||||||
|
return r.name
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcHandler) Handler() interface{} {
|
||||||
|
return r.handler
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcHandler) Endpoints() []*register.Endpoint {
|
||||||
|
return r.endpoints
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcHandler) Options() server.HandlerOptions {
|
||||||
|
return r.opts
|
||||||
|
}
|
17
options.go
Normal file
17
options.go
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
package drpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/unistack-org/micro/v3/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
maxMsgSizeKey struct{}
|
||||||
|
)
|
||||||
|
|
||||||
|
//
|
||||||
|
// MaxMsgSize set the maximum message in bytes the server can receive and
|
||||||
|
// send. Default maximum message size is 4 MB.
|
||||||
|
//
|
||||||
|
func MaxMsgSize(s int) server.Option {
|
||||||
|
return server.SetOption(maxMsgSizeKey{}, s)
|
||||||
|
}
|
100
request.go
Normal file
100
request.go
Normal file
@ -0,0 +1,100 @@
|
|||||||
|
package drpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/unistack-org/micro/v3/codec"
|
||||||
|
"github.com/unistack-org/micro/v3/metadata"
|
||||||
|
"github.com/unistack-org/micro/v3/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
_ server.Request = &rpcRequest{}
|
||||||
|
_ server.Message = &rpcMessage{}
|
||||||
|
)
|
||||||
|
|
||||||
|
type rpcRequest struct {
|
||||||
|
rw io.ReadWriter
|
||||||
|
payload interface{}
|
||||||
|
codec codec.Codec
|
||||||
|
header metadata.Metadata
|
||||||
|
method string
|
||||||
|
endpoint string
|
||||||
|
contentType string
|
||||||
|
service string
|
||||||
|
body []byte
|
||||||
|
stream bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type rpcMessage struct {
|
||||||
|
payload interface{}
|
||||||
|
codec codec.Codec
|
||||||
|
header metadata.Metadata
|
||||||
|
topic string
|
||||||
|
contentType string
|
||||||
|
body []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcRequest) ContentType() string {
|
||||||
|
return r.contentType
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcRequest) Service() string {
|
||||||
|
return r.service
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcRequest) Method() string {
|
||||||
|
return r.method
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcRequest) Endpoint() string {
|
||||||
|
return r.endpoint
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcRequest) Codec() codec.Codec {
|
||||||
|
return r.codec
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcRequest) Header() metadata.Metadata {
|
||||||
|
return r.header
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcRequest) Read() ([]byte, error) {
|
||||||
|
f := &codec.Frame{}
|
||||||
|
if err := r.codec.ReadBody(r.rw, f); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return f.Data, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcRequest) Stream() bool {
|
||||||
|
return r.stream
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcRequest) Body() interface{} {
|
||||||
|
return r.payload
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcMessage) ContentType() string {
|
||||||
|
return r.contentType
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcMessage) Topic() string {
|
||||||
|
return r.topic
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcMessage) Payload() interface{} {
|
||||||
|
return r.payload
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcMessage) Header() metadata.Metadata {
|
||||||
|
return r.header
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcMessage) Body() []byte {
|
||||||
|
return r.body
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcMessage) Codec() codec.Codec {
|
||||||
|
return r.codec
|
||||||
|
}
|
34
response.go
Normal file
34
response.go
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
package drpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/unistack-org/micro/v3/codec"
|
||||||
|
"github.com/unistack-org/micro/v3/metadata"
|
||||||
|
"github.com/unistack-org/micro/v3/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ server.Response = &rpcResponse{}
|
||||||
|
|
||||||
|
type rpcResponse struct {
|
||||||
|
rw io.ReadWriter
|
||||||
|
header metadata.Metadata
|
||||||
|
codec codec.Codec
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcResponse) Codec() codec.Codec {
|
||||||
|
return r.codec
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcResponse) WriteHeader(hdr metadata.Metadata) {
|
||||||
|
for k, v := range hdr {
|
||||||
|
r.header[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcResponse) Write(b []byte) error {
|
||||||
|
return r.codec.Write(r.rw, &codec.Message{
|
||||||
|
Header: r.header,
|
||||||
|
Body: b,
|
||||||
|
}, nil)
|
||||||
|
}
|
171
server.go
Normal file
171
server.go
Normal file
@ -0,0 +1,171 @@
|
|||||||
|
package drpc
|
||||||
|
|
||||||
|
// Copyright 2009 The Go Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a BSD-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
//
|
||||||
|
// Meh, we need to get rid of this shit
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"sync"
|
||||||
|
"unicode"
|
||||||
|
"unicode/utf8"
|
||||||
|
|
||||||
|
"github.com/unistack-org/micro/v3/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Precompute the reflect type for error. Can't use error directly
|
||||||
|
// because Typeof takes an empty interface value. This is annoying.
|
||||||
|
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
|
||||||
|
|
||||||
|
type methodType struct {
|
||||||
|
ArgType reflect.Type
|
||||||
|
ReplyType reflect.Type
|
||||||
|
ContextType reflect.Type
|
||||||
|
method reflect.Method
|
||||||
|
stream bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// type reflectionType func(context.Context, server.Stream) error
|
||||||
|
|
||||||
|
type service struct {
|
||||||
|
typ reflect.Type
|
||||||
|
method map[string]*methodType
|
||||||
|
rcvr reflect.Value
|
||||||
|
name string
|
||||||
|
}
|
||||||
|
|
||||||
|
// server represents an RPC Server.
|
||||||
|
type rServer struct {
|
||||||
|
serviceMap map[string]*service
|
||||||
|
mu sync.RWMutex
|
||||||
|
// reflection bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Is this an exported - upper case - name?
|
||||||
|
func isExported(name string) bool {
|
||||||
|
rune, _ := utf8.DecodeRuneInString(name)
|
||||||
|
return unicode.IsUpper(rune)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Is this type exported or a builtin?
|
||||||
|
func isExportedOrBuiltinType(t reflect.Type) bool {
|
||||||
|
for t.Kind() == reflect.Ptr {
|
||||||
|
t = t.Elem()
|
||||||
|
}
|
||||||
|
// PkgPath will be non-empty even for an exported type,
|
||||||
|
// so we need to check the type name as well.
|
||||||
|
return isExported(t.Name()) || t.PkgPath() == ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// prepareEndpoint() returns a methodType for the provided method or nil
|
||||||
|
// in case if the method was unsuitable.
|
||||||
|
func prepareEndpoint(method reflect.Method) (*methodType, error) {
|
||||||
|
mtype := method.Type
|
||||||
|
mname := method.Name
|
||||||
|
var replyType, argType, contextType reflect.Type
|
||||||
|
var stream bool
|
||||||
|
|
||||||
|
// Endpoint() must be exported.
|
||||||
|
if method.PkgPath != "" {
|
||||||
|
return nil, fmt.Errorf("Endpoint must be exported")
|
||||||
|
}
|
||||||
|
|
||||||
|
switch mtype.NumIn() {
|
||||||
|
case 3:
|
||||||
|
// assuming streaming
|
||||||
|
argType = mtype.In(2)
|
||||||
|
contextType = mtype.In(1)
|
||||||
|
stream = true
|
||||||
|
case 4:
|
||||||
|
// method that takes a context
|
||||||
|
argType = mtype.In(2)
|
||||||
|
replyType = mtype.In(3)
|
||||||
|
contextType = mtype.In(1)
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn())
|
||||||
|
}
|
||||||
|
|
||||||
|
switch stream {
|
||||||
|
case true:
|
||||||
|
// check stream type
|
||||||
|
streamType := reflect.TypeOf((*server.Stream)(nil)).Elem()
|
||||||
|
if !argType.Implements(streamType) {
|
||||||
|
return nil, fmt.Errorf("%v argument does not implement Streamer interface: %v", mname, argType)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
// First arg need not be a pointer.
|
||||||
|
if !isExportedOrBuiltinType(argType) {
|
||||||
|
return nil, fmt.Errorf("%v argument type not exported: %v", mname, argType)
|
||||||
|
}
|
||||||
|
|
||||||
|
if replyType.Kind() != reflect.Ptr {
|
||||||
|
return nil, fmt.Errorf("method %v reply type not a pointer: %v", mname, replyType)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reply type must be exported.
|
||||||
|
if !isExportedOrBuiltinType(replyType) {
|
||||||
|
return nil, fmt.Errorf("method %v reply type not exported: %v", mname, replyType)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Endpoint() needs one out.
|
||||||
|
if mtype.NumOut() != 1 {
|
||||||
|
return nil, fmt.Errorf("method %v has wrong number of outs: %v", mname, mtype.NumOut())
|
||||||
|
}
|
||||||
|
// The return type of the method must be error.
|
||||||
|
if returnType := mtype.Out(0); returnType != typeOfError {
|
||||||
|
return nil, fmt.Errorf("method %v returns %v not error", mname, returnType.String())
|
||||||
|
}
|
||||||
|
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (server *rServer) register(rcvr interface{}) error {
|
||||||
|
server.mu.Lock()
|
||||||
|
defer server.mu.Unlock()
|
||||||
|
if server.serviceMap == nil {
|
||||||
|
server.serviceMap = make(map[string]*service)
|
||||||
|
}
|
||||||
|
s := &service{}
|
||||||
|
s.typ = reflect.TypeOf(rcvr)
|
||||||
|
s.rcvr = reflect.ValueOf(rcvr)
|
||||||
|
sname := reflect.Indirect(s.rcvr).Type().Name()
|
||||||
|
if sname == "" {
|
||||||
|
return fmt.Errorf("rpc: no service name for type %v", s.typ.String())
|
||||||
|
}
|
||||||
|
if !isExported(sname) {
|
||||||
|
return fmt.Errorf("rpc Register: type %s is not exported", sname)
|
||||||
|
}
|
||||||
|
if _, present := server.serviceMap[sname]; present {
|
||||||
|
return fmt.Errorf("rpc: service already defined: " + sname)
|
||||||
|
}
|
||||||
|
s.name = sname
|
||||||
|
s.method = make(map[string]*methodType)
|
||||||
|
|
||||||
|
// Install the methods
|
||||||
|
for m := 0; m < s.typ.NumMethod(); m++ {
|
||||||
|
method := s.typ.Method(m)
|
||||||
|
mt, err := prepareEndpoint(method)
|
||||||
|
if mt != nil && err == nil {
|
||||||
|
s.method[method.Name] = mt
|
||||||
|
} else if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(s.method) == 0 {
|
||||||
|
return fmt.Errorf("rpc Register: type %s has no exported methods of suitable type", sname)
|
||||||
|
}
|
||||||
|
server.serviceMap[s.name] = s
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *methodType) prepareContext(ctx context.Context) reflect.Value {
|
||||||
|
if contextv := reflect.ValueOf(ctx); contextv.IsValid() {
|
||||||
|
return contextv
|
||||||
|
}
|
||||||
|
return reflect.Zero(m.ContextType)
|
||||||
|
}
|
40
stream.go
Normal file
40
stream.go
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
package drpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/unistack-org/micro/v3/server"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
// rpcStream implements a server side Stream.
|
||||||
|
type rpcStream struct {
|
||||||
|
// embed the grpc stream so we can access it
|
||||||
|
grpc.ServerStream
|
||||||
|
|
||||||
|
request server.Request
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcStream) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcStream) Error() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcStream) Request() server.Request {
|
||||||
|
return r.request
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcStream) Context() context.Context {
|
||||||
|
return r.ServerStream.Context()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcStream) Send(m interface{}) error {
|
||||||
|
return r.ServerStream.SendMsg(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcStream) Recv(m interface{}) error {
|
||||||
|
return r.ServerStream.RecvMsg(m)
|
||||||
|
}
|
232
subscriber.go
Normal file
232
subscriber.go
Normal file
@ -0,0 +1,232 @@
|
|||||||
|
package drpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"runtime/debug"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/unistack-org/micro/v3/broker"
|
||||||
|
"github.com/unistack-org/micro/v3/errors"
|
||||||
|
"github.com/unistack-org/micro/v3/logger"
|
||||||
|
"github.com/unistack-org/micro/v3/metadata"
|
||||||
|
"github.com/unistack-org/micro/v3/register"
|
||||||
|
"github.com/unistack-org/micro/v3/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
type handler struct {
|
||||||
|
reqType reflect.Type
|
||||||
|
ctxType reflect.Type
|
||||||
|
method reflect.Value
|
||||||
|
}
|
||||||
|
|
||||||
|
type subscriber struct {
|
||||||
|
topic string
|
||||||
|
rcvr reflect.Value
|
||||||
|
typ reflect.Type
|
||||||
|
subscriber interface{}
|
||||||
|
handlers []*handler
|
||||||
|
endpoints []*register.Endpoint
|
||||||
|
opts server.SubscriberOptions
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber {
|
||||||
|
options := server.NewSubscriberOptions(opts...)
|
||||||
|
|
||||||
|
var endpoints []*register.Endpoint
|
||||||
|
var handlers []*handler
|
||||||
|
|
||||||
|
if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func {
|
||||||
|
h := &handler{
|
||||||
|
method: reflect.ValueOf(sub),
|
||||||
|
}
|
||||||
|
|
||||||
|
switch typ.NumIn() {
|
||||||
|
case 1:
|
||||||
|
h.reqType = typ.In(0)
|
||||||
|
case 2:
|
||||||
|
h.ctxType = typ.In(0)
|
||||||
|
h.reqType = typ.In(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
handlers = append(handlers, h)
|
||||||
|
|
||||||
|
endpoints = append(endpoints, ®ister.Endpoint{
|
||||||
|
Name: "Func",
|
||||||
|
Request: register.ExtractSubValue(typ),
|
||||||
|
Metadata: map[string]string{
|
||||||
|
"topic": topic,
|
||||||
|
"subscriber": "true",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
hdlr := reflect.ValueOf(sub)
|
||||||
|
name := reflect.Indirect(hdlr).Type().Name()
|
||||||
|
|
||||||
|
for m := 0; m < typ.NumMethod(); m++ {
|
||||||
|
method := typ.Method(m)
|
||||||
|
h := &handler{
|
||||||
|
method: method.Func,
|
||||||
|
}
|
||||||
|
|
||||||
|
switch method.Type.NumIn() {
|
||||||
|
case 2:
|
||||||
|
h.reqType = method.Type.In(1)
|
||||||
|
case 3:
|
||||||
|
h.ctxType = method.Type.In(1)
|
||||||
|
h.reqType = method.Type.In(2)
|
||||||
|
}
|
||||||
|
|
||||||
|
handlers = append(handlers, h)
|
||||||
|
|
||||||
|
endpoints = append(endpoints, ®ister.Endpoint{
|
||||||
|
Name: name + "." + method.Name,
|
||||||
|
Request: register.ExtractSubValue(method.Type),
|
||||||
|
Metadata: map[string]string{
|
||||||
|
"topic": topic,
|
||||||
|
"subscriber": "true",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &subscriber{
|
||||||
|
rcvr: reflect.ValueOf(sub),
|
||||||
|
typ: reflect.TypeOf(sub),
|
||||||
|
topic: topic,
|
||||||
|
subscriber: sub,
|
||||||
|
handlers: handlers,
|
||||||
|
endpoints: endpoints,
|
||||||
|
opts: options,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
|
||||||
|
return func(p broker.Event) (err error) {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
if g.opts.Logger.V(logger.ErrorLevel) {
|
||||||
|
g.opts.Logger.Error(g.opts.Context, "panic recovered: ", r)
|
||||||
|
g.opts.Logger.Error(g.opts.Context, string(debug.Stack()))
|
||||||
|
}
|
||||||
|
err = errors.InternalServerError(g.opts.Name+".subscriber", "panic recovered: %v", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
msg := p.Message()
|
||||||
|
// if we don't have headers, create empty map
|
||||||
|
if msg.Header == nil {
|
||||||
|
msg.Header = make(map[string]string)
|
||||||
|
}
|
||||||
|
|
||||||
|
ct := msg.Header["Content-Type"]
|
||||||
|
if len(ct) == 0 {
|
||||||
|
msg.Header["Content-Type"] = defaultContentType
|
||||||
|
ct = defaultContentType
|
||||||
|
}
|
||||||
|
cf, err := g.newCodec(ct)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
hdr := make(map[string]string, len(msg.Header))
|
||||||
|
for k, v := range msg.Header {
|
||||||
|
if k == "Content-Type" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
hdr[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := metadata.NewIncomingContext(sb.opts.Context, hdr)
|
||||||
|
|
||||||
|
results := make(chan error, len(sb.handlers))
|
||||||
|
|
||||||
|
for i := 0; i < len(sb.handlers); i++ {
|
||||||
|
handler := sb.handlers[i]
|
||||||
|
|
||||||
|
var isVal bool
|
||||||
|
var req reflect.Value
|
||||||
|
|
||||||
|
if handler.reqType.Kind() == reflect.Ptr {
|
||||||
|
req = reflect.New(handler.reqType.Elem())
|
||||||
|
} else {
|
||||||
|
req = reflect.New(handler.reqType)
|
||||||
|
isVal = true
|
||||||
|
}
|
||||||
|
if isVal {
|
||||||
|
req = req.Elem()
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = cf.Unmarshal(msg.Body, req.Interface()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
fn := func(ctx context.Context, msg server.Message) error {
|
||||||
|
var vals []reflect.Value
|
||||||
|
if sb.typ.Kind() != reflect.Func {
|
||||||
|
vals = append(vals, sb.rcvr)
|
||||||
|
}
|
||||||
|
if handler.ctxType != nil {
|
||||||
|
vals = append(vals, reflect.ValueOf(ctx))
|
||||||
|
}
|
||||||
|
|
||||||
|
vals = append(vals, reflect.ValueOf(msg.Payload()))
|
||||||
|
|
||||||
|
returnValues := handler.method.Call(vals)
|
||||||
|
if rerr := returnValues[0].Interface(); rerr != nil {
|
||||||
|
return rerr.(error)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := len(opts.SubWrappers); i > 0; i-- {
|
||||||
|
fn = opts.SubWrappers[i-1](fn)
|
||||||
|
}
|
||||||
|
|
||||||
|
if g.wg != nil {
|
||||||
|
g.wg.Add(1)
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
if g.wg != nil {
|
||||||
|
defer g.wg.Done()
|
||||||
|
}
|
||||||
|
cerr := fn(ctx, &rpcMessage{
|
||||||
|
topic: sb.topic,
|
||||||
|
contentType: ct,
|
||||||
|
payload: req.Interface(),
|
||||||
|
header: msg.Header,
|
||||||
|
body: msg.Body,
|
||||||
|
})
|
||||||
|
results <- cerr
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
var errors []string
|
||||||
|
for i := 0; i < len(sb.handlers); i++ {
|
||||||
|
if rerr := <-results; rerr != nil {
|
||||||
|
errors = append(errors, rerr.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(errors) > 0 {
|
||||||
|
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *subscriber) Topic() string {
|
||||||
|
return s.topic
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *subscriber) Subscriber() interface{} {
|
||||||
|
return s.subscriber
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *subscriber) Endpoints() []*register.Endpoint {
|
||||||
|
return s.endpoints
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *subscriber) Options() server.SubscriberOptions {
|
||||||
|
return s.opts
|
||||||
|
}
|
59
util.go
Normal file
59
util.go
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
package drpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ServiceMethod converts a gRPC method to a Go method
|
||||||
|
// Input:
|
||||||
|
// Foo.Bar, /Foo/Bar, /package.Foo/Bar, /a.package.Foo/Bar
|
||||||
|
// Output:
|
||||||
|
// [Foo, Bar]
|
||||||
|
func serviceMethod(m string) (string, string, error) {
|
||||||
|
if len(m) == 0 {
|
||||||
|
return "", "", fmt.Errorf("malformed method name: %q", m)
|
||||||
|
}
|
||||||
|
|
||||||
|
// grpc method
|
||||||
|
if m[0] == '/' {
|
||||||
|
// [ , Foo, Bar]
|
||||||
|
// [ , package.Foo, Bar]
|
||||||
|
// [ , a.package.Foo, Bar]
|
||||||
|
parts := strings.Split(m, "/")
|
||||||
|
if len(parts) != 3 || len(parts[1]) == 0 || len(parts[2]) == 0 {
|
||||||
|
return "", "", fmt.Errorf("malformed method name: %q", m)
|
||||||
|
}
|
||||||
|
service := strings.Split(parts[1], ".")
|
||||||
|
return service[len(service)-1], parts[2], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// non grpc method
|
||||||
|
parts := strings.Split(m, ".")
|
||||||
|
|
||||||
|
// expect [Foo, Bar]
|
||||||
|
if len(parts) != 2 {
|
||||||
|
return "", "", fmt.Errorf("malformed method name: %q", m)
|
||||||
|
}
|
||||||
|
|
||||||
|
return parts[0], parts[1], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
// ServiceFromMethod returns the service
|
||||||
|
// /service.Foo/Bar => service
|
||||||
|
func serviceFromMethod(m string) string {
|
||||||
|
if len(m) == 0 {
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
if m[0] != '/' {
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
parts := strings.Split(m, "/")
|
||||||
|
if len(parts) < 3 {
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
parts = strings.Split(parts[1], ".")
|
||||||
|
return strings.Join(parts[:len(parts)-1], ".")
|
||||||
|
}
|
||||||
|
*/
|
46
util_test.go
Normal file
46
util_test.go
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
package drpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestServiceMethod(t *testing.T) {
|
||||||
|
type testCase struct {
|
||||||
|
input string
|
||||||
|
service string
|
||||||
|
method string
|
||||||
|
err bool
|
||||||
|
}
|
||||||
|
|
||||||
|
methods := []testCase{
|
||||||
|
{"Foo.Bar", "Foo", "Bar", false},
|
||||||
|
{"/Foo/Bar", "Foo", "Bar", false},
|
||||||
|
{"/package.Foo/Bar", "Foo", "Bar", false},
|
||||||
|
{"/a.package.Foo/Bar", "Foo", "Bar", false},
|
||||||
|
{"a.package.Foo/Bar", "", "", true},
|
||||||
|
{"/Foo/Bar/Baz", "", "", true},
|
||||||
|
{"Foo.Bar.Baz", "", "", true},
|
||||||
|
}
|
||||||
|
for _, test := range methods {
|
||||||
|
service, method, err := serviceMethod(test.input)
|
||||||
|
if err != nil && test.err == true {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// unexpected error
|
||||||
|
if err != nil && test.err == false {
|
||||||
|
t.Fatalf("unexpected err %v for %+v", err, test)
|
||||||
|
}
|
||||||
|
// expecter error
|
||||||
|
if test.err == true && err == nil {
|
||||||
|
t.Fatalf("expected error for %+v: got service: %s method: %s", test, service, method)
|
||||||
|
}
|
||||||
|
|
||||||
|
if service != test.service {
|
||||||
|
t.Fatalf("wrong service for %+v: got service: %s method: %s", test, service, method)
|
||||||
|
}
|
||||||
|
|
||||||
|
if method != test.method {
|
||||||
|
t.Fatalf("wrong method for %+v: got service: %s method: %s", test, service, method)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user