From 25a0d05ac9ccef5bf3d0ec3da3b0c06f7886c7fe Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Fri, 24 May 2019 17:15:59 +0100 Subject: [PATCH] Add grpc transport --- cmd/cmd.go | 2 + transport/grpc/grpc.go | 176 ++++++++++++++++++++++ transport/grpc/grpc_test.go | 109 ++++++++++++++ transport/grpc/handler.go | 39 +++++ transport/grpc/proto/transport.micro.go | 170 +++++++++++++++++++++ transport/grpc/proto/transport.pb.go | 188 ++++++++++++++++++++++++ transport/grpc/proto/transport.proto | 12 ++ transport/grpc/socket.go | 97 ++++++++++++ 8 files changed, 793 insertions(+) create mode 100644 transport/grpc/grpc.go create mode 100644 transport/grpc/grpc_test.go create mode 100644 transport/grpc/handler.go create mode 100644 transport/grpc/proto/transport.micro.go create mode 100644 transport/grpc/proto/transport.pb.go create mode 100644 transport/grpc/proto/transport.proto create mode 100644 transport/grpc/socket.go diff --git a/cmd/cmd.go b/cmd/cmd.go index c4987dbf..a33e016a 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -33,6 +33,7 @@ import ( // transports "github.com/micro/go-micro/transport" + tgrpc "github.com/micro/go-micro/transport/grpc" thttp "github.com/micro/go-micro/transport/http" tmem "github.com/micro/go-micro/transport/memory" ) @@ -197,6 +198,7 @@ var ( DefaultTransports = map[string]func(...transport.Option) transport.Transport{ "memory": tmem.NewTransport, "http": thttp.NewTransport, + "grpc": tgrpc.NewTransport, } // used for default selection as the fall back diff --git a/transport/grpc/grpc.go b/transport/grpc/grpc.go new file mode 100644 index 00000000..5ea2421b --- /dev/null +++ b/transport/grpc/grpc.go @@ -0,0 +1,176 @@ +// Package grpc provides a grpc transport +package grpc + +import ( + "context" + "crypto/tls" + "net" + + "github.com/micro/go-micro/transport" + maddr "github.com/micro/util/go/lib/addr" + mnet "github.com/micro/util/go/lib/net" + mls "github.com/micro/util/go/lib/tls" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + + pb "github.com/micro/go-plugins/transport/grpc/proto" +) + +type grpcTransport struct { + opts transport.Options +} + +type grpcTransportListener struct { + listener net.Listener + secure bool + tls *tls.Config +} + +func getTLSConfig(addr string) (*tls.Config, error) { + hosts := []string{addr} + + // check if its a valid host:port + if host, _, err := net.SplitHostPort(addr); err == nil { + if len(host) == 0 { + hosts = maddr.IPs() + } else { + hosts = []string{host} + } + } + + // generate a certificate + cert, err := mls.Certificate(hosts...) + if err != nil { + return nil, err + } + + return &tls.Config{Certificates: []tls.Certificate{cert}}, nil +} + +func (t *grpcTransportListener) Addr() string { + return t.listener.Addr().String() +} + +func (t *grpcTransportListener) Close() error { + return t.listener.Close() +} + +func (t *grpcTransportListener) Accept(fn func(transport.Socket)) error { + var opts []grpc.ServerOption + + // setup tls if specified + if t.secure || t.tls != nil { + config := t.tls + if config == nil { + var err error + addr := t.listener.Addr().String() + config, err = getTLSConfig(addr) + if err != nil { + return err + } + } + + creds := credentials.NewTLS(config) + opts = append(opts, grpc.Creds(creds)) + } + + // new service + srv := grpc.NewServer(opts...) + + // register service + pb.RegisterTransportServer(srv, µTransport{addr: t.listener.Addr().String(), fn: fn}) + + // start serving + return srv.Serve(t.listener) +} + +func (t *grpcTransport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) { + dopts := transport.DialOptions{ + Timeout: transport.DefaultDialTimeout, + } + + for _, opt := range opts { + opt(&dopts) + } + + options := []grpc.DialOption{ + grpc.WithTimeout(dopts.Timeout), + } + + if t.opts.Secure || t.opts.TLSConfig != nil { + config := t.opts.TLSConfig + if config == nil { + config = &tls.Config{ + InsecureSkipVerify: true, + } + } + creds := credentials.NewTLS(config) + options = append(options, grpc.WithTransportCredentials(creds)) + } else { + options = append(options, grpc.WithInsecure()) + } + + // dial the server + conn, err := grpc.Dial(addr, options...) + if err != nil { + return nil, err + } + + // create stream + stream, err := pb.NewTransportClient(conn).Stream(context.Background()) + if err != nil { + return nil, err + } + + // return a client + return &grpcTransportClient{ + conn: conn, + stream: stream, + local: "localhost", + remote: addr, + }, nil +} + +func (t *grpcTransport) Listen(addr string, opts ...transport.ListenOption) (transport.Listener, error) { + var options transport.ListenOptions + for _, o := range opts { + o(&options) + } + + ln, err := mnet.Listen(addr, func(addr string) (net.Listener, error) { + return net.Listen("tcp", addr) + }) + if err != nil { + return nil, err + } + + return &grpcTransportListener{ + listener: ln, + tls: t.opts.TLSConfig, + secure: t.opts.Secure, + }, nil +} + +func (t *grpcTransport) Init(opts ...transport.Option) error { + for _, o := range opts { + o(&t.opts) + } + return nil +} + +func (t *grpcTransport) Options() transport.Options { + return t.opts +} + +func (t *grpcTransport) String() string { + return "grpc" +} + +func NewTransport(opts ...transport.Option) transport.Transport { + var options transport.Options + for _, o := range opts { + o(&options) + } + return &grpcTransport{opts: options} +} diff --git a/transport/grpc/grpc_test.go b/transport/grpc/grpc_test.go new file mode 100644 index 00000000..d4e82346 --- /dev/null +++ b/transport/grpc/grpc_test.go @@ -0,0 +1,109 @@ +package grpc + +import ( + "strings" + "testing" + + "github.com/micro/go-micro/transport" +) + +func expectedPort(t *testing.T, expected string, lsn transport.Listener) { + parts := strings.Split(lsn.Addr(), ":") + port := parts[len(parts)-1] + + if port != expected { + lsn.Close() + t.Errorf("Expected address to be `%s`, got `%s`", expected, port) + } +} + +func TestGRPCTransportPortRange(t *testing.T) { + tp := NewTransport() + + lsn1, err := tp.Listen(":44444-44448") + if err != nil { + t.Errorf("Did not expect an error, got %s", err) + } + expectedPort(t, "44444", lsn1) + + lsn2, err := tp.Listen(":44444-44448") + if err != nil { + t.Errorf("Did not expect an error, got %s", err) + } + expectedPort(t, "44445", lsn2) + + lsn, err := tp.Listen(":0") + if err != nil { + t.Errorf("Did not expect an error, got %s", err) + } + + lsn.Close() + lsn1.Close() + lsn2.Close() +} + +func TestGRPCTransportCommunication(t *testing.T) { + tr := NewTransport() + + l, err := tr.Listen(":0") + if err != nil { + t.Errorf("Unexpected listen err: %v", err) + } + defer l.Close() + + fn := func(sock transport.Socket) { + defer sock.Close() + + for { + var m transport.Message + if err := sock.Recv(&m); err != nil { + return + } + + if err := sock.Send(&m); err != nil { + return + } + } + } + + done := make(chan bool) + + go func() { + if err := l.Accept(fn); err != nil { + select { + case <-done: + default: + t.Errorf("Unexpected accept err: %v", err) + } + } + }() + + c, err := tr.Dial(l.Addr()) + if err != nil { + t.Errorf("Unexpected dial err: %v", err) + } + defer c.Close() + + m := transport.Message{ + Header: map[string]string{ + "X-Content-Type": "application/json", + }, + Body: []byte(`{"message": "Hello World"}`), + } + + if err := c.Send(&m); err != nil { + t.Errorf("Unexpected send err: %v", err) + } + + var rm transport.Message + + if err := c.Recv(&rm); err != nil { + t.Errorf("Unexpected recv err: %v", err) + } + + if string(rm.Body) != string(m.Body) { + t.Errorf("Expected %v, got %v", m.Body, rm.Body) + } + + close(done) +} diff --git a/transport/grpc/handler.go b/transport/grpc/handler.go new file mode 100644 index 00000000..c28ba146 --- /dev/null +++ b/transport/grpc/handler.go @@ -0,0 +1,39 @@ +package grpc + +import ( + "runtime/debug" + + "github.com/micro/go-log" + "github.com/micro/go-micro/transport" + pb "github.com/micro/go-plugins/transport/grpc/proto" + "google.golang.org/grpc/peer" +) + +// microTransport satisfies the pb.TransportServer inteface +type microTransport struct { + addr string + fn func(transport.Socket) +} + +func (m *microTransport) Stream(ts pb.Transport_StreamServer) error { + sock := &grpcTransportSocket{ + stream: ts, + local: m.addr, + } + + p, ok := peer.FromContext(ts.Context()) + if ok { + sock.remote = p.Addr.String() + } + + defer func() { + if r := recover(); r != nil { + log.Log(r, string(debug.Stack())) + sock.Close() + } + }() + + // execute socket func + m.fn(sock) + return nil +} diff --git a/transport/grpc/proto/transport.micro.go b/transport/grpc/proto/transport.micro.go new file mode 100644 index 00000000..bec3b82a --- /dev/null +++ b/transport/grpc/proto/transport.micro.go @@ -0,0 +1,170 @@ +// Code generated by protoc-gen-micro. DO NOT EDIT. +// source: github.com/micro/go-plugins/transport/grpc/proto/transport.proto + +/* +Package go_micro_grpc_transport is a generated protocol buffer package. + +It is generated from these files: + github.com/micro/go-plugins/transport/grpc/proto/transport.proto + +It has these top-level messages: + Message +*/ +package go_micro_grpc_transport + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "context" + client "github.com/micro/go-micro/client" + server "github.com/micro/go-micro/server" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ client.Option +var _ server.Option + +// Client API for Transport service + +type TransportService interface { + Stream(ctx context.Context, opts ...client.CallOption) (Transport_StreamService, error) +} + +type transportService struct { + c client.Client + name string +} + +func NewTransportService(name string, c client.Client) TransportService { + if c == nil { + c = client.NewClient() + } + if len(name) == 0 { + name = "go.micro.grpc.transport" + } + return &transportService{ + c: c, + name: name, + } +} + +func (c *transportService) Stream(ctx context.Context, opts ...client.CallOption) (Transport_StreamService, error) { + req := c.c.NewRequest(c.name, "Transport.Stream", &Message{}) + stream, err := c.c.Stream(ctx, req, opts...) + if err != nil { + return nil, err + } + return &transportServiceStream{stream}, nil +} + +type Transport_StreamService interface { + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Send(*Message) error + Recv() (*Message, error) +} + +type transportServiceStream struct { + stream client.Stream +} + +func (x *transportServiceStream) Close() error { + return x.stream.Close() +} + +func (x *transportServiceStream) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *transportServiceStream) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *transportServiceStream) Send(m *Message) error { + return x.stream.Send(m) +} + +func (x *transportServiceStream) Recv() (*Message, error) { + m := new(Message) + err := x.stream.Recv(m) + if err != nil { + return nil, err + } + return m, nil +} + +// Server API for Transport service + +type TransportHandler interface { + Stream(context.Context, Transport_StreamStream) error +} + +func RegisterTransportHandler(s server.Server, hdlr TransportHandler, opts ...server.HandlerOption) { + type transport interface { + Stream(ctx context.Context, stream server.Stream) error + } + type Transport struct { + transport + } + h := &transportHandler{hdlr} + s.Handle(s.NewHandler(&Transport{h}, opts...)) +} + +type transportHandler struct { + TransportHandler +} + +func (h *transportHandler) Stream(ctx context.Context, stream server.Stream) error { + return h.TransportHandler.Stream(ctx, &transportStreamStream{stream}) +} + +type Transport_StreamStream interface { + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Send(*Message) error + Recv() (*Message, error) +} + +type transportStreamStream struct { + stream server.Stream +} + +func (x *transportStreamStream) Close() error { + return x.stream.Close() +} + +func (x *transportStreamStream) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *transportStreamStream) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *transportStreamStream) Send(m *Message) error { + return x.stream.Send(m) +} + +func (x *transportStreamStream) Recv() (*Message, error) { + m := new(Message) + if err := x.stream.Recv(m); err != nil { + return nil, err + } + return m, nil +} diff --git a/transport/grpc/proto/transport.pb.go b/transport/grpc/proto/transport.pb.go new file mode 100644 index 00000000..2209e425 --- /dev/null +++ b/transport/grpc/proto/transport.pb.go @@ -0,0 +1,188 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: github.com/micro/go-plugins/transport/grpc/proto/transport.proto + +/* +Package go_micro_grpc_transport is a generated protocol buffer package. + +It is generated from these files: + github.com/micro/go-plugins/transport/grpc/proto/transport.proto + +It has these top-level messages: + Message +*/ +package go_micro_grpc_transport + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Message struct { + Header map[string]string `protobuf:"bytes,1,rep,name=header" json:"header,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + Body []byte `protobuf:"bytes,2,opt,name=body,proto3" json:"body,omitempty"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} +func (*Message) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *Message) GetHeader() map[string]string { + if m != nil { + return m.Header + } + return nil +} + +func (m *Message) GetBody() []byte { + if m != nil { + return m.Body + } + return nil +} + +func init() { + proto.RegisterType((*Message)(nil), "go.micro.grpc.transport.Message") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// Client API for Transport service + +type TransportClient interface { + Stream(ctx context.Context, opts ...grpc.CallOption) (Transport_StreamClient, error) +} + +type transportClient struct { + cc *grpc.ClientConn +} + +func NewTransportClient(cc *grpc.ClientConn) TransportClient { + return &transportClient{cc} +} + +func (c *transportClient) Stream(ctx context.Context, opts ...grpc.CallOption) (Transport_StreamClient, error) { + stream, err := grpc.NewClientStream(ctx, &_Transport_serviceDesc.Streams[0], c.cc, "/go.micro.grpc.transport.Transport/Stream", opts...) + if err != nil { + return nil, err + } + x := &transportStreamClient{stream} + return x, nil +} + +type Transport_StreamClient interface { + Send(*Message) error + Recv() (*Message, error) + grpc.ClientStream +} + +type transportStreamClient struct { + grpc.ClientStream +} + +func (x *transportStreamClient) Send(m *Message) error { + return x.ClientStream.SendMsg(m) +} + +func (x *transportStreamClient) Recv() (*Message, error) { + m := new(Message) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// Server API for Transport service + +type TransportServer interface { + Stream(Transport_StreamServer) error +} + +func RegisterTransportServer(s *grpc.Server, srv TransportServer) { + s.RegisterService(&_Transport_serviceDesc, srv) +} + +func _Transport_Stream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(TransportServer).Stream(&transportStreamServer{stream}) +} + +type Transport_StreamServer interface { + Send(*Message) error + Recv() (*Message, error) + grpc.ServerStream +} + +type transportStreamServer struct { + grpc.ServerStream +} + +func (x *transportStreamServer) Send(m *Message) error { + return x.ServerStream.SendMsg(m) +} + +func (x *transportStreamServer) Recv() (*Message, error) { + m := new(Message) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +var _Transport_serviceDesc = grpc.ServiceDesc{ + ServiceName: "go.micro.grpc.transport.Transport", + HandlerType: (*TransportServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Stream", + Handler: _Transport_Stream_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "github.com/micro/go-plugins/transport/grpc/proto/transport.proto", +} + +func init() { + proto.RegisterFile("github.com/micro/go-plugins/transport/grpc/proto/transport.proto", fileDescriptor0) +} + +var fileDescriptor0 = []byte{ + // 234 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x72, 0x48, 0xcf, 0x2c, 0xc9, + 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0xcf, 0xcd, 0x4c, 0x2e, 0xca, 0xd7, 0x4f, 0xcf, 0xd7, + 0x2d, 0xc8, 0x29, 0x4d, 0xcf, 0xcc, 0x2b, 0xd6, 0x2f, 0x29, 0x4a, 0xcc, 0x2b, 0x2e, 0xc8, 0x2f, + 0x2a, 0xd1, 0x4f, 0x2f, 0x2a, 0x48, 0xd6, 0x2f, 0x28, 0xca, 0x2f, 0xc9, 0x47, 0x08, 0xea, 0x81, + 0xf9, 0x42, 0xe2, 0xe9, 0xf9, 0x7a, 0x60, 0x9d, 0x7a, 0x20, 0x45, 0x7a, 0x70, 0x69, 0xa5, 0x79, + 0x8c, 0x5c, 0xec, 0xbe, 0xa9, 0xc5, 0xc5, 0x89, 0xe9, 0xa9, 0x42, 0x2e, 0x5c, 0x6c, 0x19, 0xa9, + 0x89, 0x29, 0xa9, 0x45, 0x12, 0x8c, 0x0a, 0xcc, 0x1a, 0xdc, 0x46, 0x3a, 0x7a, 0x38, 0x74, 0xe9, + 0x41, 0x75, 0xe8, 0x79, 0x80, 0x95, 0xbb, 0xe6, 0x95, 0x14, 0x55, 0x06, 0x41, 0xf5, 0x0a, 0x09, + 0x71, 0xb1, 0x24, 0xe5, 0xa7, 0x54, 0x4a, 0x30, 0x29, 0x30, 0x6a, 0xf0, 0x04, 0x81, 0xd9, 0x52, + 0x96, 0x5c, 0xdc, 0x48, 0x4a, 0x85, 0x04, 0xb8, 0x98, 0xb3, 0x53, 0x2b, 0x25, 0x18, 0x15, 0x18, + 0x35, 0x38, 0x83, 0x40, 0x4c, 0x21, 0x11, 0x2e, 0xd6, 0xb2, 0xc4, 0x9c, 0xd2, 0x54, 0xb0, 0x2e, + 0xce, 0x20, 0x08, 0xc7, 0x8a, 0xc9, 0x82, 0xd1, 0x28, 0x9e, 0x8b, 0x33, 0x04, 0x66, 0xaf, 0x50, + 0x10, 0x17, 0x5b, 0x70, 0x49, 0x51, 0x6a, 0x62, 0xae, 0x90, 0x02, 0x21, 0xb7, 0x49, 0x11, 0x54, + 0xa1, 0xc4, 0xa0, 0xc1, 0x68, 0xc0, 0x98, 0xc4, 0x06, 0x0e, 0x21, 0x63, 0x40, 0x00, 0x00, 0x00, + 0xff, 0xff, 0x4e, 0xcc, 0x76, 0x38, 0x65, 0x01, 0x00, 0x00, +} diff --git a/transport/grpc/proto/transport.proto b/transport/grpc/proto/transport.proto new file mode 100644 index 00000000..3ceb7622 --- /dev/null +++ b/transport/grpc/proto/transport.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; + +package go.micro.grpc.transport; + +service Transport { + rpc Stream(stream Message) returns (stream Message) {} +} + +message Message { + map header = 1; + bytes body = 2; +} diff --git a/transport/grpc/socket.go b/transport/grpc/socket.go new file mode 100644 index 00000000..b618bc32 --- /dev/null +++ b/transport/grpc/socket.go @@ -0,0 +1,97 @@ +package grpc + +import ( + "github.com/micro/go-micro/transport" + pb "github.com/micro/go-plugins/transport/grpc/proto" + "google.golang.org/grpc" +) + +type grpcTransportClient struct { + conn *grpc.ClientConn + stream pb.Transport_StreamClient + + local string + remote string +} + +type grpcTransportSocket struct { + stream pb.Transport_StreamServer + local string + remote string +} + +func (g *grpcTransportClient) Local() string { + return g.local +} + +func (g *grpcTransportClient) Remote() string { + return g.remote +} + +func (g *grpcTransportClient) Recv(m *transport.Message) error { + if m == nil { + return nil + } + + msg, err := g.stream.Recv() + if err != nil { + return err + } + + m.Header = msg.Header + m.Body = msg.Body + return nil +} + +func (g *grpcTransportClient) Send(m *transport.Message) error { + if m == nil { + return nil + } + + return g.stream.Send(&pb.Message{ + Header: m.Header, + Body: m.Body, + }) +} + +func (g *grpcTransportClient) Close() error { + return g.conn.Close() +} + +func (g *grpcTransportSocket) Local() string { + return g.local +} + +func (g *grpcTransportSocket) Remote() string { + return g.remote +} + +func (g *grpcTransportSocket) Recv(m *transport.Message) error { + if m == nil { + return nil + } + + msg, err := g.stream.Recv() + if err != nil { + return err + } + + m.Header = msg.Header + m.Body = msg.Body + return nil +} + +func (g *grpcTransportSocket) Send(m *transport.Message) error { + if m == nil { + return nil + } + + return g.stream.Send(&pb.Message{ + Header: m.Header, + Body: m.Body, + }) +} + +func (g *grpcTransportSocket) Close() error { + return nil +}