231
network/tunnel/broker/broker.go
Normal file
231
network/tunnel/broker/broker.go
Normal file
@@ -0,0 +1,231 @@
|
||||
// Package broker is a tunnel broker
|
||||
package broker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/unistack-org/micro/v3/broker"
|
||||
"github.com/unistack-org/micro/v3/logger"
|
||||
"github.com/unistack-org/micro/v3/network/transport"
|
||||
"github.com/unistack-org/micro/v3/network/tunnel"
|
||||
)
|
||||
|
||||
type tunBroker struct {
|
||||
opts broker.Options
|
||||
tunnel tunnel.Tunnel
|
||||
}
|
||||
|
||||
type tunSubscriber struct {
|
||||
topic string
|
||||
handler broker.Handler
|
||||
opts broker.SubscribeOptions
|
||||
|
||||
closed chan bool
|
||||
listener tunnel.Listener
|
||||
}
|
||||
|
||||
type tunEvent struct {
|
||||
topic string
|
||||
message *broker.Message
|
||||
}
|
||||
|
||||
// used to access tunnel from options context
|
||||
type tunnelKey struct{}
|
||||
type tunnelAddr struct{}
|
||||
|
||||
func (t *tunBroker) Init(opts ...broker.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&t.opts)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *tunBroker) Options() broker.Options {
|
||||
return t.opts
|
||||
}
|
||||
|
||||
func (t *tunBroker) Address() string {
|
||||
return t.tunnel.Address()
|
||||
}
|
||||
|
||||
func (t *tunBroker) Connect() error {
|
||||
return t.tunnel.Connect()
|
||||
}
|
||||
|
||||
func (t *tunBroker) Disconnect() error {
|
||||
return t.tunnel.Close()
|
||||
}
|
||||
|
||||
func (t *tunBroker) Publish(topic string, m *broker.Message, opts ...broker.PublishOption) error {
|
||||
// TODO: this is probably inefficient, we might want to just maintain an open connection
|
||||
// it may be easier to add broadcast to the tunnel
|
||||
c, err := t.tunnel.Dial(topic, tunnel.DialMode(tunnel.Multicast))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
return c.Send(&transport.Message{
|
||||
Header: m.Header,
|
||||
Body: m.Body,
|
||||
})
|
||||
}
|
||||
|
||||
func (t *tunBroker) Subscribe(topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||
l, err := t.tunnel.Listen(topic, tunnel.ListenMode(tunnel.Multicast))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var options broker.SubscribeOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
tunSub := &tunSubscriber{
|
||||
topic: topic,
|
||||
handler: h,
|
||||
opts: options,
|
||||
closed: make(chan bool),
|
||||
listener: l,
|
||||
}
|
||||
|
||||
// start processing
|
||||
go tunSub.run()
|
||||
|
||||
return tunSub, nil
|
||||
}
|
||||
|
||||
func (t *tunBroker) String() string {
|
||||
return "tunnel"
|
||||
}
|
||||
|
||||
func (t *tunSubscriber) run() {
|
||||
for {
|
||||
// accept a new connection
|
||||
c, err := t.listener.Accept()
|
||||
if err != nil {
|
||||
select {
|
||||
case <-t.closed:
|
||||
return
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// receive message
|
||||
m := new(transport.Message)
|
||||
if err := c.Recv(m); err != nil {
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Error(err)
|
||||
}
|
||||
if err = c.Close(); err != nil {
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Error(err)
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// close the connection
|
||||
c.Close()
|
||||
|
||||
// handle the message
|
||||
go t.handler(&tunEvent{
|
||||
topic: t.topic,
|
||||
message: &broker.Message{
|
||||
Header: m.Header,
|
||||
Body: m.Body,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (t *tunSubscriber) Options() broker.SubscribeOptions {
|
||||
return t.opts
|
||||
}
|
||||
|
||||
func (t *tunSubscriber) Topic() string {
|
||||
return t.topic
|
||||
}
|
||||
|
||||
func (t *tunSubscriber) Unsubscribe() error {
|
||||
select {
|
||||
case <-t.closed:
|
||||
return nil
|
||||
default:
|
||||
close(t.closed)
|
||||
return t.listener.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (t *tunEvent) Topic() string {
|
||||
return t.topic
|
||||
}
|
||||
|
||||
func (t *tunEvent) Message() *broker.Message {
|
||||
return t.message
|
||||
}
|
||||
|
||||
func (t *tunEvent) Ack() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *tunEvent) Error() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewBroker(opts ...broker.Option) (broker.Broker, error) {
|
||||
options := broker.Options{
|
||||
Context: context.Background(),
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
t, ok := options.Context.Value(tunnelKey{}).(tunnel.Tunnel)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("tunnel not set")
|
||||
}
|
||||
|
||||
a, ok := options.Context.Value(tunnelAddr{}).(string)
|
||||
if ok {
|
||||
// initialise address
|
||||
if err := t.Init(tunnel.Address(a)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if len(options.Addrs) > 0 {
|
||||
// initialise nodes
|
||||
if err := t.Init(tunnel.Nodes(options.Addrs...)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return &tunBroker{
|
||||
opts: options,
|
||||
tunnel: t,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// WithAddress sets the tunnel address
|
||||
func WithAddress(a string) broker.Option {
|
||||
return func(o *broker.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, tunnelAddr{}, a)
|
||||
}
|
||||
}
|
||||
|
||||
// WithTunnel sets the internal tunnel
|
||||
func WithTunnel(t tunnel.Tunnel) broker.Option {
|
||||
return func(o *broker.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, tunnelKey{}, t)
|
||||
}
|
||||
}
|
153
network/tunnel/options.go
Normal file
153
network/tunnel/options.go
Normal file
@@ -0,0 +1,153 @@
|
||||
package tunnel
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/unistack-org/micro/v3/logger"
|
||||
"github.com/unistack-org/micro/v3/network/transport"
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultAddress is default tunnel bind address
|
||||
DefaultAddress = ":0"
|
||||
// The shared default token
|
||||
DefaultToken = "go.micro.tunnel"
|
||||
)
|
||||
|
||||
type Option func(*Options)
|
||||
|
||||
// Options provides network configuration options
|
||||
type Options struct {
|
||||
// Id is tunnel id
|
||||
Id string
|
||||
// Address is tunnel address
|
||||
Address string
|
||||
// Nodes are remote nodes
|
||||
Nodes []string
|
||||
// The shared auth token
|
||||
Token string
|
||||
// Transport listens to incoming connections
|
||||
Transport transport.Transport
|
||||
// Logger
|
||||
Logger logger.Logger
|
||||
}
|
||||
|
||||
type DialOption func(*DialOptions)
|
||||
|
||||
type DialOptions struct {
|
||||
// Link specifies the link to use
|
||||
Link string
|
||||
// specify mode of the session
|
||||
Mode Mode
|
||||
// Wait for connection to be accepted
|
||||
Wait bool
|
||||
// the dial timeout
|
||||
Timeout time.Duration
|
||||
}
|
||||
|
||||
type ListenOption func(*ListenOptions)
|
||||
|
||||
type ListenOptions struct {
|
||||
// specify mode of the session
|
||||
Mode Mode
|
||||
// The read timeout
|
||||
Timeout time.Duration
|
||||
}
|
||||
|
||||
// The tunnel id
|
||||
func Id(id string) Option {
|
||||
return func(o *Options) {
|
||||
o.Id = id
|
||||
}
|
||||
}
|
||||
|
||||
// Logger sets the logger
|
||||
func Logger(l logger.Logger) Option {
|
||||
return func(o *Options) {
|
||||
o.Logger = l
|
||||
}
|
||||
}
|
||||
|
||||
// The tunnel address
|
||||
func Address(a string) Option {
|
||||
return func(o *Options) {
|
||||
o.Address = a
|
||||
}
|
||||
}
|
||||
|
||||
// Nodes specify remote network nodes
|
||||
func Nodes(n ...string) Option {
|
||||
return func(o *Options) {
|
||||
o.Nodes = n
|
||||
}
|
||||
}
|
||||
|
||||
// Token sets the shared token for auth
|
||||
func Token(t string) Option {
|
||||
return func(o *Options) {
|
||||
o.Token = t
|
||||
}
|
||||
}
|
||||
|
||||
// Transport listens for incoming connections
|
||||
func Transport(t transport.Transport) Option {
|
||||
return func(o *Options) {
|
||||
o.Transport = t
|
||||
}
|
||||
}
|
||||
|
||||
// Listen options
|
||||
func ListenMode(m Mode) ListenOption {
|
||||
return func(o *ListenOptions) {
|
||||
o.Mode = m
|
||||
}
|
||||
}
|
||||
|
||||
// Timeout for reads and writes on the listener session
|
||||
func ListenTimeout(t time.Duration) ListenOption {
|
||||
return func(o *ListenOptions) {
|
||||
o.Timeout = t
|
||||
}
|
||||
}
|
||||
|
||||
// Dial options
|
||||
|
||||
// Dial multicast sets the multicast option to send only to those mapped
|
||||
func DialMode(m Mode) DialOption {
|
||||
return func(o *DialOptions) {
|
||||
o.Mode = m
|
||||
}
|
||||
}
|
||||
|
||||
// DialTimeout sets the dial timeout of the connection
|
||||
func DialTimeout(t time.Duration) DialOption {
|
||||
return func(o *DialOptions) {
|
||||
o.Timeout = t
|
||||
}
|
||||
}
|
||||
|
||||
// DialLink specifies the link to pin this connection to.
|
||||
// This is not applicable if the multicast option is set.
|
||||
func DialLink(id string) DialOption {
|
||||
return func(o *DialOptions) {
|
||||
o.Link = id
|
||||
}
|
||||
}
|
||||
|
||||
// DialWait specifies whether to wait for the connection
|
||||
// to be accepted before returning the session
|
||||
func DialWait(b bool) DialOption {
|
||||
return func(o *DialOptions) {
|
||||
o.Wait = b
|
||||
}
|
||||
}
|
||||
|
||||
// DefaultOptions returns router default options
|
||||
func DefaultOptions() Options {
|
||||
return Options{
|
||||
Id: uuid.New().String(),
|
||||
Address: DefaultAddress,
|
||||
Token: DefaultToken,
|
||||
}
|
||||
}
|
30
network/tunnel/transport/listener.go
Normal file
30
network/tunnel/transport/listener.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
"github.com/unistack-org/micro/v3/network/transport"
|
||||
"github.com/unistack-org/micro/v3/network/tunnel"
|
||||
)
|
||||
|
||||
type tunListener struct {
|
||||
l tunnel.Listener
|
||||
}
|
||||
|
||||
func (t *tunListener) Addr() string {
|
||||
return t.l.Channel()
|
||||
}
|
||||
|
||||
func (t *tunListener) Close() error {
|
||||
return t.l.Close()
|
||||
}
|
||||
|
||||
func (t *tunListener) Accept(fn func(socket transport.Socket)) error {
|
||||
for {
|
||||
// accept connection
|
||||
c, err := t.l.Accept()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// execute the function
|
||||
go fn(c)
|
||||
}
|
||||
}
|
114
network/tunnel/transport/transport.go
Normal file
114
network/tunnel/transport/transport.go
Normal file
@@ -0,0 +1,114 @@
|
||||
// Package transport provides a tunnel transport
|
||||
package transport
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/unistack-org/micro/v3/network/transport"
|
||||
"github.com/unistack-org/micro/v3/network/tunnel"
|
||||
)
|
||||
|
||||
type tunTransport struct {
|
||||
options transport.Options
|
||||
|
||||
tunnel tunnel.Tunnel
|
||||
}
|
||||
|
||||
type tunnelKey struct{}
|
||||
|
||||
type transportKey struct{}
|
||||
|
||||
func (t *tunTransport) Init(opts ...transport.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&t.options)
|
||||
}
|
||||
|
||||
// close the existing tunnel
|
||||
if t.tunnel != nil {
|
||||
t.tunnel.Close()
|
||||
}
|
||||
|
||||
// get the tunnel
|
||||
tun, ok := t.options.Context.Value(tunnelKey{}).(tunnel.Tunnel)
|
||||
if !ok {
|
||||
return fmt.Errorf("tunnel not set")
|
||||
}
|
||||
|
||||
// get the transport
|
||||
tr, ok := t.options.Context.Value(transportKey{}).(transport.Transport)
|
||||
if ok {
|
||||
tun.Init(tunnel.Transport(tr))
|
||||
}
|
||||
|
||||
// set the tunnel
|
||||
t.tunnel = tun
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *tunTransport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) {
|
||||
if err := t.tunnel.Connect(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c, err := t.tunnel.Dial(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (t *tunTransport) Listen(addr string, opts ...transport.ListenOption) (transport.Listener, error) {
|
||||
if err := t.tunnel.Connect(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
l, err := t.tunnel.Listen(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &tunListener{l}, nil
|
||||
}
|
||||
|
||||
func (t *tunTransport) Options() transport.Options {
|
||||
return t.options
|
||||
}
|
||||
|
||||
func (t *tunTransport) String() string {
|
||||
return "tunnel"
|
||||
}
|
||||
|
||||
// NewTransport honours the initialiser used in
|
||||
func NewTransport(opts ...transport.Option) transport.Transport {
|
||||
t := &tunTransport{
|
||||
options: transport.Options{},
|
||||
}
|
||||
|
||||
// initialise
|
||||
t.Init(opts...)
|
||||
|
||||
return t
|
||||
}
|
||||
|
||||
// WithTransport sets the internal tunnel
|
||||
func WithTunnel(t tunnel.Tunnel) transport.Option {
|
||||
return func(o *transport.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, tunnelKey{}, t)
|
||||
}
|
||||
}
|
||||
|
||||
// WithTransport sets the internal transport
|
||||
func WithTransport(t transport.Transport) transport.Option {
|
||||
return func(o *transport.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, transportKey{}, t)
|
||||
}
|
||||
}
|
106
network/tunnel/tunnel.go
Normal file
106
network/tunnel/tunnel.go
Normal file
@@ -0,0 +1,106 @@
|
||||
// Package tunnel provides gre network tunnelling
|
||||
package tunnel
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/unistack-org/micro/v3/network/transport"
|
||||
)
|
||||
|
||||
var (
|
||||
DefaultTunnel Tunnel
|
||||
)
|
||||
|
||||
const (
|
||||
// send over one link
|
||||
Unicast Mode = iota
|
||||
// send to all channel listeners
|
||||
Multicast
|
||||
// send to all links
|
||||
Broadcast
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultDialTimeout is the dial timeout if none is specified
|
||||
DefaultDialTimeout = time.Second * 5
|
||||
// ErrDialTimeout is returned by a call to Dial where the timeout occurs
|
||||
ErrDialTimeout = errors.New("dial timeout")
|
||||
// ErrDiscoverChan is returned when we failed to receive the "announce" back from a discovery
|
||||
ErrDiscoverChan = errors.New("failed to discover channel")
|
||||
// ErrLinkNotFound is returned when a link is specified at dial time and does not exist
|
||||
ErrLinkNotFound = errors.New("link not found")
|
||||
// ErrLinkDisconnected is returned when a link we attempt to send to is disconnected
|
||||
ErrLinkDisconnected = errors.New("link not connected")
|
||||
// ErrLinkLoppback is returned when attempting to send an outbound message over loopback link
|
||||
ErrLinkLoopback = errors.New("link is loopback")
|
||||
// ErrLinkRemote is returned when attempting to send a loopback message over remote link
|
||||
ErrLinkRemote = errors.New("link is remote")
|
||||
// ErrReadTimeout is a timeout on session.Recv
|
||||
ErrReadTimeout = errors.New("read timeout")
|
||||
// ErrDecryptingData is for when theres a nonce error
|
||||
ErrDecryptingData = errors.New("error decrypting data")
|
||||
)
|
||||
|
||||
// Mode of the session
|
||||
type Mode uint8
|
||||
|
||||
// Tunnel creates a gre tunnel on top of the go-micro/transport.
|
||||
// It establishes multiple streams using the Micro-Tunnel-Channel header
|
||||
// and Micro-Tunnel-Session header. The tunnel id is a hash of
|
||||
// the address being requested.
|
||||
type Tunnel interface {
|
||||
// Init initializes tunnel with options
|
||||
Init(opts ...Option) error
|
||||
// Address returns the address the tunnel is listening on
|
||||
Address() string
|
||||
// Connect connects the tunnel
|
||||
Connect() error
|
||||
// Close closes the tunnel
|
||||
Close() error
|
||||
// Links returns all the links the tunnel is connected to
|
||||
Links() []Link
|
||||
// Dial allows a client to connect to a channel
|
||||
Dial(channel string, opts ...DialOption) (Session, error)
|
||||
// Listen allows to accept connections on a channel
|
||||
Listen(channel string, opts ...ListenOption) (Listener, error)
|
||||
// String returns the name of the tunnel implementation
|
||||
String() string
|
||||
}
|
||||
|
||||
// Link represents internal links to the tunnel
|
||||
type Link interface {
|
||||
// Id returns the link unique Id
|
||||
Id() string
|
||||
// Delay is the current load on the link (lower is better)
|
||||
Delay() int64
|
||||
// Length returns the roundtrip time as nanoseconds (lower is better)
|
||||
Length() int64
|
||||
// Current transfer rate as bits per second (lower is better)
|
||||
Rate() float64
|
||||
// Is this a loopback link
|
||||
Loopback() bool
|
||||
// State of the link: connected/closed/error
|
||||
State() string
|
||||
// honours transport socket
|
||||
transport.Socket
|
||||
}
|
||||
|
||||
// The listener provides similar constructs to the transport.Listener
|
||||
type Listener interface {
|
||||
Accept() (Session, error)
|
||||
Channel() string
|
||||
Close() error
|
||||
}
|
||||
|
||||
// Session is a unique session created when dialling or accepting connections on the tunnel
|
||||
type Session interface {
|
||||
// The unique session id
|
||||
Id() string
|
||||
// The channel name
|
||||
Channel() string
|
||||
// The link the session is on
|
||||
Link() string
|
||||
// a transport socket
|
||||
transport.Socket
|
||||
}
|
Reference in New Issue
Block a user