Vasiliy Tolstov
14c97d59c1
* add many options helpers * fix noop client to allow publish messages to topic in broker * fix noop server to allow registering in registry * fix noop server to allow subscribe to topic in broker * fix new service initialization Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
222 lines
4.3 KiB
Go
222 lines
4.3 KiB
Go
// Package broker is a tunnel broker
|
|
package broker
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/unistack-org/micro/v3/broker"
|
|
"github.com/unistack-org/micro/v3/logger"
|
|
"github.com/unistack-org/micro/v3/network/transport"
|
|
"github.com/unistack-org/micro/v3/network/tunnel"
|
|
)
|
|
|
|
type tunBroker struct {
|
|
opts broker.Options
|
|
tunnel tunnel.Tunnel
|
|
}
|
|
|
|
type tunSubscriber struct {
|
|
topic string
|
|
handler broker.Handler
|
|
opts broker.SubscribeOptions
|
|
|
|
closed chan bool
|
|
listener tunnel.Listener
|
|
}
|
|
|
|
type tunEvent struct {
|
|
topic string
|
|
message *broker.Message
|
|
}
|
|
|
|
// used to access tunnel from options context
|
|
type tunnelKey struct{}
|
|
type tunnelAddr struct{}
|
|
|
|
func (t *tunBroker) Init(opts ...broker.Option) error {
|
|
for _, o := range opts {
|
|
o(&t.opts)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (t *tunBroker) Options() broker.Options {
|
|
return t.opts
|
|
}
|
|
|
|
func (t *tunBroker) Address() string {
|
|
return t.tunnel.Address()
|
|
}
|
|
|
|
func (t *tunBroker) Connect(ctx context.Context) error {
|
|
return t.tunnel.Connect()
|
|
}
|
|
|
|
func (t *tunBroker) Disconnect(ctx context.Context) error {
|
|
return t.tunnel.Close()
|
|
}
|
|
|
|
func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message, opts ...broker.PublishOption) error {
|
|
// TODO: this is probably inefficient, we might want to just maintain an open connection
|
|
// it may be easier to add broadcast to the tunnel
|
|
c, err := t.tunnel.Dial(topic, tunnel.DialMode(tunnel.Multicast))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer c.Close()
|
|
|
|
return c.Send(&transport.Message{
|
|
Header: m.Header,
|
|
Body: m.Body,
|
|
})
|
|
}
|
|
|
|
func (t *tunBroker) Subscribe(ctx context.Context, topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
|
l, err := t.tunnel.Listen(topic, tunnel.ListenMode(tunnel.Multicast))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tunSub := &tunSubscriber{
|
|
topic: topic,
|
|
handler: h,
|
|
opts: broker.NewSubscribeOptions(opts...),
|
|
closed: make(chan bool),
|
|
listener: l,
|
|
}
|
|
|
|
// start processing
|
|
go tunSub.run()
|
|
|
|
return tunSub, nil
|
|
}
|
|
|
|
func (t *tunBroker) String() string {
|
|
return "tunnel"
|
|
}
|
|
|
|
func (t *tunSubscriber) run() {
|
|
for {
|
|
// accept a new connection
|
|
c, err := t.listener.Accept()
|
|
if err != nil {
|
|
select {
|
|
case <-t.closed:
|
|
return
|
|
default:
|
|
continue
|
|
}
|
|
}
|
|
|
|
// receive message
|
|
m := new(transport.Message)
|
|
if err := c.Recv(m); err != nil {
|
|
if logger.V(logger.ErrorLevel) {
|
|
logger.Error(err)
|
|
}
|
|
if err = c.Close(); err != nil {
|
|
if logger.V(logger.ErrorLevel) {
|
|
logger.Error(err)
|
|
}
|
|
}
|
|
continue
|
|
}
|
|
|
|
// close the connection
|
|
c.Close()
|
|
|
|
// handle the message
|
|
go t.handler(&tunEvent{
|
|
topic: t.topic,
|
|
message: &broker.Message{
|
|
Header: m.Header,
|
|
Body: m.Body,
|
|
},
|
|
})
|
|
}
|
|
}
|
|
|
|
func (t *tunSubscriber) Options() broker.SubscribeOptions {
|
|
return t.opts
|
|
}
|
|
|
|
func (t *tunSubscriber) Topic() string {
|
|
return t.topic
|
|
}
|
|
|
|
func (t *tunSubscriber) Unsubscribe(ctx context.Context) error {
|
|
select {
|
|
case <-t.closed:
|
|
return nil
|
|
default:
|
|
close(t.closed)
|
|
return t.listener.Close()
|
|
}
|
|
}
|
|
|
|
func (t *tunEvent) Topic() string {
|
|
return t.topic
|
|
}
|
|
|
|
func (t *tunEvent) Message() *broker.Message {
|
|
return t.message
|
|
}
|
|
|
|
func (t *tunEvent) Ack() error {
|
|
return nil
|
|
}
|
|
|
|
func (t *tunEvent) Error() error {
|
|
return nil
|
|
}
|
|
|
|
func NewBroker(opts ...broker.Option) (broker.Broker, error) {
|
|
options := broker.NewOptions(opts...)
|
|
|
|
t, ok := options.Context.Value(tunnelKey{}).(tunnel.Tunnel)
|
|
if !ok {
|
|
return nil, fmt.Errorf("tunnel not set")
|
|
}
|
|
|
|
a, ok := options.Context.Value(tunnelAddr{}).(string)
|
|
if ok {
|
|
// initialise address
|
|
if err := t.Init(tunnel.Address(a)); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if len(options.Addrs) > 0 {
|
|
// initialise nodes
|
|
if err := t.Init(tunnel.Nodes(options.Addrs...)); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return &tunBroker{
|
|
opts: options,
|
|
tunnel: t,
|
|
}, nil
|
|
}
|
|
|
|
// WithAddress sets the tunnel address
|
|
func WithAddress(a string) broker.Option {
|
|
return func(o *broker.Options) {
|
|
if o.Context == nil {
|
|
o.Context = context.Background()
|
|
}
|
|
o.Context = context.WithValue(o.Context, tunnelAddr{}, a)
|
|
}
|
|
}
|
|
|
|
// WithTunnel sets the internal tunnel
|
|
func WithTunnel(t tunnel.Tunnel) broker.Option {
|
|
return func(o *broker.Options) {
|
|
if o.Context == nil {
|
|
o.Context = context.Background()
|
|
}
|
|
o.Context = context.WithValue(o.Context, tunnelKey{}, t)
|
|
}
|
|
}
|