move transport (#1967)

This commit is contained in:
Asim Aslam
2020-08-23 18:37:22 +01:00
committed by GitHub
parent d60d85de5c
commit c62d1d5eb8
49 changed files with 56 additions and 56 deletions

View File

@@ -16,16 +16,16 @@ import (
"github.com/micro/go-micro/v3/logger"
"github.com/micro/go-micro/v3/network"
pb "github.com/micro/go-micro/v3/network/mucp/proto"
"github.com/micro/go-micro/v3/proxy"
"github.com/micro/go-micro/v3/registry/noop"
"github.com/micro/go-micro/v3/network/resolver/dns"
"github.com/micro/go-micro/v3/router"
"github.com/micro/go-micro/v3/server"
smucp "github.com/micro/go-micro/v3/server/mucp"
"github.com/micro/go-micro/v3/transport"
"github.com/micro/go-micro/v3/network/transport"
"github.com/micro/go-micro/v3/network/tunnel"
bun "github.com/micro/go-micro/v3/network/tunnel/broker"
tun "github.com/micro/go-micro/v3/network/tunnel/transport"
"github.com/micro/go-micro/v3/proxy"
"github.com/micro/go-micro/v3/registry/noop"
"github.com/micro/go-micro/v3/router"
"github.com/micro/go-micro/v3/server"
smucp "github.com/micro/go-micro/v3/server/mucp"
"github.com/micro/go-micro/v3/util/backoff"
)

View File

@@ -2,12 +2,12 @@ package network
import (
"github.com/google/uuid"
"github.com/micro/go-micro/v3/network/tunnel"
tmucp "github.com/micro/go-micro/v3/network/tunnel/mucp"
"github.com/micro/go-micro/v3/proxy"
"github.com/micro/go-micro/v3/proxy/mucp"
"github.com/micro/go-micro/v3/router"
regRouter "github.com/micro/go-micro/v3/router/registry"
"github.com/micro/go-micro/v3/network/tunnel"
tmucp "github.com/micro/go-micro/v3/network/tunnel/mucp"
)
type Option func(*Options)

View File

@@ -2,9 +2,9 @@
package registry
import (
"github.com/micro/go-micro/v3/network/resolver"
"github.com/micro/go-micro/v3/registry"
"github.com/micro/go-micro/v3/registry/mdns"
"github.com/micro/go-micro/v3/network/resolver"
)
// Resolver is a registry network resolver

View File

@@ -0,0 +1,176 @@
// Package grpc provides a grpc transport
package grpc
import (
"context"
"crypto/tls"
"net"
"github.com/micro/go-micro/v3/network/transport"
maddr "github.com/micro/go-micro/v3/util/addr"
mnet "github.com/micro/go-micro/v3/util/net"
mls "github.com/micro/go-micro/v3/util/tls"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
pb "github.com/micro/go-micro/v3/network/transport/grpc/proto"
)
type grpcTransport struct {
opts transport.Options
}
type grpcTransportListener struct {
listener net.Listener
secure bool
tls *tls.Config
}
func getTLSConfig(addr string) (*tls.Config, error) {
hosts := []string{addr}
// check if its a valid host:port
if host, _, err := net.SplitHostPort(addr); err == nil {
if len(host) == 0 {
hosts = maddr.IPs()
} else {
hosts = []string{host}
}
}
// generate a certificate
cert, err := mls.Certificate(hosts...)
if err != nil {
return nil, err
}
return &tls.Config{Certificates: []tls.Certificate{cert}}, nil
}
func (t *grpcTransportListener) Addr() string {
return t.listener.Addr().String()
}
func (t *grpcTransportListener) Close() error {
return t.listener.Close()
}
func (t *grpcTransportListener) Accept(fn func(transport.Socket)) error {
var opts []grpc.ServerOption
// setup tls if specified
if t.secure || t.tls != nil {
config := t.tls
if config == nil {
var err error
addr := t.listener.Addr().String()
config, err = getTLSConfig(addr)
if err != nil {
return err
}
}
creds := credentials.NewTLS(config)
opts = append(opts, grpc.Creds(creds))
}
// new service
srv := grpc.NewServer(opts...)
// register service
pb.RegisterTransportServer(srv, &microTransport{addr: t.listener.Addr().String(), fn: fn})
// start serving
return srv.Serve(t.listener)
}
func (t *grpcTransport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) {
dopts := transport.DialOptions{
Timeout: transport.DefaultDialTimeout,
}
for _, opt := range opts {
opt(&dopts)
}
options := []grpc.DialOption{}
if t.opts.Secure || t.opts.TLSConfig != nil {
config := t.opts.TLSConfig
if config == nil {
config = &tls.Config{
InsecureSkipVerify: true,
}
}
creds := credentials.NewTLS(config)
options = append(options, grpc.WithTransportCredentials(creds))
} else {
options = append(options, grpc.WithInsecure())
}
// dial the server
ctx, cancel := context.WithTimeout(context.Background(), dopts.Timeout)
defer cancel()
conn, err := grpc.DialContext(ctx, addr, options...)
if err != nil {
return nil, err
}
// create stream
stream, err := pb.NewTransportClient(conn).Stream(context.Background())
if err != nil {
return nil, err
}
// return a client
return &grpcTransportClient{
conn: conn,
stream: stream,
local: "localhost",
remote: addr,
}, nil
}
func (t *grpcTransport) Listen(addr string, opts ...transport.ListenOption) (transport.Listener, error) {
var options transport.ListenOptions
for _, o := range opts {
o(&options)
}
ln, err := mnet.Listen(addr, func(addr string) (net.Listener, error) {
return net.Listen("tcp", addr)
})
if err != nil {
return nil, err
}
return &grpcTransportListener{
listener: ln,
tls: t.opts.TLSConfig,
secure: t.opts.Secure,
}, nil
}
func (t *grpcTransport) Init(opts ...transport.Option) error {
for _, o := range opts {
o(&t.opts)
}
return nil
}
func (t *grpcTransport) Options() transport.Options {
return t.opts
}
func (t *grpcTransport) String() string {
return "grpc"
}
func NewTransport(opts ...transport.Option) transport.Transport {
var options transport.Options
for _, o := range opts {
o(&options)
}
return &grpcTransport{opts: options}
}

View File

@@ -0,0 +1,111 @@
package grpc
import (
"net"
"testing"
"github.com/micro/go-micro/v3/network/transport"
)
func expectedPort(t *testing.T, expected string, lsn transport.Listener) {
_, port, err := net.SplitHostPort(lsn.Addr())
if err != nil {
t.Errorf("Expected address to be `%s`, got error: %v", expected, err)
}
if port != expected {
lsn.Close()
t.Errorf("Expected address to be `%s`, got `%s`", expected, port)
}
}
func TestGRPCTransportPortRange(t *testing.T) {
tp := NewTransport()
lsn1, err := tp.Listen(":44444-44448")
if err != nil {
t.Errorf("Did not expect an error, got %s", err)
}
expectedPort(t, "44444", lsn1)
lsn2, err := tp.Listen(":44444-44448")
if err != nil {
t.Errorf("Did not expect an error, got %s", err)
}
expectedPort(t, "44445", lsn2)
lsn, err := tp.Listen(":0")
if err != nil {
t.Errorf("Did not expect an error, got %s", err)
}
lsn.Close()
lsn1.Close()
lsn2.Close()
}
func TestGRPCTransportCommunication(t *testing.T) {
tr := NewTransport()
l, err := tr.Listen(":0")
if err != nil {
t.Errorf("Unexpected listen err: %v", err)
}
defer l.Close()
fn := func(sock transport.Socket) {
defer sock.Close()
for {
var m transport.Message
if err := sock.Recv(&m); err != nil {
return
}
if err := sock.Send(&m); err != nil {
return
}
}
}
done := make(chan bool)
go func() {
if err := l.Accept(fn); err != nil {
select {
case <-done:
default:
t.Errorf("Unexpected accept err: %v", err)
}
}
}()
c, err := tr.Dial(l.Addr())
if err != nil {
t.Errorf("Unexpected dial err: %v", err)
}
defer c.Close()
m := transport.Message{
Header: map[string]string{
"X-Content-Type": "application/json",
},
Body: []byte(`{"message": "Hello World"}`),
}
if err := c.Send(&m); err != nil {
t.Errorf("Unexpected send err: %v", err)
}
var rm transport.Message
if err := c.Recv(&rm); err != nil {
t.Errorf("Unexpected recv err: %v", err)
}
if string(rm.Body) != string(m.Body) {
t.Errorf("Expected %v, got %v", m.Body, rm.Body)
}
close(done)
}

View File

@@ -0,0 +1,43 @@
package grpc
import (
"runtime/debug"
"github.com/micro/go-micro/v3/errors"
"github.com/micro/go-micro/v3/logger"
"github.com/micro/go-micro/v3/network/transport"
pb "github.com/micro/go-micro/v3/network/transport/grpc/proto"
"google.golang.org/grpc/peer"
)
// microTransport satisfies the pb.TransportServer inteface
type microTransport struct {
addr string
fn func(transport.Socket)
}
func (m *microTransport) Stream(ts pb.Transport_StreamServer) (err error) {
sock := &grpcTransportSocket{
stream: ts,
local: m.addr,
}
p, ok := peer.FromContext(ts.Context())
if ok {
sock.remote = p.Addr.String()
}
defer func() {
if r := recover(); r != nil {
logger.Error(r, string(debug.Stack()))
sock.Close()
err = errors.InternalServerError("go.micro.transport", "panic recovered: %v", r)
}
}()
// execute socket func
m.fn(sock)
return err
}

View File

@@ -0,0 +1,211 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: transport/grpc/proto/transport.proto
package go_micro_transport_grpc
import (
context "context"
fmt "fmt"
proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type Message struct {
Header map[string]string `protobuf:"bytes,1,rep,name=header,proto3" json:"header,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Body []byte `protobuf:"bytes,2,opt,name=body,proto3" json:"body,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Message) Reset() { *m = Message{} }
func (m *Message) String() string { return proto.CompactTextString(m) }
func (*Message) ProtoMessage() {}
func (*Message) Descriptor() ([]byte, []int) {
return fileDescriptor_651718cd7c7ae974, []int{0}
}
func (m *Message) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Message.Unmarshal(m, b)
}
func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Message.Marshal(b, m, deterministic)
}
func (m *Message) XXX_Merge(src proto.Message) {
xxx_messageInfo_Message.Merge(m, src)
}
func (m *Message) XXX_Size() int {
return xxx_messageInfo_Message.Size(m)
}
func (m *Message) XXX_DiscardUnknown() {
xxx_messageInfo_Message.DiscardUnknown(m)
}
var xxx_messageInfo_Message proto.InternalMessageInfo
func (m *Message) GetHeader() map[string]string {
if m != nil {
return m.Header
}
return nil
}
func (m *Message) GetBody() []byte {
if m != nil {
return m.Body
}
return nil
}
func init() {
proto.RegisterType((*Message)(nil), "go.micro.transport.grpc.Message")
proto.RegisterMapType((map[string]string)(nil), "go.micro.transport.grpc.Message.HeaderEntry")
}
func init() {
proto.RegisterFile("transport/grpc/proto/transport.proto", fileDescriptor_651718cd7c7ae974)
}
var fileDescriptor_651718cd7c7ae974 = []byte{
// 209 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x52, 0x29, 0x29, 0x4a, 0xcc,
0x2b, 0x2e, 0xc8, 0x2f, 0x2a, 0xd1, 0x4f, 0x2f, 0x2a, 0x48, 0xd6, 0x2f, 0x28, 0xca, 0x2f, 0xc9,
0xd7, 0x87, 0x0b, 0xea, 0x81, 0xf9, 0x42, 0xe2, 0xe9, 0xf9, 0x7a, 0xb9, 0x99, 0xc9, 0x45, 0xf9,
0x7a, 0x08, 0x19, 0x90, 0x72, 0xa5, 0x79, 0x8c, 0x5c, 0xec, 0xbe, 0xa9, 0xc5, 0xc5, 0x89, 0xe9,
0xa9, 0x42, 0x2e, 0x5c, 0x6c, 0x19, 0xa9, 0x89, 0x29, 0xa9, 0x45, 0x12, 0x8c, 0x0a, 0xcc, 0x1a,
0xdc, 0x46, 0x3a, 0x7a, 0x38, 0x74, 0xe9, 0x41, 0x75, 0xe8, 0x79, 0x80, 0x95, 0xbb, 0xe6, 0x95,
0x14, 0x55, 0x06, 0x41, 0xf5, 0x0a, 0x09, 0x71, 0xb1, 0x24, 0xe5, 0xa7, 0x54, 0x4a, 0x30, 0x29,
0x30, 0x6a, 0xf0, 0x04, 0x81, 0xd9, 0x52, 0x96, 0x5c, 0xdc, 0x48, 0x4a, 0x85, 0x04, 0xb8, 0x98,
0xb3, 0x53, 0x2b, 0x25, 0x18, 0x15, 0x18, 0x35, 0x38, 0x83, 0x40, 0x4c, 0x21, 0x11, 0x2e, 0xd6,
0xb2, 0xc4, 0x9c, 0xd2, 0x54, 0xb0, 0x2e, 0xce, 0x20, 0x08, 0xc7, 0x8a, 0xc9, 0x82, 0xd1, 0x28,
0x9e, 0x8b, 0x33, 0x04, 0x66, 0xb9, 0x50, 0x10, 0x17, 0x5b, 0x70, 0x49, 0x51, 0x6a, 0x62, 0xae,
0x90, 0x02, 0x21, 0xb7, 0x49, 0x11, 0x54, 0xa1, 0xc4, 0xa0, 0xc1, 0x68, 0xc0, 0x98, 0xc4, 0x06,
0x0e, 0x21, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0xd4, 0xd0, 0x4b, 0x4b, 0x49, 0x01, 0x00,
0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// TransportClient is the client API for Transport service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type TransportClient interface {
Stream(ctx context.Context, opts ...grpc.CallOption) (Transport_StreamClient, error)
}
type transportClient struct {
cc *grpc.ClientConn
}
func NewTransportClient(cc *grpc.ClientConn) TransportClient {
return &transportClient{cc}
}
func (c *transportClient) Stream(ctx context.Context, opts ...grpc.CallOption) (Transport_StreamClient, error) {
stream, err := c.cc.NewStream(ctx, &_Transport_serviceDesc.Streams[0], "/go.micro.transport.grpc.Transport/Stream", opts...)
if err != nil {
return nil, err
}
x := &transportStreamClient{stream}
return x, nil
}
type Transport_StreamClient interface {
Send(*Message) error
Recv() (*Message, error)
grpc.ClientStream
}
type transportStreamClient struct {
grpc.ClientStream
}
func (x *transportStreamClient) Send(m *Message) error {
return x.ClientStream.SendMsg(m)
}
func (x *transportStreamClient) Recv() (*Message, error) {
m := new(Message)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// TransportServer is the server API for Transport service.
type TransportServer interface {
Stream(Transport_StreamServer) error
}
// UnimplementedTransportServer can be embedded to have forward compatible implementations.
type UnimplementedTransportServer struct {
}
func (*UnimplementedTransportServer) Stream(srv Transport_StreamServer) error {
return status.Errorf(codes.Unimplemented, "method Stream not implemented")
}
func RegisterTransportServer(s *grpc.Server, srv TransportServer) {
s.RegisterService(&_Transport_serviceDesc, srv)
}
func _Transport_Stream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(TransportServer).Stream(&transportStreamServer{stream})
}
type Transport_StreamServer interface {
Send(*Message) error
Recv() (*Message, error)
grpc.ServerStream
}
type transportStreamServer struct {
grpc.ServerStream
}
func (x *transportStreamServer) Send(m *Message) error {
return x.ServerStream.SendMsg(m)
}
func (x *transportStreamServer) Recv() (*Message, error) {
m := new(Message)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
var _Transport_serviceDesc = grpc.ServiceDesc{
ServiceName: "go.micro.transport.grpc.Transport",
HandlerType: (*TransportServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "Stream",
Handler: _Transport_Stream_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "transport/grpc/proto/transport.proto",
}

View File

@@ -0,0 +1,175 @@
// Code generated by protoc-gen-micro. DO NOT EDIT.
// source: transport/grpc/proto/transport.proto
package go_micro_transport_grpc
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
math "math"
)
import (
context "context"
api "github.com/micro/go-micro/v3/api"
client "github.com/micro/go-micro/v3/client"
server "github.com/micro/go-micro/v3/server"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
// Reference imports to suppress errors if they are not otherwise used.
var _ api.Endpoint
var _ context.Context
var _ client.Option
var _ server.Option
// Api Endpoints for Transport service
func NewTransportEndpoints() []*api.Endpoint {
return []*api.Endpoint{}
}
// Client API for Transport service
type TransportService interface {
Stream(ctx context.Context, opts ...client.CallOption) (Transport_StreamService, error)
}
type transportService struct {
c client.Client
name string
}
func NewTransportService(name string, c client.Client) TransportService {
return &transportService{
c: c,
name: name,
}
}
func (c *transportService) Stream(ctx context.Context, opts ...client.CallOption) (Transport_StreamService, error) {
req := c.c.NewRequest(c.name, "Transport.Stream", &Message{})
stream, err := c.c.Stream(ctx, req, opts...)
if err != nil {
return nil, err
}
return &transportServiceStream{stream}, nil
}
type Transport_StreamService interface {
Context() context.Context
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Send(*Message) error
Recv() (*Message, error)
}
type transportServiceStream struct {
stream client.Stream
}
func (x *transportServiceStream) Close() error {
return x.stream.Close()
}
func (x *transportServiceStream) Context() context.Context {
return x.stream.Context()
}
func (x *transportServiceStream) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *transportServiceStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *transportServiceStream) Send(m *Message) error {
return x.stream.Send(m)
}
func (x *transportServiceStream) Recv() (*Message, error) {
m := new(Message)
err := x.stream.Recv(m)
if err != nil {
return nil, err
}
return m, nil
}
// Server API for Transport service
type TransportHandler interface {
Stream(context.Context, Transport_StreamStream) error
}
func RegisterTransportHandler(s server.Server, hdlr TransportHandler, opts ...server.HandlerOption) error {
type transport interface {
Stream(ctx context.Context, stream server.Stream) error
}
type Transport struct {
transport
}
h := &transportHandler{hdlr}
return s.Handle(s.NewHandler(&Transport{h}, opts...))
}
type transportHandler struct {
TransportHandler
}
func (h *transportHandler) Stream(ctx context.Context, stream server.Stream) error {
return h.TransportHandler.Stream(ctx, &transportStreamStream{stream})
}
type Transport_StreamStream interface {
Context() context.Context
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Send(*Message) error
Recv() (*Message, error)
}
type transportStreamStream struct {
stream server.Stream
}
func (x *transportStreamStream) Close() error {
return x.stream.Close()
}
func (x *transportStreamStream) Context() context.Context {
return x.stream.Context()
}
func (x *transportStreamStream) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *transportStreamStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *transportStreamStream) Send(m *Message) error {
return x.stream.Send(m)
}
func (x *transportStreamStream) Recv() (*Message, error) {
m := new(Message)
if err := x.stream.Recv(m); err != nil {
return nil, err
}
return m, nil
}

View File

@@ -0,0 +1,12 @@
syntax = "proto3";
package go.micro.transport.grpc;
service Transport {
rpc Stream(stream Message) returns (stream Message) {}
}
message Message {
map<string, string> header = 1;
bytes body = 2;
}

View File

@@ -0,0 +1,97 @@
package grpc
import (
"github.com/micro/go-micro/v3/network/transport"
pb "github.com/micro/go-micro/v3/network/transport/grpc/proto"
"google.golang.org/grpc"
)
type grpcTransportClient struct {
conn *grpc.ClientConn
stream pb.Transport_StreamClient
local string
remote string
}
type grpcTransportSocket struct {
stream pb.Transport_StreamServer
local string
remote string
}
func (g *grpcTransportClient) Local() string {
return g.local
}
func (g *grpcTransportClient) Remote() string {
return g.remote
}
func (g *grpcTransportClient) Recv(m *transport.Message) error {
if m == nil {
return nil
}
msg, err := g.stream.Recv()
if err != nil {
return err
}
m.Header = msg.Header
m.Body = msg.Body
return nil
}
func (g *grpcTransportClient) Send(m *transport.Message) error {
if m == nil {
return nil
}
return g.stream.Send(&pb.Message{
Header: m.Header,
Body: m.Body,
})
}
func (g *grpcTransportClient) Close() error {
return g.conn.Close()
}
func (g *grpcTransportSocket) Local() string {
return g.local
}
func (g *grpcTransportSocket) Remote() string {
return g.remote
}
func (g *grpcTransportSocket) Recv(m *transport.Message) error {
if m == nil {
return nil
}
msg, err := g.stream.Recv()
if err != nil {
return err
}
m.Header = msg.Header
m.Body = msg.Body
return nil
}
func (g *grpcTransportSocket) Send(m *transport.Message) error {
if m == nil {
return nil
}
return g.stream.Send(&pb.Message{
Header: m.Header,
Body: m.Body,
})
}
func (g *grpcTransportSocket) Close() error {
return nil
}

View File

@@ -0,0 +1,602 @@
package http
import (
"bufio"
"bytes"
"crypto/tls"
"errors"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"sync"
"time"
"github.com/micro/go-micro/v3/network/transport"
maddr "github.com/micro/go-micro/v3/util/addr"
"github.com/micro/go-micro/v3/util/buf"
mnet "github.com/micro/go-micro/v3/util/net"
mls "github.com/micro/go-micro/v3/util/tls"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
)
type httpTransport struct {
opts transport.Options
}
type httpTransportClient struct {
ht *httpTransport
addr string
conn net.Conn
dialOpts transport.DialOptions
once sync.Once
sync.RWMutex
// request must be stored for response processing
r chan *http.Request
bl []*http.Request
buff *bufio.Reader
// local/remote ip
local string
remote string
}
type httpTransportSocket struct {
ht *httpTransport
w http.ResponseWriter
r *http.Request
rw *bufio.ReadWriter
mtx sync.RWMutex
// the hijacked when using http 1
conn net.Conn
// for the first request
ch chan *http.Request
// h2 things
buf *bufio.Reader
// indicate if socket is closed
closed chan bool
// local/remote ip
local string
remote string
}
type httpTransportListener struct {
ht *httpTransport
listener net.Listener
}
func (h *httpTransportClient) Local() string {
return h.local
}
func (h *httpTransportClient) Remote() string {
return h.remote
}
func (h *httpTransportClient) Send(m *transport.Message) error {
header := make(http.Header)
for k, v := range m.Header {
header.Set(k, v)
}
b := buf.New(bytes.NewBuffer(m.Body))
defer b.Close()
req := &http.Request{
Method: "POST",
URL: &url.URL{
Scheme: "http",
Host: h.addr,
},
Header: header,
Body: b,
ContentLength: int64(b.Len()),
Host: h.addr,
}
h.Lock()
h.bl = append(h.bl, req)
select {
case h.r <- h.bl[0]:
h.bl = h.bl[1:]
default:
}
h.Unlock()
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
return req.Write(h.conn)
}
func (h *httpTransportClient) Recv(m *transport.Message) error {
if m == nil {
return errors.New("message passed in is nil")
}
var r *http.Request
if !h.dialOpts.Stream {
rc, ok := <-h.r
if !ok {
return io.EOF
}
r = rc
}
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
rsp, err := http.ReadResponse(h.buff, r)
if err != nil {
return err
}
defer rsp.Body.Close()
b, err := ioutil.ReadAll(rsp.Body)
if err != nil {
return err
}
if rsp.StatusCode != 200 {
return errors.New(rsp.Status + ": " + string(b))
}
m.Body = b
if m.Header == nil {
m.Header = make(map[string]string, len(rsp.Header))
}
for k, v := range rsp.Header {
if len(v) > 0 {
m.Header[k] = v[0]
} else {
m.Header[k] = ""
}
}
return nil
}
func (h *httpTransportClient) Close() error {
h.once.Do(func() {
h.Lock()
h.buff.Reset(nil)
h.Unlock()
close(h.r)
})
return h.conn.Close()
}
func (h *httpTransportSocket) Local() string {
return h.local
}
func (h *httpTransportSocket) Remote() string {
return h.remote
}
func (h *httpTransportSocket) Recv(m *transport.Message) error {
if m == nil {
return errors.New("message passed in is nil")
}
if m.Header == nil {
m.Header = make(map[string]string, len(h.r.Header))
}
// process http 1
if h.r.ProtoMajor == 1 {
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
var r *http.Request
select {
// get first request
case r = <-h.ch:
// read next request
default:
rr, err := http.ReadRequest(h.rw.Reader)
if err != nil {
return err
}
r = rr
}
// read body
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return err
}
// set body
r.Body.Close()
m.Body = b
// set headers
for k, v := range r.Header {
if len(v) > 0 {
m.Header[k] = v[0]
} else {
m.Header[k] = ""
}
}
// return early early
return nil
}
// only process if the socket is open
select {
case <-h.closed:
return io.EOF
default:
// no op
}
// processing http2 request
// read streaming body
// set max buffer size
// TODO: adjustable buffer size
buf := make([]byte, 4*1024*1024)
// read the request body
n, err := h.buf.Read(buf)
// not an eof error
if err != nil {
return err
}
// check if we have data
if n > 0 {
m.Body = buf[:n]
}
// set headers
for k, v := range h.r.Header {
if len(v) > 0 {
m.Header[k] = v[0]
} else {
m.Header[k] = ""
}
}
// set path
m.Header[":path"] = h.r.URL.Path
return nil
}
func (h *httpTransportSocket) Send(m *transport.Message) error {
if h.r.ProtoMajor == 1 {
// make copy of header
hdr := make(http.Header)
for k, v := range h.r.Header {
hdr[k] = v
}
rsp := &http.Response{
Header: hdr,
Body: ioutil.NopCloser(bytes.NewReader(m.Body)),
Status: "200 OK",
StatusCode: 200,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(m.Body)),
}
for k, v := range m.Header {
rsp.Header.Set(k, v)
}
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
return rsp.Write(h.conn)
}
// only process if the socket is open
select {
case <-h.closed:
return io.EOF
default:
// no op
}
// we need to lock to protect the write
h.mtx.RLock()
defer h.mtx.RUnlock()
// set headers
for k, v := range m.Header {
h.w.Header().Set(k, v)
}
// write request
_, err := h.w.Write(m.Body)
// flush the trailers
h.w.(http.Flusher).Flush()
return err
}
func (h *httpTransportSocket) error(m *transport.Message) error {
if h.r.ProtoMajor == 1 {
rsp := &http.Response{
Header: make(http.Header),
Body: ioutil.NopCloser(bytes.NewReader(m.Body)),
Status: "500 Internal Server Error",
StatusCode: 500,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(m.Body)),
}
for k, v := range m.Header {
rsp.Header.Set(k, v)
}
return rsp.Write(h.conn)
}
return nil
}
func (h *httpTransportSocket) Close() error {
h.mtx.Lock()
defer h.mtx.Unlock()
select {
case <-h.closed:
return nil
default:
// close the channel
close(h.closed)
// close the buffer
h.r.Body.Close()
// close the connection
if h.r.ProtoMajor == 1 {
return h.conn.Close()
}
}
return nil
}
func (h *httpTransportListener) Addr() string {
return h.listener.Addr().String()
}
func (h *httpTransportListener) Close() error {
return h.listener.Close()
}
func (h *httpTransportListener) Accept(fn func(transport.Socket)) error {
// create handler mux
mux := http.NewServeMux()
// register our transport handler
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
var buf *bufio.ReadWriter
var con net.Conn
// read a regular request
if r.ProtoMajor == 1 {
b, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
r.Body = ioutil.NopCloser(bytes.NewReader(b))
// hijack the conn
hj, ok := w.(http.Hijacker)
if !ok {
// we're screwed
http.Error(w, "cannot serve conn", http.StatusInternalServerError)
return
}
conn, bufrw, err := hj.Hijack()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer conn.Close()
buf = bufrw
con = conn
}
// buffered reader
bufr := bufio.NewReader(r.Body)
// save the request
ch := make(chan *http.Request, 1)
ch <- r
// create a new transport socket
sock := &httpTransportSocket{
ht: h.ht,
w: w,
r: r,
rw: buf,
buf: bufr,
ch: ch,
conn: con,
local: h.Addr(),
remote: r.RemoteAddr,
closed: make(chan bool),
}
// execute the socket
fn(sock)
})
// get optional handlers
if h.ht.opts.Context != nil {
handlers, ok := h.ht.opts.Context.Value("http_handlers").(map[string]http.Handler)
if ok {
for pattern, handler := range handlers {
mux.Handle(pattern, handler)
}
}
}
// default http2 server
srv := &http.Server{
Handler: mux,
}
// insecure connection use h2c
if !(h.ht.opts.Secure || h.ht.opts.TLSConfig != nil) {
srv.Handler = h2c.NewHandler(mux, &http2.Server{})
}
// begin serving
return srv.Serve(h.listener)
}
func (h *httpTransport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) {
dopts := transport.DialOptions{
Timeout: transport.DefaultDialTimeout,
}
for _, opt := range opts {
opt(&dopts)
}
var conn net.Conn
var err error
// TODO: support dial option here rather than using internal config
if h.opts.Secure || h.opts.TLSConfig != nil {
config := h.opts.TLSConfig
if config == nil {
config = &tls.Config{
InsecureSkipVerify: true,
}
}
config.NextProtos = []string{"http/1.1"}
conn, err = newConn(func(addr string) (net.Conn, error) {
return tls.DialWithDialer(&net.Dialer{Timeout: dopts.Timeout}, "tcp", addr, config)
})(addr)
} else {
conn, err = newConn(func(addr string) (net.Conn, error) {
return net.DialTimeout("tcp", addr, dopts.Timeout)
})(addr)
}
if err != nil {
return nil, err
}
return &httpTransportClient{
ht: h,
addr: addr,
conn: conn,
buff: bufio.NewReader(conn),
dialOpts: dopts,
r: make(chan *http.Request, 1),
local: conn.LocalAddr().String(),
remote: conn.RemoteAddr().String(),
}, nil
}
func (h *httpTransport) Listen(addr string, opts ...transport.ListenOption) (transport.Listener, error) {
var options transport.ListenOptions
for _, o := range opts {
o(&options)
}
var l net.Listener
var err error
// TODO: support use of listen options
if h.opts.Secure || h.opts.TLSConfig != nil {
config := h.opts.TLSConfig
fn := func(addr string) (net.Listener, error) {
if config == nil {
hosts := []string{addr}
// check if its a valid host:port
if host, _, err := net.SplitHostPort(addr); err == nil {
if len(host) == 0 {
hosts = maddr.IPs()
} else {
hosts = []string{host}
}
}
// generate a certificate
cert, err := mls.Certificate(hosts...)
if err != nil {
return nil, err
}
config = &tls.Config{Certificates: []tls.Certificate{cert}}
}
return tls.Listen("tcp", addr, config)
}
l, err = mnet.Listen(addr, fn)
} else {
fn := func(addr string) (net.Listener, error) {
return net.Listen("tcp", addr)
}
l, err = mnet.Listen(addr, fn)
}
if err != nil {
return nil, err
}
return &httpTransportListener{
ht: h,
listener: l,
}, nil
}
func (h *httpTransport) Init(opts ...transport.Option) error {
for _, o := range opts {
o(&h.opts)
}
return nil
}
func (h *httpTransport) Options() transport.Options {
return h.opts
}
func (h *httpTransport) String() string {
return "http"
}
func NewTransport(opts ...transport.Option) transport.Transport {
var options transport.Options
for _, o := range opts {
o(&options)
}
return &httpTransport{opts: options}
}

