diff --git a/broker/rabbitmq/channel.go b/broker/rabbitmq/channel.go new file mode 100644 index 00000000..28dc1eae --- /dev/null +++ b/broker/rabbitmq/channel.go @@ -0,0 +1,127 @@ +package rabbitmq + +// +// All credit to Mondo +// + +import ( + "errors" + + "github.com/nu7hatch/gouuid" + "github.com/streadway/amqp" +) + +type rabbitMQChannel struct { + uuid string + connection *amqp.Connection + channel *amqp.Channel +} + +func newRabbitChannel(conn *amqp.Connection) (*rabbitMQChannel, error) { + id, err := uuid.NewV4() + if err != nil { + return nil, err + } + rabbitCh := &rabbitMQChannel{ + uuid: id.String(), + connection: conn, + } + if err := rabbitCh.Connect(); err != nil { + return nil, err + } + return rabbitCh, nil + +} + +func (r *rabbitMQChannel) Connect() error { + var err error + r.channel, err = r.connection.Channel() + if err != nil { + return err + } + return nil +} + +func (r *rabbitMQChannel) Close() error { + if r.channel == nil { + return errors.New("Channel is nil") + } + return r.channel.Close() +} + +func (r *rabbitMQChannel) Publish(exchange, key string, message amqp.Publishing) error { + if r.channel == nil { + return errors.New("Channel is nil") + } + return r.channel.Publish(exchange, key, false, false, message) +} + +func (r *rabbitMQChannel) DeclareExchange(exchange string) error { + return r.channel.ExchangeDeclare( + exchange, // name + "topic", // kind + false, // durable + false, // autoDelete + false, // internal + false, // noWait + nil, // args + ) +} + +func (r *rabbitMQChannel) DeclareQueue(queue string) error { + _, err := r.channel.QueueDeclare( + queue, // name + false, // durable + true, // autoDelete + false, // exclusive + false, // noWait + nil, // args + ) + return err +} + +func (r *rabbitMQChannel) DeclareDurableQueue(queue string) error { + _, err := r.channel.QueueDeclare( + queue, // name + true, // durable + false, // autoDelete + false, // exclusive + false, // noWait + nil, // args + ) + return err +} + +func (r *rabbitMQChannel) DeclareReplyQueue(queue string) error { + _, err := r.channel.QueueDeclare( + queue, // name + false, // durable + true, // autoDelete + true, // exclusive + false, // noWait + nil, // args + ) + return err +} + +func (r *rabbitMQChannel) ConsumeQueue(queue string) (<-chan amqp.Delivery, error) { + return r.channel.Consume( + queue, // queue + r.uuid, // consumer + true, // autoAck + false, // exclusive + false, // nolocal + false, // nowait + nil, // args + ) +} + +func (r *rabbitMQChannel) BindQueue(queue, exchange string) error { + return r.channel.QueueBind( + queue, // name + queue, // key + exchange, // exchange + false, // noWait + nil, // args + ) +} diff --git a/broker/rabbitmq/connection.go b/broker/rabbitmq/connection.go new file mode 100644 index 00000000..0f079581 --- /dev/null +++ b/broker/rabbitmq/connection.go @@ -0,0 +1,147 @@ +package rabbitmq + +// +// All credit to Mondo +// + +import ( + "strings" + "sync" + "time" + + "github.com/streadway/amqp" +) + +var ( + DefaultExchange = "micro" + DefaultRabbitURL = "amqp://guest:guest@127.0.0.1:5672" +) + +type rabbitMQConn struct { + Connection *amqp.Connection + Channel *rabbitMQChannel + ExchangeChannel *rabbitMQChannel + notify chan bool + exchange string + url string + + connected bool + + mtx sync.Mutex + close chan bool + closed bool +} + +func newRabbitMQConn(exchange string, urls []string) *rabbitMQConn { + var url string + + if len(urls) > 0 && strings.HasPrefix(urls[0], "amqp://") { + url = urls[0] + } else { + url = DefaultRabbitURL + } + + if len(exchange) == 0 { + exchange = DefaultExchange + } + + return &rabbitMQConn{ + exchange: exchange, + url: url, + notify: make(chan bool, 1), + close: make(chan bool), + } +} + +func (r *rabbitMQConn) Init() chan bool { + go r.Connect(r.notify) + return r.notify +} + +func (r *rabbitMQConn) Connect(connected chan bool) { + for { + if err := r.tryToConnect(); err != nil { + time.Sleep(1 * time.Second) + continue + } + connected <- true + r.connected = true + notifyClose := make(chan *amqp.Error) + r.Connection.NotifyClose(notifyClose) + + // Block until we get disconnected, or shut down + select { + case <-notifyClose: + // Spin around and reconnect + r.connected = false + case <-r.close: + // Shut down connection + if err := r.Connection.Close(); err != nil { + } + r.connected = false + return + } + } +} + +func (r *rabbitMQConn) IsConnected() bool { + return r.connected +} + +func (r *rabbitMQConn) Close() { + r.mtx.Lock() + defer r.mtx.Unlock() + + if r.closed { + return + } + + close(r.close) + r.closed = true +} + +func (r *rabbitMQConn) tryToConnect() error { + var err error + r.Connection, err = amqp.Dial(r.url) + if err != nil { + return err + } + r.Channel, err = newRabbitChannel(r.Connection) + if err != nil { + return err + } + r.Channel.DeclareExchange(r.exchange) + r.ExchangeChannel, err = newRabbitChannel(r.Connection) + if err != nil { + return err + } + return nil +} + +func (r *rabbitMQConn) Consume(queue string) (*rabbitMQChannel, <-chan amqp.Delivery, error) { + consumerChannel, err := newRabbitChannel(r.Connection) + if err != nil { + return nil, nil, err + } + + err = consumerChannel.DeclareQueue(queue) + if err != nil { + return nil, nil, err + } + + deliveries, err := consumerChannel.ConsumeQueue(queue) + if err != nil { + return nil, nil, err + } + + err = consumerChannel.BindQueue(queue, r.exchange) + if err != nil { + return nil, nil, err + } + + return consumerChannel, deliveries, nil +} + +func (r *rabbitMQConn) Publish(exchange, key string, msg amqp.Publishing) error { + return r.ExchangeChannel.Publish(exchange, key, msg) +} diff --git a/broker/rabbitmq/rabbitmq.go b/broker/rabbitmq/rabbitmq.go new file mode 100644 index 00000000..903dfcb5 --- /dev/null +++ b/broker/rabbitmq/rabbitmq.go @@ -0,0 +1,103 @@ +package rabbitmq + +import ( + "time" + + "code.google.com/p/go-uuid/uuid" + "github.com/myodc/go-micro/broker" + c "github.com/myodc/go-micro/context" + "github.com/streadway/amqp" + "golang.org/x/net/context" +) + +type rbroker struct { + conn *rabbitMQConn + addrs []string +} + +type subscriber struct { + topic string + ch *rabbitMQChannel +} + +func (s *subscriber) Topic() string { + return s.topic +} + +func (s *subscriber) Unsubscribe() error { + return s.ch.Close() +} + +func (r *rbroker) Publish(ctx context.Context, topic string, body []byte) error { + header, _ := c.GetMetadata(ctx) + + msg := amqp.Publishing{ + MessageId: uuid.NewUUID().String(), + Timestamp: time.Now().UTC(), + Body: body, + Headers: amqp.Table{}, + } + + for k, v := range header { + msg.Headers[k] = v + } + + return r.conn.Publish("", topic, msg) +} + +func (r *rbroker) Subscribe(topic string, function func(context.Context, *broker.Message)) (broker.Subscriber, error) { + ch, sub, err := r.conn.Consume(topic) + if err != nil { + return nil, err + } + + fn := func(msg amqp.Delivery) { + header := make(map[string]string) + for k, v := range msg.Headers { + header[k], _ = v.(string) + } + ctx := c.WithMetadata(context.Background(), header) + function(ctx, &broker.Message{ + Id: msg.MessageId, + Timestamp: msg.Timestamp.Unix(), + Topic: topic, + Body: msg.Body, + }) + } + + go func() { + for d := range sub { + go fn(d) + } + }() + + return &subscriber{ch: ch, topic: topic}, nil +} + +func (r *rbroker) Address() string { + if len(r.addrs) > 0 { + return r.addrs[0] + } + return "" +} + +func (r *rbroker) Init() error { + return nil +} + +func (r *rbroker) Connect() error { + <-r.conn.Init() + return nil +} + +func (r *rbroker) Disconnect() error { + r.conn.Close() + return nil +} + +func NewBroker(addrs []string, opt ...broker.Option) broker.Broker { + return &rbroker{ + conn: newRabbitMQConn("", addrs), + addrs: addrs, + } +} diff --git a/cmd/cmd.go b/cmd/cmd.go index 8bff9da5..1f67cf2e 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -17,6 +17,7 @@ import ( // brokers "github.com/myodc/go-micro/broker/http" "github.com/myodc/go-micro/broker/nats" + "github.com/myodc/go-micro/broker/rabbitmq" // registries "github.com/myodc/go-micro/registry/consul" @@ -26,7 +27,7 @@ import ( // transport thttp "github.com/myodc/go-micro/transport/http" tnats "github.com/myodc/go-micro/transport/nats" - "github.com/myodc/go-micro/transport/rabbitmq" + trmq "github.com/myodc/go-micro/transport/rabbitmq" ) var ( @@ -97,6 +98,8 @@ func Setup(c *cli.Context) error { broker.DefaultBroker = http.NewBroker(bAddrs) case "nats": broker.DefaultBroker = nats.NewBroker(bAddrs) + case "rabbitmq": + broker.DefaultBroker = rabbitmq.NewBroker(bAddrs) } rAddrs := strings.Split(c.String("registry_address"), ",") @@ -116,7 +119,7 @@ func Setup(c *cli.Context) error { case "http": transport.DefaultTransport = thttp.NewTransport(tAddrs) case "rabbitmq": - transport.DefaultTransport = rabbitmq.NewTransport(tAddrs) + transport.DefaultTransport = trmq.NewTransport(tAddrs) case "nats": transport.DefaultTransport = tnats.NewTransport(tAddrs) }