diff --git a/LICENSE b/LICENSE index 261eeb9..6e701f1 100644 --- a/LICENSE +++ b/LICENSE @@ -1,3 +1,4 @@ + Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ @@ -175,18 +176,8 @@ END OF TERMS AND CONDITIONS - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] + Copyright 2015-2020 Asim Aslam. + Copyright 2019-2020 Unistack LLC. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/codec.go b/codec.go new file mode 100644 index 0000000..204609f --- /dev/null +++ b/codec.go @@ -0,0 +1,54 @@ +package drpc + +import ( + "io" + + "github.com/unistack-org/micro/v3/codec" + "storj.io/drpc" +) + +type wrapMicroCodec struct{ codec.Codec } + +func (w *wrapMicroCodec) Marshal(v drpc.Message) ([]byte, error) { + if m, ok := v.(*codec.Frame); ok { + return m.Data, nil + } + return w.Codec.Marshal(v.(interface{})) +} + +func (w *wrapMicroCodec) Unmarshal(d []byte, v drpc.Message) error { + if d == nil || v == nil { + return nil + } + if m, ok := v.(*codec.Frame); ok { + m.Data = d + return nil + } + return w.Codec.Unmarshal(d, v.(interface{})) +} + +func (w *wrapMicroCodec) ReadHeader(conn io.Reader, m *codec.Message, mt codec.MessageType) error { + return nil +} + +func (w *wrapMicroCodec) ReadBody(conn io.Reader, v interface{}) error { + if m, ok := v.(*codec.Frame); ok { + _, err := conn.Read(m.Data) + return err + } + return codec.ErrInvalidMessage +} + +func (w *wrapMicroCodec) Write(conn io.Writer, m *codec.Message, v interface{}) error { + // if we don't have a body + if v != nil { + b, err := w.Marshal(v) + if err != nil { + return err + } + m.Body = b + } + // write the body using the framing codec + _, err := conn.Write(m.Body) + return err +} diff --git a/drpc.go b/drpc.go new file mode 100644 index 0000000..d66bedf --- /dev/null +++ b/drpc.go @@ -0,0 +1,961 @@ +// Package drpc provides a drpc server +package drpc + +import ( + "context" + "crypto/tls" + "fmt" + "net" + "reflect" + "runtime/debug" + "sort" + "strconv" + "strings" + "sync" + "time" + + // nolint: staticcheck + + "github.com/unistack-org/micro/v3/broker" + "github.com/unistack-org/micro/v3/codec" + "github.com/unistack-org/micro/v3/errors" + "github.com/unistack-org/micro/v3/logger" + metadata "github.com/unistack-org/micro/v3/metadata" + "github.com/unistack-org/micro/v3/register" + "github.com/unistack-org/micro/v3/server" + "golang.org/x/net/netutil" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "storj.io/drpc/drpcerr" + gmetadata "storj.io/drpc/drpcmetadata" +) + +const ( + defaultContentType = "application/grpc+proto" +) + +/* +type grpcServerReflection struct { + srv *grpc.Server + s *serverReflectionServer +} +*/ + +type grpcServer struct { + handlers map[string]server.Handler + srv *grpc.Server + exit chan chan error + wg *sync.WaitGroup + rsvc *register.Service + subscribers map[*subscriber][]broker.Subscriber + rpc *rServer + opts server.Options + sync.RWMutex + init bool + started bool + registered bool +} + +func newGRPCServer(opts ...server.Option) server.Server { + // create a grpc server + g := &grpcServer{ + opts: server.NewOptions(opts...), + rpc: &rServer{ + serviceMap: make(map[string]*service), + }, + handlers: make(map[string]server.Handler), + subscribers: make(map[*subscriber][]broker.Subscriber), + exit: make(chan chan error), + } + + return g +} + +/* +type grpcRouter struct { + h func(context.Context, server.Request, interface{}) error + m func(context.Context, server.Message) error +} + +func (r grpcRouter) ProcessMessage(ctx context.Context, msg server.Message) error { + return r.m(ctx, msg) +} + +func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { + return r.h(ctx, req, rsp) +} + +*/ + +func (g *grpcServer) configure(opts ...server.Option) error { + g.Lock() + defer g.Unlock() + + for _, o := range opts { + o(&g.opts) + } + + if err := g.opts.Register.Init(); err != nil { + return err + } + if err := g.opts.Broker.Init(); err != nil { + return err + } + if err := g.opts.Tracer.Init(); err != nil { + return err + } + if err := g.opts.Auth.Init(); err != nil { + return err + } + if err := g.opts.Logger.Init(); err != nil { + return err + } + if err := g.opts.Meter.Init(); err != nil { + return err + } + if err := g.opts.Transport.Init(); err != nil { + return err + } + + g.wg = g.opts.Wait + + maxMsgSize := g.getMaxMsgSize() + + gopts := []grpc.ServerOption{ + grpc.MaxRecvMsgSize(maxMsgSize), + grpc.MaxSendMsgSize(maxMsgSize), + grpc.UnknownServiceHandler(g.handler), + } + + /* + if creds := g.getCredentials(); creds != nil { + gopts = append(gopts, grpc.Creds(creds)) + } + + if opts := g.getGrpcOptions(); opts != nil { + gopts = append(gopts, opts...) + } + */ + + g.rsvc = nil + restart := false + if g.started { + restart = true + if err := g.Stop(); err != nil { + return err + } + } + g.srv = grpc.NewServer(gopts...) + + if restart { + return g.Start() + } + + g.init = true + + return nil +} + +func (g *grpcServer) getMaxMsgSize() int { + if g.opts.Context == nil { + return codec.DefaultMaxMsgSize + } + s, ok := g.opts.Context.Value(maxMsgSizeKey{}).(int) + if !ok { + return codec.DefaultMaxMsgSize + } + return s +} + +/* +func (g *grpcServer) getCredentials() credentials.TransportCredentials { + if g.opts.TLSConfig != nil { + return credentials.NewTLS(g.opts.TLSConfig) + } + return nil +} +*/ +/* +func (g *grpcServer) getGrpcOptions() []grpc.ServerOption { + if g.opts.Context == nil { + return nil + } + + opts, ok := g.opts.Context.Value(grpcOptions{}).([]grpc.ServerOption) + if !ok || opts == nil { + return nil + } + + return opts +} +*/ + +func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err error) { + defer func() { + if r := recover(); r != nil { + g.RLock() + config := g.opts + g.RUnlock() + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error(config.Context, "panic recovered: ", r) + config.Logger.Error(config.Context, string(debug.Stack())) + } + err = errors.InternalServerError(g.opts.Name, "panic recovered: %v", r) + } else if err != nil { + g.RLock() + config := g.opts + g.RUnlock() + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "grpc handler got error: %s", err) + } + } + }() + + if g.wg != nil { + g.wg.Add(1) + defer g.wg.Done() + } + + fullMethod, ok := grpc.MethodFromServerStream(stream) + if !ok { + return status.Errorf(codes.Internal, "method does not exist in context") + } + + serviceName, methodName, err := serviceMethod(fullMethod) + if err != nil { + return status.New(StatusInvalidArgument, err.Error()).Err() + } + + // get drpc metadata + dmd, _ := gmetadata.Get(stream.Context()) + + md := metadata.New(len(dmd)) + for k, v := range dmd { + md.Set(k, v) + } + + // timeout for server deadline + to, ok := md.Get("timeout") + if ok { + md.Del("timeout") + } + + // get content type + ct := defaultContentType + + if ctype, ok := md.Get("content-type"); ok { + ct = ctype + } else if ctype, ok := md.Get("x-content-type"); ok { + ct = ctype + md.Del("x-content-type") + } + + // create new context + ctx := metadata.NewIncomingContext(stream.Context(), md) + + // get peer from context + if p, ok := peer.FromContext(stream.Context()); ok { + md["Remote"] = p.Addr.String() + ctx = peer.NewContext(ctx, p) + } + + // set the timeout if we have it + if len(to) > 0 { + if n, err := strconv.ParseUint(to, 10, 64); err == nil { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Duration(n)) + defer cancel() + } + } + + g.rpc.mu.RLock() + svc := g.rpc.serviceMap[serviceName] + g.rpc.mu.RUnlock() + + /* + if svc == nil && g.reflection && methodName == "ServerReflectionInfo" { + rfl := &grpcServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}} + svc = &service{} + svc.typ = reflect.TypeOf(rfl) + svc.rcvr = reflect.ValueOf(rfl) + svc.name = reflect.Indirect(svc.rcvr).Type().Name() + svc.method = make(map[string]*methodType) + typ := reflect.TypeOf(rfl) + if me, ok := typ.MethodByName("ServerReflectionInfo"); ok { + g.rpc.mu.Lock() + ep, err := prepareEndpoint(me) + if ep != nil && err != nil { + svc.method["ServerReflectionInfo"] = ep + } else if err != nil { + return status.New(codes.Unimplemented, err.Error()).Err() + } + g.rpc.mu.Unlock() + } + } + */ + + if svc == nil { + return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err() + } + + mtype := svc.method[methodName] + if mtype == nil { + return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s.%s", serviceName, methodName)).Err() + } + + // process unary + if !mtype.stream { + return g.processRequest(ctx, stream, svc, mtype, ct) + } + + // process stream + return g.processStream(ctx, stream, svc, mtype, ct) +} + +func (g *grpcServer) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { + // for { + var argv, replyv reflect.Value + + // Decode the argument value. + argIsValue := false // if true, need to indirect before calling. + if mtype.ArgType.Kind() == reflect.Ptr { + argv = reflect.New(mtype.ArgType.Elem()) + } else { + argv = reflect.New(mtype.ArgType) + argIsValue = true + } + + // Unmarshal request + if err := stream.RecvMsg(argv.Interface()); err != nil { + return err + } + + if argIsValue { + argv = argv.Elem() + } + + // reply value + replyv = reflect.New(mtype.ReplyType.Elem()) + + function := mtype.method.Func + var returnValues []reflect.Value + + cf, err := g.newCodec(ct) + if err != nil { + return errors.InternalServerError(g.opts.Name, err.Error()) + } + b, err := cf.Marshal(argv.Interface()) + if err != nil { + return err + } + + // create a client.Request + r := &rpcRequest{ + service: g.opts.Name, + contentType: ct, + method: fmt.Sprintf("%s.%s", service.name, mtype.method.Name), + body: b, + payload: argv.Interface(), + } + // define the handler func + fn := func(ctx context.Context, req server.Request, rsp interface{}) (err error) { + returnValues = function.Call([]reflect.Value{service.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)}) + + // The return value for the method is an error. + if rerr := returnValues[0].Interface(); rerr != nil { + err = rerr.(error) + } + + return err + } + + // wrap the handler func + for i := len(g.opts.HdlrWrappers); i > 0; i-- { + fn = g.opts.HdlrWrappers[i-1](fn) + } + + statusCode := StatusOK + statusDesc := "" + + // execute the handler + if appErr := fn(ctx, r, replyv.Interface()); appErr != nil { + switch verr := appErr.(type) { + case *errors.Error: + statusCode = microError(verr) + statusDesc = verr.Error() + case proto.Message: + // user defined error that proto based we can attach it to grpc status + statusCode = convertCode(appErr) + statusDesc = appErr.Error() + default: + g.RLock() + config := g.opts + g.RUnlock() + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Warn(config.Context, "handler error will not be transferred properly, must return *errors.Error or drpcerr.Error") + } + // default case user pass own error type that not proto based + statusCode = convertCode(verr) + statusDesc = verr.Error() + } + + return drpcerr.WithCode(fmt.Errorf(statusDesc), uint64(statusCode)) + } + + if err := stream.SendMsg(replyv.Interface()); err != nil { + return err + } + + return nil +} + +/* +type reflectStream struct { + stream server.Stream +} + +func (s *reflectStream) Send(rsp *grpcreflect.ServerReflectionResponse) error { + return s.stream.Send(rsp) +} + +func (s *reflectStream) Recv() (*grpcreflect.ServerReflectionRequest, error) { + req := &grpcreflect.ServerReflectionRequest{} + err := s.stream.Recv(req) + return req, err +} + +func (s *reflectStream) SetHeader(gmetadata.MD) error { + return nil +} + +func (s *reflectStream) SendHeader(gmetadata.MD) error { + return nil +} + +func (s *reflectStream) SetTrailer(gmetadata.MD) { + +} + +func (s *reflectStream) Context() context.Context { + return s.stream.Context() +} + +func (s *reflectStream) SendMsg(m interface{}) error { + return s.stream.Send(m) +} + +func (s *reflectStream) RecvMsg(m interface{}) error { + return s.stream.Recv(m) +} + +func (g *grpcServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error { + return g.s.ServerReflectionInfo(&reflectStream{stream}) +} +*/ + +func (g *grpcServer) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { + opts := g.opts + + r := &rpcRequest{ + service: opts.Name, + contentType: ct, + method: fmt.Sprintf("%s.%s", service.name, mtype.method.Name), + stream: true, + } + + ss := &rpcStream{ + ServerStream: stream, + request: r, + } + + function := mtype.method.Func + var returnValues []reflect.Value + + // Invoke the method, providing a new value for the reply. + fn := func(ctx context.Context, req server.Request, stream interface{}) error { + returnValues = function.Call([]reflect.Value{service.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(stream)}) + if err := returnValues[0].Interface(); err != nil { + return err.(error) + } + + return nil + } + + for i := len(opts.HdlrWrappers); i > 0; i-- { + fn = opts.HdlrWrappers[i-1](fn) + } + + statusCode := StatusOK + statusDesc := "" + + if appErr := fn(ctx, r, ss); appErr != nil { + switch verr := appErr.(type) { + case *errors.Error: + statusCode = microError(verr) + statusDesc = verr.Error() + case proto.Message: + // user defined error that proto based we can attach it to grpc status + statusCode = convertCode(appErr) + statusDesc = appErr.Error() + default: + if g.opts.Logger.V(logger.ErrorLevel) { + g.opts.Logger.Error(g.opts.Context, "handler error will not be transferred properly, must return *errors.Error or proto.Message") + } + // default case user pass own error type that not proto based + statusCode = convertCode(verr) + statusDesc = verr.Error() + } + return drpcerr.WithCode(fmt.Errorf(statusDesc), uint64(statusCode)) + } + + return nil +} + +func (g *grpcServer) newCodec(ct string) (codec.Codec, error) { + g.RLock() + defer g.RUnlock() + + if idx := strings.IndexRune(ct, ';'); idx >= 0 { + ct = ct[:idx] + } + + if c, ok := g.opts.Codecs[ct]; ok { + return c, nil + } + + return nil, codec.ErrUnknownContentType +} + +func (g *grpcServer) Options() server.Options { + g.RLock() + opts := g.opts + g.RUnlock() + + return opts +} + +func (g *grpcServer) Init(opts ...server.Option) error { + if len(opts) == 0 && g.init { + return nil + } + return g.configure(opts...) +} + +func (g *grpcServer) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler { + return newRPCHandler(h, opts...) +} + +func (g *grpcServer) Handle(h server.Handler) error { + if err := g.rpc.register(h.Handler()); err != nil { + return err + } + + g.handlers[h.Name()] = h + return nil +} + +func (g *grpcServer) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber { + return newSubscriber(topic, sb, opts...) +} + +func (g *grpcServer) Subscribe(sb server.Subscriber) error { + sub, ok := sb.(*subscriber) + if !ok { + return fmt.Errorf("invalid subscriber: expected *subscriber") + } + if len(sub.handlers) == 0 { + return fmt.Errorf("invalid subscriber: no handler functions") + } + + if err := server.ValidateSubscriber(sb); err != nil { + return err + } + + g.Lock() + if _, ok = g.subscribers[sub]; ok { + g.Unlock() + return fmt.Errorf("subscriber %v already exists", sub) + } + + g.subscribers[sub] = nil + g.Unlock() + return nil +} + +func (g *grpcServer) Register() error { + g.RLock() + rsvc := g.rsvc + config := g.opts + g.RUnlock() + + // if service already filled, reuse it and return early + if rsvc != nil { + if err := server.DefaultRegisterFunc(rsvc, config); err != nil { + return err + } + return nil + } + + service, err := server.NewRegisterService(g) + if err != nil { + return err + } + + g.RLock() + // Maps are ordered randomly, sort the keys for consistency + handlerList := make([]string, 0, len(g.handlers)) + for n := range g.handlers { + // Only advertise non internal handlers + handlerList = append(handlerList, n) + } + + sort.Strings(handlerList) + + subscriberList := make([]*subscriber, 0, len(g.subscribers)) + for e := range g.subscribers { + // Only advertise non internal subscribers + subscriberList = append(subscriberList, e) + } + sort.Slice(subscriberList, func(i, j int) bool { + return subscriberList[i].topic > subscriberList[j].topic + }) + + endpoints := make([]*register.Endpoint, 0, len(handlerList)+len(subscriberList)) + for _, n := range handlerList { + endpoints = append(endpoints, g.handlers[n].Endpoints()...) + } + for _, e := range subscriberList { + endpoints = append(endpoints, e.Endpoints()...) + } + g.RUnlock() + + service.Nodes[0].Metadata["protocol"] = "grpc" + service.Nodes[0].Metadata["transport"] = service.Nodes[0].Metadata["protocol"] + service.Endpoints = endpoints + + g.RLock() + registered := g.registered + g.RUnlock() + + if !registered { + if config.Logger.V(logger.InfoLevel) { + config.Logger.Infof(config.Context, "Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID) + } + } + + // register the service + if err := server.DefaultRegisterFunc(service, config); err != nil { + return err + } + + // already registered? don't need to register subscribers + if registered { + return nil + } + + g.Lock() + defer g.Unlock() + + for sb := range g.subscribers { + handler := g.createSubHandler(sb, config) + var opts []broker.SubscribeOption + if queue := sb.Options().Queue; len(queue) > 0 { + opts = append(opts, broker.SubscribeGroup(queue)) + } + + subCtx := config.Context + if cx := sb.Options().Context; cx != nil { + subCtx = cx + } + opts = append(opts, broker.SubscribeContext(subCtx)) + opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck)) + + if config.Logger.V(logger.InfoLevel) { + config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic()) + } + sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...) + if err != nil { + return err + } + g.subscribers[sb] = []broker.Subscriber{sub} + } + + g.registered = true + g.rsvc = service + + return nil +} + +func (g *grpcServer) Deregister() error { + var err error + + g.RLock() + config := g.opts + g.RUnlock() + + service, err := server.NewRegisterService(g) + if err != nil { + return err + } + + if config.Logger.V(logger.InfoLevel) { + config.Logger.Infof(config.Context, "Deregistering node: %s", service.Nodes[0].ID) + } + + if err := server.DefaultDeregisterFunc(service, config); err != nil { + return err + } + + g.Lock() + g.rsvc = nil + + if !g.registered { + g.Unlock() + return nil + } + + g.registered = false + + wg := sync.WaitGroup{} + for sb, subs := range g.subscribers { + for _, sub := range subs { + wg.Add(1) + go func(s broker.Subscriber) { + defer wg.Done() + if config.Logger.V(logger.InfoLevel) { + config.Logger.Infof(config.Context, "Unsubscribing from topic: %s", s.Topic()) + } + if err := s.Unsubscribe(g.opts.Context); err != nil { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "Unsubscribing from topic: %s err: %v", s.Topic(), err) + } + } + }(sub) + } + g.subscribers[sb] = nil + } + wg.Wait() + + g.Unlock() + return nil +} + +func (g *grpcServer) Start() error { + g.RLock() + if g.started { + g.RUnlock() + return nil + } + g.RUnlock() + + config := g.Options() + + // micro: config.Transport.Listen(config.Address) + var ts net.Listener + + if l := config.Listener; l != nil { + ts = l + } else { + var err error + + // check the tls config for secure connect + if tc := config.TLSConfig; tc != nil { + ts, err = tls.Listen("tcp", config.Address, tc) + // otherwise just plain tcp listener + } else { + ts, err = net.Listen("tcp", config.Address) + } + if err != nil { + return err + } + } + + if config.MaxConn > 0 { + ts = netutil.LimitListener(ts, config.MaxConn) + } + + if config.Logger.V(logger.InfoLevel) { + config.Logger.Infof(config.Context, "Server [grpc] Listening on %s", ts.Addr().String()) + } + g.Lock() + g.opts.Address = ts.Addr().String() + if len(g.opts.Advertise) == 0 { + g.opts.Advertise = ts.Addr().String() + } + g.Unlock() + + // only connect if we're subscribed + if len(g.subscribers) > 0 { + // connect to the broker + if err := config.Broker.Connect(config.Context); err != nil { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "Broker [%s] connect error: %v", config.Broker.String(), err) + } + return err + } + + if config.Logger.V(logger.InfoLevel) { + config.Logger.Infof(config.Context, "Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) + } + } + + // use RegisterCheck func before register + // nolint: nestif + if err := g.opts.RegisterCheck(config.Context); err != nil { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, err) + } + } else { + // announce self to the world + if err := g.Register(); err != nil { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "Server register error: %v", err) + } + } + } + + // micro: go ts.Accept(s.accept) + go func() { + if err := g.srv.Serve(ts); err != nil { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "gRPC Server start error: %v", err) + } + if err := g.Stop(); err != nil { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "gRPC Server stop error: %v", err) + } + } + } + }() + + go func() { + t := new(time.Ticker) + + // only process if it exists + if g.opts.RegisterInterval > time.Duration(0) { + // new ticker + t = time.NewTicker(g.opts.RegisterInterval) + } + + // return error chan + var ch chan error + + Loop: + for { + select { + // register self on interval + case <-t.C: + g.RLock() + registered := g.registered + g.RUnlock() + rerr := g.opts.RegisterCheck(g.opts.Context) + // nolint: nestif + if rerr != nil && registered { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr) + } + // deregister self in case of error + if err := g.Deregister(); err != nil { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "Server %s-%s deregister error: %s", config.Name, config.ID, err) + } + } + } else if rerr != nil && !registered { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, rerr) + } + continue + } + if err := g.Register(); err != nil { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "Server %s-%s register error: %s", config.Name, config.ID, err) + } + } + // wait for exit + case ch = <-g.exit: + break Loop + } + } + + // deregister self + if err := g.Deregister(); err != nil { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "Server deregister error: %v", err) + } + } + + // wait for waitgroup + if g.wg != nil { + g.wg.Wait() + } + + // stop the grpc server + exit := make(chan bool) + + go func() { + g.srv.GracefulStop() + close(exit) + }() + + select { + case <-exit: + case <-time.After(time.Second): + g.srv.Stop() + } + + // close transport + ch <- nil + + if config.Logger.V(logger.InfoLevel) { + config.Logger.Infof(config.Context, "Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) + } + // disconnect broker + if err := config.Broker.Disconnect(config.Context); err != nil { + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Errorf(config.Context, "Broker [%s] disconnect error: %v", config.Broker.String(), err) + } + } + }() + + // mark the server as started + g.Lock() + g.started = true + g.Unlock() + + return nil +} + +func (g *grpcServer) Stop() error { + g.RLock() + if !g.started { + g.RUnlock() + return nil + } + g.RUnlock() + + ch := make(chan error) + g.exit <- ch + + err := <-ch + g.Lock() + g.rsvc = nil + g.started = false + g.Unlock() + + return err +} + +func (g *grpcServer) String() string { + return "grpc" +} + +func (g *grpcServer) Name() string { + return g.opts.Name +} + +func NewServer(opts ...server.Option) server.Server { + return newGRPCServer(opts...) +} diff --git a/error.go b/error.go new file mode 100644 index 0000000..92a6e5a --- /dev/null +++ b/error.go @@ -0,0 +1,236 @@ +package drpc + +import ( + "context" + "io" + "net/http" + "os" + + "github.com/unistack-org/micro/v3/errors" +) + +type StatusCode uint64 + +const ( + // OK is returned on success. + StatusOK StatusCode = 0 + + // Canceled indicates the operation was canceled (typically by the caller). + // + // The gRPC framework will generate this error code when cancellation + // is requested. + StatusCanceled StatusCode = 1 + + // Unknown error. An example of where this error may be returned is + // if a Status value received from another address space belongs to + // an error-space that is not known in this address space. Also + // errors raised by APIs that do not return enough error information + // may be converted to this error. + // + // The gRPC framework will generate this error code in the above two + // mentioned cases. + StatusUnknown StatusCode = 2 + + // InvalidArgument indicates client specified an invalid argument. + // Note that this differs from FailedPrecondition. It indicates arguments + // that are problematic regardless of the state of the system + // (e.g., a malformed file name). + // + // This error code will not be generated by the gRPC framework. + StatusInvalidArgument StatusCode = 3 + + // DeadlineExceeded means operation expired before completion. + // For operations that change the state of the system, this error may be + // returned even if the operation has completed successfully. For + // example, a successful response from a server could have been delayed + // long enough for the deadline to expire. + // + // The gRPC framework will generate this error code when the deadline is + // exceeded. + StatusDeadlineExceeded StatusCode = 4 + + // NotFound means some requested entity (e.g., file or directory) was + // not found. + // + // This error code will not be generated by the gRPC framework. + StatusNotFound StatusCode = 5 + + // AlreadyExists means an attempt to create an entity failed because one + // already exists. + // + // This error code will not be generated by the gRPC framework. + StatusAlreadyExists StatusCode = 6 + + // PermissionDenied indicates the caller does not have permission to + // execute the specified operation. It must not be used for rejections + // caused by exhausting some resource (use ResourceExhausted + // instead for those errors). It must not be + // used if the caller cannot be identified (use Unauthenticated + // instead for those errors). + // + // This error code will not be generated by the gRPC core framework, + // but expect authentication middleware to use it. + StatusPermissionDenied StatusCode = 7 + + // ResourceExhausted indicates some resource has been exhausted, perhaps + // a per-user quota, or perhaps the entire file system is out of space. + // + // This error code will be generated by the gRPC framework in + // out-of-memory and server overload situations, or when a message is + // larger than the configured maximum size. + StatusResourceExhausted StatusCode = 8 + + // FailedPrecondition indicates operation was rejected because the + // system is not in a state required for the operation's execution. + // For example, directory to be deleted may be non-empty, an rmdir + // operation is applied to a non-directory, etc. + // + // A litmus test that may help a service implementor in deciding + // between FailedPrecondition, Aborted, and Unavailable: + // (a) Use Unavailable if the client can retry just the failing call. + // (b) Use Aborted if the client should retry at a higher-level + // (e.g., restarting a read-modify-write sequence). + // (c) Use FailedPrecondition if the client should not retry until + // the system state has been explicitly fixed. E.g., if an "rmdir" + // fails because the directory is non-empty, FailedPrecondition + // should be returned since the client should not retry unless + // they have first fixed up the directory by deleting files from it. + // (d) Use FailedPrecondition if the client performs conditional + // REST Get/Update/Delete on a resource and the resource on the + // server does not match the condition. E.g., conflicting + // read-modify-write on the same resource. + // + // This error code will not be generated by the gRPC framework. + StatusFailedPrecondition StatusCode = 9 + + // Aborted indicates the operation was aborted, typically due to a + // concurrency issue like sequencer check failures, transaction aborts, + // etc. + // + // See litmus test above for deciding between FailedPrecondition, + // Aborted, and Unavailable. + // + // This error code will not be generated by the gRPC framework. + StatusAborted StatusCode = 10 + + // OutOfRange means operation was attempted past the valid range. + // E.g., seeking or reading past end of file. + // + // Unlike InvalidArgument, this error indicates a problem that may + // be fixed if the system state changes. For example, a 32-bit file + // system will generate InvalidArgument if asked to read at an + // offset that is not in the range [0,2^32-1], but it will generate + // OutOfRange if asked to read from an offset past the current + // file size. + // + // There is a fair bit of overlap between FailedPrecondition and + // OutOfRange. We recommend using OutOfRange (the more specific + // error) when it applies so that callers who are iterating through + // a space can easily look for an OutOfRange error to detect when + // they are done. + // + // This error code will not be generated by the gRPC framework. + StatusOutOfRange StatusCode = 11 + + // Unimplemented indicates operation is not implemented or not + // supported/enabled in this service. + // + // This error code will be generated by the gRPC framework. Most + // commonly, you will see this error code when a method implementation + // is missing on the server. It can also be generated for unknown + // compression algorithms or a disagreement as to whether an RPC should + // be streaming. + StatusUnimplemented StatusCode = 12 + + // Internal errors. Means some invariants expected by underlying + // system has been broken. If you see one of these errors, + // something is very broken. + // + // This error code will be generated by the gRPC framework in several + // internal error conditions. + StatusInternal StatusCode = 13 + + // Unavailable indicates the service is currently unavailable. + // This is a most likely a transient condition and may be corrected + // by retrying with a backoff. Note that it is not always safe to retry + // non-idempotent operations. + // + // See litmus test above for deciding between FailedPrecondition, + // Aborted, and Unavailable. + // + // This error code will be generated by the gRPC framework during + // abrupt shutdown of a server process or network connection. + StatusUnavailable StatusCode = 14 + + // DataLoss indicates unrecoverable data loss or corruption. + // + // This error code will not be generated by the gRPC framework. + StatusDataLoss StatusCode = 15 + + // Unauthenticated indicates the request does not have valid + // authentication credentials for the operation. + // + // The gRPC framework will generate this error code when the + // authentication metadata is invalid or a Credentials callback fails, + // but also expect authentication middleware to generate it. + StatusUnauthenticated StatusCode = 16 +) + +var errMapping = map[int32]StatusCode{ + http.StatusOK: StatusOK, + http.StatusBadRequest: StatusInvalidArgument, + http.StatusRequestTimeout: StatusDeadlineExceeded, + http.StatusNotFound: StatusNotFound, + http.StatusConflict: StatusAlreadyExists, + http.StatusForbidden: StatusPermissionDenied, + http.StatusUnauthorized: StatusUnauthenticated, + http.StatusPreconditionFailed: StatusFailedPrecondition, + http.StatusNotImplemented: StatusUnimplemented, + http.StatusInternalServerError: StatusInternal, + http.StatusServiceUnavailable: StatusUnavailable, +} + +// convertCode converts a standard Go error into its canonical code. Note that +// this is only used to translate the error returned by the server applications. +func convertCode(err error) StatusCode { + switch err { + case nil: + return StatusOK + case io.EOF: + return StatusOutOfRange + case io.ErrClosedPipe, io.ErrNoProgress, io.ErrShortBuffer, io.ErrShortWrite, io.ErrUnexpectedEOF: + return StatusFailedPrecondition + case os.ErrInvalid: + return StatusInvalidArgument + case context.Canceled: + return StatusCanceled + case context.DeadlineExceeded: + return StatusDeadlineExceeded + } + switch { + case os.IsExist(err): + return StatusAlreadyExists + case os.IsNotExist(err): + return StatusNotFound + case os.IsPermission(err): + return StatusPermissionDenied + } + return StatusUnknown +} + +func microError(err error) StatusCode { + if err == nil { + return StatusOK + } + + var ec int32 + if verr, ok := err.(*errors.Error); ok { + ec = verr.Code + } + + if code, ok := errMapping[ec]; ok { + return code + } + + return StatusUnknown +} diff --git a/go.mod b/go.mod index ddee484..a3c0826 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,13 @@ -module github.com/unistack-org/micro-server-drpc/v3 +module github.com/unistack-org/micro-server-grpc/v3 go 1.16 + +require ( + github.com/golang/protobuf v1.5.2 // indirect + github.com/unistack-org/micro/v3 v3.3.17 + golang.org/x/net v0.0.0-20210510120150-4163338589ed + golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect + google.golang.org/grpc v1.37.1 + google.golang.org/protobuf v1.26.0 + storj.io/drpc v0.0.23 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..284db5c --- /dev/null +++ b/go.sum @@ -0,0 +1,115 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/unistack-org/micro/v3 v3.3.17 h1:WcyS7InP0DlS/JpRQGLh5sG6VstkdHJbgpMp+gmHmwg= +github.com/unistack-org/micro/v3 v3.3.17/go.mod h1:022EOEZZ789hZY3yB5ZSMXU6jLiadBgcNB/cpediV3c= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/errs v1.2.2 h1:5NFypMTuSdoySVTqlNs1dEoU21QVamMQJxW/Fii5O7g= +github.com/zeebo/errs v1.2.2/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= +golang.org/x/net v0.0.0-20210510120150-4163338589ed h1:p9UgmWI9wKpfYmgaV/IZKGdXc5qEK45tDwwwDyjS26I= +golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.37.1 h1:ARnQJNWxGyYJpdf/JXscNlQr/uv607ZPU9Z7ogHi+iI= +google.golang.org/grpc v1.37.1/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +storj.io/drpc v0.0.23 h1:MHVOMVoBbQkPuL17GerlGTA9PLwE1HF6zAOYFvWwYG4= +storj.io/drpc v0.0.23/go.mod h1:OSJH7wvH3yKlhnMHwblKJioaaeyI6X8xbXT1SG9woe8= diff --git a/handler.go b/handler.go new file mode 100644 index 0000000..5505cec --- /dev/null +++ b/handler.go @@ -0,0 +1,60 @@ +package drpc + +import ( + "reflect" + + "github.com/unistack-org/micro/v3/register" + "github.com/unistack-org/micro/v3/server" +) + +type rpcHandler struct { + opts server.HandlerOptions + handler interface{} + name string + endpoints []*register.Endpoint +} + +func newRPCHandler(handler interface{}, opts ...server.HandlerOption) server.Handler { + options := server.NewHandlerOptions(opts...) + + typ := reflect.TypeOf(handler) + hdlr := reflect.ValueOf(handler) + name := reflect.Indirect(hdlr).Type().Name() + + var endpoints []*register.Endpoint + + for m := 0; m < typ.NumMethod(); m++ { + if e := register.ExtractEndpoint(typ.Method(m)); e != nil { + e.Name = name + "." + e.Name + + for k, v := range options.Metadata[e.Name] { + e.Metadata[k] = v + } + + endpoints = append(endpoints, e) + } + } + + return &rpcHandler{ + name: name, + handler: handler, + endpoints: endpoints, + opts: options, + } +} + +func (r *rpcHandler) Name() string { + return r.name +} + +func (r *rpcHandler) Handler() interface{} { + return r.handler +} + +func (r *rpcHandler) Endpoints() []*register.Endpoint { + return r.endpoints +} + +func (r *rpcHandler) Options() server.HandlerOptions { + return r.opts +} diff --git a/options.go b/options.go new file mode 100644 index 0000000..13cfbb8 --- /dev/null +++ b/options.go @@ -0,0 +1,17 @@ +package drpc + +import ( + "github.com/unistack-org/micro/v3/server" +) + +type ( + maxMsgSizeKey struct{} +) + +// +// MaxMsgSize set the maximum message in bytes the server can receive and +// send. Default maximum message size is 4 MB. +// +func MaxMsgSize(s int) server.Option { + return server.SetOption(maxMsgSizeKey{}, s) +} diff --git a/request.go b/request.go new file mode 100644 index 0000000..9cff051 --- /dev/null +++ b/request.go @@ -0,0 +1,100 @@ +package drpc + +import ( + "io" + + "github.com/unistack-org/micro/v3/codec" + "github.com/unistack-org/micro/v3/metadata" + "github.com/unistack-org/micro/v3/server" +) + +var ( + _ server.Request = &rpcRequest{} + _ server.Message = &rpcMessage{} +) + +type rpcRequest struct { + rw io.ReadWriter + payload interface{} + codec codec.Codec + header metadata.Metadata + method string + endpoint string + contentType string + service string + body []byte + stream bool +} + +type rpcMessage struct { + payload interface{} + codec codec.Codec + header metadata.Metadata + topic string + contentType string + body []byte +} + +func (r *rpcRequest) ContentType() string { + return r.contentType +} + +func (r *rpcRequest) Service() string { + return r.service +} + +func (r *rpcRequest) Method() string { + return r.method +} + +func (r *rpcRequest) Endpoint() string { + return r.endpoint +} + +func (r *rpcRequest) Codec() codec.Codec { + return r.codec +} + +func (r *rpcRequest) Header() metadata.Metadata { + return r.header +} + +func (r *rpcRequest) Read() ([]byte, error) { + f := &codec.Frame{} + if err := r.codec.ReadBody(r.rw, f); err != nil { + return nil, err + } + return f.Data, nil +} + +func (r *rpcRequest) Stream() bool { + return r.stream +} + +func (r *rpcRequest) Body() interface{} { + return r.payload +} + +func (r *rpcMessage) ContentType() string { + return r.contentType +} + +func (r *rpcMessage) Topic() string { + return r.topic +} + +func (r *rpcMessage) Payload() interface{} { + return r.payload +} + +func (r *rpcMessage) Header() metadata.Metadata { + return r.header +} + +func (r *rpcMessage) Body() []byte { + return r.body +} + +func (r *rpcMessage) Codec() codec.Codec { + return r.codec +} diff --git a/response.go b/response.go new file mode 100644 index 0000000..a94bf46 --- /dev/null +++ b/response.go @@ -0,0 +1,34 @@ +package drpc + +import ( + "io" + + "github.com/unistack-org/micro/v3/codec" + "github.com/unistack-org/micro/v3/metadata" + "github.com/unistack-org/micro/v3/server" +) + +var _ server.Response = &rpcResponse{} + +type rpcResponse struct { + rw io.ReadWriter + header metadata.Metadata + codec codec.Codec +} + +func (r *rpcResponse) Codec() codec.Codec { + return r.codec +} + +func (r *rpcResponse) WriteHeader(hdr metadata.Metadata) { + for k, v := range hdr { + r.header[k] = v + } +} + +func (r *rpcResponse) Write(b []byte) error { + return r.codec.Write(r.rw, &codec.Message{ + Header: r.header, + Body: b, + }, nil) +} diff --git a/server.go b/server.go new file mode 100644 index 0000000..7ae5b54 --- /dev/null +++ b/server.go @@ -0,0 +1,171 @@ +package drpc + +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +// +// Meh, we need to get rid of this shit + +import ( + "context" + "fmt" + "reflect" + "sync" + "unicode" + "unicode/utf8" + + "github.com/unistack-org/micro/v3/server" +) + +// Precompute the reflect type for error. Can't use error directly +// because Typeof takes an empty interface value. This is annoying. +var typeOfError = reflect.TypeOf((*error)(nil)).Elem() + +type methodType struct { + ArgType reflect.Type + ReplyType reflect.Type + ContextType reflect.Type + method reflect.Method + stream bool +} + +// type reflectionType func(context.Context, server.Stream) error + +type service struct { + typ reflect.Type + method map[string]*methodType + rcvr reflect.Value + name string +} + +// server represents an RPC Server. +type rServer struct { + serviceMap map[string]*service + mu sync.RWMutex + // reflection bool +} + +// Is this an exported - upper case - name? +func isExported(name string) bool { + rune, _ := utf8.DecodeRuneInString(name) + return unicode.IsUpper(rune) +} + +// Is this type exported or a builtin? +func isExportedOrBuiltinType(t reflect.Type) bool { + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + // PkgPath will be non-empty even for an exported type, + // so we need to check the type name as well. + return isExported(t.Name()) || t.PkgPath() == "" +} + +// prepareEndpoint() returns a methodType for the provided method or nil +// in case if the method was unsuitable. +func prepareEndpoint(method reflect.Method) (*methodType, error) { + mtype := method.Type + mname := method.Name + var replyType, argType, contextType reflect.Type + var stream bool + + // Endpoint() must be exported. + if method.PkgPath != "" { + return nil, fmt.Errorf("Endpoint must be exported") + } + + switch mtype.NumIn() { + case 3: + // assuming streaming + argType = mtype.In(2) + contextType = mtype.In(1) + stream = true + case 4: + // method that takes a context + argType = mtype.In(2) + replyType = mtype.In(3) + contextType = mtype.In(1) + default: + return nil, fmt.Errorf("method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn()) + } + + switch stream { + case true: + // check stream type + streamType := reflect.TypeOf((*server.Stream)(nil)).Elem() + if !argType.Implements(streamType) { + return nil, fmt.Errorf("%v argument does not implement Streamer interface: %v", mname, argType) + } + default: + // First arg need not be a pointer. + if !isExportedOrBuiltinType(argType) { + return nil, fmt.Errorf("%v argument type not exported: %v", mname, argType) + } + + if replyType.Kind() != reflect.Ptr { + return nil, fmt.Errorf("method %v reply type not a pointer: %v", mname, replyType) + } + + // Reply type must be exported. + if !isExportedOrBuiltinType(replyType) { + return nil, fmt.Errorf("method %v reply type not exported: %v", mname, replyType) + } + } + + // Endpoint() needs one out. + if mtype.NumOut() != 1 { + return nil, fmt.Errorf("method %v has wrong number of outs: %v", mname, mtype.NumOut()) + } + // The return type of the method must be error. + if returnType := mtype.Out(0); returnType != typeOfError { + return nil, fmt.Errorf("method %v returns %v not error", mname, returnType.String()) + } + return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}, nil +} + +func (server *rServer) register(rcvr interface{}) error { + server.mu.Lock() + defer server.mu.Unlock() + if server.serviceMap == nil { + server.serviceMap = make(map[string]*service) + } + s := &service{} + s.typ = reflect.TypeOf(rcvr) + s.rcvr = reflect.ValueOf(rcvr) + sname := reflect.Indirect(s.rcvr).Type().Name() + if sname == "" { + return fmt.Errorf("rpc: no service name for type %v", s.typ.String()) + } + if !isExported(sname) { + return fmt.Errorf("rpc Register: type %s is not exported", sname) + } + if _, present := server.serviceMap[sname]; present { + return fmt.Errorf("rpc: service already defined: " + sname) + } + s.name = sname + s.method = make(map[string]*methodType) + + // Install the methods + for m := 0; m < s.typ.NumMethod(); m++ { + method := s.typ.Method(m) + mt, err := prepareEndpoint(method) + if mt != nil && err == nil { + s.method[method.Name] = mt + } else if err != nil { + return err + } + } + + if len(s.method) == 0 { + return fmt.Errorf("rpc Register: type %s has no exported methods of suitable type", sname) + } + server.serviceMap[s.name] = s + return nil +} + +func (m *methodType) prepareContext(ctx context.Context) reflect.Value { + if contextv := reflect.ValueOf(ctx); contextv.IsValid() { + return contextv + } + return reflect.Zero(m.ContextType) +} diff --git a/stream.go b/stream.go new file mode 100644 index 0000000..89c4c6a --- /dev/null +++ b/stream.go @@ -0,0 +1,40 @@ +package drpc + +import ( + "context" + + "github.com/unistack-org/micro/v3/server" + "google.golang.org/grpc" +) + +// rpcStream implements a server side Stream. +type rpcStream struct { + // embed the grpc stream so we can access it + grpc.ServerStream + + request server.Request +} + +func (r *rpcStream) Close() error { + return nil +} + +func (r *rpcStream) Error() error { + return nil +} + +func (r *rpcStream) Request() server.Request { + return r.request +} + +func (r *rpcStream) Context() context.Context { + return r.ServerStream.Context() +} + +func (r *rpcStream) Send(m interface{}) error { + return r.ServerStream.SendMsg(m) +} + +func (r *rpcStream) Recv(m interface{}) error { + return r.ServerStream.RecvMsg(m) +} diff --git a/subscriber.go b/subscriber.go new file mode 100644 index 0000000..bf36f54 --- /dev/null +++ b/subscriber.go @@ -0,0 +1,232 @@ +package drpc + +import ( + "context" + "fmt" + "reflect" + "runtime/debug" + "strings" + + "github.com/unistack-org/micro/v3/broker" + "github.com/unistack-org/micro/v3/errors" + "github.com/unistack-org/micro/v3/logger" + "github.com/unistack-org/micro/v3/metadata" + "github.com/unistack-org/micro/v3/register" + "github.com/unistack-org/micro/v3/server" +) + +type handler struct { + reqType reflect.Type + ctxType reflect.Type + method reflect.Value +} + +type subscriber struct { + topic string + rcvr reflect.Value + typ reflect.Type + subscriber interface{} + handlers []*handler + endpoints []*register.Endpoint + opts server.SubscriberOptions +} + +func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber { + options := server.NewSubscriberOptions(opts...) + + var endpoints []*register.Endpoint + var handlers []*handler + + if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func { + h := &handler{ + method: reflect.ValueOf(sub), + } + + switch typ.NumIn() { + case 1: + h.reqType = typ.In(0) + case 2: + h.ctxType = typ.In(0) + h.reqType = typ.In(1) + } + + handlers = append(handlers, h) + + endpoints = append(endpoints, ®ister.Endpoint{ + Name: "Func", + Request: register.ExtractSubValue(typ), + Metadata: map[string]string{ + "topic": topic, + "subscriber": "true", + }, + }) + } else { + hdlr := reflect.ValueOf(sub) + name := reflect.Indirect(hdlr).Type().Name() + + for m := 0; m < typ.NumMethod(); m++ { + method := typ.Method(m) + h := &handler{ + method: method.Func, + } + + switch method.Type.NumIn() { + case 2: + h.reqType = method.Type.In(1) + case 3: + h.ctxType = method.Type.In(1) + h.reqType = method.Type.In(2) + } + + handlers = append(handlers, h) + + endpoints = append(endpoints, ®ister.Endpoint{ + Name: name + "." + method.Name, + Request: register.ExtractSubValue(method.Type), + Metadata: map[string]string{ + "topic": topic, + "subscriber": "true", + }, + }) + } + } + + return &subscriber{ + rcvr: reflect.ValueOf(sub), + typ: reflect.TypeOf(sub), + topic: topic, + subscriber: sub, + handlers: handlers, + endpoints: endpoints, + opts: options, + } +} + +func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler { + return func(p broker.Event) (err error) { + defer func() { + if r := recover(); r != nil { + if g.opts.Logger.V(logger.ErrorLevel) { + g.opts.Logger.Error(g.opts.Context, "panic recovered: ", r) + g.opts.Logger.Error(g.opts.Context, string(debug.Stack())) + } + err = errors.InternalServerError(g.opts.Name+".subscriber", "panic recovered: %v", r) + } + }() + + msg := p.Message() + // if we don't have headers, create empty map + if msg.Header == nil { + msg.Header = make(map[string]string) + } + + ct := msg.Header["Content-Type"] + if len(ct) == 0 { + msg.Header["Content-Type"] = defaultContentType + ct = defaultContentType + } + cf, err := g.newCodec(ct) + if err != nil { + return err + } + + hdr := make(map[string]string, len(msg.Header)) + for k, v := range msg.Header { + if k == "Content-Type" { + continue + } + hdr[k] = v + } + + ctx := metadata.NewIncomingContext(sb.opts.Context, hdr) + + results := make(chan error, len(sb.handlers)) + + for i := 0; i < len(sb.handlers); i++ { + handler := sb.handlers[i] + + var isVal bool + var req reflect.Value + + if handler.reqType.Kind() == reflect.Ptr { + req = reflect.New(handler.reqType.Elem()) + } else { + req = reflect.New(handler.reqType) + isVal = true + } + if isVal { + req = req.Elem() + } + + if err = cf.Unmarshal(msg.Body, req.Interface()); err != nil { + return err + } + + fn := func(ctx context.Context, msg server.Message) error { + var vals []reflect.Value + if sb.typ.Kind() != reflect.Func { + vals = append(vals, sb.rcvr) + } + if handler.ctxType != nil { + vals = append(vals, reflect.ValueOf(ctx)) + } + + vals = append(vals, reflect.ValueOf(msg.Payload())) + + returnValues := handler.method.Call(vals) + if rerr := returnValues[0].Interface(); rerr != nil { + return rerr.(error) + } + return nil + } + + for i := len(opts.SubWrappers); i > 0; i-- { + fn = opts.SubWrappers[i-1](fn) + } + + if g.wg != nil { + g.wg.Add(1) + } + go func() { + if g.wg != nil { + defer g.wg.Done() + } + cerr := fn(ctx, &rpcMessage{ + topic: sb.topic, + contentType: ct, + payload: req.Interface(), + header: msg.Header, + body: msg.Body, + }) + results <- cerr + }() + } + var errors []string + for i := 0; i < len(sb.handlers); i++ { + if rerr := <-results; rerr != nil { + errors = append(errors, rerr.Error()) + } + } + if len(errors) > 0 { + err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) + } + + return err + } +} + +func (s *subscriber) Topic() string { + return s.topic +} + +func (s *subscriber) Subscriber() interface{} { + return s.subscriber +} + +func (s *subscriber) Endpoints() []*register.Endpoint { + return s.endpoints +} + +func (s *subscriber) Options() server.SubscriberOptions { + return s.opts +} diff --git a/util.go b/util.go new file mode 100644 index 0000000..8d74a08 --- /dev/null +++ b/util.go @@ -0,0 +1,59 @@ +package drpc + +import ( + "fmt" + "strings" +) + +// ServiceMethod converts a gRPC method to a Go method +// Input: +// Foo.Bar, /Foo/Bar, /package.Foo/Bar, /a.package.Foo/Bar +// Output: +// [Foo, Bar] +func serviceMethod(m string) (string, string, error) { + if len(m) == 0 { + return "", "", fmt.Errorf("malformed method name: %q", m) + } + + // grpc method + if m[0] == '/' { + // [ , Foo, Bar] + // [ , package.Foo, Bar] + // [ , a.package.Foo, Bar] + parts := strings.Split(m, "/") + if len(parts) != 3 || len(parts[1]) == 0 || len(parts[2]) == 0 { + return "", "", fmt.Errorf("malformed method name: %q", m) + } + service := strings.Split(parts[1], ".") + return service[len(service)-1], parts[2], nil + } + + // non grpc method + parts := strings.Split(m, ".") + + // expect [Foo, Bar] + if len(parts) != 2 { + return "", "", fmt.Errorf("malformed method name: %q", m) + } + + return parts[0], parts[1], nil +} + +/* +// ServiceFromMethod returns the service +// /service.Foo/Bar => service +func serviceFromMethod(m string) string { + if len(m) == 0 { + return m + } + if m[0] != '/' { + return m + } + parts := strings.Split(m, "/") + if len(parts) < 3 { + return m + } + parts = strings.Split(parts[1], ".") + return strings.Join(parts[:len(parts)-1], ".") +} +*/ diff --git a/util_test.go b/util_test.go new file mode 100644 index 0000000..c33b26d --- /dev/null +++ b/util_test.go @@ -0,0 +1,46 @@ +package drpc + +import ( + "testing" +) + +func TestServiceMethod(t *testing.T) { + type testCase struct { + input string + service string + method string + err bool + } + + methods := []testCase{ + {"Foo.Bar", "Foo", "Bar", false}, + {"/Foo/Bar", "Foo", "Bar", false}, + {"/package.Foo/Bar", "Foo", "Bar", false}, + {"/a.package.Foo/Bar", "Foo", "Bar", false}, + {"a.package.Foo/Bar", "", "", true}, + {"/Foo/Bar/Baz", "", "", true}, + {"Foo.Bar.Baz", "", "", true}, + } + for _, test := range methods { + service, method, err := serviceMethod(test.input) + if err != nil && test.err == true { + continue + } + // unexpected error + if err != nil && test.err == false { + t.Fatalf("unexpected err %v for %+v", err, test) + } + // expecter error + if test.err == true && err == nil { + t.Fatalf("expected error for %+v: got service: %s method: %s", test, service, method) + } + + if service != test.service { + t.Fatalf("wrong service for %+v: got service: %s method: %s", test, service, method) + } + + if method != test.method { + t.Fatalf("wrong method for %+v: got service: %s method: %s", test, service, method) + } + } +}