micro/transport/rabbitmq/rabbitmq.go

240 lines
4.1 KiB
Go
Raw Normal View History

package rabbitmq
2015-05-20 22:57:19 +01:00
import (
"fmt"
"sync"
"time"
"errors"
uuid "github.com/nu7hatch/gouuid"
"github.com/streadway/amqp"
"github.com/myodc/go-micro/transport"
2015-05-20 22:57:19 +01:00
)
type rmqtport struct {
2015-05-21 21:08:19 +01:00
conn *rabbitMQConn
2015-05-21 19:24:57 +01:00
addrs []string
2015-05-20 22:57:19 +01:00
}
type rmqtportClient struct {
2015-05-20 22:57:19 +01:00
once sync.Once
rt *rmqtport
2015-05-21 19:24:57 +01:00
addr string
2015-05-20 22:57:19 +01:00
replyTo string
sync.Mutex
inflight map[string]chan amqp.Delivery
}
type rmqtportSocket struct {
2015-05-21 21:08:19 +01:00
conn *rabbitMQConn
d *amqp.Delivery
2015-05-20 22:57:19 +01:00
}
type rmqtportListener struct {
2015-05-21 21:08:19 +01:00
conn *rabbitMQConn
2015-05-21 19:24:57 +01:00
addr string
2015-05-20 22:57:19 +01:00
}
func (r *rmqtportClient) init() {
2015-05-21 19:24:57 +01:00
<-r.rt.conn.Init()
if err := r.rt.conn.Channel.DeclareReplyQueue(r.replyTo); err != nil {
2015-05-20 22:57:19 +01:00
return
}
2015-05-21 19:24:57 +01:00
deliveries, err := r.rt.conn.Channel.ConsumeQueue(r.replyTo)
2015-05-20 22:57:19 +01:00
if err != nil {
return
}
go func() {
for delivery := range deliveries {
2015-05-21 19:24:57 +01:00
go r.handle(delivery)
2015-05-20 22:57:19 +01:00
}
}()
}
func (r *rmqtportClient) handle(delivery amqp.Delivery) {
2015-05-21 19:24:57 +01:00
ch := r.getReq(delivery.CorrelationId)
2015-05-20 22:57:19 +01:00
if ch == nil {
return
}
select {
case ch <- delivery:
default:
}
}
func (r *rmqtportClient) putReq(id string) chan amqp.Delivery {
2015-05-21 19:24:57 +01:00
r.Lock()
2015-05-20 22:57:19 +01:00
ch := make(chan amqp.Delivery, 1)
2015-05-21 19:24:57 +01:00
r.inflight[id] = ch
r.Unlock()
2015-05-20 22:57:19 +01:00
return ch
}
func (r *rmqtportClient) getReq(id string) chan amqp.Delivery {
2015-05-21 19:24:57 +01:00
r.Lock()
defer r.Unlock()
if ch, ok := r.inflight[id]; ok {
delete(r.inflight, id)
2015-05-20 22:57:19 +01:00
return ch
}
return nil
}
func (r *rmqtportClient) Send(m *transport.Message) (*transport.Message, error) {
2015-05-21 19:24:57 +01:00
r.once.Do(r.init)
2015-05-20 22:57:19 +01:00
2015-05-21 19:24:57 +01:00
if !r.rt.conn.IsConnected() {
2015-05-20 22:57:19 +01:00
return nil, errors.New("Not connected to AMQP")
}
id, err := uuid.NewV4()
if err != nil {
return nil, err
}
2015-05-21 19:24:57 +01:00
replyChan := r.putReq(id.String())
2015-05-20 22:57:19 +01:00
headers := amqp.Table{}
for k, v := range m.Header {
headers[k] = v
}
message := amqp.Publishing{
CorrelationId: id.String(),
Timestamp: time.Now().UTC(),
Body: m.Body,
2015-05-21 19:24:57 +01:00
ReplyTo: r.replyTo,
2015-05-20 22:57:19 +01:00
Headers: headers,
}
2015-05-21 19:24:57 +01:00
if err := r.rt.conn.Publish("micro", r.addr, message); err != nil {
r.getReq(id.String())
2015-05-20 22:57:19 +01:00
return nil, err
}
select {
case d := <-replyChan:
mr := &transport.Message{
2015-05-20 22:57:19 +01:00
Header: make(map[string]string),
Body: d.Body,
}
for k, v := range d.Headers {
mr.Header[k] = fmt.Sprintf("%v", v)
}
return mr, nil
case <-time.After(time.Second * 10):
return nil, errors.New("timed out")
}
}
func (r *rmqtportClient) Close() error {
2015-05-20 22:57:19 +01:00
return nil
}
func (r *rmqtportSocket) Recv(m *transport.Message) error {
2015-05-21 21:08:19 +01:00
if m == nil {
return errors.New("message passed in is nil")
}
mr := &transport.Message{
2015-05-20 22:57:19 +01:00
Header: make(map[string]string),
2015-05-21 19:24:57 +01:00
Body: r.d.Body,
2015-05-20 22:57:19 +01:00
}
2015-05-21 19:24:57 +01:00
for k, v := range r.d.Headers {
2015-05-21 21:08:19 +01:00
mr.Header[k] = fmt.Sprintf("%v", v)
2015-05-20 22:57:19 +01:00
}
2015-05-21 21:08:19 +01:00
*m = *mr
return nil
2015-05-20 22:57:19 +01:00
}
func (r *rmqtportSocket) Send(m *transport.Message) error {
2015-05-21 21:08:19 +01:00
msg := amqp.Publishing{
CorrelationId: r.d.CorrelationId,
Timestamp: time.Now().UTC(),
Body: m.Body,
Headers: amqp.Table{},
}
for k, v := range m.Header {
msg.Headers[k] = v
}
return r.conn.Publish("", r.d.ReplyTo, msg)
2015-05-20 22:57:19 +01:00
}
func (r *rmqtportSocket) Close() error {
2015-05-21 21:08:19 +01:00
return nil
2015-05-20 22:57:19 +01:00
}
func (r *rmqtportListener) Addr() string {
2015-05-21 19:24:57 +01:00
return r.addr
2015-05-20 22:57:19 +01:00
}
func (r *rmqtportListener) Close() error {
2015-05-21 19:24:57 +01:00
r.conn.Close()
2015-05-20 22:57:19 +01:00
return nil
}
func (r *rmqtportListener) Accept(fn func(transport.Socket)) error {
2015-05-21 19:24:57 +01:00
deliveries, err := r.conn.Consume(r.addr)
2015-05-20 22:57:19 +01:00
if err != nil {
return err
}
handler := func(d amqp.Delivery) {
fn(&rmqtportSocket{
2015-05-21 21:08:19 +01:00
d: &d,
conn: r.conn,
2015-05-20 22:57:19 +01:00
})
}
for d := range deliveries {
go handler(d)
}
return nil
}
func (r *rmqtport) Dial(addr string) (transport.Client, error) {
2015-05-20 22:57:19 +01:00
id, err := uuid.NewV4()
if err != nil {
return nil, err
}
return &rmqtportClient{
2015-05-21 19:24:57 +01:00
rt: r,
addr: addr,
2015-05-20 22:57:19 +01:00
inflight: make(map[string]chan amqp.Delivery),
replyTo: fmt.Sprintf("replyTo-%s", id.String()),
}, nil
}
func (r *rmqtport) Listen(addr string) (transport.Listener, error) {
2015-05-21 19:24:57 +01:00
id, err := uuid.NewV4()
if err != nil {
return nil, err
}
2015-05-21 21:08:19 +01:00
conn := newRabbitMQConn("", r.addrs)
2015-05-20 22:57:19 +01:00
<-conn.Init()
return &rmqtportListener{
2015-05-21 19:24:57 +01:00
addr: id.String(),
2015-05-20 22:57:19 +01:00
conn: conn,
}, nil
}
func NewTransport(addrs []string, opt ...transport.Option) transport.Transport {
return &rmqtport{
2015-05-21 21:08:19 +01:00
conn: newRabbitMQConn("", addrs),
2015-05-21 19:24:57 +01:00
addrs: addrs,
2015-05-20 22:57:19 +01:00
}
}