diff --git a/README.md b/README.md new file mode 100644 index 0000000..428b40e --- /dev/null +++ b/README.md @@ -0,0 +1,24 @@ +# Gossip Registry + +Gossip is a zero dependency registry which uses github.com/hashicorp/memberlist to broadcast registry information +via the SWIM protocol. + +## Usage + +Start with the registry flag or env var + +```bash +MICRO_REGISTRY=gossip go run service.go +``` + +On startup you'll see something like + +```bash +2018/12/06 18:17:48 Registry Listening on 192.168.1.65:56390 +``` + +To join this gossip ring set the registry address using flag or env var + +```bash +MICRO_REGISTRY_ADDRESS=192.168.1.65:56390 +``` diff --git a/gossip.go b/gossip.go new file mode 100644 index 0000000..8cc6644 --- /dev/null +++ b/gossip.go @@ -0,0 +1,853 @@ +// Package gossip provides a gossip registry based on hashicorp/memberlist +package gossip + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net" + "os" + "strconv" + "strings" + "sync" + "time" + + "github.com/golang/protobuf/proto" + "github.com/google/uuid" + "github.com/hashicorp/memberlist" + "github.com/micro/go-micro/config/cmd" + "github.com/micro/go-micro/registry" + pb "github.com/micro/go-micro/registry/gossip/proto" + log "github.com/micro/go-micro/util/log" + "github.com/mitchellh/hashstructure" +) + +// use registry.Result int32 values after it switches from string to int32 types +// type actionType int32 +// type updateType int32 + +const ( + actionTypeInvalid int32 = iota + actionTypeCreate + actionTypeDelete + actionTypeUpdate + actionTypeSync +) + +const ( + nodeActionUnknown int32 = iota + nodeActionJoin + nodeActionLeave + nodeActionUpdate +) + +func actionTypeString(t int32) string { + switch t { + case actionTypeCreate: + return "create" + case actionTypeDelete: + return "delete" + case actionTypeUpdate: + return "update" + case actionTypeSync: + return "sync" + } + return "invalid" +} + +const ( + updateTypeInvalid int32 = iota + updateTypeService +) + +type broadcast struct { + update *pb.Update + notify chan<- struct{} +} + +type delegate struct { + queue *memberlist.TransmitLimitedQueue + updates chan *update +} + +type event struct { + action int32 + node string +} + +type eventDelegate struct { + events chan *event +} + +func init() { + cmd.DefaultRegistries["gossip"] = NewRegistry +} + +func (ed *eventDelegate) NotifyJoin(n *memberlist.Node) { + ed.events <- &event{action: nodeActionJoin, node: n.Address()} +} +func (ed *eventDelegate) NotifyLeave(n *memberlist.Node) { + ed.events <- &event{action: nodeActionLeave, node: n.Address()} +} +func (ed *eventDelegate) NotifyUpdate(n *memberlist.Node) { + ed.events <- &event{action: nodeActionUpdate, node: n.Address()} +} + +type gossipRegistry struct { + queue *memberlist.TransmitLimitedQueue + updates chan *update + events chan *event + options registry.Options + member *memberlist.Memberlist + interval time.Duration + tcpInterval time.Duration + + connectRetry bool + connectTimeout time.Duration + sync.RWMutex + services map[string][]*registry.Service + + watchers map[string]chan *registry.Result + + mtu int + addrs []string + members map[string]int32 + done chan bool +} + +type update struct { + Update *pb.Update + Service *registry.Service + sync chan *registry.Service +} + +type updates struct { + sync.RWMutex + services map[uint64]*update +} + +var ( + // You should change this if using secure + DefaultSecret = []byte("micro-gossip-key") // exactly 16 bytes + ExpiryTick = time.Second * 1 // needs to be smaller than registry.RegisterTTL + MaxPacketSize = 512 +) + +func configure(g *gossipRegistry, opts ...registry.Option) error { + // loop through address list and get valid entries + addrs := func(curAddrs []string) []string { + var newAddrs []string + for _, addr := range curAddrs { + if trimAddr := strings.TrimSpace(addr); len(trimAddr) > 0 { + newAddrs = append(newAddrs, trimAddr) + } + } + return newAddrs + } + + // current address list + curAddrs := addrs(g.options.Addrs) + + // parse options + for _, o := range opts { + o(&g.options) + } + + // new address list + newAddrs := addrs(g.options.Addrs) + + // no new nodes and existing member. no configure + if (len(newAddrs) == len(curAddrs)) && g.member != nil { + return nil + } + + // shutdown old member + g.Stop() + + // lock internals + g.Lock() + + // new done chan + g.done = make(chan bool) + + // replace addresses + curAddrs = newAddrs + + // create a new default config + c := memberlist.DefaultLocalConfig() + + // sane good default options + c.LogOutput = ioutil.Discard // log to /dev/null + c.PushPullInterval = 0 // disable expensive tcp push/pull + c.ProtocolVersion = 4 // suport latest stable features + + // set config from options + if config, ok := g.options.Context.Value(configKey{}).(*memberlist.Config); ok && config != nil { + c = config + } + + // set address + if address, ok := g.options.Context.Value(addressKey{}).(string); ok { + host, port, err := net.SplitHostPort(address) + if err == nil { + p, err := strconv.Atoi(port) + if err == nil { + c.BindPort = p + } + c.BindAddr = host + } + } else { + // set bind to random port + c.BindPort = 0 + } + + // set the advertise address + if advertise, ok := g.options.Context.Value(advertiseKey{}).(string); ok { + host, port, err := net.SplitHostPort(advertise) + if err == nil { + p, err := strconv.Atoi(port) + if err == nil { + c.AdvertisePort = p + } + c.AdvertiseAddr = host + } + } + + // machine hostname + hostname, _ := os.Hostname() + + // set the name + c.Name = strings.Join([]string{"micro", hostname, uuid.New().String()}, "-") + + // set a secret key if secure + if g.options.Secure { + k, ok := g.options.Context.Value(secretKey{}).([]byte) + if !ok { + // use the default secret + k = DefaultSecret + } + c.SecretKey = k + } + + // set connect retry + if v, ok := g.options.Context.Value(connectRetryKey{}).(bool); ok && v { + g.connectRetry = true + } + + // set connect timeout + if td, ok := g.options.Context.Value(connectTimeoutKey{}).(time.Duration); ok { + g.connectTimeout = td + } + + // create a queue + queue := &memberlist.TransmitLimitedQueue{ + NumNodes: func() int { + return len(curAddrs) + }, + RetransmitMult: 3, + } + + // set the delegate + c.Delegate = &delegate{ + updates: g.updates, + queue: queue, + } + + if g.connectRetry { + c.Events = &eventDelegate{ + events: g.events, + } + } + // create the memberlist + m, err := memberlist.Create(c) + if err != nil { + return err + } + + if len(curAddrs) > 0 { + for _, addr := range curAddrs { + g.members[addr] = nodeActionUnknown + } + } + + g.tcpInterval = c.PushPullInterval + g.addrs = curAddrs + g.queue = queue + g.member = m + g.interval = c.GossipInterval + + g.Unlock() + + log.Logf("[gossip] Registry Listening on %s", m.LocalNode().Address()) + + // try connect + return g.connect(curAddrs) +} + +func (*broadcast) UniqueBroadcast() {} + +func (b *broadcast) Invalidates(other memberlist.Broadcast) bool { + return false +} + +func (b *broadcast) Message() []byte { + up, err := proto.Marshal(b.update) + if err != nil { + return nil + } + if l := len(up); l > MaxPacketSize { + log.Logf("[gossip] Registry broadcast message size %d bigger then MaxPacketSize %d", l, MaxPacketSize) + } + return up +} + +func (b *broadcast) Finished() { + if b.notify != nil { + close(b.notify) + } +} + +func (d *delegate) NodeMeta(limit int) []byte { + return []byte{} +} + +func (d *delegate) NotifyMsg(b []byte) { + if len(b) == 0 { + return + } + + go func() { + up := new(pb.Update) + if err := proto.Unmarshal(b, up); err != nil { + return + } + + // only process service action + if up.Type != updateTypeService { + return + } + + var service *registry.Service + + switch up.Metadata["Content-Type"] { + case "application/json": + if err := json.Unmarshal(up.Data, &service); err != nil { + return + } + // no other content type + default: + return + } + + // send update + d.updates <- &update{ + Update: up, + Service: service, + } + }() +} + +func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { + return d.queue.GetBroadcasts(overhead, limit) +} + +func (d *delegate) LocalState(join bool) []byte { + if !join { + return []byte{} + } + + syncCh := make(chan *registry.Service, 1) + services := map[string][]*registry.Service{} + + d.updates <- &update{ + Update: &pb.Update{ + Action: actionTypeSync, + }, + sync: syncCh, + } + + for srv := range syncCh { + services[srv.Name] = append(services[srv.Name], srv) + } + + b, _ := json.Marshal(services) + return b +} + +func (d *delegate) MergeRemoteState(buf []byte, join bool) { + if len(buf) == 0 { + return + } + if !join { + return + } + + var services map[string][]*registry.Service + if err := json.Unmarshal(buf, &services); err != nil { + return + } + for _, service := range services { + for _, srv := range service { + d.updates <- &update{ + Update: &pb.Update{Action: actionTypeCreate}, + Service: srv, + sync: nil, + } + } + } +} + +func (g *gossipRegistry) connect(addrs []string) error { + if len(addrs) == 0 { + return nil + } + + timeout := make(<-chan time.Time) + + if g.connectTimeout > 0 { + timeout = time.After(g.connectTimeout) + } + + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + fn := func() (int, error) { + return g.member.Join(addrs) + } + + // don't wait for first try + if _, err := fn(); err == nil { + return nil + } + + // wait loop + for { + select { + // context closed + case <-g.options.Context.Done(): + return nil + // call close, don't wait anymore + case <-g.done: + return nil + // in case of timeout fail with a timeout error + case <-timeout: + return fmt.Errorf("[gossip] Registry connect timeout %v", g.addrs) + // got a tick, try to connect + case <-ticker.C: + if _, err := fn(); err == nil { + log.Debugf("[gossip] Registry connect success for %v", g.addrs) + return nil + } else { + log.Debugf("[gossip] Registry connect failed for %v", g.addrs) + } + } + } + + return nil +} + +func (g *gossipRegistry) publish(action string, services []*registry.Service) { + g.RLock() + for _, sub := range g.watchers { + go func(sub chan *registry.Result) { + for _, service := range services { + sub <- ®istry.Result{Action: action, Service: service} + } + }(sub) + } + g.RUnlock() +} + +func (g *gossipRegistry) subscribe() (chan *registry.Result, chan bool) { + next := make(chan *registry.Result, 10) + exit := make(chan bool) + + id := uuid.New().String() + + g.Lock() + g.watchers[id] = next + g.Unlock() + + go func() { + <-exit + g.Lock() + delete(g.watchers, id) + close(next) + g.Unlock() + }() + + return next, exit +} + +func (g *gossipRegistry) Stop() error { + select { + case <-g.done: + return nil + default: + close(g.done) + g.Lock() + if g.member != nil { + g.member.Leave(g.interval * 2) + g.member.Shutdown() + g.member = nil + } + g.Unlock() + } + return nil +} + +// connectLoop attempts to reconnect to the memberlist +func (g *gossipRegistry) connectLoop() { + // try every second + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-g.done: + return + case <-g.options.Context.Done(): + g.Stop() + return + case <-ticker.C: + var addrs []string + + g.RLock() + + // only process if we have a memberlist + if g.member == nil { + g.RUnlock() + continue + } + + // self + local := g.member.LocalNode().Address() + + // operate on each member + for node, action := range g.members { + switch action { + // process leave event + case nodeActionLeave: + // don't process self + if node == local { + continue + } + addrs = append(addrs, node) + } + } + + g.RUnlock() + + // connect to all the members + // TODO: only connect to new members + if len(addrs) > 0 { + g.connect(addrs) + } + } + } +} + +func (g *gossipRegistry) expiryLoop(updates *updates) { + ticker := time.NewTicker(ExpiryTick) + defer ticker.Stop() + + g.RLock() + done := g.done + g.RUnlock() + + for { + select { + case <-done: + return + case <-ticker.C: + now := uint64(time.Now().UnixNano()) + + updates.Lock() + + // process all the updates + for k, v := range updates.services { + // check if expiry time has passed + if d := (v.Update.Expires); d < now { + // delete from records + delete(updates.services, k) + // set to delete + v.Update.Action = actionTypeDelete + // fire a new update + g.updates <- v + } + } + + updates.Unlock() + } + } +} + +// process member events +func (g *gossipRegistry) eventLoop() { + g.RLock() + done := g.done + g.RUnlock() + for { + select { + // return when done + case <-done: + return + case ev := <-g.events: + // TODO: nonblocking update + g.Lock() + if _, ok := g.members[ev.node]; ok { + g.members[ev.node] = ev.action + } + g.Unlock() + } + } +} + +func (g *gossipRegistry) run() { + updates := &updates{ + services: make(map[uint64]*update), + } + + // expiry loop + go g.expiryLoop(updates) + + // event loop + go g.eventLoop() + + g.RLock() + // connect loop + if g.connectRetry { + go g.connectLoop() + } + g.RUnlock() + + // process the updates + for u := range g.updates { + switch u.Update.Action { + case actionTypeCreate: + g.Lock() + if service, ok := g.services[u.Service.Name]; !ok { + g.services[u.Service.Name] = []*registry.Service{u.Service} + + } else { + g.services[u.Service.Name] = registry.Merge(service, []*registry.Service{u.Service}) + } + g.Unlock() + + // publish update to watchers + go g.publish(actionTypeString(actionTypeCreate), []*registry.Service{u.Service}) + + // we need to expire the node at some point in the future + if u.Update.Expires > 0 { + // create a hash of this service + if hash, err := hashstructure.Hash(u.Service, nil); err == nil { + updates.Lock() + updates.services[hash] = u + updates.Unlock() + } + } + case actionTypeDelete: + g.Lock() + if service, ok := g.services[u.Service.Name]; ok { + if services := registry.Remove(service, []*registry.Service{u.Service}); len(services) == 0 { + delete(g.services, u.Service.Name) + } else { + g.services[u.Service.Name] = services + } + } + g.Unlock() + + // publish update to watchers + go g.publish(actionTypeString(actionTypeDelete), []*registry.Service{u.Service}) + + // delete from expiry checks + if hash, err := hashstructure.Hash(u.Service, nil); err == nil { + updates.Lock() + delete(updates.services, hash) + updates.Unlock() + } + case actionTypeSync: + // no sync channel provided + if u.sync == nil { + continue + } + + g.RLock() + + // push all services through the sync chan + for _, service := range g.services { + for _, srv := range service { + u.sync <- srv + } + + // publish to watchers + go g.publish(actionTypeString(actionTypeCreate), service) + } + + g.RUnlock() + + // close the sync chan + close(u.sync) + } + } +} + +func (g *gossipRegistry) Init(opts ...registry.Option) error { + return configure(g, opts...) +} + +func (g *gossipRegistry) Options() registry.Options { + return g.options +} + +func (g *gossipRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error { + log.Debugf("[gossip] Registry registering service: %s", s.Name) + + b, err := json.Marshal(s) + if err != nil { + return err + } + + g.Lock() + if service, ok := g.services[s.Name]; !ok { + g.services[s.Name] = []*registry.Service{s} + } else { + g.services[s.Name] = registry.Merge(service, []*registry.Service{s}) + } + g.Unlock() + + var options registry.RegisterOptions + for _, o := range opts { + o(&options) + } + + if options.TTL == 0 && g.tcpInterval == 0 { + return fmt.Errorf("[gossip] Require register TTL or interval for memberlist.Config") + } + + up := &pb.Update{ + Expires: uint64(time.Now().Add(options.TTL).UnixNano()), + Action: actionTypeCreate, + Type: updateTypeService, + Metadata: map[string]string{ + "Content-Type": "application/json", + }, + Data: b, + } + + g.queue.QueueBroadcast(&broadcast{ + update: up, + notify: nil, + }) + + // send update to local watchers + g.updates <- &update{ + Update: up, + Service: s, + } + + // wait + <-time.After(g.interval * 2) + + return nil +} + +func (g *gossipRegistry) Deregister(s *registry.Service) error { + + log.Debugf("[gossip] Registry deregistering service: %s", s.Name) + + b, err := json.Marshal(s) + if err != nil { + return err + } + + g.Lock() + if service, ok := g.services[s.Name]; ok { + if services := registry.Remove(service, []*registry.Service{s}); len(services) == 0 { + delete(g.services, s.Name) + } else { + g.services[s.Name] = services + } + } + g.Unlock() + + up := &pb.Update{ + Action: actionTypeDelete, + Type: updateTypeService, + Metadata: map[string]string{ + "Content-Type": "application/json", + }, + Data: b, + } + + g.queue.QueueBroadcast(&broadcast{ + update: up, + notify: nil, + }) + + // send update to local watchers + g.updates <- &update{ + Update: up, + Service: s, + } + + // wait + <-time.After(g.interval * 2) + + return nil +} + +func (g *gossipRegistry) GetService(name string) ([]*registry.Service, error) { + g.RLock() + service, ok := g.services[name] + g.RUnlock() + if !ok { + return nil, registry.ErrNotFound + } + return service, nil +} + +func (g *gossipRegistry) ListServices() ([]*registry.Service, error) { + var services []*registry.Service + g.RLock() + for _, service := range g.services { + services = append(services, service...) + } + g.RUnlock() + return services, nil +} + +func (g *gossipRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) { + n, e := g.subscribe() + return newGossipWatcher(n, e, opts...) +} + +func (g *gossipRegistry) String() string { + return "gossip" +} + +func NewRegistry(opts ...registry.Option) registry.Registry { + g := &gossipRegistry{ + options: registry.Options{ + Context: context.Background(), + }, + done: make(chan bool), + events: make(chan *event, 100), + updates: make(chan *update, 100), + services: make(map[string][]*registry.Service), + watchers: make(map[string]chan *registry.Result), + members: make(map[string]int32), + } + // run the updater + go g.run() + + // configure the gossiper + if err := configure(g, opts...); err != nil { + log.Fatalf("[gossip] Registry configuring error: %v", err) + } + // wait for setup + <-time.After(g.interval * 2) + + return g +} diff --git a/gossip_test.go b/gossip_test.go new file mode 100644 index 0000000..9154d08 --- /dev/null +++ b/gossip_test.go @@ -0,0 +1,214 @@ +package gossip + +import ( + "os" + "sync" + "testing" + "time" + + "github.com/google/uuid" + "github.com/hashicorp/memberlist" + "github.com/micro/go-micro/registry" +) + +func newMemberlistConfig() *memberlist.Config { + mc := memberlist.DefaultLANConfig() + mc.DisableTcpPings = false + mc.GossipVerifyIncoming = false + mc.GossipVerifyOutgoing = false + mc.EnableCompression = false + mc.PushPullInterval = 3 * time.Second + mc.LogOutput = os.Stderr + mc.ProtocolVersion = 4 + mc.Name = uuid.New().String() + return mc +} + +func newRegistry(opts ...registry.Option) registry.Registry { + options := []registry.Option{ + ConnectRetry(true), + ConnectTimeout(60 * time.Second), + } + + options = append(options, opts...) + r := NewRegistry(options...) + return r +} + +func TestGossipRegistryBroadcast(t *testing.T) { + mc1 := newMemberlistConfig() + r1 := newRegistry(Config(mc1), Address("127.0.0.1:54321")) + + mc2 := newMemberlistConfig() + r2 := newRegistry(Config(mc2), Address("127.0.0.1:54322"), registry.Addrs("127.0.0.1:54321")) + + defer r1.(*gossipRegistry).Stop() + defer r2.(*gossipRegistry).Stop() + + svc1 := ®istry.Service{Name: "service.1", Version: "0.0.0.1"} + svc2 := ®istry.Service{Name: "service.2", Version: "0.0.0.2"} + + if err := r1.Register(svc1, registry.RegisterTTL(10*time.Second)); err != nil { + t.Fatal(err) + } + if err := r2.Register(svc2, registry.RegisterTTL(10*time.Second)); err != nil { + t.Fatal(err) + } + + var found bool + svcs, err := r1.ListServices() + if err != nil { + t.Fatal(err) + } + + for _, svc := range svcs { + if svc.Name == "service.2" { + found = true + } + } + if !found { + t.Fatalf("[gossip registry] service.2 not found in r1, broadcast not work") + } + + found = false + + svcs, err = r2.ListServices() + if err != nil { + t.Fatal(err) + } + + for _, svc := range svcs { + if svc.Name == "service.1" { + found = true + } + } + + if !found { + t.Fatalf("[gossip registry] broadcast failed: service.1 not found in r2") + } + + if err := r1.Deregister(svc1); err != nil { + t.Fatal(err) + } + if err := r2.Deregister(svc2); err != nil { + t.Fatal(err) + } + +} +func TestGossipRegistryRetry(t *testing.T) { + mc1 := newMemberlistConfig() + r1 := newRegistry(Config(mc1), Address("127.0.0.1:54321")) + + mc2 := newMemberlistConfig() + r2 := newRegistry(Config(mc2), Address("127.0.0.1:54322"), registry.Addrs("127.0.0.1:54321")) + + defer r1.(*gossipRegistry).Stop() + defer r2.(*gossipRegistry).Stop() + + svc1 := ®istry.Service{Name: "service.1", Version: "0.0.0.1"} + svc2 := ®istry.Service{Name: "service.2", Version: "0.0.0.2"} + + var mu sync.Mutex + ch := make(chan struct{}) + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + go func() { + for { + select { + case <-ticker.C: + mu.Lock() + if r1 != nil { + r1.Register(svc1, registry.RegisterTTL(2*time.Second)) + } + if r2 != nil { + r2.Register(svc2, registry.RegisterTTL(2*time.Second)) + } + if ch != nil { + close(ch) + ch = nil + } + mu.Unlock() + } + } + }() + + <-ch + var found bool + + svcs, err := r2.ListServices() + if err != nil { + t.Fatal(err) + } + + for _, svc := range svcs { + if svc.Name == "service.1" { + found = true + } + } + + if !found { + t.Fatalf("[gossip registry] broadcast failed: service.1 not found in r2") + } + + if err = r1.(*gossipRegistry).Stop(); err != nil { + t.Fatalf("[gossip registry] failed to stop registry: %v", err) + } + + mu.Lock() + r1 = nil + mu.Unlock() + + <-time.After(3 * time.Second) + + found = false + svcs, err = r2.ListServices() + if err != nil { + t.Fatal(err) + } + + for _, svc := range svcs { + if svc.Name == "service.1" { + found = true + } + } + + if found { + t.Fatalf("[gossip registry] service.1 found in r2") + } + + if tr := os.Getenv("TRAVIS"); len(tr) > 0 { + t.Logf("[gossip registry] skip test on travis") + t.Skip() + return + } + + r1 = newRegistry(Config(mc1), Address("127.0.0.1:54321")) + <-time.After(2 * time.Second) + + found = false + svcs, err = r2.ListServices() + if err != nil { + t.Fatal(err) + } + + for _, svc := range svcs { + if svc.Name == "service.1" { + found = true + } + } + + if !found { + t.Fatalf("[gossip registry] connect retry failed: service.1 not found in r2") + } + + if err := r1.Deregister(svc1); err != nil { + t.Fatal(err) + } + if err := r2.Deregister(svc2); err != nil { + t.Fatal(err) + } + + r1.(*gossipRegistry).Stop() + r2.(*gossipRegistry).Stop() +} diff --git a/options.go b/options.go new file mode 100644 index 0000000..cf9dc9d --- /dev/null +++ b/options.go @@ -0,0 +1,58 @@ +package gossip + +import ( + "context" + "time" + + "github.com/hashicorp/memberlist" + "github.com/micro/go-micro/registry" +) + +type secretKey struct{} +type addressKey struct{} +type configKey struct{} +type advertiseKey struct{} +type connectTimeoutKey struct{} +type connectRetryKey struct{} + +// helper for setting registry options +func setRegistryOption(k, v interface{}) registry.Option { + return func(o *registry.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, k, v) + } +} + +// Secret specifies an encryption key. The value should be either +// 16, 24, or 32 bytes to select AES-128, AES-192, or AES-256. +func Secret(k []byte) registry.Option { + return setRegistryOption(secretKey{}, k) +} + +// Address to bind to - host:port +func Address(a string) registry.Option { + return setRegistryOption(addressKey{}, a) +} + +// Config sets *memberlist.Config for configuring gossip +func Config(c *memberlist.Config) registry.Option { + return setRegistryOption(configKey{}, c) +} + +// The address to advertise for other gossip members to connect to - host:port +func Advertise(a string) registry.Option { + return setRegistryOption(advertiseKey{}, a) +} + +// ConnectTimeout sets the registry connect timeout. Use -1 to specify infinite timeout +func ConnectTimeout(td time.Duration) registry.Option { + return setRegistryOption(connectTimeoutKey{}, td) +} + +// ConnectRetry enables reconnect to registry then connection closed, +// use with ConnectTimeout to specify how long retry +func ConnectRetry(v bool) registry.Option { + return setRegistryOption(connectRetryKey{}, v) +} diff --git a/proto/gossip.micro.go b/proto/gossip.micro.go new file mode 100644 index 0000000..fe9796d --- /dev/null +++ b/proto/gossip.micro.go @@ -0,0 +1,28 @@ +// Code generated by protoc-gen-micro. DO NOT EDIT. +// source: github.com/micro/go-micro/registry/gossip/proto/gossip.proto + +/* +Package gossip is a generated protocol buffer package. + +It is generated from these files: + github.com/micro/go-micro/registry/gossip/proto/gossip.proto + +It has these top-level messages: + Update +*/ +package gossip + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package diff --git a/proto/gossip.pb.go b/proto/gossip.pb.go new file mode 100644 index 0000000..e5d692f --- /dev/null +++ b/proto/gossip.pb.go @@ -0,0 +1,127 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: github.com/micro/go-micro/registry/gossip/proto/gossip.proto + +package gossip + +import ( + fmt "fmt" + math "math" + + proto "github.com/golang/protobuf/proto" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +// Update is the message broadcast +type Update struct { + // time to live for entry + Expires uint64 `protobuf:"varint,1,opt,name=expires,proto3" json:"expires,omitempty"` + // type of update + Type int32 `protobuf:"varint,2,opt,name=type,proto3" json:"type,omitempty"` + // what action is taken + Action int32 `protobuf:"varint,3,opt,name=action,proto3" json:"action,omitempty"` + // any other associated metadata about the data + Metadata map[string]string `protobuf:"bytes,6,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // the payload data; + Data []byte `protobuf:"bytes,7,opt,name=data,proto3" json:"data,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Update) Reset() { *m = Update{} } +func (m *Update) String() string { return proto.CompactTextString(m) } +func (*Update) ProtoMessage() {} +func (*Update) Descriptor() ([]byte, []int) { + return fileDescriptor_18cba623e76e57f3, []int{0} +} + +func (m *Update) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Update.Unmarshal(m, b) +} +func (m *Update) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Update.Marshal(b, m, deterministic) +} +func (m *Update) XXX_Merge(src proto.Message) { + xxx_messageInfo_Update.Merge(m, src) +} +func (m *Update) XXX_Size() int { + return xxx_messageInfo_Update.Size(m) +} +func (m *Update) XXX_DiscardUnknown() { + xxx_messageInfo_Update.DiscardUnknown(m) +} + +var xxx_messageInfo_Update proto.InternalMessageInfo + +func (m *Update) GetExpires() uint64 { + if m != nil { + return m.Expires + } + return 0 +} + +func (m *Update) GetType() int32 { + if m != nil { + return m.Type + } + return 0 +} + +func (m *Update) GetAction() int32 { + if m != nil { + return m.Action + } + return 0 +} + +func (m *Update) GetMetadata() map[string]string { + if m != nil { + return m.Metadata + } + return nil +} + +func (m *Update) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +func init() { + proto.RegisterType((*Update)(nil), "gossip.Update") + proto.RegisterMapType((map[string]string)(nil), "gossip.Update.MetadataEntry") +} + +func init() { + proto.RegisterFile("github.com/micro/go-micro/registry/gossip/proto/gossip.proto", fileDescriptor_18cba623e76e57f3) +} + +var fileDescriptor_18cba623e76e57f3 = []byte{ + // 227 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x8f, 0xc1, 0x4a, 0x03, 0x31, + 0x14, 0x45, 0x49, 0xa7, 0x4d, 0xed, 0x53, 0x41, 0x1e, 0x22, 0x41, 0x5c, 0x0c, 0xae, 0x66, 0xe3, + 0x0c, 0xe8, 0xa6, 0xa8, 0x5b, 0x97, 0x6e, 0x02, 0x7e, 0x40, 0x3a, 0x0d, 0x31, 0xe8, 0x34, 0x21, + 0x79, 0x15, 0xf3, 0xa9, 0xfe, 0x8d, 0x34, 0x89, 0x42, 0x77, 0xe7, 0x24, 0x37, 0xdc, 0x1b, 0x78, + 0x36, 0x96, 0xde, 0xf7, 0x9b, 0x7e, 0x74, 0xd3, 0x30, 0xd9, 0x31, 0xb8, 0xc1, 0xb8, 0xbb, 0x02, + 0x41, 0x1b, 0x1b, 0x29, 0xa4, 0xc1, 0xb8, 0x18, 0xad, 0x1f, 0x7c, 0x70, 0xe4, 0xaa, 0xf4, 0x59, + 0x90, 0x17, 0xbb, 0xfd, 0x61, 0xc0, 0xdf, 0xfc, 0x56, 0x91, 0x46, 0x01, 0x4b, 0xfd, 0xed, 0x6d, + 0xd0, 0x51, 0xb0, 0x96, 0x75, 0x73, 0xf9, 0xa7, 0x88, 0x30, 0xa7, 0xe4, 0xb5, 0x98, 0xb5, 0xac, + 0x5b, 0xc8, 0xcc, 0x78, 0x05, 0x5c, 0x8d, 0x64, 0xdd, 0x4e, 0x34, 0xf9, 0xb4, 0x1a, 0xae, 0xe1, + 0x64, 0xd2, 0xa4, 0xb6, 0x8a, 0x94, 0xe0, 0x6d, 0xd3, 0x9d, 0xde, 0xdf, 0xf4, 0xb5, 0xb9, 0xf4, + 0xf4, 0xaf, 0xf5, 0xfa, 0x65, 0x47, 0x21, 0xc9, 0xff, 0xf4, 0xa1, 0x25, 0xbf, 0x5a, 0xb6, 0xac, + 0x3b, 0x93, 0x99, 0xaf, 0x9f, 0xe0, 0xfc, 0x28, 0x8e, 0x17, 0xd0, 0x7c, 0xe8, 0x94, 0x07, 0xae, + 0xe4, 0x01, 0xf1, 0x12, 0x16, 0x5f, 0xea, 0x73, 0x5f, 0xd6, 0xad, 0x64, 0x91, 0xc7, 0xd9, 0x9a, + 0x6d, 0x78, 0xfe, 0xea, 0xc3, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xd6, 0x63, 0x7b, 0x1b, 0x2a, + 0x01, 0x00, 0x00, +} diff --git a/proto/gossip.proto b/proto/gossip.proto new file mode 100644 index 0000000..21acaaa --- /dev/null +++ b/proto/gossip.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package gossip; + +// Update is the message broadcast +message Update { + // time to live for entry + uint64 expires = 1; + // type of update + int32 type = 2; + // what action is taken + int32 action = 3; + // any other associated metadata about the data + map metadata = 6; + // the payload data; + bytes data = 7; +} diff --git a/watcher.go b/watcher.go new file mode 100644 index 0000000..57b3552 --- /dev/null +++ b/watcher.go @@ -0,0 +1,53 @@ +package gossip + +import ( + "github.com/micro/go-micro/registry" +) + +type gossipWatcher struct { + wo registry.WatchOptions + next chan *registry.Result + stop chan bool +} + +func newGossipWatcher(ch chan *registry.Result, stop chan bool, opts ...registry.WatchOption) (registry.Watcher, error) { + var wo registry.WatchOptions + for _, o := range opts { + o(&wo) + } + + return &gossipWatcher{ + wo: wo, + next: ch, + stop: stop, + }, nil +} + +func (m *gossipWatcher) Next() (*registry.Result, error) { + for { + select { + case r, ok := <-m.next: + if !ok { + return nil, registry.ErrWatcherStopped + } + // check watch options + if len(m.wo.Service) > 0 && r.Service.Name != m.wo.Service { + continue + } + nr := ®istry.Result{} + *nr = *r + return nr, nil + case <-m.stop: + return nil, registry.ErrWatcherStopped + } + } +} + +func (m *gossipWatcher) Stop() { + select { + case <-m.stop: + return + default: + close(m.stop) + } +}