Move transport to network/transport

This commit is contained in:
Asim Aslam 2019-07-07 10:37:34 +01:00 committed by Vasiliy Tolstov
commit 3262e34da9
7 changed files with 797 additions and 0 deletions

176
grpc.go Normal file
View File

@ -0,0 +1,176 @@
// Package grpc provides a grpc transport
package grpc
import (
"context"
"crypto/tls"
"net"
"github.com/micro/go-micro/network/transport"
maddr "github.com/micro/go-micro/util/addr"
mnet "github.com/micro/go-micro/util/net"
mls "github.com/micro/go-micro/util/tls"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
pb "github.com/micro/go-micro/network/transport/grpc/proto"
)
type grpcTransport struct {
opts transport.Options
}
type grpcTransportListener struct {
listener net.Listener
secure bool
tls *tls.Config
}
func getTLSConfig(addr string) (*tls.Config, error) {
hosts := []string{addr}
// check if its a valid host:port
if host, _, err := net.SplitHostPort(addr); err == nil {
if len(host) == 0 {
hosts = maddr.IPs()
} else {
hosts = []string{host}
}
}
// generate a certificate
cert, err := mls.Certificate(hosts...)
if err != nil {
return nil, err
}
return &tls.Config{Certificates: []tls.Certificate{cert}}, nil
}
func (t *grpcTransportListener) Addr() string {
return t.listener.Addr().String()
}
func (t *grpcTransportListener) Close() error {
return t.listener.Close()
}
func (t *grpcTransportListener) Accept(fn func(transport.Socket)) error {
var opts []grpc.ServerOption
// setup tls if specified
if t.secure || t.tls != nil {
config := t.tls
if config == nil {
var err error
addr := t.listener.Addr().String()
config, err = getTLSConfig(addr)
if err != nil {
return err
}
}
creds := credentials.NewTLS(config)
opts = append(opts, grpc.Creds(creds))
}
// new service
srv := grpc.NewServer(opts...)
// register service
pb.RegisterTransportServer(srv, &microTransport{addr: t.listener.Addr().String(), fn: fn})
// start serving
return srv.Serve(t.listener)
}
func (t *grpcTransport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) {
dopts := transport.DialOptions{
Timeout: transport.DefaultDialTimeout,
}
for _, opt := range opts {
opt(&dopts)
}
options := []grpc.DialOption{
grpc.WithTimeout(dopts.Timeout),
}
if t.opts.Secure || t.opts.TLSConfig != nil {
config := t.opts.TLSConfig
if config == nil {
config = &tls.Config{
InsecureSkipVerify: true,
}
}
creds := credentials.NewTLS(config)
options = append(options, grpc.WithTransportCredentials(creds))
} else {
options = append(options, grpc.WithInsecure())
}
// dial the server
conn, err := grpc.Dial(addr, options...)
if err != nil {
return nil, err
}
// create stream
stream, err := pb.NewTransportClient(conn).Stream(context.Background())
if err != nil {
return nil, err
}
// return a client
return &grpcTransportClient{
conn: conn,
stream: stream,
local: "localhost",
remote: addr,
}, nil
}
func (t *grpcTransport) Listen(addr string, opts ...transport.ListenOption) (transport.Listener, error) {
var options transport.ListenOptions
for _, o := range opts {
o(&options)
}
ln, err := mnet.Listen(addr, func(addr string) (net.Listener, error) {
return net.Listen("tcp", addr)
})
if err != nil {
return nil, err
}
return &grpcTransportListener{
listener: ln,
tls: t.opts.TLSConfig,
secure: t.opts.Secure,
}, nil
}
func (t *grpcTransport) Init(opts ...transport.Option) error {
for _, o := range opts {
o(&t.opts)
}
return nil
}
func (t *grpcTransport) Options() transport.Options {
return t.opts
}
func (t *grpcTransport) String() string {
return "grpc"
}
func NewTransport(opts ...transport.Option) transport.Transport {
var options transport.Options
for _, o := range opts {
o(&options)
}
return &grpcTransport{opts: options}
}

109
grpc_test.go Normal file
View File

