From d797edb9a765cbde91f3fdf48c8bf03414f2752d Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sun, 12 Sep 2021 23:24:32 +0300 Subject: [PATCH] add needed files Signed-off-by: Vasiliy Tolstov --- LICENSE | 15 +- codec.go | 54 +++ drpc.go | 961 ++++++++++++++++++++++++++++++++++++++++++++++++++ error.go | 236 +++++++++++++ go.mod | 12 +- go.sum | 115 ++++++ handler.go | 60 ++++ options.go | 17 + request.go | 100 ++++++ response.go | 34 ++ server.go | 171 +++++++++ stream.go | 40 +++ subscriber.go | 232 ++++++++++++ util.go | 59 ++++ util_test.go | 46 +++ 15 files changed, 2139 insertions(+), 13 deletions(-) create mode 100644 codec.go create mode 100644 drpc.go create mode 100644 error.go create mode 100644 go.sum create mode 100644 handler.go create mode 100644 options.go create mode 100644 request.go create mode 100644 response.go create mode 100644 server.go create mode 100644 stream.go create mode 100644 subscriber.go create mode 100644 util.go create mode 100644 util_test.go diff --git a/LICENSE b/LICENSE index 261eeb9..6e701f1 100644 --- a/LICENSE +++ b/LICENSE @@ -1,3 +1,4 @@ + Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ @@ -175,18 +176,8 @@ END OF TERMS AND CONDITIONS - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] + Copyright 2015-2020 Asim Aslam. + Copyright 2019-2020 Unistack LLC. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/codec.go b/codec.go new file mode 100644 index 0000000..204609f --- /dev/null +++ b/codec.go @@ -0,0 +1,54 @@ +package drpc + +import ( + "io" + + "github.com/unistack-org/micro/v3/codec" + "storj.io/drpc" +) + +type wrapMicroCodec struct{ codec.Codec } + +func (w *wrapMicroCodec) Marshal(v drpc.Message) ([]byte, error) { + if m, ok := v.(*codec.Frame); ok { + return m.Data, nil + } + return w.Codec.Marshal(v.(interface{})) +} + +func (w *wrapMicroCodec) Unmarshal(d []byte, v drpc.Message) error { + if d == nil || v == nil { + return nil + } + if m, ok := v.(*codec.Frame); ok { + m.Data = d + return nil + } + return w.Codec.Unmarshal(d, v.(interface{})) +} + +func (w *wrapMicroCodec) ReadHeader(conn io.Reader, m *codec.Message, mt codec.MessageType) error { + return nil +} + +func (w *wrapMicroCodec) ReadBody(conn io.Reader, v interface{}) error { + if m, ok := v.(*codec.Frame); ok { + _, err := conn.Read(m.Data) + return err + } + return codec.ErrInvalidMessage +} + +func (w *wrapMicroCodec) Write(conn io.Writer, m *codec.Message, v interface{}) error { + // if we don't have a body + if v != nil { + b, err := w.Marshal(v) + if err != nil { + return err + } + m.Body = b + } + // write the body using the framing codec + _, err := conn.Write(m.Body) + return err +} diff --git a/drpc.go b/drpc.go new file mode 100644 index 0000000..d66bedf --- /dev/null +++ b/drpc.go @@ -0,0 +1,961 @@ +// Package drpc provides a drpc server +package drpc + +import ( + "context" + "crypto/tls" + "fmt" + "net" + "reflect" + "runtime/debug" + "sort" + "strconv" + "strings" + "sync" + "time" + + // nolint: staticcheck + + "github.com/unistack-org/micro/v3/broker" + "github.com/unistack-org/micro/v3/codec" + "github.com/unistack-org/micro/v3/errors" + "github.com/unistack-org/micro/v3/logger" + metadata "github.com/unistack-org/micro/v3/metadata" + "github.com/unistack-org/micro/v3/register" + "github.com/unistack-org/micro/v3/server" + "golang.org/x/net/netutil" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "storj.io/drpc/drpcerr" + gmetadata "storj.io/drpc/drpcmetadata" +) + +const ( + defaultContentType = "application/grpc+proto" +) + +/* +type grpcServerReflection struct { + srv *grpc.Server + s *serverReflectionServer +} +*/ + +type grpcServer struct { + handlers map[string]server.Handler + srv *grpc.Server + exit chan chan error + wg *sync.WaitGroup + rsvc *register.Service + subscribers map[*subscriber][]broker.Subscriber + rpc *rServer + opts server.Options + sync.RWMutex + init bool + started bool + registered bool +} + +func newGRPCServer(opts ...server.Option) server.Server { + // create a grpc server + g := &grpcServer{ + opts: server.NewOptions(opts...), + rpc: &rServer{ + serviceMap: make(map[string]*service), + }, + handlers: make(map[string]server.Handler), + subscribers: make(map[*subscriber][]broker.Subscriber), + exit: make(chan chan error), + } + + return g +} + +/* +type grpcRouter struct { + h func(context.Context, server.Request, interface{}) error + m func(context.Context, server.Message) error +} + +func (r grpcRouter) ProcessMessage(ctx context.Context, msg server.Message) error { + return r.m(ctx, msg) +} + +func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { + return r.h(ctx, req, rsp) +} + +*/ + +func (g *grpcServer) configure(opts ...server.Option) error { + g.Lock() + defer g.Unlock() + + for _, o := range opts { + o(&g.opts) + } + + if err := g.opts.Register.Init(); err != nil { + return err + } + if err := g.opts.Broker.Init(); err != nil { + return err + } + if err := g.opts.Tracer.Init(); err != nil { + return err + } + if err := g.opts.Auth.Init(); err != nil { + return err + } + if err := g.opts.Logger.Init(); err != nil { + return err + } + if err := g.opts.Meter.Init(); err != nil { + return err + } + if err := g.opts.Transport.Init(); err != nil { + return err + } + + g.wg = g.opts.Wait + + maxMsgSize := g.getMaxMsgSize() + + gopts := []grpc.ServerOption{ + grpc.MaxRecvMsgSize(maxMsgSize), + grpc.MaxSendMsgSize(maxMsgSize), + grpc.UnknownServiceHandler(g.handler), + } + + /* + if creds := g.getCredentials(); creds != nil { + gopts = append(gopts, grpc.Creds(creds)) + } + + if opts := g.getGrpcOptions(); opts != nil { + gopts = append(gopts, opts...) + } + */ + + g.rsvc = nil + restart := false + if g.started { + restart = true + if err := g.Stop(); err != nil { + return err + } + } + g.srv = grpc.NewServer(gopts...) + + if restart { + return g.Start() + } + + g.init = true + + return nil +} + +func (g *grpcServer) getMaxMsgSize() int { + if g.opts.Context == nil { + return codec.DefaultMaxMsgSize + } + s, ok := g.opts.Context.Value(maxMsgSizeKey{}).(int) + if !ok { + return codec.DefaultMaxMsgSize + } + return s +} + +/* +func (g *grpcServer) getCredentials() credentials.TransportCredentials { + if g.opts.TLSConfig != nil { + return credentials.NewTLS(g.opts.TLSConfig) + } + return nil +} +*/ +/* +func (g *grpcServer) getGrpcOptions() []grpc.ServerOption { + if g.opts.Context == nil { + return nil + } + + opts, ok := g.opts.Context.Value(grpcOptions{}).([]grpc.ServerOption) + if !ok || opts == nil { + return nil + } + + return opts +} +*/ + +func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err error) { + defer func() { + if r := recover(); r != nil { + g.RLock() + config := g.opts + g.RUnlock() + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error(config.Context, "panic recovered: ", r) + config.Logger.Error(config.Context, string(debug.Stack())) + } + err = errors.InternalServerError(g.opts.Name, "panic recovered: %v", r) + } else if err != nil { + g.RLock() + config := g.opts + g.RUnlock() + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "grpc handler got error: %s", err) + } + } + }() + + if g.wg != nil { + g.wg.Add(1) + defer g.wg.Done() + } + + fullMethod, ok := grpc.MethodFromServerStream(stream) + if !ok { + return status.Errorf(codes.Internal, "method does not exist in context") + } + + serviceName, methodName, err := serviceMethod(fullMethod) + if err != nil { + return status.New(StatusInvalidArgument, err.Error()).Err() + } + + // get drpc metadata + dmd, _ := gmetadata.Get(stream.Context()) + + md := metadata.New(len(dmd)) + for k, v := range dmd { + md.Set(k, v) + } + + // timeout for server deadline + to, ok := md.Get("timeout") + if ok { + md.Del("timeout") + } + + // get content type + ct := defaultContentType + + if ctype, ok := md.Get("content-type"); ok { + ct = ctype + } else if ctype, ok := md.Get("x-content-type"); ok { + ct = ctype + md.Del("x-content-type") + } + + // create new context + ctx := metadata.NewIncomingContext(stream.Context(), md) + + // get peer from context + if p, ok := peer.FromContext(stream.Context()); ok { + md["Remote"] = p.Addr.String() + ctx = peer.NewContext(ctx, p) + } + + // set the timeout if we have it + if len(to) > 0 { + if n, err := strconv.ParseUint(to, 10, 64); err == nil { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Duration(n)) + defer cancel() + } + } + + g.rpc.mu.RLock() + svc := g.rpc.serviceMap[serviceName] + g.rpc.mu.RUnlock() + + /* + if svc == nil && g.reflection && methodName == "ServerReflectionInfo" { + rfl := &grpcServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}} + svc = &service{} + svc.typ = reflect.TypeOf(rfl) + svc.rcvr = reflect.ValueOf(rfl) + svc.name = reflect.Indirect(svc.rcvr).Type().Name() + svc.method = make(map[string]*methodType) + typ := reflect.TypeOf(rfl) + if me, ok := typ.MethodByName("ServerReflectionInfo"); ok { + g.rpc.mu.Lock() + ep, err := prepareEndpoint(me) + if ep != nil && err != nil { + svc.method["ServerReflectionInfo"] = ep + } else if err != nil { + return status.New(codes.Unimplemented, err.Error()).Err() + } + g.rpc.mu.Unlock() + } + } + */ + + if svc == nil { + return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err() + } + + mtype := svc.method[methodName] + if mtype == nil { + return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s.%s", serviceName, methodName)).Err() + } + + // process unary + if !mtype.stream { + return g.processRequest(ctx, stream, svc, mtype, ct) + } + + // process stream + return g.processStream(ctx, stream, svc, mtype, ct) +} + +func (g *grpcServer) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { + // for { + var argv, replyv reflect.Value + + // Decode the argument value. + argIsValue := false // if true, need to indirect before calling. + if mtype.ArgType.Kind() == reflect.Ptr { + argv = reflect.New(mtype.ArgType.Elem()) + } else { + argv = reflect.New(mtype.ArgType) + argIsValue = true + } + + // Unmarshal request + if err := stream.RecvMsg(argv.Interface()); err != nil { + return err + } + + if argIsValue { + argv = argv.Elem() + } + + // reply value + replyv = reflect.New(mtype.ReplyType.Elem()) + + function := mtype.method.Func + var returnValues []reflect.Value + + cf, err := g.newCodec(ct) + if err != nil { + return errors.InternalServerError(g.opts.Name, err.Error()) + } + b, err := cf.Marshal(argv.Interface()) + if err != nil { + return err + } + + // create a client.Request + r := &rpcRequest{ + service: g.opts.Name, + contentType: ct, + method: fmt.Sprintf("%s.%s", service.name, mtype.method.Name), + body: b, + payload: argv.Interface(), + } + // define the handler func + fn := func(ctx context.Context, req server.Request, rsp interface{}) (err error) { + returnValues = function.Call([]reflect.Value{service.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)}) + + // The return value for the method is an error. + if rerr := returnValues[0].Interface(); rerr != nil { + err = rerr.(error) + } + + return err + } + + // wrap the handler func + for i := len(g.opts.HdlrWrappers); i > 0; i-- { + fn = g.opts.HdlrWrappers[i-1](fn) + } + + statusCode := StatusOK + statusDesc := "" + + // execute the handler + if appErr := fn(ctx, r, replyv.Interface()); appErr != nil { + switch verr := appErr.(type) { + case *errors.Error: + statusCode = microError(verr) + statusDesc = verr.Error() + case proto.Message: + // user defined error that proto based we can attach it to grpc status + statusCode = convertCode(appErr) + statusDesc = appErr.Error() + default: + g.RLock() + config := g.opts + g.RUnlock() + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Warn(config.Context, "handler error will not be transferred properly, must return *errors.Error or drpcerr.Error") + } + // default case user pass own error type that not proto based + statusCode = convertCode(verr) + statusDesc = verr.Error() + } + + return drpcerr.WithCode(fmt.Errorf(statusDesc), uint64(statusCode)) + } + + if err := stream.SendMsg(replyv.Interface()); err != nil { + return err + } + + return nil +} + +/* +type reflectStream struct { + stream server.Stream +} + +func (s *reflectStream) Send(rsp *grpcreflect.ServerReflectionResponse) error { + return s.stream.Send(rsp) +} + +func (s *reflectStream) Recv() (*grpcreflect.ServerReflectionRequest, error) { + req := &grpcreflect.ServerReflectionRequest{} + err := s.stream.Recv(req) + return req, err +} + +func (s *reflectStream) SetHeader(gmetadata.MD) error { + return nil +} + +func (s *reflectStream) SendHeader(gmetadata.MD) error { + return nil +} + +func (s *reflectStream) SetTrailer(gmetadata.MD) { + +} + +func (s *reflectStream) Context() context.Context { + return s.stream.Context() +} + +func (s *reflectStream) SendMsg(m interface{}) error { + return s.stream.Send(m) +} + +func (s *reflectStream) RecvMsg(m interface{}) error { + return s.stream.Recv(m) +} + +func (g *grpcServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error { + return g.s.ServerReflectionInfo(&reflectStream{stream}) +} +*/ + +func (g *grpcServer) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { + opts := g.opts + + r := &rpcRequest{ + service: opts.Name, + contentType: ct, + method: fmt.Sprintf("%s.%s", service.name, mtype.method.Name), + stream: true, + } + + ss := &rpcStream{ + ServerStream: stream, + request: r, + } + + function := mtype.method.Func + var returnValues []reflect.Value + + // Invoke the method, providing a new value for the reply. + fn := func(ctx context.Context, req server.Request, stream interface{}) error { + returnValues = function.Call([]reflect.Value{service.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(stream)}) + if err := returnValues[0].Interface(); err != nil { + return err.(error) + } + + return nil + } + + for i := len(opts.HdlrWrappers); i > 0; i-- { + fn = opts.HdlrWrappers[i-1](fn) + } + + statusCode := StatusOK + statusDesc := "" + + if appErr := fn(ctx, r, ss); appErr != nil { + switch verr := appErr.(type) { + case *errors.Error: + statusCode = microError(verr) + statusDesc = verr.Error() + case proto.Message: + // user defined error that proto based we can attach it to grpc status + statusCode = convertCode(appErr) + statusDesc = appErr.Error() + default: + if g.opts.Logger.V(logger.ErrorLevel) { + g.opts.Logger.Error(g.opts.Context, "handler error will not be transferred properly, must return *errors.Error or proto.Message") + } + // default case user pass own error type that not proto based + statusCode = convertCode(verr) + statusDesc = verr.Error() + } + return drpcerr.WithCode(fmt.Errorf(statusDesc), uint64(statusCode)) + } + + return nil +} + +func (g *grpcServer) newCodec(ct string) (codec.Codec, error) { + g.RLock() + defer g.RUnlock() + + if idx := strings.IndexRune(ct, ';'); idx >= 0 { + ct = ct[:idx] + } + + if c, ok := g.opts.Codecs[ct]; ok { + return c, nil + } + + return nil, codec.ErrUnknownContentType +} + +func (g *grpcServer) Options() server.Options { + g.RLock() + opts := g.opts + g.RUnlock() + + return opts +} + +func (g *grpcServer) Init(opts ...server.Option) error { + if len(opts) == 0 && g.init { + return nil + } + return g.configure(opts...) +} + +func (g *grpcServer) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler { + return newRPCHandler(h, opts...) +} + +func (g *grpcServer) Handle(h server.Handler) error { + if err := g.rpc.register(h.Handler()); err != nil { + return err + } + + g.handlers[h.Name()] = h + return nil +} + +func (g *grpcServer) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber { + return newSubscriber(topic, sb, opts...) +} + +func (g *grpcServer) Subscribe(sb server.Subscriber) error { + sub, ok := sb.(*subscriber) + if !ok { + return fmt.Errorf("invalid subscriber: expected *subscriber") + } + if len(sub.handlers) == 0 { + return fmt.Errorf("invalid subscriber: no handler functions") + } + + if err := server.ValidateSubscriber(sb); err != nil { + return err + } + + g.Lock() + if _, ok = g.subscribers[sub]; ok { + g.Unlock() + return fmt.Errorf("subscriber %v already exists", sub) + } + + g.subscribers[sub] = nil + g.Unlock() + return nil +} + +func (g *grpcServer) Register() error { + g.RLock() + rsvc := g.rsvc + config := g.opts + g.RUnlock() + + // if service already filled, reuse it and return early + if rsvc != nil { + if err := server.DefaultRegisterFunc(rsvc, config); err != nil { + return err + } + return nil + } + + service, err := server.NewRegisterService(g) + if err != nil { + return err + } + + g.RLock() + // Maps are ordered randomly, sort the keys for consistency + handlerList := make([]string, 0, len(g.handlers)) + for n := range g.handlers { + // Only advertise non internal handlers + handlerList = append(handlerList, n) + } + + sort.Strings(handlerList) + + subscriberList := make([]*subscriber, 0, len(g.subscribers)) + for e := range g.subscribers { + // Only advertise non internal subscribers + subscriberList = append(subscriberList, e) + } + sort.Slice(subscriberList, func(i, j int) bool { + return subscriberList[i].topic > subscriberList[j].topic + }) + + endpoints := make([]*register.Endpoint, 0, len(handlerList)+len(subscriberList)) + for _, n := range handlerList { + endpoints = append(endpoints, g.handlers[n].Endpoints()...) + } + for _, e := range subscriberList { + endpoints = append(endpoints, e.Endpoints()...) + } + g.RUnlock() + + service.Nodes[0].Metadata["protocol"] = "grpc" + service.Nodes[0].Metadata["transport"] = service.Nodes[0].Metadata["protocol"] + service.Endpoints = endpoints + + g.RLock() + registered := g.registered + g.RUnlock() + + if !registered { + if config.Logger.V(logger.InfoLevel) { + config.Logger.Infof(config.Context, "Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID) + } + } + + // register the service + if err := server.DefaultRegisterFunc(service, config); err != nil { + return err + } + + // already registered? don't need to register subscribers + if registered { + return nil + } + + g.Lock() + defer g.Unlock() + + for sb := range g.subscribers { + handler := g.createSubHandler(sb, config) + var opts []broker.SubscribeOption + if queue := sb.Options().Queue; len(queue) > 0 { + opts = append(opts, broker.SubscribeGroup(queue)) + } + + subCtx := config.Context + if cx := sb.Options().Context; cx != nil { + subCtx = cx + } + opts = append(opts, broker.SubscribeContext(subCtx)) + opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck)) + + if config.Logger.V(logger.InfoLevel) { + config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic()) + } + sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...) + if err != nil { + return err + } + g.subscribers[sb] = []broker.Subscriber{sub} + } + + g.registered = true + g.rsvc = service + + return nil +} + +func (g *grpcServer) Deregister() error { + var err error + + g.RLock() + config := g.opts + g.RUnlock() + + service, err := server.NewRegisterService(g) + if err != nil { + return err + } + + if config.Logger.V(logger.InfoLevel) { + config.Logger.Infof(config.Context, "Deregistering node: %s", service.Nodes[0].ID) + } + + if err := server.DefaultDeregisterFunc(service, config); err != nil { + return err + } + + g.Lock() + g.rsvc = nil + + if !g.registered { + g.Unlock() + return nil + } + + g.registered = false + + wg := sync.WaitGroup{} + for sb, subs := range g.subscribers { + for _, sub := range subs { + wg.Add(1) + go func(s broker.Subscriber) { + defer wg.Done() + if config.Logger.V(logger.InfoLevel) { + config.Logger.Infof(config.Context, "Unsubscribing from topic: %s", s.Topic()) + } + if err := s.Unsubscribe(g.opts.Context); err != nil { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "Unsubscribing from topic: %s err: %v", s.Topic(), err) + } + } + }(sub) + } + g.subscribers[sb] = nil + } + wg.Wait() + + g.Unlock() + return nil +} + +func (g *grpcServer) Start() error { + g.RLock() + if g.started { + g.RUnlock() + return nil + } + g.RUnlock() + + config := g.Options() + + // micro: config.Transport.Listen(config.Address) + var ts net.Listener + + if l := config.Listener; l != nil { + ts = l + } else { + var err error + + // check the tls config for secure connect + if tc := config.TLSConfig; tc != nil { + ts, err = tls.Listen("tcp", config.Address, tc) + // otherwise just plain tcp listener + } else { + ts, err = net.Listen("tcp", config.Address) + } + if err != nil { + return err + } + } + + if config.MaxConn > 0 { + ts = netutil.LimitListener(ts, config.MaxConn) + } + + if config.Logger.V(logger.InfoLevel) { + config.Logger.Infof(config.Context, "Server [grpc] Listening on %s", ts.Addr().String()) + } + g.Lock() + g.opts.Address = ts.Addr().String() + if len(g.opts.Advertise) == 0 { + g.opts.Advertise = ts.Addr().String() + } + g.Unlock() + + // only connect if we're subscribed + if len(g.subscribers) > 0 { + // connect to the broker + if err := config.Broker.Connect(config.Context); err != nil { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "Broker [%s] connect error: %v", config.Broker.String(), err) + } + return err + } + + if config.Logger.V(logger.InfoLevel) { + config.Logger.Infof(config.Context, "Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) + } + } + + // use RegisterCheck func before register + // nolint: nestif + if err := g.opts.RegisterCheck(config.Context); err != nil { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, err) + } + } else { + // announce self to the world + if err := g.Register(); err != nil { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "Server register error: %v", err) + } + } + } + + // micro: go ts.Accept(s.accept) + go func() { + if err := g.srv.Serve(ts); err != nil { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "gRPC Server start error: %v", err) + } + if err := g.Stop(); err != nil { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "gRPC Server stop error: %v", err) + } + } + } + }() + + go func() { + t := new(time.Ticker) + + // only process if it exists + if g.opts.RegisterInterval > time.Duration(0) { + // new ticker + t = time.NewTicker(g.opts.RegisterInterval) + } + + // return error chan + var ch chan error + + Loop: + for { + select { + // register self on interval + case <-t.C: + g.RLock() + registered := g.registered + g.RUnlock() + rerr := g.opts.RegisterCheck(g.opts.Context) + // nolint: nestif + if rerr != nil && registered { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr) + } + // deregister self in case of error + if err := g.Deregister(); err != nil { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "Server %s-%s deregister error: %s", config.Name, config.ID, err) + } + } + } else if rerr != nil && !registered { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, rerr) + } + continue + } + if err := g.Register(); err != nil { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "Server %s-%s register error: %s", config.Name, config.ID, err) + } + } + // wait for exit + case ch = <-g.exit: + break Loop + } + } + + // deregister self + if err := g.Deregister(); err != nil { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "Server deregister error: %v", err) + } + } + + // wait for waitgroup + if g.wg != nil { + g.wg.Wait() + } + + // stop the grpc server + exit := make(chan bool) + + go func() { + g.srv.GracefulStop() + close(exit) + }() + + select { + case <-exit: + case <-time.After(time.Second): + g.srv.Stop() + } + + // close transport + ch <- nil + + if config.Logger.V(logger.InfoLevel) { + config.Logger.Infof(config.Context, "Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) + } + // disconnect broker + if err := config.Broker.Disconnect(config.Context); err != nil { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "Broker [%s] disconnect error: %v", config.Broker.String(), err) + } + } + }() + + // mark the server as started + g.Lock() + g.started = true + g.Unlock() + + return nil +} + +func (g *grpcServer) Stop() error { + g.RLock() + if !g.started { + g.RUnlock() + return nil + } + g.RUnlock() + + ch := make(chan error) + g.exit <- ch + + err := <-ch + g.Lock() + g.rsvc = nil + g.started = false + g.Unlock() + + return err +} + +func (g *grpcServer) String() string { + return "grpc" +} + +func (g *grpcServer) Name() string { + return g.opts.Name +} + +func NewServer(opts ...server.Option) server.Server { + return newGRPCServer(opts...) +} diff --git a/error.go b/error.go new file mode 100644 index 0000000..92a6e5a --- /dev/null +++ b/error.go @@ -0,0 +1,236 @@ +package drpc + +import ( + "context" + "io" + "net/http" + "os" + + "github.com/unistack-org/micro/v3/errors" +) + +type StatusCode uint64 + +const ( + // OK is returned on success. + StatusOK StatusCode = 0 + + // Canceled indicates the operation was canceled (typically by the caller). + // + // The gRPC framework will generate this error code when cancellation + // is requested. + StatusCanceled StatusCode = 1 + + // Unknown error. An example of where this error may be returned is + // if a Status value received from another address space belongs to + // an error-space that is not known in this address space. Also + // errors raised by APIs that do not return enough error information + // may be converted to this error. + // + // The gRPC framework will generate this error code in the above two + // mentioned cases. + StatusUnknown StatusCode = 2 + + // InvalidArgument indicates client specified an invalid argument. + // Note that this differs from FailedPrecondition. It indicates arguments + // that are problematic regardless of the state of the system + // (e.g., a malformed file name). + // + // This error code will not be generated by the gRPC framework. + StatusInvalidArgument StatusCode = 3 + + // DeadlineExceeded means operation expired before completion. + // For operations that change the state of the system, this error may be + // returned even if the operation has completed successfully. For + // example, a successful response from a server could have been delayed + // long enough for the deadline to expire. + // + // The gRPC framework will generate this error code when the deadline is + // exceeded. + StatusDeadlineExceeded StatusCode = 4 + + // NotFound means some requested entity (e.g., file or directory) was + // not found. + // + // This error code will not be generated by the gRPC framework. + StatusNotFound StatusCode = 5 + + // AlreadyExists means an attempt to create an entity failed because one + // already exists. + // + // This error code will not be generated by the gRPC framework. + StatusAlreadyExists StatusCode = 6 + + // PermissionDenied indicates the caller does not have permission to + // execute the specified operation. It must not be used for rejections + // caused by exhausting some resource (use ResourceExhausted + // instead for those errors). It must not be + // used if the caller cannot be identified (use Unauthenticated + // instead for those errors). + // + // This error code will not be generated by the gRPC core framework, + // but expect authentication middleware to use it. + StatusPermissionDenied StatusCode = 7 + + // ResourceExhausted indicates some resource has been exhausted, perhaps + // a per-user quota, or perhaps the entire file system is out of space. + // + // This error code will be generated by the gRPC framework in + // out-of-memory and server overload situations, or when a message is + // larger than the configured maximum size. + StatusResourceExhausted StatusCode = 8 + + // FailedPrecondition indicates operation was rejected because the + // system is not in a state required for the operation's execution. + // For example, directory to be deleted may be non-empty, an rmdir + // operation is applied to a non-directory, etc. + // + // A litmus test that may help a service implementor in deciding + // between FailedPrecondition, Aborted, and Unavailable: + // (a) Use Unavailable if the client can retry just the failing call. + // (b) Use Aborted if the client should retry at a higher-level + // (e.g., restarting a read-modify-write sequence). + // (c) Use FailedPrecondition if the client should not retry until + // the system state has been explicitly fixed. E.g., if an "rmdir" + // fails because the directory is non-empty, FailedPrecondition + // should be returned since the client should not retry unless + // they have first fixed up the directory by deleting files from it. + // (d) Use FailedPrecondition if the client performs conditional + // REST Get/Update/Delete on a resource and the resource on the + // server does not match the condition. E.g., conflicting + // read-modify-write on the same resource. + // + // This error code will not be generated by the gRPC framework. + StatusFailedPrecondition StatusCode = 9 + + // Aborted indicates the operation was aborted, typically due to a + // concurrency issue like sequencer check failures, transaction aborts, + // etc. + // + // See litmus test above for deciding between FailedPrecondition, + // Aborted, and Unavailable. + // + // This error code will not be generated by the gRPC framework. + StatusAborted StatusCode = 10 + + // OutOfRange means operation was attempted past the valid range. + // E.g., seeking or reading past end of file. + // + // Unlike InvalidArgument, this error indicates a problem that may + // be fixed if the system state changes. For example, a 32-bit file + // system will generate InvalidArgument if asked to read at an + // offset that is not in the range [0,2^32-1], but it will generate + // OutOfRange if asked to read from an offset past the current + // file size. + // + // There is a fair bit of overlap between FailedPrecondition and + // OutOfRange. We recommend using OutOfRange (the more specific + // error) when it applies so that callers who are iterating through + // a space can easily look for an OutOfRange error to detect when + // they are done. + // + // This error code will not be generated by the gRPC framework. + StatusOutOfRange StatusCode = 11 + + // Unimplemented indicates operation is not implemented or not + // supported/enabled in this service. + // + // This error code will be generated by the gRPC framework. Most + // commonly, you will see this error code when a method implementation + // is missing on the server. It can also be generated for unknown + // compression algorithms or a disagreement as to whether an RPC should + // be streaming. + StatusUnimplemented StatusCode = 12 + + // Internal errors. Means some invariants expected by underlying + // system has been broken. If you see one of these errors, + // something is very broken. + // + // This error code will be generated by the gRPC framework in several + // internal error conditions. + StatusInternal StatusCode = 13 + + // Unavailable indicates the service is currently unavailable. + // This is a most likely a transient condition and may be corrected + // by retrying with a backoff. Note that it is not always safe to retry + // non-idempotent operations. + // + // See litmus test above for deciding between FailedPrecondition, + // Aborted, and Unavailable. + // + // This error code will be generated by the gRPC framework during + // abrupt shutdown of a server process or network connection. + StatusUnavailable StatusCode = 14 + + // DataLoss indicates unrecoverable data loss or corruption. + // + // This error code will not be generated by the gRPC framework. + StatusDataLoss StatusCode = 15 + + // Unauthenticated indicates the request does not have valid + // authentication credentials for the operation. + // + // The gRPC framework will generate this error code when the + // authentication metadata is invalid or a Credentials callback fails, + // but also expect authentication middleware to generate it. + StatusUnauthenticated StatusCode = 16 +) + +var errMapping = map[int32]StatusCode{ + http.StatusOK: StatusOK, + http.StatusBadRequest: StatusInvalidArgument, + http.StatusRequestTimeout: StatusDeadlineExceeded, + http.StatusNotFound: StatusNotFound, + http.StatusConflict: StatusAlreadyExists, + http.StatusForbidden: StatusPermissionDenied, + http.StatusUnauthorized: StatusUnauthenticated, + http.StatusPreconditionFailed: StatusFailedPrecondition, + http.StatusNotImplemented: StatusUnimplemented, + http.StatusInternalServerError: StatusInternal, + http.StatusServiceUnavailable: StatusUnavailable, +} + +// convertCode converts a standard Go error into its canonical code. Note that +// this is only used to translate the error returned by the server applications. +func convertCode(err error) StatusCode { + switch err { + case nil: + return StatusOK + case io.EOF: + return StatusOutOfRange + case io.ErrClosedPipe, io.ErrNoProgress, io.ErrShortBuffer, io.ErrShortWrite, io.ErrUnexpectedEOF: + return StatusFailedPrecondition + case os.ErrInvalid: + return StatusInvalidArgument + case context.Canceled: + return StatusCanceled + case context.DeadlineExceeded: + return StatusDeadlineExceeded + } + switch { + case os.IsExist(err): + return StatusAlreadyExists + case os.IsNotExist(err): + return StatusNotFound + case os.IsPermission(err): + return StatusPermissionDenied + } + return StatusUnknown +} + +func microError(err error) StatusCode { + if err == nil { + return StatusOK + } + + var ec int32 + if verr, ok := err.(*errors.Error); ok { + ec = verr.Code + } + + if code, ok := errMapping[ec]; ok { + return code + } + + return StatusUnknown +} diff --git a/go.mod b/go.mod index ddee484..a3c0826 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,13 @@ -module github.com/unistack-org/micro-server-drpc/v3 +module github.com/unistack-org/micro-server-grpc/v3 go 1.16 + +require ( + github.com/golang/protobuf v1.5.2 // indirect + github.com/unistack-org/micro/v3 v3.3.17 + golang.org/x/net v0.0.0-20210510120150-4163338589ed + golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect + google.golang.org/grpc v1.37.1 + google.golang.org/protobuf v1.26.0 + storj.io/drpc v0.0.23 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..284db5c --- /dev/null +++ b/go.sum @@ -0,0 +1,115 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/unistack-org/micro/v3 v3.3.17 h1:WcyS7InP0DlS/JpRQGLh5sG6VstkdHJbgpMp+gmHmwg= +github.com/unistack-org/micro/v3 v3.3.17/go.mod h1:022EOEZZ789hZY3yB5ZSMXU6jLiadBgcNB/cpediV3c= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/errs v1.2.2 h1:5NFypMTuSdoySVTqlNs1dEoU21QVamMQJxW/Fii5O7g= +github.com/zeebo/errs v1.2.2/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= +golang.org/x/net v0.0.0-20210510120150-4163338589ed h1:p9UgmWI9wKpfYmgaV/IZKGdXc5qEK45tDwwwDyjS26I= +golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.37.1 h1:ARnQJNWxGyYJpdf/JXscNlQr/uv607ZPU9Z7ogHi+iI= +google.golang.org/grpc v1.37.1/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +storj.io/drpc v0.0.23 h1:MHVOMVoBbQkPuL17GerlGTA9PLwE1HF6zAOYFvWwYG4= +storj.io/drpc v0.0.23/go.mod h1:OSJH7wvH3yKlhnMHwblKJioaaeyI6X8xbXT1SG9woe8= diff --git a/handler.go b/handler.go new file mode 100644 index 0000000..5505cec --- /dev/null +++ b/handler.go @@ -0,0 +1,60 @@ +package drpc + +import ( + "reflect" + + "github.com/unistack-org/micro/v3/register" + "github.com/unistack-org/micro/v3/server" +) + +type rpcHandler struct { + opts server.HandlerOptions + handler interface{} + name string + endpoints []*register.Endpoint +} + +func newRPCHandler(handler interface{}, opts ...server.HandlerOption) server.Handler { + options := server.NewHandlerOptions(opts...) + + typ := reflect.TypeOf(handler) + hdlr := reflect.ValueOf(handler) + name := reflect.Indirect(hdlr).Type().Name() + + var endpoints []*register.Endpoint + + for m := 0; m < typ.NumMethod(); m++ { + if e := register.ExtractEndpoint(typ.Method(m)); e != nil { + e.Name = name + "." + e.Name + + for k, v := range options.Metadata[e.Name] { + e.Metadata[k] = v + } + + endpoints = append(endpoints, e) + } + } + + return &rpcHandler{ + name: name, + handler: handler, + endpoints: endpoints, + opts: options, + } +} + +func (r *rpcHandler) Name() string { + return r.name +} + +func (r *rpcHandler) Handler() interface{} { + return r.handler +} + +func (r *rpcHandler) Endpoints() []*register.Endpoint { + return r.endpoints +} + +func (r *rpcHandler) Options() server.HandlerOptions { + return r.opts +} diff --git a/options.go b/options.go new file mode 100644 index 0000000..13cfbb8 --- /dev/null +++ b/options.go @@ -0,0 +1,17 @@ +package drpc + +import ( + "github.com/unistack-org/micro/v3/server" +) + +type ( + maxMsgSizeKey struct{} +) + +// +// MaxMsgSize set the maximum message in bytes the server can receive and +// send. Default maximum message size is 4 MB. +// +func MaxMsgSize(s int) server.Option { + return server.SetOption(maxMsgSizeKey{}, s) +} diff --git a/request.go b/request.go new file mode 100644 index 0000000..9cff051 --- /dev/null +++ b/request.go @@ -0,0 +1,100 @@ +package drpc + +import ( + "io" + + "github.com/unistack-org/micro/v3/codec" + "github.com/unistack-org/micro/v3/metadata" + "github.com/unistack-org/micro/v3/server" +) + +var ( + _ server.Request = &rpcRequest{} + _ server.Message = &rpcMessage{} +) + +type rpcRequest struct { + rw io.ReadWriter + payload interface{} + codec codec.Codec + header metadata.Metadata + method string + endpoint string + contentType string + service string + body []byte + stream bool +} + +type rpcMessage struct { + payload interface{} + codec codec.Codec + header metadata.Metadata + topic string + contentType string + body []byte +} + +func (r *rpcRequest) ContentType() string { + return r.contentType +} + +func (r *rpcRequest) Service() string { + return r.service +} + +func (r *rpcRequest) Method() string { + return r.method +} + +func (r *rpcRequest) Endpoint() string { + return r.endpoint +} + +func (r *rpcRequest) Codec() codec.Codec { + return r.codec +} + +func (r *rpcRequest) Header() metadata.Metadata { + return r.header +} + +func (r *rpcRequest) Read() ([]byte, error) { + f := &codec.Frame{} + if err := r.codec.ReadBody(r.rw, f); err != nil { + return nil, err + } + return f.Data, nil +} + +func (r *rpcRequest) Stream() bool { + return r.stream +} + +func (r *rpcRequest) Body() interface{} { + return r.payload +} + +func (r *rpcMessage) ContentType() string { + return r.contentType +} + +func (r *rpcMessage) Topic() string { + return r.topic +} + +func (r *rpcMessage) Payload() interface{} { + return r.payload +} + +func (r *rpcMessage) Header() metadata.Metadata { + return r.header +} + +func (r *rpcMessage) Body() []byte { + return r.body +} + +func (r *rpcMessage) Codec() codec.Codec { + return r.codec +} diff --git a/response.go b/response.go new file mode 100644 index 0000000..a94bf46 --- /dev/null +++ b/response.go @@ -0,0 +1,34 @@ +package drpc + +import ( + "io" + + "github.com/unistack-org/micro/v3/codec" + "github.com/unistack-org/micro/v3/metadata" + "github.com/unistack-org/micro/v3/server" +) + +var _ server.Response = &rpcResponse{} + +type rpcResponse struct { + rw io.ReadWriter + header metadata.Metadata + codec codec.Codec +} + +func (r *rpcResponse) Codec() codec.Codec { + return r.codec +} + +func (r *rpcResponse) WriteHeader(hdr metadata.Metadata) { + for k, v := range hdr { + r.header[k] = v + } +} + +func (r *rpcResponse) Write(b []byte) error { + return r.codec.Write(r.rw, &codec.Message{ + Header: r.header, + Body: b, + }, nil) +} diff --git a/server.go b/server.go new file mode 100644 index 0000000..7ae5b54 --- /dev/null +++ b/server.go @@ -0,0 +1,171 @@ +package drpc + +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +// +// Meh, we need to get rid of this shit + +import ( + "context" + "fmt" + "reflect" + "sync" + "unicode" + "unicode/utf8" + + "github.com/unistack-org/micro/v3/server" +) + +// Precompute the reflect type for error. Can't use error directly +// because Typeof takes an empty interface value. This is annoying. +var typeOfError = reflect.TypeOf((*error)(nil)).Elem() + +type methodType struct { + ArgType reflect.Type + ReplyType reflect.Type + ContextType reflect.Type + method reflect.Method + stream bool +} + +// type reflectionType func(context.Context, server.Stream) error + +type service struct { + typ reflect.Type + method map[string]*methodType + rcvr reflect.Value + name string +} + +// server represents an RPC Server. +type rServer struct { + serviceMap map[string]*service + mu sync.RWMutex + // reflection bool +} + +// Is this an exported - upper case - name? +func isExported(name string) bool { + rune, _ := utf8.DecodeRuneInString(name) + return unicode.IsUpper(rune) +} + +// Is this type exported or a builtin? +func isExportedOrBuiltinType(t reflect.Type) bool { + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + // PkgPath will be non-empty even for an exported type, + // so we need to check the type name as well. + return isExported(t.Name()) || t.PkgPath() == "" +} + +// prepareEndpoint() returns a methodType for the provided method or nil +// in case if the method was unsuitable. +func prepareEndpoint(method reflect.Method) (*methodType, error) { + mtype := method.Type + mname := method.Name + var replyType, argType, contextType reflect.Type + var stream bool + + // Endpoint() must be exported. + if method.PkgPath != "" { + return nil, fmt.Errorf("Endpoint must be exported") + } + + switch mtype.NumIn() { + case 3: + // assuming streaming + argType = mtype.In(2) + contextType = mtype.In(1) + stream = true + case 4: + // method that takes a context + argType = mtype.In(2) + replyType = mtype.In(3) + contextType = mtype.In(1) + default: + return nil, fmt.Errorf("method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn()) + } + + switch stream { + case true: + // check stream type + streamType := reflect.TypeOf((*server.Stream)(nil)).Elem() + if !argType.Implements(streamType) { + return nil, fmt.Errorf("%v argument does not implement Streamer interface: %v", mname, argType) + } + default: + // First arg need not be a pointer. + if !isExportedOrBuiltinType(argType) { + return nil, fmt.Errorf("%v argument type not exported: %v", mname, argType) + } + + if replyType.Kind() != reflect.Ptr { + return nil, fmt.Errorf("method %v reply type not a pointer: %v", mname, replyType) + } + + // Reply type must be exported. + if !isExportedOrBuiltinType(replyType) { + return nil, fmt.Errorf("method %v reply type not exported: %v", mname, replyType) + } + } + + // Endpoint() needs one out. + if mtype.NumOut() != 1 { + return nil, fmt.Errorf("method %v has wrong number of outs: %v", mname, mtype.NumOut()) + } + // The return type of the method must be error. + if returnType := mtype.Out(0); returnType != typeOfError { + return nil, fmt.Errorf("method %v returns %v not error", mname, returnType.String()) + } + return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}, nil +} + +func (server *rServer) register(rcvr interface{}) error { + server.mu.Lock() + defer server.mu.Unlock() + if server.serviceMap == nil { + server.serviceMap = make(map[string]*service) + } + s := &service{} + s.typ = reflect.TypeOf(rcvr) + s.rcvr = reflect.ValueOf(rcvr) + sname := reflect.Indirect(s.rcvr).Type().Name() + if sname == "" { + return fmt.Errorf("rpc: no service name for type %v", s.typ.String()) + } + if !isExported(sname) { + return fmt.Errorf("rpc Register: type %s is not exported", sname) + } + if _, present := server.serviceMap[sname]; present { + return fmt.Errorf("rpc: service already defined: " + sname) + } + s.name = sname + s.method = make(map[string]*methodType) + + // Install the methods + for m := 0; m < s.typ.NumMethod(); m++ { + method := s.typ.Method(m) + mt, err := prepareEndpoint(method) + if mt != nil && err == nil { + s.method[method.Name] = mt + } else if err != nil { + return err + } + } + + if len(s.method) == 0 { + return fmt.Errorf("rpc Register: type %s has no exported methods of suitable type", sname) + } + server.serviceMap[s.name] = s + return nil +} + +func (m *methodType) prepareContext(ctx context.Context) reflect.Value { + if contextv := reflect.ValueOf(ctx); contextv.IsValid() { + return contextv + } + return reflect.Zero(m.ContextType) +} diff --git a/stream.go b/stream.go new file mode 100644 index 0000000..89c4c6a --- /dev/null +++ b/stream.go @@ -0,0 +1,40 @@ +package drpc + +import ( + "context" + + "github.com/unistack-org/micro/v3/server" + "google.golang.org/grpc" +) + +// rpcStream implements a server side Stream. +type rpcStream struct { + // embed the grpc stream so we can access it + grpc.ServerStream + + request server.Request +} + +func (r *rpcStream) Close() error { + return nil +} + +func (r *rpcStream) Error() error { + return nil +} + +func (r *rpcStream) Request() server.Request { + return r.request +} + +func (r *rpcStream) Context() context.Context { + return r.ServerStream.Context() +} + +func (r *rpcStream) Send(m interface{}) error { + return r.ServerStream.SendMsg(m) +} + +func (r *rpcStream) Recv(m interface{}) error { + return r.ServerStream.RecvMsg(m) +} diff --git a/subscriber.go b/subscriber.go new file mode 100644 index 0000000..bf36f54 --- /dev/null +++ b/subscriber.go @@ -0,0 +1,232 @@ +package drpc + +import ( + "context" + "fmt" + "reflect" + "runtime/debug" + "strings" + + "github.com/unistack-org/micro/v3/broker" + "github.com/unistack-org/micro/v3/errors" + "github.com/unistack-org/micro/v3/logger" + "github.com/unistack-org/micro/v3/metadata" + "github.com/unistack-org/micro/v3/register" + "github.com/unistack-org/micro/v3/server" +) + +type handler struct { + reqType reflect.Type + ctxType reflect.Type + method reflect.Value +} + +type subscriber struct { + topic string + rcvr reflect.Value + typ reflect.Type + subscriber interface{} + handlers []*handler + endpoints []*register.Endpoint + opts server.SubscriberOptions +} + +func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber { + options := server.NewSubscriberOptions(opts...) + + var endpoints []*register.Endpoint + var handlers []*handler + + if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func { + h := &handler{ + method: reflect.ValueOf(sub), + } + + switch typ.NumIn() { + case 1: + h.reqType = typ.In(0) + case 2: + h.ctxType = typ.In(0) + h.reqType = typ.In(1) + } + + handlers = append(handlers, h) + + endpoints = append(endpoints, ®ister.Endpoint{ + Name: "Func", + Request: register.ExtractSubValue(typ), + Metadata: map[string]string{ + "topic": topic, + "subscriber": "true", + }, + }) + } else { + hdlr := reflect.ValueOf(sub) + name := reflect.Indirect(hdlr).Type().Name() + + for m := 0; m < typ.NumMethod(); m++ { + method := typ.Method(m) + h := &handler{ + method: method.Func, + } + + switch method.Type.NumIn() { + case 2: + h.reqType = method.Type.In(1) + case 3: + h.ctxType = method.Type.In(1) + h.reqType = method.Type.In(2) + } + + handlers = append(handlers, h) + + endpoints = append(endpoints, ®ister.Endpoint{ + Name: name + "." + method.Name, + Request: register.ExtractSubValue(method.Type), + Metadata: map[string]string{ + "topic": topic, + "subscriber": "true", + }, + }) + } + } + + return &subscriber{ + rcvr: reflect.ValueOf(sub), + typ: reflect.TypeOf(sub), + topic: topic, + subscriber: sub, + handlers: handlers, + endpoints: endpoints, + opts: options, + } +} + +func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler { + return func(p broker.Event) (err error) { + defer func() { + if r := recover(); r != nil { + if g.opts.Logger.V(logger.ErrorLevel) { + g.opts.Logger.Error(g.opts.Context, "panic recovered: ", r) + g.opts.Logger.Error(g.opts.Context, string(debug.Stack())) + } + err = errors.InternalServerError(g.opts.Name+".subscriber", "panic recovered: %v", r) + } + }() + + msg := p.Message() + // if we don't have headers, create empty map + if msg.Header == nil { + msg.Header = make(map[string]string) + } + + ct := msg.Header["Content-Type"] + if len(ct) == 0 { + msg.Header["Content-Type"] = defaultContentType + ct = defaultContentType + } + cf, err := g.newCodec(ct) + if err != nil { + return err + } + + hdr := make(map[string]string, len(msg.Header)) + for k, v := range msg.Header { + if k == "Content-Type" { + continue + } + hdr[k] = v + } + + ctx := metadata.NewIncomingContext(sb.opts.Context, hdr) + + results := make(chan error, len(sb.handlers)) + + for i := 0; i < len(sb.handlers); i++ { + handler := sb.handlers[i] + + var isVal bool + var req reflect.Value + + if handler.reqType.Kind() == reflect.Ptr { + req = reflect.New(handler.reqType.Elem()) + } else { + req = reflect.New(handler.reqType) + isVal = true + } + if isVal { + req = req.Elem() + } + + if err = cf.Unmarshal(msg.Body, req.Interface()); err != nil { + return err + } + + fn := func(ctx context.Context, msg server.Message) error { + var vals []reflect.Value + if sb.typ.Kind() != reflect.Func { + vals = append(vals, sb.rcvr) + } + if handler.ctxType != nil { + vals = append(vals, reflect.ValueOf(ctx)) + } + + vals = append(vals, reflect.ValueOf(msg.Payload())) + + returnValues := handler.method.Call(vals) + if rerr := returnValues[0].Interface(); rerr != nil { + return rerr.(error) + } + return nil + } + + for i := len(opts.SubWrappers); i > 0; i-- { + fn = opts.SubWrappers[i-1](fn) + } + + if g.wg != nil { + g.wg.Add(1) + } + go func() { + if g.wg != nil { + defer g.wg.Done() + } + cerr := fn(ctx, &rpcMessage{ + topic: sb.topic, + contentType: ct, + payload: req.Interface(), + header: msg.Header, + body: msg.Body, + }) + results <- cerr + }() + } + var errors []string + for i := 0; i < len(sb.handlers); i++ { + if rerr := <-results; rerr != nil { + errors = append(errors, rerr.Error()) + } + } + if len(errors) > 0 { + err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) + } + + return err + } +} + +func (s *subscriber) Topic() string { + return s.topic +} + +func (s *subscriber) Subscriber() interface{} { + return s.subscriber +} + +func (s *subscriber) Endpoints() []*register.Endpoint { + return s.endpoints +} + +func (s *subscriber) Options() server.SubscriberOptions { + return s.opts +} diff --git a/util.go b/util.go new file mode 100644 index 0000000..8d74a08 --- /dev/null +++ b/util.go @@ -0,0 +1,59 @@ +package drpc + +import ( + "fmt" + "strings" +) + +// ServiceMethod converts a gRPC method to a Go method +// Input: +// Foo.Bar, /Foo/Bar, /package.Foo/Bar, /a.package.Foo/Bar +// Output: +// [Foo, Bar] +func serviceMethod(m string) (string, string, error) { + if len(m) == 0 { + return "", "", fmt.Errorf("malformed method name: %q", m) + } + + // grpc method + if m[0] == '/' { + // [ , Foo, Bar] + // [ , package.Foo, Bar] + // [ , a.package.Foo, Bar] + parts := strings.Split(m, "/") + if len(parts) != 3 || len(parts[1]) == 0 || len(parts[2]) == 0 { + return "", "", fmt.Errorf("malformed method name: %q", m) + } + service := strings.Split(parts[1], ".") + return service[len(service)-1], parts[2], nil + } + + // non grpc method + parts := strings.Split(m, ".") + + // expect [Foo, Bar] + if len(parts) != 2 { + return "", "", fmt.Errorf("malformed method name: %q", m) + } + + return parts[0], parts[1], nil +} + +/* +// ServiceFromMethod returns the service +// /service.Foo/Bar => service +func serviceFromMethod(m string) string { + if len(m) == 0 { + return m + } + if m[0] != '/' { + return m + } + parts := strings.Split(m, "/") + if len(parts) < 3 { + return m + } + parts = strings.Split(parts[1], ".") + return strings.Join(parts[:len(parts)-1], ".") +} +*/ diff --git a/util_test.go b/util_test.go new file mode 100644 index 0000000..c33b26d --- /dev/null +++ b/util_test.go @@ -0,0 +1,46 @@ +package drpc + +import ( + "testing" +) + +func TestServiceMethod(t *testing.T) { + type testCase struct { + input string + service string + method string + err bool + } + + methods := []testCase{ + {"Foo.Bar", "Foo", "Bar", false}, + {"/Foo/Bar", "Foo", "Bar", false}, + {"/package.Foo/Bar", "Foo", "Bar", false}, + {"/a.package.Foo/Bar", "Foo", "Bar", false}, + {"a.package.Foo/Bar", "", "", true}, + {"/Foo/Bar/Baz", "", "", true}, + {"Foo.Bar.Baz", "", "", true}, + } + for _, test := range methods { + service, method, err := serviceMethod(test.input) + if err != nil && test.err == true { + continue + } + // unexpected error + if err != nil && test.err == false { + t.Fatalf("unexpected err %v for %+v", err, test) + } + // expecter error + if test.err == true && err == nil { + t.Fatalf("expected error for %+v: got service: %s method: %s", test, service, method) + } + + if service != test.service { + t.Fatalf("wrong service for %+v: got service: %s method: %s", test, service, method) + } + + if method != test.method { + t.Fatalf("wrong method for %+v: got service: %s method: %s", test, service, method) + } + } +}