micro/sync/leader/consul/consul.go

159 lines
2.6 KiB
Go
Raw Normal View History

2019-05-31 00:43:23 +01:00
package consul
import (
"fmt"
"log"
"net"
"os"
"path"
"sync"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
"github.com/micro/go-micro/sync/leader"
)
type consulLeader struct {
opts leader.Options
c *api.Client
}
type consulElected struct {
c *api.Client
l *api.Lock
id string
key string
opts leader.ElectOptions
mtx sync.RWMutex
rv <-chan struct{}
}
func (c *consulLeader) Elect(id string, opts ...leader.ElectOption) (leader.Elected, error) {
var options leader.ElectOptions
for _, o := range opts {
o(&options)
}
key := path.Join("micro/leader", c.opts.Group)
lc, err := c.c.LockOpts(&api.LockOptions{
Key: key,
Value: []byte(id),
})
if err != nil {
return nil, err
}
rv, err := lc.Lock(nil)
if err != nil {
return nil, err
}
return &consulElected{
c: c.c,
key: key,
rv: rv,
id: id,
l: lc,
opts: options,
}, nil
}
func (c *consulLeader) Follow() chan string {
ch := make(chan string, 1)
key := path.Join("/micro/leader", c.opts.Group)
p, err := watch.Parse(map[string]interface{}{
"type": "key",
"key": key,
})
if err != nil {
return ch
}
p.Handler = func(idx uint64, raw interface{}) {
if raw == nil {
return // ignore
}
v, ok := raw.(*api.KVPair)
if !ok || v == nil {
return // ignore
}
ch <- string(v.Value)
}
go p.RunWithClientAndLogger(c.c, log.New(os.Stdout, "consul: ", log.Lshortfile))
return ch
}
func (c *consulLeader) String() string {
return "consul"
}
func (c *consulElected) Id() string {
return c.id
}
func (c *consulElected) Reelect() error {
rv, err := c.l.Lock(nil)
if err != nil {
return err
}
c.mtx.Lock()
c.rv = rv
c.mtx.Unlock()
return nil
}
func (c *consulElected) Revoked() chan bool {
ch := make(chan bool, 1)
c.mtx.RLock()
rv := c.rv
c.mtx.RUnlock()
go func() {
<-rv
ch <- true
close(ch)
}()
return ch
}
func (c *consulElected) Resign() error {
return c.l.Unlock()
}
func NewLeader(opts ...leader.Option) leader.Leader {
options := leader.Options{
Group: "default",
}
for _, o := range opts {
o(&options)
}
config := api.DefaultConfig()
// set host
// config.Host something
// check if there are any addrs
if len(options.Nodes) > 0 {
addr, port, err := net.SplitHostPort(options.Nodes[0])
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
port = "8500"
config.Address = fmt.Sprintf("%s:%s", addr, port)
} else if err == nil {
config.Address = fmt.Sprintf("%s:%s", addr, port)
}
}
client, _ := api.NewClient(config)
return &consulLeader{
opts: options,
c: client,
}
}