micro/sync/leader/etcd/etcd.go

146 lines
2.3 KiB
Go
Raw Normal View History

2019-05-31 00:43:23 +01:00
package etcd
import (
"context"
"log"
"path"
"strings"
"github.com/micro/go-micro/sync/leader"
client "go.etcd.io/etcd/clientv3"
cc "go.etcd.io/etcd/clientv3/concurrency"
)
// etcdLeader is the etcd-backed leader implementation constructed by
// NewLeader.
type etcdLeader struct {
// opts holds the options that were applied when the leader was created.
opts leader.Options
// path is the key prefix under which elections are created and observed.
path string
// client is the etcd v3 client used to open sessions.
client *client.Client
}
// etcdElected is the handle handed back by Elect; its methods operate on the
// election it wraps.
type etcdElected struct {
// s is meant to hold the etcd session backing e; no method visible in this
// file reads it.
s *cc.Session
// e is the election that Campaign, Observe and Resign are called on.
e *cc.Election
// id is the value this candidate campaigns with and compares observed
// leaders against.
id string
}
// Elect campaigns for leadership on behalf of id and blocks until the
// campaign succeeds or fails.
//
// Each id gets its own election, keyed under e.path with every "/" in the id
// replaced by "-". On success the returned leader.Elected wraps that election
// together with the etcd session backing it. On failure the session is closed
// before the error is returned so that it is not left running.
func (e *etcdLeader) Elect(id string, opts ...leader.ElectOption) (leader.Elected, error) {
	// Options are applied for API compatibility; none of them is read here.
	var options leader.ElectOptions
	for _, o := range opts {
		o(&options)
	}

	// Election key for this id. Named key so the path package is not shadowed.
	key := path.Join(e.path, strings.Replace(id, "/", "-", -1))

	s, err := cc.NewSession(e.client)
	if err != nil {
		return nil, err
	}

	l := cc.NewElection(s, key)

	// A cancellable context would add nothing here: nothing could ever call
	// its cancel func, so the background context is used directly.
	if err := l.Campaign(context.Background(), id); err != nil {
		// Best-effort cleanup; the campaign error is the one worth reporting.
		_ = s.Close()
		return nil, err
	}

	return &etcdElected{
		s:  s,
		e:  l,
		id: id,
	}, nil
}
// Follow returns a channel that receives the value proposed by each leader
// observed under e.path.
//
// The channel is closed when observation cannot start (the etcd session could
// not be created) or when the underlying observation ends, so a receive loop
// terminates instead of blocking forever. Follow has no error return, so
// closing the channel is the only failure signal available.
//
// Sends are unbuffered: the observer goroutine waits until the caller has
// received each value, and there is no way for the caller to stop it early.
func (e *etcdLeader) Follow() chan string {
	ch := make(chan string)

	s, err := cc.NewSession(e.client)
	if err != nil {
		close(ch)
		return ch
	}

	ech := cc.NewElection(s, e.path).Observe(context.Background())

	go func() {
		// Deferred calls run last-in first-out: the channel is closed first so
		// the receiver is released promptly, then the session is closed
		// (best effort, its error has nowhere to go).
		defer s.Close()
		defer close(ch)

		for r := range ech {
			// Guard the index below; a response without keys carries no leader.
			if len(r.Kvs) == 0 {
				continue
			}
			ch <- string(r.Kvs[0].Value)
		}
	}()

	return ch
}
// String reports the name of this leader implementation.
func (e *etcdLeader) String() string {
	const name = "etcd"
	return name
}
// Reelect campaigns again on the same election with the same id and returns
// the result of that campaign.
func (e *etcdElected) Reelect() error {
	// The background context is passed directly: wrapping it in WithCancel
	// and dropping the cancel func gains nothing and trips go vet's
	// lostcancel check.
	return e.e.Campaign(context.Background(), e.id)
}
// Revoked returns a channel that receives true once the observed leader value
// differs from this candidate's id.
//
// The channel is always closed when watching stops. If the observation ends
// before any change is seen, the channel is closed without a value, so a
// receive yields false; callers that need to tell the two cases apart should
// check the received value.
func (e *etcdElected) Revoked() chan bool {
	// Buffer of one so the single send below never blocks the goroutine.
	ch := make(chan bool, 1)

	// The observation gets its own cancellable context so that it is shut
	// down as soon as this goroutine is done with it.
	ctx, cancel := context.WithCancel(context.Background())
	ech := e.e.Observe(ctx)

	go func() {
		defer cancel()
		defer close(ch)

		for r := range ech {
			// Guard the index below; a response without keys carries no leader.
			if len(r.Kvs) == 0 {
				continue
			}
			if string(r.Kvs[0].Value) != e.id {
				ch <- true
				return
			}
		}
	}()

	return ch
}
// Resign gives up leadership by resigning from the wrapped election and
// returns whatever error the election reports.
func (e *etcdElected) Resign() error {
	election := e.e
	return election.Resign(context.Background())
}
// Id returns the identifier this candidate campaigns with.
func (e *etcdElected) Id() string {
	id := e.id
	return id
}
// NewLeader builds an etcd-backed leader.Leader.
//
// Non-empty entries of the Nodes option become the etcd endpoints; when there
// are none the client is pointed at http://127.0.0.1:2379. Elections live
// under the /micro/leader prefix. If the etcd client cannot be constructed
// the process exits through log.Fatal.
func NewLeader(opts ...leader.Option) leader.Leader {
	var options leader.Options
	for i := range opts {
		opts[i](&options)
	}

	// Keep only the node addresses that are not blank.
	var nodes []string
	for _, node := range options.Nodes {
		if node == "" {
			continue
		}
		nodes = append(nodes, node)
	}

	// Nothing usable was configured: fall back to a local etcd.
	if len(nodes) == 0 {
		nodes = []string{"http://127.0.0.1:2379"}
	}

	// TODO: parse addresses
	cfg := client.Config{Endpoints: nodes}
	c, err := client.New(cfg)
	if err != nil {
		log.Fatal(err)
	}

	l := &etcdLeader{
		opts:   options,
		path:   "/micro/leader",
		client: c,
	}
	return l
}