Moving plugins to go-plugins
This commit is contained in:
commit
b73ddd8a03
103
nats.go
Normal file
103
nats.go
Normal file
@ -0,0 +1,103 @@
|
||||
package nats
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strings"
|
||||
|
||||
"github.com/apcera/nats"
|
||||
"github.com/micro/go-micro/broker"
|
||||
"github.com/micro/go-micro/cmd"
|
||||
)
|
||||
|
||||
type nbroker struct {
|
||||
addrs []string
|
||||
conn *nats.Conn
|
||||
}
|
||||
|
||||
type subscriber struct {
|
||||
s *nats.Subscription
|
||||
}
|
||||
|
||||
func init() {
|
||||
cmd.Brokers["nats"] = NewBroker
|
||||
}
|
||||
|
||||
func (n *subscriber) Topic() string {
|
||||
return n.s.Subject
|
||||
}
|
||||
|
||||
func (n *subscriber) Unsubscribe() error {
|
||||
return n.s.Unsubscribe()
|
||||
}
|
||||
|
||||
func (n *nbroker) Address() string {
|
||||
if len(n.addrs) > 0 {
|
||||
return n.addrs[0]
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (n *nbroker) Connect() error {
|
||||
if n.conn != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
opts := nats.DefaultOptions
|
||||
opts.Servers = n.addrs
|
||||
c, err := opts.Connect()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n.conn = c
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *nbroker) Disconnect() error {
|
||||
n.conn.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *nbroker) Init() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *nbroker) Publish(topic string, msg *broker.Message) error {
|
||||
b, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return n.conn.Publish(topic, b)
|
||||
}
|
||||
|
||||
func (n *nbroker) Subscribe(topic string, handler broker.Handler) (broker.Subscriber, error) {
|
||||
sub, err := n.conn.Subscribe(topic, func(msg *nats.Msg) {
|
||||
var m *broker.Message
|
||||
if err := json.Unmarshal(msg.Data, &m); err != nil {
|
||||
return
|
||||
}
|
||||
handler(m)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &subscriber{s: sub}, nil
|
||||
}
|
||||
|
||||
func NewBroker(addrs []string, opt ...broker.Option) broker.Broker {
|
||||
var cAddrs []string
|
||||
for _, addr := range addrs {
|
||||
if len(addr) == 0 {
|
||||
continue
|
||||
}
|
||||
if !strings.HasPrefix(addr, "nats://") {
|
||||
addr = "nats://" + addr
|
||||
}
|
||||
cAddrs = append(cAddrs, addr)
|
||||
}
|
||||
if len(cAddrs) == 0 {
|
||||
cAddrs = []string{nats.DefaultURL}
|
||||
}
|
||||
return &nbroker{
|
||||
addrs: cAddrs,
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user