Remove consul registry (#818)
This commit is contained in:
parent
f07a6ac29b
commit
d4832e8f34
@ -27,7 +27,6 @@ import (
|
|||||||
|
|
||||||
// registries
|
// registries
|
||||||
"github.com/micro/go-micro/registry"
|
"github.com/micro/go-micro/registry"
|
||||||
"github.com/micro/go-micro/registry/consul"
|
|
||||||
"github.com/micro/go-micro/registry/etcd"
|
"github.com/micro/go-micro/registry/etcd"
|
||||||
"github.com/micro/go-micro/registry/mdns"
|
"github.com/micro/go-micro/registry/mdns"
|
||||||
rmem "github.com/micro/go-micro/registry/memory"
|
rmem "github.com/micro/go-micro/registry/memory"
|
||||||
@ -155,7 +154,7 @@ var (
|
|||||||
cli.StringFlag{
|
cli.StringFlag{
|
||||||
Name: "registry",
|
Name: "registry",
|
||||||
EnvVar: "MICRO_REGISTRY",
|
EnvVar: "MICRO_REGISTRY",
|
||||||
Usage: "Registry for discovery. consul, etcd, mdns",
|
Usage: "Registry for discovery. etcd, mdns",
|
||||||
},
|
},
|
||||||
cli.StringFlag{
|
cli.StringFlag{
|
||||||
Name: "registry_address",
|
Name: "registry_address",
|
||||||
@ -196,7 +195,6 @@ var (
|
|||||||
DefaultRegistries = map[string]func(...registry.Option) registry.Registry{
|
DefaultRegistries = map[string]func(...registry.Option) registry.Registry{
|
||||||
"go.micro.registry": regSrv.NewRegistry,
|
"go.micro.registry": regSrv.NewRegistry,
|
||||||
"service": regSrv.NewRegistry,
|
"service": regSrv.NewRegistry,
|
||||||
"consul": consul.NewRegistry,
|
|
||||||
"etcd": etcd.NewRegistry,
|
"etcd": etcd.NewRegistry,
|
||||||
"mdns": mdns.NewRegistry,
|
"mdns": mdns.NewRegistry,
|
||||||
"memory": rmem.NewRegistry,
|
"memory": rmem.NewRegistry,
|
||||||
|
@ -1,438 +0,0 @@
|
|||||||
package consul
|
|
||||||
|
|
||||||
import (
|
|
||||||
"crypto/tls"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"net"
|
|
||||||
"net/http"
|
|
||||||
"runtime"
|
|
||||||
"strconv"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
consul "github.com/hashicorp/consul/api"
|
|
||||||
"github.com/micro/go-micro/registry"
|
|
||||||
mnet "github.com/micro/go-micro/util/net"
|
|
||||||
hash "github.com/mitchellh/hashstructure"
|
|
||||||
)
|
|
||||||
|
|
||||||
type consulRegistry struct {
|
|
||||||
Address []string
|
|
||||||
opts registry.Options
|
|
||||||
|
|
||||||
client *consul.Client
|
|
||||||
config *consul.Config
|
|
||||||
|
|
||||||
// connect enabled
|
|
||||||
connect bool
|
|
||||||
|
|
||||||
queryOptions *consul.QueryOptions
|
|
||||||
|
|
||||||
sync.Mutex
|
|
||||||
register map[string]uint64
|
|
||||||
// lastChecked tracks when a node was last checked as existing in Consul
|
|
||||||
lastChecked map[string]time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
func getDeregisterTTL(t time.Duration) time.Duration {
|
|
||||||
// splay slightly for the watcher?
|
|
||||||
splay := time.Second * 5
|
|
||||||
deregTTL := t + splay
|
|
||||||
|
|
||||||
// consul has a minimum timeout on deregistration of 1 minute.
|
|
||||||
if t < time.Minute {
|
|
||||||
deregTTL = time.Minute + splay
|
|
||||||
}
|
|
||||||
|
|
||||||
return deregTTL
|
|
||||||
}
|
|
||||||
|
|
||||||
func newTransport(config *tls.Config) *http.Transport {
|
|
||||||
if config == nil {
|
|
||||||
config = &tls.Config{
|
|
||||||
InsecureSkipVerify: true,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
t := &http.Transport{
|
|
||||||
Proxy: http.ProxyFromEnvironment,
|
|
||||||
Dial: (&net.Dialer{
|
|
||||||
Timeout: 30 * time.Second,
|
|
||||||
KeepAlive: 30 * time.Second,
|
|
||||||
}).Dial,
|
|
||||||
TLSHandshakeTimeout: 10 * time.Second,
|
|
||||||
TLSClientConfig: config,
|
|
||||||
}
|
|
||||||
runtime.SetFinalizer(&t, func(tr **http.Transport) {
|
|
||||||
(*tr).CloseIdleConnections()
|
|
||||||
})
|
|
||||||
return t
|
|
||||||
}
|
|
||||||
|
|
||||||
func configure(c *consulRegistry, opts ...registry.Option) {
|
|
||||||
// set opts
|
|
||||||
for _, o := range opts {
|
|
||||||
o(&c.opts)
|
|
||||||
}
|
|
||||||
|
|
||||||
// use default config
|
|
||||||
config := consul.DefaultConfig()
|
|
||||||
|
|
||||||
if c.opts.Context != nil {
|
|
||||||
// Use the consul config passed in the options, if available
|
|
||||||
if co, ok := c.opts.Context.Value("consul_config").(*consul.Config); ok {
|
|
||||||
config = co
|
|
||||||
}
|
|
||||||
if cn, ok := c.opts.Context.Value("consul_connect").(bool); ok {
|
|
||||||
c.connect = cn
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use the consul query options passed in the options, if available
|
|
||||||
if qo, ok := c.opts.Context.Value("consul_query_options").(*consul.QueryOptions); ok && qo != nil {
|
|
||||||
c.queryOptions = qo
|
|
||||||
}
|
|
||||||
if as, ok := c.opts.Context.Value("consul_allow_stale").(bool); ok {
|
|
||||||
c.queryOptions.AllowStale = as
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if there are any addrs
|
|
||||||
var addrs []string
|
|
||||||
|
|
||||||
// iterate the options addresses
|
|
||||||
for _, address := range c.opts.Addrs {
|
|
||||||
// check we have a port
|
|
||||||
addr, port, err := net.SplitHostPort(address)
|
|
||||||
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
|
|
||||||
port = "8500"
|
|
||||||
addr = address
|
|
||||||
addrs = append(addrs, net.JoinHostPort(addr, port))
|
|
||||||
} else if err == nil {
|
|
||||||
addrs = append(addrs, net.JoinHostPort(addr, port))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// set the addrs
|
|
||||||
if len(addrs) > 0 {
|
|
||||||
c.Address = addrs
|
|
||||||
config.Address = c.Address[0]
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.HttpClient == nil {
|
|
||||||
config.HttpClient = new(http.Client)
|
|
||||||
}
|
|
||||||
|
|
||||||
// requires secure connection?
|
|
||||||
if c.opts.Secure || c.opts.TLSConfig != nil {
|
|
||||||
config.Scheme = "https"
|
|
||||||
// We're going to support InsecureSkipVerify
|
|
||||||
config.HttpClient.Transport = newTransport(c.opts.TLSConfig)
|
|
||||||
}
|
|
||||||
|
|
||||||
// set timeout
|
|
||||||
if c.opts.Timeout > 0 {
|
|
||||||
config.HttpClient.Timeout = c.opts.Timeout
|
|
||||||
}
|
|
||||||
|
|
||||||
// set the config
|
|
||||||
c.config = config
|
|
||||||
|
|
||||||
// remove client
|
|
||||||
c.client = nil
|
|
||||||
|
|
||||||
// setup the client
|
|
||||||
c.Client()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *consulRegistry) Init(opts ...registry.Option) error {
|
|
||||||
configure(c, opts...)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *consulRegistry) Deregister(s *registry.Service) error {
|
|
||||||
if len(s.Nodes) == 0 {
|
|
||||||
return errors.New("Require at least one node")
|
|
||||||
}
|
|
||||||
|
|
||||||
// delete our hash and time check of the service
|
|
||||||
c.Lock()
|
|
||||||
delete(c.register, s.Name)
|
|
||||||
delete(c.lastChecked, s.Name)
|
|
||||||
c.Unlock()
|
|
||||||
|
|
||||||
node := s.Nodes[0]
|
|
||||||
return c.Client().Agent().ServiceDeregister(node.Id)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *consulRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
|
|
||||||
if len(s.Nodes) == 0 {
|
|
||||||
return errors.New("Require at least one node")
|
|
||||||
}
|
|
||||||
|
|
||||||
var regTCPCheck bool
|
|
||||||
var regInterval time.Duration
|
|
||||||
|
|
||||||
var options registry.RegisterOptions
|
|
||||||
for _, o := range opts {
|
|
||||||
o(&options)
|
|
||||||
}
|
|
||||||
|
|
||||||
if c.opts.Context != nil {
|
|
||||||
if tcpCheckInterval, ok := c.opts.Context.Value("consul_tcp_check").(time.Duration); ok {
|
|
||||||
regTCPCheck = true
|
|
||||||
regInterval = tcpCheckInterval
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// create hash of service; uint64
|
|
||||||
h, err := hash.Hash(s, nil)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// use first node
|
|
||||||
node := s.Nodes[0]
|
|
||||||
|
|
||||||
// get existing hash and last checked time
|
|
||||||
c.Lock()
|
|
||||||
v, ok := c.register[s.Name]
|
|
||||||
lastChecked := c.lastChecked[s.Name]
|
|
||||||
c.Unlock()
|
|
||||||
|
|
||||||
// if it's already registered and matches then just pass the check
|
|
||||||
if ok && v == h {
|
|
||||||
if options.TTL == time.Duration(0) {
|
|
||||||
// ensure that our service hasn't been deregistered by Consul
|
|
||||||
if time.Since(lastChecked) <= getDeregisterTTL(regInterval) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
services, _, err := c.Client().Health().Checks(s.Name, c.queryOptions)
|
|
||||||
if err == nil {
|
|
||||||
for _, v := range services {
|
|
||||||
if v.ServiceID == node.Id {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// if the err is nil we're all good, bail out
|
|
||||||
// if not, we don't know what the state is, so full re-register
|
|
||||||
if err := c.Client().Agent().PassTTL("service:"+node.Id, ""); err == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// encode the tags
|
|
||||||
tags := encodeMetadata(node.Metadata)
|
|
||||||
tags = append(tags, encodeEndpoints(s.Endpoints)...)
|
|
||||||
tags = append(tags, encodeVersion(s.Version)...)
|
|
||||||
|
|
||||||
var check *consul.AgentServiceCheck
|
|
||||||
|
|
||||||
if regTCPCheck {
|
|
||||||
deregTTL := getDeregisterTTL(regInterval)
|
|
||||||
|
|
||||||
check = &consul.AgentServiceCheck{
|
|
||||||
TCP: node.Address,
|
|
||||||
Interval: fmt.Sprintf("%v", regInterval),
|
|
||||||
DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL),
|
|
||||||
}
|
|
||||||
|
|
||||||
// if the TTL is greater than 0 create an associated check
|
|
||||||
} else if options.TTL > time.Duration(0) {
|
|
||||||
deregTTL := getDeregisterTTL(options.TTL)
|
|
||||||
|
|
||||||
check = &consul.AgentServiceCheck{
|
|
||||||
TTL: fmt.Sprintf("%v", options.TTL),
|
|
||||||
DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
host, pt, _ := net.SplitHostPort(node.Address)
|
|
||||||
if host == "" {
|
|
||||||
host = node.Address
|
|
||||||
}
|
|
||||||
port, _ := strconv.Atoi(pt)
|
|
||||||
|
|
||||||
// register the service
|
|
||||||
asr := &consul.AgentServiceRegistration{
|
|
||||||
ID: node.Id,
|
|
||||||
Name: s.Name,
|
|
||||||
Tags: tags,
|
|
||||||
Port: port,
|
|
||||||
Address: host,
|
|
||||||
Check: check,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Specify consul connect
|
|
||||||
if c.connect {
|
|
||||||
asr.Connect = &consul.AgentServiceConnect{
|
|
||||||
Native: true,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := c.Client().Agent().ServiceRegister(asr); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// save our hash and time check of the service
|
|
||||||
c.Lock()
|
|
||||||
c.register[s.Name] = h
|
|
||||||
c.lastChecked[s.Name] = time.Now()
|
|
||||||
c.Unlock()
|
|
||||||
|
|
||||||
// if the TTL is 0 we don't mess with the checks
|
|
||||||
if options.TTL == time.Duration(0) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// pass the healthcheck
|
|
||||||
return c.Client().Agent().PassTTL("service:"+node.Id, "")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *consulRegistry) GetService(name string) ([]*registry.Service, error) {
|
|
||||||
var rsp []*consul.ServiceEntry
|
|
||||||
var err error
|
|
||||||
|
|
||||||
// if we're connect enabled only get connect services
|
|
||||||
if c.connect {
|
|
||||||
rsp, _, err = c.Client().Health().Connect(name, "", false, c.queryOptions)
|
|
||||||
} else {
|
|
||||||
rsp, _, err = c.Client().Health().Service(name, "", false, c.queryOptions)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
serviceMap := map[string]*registry.Service{}
|
|
||||||
|
|
||||||
for _, s := range rsp {
|
|
||||||
if s.Service.Service != name {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// version is now a tag
|
|
||||||
version, _ := decodeVersion(s.Service.Tags)
|
|
||||||
// service ID is now the node id
|
|
||||||
id := s.Service.ID
|
|
||||||
// key is always the version
|
|
||||||
key := version
|
|
||||||
|
|
||||||
// address is service address
|
|
||||||
address := s.Service.Address
|
|
||||||
|
|
||||||
// use node address
|
|
||||||
if len(address) == 0 {
|
|
||||||
address = s.Node.Address
|
|
||||||
}
|
|
||||||
|
|
||||||
svc, ok := serviceMap[key]
|
|
||||||
if !ok {
|
|
||||||
svc = ®istry.Service{
|
|
||||||
Endpoints: decodeEndpoints(s.Service.Tags),
|
|
||||||
Name: s.Service.Service,
|
|
||||||
Version: version,
|
|
||||||
}
|
|
||||||
serviceMap[key] = svc
|
|
||||||
}
|
|
||||||
|
|
||||||
var del bool
|
|
||||||
|
|
||||||
for _, check := range s.Checks {
|
|
||||||
// delete the node if the status is critical
|
|
||||||
if check.Status == "critical" {
|
|
||||||
del = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// if delete then skip the node
|
|
||||||
if del {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
svc.Nodes = append(svc.Nodes, ®istry.Node{
|
|
||||||
Id: id,
|
|
||||||
Address: mnet.HostPort(address, s.Service.Port),
|
|
||||||
Metadata: decodeMetadata(s.Service.Tags),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
var services []*registry.Service
|
|
||||||
for _, service := range serviceMap {
|
|
||||||
services = append(services, service)
|
|
||||||
}
|
|
||||||
return services, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *consulRegistry) ListServices() ([]*registry.Service, error) {
|
|
||||||
rsp, _, err := c.Client().Catalog().Services(c.queryOptions)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var services []*registry.Service
|
|
||||||
|
|
||||||
for service := range rsp {
|
|
||||||
services = append(services, ®istry.Service{Name: service})
|
|
||||||
}
|
|
||||||
|
|
||||||
return services, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *consulRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
|
|
||||||
return newConsulWatcher(c, opts...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *consulRegistry) String() string {
|
|
||||||
return "consul"
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *consulRegistry) Options() registry.Options {
|
|
||||||
return c.opts
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *consulRegistry) Client() *consul.Client {
|
|
||||||
if c.client != nil {
|
|
||||||
return c.client
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, addr := range c.Address {
|
|
||||||
// set the address
|
|
||||||
c.config.Address = addr
|
|
||||||
|
|
||||||
// create a new client
|
|
||||||
tmpClient, _ := consul.NewClient(c.config)
|
|
||||||
|
|
||||||
// test the client
|
|
||||||
_, err := tmpClient.Agent().Host()
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// set the client
|
|
||||||
c.client = tmpClient
|
|
||||||
return c.client
|
|
||||||
}
|
|
||||||
|
|
||||||
// set the default
|
|
||||||
c.client, _ = consul.NewClient(c.config)
|
|
||||||
|
|
||||||
// return the client
|
|
||||||
return c.client
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewRegistry(opts ...registry.Option) registry.Registry {
|
|
||||||
cr := &consulRegistry{
|
|
||||||
opts: registry.Options{},
|
|
||||||
register: make(map[string]uint64),
|
|
||||||
lastChecked: make(map[string]time.Time),
|
|
||||||
queryOptions: &consul.QueryOptions{
|
|
||||||
AllowStale: true,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
configure(cr, opts...)
|
|
||||||
return cr
|
|
||||||
}
|
|
@ -1,170 +0,0 @@
|
|||||||
package consul
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"compress/zlib"
|
|
||||||
"encoding/hex"
|
|
||||||
"encoding/json"
|
|
||||||
"io/ioutil"
|
|
||||||
|
|
||||||
"github.com/micro/go-micro/registry"
|
|
||||||
)
|
|
||||||
|
|
||||||
func encode(buf []byte) string {
|
|
||||||
var b bytes.Buffer
|
|
||||||
defer b.Reset()
|
|
||||||
|
|
||||||
w := zlib.NewWriter(&b)
|
|
||||||
if _, err := w.Write(buf); err != nil {
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
w.Close()
|
|
||||||
|
|
||||||
return hex.EncodeToString(b.Bytes())
|
|
||||||
}
|
|
||||||
|
|
||||||
func decode(d string) []byte {
|
|
||||||
hr, err := hex.DecodeString(d)
|
|
||||||
if err != nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
br := bytes.NewReader(hr)
|
|
||||||
zr, err := zlib.NewReader(br)
|
|
||||||
if err != nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
rbuf, err := ioutil.ReadAll(zr)
|
|
||||||
if err != nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return rbuf
|
|
||||||
}
|
|
||||||
|
|
||||||
func encodeEndpoints(en []*registry.Endpoint) []string {
|
|
||||||
var tags []string
|
|
||||||
for _, e := range en {
|
|
||||||
if b, err := json.Marshal(e); err == nil {
|
|
||||||
tags = append(tags, "e-"+encode(b))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return tags
|
|
||||||
}
|
|
||||||
|
|
||||||
func decodeEndpoints(tags []string) []*registry.Endpoint {
|
|
||||||
var en []*registry.Endpoint
|
|
||||||
|
|
||||||
// use the first format you find
|
|
||||||
var ver byte
|
|
||||||
|
|
||||||
for _, tag := range tags {
|
|
||||||
if len(tag) == 0 || tag[0] != 'e' {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// check version
|
|
||||||
if ver > 0 && tag[1] != ver {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
var e *registry.Endpoint
|
|
||||||
var buf []byte
|
|
||||||
|
|
||||||
// Old encoding was plain
|
|
||||||
if tag[1] == '=' {
|
|
||||||
buf = []byte(tag[2:])
|
|
||||||
}
|
|
||||||
|
|
||||||
// New encoding is hex
|
|
||||||
if tag[1] == '-' {
|
|
||||||
buf = decode(tag[2:])
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := json.Unmarshal(buf, &e); err == nil {
|
|
||||||
en = append(en, e)
|
|
||||||
}
|
|
||||||
|
|
||||||
// set version
|
|
||||||
ver = tag[1]
|
|
||||||
}
|
|
||||||
return en
|
|
||||||
}
|
|
||||||
|
|
||||||
func encodeMetadata(md map[string]string) []string {
|
|
||||||
var tags []string
|
|
||||||
for k, v := range md {
|
|
||||||
if b, err := json.Marshal(map[string]string{
|
|
||||||
k: v,
|
|
||||||
}); err == nil {
|
|
||||||
// new encoding
|
|
||||||
tags = append(tags, "t-"+encode(b))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return tags
|
|
||||||
}
|
|
||||||
|
|
||||||
func decodeMetadata(tags []string) map[string]string {
|
|
||||||
md := make(map[string]string)
|
|
||||||
|
|
||||||
var ver byte
|
|
||||||
|
|
||||||
for _, tag := range tags {
|
|
||||||
if len(tag) == 0 || tag[0] != 't' {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// check version
|
|
||||||
if ver > 0 && tag[1] != ver {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
var kv map[string]string
|
|
||||||
var buf []byte
|
|
||||||
|
|
||||||
// Old encoding was plain
|
|
||||||
if tag[1] == '=' {
|
|
||||||
buf = []byte(tag[2:])
|
|
||||||
}
|
|
||||||
|
|
||||||
// New encoding is hex
|
|
||||||
if tag[1] == '-' {
|
|
||||||
buf = decode(tag[2:])
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now unmarshal
|
|
||||||
if err := json.Unmarshal(buf, &kv); err == nil {
|
|
||||||
for k, v := range kv {
|
|
||||||
md[k] = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// set version
|
|
||||||
ver = tag[1]
|
|
||||||
}
|
|
||||||
return md
|
|
||||||
}
|
|
||||||
|
|
||||||
func encodeVersion(v string) []string {
|
|
||||||
return []string{"v-" + encode([]byte(v))}
|
|
||||||
}
|
|
||||||
|
|
||||||
func decodeVersion(tags []string) (string, bool) {
|
|
||||||
for _, tag := range tags {
|
|
||||||
if len(tag) < 2 || tag[0] != 'v' {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Old encoding was plain
|
|
||||||
if tag[1] == '=' {
|
|
||||||
return tag[2:], true
|
|
||||||
}
|
|
||||||
|
|
||||||
// New encoding is hex
|
|
||||||
if tag[1] == '-' {
|
|
||||||
return string(decode(tag[2:])), true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return "", false
|
|
||||||
}
|
|
@ -1,147 +0,0 @@
|
|||||||
package consul
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/micro/go-micro/registry"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestEncodingEndpoints(t *testing.T) {
|
|
||||||
eps := []*registry.Endpoint{
|
|
||||||
®istry.Endpoint{
|
|
||||||
Name: "endpoint1",
|
|
||||||
Request: ®istry.Value{
|
|
||||||
Name: "request",
|
|
||||||
Type: "request",
|
|
||||||
},
|
|
||||||
Response: ®istry.Value{
|
|
||||||
Name: "response",
|
|
||||||
Type: "response",
|
|
||||||
},
|
|
||||||
Metadata: map[string]string{
|
|
||||||
"foo1": "bar1",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
®istry.Endpoint{
|
|
||||||
Name: "endpoint2",
|
|
||||||
Request: ®istry.Value{
|
|
||||||
Name: "request",
|
|
||||||
Type: "request",
|
|
||||||
},
|
|
||||||
Response: ®istry.Value{
|
|
||||||
Name: "response",
|
|
||||||
Type: "response",
|
|
||||||
},
|
|
||||||
Metadata: map[string]string{
|
|
||||||
"foo2": "bar2",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
®istry.Endpoint{
|
|
||||||
Name: "endpoint3",
|
|
||||||
Request: ®istry.Value{
|
|
||||||
Name: "request",
|
|
||||||
Type: "request",
|
|
||||||
},
|
|
||||||
Response: ®istry.Value{
|
|
||||||
Name: "response",
|
|
||||||
Type: "response",
|
|
||||||
},
|
|
||||||
Metadata: map[string]string{
|
|
||||||
"foo3": "bar3",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
testEp := func(ep *registry.Endpoint, enc string) {
|
|
||||||
// encode endpoint
|
|
||||||
e := encodeEndpoints([]*registry.Endpoint{ep})
|
|
||||||
|
|
||||||
// check there are two tags; old and new
|
|
||||||
if len(e) != 1 {
|
|
||||||
t.Fatalf("Expected 1 encoded tags, got %v", e)
|
|
||||||
}
|
|
||||||
|
|
||||||
// check old encoding
|
|
||||||
var seen bool
|
|
||||||
|
|
||||||
for _, en := range e {
|
|
||||||
if en == enc {
|
|
||||||
seen = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !seen {
|
|
||||||
t.Fatalf("Expected %s but not found", enc)
|
|
||||||
}
|
|
||||||
|
|
||||||
// decode
|
|
||||||
d := decodeEndpoints([]string{enc})
|
|
||||||
if len(d) == 0 {
|
|
||||||
t.Fatalf("Expected %v got %v", ep, d)
|
|
||||||
}
|
|
||||||
|
|
||||||
// check name
|
|
||||||
if d[0].Name != ep.Name {
|
|
||||||
t.Fatalf("Expected ep %s got %s", ep.Name, d[0].Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
// check all the metadata exists
|
|
||||||
for k, v := range ep.Metadata {
|
|
||||||
if gv := d[0].Metadata[k]; gv != v {
|
|
||||||
t.Fatalf("Expected key %s val %s got val %s", k, v, gv)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, ep := range eps {
|
|
||||||
// JSON encoded
|
|
||||||
jencoded, err := json.Marshal(ep)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// HEX encoded
|
|
||||||
hencoded := encode(jencoded)
|
|
||||||
// endpoint tag
|
|
||||||
hepTag := "e-" + hencoded
|
|
||||||
testEp(ep, hepTag)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestEncodingVersion(t *testing.T) {
|
|
||||||
testData := []struct {
|
|
||||||
decoded string
|
|
||||||
encoded string
|
|
||||||
}{
|
|
||||||
{"1.0.0", "v-789c32d433d03300040000ffff02ce00ee"},
|
|
||||||
{"latest", "v-789cca492c492d2e01040000ffff08cc028e"},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, data := range testData {
|
|
||||||
e := encodeVersion(data.decoded)
|
|
||||||
|
|
||||||
if e[0] != data.encoded {
|
|
||||||
t.Fatalf("Expected %s got %s", data.encoded, e)
|
|
||||||
}
|
|
||||||
|
|
||||||
d, ok := decodeVersion(e)
|
|
||||||
if !ok {
|
|
||||||
t.Fatalf("Unexpected %t for %s", ok, data.encoded)
|
|
||||||
}
|
|
||||||
|
|
||||||
if d != data.decoded {
|
|
||||||
t.Fatalf("Expected %s got %s", data.decoded, d)
|
|
||||||
}
|
|
||||||
|
|
||||||
d, ok = decodeVersion([]string{data.encoded})
|
|
||||||
if !ok {
|
|
||||||
t.Fatalf("Unexpected %t for %s", ok, data.encoded)
|
|
||||||
}
|
|
||||||
|
|
||||||
if d != data.decoded {
|
|
||||||
t.Fatalf("Expected %s got %s", data.decoded, d)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,81 +0,0 @@
|
|||||||
package consul
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
consul "github.com/hashicorp/consul/api"
|
|
||||||
"github.com/micro/go-micro/registry"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Connect specifies services should be registered as Consul Connect services
|
|
||||||
func Connect() registry.Option {
|
|
||||||
return func(o *registry.Options) {
|
|
||||||
if o.Context == nil {
|
|
||||||
o.Context = context.Background()
|
|
||||||
}
|
|
||||||
o.Context = context.WithValue(o.Context, "consul_connect", true)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func Config(c *consul.Config) registry.Option {
|
|
||||||
return func(o *registry.Options) {
|
|
||||||
if o.Context == nil {
|
|
||||||
o.Context = context.Background()
|
|
||||||
}
|
|
||||||
o.Context = context.WithValue(o.Context, "consul_config", c)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// AllowStale sets whether any Consul server (non-leader) can service
|
|
||||||
// a read. This allows for lower latency and higher throughput
|
|
||||||
// at the cost of potentially stale data.
|
|
||||||
// Works similar to Consul DNS Config option [1].
|
|
||||||
// Defaults to true.
|
|
||||||
//
|
|
||||||
// [1] https://www.consul.io/docs/agent/options.html#allow_stale
|
|
||||||
//
|
|
||||||
func AllowStale(v bool) registry.Option {
|
|
||||||
return func(o *registry.Options) {
|
|
||||||
if o.Context == nil {
|
|
||||||
o.Context = context.Background()
|
|
||||||
}
|
|
||||||
o.Context = context.WithValue(o.Context, "consul_allow_stale", v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// QueryOptions specifies the QueryOptions to be used when calling
|
|
||||||
// Consul. See `Consul API` for more information [1].
|
|
||||||
//
|
|
||||||
// [1] https://godoc.org/github.com/hashicorp/consul/api#QueryOptions
|
|
||||||
//
|
|
||||||
func QueryOptions(q *consul.QueryOptions) registry.Option {
|
|
||||||
return func(o *registry.Options) {
|
|
||||||
if q == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if o.Context == nil {
|
|
||||||
o.Context = context.Background()
|
|
||||||
}
|
|
||||||
o.Context = context.WithValue(o.Context, "consul_query_options", q)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
|
||||||
// TCPCheck will tell the service provider to check the service address
|
|
||||||
// and port every `t` interval. It will enabled only if `t` is greater than 0.
|
|
||||||
// See `TCP + Interval` for more information [1].
|
|
||||||
//
|
|
||||||
// [1] https://www.consul.io/docs/agent/checks.html
|
|
||||||
//
|
|
||||||
func TCPCheck(t time.Duration) registry.Option {
|
|
||||||
return func(o *registry.Options) {
|
|
||||||
if t <= time.Duration(0) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if o.Context == nil {
|
|
||||||
o.Context = context.Background()
|
|
||||||
}
|
|
||||||
o.Context = context.WithValue(o.Context, "consul_tcp_check", t)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,208 +0,0 @@
|
|||||||
package consul
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"net"
|
|
||||||
"net/http"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
consul "github.com/hashicorp/consul/api"
|
|
||||||
"github.com/micro/go-micro/registry"
|
|
||||||
)
|
|
||||||
|
|
||||||
type mockRegistry struct {
|
|
||||||
body []byte
|
|
||||||
status int
|
|
||||||
err error
|
|
||||||
url string
|
|
||||||
}
|
|
||||||
|
|
||||||
func encodeData(obj interface{}) ([]byte, error) {
|
|
||||||
buf := bytes.NewBuffer(nil)
|
|
||||||
enc := json.NewEncoder(buf)
|
|
||||||
if err := enc.Encode(obj); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return buf.Bytes(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func newMockServer(rg *mockRegistry, l net.Listener) error {
|
|
||||||
mux := http.NewServeMux()
|
|
||||||
mux.HandleFunc(rg.url, func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
if rg.err != nil {
|
|
||||||
http.Error(w, rg.err.Error(), 500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
w.WriteHeader(rg.status)
|
|
||||||
w.Write(rg.body)
|
|
||||||
})
|
|
||||||
return http.Serve(l, mux)
|
|
||||||
}
|
|
||||||
|
|
||||||
func newConsulTestRegistry(r *mockRegistry) (*consulRegistry, func()) {
|
|
||||||
l, err := net.Listen("tcp", "localhost:0")
|
|
||||||
if err != nil {
|
|
||||||
// blurgh?!!
|
|
||||||
panic(err.Error())
|
|
||||||
}
|
|
||||||
cfg := consul.DefaultConfig()
|
|
||||||
cfg.Address = l.Addr().String()
|
|
||||||
|
|
||||||
go newMockServer(r, l)
|
|
||||||
|
|
||||||
var cr = &consulRegistry{
|
|
||||||
config: cfg,
|
|
||||||
Address: []string{cfg.Address},
|
|
||||||
opts: registry.Options{},
|
|
||||||
register: make(map[string]uint64),
|
|
||||||
lastChecked: make(map[string]time.Time),
|
|
||||||
queryOptions: &consul.QueryOptions{
|
|
||||||
AllowStale: true,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
cr.Client()
|
|
||||||
|
|
||||||
return cr, func() {
|
|
||||||
l.Close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newServiceList(svc []*consul.ServiceEntry) []byte {
|
|
||||||
bts, _ := encodeData(svc)
|
|
||||||
return bts
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestConsul_GetService_WithError(t *testing.T) {
|
|
||||||
cr, cl := newConsulTestRegistry(&mockRegistry{
|
|
||||||
err: errors.New("client-error"),
|
|
||||||
url: "/v1/health/service/service-name",
|
|
||||||
})
|
|
||||||
defer cl()
|
|
||||||
|
|
||||||
if _, err := cr.GetService("test-service"); err == nil {
|
|
||||||
t.Fatalf("Expected error not to be `nil`")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestConsul_GetService_WithHealthyServiceNodes(t *testing.T) {
|
|
||||||
// warning is still seen as healthy, critical is not
|
|
||||||
svcs := []*consul.ServiceEntry{
|
|
||||||
newServiceEntry(
|
|
||||||
"node-name-1", "node-address-1", "service-name", "v1.0.0",
|
|
||||||
[]*consul.HealthCheck{
|
|
||||||
newHealthCheck("node-name-1", "service-name", "passing"),
|
|
||||||
newHealthCheck("node-name-1", "service-name", "warning"),
|
|
||||||
},
|
|
||||||
),
|
|
||||||
newServiceEntry(
|
|
||||||
"node-name-2", "node-address-2", "service-name", "v1.0.0",
|
|
||||||
[]*consul.HealthCheck{
|
|
||||||
newHealthCheck("node-name-2", "service-name", "passing"),
|
|
||||||
newHealthCheck("node-name-2", "service-name", "warning"),
|
|
||||||
},
|
|
||||||
),
|
|
||||||
}
|
|
||||||
|
|
||||||
cr, cl := newConsulTestRegistry(&mockRegistry{
|
|
||||||
status: 200,
|
|
||||||
body: newServiceList(svcs),
|
|
||||||
url: "/v1/health/service/service-name",
|
|
||||||
})
|
|
||||||
defer cl()
|
|
||||||
|
|
||||||
svc, err := cr.GetService("service-name")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal("Unexpected error", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if exp, act := 1, len(svc); exp != act {
|
|
||||||
t.Fatalf("Expected len of svc to be `%d`, got `%d`.", exp, act)
|
|
||||||
}
|
|
||||||
|
|
||||||
if exp, act := 2, len(svc[0].Nodes); exp != act {
|
|
||||||
t.Fatalf("Expected len of nodes to be `%d`, got `%d`.", exp, act)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestConsul_GetService_WithUnhealthyServiceNode(t *testing.T) {
|
|
||||||
// warning is still seen as healthy, critical is not
|
|
||||||
svcs := []*consul.ServiceEntry{
|
|
||||||
newServiceEntry(
|
|
||||||
"node-name-1", "node-address-1", "service-name", "v1.0.0",
|
|
||||||
[]*consul.HealthCheck{
|
|
||||||
newHealthCheck("node-name-1", "service-name", "passing"),
|
|
||||||
newHealthCheck("node-name-1", "service-name", "warning"),
|
|
||||||
},
|
|
||||||
),
|
|
||||||
newServiceEntry(
|
|
||||||
"node-name-2", "node-address-2", "service-name", "v1.0.0",
|
|
||||||
[]*consul.HealthCheck{
|
|
||||||
newHealthCheck("node-name-2", "service-name", "passing"),
|
|
||||||
newHealthCheck("node-name-2", "service-name", "critical"),
|
|
||||||
},
|
|
||||||
),
|
|
||||||
}
|
|
||||||
|
|
||||||
cr, cl := newConsulTestRegistry(&mockRegistry{
|
|
||||||
status: 200,
|
|
||||||
body: newServiceList(svcs),
|
|
||||||
url: "/v1/health/service/service-name",
|
|
||||||
})
|
|
||||||
defer cl()
|
|
||||||
|
|
||||||
svc, err := cr.GetService("service-name")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal("Unexpected error", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if exp, act := 1, len(svc); exp != act {
|
|
||||||
t.Fatalf("Expected len of svc to be `%d`, got `%d`.", exp, act)
|
|
||||||
}
|
|
||||||
|
|
||||||
if exp, act := 1, len(svc[0].Nodes); exp != act {
|
|
||||||
t.Fatalf("Expected len of nodes to be `%d`, got `%d`.", exp, act)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestConsul_GetService_WithUnhealthyServiceNodes(t *testing.T) {
|
|
||||||
// warning is still seen as healthy, critical is not
|
|
||||||
svcs := []*consul.ServiceEntry{
|
|
||||||
newServiceEntry(
|
|
||||||
"node-name-1", "node-address-1", "service-name", "v1.0.0",
|
|
||||||
[]*consul.HealthCheck{
|
|
||||||
newHealthCheck("node-name-1", "service-name", "passing"),
|
|
||||||
newHealthCheck("node-name-1", "service-name", "critical"),
|
|
||||||
},
|
|
||||||
),
|
|
||||||
newServiceEntry(
|
|
||||||
"node-name-2", "node-address-2", "service-name", "v1.0.0",
|
|
||||||
[]*consul.HealthCheck{
|
|
||||||
newHealthCheck("node-name-2", "service-name", "passing"),
|
|
||||||
newHealthCheck("node-name-2", "service-name", "critical"),
|
|
||||||
},
|
|
||||||
),
|
|
||||||
}
|
|
||||||
|
|
||||||
cr, cl := newConsulTestRegistry(&mockRegistry{
|
|
||||||
status: 200,
|
|
||||||
body: newServiceList(svcs),
|
|
||||||
url: "/v1/health/service/service-name",
|
|
||||||
})
|
|
||||||
defer cl()
|
|
||||||
|
|
||||||
svc, err := cr.GetService("service-name")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal("Unexpected error", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if exp, act := 1, len(svc); exp != act {
|
|
||||||
t.Fatalf("Expected len of svc to be `%d`, got `%d`.", exp, act)
|
|
||||||
}
|
|
||||||
|
|
||||||
if exp, act := 0, len(svc[0].Nodes); exp != act {
|
|
||||||
t.Fatalf("Expected len of nodes to be `%d`, got `%d`.", exp, act)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,290 +0,0 @@
|
|||||||
package consul
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/hashicorp/consul/api"
|
|
||||||
"github.com/hashicorp/consul/api/watch"
|
|
||||||
"github.com/micro/go-micro/registry"
|
|
||||||
)
|
|
||||||
|
|
||||||
type consulWatcher struct {
|
|
||||||
r *consulRegistry
|
|
||||||
wo registry.WatchOptions
|
|
||||||
wp *watch.Plan
|
|
||||||
watchers map[string]*watch.Plan
|
|
||||||
|
|
||||||
next chan *registry.Result
|
|
||||||
exit chan bool
|
|
||||||
|
|
||||||
sync.RWMutex
|
|
||||||
services map[string][]*registry.Service
|
|
||||||
}
|
|
||||||
|
|
||||||
func newConsulWatcher(cr *consulRegistry, opts ...registry.WatchOption) (registry.Watcher, error) {
|
|
||||||
var wo registry.WatchOptions
|
|
||||||
for _, o := range opts {
|
|
||||||
o(&wo)
|
|
||||||
}
|
|
||||||
|
|
||||||
cw := &consulWatcher{
|
|
||||||
r: cr,
|
|
||||||
wo: wo,
|
|
||||||
exit: make(chan bool),
|
|
||||||
next: make(chan *registry.Result, 10),
|
|
||||||
watchers: make(map[string]*watch.Plan),
|
|
||||||
services: make(map[string][]*registry.Service),
|
|
||||||
}
|
|
||||||
|
|
||||||
wp, err := watch.Parse(map[string]interface{}{"type": "services"})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
wp.Handler = cw.handle
|
|
||||||
go wp.RunWithClientAndLogger(cr.Client(), log.New(os.Stderr, "", log.LstdFlags))
|
|
||||||
cw.wp = wp
|
|
||||||
|
|
||||||
return cw, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) {
|
|
||||||
entries, ok := data.([]*api.ServiceEntry)
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
serviceMap := map[string]*registry.Service{}
|
|
||||||
serviceName := ""
|
|
||||||
|
|
||||||
for _, e := range entries {
|
|
||||||
serviceName = e.Service.Service
|
|
||||||
// version is now a tag
|
|
||||||
version, _ := decodeVersion(e.Service.Tags)
|
|
||||||
// service ID is now the node id
|
|
||||||
id := e.Service.ID
|
|
||||||
// key is always the version
|
|
||||||
key := version
|
|
||||||
// address is service address
|
|
||||||
address := e.Service.Address
|
|
||||||
|
|
||||||
// use node address
|
|
||||||
if len(address) == 0 {
|
|
||||||
address = e.Node.Address
|
|
||||||
}
|
|
||||||
|
|
||||||
svc, ok := serviceMap[key]
|
|
||||||
if !ok {
|
|
||||||
svc = ®istry.Service{
|
|
||||||
Endpoints: decodeEndpoints(e.Service.Tags),
|
|
||||||
Name: e.Service.Service,
|
|
||||||
Version: version,
|
|
||||||
}
|
|
||||||
serviceMap[key] = svc
|
|
||||||
}
|
|
||||||
|
|
||||||
var del bool
|
|
||||||
|
|
||||||
for _, check := range e.Checks {
|
|
||||||
// delete the node if the status is critical
|
|
||||||
if check.Status == "critical" {
|
|
||||||
del = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// if delete then skip the node
|
|
||||||
if del {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
svc.Nodes = append(svc.Nodes, ®istry.Node{
|
|
||||||
Id: id,
|
|
||||||
Address: fmt.Sprintf("%s:%d", address, e.Service.Port),
|
|
||||||
Metadata: decodeMetadata(e.Service.Tags),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
cw.RLock()
|
|
||||||
// make a copy
|
|
||||||
rservices := make(map[string][]*registry.Service)
|
|
||||||
for k, v := range cw.services {
|
|
||||||
rservices[k] = v
|
|
||||||
}
|
|
||||||
cw.RUnlock()
|
|
||||||
|
|
||||||
var newServices []*registry.Service
|
|
||||||
|
|
||||||
// serviceMap is the new set of services keyed by name+version
|
|
||||||
for _, newService := range serviceMap {
|
|
||||||
// append to the new set of cached services
|
|
||||||
newServices = append(newServices, newService)
|
|
||||||
|
|
||||||
// check if the service exists in the existing cache
|
|
||||||
oldServices, ok := rservices[serviceName]
|
|
||||||
if !ok {
|
|
||||||
// does not exist? then we're creating brand new entries
|
|
||||||
cw.next <- ®istry.Result{Action: "create", Service: newService}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// service exists. ok let's figure out what to update and delete version wise
|
|
||||||
action := "create"
|
|
||||||
|
|
||||||
for _, oldService := range oldServices {
|
|
||||||
// does this version exist?
|
|
||||||
// no? then default to create
|
|
||||||
if oldService.Version != newService.Version {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// yes? then it's an update
|
|
||||||
action = "update"
|
|
||||||
|
|
||||||
var nodes []*registry.Node
|
|
||||||
// check the old nodes to see if they've been deleted
|
|
||||||
for _, oldNode := range oldService.Nodes {
|
|
||||||
var seen bool
|
|
||||||
for _, newNode := range newService.Nodes {
|
|
||||||
if newNode.Id == oldNode.Id {
|
|
||||||
seen = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// does the old node exist in the new set of nodes
|
|
||||||
// no? then delete that shit
|
|
||||||
if !seen {
|
|
||||||
nodes = append(nodes, oldNode)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// it's an update rather than creation
|
|
||||||
if len(nodes) > 0 {
|
|
||||||
delService := registry.CopyService(oldService)
|
|
||||||
delService.Nodes = nodes
|
|
||||||
cw.next <- ®istry.Result{Action: "delete", Service: delService}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cw.next <- ®istry.Result{Action: action, Service: newService}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now check old versions that may not be in new services map
|
|
||||||
for _, old := range rservices[serviceName] {
|
|
||||||
// old version does not exist in new version map
|
|
||||||
// kill it with fire!
|
|
||||||
if _, ok := serviceMap[old.Version]; !ok {
|
|
||||||
cw.next <- ®istry.Result{Action: "delete", Service: old}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cw.Lock()
|
|
||||||
cw.services[serviceName] = newServices
|
|
||||||
cw.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cw *consulWatcher) handle(idx uint64, data interface{}) {
|
|
||||||
services, ok := data.(map[string][]string)
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// add new watchers
|
|
||||||
for service, _ := range services {
|
|
||||||
// Filter on watch options
|
|
||||||
// wo.Service: Only watch services we care about
|
|
||||||
if len(cw.wo.Service) > 0 && service != cw.wo.Service {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, ok := cw.watchers[service]; ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
wp, err := watch.Parse(map[string]interface{}{
|
|
||||||
"type": "service",
|
|
||||||
"service": service,
|
|
||||||
})
|
|
||||||
if err == nil {
|
|
||||||
wp.Handler = cw.serviceHandler
|
|
||||||
go wp.RunWithClientAndLogger(cw.r.Client(), log.New(os.Stderr, "", log.LstdFlags))
|
|
||||||
cw.watchers[service] = wp
|
|
||||||
cw.next <- ®istry.Result{Action: "create", Service: ®istry.Service{Name: service}}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cw.RLock()
|
|
||||||
// make a copy
|
|
||||||
rservices := make(map[string][]*registry.Service)
|
|
||||||
for k, v := range cw.services {
|
|
||||||
rservices[k] = v
|
|
||||||
}
|
|
||||||
cw.RUnlock()
|
|
||||||
|
|
||||||
// remove unknown services from registry
|
|
||||||
// save the things we want to delete
|
|
||||||
deleted := make(map[string][]*registry.Service)
|
|
||||||
|
|
||||||
for service, _ := range rservices {
|
|
||||||
if _, ok := services[service]; !ok {
|
|
||||||
cw.Lock()
|
|
||||||
// save this before deleting
|
|
||||||
deleted[service] = cw.services[service]
|
|
||||||
delete(cw.services, service)
|
|
||||||
cw.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// remove unknown services from watchers
|
|
||||||
for service, w := range cw.watchers {
|
|
||||||
if _, ok := services[service]; !ok {
|
|
||||||
w.Stop()
|
|
||||||
delete(cw.watchers, service)
|
|
||||||
for _, oldService := range deleted[service] {
|
|
||||||
// send a delete for the service nodes that we're removing
|
|
||||||
cw.next <- ®istry.Result{Action: "delete", Service: oldService}
|
|
||||||
}
|
|
||||||
// sent the empty list as the last resort to indicate to delete the entire service
|
|
||||||
cw.next <- ®istry.Result{Action: "delete", Service: ®istry.Service{Name: service}}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cw *consulWatcher) Next() (*registry.Result, error) {
|
|
||||||
select {
|
|
||||||
case <-cw.exit:
|
|
||||||
return nil, registry.ErrWatcherStopped
|
|
||||||
case r, ok := <-cw.next:
|
|
||||||
if !ok {
|
|
||||||
return nil, registry.ErrWatcherStopped
|
|
||||||
}
|
|
||||||
return r, nil
|
|
||||||
}
|
|
||||||
// NOTE: This is a dead code path: e.g. it will never be reached
|
|
||||||
// as we return in all previous code paths never leading to this return
|
|
||||||
return nil, registry.ErrWatcherStopped
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cw *consulWatcher) Stop() {
|
|
||||||
select {
|
|
||||||
case <-cw.exit:
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
close(cw.exit)
|
|
||||||
if cw.wp == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
cw.wp.Stop()
|
|
||||||
|
|
||||||
// drain results
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-cw.next:
|
|
||||||
default:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,86 +0,0 @@
|
|||||||
package consul
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/hashicorp/consul/api"
|
|
||||||
"github.com/micro/go-micro/registry"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestHealthyServiceHandler(t *testing.T) {
|
|
||||||
watcher := newWatcher()
|
|
||||||
serviceEntry := newServiceEntry(
|
|
||||||
"node-name", "node-address", "service-name", "v1.0.0",
|
|
||||||
[]*api.HealthCheck{
|
|
||||||
newHealthCheck("node-name", "service-name", "passing"),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
watcher.serviceHandler(1234, []*api.ServiceEntry{serviceEntry})
|
|
||||||
|
|
||||||
if len(watcher.services["service-name"][0].Nodes) != 1 {
|
|
||||||
t.Errorf("Expected length of the service nodes to be 1")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestUnhealthyServiceHandler(t *testing.T) {
|
|
||||||
watcher := newWatcher()
|
|
||||||
serviceEntry := newServiceEntry(
|
|
||||||
"node-name", "node-address", "service-name", "v1.0.0",
|
|
||||||
[]*api.HealthCheck{
|
|
||||||
newHealthCheck("node-name", "service-name", "critical"),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
watcher.serviceHandler(1234, []*api.ServiceEntry{serviceEntry})
|
|
||||||
|
|
||||||
if len(watcher.services["service-name"][0].Nodes) != 0 {
|
|
||||||
t.Errorf("Expected length of the service nodes to be 0")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestUnhealthyNodeServiceHandler(t *testing.T) {
|
|
||||||
watcher := newWatcher()
|
|
||||||
serviceEntry := newServiceEntry(
|
|
||||||
"node-name", "node-address", "service-name", "v1.0.0",
|
|
||||||
[]*api.HealthCheck{
|
|
||||||
newHealthCheck("node-name", "service-name", "passing"),
|
|
||||||
newHealthCheck("node-name", "serfHealth", "critical"),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
watcher.serviceHandler(1234, []*api.ServiceEntry{serviceEntry})
|
|
||||||
|
|
||||||
if len(watcher.services["service-name"][0].Nodes) != 0 {
|
|
||||||
t.Errorf("Expected length of the service nodes to be 0")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newWatcher() *consulWatcher {
|
|
||||||
return &consulWatcher{
|
|
||||||
exit: make(chan bool),
|
|
||||||
next: make(chan *registry.Result, 10),
|
|
||||||
services: make(map[string][]*registry.Service),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newHealthCheck(node, name, status string) *api.HealthCheck {
|
|
||||||
return &api.HealthCheck{
|
|
||||||
Node: node,
|
|
||||||
Name: name,
|
|
||||||
Status: status,
|
|
||||||
ServiceName: name,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newServiceEntry(node, address, name, version string, checks []*api.HealthCheck) *api.ServiceEntry {
|
|
||||||
return &api.ServiceEntry{
|
|
||||||
Node: &api.Node{Node: node, Address: name},
|
|
||||||
Service: &api.AgentService{
|
|
||||||
Service: name,
|
|
||||||
Address: address,
|
|
||||||
Tags: encodeVersion(version),
|
|
||||||
},
|
|
||||||
Checks: checks,
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user