update broker to match go-micro broker, TODO: finish rabbitmq queue fanout

This commit is contained in:
Asim 2015-12-23 21:04:19 +00:00 committed by Vasiliy Tolstov
parent 56cf033626
commit a9dc71f4ad

53
nats.go
View File

@ -16,12 +16,34 @@ type nbroker struct {
type subscriber struct { type subscriber struct {
s *nats.Subscription s *nats.Subscription
opts broker.SubscribeOptions
}
type publication struct {
t string
m *broker.Message
} }
func init() { func init() {
cmd.Brokers["nats"] = NewBroker cmd.Brokers["nats"] = NewBroker
} }
func (n *publication) Topic() string {
return n.t
}
func (n *publication) Message() *broker.Message {
return n.m
}
func (n *publication) Ack() error {
return nil
}
func (n *subscriber) Config() broker.SubscribeOptions {
return n.opts
}
func (n *subscriber) Topic() string { func (n *subscriber) Topic() string {
return n.s.Subject return n.s.Subject
} }
@ -57,11 +79,11 @@ func (n *nbroker) Disconnect() error {
return nil return nil
} }
func (n *nbroker) Init() error { func (n *nbroker) Init(opts ...broker.Option) error {
return nil return nil
} }
func (n *nbroker) Publish(topic string, msg *broker.Message) error { func (n *nbroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
b, err := json.Marshal(msg) b, err := json.Marshal(msg)
if err != nil { if err != nil {
return err return err
@ -69,18 +91,35 @@ func (n *nbroker) Publish(topic string, msg *broker.Message) error {
return n.conn.Publish(topic, b) return n.conn.Publish(topic, b)
} }
func (n *nbroker) Subscribe(topic string, handler broker.Handler) (broker.Subscriber, error) { func (n *nbroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
sub, err := n.conn.Subscribe(topic, func(msg *nats.Msg) { opt := broker.SubscribeOptions{
AutoAck: true,
}
for _, o := range opts {
o(&opt)
}
fn := func(msg *nats.Msg) {
var m *broker.Message var m *broker.Message
if err := json.Unmarshal(msg.Data, &m); err != nil { if err := json.Unmarshal(msg.Data, &m); err != nil {
return return
} }
handler(m) handler(&publication{m: m, t: topic})
}) }
var sub *nats.Subscription
var err error
if len(opt.Queue) > 0 {
sub, err = n.conn.QueueSubscribe(topic, opt.Queue, fn)
} else {
sub, err = n.conn.Subscribe(topic, fn)
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &subscriber{s: sub}, nil return &subscriber{s: sub, opts: opt}, nil
} }
func (n *nbroker) String() string { func (n *nbroker) String() string {