diff --git a/broker/broker.go b/broker/broker.go index 1844a828..55eefa23 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -27,7 +27,7 @@ type Subscriber interface { type options struct{} -type Options func(*options) +type Option func(*options) var ( Address string @@ -35,13 +35,17 @@ var ( DefaultBroker Broker ) +func NewBroker(addrs []string, opt ...Option) Broker { + return newHttpBroker([]string{Address}, opt...) +} + func Init() error { if len(Id) == 0 { Id = "broker-" + uuid.NewUUID().String() } if DefaultBroker == nil { - DefaultBroker = NewHttpBroker([]string{Address}) + DefaultBroker = newHttpBroker([]string{Address}) } return DefaultBroker.Init() diff --git a/broker/http/http.go b/broker/http/http.go new file mode 100644 index 00000000..e76c6497 --- /dev/null +++ b/broker/http/http.go @@ -0,0 +1,11 @@ +package http + +// This is a hack + +import ( + "github.com/myodc/go-micro/broker" +) + +func NewBroker(addrs []string, opt ...broker.Option) broker.Broker { + return broker.NewBroker(addrs, opt...) +} diff --git a/broker/http_broker.go b/broker/http_broker.go index 4d2ae687..6eb30769 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -21,39 +21,54 @@ import ( "github.com/myodc/go-micro/registry" ) -type HttpBroker struct { +type httpBroker struct { id string address string - unsubscribe chan *HttpSubscriber + unsubscribe chan *httpSubscriber sync.RWMutex - subscribers map[string][]*HttpSubscriber + subscribers map[string][]*httpSubscriber running bool exit chan chan error } -type HttpSubscriber struct { +type httpSubscriber struct { id string topic string - ch chan *HttpSubscriber + ch chan *httpSubscriber fn func(*Message) svc registry.Service } var ( - SubPath = "/_sub" + DefaultSubPath = "/_sub" ) -func (h *HttpSubscriber) Topic() string { +func newHttpBroker(addrs []string, opt ...Option) Broker { + addr := ":0" + if len(addrs) > 0 { + addr = addrs[0] + } + + return &httpBroker{ + id: Id, + address: addr, + subscribers: make(map[string][]*httpSubscriber), + unsubscribe: make(chan *httpSubscriber), + exit: make(chan chan error), + } +} + +func (h *httpSubscriber) Topic() string { return h.topic } -func (h *HttpSubscriber) Unsubscribe() error { +func (h *httpSubscriber) Unsubscribe() error { h.ch <- h return nil } -func (h *HttpBroker) start() error { +func (h *httpBroker) start() error { h.Lock() defer h.Unlock() @@ -87,7 +102,7 @@ func (h *HttpBroker) start() error { h.stop() case subscriber := <-h.unsubscribe: h.Lock() - var subscribers []*HttpSubscriber + var subscribers []*httpSubscriber for _, sub := range h.subscribers[subscriber.topic] { if sub.id == subscriber.id { registry.Deregister(sub.svc) @@ -104,13 +119,13 @@ func (h *HttpBroker) start() error { return nil } -func (h *HttpBroker) stop() error { +func (h *httpBroker) stop() error { ch := make(chan error) h.exit <- ch return <-ch } -func (h *HttpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { +func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { if req.Method != "POST" { err := errors.BadRequest("go.micro.broker", "Method not allowed") http.Error(w, err.Error(), http.StatusMethodNotAllowed) @@ -148,28 +163,28 @@ func (h *HttpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { h.RUnlock() } -func (h *HttpBroker) Address() string { +func (h *httpBroker) Address() string { return h.address } -func (h *HttpBroker) Connect() error { +func (h *httpBroker) Connect() error { return h.start() } -func (h *HttpBroker) Disconnect() error { +func (h *httpBroker) Disconnect() error { return h.stop() } -func (h *HttpBroker) Init() error { +func (h *httpBroker) Init() error { if len(h.id) == 0 { h.id = "broker-" + uuid.NewUUID().String() } - http.Handle(SubPath, h) + http.Handle(DefaultSubPath, h) return nil } -func (h *HttpBroker) Publish(topic string, data []byte) error { +func (h *httpBroker) Publish(topic string, data []byte) error { s, err := registry.GetService("topic:" + topic) if err != nil { return err @@ -186,7 +201,7 @@ func (h *HttpBroker) Publish(topic string, data []byte) error { } for _, node := range s.Nodes() { - r, err := http.Post(fmt.Sprintf("http://%s:%d%s", node.Address(), node.Port(), SubPath), "application/json", bytes.NewBuffer(b)) + r, err := http.Post(fmt.Sprintf("http://%s:%d%s", node.Address(), node.Port(), DefaultSubPath), "application/json", bytes.NewBuffer(b)) if err == nil { r.Body.Close() } @@ -195,7 +210,7 @@ func (h *HttpBroker) Publish(topic string, data []byte) error { return nil } -func (h *HttpBroker) Subscribe(topic string, function func(*Message)) (Subscriber, error) { +func (h *httpBroker) Subscribe(topic string, function func(*Message)) (Subscriber, error) { // parse address for host, port parts := strings.Split(h.Address(), ":") host := strings.Join(parts[:len(parts)-1], ":") @@ -205,7 +220,7 @@ func (h *HttpBroker) Subscribe(topic string, function func(*Message)) (Subscribe node := registry.NewNode(h.id, host, port) service := registry.NewService("topic:"+topic, node) - subscriber := &HttpSubscriber{ + subscriber := &httpSubscriber{ id: uuid.NewUUID().String(), topic: topic, ch: h.unsubscribe, @@ -224,18 +239,3 @@ func (h *HttpBroker) Subscribe(topic string, function func(*Message)) (Subscribe return subscriber, nil } - -func NewHttpBroker(addrs []string, opts ...Options) Broker { - addr := ":0" - if len(addrs) > 0 { - addr = addrs[0] - } - - return &HttpBroker{ - id: Id, - address: addr, - subscribers: make(map[string][]*HttpSubscriber), - unsubscribe: make(chan *HttpSubscriber), - exit: make(chan chan error), - } -} diff --git a/broker/nats_broker.go b/broker/nats/nats.go similarity index 60% rename from broker/nats_broker.go rename to broker/nats/nats.go index 7ebff133..520b45a8 100644 --- a/broker/nats_broker.go +++ b/broker/nats/nats.go @@ -1,4 +1,4 @@ -package broker +package nats import ( "encoding/json" @@ -7,33 +7,34 @@ import ( "code.google.com/p/go-uuid/uuid" "github.com/apcera/nats" + "github.com/myodc/go-micro/broker" ) -type NatsBroker struct { +type nbroker struct { addrs []string conn *nats.Conn } -type NatsSubscriber struct { +type subscriber struct { s *nats.Subscription } -func (n *NatsSubscriber) Topic() string { +func (n *subscriber) Topic() string { return n.s.Subject } -func (n *NatsSubscriber) Unsubscribe() error { +func (n *subscriber) Unsubscribe() error { return n.s.Unsubscribe() } -func (n *NatsBroker) Address() string { +func (n *nbroker) Address() string { if len(n.addrs) > 0 { return n.addrs[0] } return "" } -func (n *NatsBroker) Connect() error { +func (n *nbroker) Connect() error { if n.conn != nil { return nil } @@ -48,17 +49,17 @@ func (n *NatsBroker) Connect() error { return nil } -func (n *NatsBroker) Disconnect() error { +func (n *nbroker) Disconnect() error { n.conn.Close() return nil } -func (n *NatsBroker) Init() error { +func (n *nbroker) Init() error { return nil } -func (n *NatsBroker) Publish(topic string, data []byte) error { - b, err := json.Marshal(&Message{ +func (n *nbroker) Publish(topic string, data []byte) error { + b, err := json.Marshal(&broker.Message{ Id: uuid.NewUUID().String(), Timestamp: time.Now().Unix(), Topic: topic, @@ -70,9 +71,9 @@ func (n *NatsBroker) Publish(topic string, data []byte) error { return n.conn.Publish(topic, b) } -func (n *NatsBroker) Subscribe(topic string, function func(*Message)) (Subscriber, error) { - subscriber, err := n.conn.Subscribe(topic, func(msg *nats.Msg) { - var data *Message +func (n *nbroker) Subscribe(topic string, function func(*broker.Message)) (broker.Subscriber, error) { + sub, err := n.conn.Subscribe(topic, func(msg *nats.Msg) { + var data *broker.Message if err := json.Unmarshal(msg.Data, &data); err != nil { return } @@ -81,10 +82,10 @@ func (n *NatsBroker) Subscribe(topic string, function func(*Message)) (Subscribe if err != nil { return nil, err } - return &NatsSubscriber{s: subscriber}, nil + return &subscriber{s: sub}, nil } -func NewNatsBroker(addrs []string, opts ...Options) Broker { +func NewBroker(addrs []string, opt ...broker.Option) broker.Broker { var cAddrs []string for _, addr := range addrs { if len(addr) == 0 { @@ -98,7 +99,7 @@ func NewNatsBroker(addrs []string, opts ...Options) Broker { if len(cAddrs) == 0 { cAddrs = []string{nats.DefaultURL} } - return &NatsBroker{ + return &nbroker{ addrs: cAddrs, } } diff --git a/client/client.go b/client/client.go index ec3ff421..542b8e8e 100644 --- a/client/client.go +++ b/client/client.go @@ -37,7 +37,7 @@ func CallRemote(ctx context.Context, address string, request Request, response i return DefaultClient.CallRemote(ctx, address, request, response) } -func New(opt ...Option) Client { +func NewClient(opt ...Option) Client { return newRpcClient(opt...) } diff --git a/cmd/cmd.go b/cmd/cmd.go index 4457c13c..1a7b8be2 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -14,6 +14,25 @@ import ( "github.com/myodc/go-micro/server" "github.com/myodc/go-micro/store" "github.com/myodc/go-micro/transport" + + // brokers + "github.com/myodc/go-micro/broker/http" + "github.com/myodc/go-micro/broker/nats" + + // registries + "github.com/myodc/go-micro/registry/consul" + "github.com/myodc/go-micro/registry/kubernetes" + + // stores + sconsul "github.com/myodc/go-micro/store/consul" + "github.com/myodc/go-micro/store/etcd" + "github.com/myodc/go-micro/store/memcached" + "github.com/myodc/go-micro/store/memory" + + // transport + thttp "github.com/myodc/go-micro/transport/http" + tnats "github.com/myodc/go-micro/transport/nats" + "github.com/myodc/go-micro/transport/rabbitmq" ) var ( @@ -78,43 +97,45 @@ func Setup(c *cli.Context) error { switch c.String("broker") { case "http": - broker.DefaultBroker = broker.NewHttpBroker(bAddrs) + broker.DefaultBroker = http.NewBroker(bAddrs) case "nats": - broker.DefaultBroker = broker.NewNatsBroker(bAddrs) + broker.DefaultBroker = nats.NewBroker(bAddrs) } rAddrs := strings.Split(c.String("registry_address"), ",") switch c.String("registry") { case "kubernetes": - registry.DefaultRegistry = registry.NewKubernetesRegistry(rAddrs) + registry.DefaultRegistry = kubernetes.NewRegistry(rAddrs) case "consul": - registry.DefaultRegistry = registry.NewConsulRegistry(rAddrs) + registry.DefaultRegistry = consul.NewRegistry(rAddrs) } sAddrs := strings.Split(c.String("store_address"), ",") switch c.String("store") { + case "consul": + store.DefaultStore = sconsul.NewStore(sAddrs) case "memcached": - store.DefaultStore = store.NewMemcacheStore(sAddrs) + store.DefaultStore = memcached.NewStore(sAddrs) case "memory": - store.DefaultStore = store.NewMemoryStore(sAddrs) + store.DefaultStore = memory.NewStore(sAddrs) case "etcd": - store.DefaultStore = store.NewEtcdStore(sAddrs) + store.DefaultStore = etcd.NewStore(sAddrs) } tAddrs := strings.Split(c.String("transport_address"), ",") switch c.String("transport") { case "http": - transport.DefaultTransport = transport.NewHttpTransport(tAddrs) + transport.DefaultTransport = thttp.NewTransport(tAddrs) case "rabbitmq": - transport.DefaultTransport = transport.NewRabbitMQTransport(tAddrs) + transport.DefaultTransport = rabbitmq.NewTransport(tAddrs) case "nats": - transport.DefaultTransport = transport.NewNatsTransport(tAddrs) + transport.DefaultTransport = tnats.NewTransport(tAddrs) } - client.DefaultClient = client.New() + client.DefaultClient = client.NewClient() return nil } diff --git a/registry/consul/consul.go b/registry/consul/consul.go new file mode 100644 index 00000000..03a605d1 --- /dev/null +++ b/registry/consul/consul.go @@ -0,0 +1,11 @@ +package consul + +// This is a hack + +import ( + "github.com/myodc/go-micro/registry" +) + +func NewRegistry(addrs []string, opt ...registry.Option) registry.Registry { + return registry.NewRegistry(addrs, opt...) +} diff --git a/registry/consul_node.go b/registry/consul_node.go index 5cfbbdc9..f8061364 100644 --- a/registry/consul_node.go +++ b/registry/consul_node.go @@ -1,20 +1,20 @@ package registry -type ConsulNode struct { +type consulNode struct { Node string NodeId string NodeAddress string NodePort int } -func (c *ConsulNode) Id() string { +func (c *consulNode) Id() string { return c.NodeId } -func (c *ConsulNode) Address() string { +func (c *consulNode) Address() string { return c.NodeAddress } -func (c *ConsulNode) Port() int { +func (c *consulNode) Port() int { return c.NodePort } diff --git a/registry/consul_registry.go b/registry/consul_registry.go index 9abfb278..437bcd94 100644 --- a/registry/consul_registry.go +++ b/registry/consul_registry.go @@ -7,7 +7,7 @@ import ( consul "github.com/hashicorp/consul/api" ) -type ConsulRegistry struct { +type consulRegistry struct { Address string Client *consul.Client @@ -15,7 +15,24 @@ type ConsulRegistry struct { services map[string]Service } -func (c *ConsulRegistry) Deregister(s Service) error { +func newConsulRegistry(addrs []string, opts ...Option) Registry { + config := consul.DefaultConfig() + client, _ := consul.NewClient(config) + if len(addrs) > 0 { + config.Address = addrs[0] + } + + cr := &consulRegistry{ + Address: config.Address, + Client: client, + services: make(map[string]Service), + } + + cr.Watch() + return cr +} + +func (c *consulRegistry) Deregister(s Service) error { if len(s.Nodes()) == 0 { return errors.New("Require at least one node") } @@ -31,7 +48,7 @@ func (c *ConsulRegistry) Deregister(s Service) error { return err } -func (c *ConsulRegistry) Register(s Service) error { +func (c *consulRegistry) Register(s Service) error { if len(s.Nodes()) == 0 { return errors.New("Require at least one node") } @@ -51,7 +68,7 @@ func (c *ConsulRegistry) Register(s Service) error { return err } -func (c *ConsulRegistry) GetService(name string) (Service, error) { +func (c *consulRegistry) GetService(name string) (Service, error) { c.mtx.RLock() service, ok := c.services[name] c.mtx.RUnlock() @@ -65,7 +82,7 @@ func (c *ConsulRegistry) GetService(name string) (Service, error) { return nil, err } - cs := &ConsulService{} + cs := &consulService{} for _, s := range rsp { if s.ServiceName != name { @@ -73,7 +90,7 @@ func (c *ConsulRegistry) GetService(name string) (Service, error) { } cs.ServiceName = s.ServiceName - cs.ServiceNodes = append(cs.ServiceNodes, &ConsulNode{ + cs.ServiceNodes = append(cs.ServiceNodes, &consulNode{ Node: s.Node, NodeId: s.ServiceID, NodeAddress: s.Address, @@ -84,7 +101,7 @@ func (c *ConsulRegistry) GetService(name string) (Service, error) { return cs, nil } -func (c *ConsulRegistry) ListServices() ([]Service, error) { +func (c *consulRegistry) ListServices() ([]Service, error) { c.mtx.RLock() serviceMap := c.services c.mtx.RUnlock() @@ -104,29 +121,29 @@ func (c *ConsulRegistry) ListServices() ([]Service, error) { } for service, _ := range rsp { - services = append(services, &ConsulService{ServiceName: service}) + services = append(services, &consulService{ServiceName: service}) } return services, nil } -func (c *ConsulRegistry) NewService(name string, nodes ...Node) Service { - var snodes []*ConsulNode +func (c *consulRegistry) NewService(name string, nodes ...Node) Service { + var snodes []*consulNode for _, node := range nodes { - if n, ok := node.(*ConsulNode); ok { + if n, ok := node.(*consulNode); ok { snodes = append(snodes, n) } } - return &ConsulService{ + return &consulService{ ServiceName: name, ServiceNodes: snodes, } } -func (c *ConsulRegistry) NewNode(id, address string, port int) Node { - return &ConsulNode{ +func (c *consulRegistry) NewNode(id, address string, port int) Node { + return &consulNode{ Node: id, NodeId: id, NodeAddress: address, @@ -134,23 +151,6 @@ func (c *ConsulRegistry) NewNode(id, address string, port int) Node { } } -func (c *ConsulRegistry) Watch() { - NewConsulWatcher(c) -} - -func NewConsulRegistry(addrs []string, opts ...Options) Registry { - config := consul.DefaultConfig() - client, _ := consul.NewClient(config) - if len(addrs) > 0 { - config.Address = addrs[0] - } - - cr := &ConsulRegistry{ - Address: config.Address, - Client: client, - services: make(map[string]Service), - } - - cr.Watch() - return cr +func (c *consulRegistry) Watch() { + newConsulWatcher(c) } diff --git a/registry/consul_service.go b/registry/consul_service.go index 4b9b88c2..697137bd 100644 --- a/registry/consul_service.go +++ b/registry/consul_service.go @@ -1,15 +1,15 @@ package registry -type ConsulService struct { +type consulService struct { ServiceName string - ServiceNodes []*ConsulNode + ServiceNodes []*consulNode } -func (c *ConsulService) Name() string { +func (c *consulService) Name() string { return c.ServiceName } -func (c *ConsulService) Nodes() []Node { +func (c *consulService) Nodes() []Node { var nodes []Node for _, node := range c.ServiceNodes { diff --git a/registry/consul_watcher.go b/registry/consul_watcher.go index 5510968d..5dcf7ff1 100644 --- a/registry/consul_watcher.go +++ b/registry/consul_watcher.go @@ -5,8 +5,8 @@ import ( "github.com/hashicorp/consul/watch" ) -type ConsulWatcher struct { - Registry *ConsulRegistry +type consulWatcher struct { + Registry *consulRegistry wp *watch.WatchPlan watchers map[string]*watch.WatchPlan } @@ -15,17 +15,33 @@ type serviceWatcher struct { name string } -func (cw *ConsulWatcher) serviceHandler(idx uint64, data interface{}) { +func newConsulWatcher(cr *consulRegistry) *consulWatcher { + cw := &consulWatcher{ + Registry: cr, + watchers: make(map[string]*watch.WatchPlan), + } + + wp, err := watch.Parse(map[string]interface{}{"type": "services"}) + if err == nil { + wp.Handler = cw.Handle + go wp.Run(cr.Address) + cw.wp = wp + } + + return cw +} + +func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) { entries, ok := data.([]*api.ServiceEntry) if !ok { return } - cs := &ConsulService{} + cs := &consulService{} for _, e := range entries { cs.ServiceName = e.Service.Service - cs.ServiceNodes = append(cs.ServiceNodes, &ConsulNode{ + cs.ServiceNodes = append(cs.ServiceNodes, &consulNode{ Node: e.Node.Node, NodeId: e.Service.ID, NodeAddress: e.Node.Address, @@ -38,7 +54,7 @@ func (cw *ConsulWatcher) serviceHandler(idx uint64, data interface{}) { cw.Registry.mtx.Unlock() } -func (cw *ConsulWatcher) Handle(idx uint64, data interface{}) { +func (cw *consulWatcher) Handle(idx uint64, data interface{}) { services, ok := data.(map[string][]string) if !ok { return @@ -82,25 +98,9 @@ func (cw *ConsulWatcher) Handle(idx uint64, data interface{}) { } } -func (cw *ConsulWatcher) Stop() { +func (cw *consulWatcher) Stop() { if cw.wp == nil { return } cw.wp.Stop() } - -func NewConsulWatcher(cr *ConsulRegistry) *ConsulWatcher { - cw := &ConsulWatcher{ - Registry: cr, - watchers: make(map[string]*watch.WatchPlan), - } - - wp, err := watch.Parse(map[string]interface{}{"type": "services"}) - if err == nil { - wp.Handler = cw.Handle - go wp.Run(cr.Address) - cw.wp = wp - } - - return cw -} diff --git a/registry/kubernetes/kubernetes.go b/registry/kubernetes/kubernetes.go new file mode 100644 index 00000000..3634f5f5 --- /dev/null +++ b/registry/kubernetes/kubernetes.go @@ -0,0 +1,139 @@ +package kubernetes + +import ( + "fmt" + "os" + "sync" + + "github.com/myodc/go-micro/registry" + + k8s "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" +) + +type kregistry struct { + client *k8s.Client + namespace string + + mtx sync.RWMutex + services map[string]registry.Service +} + +func (c *kregistry) Watch() { + newWatcher(c) +} + +func (c *kregistry) Deregister(s registry.Service) error { + return nil +} + +func (c *kregistry) Register(s registry.Service) error { + return nil +} + +func (c *kregistry) GetService(name string) (registry.Service, error) { + c.mtx.RLock() + svc, ok := c.services[name] + c.mtx.RUnlock() + + if ok { + return svc, nil + } + + selector := labels.SelectorFromSet(labels.Set{"name": name}) + + services, err := c.client.Services(c.namespace).List(selector) + if err != nil { + return nil, err + } + + if len(services.Items) == 0 { + return nil, fmt.Errorf("Service not found") + } + + ks := &service{name: name} + for _, item := range services.Items { + ks.nodes = append(ks.nodes, &node{ + address: item.Spec.PortalIP, + port: item.Spec.Ports[0].Port, + }) + } + + return ks, nil +} + +func (c *kregistry) ListServices() ([]registry.Service, error) { + c.mtx.RLock() + serviceMap := c.services + c.mtx.RUnlock() + + var services []registry.Service + + if len(serviceMap) > 0 { + for _, service := range serviceMap { + services = append(services, service) + } + return services, nil + } + + rsp, err := c.client.Services(c.namespace).List(labels.Everything()) + if err != nil { + return nil, err + } + + for _, svc := range rsp.Items { + if len(svc.ObjectMeta.Labels["name"]) == 0 { + continue + } + + services = append(services, &service{ + name: svc.ObjectMeta.Labels["name"], + }) + } + + return services, nil +} + +func (c *kregistry) NewService(name string, nodes ...registry.Node) registry.Service { + var snodes []*node + + for _, nod := range nodes { + if n, ok := nod.(*node); ok { + snodes = append(snodes, n) + } + } + + return &service{ + name: name, + nodes: snodes, + } +} + +func (c *kregistry) NewNode(id, address string, port int) registry.Node { + return &node{ + id: id, + address: address, + port: port, + } +} + +func NewRegistry(addrs []string, opts ...registry.Option) registry.Registry { + host := "http://" + os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT") + if len(addrs) > 0 { + host = addrs[0] + } + + client, _ := k8s.New(&k8s.Config{ + Host: host, + }) + + kr := &kregistry{ + client: client, + namespace: "default", + services: make(map[string]registry.Service), + } + + kr.Watch() + + return kr +} diff --git a/registry/kubernetes/node.go b/registry/kubernetes/node.go new file mode 100644 index 00000000..0d26a32e --- /dev/null +++ b/registry/kubernetes/node.go @@ -0,0 +1,19 @@ +package kubernetes + +type node struct { + id string + address string + port int +} + +func (n *node) Id() string { + return n.id +} + +func (n *node) Address() string { + return n.address +} + +func (n *node) Port() int { + return n.port +} diff --git a/registry/kubernetes/service.go b/registry/kubernetes/service.go new file mode 100644 index 00000000..304235e2 --- /dev/null +++ b/registry/kubernetes/service.go @@ -0,0 +1,24 @@ +package kubernetes + +import ( + "github.com/myodc/go-micro/registry" +) + +type service struct { + name string + nodes []*node +} + +func (s *service) Name() string { + return s.name +} + +func (s *service) Nodes() []registry.Node { + var nodes []registry.Node + + for _, node := range s.nodes { + nodes = append(nodes, node) + } + + return nodes +} diff --git a/registry/kubernetes/watcher.go b/registry/kubernetes/watcher.go new file mode 100644 index 00000000..d8c53766 --- /dev/null +++ b/registry/kubernetes/watcher.go @@ -0,0 +1,72 @@ +package kubernetes + +import ( + "fmt" + "net" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +type watcher struct { + registry *kregistry +} + +func (k *watcher) OnUpdate(services []api.Service) { + fmt.Println("got update") + activeServices := util.StringSet{} + for _, svc := range services { + fmt.Printf("%#v\n", svc.ObjectMeta) + name, exists := svc.ObjectMeta.Labels["name"] + if !exists { + continue + } + + activeServices.Insert(name) + serviceIP := net.ParseIP(svc.Spec.PortalIP) + + ks := &service{ + name: name, + nodes: []*node{ + &node{ + address: serviceIP.String(), + port: svc.Spec.Ports[0].Port, + }, + }, + } + + k.registry.mtx.Lock() + k.registry.services[name] = ks + k.registry.mtx.Unlock() + } + + k.registry.mtx.Lock() + defer k.registry.mtx.Unlock() + for name, _ := range k.registry.services { + if !activeServices.Has(name) { + delete(k.registry.services, name) + } + } +} + +func newWatcher(kr *kregistry) *watcher { + serviceConfig := config.NewServiceConfig() + endpointsConfig := config.NewEndpointsConfig() + + config.NewSourceAPI( + kr.client.Services(api.NamespaceAll), + kr.client.Endpoints(api.NamespaceAll), + time.Second*10, + serviceConfig.Channel("api"), + endpointsConfig.Channel("api"), + ) + + ks := &watcher{ + registry: kr, + } + + serviceConfig.RegisterHandler(ks) + return ks +} diff --git a/registry/kubernetes_node.go b/registry/kubernetes_node.go deleted file mode 100644 index aa758a6b..00000000 --- a/registry/kubernetes_node.go +++ /dev/null @@ -1,19 +0,0 @@ -package registry - -type KubernetesNode struct { - NodeId string - NodeAddress string - NodePort int -} - -func (c *KubernetesNode) Id() string { - return c.NodeId -} - -func (c *KubernetesNode) Address() string { - return c.NodeAddress -} - -func (c *KubernetesNode) Port() int { - return c.NodePort -} diff --git a/registry/kubernetes_registry.go b/registry/kubernetes_registry.go deleted file mode 100644 index 6cdf016e..00000000 --- a/registry/kubernetes_registry.go +++ /dev/null @@ -1,137 +0,0 @@ -package registry - -import ( - "fmt" - "os" - "sync" - - k8s "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" -) - -type KubernetesRegistry struct { - Client *k8s.Client - Namespace string - - mtx sync.RWMutex - services map[string]Service -} - -func (c *KubernetesRegistry) Watch() { - NewKubernetesWatcher(c) -} - -func (c *KubernetesRegistry) Deregister(s Service) error { - return nil -} - -func (c *KubernetesRegistry) Register(s Service) error { - return nil -} - -func (c *KubernetesRegistry) GetService(name string) (Service, error) { - c.mtx.RLock() - service, ok := c.services[name] - c.mtx.RUnlock() - - if ok { - return service, nil - } - - selector := labels.SelectorFromSet(labels.Set{"name": name}) - - services, err := c.Client.Services(c.Namespace).List(selector) - if err != nil { - return nil, err - } - - if len(services.Items) == 0 { - return nil, fmt.Errorf("Service not found") - } - - ks := &KubernetesService{ServiceName: name} - for _, item := range services.Items { - ks.ServiceNodes = append(ks.ServiceNodes, &KubernetesNode{ - NodeAddress: item.Spec.PortalIP, - NodePort: item.Spec.Ports[0].Port, - }) - } - - return ks, nil -} - -func (c *KubernetesRegistry) ListServices() ([]Service, error) { - c.mtx.RLock() - serviceMap := c.services - c.mtx.RUnlock() - - var services []Service - - if len(serviceMap) > 0 { - for _, service := range serviceMap { - services = append(services, service) - } - return services, nil - } - - rsp, err := c.Client.Services(c.Namespace).List(labels.Everything()) - if err != nil { - return nil, err - } - - for _, service := range rsp.Items { - if len(service.ObjectMeta.Labels["name"]) == 0 { - continue - } - - services = append(services, &KubernetesService{ - ServiceName: service.ObjectMeta.Labels["name"], - }) - } - - return services, nil -} - -func (c *KubernetesRegistry) NewService(name string, nodes ...Node) Service { - var snodes []*KubernetesNode - - for _, node := range nodes { - if n, ok := node.(*KubernetesNode); ok { - snodes = append(snodes, n) - } - } - - return &KubernetesService{ - ServiceName: name, - ServiceNodes: snodes, - } -} - -func (c *KubernetesRegistry) NewNode(id, address string, port int) Node { - return &KubernetesNode{ - NodeId: id, - NodeAddress: address, - NodePort: port, - } -} - -func NewKubernetesRegistry(addrs []string, opts ...Options) Registry { - host := "http://" + os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT") - if len(addrs) > 0 { - host = addrs[0] - } - - client, _ := k8s.New(&k8s.Config{ - Host: host, - }) - - kr := &KubernetesRegistry{ - Client: client, - Namespace: "default", - services: make(map[string]Service), - } - - kr.Watch() - - return kr -} diff --git a/registry/kubernetes_service.go b/registry/kubernetes_service.go deleted file mode 100644 index c81c7ae2..00000000 --- a/registry/kubernetes_service.go +++ /dev/null @@ -1,20 +0,0 @@ -package registry - -type KubernetesService struct { - ServiceName string - ServiceNodes []*KubernetesNode -} - -func (c *KubernetesService) Name() string { - return c.ServiceName -} - -func (c *KubernetesService) Nodes() []Node { - var nodes []Node - - for _, node := range c.ServiceNodes { - nodes = append(nodes, node) - } - - return nodes -} diff --git a/registry/kubernetes_watcher.go b/registry/kubernetes_watcher.go deleted file mode 100644 index 319d8132..00000000 --- a/registry/kubernetes_watcher.go +++ /dev/null @@ -1,72 +0,0 @@ -package registry - -import ( - "fmt" - "net" - "time" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" -) - -type KubernetesWatcher struct { - Registry *KubernetesRegistry -} - -func (k *KubernetesWatcher) OnUpdate(services []api.Service) { - fmt.Println("got update") - activeServices := util.StringSet{} - for _, service := range services { - fmt.Printf("%#v\n", service.ObjectMeta) - name, exists := service.ObjectMeta.Labels["name"] - if !exists { - continue - } - - activeServices.Insert(name) - serviceIP := net.ParseIP(service.Spec.PortalIP) - - ks := &KubernetesService{ - ServiceName: name, - ServiceNodes: []*KubernetesNode{ - &KubernetesNode{ - NodeAddress: serviceIP.String(), - NodePort: service.Spec.Ports[0].Port, - }, - }, - } - - k.Registry.mtx.Lock() - k.Registry.services[name] = ks - k.Registry.mtx.Unlock() - } - - k.Registry.mtx.Lock() - defer k.Registry.mtx.Unlock() - for name, _ := range k.Registry.services { - if !activeServices.Has(name) { - delete(k.Registry.services, name) - } - } -} - -func NewKubernetesWatcher(kr *KubernetesRegistry) *KubernetesWatcher { - serviceConfig := config.NewServiceConfig() - endpointsConfig := config.NewEndpointsConfig() - - config.NewSourceAPI( - kr.Client.Services(api.NamespaceAll), - kr.Client.Endpoints(api.NamespaceAll), - time.Second*10, - serviceConfig.Channel("api"), - endpointsConfig.Channel("api"), - ) - - ks := &KubernetesWatcher{ - Registry: kr, - } - - serviceConfig.RegisterHandler(ks) - return ks -} diff --git a/registry/registry.go b/registry/registry.go index 60295b73..75d0857d 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -11,12 +11,16 @@ type Registry interface { type options struct{} -type Options func(*options) +type Option func(*options) var ( - DefaultRegistry = NewConsulRegistry([]string{}) + DefaultRegistry = newConsulRegistry([]string{}) ) +func NewRegistry(addrs []string, opt ...Option) Registry { + return newConsulRegistry(addrs, opt...) +} + func Register(s Service) error { return DefaultRegistry.Register(s) } diff --git a/server/server.go b/server/server.go index ecdb03c9..f9d12fde 100644 --- a/server/server.go +++ b/server/server.go @@ -60,7 +60,7 @@ func Init() error { return DefaultServer.Init() } -func New(address string, opt ...Option) Server { +func NewServer(address string, opt ...Option) Server { return newRpcServer(address, opt...) } diff --git a/store/consul/consul.go b/store/consul/consul.go new file mode 100644 index 00000000..4c103b17 --- /dev/null +++ b/store/consul/consul.go @@ -0,0 +1,11 @@ +package consul + +// This is a hack + +import ( + "github.com/myodc/go-micro/store" +) + +func NewStore(addrs []string, opt ...store.Option) store.Store { + return store.NewStore(addrs, opt...) +} diff --git a/store/consul_item.go b/store/consul_item.go index 00c5c718..3bffc6d5 100644 --- a/store/consul_item.go +++ b/store/consul_item.go @@ -1,14 +1,14 @@ package store -type ConsulItem struct { +type consulItem struct { key string value []byte } -func (c *ConsulItem) Key() string { +func (c *consulItem) Key() string { return c.key } -func (c *ConsulItem) Value() []byte { +func (c *consulItem) Value() []byte { return c.value } diff --git a/store/consul_store.go b/store/consul_store.go index 194fa06a..a51bd61b 100644 --- a/store/consul_store.go +++ b/store/consul_store.go @@ -6,11 +6,11 @@ import ( consul "github.com/hashicorp/consul/api" ) -type ConsulStore struct { +type consulStore struct { Client *consul.Client } -func (c *ConsulStore) Get(key string) (Item, error) { +func (c *consulStore) Get(key string) (Item, error) { kv, _, err := c.Client.KV().Get(key, nil) if err != nil { return nil, err @@ -19,18 +19,18 @@ func (c *ConsulStore) Get(key string) (Item, error) { return nil, errors.New("key not found") } - return &ConsulItem{ + return &consulItem{ key: kv.Key, value: kv.Value, }, nil } -func (c *ConsulStore) Del(key string) error { +func (c *consulStore) Del(key string) error { _, err := c.Client.KV().Delete(key, nil) return err } -func (c *ConsulStore) Put(item Item) error { +func (c *consulStore) Put(item Item) error { _, err := c.Client.KV().Put(&consul.KVPair{ Key: item.Key(), Value: item.Value(), @@ -39,14 +39,14 @@ func (c *ConsulStore) Put(item Item) error { return err } -func (c *ConsulStore) NewItem(key string, value []byte) Item { - return &ConsulItem{ +func (c *consulStore) NewItem(key string, value []byte) Item { + return &consulItem{ key: key, value: value, } } -func NewConsulStore(addrs []string, opts ...Options) Store { +func newConsulStore(addrs []string, opt ...Option) Store { config := consul.DefaultConfig() if len(addrs) > 0 { config.Address = addrs[0] @@ -54,7 +54,7 @@ func NewConsulStore(addrs []string, opts ...Options) Store { client, _ := consul.NewClient(config) - return &ConsulStore{ + return &consulStore{ Client: client, } } diff --git a/store/etcd_store.go b/store/etcd/etcd.go similarity index 51% rename from store/etcd_store.go rename to store/etcd/etcd.go index a221f086..03f3b8db 100644 --- a/store/etcd_store.go +++ b/store/etcd/etcd.go @@ -1,16 +1,30 @@ -package store +package etcd import ( "errors" "github.com/coreos/go-etcd/etcd" + "github.com/myodc/go-micro/store" ) -type EtcdStore struct { +type estore struct { Client *etcd.Client } -func (e *EtcdStore) Get(key string) (Item, error) { +type item struct { + key string + value []byte +} + +func (i *item) Key() string { + return i.key +} + +func (i *item) Value() []byte { + return i.value +} + +func (e *estore) Get(key string) (store.Item, error) { kv, err := e.Client.Get(key, false, false) if err != nil { return nil, err @@ -19,38 +33,38 @@ func (e *EtcdStore) Get(key string) (Item, error) { return nil, errors.New("key not found") } - return &EtcdItem{ + return &item{ key: kv.Node.Key, value: []byte(kv.Node.Value), }, nil } -func (e *EtcdStore) Del(key string) error { +func (e *estore) Del(key string) error { _, err := e.Client.Delete(key, false) return err } -func (e *EtcdStore) Put(item Item) error { +func (e *estore) Put(item store.Item) error { _, err := e.Client.Set(item.Key(), string(item.Value()), 0) return err } -func (e *EtcdStore) NewItem(key string, value []byte) Item { - return &EtcdItem{ +func (e *estore) NewItem(key string, value []byte) store.Item { + return &item{ key: key, value: value, } } -func NewEtcdStore(addrs []string, opts ...Options) Store { +func NewStore(addrs []string, opts ...store.Option) store.Store { if len(addrs) == 0 { addrs = []string{"127.0.0.1:2379"} } client := etcd.NewClient(addrs) - return &EtcdStore{ + return &estore{ Client: client, } } diff --git a/store/etcd_item.go b/store/etcd_item.go deleted file mode 100644 index 979d9511..00000000 --- a/store/etcd_item.go +++ /dev/null @@ -1,14 +0,0 @@ -package store - -type EtcdItem struct { - key string - value []byte -} - -func (c *EtcdItem) Key() string { - return c.key -} - -func (c *EtcdItem) Value() []byte { - return c.value -} diff --git a/store/memcached_store.go b/store/memcached/memcached.go similarity index 53% rename from store/memcached_store.go rename to store/memcached/memcached.go index 139e5218..95530141 100644 --- a/store/memcached_store.go +++ b/store/memcached/memcached.go @@ -1,16 +1,30 @@ -package store +package memcached import ( "errors" mc "github.com/bradfitz/gomemcache/memcache" + "github.com/myodc/go-micro/store" ) -type MemcacheStore struct { +type mstore struct { Client *mc.Client } -func (m *MemcacheStore) Get(key string) (Item, error) { +type item struct { + key string + value []byte +} + +func (i *item) Key() string { + return i.key +} + +func (i *item) Value() []byte { + return i.value +} + +func (m *mstore) Get(key string) (store.Item, error) { kv, err := m.Client.Get(key) if err != nil && err == mc.ErrCacheMiss { return nil, errors.New("key not found") @@ -22,35 +36,35 @@ func (m *MemcacheStore) Get(key string) (Item, error) { return nil, errors.New("key not found") } - return &MemcacheItem{ + return &item{ key: kv.Key, value: kv.Value, }, nil } -func (m *MemcacheStore) Del(key string) error { +func (m *mstore) Del(key string) error { return m.Client.Delete(key) } -func (m *MemcacheStore) Put(item Item) error { +func (m *mstore) Put(item store.Item) error { return m.Client.Set(&mc.Item{ Key: item.Key(), Value: item.Value(), }) } -func (m *MemcacheStore) NewItem(key string, value []byte) Item { - return &MemcacheItem{ +func (m *mstore) NewItem(key string, value []byte) store.Item { + return &item{ key: key, value: value, } } -func NewMemcacheStore(addrs []string, opts ...Options) Store { +func NewStore(addrs []string, opts ...store.Option) store.Store { if len(addrs) == 0 { addrs = []string{"127.0.0.1:11211"} } - return &MemcacheStore{ + return &mstore{ Client: mc.New(addrs...), } } diff --git a/store/memcached_item.go b/store/memcached_item.go deleted file mode 100644 index 7b35cf7a..00000000 --- a/store/memcached_item.go +++ /dev/null @@ -1,14 +0,0 @@ -package store - -type MemcacheItem struct { - key string - value []byte -} - -func (m *MemcacheItem) Key() string { - return m.key -} - -func (m *MemcacheItem) Value() []byte { - return m.value -} diff --git a/store/memory/memory.go b/store/memory/memory.go new file mode 100644 index 00000000..e5e84fb1 --- /dev/null +++ b/store/memory/memory.go @@ -0,0 +1,63 @@ +package memory + +import ( + "errors" + "sync" + + "github.com/myodc/go-micro/store" +) + +type mstore struct { + sync.RWMutex + store map[string]store.Item +} + +type item struct { + key string + value []byte +} + +func (i *item) Key() string { + return i.key +} + +func (i *item) Value() []byte { + return i.value +} + +func (m *mstore) Get(key string) (store.Item, error) { + m.RLock() + v, ok := m.store[key] + m.RUnlock() + if !ok { + return nil, errors.New("key not found") + } + return v, nil +} + +func (m *mstore) Del(key string) error { + m.Lock() + delete(m.store, key) + m.Unlock() + return nil +} + +func (m *mstore) Put(item store.Item) error { + m.Lock() + m.store[item.Key()] = item + m.Unlock() + return nil +} + +func (m *mstore) NewItem(key string, value []byte) store.Item { + return &item{ + key: key, + value: value, + } +} + +func NewStore(addrs []string, opt ...store.Option) store.Store { + return &mstore{ + store: make(map[string]store.Item), + } +} diff --git a/store/memory_item.go b/store/memory_item.go deleted file mode 100644 index c842ca55..00000000 --- a/store/memory_item.go +++ /dev/null @@ -1,14 +0,0 @@ -package store - -type MemoryItem struct { - key string - value []byte -} - -func (m *MemoryItem) Key() string { - return m.key -} - -func (m *MemoryItem) Value() []byte { - return m.value -} diff --git a/store/memory_store.go b/store/memory_store.go deleted file mode 100644 index d85a0d68..00000000 --- a/store/memory_store.go +++ /dev/null @@ -1,48 +0,0 @@ -package store - -import ( - "errors" - "sync" -) - -type MemoryStore struct { - sync.RWMutex - store map[string]Item -} - -func (m *MemoryStore) Get(key string) (Item, error) { - m.RLock() - v, ok := m.store[key] - m.RUnlock() - if !ok { - return nil, errors.New("key not found") - } - return v, nil -} - -func (m *MemoryStore) Del(key string) error { - m.Lock() - delete(m.store, key) - m.Unlock() - return nil -} - -func (m *MemoryStore) Put(item Item) error { - m.Lock() - m.store[item.Key()] = item - m.Unlock() - return nil -} - -func (m *MemoryStore) NewItem(key string, value []byte) Item { - return &MemoryItem{ - key: key, - value: value, - } -} - -func NewMemoryStore(addrs []string, opts ...Options) Store { - return &MemoryStore{ - store: make(map[string]Item), - } -} diff --git a/store/store.go b/store/store.go index 6cbaadb3..389604a7 100644 --- a/store/store.go +++ b/store/store.go @@ -9,12 +9,16 @@ type Store interface { type options struct{} -type Options func(*options) +type Option func(*options) var ( - DefaultStore = NewConsulStore([]string{}) + DefaultStore = newConsulStore([]string{}) ) +func NewStore(addrs []string, opt ...Option) Store { + return newConsulStore(addrs, opt...) +} + func Get(key string) (Item, error) { return DefaultStore.Get(key) } diff --git a/transport/http/http.go b/transport/http/http.go new file mode 100644 index 00000000..9ec56681 --- /dev/null +++ b/transport/http/http.go @@ -0,0 +1,11 @@ +package http + +// This is a hack + +import ( + "github.com/myodc/go-micro/transport" +) + +func NewTransport(addrs []string, opt ...transport.Option) transport.Transport { + return transport.NewTransport(addrs, opt...) +} diff --git a/transport/http_transport.go b/transport/http_transport.go index 48f004da..f17f3d2c 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -13,21 +13,21 @@ type headerRoundTripper struct { r http.RoundTripper } -type HttpTransport struct { +type httpTransport struct { client *http.Client } -type HttpTransportClient struct { - ht *HttpTransport +type httpTransportClient struct { + ht *httpTransport addr string } -type HttpTransportSocket struct { +type httpTransportSocket struct { r *http.Request w http.ResponseWriter } -type HttpTransportListener struct { +type httpTransportListener struct { listener net.Listener } @@ -36,7 +36,7 @@ func (t *headerRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) return t.r.RoundTrip(r) } -func (h *HttpTransportClient) Send(m *Message) (*Message, error) { +func (h *httpTransportClient) Send(m *Message) (*Message, error) { header := make(http.Header) for k, v := range m.Header { @@ -88,11 +88,11 @@ func (h *HttpTransportClient) Send(m *Message) (*Message, error) { return mr, nil } -func (h *HttpTransportClient) Close() error { +func (h *httpTransportClient) Close() error { return nil } -func (h *HttpTransportSocket) Recv(m *Message) error { +func (h *httpTransportSocket) Recv(m *Message) error { if m == nil { return errors.New("message passed in is nil") } @@ -119,7 +119,7 @@ func (h *HttpTransportSocket) Recv(m *Message) error { return nil } -func (h *HttpTransportSocket) Send(m *Message) error { +func (h *httpTransportSocket) Send(m *Message) error { for k, v := range m.Header { h.w.Header().Set(k, v) } @@ -128,22 +128,22 @@ func (h *HttpTransportSocket) Send(m *Message) error { return err } -func (h *HttpTransportSocket) Close() error { +func (h *httpTransportSocket) Close() error { return nil } -func (h *HttpTransportListener) Addr() string { +func (h *httpTransportListener) Addr() string { return h.listener.Addr().String() } -func (h *HttpTransportListener) Close() error { +func (h *httpTransportListener) Close() error { return h.listener.Close() } -func (h *HttpTransportListener) Accept(fn func(Socket)) error { +func (h *httpTransportListener) Accept(fn func(Socket)) error { srv := &http.Server{ Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fn(&HttpTransportSocket{ + fn(&httpTransportSocket{ r: r, w: w, }) @@ -153,27 +153,27 @@ func (h *HttpTransportListener) Accept(fn func(Socket)) error { return srv.Serve(h.listener) } -func (h *HttpTransport) Dial(addr string) (Client, error) { - return &HttpTransportClient{ +func (h *httpTransport) Dial(addr string) (Client, error) { + return &httpTransportClient{ ht: h, addr: addr, }, nil } -func (h *HttpTransport) Listen(addr string) (Listener, error) { +func (h *httpTransport) Listen(addr string) (Listener, error) { l, err := net.Listen("tcp", addr) if err != nil { return nil, err } - return &HttpTransportListener{ + return &httpTransportListener{ listener: l, }, nil } -func NewHttpTransport(addrs []string) *HttpTransport { +func newHttpTransport(addrs []string, opt ...Option) *httpTransport { client := &http.Client{} client.Transport = &headerRoundTripper{http.DefaultTransport} - return &HttpTransport{client: client} + return &httpTransport{client: client} } diff --git a/transport/nats_transport.go b/transport/nats/nats.go similarity index 62% rename from transport/nats_transport.go rename to transport/nats/nats.go index faa0296e..ce53b311 100644 --- a/transport/nats_transport.go +++ b/transport/nats/nats.go @@ -1,4 +1,4 @@ -package transport +package nats import ( "encoding/json" @@ -7,29 +7,30 @@ import ( "time" "github.com/apcera/nats" + "github.com/myodc/go-micro/transport" ) -type NatsTransport struct { +type ntport struct { addrs []string } -type NatsTransportClient struct { +type ntportClient struct { conn *nats.Conn addr string } -type NatsTransportSocket struct { +type ntportSocket struct { conn *nats.Conn m *nats.Msg } -type NatsTransportListener struct { +type ntportListener struct { conn *nats.Conn addr string exit chan bool } -func (n *NatsTransportClient) Send(m *Message) (*Message, error) { +func (n *ntportClient) Send(m *transport.Message) (*transport.Message, error) { b, err := json.Marshal(m) if err != nil { return nil, err @@ -40,7 +41,7 @@ func (n *NatsTransportClient) Send(m *Message) (*Message, error) { return nil, err } - var mr *Message + var mr *transport.Message if err := json.Unmarshal(rsp.Data, &mr); err != nil { return nil, err } @@ -48,12 +49,12 @@ func (n *NatsTransportClient) Send(m *Message) (*Message, error) { return mr, nil } -func (n *NatsTransportClient) Close() error { +func (n *ntportClient) Close() error { n.conn.Close() return nil } -func (n *NatsTransportSocket) Recv(m *Message) error { +func (n *ntportSocket) Recv(m *transport.Message) error { if m == nil { return errors.New("message passed in is nil") } @@ -64,7 +65,7 @@ func (n *NatsTransportSocket) Recv(m *Message) error { return nil } -func (n *NatsTransportSocket) Send(m *Message) error { +func (n *ntportSocket) Send(m *transport.Message) error { b, err := json.Marshal(m) if err != nil { return err @@ -72,23 +73,23 @@ func (n *NatsTransportSocket) Send(m *Message) error { return n.conn.Publish(n.m.Reply, b) } -func (n *NatsTransportSocket) Close() error { +func (n *ntportSocket) Close() error { return nil } -func (n *NatsTransportListener) Addr() string { +func (n *ntportListener) Addr() string { return n.addr } -func (n *NatsTransportListener) Close() error { +func (n *ntportListener) Close() error { n.exit <- true n.conn.Close() return nil } -func (n *NatsTransportListener) Accept(fn func(Socket)) error { +func (n *ntportListener) Accept(fn func(transport.Socket)) error { s, err := n.conn.Subscribe(n.addr, func(m *nats.Msg) { - fn(&NatsTransportSocket{ + fn(&ntportSocket{ conn: n.conn, m: m, }) @@ -101,7 +102,7 @@ func (n *NatsTransportListener) Accept(fn func(Socket)) error { return s.Unsubscribe() } -func (n *NatsTransport) Dial(addr string) (Client, error) { +func (n *ntport) Dial(addr string) (transport.Client, error) { cAddr := nats.DefaultURL if len(n.addrs) > 0 && strings.HasPrefix(n.addrs[0], "nats://") { @@ -113,13 +114,13 @@ func (n *NatsTransport) Dial(addr string) (Client, error) { return nil, err } - return &NatsTransportClient{ + return &ntportClient{ conn: c, addr: addr, }, nil } -func (n *NatsTransport) Listen(addr string) (Listener, error) { +func (n *ntport) Listen(addr string) (transport.Listener, error) { cAddr := nats.DefaultURL if len(n.addrs) > 0 && strings.HasPrefix(n.addrs[0], "nats://") { @@ -131,15 +132,15 @@ func (n *NatsTransport) Listen(addr string) (Listener, error) { return nil, err } - return &NatsTransportListener{ + return &ntportListener{ addr: nats.NewInbox(), conn: c, exit: make(chan bool, 1), }, nil } -func NewNatsTransport(addrs []string) *NatsTransport { - return &NatsTransport{ +func NewTransport(addrs []string, opt ...transport.Option) transport.Transport { + return &ntport{ addrs: addrs, } } diff --git a/transport/rabbitmq_channel.go b/transport/rabbitmq/channel.go similarity index 99% rename from transport/rabbitmq_channel.go rename to transport/rabbitmq/channel.go index 1e2633d4..28dc1eae 100644 --- a/transport/rabbitmq_channel.go +++ b/transport/rabbitmq/channel.go @@ -1,4 +1,4 @@ -package transport +package rabbitmq // // All credit to Mondo diff --git a/transport/rabbitmq_connection.go b/transport/rabbitmq/connection.go similarity index 99% rename from transport/rabbitmq_connection.go rename to transport/rabbitmq/connection.go index ecd9c13b..296e8d50 100644 --- a/transport/rabbitmq_connection.go +++ b/transport/rabbitmq/connection.go @@ -1,4 +1,4 @@ -package transport +package rabbitmq // // All credit to Mondo diff --git a/transport/rabbitmq_transport.go b/transport/rabbitmq/rabbitmq.go similarity index 70% rename from transport/rabbitmq_transport.go rename to transport/rabbitmq/rabbitmq.go index 731cfca2..cc94962e 100644 --- a/transport/rabbitmq_transport.go +++ b/transport/rabbitmq/rabbitmq.go @@ -1,4 +1,4 @@ -package transport +package rabbitmq import ( "fmt" @@ -8,16 +8,18 @@ import ( "errors" uuid "github.com/nu7hatch/gouuid" "github.com/streadway/amqp" + + "github.com/myodc/go-micro/transport" ) -type RabbitMQTransport struct { +type rmqtport struct { conn *rabbitMQConn addrs []string } -type RabbitMQTransportClient struct { +type rmqtportClient struct { once sync.Once - rt *RabbitMQTransport + rt *rmqtport addr string replyTo string @@ -25,17 +27,17 @@ type RabbitMQTransportClient struct { inflight map[string]chan amqp.Delivery } -type RabbitMQTransportSocket struct { +type rmqtportSocket struct { conn *rabbitMQConn d *amqp.Delivery } -type RabbitMQTransportListener struct { +type rmqtportListener struct { conn *rabbitMQConn addr string } -func (r *RabbitMQTransportClient) init() { +func (r *rmqtportClient) init() { <-r.rt.conn.Init() if err := r.rt.conn.Channel.DeclareReplyQueue(r.replyTo); err != nil { return @@ -51,7 +53,7 @@ func (r *RabbitMQTransportClient) init() { }() } -func (r *RabbitMQTransportClient) handle(delivery amqp.Delivery) { +func (r *rmqtportClient) handle(delivery amqp.Delivery) { ch := r.getReq(delivery.CorrelationId) if ch == nil { return @@ -62,7 +64,7 @@ func (r *RabbitMQTransportClient) handle(delivery amqp.Delivery) { } } -func (r *RabbitMQTransportClient) putReq(id string) chan amqp.Delivery { +func (r *rmqtportClient) putReq(id string) chan amqp.Delivery { r.Lock() ch := make(chan amqp.Delivery, 1) r.inflight[id] = ch @@ -70,7 +72,7 @@ func (r *RabbitMQTransportClient) putReq(id string) chan amqp.Delivery { return ch } -func (r *RabbitMQTransportClient) getReq(id string) chan amqp.Delivery { +func (r *rmqtportClient) getReq(id string) chan amqp.Delivery { r.Lock() defer r.Unlock() if ch, ok := r.inflight[id]; ok { @@ -80,7 +82,7 @@ func (r *RabbitMQTransportClient) getReq(id string) chan amqp.Delivery { return nil } -func (r *RabbitMQTransportClient) Send(m *Message) (*Message, error) { +func (r *rmqtportClient) Send(m *transport.Message) (*transport.Message, error) { r.once.Do(r.init) if !r.rt.conn.IsConnected() { @@ -115,7 +117,7 @@ func (r *RabbitMQTransportClient) Send(m *Message) (*Message, error) { select { case d := <-replyChan: - mr := &Message{ + mr := &transport.Message{ Header: make(map[string]string), Body: d.Body, } @@ -130,16 +132,16 @@ func (r *RabbitMQTransportClient) Send(m *Message) (*Message, error) { } } -func (r *RabbitMQTransportClient) Close() error { +func (r *rmqtportClient) Close() error { return nil } -func (r *RabbitMQTransportSocket) Recv(m *Message) error { +func (r *rmqtportSocket) Recv(m *transport.Message) error { if m == nil { return errors.New("message passed in is nil") } - mr := &Message{ + mr := &transport.Message{ Header: make(map[string]string), Body: r.d.Body, } @@ -152,7 +154,7 @@ func (r *RabbitMQTransportSocket) Recv(m *Message) error { return nil } -func (r *RabbitMQTransportSocket) Send(m *Message) error { +func (r *rmqtportSocket) Send(m *transport.Message) error { msg := amqp.Publishing{ CorrelationId: r.d.CorrelationId, Timestamp: time.Now().UTC(), @@ -167,27 +169,27 @@ func (r *RabbitMQTransportSocket) Send(m *Message) error { return r.conn.Publish("", r.d.ReplyTo, msg) } -func (r *RabbitMQTransportSocket) Close() error { +func (r *rmqtportSocket) Close() error { return nil } -func (r *RabbitMQTransportListener) Addr() string { +func (r *rmqtportListener) Addr() string { return r.addr } -func (r *RabbitMQTransportListener) Close() error { +func (r *rmqtportListener) Close() error { r.conn.Close() return nil } -func (r *RabbitMQTransportListener) Accept(fn func(Socket)) error { +func (r *rmqtportListener) Accept(fn func(transport.Socket)) error { deliveries, err := r.conn.Consume(r.addr) if err != nil { return err } handler := func(d amqp.Delivery) { - fn(&RabbitMQTransportSocket{ + fn(&rmqtportSocket{ d: &d, conn: r.conn, }) @@ -200,13 +202,13 @@ func (r *RabbitMQTransportListener) Accept(fn func(Socket)) error { return nil } -func (r *RabbitMQTransport) Dial(addr string) (Client, error) { +func (r *rmqtport) Dial(addr string) (transport.Client, error) { id, err := uuid.NewV4() if err != nil { return nil, err } - return &RabbitMQTransportClient{ + return &rmqtportClient{ rt: r, addr: addr, inflight: make(map[string]chan amqp.Delivery), @@ -214,7 +216,7 @@ func (r *RabbitMQTransport) Dial(addr string) (Client, error) { }, nil } -func (r *RabbitMQTransport) Listen(addr string) (Listener, error) { +func (r *rmqtport) Listen(addr string) (transport.Listener, error) { id, err := uuid.NewV4() if err != nil { return nil, err @@ -223,14 +225,14 @@ func (r *RabbitMQTransport) Listen(addr string) (Listener, error) { conn := newRabbitMQConn("", r.addrs) <-conn.Init() - return &RabbitMQTransportListener{ + return &rmqtportListener{ addr: id.String(), conn: conn, }, nil } -func NewRabbitMQTransport(addrs []string) *RabbitMQTransport { - return &RabbitMQTransport{ +func NewTransport(addrs []string, opt ...transport.Option) transport.Transport { + return &rmqtport{ conn: newRabbitMQConn("", addrs), addrs: addrs, } diff --git a/transport/transport.go b/transport/transport.go index 0b53e1d5..8211e50d 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -27,10 +27,18 @@ type Transport interface { Listen(addr string) (Listener, error) } +type options struct{} + +type Option func(*options) + var ( - DefaultTransport Transport = NewHttpTransport([]string{}) + DefaultTransport Transport = newHttpTransport([]string{}) ) +func NewTransport(addrs []string, opt ...Option) Transport { + return newHttpTransport(addrs, opt...) +} + func Dial(addr string) (Client, error) { return DefaultTransport.Dial(addr) }