add endpoint data for consul registry

This commit is contained in:
Asim 2015-10-11 12:05:20 +01:00
parent ac995ba983
commit fcbd2acdde
3 changed files with 35 additions and 2 deletions

View File

@ -16,13 +16,38 @@ type consulRegistry struct {
services map[string]*Service services map[string]*Service
} }
func encodeEndpoints(en []*Endpoint) []string {
var tags []string
for _, e := range en {
if b, err := json.Marshal(e); err == nil {
tags = append(tags, "e="+string(b))
}
}
return tags
}
func decodeEndpoints(tags []string) []*Endpoint {
var en []*Endpoint
for _, tag := range tags {
if len(tag) == 0 || tag[0] != 'e' {
continue
}
var e *Endpoint
if err := json.Unmarshal([]byte(tag[2:]), &e); err == nil {
en = append(en, e)
}
}
return en
}
func encodeMetadata(md map[string]string) []string { func encodeMetadata(md map[string]string) []string {
var tags []string var tags []string
for k, v := range md { for k, v := range md {
if b, err := json.Marshal(map[string]string{ if b, err := json.Marshal(map[string]string{
k: v, k: v,
}); err == nil { }); err == nil {
tags = append(tags, string(b)) tags = append(tags, "t="+string(b))
} }
} }
return tags return tags
@ -31,8 +56,12 @@ func encodeMetadata(md map[string]string) []string {
func decodeMetadata(tags []string) map[string]string { func decodeMetadata(tags []string) map[string]string {
md := make(map[string]string) md := make(map[string]string)
for _, tag := range tags { for _, tag := range tags {
if len(tag) == 0 || tag[0] != 't' {
continue
}
var kv map[string]string var kv map[string]string
if err := json.Unmarshal([]byte(tag), &kv); err == nil { if err := json.Unmarshal([]byte(tag[2:]), &kv); err == nil {
for k, v := range kv { for k, v := range kv {
md[k] = v md[k] = v
} }
@ -80,6 +109,7 @@ func (c *consulRegistry) Register(s *Service) error {
node := s.Nodes[0] node := s.Nodes[0]
tags := encodeMetadata(node.Metadata) tags := encodeMetadata(node.Metadata)
tags = append(tags, encodeEndpoints(s.Endpoints)...)
_, err := c.Client.Catalog().Register(&consul.CatalogRegistration{ _, err := c.Client.Catalog().Register(&consul.CatalogRegistration{
Node: node.Id, Node: node.Id,
@ -116,6 +146,7 @@ func (c *consulRegistry) GetService(name string) (*Service, error) {
continue continue
} }
cs.Endpoints = decodeEndpoints(s.ServiceTags)
cs.Name = s.ServiceName cs.Name = s.ServiceName
cs.Nodes = append(cs.Nodes, &Node{ cs.Nodes = append(cs.Nodes, &Node{
Id: s.ServiceID, Id: s.ServiceID,

View File

@ -42,6 +42,7 @@ func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) {
cs := &Service{} cs := &Service{}
for _, e := range entries { for _, e := range entries {
cs.Endpoints = decodeEndpoints(e.Service.Tags)
cs.Name = e.Service.Service cs.Name = e.Service.Service
cs.Nodes = append(cs.Nodes, &Node{ cs.Nodes = append(cs.Nodes, &Node{
Id: e.Service.ID, Id: e.Service.ID,

View File

@ -21,6 +21,7 @@ func newRpcHandler(handler interface{}) Handler {
for m := 0; m < typ.NumMethod(); m++ { for m := 0; m < typ.NumMethod(); m++ {
if e := extractEndpoint(typ.Method(m)); e != nil { if e := extractEndpoint(typ.Method(m)); e != nil {
e.Name = name + "." + e.Name
endpoints = append(endpoints, e) endpoints = append(endpoints, e)
} }
} }