View File

@@ -0,0 +1,109 @@
package http
import (
"bufio"
"encoding/base64"
"fmt"
"io"
"net"
"net/http"
"net/http/httputil"
"net/url"
)
const (
proxyAuthHeader = "Proxy-Authorization"
)
func getURL(addr string) (*url.URL, error) {
r := &http.Request{
URL: &url.URL{
Scheme: "https",
Host: addr,
},
}
return http.ProxyFromEnvironment(r)
}
type pbuffer struct {
net.Conn
r io.Reader
}
func (p *pbuffer) Read(b []byte) (int, error) {
return p.r.Read(b)
}
func proxyDial(conn net.Conn, addr string, proxyURL *url.URL) (_ net.Conn, err error) {
defer func() {
if err != nil {
conn.Close()
}
}()
r := &http.Request{
Method: http.MethodConnect,
URL: &url.URL{Host: addr},
Header: map[string][]string{"User-Agent": {"micro/latest"}},
}
if user := proxyURL.User; user != nil {
u := user.Username()
p, _ := user.Password()
auth := []byte(u + ":" + p)
basicAuth := base64.StdEncoding.EncodeToString(auth)
r.Header.Add(proxyAuthHeader, "Basic "+basicAuth)
}
if err := r.Write(conn); err != nil {
return nil, fmt.Errorf("failed to write the HTTP request: %v", err)
}
br := bufio.NewReader(conn)
rsp, err := http.ReadResponse(br, r)
if err != nil {
return nil, fmt.Errorf("reading server HTTP response: %v", err)
}
defer rsp.Body.Close()
if rsp.StatusCode != http.StatusOK {
dump, err := httputil.DumpResponse(rsp, true)
if err != nil {
return nil, fmt.Errorf("failed to do connect handshake, status code: %s", rsp.Status)
}
return nil, fmt.Errorf("failed to do connect handshake, response: %q", dump)
}
return &pbuffer{Conn: conn, r: br}, nil
}
// Creates a new connection
func newConn(dial func(string) (net.Conn, error)) func(string) (net.Conn, error) {
return func(addr string) (net.Conn, error) {
// get the proxy url
proxyURL, err := getURL(addr)
if err != nil {
return nil, err
}
// set to addr
callAddr := addr
// got proxy
if proxyURL != nil {
callAddr = proxyURL.Host
}
// dial the addr
c, err := dial(callAddr)
if err != nil {
return nil, err
}
// do proxy connect if we have proxy url
if proxyURL != nil {
c, err = proxyDial(c, addr, proxyURL)
}
return c, err
}
}

