214 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			214 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Package broker is a tunnel broker
 | 
						|
package broker
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
 | 
						|
	"github.com/micro/go-micro/broker"
 | 
						|
	"github.com/micro/go-micro/transport"
 | 
						|
	"github.com/micro/go-micro/tunnel"
 | 
						|
)
 | 
						|
 | 
						|
type tunBroker struct {
 | 
						|
	opts   broker.Options
 | 
						|
	tunnel tunnel.Tunnel
 | 
						|
}
 | 
						|
 | 
						|
type tunSubscriber struct {
 | 
						|
	topic   string
 | 
						|
	handler broker.Handler
 | 
						|
	opts    broker.SubscribeOptions
 | 
						|
 | 
						|
	closed   chan bool
 | 
						|
	listener tunnel.Listener
 | 
						|
}
 | 
						|
 | 
						|
type tunEvent struct {
 | 
						|
	topic   string
 | 
						|
	message *broker.Message
 | 
						|
}
 | 
						|
 | 
						|
// used to access tunnel from options context
 | 
						|
type tunnelKey struct{}
 | 
						|
type tunnelAddr struct{}
 | 
						|
 | 
						|
func (t *tunBroker) Init(opts ...broker.Option) error {
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&t.opts)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBroker) Options() broker.Options {
 | 
						|
	return t.opts
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBroker) Address() string {
 | 
						|
	return t.tunnel.Address()
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBroker) Connect() error {
 | 
						|
	return t.tunnel.Connect()
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBroker) Disconnect() error {
 | 
						|
	return t.tunnel.Close()
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBroker) Publish(topic string, m *broker.Message, opts ...broker.PublishOption) error {
 | 
						|
	// TODO: this is probably inefficient, we might want to just maintain an open connection
 | 
						|
	// it may be easier to add broadcast to the tunnel
 | 
						|
	c, err := t.tunnel.Dial(topic, tunnel.DialMode(tunnel.Multicast))
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	defer c.Close()
 | 
						|
 | 
						|
	return c.Send(&transport.Message{
 | 
						|
		Header: m.Header,
 | 
						|
		Body:   m.Body,
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBroker) Subscribe(topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
 | 
						|
	l, err := t.tunnel.Listen(topic, tunnel.ListenMode(tunnel.Multicast))
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	var options broker.SubscribeOptions
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	tunSub := &tunSubscriber{
 | 
						|
		topic:    topic,
 | 
						|
		handler:  h,
 | 
						|
		opts:     options,
 | 
						|
		closed:   make(chan bool),
 | 
						|
		listener: l,
 | 
						|
	}
 | 
						|
 | 
						|
	// start processing
 | 
						|
	go tunSub.run()
 | 
						|
 | 
						|
	return tunSub, nil
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBroker) String() string {
 | 
						|
	return "tunnel"
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunSubscriber) run() {
 | 
						|
	for {
 | 
						|
		// accept a new connection
 | 
						|
		c, err := t.listener.Accept()
 | 
						|
		if err != nil {
 | 
						|
			select {
 | 
						|
			case <-t.closed:
 | 
						|
				return
 | 
						|
			default:
 | 
						|
				continue
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		// receive message
 | 
						|
		m := new(transport.Message)
 | 
						|
		if err := c.Recv(m); err != nil {
 | 
						|
			c.Close()
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		// close the connection
 | 
						|
		c.Close()
 | 
						|
 | 
						|
		// handle the message
 | 
						|
		go t.handler(&tunEvent{
 | 
						|
			topic: t.topic,
 | 
						|
			message: &broker.Message{
 | 
						|
				Header: m.Header,
 | 
						|
				Body:   m.Body,
 | 
						|
			},
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunSubscriber) Options() broker.SubscribeOptions {
 | 
						|
	return t.opts
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunSubscriber) Topic() string {
 | 
						|
	return t.topic
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunSubscriber) Unsubscribe() error {
 | 
						|
	select {
 | 
						|
	case <-t.closed:
 | 
						|
		return nil
 | 
						|
	default:
 | 
						|
		close(t.closed)
 | 
						|
		return t.listener.Close()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunEvent) Topic() string {
 | 
						|
	return t.topic
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunEvent) Message() *broker.Message {
 | 
						|
	return t.message
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunEvent) Ack() error {
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func NewBroker(opts ...broker.Option) broker.Broker {
 | 
						|
	options := broker.Options{
 | 
						|
		Context: context.Background(),
 | 
						|
	}
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
	t, ok := options.Context.Value(tunnelKey{}).(tunnel.Tunnel)
 | 
						|
	if !ok {
 | 
						|
		t = tunnel.NewTunnel()
 | 
						|
	}
 | 
						|
 | 
						|
	a, ok := options.Context.Value(tunnelAddr{}).(string)
 | 
						|
	if ok {
 | 
						|
		// initialise address
 | 
						|
		t.Init(tunnel.Address(a))
 | 
						|
	}
 | 
						|
 | 
						|
	if len(options.Addrs) > 0 {
 | 
						|
		// initialise nodes
 | 
						|
		t.Init(tunnel.Nodes(options.Addrs...))
 | 
						|
	}
 | 
						|
 | 
						|
	return &tunBroker{
 | 
						|
		opts:   options,
 | 
						|
		tunnel: t,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// WithAddress sets the tunnel address
 | 
						|
func WithAddress(a string) broker.Option {
 | 
						|
	return func(o *broker.Options) {
 | 
						|
		if o.Context == nil {
 | 
						|
			o.Context = context.Background()
 | 
						|
		}
 | 
						|
		o.Context = context.WithValue(o.Context, tunnelAddr{}, a)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// WithTunnel sets the internal tunnel
 | 
						|
func WithTunnel(t tunnel.Tunnel) broker.Option {
 | 
						|
	return func(o *broker.Options) {
 | 
						|
		if o.Context == nil {
 | 
						|
			o.Context = context.Background()
 | 
						|
		}
 | 
						|
		o.Context = context.WithValue(o.Context, tunnelKey{}, t)
 | 
						|
	}
 | 
						|
}
 |