Merge pull request #238 from myabuyllc/registry-tcp-check
Add option to enable TCP check with Consul registry
This commit is contained in:
		| @@ -2,6 +2,7 @@ package consul | |||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
| 	consul "github.com/hashicorp/consul/api" | 	consul "github.com/hashicorp/consul/api" | ||||||
| 	"github.com/micro/go-micro/registry" | 	"github.com/micro/go-micro/registry" | ||||||
| @@ -15,3 +16,22 @@ func Config(c *consul.Config) registry.Option { | |||||||
| 		o.Context = context.WithValue(o.Context, "consul_config", c) | 		o.Context = context.WithValue(o.Context, "consul_config", c) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // | ||||||
|  | // TCPCheck will tell the service provider to check the service address | ||||||
|  | // and port every `t` interval. It will enabled only if `t` is greater than 0. | ||||||
|  | // See `TCP + Interval` for more information [1]. | ||||||
|  | // | ||||||
|  | // [1] https://www.consul.io/docs/agent/checks.html | ||||||
|  | // | ||||||
|  | func TCPCheck(t time.Duration) registry.Option { | ||||||
|  | 	return func(o *registry.Options) { | ||||||
|  | 		if t <= time.Duration(0) { | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 		if o.Context == nil { | ||||||
|  | 			o.Context = context.Background() | ||||||
|  | 		} | ||||||
|  | 		o.Context = context.WithValue(o.Context, "consul_tcp_check", t) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|   | |||||||
| @@ -115,16 +115,39 @@ func (c *consulRegistry) Deregister(s *Service) error { | |||||||
| 	return c.Client.Agent().ServiceDeregister(node.Id) | 	return c.Client.Agent().ServiceDeregister(node.Id) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func getDeregisterTTL(t time.Duration) time.Duration { | ||||||
|  | 	// splay slightly for the watcher? | ||||||
|  | 	splay := time.Second * 5 | ||||||
|  | 	deregTTL := t + splay | ||||||
|  |  | ||||||
|  | 	// consul has a minimum timeout on deregistration of 1 minute. | ||||||
|  | 	if t < time.Minute { | ||||||
|  | 		deregTTL = time.Minute + splay | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return deregTTL | ||||||
|  | } | ||||||
|  |  | ||||||
| func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error { | func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error { | ||||||
| 	if len(s.Nodes) == 0 { | 	if len(s.Nodes) == 0 { | ||||||
| 		return errors.New("Require at least one node") | 		return errors.New("Require at least one node") | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	var regTCPCheck bool | ||||||
|  | 	var regInterval time.Duration | ||||||
|  |  | ||||||
| 	var options RegisterOptions | 	var options RegisterOptions | ||||||
| 	for _, o := range opts { | 	for _, o := range opts { | ||||||
| 		o(&options) | 		o(&options) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	if c.opts.Context != nil { | ||||||
|  | 		if tcpCheckInterval, ok := c.opts.Context.Value("consul_tcp_check").(time.Duration); ok { | ||||||
|  | 			regTCPCheck = true | ||||||
|  | 			regInterval = tcpCheckInterval | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	// create hash of service; uint64 | 	// create hash of service; uint64 | ||||||
| 	h, err := hash.Hash(s, nil) | 	h, err := hash.Hash(s, nil) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| @@ -155,16 +178,19 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error { | |||||||
|  |  | ||||||
| 	var check *consul.AgentServiceCheck | 	var check *consul.AgentServiceCheck | ||||||
|  |  | ||||||
| 	// if the TTL is greater than 0 create an associated check | 	if regTCPCheck { | ||||||
| 	if options.TTL > time.Duration(0) { | 		deregTTL := getDeregisterTTL(regInterval) | ||||||
| 		// splay slightly for the watcher? |  | ||||||
| 		splay := time.Second * 5 | 		check = &consul.AgentServiceCheck{ | ||||||
| 		deregTTL := options.TTL + splay | 			TCP:                            fmt.Sprintf("%s:%d", node.Address, node.Port), | ||||||
| 		// consul has a minimum timeout on deregistration of 1 minute. | 			Interval:                       fmt.Sprintf("%v", regInterval), | ||||||
| 		if options.TTL < time.Minute { | 			DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL), | ||||||
| 			deregTTL = time.Minute + splay |  | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | 		// if the TTL is greater than 0 create an associated check | ||||||
|  | 	} else if options.TTL > time.Duration(0) { | ||||||
|  | 		deregTTL := getDeregisterTTL(options.TTL) | ||||||
|  |  | ||||||
| 		check = &consul.AgentServiceCheck{ | 		check = &consul.AgentServiceCheck{ | ||||||
| 			TTL: fmt.Sprintf("%v", options.TTL), | 			TTL: fmt.Sprintf("%v", options.TTL), | ||||||
| 			DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL), | 			DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL), | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user