diff --git a/registry/consul/options.go b/registry/consul/options.go index 335cb57e..29bc3ee5 100644 --- a/registry/consul/options.go +++ b/registry/consul/options.go @@ -27,6 +27,40 @@ func Config(c *consul.Config) registry.Option { } } +// AllowStale sets whether any Consul server (non-leader) can service +// a read. This allows for lower latency and higher throughput +// at the cost of potentially stale data. +// Works similar to Consul DNS Config option [1]. +// Defaults to true. +// +// [1] https://www.consul.io/docs/agent/options.html#allow_stale +// +func AllowStale(v bool) registry.Option { + return func(o *registry.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, "consul_allow_stale", v) + } +} + +// QueryOptions specifies the QueryOptions to be used when calling +// Consul. See `Consul API` for more information [1]. +// +// [1] https://godoc.org/github.com/hashicorp/consul/api#QueryOptions +// +func QueryOptions(q *consul.QueryOptions) registry.Option { + return func(o *registry.Options) { + if q == nil { + return + } + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, "consul_query_options", q) + } +} + // // TCPCheck will tell the service provider to check the service address // and port every `t` interval. It will enabled only if `t` is greater than 0. diff --git a/registry/consul_registry.go b/registry/consul_registry.go index af5d0adf..52ccf759 100644 --- a/registry/consul_registry.go +++ b/registry/consul_registry.go @@ -22,6 +22,8 @@ type consulRegistry struct { // connect enabled connect bool + queryOptions *consul.QueryOptions + sync.Mutex register map[string]uint64 // lastChecked tracks when a node was last checked as existing in Consul @@ -80,6 +82,14 @@ func configure(c *consulRegistry, opts ...Option) { if cn, ok := c.opts.Context.Value("consul_connect").(bool); ok { c.connect = cn } + + // Use the consul query options passed in the options, if available + if qo, ok := c.opts.Context.Value("consul_query_options").(*consul.QueryOptions); ok && qo != nil { + c.queryOptions = qo + } + if as, ok := c.opts.Context.Value("consul_allow_stale").(bool); ok { + c.queryOptions.AllowStale = as + } } // check if there are any addrs @@ -123,6 +133,9 @@ func newConsulRegistry(opts ...Option) Registry { opts: Options{}, register: make(map[string]uint64), lastChecked: make(map[string]time.Time), + queryOptions: &consul.QueryOptions{ + AllowStale: true, + }, } configure(cr, opts...) return cr @@ -189,9 +202,7 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error { if time.Since(c.lastChecked[s.Name]) <= getDeregisterTTL(regInterval) { return nil } - services, _, err := c.Client.Health().Checks(s.Name, &consul.QueryOptions{ - AllowStale: true, - }) + services, _, err := c.Client.Health().Checks(s.Name, c.queryOptions) if err == nil { for _, v := range services { if v.ServiceID == node.Id { @@ -276,9 +287,9 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) { // if we're connect enabled only get connect services if c.connect { - rsp, _, err = c.Client.Health().Connect(name, "", false, nil) + rsp, _, err = c.Client.Health().Connect(name, "", false, c.queryOptions) } else { - rsp, _, err = c.Client.Health().Service(name, "", false, nil) + rsp, _, err = c.Client.Health().Service(name, "", false, c.queryOptions) } if err != nil { return nil, err @@ -347,7 +358,7 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) { } func (c *consulRegistry) ListServices() ([]*Service, error) { - rsp, _, err := c.Client.Catalog().Services(nil) + rsp, _, err := c.Client.Catalog().Services(c.queryOptions) if err != nil { return nil, err } diff --git a/registry/consul_registry_test.go b/registry/consul_registry_test.go index 425a5b26..1758cfaa 100644 --- a/registry/consul_registry_test.go +++ b/registry/consul_registry_test.go @@ -7,6 +7,7 @@ import ( "net" "net/http" "testing" + "time" consul "github.com/hashicorp/consul/api" ) @@ -53,9 +54,14 @@ func newConsulTestRegistry(r *mockRegistry) (*consulRegistry, func()) { go newMockServer(r, l) return &consulRegistry{ - Address: cfg.Address, - Client: cl, - register: make(map[string]uint64), + Address: cfg.Address, + Client: cl, + opts: Options{}, + register: make(map[string]uint64), + lastChecked: make(map[string]time.Time), + queryOptions: &consul.QueryOptions{ + AllowStale: true, + }, }, func() { l.Close() }