228 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			228 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Package broker is a tunnel broker
 | |
| package broker
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 
 | |
| 	"github.com/unistack-org/micro/v3/broker"
 | |
| 	"github.com/unistack-org/micro/v3/logger"
 | |
| 	"github.com/unistack-org/micro/v3/network/transport"
 | |
| 	"github.com/unistack-org/micro/v3/network/tunnel"
 | |
| )
 | |
| 
 | |
| type tunBroker struct {
 | |
| 	tunnel tunnel.Tunnel
 | |
| 	opts   broker.Options
 | |
| }
 | |
| 
 | |
| type tunSubscriber struct {
 | |
| 	listener tunnel.Listener
 | |
| 	handler  broker.Handler
 | |
| 	closed   chan bool
 | |
| 	topic    string
 | |
| 	opts     broker.SubscribeOptions
 | |
| }
 | |
| 
 | |
| type tunEvent struct {
 | |
| 	message *broker.Message
 | |
| 	topic   string
 | |
| }
 | |
| 
 | |
| // used to access tunnel from options context
 | |
| type (
 | |
| 	tunnelKey  struct{}
 | |
| 	tunnelAddr struct{}
 | |
| )
 | |
| 
 | |
| func (t *tunBroker) Init(opts ...broker.Option) error {
 | |
| 	for _, o := range opts {
 | |
| 		o(&t.opts)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (t *tunBroker) Name() string {
 | |
| 	return t.opts.Name
 | |
| }
 | |
| 
 | |
| func (t *tunBroker) Options() broker.Options {
 | |
| 	return t.opts
 | |
| }
 | |
| 
 | |
| func (t *tunBroker) Address() string {
 | |
| 	return t.tunnel.Address()
 | |
| }
 | |
| 
 | |
| func (t *tunBroker) Connect(ctx context.Context) error {
 | |
| 	return t.tunnel.Connect(ctx)
 | |
| }
 | |
| 
 | |
| func (t *tunBroker) Disconnect(ctx context.Context) error {
 | |
| 	return t.tunnel.Close(ctx)
 | |
| }
 | |
| 
 | |
| func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message, opts ...broker.PublishOption) error {
 | |
| 	// TODO: this is probably inefficient, we might want to just maintain an open connection
 | |
| 	// it may be easier to add broadcast to the tunnel
 | |
| 	c, err := t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast))
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer c.Close()
 | |
| 
 | |
| 	return c.Send(&transport.Message{
 | |
| 		Header: m.Header,
 | |
| 		Body:   m.Body,
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func (t *tunBroker) Subscribe(ctx context.Context, topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
 | |
| 	l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast))
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	tunSub := &tunSubscriber{
 | |
| 		topic:    topic,
 | |
| 		handler:  h,
 | |
| 		opts:     broker.NewSubscribeOptions(opts...),
 | |
| 		closed:   make(chan bool),
 | |
| 		listener: l,
 | |
| 	}
 | |
| 
 | |
| 	// start processing
 | |
| 	go tunSub.run()
 | |
| 
 | |
| 	return tunSub, nil
 | |
| }
 | |
| 
 | |
| func (t *tunBroker) String() string {
 | |
| 	return "tunnel"
 | |
| }
 | |
| 
 | |
| func (t *tunSubscriber) run() {
 | |
| 	for {
 | |
| 		// accept a new connection
 | |
| 		c, err := t.listener.Accept()
 | |
| 		if err != nil {
 | |
| 			select {
 | |
| 			case <-t.closed:
 | |
| 				return
 | |
| 			default:
 | |
| 				continue
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		// receive message
 | |
| 		m := new(transport.Message)
 | |
| 		if err := c.Recv(m); err != nil {
 | |
| 			if logger.V(logger.ErrorLevel) {
 | |
| 				logger.Error(t.opts.Context, err.Error())
 | |
| 			}
 | |
| 			if err = c.Close(); err != nil {
 | |
| 				if logger.V(logger.ErrorLevel) {
 | |
| 					logger.Error(t.opts.Context, err.Error())
 | |
| 				}
 | |
| 			}
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		// close the connection
 | |
| 		c.Close()
 | |
| 
 | |
| 		// handle the message
 | |
| 		go t.handler(&tunEvent{
 | |
| 			topic: t.topic,
 | |
| 			message: &broker.Message{
 | |
| 				Header: m.Header,
 | |
| 				Body:   m.Body,
 | |
| 			},
 | |
| 		})
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (t *tunSubscriber) Options() broker.SubscribeOptions {
 | |
| 	return t.opts
 | |
| }
 | |
| 
 | |
| func (t *tunSubscriber) Topic() string {
 | |
| 	return t.topic
 | |
| }
 | |
| 
 | |
| func (t *tunSubscriber) Unsubscribe(ctx context.Context) error {
 | |
| 	select {
 | |
| 	case <-t.closed:
 | |
| 		return nil
 | |
| 	default:
 | |
| 		close(t.closed)
 | |
| 		return t.listener.Close()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (t *tunEvent) Topic() string {
 | |
| 	return t.topic
 | |
| }
 | |
| 
 | |
| func (t *tunEvent) Message() *broker.Message {
 | |
| 	return t.message
 | |
| }
 | |
| 
 | |
| func (t *tunEvent) Ack() error {
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (t *tunEvent) Error() error {
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // NewBroker returns new tunnel broker
 | |
| func NewBroker(opts ...broker.Option) (broker.Broker, error) {
 | |
| 	options := broker.NewOptions(opts...)
 | |
| 
 | |
| 	t, ok := options.Context.Value(tunnelKey{}).(tunnel.Tunnel)
 | |
| 	if !ok {
 | |
| 		return nil, fmt.Errorf("tunnel not set")
 | |
| 	}
 | |
| 
 | |
| 	a, ok := options.Context.Value(tunnelAddr{}).(string)
 | |
| 	if ok {
 | |
| 		// initialise address
 | |
| 		if err := t.Init(tunnel.Address(a)); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if len(options.Addrs) > 0 {
 | |
| 		// initialise nodes
 | |
| 		if err := t.Init(tunnel.Nodes(options.Addrs...)); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return &tunBroker{
 | |
| 		opts:   options,
 | |
| 		tunnel: t,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| // WithAddress sets the tunnel address
 | |
| func WithAddress(a string) broker.Option {
 | |
| 	return func(o *broker.Options) {
 | |
| 		if o.Context == nil {
 | |
| 			o.Context = context.Background()
 | |
| 		}
 | |
| 		o.Context = context.WithValue(o.Context, tunnelAddr{}, a)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // WithTunnel sets the internal tunnel
 | |
| func WithTunnel(t tunnel.Tunnel) broker.Option {
 | |
| 	return func(o *broker.Options) {
 | |
| 		if o.Context == nil {
 | |
| 			o.Context = context.Background()
 | |
| 		}
 | |
| 		o.Context = context.WithValue(o.Context, tunnelKey{}, t)
 | |
| 	}
 | |
| }
 |