159 lines
		
	
	
		
			2.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			159 lines
		
	
	
		
			2.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package consul
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
	"log"
 | 
						|
	"net"
 | 
						|
	"os"
 | 
						|
	"path"
 | 
						|
	"sync"
 | 
						|
 | 
						|
	"github.com/hashicorp/consul/api"
 | 
						|
	"github.com/hashicorp/consul/api/watch"
 | 
						|
	"github.com/micro/go-micro/sync/leader"
 | 
						|
)
 | 
						|
 | 
						|
type consulLeader struct {
 | 
						|
	opts leader.Options
 | 
						|
	c    *api.Client
 | 
						|
}
 | 
						|
 | 
						|
type consulElected struct {
 | 
						|
	c    *api.Client
 | 
						|
	l    *api.Lock
 | 
						|
	id   string
 | 
						|
	key  string
 | 
						|
	opts leader.ElectOptions
 | 
						|
 | 
						|
	mtx sync.RWMutex
 | 
						|
	rv  <-chan struct{}
 | 
						|
}
 | 
						|
 | 
						|
func (c *consulLeader) Elect(id string, opts ...leader.ElectOption) (leader.Elected, error) {
 | 
						|
	var options leader.ElectOptions
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	key := path.Join("micro/leader", c.opts.Group)
 | 
						|
 | 
						|
	lc, err := c.c.LockOpts(&api.LockOptions{
 | 
						|
		Key:   key,
 | 
						|
		Value: []byte(id),
 | 
						|
	})
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	rv, err := lc.Lock(nil)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return &consulElected{
 | 
						|
		c:    c.c,
 | 
						|
		key:  key,
 | 
						|
		rv:   rv,
 | 
						|
		id:   id,
 | 
						|
		l:    lc,
 | 
						|
		opts: options,
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (c *consulLeader) Follow() chan string {
 | 
						|
	ch := make(chan string, 1)
 | 
						|
 | 
						|
	key := path.Join("/micro/leader", c.opts.Group)
 | 
						|
 | 
						|
	p, err := watch.Parse(map[string]interface{}{
 | 
						|
		"type": "key",
 | 
						|
		"key":  key,
 | 
						|
	})
 | 
						|
	if err != nil {
 | 
						|
		return ch
 | 
						|
	}
 | 
						|
	p.Handler = func(idx uint64, raw interface{}) {
 | 
						|
		if raw == nil {
 | 
						|
			return // ignore
 | 
						|
		}
 | 
						|
		v, ok := raw.(*api.KVPair)
 | 
						|
		if !ok || v == nil {
 | 
						|
			return // ignore
 | 
						|
		}
 | 
						|
		ch <- string(v.Value)
 | 
						|
	}
 | 
						|
 | 
						|
	go p.RunWithClientAndLogger(c.c, log.New(os.Stdout, "consul: ", log.Lshortfile))
 | 
						|
	return ch
 | 
						|
}
 | 
						|
 | 
						|
func (c *consulLeader) String() string {
 | 
						|
	return "consul"
 | 
						|
}
 | 
						|
 | 
						|
func (c *consulElected) Id() string {
 | 
						|
	return c.id
 | 
						|
}
 | 
						|
 | 
						|
func (c *consulElected) Reelect() error {
 | 
						|
	rv, err := c.l.Lock(nil)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	c.mtx.Lock()
 | 
						|
	c.rv = rv
 | 
						|
	c.mtx.Unlock()
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (c *consulElected) Revoked() chan bool {
 | 
						|
	ch := make(chan bool, 1)
 | 
						|
	c.mtx.RLock()
 | 
						|
	rv := c.rv
 | 
						|
	c.mtx.RUnlock()
 | 
						|
 | 
						|
	go func() {
 | 
						|
		<-rv
 | 
						|
		ch <- true
 | 
						|
		close(ch)
 | 
						|
	}()
 | 
						|
 | 
						|
	return ch
 | 
						|
}
 | 
						|
 | 
						|
func (c *consulElected) Resign() error {
 | 
						|
	return c.l.Unlock()
 | 
						|
}
 | 
						|
 | 
						|
func NewLeader(opts ...leader.Option) leader.Leader {
 | 
						|
	options := leader.Options{
 | 
						|
		Group: "default",
 | 
						|
	}
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	config := api.DefaultConfig()
 | 
						|
 | 
						|
	// set host
 | 
						|
	// config.Host something
 | 
						|
	// check if there are any addrs
 | 
						|
	if len(options.Nodes) > 0 {
 | 
						|
		addr, port, err := net.SplitHostPort(options.Nodes[0])
 | 
						|
		if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
 | 
						|
			port = "8500"
 | 
						|
			config.Address = fmt.Sprintf("%s:%s", addr, port)
 | 
						|
		} else if err == nil {
 | 
						|
			config.Address = fmt.Sprintf("%s:%s", addr, port)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	client, _ := api.NewClient(config)
 | 
						|
 | 
						|
	return &consulLeader{
 | 
						|
		opts: options,
 | 
						|
		c:    client,
 | 
						|
	}
 | 
						|
}
 |