View File

@@ -0,0 +1,138 @@
package http
import (
"sync"
"testing"
"github.com/micro/go-micro/v3/network/transport"
)
func call(b *testing.B, c int) {
b.StopTimer()
tr := NewTransport()
// server listen
l, err := tr.Listen("localhost:0")
if err != nil {
b.Fatal(err)
}
defer l.Close()
// socket func
fn := func(sock transport.Socket) {
defer sock.Close()
for {
var m transport.Message
if err := sock.Recv(&m); err != nil {
return
}
if err := sock.Send(&m); err != nil {
return
}
}
}
done := make(chan bool)
// accept connections
go func() {
if err := l.Accept(fn); err != nil {
select {
case <-done:
default:
b.Fatalf("Unexpected accept err: %v", err)
}
}
}()
m := transport.Message{
Header: map[string]string{
"Content-Type": "application/json",
},
Body: []byte(`{"message": "Hello World"}`),
}
// client connection
client, err := tr.Dial(l.Addr())
if err != nil {
b.Fatalf("Unexpected dial err: %v", err)
}
send := func(c transport.Client) {
// send message
if err := c.Send(&m); err != nil {
b.Fatalf("Unexpected send err: %v", err)
}
var rm transport.Message
// receive message
if err := c.Recv(&rm); err != nil {
b.Fatalf("Unexpected recv err: %v", err)
}
}
// warm
for i := 0; i < 10; i++ {
send(client)
}
client.Close()
ch := make(chan int, c*4)
var wg sync.WaitGroup
wg.Add(c)
for i := 0; i < c; i++ {
go func() {
cl, err := tr.Dial(l.Addr())
if err != nil {
b.Fatalf("Unexpected dial err: %v", err)
}
defer cl.Close()
for range ch {
send(cl)
}
wg.Done()
}()
}
b.StartTimer()
for i := 0; i < b.N; i++ {
ch <- i
}
b.StopTimer()
close(ch)
wg.Wait()
// finish
close(done)
}
func BenchmarkTransport1(b *testing.B) {
call(b, 1)
}
func BenchmarkTransport8(b *testing.B) {
call(b, 8)
}
func BenchmarkTransport16(b *testing.B) {
call(b, 16)
}
func BenchmarkTransport64(b *testing.B) {
call(b, 64)
}
func BenchmarkTransport128(b *testing.B) {
call(b, 128)
}

