DrainXXX options for nats broker and subscriber

This commit is contained in:
magodo 2019-05-15 15:45:03 +08:00 committed by Vasiliy Tolstov
parent 20453c885d
commit 4ac1ad4a6b
2 changed files with 37 additions and 4 deletions

23
nats.go
View File

@ -19,11 +19,13 @@ type nbroker struct {
conn *nats.Conn conn *nats.Conn
opts broker.Options opts broker.Options
nopts nats.Options nopts nats.Options
drain bool
} }
type subscriber struct { type subscriber struct {
s *nats.Subscription s *nats.Subscription
opts broker.SubscribeOptions opts broker.SubscribeOptions
drain bool
} }
type publication struct { type publication struct {
@ -56,6 +58,9 @@ func (n *subscriber) Topic() string {
} }
func (n *subscriber) Unsubscribe() error { func (n *subscriber) Unsubscribe() error {
if n.drain {
return n.s.Drain()
}
return n.s.Unsubscribe() return n.s.Unsubscribe()
} }
@ -121,7 +126,11 @@ func (n *nbroker) Connect() error {
func (n *nbroker) Disconnect() error { func (n *nbroker) Disconnect() error {
n.RLock() n.RLock()
if n.drain {
n.conn.Drain()
} else {
n.conn.Close() n.conn.Close()
}
n.RUnlock() n.RUnlock()
return nil return nil
} }
@ -155,12 +164,18 @@ func (n *nbroker) Subscribe(topic string, handler broker.Handler, opts ...broker
opt := broker.SubscribeOptions{ opt := broker.SubscribeOptions{
AutoAck: true, AutoAck: true,
Context: context.Background(),
} }
for _, o := range opts { for _, o := range opts {
o(&opt) o(&opt)
} }
var drain bool
if _, ok := opt.Context.Value(drainSubscriptionKey{}).(bool); ok {
drain = true
}
fn := func(msg *nats.Msg) { fn := func(msg *nats.Msg) {
var m broker.Message var m broker.Message
if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil { if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil {
@ -182,7 +197,7 @@ func (n *nbroker) Subscribe(topic string, handler broker.Handler, opts ...broker
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &subscriber{s: sub, opts: opt}, nil return &subscriber{s: sub, opts: opt, drain: drain}, nil
} }
func (n *nbroker) String() string { func (n *nbroker) String() string {
@ -205,6 +220,11 @@ func NewBroker(opts ...broker.Option) broker.Broker {
natsOpts = n natsOpts = n
} }
var drain bool
if _, ok := options.Context.Value(drainSubscriptionKey{}).(bool); ok {
drain = true
}
// broker.Options have higher priority than nats.Options // broker.Options have higher priority than nats.Options
// only if Addrs, Secure or TLSConfig were not set through a broker.Option // only if Addrs, Secure or TLSConfig were not set through a broker.Option
// we read them from nats.Option // we read them from nats.Option
@ -224,6 +244,7 @@ func NewBroker(opts ...broker.Option) broker.Broker {
opts: options, opts: options,
nopts: natsOpts, nopts: natsOpts,
addrs: setAddrs(options.Addrs), addrs: setAddrs(options.Addrs),
drain: drain,
} }
return nb return nb

View File

@ -6,8 +6,20 @@ import (
) )
type optionsKey struct{} type optionsKey struct{}
type drainConnectionKey struct{}
type drainSubscriptionKey struct{}
// Options accepts nats.Options // Options accepts nats.Options
func Options(opts nats.Options) broker.Option { func Options(opts nats.Options) broker.Option {
return setBrokerOption(optionsKey{}, opts) return setBrokerOption(optionsKey{}, opts)
} }
// DrainConnection will drain subscription on close
func DrainConnection() broker.Option {
return setBrokerOption(drainConnectionKey{}, true)
}
// DrainSubscription will drain pending messages when unsubscribe
func DrainSubscription() broker.SubscribeOption {
return setSubscribeOption(drainSubscriptionKey{}, true)
}