| @@ -34,6 +34,7 @@ import ( | ||||
|  | ||||
| 	// transports | ||||
| 	"github.com/micro/go-micro/transport" | ||||
| 	tgrpc "github.com/micro/go-micro/transport/grpc" | ||||
| 	thttp "github.com/micro/go-micro/transport/http" | ||||
| 	tmem "github.com/micro/go-micro/transport/memory" | ||||
| ) | ||||
| @@ -199,6 +200,7 @@ var ( | ||||
| 	DefaultTransports = map[string]func(...transport.Option) transport.Transport{ | ||||
| 		"memory": tmem.NewTransport, | ||||
| 		"http":   thttp.NewTransport, | ||||
| 		"grpc":   tgrpc.NewTransport, | ||||
| 	} | ||||
|  | ||||
| 	// used for default selection as the fall back | ||||
|   | ||||
							
								
								
									
										176
									
								
								transport/grpc/grpc.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										176
									
								
								transport/grpc/grpc.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,176 @@ | ||||
| // Package grpc provides a grpc transport | ||||
| package grpc | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"crypto/tls" | ||||
| 	"net" | ||||
|  | ||||
| 	"github.com/micro/go-micro/transport" | ||||
| 	maddr "github.com/micro/util/go/lib/addr" | ||||
| 	mnet "github.com/micro/util/go/lib/net" | ||||
| 	mls "github.com/micro/util/go/lib/tls" | ||||
|  | ||||
| 	"google.golang.org/grpc" | ||||
| 	"google.golang.org/grpc/credentials" | ||||
|  | ||||
| 	pb "github.com/micro/go-plugins/transport/grpc/proto" | ||||
| ) | ||||
|  | ||||
| type grpcTransport struct { | ||||
| 	opts transport.Options | ||||
| } | ||||
|  | ||||
| type grpcTransportListener struct { | ||||
| 	listener net.Listener | ||||
| 	secure   bool | ||||
| 	tls      *tls.Config | ||||
| } | ||||
|  | ||||
| func getTLSConfig(addr string) (*tls.Config, error) { | ||||
| 	hosts := []string{addr} | ||||
|  | ||||
| 	// check if its a valid host:port | ||||
| 	if host, _, err := net.SplitHostPort(addr); err == nil { | ||||
| 		if len(host) == 0 { | ||||
| 			hosts = maddr.IPs() | ||||
| 		} else { | ||||
| 			hosts = []string{host} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// generate a certificate | ||||
| 	cert, err := mls.Certificate(hosts...) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	return &tls.Config{Certificates: []tls.Certificate{cert}}, nil | ||||
| } | ||||
|  | ||||
| func (t *grpcTransportListener) Addr() string { | ||||
| 	return t.listener.Addr().String() | ||||
| } | ||||
|  | ||||
| func (t *grpcTransportListener) Close() error { | ||||
| 	return t.listener.Close() | ||||
| } | ||||
|  | ||||
| func (t *grpcTransportListener) Accept(fn func(transport.Socket)) error { | ||||
| 	var opts []grpc.ServerOption | ||||
|  | ||||
| 	// setup tls if specified | ||||
| 	if t.secure || t.tls != nil { | ||||
| 		config := t.tls | ||||
| 		if config == nil { | ||||
| 			var err error | ||||
| 			addr := t.listener.Addr().String() | ||||
| 			config, err = getTLSConfig(addr) | ||||
| 			if err != nil { | ||||
| 				return err | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		creds := credentials.NewTLS(config) | ||||
| 		opts = append(opts, grpc.Creds(creds)) | ||||
| 	} | ||||
|  | ||||
| 	// new service | ||||
| 	srv := grpc.NewServer(opts...) | ||||
|  | ||||
| 	// register service | ||||
| 	pb.RegisterTransportServer(srv, µTransport{addr: t.listener.Addr().String(), fn: fn}) | ||||
|  | ||||
| 	// start serving | ||||
| 	return srv.Serve(t.listener) | ||||
| } | ||||
|  | ||||
| func (t *grpcTransport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) { | ||||
| 	dopts := transport.DialOptions{ | ||||
| 		Timeout: transport.DefaultDialTimeout, | ||||
| 	} | ||||
|  | ||||
| 	for _, opt := range opts { | ||||
| 		opt(&dopts) | ||||
| 	} | ||||
|  | ||||
| 	options := []grpc.DialOption{ | ||||
| 		grpc.WithTimeout(dopts.Timeout), | ||||
| 	} | ||||
|  | ||||
| 	if t.opts.Secure || t.opts.TLSConfig != nil { | ||||
| 		config := t.opts.TLSConfig | ||||
| 		if config == nil { | ||||
| 			config = &tls.Config{ | ||||
| 				InsecureSkipVerify: true, | ||||
| 			} | ||||
| 		} | ||||
| 		creds := credentials.NewTLS(config) | ||||
| 		options = append(options, grpc.WithTransportCredentials(creds)) | ||||
| 	} else { | ||||
| 		options = append(options, grpc.WithInsecure()) | ||||
| 	} | ||||
|  | ||||
| 	// dial the server | ||||
| 	conn, err := grpc.Dial(addr, options...) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	// create stream | ||||
| 	stream, err := pb.NewTransportClient(conn).Stream(context.Background()) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	// return a client | ||||
| 	return &grpcTransportClient{ | ||||
| 		conn:   conn, | ||||
| 		stream: stream, | ||||
| 		local:  "localhost", | ||||
| 		remote: addr, | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (t *grpcTransport) Listen(addr string, opts ...transport.ListenOption) (transport.Listener, error) { | ||||
| 	var options transport.ListenOptions | ||||
| 	for _, o := range opts { | ||||
| 		o(&options) | ||||
| 	} | ||||
|  | ||||
| 	ln, err := mnet.Listen(addr, func(addr string) (net.Listener, error) { | ||||
| 		return net.Listen("tcp", addr) | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	return &grpcTransportListener{ | ||||
| 		listener: ln, | ||||
| 		tls:      t.opts.TLSConfig, | ||||
| 		secure:   t.opts.Secure, | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (t *grpcTransport) Init(opts ...transport.Option) error { | ||||
| 	for _, o := range opts { | ||||
| 		o(&t.opts) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (t *grpcTransport) Options() transport.Options { | ||||
| 	return t.opts | ||||
| } | ||||
|  | ||||
| func (t *grpcTransport) String() string { | ||||
| 	return "grpc" | ||||
| } | ||||
|  | ||||
| func NewTransport(opts ...transport.Option) transport.Transport { | ||||
| 	var options transport.Options | ||||
| 	for _, o := range opts { | ||||
| 		o(&options) | ||||
| 	} | ||||
| 	return &grpcTransport{opts: options} | ||||
| } | ||||
							
								
								
									
										109
									
								
								transport/grpc/grpc_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										109
									
								
								transport/grpc/grpc_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,109 @@ | ||||
| package grpc | ||||
|  | ||||
| import ( | ||||
| 	"strings" | ||||
| 	"testing" | ||||
|  | ||||
| 	"github.com/micro/go-micro/transport" | ||||
| ) | ||||
|  | ||||
| func expectedPort(t *testing.T, expected string, lsn transport.Listener) { | ||||
| 	parts := strings.Split(lsn.Addr(), ":") | ||||
| 	port := parts[len(parts)-1] | ||||
|  | ||||
| 	if port != expected { | ||||
| 		lsn.Close() | ||||
| 		t.Errorf("Expected address to be `%s`, got `%s`", expected, port) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestGRPCTransportPortRange(t *testing.T) { | ||||
| 	tp := NewTransport() | ||||
|  | ||||
| 	lsn1, err := tp.Listen(":44444-44448") | ||||
| 	if err != nil { | ||||
| 		t.Errorf("Did not expect an error, got %s", err) | ||||
| 	} | ||||
| 	expectedPort(t, "44444", lsn1) | ||||
|  | ||||
| 	lsn2, err := tp.Listen(":44444-44448") | ||||
| 	if err != nil { | ||||
| 		t.Errorf("Did not expect an error, got %s", err) | ||||
| 	} | ||||
| 	expectedPort(t, "44445", lsn2) | ||||
|  | ||||
| 	lsn, err := tp.Listen(":0") | ||||
| 	if err != nil { | ||||
| 		t.Errorf("Did not expect an error, got %s", err) | ||||
| 	} | ||||
|  | ||||
| 	lsn.Close() | ||||
| 	lsn1.Close() | ||||
| 	lsn2.Close() | ||||
| } | ||||
|  | ||||
| func TestGRPCTransportCommunication(t *testing.T) { | ||||
| 	tr := NewTransport() | ||||
|  | ||||
| 	l, err := tr.Listen(":0") | ||||
| 	if err != nil { | ||||
| 		t.Errorf("Unexpected listen err: %v", err) | ||||
| 	} | ||||
| 	defer l.Close() | ||||
|  | ||||
| 	fn := func(sock transport.Socket) { | ||||
| 		defer sock.Close() | ||||
|  | ||||
| 		for { | ||||
| 			var m transport.Message | ||||
| 			if err := sock.Recv(&m); err != nil { | ||||
| 				return | ||||
| 			} | ||||
|  | ||||
| 			if err := sock.Send(&m); err != nil { | ||||
| 				return | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	done := make(chan bool) | ||||
|  | ||||
| 	go func() { | ||||
| 		if err := l.Accept(fn); err != nil { | ||||
| 			select { | ||||
| 			case <-done: | ||||
| 			default: | ||||
| 				t.Errorf("Unexpected accept err: %v", err) | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	c, err := tr.Dial(l.Addr()) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("Unexpected dial err: %v", err) | ||||
| 	} | ||||
| 	defer c.Close() | ||||
|  | ||||
| 	m := transport.Message{ | ||||
| 		Header: map[string]string{ | ||||
| 			"X-Content-Type": "application/json", | ||||
| 		}, | ||||
| 		Body: []byte(`{"message": "Hello World"}`), | ||||
| 	} | ||||
|  | ||||
| 	if err := c.Send(&m); err != nil { | ||||
| 		t.Errorf("Unexpected send err: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	var rm transport.Message | ||||
|  | ||||
| 	if err := c.Recv(&rm); err != nil { | ||||
| 		t.Errorf("Unexpected recv err: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	if string(rm.Body) != string(m.Body) { | ||||
| 		t.Errorf("Expected %v, got %v", m.Body, rm.Body) | ||||
| 	} | ||||
|  | ||||
| 	close(done) | ||||
| } | ||||
							
								
								
									
										39
									
								
								transport/grpc/handler.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										39
									
								
								transport/grpc/handler.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,39 @@ | ||||
| package grpc | ||||
|  | ||||
| import ( | ||||
| 	"runtime/debug" | ||||
|  | ||||
| 	"github.com/micro/go-log" | ||||
| 	"github.com/micro/go-micro/transport" | ||||
| 	pb "github.com/micro/go-plugins/transport/grpc/proto" | ||||
| 	"google.golang.org/grpc/peer" | ||||
| ) | ||||
|  | ||||
| // microTransport satisfies the pb.TransportServer inteface | ||||
| type microTransport struct { | ||||
| 	addr string | ||||
| 	fn   func(transport.Socket) | ||||
| } | ||||
|  | ||||
| func (m *microTransport) Stream(ts pb.Transport_StreamServer) error { | ||||
| 	sock := &grpcTransportSocket{ | ||||
| 		stream: ts, | ||||
| 		local:  m.addr, | ||||
| 	} | ||||
|  | ||||
| 	p, ok := peer.FromContext(ts.Context()) | ||||
| 	if ok { | ||||
| 		sock.remote = p.Addr.String() | ||||
| 	} | ||||
|  | ||||
| 	defer func() { | ||||
| 		if r := recover(); r != nil { | ||||
| 			log.Log(r, string(debug.Stack())) | ||||
| 			sock.Close() | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	// execute socket func | ||||
| 	m.fn(sock) | ||||
| 	return nil | ||||
| } | ||||
							
								
								
									
										170
									
								
								transport/grpc/proto/transport.micro.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										170
									
								
								transport/grpc/proto/transport.micro.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,170 @@ | ||||
| // Code generated by protoc-gen-micro. DO NOT EDIT. | ||||
| // source: github.com/micro/go-plugins/transport/grpc/proto/transport.proto | ||||
|  | ||||
| /* | ||||
| Package go_micro_grpc_transport is a generated protocol buffer package. | ||||
|  | ||||
| It is generated from these files: | ||||
| 	github.com/micro/go-plugins/transport/grpc/proto/transport.proto | ||||
|  | ||||
| It has these top-level messages: | ||||
| 	Message | ||||
| */ | ||||
| package go_micro_grpc_transport | ||||
|  | ||||
| import proto "github.com/golang/protobuf/proto" | ||||
| import fmt "fmt" | ||||
| import math "math" | ||||
|  | ||||
| import ( | ||||
| 	context "context" | ||||
| 	client "github.com/micro/go-micro/client" | ||||
| 	server "github.com/micro/go-micro/server" | ||||
| ) | ||||
|  | ||||
| // Reference imports to suppress errors if they are not otherwise used. | ||||
| var _ = proto.Marshal | ||||
| var _ = fmt.Errorf | ||||
| var _ = math.Inf | ||||
|  | ||||
| // This is a compile-time assertion to ensure that this generated file | ||||
| // is compatible with the proto package it is being compiled against. | ||||
| // A compilation error at this line likely means your copy of the | ||||
| // proto package needs to be updated. | ||||
| const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package | ||||
|  | ||||
| // Reference imports to suppress errors if they are not otherwise used. | ||||
| var _ context.Context | ||||
| var _ client.Option | ||||
| var _ server.Option | ||||
|  | ||||
| // Client API for Transport service | ||||
|  | ||||
| type TransportService interface { | ||||
| 	Stream(ctx context.Context, opts ...client.CallOption) (Transport_StreamService, error) | ||||
| } | ||||
|  | ||||
| type transportService struct { | ||||
| 	c    client.Client | ||||
| 	name string | ||||
| } | ||||
|  | ||||
| func NewTransportService(name string, c client.Client) TransportService { | ||||
| 	if c == nil { | ||||
| 		c = client.NewClient() | ||||
| 	} | ||||
| 	if len(name) == 0 { | ||||
| 		name = "go.micro.grpc.transport" | ||||
| 	} | ||||
| 	return &transportService{ | ||||
| 		c:    c, | ||||
| 		name: name, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (c *transportService) Stream(ctx context.Context, opts ...client.CallOption) (Transport_StreamService, error) { | ||||
| 	req := c.c.NewRequest(c.name, "Transport.Stream", &Message{}) | ||||
| 	stream, err := c.c.Stream(ctx, req, opts...) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return &transportServiceStream{stream}, nil | ||||
| } | ||||
|  | ||||
| type Transport_StreamService interface { | ||||
| 	SendMsg(interface{}) error | ||||
| 	RecvMsg(interface{}) error | ||||
| 	Close() error | ||||
| 	Send(*Message) error | ||||
| 	Recv() (*Message, error) | ||||
| } | ||||
|  | ||||
| type transportServiceStream struct { | ||||
| 	stream client.Stream | ||||
| } | ||||
|  | ||||
| func (x *transportServiceStream) Close() error { | ||||
| 	return x.stream.Close() | ||||
| } | ||||
|  | ||||
| func (x *transportServiceStream) SendMsg(m interface{}) error { | ||||
| 	return x.stream.Send(m) | ||||
| } | ||||
|  | ||||
| func (x *transportServiceStream) RecvMsg(m interface{}) error { | ||||
| 	return x.stream.Recv(m) | ||||
| } | ||||
|  | ||||
| func (x *transportServiceStream) Send(m *Message) error { | ||||
| 	return x.stream.Send(m) | ||||
| } | ||||
|  | ||||
| func (x *transportServiceStream) Recv() (*Message, error) { | ||||
| 	m := new(Message) | ||||
| 	err := x.stream.Recv(m) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return m, nil | ||||
| } | ||||
|  | ||||
| // Server API for Transport service | ||||
|  | ||||
| type TransportHandler interface { | ||||
| 	Stream(context.Context, Transport_StreamStream) error | ||||
| } | ||||
|  | ||||
| func RegisterTransportHandler(s server.Server, hdlr TransportHandler, opts ...server.HandlerOption) { | ||||
| 	type transport interface { | ||||
| 		Stream(ctx context.Context, stream server.Stream) error | ||||
| 	} | ||||
| 	type Transport struct { | ||||
| 		transport | ||||
| 	} | ||||
| 	h := &transportHandler{hdlr} | ||||
| 	s.Handle(s.NewHandler(&Transport{h}, opts...)) | ||||
| } | ||||
|  | ||||
| type transportHandler struct { | ||||
| 	TransportHandler | ||||
| } | ||||
|  | ||||
| func (h *transportHandler) Stream(ctx context.Context, stream server.Stream) error { | ||||
| 	return h.TransportHandler.Stream(ctx, &transportStreamStream{stream}) | ||||
| } | ||||
|  | ||||
| type Transport_StreamStream interface { | ||||
| 	SendMsg(interface{}) error | ||||
| 	RecvMsg(interface{}) error | ||||
| 	Close() error | ||||
| 	Send(*Message) error | ||||
| 	Recv() (*Message, error) | ||||
| } | ||||
|  | ||||
| type transportStreamStream struct { | ||||
| 	stream server.Stream | ||||
| } | ||||
|  | ||||
| func (x *transportStreamStream) Close() error { | ||||
| 	return x.stream.Close() | ||||
| } | ||||
|  | ||||
| func (x *transportStreamStream) SendMsg(m interface{}) error { | ||||
| 	return x.stream.Send(m) | ||||
| } | ||||
|  | ||||
| func (x *transportStreamStream) RecvMsg(m interface{}) error { | ||||
| 	return x.stream.Recv(m) | ||||
| } | ||||
|  | ||||
| func (x *transportStreamStream) Send(m *Message) error { | ||||
| 	return x.stream.Send(m) | ||||
| } | ||||
|  | ||||
| func (x *transportStreamStream) Recv() (*Message, error) { | ||||
| 	m := new(Message) | ||||
| 	if err := x.stream.Recv(m); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return m, nil | ||||
| } | ||||
							
								
								
									
										188
									
								
								transport/grpc/proto/transport.pb.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										188
									
								
								transport/grpc/proto/transport.pb.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,188 @@ | ||||
| // Code generated by protoc-gen-go. DO NOT EDIT. | ||||
| // source: github.com/micro/go-plugins/transport/grpc/proto/transport.proto | ||||
|  | ||||
| /* | ||||
| Package go_micro_grpc_transport is a generated protocol buffer package. | ||||
|  | ||||
| It is generated from these files: | ||||
| 	github.com/micro/go-plugins/transport/grpc/proto/transport.proto | ||||
|  | ||||
| It has these top-level messages: | ||||
| 	Message | ||||
| */ | ||||
| package go_micro_grpc_transport | ||||
|  | ||||
| import proto "github.com/golang/protobuf/proto" | ||||
| import fmt "fmt" | ||||
| import math "math" | ||||
|  | ||||
| import ( | ||||
| 	context "golang.org/x/net/context" | ||||
| 	grpc "google.golang.org/grpc" | ||||
| ) | ||||
|  | ||||
| // Reference imports to suppress errors if they are not otherwise used. | ||||
| var _ = proto.Marshal | ||||
| var _ = fmt.Errorf | ||||
| var _ = math.Inf | ||||
|  | ||||
| // This is a compile-time assertion to ensure that this generated file | ||||
| // is compatible with the proto package it is being compiled against. | ||||
| // A compilation error at this line likely means your copy of the | ||||
| // proto package needs to be updated. | ||||
| const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package | ||||
|  | ||||
| type Message struct { | ||||
| 	Header map[string]string `protobuf:"bytes,1,rep,name=header" json:"header,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` | ||||
| 	Body   []byte            `protobuf:"bytes,2,opt,name=body,proto3" json:"body,omitempty"` | ||||
| } | ||||
|  | ||||
| func (m *Message) Reset()                    { *m = Message{} } | ||||
| func (m *Message) String() string            { return proto.CompactTextString(m) } | ||||
| func (*Message) ProtoMessage()               {} | ||||
| func (*Message) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } | ||||
|  | ||||
| func (m *Message) GetHeader() map[string]string { | ||||
| 	if m != nil { | ||||
| 		return m.Header | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (m *Message) GetBody() []byte { | ||||
| 	if m != nil { | ||||
| 		return m.Body | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func init() { | ||||
| 	proto.RegisterType((*Message)(nil), "go.micro.grpc.transport.Message") | ||||
| } | ||||
|  | ||||
| // Reference imports to suppress errors if they are not otherwise used. | ||||
| var _ context.Context | ||||
| var _ grpc.ClientConn | ||||
|  | ||||
| // This is a compile-time assertion to ensure that this generated file | ||||
| // is compatible with the grpc package it is being compiled against. | ||||
| const _ = grpc.SupportPackageIsVersion4 | ||||
|  | ||||
| // Client API for Transport service | ||||
|  | ||||
| type TransportClient interface { | ||||
| 	Stream(ctx context.Context, opts ...grpc.CallOption) (Transport_StreamClient, error) | ||||
| } | ||||
|  | ||||
| type transportClient struct { | ||||
| 	cc *grpc.ClientConn | ||||
| } | ||||
|  | ||||
| func NewTransportClient(cc *grpc.ClientConn) TransportClient { | ||||
| 	return &transportClient{cc} | ||||
| } | ||||
|  | ||||
| func (c *transportClient) Stream(ctx context.Context, opts ...grpc.CallOption) (Transport_StreamClient, error) { | ||||
| 	stream, err := grpc.NewClientStream(ctx, &_Transport_serviceDesc.Streams[0], c.cc, "/go.micro.grpc.transport.Transport/Stream", opts...) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	x := &transportStreamClient{stream} | ||||
| 	return x, nil | ||||
| } | ||||
|  | ||||
| type Transport_StreamClient interface { | ||||
| 	Send(*Message) error | ||||
| 	Recv() (*Message, error) | ||||
| 	grpc.ClientStream | ||||
| } | ||||
|  | ||||
| type transportStreamClient struct { | ||||
| 	grpc.ClientStream | ||||
| } | ||||
|  | ||||
| func (x *transportStreamClient) Send(m *Message) error { | ||||
| 	return x.ClientStream.SendMsg(m) | ||||
| } | ||||
|  | ||||
| func (x *transportStreamClient) Recv() (*Message, error) { | ||||
| 	m := new(Message) | ||||
| 	if err := x.ClientStream.RecvMsg(m); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return m, nil | ||||
| } | ||||
|  | ||||
| // Server API for Transport service | ||||
|  | ||||
| type TransportServer interface { | ||||
| 	Stream(Transport_StreamServer) error | ||||
| } | ||||
|  | ||||
| func RegisterTransportServer(s *grpc.Server, srv TransportServer) { | ||||
| 	s.RegisterService(&_Transport_serviceDesc, srv) | ||||
| } | ||||
|  | ||||
| func _Transport_Stream_Handler(srv interface{}, stream grpc.ServerStream) error { | ||||
| 	return srv.(TransportServer).Stream(&transportStreamServer{stream}) | ||||
| } | ||||
|  | ||||
| type Transport_StreamServer interface { | ||||
| 	Send(*Message) error | ||||
| 	Recv() (*Message, error) | ||||
| 	grpc.ServerStream | ||||
| } | ||||
|  | ||||
| type transportStreamServer struct { | ||||
| 	grpc.ServerStream | ||||
| } | ||||
|  | ||||
| func (x *transportStreamServer) Send(m *Message) error { | ||||
| 	return x.ServerStream.SendMsg(m) | ||||
| } | ||||
|  | ||||
| func (x *transportStreamServer) Recv() (*Message, error) { | ||||
| 	m := new(Message) | ||||
| 	if err := x.ServerStream.RecvMsg(m); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return m, nil | ||||
| } | ||||
|  | ||||
| var _Transport_serviceDesc = grpc.ServiceDesc{ | ||||
| 	ServiceName: "go.micro.grpc.transport.Transport", | ||||
| 	HandlerType: (*TransportServer)(nil), | ||||
| 	Methods:     []grpc.MethodDesc{}, | ||||
| 	Streams: []grpc.StreamDesc{ | ||||
| 		{ | ||||
| 			StreamName:    "Stream", | ||||
| 			Handler:       _Transport_Stream_Handler, | ||||
| 			ServerStreams: true, | ||||
| 			ClientStreams: true, | ||||
| 		}, | ||||
| 	}, | ||||
| 	Metadata: "github.com/micro/go-plugins/transport/grpc/proto/transport.proto", | ||||
| } | ||||
|  | ||||
| func init() { | ||||
| 	proto.RegisterFile("github.com/micro/go-plugins/transport/grpc/proto/transport.proto", fileDescriptor0) | ||||
| } | ||||
|  | ||||
| var fileDescriptor0 = []byte{ | ||||
| 	// 234 bytes of a gzipped FileDescriptorProto | ||||
| 	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x72, 0x48, 0xcf, 0x2c, 0xc9, | ||||
| 	0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0xcf, 0xcd, 0x4c, 0x2e, 0xca, 0xd7, 0x4f, 0xcf, 0xd7, | ||||
| 	0x2d, 0xc8, 0x29, 0x4d, 0xcf, 0xcc, 0x2b, 0xd6, 0x2f, 0x29, 0x4a, 0xcc, 0x2b, 0x2e, 0xc8, 0x2f, | ||||
| 	0x2a, 0xd1, 0x4f, 0x2f, 0x2a, 0x48, 0xd6, 0x2f, 0x28, 0xca, 0x2f, 0xc9, 0x47, 0x08, 0xea, 0x81, | ||||
| 	0xf9, 0x42, 0xe2, 0xe9, 0xf9, 0x7a, 0x60, 0x9d, 0x7a, 0x20, 0x45, 0x7a, 0x70, 0x69, 0xa5, 0x79, | ||||
| 	0x8c, 0x5c, 0xec, 0xbe, 0xa9, 0xc5, 0xc5, 0x89, 0xe9, 0xa9, 0x42, 0x2e, 0x5c, 0x6c, 0x19, 0xa9, | ||||
| 	0x89, 0x29, 0xa9, 0x45, 0x12, 0x8c, 0x0a, 0xcc, 0x1a, 0xdc, 0x46, 0x3a, 0x7a, 0x38, 0x74, 0xe9, | ||||
| 	0x41, 0x75, 0xe8, 0x79, 0x80, 0x95, 0xbb, 0xe6, 0x95, 0x14, 0x55, 0x06, 0x41, 0xf5, 0x0a, 0x09, | ||||
| 	0x71, 0xb1, 0x24, 0xe5, 0xa7, 0x54, 0x4a, 0x30, 0x29, 0x30, 0x6a, 0xf0, 0x04, 0x81, 0xd9, 0x52, | ||||
| 	0x96, 0x5c, 0xdc, 0x48, 0x4a, 0x85, 0x04, 0xb8, 0x98, 0xb3, 0x53, 0x2b, 0x25, 0x18, 0x15, 0x18, | ||||
| 	0x35, 0x38, 0x83, 0x40, 0x4c, 0x21, 0x11, 0x2e, 0xd6, 0xb2, 0xc4, 0x9c, 0xd2, 0x54, 0xb0, 0x2e, | ||||
| 	0xce, 0x20, 0x08, 0xc7, 0x8a, 0xc9, 0x82, 0xd1, 0x28, 0x9e, 0x8b, 0x33, 0x04, 0x66, 0xaf, 0x50, | ||||
| 	0x10, 0x17, 0x5b, 0x70, 0x49, 0x51, 0x6a, 0x62, 0xae, 0x90, 0x02, 0x21, 0xb7, 0x49, 0x11, 0x54, | ||||
| 	0xa1, 0xc4, 0xa0, 0xc1, 0x68, 0xc0, 0x98, 0xc4, 0x06, 0x0e, 0x21, 0x63, 0x40, 0x00, 0x00, 0x00, | ||||
| 	0xff, 0xff, 0x4e, 0xcc, 0x76, 0x38, 0x65, 0x01, 0x00, 0x00, | ||||
| } | ||||
							
								
								
									
										12
									
								
								transport/grpc/proto/transport.proto
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										12
									
								
								transport/grpc/proto/transport.proto
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,12 @@ | ||||
| syntax = "proto3"; | ||||
|  | ||||
| package go.micro.grpc.transport; | ||||
|  | ||||
| service Transport { | ||||
| 	rpc Stream(stream Message) returns (stream Message) {} | ||||
| } | ||||
|  | ||||
| message Message { | ||||
| 	map<string, string> header = 1; | ||||
| 	bytes body = 2; | ||||
| } | ||||
							
								
								
									
										97
									
								
								transport/grpc/socket.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										97
									
								
								transport/grpc/socket.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,97 @@ | ||||
| package grpc | ||||
|  | ||||
| import ( | ||||
| 	"github.com/micro/go-micro/transport" | ||||
| 	pb "github.com/micro/go-plugins/transport/grpc/proto" | ||||
| 	"google.golang.org/grpc" | ||||
| ) | ||||
|  | ||||
| type grpcTransportClient struct { | ||||
| 	conn   *grpc.ClientConn | ||||
| 	stream pb.Transport_StreamClient | ||||
|  | ||||
| 	local  string | ||||
| 	remote string | ||||
| } | ||||
|  | ||||
| type grpcTransportSocket struct { | ||||
| 	stream pb.Transport_StreamServer | ||||
| 	local  string | ||||
| 	remote string | ||||
| } | ||||
|  | ||||
| func (g *grpcTransportClient) Local() string { | ||||
| 	return g.local | ||||
| } | ||||
|  | ||||
| func (g *grpcTransportClient) Remote() string { | ||||
| 	return g.remote | ||||
| } | ||||
|  | ||||
| func (g *grpcTransportClient) Recv(m *transport.Message) error { | ||||
| 	if m == nil { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	msg, err := g.stream.Recv() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	m.Header = msg.Header | ||||
| 	m.Body = msg.Body | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (g *grpcTransportClient) Send(m *transport.Message) error { | ||||
| 	if m == nil { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	return g.stream.Send(&pb.Message{ | ||||
| 		Header: m.Header, | ||||
| 		Body:   m.Body, | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| func (g *grpcTransportClient) Close() error { | ||||
| 	return g.conn.Close() | ||||
| } | ||||
|  | ||||
| func (g *grpcTransportSocket) Local() string { | ||||
| 	return g.local | ||||
| } | ||||
|  | ||||
| func (g *grpcTransportSocket) Remote() string { | ||||
| 	return g.remote | ||||
| } | ||||
|  | ||||
| func (g *grpcTransportSocket) Recv(m *transport.Message) error { | ||||
| 	if m == nil { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	msg, err := g.stream.Recv() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	m.Header = msg.Header | ||||
| 	m.Body = msg.Body | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (g *grpcTransportSocket) Send(m *transport.Message) error { | ||||
| 	if m == nil { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	return g.stream.Send(&pb.Message{ | ||||
| 		Header: m.Header, | ||||
| 		Body:   m.Body, | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| func (g *grpcTransportSocket) Close() error { | ||||
| 	return nil | ||||
| } | ||||
		Reference in New Issue
	
	Block a user