Add grpc transport

This commit is contained in:
Asim Aslam 2019-05-24 17:15:59 +01:00
parent b926ae81bb
commit 25a0d05ac9
8 changed files with 793 additions and 0 deletions

View File

@ -33,6 +33,7 @@ import (
// transports
"github.com/micro/go-micro/transport"
tgrpc "github.com/micro/go-micro/transport/grpc"
thttp "github.com/micro/go-micro/transport/http"
tmem "github.com/micro/go-micro/transport/memory"
)
@ -197,6 +198,7 @@ var (
DefaultTransports = map[string]func(...transport.Option) transport.Transport{
"memory": tmem.NewTransport,
"http": thttp.NewTransport,
"grpc": tgrpc.NewTransport,
}
// used for default selection as the fall back

176
transport/grpc/grpc.go Normal file
View File

@ -0,0 +1,176 @@
// Package grpc provides a grpc transport
package grpc
import (
"context"
"crypto/tls"
"net"
"github.com/micro/go-micro/transport"
maddr "github.com/micro/util/go/lib/addr"
mnet "github.com/micro/util/go/lib/net"
mls "github.com/micro/util/go/lib/tls"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
pb "github.com/micro/go-plugins/transport/grpc/proto"
)
type grpcTransport struct {
opts transport.Options
}
type grpcTransportListener struct {
listener net.Listener
secure bool
tls *tls.Config
}
func getTLSConfig(addr string) (*tls.Config, error) {
hosts := []string{addr}
// check if its a valid host:port
if host, _, err := net.SplitHostPort(addr); err == nil {
if len(host) == 0 {
hosts = maddr.IPs()
} else {
hosts = []string{host}
}
}
// generate a certificate
cert, err := mls.Certificate(hosts...)
if err != nil {
return nil, err
}
return &tls.Config{Certificates: []tls.Certificate{cert}}, nil
}
func (t *grpcTransportListener) Addr() string {
return t.listener.Addr().String()
}
func (t *grpcTransportListener) Close() error {
return t.listener.Close()
}
func (t *grpcTransportListener) Accept(fn func(transport.Socket)) error {
var opts []grpc.ServerOption
// setup tls if specified
if t.secure || t.tls != nil {
config := t.tls
if config == nil {
var err error
addr := t.listener.Addr().String()
config, err = getTLSConfig(addr)
if err != nil {
return err
}
}
creds := credentials.NewTLS(config)
opts = append(opts, grpc.Creds(creds))
}
// new service
srv := grpc.NewServer(opts...)
// register service
pb.RegisterTransportServer(srv, &microTransport{addr: t.listener.Addr().String(), fn: fn})
// start serving
return srv.Serve(t.listener)
}
func (t *grpcTransport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) {
dopts := transport.DialOptions{
Timeout: transport.DefaultDialTimeout,
}
for _, opt := range opts {
opt(&dopts)
}
options := []grpc.DialOption{
grpc.WithTimeout(dopts.Timeout),
}
if t.opts.Secure || t.opts.TLSConfig != nil {
config := t.opts.TLSConfig
if config == nil {
config = &tls.Config{
InsecureSkipVerify: true,
}
}
creds := credentials.NewTLS(config)
options = append(options, grpc.WithTransportCredentials(creds))
} else {
options = append(options, grpc.WithInsecure())
}
// dial the server
conn, err := grpc.Dial(addr, options...)
if err != nil {
return nil, err
}
// create stream
stream, err := pb.NewTransportClient(conn).Stream(context.Background())
if err != nil {
return nil, err
}
// return a client
return &grpcTransportClient{
conn: conn,
stream: stream,
local: "localhost",
remote: addr,
}, nil
}
func (t *grpcTransport) Listen(addr string, opts ...transport.ListenOption) (transport.Listener, error) {
var options transport.ListenOptions
for _, o := range opts {
o(&options)
}
ln, err := mnet.Listen(addr, func(addr string) (net.Listener, error) {
return net.Listen("tcp", addr)
})
if err != nil {
return nil, err
}
return &grpcTransportListener{
listener: ln,
tls: t.opts.TLSConfig,
secure: t.opts.Secure,
}, nil
}
func (t *grpcTransport) Init(opts ...transport.Option) error {
for _, o := range opts {
o(&t.opts)
}
return nil
}
func (t *grpcTransport) Options() transport.Options {
return t.opts
}
func (t *grpcTransport) String() string {
return "grpc"
}
func NewTransport(opts ...transport.Option) transport.Transport {
var options transport.Options
for _, o := range opts {
o(&options)
}
return &grpcTransport{opts: options}
}

109
transport/grpc/grpc_test.go Normal file
View File

@ -0,0 +1,109 @@
package grpc
import (
"strings"
"testing"
"github.com/micro/go-micro/transport"
)
func expectedPort(t *testing.T, expected string, lsn transport.Listener) {
parts := strings.Split(lsn.Addr(), ":")
port := parts[len(parts)-1]
if port != expected {
lsn.Close()
t.Errorf("Expected address to be `%s`, got `%s`", expected, port)
}
}
func TestGRPCTransportPortRange(t *testing.T) {
tp := NewTransport()
lsn1, err := tp.Listen(":44444-44448")
if err != nil {
t.Errorf("Did not expect an error, got %s", err)
}
expectedPort(t, "44444", lsn1)
lsn2, err := tp.Listen(":44444-44448")
if err != nil {
t.Errorf("Did not expect an error, got %s", err)
}
expectedPort(t, "44445", lsn2)
lsn, err := tp.Listen(":0")
if err != nil {
t.Errorf("Did not expect an error, got %s", err)
}
lsn.Close()
lsn1.Close()
lsn2.Close()
}
func TestGRPCTransportCommunication(t *testing.T) {
tr := NewTransport()
l, err := tr.Listen(":0")
if err != nil {
t.Errorf("Unexpected listen err: %v", err)
}
defer l.Close()
fn := func(sock transport.Socket) {
defer sock.Close()
for {
var m transport.Message
if err := sock.Recv(&m); err != nil {
return
}
if err := sock.Send(&m); err != nil {
return
}
}
}
done := make(chan bool)
go func() {
if err := l.Accept(fn); err != nil {
select {
case <-done:
default:
t.Errorf("Unexpected accept err: %v", err)
}
}
}()
c, err := tr.Dial(l.Addr())
if err != nil {
t.Errorf("Unexpected dial err: %v", err)
}
defer c.Close()
m := transport.Message{
Header: map[string]string{
"X-Content-Type": "application/json",
},
Body: []byte(`{"message": "Hello World"}`),
}
if err := c.Send(&m); err != nil {
t.Errorf("Unexpected send err: %v", err)
}
var rm transport.Message
if err := c.Recv(&rm); err != nil {
t.Errorf("Unexpected recv err: %v", err)
}
if string(rm.Body) != string(m.Body) {
t.Errorf("Expected %v, got %v", m.Body, rm.Body)
}
close(done)
}

39
transport/grpc/handler.go Normal file
View File

@ -0,0 +1,39 @@
package grpc
import (
"runtime/debug"
"github.com/micro/go-log"
"github.com/micro/go-micro/transport"
pb "github.com/micro/go-plugins/transport/grpc/proto"
"google.golang.org/grpc/peer"
)
// microTransport satisfies the pb.TransportServer inteface
type microTransport struct {
addr string
fn func(transport.Socket)
}
func (m *microTransport) Stream(ts pb.Transport_StreamServer) error {
sock := &grpcTransportSocket{
stream: ts,
local: m.addr,
}
p, ok := peer.FromContext(ts.Context())
if ok {
sock.remote = p.Addr.String()
}
defer func() {
if r := recover(); r != nil {
log.Log(r, string(debug.Stack()))
sock.Close()
}
}()
// execute socket func
m.fn(sock)
return nil
}

View File

@ -0,0 +1,170 @@
// Code generated by protoc-gen-micro. DO NOT EDIT.
// source: github.com/micro/go-plugins/transport/grpc/proto/transport.proto
/*
Package go_micro_grpc_transport is a generated protocol buffer package.
It is generated from these files:
github.com/micro/go-plugins/transport/grpc/proto/transport.proto
It has these top-level messages:
Message
*/
package go_micro_grpc_transport
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
context "context"
client "github.com/micro/go-micro/client"
server "github.com/micro/go-micro/server"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ client.Option
var _ server.Option
// Client API for Transport service
type TransportService interface {
Stream(ctx context.Context, opts ...client.CallOption) (Transport_StreamService, error)
}
type transportService struct {
c client.Client
name string
}
func NewTransportService(name string, c client.Client) TransportService {
if c == nil {
c = client.NewClient()
}
if len(name) == 0 {
name = "go.micro.grpc.transport"
}
return &transportService{
c: c,
name: name,
}
}
func (c *transportService) Stream(ctx context.Context, opts ...client.CallOption) (Transport_StreamService, error) {
req := c.c.NewRequest(c.name, "Transport.Stream", &Message{})
stream, err := c.c.Stream(ctx, req, opts...)
if err != nil {
return nil, err
}
return &transportServiceStream{stream}, nil
}
type Transport_StreamService interface {
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Send(*Message) error
Recv() (*Message, error)
}
type transportServiceStream struct {
stream client.Stream
}
func (x *transportServiceStream) Close() error {
return x.stream.Close()
}
func (x *transportServiceStream) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *transportServiceStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *transportServiceStream) Send(m *Message) error {
return x.stream.Send(m)
}
func (x *transportServiceStream) Recv() (*Message, error) {
m := new(Message)
err := x.stream.Recv(m)
if err != nil {
return nil, err
}
return m, nil
}
// Server API for Transport service
type TransportHandler interface {
Stream(context.Context, Transport_StreamStream) error
}
func RegisterTransportHandler(s server.Server, hdlr TransportHandler, opts ...server.HandlerOption) {
type transport interface {
Stream(ctx context.Context, stream server.Stream) error
}
type Transport struct {
transport
}
h := &transportHandler{hdlr}
s.Handle(s.NewHandler(&Transport{h}, opts...))
}
type transportHandler struct {
TransportHandler
}
func (h *transportHandler) Stream(ctx context.Context, stream server.Stream) error {
return h.TransportHandler.Stream(ctx, &transportStreamStream{stream})
}
type Transport_StreamStream interface {
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Send(*Message) error
Recv() (*Message, error)
}
type transportStreamStream struct {
stream server.Stream
}
func (x *transportStreamStream) Close() error {
return x.stream.Close()
}
func (x *transportStreamStream) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *transportStreamStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *transportStreamStream) Send(m *Message) error {
return x.stream.Send(m)
}
func (x *transportStreamStream) Recv() (*Message, error) {
m := new(Message)
if err := x.stream.Recv(m); err != nil {
return nil, err
}
return m, nil
}

