diff --git a/nats.go b/nats.go index 8a4e022..bff6b2c 100644 --- a/nats.go +++ b/nats.go @@ -15,13 +15,35 @@ type nbroker struct { } type subscriber struct { - s *nats.Subscription + s *nats.Subscription + opts broker.SubscribeOptions +} + +type publication struct { + t string + m *broker.Message } func init() { cmd.Brokers["nats"] = NewBroker } +func (n *publication) Topic() string { + return n.t +} + +func (n *publication) Message() *broker.Message { + return n.m +} + +func (n *publication) Ack() error { + return nil +} + +func (n *subscriber) Config() broker.SubscribeOptions { + return n.opts +} + func (n *subscriber) Topic() string { return n.s.Subject } @@ -57,11 +79,11 @@ func (n *nbroker) Disconnect() error { return nil } -func (n *nbroker) Init() error { +func (n *nbroker) Init(opts ...broker.Option) error { return nil } -func (n *nbroker) Publish(topic string, msg *broker.Message) error { +func (n *nbroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error { b, err := json.Marshal(msg) if err != nil { return err @@ -69,18 +91,35 @@ func (n *nbroker) Publish(topic string, msg *broker.Message) error { return n.conn.Publish(topic, b) } -func (n *nbroker) Subscribe(topic string, handler broker.Handler) (broker.Subscriber, error) { - sub, err := n.conn.Subscribe(topic, func(msg *nats.Msg) { +func (n *nbroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + opt := broker.SubscribeOptions{ + AutoAck: true, + } + + for _, o := range opts { + o(&opt) + } + + fn := func(msg *nats.Msg) { var m *broker.Message if err := json.Unmarshal(msg.Data, &m); err != nil { return } - handler(m) - }) + handler(&publication{m: m, t: topic}) + } + + var sub *nats.Subscription + var err error + + if len(opt.Queue) > 0 { + sub, err = n.conn.QueueSubscribe(topic, opt.Queue, fn) + } else { + sub, err = n.conn.Subscribe(topic, fn) + } if err != nil { return nil, err } - return &subscriber{s: sub}, nil + return &subscriber{s: sub, opts: opt}, nil } func (n *nbroker) String() string {