@ -0,0 +1,109 @@
package grpc
import (
"strings"
"testing"
"github.com/micro/go-micro/network/transport"
)
func expectedPort(t *testing.T, expected string, lsn transport.Listener) {
parts := strings.Split(lsn.Addr(), ":")
port := parts[len(parts)-1]
if port != expected {
lsn.Close()
t.Errorf("Expected address to be `%s`, got `%s`", expected, port)
}
}
func TestGRPCTransportPortRange(t *testing.T) {
tp := NewTransport()
lsn1, err := tp.Listen(":44444-44448")
if err != nil {
t.Errorf("Did not expect an error, got %s", err)
}
expectedPort(t, "44444", lsn1)
lsn2, err := tp.Listen(":44444-44448")
if err != nil {
t.Errorf("Did not expect an error, got %s", err)
}
expectedPort(t, "44445", lsn2)
lsn, err := tp.Listen(":0")
if err != nil {
t.Errorf("Did not expect an error, got %s", err)
}
lsn.Close()
lsn1.Close()
lsn2.Close()
}
func TestGRPCTransportCommunication(t *testing.T) {
tr := NewTransport()
l, err := tr.Listen(":0")
if err != nil {
t.Errorf("Unexpected listen err: %v", err)
}
defer l.Close()
fn := func(sock transport.Socket) {
defer sock.Close()
for {
var m transport.Message
if err := sock.Recv(&m); err != nil {
return
}
if err := sock.Send(&m); err != nil {
return
}
}
}
done := make(chan bool)
go func() {
if err := l.Accept(fn); err != nil {
select {
case <-done:
default:
t.Errorf("Unexpected accept err: %v", err)
}
}
}()
c, err := tr.Dial(l.Addr())
if err != nil {
t.Errorf("Unexpected dial err: %v", err)
}
defer c.Close()
m := transport.Message{
Header: map[string]string{
"X-Content-Type": "application/json",
},
Body: []byte(`{"message": "Hello World"}`),
}
if err := c.Send(&m); err != nil {
t.Errorf("Unexpected send err: %v", err)
}
var rm transport.Message
if err := c.Recv(&rm); err != nil {
t.Errorf("Unexpected recv err: %v", err)
}
if string(rm.Body) != string(m.Body) {
t.Errorf("Expected %v, got %v", m.Body, rm.Body)
}
close(done)
}

39
handler.go Normal file
View File

@ -0,0 +1,39 @@
package grpc
import (
"runtime/debug"
"github.com/micro/go-micro/network/transport"
pb "github.com/micro/go-micro/network/transport/grpc/proto"
"github.com/micro/go-micro/util/log"
"google.golang.org/grpc/peer"
)
// microTransport satisfies the pb.TransportServer inteface
type microTransport struct {
addr string
fn func(transport.Socket)
}
func (m *microTransport) Stream(ts pb.Transport_StreamServer) error {
sock := &grpcTransportSocket{
stream: ts,
local: m.addr,
}
p, ok := peer.FromContext(ts.Context())
if ok {
sock.remote = p.Addr.String()
}
defer func() {
if r := recover(); r != nil {
log.Log(r, string(debug.Stack()))
sock.Close()
}
}()
// execute socket func
m.fn(sock)
return nil
}

163
proto/transport.micro.go Normal file
View File

@ -0,0 +1,163 @@
// Code generated by protoc-gen-micro. DO NOT EDIT.
// source: go-micro/network/transport/grpc/proto/transport.proto
package go_micro_grpc_transport
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
math "math"
)
import (
context "context"
client "github.com/micro/go-micro/client"
server "github.com/micro/go-micro/server"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ client.Option
var _ server.Option
// Client API for Transport service
type TransportService interface {
Stream(ctx context.Context, opts ...client.CallOption) (Transport_StreamService, error)
}
type transportService struct {
c client.Client
name string
}
func NewTransportService(name string, c client.Client) TransportService {
if c == nil {
c = client.NewClient()
}
if len(name) == 0 {
name = "go.micro.grpc.transport"
}
return &transportService{
c: c,
name: name,
}
}
func (c *transportService) Stream(ctx context.Context, opts ...client.CallOption) (Transport_StreamService, error) {
req := c.c.NewRequest(c.name, "Transport.Stream", &Message{})
stream, err := c.c.Stream(ctx, req, opts...)
if err != nil {
return nil, err
}
return &transportServiceStream{stream}, nil
}
type Transport_StreamService interface {
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Send(*Message) error
Recv() (*Message, error)
}
type transportServiceStream struct {
stream client.Stream
}
func (x *transportServiceStream) Close() error {
return x.stream.Close()
}
func (x *transportServiceStream) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *transportServiceStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *transportServiceStream) Send(m *Message) error {
return x.stream.Send(m)
}
func (x *transportServiceStream) Recv() (*Message, error) {
m := new(Message)
err := x.stream.Recv(m)
if err != nil {
return nil, err
}
return m, nil
}
// Server API for Transport service
type TransportHandler interface {
Stream(context.Context, Transport_StreamStream) error
}
func RegisterTransportHandler(s server.Server, hdlr TransportHandler, opts ...server.HandlerOption) error {
type transport interface {
Stream(ctx context.Context, stream server.Stream) error
}
type Transport struct {
transport
}
h := &transportHandler{hdlr}
return s.Handle(s.NewHandler(&Transport{h}, opts...))
}
type transportHandler struct {
TransportHandler
}
func (h *transportHandler) Stream(ctx context.Context, stream server.Stream) error {
return h.TransportHandler.Stream(ctx, &transportStreamStream{stream})
}
type Transport_StreamStream interface {
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Send(*Message) error
Recv() (*Message, error)
}
type transportStreamStream struct {
stream server.Stream
}
func (x *transportStreamStream) Close() error {
return x.stream.Close()
}
func (x *transportStreamStream) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *transportStreamStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *transportStreamStream) Send(m *Message) error {
return x.stream.Send(m)
}
func (x *transportStreamStream) Recv() (*Message, error) {
m := new(Message)
if err := x.stream.Recv(m); err != nil {
return nil, err
}
return m, nil
}

