diff --git a/broker/http/http.go b/broker/http/http.go deleted file mode 100644 index 8a8ce540..00000000 --- a/broker/http/http.go +++ /dev/null @@ -1,11 +0,0 @@ -package http - -// This is a hack - -import ( - "github.com/micro/go-micro/broker" -) - -func NewBroker(addrs []string, opt ...broker.Option) broker.Broker { - return broker.NewBroker(addrs, opt...) -} diff --git a/broker/nats/nats.go b/broker/nats/nats.go deleted file mode 100644 index 84f91b96..00000000 --- a/broker/nats/nats.go +++ /dev/null @@ -1,98 +0,0 @@ -package nats - -import ( - "encoding/json" - "strings" - - "github.com/apcera/nats" - "github.com/micro/go-micro/broker" -) - -type nbroker struct { - addrs []string - conn *nats.Conn -} - -type subscriber struct { - s *nats.Subscription -} - -func (n *subscriber) Topic() string { - return n.s.Subject -} - -func (n *subscriber) Unsubscribe() error { - return n.s.Unsubscribe() -} - -func (n *nbroker) Address() string { - if len(n.addrs) > 0 { - return n.addrs[0] - } - return "" -} - -func (n *nbroker) Connect() error { - if n.conn != nil { - return nil - } - - opts := nats.DefaultOptions - opts.Servers = n.addrs - c, err := opts.Connect() - if err != nil { - return err - } - n.conn = c - return nil -} - -func (n *nbroker) Disconnect() error { - n.conn.Close() - return nil -} - -func (n *nbroker) Init() error { - return nil -} - -func (n *nbroker) Publish(topic string, msg *broker.Message) error { - b, err := json.Marshal(msg) - if err != nil { - return err - } - return n.conn.Publish(topic, b) -} - -func (n *nbroker) Subscribe(topic string, handler broker.Handler) (broker.Subscriber, error) { - sub, err := n.conn.Subscribe(topic, func(msg *nats.Msg) { - var m *broker.Message - if err := json.Unmarshal(msg.Data, &m); err != nil { - return - } - handler(m) - }) - if err != nil { - return nil, err - } - return &subscriber{s: sub}, nil -} - -func NewBroker(addrs []string, opt ...broker.Option) broker.Broker { - var cAddrs []string - for _, addr := range addrs { - if len(addr) == 0 { - continue - } - if !strings.HasPrefix(addr, "nats://") { - addr = "nats://" + addr - } - cAddrs = append(cAddrs, addr) - } - if len(cAddrs) == 0 { - cAddrs = []string{nats.DefaultURL} - } - return &nbroker{ - addrs: cAddrs, - } -} diff --git a/broker/rabbitmq/channel.go b/broker/rabbitmq/channel.go deleted file mode 100644 index 28dc1eae..00000000 --- a/broker/rabbitmq/channel.go +++ /dev/null @@ -1,127 +0,0 @@ -package rabbitmq - -// -// All credit to Mondo -// - -import ( - "errors" - - "github.com/nu7hatch/gouuid" - "github.com/streadway/amqp" -) - -type rabbitMQChannel struct { - uuid string - connection *amqp.Connection - channel *amqp.Channel -} - -func newRabbitChannel(conn *amqp.Connection) (*rabbitMQChannel, error) { - id, err := uuid.NewV4() - if err != nil { - return nil, err - } - rabbitCh := &rabbitMQChannel{ - uuid: id.String(), - connection: conn, - } - if err := rabbitCh.Connect(); err != nil { - return nil, err - } - return rabbitCh, nil - -} - -func (r *rabbitMQChannel) Connect() error { - var err error - r.channel, err = r.connection.Channel() - if err != nil { - return err - } - return nil -} - -func (r *rabbitMQChannel) Close() error { - if r.channel == nil { - return errors.New("Channel is nil") - } - return r.channel.Close() -} - -func (r *rabbitMQChannel) Publish(exchange, key string, message amqp.Publishing) error { - if r.channel == nil { - return errors.New("Channel is nil") - } - return r.channel.Publish(exchange, key, false, false, message) -} - -func (r *rabbitMQChannel) DeclareExchange(exchange string) error { - return r.channel.ExchangeDeclare( - exchange, // name - "topic", // kind - false, // durable - false, // autoDelete - false, // internal - false, // noWait - nil, // args - ) -} - -func (r *rabbitMQChannel) DeclareQueue(queue string) error { - _, err := r.channel.QueueDeclare( - queue, // name - false, // durable - true, // autoDelete - false, // exclusive - false, // noWait - nil, // args - ) - return err -} - -func (r *rabbitMQChannel) DeclareDurableQueue(queue string) error { - _, err := r.channel.QueueDeclare( - queue, // name - true, // durable - false, // autoDelete - false, // exclusive - false, // noWait - nil, // args - ) - return err -} - -func (r *rabbitMQChannel) DeclareReplyQueue(queue string) error { - _, err := r.channel.QueueDeclare( - queue, // name - false, // durable - true, // autoDelete - true, // exclusive - false, // noWait - nil, // args - ) - return err -} - -func (r *rabbitMQChannel) ConsumeQueue(queue string) (<-chan amqp.Delivery, error) { - return r.channel.Consume( - queue, // queue - r.uuid, // consumer - true, // autoAck - false, // exclusive - false, // nolocal - false, // nowait - nil, // args - ) -} - -func (r *rabbitMQChannel) BindQueue(queue, exchange string) error { - return r.channel.QueueBind( - queue, // name - queue, // key - exchange, // exchange - false, // noWait - nil, // args - ) -} diff --git a/broker/rabbitmq/connection.go b/broker/rabbitmq/connection.go deleted file mode 100644 index 0f079581..00000000 --- a/broker/rabbitmq/connection.go +++ /dev/null @@ -1,147 +0,0 @@ -package rabbitmq - -// -// All credit to Mondo -// - -import ( - "strings" - "sync" - "time" - - "github.com/streadway/amqp" -) - -var ( - DefaultExchange = "micro" - DefaultRabbitURL = "amqp://guest:guest@127.0.0.1:5672" -) - -type rabbitMQConn struct { - Connection *amqp.Connection - Channel *rabbitMQChannel - ExchangeChannel *rabbitMQChannel - notify chan bool - exchange string - url string - - connected bool - - mtx sync.Mutex - close chan bool - closed bool -} - -func newRabbitMQConn(exchange string, urls []string) *rabbitMQConn { - var url string - - if len(urls) > 0 && strings.HasPrefix(urls[0], "amqp://") { - url = urls[0] - } else { - url = DefaultRabbitURL - } - - if len(exchange) == 0 { - exchange = DefaultExchange - } - - return &rabbitMQConn{ - exchange: exchange, - url: url, - notify: make(chan bool, 1), - close: make(chan bool), - } -} - -func (r *rabbitMQConn) Init() chan bool { - go r.Connect(r.notify) - return r.notify -} - -func (r *rabbitMQConn) Connect(connected chan bool) { - for { - if err := r.tryToConnect(); err != nil { - time.Sleep(1 * time.Second) - continue - } - connected <- true - r.connected = true - notifyClose := make(chan *amqp.Error) - r.Connection.NotifyClose(notifyClose) - - // Block until we get disconnected, or shut down - select { - case <-notifyClose: - // Spin around and reconnect - r.connected = false - case <-r.close: - // Shut down connection - if err := r.Connection.Close(); err != nil { - } - r.connected = false - return - } - } -} - -func (r *rabbitMQConn) IsConnected() bool { - return r.connected -} - -func (r *rabbitMQConn) Close() { - r.mtx.Lock() - defer r.mtx.Unlock() - - if r.closed { - return - } - - close(r.close) - r.closed = true -} - -func (r *rabbitMQConn) tryToConnect() error { - var err error - r.Connection, err = amqp.Dial(r.url) - if err != nil { - return err - } - r.Channel, err = newRabbitChannel(r.Connection) - if err != nil { - return err - } - r.Channel.DeclareExchange(r.exchange) - r.ExchangeChannel, err = newRabbitChannel(r.Connection) - if err != nil { - return err - } - return nil -} - -func (r *rabbitMQConn) Consume(queue string) (*rabbitMQChannel, <-chan amqp.Delivery, error) { - consumerChannel, err := newRabbitChannel(r.Connection) - if err != nil { - return nil, nil, err - } - - err = consumerChannel.DeclareQueue(queue) - if err != nil { - return nil, nil, err - } - - deliveries, err := consumerChannel.ConsumeQueue(queue) - if err != nil { - return nil, nil, err - } - - err = consumerChannel.BindQueue(queue, r.exchange) - if err != nil { - return nil, nil, err - } - - return consumerChannel, deliveries, nil -} - -func (r *rabbitMQConn) Publish(exchange, key string, msg amqp.Publishing) error { - return r.ExchangeChannel.Publish(exchange, key, msg) -} diff --git a/broker/rabbitmq/rabbitmq.go b/broker/rabbitmq/rabbitmq.go deleted file mode 100644 index cc89f5d1..00000000 --- a/broker/rabbitmq/rabbitmq.go +++ /dev/null @@ -1,91 +0,0 @@ -package rabbitmq - -import ( - "github.com/micro/go-micro/broker" - "github.com/streadway/amqp" -) - -type rbroker struct { - conn *rabbitMQConn - addrs []string -} - -type subscriber struct { - topic string - ch *rabbitMQChannel -} - -func (s *subscriber) Topic() string { - return s.topic -} - -func (s *subscriber) Unsubscribe() error { - return s.ch.Close() -} - -func (r *rbroker) Publish(topic string, msg *broker.Message) error { - m := amqp.Publishing{ - Body: msg.Body, - Headers: amqp.Table{}, - } - - for k, v := range msg.Header { - m.Headers[k] = v - } - - return r.conn.Publish("", topic, m) -} - -func (r *rbroker) Subscribe(topic string, handler broker.Handler) (broker.Subscriber, error) { - ch, sub, err := r.conn.Consume(topic) - if err != nil { - return nil, err - } - - fn := func(msg amqp.Delivery) { - header := make(map[string]string) - for k, v := range msg.Headers { - header[k], _ = v.(string) - } - handler(&broker.Message{ - Header: header, - Body: msg.Body, - }) - } - - go func() { - for d := range sub { - go fn(d) - } - }() - - return &subscriber{ch: ch, topic: topic}, nil -} - -func (r *rbroker) Address() string { - if len(r.addrs) > 0 { - return r.addrs[0] - } - return "" -} - -func (r *rbroker) Init() error { - return nil -} - -func (r *rbroker) Connect() error { - <-r.conn.Init() - return nil -} - -func (r *rbroker) Disconnect() error { - r.conn.Close() - return nil -} - -func NewBroker(addrs []string, opt ...broker.Option) broker.Broker { - return &rbroker{ - conn: newRabbitMQConn("", addrs), - addrs: addrs, - } -} diff --git a/cmd/cmd.go b/cmd/cmd.go index 1af324c9..6fb669fd 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -15,21 +15,6 @@ import ( "github.com/micro/go-micro/registry" "github.com/micro/go-micro/server" "github.com/micro/go-micro/transport" - - // brokers - "github.com/micro/go-micro/broker/http" - "github.com/micro/go-micro/broker/nats" - "github.com/micro/go-micro/broker/rabbitmq" - - // registries - "github.com/micro/go-micro/registry/consul" - "github.com/micro/go-micro/registry/etcd" - "github.com/micro/go-micro/registry/memory" - - // transport - thttp "github.com/micro/go-micro/transport/http" - tnats "github.com/micro/go-micro/transport/nats" - trmq "github.com/micro/go-micro/transport/rabbitmq" ) var ( @@ -132,21 +117,15 @@ var ( } Brokers = map[string]func([]string, ...broker.Option) broker.Broker{ - "http": http.NewBroker, - "nats": nats.NewBroker, - "rabbitmq": rabbitmq.NewBroker, + "http": broker.NewBroker, } Registries = map[string]func([]string, ...registry.Option) registry.Registry{ - "consul": consul.NewRegistry, - "etcd": etcd.NewRegistry, - "memory": memory.NewRegistry, + "consul": registry.NewRegistry, } Transports = map[string]func([]string, ...transport.Option) transport.Transport{ - "http": thttp.NewTransport, - "rabbitmq": trmq.NewTransport, - "nats": tnats.NewTransport, + "http": transport.NewTransport, } ) diff --git a/registry/consul/consul.go b/registry/consul/consul.go deleted file mode 100644 index e8869b89..00000000 --- a/registry/consul/consul.go +++ /dev/null @@ -1,11 +0,0 @@ -package consul - -// This is a hack - -import ( - "github.com/micro/go-micro/registry" -) - -func NewRegistry(addrs []string, opt ...registry.Option) registry.Registry { - return registry.NewRegistry(addrs, opt...) -} diff --git a/registry/etcd/etcd.go b/registry/etcd/etcd.go deleted file mode 100644 index 7cda61c3..00000000 --- a/registry/etcd/etcd.go +++ /dev/null @@ -1,193 +0,0 @@ -package etcd - -import ( - "encoding/json" - "errors" - "path/filepath" - "strings" - "sync" - - etcd "github.com/coreos/etcd/client" - "github.com/micro/go-micro/registry" - "golang.org/x/net/context" -) - -var ( - prefix = "/micro-registry" -) - -type etcdRegistry struct { - client etcd.KeysAPI - - sync.RWMutex - services map[string][]*registry.Service -} - -func encode(s *registry.Service) string { - b, _ := json.Marshal(s) - return string(b) -} - -func decode(ds string) *registry.Service { - var s *registry.Service - json.Unmarshal([]byte(ds), &s) - return s -} - -func nodePath(s, id string) string { - service := strings.Replace(s, "/", "-", -1) - node := strings.Replace(id, "/", "-", -1) - return filepath.Join(prefix, service, node) -} - -func servicePath(s string) string { - return filepath.Join(prefix, strings.Replace(s, "/", "-", -1)) -} - -func (e *etcdRegistry) Deregister(s *registry.Service) error { - if len(s.Nodes) == 0 { - return errors.New("Require at least one node") - } - - for _, node := range s.Nodes { - _, err := e.client.Delete(context.Background(), nodePath(s.Name, node.Id), &etcd.DeleteOptions{Recursive: false}) - if err != nil { - return err - } - } - - e.client.Delete(context.Background(), servicePath(s.Name), &etcd.DeleteOptions{Dir: true}) - return nil -} - -func (e *etcdRegistry) Register(s *registry.Service) error { - if len(s.Nodes) == 0 { - return errors.New("Require at least one node") - } - - service := ®istry.Service{ - Name: s.Name, - Version: s.Version, - Metadata: s.Metadata, - Endpoints: s.Endpoints, - } - - e.client.Set(context.Background(), servicePath(s.Name), "", &etcd.SetOptions{Dir: true}) - - for _, node := range s.Nodes { - service.Nodes = []*registry.Node{node} - _, err := e.client.Set(context.Background(), nodePath(service.Name, node.Id), encode(service), &etcd.SetOptions{}) - if err != nil { - return err - } - } - - return nil -} - -func (e *etcdRegistry) GetService(name string) ([]*registry.Service, error) { - e.RLock() - service, ok := e.services[name] - e.RUnlock() - - if ok { - return service, nil - } - - rsp, err := e.client.Get(context.Background(), servicePath(name), &etcd.GetOptions{}) - if err != nil && !strings.HasPrefix(err.Error(), "100: Key not found") { - return nil, err - } - - serviceMap := map[string]*registry.Service{} - - for _, n := range rsp.Node.Nodes { - if n.Dir { - continue - } - sn := decode(n.Value) - - s, ok := serviceMap[sn.Version] - if !ok { - s = ®istry.Service{ - Name: sn.Name, - Version: sn.Version, - Metadata: sn.Metadata, - Endpoints: sn.Endpoints, - } - serviceMap[s.Version] = s - } - - for _, node := range sn.Nodes { - s.Nodes = append(s.Nodes, node) - } - } - - var services []*registry.Service - for _, service := range serviceMap { - services = append(services, service) - } - return services, nil -} - -func (e *etcdRegistry) ListServices() ([]*registry.Service, error) { - e.RLock() - serviceMap := e.services - e.RUnlock() - - var services []*registry.Service - - if len(serviceMap) > 0 { - for _, service := range serviceMap { - services = append(services, service...) - } - return services, nil - } - - rsp, err := e.client.Get(context.Background(), prefix, &etcd.GetOptions{Recursive: true, Sort: true}) - if err != nil && !strings.HasPrefix(err.Error(), "100: Key not found") { - return nil, err - } - - for _, node := range rsp.Node.Nodes { - service := ®istry.Service{} - for _, n := range node.Nodes { - i := decode(n.Value) - service.Name = i.Name - } - services = append(services, service) - } - - return services, nil -} - -func (e *etcdRegistry) Watch() (registry.Watcher, error) { - // todo: fix watcher - return newEtcdWatcher(e) -} - -func NewRegistry(addrs []string, opt ...registry.Option) registry.Registry { - var cAddrs []string - - for _, addr := range addrs { - if len(addr) == 0 { - continue - } - cAddrs = append(cAddrs, addr) - } - - if len(cAddrs) == 0 { - cAddrs = []string{"http://127.0.0.1:2379"} - } - - c, _ := etcd.New(etcd.Config{ - Endpoints: cAddrs, - }) - - e := &etcdRegistry{ - client: etcd.NewKeysAPI(c), - services: make(map[string][]*registry.Service), - } - - return e -} diff --git a/registry/etcd/watcher.go b/registry/etcd/watcher.go deleted file mode 100644 index 56df64e7..00000000 --- a/registry/etcd/watcher.go +++ /dev/null @@ -1,151 +0,0 @@ -package etcd - -import ( - etcd "github.com/coreos/etcd/client" - "github.com/micro/go-micro/registry" - "golang.org/x/net/context" -) - -type etcdWatcher struct { - registry *etcdRegistry - stop chan bool -} - -func addNodes(old, neu []*registry.Node) []*registry.Node { - for _, n := range neu { - var seen bool - for i, o := range old { - if o.Id == n.Id { - seen = true - old[i] = n - break - } - } - if !seen { - old = append(old, n) - } - } - return old -} - -func addServices(old, neu []*registry.Service) []*registry.Service { - for _, s := range neu { - var seen bool - for i, o := range old { - if o.Version == s.Version { - s.Nodes = addNodes(o.Nodes, s.Nodes) - seen = true - old[i] = s - break - } - } - if !seen { - old = append(old, s) - } - } - return old -} - -func delNodes(old, del []*registry.Node) []*registry.Node { - var nodes []*registry.Node - for _, o := range old { - var rem bool - for _, n := range del { - if o.Id == n.Id { - rem = true - break - } - } - if !rem { - nodes = append(nodes, o) - } - } - return nodes -} - -func delServices(old, del []*registry.Service) []*registry.Service { - var services []*registry.Service - for i, o := range old { - var rem bool - for _, s := range del { - if o.Version == s.Version { - old[i].Nodes = delNodes(o.Nodes, s.Nodes) - if len(old[i].Nodes) == 0 { - rem = true - } - } - } - if !rem { - services = append(services, o) - } - } - return services -} - -func newEtcdWatcher(r *etcdRegistry) (registry.Watcher, error) { - ew := &etcdWatcher{ - registry: r, - stop: make(chan bool), - } - - w := r.client.Watcher(prefix, &etcd.WatcherOptions{AfterIndex: 0, Recursive: true}) - - c := context.Background() - ctx, cancel := context.WithCancel(c) - - go func() { - <-ew.stop - cancel() - }() - - go ew.watch(ctx, w) - - return ew, nil -} - -func (e *etcdWatcher) watch(ctx context.Context, w etcd.Watcher) { - for { - rsp, err := w.Next(ctx) - if err != nil && ctx.Err() != nil { - return - } - - if rsp.Node.Dir { - continue - } - - s := decode(rsp.Node.Value) - if s == nil { - continue - } - - e.registry.Lock() - - service, ok := e.registry.services[s.Name] - if !ok { - if rsp.Action == "create" { - e.registry.services[s.Name] = []*registry.Service{s} - } - e.registry.Unlock() - continue - } - - switch rsp.Action { - case "delete": - services := delServices(service, []*registry.Service{s}) - if len(services) > 0 { - e.registry.services[s.Name] = services - } else { - delete(e.registry.services, s.Name) - } - case "create": - e.registry.services[s.Name] = addServices(service, []*registry.Service{s}) - } - - e.registry.Unlock() - } -} - -func (ew *etcdWatcher) Stop() { - ew.stop <- true -} diff --git a/registry/memory/memory.go b/registry/memory/memory.go deleted file mode 100644 index 8afa0e81..00000000 --- a/registry/memory/memory.go +++ /dev/null @@ -1,372 +0,0 @@ -package memory - -import ( - "encoding/json" - "fmt" - "os" - "sync" - - log "github.com/golang/glog" - "github.com/hashicorp/memberlist" - "github.com/micro/go-micro/registry" - "github.com/pborman/uuid" -) - -type action int - -const ( - addAction action = iota - delAction - syncAction -) - -type broadcast struct { - msg []byte - notify chan<- struct{} -} - -type delegate struct { - broadcasts *memberlist.TransmitLimitedQueue - updates chan *update -} - -type memoryRegistry struct { - broadcasts *memberlist.TransmitLimitedQueue - updates chan *update - - sync.RWMutex - services map[string][]*registry.Service -} - -type update struct { - Action action - Service *registry.Service - sync chan *registry.Service -} - -type watcher struct{} - -func (b *broadcast) Invalidates(other memberlist.Broadcast) bool { - return false -} - -func (b *broadcast) Message() []byte { - return b.msg -} - -func (b *broadcast) Finished() { - if b.notify != nil { - close(b.notify) - } -} - -func (d *delegate) NodeMeta(limit int) []byte { - return []byte{} -} - -func (d *delegate) NotifyMsg(b []byte) { - if len(b) == 0 { - return - } - - buf := make([]byte, len(b)) - copy(buf, b) - - go func() { - switch buf[0] { - case 'd': // data - var updates []*update - if err := json.Unmarshal(buf[1:], &updates); err != nil { - return - } - for _, u := range updates { - d.updates <- u - } - } - }() -} - -func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { - return d.broadcasts.GetBroadcasts(overhead, limit) -} - -func (d *delegate) LocalState(join bool) []byte { - if !join { - return []byte{} - } - - syncCh := make(chan *registry.Service, 1) - m := map[string][]*registry.Service{} - - d.updates <- &update{ - Action: syncAction, - sync: syncCh, - } - - for s := range syncCh { - m[s.Name] = append(m[s.Name], s) - } - - b, _ := json.Marshal(m) - return b -} - -func (d *delegate) MergeRemoteState(buf []byte, join bool) { - if len(buf) == 0 { - return - } - if !join { - return - } - - var m map[string][]*registry.Service - if err := json.Unmarshal(buf, &m); err != nil { - return - } - - for _, services := range m { - for _, service := range services { - d.updates <- &update{ - Action: addAction, - Service: service, - sync: nil, - } - } - } -} - -func (m *memoryRegistry) run() { - for u := range m.updates { - switch u.Action { - case addAction: - m.Lock() - if service, ok := m.services[u.Service.Name]; !ok { - m.services[u.Service.Name] = []*registry.Service{u.Service} - } else { - m.services[u.Service.Name] = addServices(service, []*registry.Service{u.Service}) - } - m.Unlock() - case delAction: - m.Lock() - if service, ok := m.services[u.Service.Name]; ok { - if services := delServices(service, []*registry.Service{u.Service}); len(services) == 0 { - delete(m.services, u.Service.Name) - } else { - m.services[u.Service.Name] = services - } - } - m.Unlock() - case syncAction: - if u.sync == nil { - continue - } - m.RLock() - for _, services := range m.services { - for _, service := range services { - u.sync <- service - } - } - m.RUnlock() - close(u.sync) - } - } -} - -func (m *memoryRegistry) Register(s *registry.Service) error { - m.Lock() - if service, ok := m.services[s.Name]; !ok { - m.services[s.Name] = []*registry.Service{s} - } else { - m.services[s.Name] = addServices(service, []*registry.Service{s}) - } - m.Unlock() - - b, _ := json.Marshal([]*update{ - &update{ - Action: addAction, - Service: s, - }, - }) - - m.broadcasts.QueueBroadcast(&broadcast{ - msg: append([]byte("d"), b...), - notify: nil, - }) - - return nil -} - -func (m *memoryRegistry) Deregister(s *registry.Service) error { - m.Lock() - if service, ok := m.services[s.Name]; ok { - if services := delServices(service, []*registry.Service{s}); len(services) == 0 { - delete(m.services, s.Name) - } else { - m.services[s.Name] = services - } - } - m.Unlock() - - b, _ := json.Marshal([]*update{ - &update{ - Action: delAction, - Service: s, - }, - }) - - m.broadcasts.QueueBroadcast(&broadcast{ - msg: append([]byte("d"), b...), - notify: nil, - }) - - return nil -} - -func (m *memoryRegistry) GetService(name string) ([]*registry.Service, error) { - m.RLock() - service, ok := m.services[name] - m.RUnlock() - if !ok { - return nil, fmt.Errorf("Service %s not found", name) - } - return service, nil -} - -func (m *memoryRegistry) ListServices() ([]*registry.Service, error) { - var services []*registry.Service - m.RLock() - for _, service := range m.services { - services = append(services, service...) - } - m.RUnlock() - return services, nil -} - -func (m *memoryRegistry) Watch() (registry.Watcher, error) { - return &watcher{}, nil -} - -func (w *watcher) Stop() { - return -} - -func addNodes(old, neu []*registry.Node) []*registry.Node { - for _, n := range neu { - var seen bool - for i, o := range old { - if o.Id == n.Id { - seen = true - old[i] = n - break - } - } - if !seen { - old = append(old, n) - } - } - return old -} - -func addServices(old, neu []*registry.Service) []*registry.Service { - for _, s := range neu { - var seen bool - for i, o := range old { - if o.Version == s.Version { - s.Nodes = addNodes(o.Nodes, s.Nodes) - seen = true - old[i] = s - break - } - } - if !seen { - old = append(old, s) - } - } - return old -} - -func delNodes(old, del []*registry.Node) []*registry.Node { - var nodes []*registry.Node - for _, o := range old { - var rem bool - for _, n := range del { - if o.Id == n.Id { - rem = true - break - } - } - if !rem { - nodes = append(nodes, o) - } - } - return nodes -} - -func delServices(old, del []*registry.Service) []*registry.Service { - var services []*registry.Service - for i, o := range old { - var rem bool - for _, s := range del { - if o.Version == s.Version { - old[i].Nodes = delNodes(o.Nodes, s.Nodes) - if len(old[i].Nodes) == 0 { - rem = true - } - } - } - if !rem { - services = append(services, o) - } - } - return services -} - -func NewRegistry(addrs []string, opt ...registry.Option) registry.Registry { - cAddrs := []string{} - hostname, _ := os.Hostname() - updates := make(chan *update, 100) - - for _, addr := range addrs { - if len(addr) > 0 { - cAddrs = append(cAddrs, addr) - } - } - - broadcasts := &memberlist.TransmitLimitedQueue{ - NumNodes: func() int { - return len(cAddrs) - }, - RetransmitMult: 3, - } - - mr := &memoryRegistry{ - broadcasts: broadcasts, - services: make(map[string][]*registry.Service), - updates: updates, - } - - go mr.run() - - c := memberlist.DefaultLocalConfig() - c.BindPort = 0 - c.Name = hostname + "-" + uuid.NewUUID().String() - c.Delegate = &delegate{ - updates: updates, - broadcasts: broadcasts, - } - - m, err := memberlist.Create(c) - if err != nil { - log.Fatalf("Error creating memberlist: %v", err) - } - - if len(cAddrs) > 0 { - _, err := m.Join(cAddrs) - if err != nil { - log.Fatalf("Error joining members: %v", err) - } - } - - log.Infof("Local memberlist node %s:%d\n", m.LocalNode().Addr, m.LocalNode().Port) - return mr -} diff --git a/registry/memory/memory_test.go b/registry/memory/memory_test.go deleted file mode 100644 index f19d1c52..00000000 --- a/registry/memory/memory_test.go +++ /dev/null @@ -1,78 +0,0 @@ -package memory - -import ( - "testing" - - "github.com/micro/go-micro/registry" -) - -func TestDelServices(t *testing.T) { - services := []*registry.Service{ - { - Name: "foo", - Version: "1.0.0", - Nodes: []*registry.Node{ - { - Id: "foo-123", - Address: "localhost", - Port: 9999, - }, - }, - }, - { - Name: "foo", - Version: "1.0.0", - Nodes: []*registry.Node{ - { - Id: "foo-123", - Address: "localhost", - Port: 6666, - }, - }, - }, - } - - servs := delServices([]*registry.Service{services[0]}, []*registry.Service{services[1]}) - if i := len(servs); i > 0 { - t.Errorf("Expected 0 nodes, got %d: %+v", i, servs) - } - t.Logf("Services %+v", servs) -} - -func TestDelNodes(t *testing.T) { - services := []*registry.Service{ - { - Name: "foo", - Version: "1.0.0", - Nodes: []*registry.Node{ - { - Id: "foo-123", - Address: "localhost", - Port: 9999, - }, - { - Id: "foo-321", - Address: "localhost", - Port: 6666, - }, - }, - }, - { - Name: "foo", - Version: "1.0.0", - Nodes: []*registry.Node{ - { - Id: "foo-123", - Address: "localhost", - Port: 6666, - }, - }, - }, - } - - nodes := delNodes(services[0].Nodes, services[1].Nodes) - if i := len(nodes); i != 1 { - t.Errorf("Expected only 1 node, got %d: %+v", i, nodes) - } - t.Logf("Nodes %+v", nodes) -} diff --git a/transport/buffer.go b/transport/buffer.go deleted file mode 100644 index b3fac087..00000000 --- a/transport/buffer.go +++ /dev/null @@ -1,13 +0,0 @@ -package transport - -import ( - "io" -) - -type buffer struct { - io.ReadWriter -} - -func (b *buffer) Close() error { - return nil -} diff --git a/transport/http/http.go b/transport/http/http.go deleted file mode 100644 index 30a8b43e..00000000 --- a/transport/http/http.go +++ /dev/null @@ -1,11 +0,0 @@ -package http - -// This is a hack - -import ( - "github.com/micro/go-micro/transport" -) - -func NewTransport(addrs []string, opt ...transport.Option) transport.Transport { - return transport.NewTransport(addrs, opt...) -} diff --git a/transport/http_transport.go b/transport/http_transport.go index 75ede08b..429e7627 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -12,6 +12,10 @@ import ( "sync" ) +type buffer struct { + io.ReadWriter +} + type httpTransport struct{} type httpTransportClient struct { @@ -33,6 +37,10 @@ type httpTransportListener struct { listener net.Listener } +func (b *buffer) Close() error { + return nil +} + func (h *httpTransportClient) Send(m *Message) error { header := make(http.Header) diff --git a/transport/nats/nats.go b/transport/nats/nats.go deleted file mode 100644 index 3cbbc78a..00000000 --- a/transport/nats/nats.go +++ /dev/null @@ -1,162 +0,0 @@ -package nats - -import ( - "encoding/json" - "errors" - "strings" - "time" - - "github.com/apcera/nats" - "github.com/micro/go-micro/transport" -) - -type ntport struct { - addrs []string -} - -type ntportClient struct { - conn *nats.Conn - addr string - id string - sub *nats.Subscription -} - -type ntportSocket struct { - conn *nats.Conn - m *nats.Msg -} - -type ntportListener struct { - conn *nats.Conn - addr string - exit chan bool -} - -func (n *ntportClient) Send(m *transport.Message) error { - b, err := json.Marshal(m) - if err != nil { - return err - } - - return n.conn.PublishRequest(n.addr, n.id, b) -} - -func (n *ntportClient) Recv(m *transport.Message) error { - rsp, err := n.sub.NextMsg(time.Second * 10) - if err != nil { - return err - } - - var mr *transport.Message - if err := json.Unmarshal(rsp.Data, &mr); err != nil { - return err - } - - *m = *mr - return nil -} - -func (n *ntportClient) Close() error { - n.sub.Unsubscribe() - n.conn.Close() - return nil -} - -func (n *ntportSocket) Recv(m *transport.Message) error { - if m == nil { - return errors.New("message passed in is nil") - } - - if err := json.Unmarshal(n.m.Data, &m); err != nil { - return err - } - return nil -} - -func (n *ntportSocket) Send(m *transport.Message) error { - b, err := json.Marshal(m) - if err != nil { - return err - } - return n.conn.Publish(n.m.Reply, b) -} - -func (n *ntportSocket) Close() error { - return nil -} - -func (n *ntportListener) Addr() string { - return n.addr -} - -func (n *ntportListener) Close() error { - n.exit <- true - n.conn.Close() - return nil -} - -func (n *ntportListener) Accept(fn func(transport.Socket)) error { - s, err := n.conn.Subscribe(n.addr, func(m *nats.Msg) { - fn(&ntportSocket{ - conn: n.conn, - m: m, - }) - }) - if err != nil { - return err - } - - <-n.exit - return s.Unsubscribe() -} - -func (n *ntport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) { - cAddr := nats.DefaultURL - - if len(n.addrs) > 0 && strings.HasPrefix(n.addrs[0], "nats://") { - cAddr = n.addrs[0] - } - - c, err := nats.Connect(cAddr) - if err != nil { - return nil, err - } - - id := nats.NewInbox() - sub, err := c.SubscribeSync(id) - if err != nil { - return nil, err - } - - return &ntportClient{ - conn: c, - addr: addr, - id: id, - sub: sub, - }, nil -} - -func (n *ntport) Listen(addr string) (transport.Listener, error) { - cAddr := nats.DefaultURL - - if len(n.addrs) > 0 && strings.HasPrefix(n.addrs[0], "nats://") { - cAddr = n.addrs[0] - } - - c, err := nats.Connect(cAddr) - if err != nil { - return nil, err - } - - return &ntportListener{ - addr: nats.NewInbox(), - conn: c, - exit: make(chan bool, 1), - }, nil -} - -func NewTransport(addrs []string, opt ...transport.Option) transport.Transport { - return &ntport{ - addrs: addrs, - } -} diff --git a/transport/rabbitmq/channel.go b/transport/rabbitmq/channel.go deleted file mode 100644 index 28dc1eae..00000000 --- a/transport/rabbitmq/channel.go +++ /dev/null @@ -1,127 +0,0 @@ -package rabbitmq - -// -// All credit to Mondo -// - -import ( - "errors" - - "github.com/nu7hatch/gouuid" - "github.com/streadway/amqp" -) - -type rabbitMQChannel struct { - uuid string - connection *amqp.Connection - channel *amqp.Channel -} - -func newRabbitChannel(conn *amqp.Connection) (*rabbitMQChannel, error) { - id, err := uuid.NewV4() - if err != nil { - return nil, err - } - rabbitCh := &rabbitMQChannel{ - uuid: id.String(), - connection: conn, - } - if err := rabbitCh.Connect(); err != nil { - return nil, err - } - return rabbitCh, nil - -} - -func (r *rabbitMQChannel) Connect() error { - var err error - r.channel, err = r.connection.Channel() - if err != nil { - return err - } - return nil -} - -func (r *rabbitMQChannel) Close() error { - if r.channel == nil { - return errors.New("Channel is nil") - } - return r.channel.Close() -} - -func (r *rabbitMQChannel) Publish(exchange, key string, message amqp.Publishing) error { - if r.channel == nil { - return errors.New("Channel is nil") - } - return r.channel.Publish(exchange, key, false, false, message) -} - -func (r *rabbitMQChannel) DeclareExchange(exchange string) error { - return r.channel.ExchangeDeclare( - exchange, // name - "topic", // kind - false, // durable - false, // autoDelete - false, // internal - false, // noWait - nil, // args - ) -} - -func (r *rabbitMQChannel) DeclareQueue(queue string) error { - _, err := r.channel.QueueDeclare( - queue, // name - false, // durable - true, // autoDelete - false, // exclusive - false, // noWait - nil, // args - ) - return err -} - -func (r *rabbitMQChannel) DeclareDurableQueue(queue string) error { - _, err := r.channel.QueueDeclare( - queue, // name - true, // durable - false, // autoDelete - false, // exclusive - false, // noWait - nil, // args - ) - return err -} - -func (r *rabbitMQChannel) DeclareReplyQueue(queue string) error { - _, err := r.channel.QueueDeclare( - queue, // name - false, // durable - true, // autoDelete - true, // exclusive - false, // noWait - nil, // args - ) - return err -} - -func (r *rabbitMQChannel) ConsumeQueue(queue string) (<-chan amqp.Delivery, error) { - return r.channel.Consume( - queue, // queue - r.uuid, // consumer - true, // autoAck - false, // exclusive - false, // nolocal - false, // nowait - nil, // args - ) -} - -func (r *rabbitMQChannel) BindQueue(queue, exchange string) error { - return r.channel.QueueBind( - queue, // name - queue, // key - exchange, // exchange - false, // noWait - nil, // args - ) -} diff --git a/transport/rabbitmq/connection.go b/transport/rabbitmq/connection.go deleted file mode 100644 index a9da4df9..00000000 --- a/transport/rabbitmq/connection.go +++ /dev/null @@ -1,142 +0,0 @@ -package rabbitmq - -// -// All credit to Mondo -// - -import ( - "strings" - "sync" - "time" - - "github.com/streadway/amqp" -) - -var ( - DefaultExchange = "micro" - DefaultRabbitURL = "amqp://guest:guest@127.0.0.1:5672" -) - -type rabbitMQConn struct { - Connection *amqp.Connection - Channel *rabbitMQChannel - notify chan bool - exchange string - url string - - connected bool - - mtx sync.Mutex - close chan bool - closed bool -} - -func newRabbitMQConn(exchange string, urls []string) *rabbitMQConn { - var url string - - if len(urls) > 0 && strings.HasPrefix(urls[0], "amqp://") { - url = urls[0] - } else { - url = DefaultRabbitURL - } - - if len(exchange) == 0 { - exchange = DefaultExchange - } - - return &rabbitMQConn{ - exchange: exchange, - url: url, - notify: make(chan bool, 1), - close: make(chan bool), - } -} - -func (r *rabbitMQConn) Init() chan bool { - go r.Connect(r.notify) - return r.notify -} - -func (r *rabbitMQConn) Connect(connected chan bool) { - for { - if err := r.tryToConnect(); err != nil { - time.Sleep(1 * time.Second) - continue - } - connected <- true - r.connected = true - notifyClose := make(chan *amqp.Error) - r.Connection.NotifyClose(notifyClose) - - // Block until we get disconnected, or shut down - select { - case <-notifyClose: - // Spin around and reconnect - r.connected = false - case <-r.close: - // Shut down connection - if err := r.Connection.Close(); err != nil { - } - r.connected = false - return - } - } -} - -func (r *rabbitMQConn) IsConnected() bool { - return r.connected -} - -func (r *rabbitMQConn) Close() { - r.mtx.Lock() - defer r.mtx.Unlock() - - if r.closed { - return - } - - close(r.close) - r.closed = true -} - -func (r *rabbitMQConn) tryToConnect() error { - var err error - r.Connection, err = amqp.Dial(r.url) - if err != nil { - return err - } - r.Channel, err = newRabbitChannel(r.Connection) - if err != nil { - return err - } - r.Channel.DeclareExchange(r.exchange) - return nil -} - -func (r *rabbitMQConn) Consume(queue string) (<-chan amqp.Delivery, error) { - consumerChannel, err := newRabbitChannel(r.Connection) - if err != nil { - return nil, err - } - - err = consumerChannel.DeclareQueue(queue) - if err != nil { - return nil, err - } - - deliveries, err := consumerChannel.ConsumeQueue(queue) - if err != nil { - return nil, err - } - - err = consumerChannel.BindQueue(queue, r.exchange) - if err != nil { - return nil, err - } - - return deliveries, nil -} - -func (r *rabbitMQConn) Publish(exchange, key string, msg amqp.Publishing) error { - return r.Channel.Publish(exchange, key, msg) -} diff --git a/transport/rabbitmq/rabbitmq.go b/transport/rabbitmq/rabbitmq.go deleted file mode 100644 index fd64ee22..00000000 --- a/transport/rabbitmq/rabbitmq.go +++ /dev/null @@ -1,249 +0,0 @@ -package rabbitmq - -import ( - "fmt" - "sync" - "time" - - "errors" - uuid "github.com/nu7hatch/gouuid" - "github.com/streadway/amqp" - - "github.com/micro/go-micro/transport" -) - -const ( - directReplyQueue = "amq.rabbitmq.reply-to" -) - -type rmqtport struct { - conn *rabbitMQConn - addrs []string - - once sync.Once - replyTo string - - sync.Mutex - inflight map[string]chan amqp.Delivery -} - -type rmqtportClient struct { - rt *rmqtport - addr string - corId string - reply chan amqp.Delivery -} - -type rmqtportSocket struct { - conn *rabbitMQConn - d *amqp.Delivery -} - -type rmqtportListener struct { - conn *rabbitMQConn - addr string -} - -func (r *rmqtportClient) Send(m *transport.Message) error { - if !r.rt.conn.IsConnected() { - return errors.New("Not connected to AMQP") - } - - headers := amqp.Table{} - for k, v := range m.Header { - headers[k] = v - } - - message := amqp.Publishing{ - CorrelationId: r.corId, - Timestamp: time.Now().UTC(), - Body: m.Body, - ReplyTo: r.rt.replyTo, - Headers: headers, - } - - if err := r.rt.conn.Publish("micro", r.addr, message); err != nil { - return err - } - - return nil -} - -func (r *rmqtportClient) Recv(m *transport.Message) error { - select { - case d := <-r.reply: - mr := &transport.Message{ - Header: make(map[string]string), - Body: d.Body, - } - - for k, v := range d.Headers { - mr.Header[k] = fmt.Sprintf("%v", v) - } - - *m = *mr - return nil - case <-time.After(time.Second * 10): - return errors.New("timed out") - } -} - -func (r *rmqtportClient) Close() error { - r.rt.popReq(r.corId) - return nil -} - -func (r *rmqtportSocket) Recv(m *transport.Message) error { - if m == nil { - return errors.New("message passed in is nil") - } - - mr := &transport.Message{ - Header: make(map[string]string), - Body: r.d.Body, - } - - for k, v := range r.d.Headers { - mr.Header[k] = fmt.Sprintf("%v", v) - } - - *m = *mr - return nil -} - -func (r *rmqtportSocket) Send(m *transport.Message) error { - msg := amqp.Publishing{ - CorrelationId: r.d.CorrelationId, - Timestamp: time.Now().UTC(), - Body: m.Body, - Headers: amqp.Table{}, - } - - for k, v := range m.Header { - msg.Headers[k] = v - } - - return r.conn.Publish("", r.d.ReplyTo, msg) -} - -func (r *rmqtportSocket) Close() error { - return nil -} - -func (r *rmqtportListener) Addr() string { - return r.addr -} - -func (r *rmqtportListener) Close() error { - r.conn.Close() - return nil -} - -func (r *rmqtportListener) Accept(fn func(transport.Socket)) error { - deliveries, err := r.conn.Consume(r.addr) - if err != nil { - return err - } - - handler := func(d amqp.Delivery) { - fn(&rmqtportSocket{ - d: &d, - conn: r.conn, - }) - } - - for d := range deliveries { - go handler(d) - } - - return nil -} - -func (r *rmqtport) putReq(id string) chan amqp.Delivery { - r.Lock() - ch := make(chan amqp.Delivery, 1) - r.inflight[id] = ch - r.Unlock() - return ch -} - -func (r *rmqtport) getReq(id string) chan amqp.Delivery { - r.Lock() - defer r.Unlock() - if ch, ok := r.inflight[id]; ok { - return ch - } - return nil -} - -func (r *rmqtport) popReq(id string) { - r.Lock() - defer r.Unlock() - if _, ok := r.inflight[id]; ok { - delete(r.inflight, id) - } -} - -func (r *rmqtport) init() { - <-r.conn.Init() - if err := r.conn.Channel.DeclareReplyQueue(r.replyTo); err != nil { - return - } - deliveries, err := r.conn.Channel.ConsumeQueue(r.replyTo) - if err != nil { - return - } - go func() { - for delivery := range deliveries { - go r.handle(delivery) - } - }() -} - -func (r *rmqtport) handle(delivery amqp.Delivery) { - ch := r.getReq(delivery.CorrelationId) - if ch == nil { - return - } - ch <- delivery -} - -func (r *rmqtport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) { - id, err := uuid.NewV4() - if err != nil { - return nil, err - } - - r.once.Do(r.init) - - return &rmqtportClient{ - rt: r, - addr: addr, - corId: id.String(), - reply: r.putReq(id.String()), - }, nil -} - -func (r *rmqtport) Listen(addr string) (transport.Listener, error) { - id, err := uuid.NewV4() - if err != nil { - return nil, err - } - - conn := newRabbitMQConn("", r.addrs) - <-conn.Init() - - return &rmqtportListener{ - addr: id.String(), - conn: conn, - }, nil -} - -func NewTransport(addrs []string, opt ...transport.Option) transport.Transport { - return &rmqtport{ - conn: newRabbitMQConn("", addrs), - addrs: addrs, - replyTo: directReplyQueue, - inflight: make(map[string]chan amqp.Delivery), - } -}