Service and node should be structs rather than interface

This commit is contained in:
Asim 2015-05-25 22:14:28 +01:00
parent d4a7deb594
commit 7aa2c82ced
14 changed files with 140 additions and 231 deletions

View File

@ -40,7 +40,7 @@ type httpSubscriber struct {
topic string topic string
ch chan *httpSubscriber ch chan *httpSubscriber
fn func(context.Context, *Message) fn func(context.Context, *Message)
svc registry.Service svc *registry.Service
} }
// used in brokers where there is no support for headers // used in brokers where there is no support for headers
@ -219,8 +219,8 @@ func (h *httpBroker) Publish(ctx context.Context, topic string, body []byte) err
return err return err
} }
for _, node := range s.Nodes() { for _, node := range s.Nodes {
r, err := http.Post(fmt.Sprintf("http://%s:%d%s", node.Address(), node.Port(), DefaultSubPath), "application/json", bytes.NewBuffer(b)) r, err := http.Post(fmt.Sprintf("http://%s:%d%s", node.Address, node.Port, DefaultSubPath), "application/json", bytes.NewBuffer(b))
if err == nil { if err == nil {
r.Body.Close() r.Body.Close()
} }
@ -236,8 +236,16 @@ func (h *httpBroker) Subscribe(topic string, function func(context.Context, *Mes
port, _ := strconv.Atoi(parts[len(parts)-1]) port, _ := strconv.Atoi(parts[len(parts)-1])
// register service // register service
node := registry.NewNode(h.id, host, port) node := &registry.Node{
service := registry.NewService("topic:"+topic, node) Id: h.id,
Address: host,
Port: port,
}
service := &registry.Service{
Name: "topic:" + topic,
Nodes: []*registry.Node{node},
}
subscriber := &httpSubscriber{ subscriber := &httpSubscriber{
id: uuid.NewUUID().String(), id: uuid.NewUUID().String(),
@ -247,7 +255,7 @@ func (h *httpBroker) Subscribe(topic string, function func(context.Context, *Mes
svc: service, svc: service,
} }
log.Infof("Registering subscriber %s", node.Id()) log.Infof("Registering subscriber %s", node.Id)
if err := registry.Register(service); err != nil { if err := registry.Register(service); err != nil {
return nil, err return nil, err
} }

View File

@ -159,16 +159,16 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", err.Error())
} }
if len(service.Nodes()) == 0 { if len(service.Nodes) == 0 {
return errors.NotFound("go.micro.client", "Service not found") return errors.NotFound("go.micro.client", "Service not found")
} }
n := rand.Int() % len(service.Nodes()) n := rand.Int() % len(service.Nodes)
node := service.Nodes()[n] node := service.Nodes[n]
address := node.Address() address := node.Address
if node.Port() > 0 { if node.Port > 0 {
address = fmt.Sprintf("%s:%d", address, node.Port()) address = fmt.Sprintf("%s:%d", address, node.Port)
} }
return r.call(ctx, address, request, response) return r.call(ctx, address, request, response)

View File

@ -1,20 +0,0 @@
package registry
type consulNode struct {
Node string
NodeId string
NodeAddress string
NodePort int
}
func (c *consulNode) Id() string {
return c.NodeId
}
func (c *consulNode) Address() string {
return c.NodeAddress
}
func (c *consulNode) Port() int {
return c.NodePort
}

View File

@ -1,6 +1,7 @@
package registry package registry
import ( import (
"encoding/json"
"errors" "errors"
"sync" "sync"
@ -12,7 +13,32 @@ type consulRegistry struct {
Client *consul.Client Client *consul.Client
mtx sync.RWMutex mtx sync.RWMutex
services map[string]Service services map[string]*Service
}
func encodeMetaData(md map[string]string) []string {
var tags []string
for k, v := range md {
if b, err := json.Marshal(map[string]string{
k: v,
}); err == nil {
tags = append(tags, string(b))
}
}
return tags
}
func decodeMetaData(tags []string) map[string]string {
md := make(map[string]string)
for _, tag := range tags {
var kv map[string]string
if err := json.Unmarshal([]byte(tag), &kv); err == nil {
for k, v := range kv {
md[k] = v
}
}
}
return md
} }
func newConsulRegistry(addrs []string, opts ...Option) Registry { func newConsulRegistry(addrs []string, opts ...Option) Registry {
@ -25,50 +51,53 @@ func newConsulRegistry(addrs []string, opts ...Option) Registry {
cr := &consulRegistry{ cr := &consulRegistry{
Address: config.Address, Address: config.Address,
Client: client, Client: client,
services: make(map[string]Service), services: make(map[string]*Service),
} }
cr.Watch() cr.Watch()
return cr return cr
} }
func (c *consulRegistry) Deregister(s Service) error { func (c *consulRegistry) Deregister(s *Service) error {
if len(s.Nodes()) == 0 { if len(s.Nodes) == 0 {
return errors.New("Require at least one node") return errors.New("Require at least one node")
} }
node := s.Nodes()[0] node := s.Nodes[0]
_, err := c.Client.Catalog().Deregister(&consul.CatalogDeregistration{ _, err := c.Client.Catalog().Deregister(&consul.CatalogDeregistration{
Node: node.Id(), Node: node.Id,
Address: node.Address(), Address: node.Address,
ServiceID: node.Id(), ServiceID: node.Id,
}, nil) }, nil)
return err return err
} }
func (c *consulRegistry) Register(s Service) error { func (c *consulRegistry) Register(s *Service) error {
if len(s.Nodes()) == 0 { if len(s.Nodes) == 0 {
return errors.New("Require at least one node") return errors.New("Require at least one node")
} }
node := s.Nodes()[0] node := s.Nodes[0]
tags := encodeMetaData(node.MetaData)
_, err := c.Client.Catalog().Register(&consul.CatalogRegistration{ _, err := c.Client.Catalog().Register(&consul.CatalogRegistration{
Node: node.Id(), Node: node.Id,
Address: node.Address(), Address: node.Address,
Service: &consul.AgentService{ Service: &consul.AgentService{
ID: node.Id(), ID: node.Id,
Service: s.Name(), Service: s.Name,
Port: node.Port(), Port: node.Port,
Tags: tags,
}, },
}, nil) }, nil)
return err return err
} }
func (c *consulRegistry) GetService(name string) (Service, error) { func (c *consulRegistry) GetService(name string) (*Service, error) {
c.mtx.RLock() c.mtx.RLock()
service, ok := c.services[name] service, ok := c.services[name]
c.mtx.RUnlock() c.mtx.RUnlock()
@ -82,31 +111,31 @@ func (c *consulRegistry) GetService(name string) (Service, error) {
return nil, err return nil, err
} }
cs := &consulService{} cs := &Service{}
for _, s := range rsp { for _, s := range rsp {
if s.ServiceName != name { if s.ServiceName != name {
continue continue
} }
cs.ServiceName = s.ServiceName cs.Name = s.ServiceName
cs.ServiceNodes = append(cs.ServiceNodes, &consulNode{ cs.Nodes = append(cs.Nodes, &Node{
Node: s.Node, Id: s.ServiceID,
NodeId: s.ServiceID, Address: s.Address,
NodeAddress: s.Address, Port: s.ServicePort,
NodePort: s.ServicePort, MetaData: decodeMetaData(s.ServiceTags),
}) })
} }
return cs, nil return cs, nil
} }
func (c *consulRegistry) ListServices() ([]Service, error) { func (c *consulRegistry) ListServices() ([]*Service, error) {
c.mtx.RLock() c.mtx.RLock()
serviceMap := c.services serviceMap := c.services
c.mtx.RUnlock() c.mtx.RUnlock()
var services []Service var services []*Service
if len(serviceMap) > 0 { if len(serviceMap) > 0 {
for _, service := range services { for _, service := range services {
@ -121,36 +150,12 @@ func (c *consulRegistry) ListServices() ([]Service, error) {
} }
for service, _ := range rsp { for service, _ := range rsp {
services = append(services, &consulService{ServiceName: service}) services = append(services, &Service{Name: service})
} }
return services, nil return services, nil
} }
func (c *consulRegistry) NewService(name string, nodes ...Node) Service {
var snodes []*consulNode
for _, node := range nodes {
if n, ok := node.(*consulNode); ok {
snodes = append(snodes, n)
}
}
return &consulService{
ServiceName: name,
ServiceNodes: snodes,
}
}
func (c *consulRegistry) NewNode(id, address string, port int) Node {
return &consulNode{
Node: id,
NodeId: id,
NodeAddress: address,
NodePort: port,
}
}
func (c *consulRegistry) Watch() { func (c *consulRegistry) Watch() {
newConsulWatcher(c) newConsulWatcher(c)
} }

View File

@ -1,20 +0,0 @@
package registry
type consulService struct {
ServiceName string
ServiceNodes []*consulNode
}
func (c *consulService) Name() string {
return c.ServiceName
}
func (c *consulService) Nodes() []Node {
var nodes []Node
for _, node := range c.ServiceNodes {
nodes = append(nodes, node)
}
return nodes
}

View File

@ -37,20 +37,20 @@ func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) {
return return
} }
cs := &consulService{} cs := &Service{}
for _, e := range entries { for _, e := range entries {
cs.ServiceName = e.Service.Service cs.Name = e.Service.Service
cs.ServiceNodes = append(cs.ServiceNodes, &consulNode{ cs.Nodes = append(cs.Nodes, &Node{
Node: e.Node.Node, Id: e.Service.ID,
NodeId: e.Service.ID, Address: e.Node.Address,
NodeAddress: e.Node.Address, Port: e.Service.Port,
NodePort: e.Service.Port, MetaData: decodeMetaData(e.Service.Tags),
}) })
} }
cw.Registry.mtx.Lock() cw.Registry.mtx.Lock()
cw.Registry.services[cs.ServiceName] = cs cw.Registry.services[cs.Name] = cs
cw.Registry.mtx.Unlock() cw.Registry.mtx.Unlock()
} }

View File

@ -16,22 +16,22 @@ type kregistry struct {
namespace string namespace string
mtx sync.RWMutex mtx sync.RWMutex
services map[string]registry.Service services map[string]*registry.Service
} }
func (c *kregistry) Watch() { func (c *kregistry) Watch() {
newWatcher(c) newWatcher(c)
} }
func (c *kregistry) Deregister(s registry.Service) error { func (c *kregistry) Deregister(s *registry.Service) error {
return nil return nil
} }
func (c *kregistry) Register(s registry.Service) error { func (c *kregistry) Register(s *registry.Service) error {
return nil return nil
} }
func (c *kregistry) GetService(name string) (registry.Service, error) { func (c *kregistry) GetService(name string) (*registry.Service, error) {
c.mtx.RLock() c.mtx.RLock()
svc, ok := c.services[name] svc, ok := c.services[name]
c.mtx.RUnlock() c.mtx.RUnlock()
@ -51,23 +51,26 @@ func (c *kregistry) GetService(name string) (registry.Service, error) {
return nil, fmt.Errorf("Service not found") return nil, fmt.Errorf("Service not found")
} }
ks := &service{name: name} ks := &registry.Service{
Name: name,
}
for _, item := range services.Items { for _, item := range services.Items {
ks.nodes = append(ks.nodes, &node{ ks.Nodes = append(ks.Nodes, &registry.Node{
address: item.Spec.PortalIP, Address: item.Spec.PortalIP,
port: item.Spec.Ports[0].Port, Port: item.Spec.Ports[0].Port,
}) })
} }
return ks, nil return ks, nil
} }
func (c *kregistry) ListServices() ([]registry.Service, error) { func (c *kregistry) ListServices() ([]*registry.Service, error) {
c.mtx.RLock() c.mtx.RLock()
serviceMap := c.services serviceMap := c.services
c.mtx.RUnlock() c.mtx.RUnlock()
var services []registry.Service var services []*registry.Service
if len(serviceMap) > 0 { if len(serviceMap) > 0 {
for _, service := range serviceMap { for _, service := range serviceMap {
@ -86,37 +89,14 @@ func (c *kregistry) ListServices() ([]registry.Service, error) {
continue continue
} }
services = append(services, &service{ services = append(services, &registry.Service{
name: svc.ObjectMeta.Labels["name"], Name: svc.ObjectMeta.Labels["name"],
}) })
} }
return services, nil return services, nil
} }
func (c *kregistry) NewService(name string, nodes ...registry.Node) registry.Service {
var snodes []*node
for _, nod := range nodes {
if n, ok := nod.(*node); ok {
snodes = append(snodes, n)
}
}
return &service{
name: name,
nodes: snodes,
}
}
func (c *kregistry) NewNode(id, address string, port int) registry.Node {
return &node{
id: id,
address: address,
port: port,
}
}
func NewRegistry(addrs []string, opts ...registry.Option) registry.Registry { func NewRegistry(addrs []string, opts ...registry.Option) registry.Registry {
host := "http://" + os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT") host := "http://" + os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT")
if len(addrs) > 0 { if len(addrs) > 0 {
@ -130,7 +110,7 @@ func NewRegistry(addrs []string, opts ...registry.Option) registry.Registry {
kr := &kregistry{ kr := &kregistry{
client: client, client: client,
namespace: "default", namespace: "default",
services: make(map[string]registry.Service), services: make(map[string]*registry.Service),
} }
kr.Watch() kr.Watch()

View File

@ -1,19 +0,0 @@
package kubernetes
type node struct {
id string
address string
port int
}
func (n *node) Id() string {
return n.id
}
func (n *node) Address() string {
return n.address
}
func (n *node) Port() int {
return n.port
}

View File

@ -1,24 +0,0 @@
package kubernetes
import (
"github.com/myodc/go-micro/registry"
)
type service struct {
name string
nodes []*node
}
func (s *service) Name() string {
return s.name
}
func (s *service) Nodes() []registry.Node {
var nodes []registry.Node
for _, node := range s.nodes {
nodes = append(nodes, node)
}
return nodes
}

View File

@ -8,6 +8,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config" "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/myodc/go-micro/registry"
) )
type watcher struct { type watcher struct {
@ -27,12 +28,12 @@ func (k *watcher) OnUpdate(services []api.Service) {
activeServices.Insert(name) activeServices.Insert(name)
serviceIP := net.ParseIP(svc.Spec.PortalIP) serviceIP := net.ParseIP(svc.Spec.PortalIP)
ks := &service{ ks := &registry.Service{
name: name, Name: name,
nodes: []*node{ Nodes: []*registry.Node{
&node{ &registry.Node{
address: serviceIP.String(), Address: serviceIP.String(),
port: svc.Spec.Ports[0].Port, Port: svc.Spec.Ports[0].Port,
}, },
}, },
} }

View File

@ -1,11 +0,0 @@
package registry
type Node interface {
Id() string
Address() string
Port() int
}
func NewNode(id, address string, port int) Node {
return DefaultRegistry.NewNode(id, address, port)
}

View File

@ -1,12 +1,23 @@
package registry package registry
type Registry interface { type Registry interface {
Register(Service) error Register(*Service) error
Deregister(Service) error Deregister(*Service) error
GetService(string) (Service, error) GetService(string) (*Service, error)
ListServices() ([]Service, error) ListServices() ([]*Service, error)
NewService(string, ...Node) Service }
NewNode(string, string, int) Node
type Service struct {
Name string
MetaData map[string]string
Nodes []*Node
}
type Node struct {
Id string
Address string
Port int
MetaData map[string]string
} }
type options struct{} type options struct{}
@ -21,18 +32,18 @@ func NewRegistry(addrs []string, opt ...Option) Registry {
return newConsulRegistry(addrs, opt...) return newConsulRegistry(addrs, opt...)
} }
func Register(s Service) error { func Register(s *Service) error {
return DefaultRegistry.Register(s) return DefaultRegistry.Register(s)
} }
func Deregister(s Service) error { func Deregister(s *Service) error {
return DefaultRegistry.Deregister(s) return DefaultRegistry.Deregister(s)
} }
func GetService(name string) (Service, error) { func GetService(name string) (*Service, error) {
return DefaultRegistry.GetService(name) return DefaultRegistry.GetService(name)
} }
func ListServices() ([]Service, error) { func ListServices() ([]*Service, error) {
return DefaultRegistry.ListServices() return DefaultRegistry.ListServices()
} }

View File

@ -1,10 +0,0 @@
package registry
type Service interface {
Name() string
Nodes() []Node
}
func NewService(name string, nodes ...Node) Service {
return DefaultRegistry.NewService(name, nodes...)
}

View File

@ -93,10 +93,18 @@ func Run() error {
} }
// register service // register service
node := registry.NewNode(Id, host, port) node := &registry.Node{
service := registry.NewService(Name, node) Id: Id,
Address: host,
Port: port,
}
log.Infof("Registering node: %s", node.Id()) service := &registry.Service{
Name: Name,
Nodes: []*registry.Node{node},
}
log.Infof("Registering node: %s", node.Id)
err := registry.Register(service) err := registry.Register(service)
if err != nil { if err != nil {
@ -107,7 +115,7 @@ func Run() error {
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
log.Infof("Received signal %s", <-ch) log.Infof("Received signal %s", <-ch)
log.Infof("Deregistering %s", node.Id()) log.Infof("Deregistering %s", node.Id)
registry.Deregister(service) registry.Deregister(service)
return Stop() return Stop()