package registry import ( "crypto/tls" "encoding/json" "errors" "fmt" "net" "net/http" "runtime" "time" consul "github.com/hashicorp/consul/api" ) type consulRegistry struct { Address string Client *consul.Client Options Options } func newTransport(config *tls.Config) *http.Transport { if config == nil { config = &tls.Config{ InsecureSkipVerify: true, } } t := &http.Transport{ Proxy: http.ProxyFromEnvironment, Dial: (&net.Dialer{ Timeout: 30 * time.Second, KeepAlive: 30 * time.Second, }).Dial, TLSHandshakeTimeout: 10 * time.Second, TLSClientConfig: config, } runtime.SetFinalizer(&t, func(tr **http.Transport) { (*tr).CloseIdleConnections() }) return t } func encodeEndpoints(en []*Endpoint) []string { var tags []string for _, e := range en { if b, err := json.Marshal(e); err == nil { tags = append(tags, "e="+string(b)) } } return tags } func decodeEndpoints(tags []string) []*Endpoint { var en []*Endpoint for _, tag := range tags { if len(tag) == 0 || tag[0] != 'e' { continue } var e *Endpoint if err := json.Unmarshal([]byte(tag[2:]), &e); err == nil { en = append(en, e) } } return en } func encodeMetadata(md map[string]string) []string { var tags []string for k, v := range md { if b, err := json.Marshal(map[string]string{ k: v, }); err == nil { tags = append(tags, "t="+string(b)) } } return tags } func decodeMetadata(tags []string) map[string]string { md := make(map[string]string) for _, tag := range tags { if len(tag) == 0 || tag[0] != 't' { continue } var kv map[string]string if err := json.Unmarshal([]byte(tag[2:]), &kv); err == nil { for k, v := range kv { md[k] = v } } } return md } func encodeVersion(v string) string { return "v=" + v } func decodeVersion(tags []string) (string, bool) { for _, tag := range tags { if len(tag) == 0 || tag[0] != 'v' { continue } return tag[2:], true } return "", false } func newConsulRegistry(addrs []string, opts ...Option) Registry { var opt Options for _, o := range opts { o(&opt) } // use default config config := consul.DefaultConfig() // set timeout if opt.Timeout > 0 { config.HttpClient.Timeout = opt.Timeout } // check if there are any addrs if len(addrs) > 0 { addr, port, err := net.SplitHostPort(addrs[0]) if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { port = "8500" addr = addrs[0] config.Address = fmt.Sprintf("%s:%s", addr, port) } else if err == nil { config.Address = fmt.Sprintf("%s:%s", addr, port) } } // requires secure connection? if opt.Secure || opt.TLSConfig != nil { config.Scheme = "https" // We're going to support InsecureSkipVerify config.HttpClient.Transport = newTransport(opt.TLSConfig) } // create the client client, _ := consul.NewClient(config) cr := &consulRegistry{ Address: config.Address, Client: client, Options: opt, } return cr } func (c *consulRegistry) Deregister(s *Service) error { if len(s.Nodes) == 0 { return errors.New("Require at least one node") } node := s.Nodes[0] _, err := c.Client.Catalog().Deregister(&consul.CatalogDeregistration{ Node: node.Id, Address: node.Address, ServiceID: node.Id, }, nil) return err } func (c *consulRegistry) Register(s *Service) error { if len(s.Nodes) == 0 { return errors.New("Require at least one node") } node := s.Nodes[0] tags := encodeMetadata(node.Metadata) tags = append(tags, encodeEndpoints(s.Endpoints)...) tags = append(tags, encodeVersion(s.Version)) if _, err := c.Client.Catalog().Register(&consul.CatalogRegistration{ // TODO: remove setting node and address Node: node.Id, Address: node.Address, Service: &consul.AgentService{ ID: node.Id, Service: s.Name, Port: node.Port, Tags: tags, Address: node.Address, }, }, nil); err != nil { return err } return nil } func (c *consulRegistry) GetService(name string) ([]*Service, error) { rsp, _, err := c.Client.Catalog().Service(name, "", nil) if err != nil { return nil, err } serviceMap := map[string]*Service{} for _, s := range rsp { if s.ServiceName != name { continue } // version is now a tag version, found := decodeVersion(s.ServiceTags) // service ID is now the node id id := s.ServiceID // key is always the version key := version // address is service address address := s.ServiceAddress // if we can't get the new type of version // use old the old ways if !found { // id was set as node id = s.Node // key was service id key = s.ServiceID // version was service id version = s.ServiceID // address was address address = s.Address } svc, ok := serviceMap[key] if !ok { svc = &Service{ Endpoints: decodeEndpoints(s.ServiceTags), Name: s.ServiceName, Version: version, } serviceMap[key] = svc } svc.Nodes = append(svc.Nodes, &Node{ Id: id, Address: address, Port: s.ServicePort, Metadata: decodeMetadata(s.ServiceTags), }) } var services []*Service for _, service := range serviceMap { services = append(services, service) } return services, nil } func (c *consulRegistry) ListServices() ([]*Service, error) { rsp, _, err := c.Client.Catalog().Services(nil) if err != nil { return nil, err } var services []*Service for service, _ := range rsp { services = append(services, &Service{Name: service}) } return services, nil } func (c *consulRegistry) Watch() (Watcher, error) { return newConsulWatcher(c) } func (c *consulRegistry) String() string { return "consul" }