From 2bb4e4d6a84f7acc9ee7428de5169148b73c466a Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 9 Jul 2019 18:41:26 +0100 Subject: [PATCH] move transport back --- grpc.go | 176 ---------------------------------- grpc_test.go | 109 --------------------- handler.go | 39 -------- proto/transport.micro.go | 163 ------------------------------- proto/transport.pb.go | 201 --------------------------------------- proto/transport.proto | 12 --- socket.go | 97 ------------------- 7 files changed, 797 deletions(-) delete mode 100644 grpc.go delete mode 100644 grpc_test.go delete mode 100644 handler.go delete mode 100644 proto/transport.micro.go delete mode 100644 proto/transport.pb.go delete mode 100644 proto/transport.proto delete mode 100644 socket.go diff --git a/grpc.go b/grpc.go deleted file mode 100644 index 6b8dc8c..0000000 --- a/grpc.go +++ /dev/null @@ -1,176 +0,0 @@ -// Package grpc provides a grpc transport -package grpc - -import ( - "context" - "crypto/tls" - "net" - - "github.com/micro/go-micro/network/transport" - maddr "github.com/micro/go-micro/util/addr" - mnet "github.com/micro/go-micro/util/net" - mls "github.com/micro/go-micro/util/tls" - - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - - pb "github.com/micro/go-micro/network/transport/grpc/proto" -) - -type grpcTransport struct { - opts transport.Options -} - -type grpcTransportListener struct { - listener net.Listener - secure bool - tls *tls.Config -} - -func getTLSConfig(addr string) (*tls.Config, error) { - hosts := []string{addr} - - // check if its a valid host:port - if host, _, err := net.SplitHostPort(addr); err == nil { - if len(host) == 0 { - hosts = maddr.IPs() - } else { - hosts = []string{host} - } - } - - // generate a certificate - cert, err := mls.Certificate(hosts...) - if err != nil { - return nil, err - } - - return &tls.Config{Certificates: []tls.Certificate{cert}}, nil -} - -func (t *grpcTransportListener) Addr() string { - return t.listener.Addr().String() -} - -func (t *grpcTransportListener) Close() error { - return t.listener.Close() -} - -func (t *grpcTransportListener) Accept(fn func(transport.Socket)) error { - var opts []grpc.ServerOption - - // setup tls if specified - if t.secure || t.tls != nil { - config := t.tls - if config == nil { - var err error - addr := t.listener.Addr().String() - config, err = getTLSConfig(addr) - if err != nil { - return err - } - } - - creds := credentials.NewTLS(config) - opts = append(opts, grpc.Creds(creds)) - } - - // new service - srv := grpc.NewServer(opts...) - - // register service - pb.RegisterTransportServer(srv, µTransport{addr: t.listener.Addr().String(), fn: fn}) - - // start serving - return srv.Serve(t.listener) -} - -func (t *grpcTransport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) { - dopts := transport.DialOptions{ - Timeout: transport.DefaultDialTimeout, - } - - for _, opt := range opts { - opt(&dopts) - } - - options := []grpc.DialOption{ - grpc.WithTimeout(dopts.Timeout), - } - - if t.opts.Secure || t.opts.TLSConfig != nil { - config := t.opts.TLSConfig - if config == nil { - config = &tls.Config{ - InsecureSkipVerify: true, - } - } - creds := credentials.NewTLS(config) - options = append(options, grpc.WithTransportCredentials(creds)) - } else { - options = append(options, grpc.WithInsecure()) - } - - // dial the server - conn, err := grpc.Dial(addr, options...) - if err != nil { - return nil, err - } - - // create stream - stream, err := pb.NewTransportClient(conn).Stream(context.Background()) - if err != nil { - return nil, err - } - - // return a client - return &grpcTransportClient{ - conn: conn, - stream: stream, - local: "localhost", - remote: addr, - }, nil -} - -func (t *grpcTransport) Listen(addr string, opts ...transport.ListenOption) (transport.Listener, error) { - var options transport.ListenOptions - for _, o := range opts { - o(&options) - } - - ln, err := mnet.Listen(addr, func(addr string) (net.Listener, error) { - return net.Listen("tcp", addr) - }) - if err != nil { - return nil, err - } - - return &grpcTransportListener{ - listener: ln, - tls: t.opts.TLSConfig, - secure: t.opts.Secure, - }, nil -} - -func (t *grpcTransport) Init(opts ...transport.Option) error { - for _, o := range opts { - o(&t.opts) - } - return nil -} - -func (t *grpcTransport) Options() transport.Options { - return t.opts -} - -func (t *grpcTransport) String() string { - return "grpc" -} - -func NewTransport(opts ...transport.Option) transport.Transport { - var options transport.Options - for _, o := range opts { - o(&options) - } - return &grpcTransport{opts: options} -} diff --git a/grpc_test.go b/grpc_test.go deleted file mode 100644 index 85abf12..0000000 --- a/grpc_test.go +++ /dev/null @@ -1,109 +0,0 @@ -package grpc - -import ( - "strings" - "testing" - - "github.com/micro/go-micro/network/transport" -) - -func expectedPort(t *testing.T, expected string, lsn transport.Listener) { - parts := strings.Split(lsn.Addr(), ":") - port := parts[len(parts)-1] - - if port != expected { - lsn.Close() - t.Errorf("Expected address to be `%s`, got `%s`", expected, port) - } -} - -func TestGRPCTransportPortRange(t *testing.T) { - tp := NewTransport() - - lsn1, err := tp.Listen(":44444-44448") - if err != nil { - t.Errorf("Did not expect an error, got %s", err) - } - expectedPort(t, "44444", lsn1) - - lsn2, err := tp.Listen(":44444-44448") - if err != nil { - t.Errorf("Did not expect an error, got %s", err) - } - expectedPort(t, "44445", lsn2) - - lsn, err := tp.Listen(":0") - if err != nil { - t.Errorf("Did not expect an error, got %s", err) - } - - lsn.Close() - lsn1.Close() - lsn2.Close() -} - -func TestGRPCTransportCommunication(t *testing.T) { - tr := NewTransport() - - l, err := tr.Listen(":0") - if err != nil { - t.Errorf("Unexpected listen err: %v", err) - } - defer l.Close() - - fn := func(sock transport.Socket) { - defer sock.Close() - - for { - var m transport.Message - if err := sock.Recv(&m); err != nil { - return - } - - if err := sock.Send(&m); err != nil { - return - } - } - } - - done := make(chan bool) - - go func() { - if err := l.Accept(fn); err != nil { - select { - case <-done: - default: - t.Errorf("Unexpected accept err: %v", err) - } - } - }() - - c, err := tr.Dial(l.Addr()) - if err != nil { - t.Errorf("Unexpected dial err: %v", err) - } - defer c.Close() - - m := transport.Message{ - Header: map[string]string{ - "X-Content-Type": "application/json", - }, - Body: []byte(`{"message": "Hello World"}`), - } - - if err := c.Send(&m); err != nil { - t.Errorf("Unexpected send err: %v", err) - } - - var rm transport.Message - - if err := c.Recv(&rm); err != nil { - t.Errorf("Unexpected recv err: %v", err) - } - - if string(rm.Body) != string(m.Body) { - t.Errorf("Expected %v, got %v", m.Body, rm.Body) - } - - close(done) -} diff --git a/handler.go b/handler.go deleted file mode 100644 index 6dc1b51..0000000 --- a/handler.go +++ /dev/null @@ -1,39 +0,0 @@ -package grpc - -import ( - "runtime/debug" - - "github.com/micro/go-micro/network/transport" - pb "github.com/micro/go-micro/network/transport/grpc/proto" - "github.com/micro/go-micro/util/log" - "google.golang.org/grpc/peer" -) - -// microTransport satisfies the pb.TransportServer inteface -type microTransport struct { - addr string - fn func(transport.Socket) -} - -func (m *microTransport) Stream(ts pb.Transport_StreamServer) error { - sock := &grpcTransportSocket{ - stream: ts, - local: m.addr, - } - - p, ok := peer.FromContext(ts.Context()) - if ok { - sock.remote = p.Addr.String() - } - - defer func() { - if r := recover(); r != nil { - log.Log(r, string(debug.Stack())) - sock.Close() - } - }() - - // execute socket func - m.fn(sock) - return nil -} diff --git a/proto/transport.micro.go b/proto/transport.micro.go deleted file mode 100644 index 1b8d840..0000000 --- a/proto/transport.micro.go +++ /dev/null @@ -1,163 +0,0 @@ -// Code generated by protoc-gen-micro. DO NOT EDIT. -// source: go-micro/network/transport/grpc/proto/transport.proto - -package go_micro_grpc_transport - -import ( - fmt "fmt" - proto "github.com/golang/protobuf/proto" - math "math" -) - -import ( - context "context" - client "github.com/micro/go-micro/client" - server "github.com/micro/go-micro/server" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package - -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ client.Option -var _ server.Option - -// Client API for Transport service - -type TransportService interface { - Stream(ctx context.Context, opts ...client.CallOption) (Transport_StreamService, error) -} - -type transportService struct { - c client.Client - name string -} - -func NewTransportService(name string, c client.Client) TransportService { - if c == nil { - c = client.NewClient() - } - if len(name) == 0 { - name = "go.micro.grpc.transport" - } - return &transportService{ - c: c, - name: name, - } -} - -func (c *transportService) Stream(ctx context.Context, opts ...client.CallOption) (Transport_StreamService, error) { - req := c.c.NewRequest(c.name, "Transport.Stream", &Message{}) - stream, err := c.c.Stream(ctx, req, opts...) - if err != nil { - return nil, err - } - return &transportServiceStream{stream}, nil -} - -type Transport_StreamService interface { - SendMsg(interface{}) error - RecvMsg(interface{}) error - Close() error - Send(*Message) error - Recv() (*Message, error) -} - -type transportServiceStream struct { - stream client.Stream -} - -func (x *transportServiceStream) Close() error { - return x.stream.Close() -} - -func (x *transportServiceStream) SendMsg(m interface{}) error { - return x.stream.Send(m) -} - -func (x *transportServiceStream) RecvMsg(m interface{}) error { - return x.stream.Recv(m) -} - -func (x *transportServiceStream) Send(m *Message) error { - return x.stream.Send(m) -} - -func (x *transportServiceStream) Recv() (*Message, error) { - m := new(Message) - err := x.stream.Recv(m) - if err != nil { - return nil, err - } - return m, nil -} - -// Server API for Transport service - -type TransportHandler interface { - Stream(context.Context, Transport_StreamStream) error -} - -func RegisterTransportHandler(s server.Server, hdlr TransportHandler, opts ...server.HandlerOption) error { - type transport interface { - Stream(ctx context.Context, stream server.Stream) error - } - type Transport struct { - transport - } - h := &transportHandler{hdlr} - return s.Handle(s.NewHandler(&Transport{h}, opts...)) -} - -type transportHandler struct { - TransportHandler -} - -func (h *transportHandler) Stream(ctx context.Context, stream server.Stream) error { - return h.TransportHandler.Stream(ctx, &transportStreamStream{stream}) -} - -type Transport_StreamStream interface { - SendMsg(interface{}) error - RecvMsg(interface{}) error - Close() error - Send(*Message) error - Recv() (*Message, error) -} - -type transportStreamStream struct { - stream server.Stream -} - -func (x *transportStreamStream) Close() error { - return x.stream.Close() -} - -func (x *transportStreamStream) SendMsg(m interface{}) error { - return x.stream.Send(m) -} - -func (x *transportStreamStream) RecvMsg(m interface{}) error { - return x.stream.Recv(m) -} - -func (x *transportStreamStream) Send(m *Message) error { - return x.stream.Send(m) -} - -func (x *transportStreamStream) Recv() (*Message, error) { - m := new(Message) - if err := x.stream.Recv(m); err != nil { - return nil, err - } - return m, nil -} diff --git a/proto/transport.pb.go b/proto/transport.pb.go deleted file mode 100644 index c8fb893..0000000 --- a/proto/transport.pb.go +++ /dev/null @@ -1,201 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: go-micro/network/transport/grpc/proto/transport.proto - -package go_micro_grpc_transport - -import ( - context "context" - fmt "fmt" - proto "github.com/golang/protobuf/proto" - grpc "google.golang.org/grpc" - math "math" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package - -type Message struct { - Header map[string]string `protobuf:"bytes,1,rep,name=header,proto3" json:"header,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - Body []byte `protobuf:"bytes,2,opt,name=body,proto3" json:"body,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Message) Reset() { *m = Message{} } -func (m *Message) String() string { return proto.CompactTextString(m) } -func (*Message) ProtoMessage() {} -func (*Message) Descriptor() ([]byte, []int) { - return fileDescriptor_29b90b9ccd5e0da5, []int{0} -} - -func (m *Message) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Message.Unmarshal(m, b) -} -func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Message.Marshal(b, m, deterministic) -} -func (m *Message) XXX_Merge(src proto.Message) { - xxx_messageInfo_Message.Merge(m, src) -} -func (m *Message) XXX_Size() int { - return xxx_messageInfo_Message.Size(m) -} -func (m *Message) XXX_DiscardUnknown() { - xxx_messageInfo_Message.DiscardUnknown(m) -} - -var xxx_messageInfo_Message proto.InternalMessageInfo - -func (m *Message) GetHeader() map[string]string { - if m != nil { - return m.Header - } - return nil -} - -func (m *Message) GetBody() []byte { - if m != nil { - return m.Body - } - return nil -} - -func init() { - proto.RegisterType((*Message)(nil), "go.micro.grpc.transport.Message") - proto.RegisterMapType((map[string]string)(nil), "go.micro.grpc.transport.Message.HeaderEntry") -} - -func init() { - proto.RegisterFile("go-micro/network/transport/grpc/proto/transport.proto", fileDescriptor_29b90b9ccd5e0da5) -} - -var fileDescriptor_29b90b9ccd5e0da5 = []byte{ - // 214 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xd2, 0x4d, 0xcf, 0xd7, 0xcd, - 0xcd, 0x4c, 0x2e, 0xca, 0xd7, 0x2f, 0x29, 0x4a, 0xcc, 0x2b, 0x2e, 0xc8, 0x2f, 0x2a, 0xd1, 0x4f, - 0x2f, 0x2a, 0x48, 0xd6, 0x2f, 0x28, 0xca, 0x2f, 0x41, 0x12, 0xd4, 0x03, 0xf3, 0x85, 0xc4, 0xd3, - 0xf3, 0xf5, 0xc0, 0xca, 0xf5, 0x40, 0x8a, 0xf4, 0xe0, 0xd2, 0x4a, 0xf3, 0x18, 0xb9, 0xd8, 0x7d, - 0x53, 0x8b, 0x8b, 0x13, 0xd3, 0x53, 0x85, 0x5c, 0xb8, 0xd8, 0x32, 0x52, 0x13, 0x53, 0x52, 0x8b, - 0x24, 0x18, 0x15, 0x98, 0x35, 0xb8, 0x8d, 0x74, 0xf4, 0x70, 0xe8, 0xd2, 0x83, 0xea, 0xd0, 0xf3, - 0x00, 0x2b, 0x77, 0xcd, 0x2b, 0x29, 0xaa, 0x0c, 0x82, 0xea, 0x15, 0x12, 0xe2, 0x62, 0x49, 0xca, - 0x4f, 0xa9, 0x94, 0x60, 0x52, 0x60, 0xd4, 0xe0, 0x09, 0x02, 0xb3, 0xa5, 0x2c, 0xb9, 0xb8, 0x91, - 0x94, 0x0a, 0x09, 0x70, 0x31, 0x67, 0xa7, 0x56, 0x4a, 0x30, 0x2a, 0x30, 0x6a, 0x70, 0x06, 0x81, - 0x98, 0x42, 0x22, 0x5c, 0xac, 0x65, 0x89, 0x39, 0xa5, 0xa9, 0x60, 0x5d, 0x9c, 0x41, 0x10, 0x8e, - 0x15, 0x93, 0x05, 0xa3, 0x51, 0x3c, 0x17, 0x67, 0x08, 0xcc, 0x5e, 0xa1, 0x20, 0x2e, 0xb6, 0xe0, - 0x92, 0xa2, 0xd4, 0xc4, 0x5c, 0x21, 0x05, 0x42, 0x6e, 0x93, 0x22, 0xa8, 0x42, 0x89, 0x41, 0x83, - 0xd1, 0x80, 0x31, 0x89, 0x0d, 0x1c, 0x42, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x77, 0xa1, - 0xa4, 0xcb, 0x52, 0x01, 0x00, 0x00, -} - -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion4 - -// TransportClient is the client API for Transport service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type TransportClient interface { - Stream(ctx context.Context, opts ...grpc.CallOption) (Transport_StreamClient, error) -} - -type transportClient struct { - cc *grpc.ClientConn -} - -func NewTransportClient(cc *grpc.ClientConn) TransportClient { - return &transportClient{cc} -} - -func (c *transportClient) Stream(ctx context.Context, opts ...grpc.CallOption) (Transport_StreamClient, error) { - stream, err := c.cc.NewStream(ctx, &_Transport_serviceDesc.Streams[0], "/go.micro.grpc.transport.Transport/Stream", opts...) - if err != nil { - return nil, err - } - x := &transportStreamClient{stream} - return x, nil -} - -type Transport_StreamClient interface { - Send(*Message) error - Recv() (*Message, error) - grpc.ClientStream -} - -type transportStreamClient struct { - grpc.ClientStream -} - -func (x *transportStreamClient) Send(m *Message) error { - return x.ClientStream.SendMsg(m) -} - -func (x *transportStreamClient) Recv() (*Message, error) { - m := new(Message) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -// TransportServer is the server API for Transport service. -type TransportServer interface { - Stream(Transport_StreamServer) error -} - -func RegisterTransportServer(s *grpc.Server, srv TransportServer) { - s.RegisterService(&_Transport_serviceDesc, srv) -} - -func _Transport_Stream_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(TransportServer).Stream(&transportStreamServer{stream}) -} - -type Transport_StreamServer interface { - Send(*Message) error - Recv() (*Message, error) - grpc.ServerStream -} - -type transportStreamServer struct { - grpc.ServerStream -} - -func (x *transportStreamServer) Send(m *Message) error { - return x.ServerStream.SendMsg(m) -} - -func (x *transportStreamServer) Recv() (*Message, error) { - m := new(Message) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -var _Transport_serviceDesc = grpc.ServiceDesc{ - ServiceName: "go.micro.grpc.transport.Transport", - HandlerType: (*TransportServer)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{ - { - StreamName: "Stream", - Handler: _Transport_Stream_Handler, - ServerStreams: true, - ClientStreams: true, - }, - }, - Metadata: "go-micro/network/transport/grpc/proto/transport.proto", -} diff --git a/proto/transport.proto b/proto/transport.proto deleted file mode 100644 index 3ceb762..0000000 --- a/proto/transport.proto +++ /dev/null @@ -1,12 +0,0 @@ -syntax = "proto3"; - -package go.micro.grpc.transport; - -service Transport { - rpc Stream(stream Message) returns (stream Message) {} -} - -message Message { - map header = 1; - bytes body = 2; -} diff --git a/socket.go b/socket.go deleted file mode 100644 index 1d072cb..0000000 --- a/socket.go +++ /dev/null @@ -1,97 +0,0 @@ -package grpc - -import ( - "github.com/micro/go-micro/network/transport" - pb "github.com/micro/go-micro/network/transport/grpc/proto" - "google.golang.org/grpc" -) - -type grpcTransportClient struct { - conn *grpc.ClientConn - stream pb.Transport_StreamClient - - local string - remote string -} - -type grpcTransportSocket struct { - stream pb.Transport_StreamServer - local string - remote string -} - -func (g *grpcTransportClient) Local() string { - return g.local -} - -func (g *grpcTransportClient) Remote() string { - return g.remote -} - -func (g *grpcTransportClient) Recv(m *transport.Message) error { - if m == nil { - return nil - } - - msg, err := g.stream.Recv() - if err != nil { - return err - } - - m.Header = msg.Header - m.Body = msg.Body - return nil -} - -func (g *grpcTransportClient) Send(m *transport.Message) error { - if m == nil { - return nil - } - - return g.stream.Send(&pb.Message{ - Header: m.Header, - Body: m.Body, - }) -} - -func (g *grpcTransportClient) Close() error { - return g.conn.Close() -} - -func (g *grpcTransportSocket) Local() string { - return g.local -} - -func (g *grpcTransportSocket) Remote() string { - return g.remote -} - -func (g *grpcTransportSocket) Recv(m *transport.Message) error { - if m == nil { - return nil - } - - msg, err := g.stream.Recv() - if err != nil { - return err - } - - m.Header = msg.Header - m.Body = msg.Body - return nil -} - -func (g *grpcTransportSocket) Send(m *transport.Message) error { - if m == nil { - return nil - } - - return g.stream.Send(&pb.Message{ - Header: m.Header, - Body: m.Body, - }) -} - -func (g *grpcTransportSocket) Close() error { - return nil -}