View File

@@ -0,0 +1,248 @@
package http
import (
"io"
"net"
"testing"
"time"
"github.com/micro/go-micro/v3/network/transport"
)
func expectedPort(t *testing.T, expected string, lsn transport.Listener) {
_, port, err := net.SplitHostPort(lsn.Addr())
if err != nil {
t.Errorf("Expected address to be `%s`, got error: %v", expected, err)
}
if port != expected {
lsn.Close()
t.Errorf("Expected address to be `%s`, got `%s`", expected, port)
}
}
func TestHTTPTransportPortRange(t *testing.T) {
tp := NewTransport()
lsn1, err := tp.Listen(":44444-44448")
if err != nil {
t.Errorf("Did not expect an error, got %s", err)
}
expectedPort(t, "44444", lsn1)
lsn2, err := tp.Listen(":44444-44448")
if err != nil {
t.Errorf("Did not expect an error, got %s", err)
}
expectedPort(t, "44445", lsn2)
lsn, err := tp.Listen("127.0.0.1:0")
if err != nil {
t.Errorf("Did not expect an error, got %s", err)
}
lsn.Close()
lsn1.Close()
lsn2.Close()
}
func TestHTTPTransportCommunication(t *testing.T) {
tr := NewTransport()
l, err := tr.Listen("127.0.0.1:0")
if err != nil {
t.Errorf("Unexpected listen err: %v", err)
}
defer l.Close()
fn := func(sock transport.Socket) {
defer sock.Close()
for {
var m transport.Message
if err := sock.Recv(&m); err != nil {
return
}
if err := sock.Send(&m); err != nil {
return
}
}
}
done := make(chan bool)
go func() {
if err := l.Accept(fn); err != nil {
select {
case <-done:
default:
t.Errorf("Unexpected accept err: %v", err)
}
}
}()
c, err := tr.Dial(l.Addr())
if err != nil {
t.Errorf("Unexpected dial err: %v", err)
}
defer c.Close()
m := transport.Message{
Header: map[string]string{
"Content-Type": "application/json",
},
Body: []byte(`{"message": "Hello World"}`),
}
if err := c.Send(&m); err != nil {
t.Errorf("Unexpected send err: %v", err)
}
var rm transport.Message
if err := c.Recv(&rm); err != nil {
t.Errorf("Unexpected recv err: %v", err)
}
if string(rm.Body) != string(m.Body) {
t.Errorf("Expected %v, got %v", m.Body, rm.Body)
}
close(done)
}
func TestHTTPTransportError(t *testing.T) {
tr := NewTransport()
l, err := tr.Listen("127.0.0.1:0")
if err != nil {
t.Errorf("Unexpected listen err: %v", err)
}
defer l.Close()
fn := func(sock transport.Socket) {
defer sock.Close()
for {
var m transport.Message
if err := sock.Recv(&m); err != nil {
if err == io.EOF {
return
}
t.Fatal(err)
}
sock.(*httpTransportSocket).error(&transport.Message{
Body: []byte(`an error occurred`),
})
}
}
done := make(chan bool)
go func() {
if err := l.Accept(fn); err != nil {
select {
case <-done:
default:
t.Errorf("Unexpected accept err: %v", err)
}
}
}()
c, err := tr.Dial(l.Addr())
if err != nil {
t.Errorf("Unexpected dial err: %v", err)
}
defer c.Close()
m := transport.Message{
Header: map[string]string{
"Content-Type": "application/json",
},
Body: []byte(`{"message": "Hello World"}`),
}
if err := c.Send(&m); err != nil {
t.Errorf("Unexpected send err: %v", err)
}
var rm transport.Message
err = c.Recv(&rm)
if err == nil {
t.Fatal("Expected error but got nil")
}
if err.Error() != "500 Internal Server Error: an error occurred" {
t.Fatalf("Did not receive expected error, got: %v", err)
}
close(done)
}
func TestHTTPTransportTimeout(t *testing.T) {
tr := NewTransport(transport.Timeout(time.Millisecond * 100))
l, err := tr.Listen("127.0.0.1:0")
if err != nil {
t.Errorf("Unexpected listen err: %v", err)
}
defer l.Close()
done := make(chan bool)
fn := func(sock transport.Socket) {
defer func() {
sock.Close()
close(done)
}()
go func() {
select {
case <-done:
return
case <-time.After(time.Second):
t.Fatal("deadline not executed")
}
}()
for {
var m transport.Message
if err := sock.Recv(&m); err != nil {
return
}
}
}
go func() {
if err := l.Accept(fn); err != nil {
select {
case <-done:
default:
t.Errorf("Unexpected accept err: %v", err)
}
}
}()
c, err := tr.Dial(l.Addr())
if err != nil {
t.Errorf("Unexpected dial err: %v", err)
}
defer c.Close()
m := transport.Message{
Header: map[string]string{
"Content-Type": "application/json",
},
Body: []byte(`{"message": "Hello World"}`),
}
if err := c.Send(&m); err != nil {
t.Errorf("Unexpected send err: %v", err)
}
<-done
}

