// Package broker is a tunnel broker package broker import ( "context" "fmt" "github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/network/transport" "github.com/unistack-org/micro/v3/network/tunnel" ) type tunBroker struct { opts broker.Options tunnel tunnel.Tunnel } type tunSubscriber struct { topic string handler broker.Handler opts broker.SubscribeOptions closed chan bool listener tunnel.Listener } type tunEvent struct { topic string message *broker.Message } // used to access tunnel from options context type tunnelKey struct{} type tunnelAddr struct{} func (t *tunBroker) Init(opts ...broker.Option) error { for _, o := range opts { o(&t.opts) } return nil } func (t *tunBroker) Options() broker.Options { return t.opts } func (t *tunBroker) Address() string { return t.tunnel.Address() } func (t *tunBroker) Connect(ctx context.Context) error { return t.tunnel.Connect(ctx) } func (t *tunBroker) Disconnect(ctx context.Context) error { return t.tunnel.Close(ctx) } func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message, opts ...broker.PublishOption) error { // TODO: this is probably inefficient, we might want to just maintain an open connection // it may be easier to add broadcast to the tunnel c, err := t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast)) if err != nil { return err } defer c.Close() return c.Send(&transport.Message{ Header: m.Header, Body: m.Body, }) } func (t *tunBroker) Subscribe(ctx context.Context, topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast)) if err != nil { return nil, err } tunSub := &tunSubscriber{ topic: topic, handler: h, opts: broker.NewSubscribeOptions(opts...), closed: make(chan bool), listener: l, } // start processing go tunSub.run() return tunSub, nil } func (t *tunBroker) String() string { return "tunnel" } func (t *tunSubscriber) run() { for { // accept a new connection c, err := t.listener.Accept() if err != nil { select { case <-t.closed: return default: continue } } // receive message m := new(transport.Message) if err := c.Recv(m); err != nil { if logger.V(logger.ErrorLevel) { logger.Error(err) } if err = c.Close(); err != nil { if logger.V(logger.ErrorLevel) { logger.Error(err) } } continue } // close the connection c.Close() // handle the message go t.handler(&tunEvent{ topic: t.topic, message: &broker.Message{ Header: m.Header, Body: m.Body, }, }) } } func (t *tunSubscriber) Options() broker.SubscribeOptions { return t.opts } func (t *tunSubscriber) Topic() string { return t.topic } func (t *tunSubscriber) Unsubscribe(ctx context.Context) error { select { case <-t.closed: return nil default: close(t.closed) return t.listener.Close() } } func (t *tunEvent) Topic() string { return t.topic } func (t *tunEvent) Message() *broker.Message { return t.message } func (t *tunEvent) Ack() error { return nil } func (t *tunEvent) Error() error { return nil } func NewBroker(opts ...broker.Option) (broker.Broker, error) { options := broker.NewOptions(opts...) t, ok := options.Context.Value(tunnelKey{}).(tunnel.Tunnel) if !ok { return nil, fmt.Errorf("tunnel not set") } a, ok := options.Context.Value(tunnelAddr{}).(string) if ok { // initialise address if err := t.Init(tunnel.Address(a)); err != nil { return nil, err } } if len(options.Addrs) > 0 { // initialise nodes if err := t.Init(tunnel.Nodes(options.Addrs...)); err != nil { return nil, err } } return &tunBroker{ opts: options, tunnel: t, }, nil } // WithAddress sets the tunnel address func WithAddress(a string) broker.Option { return func(o *broker.Options) { if o.Context == nil { o.Context = context.Background() } o.Context = context.WithValue(o.Context, tunnelAddr{}, a) } } // WithTunnel sets the internal tunnel func WithTunnel(t tunnel.Tunnel) broker.Option { return func(o *broker.Options) { if o.Context == nil { o.Context = context.Background() } o.Context = context.WithValue(o.Context, tunnelKey{}, t) } }