All checks were successful
		
		
	
	test / test (push) Successful in 42s
				
			## Pull Request template Please, go through these steps before clicking submit on this PR. 1. Give a descriptive title to your PR. 2. Provide a description of your changes. 3. Make sure you have some relevant tests. 4. Put `closes #XXXX` in your comment to auto-close the issue that your PR fixes (if applicable). **PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING** Reviewed-on: #369 Co-authored-by: Evstigneev Denis <danteevstigneev@yandex.ru> Co-committed-by: Evstigneev Denis <danteevstigneev@yandex.ru>
		
			
				
	
	
		
			373 lines
		
	
	
		
			7.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			373 lines
		
	
	
		
			7.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Package broker provides a broker implementation that publishes and receives messages over a network tunnel.
 | 
						|
package broker
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
 | 
						|
	"go.unistack.org/micro/v3/broker"
 | 
						|
	"go.unistack.org/micro/v3/logger"
 | 
						|
	"go.unistack.org/micro/v3/metadata"
 | 
						|
	"go.unistack.org/micro/v3/network/transport"
 | 
						|
	"go.unistack.org/micro/v3/network/tunnel"
 | 
						|
)
 | 
						|
 | 
						|
type tunBroker struct {
 | 
						|
	tunnel tunnel.Tunnel
 | 
						|
	opts   broker.Options
 | 
						|
}
 | 
						|
 | 
						|
type tunSubscriber struct {
 | 
						|
	listener tunnel.Listener
 | 
						|
	handler  broker.Handler
 | 
						|
	closed   chan bool
 | 
						|
	topic    string
 | 
						|
	opts     broker.SubscribeOptions
 | 
						|
}
 | 
						|
 | 
						|
type tunBatchSubscriber struct {
 | 
						|
	listener tunnel.Listener
 | 
						|
	handler  broker.BatchHandler
 | 
						|
	closed   chan bool
 | 
						|
	topic    string
 | 
						|
	opts     broker.SubscribeOptions
 | 
						|
}
 | 
						|
 | 
						|
type tunEvent struct {
 | 
						|
	err     error
 | 
						|
	message *broker.Message
 | 
						|
	topic   string
 | 
						|
}
 | 
						|
 | 
						|
// Context keys used to carry tunnel settings inside the broker options context.

// tunnelKey is the context key under which the tunnel itself is stored.
type tunnelKey struct{}

// tunnelAddr is the context key under which the tunnel address is stored.
type tunnelAddr struct{}
 | 
						|
 | 
						|
