2015-05-23 20:04:16 +01:00
|
|
|
package nats
|
2015-05-20 22:57:19 +01:00
|
|
|
|
|
|
|
import (
|
|
|
|
"encoding/json"
|
2015-05-21 21:08:19 +01:00
|
|
|
"errors"
|
2015-05-20 22:57:19 +01:00
|
|
|
"strings"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/apcera/nats"
|
2015-05-23 20:04:16 +01:00
|
|
|
"github.com/myodc/go-micro/transport"
|
2015-05-20 22:57:19 +01:00
|
|
|
)
|
|
|
|
|
2015-05-23 20:04:16 +01:00
|
|
|
type ntport struct {
|
2015-05-21 19:24:57 +01:00
|
|
|
addrs []string
|
|
|
|
}
|
2015-05-20 22:57:19 +01:00
|
|
|
|
2015-05-23 20:04:16 +01:00
|
|
|
type ntportClient struct {
|
2015-05-21 19:24:57 +01:00
|
|
|
conn *nats.Conn
|
|
|
|
addr string
|
2015-06-01 18:55:27 +01:00
|
|
|
id string
|
|
|
|
sub *nats.Subscription
|
2015-05-20 22:57:19 +01:00
|
|
|
}
|
|
|
|
|
2015-05-23 20:04:16 +01:00
|
|
|
type ntportSocket struct {
|
2015-05-21 21:08:19 +01:00
|
|
|
conn *nats.Conn
|
|
|
|
m *nats.Msg
|
2015-05-20 22:57:19 +01:00
|
|
|
}
|
|
|
|
|
2015-05-23 20:04:16 +01:00
|
|
|
type ntportListener struct {
|
2015-05-20 22:57:19 +01:00
|
|
|
conn *nats.Conn
|
2015-05-21 19:24:57 +01:00
|
|
|
addr string
|
2015-05-20 22:57:19 +01:00
|
|
|
exit chan bool
|
|
|
|
}
|
|
|
|
|
2015-06-01 18:55:27 +01:00
|
|
|
func (n *ntportClient) Send(m *transport.Message) error {
|
2015-05-20 22:57:19 +01:00
|
|
|
b, err := json.Marshal(m)
|
|
|
|
if err != nil {
|
2015-06-01 18:55:27 +01:00
|
|
|
return err
|
2015-05-20 22:57:19 +01:00
|
|
|
}
|
|
|
|
|
2015-06-01 18:55:27 +01:00
|
|
|
return n.conn.PublishRequest(n.addr, n.id, b)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *ntportClient) Recv(m *transport.Message) error {
|
|
|
|
rsp, err := n.sub.NextMsg(time.Second * 10)
|
2015-05-20 22:57:19 +01:00
|
|
|
if err != nil {
|
2015-06-01 18:55:27 +01:00
|
|
|
return err
|
2015-05-20 22:57:19 +01:00
|
|
|
}
|
|
|
|
|
2015-05-23 20:04:16 +01:00
|
|
|
var mr *transport.Message
|
2015-05-20 22:57:19 +01:00
|
|
|
if err := json.Unmarshal(rsp.Data, &mr); err != nil {
|
2015-06-01 18:55:27 +01:00
|
|
|
return err
|
2015-05-20 22:57:19 +01:00
|
|
|
}
|
|
|
|
|
2015-06-01 18:55:27 +01:00
|
|
|
*m = *mr
|
|
|
|
return nil
|
2015-05-20 22:57:19 +01:00
|
|
|
}
|
|
|
|
|
2015-05-23 20:04:16 +01:00
|
|
|
func (n *ntportClient) Close() error {
|
2015-06-01 18:55:27 +01:00
|
|
|
n.sub.Unsubscribe()
|
2015-05-20 22:57:19 +01:00
|
|
|
n.conn.Close()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-05-23 20:04:16 +01:00
|
|
|
func (n *ntportSocket) Recv(m *transport.Message) error {
|
2015-05-21 21:08:19 +01:00
|
|
|
if m == nil {
|
|
|
|
return errors.New("message passed in is nil")
|
|
|
|
}
|
2015-05-20 22:57:19 +01:00
|
|
|
|
|
|
|
if err := json.Unmarshal(n.m.Data, &m); err != nil {
|
2015-05-21 21:08:19 +01:00
|
|
|
return err
|
2015-05-20 22:57:19 +01:00
|
|
|
}
|
2015-05-21 21:08:19 +01:00
|
|
|
return nil
|
2015-05-20 22:57:19 +01:00
|
|
|
}
|
|
|
|
|
2015-05-23 20:04:16 +01:00
|
|
|
func (n *ntportSocket) Send(m *transport.Message) error {
|
2015-05-21 21:08:19 +01:00
|
|
|
b, err := json.Marshal(m)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return n.conn.Publish(n.m.Reply, b)
|
2015-05-20 22:57:19 +01:00
|
|
|
}
|
|
|
|
|
2015-05-23 20:04:16 +01:00
|
|
|
func (n *ntportSocket) Close() error {
|
2015-05-21 21:08:19 +01:00
|
|
|
return nil
|
2015-05-20 22:57:19 +01:00
|
|
|
}
|
|
|
|
|
2015-05-23 20:04:16 +01:00
|
|
|
func (n *ntportListener) Addr() string {
|
2015-05-21 19:24:57 +01:00
|
|
|
return n.addr
|
2015-05-20 22:57:19 +01:00
|
|
|
}
|
|
|
|
|
2015-05-23 20:04:16 +01:00
|
|
|
func (n *ntportListener) Close() error {
|
2015-05-20 22:57:19 +01:00
|
|
|
n.exit <- true
|
|
|
|
n.conn.Close()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-05-23 20:04:16 +01:00
|
|
|
func (n *ntportListener) Accept(fn func(transport.Socket)) error {
|
2015-05-21 19:24:57 +01:00
|
|
|
s, err := n.conn.Subscribe(n.addr, func(m *nats.Msg) {
|
2015-05-23 20:04:16 +01:00
|
|
|
fn(&ntportSocket{
|
2015-05-21 21:08:19 +01:00
|
|
|
conn: n.conn,
|
|
|
|
m: m,
|
2015-05-20 22:57:19 +01:00
|
|
|
})
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
<-n.exit
|
|
|
|
return s.Unsubscribe()
|
|
|
|
}
|
|
|
|
|
2015-06-01 18:55:27 +01:00
|
|
|
func (n *ntport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) {
|
2015-05-21 19:24:57 +01:00
|
|
|
cAddr := nats.DefaultURL
|
|
|
|
|
|
|
|
if len(n.addrs) > 0 && strings.HasPrefix(n.addrs[0], "nats://") {
|
|
|
|
cAddr = n.addrs[0]
|
2015-05-20 22:57:19 +01:00
|
|
|
}
|
|
|
|
|
2015-05-21 19:24:57 +01:00
|
|
|
c, err := nats.Connect(cAddr)
|
2015-05-20 22:57:19 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2015-06-01 18:55:27 +01:00
|
|
|
id := nats.NewInbox()
|
|
|
|
sub, err := c.SubscribeSync(id)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2015-05-23 20:04:16 +01:00
|
|
|
return &ntportClient{
|
2015-05-21 19:24:57 +01:00
|
|
|
conn: c,
|
|
|
|
addr: addr,
|
2015-06-01 18:55:27 +01:00
|
|
|
id: id,
|
|
|
|
sub: sub,
|
2015-05-20 22:57:19 +01:00
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2015-05-23 20:04:16 +01:00
|
|
|
func (n *ntport) Listen(addr string) (transport.Listener, error) {
|
2015-05-21 19:24:57 +01:00
|
|
|
cAddr := nats.DefaultURL
|
|
|
|
|
|
|
|
if len(n.addrs) > 0 && strings.HasPrefix(n.addrs[0], "nats://") {
|
|
|
|
cAddr = n.addrs[0]
|
2015-05-20 22:57:19 +01:00
|
|
|
}
|
|
|
|
|
2015-05-21 19:24:57 +01:00
|
|
|
c, err := nats.Connect(cAddr)
|
2015-05-20 22:57:19 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2015-05-23 20:04:16 +01:00
|
|
|
return &ntportListener{
|
2015-05-21 19:24:57 +01:00
|
|
|
addr: nats.NewInbox(),
|
2015-05-20 22:57:19 +01:00
|
|
|
conn: c,
|
|
|
|
exit: make(chan bool, 1),
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2015-05-23 20:04:16 +01:00
|
|
|
func NewTransport(addrs []string, opt ...transport.Option) transport.Transport {
|
|
|
|
return &ntport{
|
2015-05-21 19:24:57 +01:00
|
|
|
addrs: addrs,
|
|
|
|
}
|
2015-05-20 22:57:19 +01:00
|
|
|
}
|