micro/transport/rabbitmq/connection.go

148 lines
2.6 KiB
Go
Raw Normal View History

package rabbitmq
2015-05-20 22:57:19 +01:00
//
// All credit to Mondo
//
import (
2015-05-21 19:24:57 +01:00
"strings"
2015-05-20 22:57:19 +01:00
"sync"
"time"
"github.com/streadway/amqp"
)
var (
DefaultExchange = "micro"
DefaultRabbitURL = "amqp://guest:guest@127.0.0.1:5672"
)
2015-05-21 21:08:19 +01:00
type rabbitMQConn struct {
2015-05-20 22:57:19 +01:00
Connection *amqp.Connection
2015-05-21 21:08:19 +01:00
Channel *rabbitMQChannel
ExchangeChannel *rabbitMQChannel
2015-05-20 22:57:19 +01:00
notify chan bool
exchange string
url string
connected bool
2015-05-21 19:24:57 +01:00
mtx sync.Mutex
close chan bool
closed bool
2015-05-20 22:57:19 +01:00
}
2015-05-21 21:08:19 +01:00
func newRabbitMQConn(exchange string, urls []string) *rabbitMQConn {
var url string
if len(urls) > 0 && strings.HasPrefix(urls[0], "amqp://") {
url = urls[0]
} else {
url = DefaultRabbitURL
}
if len(exchange) == 0 {
exchange = DefaultExchange
}
return &rabbitMQConn{
exchange: exchange,
url: url,
notify: make(chan bool, 1),
close: make(chan bool),
}
}
func (r *rabbitMQConn) Init() chan bool {
2015-05-20 22:57:19 +01:00
go r.Connect(r.notify)
return r.notify
}
2015-05-21 21:08:19 +01:00
func (r *rabbitMQConn) Connect(connected chan bool) {
2015-05-20 22:57:19 +01:00
for {
if err := r.tryToConnect(); err != nil {
time.Sleep(1 * time.Second)
continue
}
connected <- true
r.connected = true
notifyClose := make(chan *amqp.Error)
r.Connection.NotifyClose(notifyClose)
// Block until we get disconnected, or shut down
select {
case <-notifyClose:
// Spin around and reconnect
r.connected = false
2015-05-21 19:24:57 +01:00
case <-r.close:
2015-05-20 22:57:19 +01:00
// Shut down connection
if err := r.Connection.Close(); err != nil {
}
r.connected = false
return
}
}
}
2015-05-21 21:08:19 +01:00
func (r *rabbitMQConn) IsConnected() bool {
2015-05-20 22:57:19 +01:00
return r.connected
}
2015-05-21 21:08:19 +01:00
func (r *rabbitMQConn) Close() {
2015-05-20 22:57:19 +01:00
r.mtx.Lock()
defer r.mtx.Unlock()
if r.closed {
return
}
2015-05-21 19:24:57 +01:00
close(r.close)
2015-05-20 22:57:19 +01:00
r.closed = true
}
2015-05-21 21:08:19 +01:00
func (r *rabbitMQConn) tryToConnect() error {
2015-05-20 22:57:19 +01:00
var err error
r.Connection, err = amqp.Dial(r.url)
if err != nil {
return err
}
2015-05-21 21:08:19 +01:00
r.Channel, err = newRabbitChannel(r.Connection)
2015-05-20 22:57:19 +01:00
if err != nil {
return err
}
r.Channel.DeclareExchange(r.exchange)
2015-05-21 21:08:19 +01:00
r.ExchangeChannel, err = newRabbitChannel(r.Connection)
2015-05-20 22:57:19 +01:00
if err != nil {
return err
}
return nil
}
2015-05-21 21:08:19 +01:00
func (r *rabbitMQConn) Consume(queue string) (<-chan amqp.Delivery, error) {
consumerChannel, err := newRabbitChannel(r.Connection)
2015-05-20 22:57:19 +01:00
if err != nil {
return nil, err
}
2015-05-21 19:24:57 +01:00
err = consumerChannel.DeclareQueue(queue)
2015-05-20 22:57:19 +01:00
if err != nil {
return nil, err
}
2015-05-21 19:24:57 +01:00
deliveries, err := consumerChannel.ConsumeQueue(queue)
2015-05-20 22:57:19 +01:00
if err != nil {
return nil, err
}
2015-05-21 19:24:57 +01:00
err = consumerChannel.BindQueue(queue, r.exchange)
2015-05-20 22:57:19 +01:00
if err != nil {
return nil, err
}
return deliveries, nil
}
2015-05-21 21:08:19 +01:00
func (r *rabbitMQConn) Publish(exchange, key string, msg amqp.Publishing) error {
2015-05-21 19:24:57 +01:00
return r.ExchangeChannel.Publish(exchange, key, msg)
2015-05-20 22:57:19 +01:00
}