357 lines
7.0 KiB
Go
357 lines
7.0 KiB
Go
// Package broker implements the broker interface on top of a network tunnel
|
|
package broker // import "go.unistack.org/micro/v3/network/tunnel/broker"
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"go.unistack.org/micro/v3/broker"
|
|
"go.unistack.org/micro/v3/logger"
|
|
"go.unistack.org/micro/v3/metadata"
|
|
"go.unistack.org/micro/v3/network/transport"
|
|
"go.unistack.org/micro/v3/network/tunnel"
|
|
)
|
|
|
|
type tunBroker struct {
|
|
tunnel tunnel.Tunnel
|
|
opts broker.Options
|
|
}
|
|
|
|
type tunSubscriber struct {
|
|
listener tunnel.Listener
|
|
handler broker.Handler
|
|
closed chan bool
|
|
topic string
|
|
opts broker.SubscribeOptions
|
|
}
|
|
|
|
type tunBatchSubscriber struct {
|
|
listener tunnel.Listener
|
|
handler broker.BatchHandler
|
|
closed chan bool
|
|
topic string
|
|
opts broker.SubscribeOptions
|
|
}
|
|
|
|
type tunEvent struct {
|
|
err error
|
|
message *broker.Message
|
|
topic string
|
|
}
|
|
|
|
// Keys used to store and look up tunnel settings in the options context.

// tunnelKey is the context key for the tunnel.Tunnel set by WithTunnel.
type tunnelKey struct{}

