micro/network/tunnel/broker/broker.go
Vasiliy Tolstov d18bf3dab9 many fixes for lint and context.Context usage
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2020-11-03 01:57:11 +03:00

222 lines
4.3 KiB
Go

// Package broker is a tunnel broker
package broker
import (
"context"
"fmt"
"github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/network/transport"
"github.com/unistack-org/micro/v3/network/tunnel"
)
// tunBroker is a broker that publishes and subscribes over a tunnel.
type tunBroker struct {
	// opts holds the broker options given at construction and updated by Init.
	opts broker.Options
	// tunnel is the tunnel used to dial and listen on topics.
	tunnel tunnel.Tunnel
}
// tunSubscriber is a subscription to one topic, served by a tunnel listener.
type tunSubscriber struct {
	// topic is the topic this subscription was created for.
	topic string
	// handler is called for every message received by the run loop.
	handler broker.Handler
	// opts are the options the subscription was created with.
	opts broker.SubscribeOptions
	// closed is closed by Unsubscribe; the run loop checks it when Accept fails.
	closed chan bool
	// listener accepts the tunnel connections that carry messages.
	listener tunnel.Listener
}
// tunEvent is the event passed to a subscription handler for each
// message received over the tunnel.
type tunEvent struct {
	// topic is the topic of the subscription that received the message.
	topic string
	// message is the received message.
	message *broker.Message
}
// Context keys used to carry tunnel settings inside the broker options context.
type (
	// tunnelKey is the key under which WithTunnel stores the tunnel.
	tunnelKey struct{}
	// tunnelAddr is the key under which WithAddress stores the tunnel address.
	tunnelAddr struct{}
)
// Init applies each option, in order, to the broker's stored options.
// It always returns nil.
func (t *tunBroker) Init(opts ...broker.Option) error {
	for i := range opts {
		opts[i](&t.opts)
	}
	return nil
}
// Options returns a copy of the broker's current options.
func (t *tunBroker) Options() broker.Options {
	return t.opts
}
// Address returns the address reported by the underlying tunnel.
func (t *tunBroker) Address() string {
	addr := t.tunnel.Address()
	return addr
}
// Connect connects the underlying tunnel and returns its error, if any.
func (t *tunBroker) Connect(ctx context.Context) error {
	return t.tunnel.Connect(ctx)
}
// Disconnect closes the underlying tunnel and returns its error, if any.
func (t *tunBroker) Disconnect(ctx context.Context) error {
	return t.tunnel.Close(ctx)
}
// Publish sends m to topic over a freshly dialled multicast tunnel connection.
// The connection is closed before Publish returns. The publish options are
// accepted but not used. It returns the dial error or the send error.
func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message, opts ...broker.PublishOption) error {
	// TODO: this is probably inefficient, we might want to just maintain an open connection
	// it may be easier to add broadcast to the tunnel
	conn, err := t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast))
	if err != nil {
		return err
	}
	defer conn.Close()

	// carry the broker message over as a transport message
	msg := &transport.Message{
		Header: m.Header,
		Body:   m.Body,
	}
	return conn.Send(msg)
}
// Subscribe opens a multicast tunnel listener for topic and starts a
// goroutine that passes every received message to h. It returns the
// subscriber, or the listen error if the listener could not be created.
func (t *tunBroker) Subscribe(ctx context.Context, topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
	listener, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast))
	if err != nil {
		return nil, err
	}

	sub := new(tunSubscriber)
	sub.topic = topic
	sub.handler = h
	sub.opts = broker.NewSubscribeOptions(opts...)
	sub.closed = make(chan bool)
	sub.listener = listener

	// process incoming messages in the background
	go sub.run()

	return sub, nil
}
// String returns the name of this broker implementation, "tunnel".
func (t *tunBroker) String() string {
	const name = "tunnel"
	return name
}
// run is the subscriber's receive loop. For every connection accepted from
// the listener it reads one message, closes the connection and passes the
// message to the handler in a separate goroutine. Receive, close and handler
// errors are logged at error level. The loop returns when Accept fails after
// the closed channel has been closed.
func (t *tunSubscriber) run() {
	// logErr logs a non-nil error when error-level logging is enabled.
	logErr := func(err error) {
		if err != nil && logger.V(logger.ErrorLevel) {
			logger.Error(err)
		}
	}

	for {
		// accept a new connection
		c, err := t.listener.Accept()
		if err != nil {
			select {
			case <-t.closed:
				// the subscription was closed; stop processing
				return
			default:
				// NOTE(review): the retry is immediate, so a listener that
				// keeps failing while the subscription is open spins here.
				continue
			}
		}

		// receive message
		m := new(transport.Message)
		if err := c.Recv(m); err != nil {
			logErr(err)
			logErr(c.Close())
			continue
		}

		// close the connection; a failure is logged just like on the
		// receive-error path instead of being ignored
		logErr(c.Close())

		ev := &tunEvent{
			topic: t.topic,
			message: &broker.Message{
				Header: m.Header,
				Body:   m.Body,
			},
		}

		// handle the message without blocking the accept loop; the
		// handler's error is logged rather than silently dropped
		go func() {
			logErr(t.handler(ev))
		}()
	}
}
// Options returns a copy of the options the subscription was created with.
func (t *tunSubscriber) Options() broker.SubscribeOptions {
	return t.opts
}
// Topic returns the topic this subscription listens on.
func (t *tunSubscriber) Topic() string {
	return t.topic
}
// Unsubscribe marks the subscription closed and closes its listener,
// returning the listener's close error. If the subscription is already
// closed it returns nil without touching the listener. ctx is not used.
//
// NOTE(review): the closed check and the close are not one atomic step, so
// concurrent calls could both reach close — confirm callers serialise this.
func (t *tunSubscriber) Unsubscribe(ctx context.Context) error {
	select {
	case <-t.closed:
		// already unsubscribed
		return nil
	default:
	}

	close(t.closed)
	return t.listener.Close()
}
// Topic returns the topic the event was received on.
func (t *tunEvent) Topic() string {
	return t.topic
}
// Message returns the message carried by the event.
func (t *tunEvent) Message() *broker.Message {
	return t.message
}
// Ack does nothing and always returns nil.
func (t *tunEvent) Ack() error {
	var err error
	return err
}
// Error always returns nil; the event does not carry an error.
func (t *tunEvent) Error() error {
	var err error
	return err
}
// NewBroker returns a broker backed by the tunnel supplied through WithTunnel.
//
// It returns a "tunnel not set" error when the options context is nil or
// holds no tunnel. If an address was supplied through WithAddress the tunnel
// is initialised with it, and if the options contain broker addresses the
// tunnel is initialised with them as its nodes. Any error from the tunnel's
// Init is returned as-is.
func NewBroker(opts ...broker.Option) (broker.Broker, error) {
	options := broker.NewOptions(opts...)

	// WithAddress and WithTunnel both allow for a nil context; do the same
	// here so a missing context yields an error instead of a nil dereference.
	if options.Context == nil {
		return nil, fmt.Errorf("tunnel not set")
	}

	t, ok := options.Context.Value(tunnelKey{}).(tunnel.Tunnel)
	if !ok {
		return nil, fmt.Errorf("tunnel not set")
	}

	if a, ok := options.Context.Value(tunnelAddr{}).(string); ok {
		// initialise address
		if err := t.Init(tunnel.Address(a)); err != nil {
			return nil, err
		}
	}

	if len(options.Addrs) > 0 {
		// initialise nodes
		if err := t.Init(tunnel.Nodes(options.Addrs...)); err != nil {
			return nil, err
		}
	}

	return &tunBroker{
		opts:   options,
		tunnel: t,
	}, nil
}
// WithAddress returns an option that stores the tunnel address a in the
// options context, starting from a background context when none is set.
func WithAddress(a string) broker.Option {
	return func(o *broker.Options) {
		parent := o.Context
		if parent == nil {
			parent = context.Background()
		}
		o.Context = context.WithValue(parent, tunnelAddr{}, a)
	}
}
// WithTunnel returns an option that stores the tunnel t in the options
// context, starting from a background context when none is set.
func WithTunnel(t tunnel.Tunnel) broker.Option {
	return func(o *broker.Options) {
		parent := o.Context
		if parent == nil {
			parent = context.Background()
		}
		o.Context = context.WithValue(parent, tunnelKey{}, t)
	}
}