broker: stan add Reconnect and Timeout

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2019-02-10 13:16:41 +03:00
parent d2ea46c139
commit c37100b7e2
2 changed files with 136 additions and 26 deletions

View File

@ -2,6 +2,7 @@ package stan
import ( import (
"context" "context"
"time"
"github.com/micro/go-micro/broker" "github.com/micro/go-micro/broker"
stan "github.com/nats-io/go-nats-streaming" stan "github.com/nats-io/go-nats-streaming"
@ -40,3 +41,17 @@ type ackSuccessKey struct{}
func AckOnSuccess() broker.SubscribeOption { func AckOnSuccess() broker.SubscribeOption {
return setSubscribeOption(ackSuccessKey{}, true) return setSubscribeOption(ackSuccessKey{}, true)
} }
type timeoutKey struct{}
// Timeout for connecting to broker -1 infinitive or time.Duration value
func Timeout(td time.Duration) broker.Option {
return setBrokerOption(timeoutKey{}, td)
}
type reconnectKey struct{}
// Reconnect to broker in case of errors
func Reconnect(v bool) broker.Option {
return setBrokerOption(reconnectKey{}, v)
}

141
stan.go
View File

@ -4,10 +4,13 @@ package stan
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"strings" "strings"
"sync" "sync"
"time"
"github.com/google/uuid" "github.com/google/uuid"
log "github.com/micro/go-log"
"github.com/micro/go-micro/broker" "github.com/micro/go-micro/broker"
"github.com/micro/go-micro/cmd" "github.com/micro/go-micro/cmd"
"github.com/micro/go-micro/codec/json" "github.com/micro/go-micro/codec/json"
@ -19,7 +22,13 @@ type stanBroker struct {
addrs []string addrs []string
conn stan.Conn conn stan.Conn
opts broker.Options opts broker.Options
nopts stan.Options sopts stan.Options
nopts []stan.Option
clusterID string
timeout time.Duration
reconnect bool
done chan struct{}
ctx context.Context
} }
type subscriber struct { type subscriber struct {
@ -108,6 +117,66 @@ func setAddrs(addrs []string) []string {
return cAddrs return cAddrs
} }
func (n *stanBroker) reconnectCB(c stan.Conn, err error) {
if n.reconnect {
if err := n.connect(); err != nil {
log.Log(err.Error())
}
}
}
func (n *stanBroker) connect() error {
timeout := make(<-chan time.Time)
if n.timeout > 0 {
timeout = time.After(n.timeout)
}
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
fn := func() error {
clientID := uuid.New().String()
c, err := stan.Connect(n.clusterID, clientID, n.nopts...)
if err == nil {
n.Lock()
n.conn = c
n.Unlock()
}
return err
}
// don't wait for first try
if err := fn(); err == nil {
return nil
}
// wait loop
for {
select {
// context closed
case <-n.opts.Context.Done():
return nil
// call close, don't wait anymore
case <-n.done:
return nil
// in case of timeout fail with a timeout error
case <-timeout:
return fmt.Errorf("timeout connect to %v", n.addrs)
// got a tick, try to connect
case <-ticker.C:
err := fn()
if err == nil {
log.Logf("successeful connected to %v", n.addrs)
return nil
}
log.Logf("failed to connect %v: %v\n", n.addrs, err)
}
}
return nil
}
func (n *stanBroker) Connect() error { func (n *stanBroker) Connect() error {
n.RLock() n.RLock()
if n.conn != nil { if n.conn != nil {
@ -116,41 +185,66 @@ func (n *stanBroker) Connect() error {
} }
n.RUnlock() n.RUnlock()
opts := n.nopts
opts.NatsURL = strings.Join(n.addrs, ",")
clusterID, ok := n.opts.Context.Value(clusterIDKey{}).(string) clusterID, ok := n.opts.Context.Value(clusterIDKey{}).(string)
if !ok || len(clusterID) == 0 { if !ok || len(clusterID) == 0 {
return errors.New("must specify ClusterID Option") return errors.New("must specify ClusterID Option")
} }
clientID := uuid.New().String() var reconnect bool
if val, ok := n.opts.Context.Value(reconnectKey{}).(bool); ok && val {
reconnect = val
}
var timeout time.Duration
if td, ok := n.opts.Context.Value(timeoutKey{}).(time.Duration); ok {
timeout = td
} else {
timeout = 5 * time.Second
}
if n.sopts.ConnectionLostCB != nil && reconnect {
return errors.New("impossible to use custom ConnectionLostCB and Reconnect(true)")
}
if reconnect {
n.sopts.ConnectionLostCB = n.reconnectCB
}
nopts := []stan.Option{ nopts := []stan.Option{
stan.NatsURL(opts.NatsURL), stan.NatsURL(n.sopts.NatsURL),
stan.NatsConn(opts.NatsConn), stan.NatsConn(n.sopts.NatsConn),
stan.ConnectWait(opts.ConnectTimeout), stan.ConnectWait(n.sopts.ConnectTimeout),
stan.PubAckWait(opts.AckTimeout), stan.PubAckWait(n.sopts.AckTimeout),
stan.MaxPubAcksInflight(opts.MaxPubAcksInflight), stan.MaxPubAcksInflight(n.sopts.MaxPubAcksInflight),
stan.Pings(opts.PingIterval, opts.PingMaxOut), stan.Pings(n.sopts.PingIterval, n.sopts.PingMaxOut),
stan.SetConnectionLostHandler(opts.ConnectionLostCB), stan.SetConnectionLostHandler(n.sopts.ConnectionLostCB),
} }
nopts = append(nopts, stan.NatsURL(strings.Join(n.addrs, ",")))
c, err := stan.Connect(clusterID, clientID, nopts...)
if err != nil {
return err
}
n.Lock() n.Lock()
n.conn = c n.nopts = nopts
n.clusterID = clusterID
n.timeout = timeout
n.reconnect = reconnect
n.Unlock() n.Unlock()
return nil
return n.connect()
} }
func (n *stanBroker) Disconnect() error { func (n *stanBroker) Disconnect() error {
n.RLock() var err error
n.conn.Close()
n.RUnlock() n.Lock()
return nil defer n.Unlock()
if n.done != nil {
close(n.done)
n.done = nil
}
if n.conn != nil {
err = n.conn.Close()
}
return err
} }
func (n *stanBroker) Init(opts ...broker.Option) error { func (n *stanBroker) Init(opts ...broker.Option) error {
@ -279,8 +373,9 @@ func NewBroker(opts ...broker.Option) broker.Broker {
} }
nb := &stanBroker{ nb := &stanBroker{
done: make(chan struct{}),
opts: options, opts: options,
nopts: stanOpts, sopts: stanOpts,
addrs: setAddrs(options.Addrs), addrs: setAddrs(options.Addrs),
} }