func (t *tunBroker) Live() bool {
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBroker) Ready() bool {
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBroker) Health() bool {
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBroker) Init(opts ...broker.Option) error {
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&t.opts)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBroker) Name() string {
 | 
						|
	return t.opts.Name
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBroker) Options() broker.Options {
 | 
						|
	return t.opts
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBroker) Address() string {
 | 
						|
	return t.tunnel.Address()
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBroker) Connect(ctx context.Context) error {
 | 
						|
	return t.tunnel.Connect(ctx)
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBroker) Disconnect(ctx context.Context) error {
 | 
						|
	return t.tunnel.Close(ctx)
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, _ ...broker.PublishOption) error {
 | 
						|
	// TODO: this is probably inefficient, we might want to just maintain an open connection
 | 
						|
	// it may be easier to add broadcast to the tunnel
 | 
						|
	topicMap := make(map[string]tunnel.Session)
 | 
						|
 | 
						|
	var err error
 | 
						|
	for _, msg := range msgs {
 | 
						|
		topic, _ := msg.Header.Get(metadata.HeaderTopic)
 | 
						|
		c, ok := topicMap[topic]
 | 
						|
		if !ok {
 | 
						|
			c, err = t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast))
 | 
						|
			if err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
			defer c.Close()
 | 
						|
			topicMap[topic] = c
 | 
						|
		}
 | 
						|
 | 
						|
		if err = c.Send(&transport.Message{
 | 
						|
			Header: msg.Header,
 | 
						|
			Body:   msg.Body,
 | 
						|
		}); err != nil {
 | 
						|
			//	msg.SetError(err)
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message, _ ...broker.PublishOption) error {
 | 
						|
	// TODO: this is probably inefficient, we might want to just maintain an open connection
 | 
						|
	// it may be easier to add broadcast to the tunnel
 | 
						|
	c, err := t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast))
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	defer c.Close()
 | 
						|
 | 
						|
	return c.Send(&transport.Message{
 | 
						|
		Header: m.Header,
 | 
						|
		Body:   m.Body,
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBroker) BatchSubscribe(ctx context.Context, topic string, h broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
 | 
						|
	l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast))
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	tunSub := &tunBatchSubscriber{
 | 
						|
		topic:    topic,
 | 
						|
		handler:  h,
 | 
						|
		opts:     broker.NewSubscribeOptions(opts...),
 | 
						|
		closed:   make(chan bool),
 | 
						|
		listener: l,
 | 
						|
	}
 | 
						|
 | 
						|
	// start processing
 | 
						|
	go tunSub.run()
 | 
						|
 | 
						|
	return tunSub, nil
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBroker) Subscribe(ctx context.Context, topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
 | 
						|
	l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast))
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	tunSub := &tunSubscriber{
 | 
						|
		topic:    topic,
 | 
						|
		handler:  h,
 | 
						|
		opts:     broker.NewSubscribeOptions(opts...),
 | 
						|
		closed:   make(chan bool),
 | 
						|
		listener: l,
 | 
						|
	}
 | 
						|
 | 
						|
	// start processing
 | 
						|
	go tunSub.run()
 | 
						|
 | 
						|
	return tunSub, nil
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBroker) String() string {
 | 
						|
	return "tunnel"
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBatchSubscriber) run() {
 | 
						|
	for {
 | 
						|
		// accept a new connection
 | 
						|
		c, err := t.listener.Accept()
 | 
						|
		if err != nil {
 | 
						|
			select {
 | 
						|
			case <-t.closed:
 | 
						|
				return
 | 
						|
			default:
 | 
						|
				continue
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		// receive message
 | 
						|
		m := new(transport.Message)
 | 
						|
		if err := c.Recv(m); err != nil {
 | 
						|
			if logger.DefaultLogger.V(logger.ErrorLevel) {
 | 
						|
				logger.DefaultLogger.Error(t.opts.Context, err.Error(), err)
 | 
						|
			}
 | 
						|
			if err = c.Close(); err != nil {
 | 
						|
				if logger.DefaultLogger.V(logger.ErrorLevel) {
 | 
						|
					logger.DefaultLogger.Error(t.opts.Context, err.Error(), err)
 | 
						|
				}
 | 
						|
			}
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		// close the connection
 | 
						|
		c.Close()
 | 
						|
 | 
						|
		evts := broker.Events{&tunEvent{
 | 
						|
			topic: t.topic,
 | 
						|
			message: &broker.Message{
 | 
						|
				Header: m.Header,
 | 
						|
				Body:   m.Body,
 | 
						|
			},
 | 
						|
		}}
 | 
						|
		// handle the message
 | 
						|
		go func() {
 | 
						|
			_ = t.handler(evts)
 | 
						|
		}()
 | 
						|
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunSubscriber) run() {
 | 
						|
	for {
 | 
						|
		// accept a new connection
 | 
						|
		c, err := t.listener.Accept()
 | 
						|
		if err != nil {
 | 
						|
			select {
 | 
						|
			case <-t.closed:
 | 
						|
				return
 | 
						|
			default:
 | 
						|
				continue
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		// receive message
 | 
						|
		m := new(transport.Message)
 | 
						|
		if err := c.Recv(m); err != nil {
 | 
						|
			if logger.DefaultLogger.V(logger.ErrorLevel) {
 | 
						|
				logger.DefaultLogger.Error(t.opts.Context, err.Error(), err)
 | 
						|
			}
 | 
						|
			if err = c.Close(); err != nil {
 | 
						|
				if logger.DefaultLogger.V(logger.ErrorLevel) {
 | 
						|
					logger.DefaultLogger.Error(t.opts.Context, err.Error(), err)
 | 
						|
				}
 | 
						|
			}
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		// close the connection
 | 
						|
		c.Close()
 | 
						|
 | 
						|
		// handle the message
 | 
						|
		go func() {
 | 
						|
			_ = t.handler(&tunEvent{
 | 
						|
				topic: t.topic,
 | 
						|
				message: &broker.Message{
 | 
						|
					Header: m.Header,
 | 
						|
					Body:   m.Body,
 | 
						|
				},
 | 
						|
			})
 | 
						|
		}()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBatchSubscriber) Options() broker.SubscribeOptions {
 | 
						|
	return t.opts
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBatchSubscriber) Topic() string {
 | 
						|
	return t.topic
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunBatchSubscriber) Unsubscribe(ctx context.Context) error {
 | 
						|
	select {
 | 
						|
	case <-t.closed:
 | 
						|
		return nil
 | 
						|
	default:
 | 
						|
		close(t.closed)
 | 
						|
		return t.listener.Close()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunSubscriber) Options() broker.SubscribeOptions {
 | 
						|
	return t.opts
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunSubscriber) Topic() string {
 | 
						|
	return t.topic
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunSubscriber) Unsubscribe(ctx context.Context) error {
 | 
						|
	select {
 | 
						|
	case <-t.closed:
 | 
						|
		return nil
 | 
						|
	default:
 | 
						|
		close(t.closed)
 | 
						|
		return t.listener.Close()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunEvent) Topic() string {
 | 
						|
	return t.topic
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunEvent) Message() *broker.Message {
 | 
						|
	return t.message
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunEvent) Ack() error {
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunEvent) Error() error {
 | 
						|
	return t.err
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunEvent) SetError(err error) {
 | 
						|
	t.err = err
 | 
						|
}
 | 
						|
 | 
						|
func (t *tunEvent) Context() context.Context {
 | 
						|
	return context.TODO()
 | 
						|
}
 | 
						|
 | 
						|
// NewBroker returns new tunnel broker
 | 
						|
func NewBroker(opts ...broker.Option) (broker.Broker, error) {
 | 
						|
	options := broker.NewOptions(opts...)
 | 
						|
 | 
						|
	t, ok := options.Context.Value(tunnelKey{}).(tunnel.Tunnel)
 | 
						|
	if !ok {
 | 
						|
		return nil, fmt.Errorf("tunnel not set")
 | 
						|
	}
 | 
						|
 | 
						|
	a, ok := options.Context.Value(tunnelAddr{}).(string)
 | 
						|
	if ok {
 | 
						|
		// initialise address
 | 
						|
		if err := t.Init(tunnel.Address(a)); err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if len(options.Addrs) > 0 {
 | 
						|
		// initialise nodes
 | 
						|
		if err := t.Init(tunnel.Nodes(options.Addrs...)); err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return &tunBroker{
 | 
						|
		opts:   options,
 | 
						|
		tunnel: t,
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
// WithAddress sets the tunnel address
 | 
						|
func WithAddress(a string) broker.Option {
 | 
						|
	return func(o *broker.Options) {
 | 
						|
		if o.Context == nil {
 | 
						|
			o.Context = context.Background()
 | 
						|
		}
 | 
						|
		o.Context = context.WithValue(o.Context, tunnelAddr{}, a)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// WithTunnel sets the internal tunnel
 | 
						|
func WithTunnel(t tunnel.Tunnel) broker.Option {
 | 
						|
	return func(o *broker.Options) {
 | 
						|
		if o.Context == nil {
 | 
						|
			o.Context = context.Background()
 | 
						|
		}
 | 
						|
		o.Context = context.WithValue(o.Context, tunnelKey{}, t)
 | 
						|
	}
 | 
						|
}
 |