From 37e2bf569553b803b4997d86cf7bb31acbf003d7 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Mon, 3 Jun 2019 18:44:43 +0100 Subject: [PATCH] Further consolidate the libraries --- README.md | 25 +++ buffer.go | 14 ++ codec.go | 98 +++++++++ error.go | 30 +++ grpc.go | 541 ++++++++++++++++++++++++++++++++++++++++++++++ grpc_pool.go | 83 +++++++ grpc_pool_test.go | 64 ++++++ grpc_test.go | 91 ++++++++ message.go | 40 ++++ options.go | 74 +++++++ request.go | 92 ++++++++ request_test.go | 48 ++++ stream.go | 77 +++++++ 13 files changed, 1277 insertions(+) create mode 100644 README.md create mode 100644 buffer.go create mode 100644 codec.go create mode 100644 error.go create mode 100644 grpc.go create mode 100644 grpc_pool.go create mode 100644 grpc_pool_test.go create mode 100644 grpc_test.go create mode 100644 message.go create mode 100644 options.go create mode 100644 request.go create mode 100644 request_test.go create mode 100644 stream.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..87c0c90 --- /dev/null +++ b/README.md @@ -0,0 +1,25 @@ +# GRPC Client + +The grpc client is a [micro.Client](https://godoc.org/github.com/micro/go-micro/client#Client) compatible client. + +## Overview + +The client makes use of the [google.golang.org/grpc](google.golang.org/grpc) framework for the underlying communication mechanism. + +## Usage + +Specify the client to your micro service + +```go +import ( + "github.com/micro/go-micro" + "github.com/micro/go-plugins/client/grpc" +) + +func main() { + service := micro.NewService( + micro.Name("greeter"), + micro.Client(grpc.NewClient()), + ) +} +``` diff --git a/buffer.go b/buffer.go new file mode 100644 index 0000000..c43bb23 --- /dev/null +++ b/buffer.go @@ -0,0 +1,14 @@ +package grpc + +import ( + "bytes" +) + +type buffer struct { + *bytes.Buffer +} + +func (b *buffer) Close() error { + b.Buffer.Reset() + return nil +} diff --git a/codec.go b/codec.go new file mode 100644 index 0000000..4994db7 --- /dev/null +++ b/codec.go @@ -0,0 +1,98 @@ +package grpc + +import ( + "fmt" + + "github.com/golang/protobuf/proto" + "github.com/json-iterator/go" + "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/codec/jsonrpc" + "github.com/micro/go-micro/codec/protorpc" + "google.golang.org/grpc/encoding" +) + +type jsonCodec struct{} +type protoCodec struct{} +type bytesCodec struct{} +type wrapCodec struct{ encoding.Codec } + +var ( + defaultGRPCCodecs = map[string]encoding.Codec{ + "application/json": jsonCodec{}, + "application/proto": protoCodec{}, + "application/protobuf": protoCodec{}, + "application/octet-stream": protoCodec{}, + "application/grpc+json": jsonCodec{}, + "application/grpc+proto": protoCodec{}, + "application/grpc+bytes": bytesCodec{}, + } + + defaultRPCCodecs = map[string]codec.NewCodec{ + "application/json": jsonrpc.NewCodec, + "application/json-rpc": jsonrpc.NewCodec, + "application/protobuf": protorpc.NewCodec, + "application/proto-rpc": protorpc.NewCodec, + "application/octet-stream": protorpc.NewCodec, + } + + json = jsoniter.ConfigCompatibleWithStandardLibrary +) + +// UseNumber fix unmarshal Number(8234567890123456789) to interface(8.234567890123457e+18) +func UseNumber() { + json = jsoniter.Config{ + UseNumber: true, + EscapeHTML: true, + SortMapKeys: true, + ValidateJsonRawMessage: true, + }.Froze() +} + +func (w wrapCodec) String() string { + return w.Codec.Name() +} + +func (protoCodec) Marshal(v interface{}) ([]byte, error) { + return proto.Marshal(v.(proto.Message)) +} + +func (protoCodec) Unmarshal(data []byte, v interface{}) error { + return proto.Unmarshal(data, v.(proto.Message)) +} + +func (protoCodec) Name() string { + return "proto" +} + +func (bytesCodec) Marshal(v interface{}) ([]byte, error) { + b, ok := v.(*[]byte) + if !ok { + return nil, fmt.Errorf("failed to marshal: %v is not type of *[]byte", v) + } + return *b, nil +} + +func (bytesCodec) Unmarshal(data []byte, v interface{}) error { + b, ok := v.(*[]byte) + if !ok { + return fmt.Errorf("failed to unmarshal: %v is not type of *[]byte", v) + } + *b = data + return nil +} + +func (bytesCodec) Name() string { + return "bytes" +} + +func (jsonCodec) Marshal(v interface{}) ([]byte, error) { + return json.Marshal(v) +} + +func (jsonCodec) Unmarshal(data []byte, v interface{}) error { + return json.Unmarshal(data, v) +} + +func (jsonCodec) Name() string { + return "json" +} diff --git a/error.go b/error.go new file mode 100644 index 0000000..4783252 --- /dev/null +++ b/error.go @@ -0,0 +1,30 @@ +package grpc + +import ( + "github.com/micro/go-micro/errors" + "google.golang.org/grpc/status" +) + +func microError(err error) error { + // no error + switch err { + case nil: + return nil + } + + // micro error + if v, ok := err.(*errors.Error); ok { + return v + } + + // grpc error + if s, ok := status.FromError(err); ok { + if e := errors.Parse(s.Message()); e.Code > 0 { + return e // actually a micro error + } + return errors.InternalServerError("go.micro.client", s.Message()) + } + + // do nothing + return err +} diff --git a/grpc.go b/grpc.go new file mode 100644 index 0000000..fabc5fe --- /dev/null +++ b/grpc.go @@ -0,0 +1,541 @@ +// Package grpc provides a gRPC client +package grpc + +import ( + "bytes" + "context" + "crypto/tls" + "fmt" + "sync" + "time" + + "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/cmd" + "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/errors" + "github.com/micro/go-micro/metadata" + "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/selector" + "github.com/micro/go-micro/transport" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/encoding" + gmetadata "google.golang.org/grpc/metadata" +) + +type grpcClient struct { + once sync.Once + opts client.Options + pool *pool +} + +func init() { + encoding.RegisterCodec(jsonCodec{}) + encoding.RegisterCodec(bytesCodec{}) + + cmd.DefaultClients["grpc"] = NewClient +} + +// secure returns the dial option for whether its a secure or insecure connection +func (g *grpcClient) secure() grpc.DialOption { + if g.opts.Context != nil { + if v := g.opts.Context.Value(tlsAuth{}); v != nil { + tls := v.(*tls.Config) + creds := credentials.NewTLS(tls) + return grpc.WithTransportCredentials(creds) + } + } + return grpc.WithInsecure() +} + +func (g *grpcClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) { + // return remote address + if len(opts.Address) > 0 { + return func() (*registry.Node, error) { + return ®istry.Node{ + Address: opts.Address, + }, nil + }, nil + } + + // get next nodes from the selector + next, err := g.opts.Selector.Select(request.Service(), opts.SelectOptions...) + if err != nil && err == selector.ErrNotFound { + return nil, errors.NotFound("go.micro.client", err.Error()) + } else if err != nil { + return nil, errors.InternalServerError("go.micro.client", err.Error()) + } + + return next, nil +} + +func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error { + address := node.Address + if node.Port > 0 { + address = fmt.Sprintf("%s:%d", address, node.Port) + } + + header := make(map[string]string) + if md, ok := metadata.FromContext(ctx); ok { + for k, v := range md { + header[k] = v + } + } + + // set timeout in nanoseconds + header["timeout"] = fmt.Sprintf("%d", opts.RequestTimeout) + // set the content type for the request + header["x-content-type"] = req.ContentType() + + md := gmetadata.New(header) + ctx = gmetadata.NewOutgoingContext(ctx, md) + + cf, err := g.newGRPCCodec(req.ContentType()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + maxRecvMsgSize := g.maxRecvMsgSizeValue() + maxSendMsgSize := g.maxSendMsgSizeValue() + + var grr error + + cc, err := g.pool.getConn(address, grpc.WithDefaultCallOptions(grpc.CallCustomCodec(cf)), + grpc.WithTimeout(opts.DialTimeout), g.secure(), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(maxRecvMsgSize), + grpc.MaxCallSendMsgSize(maxSendMsgSize), + )) + if err != nil { + return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) + } + defer func() { + // defer execution of release + g.pool.release(address, cc, grr) + }() + + ch := make(chan error, 1) + + go func() { + err := cc.Invoke(ctx, methodToGRPC(req.Endpoint(), req.Body()), req.Body(), rsp, grpc.CallContentSubtype(cf.String())) + ch <- microError(err) + }() + + select { + case err := <-ch: + grr = err + case <-ctx.Done(): + grr = ctx.Err() + } + + return grr +} + +func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client.Request, opts client.CallOptions) (client.Stream, error) { + address := node.Address + if node.Port > 0 { + address = fmt.Sprintf("%s:%d", address, node.Port) + } + + header := make(map[string]string) + if md, ok := metadata.FromContext(ctx); ok { + for k, v := range md { + header[k] = v + } + } + + // set timeout in nanoseconds + header["timeout"] = fmt.Sprintf("%d", opts.RequestTimeout) + // set the content type for the request + header["x-content-type"] = req.ContentType() + + md := gmetadata.New(header) + ctx = gmetadata.NewOutgoingContext(ctx, md) + + cf, err := g.newGRPCCodec(req.ContentType()) + if err != nil { + return nil, errors.InternalServerError("go.micro.client", err.Error()) + } + + var dialCtx context.Context + var cancel context.CancelFunc + if opts.DialTimeout >= 0 { + dialCtx, cancel = context.WithTimeout(ctx, opts.DialTimeout) + } else { + dialCtx, cancel = context.WithCancel(ctx) + } + defer cancel() + cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.CallCustomCodec(cf)), g.secure()) + if err != nil { + return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) + } + + desc := &grpc.StreamDesc{ + StreamName: req.Service() + req.Endpoint(), + ClientStreams: true, + ServerStreams: true, + } + + st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Endpoint(), req.Body()), grpc.CallContentSubtype(cf.String())) + if err != nil { + return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) + } + + return &grpcStream{ + context: ctx, + request: req, + stream: st, + conn: cc, + }, nil +} + +func (g *grpcClient) maxRecvMsgSizeValue() int { + if g.opts.Context == nil { + return DefaultMaxRecvMsgSize + } + v := g.opts.Context.Value(maxRecvMsgSizeKey{}) + if v == nil { + return DefaultMaxRecvMsgSize + } + return v.(int) +} + +func (g *grpcClient) maxSendMsgSizeValue() int { + if g.opts.Context == nil { + return DefaultMaxSendMsgSize + } + v := g.opts.Context.Value(maxSendMsgSizeKey{}) + if v == nil { + return DefaultMaxSendMsgSize + } + return v.(int) +} + +func (g *grpcClient) newGRPCCodec(contentType string) (grpc.Codec, error) { + codecs := make(map[string]encoding.Codec) + if g.opts.Context != nil { + if v := g.opts.Context.Value(codecsKey{}); v != nil { + codecs = v.(map[string]encoding.Codec) + } + } + if c, ok := codecs[contentType]; ok { + return wrapCodec{c}, nil + } + if c, ok := defaultGRPCCodecs[contentType]; ok { + return wrapCodec{c}, nil + } + return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) +} + +func (g *grpcClient) newCodec(contentType string) (codec.NewCodec, error) { + if c, ok := g.opts.Codecs[contentType]; ok { + return c, nil + } + if cf, ok := defaultRPCCodecs[contentType]; ok { + return cf, nil + } + return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) +} + +func (g *grpcClient) Init(opts ...client.Option) error { + size := g.opts.PoolSize + ttl := g.opts.PoolTTL + + for _, o := range opts { + o(&g.opts) + } + + // update pool configuration if the options changed + if size != g.opts.PoolSize || ttl != g.opts.PoolTTL { + g.pool.Lock() + g.pool.size = g.opts.PoolSize + g.pool.ttl = int64(g.opts.PoolTTL.Seconds()) + g.pool.Unlock() + } + + return nil +} + +func (g *grpcClient) Options() client.Options { + return g.opts +} + +func (g *grpcClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message { + return newGRPCPublication(topic, msg, "application/octet-stream") +} + +func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { + return newGRPCRequest(service, method, req, g.opts.ContentType, reqOpts...) +} + +func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { + // make a copy of call opts + callOpts := g.opts.CallOptions + for _, opt := range opts { + opt(&callOpts) + } + + next, err := g.next(req, callOpts) + if err != nil { + return err + } + + // check if we already have a deadline + d, ok := ctx.Deadline() + if !ok { + // no deadline so we create a new one + ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout) + } else { + // got a deadline so no need to setup context + // but we need to set the timeout we pass along + opt := client.WithRequestTimeout(time.Until(d)) + opt(&callOpts) + } + + // should we noop right here? + select { + case <-ctx.Done(): + return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) + default: + } + + // make copy of call method + gcall := g.call + + // wrap the call in reverse + for i := len(callOpts.CallWrappers); i > 0; i-- { + gcall = callOpts.CallWrappers[i-1](gcall) + } + + // return errors.New("go.micro.client", "request timeout", 408) + call := func(i int) error { + // call backoff first. Someone may want an initial start delay + t, err := callOpts.Backoff(ctx, req, i) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + // only sleep if greater than 0 + if t.Seconds() > 0 { + time.Sleep(t) + } + + // select next node + node, err := next() + if err != nil && err == selector.ErrNotFound { + return errors.NotFound("go.micro.client", err.Error()) + } else if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + // make the call + err = gcall(ctx, node, req, rsp, callOpts) + g.opts.Selector.Mark(req.Service(), node, err) + return err + } + + ch := make(chan error, callOpts.Retries+1) + var gerr error + + for i := 0; i <= callOpts.Retries; i++ { + go func() { + ch <- call(i) + }() + + select { + case <-ctx.Done(): + return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) + case err := <-ch: + // if the call succeeded lets bail early + if err == nil { + return nil + } + + retry, rerr := callOpts.Retry(ctx, req, i, err) + if rerr != nil { + return rerr + } + + if !retry { + return err + } + + gerr = err + } + } + + return gerr +} + +func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { + // make a copy of call opts + callOpts := g.opts.CallOptions + for _, opt := range opts { + opt(&callOpts) + } + + next, err := g.next(req, callOpts) + if err != nil { + return nil, err + } + + // #200 - streams shouldn't have a request timeout set on the context + + // should we noop right here? + select { + case <-ctx.Done(): + return nil, errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) + default: + } + + call := func(i int) (client.Stream, error) { + // call backoff first. Someone may want an initial start delay + t, err := callOpts.Backoff(ctx, req, i) + if err != nil { + return nil, errors.InternalServerError("go.micro.client", err.Error()) + } + + // only sleep if greater than 0 + if t.Seconds() > 0 { + time.Sleep(t) + } + + node, err := next() + if err != nil && err == selector.ErrNotFound { + return nil, errors.NotFound("go.micro.client", err.Error()) + } else if err != nil { + return nil, errors.InternalServerError("go.micro.client", err.Error()) + } + + stream, err := g.stream(ctx, node, req, callOpts) + g.opts.Selector.Mark(req.Service(), node, err) + return stream, err + } + + type response struct { + stream client.Stream + err error + } + + ch := make(chan response, callOpts.Retries+1) + var grr error + + for i := 0; i <= callOpts.Retries; i++ { + go func() { + s, err := call(i) + ch <- response{s, err} + }() + + select { + case <-ctx.Done(): + return nil, errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) + case rsp := <-ch: + // if the call succeeded lets bail early + if rsp.err == nil { + return rsp.stream, nil + } + + retry, rerr := callOpts.Retry(ctx, req, i, err) + if rerr != nil { + return nil, rerr + } + + if !retry { + return nil, rsp.err + } + + grr = rsp.err + } + } + + return nil, grr +} + +func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { + md, ok := metadata.FromContext(ctx) + if !ok { + md = make(map[string]string) + } + md["Content-Type"] = p.ContentType() + + cf, err := g.newCodec(p.ContentType()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + b := &buffer{bytes.NewBuffer(nil)} + if err := cf(b).Write(&codec.Message{Type: codec.Publication}, p.Payload()); err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + g.once.Do(func() { + g.opts.Broker.Connect() + }) + + return g.opts.Broker.Publish(p.Topic(), &broker.Message{ + Header: md, + Body: b.Bytes(), + }) +} + +func (g *grpcClient) String() string { + return "grpc" +} + +func newClient(opts ...client.Option) client.Client { + options := client.Options{ + Codecs: make(map[string]codec.NewCodec), + CallOptions: client.CallOptions{ + Backoff: client.DefaultBackoff, + Retry: client.DefaultRetry, + Retries: client.DefaultRetries, + RequestTimeout: client.DefaultRequestTimeout, + DialTimeout: transport.DefaultDialTimeout, + }, + PoolSize: client.DefaultPoolSize, + PoolTTL: client.DefaultPoolTTL, + } + + for _, o := range opts { + o(&options) + } + + if len(options.ContentType) == 0 { + options.ContentType = "application/grpc+proto" + } + + if options.Broker == nil { + options.Broker = broker.DefaultBroker + } + + if options.Registry == nil { + options.Registry = registry.DefaultRegistry + } + + if options.Selector == nil { + options.Selector = selector.NewSelector( + selector.Registry(options.Registry), + ) + } + + rc := &grpcClient{ + once: sync.Once{}, + opts: options, + pool: newPool(options.PoolSize, options.PoolTTL), + } + + c := client.Client(rc) + + // wrap in reverse + for i := len(options.Wrappers); i > 0; i-- { + c = options.Wrappers[i-1](c) + } + + return c +} + +func NewClient(opts ...client.Option) client.Client { + return newClient(opts...) +} diff --git a/grpc_pool.go b/grpc_pool.go new file mode 100644 index 0000000..ea084e7 --- /dev/null +++ b/grpc_pool.go @@ -0,0 +1,83 @@ +package grpc + +import ( + "sync" + "time" + + "google.golang.org/grpc" +) + +type pool struct { + size int + ttl int64 + + sync.Mutex + conns map[string][]*poolConn +} + +type poolConn struct { + *grpc.ClientConn + created int64 +} + +func newPool(size int, ttl time.Duration) *pool { + return &pool{ + size: size, + ttl: int64(ttl.Seconds()), + conns: make(map[string][]*poolConn), + } +} + +func (p *pool) getConn(addr string, opts ...grpc.DialOption) (*poolConn, error) { + p.Lock() + conns := p.conns[addr] + now := time.Now().Unix() + + // while we have conns check age and then return one + // otherwise we'll create a new conn + for len(conns) > 0 { + conn := conns[len(conns)-1] + conns = conns[:len(conns)-1] + p.conns[addr] = conns + + // if conn is old kill it and move on + if d := now - conn.created; d > p.ttl { + conn.ClientConn.Close() + continue + } + + // we got a good conn, lets unlock and return it + p.Unlock() + + return conn, nil + } + + p.Unlock() + + // create new conn + cc, err := grpc.Dial(addr, opts...) + if err != nil { + return nil, err + } + + return &poolConn{cc, time.Now().Unix()}, nil +} + +func (p *pool) release(addr string, conn *poolConn, err error) { + // don't store the conn if it has errored + if err != nil { + conn.ClientConn.Close() + return + } + + // otherwise put it back for reuse + p.Lock() + conns := p.conns[addr] + if len(conns) >= p.size { + p.Unlock() + conn.ClientConn.Close() + return + } + p.conns[addr] = append(conns, conn) + p.Unlock() +} diff --git a/grpc_pool_test.go b/grpc_pool_test.go new file mode 100644 index 0000000..955e18c --- /dev/null +++ b/grpc_pool_test.go @@ -0,0 +1,64 @@ +package grpc + +import ( + "net" + "testing" + "time" + + "context" + "google.golang.org/grpc" + pgrpc "google.golang.org/grpc" + pb "google.golang.org/grpc/examples/helloworld/helloworld" +) + +func testPool(t *testing.T, size int, ttl time.Duration) { + // setup server + l, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + defer l.Close() + + s := pgrpc.NewServer() + pb.RegisterGreeterServer(s, &greeterServer{}) + + go s.Serve(l) + defer s.Stop() + + // zero pool + p := newPool(size, ttl) + + for i := 0; i < 10; i++ { + // get a conn + cc, err := p.getConn(l.Addr().String(), grpc.WithInsecure()) + if err != nil { + t.Fatal(err) + } + + rsp := pb.HelloReply{} + + err = cc.Invoke(context.TODO(), "/helloworld.Greeter/SayHello", &pb.HelloRequest{Name: "John"}, &rsp) + if err != nil { + t.Fatal(err) + } + + if rsp.Message != "Hello John" { + t.Fatalf("Got unexpected response %v", rsp.Message) + } + + // release the conn + p.release(l.Addr().String(), cc, nil) + + p.Lock() + if i := len(p.conns[l.Addr().String()]); i > size { + p.Unlock() + t.Fatalf("pool size %d is greater than expected %d", i, size) + } + p.Unlock() + } +} + +func TestGRPCPool(t *testing.T) { + testPool(t, 0, time.Minute) + testPool(t, 2, time.Minute) +} diff --git a/grpc_test.go b/grpc_test.go new file mode 100644 index 0000000..574058c --- /dev/null +++ b/grpc_test.go @@ -0,0 +1,91 @@ +package grpc + +import ( + "context" + "net" + "strconv" + "strings" + "testing" + + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/registry/memory" + "github.com/micro/go-micro/selector" + pgrpc "google.golang.org/grpc" + pb "google.golang.org/grpc/examples/helloworld/helloworld" +) + +// server is used to implement helloworld.GreeterServer. +type greeterServer struct{} + +// SayHello implements helloworld.GreeterServer +func (g *greeterServer) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { + return &pb.HelloReply{Message: "Hello " + in.Name}, nil +} + +func TestGRPCClient(t *testing.T) { + l, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + defer l.Close() + + s := pgrpc.NewServer() + pb.RegisterGreeterServer(s, &greeterServer{}) + + go s.Serve(l) + defer s.Stop() + + parts := strings.Split(l.Addr().String(), ":") + port, _ := strconv.Atoi(parts[len(parts)-1]) + addr := strings.Join(parts[:len(parts)-1], ":") + + // create mock registry + r := memory.NewRegistry() + + // register service + r.Register(®istry.Service{ + Name: "test", + Version: "test", + Nodes: []*registry.Node{ + ®istry.Node{ + Id: "test-1", + Address: addr, + Port: port, + }, + }, + }) + + // create selector + se := selector.NewSelector( + selector.Registry(r), + ) + + // create client + c := NewClient( + client.Registry(r), + client.Selector(se), + ) + + testMethods := []string{ + "/helloworld.Greeter/SayHello", + "Greeter.SayHello", + } + + for _, method := range testMethods { + req := c.NewRequest("test", method, &pb.HelloRequest{ + Name: "John", + }) + + rsp := pb.HelloReply{} + + err = c.Call(context.TODO(), req, &rsp) + if err != nil { + t.Fatal(err) + } + + if rsp.Message != "Hello John" { + t.Fatalf("Got unexpected response %v", rsp.Message) + } + } +} diff --git a/message.go b/message.go new file mode 100644 index 0000000..6938064 --- /dev/null +++ b/message.go @@ -0,0 +1,40 @@ +package grpc + +import ( + "github.com/micro/go-micro/client" +) + +type grpcPublication struct { + topic string + contentType string + payload interface{} +} + +func newGRPCPublication(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message { + var options client.MessageOptions + for _, o := range opts { + o(&options) + } + + if len(options.ContentType) > 0 { + contentType = options.ContentType + } + + return &grpcPublication{ + payload: payload, + topic: topic, + contentType: contentType, + } +} + +func (g *grpcPublication) ContentType() string { + return g.contentType +} + +func (g *grpcPublication) Topic() string { + return g.topic +} + +func (g *grpcPublication) Payload() interface{} { + return g.payload +} diff --git a/options.go b/options.go new file mode 100644 index 0000000..c702ade --- /dev/null +++ b/options.go @@ -0,0 +1,74 @@ +// Package grpc provides a gRPC options +package grpc + +import ( + "context" + "crypto/tls" + + "github.com/micro/go-micro/client" + "google.golang.org/grpc/encoding" +) + +var ( + // DefaultMaxRecvMsgSize maximum message that client can receive + // (4 MB). + DefaultMaxRecvMsgSize = 1024 * 1024 * 4 + + // DefaultMaxSendMsgSize maximum message that client can send + // (4 MB). + DefaultMaxSendMsgSize = 1024 * 1024 * 4 +) + +type codecsKey struct{} +type tlsAuth struct{} +type maxRecvMsgSizeKey struct{} +type maxSendMsgSizeKey struct{} + +// gRPC Codec to be used to encode/decode requests for a given content type +func Codec(contentType string, c encoding.Codec) client.Option { + return func(o *client.Options) { + codecs := make(map[string]encoding.Codec) + if o.Context == nil { + o.Context = context.Background() + } + if v := o.Context.Value(codecsKey{}); v != nil { + codecs = v.(map[string]encoding.Codec) + } + codecs[contentType] = c + o.Context = context.WithValue(o.Context, codecsKey{}, codecs) + } +} + +// AuthTLS should be used to setup a secure authentication using TLS +func AuthTLS(t *tls.Config) client.Option { + return func(o *client.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, tlsAuth{}, t) + } +} + +// +// MaxRecvMsgSize set the maximum size of message that client can receive. +// +func MaxRecvMsgSize(s int) client.Option { + return func(o *client.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, maxRecvMsgSizeKey{}, s) + } +} + +// +// MaxSendMsgSize set the maximum size of message that client can send. +// +func MaxSendMsgSize(s int) client.Option { + return func(o *client.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, maxSendMsgSizeKey{}, s) + } +} diff --git a/request.go b/request.go new file mode 100644 index 0000000..28409c4 --- /dev/null +++ b/request.go @@ -0,0 +1,92 @@ +package grpc + +import ( + "fmt" + "reflect" + "strings" + + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/codec" +) + +type grpcRequest struct { + service string + method string + contentType string + request interface{} + opts client.RequestOptions +} + +func methodToGRPC(method string, request interface{}) string { + // no method or already grpc method + if len(method) == 0 || method[0] == '/' { + return method + } + // can't operate on nil request + t := reflect.TypeOf(request) + if t == nil { + return method + } + // dereference + if t.Kind() == reflect.Ptr { + t = t.Elem() + } + // get package name + pParts := strings.Split(t.PkgPath(), "/") + pkg := pParts[len(pParts)-1] + // assume method is Foo.Bar + mParts := strings.Split(method, ".") + if len(mParts) != 2 { + return method + } + // return /pkg.Foo/Bar + return fmt.Sprintf("/%s.%s/%s", pkg, mParts[0], mParts[1]) +} + +func newGRPCRequest(service, method string, request interface{}, contentType string, reqOpts ...client.RequestOption) client.Request { + var opts client.RequestOptions + for _, o := range reqOpts { + o(&opts) + } + + // set the content-type specified + if len(opts.ContentType) > 0 { + contentType = opts.ContentType + } + + return &grpcRequest{ + service: service, + method: method, + request: request, + contentType: contentType, + opts: opts, + } +} + +func (g *grpcRequest) ContentType() string { + return g.contentType +} + +func (g *grpcRequest) Service() string { + return g.service +} + +func (g *grpcRequest) Method() string { + return g.method +} + +func (g *grpcRequest) Endpoint() string { + return g.method +} + +func (g *grpcRequest) Codec() codec.Writer { + return nil +} + +func (g *grpcRequest) Body() interface{} { + return g.request +} + +func (g *grpcRequest) Stream() bool { + return g.opts.Stream +} diff --git a/request_test.go b/request_test.go new file mode 100644 index 0000000..eab3a3a --- /dev/null +++ b/request_test.go @@ -0,0 +1,48 @@ +package grpc + +import ( + "testing" + + pb "google.golang.org/grpc/examples/helloworld/helloworld" +) + +func TestMethodToGRPC(t *testing.T) { + testData := []struct { + method string + expect string + request interface{} + }{ + { + "Greeter.SayHello", + "/helloworld.Greeter/SayHello", + new(pb.HelloRequest), + }, + { + "/helloworld.Greeter/SayHello", + "/helloworld.Greeter/SayHello", + new(pb.HelloRequest), + }, + { + "Greeter.SayHello", + "/helloworld.Greeter/SayHello", + pb.HelloRequest{}, + }, + { + "/helloworld.Greeter/SayHello", + "/helloworld.Greeter/SayHello", + pb.HelloRequest{}, + }, + { + "Greeter.SayHello", + "Greeter.SayHello", + nil, + }, + } + + for _, d := range testData { + method := methodToGRPC(d.method, d.request) + if method != d.expect { + t.Fatalf("expected %s got %s", d.expect, method) + } + } +} diff --git a/stream.go b/stream.go new file mode 100644 index 0000000..bc00dbf --- /dev/null +++ b/stream.go @@ -0,0 +1,77 @@ +package grpc + +import ( + "context" + "io" + "sync" + + "github.com/micro/go-micro/client" + "google.golang.org/grpc" +) + +// Implements the streamer interface +type grpcStream struct { + sync.RWMutex + err error + conn *grpc.ClientConn + request client.Request + stream grpc.ClientStream + context context.Context +} + +func (g *grpcStream) Context() context.Context { + return g.context +} + +func (g *grpcStream) Request() client.Request { + return g.request +} + +func (g *grpcStream) Response() client.Response { + return nil +} + +func (g *grpcStream) Send(msg interface{}) error { + if err := g.stream.SendMsg(msg); err != nil { + g.setError(err) + return err + } + return nil +} + +func (g *grpcStream) Recv(msg interface{}) (err error) { + defer g.setError(err) + if err = g.stream.RecvMsg(msg); err != nil { + if err == io.EOF { + // #202 - inconsistent gRPC stream behavior + // the only way to tell if the stream is done is when we get a EOF on the Recv + // here we should close the underlying gRPC ClientConn + closeErr := g.conn.Close() + if closeErr != nil { + err = closeErr + } + } + } + return +} + +func (g *grpcStream) Error() error { + g.RLock() + defer g.RUnlock() + return g.err +} + +func (g *grpcStream) setError(e error) { + g.Lock() + g.err = e + g.Unlock() +} + +// Close the gRPC send stream +// #202 - inconsistent gRPC stream behavior +// The underlying gRPC stream should not be closed here since the +// stream should still be able to receive after this function call +// TODO: should the conn be closed in another way? +func (g *grpcStream) Close() error { + return g.stream.CloseSend() +}