137 lines
		
	
	
		
			2.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			137 lines
		
	
	
		
			2.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package etcd
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"log"
 | 
						|
	"path"
 | 
						|
	"strings"
 | 
						|
 | 
						|
	client "github.com/coreos/etcd/clientv3"
 | 
						|
	cc "github.com/coreos/etcd/clientv3/concurrency"
 | 
						|
	"github.com/micro/go-micro/sync/leader"
 | 
						|
)
 | 
						|
 | 
						|
type etcdLeader struct {
 | 
						|
	opts   leader.Options
 | 
						|
	path   string
 | 
						|
	client *client.Client
 | 
						|
}
 | 
						|
 | 
						|
type etcdElected struct {
 | 
						|
	s  *cc.Session
 | 
						|
	e  *cc.Election
 | 
						|
	id string
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdLeader) Elect(id string, opts ...leader.ElectOption) (leader.Elected, error) {
 | 
						|
	var options leader.ElectOptions
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	// make path
 | 
						|
	path := path.Join(e.path, strings.Replace(id, "/", "-", -1))
 | 
						|
 | 
						|
	s, err := cc.NewSession(e.client)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	l := cc.NewElection(s, path)
 | 
						|
 | 
						|
	if err := l.Campaign(context.TODO(), id); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return &etcdElected{
 | 
						|
		e:  l,
 | 
						|
		id: id,
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdLeader) Follow() chan string {
 | 
						|
	ch := make(chan string)
 | 
						|
 | 
						|
	s, err := cc.NewSession(e.client)
 | 
						|
	if err != nil {
 | 
						|
		return ch
 | 
						|
	}
 | 
						|
 | 
						|
	l := cc.NewElection(s, e.path)
 | 
						|
	ech := l.Observe(context.Background())
 | 
						|
 | 
						|
	go func() {
 | 
						|
		for r := range ech {
 | 
						|
			ch <- string(r.Kvs[0].Value)
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	return ch
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdLeader) String() string {
 | 
						|
	return "etcd"
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdElected) Reelect() error {
 | 
						|
	return e.e.Campaign(context.TODO(), e.id)
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdElected) Revoked() chan bool {
 | 
						|
	ch := make(chan bool, 1)
 | 
						|
	ech := e.e.Observe(context.Background())
 | 
						|
 | 
						|
	go func() {
 | 
						|
		for r := range ech {
 | 
						|
			if string(r.Kvs[0].Value) != e.id {
 | 
						|
				ch <- true
 | 
						|
				close(ch)
 | 
						|
				return
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	return ch
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdElected) Resign() error {
 | 
						|
	return e.e.Resign(context.Background())
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdElected) Id() string {
 | 
						|
	return e.id
 | 
						|
}
 | 
						|
 | 
						|
func NewLeader(opts ...leader.Option) leader.Leader {
 | 
						|
	var options leader.Options
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	var endpoints []string
 | 
						|
 | 
						|
	for _, addr := range options.Nodes {
 | 
						|
		if len(addr) > 0 {
 | 
						|
			endpoints = append(endpoints, addr)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if len(endpoints) == 0 {
 | 
						|
		endpoints = []string{"http://127.0.0.1:2379"}
 | 
						|
	}
 | 
						|
 | 
						|
	// TODO: parse addresses
 | 
						|
	c, err := client.New(client.Config{
 | 
						|
		Endpoints: endpoints,
 | 
						|
	})
 | 
						|
	if err != nil {
 | 
						|
		log.Fatal(err)
 | 
						|
	}
 | 
						|
 | 
						|
	return &etcdLeader{
 | 
						|
		path:   "/micro/leader",
 | 
						|
		client: c,
 | 
						|
		opts:   options,
 | 
						|
	}
 | 
						|
}
 |