micro/registry/consul_registry.go

294 lines
6.1 KiB
Go
Raw Normal View History

2015-01-13 23:31:27 +00:00
package registry
import (
2016-01-16 20:25:18 +00:00
"crypto/tls"
2015-01-13 23:31:27 +00:00
"errors"
"fmt"
"net"
2016-01-16 20:25:18 +00:00
"net/http"
"runtime"
"sync"
2016-01-16 20:25:18 +00:00
"time"
2015-01-13 23:31:27 +00:00
consul "github.com/hashicorp/consul/api"
hash "github.com/mitchellh/hashstructure"
2015-01-13 23:31:27 +00:00
)
type consulRegistry struct {
2015-02-14 23:00:47 +00:00
Address string
Client *consul.Client
2017-09-28 11:16:56 +01:00
opts Options
sync.Mutex
register map[string]uint64
}
2016-01-16 23:39:47 +00:00
func newTransport(config *tls.Config) *http.Transport {
if config == nil {
config = &tls.Config{
InsecureSkipVerify: true,
}
}
2016-01-16 20:25:18 +00:00
t := &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
2016-01-16 23:39:47 +00:00
TLSClientConfig: config,
2016-01-16 20:25:18 +00:00
}
runtime.SetFinalizer(&t, func(tr **http.Transport) {
(*tr).CloseIdleConnections()
})
return t
}
2016-03-15 22:20:21 +00:00
func newConsulRegistry(opts ...Option) Registry {
var options Options
2015-12-19 18:28:08 +00:00
for _, o := range opts {
2016-03-15 22:20:21 +00:00
o(&options)
2015-12-19 18:28:08 +00:00
}
// use default config
config := consul.DefaultConfig()
if options.Context != nil {
// Use the consul config passed in the options, if available
2016-11-18 07:30:50 +00:00
if c, ok := options.Context.Value("consul_config").(*consul.Config); ok {
config = c
}
}
2015-12-19 18:28:08 +00:00
// check if there are any addrs
2016-03-15 22:20:21 +00:00
if len(options.Addrs) > 0 {
addr, port, err := net.SplitHostPort(options.Addrs[0])
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
port = "8500"
2016-03-15 22:20:21 +00:00
addr = options.Addrs[0]
config.Address = fmt.Sprintf("%s:%s", addr, port)
} else if err == nil {
config.Address = fmt.Sprintf("%s:%s", addr, port)
}
}
2015-12-19 18:28:08 +00:00
2016-01-16 20:25:18 +00:00
// requires secure connection?
2016-03-15 22:20:21 +00:00
if options.Secure || options.TLSConfig != nil {
if config.HttpClient == nil {
config.HttpClient = new(http.Client)
}
2016-01-16 20:25:18 +00:00
config.Scheme = "https"
// We're going to support InsecureSkipVerify
2016-03-15 22:20:21 +00:00
config.HttpClient.Transport = newTransport(options.TLSConfig)
2016-01-16 20:25:18 +00:00
}
2015-12-19 18:28:08 +00:00
// create the client
2015-10-26 20:23:57 +00:00
client, _ := consul.NewClient(config)
// set timeout
if options.Timeout > 0 {
config.HttpClient.Timeout = options.Timeout
}
cr := &consulRegistry{
2016-04-09 21:34:45 +01:00
Address: config.Address,
Client: client,
2017-09-28 11:16:56 +01:00
opts: options,
2016-04-09 21:34:45 +01:00
register: make(map[string]uint64),
}
return cr
}
func (c *consulRegistry) Deregister(s *Service) error {
if len(s.Nodes) == 0 {
2015-01-13 23:31:27 +00:00
return errors.New("Require at least one node")
}
2016-01-26 20:44:29 +00:00
// delete our hash of the service
c.Lock()
delete(c.register, s.Name)
c.Unlock()
2016-01-26 20:44:29 +00:00
node := s.Nodes[0]
2016-01-26 23:32:27 +00:00
return c.Client.Agent().ServiceDeregister(node.Id)
2015-01-13 23:31:27 +00:00
}
2016-01-26 23:32:27 +00:00
func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
if len(s.Nodes) == 0 {
2015-01-13 23:31:27 +00:00
return errors.New("Require at least one node")
}
2016-01-26 23:32:27 +00:00
var options RegisterOptions
for _, o := range opts {
o(&options)
}
// create hash of service; uint64
h, err := hash.Hash(s, nil)
if err != nil {
return err
}
// use first node
node := s.Nodes[0]
// get existing hash
c.Lock()
v, ok := c.register[s.Name]
c.Unlock()
// if it's already registered and matches then just pass the check
if ok && v == h {
// if the err is nil we're all good, bail out
// if not, we don't know what the state is, so full re-register
if err := c.Client.Agent().PassTTL("service:"+node.Id, ""); err == nil {
return nil
}
}
// encode the tags
2015-05-26 22:39:48 +01:00
tags := encodeMetadata(node.Metadata)
2015-10-11 12:05:20 +01:00
tags = append(tags, encodeEndpoints(s.Endpoints)...)
tags = append(tags, encodeVersion(s.Version)...)
2015-01-13 23:31:27 +00:00
2016-01-27 00:32:16 +00:00
var check *consul.AgentServiceCheck
// if the TTL is greater than 0 create an associated check
2016-01-27 00:32:16 +00:00
if options.TTL > time.Duration(0) {
// splay slightly for the watcher?
splay := time.Second * 5
deregTTL := options.TTL + splay
// consul has a minimum timeout on deregistration of 1 minute.
if options.TTL < time.Minute {
deregTTL = time.Minute + splay
}
2016-01-27 00:32:16 +00:00
check = &consul.AgentServiceCheck{
TTL: fmt.Sprintf("%v", options.TTL),
DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL),
2016-01-27 00:32:16 +00:00
}
}
// register the service
2016-01-27 00:15:46 +00:00
if err := c.Client.Agent().ServiceRegister(&consul.AgentServiceRegistration{
ID: node.Id,
Name: s.Name,
Tags: tags,
Port: node.Port,
Address: node.Address,
2016-01-27 00:32:16 +00:00
Check: check,
2016-01-27 00:15:46 +00:00
}); err != nil {
return err
}
2015-01-13 23:31:27 +00:00
// save our hash of the service
c.Lock()
c.register[s.Name] = h
c.Unlock()
// if the TTL is 0 we don't mess with the checks
2016-01-27 00:32:16 +00:00
if options.TTL == time.Duration(0) {
return nil
}
// pass the healthcheck
2016-01-27 00:15:46 +00:00
return c.Client.Agent().PassTTL("service:"+node.Id, "")
2015-01-13 23:31:27 +00:00
}
2015-11-08 01:48:48 +00:00
func (c *consulRegistry) GetService(name string) ([]*Service, error) {
rsp, _, err := c.Client.Health().Service(name, "", false, nil)
2015-01-13 23:31:27 +00:00
if err != nil {
return nil, err
}
2015-11-08 01:48:48 +00:00
serviceMap := map[string]*Service{}
2015-01-13 23:31:27 +00:00
for _, s := range rsp {
2016-01-26 23:32:27 +00:00
if s.Service.Service != name {
2015-01-13 23:31:27 +00:00
continue
}
// version is now a tag
2018-03-01 17:35:13 +00:00
version, _ := decodeVersion(s.Service.Tags)
// service ID is now the node id
2016-01-26 23:32:27 +00:00
id := s.Service.ID
// key is always the version
key := version
2018-03-01 17:35:13 +00:00
// address is service address
2016-01-26 23:32:27 +00:00
address := s.Service.Address
2018-03-01 17:35:13 +00:00
// use node address
if len(address) == 0 {
address = s.Node.Address
2015-11-08 01:48:48 +00:00
}
svc, ok := serviceMap[key]
if !ok {
svc = &Service{
2016-01-26 23:32:27 +00:00
Endpoints: decodeEndpoints(s.Service.Tags),
Name: s.Service.Service,
2015-11-08 01:48:48 +00:00
Version: version,
}
serviceMap[key] = svc
}
var del bool
2018-03-01 17:35:13 +00:00
for _, check := range s.Checks {
// delete the node if the status is critical
if check.Status == "critical" {
del = true
break
}
}
// if delete then skip the node
if del {
continue
}
2015-11-08 01:48:48 +00:00
svc.Nodes = append(svc.Nodes, &Node{
Id: id,
Address: address,
2016-01-26 23:32:27 +00:00
Port: s.Service.Port,
Metadata: decodeMetadata(s.Service.Tags),
2015-01-13 23:31:27 +00:00
})
}
2015-11-08 01:48:48 +00:00
var services []*Service
for _, service := range serviceMap {
services = append(services, service)
}
return services, nil
2015-01-13 23:31:27 +00:00
}
func (c *consulRegistry) ListServices() ([]*Service, error) {
2015-12-19 18:28:08 +00:00
rsp, _, err := c.Client.Catalog().Services(nil)
if err != nil {
return nil, err
}
2015-12-05 01:12:29 +00:00
var services []*Service
for service := range rsp {
services = append(services, &Service{Name: service})
}
return services, nil
}
2018-02-19 17:12:37 +00:00
func (c *consulRegistry) Watch(opts ...WatchOption) (Watcher, error) {
return newConsulWatcher(c, opts...)
2015-01-13 23:31:27 +00:00
}
2015-12-19 21:56:14 +00:00
func (c *consulRegistry) String() string {
return "consul"
}
2017-09-28 11:16:56 +01:00
func (c *consulRegistry) Options() Options {
return c.opts
}