micro/broker/nats/nats.go

123 lines
2.2 KiB
Go
Raw Normal View History

package nats
import (
"encoding/json"
"strings"
"time"
"code.google.com/p/go-uuid/uuid"
"github.com/apcera/nats"
"github.com/myodc/go-micro/broker"
2015-05-23 23:16:26 +01:00
c "github.com/myodc/go-micro/context"
"golang.org/x/net/context"
)
type nbroker struct {
addrs []string
conn *nats.Conn
}
type subscriber struct {
s *nats.Subscription
}
2015-05-23 23:16:26 +01:00
// used in brokers where there is no support for headers
type envelope struct {
Header map[string]string
Message *broker.Message
}
func (n *subscriber) Topic() string {
return n.s.Subject
}
func (n *subscriber) Unsubscribe() error {
return n.s.Unsubscribe()
}
func (n *nbroker) Address() string {
if len(n.addrs) > 0 {
return n.addrs[0]
}
return ""
}
func (n *nbroker) Connect() error {
if n.conn != nil {
return nil
}
opts := nats.DefaultOptions
opts.Servers = n.addrs
c, err := opts.Connect()
if err != nil {
return err
}
n.conn = c
return nil
}
func (n *nbroker) Disconnect() error {
n.conn.Close()
return nil
}
func (n *nbroker) Init() error {
return nil
}
2015-05-23 23:16:26 +01:00
func (n *nbroker) Publish(ctx context.Context, topic string, body []byte) error {
2015-05-26 22:39:48 +01:00
header, _ := c.GetMetadata(ctx)
2015-05-23 23:16:26 +01:00
message := &broker.Message{
Id: uuid.NewUUID().String(),
Timestamp: time.Now().Unix(),
Topic: topic,
2015-05-23 23:16:26 +01:00
Body: body,
}
b, err := json.Marshal(&envelope{
header,
message,
})
if err != nil {
return err
}
return n.conn.Publish(topic, b)
}
2015-05-23 23:16:26 +01:00
func (n *nbroker) Subscribe(topic string, function func(context.Context, *broker.Message)) (broker.Subscriber, error) {
sub, err := n.conn.Subscribe(topic, func(msg *nats.Msg) {
2015-05-23 23:16:26 +01:00
var e *envelope
if err := json.Unmarshal(msg.Data, &e); err != nil {
return
}
2015-05-26 22:39:48 +01:00
ctx := c.WithMetadata(context.Background(), e.Header)
2015-05-23 23:16:26 +01:00
function(ctx, e.Message)
})
if err != nil {
return nil, err
}
return &subscriber{s: sub}, nil
}
func NewBroker(addrs []string, opt ...broker.Option) broker.Broker {
2015-05-23 13:13:55 +01:00
var cAddrs []string
for _, addr := range addrs {
if len(addr) == 0 {
continue
}
if !strings.HasPrefix(addr, "nats://") {
2015-05-23 13:13:55 +01:00
addr = "nats://" + addr
}
2015-05-23 13:13:55 +01:00
cAddrs = append(cAddrs, addr)
}
if len(cAddrs) == 0 {
cAddrs = []string{nats.DefaultURL}
}
return &nbroker{
2015-05-23 13:13:55 +01:00
addrs: cAddrs,
}
}