ConsulRegistry: use health checks to select nodes.
Consul sees a healthcheck that is in the warning state as a "failed" node. This means that when we ask Consul for services that are passing, it would not return nodes that have warning healthchecks. In the cache, we only check on critical to skip for nodes. This makes the cache out of sync with the non-cache implementation. This patch reworks the non-cache implementation to ask for all nodes (even unhealthy ones) and does the same check as within the cache, skip nodes that have critical healthchecks. We've noticed this issue when we deployed custom healthchecks where the cache was acting properly, but after 1 minute we saw "None Available" errors. This is due to the TTL expiry on the cache, which is then followed by doing a non cached request.
This commit is contained in:
parent
b8241f3026
commit
e59f7a7ace
@ -179,7 +179,7 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
|
||||
}
|
||||
|
||||
func (c *consulRegistry) GetService(name string) ([]*Service, error) {
|
||||
rsp, _, err := c.Client.Health().Service(name, "", true, nil)
|
||||
rsp, _, err := c.Client.Health().Service(name, "", false, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -216,6 +216,20 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) {
|
||||
serviceMap[key] = svc
|
||||
}
|
||||
|
||||
var del bool
|
||||
for _, check := range s.Checks {
|
||||
// delete the node if the status is critical
|
||||
if check.Status == "critical" {
|
||||
del = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// if delete then skip the node
|
||||
if del {
|
||||
continue
|
||||
}
|
||||
|
||||
svc.Nodes = append(svc.Nodes, &Node{
|
||||
Id: id,
|
||||
Address: address,
|
||||
|
183
registry/consul_registry_test.go
Normal file
183
registry/consul_registry_test.go
Normal file
@ -0,0 +1,183 @@
|
||||
package registry
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
consul "github.com/hashicorp/consul/api"
|
||||
)
|
||||
|
||||
func TestConsul_GetService_WithError(t *testing.T) {
|
||||
cr := newConsulTestRegistry(&mockTransport{
|
||||
err: errors.New("client-error"),
|
||||
})
|
||||
|
||||
if _, err := cr.GetService("test-service"); err == nil {
|
||||
t.Fatalf("Expected error not to be `nil`")
|
||||
}
|
||||
}
|
||||
|
||||
func TestConsul_GetService_WithHealthyServiceNodes(t *testing.T) {
|
||||
// warning is still seen as healthy, critical is not
|
||||
svcs := []*consul.ServiceEntry{
|
||||
newServiceEntry(
|
||||
"node-name-1", "node-address-1", "service-name", "v1.0.0",
|
||||
[]*consul.HealthCheck{
|
||||
newHealthCheck("node-name-1", "service-name", "passing"),
|
||||
newHealthCheck("node-name-1", "service-name", "warning"),
|
||||
},
|
||||
),
|
||||
newServiceEntry(
|
||||
"node-name-2", "node-address-2", "service-name", "v1.0.0",
|
||||
[]*consul.HealthCheck{
|
||||
newHealthCheck("node-name-2", "service-name", "passing"),
|
||||
newHealthCheck("node-name-2", "service-name", "warning"),
|
||||
},
|
||||
),
|
||||
}
|
||||
|
||||
cr := newConsulTestRegistry(&mockTransport{
|
||||
status: 200,
|
||||
body: newServiceList(svcs),
|
||||
url: "/v1/health/service/service-name",
|
||||
})
|
||||
|
||||
svc, _ := cr.GetService("service-name")
|
||||
if exp, act := 1, len(svc); exp != act {
|
||||
t.Fatalf("Expected len of svc to be `%d`, got `%d`.", exp, act)
|
||||
}
|
||||
|
||||
if exp, act := 2, len(svc[0].Nodes); exp != act {
|
||||
t.Fatalf("Expected len of nodes to be `%d`, got `%d`.", exp, act)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConsul_GetService_WithUnhealthyServiceNode(t *testing.T) {
|
||||
// warning is still seen as healthy, critical is not
|
||||
svcs := []*consul.ServiceEntry{
|
||||
newServiceEntry(
|
||||
"node-name-1", "node-address-1", "service-name", "v1.0.0",
|
||||
[]*consul.HealthCheck{
|
||||
newHealthCheck("node-name-1", "service-name", "passing"),
|
||||
newHealthCheck("node-name-1", "service-name", "warning"),
|
||||
},
|
||||
),
|
||||
newServiceEntry(
|
||||
"node-name-2", "node-address-2", "service-name", "v1.0.0",
|
||||
[]*consul.HealthCheck{
|
||||
newHealthCheck("node-name-2", "service-name", "passing"),
|
||||
newHealthCheck("node-name-2", "service-name", "critical"),
|
||||
},
|
||||
),
|
||||
}
|
||||
|
||||
cr := newConsulTestRegistry(&mockTransport{
|
||||
status: 200,
|
||||
body: newServiceList(svcs),
|
||||
url: "/v1/health/service/service-name",
|
||||
})
|
||||
|
||||
svc, _ := cr.GetService("service-name")
|
||||
if exp, act := 1, len(svc); exp != act {
|
||||
t.Fatalf("Expected len of svc to be `%d`, got `%d`.", exp, act)
|
||||
}
|
||||
|
||||
if exp, act := 1, len(svc[0].Nodes); exp != act {
|
||||
t.Fatalf("Expected len of nodes to be `%d`, got `%d`.", exp, act)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConsul_GetService_WithUnhealthyServiceNodes(t *testing.T) {
|
||||
// warning is still seen as healthy, critical is not
|
||||
svcs := []*consul.ServiceEntry{
|
||||
newServiceEntry(
|
||||
"node-name-1", "node-address-1", "service-name", "v1.0.0",
|
||||
[]*consul.HealthCheck{
|
||||
newHealthCheck("node-name-1", "service-name", "passing"),
|
||||
newHealthCheck("node-name-1", "service-name", "critical"),
|
||||
},
|
||||
),
|
||||
newServiceEntry(
|
||||
"node-name-2", "node-address-2", "service-name", "v1.0.0",
|
||||
[]*consul.HealthCheck{
|
||||
newHealthCheck("node-name-2", "service-name", "passing"),
|
||||
newHealthCheck("node-name-2", "service-name", "critical"),
|
||||
},
|
||||
),
|
||||
}
|
||||
|
||||
cr := newConsulTestRegistry(&mockTransport{
|
||||
status: 200,
|
||||
body: newServiceList(svcs),
|
||||
url: "/v1/health/service/service-name",
|
||||
})
|
||||
|
||||
svc, _ := cr.GetService("service-name")
|
||||
if exp, act := 1, len(svc); exp != act {
|
||||
t.Fatalf("Expected len of svc to be `%d`, got `%d`.", exp, act)
|
||||
}
|
||||
|
||||
if exp, act := 0, len(svc[0].Nodes); exp != act {
|
||||
t.Fatalf("Expected len of nodes to be `%d`, got `%d`.", exp, act)
|
||||
}
|
||||
}
|
||||
|
||||
func newServiceList(svc []*consul.ServiceEntry) []byte {
|
||||
bts, _ := encodeData(svc)
|
||||
return bts
|
||||
}
|
||||
|
||||
func newConsulTestRegistry(t *mockTransport) *consulRegistry {
|
||||
cfg := &consul.Config{
|
||||
HttpClient: mockHttpClient(t),
|
||||
}
|
||||
cl, _ := consul.NewClient(cfg)
|
||||
|
||||
return &consulRegistry{
|
||||
Address: cfg.Address,
|
||||
Client: cl,
|
||||
register: make(map[string]uint64),
|
||||
}
|
||||
}
|
||||
|
||||
func mockHttpClient(t *mockTransport) *http.Client {
|
||||
return &http.Client{
|
||||
Transport: t,
|
||||
}
|
||||
}
|
||||
|
||||
type mockTransport struct {
|
||||
body []byte
|
||||
status int
|
||||
err error
|
||||
url string
|
||||
}
|
||||
|
||||
func encodeData(obj interface{}) ([]byte, error) {
|
||||
buf := bytes.NewBuffer(nil)
|
||||
enc := json.NewEncoder(buf)
|
||||
if err := enc.Encode(obj); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
func (t *mockTransport) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
if t.err != nil {
|
||||
return nil, t.err
|
||||
}
|
||||
|
||||
if t.url != "" && fmt.Sprintf("http://127.0.0.1:8500%s", t.url) != req.URL.String() {
|
||||
return nil, errors.New("URLs do not match")
|
||||
}
|
||||
|
||||
return &http.Response{
|
||||
StatusCode: t.status,
|
||||
Body: ioutil.NopCloser(bytes.NewReader(t.body)),
|
||||
}, nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user