View File

@@ -0,0 +1,23 @@
package http
import (
"context"
"net/http"
"github.com/micro/go-micro/v3/network/transport"
)
// Handle registers the handler for the given pattern.
func Handle(pattern string, handler http.Handler) transport.Option {
return func(o *transport.Options) {
if o.Context == nil {
o.Context = context.Background()
}
handlers, ok := o.Context.Value("http_handlers").(map[string]http.Handler)
if !ok {
handlers = make(map[string]http.Handler)
}
handlers[pattern] = handler
o.Context = context.WithValue(o.Context, "http_handlers", handlers)
}
}

View File

@@ -0,0 +1,275 @@
// Package memory is an in-memory transport
package memory
import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"sync"
"time"
"github.com/micro/go-micro/v3/network/transport"
maddr "github.com/micro/go-micro/v3/util/addr"
mnet "github.com/micro/go-micro/v3/util/net"
)
type memorySocket struct {
recv chan *transport.Message
send chan *transport.Message
// sock exit
exit chan bool
// listener exit
lexit chan bool
local string
remote string
// for send/recv transport.Timeout
timeout time.Duration
ctx context.Context
sync.RWMutex
}
type memoryClient struct {
*memorySocket
opts transport.DialOptions
}
type memoryListener struct {
addr string
exit chan bool
conn chan *memorySocket
lopts transport.ListenOptions
topts transport.Options
sync.RWMutex
ctx context.Context
}
type memoryTransport struct {
opts transport.Options
sync.RWMutex
listeners map[string]*memoryListener
}
func (ms *memorySocket) Recv(m *transport.Message) error {
ms.RLock()
defer ms.RUnlock()
ctx := ms.ctx
if ms.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ms.ctx, ms.timeout)
defer cancel()
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ms.exit:
return errors.New("connection closed")
case <-ms.lexit:
return errors.New("server connection closed")
case cm := <-ms.recv:
*m = *cm
}
return nil
}
func (ms *memorySocket) Local() string {
return ms.local
}
func (ms *memorySocket) Remote() string {
return ms.remote
}
func (ms *memorySocket) Send(m *transport.Message) error {
ms.RLock()
defer ms.RUnlock()
ctx := ms.ctx
if ms.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ms.ctx, ms.timeout)
defer cancel()
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ms.exit:
return errors.New("connection closed")
case <-ms.lexit:
return errors.New("server connection closed")
case ms.send <- m:
}
return nil
}
func (ms *memorySocket) Close() error {
ms.Lock()
defer ms.Unlock()
select {
case <-ms.exit:
return nil
default:
close(ms.exit)
}
return nil
}
func (m *memoryListener) Addr() string {
return m.addr
}
func (m *memoryListener) Close() error {
m.Lock()
defer m.Unlock()
select {
case <-m.exit:
return nil
default:
close(m.exit)
}
return nil
}
func (m *memoryListener) Accept(fn func(transport.Socket)) error {
for {
select {
case <-m.exit:
return nil
case c := <-m.conn:
go fn(&memorySocket{
lexit: c.lexit,
exit: c.exit,
send: c.recv,
recv: c.send,
local: c.Remote(),
remote: c.Local(),
timeout: m.topts.Timeout,
ctx: m.topts.Context,
})
}
}
}
func (m *memoryTransport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) {
m.RLock()
defer m.RUnlock()
listener, ok := m.listeners[addr]
if !ok {
return nil, errors.New("could not dial " + addr)
}
var options transport.DialOptions
for _, o := range opts {
o(&options)
}
client := &memoryClient{
&memorySocket{
send: make(chan *transport.Message),
recv: make(chan *transport.Message),
exit: make(chan bool),
lexit: listener.exit,
local: addr,
remote: addr,
timeout: m.opts.Timeout,
ctx: m.opts.Context,
},
options,
}
// pseudo connect
select {
case <-listener.exit:
return nil, errors.New("connection error")
case listener.conn <- client.memorySocket:
}
return client, nil
}
func (m *memoryTransport) Listen(addr string, opts ...transport.ListenOption) (transport.Listener, error) {
m.Lock()
defer m.Unlock()
var options transport.ListenOptions
for _, o := range opts {
o(&options)
}
host, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
}
addr, err = maddr.Extract(host)
if err != nil {
return nil, err
}
// if zero port then randomly assign one
if len(port) > 0 && port == "0" {
i := rand.Intn(20000)
port = fmt.Sprintf("%d", 10000+i)
}
// set addr with port
addr = mnet.HostPort(addr, port)
if _, ok := m.listeners[addr]; ok {
return nil, errors.New("already listening on " + addr)
}
listener := &memoryListener{
lopts: options,
topts: m.opts,
addr: addr,
conn: make(chan *memorySocket),
exit: make(chan bool),
ctx: m.opts.Context,
}
m.listeners[addr] = listener
return listener, nil
}
func (m *memoryTransport) Init(opts ...transport.Option) error {
for _, o := range opts {
o(&m.opts)
}
return nil
}
func (m *memoryTransport) Options() transport.Options {
return m.opts
}
func (m *memoryTransport) String() string {
return "memory"
}
func NewTransport(opts ...transport.Option) transport.Transport {
var options transport.Options
rand.Seed(time.Now().UnixNano())
for _, o := range opts {
o(&options)
}
if options.Context == nil {
options.Context = context.Background()
}
return &memoryTransport{
opts: options,
listeners: make(map[string]*memoryListener),
}
}