201
proto/transport.pb.go Normal file
View File

@ -0,0 +1,201 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: go-micro/network/transport/grpc/proto/transport.proto
package go_micro_grpc_transport
import (
context "context"
fmt "fmt"
proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type Message struct {
Header map[string]string `protobuf:"bytes,1,rep,name=header,proto3" json:"header,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Body []byte `protobuf:"bytes,2,opt,name=body,proto3" json:"body,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Message) Reset() { *m = Message{} }
func (m *Message) String() string { return proto.CompactTextString(m) }
func (*Message) ProtoMessage() {}
func (*Message) Descriptor() ([]byte, []int) {
return fileDescriptor_29b90b9ccd5e0da5, []int{0}
}
func (m *Message) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Message.Unmarshal(m, b)
}
func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Message.Marshal(b, m, deterministic)
}
func (m *Message) XXX_Merge(src proto.Message) {
xxx_messageInfo_Message.Merge(m, src)
}
func (m *Message) XXX_Size() int {
return xxx_messageInfo_Message.Size(m)
}
func (m *Message) XXX_DiscardUnknown() {
xxx_messageInfo_Message.DiscardUnknown(m)
}
var xxx_messageInfo_Message proto.InternalMessageInfo
func (m *Message) GetHeader() map[string]string {
if m != nil {
return m.Header
}
return nil
}
func (m *Message) GetBody() []byte {
if m != nil {
return m.Body
}
return nil
}
func init() {
proto.RegisterType((*Message)(nil), "go.micro.grpc.transport.Message")
proto.RegisterMapType((map[string]string)(nil), "go.micro.grpc.transport.Message.HeaderEntry")
}
func init() {
proto.RegisterFile("go-micro/network/transport/grpc/proto/transport.proto", fileDescriptor_29b90b9ccd5e0da5)
}
var fileDescriptor_29b90b9ccd5e0da5 = []byte{
// 214 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xd2, 0x4d, 0xcf, 0xd7, 0xcd,
0xcd, 0x4c, 0x2e, 0xca, 0xd7, 0x2f, 0x29, 0x4a, 0xcc, 0x2b, 0x2e, 0xc8, 0x2f, 0x2a, 0xd1, 0x4f,
0x2f, 0x2a, 0x48, 0xd6, 0x2f, 0x28, 0xca, 0x2f, 0x41, 0x12, 0xd4, 0x03, 0xf3, 0x85, 0xc4, 0xd3,
0xf3, 0xf5, 0xc0, 0xca, 0xf5, 0x40, 0x8a, 0xf4, 0xe0, 0xd2, 0x4a, 0xf3, 0x18, 0xb9, 0xd8, 0x7d,
0x53, 0x8b, 0x8b, 0x13, 0xd3, 0x53, 0x85, 0x5c, 0xb8, 0xd8, 0x32, 0x52, 0x13, 0x53, 0x52, 0x8b,
0x24, 0x18, 0x15, 0x98, 0x35, 0xb8, 0x8d, 0x74, 0xf4, 0x70, 0xe8, 0xd2, 0x83, 0xea, 0xd0, 0xf3,
0x00, 0x2b, 0x77, 0xcd, 0x2b, 0x29, 0xaa, 0x0c, 0x82, 0xea, 0x15, 0x12, 0xe2, 0x62, 0x49, 0xca,
0x4f, 0xa9, 0x94, 0x60, 0x52, 0x60, 0xd4, 0xe0, 0x09, 0x02, 0xb3, 0xa5, 0x2c, 0xb9, 0xb8, 0x91,
0x94, 0x0a, 0x09, 0x70, 0x31, 0x67, 0xa7, 0x56, 0x4a, 0x30, 0x2a, 0x30, 0x6a, 0x70, 0x06, 0x81,
0x98, 0x42, 0x22, 0x5c, 0xac, 0x65, 0x89, 0x39, 0xa5, 0xa9, 0x60, 0x5d, 0x9c, 0x41, 0x10, 0x8e,
0x15, 0x93, 0x05, 0xa3, 0x51, 0x3c, 0x17, 0x67, 0x08, 0xcc, 0x5e, 0xa1, 0x20, 0x2e, 0xb6, 0xe0,
0x92, 0xa2, 0xd4, 0xc4, 0x5c, 0x21, 0x05, 0x42, 0x6e, 0x93, 0x22, 0xa8, 0x42, 0x89, 0x41, 0x83,
0xd1, 0x80, 0x31, 0x89, 0x0d, 0x1c, 0x42, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x77, 0xa1,
0xa4, 0xcb, 0x52, 0x01, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// TransportClient is the client API for Transport service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type TransportClient interface {
Stream(ctx context.Context, opts ...grpc.CallOption) (Transport_StreamClient, error)
}
type transportClient struct {
cc *grpc.ClientConn
}
func NewTransportClient(cc *grpc.ClientConn) TransportClient {
return &transportClient{cc}
}
func (c *transportClient) Stream(ctx context.Context, opts ...grpc.CallOption) (Transport_StreamClient, error) {
stream, err := c.cc.NewStream(ctx, &_Transport_serviceDesc.Streams[0], "/go.micro.grpc.transport.Transport/Stream", opts...)
if err != nil {
return nil, err
}
x := &transportStreamClient{stream}
return x, nil
}
type Transport_StreamClient interface {
Send(*Message) error
Recv() (*Message, error)
grpc.ClientStream
}
type transportStreamClient struct {
grpc.ClientStream
}
func (x *transportStreamClient) Send(m *Message) error {
return x.ClientStream.SendMsg(m)
}
func (x *transportStreamClient) Recv() (*Message, error) {
m := new(Message)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// TransportServer is the server API for Transport service.
type TransportServer interface {
Stream(Transport_StreamServer) error
}
func RegisterTransportServer(s *grpc.Server, srv TransportServer) {
s.RegisterService(&_Transport_serviceDesc, srv)
}
func _Transport_Stream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(TransportServer).Stream(&transportStreamServer{stream})
}
type Transport_StreamServer interface {
Send(*Message) error
Recv() (*Message, error)
grpc.ServerStream
}
type transportStreamServer struct {
grpc.ServerStream
}
func (x *transportStreamServer) Send(m *Message) error {
return x.ServerStream.SendMsg(m)
}
func (x *transportStreamServer) Recv() (*Message, error) {
m := new(Message)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
var _Transport_serviceDesc = grpc.ServiceDesc{
ServiceName: "go.micro.grpc.transport.Transport",
HandlerType: (*TransportServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "Stream",
Handler: _Transport_Stream_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "go-micro/network/transport/grpc/proto/transport.proto",
}

12
proto/transport.proto Normal file
View File

@ -0,0 +1,12 @@
syntax = "proto3";
package go.micro.grpc.transport;
service Transport {
rpc Stream(stream Message) returns (stream Message) {}
}
message Message {
map<string, string> header = 1;
bytes body = 2;
}

97
socket.go Normal file
View File

@ -0,0 +1,97 @@
package grpc
import (
"github.com/micro/go-micro/network/transport"
pb "github.com/micro/go-micro/network/transport/grpc/proto"
"google.golang.org/grpc"
)
type grpcTransportClient struct {
conn *grpc.ClientConn
stream pb.Transport_StreamClient
local string
remote string
}
type grpcTransportSocket struct {
stream pb.Transport_StreamServer
local string
remote string
}
func (g *grpcTransportClient) Local() string {
return g.local
}
func (g *grpcTransportClient) Remote() string {
return g.remote
}
func (g *grpcTransportClient) Recv(m *transport.Message) error {
if m == nil {
return nil
}
msg, err := g.stream.Recv()
if err != nil {
return err
}
m.Header = msg.Header
m.Body = msg.Body
return nil
}
func (g *grpcTransportClient) Send(m *transport.Message) error {
if m == nil {
return nil
}
return g.stream.Send(&pb.Message{
Header: m.Header,
Body: m.Body,
})
}
func (g *grpcTransportClient) Close() error {
return g.conn.Close()
}
func (g *grpcTransportSocket) Local() string {
return g.local
}
func (g *grpcTransportSocket) Remote() string {
return g.remote
}
func (g *grpcTransportSocket) Recv(m *transport.Message) error {
if m == nil {
return nil
}
msg, err := g.stream.Recv()
if err != nil {
return err
}
m.Header = msg.Header
m.Body = msg.Body
return nil
}
func (g *grpcTransportSocket) Send(m *transport.Message) error {
if m == nil {
return nil
}
return g.stream.Send(&pb.Message{
Header: m.Header,
Body: m.Body,
})
}
func (g *grpcTransportSocket) Close() error {
return nil
}