// Package gossip provides a gossip registry based on hashicorp/memberlist package gossip import ( "context" "encoding/json" "fmt" "io/ioutil" "net" "os" "strconv" "strings" "sync" "time" "github.com/golang/protobuf/proto" "github.com/google/uuid" "github.com/hashicorp/memberlist" "github.com/micro/go-micro/registry" pb "github.com/micro/go-micro/registry/gossip/proto" log "github.com/micro/go-micro/util/log" "github.com/mitchellh/hashstructure" ) // use registry.Result int32 values after it switches from string to int32 types // type actionType int32 // type updateType int32 const ( actionTypeInvalid int32 = iota actionTypeCreate actionTypeDelete actionTypeUpdate actionTypeSync ) const ( nodeActionUnknown int32 = iota nodeActionJoin nodeActionLeave nodeActionUpdate ) func actionTypeString(t int32) string { switch t { case actionTypeCreate: return "create" case actionTypeDelete: return "delete" case actionTypeUpdate: return "update" case actionTypeSync: return "sync" } return "invalid" } const ( updateTypeInvalid int32 = iota updateTypeService ) type broadcast struct { update *pb.Update notify chan<- struct{} } type delegate struct { queue *memberlist.TransmitLimitedQueue updates chan *update } type event struct { action int32 node string } type eventDelegate struct { events chan *event } func (ed *eventDelegate) NotifyJoin(n *memberlist.Node) { ed.events <- &event{action: nodeActionJoin, node: n.Address()} } func (ed *eventDelegate) NotifyLeave(n *memberlist.Node) { ed.events <- &event{action: nodeActionLeave, node: n.Address()} } func (ed *eventDelegate) NotifyUpdate(n *memberlist.Node) { ed.events <- &event{action: nodeActionUpdate, node: n.Address()} } type gossipRegistry struct { queue *memberlist.TransmitLimitedQueue updates chan *update events chan *event options registry.Options member *memberlist.Memberlist interval time.Duration tcpInterval time.Duration connectRetry bool connectTimeout time.Duration sync.RWMutex services map[string][]*registry.Service watchers map[string]chan *registry.Result mtu int addrs []string members map[string]int32 done chan bool } type update struct { Update *pb.Update Service *registry.Service sync chan *registry.Service } type updates struct { sync.RWMutex services map[uint64]*update } var ( // You should change this if using secure DefaultSecret = []byte("micro-gossip-key") // exactly 16 bytes ExpiryTick = time.Second * 1 // needs to be smaller than registry.RegisterTTL MaxPacketSize = 512 ) func configure(g *gossipRegistry, opts ...registry.Option) error { // loop through address list and get valid entries addrs := func(curAddrs []string) []string { var newAddrs []string for _, addr := range curAddrs { if trimAddr := strings.TrimSpace(addr); len(trimAddr) > 0 { newAddrs = append(newAddrs, trimAddr) } } return newAddrs } // current address list curAddrs := addrs(g.options.Addrs) // parse options for _, o := range opts { o(&g.options) } // new address list newAddrs := addrs(g.options.Addrs) // no new nodes and existing member. no configure if (len(newAddrs) == len(curAddrs)) && g.member != nil { return nil } // shutdown old member g.Stop() // lock internals g.Lock() // new done chan g.done = make(chan bool) // replace addresses curAddrs = newAddrs // create a new default config c := memberlist.DefaultLocalConfig() // sane good default options c.LogOutput = ioutil.Discard // log to /dev/null c.PushPullInterval = 0 // disable expensive tcp push/pull c.ProtocolVersion = 4 // suport latest stable features // set config from options if config, ok := g.options.Context.Value(configKey{}).(*memberlist.Config); ok && config != nil { c = config } // set address if address, ok := g.options.Context.Value(addressKey{}).(string); ok { host, port, err := net.SplitHostPort(address) if err == nil { p, err := strconv.Atoi(port) if err == nil { c.BindPort = p } c.BindAddr = host } } else { // set bind to random port c.BindPort = 0 } // set the advertise address if advertise, ok := g.options.Context.Value(advertiseKey{}).(string); ok { host, port, err := net.SplitHostPort(advertise) if err == nil { p, err := strconv.Atoi(port) if err == nil { c.AdvertisePort = p } c.AdvertiseAddr = host } } // machine hostname hostname, _ := os.Hostname() // set the name c.Name = strings.Join([]string{"micro", hostname, uuid.New().String()}, "-") // set a secret key if secure if g.options.Secure { k, ok := g.options.Context.Value(secretKey{}).([]byte) if !ok { // use the default secret k = DefaultSecret } c.SecretKey = k } // set connect retry if v, ok := g.options.Context.Value(connectRetryKey{}).(bool); ok && v { g.connectRetry = true } // set connect timeout if td, ok := g.options.Context.Value(connectTimeoutKey{}).(time.Duration); ok { g.connectTimeout = td } // create a queue queue := &memberlist.TransmitLimitedQueue{ NumNodes: func() int { return len(curAddrs) }, RetransmitMult: 3, } // set the delegate c.Delegate = &delegate{ updates: g.updates, queue: queue, } if g.connectRetry { c.Events = &eventDelegate{ events: g.events, } } // create the memberlist m, err := memberlist.Create(c) if err != nil { return err } if len(curAddrs) > 0 { for _, addr := range curAddrs { g.members[addr] = nodeActionUnknown } } g.tcpInterval = c.PushPullInterval g.addrs = curAddrs g.queue = queue g.member = m g.interval = c.GossipInterval g.Unlock() log.Logf("[gossip] Registry Listening on %s", m.LocalNode().Address()) // try connect return g.connect(curAddrs) } func (*broadcast) UniqueBroadcast() {} func (b *broadcast) Invalidates(other memberlist.Broadcast) bool { return false } func (b *broadcast) Message() []byte { up, err := proto.Marshal(b.update) if err != nil { return nil } if l := len(up); l > MaxPacketSize { log.Logf("[gossip] Registry broadcast message size %d bigger then MaxPacketSize %d", l, MaxPacketSize) } return up } func (b *broadcast) Finished() { if b.notify != nil { close(b.notify) } } func (d *delegate) NodeMeta(limit int) []byte { return []byte{} } func (d *delegate) NotifyMsg(b []byte) { if len(b) == 0 { return } go func() { up := new(pb.Update) if err := proto.Unmarshal(b, up); err != nil { return } // only process service action if up.Type != updateTypeService { return } var service *registry.Service switch up.Metadata["Content-Type"] { case "application/json": if err := json.Unmarshal(up.Data, &service); err != nil { return } // no other content type default: return } // send update d.updates <- &update{ Update: up, Service: service, } }() } func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { return d.queue.GetBroadcasts(overhead, limit) } func (d *delegate) LocalState(join bool) []byte { if !join { return []byte{} } syncCh := make(chan *registry.Service, 1) services := map[string][]*registry.Service{} d.updates <- &update{ Update: &pb.Update{ Action: actionTypeSync, }, sync: syncCh, } for srv := range syncCh { services[srv.Name] = append(services[srv.Name], srv) } b, _ := json.Marshal(services) return b } func (d *delegate) MergeRemoteState(buf []byte, join bool) { if len(buf) == 0 { return } if !join { return } var services map[string][]*registry.Service if err := json.Unmarshal(buf, &services); err != nil { return } for _, service := range services { for _, srv := range service { d.updates <- &update{ Update: &pb.Update{Action: actionTypeCreate}, Service: srv, sync: nil, } } } } func (g *gossipRegistry) connect(addrs []string) error { if len(addrs) == 0 { return nil } timeout := make(<-chan time.Time) if g.connectTimeout > 0 { timeout = time.After(g.connectTimeout) } ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() fn := func() (int, error) { return g.member.Join(addrs) } // don't wait for first try if _, err := fn(); err == nil { return nil } // wait loop for { select { // context closed case <-g.options.Context.Done(): return nil // call close, don't wait anymore case <-g.done: return nil // in case of timeout fail with a timeout error case <-timeout: return fmt.Errorf("[gossip] Registry connect timeout %v", g.addrs) // got a tick, try to connect case <-ticker.C: if _, err := fn(); err == nil { log.Debugf("[gossip] Registry connect success for %v", g.addrs) return nil } else { log.Debugf("[gossip] Registry connect failed for %v", g.addrs) } } } return nil } func (g *gossipRegistry) publish(action string, services []*registry.Service) { g.RLock() for _, sub := range g.watchers { go func(sub chan *registry.Result) { for _, service := range services { sub <- ®istry.Result{Action: action, Service: service} } }(sub) } g.RUnlock() } func (g *gossipRegistry) subscribe() (chan *registry.Result, chan bool) { next := make(chan *registry.Result, 10) exit := make(chan bool) id := uuid.New().String() g.Lock() g.watchers[id] = next g.Unlock() go func() { <-exit g.Lock() delete(g.watchers, id) close(next) g.Unlock() }() return next, exit } func (g *gossipRegistry) Stop() error { select { case <-g.done: return nil default: close(g.done) g.Lock() if g.member != nil { g.member.Leave(g.interval * 2) g.member.Shutdown() g.member = nil } g.Unlock() } return nil } // connectLoop attempts to reconnect to the memberlist func (g *gossipRegistry) connectLoop() { // try every second ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-g.done: return case <-g.options.Context.Done(): g.Stop() return case <-ticker.C: var addrs []string g.RLock() // only process if we have a memberlist if g.member == nil { g.RUnlock() continue } // self local := g.member.LocalNode().Address() // operate on each member for node, action := range g.members { switch action { // process leave event case nodeActionLeave: // don't process self if node == local { continue } addrs = append(addrs, node) } } g.RUnlock() // connect to all the members // TODO: only connect to new members if len(addrs) > 0 { g.connect(addrs) } } } } func (g *gossipRegistry) expiryLoop(updates *updates) { ticker := time.NewTicker(ExpiryTick) defer ticker.Stop() g.RLock() done := g.done g.RUnlock() for { select { case <-done: return case <-ticker.C: now := uint64(time.Now().UnixNano()) updates.Lock() // process all the updates for k, v := range updates.services { // check if expiry time has passed if d := (v.Update.Expires); d < now { // delete from records delete(updates.services, k) // set to delete v.Update.Action = actionTypeDelete // fire a new update g.updates <- v } } updates.Unlock() } } } // process member events func (g *gossipRegistry) eventLoop() { g.RLock() done := g.done g.RUnlock() for { select { // return when done case <-done: return case ev := <-g.events: // TODO: nonblocking update g.Lock() if _, ok := g.members[ev.node]; ok { g.members[ev.node] = ev.action } g.Unlock() } } } func (g *gossipRegistry) run() { updates := &updates{ services: make(map[uint64]*update), } // expiry loop go g.expiryLoop(updates) // event loop go g.eventLoop() g.RLock() // connect loop if g.connectRetry { go g.connectLoop() } g.RUnlock() // process the updates for u := range g.updates { switch u.Update.Action { case actionTypeCreate: g.Lock() if service, ok := g.services[u.Service.Name]; !ok { g.services[u.Service.Name] = []*registry.Service{u.Service} } else { g.services[u.Service.Name] = registry.Merge(service, []*registry.Service{u.Service}) } g.Unlock() // publish update to watchers go g.publish(actionTypeString(actionTypeCreate), []*registry.Service{u.Service}) // we need to expire the node at some point in the future if u.Update.Expires > 0 { // create a hash of this service if hash, err := hashstructure.Hash(u.Service, nil); err == nil { updates.Lock() updates.services[hash] = u updates.Unlock() } } case actionTypeDelete: g.Lock() if service, ok := g.services[u.Service.Name]; ok { if services := registry.Remove(service, []*registry.Service{u.Service}); len(services) == 0 { delete(g.services, u.Service.Name) } else { g.services[u.Service.Name] = services } } g.Unlock() // publish update to watchers go g.publish(actionTypeString(actionTypeDelete), []*registry.Service{u.Service}) // delete from expiry checks if hash, err := hashstructure.Hash(u.Service, nil); err == nil { updates.Lock() delete(updates.services, hash) updates.Unlock() } case actionTypeSync: // no sync channel provided if u.sync == nil { continue } g.RLock() // push all services through the sync chan for _, service := range g.services { for _, srv := range service { u.sync <- srv } // publish to watchers go g.publish(actionTypeString(actionTypeCreate), service) } g.RUnlock() // close the sync chan close(u.sync) } } } func (g *gossipRegistry) Init(opts ...registry.Option) error { return configure(g, opts...) } func (g *gossipRegistry) Options() registry.Options { return g.options } func (g *gossipRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error { log.Debugf("[gossip] Registry registering service: %s", s.Name) b, err := json.Marshal(s) if err != nil { return err } g.Lock() if service, ok := g.services[s.Name]; !ok { g.services[s.Name] = []*registry.Service{s} } else { g.services[s.Name] = registry.Merge(service, []*registry.Service{s}) } g.Unlock() var options registry.RegisterOptions for _, o := range opts { o(&options) } if options.TTL == 0 && g.tcpInterval == 0 { return fmt.Errorf("[gossip] Require register TTL or interval for memberlist.Config") } up := &pb.Update{ Expires: uint64(time.Now().Add(options.TTL).UnixNano()), Action: actionTypeCreate, Type: updateTypeService, Metadata: map[string]string{ "Content-Type": "application/json", }, Data: b, } g.queue.QueueBroadcast(&broadcast{ update: up, notify: nil, }) // send update to local watchers g.updates <- &update{ Update: up, Service: s, } // wait <-time.After(g.interval * 2) return nil } func (g *gossipRegistry) Deregister(s *registry.Service) error { log.Debugf("[gossip] Registry deregistering service: %s", s.Name) b, err := json.Marshal(s) if err != nil { return err } g.Lock() if service, ok := g.services[s.Name]; ok { if services := registry.Remove(service, []*registry.Service{s}); len(services) == 0 { delete(g.services, s.Name) } else { g.services[s.Name] = services } } g.Unlock() up := &pb.Update{ Action: actionTypeDelete, Type: updateTypeService, Metadata: map[string]string{ "Content-Type": "application/json", }, Data: b, } g.queue.QueueBroadcast(&broadcast{ update: up, notify: nil, }) // send update to local watchers g.updates <- &update{ Update: up, Service: s, } // wait <-time.After(g.interval * 2) return nil } func (g *gossipRegistry) GetService(name string) ([]*registry.Service, error) { g.RLock() service, ok := g.services[name] g.RUnlock() if !ok { return nil, registry.ErrNotFound } return service, nil } func (g *gossipRegistry) ListServices() ([]*registry.Service, error) { var services []*registry.Service g.RLock() for _, service := range g.services { services = append(services, service...) } g.RUnlock() return services, nil } func (g *gossipRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) { n, e := g.subscribe() return newGossipWatcher(n, e, opts...) } func (g *gossipRegistry) String() string { return "gossip" } func NewRegistry(opts ...registry.Option) registry.Registry { g := &gossipRegistry{ options: registry.Options{ Context: context.Background(), }, done: make(chan bool), events: make(chan *event, 100), updates: make(chan *update, 100), services: make(map[string][]*registry.Service), watchers: make(map[string]chan *registry.Result), members: make(map[string]int32), } // run the updater go g.run() // configure the gossiper if err := configure(g, opts...); err != nil { log.Fatalf("[gossip] Registry configuring error: %v", err) } // wait for setup <-time.After(g.interval * 2) return g }