View File

@ -0,0 +1,188 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: github.com/micro/go-plugins/transport/grpc/proto/transport.proto
/*
Package go_micro_grpc_transport is a generated protocol buffer package.
It is generated from these files:
github.com/micro/go-plugins/transport/grpc/proto/transport.proto
It has these top-level messages:
Message
*/
package go_micro_grpc_transport
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Message struct {
Header map[string]string `protobuf:"bytes,1,rep,name=header" json:"header,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
Body []byte `protobuf:"bytes,2,opt,name=body,proto3" json:"body,omitempty"`
}
func (m *Message) Reset() { *m = Message{} }
func (m *Message) String() string { return proto.CompactTextString(m) }
func (*Message) ProtoMessage() {}
func (*Message) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *Message) GetHeader() map[string]string {
if m != nil {
return m.Header
}
return nil
}
func (m *Message) GetBody() []byte {
if m != nil {
return m.Body
}
return nil
}
func init() {
proto.RegisterType((*Message)(nil), "go.micro.grpc.transport.Message")
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// Client API for Transport service
type TransportClient interface {
Stream(ctx context.Context, opts ...grpc.CallOption) (Transport_StreamClient, error)
}
type transportClient struct {
cc *grpc.ClientConn
}
func NewTransportClient(cc *grpc.ClientConn) TransportClient {
return &transportClient{cc}
}
func (c *transportClient) Stream(ctx context.Context, opts ...grpc.CallOption) (Transport_StreamClient, error) {
stream, err := grpc.NewClientStream(ctx, &_Transport_serviceDesc.Streams[0], c.cc, "/go.micro.grpc.transport.Transport/Stream", opts...)
if err != nil {
return nil, err
}
x := &transportStreamClient{stream}
return x, nil
}
type Transport_StreamClient interface {
Send(*Message) error
Recv() (*Message, error)
grpc.ClientStream
}
type transportStreamClient struct {
grpc.ClientStream
}
func (x *transportStreamClient) Send(m *Message) error {
return x.ClientStream.SendMsg(m)
}
func (x *transportStreamClient) Recv() (*Message, error) {
m := new(Message)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// Server API for Transport service
type TransportServer interface {
Stream(Transport_StreamServer) error
}
func RegisterTransportServer(s *grpc.Server, srv TransportServer) {
s.RegisterService(&_Transport_serviceDesc, srv)
}
func _Transport_Stream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(TransportServer).Stream(&transportStreamServer{stream})
}
type Transport_StreamServer interface {
Send(*Message) error
Recv() (*Message, error)
grpc.ServerStream
}
type transportStreamServer struct {
grpc.ServerStream
}
func (x *transportStreamServer) Send(m *Message) error {
return x.ServerStream.SendMsg(m)
}
func (x *transportStreamServer) Recv() (*Message, error) {
m := new(Message)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
var _Transport_serviceDesc = grpc.ServiceDesc{
ServiceName: "go.micro.grpc.transport.Transport",
HandlerType: (*TransportServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "Stream",
Handler: _Transport_Stream_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "github.com/micro/go-plugins/transport/grpc/proto/transport.proto",
}
func init() {
proto.RegisterFile("github.com/micro/go-plugins/transport/grpc/proto/transport.proto", fileDescriptor0)
}
var fileDescriptor0 = []byte{
// 234 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x72, 0x48, 0xcf, 0x2c, 0xc9,
0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0xcf, 0xcd, 0x4c, 0x2e, 0xca, 0xd7, 0x4f, 0xcf, 0xd7,
0x2d, 0xc8, 0x29, 0x4d, 0xcf, 0xcc, 0x2b, 0xd6, 0x2f, 0x29, 0x4a, 0xcc, 0x2b, 0x2e, 0xc8, 0x2f,
0x2a, 0xd1, 0x4f, 0x2f, 0x2a, 0x48, 0xd6, 0x2f, 0x28, 0xca, 0x2f, 0xc9, 0x47, 0x08, 0xea, 0x81,
0xf9, 0x42, 0xe2, 0xe9, 0xf9, 0x7a, 0x60, 0x9d, 0x7a, 0x20, 0x45, 0x7a, 0x70, 0x69, 0xa5, 0x79,
0x8c, 0x5c, 0xec, 0xbe, 0xa9, 0xc5, 0xc5, 0x89, 0xe9, 0xa9, 0x42, 0x2e, 0x5c, 0x6c, 0x19, 0xa9,
0x89, 0x29, 0xa9, 0x45, 0x12, 0x8c, 0x0a, 0xcc, 0x1a, 0xdc, 0x46, 0x3a, 0x7a, 0x38, 0x74, 0xe9,
0x41, 0x75, 0xe8, 0x79, 0x80, 0x95, 0xbb, 0xe6, 0x95, 0x14, 0x55, 0x06, 0x41, 0xf5, 0x0a, 0x09,
0x71, 0xb1, 0x24, 0xe5, 0xa7, 0x54, 0x4a, 0x30, 0x29, 0x30, 0x6a, 0xf0, 0x04, 0x81, 0xd9, 0x52,
0x96, 0x5c, 0xdc, 0x48, 0x4a, 0x85, 0x04, 0xb8, 0x98, 0xb3, 0x53, 0x2b, 0x25, 0x18, 0x15, 0x18,
0x35, 0x38, 0x83, 0x40, 0x4c, 0x21, 0x11, 0x2e, 0xd6, 0xb2, 0xc4, 0x9c, 0xd2, 0x54, 0xb0, 0x2e,
0xce, 0x20, 0x08, 0xc7, 0x8a, 0xc9, 0x82, 0xd1, 0x28, 0x9e, 0x8b, 0x33, 0x04, 0x66, 0xaf, 0x50,
0x10, 0x17, 0x5b, 0x70, 0x49, 0x51, 0x6a, 0x62, 0xae, 0x90, 0x02, 0x21, 0xb7, 0x49, 0x11, 0x54,
0xa1, 0xc4, 0xa0, 0xc1, 0x68, 0xc0, 0x98, 0xc4, 0x06, 0x0e, 0x21, 0x63, 0x40, 0x00, 0x00, 0x00,
0xff, 0xff, 0x4e, 0xcc, 0x76, 0x38, 0x65, 0x01, 0x00, 0x00,
}

View File

@ -0,0 +1,12 @@
syntax = "proto3";
package go.micro.grpc.transport;
service Transport {
rpc Stream(stream Message) returns (stream Message) {}
}
message Message {
map<string, string> header = 1;
bytes body = 2;
}

97
transport/grpc/socket.go Normal file
View File

@ -0,0 +1,97 @@
package grpc
import (
"github.com/micro/go-micro/transport"
pb "github.com/micro/go-plugins/transport/grpc/proto"
"google.golang.org/grpc"
)
type grpcTransportClient struct {
conn *grpc.ClientConn
stream pb.Transport_StreamClient
local string
remote string
}
type grpcTransportSocket struct {
stream pb.Transport_StreamServer
local string
remote string
}
func (g *grpcTransportClient) Local() string {
return g.local
}
func (g *grpcTransportClient) Remote() string {
return g.remote
}
func (g *grpcTransportClient) Recv(m *transport.Message) error {
if m == nil {
return nil
}
msg, err := g.stream.Recv()
if err != nil {
return err
}
m.Header = msg.Header
m.Body = msg.Body
return nil
}
func (g *grpcTransportClient) Send(m *transport.Message) error {
if m == nil {
return nil
}
return g.stream.Send(&pb.Message{
Header: m.Header,
Body: m.Body,
})
}
func (g *grpcTransportClient) Close() error {
return g.conn.Close()
}
func (g *grpcTransportSocket) Local() string {
return g.local
}
func (g *grpcTransportSocket) Remote() string {
return g.remote
}
func (g *grpcTransportSocket) Recv(m *transport.Message) error {
if m == nil {
return nil
}
msg, err := g.stream.Recv()
if err != nil {
return err
}
m.Header = msg.Header
m.Body = msg.Body
return nil
}
func (g *grpcTransportSocket) Send(m *transport.Message) error {
if m == nil {
return nil
}
return g.stream.Send(&pb.Message{
Header: m.Header,
Body: m.Body,
})
}
func (g *grpcTransportSocket) Close() error {
return nil
}