Add ability to specify Consul options and default to AllowStale for all gets

This commit is contained in:
Blair McMillan 2018-11-23 17:11:37 +10:00
parent 4dc593eca3
commit 7171c00e42
3 changed files with 60 additions and 9 deletions

View File

@ -27,6 +27,40 @@ func Config(c *consul.Config) registry.Option {
} }
} }
// AllowStale sets whether any Consul server (non-leader) can service
// a read. This allows for lower latency and higher throughput
// at the cost of potentially stale data.
// Works similar to Consul DNS Config option [1].
// Defaults to true.
//
// [1] https://www.consul.io/docs/agent/options.html#allow_stale
//
func AllowStale(v bool) registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, "consul_allow_stale", v)
}
}
// QueryOptions specifies the QueryOptions to be used when calling
// Consul. See `Consul API` for more information [1].
//
// [1] https://godoc.org/github.com/hashicorp/consul/api#QueryOptions
//
func QueryOptions(q *consul.QueryOptions) registry.Option {
return func(o *registry.Options) {
if q == nil {
return
}
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, "consul_query_options", q)
}
}
// //
// TCPCheck will tell the service provider to check the service address // TCPCheck will tell the service provider to check the service address
// and port every `t` interval. It will enabled only if `t` is greater than 0. // and port every `t` interval. It will enabled only if `t` is greater than 0.

View File

@ -22,6 +22,8 @@ type consulRegistry struct {
// connect enabled // connect enabled
connect bool connect bool
queryOptions *consul.QueryOptions
sync.Mutex sync.Mutex
register map[string]uint64 register map[string]uint64
// lastChecked tracks when a node was last checked as existing in Consul // lastChecked tracks when a node was last checked as existing in Consul
@ -80,6 +82,14 @@ func configure(c *consulRegistry, opts ...Option) {
if cn, ok := c.opts.Context.Value("consul_connect").(bool); ok { if cn, ok := c.opts.Context.Value("consul_connect").(bool); ok {
c.connect = cn c.connect = cn
} }
// Use the consul query options passed in the options, if available
if qo, ok := c.opts.Context.Value("consul_query_options").(*consul.QueryOptions); ok && qo != nil {
c.queryOptions = qo
}
if as, ok := c.opts.Context.Value("consul_allow_stale").(bool); ok {
c.queryOptions.AllowStale = as
}
} }
// check if there are any addrs // check if there are any addrs
@ -123,6 +133,9 @@ func newConsulRegistry(opts ...Option) Registry {
opts: Options{}, opts: Options{},
register: make(map[string]uint64), register: make(map[string]uint64),
lastChecked: make(map[string]time.Time), lastChecked: make(map[string]time.Time),
queryOptions: &consul.QueryOptions{
AllowStale: true,
},
} }
configure(cr, opts...) configure(cr, opts...)
return cr return cr
@ -189,9 +202,7 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
if time.Since(c.lastChecked[s.Name]) <= getDeregisterTTL(regInterval) { if time.Since(c.lastChecked[s.Name]) <= getDeregisterTTL(regInterval) {
return nil return nil
} }
services, _, err := c.Client.Health().Checks(s.Name, &consul.QueryOptions{ services, _, err := c.Client.Health().Checks(s.Name, c.queryOptions)
AllowStale: true,
})
if err == nil { if err == nil {
for _, v := range services { for _, v := range services {
if v.ServiceID == node.Id { if v.ServiceID == node.Id {
@ -276,9 +287,9 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) {
// if we're connect enabled only get connect services // if we're connect enabled only get connect services
if c.connect { if c.connect {
rsp, _, err = c.Client.Health().Connect(name, "", false, nil) rsp, _, err = c.Client.Health().Connect(name, "", false, c.queryOptions)
} else { } else {
rsp, _, err = c.Client.Health().Service(name, "", false, nil) rsp, _, err = c.Client.Health().Service(name, "", false, c.queryOptions)
} }
if err != nil { if err != nil {
return nil, err return nil, err
@ -347,7 +358,7 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) {
} }
func (c *consulRegistry) ListServices() ([]*Service, error) { func (c *consulRegistry) ListServices() ([]*Service, error) {
rsp, _, err := c.Client.Catalog().Services(nil) rsp, _, err := c.Client.Catalog().Services(c.queryOptions)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -7,6 +7,7 @@ import (
"net" "net"
"net/http" "net/http"
"testing" "testing"
"time"
consul "github.com/hashicorp/consul/api" consul "github.com/hashicorp/consul/api"
) )
@ -53,9 +54,14 @@ func newConsulTestRegistry(r *mockRegistry) (*consulRegistry, func()) {
go newMockServer(r, l) go newMockServer(r, l)
return &consulRegistry{ return &consulRegistry{
Address: cfg.Address, Address: cfg.Address,
Client: cl, Client: cl,
register: make(map[string]uint64), opts: Options{},
register: make(map[string]uint64),
lastChecked: make(map[string]time.Time),
queryOptions: &consul.QueryOptions{
AllowStale: true,
},
}, func() { }, func() {
l.Close() l.Close()
} }