2019-01-23 01:39:42 +03:00
|
|
|
// Package stan provides a NATS Streaming broker
|
2021-10-27 23:08:30 +03:00
|
|
|
package stan // import "go.unistack.org/micro-broker-stan/v3"
|
2019-01-23 01:39:42 +03:00
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2019-02-10 13:16:41 +03:00
|
|
|
"fmt"
|
2019-01-23 01:39:42 +03:00
|
|
|
"strings"
|
|
|
|
"sync"
|
2019-02-10 13:16:41 +03:00
|
|
|
"time"
|
2019-01-23 01:39:42 +03:00
|
|
|
|
2019-05-30 06:45:20 +03:00
|
|
|
stan "github.com/nats-io/stan.go"
|
2021-10-27 23:08:30 +03:00
|
|
|
"go.unistack.org/micro/v3/broker"
|
|
|
|
"go.unistack.org/micro/v3/logger"
|
|
|
|
"go.unistack.org/micro/v3/metadata"
|
|
|
|
"go.unistack.org/micro/v3/util/id"
|
2019-01-23 01:39:42 +03:00
|
|
|
)
|
|
|
|
|
|
|
|
type stanBroker struct {
|
|
|
|
sync.RWMutex
|
2019-02-12 17:27:14 +03:00
|
|
|
addrs []string
|
|
|
|
conn stan.Conn
|
|
|
|
opts broker.Options
|
|
|
|
sopts stan.Options
|
|
|
|
nopts []stan.Option
|
|
|
|
clusterID string
|
2019-04-26 13:19:42 +03:00
|
|
|
clientID string
|
2019-02-12 17:27:14 +03:00
|
|
|
connectTimeout time.Duration
|
|
|
|
connectRetry bool
|
2021-03-24 23:25:36 +03:00
|
|
|
init bool
|
2019-02-12 17:27:14 +03:00
|
|
|
done chan struct{}
|
2019-01-23 01:39:42 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
type subscriber struct {
|
|
|
|
t string
|
|
|
|
s stan.Subscription
|
|
|
|
dq bool
|
|
|
|
opts broker.SubscribeOptions
|
|
|
|
}
|
|
|
|
|
|
|
|
type publication struct {
|
|
|
|
t string
|
|
|
|
msg *stan.Msg
|
|
|
|
m *broker.Message
|
2020-03-07 13:32:35 +03:00
|
|
|
err error
|
2019-01-23 01:39:42 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func (n *publication) Topic() string {
|
|
|
|
return n.t
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *publication) Message() *broker.Message {
|
|
|
|
return n.m
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *publication) Ack() error {
|
|
|
|
return n.msg.Ack()
|
|
|
|
}
|
|
|
|
|
2020-03-07 13:32:35 +03:00
|
|
|
func (n *publication) Error() error {
|
|
|
|
return n.err
|
|
|
|
}
|
|
|
|
|
2021-08-05 00:02:43 +03:00
|
|
|
func (n *publication) SetError(err error) {
|
|
|
|
n.err = err
|
|
|
|
}
|
|
|
|
|
2019-01-23 01:39:42 +03:00
|
|
|
func (n *subscriber) Options() broker.SubscribeOptions {
|
|
|
|
return n.opts
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *subscriber) Topic() string {
|
|
|
|
return n.t
|
|
|
|
}
|
|
|
|
|
2021-01-10 14:45:50 +03:00
|
|
|
func (n *subscriber) Unsubscribe(ctx context.Context) error {
|
2019-01-23 01:39:42 +03:00
|
|
|
if n.s == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
// go-micro server Unsubscribe can't handle durable queues, so close as stan suggested
|
|
|
|
// from nats streaming readme:
|
|
|
|
// When a client disconnects, the streaming server is not notified, hence the importance of calling Close()
|
|
|
|
if !n.dq {
|
|
|
|
err := n.s.Unsubscribe()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return n.Close()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *subscriber) Close() error {
|
|
|
|
if n.s != nil {
|
|
|
|
return n.s.Close()
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *stanBroker) Address() string {
|
2021-09-10 23:44:15 +03:00
|
|
|
return strings.Join(n.addrs, ",")
|
2019-01-23 01:39:42 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func setAddrs(addrs []string) []string {
|
2019-12-04 13:36:30 +03:00
|
|
|
cAddrs := make([]string, 0, len(addrs))
|
2019-01-23 01:39:42 +03:00
|
|
|
for _, addr := range addrs {
|
|
|
|
if len(addr) == 0 {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if !strings.HasPrefix(addr, "nats://") {
|
|
|
|
addr = "nats://" + addr
|
|
|
|
}
|
|
|
|
cAddrs = append(cAddrs, addr)
|
|
|
|
}
|
|
|
|
if len(cAddrs) == 0 {
|
|
|
|
cAddrs = []string{stan.DefaultNatsURL}
|
|
|
|
}
|
|
|
|
return cAddrs
|
|
|
|
}
|
|
|
|
|
2019-02-10 13:16:41 +03:00
|
|
|
func (n *stanBroker) reconnectCB(c stan.Conn, err error) {
|
2019-02-12 17:27:14 +03:00
|
|
|
if n.connectRetry {
|
2021-01-10 14:45:50 +03:00
|
|
|
if err := n.connect(n.opts.Context); err != nil {
|
|
|
|
if n.opts.Logger.V(logger.ErrorLevel) {
|
2021-01-29 17:11:47 +03:00
|
|
|
n.opts.Logger.Errorf(n.opts.Context, "broker [stan] reconnect err: %v", err)
|
2021-01-10 14:45:50 +03:00
|
|
|
}
|
2019-02-10 13:16:41 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-01-10 14:45:50 +03:00
|
|
|
func (n *stanBroker) connect(ctx context.Context) error {
|
2019-02-10 13:16:41 +03:00
|
|
|
timeout := make(<-chan time.Time)
|
|
|
|
|
2019-06-19 00:59:44 +03:00
|
|
|
n.RLock()
|
2019-02-12 17:27:14 +03:00
|
|
|
if n.connectTimeout > 0 {
|
|
|
|
timeout = time.After(n.connectTimeout)
|
2019-02-10 13:16:41 +03:00
|
|
|
}
|
2019-06-19 00:59:44 +03:00
|
|
|
clusterID := n.clusterID
|
|
|
|
clientID := n.clientID
|
|
|
|
nopts := n.nopts
|
|
|
|
n.RUnlock()
|
2019-02-10 13:16:41 +03:00
|
|
|
|
|
|
|
ticker := time.NewTicker(1 * time.Second)
|
|
|
|
defer ticker.Stop()
|
|
|
|
|
|
|
|
fn := func() error {
|
2019-06-19 00:59:44 +03:00
|
|
|
c, err := stan.Connect(clusterID, clientID, nopts...)
|
2019-02-10 13:16:41 +03:00
|
|
|
if err == nil {
|
|
|
|
n.Lock()
|
|
|
|
n.conn = c
|
|
|
|
n.Unlock()
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// don't wait for first try
|
|
|
|
if err := fn(); err == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2019-05-09 23:48:37 +03:00
|
|
|
n.RLock()
|
|
|
|
done := n.done
|
|
|
|
n.RUnlock()
|
|
|
|
|
2019-02-10 13:16:41 +03:00
|
|
|
// wait loop
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
// context closed
|
|
|
|
case <-n.opts.Context.Done():
|
|
|
|
return nil
|
|
|
|
// call close, don't wait anymore
|
2019-05-09 23:48:37 +03:00
|
|
|
case <-done:
|
2019-02-10 13:16:41 +03:00
|
|
|
return nil
|
|
|
|
// in case of timeout fail with a timeout error
|
|
|
|
case <-timeout:
|
2019-02-12 17:27:14 +03:00
|
|
|
return fmt.Errorf("[stan]: timeout connect to %v", n.addrs)
|
2019-02-10 13:16:41 +03:00
|
|
|
// got a tick, try to connect
|
|
|
|
case <-ticker.C:
|
|
|
|
err := fn()
|
2021-01-10 14:45:50 +03:00
|
|
|
if err == nil && n.opts.Logger.V(logger.InfoLevel) {
|
|
|
|
{
|
2021-01-29 17:11:47 +03:00
|
|
|
n.opts.Logger.Infof(n.opts.Context, "[stan]: successeful connected to %v", n.addrs)
|
2021-01-10 14:45:50 +03:00
|
|
|
}
|
2019-02-10 13:16:41 +03:00
|
|
|
return nil
|
|
|
|
}
|
2021-01-10 14:45:50 +03:00
|
|
|
if n.opts.Logger.V(logger.ErrorLevel) {
|
2021-01-29 17:11:47 +03:00
|
|
|
n.opts.Logger.Errorf(n.opts.Context, "[stan]: failed to connect %v: %v", n.addrs, err)
|
2021-01-10 14:45:50 +03:00
|
|
|
}
|
2019-02-10 13:16:41 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-01-10 14:45:50 +03:00
|
|
|
func (n *stanBroker) Connect(ctx context.Context) error {
|
2019-01-23 01:39:42 +03:00
|
|
|
n.RLock()
|
|
|
|
if n.conn != nil {
|
|
|
|
n.RUnlock()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
n.RUnlock()
|
|
|
|
|
|
|
|
clusterID, ok := n.opts.Context.Value(clusterIDKey{}).(string)
|
|
|
|
if !ok || len(clusterID) == 0 {
|
2021-01-29 17:11:47 +03:00
|
|
|
return fmt.Errorf("must specify ClusterID Option")
|
2019-01-23 01:39:42 +03:00
|
|
|
}
|
2021-10-27 23:08:30 +03:00
|
|
|
var err error
|
2019-04-26 13:19:42 +03:00
|
|
|
clientID, ok := n.opts.Context.Value(clientIDKey{}).(string)
|
|
|
|
if !ok || len(clientID) == 0 {
|
2021-10-27 23:08:30 +03:00
|
|
|
clientID, err = id.New()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2019-04-26 13:19:42 +03:00
|
|
|
}
|
|
|
|
|
2019-06-19 00:59:44 +03:00
|
|
|
n.Lock()
|
2019-02-12 17:27:14 +03:00
|
|
|
if v, ok := n.opts.Context.Value(connectRetryKey{}).(bool); ok && v {
|
|
|
|
n.connectRetry = true
|
2019-02-10 13:16:41 +03:00
|
|
|
}
|
2019-01-23 01:39:42 +03:00
|
|
|
|
2019-02-12 17:27:14 +03:00
|
|
|
if td, ok := n.opts.Context.Value(connectTimeoutKey{}).(time.Duration); ok {
|
|
|
|
n.connectTimeout = td
|
2019-01-23 01:39:42 +03:00
|
|
|
}
|
|
|
|
|
2019-02-12 17:27:14 +03:00
|
|
|
if n.sopts.ConnectionLostCB != nil && n.connectRetry {
|
2019-06-19 00:59:44 +03:00
|
|
|
n.Unlock()
|
2021-01-29 17:11:47 +03:00
|
|
|
return fmt.Errorf("impossible to use custom ConnectionLostCB and ConnectRetry(true)")
|
2019-02-10 13:16:41 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
nopts := []stan.Option{
|
|
|
|
stan.NatsURL(n.sopts.NatsURL),
|
|
|
|
stan.NatsConn(n.sopts.NatsConn),
|
|
|
|
stan.ConnectWait(n.sopts.ConnectTimeout),
|
|
|
|
stan.PubAckWait(n.sopts.AckTimeout),
|
|
|
|
stan.MaxPubAcksInflight(n.sopts.MaxPubAcksInflight),
|
2019-05-30 06:45:20 +03:00
|
|
|
stan.Pings(n.sopts.PingInterval, n.sopts.PingMaxOut),
|
2019-02-10 13:16:41 +03:00
|
|
|
}
|
2019-06-19 03:09:12 +03:00
|
|
|
|
|
|
|
if n.connectRetry {
|
|
|
|
nopts = append(nopts, stan.SetConnectionLostHandler(n.reconnectCB))
|
|
|
|
}
|
|
|
|
|
2019-02-10 13:16:41 +03:00
|
|
|
nopts = append(nopts, stan.NatsURL(strings.Join(n.addrs, ",")))
|
|
|
|
|
|
|
|
n.nopts = nopts
|
|
|
|
n.clusterID = clusterID
|
2019-04-26 13:19:42 +03:00
|
|
|
n.clientID = clientID
|
2019-01-23 01:39:42 +03:00
|
|
|
n.Unlock()
|
2019-02-10 13:16:41 +03:00
|
|
|
|
2021-01-10 14:45:50 +03:00
|
|
|
return n.connect(ctx)
|
2019-01-23 01:39:42 +03:00
|
|
|
}
|
|
|
|
|
2021-01-10 14:45:50 +03:00
|
|
|
func (n *stanBroker) Disconnect(ctx context.Context) error {
|
2019-02-10 13:16:41 +03:00
|
|
|
var err error
|
|
|
|
|
|
|
|
n.Lock()
|
|
|
|
defer n.Unlock()
|
|
|
|
|
|
|
|
if n.done != nil {
|
|
|
|
close(n.done)
|
|
|
|
n.done = nil
|
|
|
|
}
|
|
|
|
if n.conn != nil {
|
|
|
|
err = n.conn.Close()
|
|
|
|
}
|
|
|
|
return err
|
2019-01-23 01:39:42 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func (n *stanBroker) Init(opts ...broker.Option) error {
|
2021-03-24 23:25:36 +03:00
|
|
|
if len(opts) == 0 && n.init {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := n.opts.Register.Init(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := n.opts.Tracer.Init(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := n.opts.Logger.Init(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := n.opts.Meter.Init(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2019-01-23 01:39:42 +03:00
|
|
|
for _, o := range opts {
|
|
|
|
o(&n.opts)
|
|
|
|
}
|
|
|
|
n.addrs = setAddrs(n.opts.Addrs)
|
2021-03-24 23:25:36 +03:00
|
|
|
|
|
|
|
n.init = true
|
2019-01-23 01:39:42 +03:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *stanBroker) Options() broker.Options {
|
|
|
|
return n.opts
|
|
|
|
}
|
|
|
|
|
2021-08-05 00:02:43 +03:00
|
|
|
func (n *stanBroker) BatchPublish(ctx context.Context, msg []*broker.Message, opts ...broker.PublishOption) error {
|
2021-09-10 23:44:15 +03:00
|
|
|
var err error
|
|
|
|
var buf []byte
|
|
|
|
|
2021-08-05 00:02:43 +03:00
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
2021-09-10 23:44:15 +03:00
|
|
|
msgs := make(map[string][][]byte)
|
|
|
|
|
|
|
|
options := broker.NewPublishOptions(opts...)
|
2021-08-05 00:02:43 +03:00
|
|
|
wg.Add(len(msg))
|
|
|
|
|
|
|
|
for _, m := range msg {
|
2021-09-10 23:44:15 +03:00
|
|
|
if options.BodyOnly {
|
|
|
|
buf = m.Body
|
|
|
|
} else {
|
|
|
|
buf, err = n.opts.Codec.Marshal(m)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-08-05 00:02:43 +03:00
|
|
|
}
|
|
|
|
topic, _ := m.Header.Get(metadata.HeaderTopic)
|
2021-09-10 23:44:15 +03:00
|
|
|
msgs[topic] = append(msgs[topic], buf)
|
2021-08-05 00:02:43 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
var ackErr error
|
|
|
|
|
|
|
|
ackHandler := func(ackedNuid string, err error) {
|
|
|
|
wg.Done()
|
|
|
|
if err != nil {
|
|
|
|
ackErr = err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-10 23:44:15 +03:00
|
|
|
n.RLock()
|
|
|
|
defer n.RUnlock()
|
|
|
|
|
2021-08-05 00:02:43 +03:00
|
|
|
for topic, ms := range msgs {
|
|
|
|
for _, m := range ms {
|
|
|
|
if _, err := n.conn.PublishAsync(topic, m, ackHandler); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
wg.Wait()
|
|
|
|
|
|
|
|
return ackErr
|
|
|
|
}
|
|
|
|
|
2021-01-10 14:45:50 +03:00
|
|
|
func (n *stanBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
|
2021-09-10 23:44:15 +03:00
|
|
|
var buf []byte
|
|
|
|
var err error
|
|
|
|
|
|
|
|
options := broker.NewPublishOptions(opts...)
|
|
|
|
|
|
|
|
if options.BodyOnly {
|
|
|
|
buf = msg.Body
|
|
|
|
} else {
|
|
|
|
buf, err = n.opts.Codec.Marshal(msg)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2019-01-23 01:39:42 +03:00
|
|
|
}
|
2021-09-10 23:44:15 +03:00
|
|
|
|
2019-01-23 01:39:42 +03:00
|
|
|
n.RLock()
|
|
|
|
defer n.RUnlock()
|
2021-09-10 23:44:15 +03:00
|
|
|
return n.conn.Publish(topic, buf)
|
2019-01-23 01:39:42 +03:00
|
|
|
}
|
|
|
|
|
2021-08-05 00:02:43 +03:00
|
|
|
func (n *stanBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
|
2021-01-10 14:45:50 +03:00
|
|
|
func (n *stanBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
2019-06-22 00:25:50 +03:00
|
|
|
n.RLock()
|
2019-01-26 13:37:58 +03:00
|
|
|
if n.conn == nil {
|
2019-06-22 00:25:50 +03:00
|
|
|
n.RUnlock()
|
2021-01-29 17:11:47 +03:00
|
|
|
return nil, fmt.Errorf("not connected")
|
2019-01-26 13:37:58 +03:00
|
|
|
}
|
2019-06-22 00:25:50 +03:00
|
|
|
n.RUnlock()
|
|
|
|
|
2021-01-10 14:45:50 +03:00
|
|
|
options := broker.NewSubscribeOptions(opts...)
|
2019-01-23 01:39:42 +03:00
|
|
|
|
2021-01-10 14:45:50 +03:00
|
|
|
if subscribeContext, ok := options.Context.Value(subscribeContextKey{}).(context.Context); ok && subscribeContext != nil {
|
2019-01-23 01:39:42 +03:00
|
|
|
ctx = subscribeContext
|
|
|
|
}
|
|
|
|
|
|
|
|
var stanOpts []stan.SubscriptionOption
|
2021-01-10 14:45:50 +03:00
|
|
|
if options.AutoAck {
|
2019-01-23 01:39:42 +03:00
|
|
|
stanOpts = append(stanOpts, stan.SetManualAckMode())
|
|
|
|
}
|
|
|
|
|
2021-01-10 14:45:50 +03:00
|
|
|
if subOpts, ok := options.Context.Value(subscribeOptionKey{}).([]stan.SubscriptionOption); ok && len(subOpts) > 0 {
|
2019-01-23 01:39:42 +03:00
|
|
|
stanOpts = append(stanOpts, subOpts...)
|
|
|
|
}
|
|
|
|
|
|
|
|
bopts := stan.DefaultSubscriptionOptions
|
|
|
|
for _, bopt := range stanOpts {
|
|
|
|
if err := bopt(&bopts); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-01-10 14:45:50 +03:00
|
|
|
options.AutoAck = !bopts.ManualAcks
|
2019-01-26 13:37:58 +03:00
|
|
|
|
2019-05-29 16:57:48 +03:00
|
|
|
if dn, ok := n.opts.Context.Value(durableKey{}).(string); ok && len(dn) > 0 {
|
|
|
|
stanOpts = append(stanOpts, stan.DurableName(dn))
|
|
|
|
bopts.DurableName = dn
|
|
|
|
}
|
|
|
|
|
2019-01-23 01:39:42 +03:00
|
|
|
fn := func(msg *stan.Msg) {
|
2021-01-10 14:45:50 +03:00
|
|
|
eh := n.opts.ErrorHandler
|
2019-01-28 11:02:42 +03:00
|
|
|
|
2021-01-10 14:45:50 +03:00
|
|
|
if options.ErrorHandler != nil {
|
|
|
|
eh = options.ErrorHandler
|
|
|
|
}
|
|
|
|
|
|
|
|
p := &publication{m: &broker.Message{}, msg: msg, t: msg.Subject}
|
|
|
|
if options.BodyOnly {
|
2020-03-07 13:32:35 +03:00
|
|
|
p.m.Body = msg.Data
|
2021-01-10 14:45:50 +03:00
|
|
|
} else {
|
|
|
|
// unmarshal message
|
|
|
|
if err := n.opts.Codec.Unmarshal(msg.Data, p.m); err != nil {
|
|
|
|
p.err = err
|
|
|
|
p.m.Body = msg.Data
|
|
|
|
eh(p)
|
|
|
|
return
|
|
|
|
}
|
2019-01-23 01:39:42 +03:00
|
|
|
}
|
2019-01-28 11:02:42 +03:00
|
|
|
// execute the handler
|
2020-03-07 13:32:35 +03:00
|
|
|
p.err = handler(p)
|
2019-01-28 11:02:42 +03:00
|
|
|
// if there's no error and success auto ack is enabled ack it
|
2021-01-10 14:45:50 +03:00
|
|
|
if p.err == nil && options.AutoAck {
|
2019-01-23 01:39:42 +03:00
|
|
|
msg.Ack()
|
|
|
|
}
|
2021-01-10 14:45:50 +03:00
|
|
|
if p.err != nil {
|
|
|
|
eh(p)
|
|
|
|
}
|
2019-01-23 01:39:42 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
var sub stan.Subscription
|
|
|
|
var err error
|
|
|
|
|
|
|
|
n.RLock()
|
2021-01-10 14:45:50 +03:00
|
|
|
if len(options.Group) > 0 {
|
|
|
|
sub, err = n.conn.QueueSubscribe(topic, options.Group, fn, stanOpts...)
|
2019-01-23 01:39:42 +03:00
|
|
|
} else {
|
|
|
|
sub, err = n.conn.Subscribe(topic, fn, stanOpts...)
|
|
|
|
}
|
|
|
|
n.RUnlock()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2021-01-10 14:45:50 +03:00
|
|
|
return &subscriber{dq: len(bopts.DurableName) > 0, s: sub, opts: options, t: topic}, nil
|
2019-01-23 01:39:42 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func (n *stanBroker) String() string {
|
|
|
|
return "stan"
|
|
|
|
}
|
|
|
|
|
2021-01-29 17:11:47 +03:00
|
|
|
func (n *stanBroker) Name() string {
|
|
|
|
return n.opts.Name
|
|
|
|
}
|
|
|
|
|
2019-01-23 01:39:42 +03:00
|
|
|
func NewBroker(opts ...broker.Option) broker.Broker {
|
2021-01-10 14:45:50 +03:00
|
|
|
options := broker.NewOptions(opts...)
|
2019-01-23 01:39:42 +03:00
|
|
|
|
2019-05-29 16:57:48 +03:00
|
|
|
stanOpts := stan.GetDefaultOptions()
|
2019-01-23 01:39:42 +03:00
|
|
|
if n, ok := options.Context.Value(optionsKey{}).(stan.Options); ok {
|
|
|
|
stanOpts = n
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(options.Addrs) == 0 {
|
|
|
|
options.Addrs = strings.Split(stanOpts.NatsURL, ",")
|
|
|
|
}
|
|
|
|
|
|
|
|
nb := &stanBroker{
|
2019-02-10 13:16:41 +03:00
|
|
|
done: make(chan struct{}),
|
2019-01-23 01:39:42 +03:00
|
|
|
opts: options,
|
2019-02-10 13:16:41 +03:00
|
|
|
sopts: stanOpts,
|
2019-01-23 01:39:42 +03:00
|
|
|
addrs: setAddrs(options.Addrs),
|
|
|
|
}
|
|
|
|
|
|
|
|
return nb
|
|
|
|
}
|