353 lines
7.0 KiB
Go
Raw Normal View History

2019-08-24 09:46:55 +01:00
// Package broker is a tunnel broker
package broker
import (
"context"
"fmt"
2019-08-24 09:46:55 +01:00
"github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/network/transport"
"github.com/unistack-org/micro/v3/network/tunnel"
2019-08-24 09:46:55 +01:00
)
type tunBroker struct {
tunnel tunnel.Tunnel
opts broker.Options
2019-08-24 09:46:55 +01:00
}
type tunSubscriber struct {
listener tunnel.Listener
handler broker.Handler
closed chan bool
topic string
opts broker.SubscribeOptions
2019-08-24 09:46:55 +01:00
}
type tunBatchSubscriber struct {
listener tunnel.Listener
handler broker.BatchHandler
closed chan bool
topic string
opts broker.SubscribeOptions
}
type tunEvent struct {
message *broker.Message
topic string
err error
}
2019-08-24 09:46:55 +01:00
// used to access tunnel from options context
type (
tunnelKey struct{}
tunnelAddr struct{}
)
2019-08-24 09:46:55 +01:00
func (t *tunBroker) Init(opts ...broker.Option) error {
for _, o := range opts {
o(&t.opts)
}
return nil
}
func (t *tunBroker) Name() string {
return t.opts.Name
}
2019-08-24 09:46:55 +01:00
func (t *tunBroker) Options() broker.Options {
return t.opts
}
func (t *tunBroker) Address() string {
return t.tunnel.Address()
}
func (t *tunBroker) Connect(ctx context.Context) error {
return t.tunnel.Connect(ctx)
2019-08-24 09:46:55 +01:00
}
func (t *tunBroker) Disconnect(ctx context.Context) error {
return t.tunnel.Close(ctx)
2019-08-24 09:46:55 +01:00
}
func (t *tunBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
// TODO: this is probably inefficient, we might want to just maintain an open connection
// it may be easier to add broadcast to the tunnel
topicMap := make(map[string]tunnel.Session)
var err error
for _, msg := range msgs {
topic, _ := msg.Header.Get(metadata.HeaderTopic)
c, ok := topicMap[topic]
if !ok {
c, err = t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast))
if err != nil {
return err
}
defer c.Close()
topicMap[topic] = c
}
if err = c.Send(&transport.Message{
Header: msg.Header,
Body: msg.Body,
}); err != nil {
// msg.SetError(err)
return err
}
}
return nil
}
func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message, opts ...broker.PublishOption) error {
2019-08-24 09:46:55 +01:00
// TODO: this is probably inefficient, we might want to just maintain an open connection
// it may be easier to add broadcast to the tunnel
c, err := t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast))
2019-08-24 09:46:55 +01:00
if err != nil {
return err
}
defer c.Close()
return c.Send(&transport.Message{
Header: m.Header,
Body: m.Body,
})
}
func (t *tunBroker) BatchSubscribe(ctx context.Context, topic string, h broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast))
if err != nil {
return nil, err
}
tunSub := &tunBatchSubscriber{
topic: topic,
handler: h,
opts: broker.NewSubscribeOptions(opts...),
closed: make(chan bool),
listener: l,
}
// start processing
go tunSub.run()
return tunSub, nil
}
func (t *tunBroker) Subscribe(ctx context.Context, topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast))
2019-08-24 09:46:55 +01:00
if err != nil {
return nil, err
}
tunSub := &tunSubscriber{
topic: topic,
handler: h,
opts: broker.NewSubscribeOptions(opts...),
2019-08-24 09:46:55 +01:00
closed: make(chan bool),
listener: l,
}
// start processing
go tunSub.run()
return tunSub, nil
}
func (t *tunBroker) String() string {
return "tunnel"
}
func (t *tunBatchSubscriber) run() {
for {
// accept a new connection
c, err := t.listener.Accept()
if err != nil {
select {
case <-t.closed:
return
default:
continue
}
}
// receive message
m := new(transport.Message)
if err := c.Recv(m); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Error(t.opts.Context, err.Error())
}
if err = c.Close(); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Error(t.opts.Context, err.Error())
}
}
continue
}
// close the connection
c.Close()
evts := broker.Events{&tunEvent{
topic: t.topic,
message: &broker.Message{
Header: m.Header,
Body: m.Body,
},
}}
// handle the message
go t.handler(evts)
}
}
2019-08-24 09:46:55 +01:00
func (t *tunSubscriber) run() {
for {
// accept a new connection
c, err := t.listener.Accept()
if err != nil {
select {
case <-t.closed:
return
default:
continue
}
}
// receive message
m := new(transport.Message)
if err := c.Recv(m); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Error(t.opts.Context, err.Error())
}
if err = c.Close(); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Error(t.opts.Context, err.Error())
}
}
2019-08-24 09:46:55 +01:00
continue
}
// close the connection
c.Close()
// handle the message
go t.handler(&tunEvent{
topic: t.topic,
message: &broker.Message{
Header: m.Header,
Body: m.Body,
},
2019-08-24 09:46:55 +01:00
})
}
}
func (t *tunBatchSubscriber) Options() broker.SubscribeOptions {
return t.opts
}
func (t *tunBatchSubscriber) Topic() string {
return t.topic
}
func (t *tunBatchSubscriber) Unsubscribe(ctx context.Context) error {
select {
case <-t.closed:
return nil
default:
close(t.closed)
return t.listener.Close()
}
}
2019-08-24 09:46:55 +01:00
func (t *tunSubscriber) Options() broker.SubscribeOptions {
return t.opts
}
func (t *tunSubscriber) Topic() string {
return t.topic
}
func (t *tunSubscriber) Unsubscribe(ctx context.Context) error {
2019-08-24 09:46:55 +01:00
select {
case <-t.closed:
return nil
default:
close(t.closed)
return t.listener.Close()
}
}
func (t *tunEvent) Topic() string {
return t.topic
}
func (t *tunEvent) Message() *broker.Message {
return t.message
}
func (t *tunEvent) Ack() error {
return nil
}
func (t *tunEvent) Error() error {
return t.err
}
func (t *tunEvent) SetError(err error) {
t.err = err
}
// NewBroker returns new tunnel broker
func NewBroker(opts ...broker.Option) (broker.Broker, error) {
options := broker.NewOptions(opts...)
2019-08-24 09:46:55 +01:00
t, ok := options.Context.Value(tunnelKey{}).(tunnel.Tunnel)
if !ok {
return nil, fmt.Errorf("tunnel not set")
2019-08-24 09:46:55 +01:00
}
a, ok := options.Context.Value(tunnelAddr{}).(string)
if ok {
// initialise address
if err := t.Init(tunnel.Address(a)); err != nil {
return nil, err
}
2019-08-24 09:46:55 +01:00
}
if len(options.Addrs) > 0 {
// initialise nodes
if err := t.Init(tunnel.Nodes(options.Addrs...)); err != nil {
return nil, err
}
2019-08-24 09:46:55 +01:00
}
return &tunBroker{
opts: options,
tunnel: t,
}, nil
2019-08-24 09:46:55 +01:00
}
// WithAddress sets the tunnel address
func WithAddress(a string) broker.Option {
return func(o *broker.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, tunnelAddr{}, a)
}
}
// WithTunnel sets the internal tunnel
func WithTunnel(t tunnel.Tunnel) broker.Option {
return func(o *broker.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, tunnelKey{}, t)
}
}