View File

@@ -0,0 +1,94 @@
package memory
import (
"os"
"testing"
"github.com/micro/go-micro/v3/network/transport"
)
func TestMemoryTransport(t *testing.T) {
tr := NewTransport()
// bind / listen
l, err := tr.Listen("127.0.0.1:8080")
if err != nil {
t.Fatalf("Unexpected error listening %v", err)
}
defer l.Close()
// accept
go func() {
if err := l.Accept(func(sock transport.Socket) {
for {
var m transport.Message
if err := sock.Recv(&m); err != nil {
return
}
if len(os.Getenv("IN_TRAVIS_CI")) == 0 {
t.Logf("Server Received %s", string(m.Body))
}
if err := sock.Send(&transport.Message{
Body: []byte(`pong`),
}); err != nil {
return
}
}
}); err != nil {
t.Fatalf("Unexpected error accepting %v", err)
}
}()
// dial
c, err := tr.Dial("127.0.0.1:8080")
if err != nil {
t.Fatalf("Unexpected error dialing %v", err)
}
defer c.Close()
// send <=> receive
for i := 0; i < 3; i++ {
if err := c.Send(&transport.Message{
Body: []byte(`ping`),
}); err != nil {
return
}
var m transport.Message
if err := c.Recv(&m); err != nil {
return
}
if len(os.Getenv("IN_TRAVIS_CI")) == 0 {
t.Logf("Client Received %s", string(m.Body))
}
}
}
func TestListener(t *testing.T) {
tr := NewTransport()
// bind / listen on random port
l, err := tr.Listen(":0")
if err != nil {
t.Fatalf("Unexpected error listening %v", err)
}
defer l.Close()
// try again
l2, err := tr.Listen(":0")
if err != nil {
t.Fatalf("Unexpected error listening %v", err)
}
defer l2.Close()
// now make sure it still fails
l3, err := tr.Listen(":8080")
if err != nil {
t.Fatalf("Unexpected error listening %v", err)
}
defer l3.Close()
if _, err := tr.Listen(":8080"); err == nil {
t.Fatal("Expected error binding to :8080 got nil")
}
}

