broker: [stan] rework option naming, fix messages

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2019-02-12 17:27:14 +03:00
parent c37100b7e2
commit 8d0a3fba24
2 changed files with 31 additions and 37 deletions

View File

@ -42,16 +42,16 @@ func AckOnSuccess() broker.SubscribeOption {
return setSubscribeOption(ackSuccessKey{}, true) return setSubscribeOption(ackSuccessKey{}, true)
} }
type timeoutKey struct{} type connectTimeoutKey struct{}
// Timeout for connecting to broker -1 infinitive or time.Duration value // ConnectTimeout timeout for connecting to broker -1 infinitive or time.Duration value
func Timeout(td time.Duration) broker.Option { func ConnectTimeout(td time.Duration) broker.Option {
return setBrokerOption(timeoutKey{}, td) return setBrokerOption(connectTimeoutKey{}, td)
} }
type reconnectKey struct{} type connectRetryKey struct{}
// Reconnect to broker in case of errors // ConnectRetry reconnect to broker in case of errors
func Reconnect(v bool) broker.Option { func ConnectRetry(v bool) broker.Option {
return setBrokerOption(reconnectKey{}, v) return setBrokerOption(connectRetryKey{}, v)
} }

52
stan.go
View File

@ -19,16 +19,16 @@ import (
type stanBroker struct { type stanBroker struct {
sync.RWMutex sync.RWMutex
addrs []string addrs []string
conn stan.Conn conn stan.Conn
opts broker.Options opts broker.Options
sopts stan.Options sopts stan.Options
nopts []stan.Option nopts []stan.Option
clusterID string clusterID string
timeout time.Duration connectTimeout time.Duration
reconnect bool connectRetry bool
done chan struct{} done chan struct{}
ctx context.Context ctx context.Context
} }
type subscriber struct { type subscriber struct {
@ -118,7 +118,7 @@ func setAddrs(addrs []string) []string {
} }
func (n *stanBroker) reconnectCB(c stan.Conn, err error) { func (n *stanBroker) reconnectCB(c stan.Conn, err error) {
if n.reconnect { if n.connectRetry {
if err := n.connect(); err != nil { if err := n.connect(); err != nil {
log.Log(err.Error()) log.Log(err.Error())
} }
@ -128,8 +128,8 @@ func (n *stanBroker) reconnectCB(c stan.Conn, err error) {
func (n *stanBroker) connect() error { func (n *stanBroker) connect() error {
timeout := make(<-chan time.Time) timeout := make(<-chan time.Time)
if n.timeout > 0 { if n.connectTimeout > 0 {
timeout = time.After(n.timeout) timeout = time.After(n.connectTimeout)
} }
ticker := time.NewTicker(1 * time.Second) ticker := time.NewTicker(1 * time.Second)
@ -162,15 +162,15 @@ func (n *stanBroker) connect() error {
return nil return nil
// in case of timeout fail with a timeout error // in case of timeout fail with a timeout error
case <-timeout: case <-timeout:
return fmt.Errorf("timeout connect to %v", n.addrs) return fmt.Errorf("[stan]: timeout connect to %v", n.addrs)
// got a tick, try to connect // got a tick, try to connect
case <-ticker.C: case <-ticker.C:
err := fn() err := fn()
if err == nil { if err == nil {
log.Logf("successeful connected to %v", n.addrs) log.Logf("[stan]: successeful connected to %v", n.addrs)
return nil return nil
} }
log.Logf("failed to connect %v: %v\n", n.addrs, err) log.Logf("[stan]: failed to connect %v: %v\n", n.addrs, err)
} }
} }
@ -190,23 +190,19 @@ func (n *stanBroker) Connect() error {
return errors.New("must specify ClusterID Option") return errors.New("must specify ClusterID Option")
} }
var reconnect bool if v, ok := n.opts.Context.Value(connectRetryKey{}).(bool); ok && v {
if val, ok := n.opts.Context.Value(reconnectKey{}).(bool); ok && val { n.connectRetry = true
reconnect = val
} }
var timeout time.Duration if td, ok := n.opts.Context.Value(connectTimeoutKey{}).(time.Duration); ok {
if td, ok := n.opts.Context.Value(timeoutKey{}).(time.Duration); ok { n.connectTimeout = td
timeout = td
} else {
timeout = 5 * time.Second
} }
if n.sopts.ConnectionLostCB != nil && reconnect { if n.sopts.ConnectionLostCB != nil && n.connectRetry {
return errors.New("impossible to use custom ConnectionLostCB and Reconnect(true)") return errors.New("impossible to use custom ConnectionLostCB and ConnectRetry(true)")
} }
if reconnect { if n.connectRetry {
n.sopts.ConnectionLostCB = n.reconnectCB n.sopts.ConnectionLostCB = n.reconnectCB
} }
@ -224,8 +220,6 @@ func (n *stanBroker) Connect() error {
n.Lock() n.Lock()
n.nopts = nopts n.nopts = nopts
n.clusterID = clusterID n.clusterID = clusterID
n.timeout = timeout
n.reconnect = reconnect
n.Unlock() n.Unlock()
return n.connect() return n.connect()