Move plugins to go-plugins
This commit is contained in:
parent
469b12ecea
commit
04e07f4b39
@ -1,11 +0,0 @@
|
||||
package http
|
||||
|
||||
// This is a hack
|
||||
|
||||
import (
|
||||
"github.com/micro/go-micro/broker"
|
||||
)
|
||||
|
||||
func NewBroker(addrs []string, opt ...broker.Option) broker.Broker {
|
||||
return broker.NewBroker(addrs, opt...)
|
||||
}
|
@ -1,98 +0,0 @@
|
||||
package nats
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strings"
|
||||
|
||||
"github.com/apcera/nats"
|
||||
"github.com/micro/go-micro/broker"
|
||||
)
|
||||
|
||||
type nbroker struct {
|
||||
addrs []string
|
||||
conn *nats.Conn
|
||||
}
|
||||
|
||||
type subscriber struct {
|
||||
s *nats.Subscription
|
||||
}
|
||||
|
||||
func (n *subscriber) Topic() string {
|
||||
return n.s.Subject
|
||||
}
|
||||
|
||||
func (n *subscriber) Unsubscribe() error {
|
||||
return n.s.Unsubscribe()
|
||||
}
|
||||
|
||||
func (n *nbroker) Address() string {
|
||||
if len(n.addrs) > 0 {
|
||||
return n.addrs[0]
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (n *nbroker) Connect() error {
|
||||
if n.conn != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
opts := nats.DefaultOptions
|
||||
opts.Servers = n.addrs
|
||||
c, err := opts.Connect()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n.conn = c
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *nbroker) Disconnect() error {
|
||||
n.conn.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *nbroker) Init() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *nbroker) Publish(topic string, msg *broker.Message) error {
|
||||
b, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return n.conn.Publish(topic, b)
|
||||
}
|
||||
|
||||
func (n *nbroker) Subscribe(topic string, handler broker.Handler) (broker.Subscriber, error) {
|
||||
sub, err := n.conn.Subscribe(topic, func(msg *nats.Msg) {
|
||||
var m *broker.Message
|
||||
if err := json.Unmarshal(msg.Data, &m); err != nil {
|
||||
return
|
||||
}
|
||||
handler(m)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &subscriber{s: sub}, nil
|
||||
}
|
||||
|
||||
func NewBroker(addrs []string, opt ...broker.Option) broker.Broker {
|
||||
var cAddrs []string
|
||||
for _, addr := range addrs {
|
||||
if len(addr) == 0 {
|
||||
continue
|
||||
}
|
||||
if !strings.HasPrefix(addr, "nats://") {
|
||||
addr = "nats://" + addr
|
||||
}
|
||||
cAddrs = append(cAddrs, addr)
|
||||
}
|
||||
if len(cAddrs) == 0 {
|
||||
cAddrs = []string{nats.DefaultURL}
|
||||
}
|
||||
return &nbroker{
|
||||
addrs: cAddrs,
|
||||
}
|
||||
}
|
@ -1,127 +0,0 @@
|
||||
package rabbitmq
|
||||
|
||||
//
|
||||
// All credit to Mondo
|
||||
//
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/nu7hatch/gouuid"
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
type rabbitMQChannel struct {
|
||||
uuid string
|
||||
connection *amqp.Connection
|
||||
channel *amqp.Channel
|
||||
}
|
||||
|
||||
func newRabbitChannel(conn *amqp.Connection) (*rabbitMQChannel, error) {
|
||||
id, err := uuid.NewV4()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rabbitCh := &rabbitMQChannel{
|
||||
uuid: id.String(),
|
||||
connection: conn,
|
||||
}
|
||||
if err := rabbitCh.Connect(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rabbitCh, nil
|
||||
|
||||
}
|
||||
|
||||
func (r *rabbitMQChannel) Connect() error {
|
||||
var err error
|
||||
r.channel, err = r.connection.Channel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rabbitMQChannel) Close() error {
|
||||
if r.channel == nil {
|
||||
return errors.New("Channel is nil")
|
||||
}
|
||||
return r.channel.Close()
|
||||
}
|
||||
|
||||
func (r *rabbitMQChannel) Publish(exchange, key string, message amqp.Publishing) error {
|
||||
if r.channel == nil {
|
||||
return errors.New("Channel is nil")
|
||||
}
|
||||
return r.channel.Publish(exchange, key, false, false, message)
|
||||
}
|
||||
|
||||
func (r *rabbitMQChannel) DeclareExchange(exchange string) error {
|
||||
return r.channel.ExchangeDeclare(
|
||||
exchange, // name
|
||||
"topic", // kind
|
||||
false, // durable
|
||||
false, // autoDelete
|
||||
false, // internal
|
||||
false, // noWait
|
||||
nil, // args
|
||||
)
|
||||
}
|
||||
|
||||
func (r *rabbitMQChannel) DeclareQueue(queue string) error {
|
||||
_, err := r.channel.QueueDeclare(
|
||||
queue, // name
|
||||
false, // durable
|
||||
true, // autoDelete
|
||||
false, // exclusive
|
||||
false, // noWait
|
||||
nil, // args
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *rabbitMQChannel) DeclareDurableQueue(queue string) error {
|
||||
_, err := r.channel.QueueDeclare(
|
||||
queue, // name
|
||||
true, // durable
|
||||
false, // autoDelete
|
||||
false, // exclusive
|
||||
false, // noWait
|
||||
nil, // args
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *rabbitMQChannel) DeclareReplyQueue(queue string) error {
|
||||
_, err := r.channel.QueueDeclare(
|
||||
queue, // name
|
||||
false, // durable
|
||||
true, // autoDelete
|
||||
true, // exclusive
|
||||
false, // noWait
|
||||
nil, // args
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *rabbitMQChannel) ConsumeQueue(queue string) (<-chan amqp.Delivery, error) {
|
||||
return r.channel.Consume(
|
||||
queue, // queue
|
||||
r.uuid, // consumer
|
||||
true, // autoAck
|
||||
false, // exclusive
|
||||
false, // nolocal
|
||||
false, // nowait
|
||||
nil, // args
|
||||
)
|
||||
}
|
||||
|
||||
func (r *rabbitMQChannel) BindQueue(queue, exchange string) error {
|
||||
return r.channel.QueueBind(
|
||||
queue, // name
|
||||
queue, // key
|
||||
exchange, // exchange
|
||||
false, // noWait
|
||||
nil, // args
|
||||
)
|
||||
}
|
@ -1,147 +0,0 @@
|
||||
package rabbitmq
|
||||
|
||||
//
|
||||
// All credit to Mondo
|
||||
//
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
var (
|
||||
DefaultExchange = "micro"
|
||||
DefaultRabbitURL = "amqp://guest:guest@127.0.0.1:5672"
|
||||
)
|
||||
|
||||
type rabbitMQConn struct {
|
||||
Connection *amqp.Connection
|
||||
Channel *rabbitMQChannel
|
||||
ExchangeChannel *rabbitMQChannel
|
||||
notify chan bool
|
||||
exchange string
|
||||
url string
|
||||
|
||||
connected bool
|
||||
|
||||
mtx sync.Mutex
|
||||
close chan bool
|
||||
closed bool
|
||||
}
|
||||
|
||||
func newRabbitMQConn(exchange string, urls []string) *rabbitMQConn {
|
||||
var url string
|
||||
|
||||
if len(urls) > 0 && strings.HasPrefix(urls[0], "amqp://") {
|
||||
url = urls[0]
|
||||
} else {
|
||||
url = DefaultRabbitURL
|
||||
}
|
||||
|
||||
if len(exchange) == 0 {
|
||||
exchange = DefaultExchange
|
||||
}
|
||||
|
||||
return &rabbitMQConn{
|
||||
exchange: exchange,
|
||||
url: url,
|
||||
notify: make(chan bool, 1),
|
||||
close: make(chan bool),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rabbitMQConn) Init() chan bool {
|
||||
go r.Connect(r.notify)
|
||||
return r.notify
|
||||
}
|
||||
|
||||
func (r *rabbitMQConn) Connect(connected chan bool) {
|
||||
for {
|
||||
if err := r.tryToConnect(); err != nil {
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
connected <- true
|
||||
r.connected = true
|
||||
notifyClose := make(chan *amqp.Error)
|
||||
r.Connection.NotifyClose(notifyClose)
|
||||
|
||||
// Block until we get disconnected, or shut down
|
||||
select {
|
||||
case <-notifyClose:
|
||||
// Spin around and reconnect
|
||||
r.connected = false
|
||||
case <-r.close:
|
||||
// Shut down connection
|
||||
if err := r.Connection.Close(); err != nil {
|
||||
}
|
||||
r.connected = false
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rabbitMQConn) IsConnected() bool {
|
||||
return r.connected
|
||||
}
|
||||
|
||||
func (r *rabbitMQConn) Close() {
|
||||
r.mtx.Lock()
|
||||
defer r.mtx.Unlock()
|
||||
|
||||
if r.closed {
|
||||
return
|
||||
}
|
||||
|
||||
close(r.close)
|
||||
r.closed = true
|
||||
}
|
||||
|
||||
func (r *rabbitMQConn) tryToConnect() error {
|
||||
var err error
|
||||
r.Connection, err = amqp.Dial(r.url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.Channel, err = newRabbitChannel(r.Connection)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.Channel.DeclareExchange(r.exchange)
|
||||
r.ExchangeChannel, err = newRabbitChannel(r.Connection)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rabbitMQConn) Consume(queue string) (*rabbitMQChannel, <-chan amqp.Delivery, error) {
|
||||
consumerChannel, err := newRabbitChannel(r.Connection)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
err = consumerChannel.DeclareQueue(queue)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
deliveries, err := consumerChannel.ConsumeQueue(queue)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
err = consumerChannel.BindQueue(queue, r.exchange)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return consumerChannel, deliveries, nil
|
||||
}
|
||||
|
||||
func (r *rabbitMQConn) Publish(exchange, key string, msg amqp.Publishing) error {
|
||||
return r.ExchangeChannel.Publish(exchange, key, msg)
|
||||
}
|
@ -1,91 +0,0 @@
|
||||
package rabbitmq
|
||||
|
||||
import (
|
||||
"github.com/micro/go-micro/broker"
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
type rbroker struct {
|
||||
conn *rabbitMQConn
|
||||
addrs []string
|
||||
}
|
||||
|
||||
type subscriber struct {
|
||||
topic string
|
||||
ch *rabbitMQChannel
|
||||
}
|
||||
|
||||
func (s *subscriber) Topic() string {
|
||||
return s.topic
|
||||
}
|
||||
|
||||
func (s *subscriber) Unsubscribe() error {
|
||||
return s.ch.Close()
|
||||
}
|
||||
|
||||
func (r *rbroker) Publish(topic string, msg *broker.Message) error {
|
||||
m := amqp.Publishing{
|
||||
Body: msg.Body,
|
||||
Headers: amqp.Table{},
|
||||
}
|
||||
|
||||
for k, v := range msg.Header {
|
||||
m.Headers[k] = v
|
||||
}
|
||||
|
||||
return r.conn.Publish("", topic, m)
|
||||
}
|
||||
|
||||
func (r *rbroker) Subscribe(topic string, handler broker.Handler) (broker.Subscriber, error) {
|
||||
ch, sub, err := r.conn.Consume(topic)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fn := func(msg amqp.Delivery) {
|
||||
header := make(map[string]string)
|
||||
for k, v := range msg.Headers {
|
||||
header[k], _ = v.(string)
|
||||
}
|
||||
handler(&broker.Message{
|
||||
Header: header,
|
||||
Body: msg.Body,
|
||||
})
|
||||
}
|
||||
|
||||
go func() {
|
||||
for d := range sub {
|
||||
go fn(d)
|
||||
}
|
||||
}()
|
||||
|
||||
return &subscriber{ch: ch, topic: topic}, nil
|
||||
}
|
||||
|
||||
func (r *rbroker) Address() string {
|
||||
if len(r.addrs) > 0 {
|
||||
return r.addrs[0]
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (r *rbroker) Init() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rbroker) Connect() error {
|
||||
<-r.conn.Init()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rbroker) Disconnect() error {
|
||||
r.conn.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewBroker(addrs []string, opt ...broker.Option) broker.Broker {
|
||||
return &rbroker{
|
||||
conn: newRabbitMQConn("", addrs),
|
||||
addrs: addrs,
|
||||
}
|
||||
}
|
27
cmd/cmd.go
27
cmd/cmd.go
@ -15,21 +15,6 @@ import (
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-micro/server"
|
||||
"github.com/micro/go-micro/transport"
|
||||
|
||||
// brokers
|
||||
"github.com/micro/go-micro/broker/http"
|
||||
"github.com/micro/go-micro/broker/nats"
|
||||
"github.com/micro/go-micro/broker/rabbitmq"
|
||||
|
||||
// registries
|
||||
"github.com/micro/go-micro/registry/consul"
|
||||
"github.com/micro/go-micro/registry/etcd"
|
||||
"github.com/micro/go-micro/registry/memory"
|
||||
|
||||
// transport
|
||||
thttp "github.com/micro/go-micro/transport/http"
|
||||
tnats "github.com/micro/go-micro/transport/nats"
|
||||
trmq "github.com/micro/go-micro/transport/rabbitmq"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -132,21 +117,15 @@ var (
|
||||
}
|
||||
|
||||
Brokers = map[string]func([]string, ...broker.Option) broker.Broker{
|
||||
"http": http.NewBroker,
|
||||
"nats": nats.NewBroker,
|
||||
"rabbitmq": rabbitmq.NewBroker,
|
||||
"http": broker.NewBroker,
|
||||
}
|
||||
|
||||
Registries = map[string]func([]string, ...registry.Option) registry.Registry{
|
||||
"consul": consul.NewRegistry,
|
||||
"etcd": etcd.NewRegistry,
|
||||
"memory": memory.NewRegistry,
|
||||
"consul": registry.NewRegistry,
|
||||
}
|
||||
|
||||
Transports = map[string]func([]string, ...transport.Option) transport.Transport{
|
||||
"http": thttp.NewTransport,
|
||||
"rabbitmq": trmq.NewTransport,
|
||||
"nats": tnats.NewTransport,
|
||||
"http": transport.NewTransport,
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -1,11 +0,0 @@
|
||||
package consul
|
||||
|
||||
// This is a hack
|
||||
|
||||
import (
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
func NewRegistry(addrs []string, opt ...registry.Option) registry.Registry {
|
||||
return registry.NewRegistry(addrs, opt...)
|
||||
}
|
@ -1,193 +0,0 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"github.com/micro/go-micro/registry"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
var (
|
||||
prefix = "/micro-registry"
|
||||
)
|
||||
|
||||
type etcdRegistry struct {
|
||||
client etcd.KeysAPI
|
||||
|
||||
sync.RWMutex
|
||||
services map[string][]*registry.Service
|
||||
}
|
||||
|
||||
func encode(s *registry.Service) string {
|
||||
b, _ := json.Marshal(s)
|
||||
return string(b)
|
||||
}
|
||||
|
||||
func decode(ds string) *registry.Service {
|
||||
var s *registry.Service
|
||||
json.Unmarshal([]byte(ds), &s)
|
||||
return s
|
||||
}
|
||||
|
||||
func nodePath(s, id string) string {
|
||||
service := strings.Replace(s, "/", "-", -1)
|
||||
node := strings.Replace(id, "/", "-", -1)
|
||||
return filepath.Join(prefix, service, node)
|
||||
}
|
||||
|
||||
func servicePath(s string) string {
|
||||
return filepath.Join(prefix, strings.Replace(s, "/", "-", -1))
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) Deregister(s *registry.Service) error {
|
||||
if len(s.Nodes) == 0 {
|
||||
return errors.New("Require at least one node")
|
||||
}
|
||||
|
||||
for _, node := range s.Nodes {
|
||||
_, err := e.client.Delete(context.Background(), nodePath(s.Name, node.Id), &etcd.DeleteOptions{Recursive: false})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
e.client.Delete(context.Background(), servicePath(s.Name), &etcd.DeleteOptions{Dir: true})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) Register(s *registry.Service) error {
|
||||
if len(s.Nodes) == 0 {
|
||||
return errors.New("Require at least one node")
|
||||
}
|
||||
|
||||
service := ®istry.Service{
|
||||
Name: s.Name,
|
||||
Version: s.Version,
|
||||
Metadata: s.Metadata,
|
||||
Endpoints: s.Endpoints,
|
||||
}
|
||||
|
||||
e.client.Set(context.Background(), servicePath(s.Name), "", &etcd.SetOptions{Dir: true})
|
||||
|
||||
for _, node := range s.Nodes {
|
||||
service.Nodes = []*registry.Node{node}
|
||||
_, err := e.client.Set(context.Background(), nodePath(service.Name, node.Id), encode(service), &etcd.SetOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) GetService(name string) ([]*registry.Service, error) {
|
||||
e.RLock()
|
||||
service, ok := e.services[name]
|
||||
e.RUnlock()
|
||||
|
||||
if ok {
|
||||
return service, nil
|
||||
}
|
||||
|
||||
rsp, err := e.client.Get(context.Background(), servicePath(name), &etcd.GetOptions{})
|
||||
if err != nil && !strings.HasPrefix(err.Error(), "100: Key not found") {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
serviceMap := map[string]*registry.Service{}
|
||||
|
||||
for _, n := range rsp.Node.Nodes {
|
||||
if n.Dir {
|
||||
continue
|
||||
}
|
||||
sn := decode(n.Value)
|
||||
|
||||
s, ok := serviceMap[sn.Version]
|
||||
if !ok {
|
||||
s = ®istry.Service{
|
||||
Name: sn.Name,
|
||||
Version: sn.Version,
|
||||
Metadata: sn.Metadata,
|
||||
Endpoints: sn.Endpoints,
|
||||
}
|
||||
serviceMap[s.Version] = s
|
||||
}
|
||||
|
||||
for _, node := range sn.Nodes {
|
||||
s.Nodes = append(s.Nodes, node)
|
||||
}
|
||||
}
|
||||
|
||||
var services []*registry.Service
|
||||
for _, service := range serviceMap {
|
||||
services = append(services, service)
|
||||
}
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) ListServices() ([]*registry.Service, error) {
|
||||
e.RLock()
|
||||
serviceMap := e.services
|
||||
e.RUnlock()
|
||||
|
||||
var services []*registry.Service
|
||||
|
||||
if len(serviceMap) > 0 {
|
||||
for _, service := range serviceMap {
|
||||
services = append(services, service...)
|
||||
}
|
||||
return services, nil
|
||||
}
|
||||
|
||||
rsp, err := e.client.Get(context.Background(), prefix, &etcd.GetOptions{Recursive: true, Sort: true})
|
||||
if err != nil && !strings.HasPrefix(err.Error(), "100: Key not found") {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, node := range rsp.Node.Nodes {
|
||||
service := ®istry.Service{}
|
||||
for _, n := range node.Nodes {
|
||||
i := decode(n.Value)
|
||||
service.Name = i.Name
|
||||
}
|
||||
services = append(services, service)
|
||||
}
|
||||
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (e *etcdRegistry) Watch() (registry.Watcher, error) {
|
||||
// todo: fix watcher
|
||||
return newEtcdWatcher(e)
|
||||
}
|
||||
|
||||
func NewRegistry(addrs []string, opt ...registry.Option) registry.Registry {
|
||||
var cAddrs []string
|
||||
|
||||
for _, addr := range addrs {
|
||||
if len(addr) == 0 {
|
||||
continue
|
||||
}
|
||||
cAddrs = append(cAddrs, addr)
|
||||
}
|
||||
|
||||
if len(cAddrs) == 0 {
|
||||
cAddrs = []string{"http://127.0.0.1:2379"}
|
||||
}
|
||||
|
||||
c, _ := etcd.New(etcd.Config{
|
||||
Endpoints: cAddrs,
|
||||
})
|
||||
|
||||
e := &etcdRegistry{
|
||||
client: etcd.NewKeysAPI(c),
|
||||
services: make(map[string][]*registry.Service),
|
||||
}
|
||||
|
||||
return e
|
||||
}
|
@ -1,151 +0,0 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"github.com/micro/go-micro/registry"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type etcdWatcher struct {
|
||||
registry *etcdRegistry
|
||||
stop chan bool
|
||||
}
|
||||
|
||||
func addNodes(old, neu []*registry.Node) []*registry.Node {
|
||||
for _, n := range neu {
|
||||
var seen bool
|
||||
for i, o := range old {
|
||||
if o.Id == n.Id {
|
||||
seen = true
|
||||
old[i] = n
|
||||
break
|
||||
}
|
||||
}
|
||||
if !seen {
|
||||
old = append(old, n)
|
||||
}
|
||||
}
|
||||
return old
|
||||
}
|
||||
|
||||
func addServices(old, neu []*registry.Service) []*registry.Service {
|
||||
for _, s := range neu {
|
||||
var seen bool
|
||||
for i, o := range old {
|
||||
if o.Version == s.Version {
|
||||
s.Nodes = addNodes(o.Nodes, s.Nodes)
|
||||
seen = true
|
||||
old[i] = s
|
||||
break
|
||||
}
|
||||
}
|
||||
if !seen {
|
||||
old = append(old, s)
|
||||
}
|
||||
}
|
||||
return old
|
||||
}
|
||||
|
||||
func delNodes(old, del []*registry.Node) []*registry.Node {
|
||||
var nodes []*registry.Node
|
||||
for _, o := range old {
|
||||
var rem bool
|
||||
for _, n := range del {
|
||||
if o.Id == n.Id {
|
||||
rem = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !rem {
|
||||
nodes = append(nodes, o)
|
||||
}
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
|
||||
func delServices(old, del []*registry.Service) []*registry.Service {
|
||||
var services []*registry.Service
|
||||
for i, o := range old {
|
||||
var rem bool
|
||||
for _, s := range del {
|
||||
if o.Version == s.Version {
|
||||
old[i].Nodes = delNodes(o.Nodes, s.Nodes)
|
||||
if len(old[i].Nodes) == 0 {
|
||||
rem = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if !rem {
|
||||
services = append(services, o)
|
||||
}
|
||||
}
|
||||
return services
|
||||
}
|
||||
|
||||
func newEtcdWatcher(r *etcdRegistry) (registry.Watcher, error) {
|
||||
ew := &etcdWatcher{
|
||||
registry: r,
|
||||
stop: make(chan bool),
|
||||
}
|
||||
|
||||
w := r.client.Watcher(prefix, &etcd.WatcherOptions{AfterIndex: 0, Recursive: true})
|
||||
|
||||
c := context.Background()
|
||||
ctx, cancel := context.WithCancel(c)
|
||||
|
||||
go func() {
|
||||
<-ew.stop
|
||||
cancel()
|
||||
}()
|
||||
|
||||
go ew.watch(ctx, w)
|
||||
|
||||
return ew, nil
|
||||
}
|
||||
|
||||
func (e *etcdWatcher) watch(ctx context.Context, w etcd.Watcher) {
|
||||
for {
|
||||
rsp, err := w.Next(ctx)
|
||||
if err != nil && ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if rsp.Node.Dir {
|
||||
continue
|
||||
}
|
||||
|
||||
s := decode(rsp.Node.Value)
|
||||
if s == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
e.registry.Lock()
|
||||
|
||||
service, ok := e.registry.services[s.Name]
|
||||
if !ok {
|
||||
if rsp.Action == "create" {
|
||||
e.registry.services[s.Name] = []*registry.Service{s}
|
||||
}
|
||||
e.registry.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
switch rsp.Action {
|
||||
case "delete":
|
||||
services := delServices(service, []*registry.Service{s})
|
||||
if len(services) > 0 {
|
||||
e.registry.services[s.Name] = services
|
||||
} else {
|
||||
delete(e.registry.services, s.Name)
|
||||
}
|
||||
case "create":
|
||||
e.registry.services[s.Name] = addServices(service, []*registry.Service{s})
|
||||
}
|
||||
|
||||
e.registry.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (ew *etcdWatcher) Stop() {
|
||||
ew.stop <- true
|
||||
}
|
@ -1,372 +0,0 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
log "github.com/golang/glog"
|
||||
"github.com/hashicorp/memberlist"
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/pborman/uuid"
|
||||
)
|
||||
|
||||
type action int
|
||||
|
||||
const (
|
||||
addAction action = iota
|
||||
delAction
|
||||
syncAction
|
||||
)
|
||||
|
||||
type broadcast struct {
|
||||
msg []byte
|
||||
notify chan<- struct{}
|
||||
}
|
||||
|
||||
type delegate struct {
|
||||
broadcasts *memberlist.TransmitLimitedQueue
|
||||
updates chan *update
|
||||
}
|
||||
|
||||
type memoryRegistry struct {
|
||||
broadcasts *memberlist.TransmitLimitedQueue
|
||||
updates chan *update
|
||||
|
||||
sync.RWMutex
|
||||
services map[string][]*registry.Service
|
||||
}
|
||||
|
||||
type update struct {
|
||||
Action action
|
||||
Service *registry.Service
|
||||
sync chan *registry.Service
|
||||
}
|
||||
|
||||
type watcher struct{}
|
||||
|
||||
func (b *broadcast) Invalidates(other memberlist.Broadcast) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (b *broadcast) Message() []byte {
|
||||
return b.msg
|
||||
}
|
||||
|
||||
func (b *broadcast) Finished() {
|
||||
if b.notify != nil {
|
||||
close(b.notify)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *delegate) NodeMeta(limit int) []byte {
|
||||
return []byte{}
|
||||
}
|
||||
|
||||
func (d *delegate) NotifyMsg(b []byte) {
|
||||
if len(b) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
buf := make([]byte, len(b))
|
||||
copy(buf, b)
|
||||
|
||||
go func() {
|
||||
switch buf[0] {
|
||||
case 'd': // data
|
||||
var updates []*update
|
||||
if err := json.Unmarshal(buf[1:], &updates); err != nil {
|
||||
return
|
||||
}
|
||||
for _, u := range updates {
|
||||
d.updates <- u
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
|
||||
return d.broadcasts.GetBroadcasts(overhead, limit)
|
||||
}
|
||||
|
||||
func (d *delegate) LocalState(join bool) []byte {
|
||||
if !join {
|
||||
return []byte{}
|
||||
}
|
||||
|
||||
syncCh := make(chan *registry.Service, 1)
|
||||
m := map[string][]*registry.Service{}
|
||||
|
||||
d.updates <- &update{
|
||||
Action: syncAction,
|
||||
sync: syncCh,
|
||||
}
|
||||
|
||||
for s := range syncCh {
|
||||
m[s.Name] = append(m[s.Name], s)
|
||||
}
|
||||
|
||||
b, _ := json.Marshal(m)
|
||||
return b
|
||||
}
|
||||
|
||||
func (d *delegate) MergeRemoteState(buf []byte, join bool) {
|
||||
if len(buf) == 0 {
|
||||
return
|
||||
}
|
||||
if !join {
|
||||
return
|
||||
}
|
||||
|
||||
var m map[string][]*registry.Service
|
||||
if err := json.Unmarshal(buf, &m); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, services := range m {
|
||||
for _, service := range services {
|
||||
d.updates <- &update{
|
||||
Action: addAction,
|
||||
Service: service,
|
||||
sync: nil,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *memoryRegistry) run() {
|
||||
for u := range m.updates {
|
||||
switch u.Action {
|
||||
case addAction:
|
||||
m.Lock()
|
||||
if service, ok := m.services[u.Service.Name]; !ok {
|
||||
m.services[u.Service.Name] = []*registry.Service{u.Service}
|
||||
} else {
|
||||
m.services[u.Service.Name] = addServices(service, []*registry.Service{u.Service})
|
||||
}
|
||||
m.Unlock()
|
||||
case delAction:
|
||||
m.Lock()
|
||||
if service, ok := m.services[u.Service.Name]; ok {
|
||||
if services := delServices(service, []*registry.Service{u.Service}); len(services) == 0 {
|
||||
delete(m.services, u.Service.Name)
|
||||
} else {
|
||||
m.services[u.Service.Name] = services
|
||||
}
|
||||
}
|
||||
m.Unlock()
|
||||
case syncAction:
|
||||
if u.sync == nil {
|
||||
continue
|
||||
}
|
||||
m.RLock()
|
||||
for _, services := range m.services {
|
||||
for _, service := range services {
|
||||
u.sync <- service
|
||||
}
|
||||
}
|
||||
m.RUnlock()
|
||||
close(u.sync)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *memoryRegistry) Register(s *registry.Service) error {
|
||||
m.Lock()
|
||||
if service, ok := m.services[s.Name]; !ok {
|
||||
m.services[s.Name] = []*registry.Service{s}
|
||||
} else {
|
||||
m.services[s.Name] = addServices(service, []*registry.Service{s})
|
||||
}
|
||||
m.Unlock()
|
||||
|
||||
b, _ := json.Marshal([]*update{
|
||||
&update{
|
||||
Action: addAction,
|
||||
Service: s,
|
||||
},
|
||||
})
|
||||
|
||||
m.broadcasts.QueueBroadcast(&broadcast{
|
||||
msg: append([]byte("d"), b...),
|
||||
notify: nil,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryRegistry) Deregister(s *registry.Service) error {
|
||||
m.Lock()
|
||||
if service, ok := m.services[s.Name]; ok {
|
||||
if services := delServices(service, []*registry.Service{s}); len(services) == 0 {
|
||||
delete(m.services, s.Name)
|
||||
} else {
|
||||
m.services[s.Name] = services
|
||||
}
|
||||
}
|
||||
m.Unlock()
|
||||
|
||||
b, _ := json.Marshal([]*update{
|
||||
&update{
|
||||
Action: delAction,
|
||||
Service: s,
|
||||
},
|
||||
})
|
||||
|
||||
m.broadcasts.QueueBroadcast(&broadcast{
|
||||
msg: append([]byte("d"), b...),
|
||||
notify: nil,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryRegistry) GetService(name string) ([]*registry.Service, error) {
|
||||
m.RLock()
|
||||
service, ok := m.services[name]
|
||||
m.RUnlock()
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Service %s not found", name)
|
||||
}
|
||||
return service, nil
|
||||
}
|
||||
|
||||
func (m *memoryRegistry) ListServices() ([]*registry.Service, error) {
|
||||
var services []*registry.Service
|
||||
m.RLock()
|
||||
for _, service := range m.services {
|
||||
services = append(services, service...)
|
||||
}
|
||||
m.RUnlock()
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (m *memoryRegistry) Watch() (registry.Watcher, error) {
|
||||
return &watcher{}, nil
|
||||
}
|
||||
|
||||
func (w *watcher) Stop() {
|
||||
return
|
||||
}
|
||||
|
||||
func addNodes(old, neu []*registry.Node) []*registry.Node {
|
||||
for _, n := range neu {
|
||||
var seen bool
|
||||
for i, o := range old {
|
||||
if o.Id == n.Id {
|
||||
seen = true
|
||||
old[i] = n
|
||||
break
|
||||
}
|
||||
}
|
||||
if !seen {
|
||||
old = append(old, n)
|
||||
}
|
||||
}
|
||||
return old
|
||||
}
|
||||
|
||||
func addServices(old, neu []*registry.Service) []*registry.Service {
|
||||
for _, s := range neu {
|
||||
var seen bool
|
||||
for i, o := range old {
|
||||
if o.Version == s.Version {
|
||||
s.Nodes = addNodes(o.Nodes, s.Nodes)
|
||||
seen = true
|
||||
old[i] = s
|
||||
break
|
||||
}
|
||||
}
|
||||
if !seen {
|
||||
old = append(old, s)
|
||||
}
|
||||
}
|
||||
return old
|
||||
}
|
||||
|
||||
func delNodes(old, del []*registry.Node) []*registry.Node {
|
||||
var nodes []*registry.Node
|
||||
for _, o := range old {
|
||||
var rem bool
|
||||
for _, n := range del {
|
||||
if o.Id == n.Id {
|
||||
rem = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !rem {
|
||||
nodes = append(nodes, o)
|
||||
}
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
|
||||
func delServices(old, del []*registry.Service) []*registry.Service {
|
||||
var services []*registry.Service
|
||||
for i, o := range old {
|
||||
var rem bool
|
||||
for _, s := range del {
|
||||
if o.Version == s.Version {
|
||||
old[i].Nodes = delNodes(o.Nodes, s.Nodes)
|
||||
if len(old[i].Nodes) == 0 {
|
||||
rem = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if !rem {
|
||||
services = append(services, o)
|
||||
}
|
||||
}
|
||||
return services
|
||||
}
|
||||
|
||||
func NewRegistry(addrs []string, opt ...registry.Option) registry.Registry {
|
||||
cAddrs := []string{}
|
||||
hostname, _ := os.Hostname()
|
||||
updates := make(chan *update, 100)
|
||||
|
||||
for _, addr := range addrs {
|
||||
if len(addr) > 0 {
|
||||
cAddrs = append(cAddrs, addr)
|
||||
}
|
||||
}
|
||||
|
||||
broadcasts := &memberlist.TransmitLimitedQueue{
|
||||
NumNodes: func() int {
|
||||
return len(cAddrs)
|
||||
},
|
||||
RetransmitMult: 3,
|
||||
}
|
||||
|
||||
mr := &memoryRegistry{
|
||||
broadcasts: broadcasts,
|
||||
services: make(map[string][]*registry.Service),
|
||||
updates: updates,
|
||||
}
|
||||
|
||||
go mr.run()
|
||||
|
||||
c := memberlist.DefaultLocalConfig()
|
||||
c.BindPort = 0
|
||||
c.Name = hostname + "-" + uuid.NewUUID().String()
|
||||
c.Delegate = &delegate{
|
||||
updates: updates,
|
||||
broadcasts: broadcasts,
|
||||
}
|
||||
|
||||
m, err := memberlist.Create(c)
|
||||
if err != nil {
|
||||
log.Fatalf("Error creating memberlist: %v", err)
|
||||
}
|
||||
|
||||
if len(cAddrs) > 0 {
|
||||
_, err := m.Join(cAddrs)
|
||||
if err != nil {
|
||||
log.Fatalf("Error joining members: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
log.Infof("Local memberlist node %s:%d\n", m.LocalNode().Addr, m.LocalNode().Port)
|
||||
return mr
|
||||
}
|
@ -1,78 +0,0 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
func TestDelServices(t *testing.T) {
|
||||
services := []*registry.Service{
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.0",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "foo-123",
|
||||
Address: "localhost",
|
||||
Port: 9999,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.0",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "foo-123",
|
||||
Address: "localhost",
|
||||
Port: 6666,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
servs := delServices([]*registry.Service{services[0]}, []*registry.Service{services[1]})
|
||||
if i := len(servs); i > 0 {
|
||||
t.Errorf("Expected 0 nodes, got %d: %+v", i, servs)
|
||||
}
|
||||
t.Logf("Services %+v", servs)
|
||||
}
|
||||
|
||||
func TestDelNodes(t *testing.T) {
|
||||
services := []*registry.Service{
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.0",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "foo-123",
|
||||
Address: "localhost",
|
||||
Port: 9999,
|
||||
},
|
||||
{
|
||||
Id: "foo-321",
|
||||
Address: "localhost",
|
||||
Port: 6666,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.0",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "foo-123",
|
||||
Address: "localhost",
|
||||
Port: 6666,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
nodes := delNodes(services[0].Nodes, services[1].Nodes)
|
||||
if i := len(nodes); i != 1 {
|
||||
t.Errorf("Expected only 1 node, got %d: %+v", i, nodes)
|
||||
}
|
||||
t.Logf("Nodes %+v", nodes)
|
||||
}
|
@ -1,13 +0,0 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
"io"
|
||||
)
|
||||
|
||||
type buffer struct {
|
||||
io.ReadWriter
|
||||
}
|
||||
|
||||
func (b *buffer) Close() error {
|
||||
return nil
|
||||
}
|
@ -1,11 +0,0 @@
|
||||
package http
|
||||
|
||||
// This is a hack
|
||||
|
||||
import (
|
||||
"github.com/micro/go-micro/transport"
|
||||
)
|
||||
|
||||
func NewTransport(addrs []string, opt ...transport.Option) transport.Transport {
|
||||
return transport.NewTransport(addrs, opt...)
|
||||
}
|
@ -12,6 +12,10 @@ import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type buffer struct {
|
||||
io.ReadWriter
|
||||
}
|
||||
|
||||
type httpTransport struct{}
|
||||
|
||||
type httpTransportClient struct {
|
||||
@ -33,6 +37,10 @@ type httpTransportListener struct {
|
||||
listener net.Listener
|
||||
}
|
||||
|
||||
func (b *buffer) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *httpTransportClient) Send(m *Message) error {
|
||||
header := make(http.Header)
|
||||
|
||||
|
@ -1,162 +0,0 @@
|
||||
package nats
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/apcera/nats"
|
||||
"github.com/micro/go-micro/transport"
|
||||
)
|
||||
|
||||
type ntport struct {
|
||||
addrs []string
|
||||
}
|
||||
|
||||
type ntportClient struct {
|
||||
conn *nats.Conn
|
||||
addr string
|
||||
id string
|
||||
sub *nats.Subscription
|
||||
}
|
||||
|
||||
type ntportSocket struct {
|
||||
conn *nats.Conn
|
||||
m *nats.Msg
|
||||
}
|
||||
|
||||
type ntportListener struct {
|
||||
conn *nats.Conn
|
||||
addr string
|
||||
exit chan bool
|
||||
}
|
||||
|
||||
func (n *ntportClient) Send(m *transport.Message) error {
|
||||
b, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return n.conn.PublishRequest(n.addr, n.id, b)
|
||||
}
|
||||
|
||||
func (n *ntportClient) Recv(m *transport.Message) error {
|
||||
rsp, err := n.sub.NextMsg(time.Second * 10)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var mr *transport.Message
|
||||
if err := json.Unmarshal(rsp.Data, &mr); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*m = *mr
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *ntportClient) Close() error {
|
||||
n.sub.Unsubscribe()
|
||||
n.conn.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *ntportSocket) Recv(m *transport.Message) error {
|
||||
if m == nil {
|
||||
return errors.New("message passed in is nil")
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(n.m.Data, &m); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *ntportSocket) Send(m *transport.Message) error {
|
||||
b, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return n.conn.Publish(n.m.Reply, b)
|
||||
}
|
||||
|
||||
func (n *ntportSocket) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *ntportListener) Addr() string {
|
||||
return n.addr
|
||||
}
|
||||
|
||||
func (n *ntportListener) Close() error {
|
||||
n.exit <- true
|
||||
n.conn.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *ntportListener) Accept(fn func(transport.Socket)) error {
|
||||
s, err := n.conn.Subscribe(n.addr, func(m *nats.Msg) {
|
||||
fn(&ntportSocket{
|
||||
conn: n.conn,
|
||||
m: m,
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
<-n.exit
|
||||
return s.Unsubscribe()
|
||||
}
|
||||
|
||||
func (n *ntport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) {
|
||||
cAddr := nats.DefaultURL
|
||||
|
||||
if len(n.addrs) > 0 && strings.HasPrefix(n.addrs[0], "nats://") {
|
||||
cAddr = n.addrs[0]
|
||||
}
|
||||
|
||||
c, err := nats.Connect(cAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
id := nats.NewInbox()
|
||||
sub, err := c.SubscribeSync(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &ntportClient{
|
||||
conn: c,
|
||||
addr: addr,
|
||||
id: id,
|
||||
sub: sub,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (n *ntport) Listen(addr string) (transport.Listener, error) {
|
||||
cAddr := nats.DefaultURL
|
||||
|
||||
if len(n.addrs) > 0 && strings.HasPrefix(n.addrs[0], "nats://") {
|
||||
cAddr = n.addrs[0]
|
||||
}
|
||||
|
||||
c, err := nats.Connect(cAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &ntportListener{
|
||||
addr: nats.NewInbox(),
|
||||
conn: c,
|
||||
exit: make(chan bool, 1),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewTransport(addrs []string, opt ...transport.Option) transport.Transport {
|
||||
return &ntport{
|
||||
addrs: addrs,
|
||||
}
|
||||
}
|
@ -1,127 +0,0 @@
|
||||
package rabbitmq
|
||||
|
||||
//
|
||||
// All credit to Mondo
|
||||
//
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/nu7hatch/gouuid"
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
type rabbitMQChannel struct {
|
||||
uuid string
|
||||
connection *amqp.Connection
|
||||
channel *amqp.Channel
|
||||
}
|
||||
|
||||
func newRabbitChannel(conn *amqp.Connection) (*rabbitMQChannel, error) {
|
||||
id, err := uuid.NewV4()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rabbitCh := &rabbitMQChannel{
|
||||
uuid: id.String(),
|
||||
connection: conn,
|
||||
}
|
||||
if err := rabbitCh.Connect(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rabbitCh, nil
|
||||
|
||||
}
|
||||
|
||||
func (r *rabbitMQChannel) Connect() error {
|
||||
var err error
|
||||
r.channel, err = r.connection.Channel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rabbitMQChannel) Close() error {
|
||||
if r.channel == nil {
|
||||
return errors.New("Channel is nil")
|
||||
}
|
||||
return r.channel.Close()
|
||||
}
|
||||
|
||||
func (r *rabbitMQChannel) Publish(exchange, key string, message amqp.Publishing) error {
|
||||
if r.channel == nil {
|
||||
return errors.New("Channel is nil")
|
||||
}
|
||||
return r.channel.Publish(exchange, key, false, false, message)
|
||||
}
|
||||
|
||||
func (r *rabbitMQChannel) DeclareExchange(exchange string) error {
|
||||
return r.channel.ExchangeDeclare(
|
||||
exchange, // name
|
||||
"topic", // kind
|
||||
false, // durable
|
||||
false, // autoDelete
|
||||
false, // internal
|
||||
false, // noWait
|
||||
nil, // args
|
||||
)
|
||||
}
|
||||
|
||||
func (r *rabbitMQChannel) DeclareQueue(queue string) error {
|
||||
_, err := r.channel.QueueDeclare(
|
||||
queue, // name
|
||||
false, // durable
|
||||
true, // autoDelete
|
||||
false, // exclusive
|
||||
false, // noWait
|
||||
nil, // args
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *rabbitMQChannel) DeclareDurableQueue(queue string) error {
|
||||
_, err := r.channel.QueueDeclare(
|
||||
queue, // name
|
||||
true, // durable
|
||||
false, // autoDelete
|
||||
false, // exclusive
|
||||
false, // noWait
|
||||
nil, // args
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *rabbitMQChannel) DeclareReplyQueue(queue string) error {
|
||||
_, err := r.channel.QueueDeclare(
|
||||
queue, // name
|
||||
false, // durable
|
||||
true, // autoDelete
|
||||
true, // exclusive
|
||||
false, // noWait
|
||||
nil, // args
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *rabbitMQChannel) ConsumeQueue(queue string) (<-chan amqp.Delivery, error) {
|
||||
return r.channel.Consume(
|
||||
queue, // queue
|
||||
r.uuid, // consumer
|
||||
true, // autoAck
|
||||
false, // exclusive
|
||||
false, // nolocal
|
||||
false, // nowait
|
||||
nil, // args
|
||||
)
|
||||
}
|
||||
|
||||
func (r *rabbitMQChannel) BindQueue(queue, exchange string) error {
|
||||
return r.channel.QueueBind(
|
||||
queue, // name
|
||||
queue, // key
|
||||
exchange, // exchange
|
||||
false, // noWait
|
||||
nil, // args
|
||||
)
|
||||
}
|
@ -1,142 +0,0 @@
|
||||
package rabbitmq
|
||||
|
||||
//
|
||||
// All credit to Mondo
|
||||
//
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
var (
|
||||
DefaultExchange = "micro"
|
||||
DefaultRabbitURL = "amqp://guest:guest@127.0.0.1:5672"
|
||||
)
|
||||
|
||||
type rabbitMQConn struct {
|
||||
Connection *amqp.Connection
|
||||
Channel *rabbitMQChannel
|
||||
notify chan bool
|
||||
exchange string
|
||||
url string
|
||||
|
||||
connected bool
|
||||
|
||||
mtx sync.Mutex
|
||||
close chan bool
|
||||
closed bool
|
||||
}
|
||||
|
||||
func newRabbitMQConn(exchange string, urls []string) *rabbitMQConn {
|
||||
var url string
|
||||
|
||||
if len(urls) > 0 && strings.HasPrefix(urls[0], "amqp://") {
|
||||
url = urls[0]
|
||||
} else {
|
||||
url = DefaultRabbitURL
|
||||
}
|
||||
|
||||
if len(exchange) == 0 {
|
||||
exchange = DefaultExchange
|
||||
}
|
||||
|
||||
return &rabbitMQConn{
|
||||
exchange: exchange,
|
||||
url: url,
|
||||
notify: make(chan bool, 1),
|
||||
close: make(chan bool),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rabbitMQConn) Init() chan bool {
|
||||
go r.Connect(r.notify)
|
||||
return r.notify
|
||||
}
|
||||
|
||||
func (r *rabbitMQConn) Connect(connected chan bool) {
|
||||
for {
|
||||
if err := r.tryToConnect(); err != nil {
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
connected <- true
|
||||
r.connected = true
|
||||
notifyClose := make(chan *amqp.Error)
|
||||
r.Connection.NotifyClose(notifyClose)
|
||||
|
||||
// Block until we get disconnected, or shut down
|
||||
select {
|
||||
case <-notifyClose:
|
||||
// Spin around and reconnect
|
||||
r.connected = false
|
||||
case <-r.close:
|
||||
// Shut down connection
|
||||
if err := r.Connection.Close(); err != nil {
|
||||
}
|
||||
r.connected = false
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rabbitMQConn) IsConnected() bool {
|
||||
return r.connected
|
||||
}
|
||||
|
||||
func (r *rabbitMQConn) Close() {
|
||||
r.mtx.Lock()
|
||||
defer r.mtx.Unlock()
|
||||
|
||||
if r.closed {
|
||||
return
|
||||
}
|
||||
|
||||
close(r.close)
|
||||
r.closed = true
|
||||
}
|
||||
|
||||
func (r *rabbitMQConn) tryToConnect() error {
|
||||
var err error
|
||||
r.Connection, err = amqp.Dial(r.url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.Channel, err = newRabbitChannel(r.Connection)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.Channel.DeclareExchange(r.exchange)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rabbitMQConn) Consume(queue string) (<-chan amqp.Delivery, error) {
|
||||
consumerChannel, err := newRabbitChannel(r.Connection)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = consumerChannel.DeclareQueue(queue)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
deliveries, err := consumerChannel.ConsumeQueue(queue)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = consumerChannel.BindQueue(queue, r.exchange)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return deliveries, nil
|
||||
}
|
||||
|
||||
func (r *rabbitMQConn) Publish(exchange, key string, msg amqp.Publishing) error {
|
||||
return r.Channel.Publish(exchange, key, msg)
|
||||
}
|
@ -1,249 +0,0 @@
|
||||
package rabbitmq
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"errors"
|
||||
uuid "github.com/nu7hatch/gouuid"
|
||||
"github.com/streadway/amqp"
|
||||
|
||||
"github.com/micro/go-micro/transport"
|
||||
)
|
||||
|
||||
const (
|
||||
directReplyQueue = "amq.rabbitmq.reply-to"
|
||||
)
|
||||
|
||||
type rmqtport struct {
|
||||
conn *rabbitMQConn
|
||||
addrs []string
|
||||
|
||||
once sync.Once
|
||||
replyTo string
|
||||
|
||||
sync.Mutex
|
||||
inflight map[string]chan amqp.Delivery
|
||||
}
|
||||
|
||||
type rmqtportClient struct {
|
||||
rt *rmqtport
|
||||
addr string
|
||||
corId string
|
||||
reply chan amqp.Delivery
|
||||
}
|
||||
|
||||
type rmqtportSocket struct {
|
||||
conn *rabbitMQConn
|
||||
d *amqp.Delivery
|
||||
}
|
||||
|
||||
type rmqtportListener struct {
|
||||
conn *rabbitMQConn
|
||||
addr string
|
||||
}
|
||||
|
||||
func (r *rmqtportClient) Send(m *transport.Message) error {
|
||||
if !r.rt.conn.IsConnected() {
|
||||
return errors.New("Not connected to AMQP")
|
||||
}
|
||||
|
||||
headers := amqp.Table{}
|
||||
for k, v := range m.Header {
|
||||
headers[k] = v
|
||||
}
|
||||
|
||||
message := amqp.Publishing{
|
||||
CorrelationId: r.corId,
|
||||
Timestamp: time.Now().UTC(),
|
||||
Body: m.Body,
|
||||
ReplyTo: r.rt.replyTo,
|
||||
Headers: headers,
|
||||
}
|
||||
|
||||
if err := r.rt.conn.Publish("micro", r.addr, message); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rmqtportClient) Recv(m *transport.Message) error {
|
||||
select {
|
||||
case d := <-r.reply:
|
||||
mr := &transport.Message{
|
||||
Header: make(map[string]string),
|
||||
Body: d.Body,
|
||||
}
|
||||
|
||||
for k, v := range d.Headers {
|
||||
mr.Header[k] = fmt.Sprintf("%v", v)
|
||||
}
|
||||
|
||||
*m = *mr
|
||||
return nil
|
||||
case <-time.After(time.Second * 10):
|
||||
return errors.New("timed out")
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rmqtportClient) Close() error {
|
||||
r.rt.popReq(r.corId)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rmqtportSocket) Recv(m *transport.Message) error {
|
||||
if m == nil {
|
||||
return errors.New("message passed in is nil")
|
||||
}
|
||||
|
||||
mr := &transport.Message{
|
||||
Header: make(map[string]string),
|
||||
Body: r.d.Body,
|
||||
}
|
||||
|
||||
for k, v := range r.d.Headers {
|
||||
mr.Header[k] = fmt.Sprintf("%v", v)
|
||||
}
|
||||
|
||||
*m = *mr
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rmqtportSocket) Send(m *transport.Message) error {
|
||||
msg := amqp.Publishing{
|
||||
CorrelationId: r.d.CorrelationId,
|
||||
Timestamp: time.Now().UTC(),
|
||||
Body: m.Body,
|
||||
Headers: amqp.Table{},
|
||||
}
|
||||
|
||||
for k, v := range m.Header {
|
||||
msg.Headers[k] = v
|
||||
}
|
||||
|
||||
return r.conn.Publish("", r.d.ReplyTo, msg)
|
||||
}
|
||||
|
||||
func (r *rmqtportSocket) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rmqtportListener) Addr() string {
|
||||
return r.addr
|
||||
}
|
||||
|
||||
func (r *rmqtportListener) Close() error {
|
||||
r.conn.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rmqtportListener) Accept(fn func(transport.Socket)) error {
|
||||
deliveries, err := r.conn.Consume(r.addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
handler := func(d amqp.Delivery) {
|
||||
fn(&rmqtportSocket{
|
||||
d: &d,
|
||||
conn: r.conn,
|
||||
})
|
||||
}
|
||||
|
||||
for d := range deliveries {
|
||||
go handler(d)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rmqtport) putReq(id string) chan amqp.Delivery {
|
||||
r.Lock()
|
||||
ch := make(chan amqp.Delivery, 1)
|
||||
r.inflight[id] = ch
|
||||
r.Unlock()
|
||||
return ch
|
||||
}
|
||||
|
||||
func (r *rmqtport) getReq(id string) chan amqp.Delivery {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
if ch, ok := r.inflight[id]; ok {
|
||||
return ch
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rmqtport) popReq(id string) {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
if _, ok := r.inflight[id]; ok {
|
||||
delete(r.inflight, id)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rmqtport) init() {
|
||||
<-r.conn.Init()
|
||||
if err := r.conn.Channel.DeclareReplyQueue(r.replyTo); err != nil {
|
||||
return
|
||||
}
|
||||
deliveries, err := r.conn.Channel.ConsumeQueue(r.replyTo)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
for delivery := range deliveries {
|
||||
go r.handle(delivery)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (r *rmqtport) handle(delivery amqp.Delivery) {
|
||||
ch := r.getReq(delivery.CorrelationId)
|
||||
if ch == nil {
|
||||
return
|
||||
}
|
||||
ch <- delivery
|
||||
}
|
||||
|
||||
func (r *rmqtport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) {
|
||||
id, err := uuid.NewV4()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r.once.Do(r.init)
|
||||
|
||||
return &rmqtportClient{
|
||||
rt: r,
|
||||
addr: addr,
|
||||
corId: id.String(),
|
||||
reply: r.putReq(id.String()),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *rmqtport) Listen(addr string) (transport.Listener, error) {
|
||||
id, err := uuid.NewV4()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn := newRabbitMQConn("", r.addrs)
|
||||
<-conn.Init()
|
||||
|
||||
return &rmqtportListener{
|
||||
addr: id.String(),
|
||||
conn: conn,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewTransport(addrs []string, opt ...transport.Option) transport.Transport {
|
||||
return &rmqtport{
|
||||
conn: newRabbitMQConn("", addrs),
|
||||
addrs: addrs,
|
||||
replyTo: directReplyQueue,
|
||||
inflight: make(map[string]chan amqp.Delivery),
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user