View File

@@ -0,0 +1,104 @@
package transport
import (
"context"
"crypto/tls"
"time"
"github.com/micro/go-micro/v3/codec"
)
type Options struct {
// Addrs is the list of intermediary addresses to connect to
Addrs []string
// Codec is the codec interface to use where headers are not supported
// by the transport and the entire payload must be encoded
Codec codec.Marshaler
// Secure tells the transport to secure the connection.
// In the case TLSConfig is not specified best effort self-signed
// certs should be used
Secure bool
// TLSConfig to secure the connection. The assumption is that this
// is mTLS keypair
TLSConfig *tls.Config
// Timeout sets the timeout for Send/Recv
Timeout time.Duration
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
}
type DialOptions struct {
// Tells the transport this is a streaming connection with
// multiple calls to send/recv and that send may not even be called
Stream bool
// Timeout for dialing
Timeout time.Duration
// TODO: add tls options when dialling
// Currently set in global options
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
}
type ListenOptions struct {
// TODO: add tls options when listening
// Currently set in global options
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
}
// Addrs to use for transport
func Addrs(addrs ...string) Option {
return func(o *Options) {
o.Addrs = addrs
}
}
// Codec sets the codec used for encoding where the transport
// does not support message headers
func Codec(c codec.Marshaler) Option {
return func(o *Options) {
o.Codec = c
}
}
// Timeout sets the timeout for Send/Recv execution
func Timeout(t time.Duration) Option {
return func(o *Options) {
o.Timeout = t
}
}
// Use secure communication. If TLSConfig is not specified we
// use InsecureSkipVerify and generate a self signed cert
func Secure(b bool) Option {
return func(o *Options) {
o.Secure = b
}
}
// TLSConfig to be used for the transport.
func TLSConfig(t *tls.Config) Option {
return func(o *Options) {
o.TLSConfig = t
}
}
// Indicates whether this is a streaming connection
func WithStream() DialOption {
return func(o *DialOptions) {
o.Stream = true
}
}
// Timeout used when dialling the remote side
func WithTimeout(d time.Duration) DialOption {
return func(o *DialOptions) {
o.Timeout = d
}
}

