wait nats drain since it's asynchronous

1. nats subscription draining is removed, since it is asynchronous,
   and there is no reliable way to detect when it is finished.
   Remove this option to avoid confusion.
2. nats connection draining is kept, and use 2 callbacks to detect
   draining timeout (timeout is set via `nats.Options`) or finish.
3. Also honour options passed in `broker.Init()` (previously only
   `broker.New()` is honoured).
This commit is contained in:
magodo 2019-08-07 17:58:45 +08:00
parent edb0fe4b16
commit 0baea58938
2 changed files with 54 additions and 50 deletions

View File

@ -13,18 +13,19 @@ import (
) )
type natsBroker struct { type natsBroker struct {
sync.Once
sync.RWMutex sync.RWMutex
addrs []string addrs []string
conn *nats.Conn conn *nats.Conn
opts broker.Options opts broker.Options
nopts nats.Options nopts nats.Options
drain bool drain bool
closeCh chan (error)
} }
type subscriber struct { type subscriber struct {
s *nats.Subscription s *nats.Subscription
opts broker.SubscribeOptions opts broker.SubscribeOptions
drain bool
} }
type publication struct { type publication struct {
@ -54,9 +55,6 @@ func (s *subscriber) Topic() string {
} }
func (s *subscriber) Unsubscribe() error { func (s *subscriber) Unsubscribe() error {
if s.drain {
return s.s.Drain()
}
return s.s.Unsubscribe() return s.s.Unsubscribe()
} }
@ -122,20 +120,17 @@ func (n *natsBroker) Connect() error {
func (n *natsBroker) Disconnect() error { func (n *natsBroker) Disconnect() error {
n.RLock() n.RLock()
defer n.RUnlock()
if n.drain { if n.drain {
n.conn.Drain() n.conn.Drain()
} else { return <-n.closeCh
n.conn.Close()
} }
n.RUnlock() n.conn.Close()
return nil return nil
} }
func (n *natsBroker) Init(opts ...broker.Option) error { func (n *natsBroker) Init(opts ...broker.Option) error {
for _, o := range opts { n.setOption(opts...)
o(&n.opts)
}
n.addrs = setAddrs(n.opts.Addrs)
return nil return nil
} }
@ -167,11 +162,6 @@ func (n *natsBroker) Subscribe(topic string, handler broker.Handler, opts ...bro
o(&opt) o(&opt)
} }
var drain bool
if _, ok := opt.Context.Value(drainSubscriptionKey{}).(bool); ok {
drain = true
}
fn := func(msg *nats.Msg) { fn := func(msg *nats.Msg) {
var m broker.Message var m broker.Message
if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil { if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil {
@ -193,7 +183,7 @@ func (n *natsBroker) Subscribe(topic string, handler broker.Handler, opts ...bro
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &subscriber{s: sub, opts: opt, drain: drain}, nil return &subscriber{s: sub, opts: opt}, nil
} }
func (n *natsBroker) String() string { func (n *natsBroker) String() string {
@ -207,39 +197,59 @@ func NewBroker(opts ...broker.Option) broker.Broker {
Context: context.Background(), Context: context.Background(),
} }
n := &natsBroker{
opts: options,
}
n.setOption(opts...)
return n
}
func (n *natsBroker) setOption(opts ...broker.Option) {
for _, o := range opts { for _, o := range opts {
o(&options) o(&n.opts)
} }
natsOpts := nats.GetDefaultOptions() n.Once.Do(func() {
if n, ok := options.Context.Value(optionsKey{}).(nats.Options); ok { n.nopts = nats.GetDefaultOptions()
natsOpts = n })
}
var drain bool if nopts, ok := n.opts.Context.Value(optionsKey{}).(nats.Options); ok {
if _, ok := options.Context.Value(drainSubscriptionKey{}).(bool); ok { n.nopts = nopts
drain = true
} }
// broker.Options have higher priority than nats.Options // broker.Options have higher priority than nats.Options
// only if Addrs, Secure or TLSConfig were not set through a broker.Option // only if Addrs, Secure or TLSConfig were not set through a broker.Option
// we read them from nats.Option // we read them from nats.Option
if len(options.Addrs) == 0 { if len(n.opts.Addrs) == 0 {
options.Addrs = natsOpts.Servers n.opts.Addrs = n.nopts.Servers
} }
if !options.Secure { if !n.opts.Secure {
options.Secure = natsOpts.Secure n.opts.Secure = n.nopts.Secure
} }
if options.TLSConfig == nil { if n.opts.TLSConfig == nil {
options.TLSConfig = natsOpts.TLSConfig n.opts.TLSConfig = n.nopts.TLSConfig
} }
n.addrs = setAddrs(n.opts.Addrs)
return &natsBroker{ if n.opts.Context.Value(drainConnectionKey{}) != nil {
opts: options, n.drain = true
nopts: natsOpts, n.closeCh = make(chan error)
addrs: setAddrs(options.Addrs), n.nopts.ClosedCB = n.onClose
drain: drain, n.nopts.AsyncErrorCB = n.onAsyncError
}
}
func (n *natsBroker) onClose(conn *nats.Conn) {
n.closeCh <- nil
}
func (n *natsBroker) onAsyncError(conn *nats.Conn, sub *nats.Subscription, err error) {
// There are kinds of different async error nats might callback, but we are interested
// in ErrDrainTimeout only here.
if err == nats.ErrDrainTimeout {
n.closeCh <- err
} }
} }

View File

@ -7,7 +7,6 @@ import (
type optionsKey struct{} type optionsKey struct{}
type drainConnectionKey struct{} type drainConnectionKey struct{}
type drainSubscriptionKey struct{}
// Options accepts nats.Options // Options accepts nats.Options
func Options(opts nats.Options) broker.Option { func Options(opts nats.Options) broker.Option {
@ -16,10 +15,5 @@ func Options(opts nats.Options) broker.Option {
// DrainConnection will drain subscription on close // DrainConnection will drain subscription on close
func DrainConnection() broker.Option { func DrainConnection() broker.Option {
return setBrokerOption(drainConnectionKey{}, true) return setBrokerOption(drainConnectionKey{}, struct{}{})
}
// DrainSubscription will drain pending messages when unsubscribe
func DrainSubscription() broker.SubscribeOption {
return setSubscribeOption(drainSubscriptionKey{}, true)
} }