micro/registry/consul_registry.go

252 lines
4.8 KiB
Go
Raw Normal View History

2015-01-14 02:31:27 +03:00
package registry
import (
2016-01-16 23:25:18 +03:00
"crypto/tls"
"encoding/json"
2015-01-14 02:31:27 +03:00
"errors"
"fmt"
"net"
2016-01-16 23:25:18 +03:00
"net/http"
"runtime"
"time"
2015-01-14 02:31:27 +03:00
consul "github.com/hashicorp/consul/api"
2015-01-14 02:31:27 +03:00
)
type consulRegistry struct {
2015-02-15 02:00:47 +03:00
Address string
Client *consul.Client
2015-12-19 21:28:08 +03:00
Options Options
}
2016-01-17 02:39:47 +03:00
func newTransport(config *tls.Config) *http.Transport {
if config == nil {
config = &tls.Config{
InsecureSkipVerify: true,
}
}
2016-01-16 23:25:18 +03:00
t := &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
2016-01-17 02:39:47 +03:00
TLSClientConfig: config,
2016-01-16 23:25:18 +03:00
}
runtime.SetFinalizer(&t, func(tr **http.Transport) {
(*tr).CloseIdleConnections()
})
return t
}
2015-10-11 14:05:20 +03:00
func encodeEndpoints(en []*Endpoint) []string {
var tags []string
for _, e := range en {
if b, err := json.Marshal(e); err == nil {
tags = append(tags, "e="+string(b))
}
}
return tags
}
func decodeEndpoints(tags []string) []*Endpoint {
var en []*Endpoint
for _, tag := range tags {
if len(tag) == 0 || tag[0] != 'e' {
continue
}
var e *Endpoint
if err := json.Unmarshal([]byte(tag[2:]), &e); err == nil {
en = append(en, e)
}
}
return en
}
2015-05-27 00:39:48 +03:00
func encodeMetadata(md map[string]string) []string {
var tags []string
for k, v := range md {
if b, err := json.Marshal(map[string]string{
k: v,
}); err == nil {
2015-10-11 14:05:20 +03:00
tags = append(tags, "t="+string(b))
}
}
return tags
}
2015-05-27 00:39:48 +03:00
func decodeMetadata(tags []string) map[string]string {
md := make(map[string]string)
for _, tag := range tags {
2015-10-11 14:05:20 +03:00
if len(tag) == 0 || tag[0] != 't' {
continue
}
var kv map[string]string
2015-10-11 14:05:20 +03:00
if err := json.Unmarshal([]byte(tag[2:]), &kv); err == nil {
for k, v := range kv {
md[k] = v
}
}
}
return md
2015-02-15 02:00:47 +03:00
}
2015-01-14 02:31:27 +03:00
func newConsulRegistry(addrs []string, opts ...Option) Registry {
2015-12-19 21:28:08 +03:00
var opt Options
for _, o := range opts {
o(&opt)
}
// use default config
config := consul.DefaultConfig()
2015-12-19 21:28:08 +03:00
// set timeout
if opt.Timeout > 0 {
config.HttpClient.Timeout = opt.Timeout
}
// check if there are any addrs
if len(addrs) > 0 {
addr, port, err := net.SplitHostPort(addrs[0])
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
port = "8500"
config.Address = fmt.Sprintf("%s:%s", addr, port)
} else if err == nil {
config.Address = fmt.Sprintf("%s:%s", addr, port)
}
}
2015-12-19 21:28:08 +03:00
2016-01-16 23:25:18 +03:00
// requires secure connection?
if opt.Secure {
config.Scheme = "https"
// We're going to support InsecureSkipVerify
2016-01-17 02:39:47 +03:00
config.HttpClient.Transport = newTransport(opt.TLSConfig)
2016-01-16 23:25:18 +03:00
}
2015-12-19 21:28:08 +03:00
// create the client
2015-10-26 23:23:57 +03:00
client, _ := consul.NewClient(config)
cr := &consulRegistry{
2015-12-05 04:12:29 +03:00
Address: config.Address,
Client: client,
2015-12-19 21:28:08 +03:00
Options: opt,
}
return cr
}
func (c *consulRegistry) Deregister(s *Service) error {
if len(s.Nodes) == 0 {
2015-01-14 02:31:27 +03:00
return errors.New("Require at least one node")
}
node := s.Nodes[0]
2015-01-14 02:31:27 +03:00
_, err := c.Client.Catalog().Deregister(&consul.CatalogDeregistration{
Node: node.Id,
Address: node.Address,
2015-01-14 02:31:27 +03:00
}, nil)
return err
}
func (c *consulRegistry) Register(s *Service) error {
if len(s.Nodes) == 0 {
2015-01-14 02:31:27 +03:00
return errors.New("Require at least one node")
}
node := s.Nodes[0]
2015-05-27 00:39:48 +03:00
tags := encodeMetadata(node.Metadata)
2015-10-11 14:05:20 +03:00
tags = append(tags, encodeEndpoints(s.Endpoints)...)
2015-01-14 02:31:27 +03:00
_, err := c.Client.Catalog().Register(&consul.CatalogRegistration{
Node: node.Id,
Address: node.Address,
2015-01-14 02:31:27 +03:00
Service: &consul.AgentService{
2015-11-08 04:48:48 +03:00
ID: s.Version,
Service: s.Name,
Port: node.Port,
Tags: tags,
2015-01-14 02:31:27 +03:00
},
}, nil)
return err
}
2015-11-08 04:48:48 +03:00
func (c *consulRegistry) GetService(name string) ([]*Service, error) {
2015-01-14 02:31:27 +03:00
rsp, _, err := c.Client.Catalog().Service(name, "", nil)
if err != nil {
return nil, err
}
2015-11-08 04:48:48 +03:00
serviceMap := map[string]*Service{}
2015-01-14 02:31:27 +03:00
for _, s := range rsp {
if s.ServiceName != name {
continue
}
2015-11-08 04:48:48 +03:00
id := s.Node
key := s.ServiceID
version := s.ServiceID
// We're adding service version but
// don't want to break backwards compatibility
if id == version {
key = "default"
version = ""
}
svc, ok := serviceMap[key]
if !ok {
svc = &Service{
Endpoints: decodeEndpoints(s.ServiceTags),
Name: s.ServiceName,
Version: version,
}
serviceMap[key] = svc
}
svc.Nodes = append(svc.Nodes, &Node{
Id: id,
Address: s.Address,
Port: s.ServicePort,
2015-05-27 00:39:48 +03:00
Metadata: decodeMetadata(s.ServiceTags),
2015-01-14 02:31:27 +03:00
})
}
2015-11-08 04:48:48 +03:00
var services []*Service
for _, service := range serviceMap {
services = append(services, service)
}
return services, nil
2015-01-14 02:31:27 +03:00
}
func (c *consulRegistry) ListServices() ([]*Service, error) {
2015-12-19 21:28:08 +03:00
rsp, _, err := c.Client.Catalog().Services(nil)
if err != nil {
return nil, err
}
2015-12-05 04:12:29 +03:00
var services []*Service
for service, _ := range rsp {
services = append(services, &Service{Name: service})
}
return services, nil
}
func (c *consulRegistry) Watch() (Watcher, error) {
return newConsulWatcher(c)
2015-01-14 02:31:27 +03:00
}
2015-12-20 00:56:14 +03:00
func (c *consulRegistry) String() string {
return "consul"
}