micro/transport/nats_transport.go

146 lines
2.3 KiB
Go
Raw Normal View History

2015-05-20 22:57:19 +01:00
package transport
import (
"encoding/json"
2015-05-21 21:08:19 +01:00
"errors"
2015-05-20 22:57:19 +01:00
"strings"
"time"
"github.com/apcera/nats"
)
2015-05-21 19:24:57 +01:00
// NatsTransport is a transport backed by NATS. It carries the list of server
// addresses given to NewNatsTransport; Dial and Listen connect to the first
// entry when it starts with "nats://" and to nats.DefaultURL otherwise.
type NatsTransport struct {
// addrs is the caller-supplied address list; only addrs[0] is consulted.
addrs []string
}
2015-05-20 22:57:19 +01:00
type NatsTransportClient struct {
2015-05-21 19:24:57 +01:00
conn *nats.Conn
addr string
2015-05-20 22:57:19 +01:00
}
type NatsTransportSocket struct {
2015-05-21 21:08:19 +01:00
conn *nats.Conn
m *nats.Msg
2015-05-20 22:57:19 +01:00
}
2015-05-21 21:08:19 +01:00
type NatsTransportListener struct {
2015-05-20 22:57:19 +01:00
conn *nats.Conn
2015-05-21 19:24:57 +01:00
addr string
2015-05-20 22:57:19 +01:00
exit chan bool
}
func (n *NatsTransportClient) Send(m *Message) (*Message, error) {
b, err := json.Marshal(m)
if err != nil {
return nil, err
}
2015-05-21 19:24:57 +01:00
rsp, err := n.conn.Request(n.addr, b, time.Second*10)
2015-05-20 22:57:19 +01:00
if err != nil {
return nil, err
}
var mr *Message
if err := json.Unmarshal(rsp.Data, &mr); err != nil {
return nil, err
}
return mr, nil
}
// Close shuts down the client's underlying NATS connection by calling
// conn.Close. It always returns nil.
func (n *NatsTransportClient) Close() error {
n.conn.Close()
return nil
}
2015-05-21 21:08:19 +01:00
func (n *NatsTransportSocket) Recv(m *Message) error {
if m == nil {
return errors.New("message passed in is nil")
}
2015-05-20 22:57:19 +01:00
if err := json.Unmarshal(n.m.Data, &m); err != nil {
2015-05-21 21:08:19 +01:00
return err
2015-05-20 22:57:19 +01:00
}
2015-05-21 21:08:19 +01:00
return nil
2015-05-20 22:57:19 +01:00
}
2015-05-21 21:08:19 +01:00
func (n *NatsTransportSocket) Send(m *Message) error {
b, err := json.Marshal(m)
if err != nil {
return err
}
return n.conn.Publish(n.m.Reply, b)
2015-05-20 22:57:19 +01:00
}
2015-05-21 21:08:19 +01:00
func (n *NatsTransportSocket) Close() error {
return nil
2015-05-20 22:57:19 +01:00
}
2015-05-21 21:08:19 +01:00
func (n *NatsTransportListener) Addr() string {
2015-05-21 19:24:57 +01:00
return n.addr
2015-05-20 22:57:19 +01:00
}
2015-05-21 21:08:19 +01:00
func (n *NatsTransportListener) Close() error {
2015-05-20 22:57:19 +01:00
n.exit <- true
n.conn.Close()
return nil
}
2015-05-21 21:08:19 +01:00
func (n *NatsTransportListener) Accept(fn func(Socket)) error {
2015-05-21 19:24:57 +01:00
s, err := n.conn.Subscribe(n.addr, func(m *nats.Msg) {
2015-05-20 22:57:19 +01:00
fn(&NatsTransportSocket{
2015-05-21 21:08:19 +01:00
conn: n.conn,
m: m,
2015-05-20 22:57:19 +01:00
})
})
if err != nil {
return err
}
<-n.exit
return s.Unsubscribe()
}
2015-05-21 21:08:19 +01:00
func (n *NatsTransport) Dial(addr string) (Client, error) {
2015-05-21 19:24:57 +01:00
cAddr := nats.DefaultURL
if len(n.addrs) > 0 && strings.HasPrefix(n.addrs[0], "nats://") {
cAddr = n.addrs[0]
2015-05-20 22:57:19 +01:00
}
2015-05-21 19:24:57 +01:00
c, err := nats.Connect(cAddr)
2015-05-20 22:57:19 +01:00
if err != nil {
return nil, err
}
return &NatsTransportClient{
2015-05-21 19:24:57 +01:00
conn: c,
addr: addr,
2015-05-20 22:57:19 +01:00
}, nil
}
2015-05-21 21:08:19 +01:00
func (n *NatsTransport) Listen(addr string) (Listener, error) {
2015-05-21 19:24:57 +01:00
cAddr := nats.DefaultURL
if len(n.addrs) > 0 && strings.HasPrefix(n.addrs[0], "nats://") {
cAddr = n.addrs[0]
2015-05-20 22:57:19 +01:00
}
2015-05-21 19:24:57 +01:00
c, err := nats.Connect(cAddr)
2015-05-20 22:57:19 +01:00
if err != nil {
return nil, err
}
2015-05-21 21:08:19 +01:00
return &NatsTransportListener{
2015-05-21 19:24:57 +01:00
addr: nats.NewInbox(),
2015-05-20 22:57:19 +01:00
conn: c,
exit: make(chan bool, 1),
}, nil
}
func NewNatsTransport(addrs []string) *NatsTransport {
2015-05-21 19:24:57 +01:00
return &NatsTransport{
addrs: addrs,
}
2015-05-20 22:57:19 +01:00
}