844 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			844 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Package gossip provides a gossip registry based on hashicorp/memberlist
 | 
						|
package gossip
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"encoding/json"
 | 
						|
	"fmt"
 | 
						|
	"io/ioutil"
 | 
						|
	"net"
 | 
						|
	"os"
 | 
						|
	"strconv"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/golang/protobuf/proto"
 | 
						|
	"github.com/google/uuid"
 | 
						|
	"github.com/hashicorp/memberlist"
 | 
						|
	"github.com/micro/go-micro/registry"
 | 
						|
	pb "github.com/micro/go-micro/registry/gossip/proto"
 | 
						|
	log "github.com/micro/go-micro/util/log"
 | 
						|
	"github.com/mitchellh/hashstructure"
 | 
						|
)
 | 
						|
 | 
						|
// use registry.Result int32 values after it switches from string to int32 types
 | 
						|
// type actionType int32
 | 
						|
// type updateType int32
 | 
						|
 | 
						|
const (
 | 
						|
	actionTypeInvalid int32 = iota
 | 
						|
	actionTypeCreate
 | 
						|
	actionTypeDelete
 | 
						|
	actionTypeUpdate
 | 
						|
	actionTypeSync
 | 
						|
)
 | 
						|
 | 
						|
const (
 | 
						|
	nodeActionUnknown int32 = iota
 | 
						|
	nodeActionJoin
 | 
						|
	nodeActionLeave
 | 
						|
	nodeActionUpdate
 | 
						|
)
 | 
						|
 | 
						|
func actionTypeString(t int32) string {
 | 
						|
	switch t {
 | 
						|
	case actionTypeCreate:
 | 
						|
		return "create"
 | 
						|
	case actionTypeDelete:
 | 
						|
		return "delete"
 | 
						|
	case actionTypeUpdate:
 | 
						|
		return "update"
 | 
						|
	case actionTypeSync:
 | 
						|
		return "sync"
 | 
						|
	}
 | 
						|
	return "invalid"
 | 
						|
}
 | 
						|
 | 
						|
const (
 | 
						|
	updateTypeInvalid int32 = iota
 | 
						|
	updateTypeService
 | 
						|
)
 | 
						|
 | 
						|
type broadcast struct {
 | 
						|
	update *pb.Update
 | 
						|
	notify chan<- struct{}
 | 
						|
}
 | 
						|
 | 
						|
type delegate struct {
 | 
						|
	queue   *memberlist.TransmitLimitedQueue
 | 
						|
	updates chan *update
 | 
						|
}
 | 
						|
 | 
						|
type event struct {
 | 
						|
	action int32
 | 
						|
	node   string
 | 
						|
}
 | 
						|
 | 
						|
type eventDelegate struct {
 | 
						|
	events chan *event
 | 
						|
}
 | 
						|
 | 
						|
func (ed *eventDelegate) NotifyJoin(n *memberlist.Node) {
 | 
						|
	ed.events <- &event{action: nodeActionJoin, node: n.Address()}
 | 
						|
}
 | 
						|
func (ed *eventDelegate) NotifyLeave(n *memberlist.Node) {
 | 
						|
	ed.events <- &event{action: nodeActionLeave, node: n.Address()}
 | 
						|
}
 | 
						|
func (ed *eventDelegate) NotifyUpdate(n *memberlist.Node) {
 | 
						|
	ed.events <- &event{action: nodeActionUpdate, node: n.Address()}
 | 
						|
}
 | 
						|
 | 
						|
type gossipRegistry struct {
 | 
						|
	queue       *memberlist.TransmitLimitedQueue
 | 
						|
	updates     chan *update
 | 
						|
	events      chan *event
 | 
						|
	options     registry.Options
 | 
						|
	member      *memberlist.Memberlist
 | 
						|
	interval    time.Duration
 | 
						|
	tcpInterval time.Duration
 | 
						|
 | 
						|
	connectRetry   bool
 | 
						|
	connectTimeout time.Duration
 | 
						|
	sync.RWMutex
 | 
						|
	services map[string][]*registry.Service
 | 
						|
 | 
						|
	watchers map[string]chan *registry.Result
 | 
						|
 | 
						|
	mtu     int
 | 
						|
	addrs   []string
 | 
						|
	members map[string]int32
 | 
						|
	done    chan bool
 | 
						|
}
 | 
						|
 | 
						|
