Merge pull request #651 from magodo/master

wait nats drain since it's asynchronous
This commit is contained in:
Asim Aslam 2019-08-08 00:30:01 +01:00 committed by GitHub
commit c7e8a2aeb9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 54 additions and 50 deletions

View File

@ -13,18 +13,19 @@ import (
) )
type natsBroker struct { type natsBroker struct {
sync.Once
sync.RWMutex sync.RWMutex
addrs []string addrs []string
conn *nats.Conn conn *nats.Conn
opts broker.Options opts broker.Options
nopts nats.Options nopts nats.Options
drain bool drain bool
closeCh chan (error)
} }
type subscriber struct { type subscriber struct {
s *nats.Subscription s *nats.Subscription
opts broker.SubscribeOptions opts broker.SubscribeOptions
drain bool
} }
type publication struct { type publication struct {
@ -54,9 +55,6 @@ func (s *subscriber) Topic() string {
} }
func (s *subscriber) Unsubscribe() error { func (s *subscriber) Unsubscribe() error {
if s.drain {
return s.s.Drain()
}
return s.s.Unsubscribe() return s.s.Unsubscribe()
} }
@ -122,20 +120,17 @@ func (n *natsBroker) Connect() error {
func (n *natsBroker) Disconnect() error { func (n *natsBroker) Disconnect() error {
n.RLock() n.RLock()
defer n.RUnlock()
if n.drain { if n.drain {
n.conn.Drain() n.conn.Drain()
} else { return <-n.closeCh
n.conn.Close()
} }
n.RUnlock() n.conn.Close()
return nil return nil
} }
func (n *natsBroker) Init(opts ...broker.Option) error { func (n *natsBroker) Init(opts ...broker.Option) error {
for _, o := range opts { n.setOption(opts...)
o(&n.opts)
}
n.addrs = setAddrs(n.opts.Addrs)
return nil return nil
} }
@ -167,11 +162,6 @@ func (n *natsBroker) Subscribe(topic string, handler broker.Handler, opts ...bro
o(&opt) o(&opt)
} }
var drain bool
if _, ok := opt.Context.Value(drainSubscriptionKey{}).(bool); ok {
drain = true
}
fn := func(msg *nats.Msg) { fn := func(msg *nats.Msg) {
var m broker.Message var m broker.Message
if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil { if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil {
@ -193,7 +183,7 @@ func (n *natsBroker) Subscribe(topic string, handler broker.Handler, opts ...bro
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &subscriber{s: sub, opts: opt, drain: drain}, nil return &subscriber{s: sub, opts: opt}, nil
} }
func (n *natsBroker) String() string { func (n *natsBroker) String() string {
@ -207,39 +197,59 @@ func NewBroker(opts ...broker.Option) broker.Broker {
Context: context.Background(), Context: context.Background(),
} }
n := &natsBroker{
opts: options,
}
n.setOption(opts...)
return n
}
func (n *natsBroker) setOption(opts ...broker.Option) {
for _, o := range opts { for _, o := range opts {
o(&options) o(&n.opts)
} }
natsOpts := nats.GetDefaultOptions() n.Once.Do(func() {
if n, ok := options.Context.Value(optionsKey{}).(nats.Options); ok { n.nopts = nats.GetDefaultOptions()
natsOpts = n })
}
var drain bool if nopts, ok := n.opts.Context.Value(optionsKey{}).(nats.Options); ok {
if _, ok := options.Context.Value(drainSubscriptionKey{}).(bool); ok { n.nopts = nopts
drain = true
} }
// broker.Options have higher priority than nats.Options // broker.Options have higher priority than nats.Options
// only if Addrs, Secure or TLSConfig were not set through a broker.Option // only if Addrs, Secure or TLSConfig were not set through a broker.Option
// we read them from nats.Option // we read them from nats.Option
if len(options.Addrs) == 0 { if len(n.opts.Addrs) == 0 {
options.Addrs = natsOpts.Servers n.opts.Addrs = n.nopts.Servers
} }
if !options.Secure { if !n.opts.Secure {
options.Secure = natsOpts.Secure n.opts.Secure = n.nopts.Secure
} }
if options.TLSConfig == nil { if n.opts.TLSConfig == nil {
options.TLSConfig = natsOpts.TLSConfig n.opts.TLSConfig = n.nopts.TLSConfig
} }
n.addrs = setAddrs(n.opts.Addrs)
return &natsBroker{ if n.opts.Context.Value(drainConnectionKey{}) != nil {
opts: options, n.drain = true
nopts: natsOpts, n.closeCh = make(chan error)
addrs: setAddrs(options.Addrs), n.nopts.ClosedCB = n.onClose
drain: drain, n.nopts.AsyncErrorCB = n.onAsyncError
}
}
func (n *natsBroker) onClose(conn *nats.Conn) {
n.closeCh <- nil
}
func (n *natsBroker) onAsyncError(conn *nats.Conn, sub *nats.Subscription, err error) {
// There are kinds of different async error nats might callback, but we are interested
// in ErrDrainTimeout only here.
if err == nats.ErrDrainTimeout {
n.closeCh <- err
} }
} }

View File

@ -7,7 +7,6 @@ import (
type optionsKey struct{} type optionsKey struct{}
type drainConnectionKey struct{} type drainConnectionKey struct{}
type drainSubscriptionKey struct{}
// Options accepts nats.Options // Options accepts nats.Options
func Options(opts nats.Options) broker.Option { func Options(opts nats.Options) broker.Option {
@ -16,10 +15,5 @@ func Options(opts nats.Options) broker.Option {
// DrainConnection will drain subscription on close // DrainConnection will drain subscription on close
func DrainConnection() broker.Option { func DrainConnection() broker.Option {
return setBrokerOption(drainConnectionKey{}, true) return setBrokerOption(drainConnectionKey{}, struct{}{})
}
// DrainSubscription will drain pending messages when unsubscribe
func DrainSubscription() broker.SubscribeOption {
return setSubscribeOption(drainSubscriptionKey{}, true)
} }