This commit is contained in:
Asim Aslam 2019-08-06 09:15:38 +01:00
parent 52d8d26018
commit e16420fdbd
3 changed files with 56 additions and 30 deletions

View File

@ -17,10 +17,12 @@ import (
) )
type consulRegistry struct { type consulRegistry struct {
Address string Address []string
Client *consul.Client
opts registry.Options opts registry.Options
client *consul.Client
config *consul.Config
// connect enabled // connect enabled
connect bool connect bool
@ -123,12 +125,12 @@ func configure(c *consulRegistry, opts ...registry.Option) {
config.HttpClient.Timeout = c.opts.Timeout config.HttpClient.Timeout = c.opts.Timeout
} }
// create the client // set address
client, _ := consul.NewClient(config) c.Address = c.opts.Addrs
// set address/client c.config = config
c.Address = config.Address
c.Client = client c.Client()
} }
func (c *consulRegistry) Init(opts ...registry.Option) error { func (c *consulRegistry) Init(opts ...registry.Option) error {
@ -148,7 +150,7 @@ func (c *consulRegistry) Deregister(s *registry.Service) error {
c.Unlock() c.Unlock()
node := s.Nodes[0] node := s.Nodes[0]
return c.Client.Agent().ServiceDeregister(node.Id) return c.Client().Agent().ServiceDeregister(node.Id)
} }
func (c *consulRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error { func (c *consulRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
@ -193,7 +195,7 @@ func (c *consulRegistry) Register(s *registry.Service, opts ...registry.Register
if time.Since(lastChecked) <= getDeregisterTTL(regInterval) { if time.Since(lastChecked) <= getDeregisterTTL(regInterval) {
return nil return nil
} }
services, _, err := c.Client.Health().Checks(s.Name, c.queryOptions) services, _, err := c.Client().Health().Checks(s.Name, c.queryOptions)
if err == nil { if err == nil {
for _, v := range services { for _, v := range services {
if v.ServiceID == node.Id { if v.ServiceID == node.Id {
@ -204,7 +206,7 @@ func (c *consulRegistry) Register(s *registry.Service, opts ...registry.Register
} else { } else {
// if the err is nil we're all good, bail out // if the err is nil we're all good, bail out
// if not, we don't know what the state is, so full re-register // if not, we don't know what the state is, so full re-register
if err := c.Client.Agent().PassTTL("service:"+node.Id, ""); err == nil { if err := c.Client().Agent().PassTTL("service:"+node.Id, ""); err == nil {
return nil return nil
} }
} }
@ -256,7 +258,7 @@ func (c *consulRegistry) Register(s *registry.Service, opts ...registry.Register
} }
} }
if err := c.Client.Agent().ServiceRegister(asr); err != nil { if err := c.Client().Agent().ServiceRegister(asr); err != nil {
return err return err
} }
@ -272,7 +274,7 @@ func (c *consulRegistry) Register(s *registry.Service, opts ...registry.Register
} }
// pass the healthcheck // pass the healthcheck
return c.Client.Agent().PassTTL("service:"+node.Id, "") return c.Client().Agent().PassTTL("service:"+node.Id, "")
} }
func (c *consulRegistry) GetService(name string) ([]*registry.Service, error) { func (c *consulRegistry) GetService(name string) ([]*registry.Service, error) {
@ -281,9 +283,9 @@ func (c *consulRegistry) GetService(name string) ([]*registry.Service, error) {
// if we're connect enabled only get connect services // if we're connect enabled only get connect services
if c.connect { if c.connect {
rsp, _, err = c.Client.Health().Connect(name, "", false, c.queryOptions) rsp, _, err = c.Client().Health().Connect(name, "", false, c.queryOptions)
} else { } else {
rsp, _, err = c.Client.Health().Service(name, "", false, c.queryOptions) rsp, _, err = c.Client().Health().Service(name, "", false, c.queryOptions)
} }
if err != nil { if err != nil {
return nil, err return nil, err
@ -351,7 +353,7 @@ func (c *consulRegistry) GetService(name string) ([]*registry.Service, error) {
} }
func (c *consulRegistry) ListServices() ([]*registry.Service, error) { func (c *consulRegistry) ListServices() ([]*registry.Service, error) {
rsp, _, err := c.Client.Catalog().Services(c.queryOptions) rsp, _, err := c.Client().Catalog().Services(c.queryOptions)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -377,6 +379,28 @@ func (c *consulRegistry) Options() registry.Options {
return c.opts return c.opts
} }
func (c *consulRegistry) Client() *consul.Client {
if c.client != nil {
return c.client
}
if len(c.Address) == 0 {
tmp, _ := consul.NewClient(c.config)
return tmp
}
c.config.Address = c.Address[0]
tmpClint, _ := consul.NewClient(c.config)
_, err := tmpClint.Agent().Host()
if err != nil {
c.Address = c.Address[1:]
return c.Client()
}
c.client = tmpClint
return c.client
}
func NewRegistry(opts ...registry.Option) registry.Registry { func NewRegistry(opts ...registry.Option) registry.Registry {
cr := &consulRegistry{ cr := &consulRegistry{
opts: registry.Options{}, opts: registry.Options{},

View File

@ -50,22 +50,24 @@ func newConsulTestRegistry(r *mockRegistry) (*consulRegistry, func()) {
} }
cfg := consul.DefaultConfig() cfg := consul.DefaultConfig()
cfg.Address = l.Addr().String() cfg.Address = l.Addr().String()
cl, _ := consul.NewClient(cfg)
go newMockServer(r, l) go newMockServer(r, l)
return &consulRegistry{ var cr = &consulRegistry{
Address: cfg.Address, config: cfg,
Client: cl, Address: []string{cfg.Address},
opts: registry.Options{}, opts: registry.Options{},
register: make(map[string]uint64), register: make(map[string]uint64),
lastChecked: make(map[string]time.Time), lastChecked: make(map[string]time.Time),
queryOptions: &consul.QueryOptions{ queryOptions: &consul.QueryOptions{
AllowStale: true, AllowStale: true,
}, },
}, func() { }
l.Close() cr.Client()
}
return cr, func() {
l.Close()
}
} }
func newServiceList(svc []*consul.ServiceEntry) []byte { func newServiceList(svc []*consul.ServiceEntry) []byte {

View File

@ -45,7 +45,7 @@ func newConsulWatcher(cr *consulRegistry, opts ...registry.WatchOption) (registr
} }
wp.Handler = cw.handle wp.Handler = cw.handle
go wp.RunWithClientAndLogger(cr.Client, log.New(os.Stderr, "", log.LstdFlags)) go wp.RunWithClientAndLogger(cr.Client(), log.New(os.Stderr, "", log.LstdFlags))
cw.wp = wp cw.wp = wp
return cw, nil return cw, nil
@ -209,7 +209,7 @@ func (cw *consulWatcher) handle(idx uint64, data interface{}) {
}) })
if err == nil { if err == nil {
wp.Handler = cw.serviceHandler wp.Handler = cw.serviceHandler
go wp.RunWithClientAndLogger(cw.r.Client, log.New(os.Stderr, "", log.LstdFlags)) go wp.RunWithClientAndLogger(cw.r.Client(), log.New(os.Stderr, "", log.LstdFlags))
cw.watchers[service] = wp cw.watchers[service] = wp
cw.next <- &registry.Result{Action: "create", Service: &registry.Service{Name: service}} cw.next <- &registry.Result{Action: "create", Service: &registry.Service{Name: service}}
} }