type update struct {
 | 
						|
	Update  *pb.Update
 | 
						|
	Service *registry.Service
 | 
						|
	sync    chan *registry.Service
 | 
						|
}
 | 
						|
 | 
						|
type updates struct {
 | 
						|
	sync.RWMutex
 | 
						|
	services map[uint64]*update
 | 
						|
}
 | 
						|
 | 
						|
var (
 | 
						|
	// You should change this if using secure
 | 
						|
	DefaultSecret = []byte("micro-gossip-key") // exactly 16 bytes
 | 
						|
	ExpiryTick    = time.Second * 1            // needs to be smaller than registry.RegisterTTL
 | 
						|
	MaxPacketSize = 512
 | 
						|
)
 | 
						|
 | 
						|
func configure(g *gossipRegistry, opts ...registry.Option) error {
 | 
						|
	// loop through address list and get valid entries
 | 
						|
	addrs := func(curAddrs []string) []string {
 | 
						|
		var newAddrs []string
 | 
						|
		for _, addr := range curAddrs {
 | 
						|
			if trimAddr := strings.TrimSpace(addr); len(trimAddr) > 0 {
 | 
						|
				newAddrs = append(newAddrs, trimAddr)
 | 
						|
			}
 | 
						|
		}
 | 
						|
		return newAddrs
 | 
						|
	}
 | 
						|
 | 
						|
	// current address list
 | 
						|
	curAddrs := addrs(g.options.Addrs)
 | 
						|
 | 
						|
	// parse options
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&g.options)
 | 
						|
	}
 | 
						|
 | 
						|
	// new address list
 | 
						|
	newAddrs := addrs(g.options.Addrs)
 | 
						|
 | 
						|
	// no new nodes and existing member. no configure
 | 
						|
	if (len(newAddrs) == len(curAddrs)) && g.member != nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	// shutdown old member
 | 
						|
	g.Stop()
 | 
						|
 | 
						|
	// lock internals
 | 
						|
	g.Lock()
 | 
						|
 | 
						|
	// new done chan
 | 
						|
	g.done = make(chan bool)
 | 
						|
 | 
						|
	// replace addresses
 | 
						|
	curAddrs = newAddrs
 | 
						|
 | 
						|
	// create a new default config
 | 
						|
	c := memberlist.DefaultLocalConfig()
 | 
						|
 | 
						|
	// sane good default options
 | 
						|
	c.LogOutput = ioutil.Discard // log to /dev/null
 | 
						|
	c.PushPullInterval = 0       // disable expensive tcp push/pull
 | 
						|
	c.ProtocolVersion = 4        // suport latest stable features
 | 
						|
 | 
						|
	// set config from options
 | 
						|
	if config, ok := g.options.Context.Value(configKey{}).(*memberlist.Config); ok && config != nil {
 | 
						|
		c = config
 | 
						|
	}
 | 
						|
 | 
						|
	// set address
 | 
						|
	if address, ok := g.options.Context.Value(addressKey{}).(string); ok {
 | 
						|
		host, port, err := net.SplitHostPort(address)
 | 
						|
		if err == nil {
 | 
						|
			p, err := strconv.Atoi(port)
 | 
						|
			if err == nil {
 | 
						|
				c.BindPort = p
 | 
						|
			}
 | 
						|
			c.BindAddr = host
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		// set bind to random port
 | 
						|
		c.BindPort = 0
 | 
						|
	}
 | 
						|
 | 
						|
	// set the advertise address
 | 
						|
	if advertise, ok := g.options.Context.Value(advertiseKey{}).(string); ok {
 | 
						|
		host, port, err := net.SplitHostPort(advertise)
 | 
						|
		if err == nil {
 | 
						|
			p, err := strconv.Atoi(port)
 | 
						|
			if err == nil {
 | 
						|
				c.AdvertisePort = p
 | 
						|
			}
 | 
						|
			c.AdvertiseAddr = host
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// machine hostname
 | 
						|
	hostname, _ := os.Hostname()
 | 
						|
 | 
						|
	// set the name
 | 
						|
	c.Name = strings.Join([]string{"micro", hostname, uuid.New().String()}, "-")
 | 
						|
 | 
						|
	// set a secret key if secure
 | 
						|
	if g.options.Secure {
 | 
						|
		k, ok := g.options.Context.Value(secretKey{}).([]byte)
 | 
						|
		if !ok {
 | 
						|
			// use the default secret
 | 
						|
			k = DefaultSecret
 | 
						|
		}
 | 
						|
		c.SecretKey = k
 | 
						|
	}
 | 
						|
 | 
						|
	// set connect retry
 | 
						|
	if v, ok := g.options.Context.Value(connectRetryKey{}).(bool); ok && v {
 | 
						|
		g.connectRetry = true
 | 
						|
	}
 | 
						|
 | 
						|
	// set connect timeout
 | 
						|
	if td, ok := g.options.Context.Value(connectTimeoutKey{}).(time.Duration); ok {
 | 
						|
		g.connectTimeout = td
 | 
						|
	}
 | 
						|
 | 
						|
	// create a queue
 | 
						|
	queue := &memberlist.TransmitLimitedQueue{
 | 
						|
		NumNodes: func() int {
 | 
						|
			return len(curAddrs)
 | 
						|
		},
 | 
						|
		RetransmitMult: 3,
 | 
						|
	}
 | 
						|
 | 
						|
	// set the delegate
 | 
						|
	c.Delegate = &delegate{
 | 
						|
		updates: g.updates,
 | 
						|
		queue:   queue,
 | 
						|
	}
 | 
						|
 | 
						|
	if g.connectRetry {
 | 
						|
		c.Events = &eventDelegate{
 | 
						|
			events: g.events,
 | 
						|
		}
 | 
						|
	}
 | 
						|
	// create the memberlist
 | 
						|
	m, err := memberlist.Create(c)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	if len(curAddrs) > 0 {
 | 
						|
		for _, addr := range curAddrs {
 | 
						|
			g.members[addr] = nodeActionUnknown
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	g.tcpInterval = c.PushPullInterval
 | 
						|
	g.addrs = curAddrs
 | 
						|
	g.queue = queue
 | 
						|
	g.member = m
 | 
						|
	g.interval = c.GossipInterval
 | 
						|
 | 
						|
	g.Unlock()
 | 
						|
 | 
						|
	log.Logf("[gossip] Registry Listening on %s", m.LocalNode().Address())
 | 
						|
 | 
						|
	// try connect
 | 
						|
	return g.connect(curAddrs)
 | 
						|
}
 | 
						|
 | 
						|
func (*broadcast) UniqueBroadcast() {}
 | 
						|
 | 
						|
func (b *broadcast) Invalidates(other memberlist.Broadcast) bool {
 | 
						|
	return false
 | 
						|
}
 | 
						|
 | 
						|
func (b *broadcast) Message() []byte {
 | 
						|
	up, err := proto.Marshal(b.update)
 | 
						|
	if err != nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	if l := len(up); l > MaxPacketSize {
 | 
						|
		log.Logf("[gossip] broadcast message size %d bigger then MaxPacketSize %d", l, MaxPacketSize)
 | 
						|
	}
 | 
						|
	return up
 | 
						|
}
 | 
						|
 | 
						|
func (b *broadcast) Finished() {
 | 
						|
	if b.notify != nil {
 | 
						|
		close(b.notify)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (d *delegate) NodeMeta(limit int) []byte {
 | 
						|
	return []byte{}
 | 
						|
}
 | 
						|
 | 
						|
func (d *delegate) NotifyMsg(b []byte) {
 | 
						|
	if len(b) == 0 {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	go func() {
 | 
						|
		up := new(pb.Update)
 | 
						|
		if err := proto.Unmarshal(b, up); err != nil {
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		// only process service action
 | 
						|
		if up.Type != updateTypeService {
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		var service *registry.Service
 | 
						|
 | 
						|
		switch up.Metadata["Content-Type"] {
 | 
						|
		case "application/json":
 | 
						|
			if err := json.Unmarshal(up.Data, &service); err != nil {
 | 
						|
				return
 | 
						|
			}
 | 
						|
		// no other content type
 | 
						|
		default:
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		// send update
 | 
						|
		d.updates <- &update{
 | 
						|
			Update:  up,
 | 
						|
			Service: service,
 | 
						|
		}
 | 
						|
	}()
 | 
						|
}
 | 
						|
 | 
						|
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
 | 
						|
	return d.queue.GetBroadcasts(overhead, limit)
 | 
						|
}
 | 
						|
 | 
						|
func (d *delegate) LocalState(join bool) []byte {
 | 
						|
	if !join {
 | 
						|
		return []byte{}
 | 
						|
	}
 | 
						|
 | 
						|
	syncCh := make(chan *registry.Service, 1)
 | 
						|
	services := map[string][]*registry.Service{}
 | 
						|
 | 
						|
	d.updates <- &update{
 | 
						|
		Update: &pb.Update{
 | 
						|
			Action: actionTypeSync,
 | 
						|
		},
 | 
						|
		sync: syncCh,
 | 
						|
	}
 | 
						|
 | 
						|
	for srv := range syncCh {
 | 
						|
		services[srv.Name] = append(services[srv.Name], srv)
 | 
						|
	}
 | 
						|
 | 
						|
	b, _ := json.Marshal(services)
 | 
						|
	return b
 | 
						|
}
 | 
						|
 | 
						|
func (d *delegate) MergeRemoteState(buf []byte, join bool) {
 | 
						|
	if len(buf) == 0 {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if !join {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	var services map[string][]*registry.Service
 | 
						|
	if err := json.Unmarshal(buf, &services); err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	for _, service := range services {
 | 
						|
		for _, srv := range service {
 | 
						|
			d.updates <- &update{
 | 
						|
				Update:  &pb.Update{Action: actionTypeCreate},
 | 
						|
				Service: srv,
 | 
						|
				sync:    nil,
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (g *gossipRegistry) connect(addrs []string) error {
 | 
						|
	if len(addrs) == 0 {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	timeout := make(<-chan time.Time)
 | 
						|
 | 
						|
	if g.connectTimeout > 0 {
 | 
						|
		timeout = time.After(g.connectTimeout)
 | 
						|
	}
 | 
						|
 | 
						|
	ticker := time.NewTicker(1 * time.Second)
 | 
						|
	defer ticker.Stop()
 | 
						|
 | 
						|
	fn := func() (int, error) {
 | 
						|
		return g.member.Join(addrs)
 | 
						|
	}
 | 
						|
 | 
						|
	// don't wait for first try
 | 
						|
	if _, err := fn(); err == nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	// wait loop
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		// context closed
 | 
						|
		case <-g.options.Context.Done():
 | 
						|
			return nil
 | 
						|
		// call close, don't wait anymore
 | 
						|
		case <-g.done:
 | 
						|
			return nil
 | 
						|
		//  in case of timeout fail with a timeout error
 | 
						|
		case <-timeout:
 | 
						|
			return fmt.Errorf("[gossip] connect timeout %v", g.addrs)
 | 
						|
		// got a tick, try to connect
 | 
						|
		case <-ticker.C:
 | 
						|
			if _, err := fn(); err == nil {
 | 
						|
				log.Logf("[gossip] connect success for %v", g.addrs)
 | 
						|
				return nil
 | 
						|
			} else {
 | 
						|
				log.Logf("[gossip] connect failed for %v", g.addrs)
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (g *gossipRegistry) publish(action string, services []*registry.Service) {
 | 
						|
	g.RLock()
 | 
						|
	for _, sub := range g.watchers {
 | 
						|
		go func(sub chan *registry.Result) {
 | 
						|
			for _, service := range services {
 | 
						|
				sub <- ®istry.Result{Action: action, Service: service}
 | 
						|
			}
 | 
						|
		}(sub)
 | 
						|
	}
 | 
						|
	g.RUnlock()
 | 
						|
}
 | 
						|
 | 
						|
func (g *gossipRegistry) subscribe() (chan *registry.Result, chan bool) {
 | 
						|
	next := make(chan *registry.Result, 10)
 | 
						|
	exit := make(chan bool)
 | 
						|
 | 
						|
	id := uuid.New().String()
 | 
						|
 | 
						|
	g.Lock()
 | 
						|
	g.watchers[id] = next
 | 
						|
	g.Unlock()
 | 
						|
 | 
						|
	go func() {
 | 
						|
		<-exit
 | 
						|
		g.Lock()
 | 
						|
		delete(g.watchers, id)
 | 
						|
		close(next)
 | 
						|
		g.Unlock()
 | 
						|
	}()
 | 
						|
 | 
						|
	return next, exit
 | 
						|
}
 | 
						|
 | 
						|
func (g *gossipRegistry) Stop() error {
 | 
						|
	select {
 | 
						|
	case <-g.done:
 | 
						|
		return nil
 | 
						|
	default:
 | 
						|
		close(g.done)
 | 
						|
		g.Lock()
 | 
						|
		if g.member != nil {
 | 
						|
			g.member.Leave(g.interval * 2)
 | 
						|
			g.member.Shutdown()
 | 
						|
			g.member = nil
 | 
						|
		}
 | 
						|
		g.Unlock()
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// connectLoop attempts to reconnect to the memberlist
 | 
						|
func (g *gossipRegistry) connectLoop() {
 | 
						|
	// try every second
 | 
						|
	ticker := time.NewTicker(1 * time.Second)
 | 
						|
	defer ticker.Stop()
 | 
						|
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-g.done:
 | 
						|
			return
 | 
						|
		case <-g.options.Context.Done():
 | 
						|
			g.Stop()
 | 
						|
			return
 | 
						|
		case <-ticker.C:
 | 
						|
			var addrs []string
 | 
						|
 | 
						|
			g.RLock()
 | 
						|
 | 
						|
			// only process if we have a memberlist
 | 
						|
			if g.member == nil {
 | 
						|
				g.RUnlock()
 | 
						|
				continue
 | 
						|
			}
 | 
						|
 | 
						|
			// self
 | 
						|
			local := g.member.LocalNode().Address()
 | 
						|
 | 
						|
			// operate on each member
 | 
						|
			for node, action := range g.members {
 | 
						|
				switch action {
 | 
						|
				// process leave event
 | 
						|
				case nodeActionLeave:
 | 
						|
					// don't process self
 | 
						|
					if node == local {
 | 
						|
						continue
 | 
						|
					}
 | 
						|
					addrs = append(addrs, node)
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			g.RUnlock()
 | 
						|
 | 
						|
			// connect to all the members
 | 
						|
			// TODO: only connect to new members
 | 
						|
			if len(addrs) > 0 {
 | 
						|
				g.connect(addrs)
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (g *gossipRegistry) expiryLoop(updates *updates) {
 | 
						|
	ticker := time.NewTicker(ExpiryTick)
 | 
						|
	defer ticker.Stop()
 | 
						|
 | 
						|
	g.RLock()
 | 
						|
	done := g.done
 | 
						|
	g.RUnlock()
 | 
						|
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-done:
 | 
						|
			return
 | 
						|
		case <-ticker.C:
 | 
						|
			now := uint64(time.Now().UnixNano())
 | 
						|
 | 
						|
			updates.Lock()
 | 
						|
 | 
						|
			// process all the updates
 | 
						|
			for k, v := range updates.services {
 | 
						|
				// check if expiry time has passed
 | 
						|
				if d := (v.Update.Expires); d < now {
 | 
						|
					// delete from records
 | 
						|
					delete(updates.services, k)
 | 
						|
					// set to delete
 | 
						|
					v.Update.Action = actionTypeDelete
 | 
						|
					// fire a new update
 | 
						|
					g.updates <- v
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			updates.Unlock()
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// process member events
 | 
						|
func (g *gossipRegistry) eventLoop() {
 | 
						|
	g.RLock()
 | 
						|
	done := g.done
 | 
						|
	g.RUnlock()
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		// return when done
 | 
						|
		case <-done:
 | 
						|
			return
 | 
						|
		case ev := <-g.events:
 | 
						|
			// TODO: nonblocking update
 | 
						|
			g.Lock()
 | 
						|
			if _, ok := g.members[ev.node]; ok {
 | 
						|
				g.members[ev.node] = ev.action
 | 
						|
			}
 | 
						|
			g.Unlock()
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (g *gossipRegistry) run() {
 | 
						|
	updates := &updates{
 | 
						|
		services: make(map[uint64]*update),
 | 
						|
	}
 | 
						|
 | 
						|
	// expiry loop
 | 
						|
	go g.expiryLoop(updates)
 | 
						|
 | 
						|
	// event loop
 | 
						|
	go g.eventLoop()
 | 
						|
 | 
						|
	g.RLock()
 | 
						|
	// connect loop
 | 
						|
	if g.connectRetry {
 | 
						|
		go g.connectLoop()
 | 
						|
	}
 | 
						|
	g.RUnlock()
 | 
						|
 | 
						|
	// process the updates
 | 
						|
	for u := range g.updates {
 | 
						|
		switch u.Update.Action {
 | 
						|
		case actionTypeCreate:
 | 
						|
			g.Lock()
 | 
						|
			if service, ok := g.services[u.Service.Name]; !ok {
 | 
						|
				g.services[u.Service.Name] = []*registry.Service{u.Service}
 | 
						|
 | 
						|
			} else {
 | 
						|
				g.services[u.Service.Name] = registry.Merge(service, []*registry.Service{u.Service})
 | 
						|
			}
 | 
						|
			g.Unlock()
 | 
						|
 | 
						|
			// publish update to watchers
 | 
						|
			go g.publish(actionTypeString(actionTypeCreate), []*registry.Service{u.Service})
 | 
						|
 | 
						|
			// we need to expire the node at some point in the future
 | 
						|
			if u.Update.Expires > 0 {
 | 
						|
				// create a hash of this service
 | 
						|
				if hash, err := hashstructure.Hash(u.Service, nil); err == nil {
 | 
						|
					updates.Lock()
 | 
						|
					updates.services[hash] = u
 | 
						|
					updates.Unlock()
 | 
						|
				}
 | 
						|
			}
 | 
						|
		case actionTypeDelete:
 | 
						|
			g.Lock()
 | 
						|
			if service, ok := g.services[u.Service.Name]; ok {
 | 
						|
				if services := registry.Remove(service, []*registry.Service{u.Service}); len(services) == 0 {
 | 
						|
					delete(g.services, u.Service.Name)
 | 
						|
				} else {
 | 
						|
					g.services[u.Service.Name] = services
 | 
						|
				}
 | 
						|
			}
 | 
						|
			g.Unlock()
 | 
						|
 | 
						|
			// publish update to watchers
 | 
						|
			go g.publish(actionTypeString(actionTypeDelete), []*registry.Service{u.Service})
 | 
						|
 | 
						|
			// delete from expiry checks
 | 
						|
			if hash, err := hashstructure.Hash(u.Service, nil); err == nil {
 | 
						|
				updates.Lock()
 | 
						|
				delete(updates.services, hash)
 | 
						|
				updates.Unlock()
 | 
						|
			}
 | 
						|
		case actionTypeSync:
 | 
						|
			// no sync channel provided
 | 
						|
			if u.sync == nil {
 | 
						|
				continue
 | 
						|
			}
 | 
						|
 | 
						|
			g.RLock()
 | 
						|
 | 
						|
			// push all services through the sync chan
 | 
						|
			for _, service := range g.services {
 | 
						|
				for _, srv := range service {
 | 
						|
					u.sync <- srv
 | 
						|
				}
 | 
						|
 | 
						|
				// publish to watchers
 | 
						|
				go g.publish(actionTypeString(actionTypeCreate), service)
 | 
						|
			}
 | 
						|
 | 
						|
			g.RUnlock()
 | 
						|
 | 
						|
			// close the sync chan
 | 
						|
			close(u.sync)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (g *gossipRegistry) Init(opts ...registry.Option) error {
 | 
						|
	return configure(g, opts...)
 | 
						|
}
 | 
						|
 | 
						|
func (g *gossipRegistry) Options() registry.Options {
 | 
						|
	return g.options
 | 
						|
}
 | 
						|
 | 
						|
func (g *gossipRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
 | 
						|
	b, err := json.Marshal(s)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	g.Lock()
 | 
						|
	if service, ok := g.services[s.Name]; !ok {
 | 
						|
		g.services[s.Name] = []*registry.Service{s}
 | 
						|
	} else {
 | 
						|
		g.services[s.Name] = registry.Merge(service, []*registry.Service{s})
 | 
						|
	}
 | 
						|
	g.Unlock()
 | 
						|
 | 
						|
	var options registry.RegisterOptions
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	if options.TTL == 0 && g.tcpInterval == 0 {
 | 
						|
		return fmt.Errorf("Require register TTL or interval for memberlist.Config")
 | 
						|
	}
 | 
						|
 | 
						|
	up := &pb.Update{
 | 
						|
		Expires: uint64(time.Now().Add(options.TTL).UnixNano()),
 | 
						|
		Action:  actionTypeCreate,
 | 
						|
		Type:    updateTypeService,
 | 
						|
		Metadata: map[string]string{
 | 
						|
			"Content-Type": "application/json",
 | 
						|
		},
 | 
						|
		Data: b,
 | 
						|
	}
 | 
						|
 | 
						|
	g.queue.QueueBroadcast(&broadcast{
 | 
						|
		update: up,
 | 
						|
		notify: nil,
 | 
						|
	})
 | 
						|
 | 
						|
	// send update to local watchers
 | 
						|
	g.updates <- &update{
 | 
						|
		Update:  up,
 | 
						|
		Service: s,
 | 
						|
	}
 | 
						|
 | 
						|
	// wait
 | 
						|
	<-time.After(g.interval * 2)
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (g *gossipRegistry) Deregister(s *registry.Service) error {
 | 
						|
	b, err := json.Marshal(s)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	g.Lock()
 | 
						|
	if service, ok := g.services[s.Name]; ok {
 | 
						|
		if services := registry.Remove(service, []*registry.Service{s}); len(services) == 0 {
 | 
						|
			delete(g.services, s.Name)
 | 
						|
		} else {
 | 
						|
			g.services[s.Name] = services
 | 
						|
		}
 | 
						|
	}
 | 
						|
	g.Unlock()
 | 
						|
 | 
						|
	up := &pb.Update{
 | 
						|
		Action: actionTypeDelete,
 | 
						|
		Type:   updateTypeService,
 | 
						|
		Metadata: map[string]string{
 | 
						|
			"Content-Type": "application/json",
 | 
						|
		},
 | 
						|
		Data: b,
 | 
						|
	}
 | 
						|
 | 
						|
	g.queue.QueueBroadcast(&broadcast{
 | 
						|
		update: up,
 | 
						|
		notify: nil,
 | 
						|
	})
 | 
						|
 | 
						|
	// send update to local watchers
 | 
						|
	g.updates <- &update{
 | 
						|
		Update:  up,
 | 
						|
		Service: s,
 | 
						|
	}
 | 
						|
 | 
						|
	// wait
 | 
						|
	<-time.After(g.interval * 2)
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (g *gossipRegistry) GetService(name string) ([]*registry.Service, error) {
 | 
						|
	g.RLock()
 | 
						|
	service, ok := g.services[name]
 | 
						|
	g.RUnlock()
 | 
						|
	if !ok {
 | 
						|
		return nil, registry.ErrNotFound
 | 
						|
	}
 | 
						|
	return service, nil
 | 
						|
}
 | 
						|
 | 
						|
func (g *gossipRegistry) ListServices() ([]*registry.Service, error) {
 | 
						|
	var services []*registry.Service
 | 
						|
	g.RLock()
 | 
						|
	for _, service := range g.services {
 | 
						|
		services = append(services, service...)
 | 
						|
	}
 | 
						|
	g.RUnlock()
 | 
						|
	return services, nil
 | 
						|
}
 | 
						|
 | 
						|
func (g *gossipRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
 | 
						|
	n, e := g.subscribe()
 | 
						|
	return newGossipWatcher(n, e, opts...)
 | 
						|
}
 | 
						|
 | 
						|
func (g *gossipRegistry) String() string {
 | 
						|
	return "gossip"
 | 
						|
}
 | 
						|
 | 
						|
func NewRegistry(opts ...registry.Option) registry.Registry {
 | 
						|
	g := &gossipRegistry{
 | 
						|
		options: registry.Options{
 | 
						|
			Context: context.Background(),
 | 
						|
		},
 | 
						|
		done:     make(chan bool),
 | 
						|
		events:   make(chan *event, 100),
 | 
						|
		updates:  make(chan *update, 100),
 | 
						|
		services: make(map[string][]*registry.Service),
 | 
						|
		watchers: make(map[string]chan *registry.Result),
 | 
						|
		members:  make(map[string]int32),
 | 
						|
	}
 | 
						|
	// run the updater
 | 
						|
	go g.run()
 | 
						|
 | 
						|
	// configure the gossiper
 | 
						|
	if err := configure(g, opts...); err != nil {
 | 
						|
		log.Fatalf("[gossip] Error configuring registry: %v", err)
 | 
						|
	}
 | 
						|
	// wait for setup
 | 
						|
	<-time.After(g.interval * 2)
 | 
						|
 | 
						|
	return g
 | 
						|
}
 |