micro/transport/nats/nats.go

163 lines
2.6 KiB
Go
Raw Normal View History

package nats
2015-05-21 00:57:19 +03:00
import (
"encoding/json"
2015-05-21 23:08:19 +03:00
"errors"
2015-05-21 00:57:19 +03:00
"strings"
"time"
"github.com/apcera/nats"
2015-11-20 19:17:33 +03:00
"github.com/micro/go-micro/transport"
2015-05-21 00:57:19 +03:00
)
// ntport is the NATS-backed transport. It only carries the server
// addresses it was constructed with.
type ntport struct {
	// addrs lists candidate server URLs; Dial and Listen consult only the
	// first entry.
	addrs []string
}
2015-05-21 00:57:19 +03:00
type ntportClient struct {
2015-05-21 21:24:57 +03:00
conn *nats.Conn
addr string
id string
sub *nats.Subscription
2015-05-21 00:57:19 +03:00
}
type ntportSocket struct {
2015-05-21 23:08:19 +03:00
conn *nats.Conn
m *nats.Msg
2015-05-21 00:57:19 +03:00
}
type ntportListener struct {
2015-05-21 00:57:19 +03:00
conn *nats.Conn
2015-05-21 21:24:57 +03:00
addr string
2015-05-21 00:57:19 +03:00
exit chan bool
}
func (n *ntportClient) Send(m *transport.Message) error {
2015-05-21 00:57:19 +03:00
b, err := json.Marshal(m)
if err != nil {
return err
2015-05-21 00:57:19 +03:00
}
return n.conn.PublishRequest(n.addr, n.id, b)
}
func (n *ntportClient) Recv(m *transport.Message) error {
rsp, err := n.sub.NextMsg(time.Second * 10)
2015-05-21 00:57:19 +03:00
if err != nil {
return err
2015-05-21 00:57:19 +03:00
}
var mr *transport.Message
2015-05-21 00:57:19 +03:00
if err := json.Unmarshal(rsp.Data, &mr); err != nil {
return err
2015-05-21 00:57:19 +03:00
}
*m = *mr
return nil
2015-05-21 00:57:19 +03:00
}
func (n *ntportClient) Close() error {
n.sub.Unsubscribe()
2015-05-21 00:57:19 +03:00
n.conn.Close()
return nil
}
func (n *ntportSocket) Recv(m *transport.Message) error {
2015-05-21 23:08:19 +03:00
if m == nil {
return errors.New("message passed in is nil")
}
2015-05-21 00:57:19 +03:00
if err := json.Unmarshal(n.m.Data, &m); err != nil {
2015-05-21 23:08:19 +03:00
return err
2015-05-21 00:57:19 +03:00
}
2015-05-21 23:08:19 +03:00
return nil
2015-05-21 00:57:19 +03:00
}
func (n *ntportSocket) Send(m *transport.Message) error {
2015-05-21 23:08:19 +03:00
b, err := json.Marshal(m)
if err != nil {
return err
}
return n.conn.Publish(n.m.Reply, b)
2015-05-21 00:57:19 +03:00
}
func (n *ntportSocket) Close() error {
2015-05-21 23:08:19 +03:00
return nil
2015-05-21 00:57:19 +03:00
}
func (n *ntportListener) Addr() string {
2015-05-21 21:24:57 +03:00
return n.addr
2015-05-21 00:57:19 +03:00
}
func (n *ntportListener) Close() error {
2015-05-21 00:57:19 +03:00
n.exit <- true
n.conn.Close()
return nil
}
func (n *ntportListener) Accept(fn func(transport.Socket)) error {
2015-05-21 21:24:57 +03:00
s, err := n.conn.Subscribe(n.addr, func(m *nats.Msg) {
fn(&ntportSocket{
2015-05-21 23:08:19 +03:00
conn: n.conn,
m: m,
2015-05-21 00:57:19 +03:00
})
})
if err != nil {
return err
}
<-n.exit
return s.Unsubscribe()
}
func (n *ntport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) {
2015-05-21 21:24:57 +03:00
cAddr := nats.DefaultURL
if len(n.addrs) > 0 && strings.HasPrefix(n.addrs[0], "nats://") {
cAddr = n.addrs[0]
2015-05-21 00:57:19 +03:00
}
2015-05-21 21:24:57 +03:00
c, err := nats.Connect(cAddr)
2015-05-21 00:57:19 +03:00
if err != nil {
return nil, err
}
id := nats.NewInbox()
sub, err := c.SubscribeSync(id)
if err != nil {
return nil, err
}
return &ntportClient{
2015-05-21 21:24:57 +03:00
conn: c,
addr: addr,
id: id,
sub: sub,
2015-05-21 00:57:19 +03:00
}, nil
}
func (n *ntport) Listen(addr string) (transport.Listener, error) {
2015-05-21 21:24:57 +03:00
cAddr := nats.DefaultURL
if len(n.addrs) > 0 && strings.HasPrefix(n.addrs[0], "nats://") {
cAddr = n.addrs[0]
2015-05-21 00:57:19 +03:00
}
2015-05-21 21:24:57 +03:00
c, err := nats.Connect(cAddr)
2015-05-21 00:57:19 +03:00
if err != nil {
return nil, err
}
return &ntportListener{
2015-05-21 21:24:57 +03:00
addr: nats.NewInbox(),
2015-05-21 00:57:19 +03:00
conn: c,
exit: make(chan bool, 1),
}, nil
}
func NewTransport(addrs []string, opt ...transport.Option) transport.Transport {
return &ntport{
2015-05-21 21:24:57 +03:00
addrs: addrs,
}
2015-05-21 00:57:19 +03:00
}