// tunnelAddr is the context key for the tunnel address set by WithAddress.
type tunnelAddr struct{}
|
|
|
|
func (t *tunBroker) Init(opts ...broker.Option) error {
|
|
for _, o := range opts {
|
|
o(&t.opts)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (t *tunBroker) Name() string {
|
|
return t.opts.Name
|
|
}
|
|
|
|
func (t *tunBroker) Options() broker.Options {
|
|
return t.opts
|
|
}
|
|
|
|
func (t *tunBroker) Address() string {
|
|
return t.tunnel.Address()
|
|
}
|
|
|
|
func (t *tunBroker) Connect(ctx context.Context) error {
|
|
return t.tunnel.Connect(ctx)
|
|
}
|
|
|
|
func (t *tunBroker) Disconnect(ctx context.Context) error {
|
|
return t.tunnel.Close(ctx)
|
|
}
|
|
|
|
func (t *tunBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
|
// TODO: this is probably inefficient, we might want to just maintain an open connection
|
|
// it may be easier to add broadcast to the tunnel
|
|
topicMap := make(map[string]tunnel.Session)
|
|
|
|
var err error
|
|
for _, msg := range msgs {
|
|
topic, _ := msg.Header.Get(metadata.HeaderTopic)
|
|
c, ok := topicMap[topic]
|
|
if !ok {
|
|
c, err = t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer c.Close()
|
|
topicMap[topic] = c
|
|
}
|
|
|
|
if err = c.Send(&transport.Message{
|
|
Header: msg.Header,
|
|
Body: msg.Body,
|
|
}); err != nil {
|
|
// msg.SetError(err)
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message, opts ...broker.PublishOption) error {
|
|
// TODO: this is probably inefficient, we might want to just maintain an open connection
|
|
// it may be easier to add broadcast to the tunnel
|
|
c, err := t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer c.Close()
|
|
|
|
return c.Send(&transport.Message{
|
|
Header: m.Header,
|
|
Body: m.Body,
|
|
})
|
|
}
|
|
|
|
func (t *tunBroker) BatchSubscribe(ctx context.Context, topic string, h broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
|
l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tunSub := &tunBatchSubscriber{
|
|
topic: topic,
|
|
handler: h,
|
|
opts: broker.NewSubscribeOptions(opts...),
|
|
closed: make(chan bool),
|
|
listener: l,
|
|
}
|
|
|
|
// start processing
|
|
go tunSub.run()
|
|
|
|
return tunSub, nil
|
|
}
|
|
|
|
func (t *tunBroker) Subscribe(ctx context.Context, topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
|
l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tunSub := &tunSubscriber{
|
|
topic: topic,
|
|
handler: h,
|
|
opts: broker.NewSubscribeOptions(opts...),
|
|
closed: make(chan bool),
|
|
listener: l,
|
|
}
|
|
|
|
// start processing
|
|
go tunSub.run()
|
|
|
|
return tunSub, nil
|
|
}
|
|
|
|
func (t *tunBroker) String() string {
|
|
return "tunnel"
|
|
}
|
|
|
|
func (t *tunBatchSubscriber) run() {
|
|
for {
|
|
// accept a new connection
|
|
c, err := t.listener.Accept()
|
|
if err != nil {
|
|
select {
|
|
case <-t.closed:
|
|
return
|
|
default:
|
|
continue
|
|
}
|
|
}
|
|
|
|
// receive message
|
|
m := new(transport.Message)
|
|
if err := c.Recv(m); err != nil {
|
|
if logger.V(logger.ErrorLevel) {
|
|
logger.Error(t.opts.Context, err.Error())
|
|
}
|
|
if err = c.Close(); err != nil {
|
|
if logger.V(logger.ErrorLevel) {
|
|
logger.Error(t.opts.Context, err.Error())
|
|
}
|
|
}
|
|
continue
|
|
}
|
|
|
|
// close the connection
|
|
c.Close()
|
|
|
|
evts := broker.Events{&tunEvent{
|
|
topic: t.topic,
|
|
message: &broker.Message{
|
|
Header: m.Header,
|
|
Body: m.Body,
|
|
},
|
|
}}
|
|
// handle the message
|
|
go func() {
|
|
_ = t.handler(evts)
|
|
}()
|
|
|
|
}
|
|
}
|
|
|
|
func (t *tunSubscriber) run() {
|
|
for {
|
|
// accept a new connection
|
|
c, err := t.listener.Accept()
|
|
if err != nil {
|
|
select {
|
|
case <-t.closed:
|
|
return
|
|
default:
|
|
continue
|
|
}
|
|
}
|
|
|
|
// receive message
|
|
m := new(transport.Message)
|
|
if err := c.Recv(m); err != nil {
|
|
if logger.V(logger.ErrorLevel) {
|
|
logger.Error(t.opts.Context, err.Error())
|
|
}
|
|
if err = c.Close(); err != nil {
|
|
if logger.V(logger.ErrorLevel) {
|
|
logger.Error(t.opts.Context, err.Error())
|
|
}
|
|
}
|
|
continue
|
|
}
|
|
|
|
// close the connection
|
|
c.Close()
|
|
|
|
// handle the message
|
|
go func() {
|
|
_ = t.handler(&tunEvent{
|
|
topic: t.topic,
|
|
message: &broker.Message{
|
|
Header: m.Header,
|
|
Body: m.Body,
|
|
},
|
|
})
|
|
}()
|
|
}
|
|
}
|
|
|
|
func (t *tunBatchSubscriber) Options() broker.SubscribeOptions {
|
|
return t.opts
|
|
}
|
|
|
|
func (t *tunBatchSubscriber) Topic() string {
|
|
return t.topic
|
|
}
|
|
|
|
func (t *tunBatchSubscriber) Unsubscribe(ctx context.Context) error {
|
|
select {
|
|
case <-t.closed:
|
|
return nil
|
|
default:
|
|
close(t.closed)
|
|
return t.listener.Close()
|
|
}
|
|
}
|
|
|
|
func (t *tunSubscriber) Options() broker.SubscribeOptions {
|
|
return t.opts
|
|
}
|
|
|
|
func (t *tunSubscriber) Topic() string {
|
|
return t.topic
|
|
}
|
|
|
|
func (t *tunSubscriber) Unsubscribe(ctx context.Context) error {
|
|
select {
|
|
case <-t.closed:
|
|
return nil
|
|
default:
|
|
close(t.closed)
|
|
return t.listener.Close()
|
|
}
|
|
}
|
|
|
|
func (t *tunEvent) Topic() string {
|
|
return t.topic
|
|
}
|
|
|
|
func (t *tunEvent) Message() *broker.Message {
|
|
return t.message
|
|
}
|
|
|
|
func (t *tunEvent) Ack() error {
|
|
return nil
|
|
}
|
|
|
|
func (t *tunEvent) Error() error {
|
|
return t.err
|
|
}
|
|
|
|
func (t *tunEvent) SetError(err error) {
|
|
t.err = err
|
|
}
|
|
|
|
// NewBroker returns new tunnel broker
|
|
func NewBroker(opts ...broker.Option) (broker.Broker, error) {
|
|
options := broker.NewOptions(opts...)
|
|
|
|
t, ok := options.Context.Value(tunnelKey{}).(tunnel.Tunnel)
|
|
if !ok {
|
|
return nil, fmt.Errorf("tunnel not set")
|
|
}
|
|
|
|
a, ok := options.Context.Value(tunnelAddr{}).(string)
|
|
if ok {
|
|
// initialise address
|
|
if err := t.Init(tunnel.Address(a)); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if len(options.Addrs) > 0 {
|
|
// initialise nodes
|
|
if err := t.Init(tunnel.Nodes(options.Addrs...)); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return &tunBroker{
|
|
opts: options,
|
|
tunnel: t,
|
|
}, nil
|
|
}
|
|
|
|
// WithAddress sets the tunnel address
|
|
func WithAddress(a string) broker.Option {
|
|
return func(o *broker.Options) {
|
|
if o.Context == nil {
|
|
o.Context = context.Background()
|
|
}
|
|
o.Context = context.WithValue(o.Context, tunnelAddr{}, a)
|
|
}
|
|
}
|
|
|
|
// WithTunnel sets the internal tunnel
|
|
func WithTunnel(t tunnel.Tunnel) broker.Option {
|
|
return func(o *broker.Options) {
|
|
if o.Context == nil {
|
|
o.Context = context.Background()
|
|
}
|
|
o.Context = context.WithValue(o.Context, tunnelKey{}, t)
|
|
}
|
|
}
|