diff --git a/options.go b/options.go index 2982ee0..43f0bb9 100644 --- a/options.go +++ b/options.go @@ -42,16 +42,16 @@ func AckOnSuccess() broker.SubscribeOption { return setSubscribeOption(ackSuccessKey{}, true) } -type timeoutKey struct{} +type connectTimeoutKey struct{} -// Timeout for connecting to broker -1 infinitive or time.Duration value -func Timeout(td time.Duration) broker.Option { - return setBrokerOption(timeoutKey{}, td) +// ConnectTimeout timeout for connecting to broker -1 infinitive or time.Duration value +func ConnectTimeout(td time.Duration) broker.Option { + return setBrokerOption(connectTimeoutKey{}, td) } -type reconnectKey struct{} +type connectRetryKey struct{} -// Reconnect to broker in case of errors -func Reconnect(v bool) broker.Option { - return setBrokerOption(reconnectKey{}, v) +// ConnectRetry reconnect to broker in case of errors +func ConnectRetry(v bool) broker.Option { + return setBrokerOption(connectRetryKey{}, v) } diff --git a/stan.go b/stan.go index f34744d..1b0f31e 100644 --- a/stan.go +++ b/stan.go @@ -19,16 +19,16 @@ import ( type stanBroker struct { sync.RWMutex - addrs []string - conn stan.Conn - opts broker.Options - sopts stan.Options - nopts []stan.Option - clusterID string - timeout time.Duration - reconnect bool - done chan struct{} - ctx context.Context + addrs []string + conn stan.Conn + opts broker.Options + sopts stan.Options + nopts []stan.Option + clusterID string + connectTimeout time.Duration + connectRetry bool + done chan struct{} + ctx context.Context } type subscriber struct { @@ -118,7 +118,7 @@ func setAddrs(addrs []string) []string { } func (n *stanBroker) reconnectCB(c stan.Conn, err error) { - if n.reconnect { + if n.connectRetry { if err := n.connect(); err != nil { log.Log(err.Error()) } @@ -128,8 +128,8 @@ func (n *stanBroker) reconnectCB(c stan.Conn, err error) { func (n *stanBroker) connect() error { timeout := make(<-chan time.Time) - if n.timeout > 0 { - timeout = time.After(n.timeout) + if n.connectTimeout > 0 { + timeout = time.After(n.connectTimeout) } ticker := time.NewTicker(1 * time.Second) @@ -162,15 +162,15 @@ func (n *stanBroker) connect() error { return nil // in case of timeout fail with a timeout error case <-timeout: - return fmt.Errorf("timeout connect to %v", n.addrs) + return fmt.Errorf("[stan]: timeout connect to %v", n.addrs) // got a tick, try to connect case <-ticker.C: err := fn() if err == nil { - log.Logf("successeful connected to %v", n.addrs) + log.Logf("[stan]: successeful connected to %v", n.addrs) return nil } - log.Logf("failed to connect %v: %v\n", n.addrs, err) + log.Logf("[stan]: failed to connect %v: %v\n", n.addrs, err) } } @@ -190,23 +190,19 @@ func (n *stanBroker) Connect() error { return errors.New("must specify ClusterID Option") } - var reconnect bool - if val, ok := n.opts.Context.Value(reconnectKey{}).(bool); ok && val { - reconnect = val + if v, ok := n.opts.Context.Value(connectRetryKey{}).(bool); ok && v { + n.connectRetry = true } - var timeout time.Duration - if td, ok := n.opts.Context.Value(timeoutKey{}).(time.Duration); ok { - timeout = td - } else { - timeout = 5 * time.Second + if td, ok := n.opts.Context.Value(connectTimeoutKey{}).(time.Duration); ok { + n.connectTimeout = td } - if n.sopts.ConnectionLostCB != nil && reconnect { - return errors.New("impossible to use custom ConnectionLostCB and Reconnect(true)") + if n.sopts.ConnectionLostCB != nil && n.connectRetry { + return errors.New("impossible to use custom ConnectionLostCB and ConnectRetry(true)") } - if reconnect { + if n.connectRetry { n.sopts.ConnectionLostCB = n.reconnectCB } @@ -224,8 +220,6 @@ func (n *stanBroker) Connect() error { n.Lock() n.nopts = nopts n.clusterID = clusterID - n.timeout = timeout - n.reconnect = reconnect n.Unlock() return n.connect()