Merge pull request #73 from micro/consul_register
Don't re-register if the state hasn't changed
This commit is contained in:
commit
0e93f0772a
@ -7,15 +7,20 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
consul "github.com/hashicorp/consul/api"
|
consul "github.com/hashicorp/consul/api"
|
||||||
|
hash "github.com/mitchellh/hashstructure"
|
||||||
)
|
)
|
||||||
|
|
||||||
type consulRegistry struct {
|
type consulRegistry struct {
|
||||||
Address string
|
Address string
|
||||||
Client *consul.Client
|
Client *consul.Client
|
||||||
Options Options
|
Options Options
|
||||||
|
|
||||||
|
sync.Mutex
|
||||||
|
register map[string]uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTransport(config *tls.Config) *http.Transport {
|
func newTransport(config *tls.Config) *http.Transport {
|
||||||
@ -77,9 +82,10 @@ func newConsulRegistry(opts ...Option) Registry {
|
|||||||
client, _ := consul.NewClient(config)
|
client, _ := consul.NewClient(config)
|
||||||
|
|
||||||
cr := &consulRegistry{
|
cr := &consulRegistry{
|
||||||
Address: config.Address,
|
Address: config.Address,
|
||||||
Client: client,
|
Client: client,
|
||||||
Options: options,
|
Options: options,
|
||||||
|
register: make(map[string]uint64),
|
||||||
}
|
}
|
||||||
|
|
||||||
return cr
|
return cr
|
||||||
@ -90,6 +96,11 @@ func (c *consulRegistry) Deregister(s *Service) error {
|
|||||||
return errors.New("Require at least one node")
|
return errors.New("Require at least one node")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// delete our hash of the service
|
||||||
|
c.Lock()
|
||||||
|
delete(c.register, s.Name)
|
||||||
|
c.Unlock()
|
||||||
|
|
||||||
node := s.Nodes[0]
|
node := s.Nodes[0]
|
||||||
return c.Client.Agent().ServiceDeregister(node.Id)
|
return c.Client.Agent().ServiceDeregister(node.Id)
|
||||||
}
|
}
|
||||||
@ -104,20 +115,44 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
|
|||||||
o(&options)
|
o(&options)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// create hash of service; uint64
|
||||||
|
h, err := hash.Hash(s, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// use first node
|
||||||
node := s.Nodes[0]
|
node := s.Nodes[0]
|
||||||
|
|
||||||
|
// get existing hash
|
||||||
|
c.Lock()
|
||||||
|
v, ok := c.register[s.Name]
|
||||||
|
c.Unlock()
|
||||||
|
|
||||||
|
// if it's already registered and matches then just pass the check
|
||||||
|
if ok && v == h {
|
||||||
|
// if the err is nil we're all good, bail out
|
||||||
|
// if not, we don't know what the state is, so full re-register
|
||||||
|
if err := c.Client.Agent().PassTTL("service:"+node.Id, ""); err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// encode the tags
|
||||||
tags := encodeMetadata(node.Metadata)
|
tags := encodeMetadata(node.Metadata)
|
||||||
tags = append(tags, encodeEndpoints(s.Endpoints)...)
|
tags = append(tags, encodeEndpoints(s.Endpoints)...)
|
||||||
tags = append(tags, encodeVersion(s.Version)...)
|
tags = append(tags, encodeVersion(s.Version)...)
|
||||||
|
|
||||||
var check *consul.AgentServiceCheck
|
var check *consul.AgentServiceCheck
|
||||||
|
|
||||||
|
// if the TTL is greater than 0 create an associated check
|
||||||
if options.TTL > time.Duration(0) {
|
if options.TTL > time.Duration(0) {
|
||||||
check = &consul.AgentServiceCheck{
|
check = &consul.AgentServiceCheck{
|
||||||
TTL: fmt.Sprintf("%v", options.TTL),
|
TTL: fmt.Sprintf("%v", options.TTL),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// register the service
|
||||||
if err := c.Client.Agent().ServiceRegister(&consul.AgentServiceRegistration{
|
if err := c.Client.Agent().ServiceRegister(&consul.AgentServiceRegistration{
|
||||||
ID: node.Id,
|
ID: node.Id,
|
||||||
Name: s.Name,
|
Name: s.Name,
|
||||||
@ -129,10 +164,17 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// save our hash of the service
|
||||||
|
c.Lock()
|
||||||
|
c.register[s.Name] = h
|
||||||
|
c.Unlock()
|
||||||
|
|
||||||
|
// if the TTL is 0 we don't mess with the checks
|
||||||
if options.TTL == time.Duration(0) {
|
if options.TTL == time.Duration(0) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// pass the healthcheck
|
||||||
return c.Client.Agent().PassTTL("service:"+node.Id, "")
|
return c.Client.Agent().PassTTL("service:"+node.Id, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -158,17 +200,10 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) {
|
|||||||
// address is service address
|
// address is service address
|
||||||
address := s.Service.Address
|
address := s.Service.Address
|
||||||
|
|
||||||
// if we can't get the new type of version
|
// if we can't get the version we bail
|
||||||
// use old the old ways
|
// use old the old ways
|
||||||
if !found {
|
if !found {
|
||||||
// id was set as node
|
continue
|
||||||
id = s.Node.Node
|
|
||||||
// key was service id
|
|
||||||
key = s.Service.ID
|
|
||||||
// version was service id
|
|
||||||
version = s.Service.ID
|
|
||||||
// address was address
|
|
||||||
address = s.Node.Address
|
|
||||||
}
|
}
|
||||||
|
|
||||||
svc, ok := serviceMap[key]
|
svc, ok := serviceMap[key]
|
||||||
|
@ -62,17 +62,9 @@ func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) {
|
|||||||
// address is service address
|
// address is service address
|
||||||
address := e.Service.Address
|
address := e.Service.Address
|
||||||
|
|
||||||
// if we can't get the new type of version
|
// if we can't get the version we bail
|
||||||
// use old the old ways
|
|
||||||
if !found {
|
if !found {
|
||||||
// id was set as node
|
continue
|
||||||
id = e.Node.Node
|
|
||||||
// key was service id
|
|
||||||
key = e.Service.ID
|
|
||||||
// version was service id
|
|
||||||
version = e.Service.ID
|
|
||||||
// address was address
|
|
||||||
address = e.Node.Address
|
|
||||||
}
|
}
|
||||||
|
|
||||||
svc, ok := serviceMap[key]
|
svc, ok := serviceMap[key]
|
||||||
@ -156,7 +148,7 @@ func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) {
|
|||||||
for _, old := range rservices[serviceName] {
|
for _, old := range rservices[serviceName] {
|
||||||
// old version does not exist in new version map
|
// old version does not exist in new version map
|
||||||
// kill it with fire!
|
// kill it with fire!
|
||||||
if _, ok := serviceMap[serviceName+old.Version]; !ok {
|
if _, ok := serviceMap[old.Version]; !ok {
|
||||||
cw.next <- &Result{Action: "delete", Service: old}
|
cw.next <- &Result{Action: "delete", Service: old}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user