View File

@@ -0,0 +1,50 @@
// Package transport is an interface for synchronous connection based communication
package transport
import (
"time"
)
// Transport is an interface which is used for communication between
// services. It uses connection based socket send/recv semantics and
// has various implementations; http, grpc, quic.
type Transport interface {
Init(...Option) error
Options() Options
Dial(addr string, opts ...DialOption) (Client, error)
Listen(addr string, opts ...ListenOption) (Listener, error)
String() string
}
type Message struct {
Header map[string]string
Body []byte
}
type Socket interface {
Recv(*Message) error
Send(*Message) error
Close() error
Local() string
Remote() string
}
type Client interface {
Socket
}
type Listener interface {
Addr() string
Close() error
Accept(func(Socket)) error
}
type Option func(*Options)
type DialOption func(*DialOptions)
type ListenOption func(*ListenOptions)
var (
DefaultDialTimeout = time.Second * 5
)

View File

@@ -5,7 +5,7 @@ import (
"context"
"github.com/micro/go-micro/v3/broker"
"github.com/micro/go-micro/v3/transport"
"github.com/micro/go-micro/v3/network/transport"
"github.com/micro/go-micro/v3/network/tunnel"
"github.com/micro/go-micro/v3/network/tunnel/mucp"
)

View File

@@ -9,7 +9,7 @@ import (
"github.com/google/uuid"
"github.com/micro/go-micro/v3/logger"
"github.com/micro/go-micro/v3/transport"
"github.com/micro/go-micro/v3/network/transport"
)
type link struct {

View File

@@ -9,7 +9,7 @@ import (
"github.com/google/uuid"
"github.com/micro/go-micro/v3/logger"
"github.com/micro/go-micro/v3/transport"
"github.com/micro/go-micro/v3/network/transport"
"github.com/micro/go-micro/v3/network/tunnel"
)

View File

@@ -6,7 +6,7 @@ import (
"testing"
"time"
"github.com/micro/go-micro/v3/transport"
"github.com/micro/go-micro/v3/network/transport"
"github.com/micro/go-micro/v3/network/tunnel"
)

View File

@@ -8,7 +8,7 @@ import (
"time"
"github.com/micro/go-micro/v3/logger"
"github.com/micro/go-micro/v3/transport"
"github.com/micro/go-micro/v3/network/transport"
"github.com/micro/go-micro/v3/network/tunnel"
)

View File

@@ -4,8 +4,8 @@ import (
"time"
"github.com/google/uuid"
"github.com/micro/go-micro/v3/transport"
"github.com/micro/go-micro/v3/transport/grpc"
"github.com/micro/go-micro/v3/network/transport"
"github.com/micro/go-micro/v3/network/transport/grpc"
)
var (

View File

@@ -1,7 +1,7 @@
package transport
import (
"github.com/micro/go-micro/v3/transport"
"github.com/micro/go-micro/v3/network/transport"
"github.com/micro/go-micro/v3/network/tunnel"
)

View File

@@ -4,7 +4,7 @@ package transport
import (
"context"
"github.com/micro/go-micro/v3/transport"
"github.com/micro/go-micro/v3/network/transport"
"github.com/micro/go-micro/v3/network/tunnel"
"github.com/micro/go-micro/v3/network/tunnel/mucp"
)

View File

@@ -5,7 +5,7 @@ import (
"errors"
"time"
"github.com/micro/go-micro/v3/transport"
"github.com/micro/go-micro/v3/network/transport"
)
const (