From ee76d866ae31410deca069ed89d3349452885b32 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sun, 23 Aug 2020 18:37:22 +0100 Subject: [PATCH] move transport (#1967) --- grpc.go | 176 ++++++++++++++++++++++++++++++ grpc_test.go | 111 +++++++++++++++++++ handler.go | 43 ++++++++ proto/transport.pb.go | 211 ++++++++++++++++++++++++++++++++++++ proto/transport.pb.micro.go | 175 ++++++++++++++++++++++++++++++ proto/transport.proto | 12 ++ socket.go | 97 +++++++++++++++++ 7 files changed, 825 insertions(+) create mode 100644 grpc.go create mode 100644 grpc_test.go create mode 100644 handler.go create mode 100644 proto/transport.pb.go create mode 100644 proto/transport.pb.micro.go create mode 100644 proto/transport.proto create mode 100644 socket.go diff --git a/grpc.go b/grpc.go new file mode 100644 index 0000000..0c6fbf8 --- /dev/null +++ b/grpc.go @@ -0,0 +1,176 @@ +// Package grpc provides a grpc transport +package grpc + +import ( + "context" + "crypto/tls" + "net" + + "github.com/micro/go-micro/v3/network/transport" + maddr "github.com/micro/go-micro/v3/util/addr" + mnet "github.com/micro/go-micro/v3/util/net" + mls "github.com/micro/go-micro/v3/util/tls" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + + pb "github.com/micro/go-micro/v3/network/transport/grpc/proto" +) + +type grpcTransport struct { + opts transport.Options +} + +type grpcTransportListener struct { + listener net.Listener + secure bool + tls *tls.Config +} + +func getTLSConfig(addr string) (*tls.Config, error) { + hosts := []string{addr} + + // check if its a valid host:port + if host, _, err := net.SplitHostPort(addr); err == nil { + if len(host) == 0 { + hosts = maddr.IPs() + } else { + hosts = []string{host} + } + } + + // generate a certificate + cert, err := mls.Certificate(hosts...) + if err != nil { + return nil, err + } + + return &tls.Config{Certificates: []tls.Certificate{cert}}, nil +} + +func (t *grpcTransportListener) Addr() string { + return t.listener.Addr().String() +} + +func (t *grpcTransportListener) Close() error { + return t.listener.Close() +} + +func (t *grpcTransportListener) Accept(fn func(transport.Socket)) error { + var opts []grpc.ServerOption + + // setup tls if specified + if t.secure || t.tls != nil { + config := t.tls + if config == nil { + var err error + addr := t.listener.Addr().String() + config, err = getTLSConfig(addr) + if err != nil { + return err + } + } + + creds := credentials.NewTLS(config) + opts = append(opts, grpc.Creds(creds)) + } + + // new service + srv := grpc.NewServer(opts...) + + // register service + pb.RegisterTransportServer(srv, µTransport{addr: t.listener.Addr().String(), fn: fn}) + + // start serving + return srv.Serve(t.listener) +} + +func (t *grpcTransport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) { + dopts := transport.DialOptions{ + Timeout: transport.DefaultDialTimeout, + } + + for _, opt := range opts { + opt(&dopts) + } + + options := []grpc.DialOption{} + + if t.opts.Secure || t.opts.TLSConfig != nil { + config := t.opts.TLSConfig + if config == nil { + config = &tls.Config{ + InsecureSkipVerify: true, + } + } + creds := credentials.NewTLS(config) + options = append(options, grpc.WithTransportCredentials(creds)) + } else { + options = append(options, grpc.WithInsecure()) + } + + // dial the server + ctx, cancel := context.WithTimeout(context.Background(), dopts.Timeout) + defer cancel() + conn, err := grpc.DialContext(ctx, addr, options...) + if err != nil { + return nil, err + } + + // create stream + stream, err := pb.NewTransportClient(conn).Stream(context.Background()) + if err != nil { + return nil, err + } + + // return a client + return &grpcTransportClient{ + conn: conn, + stream: stream, + local: "localhost", + remote: addr, + }, nil +} + +func (t *grpcTransport) Listen(addr string, opts ...transport.ListenOption) (transport.Listener, error) { + var options transport.ListenOptions + for _, o := range opts { + o(&options) + } + + ln, err := mnet.Listen(addr, func(addr string) (net.Listener, error) { + return net.Listen("tcp", addr) + }) + if err != nil { + return nil, err + } + + return &grpcTransportListener{ + listener: ln, + tls: t.opts.TLSConfig, + secure: t.opts.Secure, + }, nil +} + +func (t *grpcTransport) Init(opts ...transport.Option) error { + for _, o := range opts { + o(&t.opts) + } + return nil +} + +func (t *grpcTransport) Options() transport.Options { + return t.opts +} + +func (t *grpcTransport) String() string { + return "grpc" +} + +func NewTransport(opts ...transport.Option) transport.Transport { + var options transport.Options + for _, o := range opts { + o(&options) + } + return &grpcTransport{opts: options} +} diff --git a/grpc_test.go b/grpc_test.go new file mode 100644 index 0000000..35cf9c1 --- /dev/null +++ b/grpc_test.go @@ -0,0 +1,111 @@ +package grpc + +import ( + "net" + "testing" + + "github.com/micro/go-micro/v3/network/transport" +) + +func expectedPort(t *testing.T, expected string, lsn transport.Listener) { + _, port, err := net.SplitHostPort(lsn.Addr()) + if err != nil { + t.Errorf("Expected address to be `%s`, got error: %v", expected, err) + } + + if port != expected { + lsn.Close() + t.Errorf("Expected address to be `%s`, got `%s`", expected, port) + } +} + +func TestGRPCTransportPortRange(t *testing.T) { + tp := NewTransport() + + lsn1, err := tp.Listen(":44444-44448") + if err != nil { + t.Errorf("Did not expect an error, got %s", err) + } + expectedPort(t, "44444", lsn1) + + lsn2, err := tp.Listen(":44444-44448") + if err != nil { + t.Errorf("Did not expect an error, got %s", err) + } + expectedPort(t, "44445", lsn2) + + lsn, err := tp.Listen(":0") + if err != nil { + t.Errorf("Did not expect an error, got %s", err) + } + + lsn.Close() + lsn1.Close() + lsn2.Close() +} + +func TestGRPCTransportCommunication(t *testing.T) { + tr := NewTransport() + + l, err := tr.Listen(":0") + if err != nil { + t.Errorf("Unexpected listen err: %v", err) + } + defer l.Close() + + fn := func(sock transport.Socket) { + defer sock.Close() + + for { + var m transport.Message + if err := sock.Recv(&m); err != nil { + return + } + + if err := sock.Send(&m); err != nil { + return + } + } + } + + done := make(chan bool) + + go func() { + if err := l.Accept(fn); err != nil { + select { + case <-done: + default: + t.Errorf("Unexpected accept err: %v", err) + } + } + }() + + c, err := tr.Dial(l.Addr()) + if err != nil { + t.Errorf("Unexpected dial err: %v", err) + } + defer c.Close() + + m := transport.Message{ + Header: map[string]string{ + "X-Content-Type": "application/json", + }, + Body: []byte(`{"message": "Hello World"}`), + } + + if err := c.Send(&m); err != nil { + t.Errorf("Unexpected send err: %v", err) + } + + var rm transport.Message + + if err := c.Recv(&rm); err != nil { + t.Errorf("Unexpected recv err: %v", err) + } + + if string(rm.Body) != string(m.Body) { + t.Errorf("Expected %v, got %v", m.Body, rm.Body) + } + + close(done) +} diff --git a/handler.go b/handler.go new file mode 100644 index 0000000..4240552 --- /dev/null +++ b/handler.go @@ -0,0 +1,43 @@ +package grpc + +import ( + "runtime/debug" + + "github.com/micro/go-micro/v3/errors" + "github.com/micro/go-micro/v3/logger" + "github.com/micro/go-micro/v3/network/transport" + pb "github.com/micro/go-micro/v3/network/transport/grpc/proto" + "google.golang.org/grpc/peer" +) + +// microTransport satisfies the pb.TransportServer inteface +type microTransport struct { + addr string + fn func(transport.Socket) +} + +func (m *microTransport) Stream(ts pb.Transport_StreamServer) (err error) { + + sock := &grpcTransportSocket{ + stream: ts, + local: m.addr, + } + + p, ok := peer.FromContext(ts.Context()) + if ok { + sock.remote = p.Addr.String() + } + + defer func() { + if r := recover(); r != nil { + logger.Error(r, string(debug.Stack())) + sock.Close() + err = errors.InternalServerError("go.micro.transport", "panic recovered: %v", r) + } + }() + + // execute socket func + m.fn(sock) + + return err +} diff --git a/proto/transport.pb.go b/proto/transport.pb.go new file mode 100644 index 0000000..88abb01 --- /dev/null +++ b/proto/transport.pb.go @@ -0,0 +1,211 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: transport/grpc/proto/transport.proto + +package go_micro_transport_grpc + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type Message struct { + Header map[string]string `protobuf:"bytes,1,rep,name=header,proto3" json:"header,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Body []byte `protobuf:"bytes,2,opt,name=body,proto3" json:"body,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} +func (*Message) Descriptor() ([]byte, []int) { + return fileDescriptor_651718cd7c7ae974, []int{0} +} + +func (m *Message) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Message.Unmarshal(m, b) +} +func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Message.Marshal(b, m, deterministic) +} +func (m *Message) XXX_Merge(src proto.Message) { + xxx_messageInfo_Message.Merge(m, src) +} +func (m *Message) XXX_Size() int { + return xxx_messageInfo_Message.Size(m) +} +func (m *Message) XXX_DiscardUnknown() { + xxx_messageInfo_Message.DiscardUnknown(m) +} + +var xxx_messageInfo_Message proto.InternalMessageInfo + +func (m *Message) GetHeader() map[string]string { + if m != nil { + return m.Header + } + return nil +} + +func (m *Message) GetBody() []byte { + if m != nil { + return m.Body + } + return nil +} + +func init() { + proto.RegisterType((*Message)(nil), "go.micro.transport.grpc.Message") + proto.RegisterMapType((map[string]string)(nil), "go.micro.transport.grpc.Message.HeaderEntry") +} + +func init() { + proto.RegisterFile("transport/grpc/proto/transport.proto", fileDescriptor_651718cd7c7ae974) +} + +var fileDescriptor_651718cd7c7ae974 = []byte{ + // 209 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x52, 0x29, 0x29, 0x4a, 0xcc, + 0x2b, 0x2e, 0xc8, 0x2f, 0x2a, 0xd1, 0x4f, 0x2f, 0x2a, 0x48, 0xd6, 0x2f, 0x28, 0xca, 0x2f, 0xc9, + 0xd7, 0x87, 0x0b, 0xea, 0x81, 0xf9, 0x42, 0xe2, 0xe9, 0xf9, 0x7a, 0xb9, 0x99, 0xc9, 0x45, 0xf9, + 0x7a, 0x08, 0x19, 0x90, 0x72, 0xa5, 0x79, 0x8c, 0x5c, 0xec, 0xbe, 0xa9, 0xc5, 0xc5, 0x89, 0xe9, + 0xa9, 0x42, 0x2e, 0x5c, 0x6c, 0x19, 0xa9, 0x89, 0x29, 0xa9, 0x45, 0x12, 0x8c, 0x0a, 0xcc, 0x1a, + 0xdc, 0x46, 0x3a, 0x7a, 0x38, 0x74, 0xe9, 0x41, 0x75, 0xe8, 0x79, 0x80, 0x95, 0xbb, 0xe6, 0x95, + 0x14, 0x55, 0x06, 0x41, 0xf5, 0x0a, 0x09, 0x71, 0xb1, 0x24, 0xe5, 0xa7, 0x54, 0x4a, 0x30, 0x29, + 0x30, 0x6a, 0xf0, 0x04, 0x81, 0xd9, 0x52, 0x96, 0x5c, 0xdc, 0x48, 0x4a, 0x85, 0x04, 0xb8, 0x98, + 0xb3, 0x53, 0x2b, 0x25, 0x18, 0x15, 0x18, 0x35, 0x38, 0x83, 0x40, 0x4c, 0x21, 0x11, 0x2e, 0xd6, + 0xb2, 0xc4, 0x9c, 0xd2, 0x54, 0xb0, 0x2e, 0xce, 0x20, 0x08, 0xc7, 0x8a, 0xc9, 0x82, 0xd1, 0x28, + 0x9e, 0x8b, 0x33, 0x04, 0x66, 0xb9, 0x50, 0x10, 0x17, 0x5b, 0x70, 0x49, 0x51, 0x6a, 0x62, 0xae, + 0x90, 0x02, 0x21, 0xb7, 0x49, 0x11, 0x54, 0xa1, 0xc4, 0xa0, 0xc1, 0x68, 0xc0, 0x98, 0xc4, 0x06, + 0x0e, 0x21, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0xd4, 0xd0, 0x4b, 0x4b, 0x49, 0x01, 0x00, + 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// TransportClient is the client API for Transport service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type TransportClient interface { + Stream(ctx context.Context, opts ...grpc.CallOption) (Transport_StreamClient, error) +} + +type transportClient struct { + cc *grpc.ClientConn +} + +func NewTransportClient(cc *grpc.ClientConn) TransportClient { + return &transportClient{cc} +} + +func (c *transportClient) Stream(ctx context.Context, opts ...grpc.CallOption) (Transport_StreamClient, error) { + stream, err := c.cc.NewStream(ctx, &_Transport_serviceDesc.Streams[0], "/go.micro.transport.grpc.Transport/Stream", opts...) + if err != nil { + return nil, err + } + x := &transportStreamClient{stream} + return x, nil +} + +type Transport_StreamClient interface { + Send(*Message) error + Recv() (*Message, error) + grpc.ClientStream +} + +type transportStreamClient struct { + grpc.ClientStream +} + +func (x *transportStreamClient) Send(m *Message) error { + return x.ClientStream.SendMsg(m) +} + +func (x *transportStreamClient) Recv() (*Message, error) { + m := new(Message) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// TransportServer is the server API for Transport service. +type TransportServer interface { + Stream(Transport_StreamServer) error +} + +// UnimplementedTransportServer can be embedded to have forward compatible implementations. +type UnimplementedTransportServer struct { +} + +func (*UnimplementedTransportServer) Stream(srv Transport_StreamServer) error { + return status.Errorf(codes.Unimplemented, "method Stream not implemented") +} + +func RegisterTransportServer(s *grpc.Server, srv TransportServer) { + s.RegisterService(&_Transport_serviceDesc, srv) +} + +func _Transport_Stream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(TransportServer).Stream(&transportStreamServer{stream}) +} + +type Transport_StreamServer interface { + Send(*Message) error + Recv() (*Message, error) + grpc.ServerStream +} + +type transportStreamServer struct { + grpc.ServerStream +} + +func (x *transportStreamServer) Send(m *Message) error { + return x.ServerStream.SendMsg(m) +} + +func (x *transportStreamServer) Recv() (*Message, error) { + m := new(Message) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +var _Transport_serviceDesc = grpc.ServiceDesc{ + ServiceName: "go.micro.transport.grpc.Transport", + HandlerType: (*TransportServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Stream", + Handler: _Transport_Stream_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "transport/grpc/proto/transport.proto", +} diff --git a/proto/transport.pb.micro.go b/proto/transport.pb.micro.go new file mode 100644 index 0000000..c5a1863 --- /dev/null +++ b/proto/transport.pb.micro.go @@ -0,0 +1,175 @@ +// Code generated by protoc-gen-micro. DO NOT EDIT. +// source: transport/grpc/proto/transport.proto + +package go_micro_transport_grpc + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +import ( + context "context" + api "github.com/micro/go-micro/v3/api" + client "github.com/micro/go-micro/v3/client" + server "github.com/micro/go-micro/v3/server" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// Reference imports to suppress errors if they are not otherwise used. +var _ api.Endpoint +var _ context.Context +var _ client.Option +var _ server.Option + +// Api Endpoints for Transport service + +func NewTransportEndpoints() []*api.Endpoint { + return []*api.Endpoint{} +} + +// Client API for Transport service + +type TransportService interface { + Stream(ctx context.Context, opts ...client.CallOption) (Transport_StreamService, error) +} + +type transportService struct { + c client.Client + name string +} + +func NewTransportService(name string, c client.Client) TransportService { + return &transportService{ + c: c, + name: name, + } +} + +func (c *transportService) Stream(ctx context.Context, opts ...client.CallOption) (Transport_StreamService, error) { + req := c.c.NewRequest(c.name, "Transport.Stream", &Message{}) + stream, err := c.c.Stream(ctx, req, opts...) + if err != nil { + return nil, err + } + return &transportServiceStream{stream}, nil +} + +type Transport_StreamService interface { + Context() context.Context + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Send(*Message) error + Recv() (*Message, error) +} + +type transportServiceStream struct { + stream client.Stream +} + +func (x *transportServiceStream) Close() error { + return x.stream.Close() +} + +func (x *transportServiceStream) Context() context.Context { + return x.stream.Context() +} + +func (x *transportServiceStream) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *transportServiceStream) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *transportServiceStream) Send(m *Message) error { + return x.stream.Send(m) +} + +func (x *transportServiceStream) Recv() (*Message, error) { + m := new(Message) + err := x.stream.Recv(m) + if err != nil { + return nil, err + } + return m, nil +} + +// Server API for Transport service + +type TransportHandler interface { + Stream(context.Context, Transport_StreamStream) error +} + +func RegisterTransportHandler(s server.Server, hdlr TransportHandler, opts ...server.HandlerOption) error { + type transport interface { + Stream(ctx context.Context, stream server.Stream) error + } + type Transport struct { + transport + } + h := &transportHandler{hdlr} + return s.Handle(s.NewHandler(&Transport{h}, opts...)) +} + +type transportHandler struct { + TransportHandler +} + +func (h *transportHandler) Stream(ctx context.Context, stream server.Stream) error { + return h.TransportHandler.Stream(ctx, &transportStreamStream{stream}) +} + +type Transport_StreamStream interface { + Context() context.Context + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Send(*Message) error + Recv() (*Message, error) +} + +type transportStreamStream struct { + stream server.Stream +} + +func (x *transportStreamStream) Close() error { + return x.stream.Close() +} + +func (x *transportStreamStream) Context() context.Context { + return x.stream.Context() +} + +func (x *transportStreamStream) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *transportStreamStream) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *transportStreamStream) Send(m *Message) error { + return x.stream.Send(m) +} + +func (x *transportStreamStream) Recv() (*Message, error) { + m := new(Message) + if err := x.stream.Recv(m); err != nil { + return nil, err + } + return m, nil +} diff --git a/proto/transport.proto b/proto/transport.proto new file mode 100644 index 0000000..1ec353a --- /dev/null +++ b/proto/transport.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; + +package go.micro.transport.grpc; + +service Transport { + rpc Stream(stream Message) returns (stream Message) {} +} + +message Message { + map header = 1; + bytes body = 2; +} diff --git a/socket.go b/socket.go new file mode 100644 index 0000000..c8d180e --- /dev/null +++ b/socket.go @@ -0,0 +1,97 @@ +package grpc + +import ( + "github.com/micro/go-micro/v3/network/transport" + pb "github.com/micro/go-micro/v3/network/transport/grpc/proto" + "google.golang.org/grpc" +) + +type grpcTransportClient struct { + conn *grpc.ClientConn + stream pb.Transport_StreamClient + + local string + remote string +} + +type grpcTransportSocket struct { + stream pb.Transport_StreamServer + local string + remote string +} + +func (g *grpcTransportClient) Local() string { + return g.local +} + +func (g *grpcTransportClient) Remote() string { + return g.remote +} + +func (g *grpcTransportClient) Recv(m *transport.Message) error { + if m == nil { + return nil + } + + msg, err := g.stream.Recv() + if err != nil { + return err + } + + m.Header = msg.Header + m.Body = msg.Body + return nil +} + +func (g *grpcTransportClient) Send(m *transport.Message) error { + if m == nil { + return nil + } + + return g.stream.Send(&pb.Message{ + Header: m.Header, + Body: m.Body, + }) +} + +func (g *grpcTransportClient) Close() error { + return g.conn.Close() +} + +func (g *grpcTransportSocket) Local() string { + return g.local +} + +func (g *grpcTransportSocket) Remote() string { + return g.remote +} + +func (g *grpcTransportSocket) Recv(m *transport.Message) error { + if m == nil { + return nil + } + + msg, err := g.stream.Recv() + if err != nil { + return err + } + + m.Header = msg.Header + m.Body = msg.Body + return nil +} + +func (g *grpcTransportSocket) Send(m *transport.Message) error { + if m == nil { + return nil + } + + return g.stream.Send(&pb.Message{ + Header: m.Header, + Body: m.Body, + }) +} + +func (g *grpcTransportSocket) Close() error { + return nil +}