From 8d2b12258f4b891fb5637ff44b9402a5b5ad4c61 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 3 Oct 2019 09:29:48 +0100 Subject: [PATCH] Remove gossip registry --- config/cmd/cmd.go | 2 - registry/gossip/README.md | 24 - registry/gossip/gossip.go | 848 -------------------------- registry/gossip/gossip_test.go | 214 ------- registry/gossip/options.go | 58 -- registry/gossip/proto/gossip.micro.go | 28 - registry/gossip/proto/gossip.pb.go | 127 ---- registry/gossip/proto/gossip.proto | 17 - registry/gossip/watcher.go | 53 -- 9 files changed, 1371 deletions(-) delete mode 100644 registry/gossip/README.md delete mode 100644 registry/gossip/gossip.go delete mode 100644 registry/gossip/gossip_test.go delete mode 100644 registry/gossip/options.go delete mode 100644 registry/gossip/proto/gossip.micro.go delete mode 100644 registry/gossip/proto/gossip.pb.go delete mode 100644 registry/gossip/proto/gossip.proto delete mode 100644 registry/gossip/watcher.go diff --git a/config/cmd/cmd.go b/config/cmd/cmd.go index 82d4714c..cfb8b313 100644 --- a/config/cmd/cmd.go +++ b/config/cmd/cmd.go @@ -28,7 +28,6 @@ import ( "github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry/consul" "github.com/micro/go-micro/registry/etcd" - "github.com/micro/go-micro/registry/gossip" "github.com/micro/go-micro/registry/mdns" rmem "github.com/micro/go-micro/registry/memory" regSrv "github.com/micro/go-micro/registry/service" @@ -196,7 +195,6 @@ var ( "service": regSrv.NewRegistry, "consul": consul.NewRegistry, "etcd": etcd.NewRegistry, - "gossip": gossip.NewRegistry, "mdns": mdns.NewRegistry, "memory": rmem.NewRegistry, } diff --git a/registry/gossip/README.md b/registry/gossip/README.md deleted file mode 100644 index bfeca969..00000000 --- a/registry/gossip/README.md +++ /dev/null @@ -1,24 +0,0 @@ -# Gossip Registry - -Gossip is a zero dependency registry which uses github.com/hashicorp/memberlist to broadcast registry information -via the SWIM protocol. - -## Usage - -Start with the registry flag or env var - -```bash -MICRO_REGISTRY=gossip go run service.go -``` - -On startup you'll see something like - -```bash -2018/12/06 18:17:48 Registry Listening on 192.168.1.65:56390 -``` - -To join this gossip ring set the registry address using flag or env var - -```bash -MICRO_REGISTRY_ADDRESS=192.168.1.65:56390 -``` diff --git a/registry/gossip/gossip.go b/registry/gossip/gossip.go deleted file mode 100644 index 416422f1..00000000 --- a/registry/gossip/gossip.go +++ /dev/null @@ -1,848 +0,0 @@ -// Package gossip provides a gossip registry based on hashicorp/memberlist -package gossip - -import ( - "context" - "encoding/json" - "fmt" - "io/ioutil" - "net" - "os" - "strconv" - "strings" - "sync" - "time" - - "github.com/golang/protobuf/proto" - "github.com/google/uuid" - "github.com/hashicorp/memberlist" - "github.com/micro/go-micro/registry" - pb "github.com/micro/go-micro/registry/gossip/proto" - log "github.com/micro/go-micro/util/log" - "github.com/mitchellh/hashstructure" -) - -// use registry.Result int32 values after it switches from string to int32 types -// type actionType int32 -// type updateType int32 - -const ( - actionTypeInvalid int32 = iota - actionTypeCreate - actionTypeDelete - actionTypeUpdate - actionTypeSync -) - -const ( - nodeActionUnknown int32 = iota - nodeActionJoin - nodeActionLeave - nodeActionUpdate -) - -func actionTypeString(t int32) string { - switch t { - case actionTypeCreate: - return "create" - case actionTypeDelete: - return "delete" - case actionTypeUpdate: - return "update" - case actionTypeSync: - return "sync" - } - return "invalid" -} - -const ( - updateTypeInvalid int32 = iota - updateTypeService -) - -type broadcast struct { - update *pb.Update - notify chan<- struct{} -} - -type delegate struct { - queue *memberlist.TransmitLimitedQueue - updates chan *update -} - -type event struct { - action int32 - node string -} - -type eventDelegate struct { - events chan *event -} - -func (ed *eventDelegate) NotifyJoin(n *memberlist.Node) { - ed.events <- &event{action: nodeActionJoin, node: n.Address()} -} -func (ed *eventDelegate) NotifyLeave(n *memberlist.Node) { - ed.events <- &event{action: nodeActionLeave, node: n.Address()} -} -func (ed *eventDelegate) NotifyUpdate(n *memberlist.Node) { - ed.events <- &event{action: nodeActionUpdate, node: n.Address()} -} - -type gossipRegistry struct { - queue *memberlist.TransmitLimitedQueue - updates chan *update - events chan *event - options registry.Options - member *memberlist.Memberlist - interval time.Duration - tcpInterval time.Duration - - connectRetry bool - connectTimeout time.Duration - sync.RWMutex - services map[string][]*registry.Service - - watchers map[string]chan *registry.Result - - mtu int - addrs []string - members map[string]int32 - done chan bool -} - -type update struct { - Update *pb.Update - Service *registry.Service - sync chan *registry.Service -} - -type updates struct { - sync.RWMutex - services map[uint64]*update -} - -var ( - // You should change this if using secure - DefaultSecret = []byte("micro-gossip-key") // exactly 16 bytes - ExpiryTick = time.Second * 1 // needs to be smaller than registry.RegisterTTL - MaxPacketSize = 512 -) - -func configure(g *gossipRegistry, opts ...registry.Option) error { - // loop through address list and get valid entries - addrs := func(curAddrs []string) []string { - var newAddrs []string - for _, addr := range curAddrs { - if trimAddr := strings.TrimSpace(addr); len(trimAddr) > 0 { - newAddrs = append(newAddrs, trimAddr) - } - } - return newAddrs - } - - // current address list - curAddrs := addrs(g.options.Addrs) - - // parse options - for _, o := range opts { - o(&g.options) - } - - // new address list - newAddrs := addrs(g.options.Addrs) - - // no new nodes and existing member. no configure - if (len(newAddrs) == len(curAddrs)) && g.member != nil { - return nil - } - - // shutdown old member - g.Stop() - - // lock internals - g.Lock() - - // new done chan - g.done = make(chan bool) - - // replace addresses - curAddrs = newAddrs - - // create a new default config - c := memberlist.DefaultLocalConfig() - - // sane good default options - c.LogOutput = ioutil.Discard // log to /dev/null - c.PushPullInterval = 0 // disable expensive tcp push/pull - c.ProtocolVersion = 4 // suport latest stable features - - // set config from options - if config, ok := g.options.Context.Value(configKey{}).(*memberlist.Config); ok && config != nil { - c = config - } - - // set address - if address, ok := g.options.Context.Value(addressKey{}).(string); ok { - host, port, err := net.SplitHostPort(address) - if err == nil { - p, err := strconv.Atoi(port) - if err == nil { - c.BindPort = p - } - c.BindAddr = host - } - } else { - // set bind to random port - c.BindPort = 0 - } - - // set the advertise address - if advertise, ok := g.options.Context.Value(advertiseKey{}).(string); ok { - host, port, err := net.SplitHostPort(advertise) - if err == nil { - p, err := strconv.Atoi(port) - if err == nil { - c.AdvertisePort = p - } - c.AdvertiseAddr = host - } - } - - // machine hostname - hostname, _ := os.Hostname() - - // set the name - c.Name = strings.Join([]string{"micro", hostname, uuid.New().String()}, "-") - - // set a secret key if secure - if g.options.Secure { - k, ok := g.options.Context.Value(secretKey{}).([]byte) - if !ok { - // use the default secret - k = DefaultSecret - } - c.SecretKey = k - } - - // set connect retry - if v, ok := g.options.Context.Value(connectRetryKey{}).(bool); ok && v { - g.connectRetry = true - } - - // set connect timeout - if td, ok := g.options.Context.Value(connectTimeoutKey{}).(time.Duration); ok { - g.connectTimeout = td - } - - // create a queue - queue := &memberlist.TransmitLimitedQueue{ - NumNodes: func() int { - return len(curAddrs) - }, - RetransmitMult: 3, - } - - // set the delegate - c.Delegate = &delegate{ - updates: g.updates, - queue: queue, - } - - if g.connectRetry { - c.Events = &eventDelegate{ - events: g.events, - } - } - // create the memberlist - m, err := memberlist.Create(c) - if err != nil { - return err - } - - if len(curAddrs) > 0 { - for _, addr := range curAddrs { - g.members[addr] = nodeActionUnknown - } - } - - g.tcpInterval = c.PushPullInterval - g.addrs = curAddrs - g.queue = queue - g.member = m - g.interval = c.GossipInterval - - g.Unlock() - - log.Logf("[gossip] Registry Listening on %s", m.LocalNode().Address()) - - // try connect - return g.connect(curAddrs) -} - -func (*broadcast) UniqueBroadcast() {} - -func (b *broadcast) Invalidates(other memberlist.Broadcast) bool { - return false -} - -func (b *broadcast) Message() []byte { - up, err := proto.Marshal(b.update) - if err != nil { - return nil - } - if l := len(up); l > MaxPacketSize { - log.Logf("[gossip] Registry broadcast message size %d bigger then MaxPacketSize %d", l, MaxPacketSize) - } - return up -} - -func (b *broadcast) Finished() { - if b.notify != nil { - close(b.notify) - } -} - -func (d *delegate) NodeMeta(limit int) []byte { - return []byte{} -} - -func (d *delegate) NotifyMsg(b []byte) { - if len(b) == 0 { - return - } - - go func() { - up := new(pb.Update) - if err := proto.Unmarshal(b, up); err != nil { - return - } - - // only process service action - if up.Type != updateTypeService { - return - } - - var service *registry.Service - - switch up.Metadata["Content-Type"] { - case "application/json": - if err := json.Unmarshal(up.Data, &service); err != nil { - return - } - // no other content type - default: - return - } - - // send update - d.updates <- &update{ - Update: up, - Service: service, - } - }() -} - -func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { - return d.queue.GetBroadcasts(overhead, limit) -} - -func (d *delegate) LocalState(join bool) []byte { - if !join { - return []byte{} - } - - syncCh := make(chan *registry.Service, 1) - services := map[string][]*registry.Service{} - - d.updates <- &update{ - Update: &pb.Update{ - Action: actionTypeSync, - }, - sync: syncCh, - } - - for srv := range syncCh { - services[srv.Name] = append(services[srv.Name], srv) - } - - b, _ := json.Marshal(services) - return b -} - -func (d *delegate) MergeRemoteState(buf []byte, join bool) { - if len(buf) == 0 { - return - } - if !join { - return - } - - var services map[string][]*registry.Service - if err := json.Unmarshal(buf, &services); err != nil { - return - } - for _, service := range services { - for _, srv := range service { - d.updates <- &update{ - Update: &pb.Update{Action: actionTypeCreate}, - Service: srv, - sync: nil, - } - } - } -} - -func (g *gossipRegistry) connect(addrs []string) error { - if len(addrs) == 0 { - return nil - } - - timeout := make(<-chan time.Time) - - if g.connectTimeout > 0 { - timeout = time.After(g.connectTimeout) - } - - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - - fn := func() (int, error) { - return g.member.Join(addrs) - } - - // don't wait for first try - if _, err := fn(); err == nil { - return nil - } - - // wait loop - for { - select { - // context closed - case <-g.options.Context.Done(): - return nil - // call close, don't wait anymore - case <-g.done: - return nil - // in case of timeout fail with a timeout error - case <-timeout: - return fmt.Errorf("[gossip] Registry connect timeout %v", g.addrs) - // got a tick, try to connect - case <-ticker.C: - if _, err := fn(); err == nil { - log.Debugf("[gossip] Registry connect success for %v", g.addrs) - return nil - } else { - log.Debugf("[gossip] Registry connect failed for %v", g.addrs) - } - } - } - - return nil -} - -func (g *gossipRegistry) publish(action string, services []*registry.Service) { - g.RLock() - for _, sub := range g.watchers { - go func(sub chan *registry.Result) { - for _, service := range services { - sub <- ®istry.Result{Action: action, Service: service} - } - }(sub) - } - g.RUnlock() -} - -func (g *gossipRegistry) subscribe() (chan *registry.Result, chan bool) { - next := make(chan *registry.Result, 10) - exit := make(chan bool) - - id := uuid.New().String() - - g.Lock() - g.watchers[id] = next - g.Unlock() - - go func() { - <-exit - g.Lock() - delete(g.watchers, id) - close(next) - g.Unlock() - }() - - return next, exit -} - -func (g *gossipRegistry) Stop() error { - select { - case <-g.done: - return nil - default: - close(g.done) - g.Lock() - if g.member != nil { - g.member.Leave(g.interval * 2) - g.member.Shutdown() - g.member = nil - } - g.Unlock() - } - return nil -} - -// connectLoop attempts to reconnect to the memberlist -func (g *gossipRegistry) connectLoop() { - // try every second - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - - for { - select { - case <-g.done: - return - case <-g.options.Context.Done(): - g.Stop() - return - case <-ticker.C: - var addrs []string - - g.RLock() - - // only process if we have a memberlist - if g.member == nil { - g.RUnlock() - continue - } - - // self - local := g.member.LocalNode().Address() - - // operate on each member - for node, action := range g.members { - switch action { - // process leave event - case nodeActionLeave: - // don't process self - if node == local { - continue - } - addrs = append(addrs, node) - } - } - - g.RUnlock() - - // connect to all the members - // TODO: only connect to new members - if len(addrs) > 0 { - g.connect(addrs) - } - } - } -} - -func (g *gossipRegistry) expiryLoop(updates *updates) { - ticker := time.NewTicker(ExpiryTick) - defer ticker.Stop() - - g.RLock() - done := g.done - g.RUnlock() - - for { - select { - case <-done: - return - case <-ticker.C: - now := uint64(time.Now().UnixNano()) - - updates.Lock() - - // process all the updates - for k, v := range updates.services { - // check if expiry time has passed - if d := (v.Update.Expires); d < now { - // delete from records - delete(updates.services, k) - // set to delete - v.Update.Action = actionTypeDelete - // fire a new update - g.updates <- v - } - } - - updates.Unlock() - } - } -} - -// process member events -func (g *gossipRegistry) eventLoop() { - g.RLock() - done := g.done - g.RUnlock() - for { - select { - // return when done - case <-done: - return - case ev := <-g.events: - // TODO: nonblocking update - g.Lock() - if _, ok := g.members[ev.node]; ok { - g.members[ev.node] = ev.action - } - g.Unlock() - } - } -} - -func (g *gossipRegistry) run() { - updates := &updates{ - services: make(map[uint64]*update), - } - - // expiry loop - go g.expiryLoop(updates) - - // event loop - go g.eventLoop() - - g.RLock() - // connect loop - if g.connectRetry { - go g.connectLoop() - } - g.RUnlock() - - // process the updates - for u := range g.updates { - switch u.Update.Action { - case actionTypeCreate: - g.Lock() - if service, ok := g.services[u.Service.Name]; !ok { - g.services[u.Service.Name] = []*registry.Service{u.Service} - - } else { - g.services[u.Service.Name] = registry.Merge(service, []*registry.Service{u.Service}) - } - g.Unlock() - - // publish update to watchers - go g.publish(actionTypeString(actionTypeCreate), []*registry.Service{u.Service}) - - // we need to expire the node at some point in the future - if u.Update.Expires > 0 { - // create a hash of this service - if hash, err := hashstructure.Hash(u.Service, nil); err == nil { - updates.Lock() - updates.services[hash] = u - updates.Unlock() - } - } - case actionTypeDelete: - g.Lock() - if service, ok := g.services[u.Service.Name]; ok { - if services := registry.Remove(service, []*registry.Service{u.Service}); len(services) == 0 { - delete(g.services, u.Service.Name) - } else { - g.services[u.Service.Name] = services - } - } - g.Unlock() - - // publish update to watchers - go g.publish(actionTypeString(actionTypeDelete), []*registry.Service{u.Service}) - - // delete from expiry checks - if hash, err := hashstructure.Hash(u.Service, nil); err == nil { - updates.Lock() - delete(updates.services, hash) - updates.Unlock() - } - case actionTypeSync: - // no sync channel provided - if u.sync == nil { - continue - } - - g.RLock() - - // push all services through the sync chan - for _, service := range g.services { - for _, srv := range service { - u.sync <- srv - } - - // publish to watchers - go g.publish(actionTypeString(actionTypeCreate), service) - } - - g.RUnlock() - - // close the sync chan - close(u.sync) - } - } -} - -func (g *gossipRegistry) Init(opts ...registry.Option) error { - return configure(g, opts...) -} - -func (g *gossipRegistry) Options() registry.Options { - return g.options -} - -func (g *gossipRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error { - log.Debugf("[gossip] Registry registering service: %s", s.Name) - - b, err := json.Marshal(s) - if err != nil { - return err - } - - g.Lock() - if service, ok := g.services[s.Name]; !ok { - g.services[s.Name] = []*registry.Service{s} - } else { - g.services[s.Name] = registry.Merge(service, []*registry.Service{s}) - } - g.Unlock() - - var options registry.RegisterOptions - for _, o := range opts { - o(&options) - } - - if options.TTL == 0 && g.tcpInterval == 0 { - return fmt.Errorf("[gossip] Require register TTL or interval for memberlist.Config") - } - - up := &pb.Update{ - Expires: uint64(time.Now().Add(options.TTL).UnixNano()), - Action: actionTypeCreate, - Type: updateTypeService, - Metadata: map[string]string{ - "Content-Type": "application/json", - }, - Data: b, - } - - g.queue.QueueBroadcast(&broadcast{ - update: up, - notify: nil, - }) - - // send update to local watchers - g.updates <- &update{ - Update: up, - Service: s, - } - - // wait - <-time.After(g.interval * 2) - - return nil -} - -func (g *gossipRegistry) Deregister(s *registry.Service) error { - - log.Debugf("[gossip] Registry deregistering service: %s", s.Name) - - b, err := json.Marshal(s) - if err != nil { - return err - } - - g.Lock() - if service, ok := g.services[s.Name]; ok { - if services := registry.Remove(service, []*registry.Service{s}); len(services) == 0 { - delete(g.services, s.Name) - } else { - g.services[s.Name] = services - } - } - g.Unlock() - - up := &pb.Update{ - Action: actionTypeDelete, - Type: updateTypeService, - Metadata: map[string]string{ - "Content-Type": "application/json", - }, - Data: b, - } - - g.queue.QueueBroadcast(&broadcast{ - update: up, - notify: nil, - }) - - // send update to local watchers - g.updates <- &update{ - Update: up, - Service: s, - } - - // wait - <-time.After(g.interval * 2) - - return nil -} - -func (g *gossipRegistry) GetService(name string) ([]*registry.Service, error) { - g.RLock() - service, ok := g.services[name] - g.RUnlock() - if !ok { - return nil, registry.ErrNotFound - } - return service, nil -} - -func (g *gossipRegistry) ListServices() ([]*registry.Service, error) { - var services []*registry.Service - g.RLock() - for _, service := range g.services { - services = append(services, service...) - } - g.RUnlock() - return services, nil -} - -func (g *gossipRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) { - n, e := g.subscribe() - return newGossipWatcher(n, e, opts...) -} - -func (g *gossipRegistry) String() string { - return "gossip" -} - -func NewRegistry(opts ...registry.Option) registry.Registry { - g := &gossipRegistry{ - options: registry.Options{ - Context: context.Background(), - }, - done: make(chan bool), - events: make(chan *event, 100), - updates: make(chan *update, 100), - services: make(map[string][]*registry.Service), - watchers: make(map[string]chan *registry.Result), - members: make(map[string]int32), - } - // run the updater - go g.run() - - // configure the gossiper - if err := configure(g, opts...); err != nil { - log.Fatalf("[gossip] Registry configuring error: %v", err) - } - // wait for setup - <-time.After(g.interval * 2) - - return g -} diff --git a/registry/gossip/gossip_test.go b/registry/gossip/gossip_test.go deleted file mode 100644 index 9154d08d..00000000 --- a/registry/gossip/gossip_test.go +++ /dev/null @@ -1,214 +0,0 @@ -package gossip - -import ( - "os" - "sync" - "testing" - "time" - - "github.com/google/uuid" - "github.com/hashicorp/memberlist" - "github.com/micro/go-micro/registry" -) - -func newMemberlistConfig() *memberlist.Config { - mc := memberlist.DefaultLANConfig() - mc.DisableTcpPings = false - mc.GossipVerifyIncoming = false - mc.GossipVerifyOutgoing = false - mc.EnableCompression = false - mc.PushPullInterval = 3 * time.Second - mc.LogOutput = os.Stderr - mc.ProtocolVersion = 4 - mc.Name = uuid.New().String() - return mc -} - -func newRegistry(opts ...registry.Option) registry.Registry { - options := []registry.Option{ - ConnectRetry(true), - ConnectTimeout(60 * time.Second), - } - - options = append(options, opts...) - r := NewRegistry(options...) - return r -} - -func TestGossipRegistryBroadcast(t *testing.T) { - mc1 := newMemberlistConfig() - r1 := newRegistry(Config(mc1), Address("127.0.0.1:54321")) - - mc2 := newMemberlistConfig() - r2 := newRegistry(Config(mc2), Address("127.0.0.1:54322"), registry.Addrs("127.0.0.1:54321")) - - defer r1.(*gossipRegistry).Stop() - defer r2.(*gossipRegistry).Stop() - - svc1 := ®istry.Service{Name: "service.1", Version: "0.0.0.1"} - svc2 := ®istry.Service{Name: "service.2", Version: "0.0.0.2"} - - if err := r1.Register(svc1, registry.RegisterTTL(10*time.Second)); err != nil { - t.Fatal(err) - } - if err := r2.Register(svc2, registry.RegisterTTL(10*time.Second)); err != nil { - t.Fatal(err) - } - - var found bool - svcs, err := r1.ListServices() - if err != nil { - t.Fatal(err) - } - - for _, svc := range svcs { - if svc.Name == "service.2" { - found = true - } - } - if !found { - t.Fatalf("[gossip registry] service.2 not found in r1, broadcast not work") - } - - found = false - - svcs, err = r2.ListServices() - if err != nil { - t.Fatal(err) - } - - for _, svc := range svcs { - if svc.Name == "service.1" { - found = true - } - } - - if !found { - t.Fatalf("[gossip registry] broadcast failed: service.1 not found in r2") - } - - if err := r1.Deregister(svc1); err != nil { - t.Fatal(err) - } - if err := r2.Deregister(svc2); err != nil { - t.Fatal(err) - } - -} -func TestGossipRegistryRetry(t *testing.T) { - mc1 := newMemberlistConfig() - r1 := newRegistry(Config(mc1), Address("127.0.0.1:54321")) - - mc2 := newMemberlistConfig() - r2 := newRegistry(Config(mc2), Address("127.0.0.1:54322"), registry.Addrs("127.0.0.1:54321")) - - defer r1.(*gossipRegistry).Stop() - defer r2.(*gossipRegistry).Stop() - - svc1 := ®istry.Service{Name: "service.1", Version: "0.0.0.1"} - svc2 := ®istry.Service{Name: "service.2", Version: "0.0.0.2"} - - var mu sync.Mutex - ch := make(chan struct{}) - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - - go func() { - for { - select { - case <-ticker.C: - mu.Lock() - if r1 != nil { - r1.Register(svc1, registry.RegisterTTL(2*time.Second)) - } - if r2 != nil { - r2.Register(svc2, registry.RegisterTTL(2*time.Second)) - } - if ch != nil { - close(ch) - ch = nil - } - mu.Unlock() - } - } - }() - - <-ch - var found bool - - svcs, err := r2.ListServices() - if err != nil { - t.Fatal(err) - } - - for _, svc := range svcs { - if svc.Name == "service.1" { - found = true - } - } - - if !found { - t.Fatalf("[gossip registry] broadcast failed: service.1 not found in r2") - } - - if err = r1.(*gossipRegistry).Stop(); err != nil { - t.Fatalf("[gossip registry] failed to stop registry: %v", err) - } - - mu.Lock() - r1 = nil - mu.Unlock() - - <-time.After(3 * time.Second) - - found = false - svcs, err = r2.ListServices() - if err != nil { - t.Fatal(err) - } - - for _, svc := range svcs { - if svc.Name == "service.1" { - found = true - } - } - - if found { - t.Fatalf("[gossip registry] service.1 found in r2") - } - - if tr := os.Getenv("TRAVIS"); len(tr) > 0 { - t.Logf("[gossip registry] skip test on travis") - t.Skip() - return - } - - r1 = newRegistry(Config(mc1), Address("127.0.0.1:54321")) - <-time.After(2 * time.Second) - - found = false - svcs, err = r2.ListServices() - if err != nil { - t.Fatal(err) - } - - for _, svc := range svcs { - if svc.Name == "service.1" { - found = true - } - } - - if !found { - t.Fatalf("[gossip registry] connect retry failed: service.1 not found in r2") - } - - if err := r1.Deregister(svc1); err != nil { - t.Fatal(err) - } - if err := r2.Deregister(svc2); err != nil { - t.Fatal(err) - } - - r1.(*gossipRegistry).Stop() - r2.(*gossipRegistry).Stop() -} diff --git a/registry/gossip/options.go b/registry/gossip/options.go deleted file mode 100644 index cf9dc9dc..00000000 --- a/registry/gossip/options.go +++ /dev/null @@ -1,58 +0,0 @@ -package gossip - -import ( - "context" - "time" - - "github.com/hashicorp/memberlist" - "github.com/micro/go-micro/registry" -) - -type secretKey struct{} -type addressKey struct{} -type configKey struct{} -type advertiseKey struct{} -type connectTimeoutKey struct{} -type connectRetryKey struct{} - -// helper for setting registry options -func setRegistryOption(k, v interface{}) registry.Option { - return func(o *registry.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, k, v) - } -} - -// Secret specifies an encryption key. The value should be either -// 16, 24, or 32 bytes to select AES-128, AES-192, or AES-256. -func Secret(k []byte) registry.Option { - return setRegistryOption(secretKey{}, k) -} - -// Address to bind to - host:port -func Address(a string) registry.Option { - return setRegistryOption(addressKey{}, a) -} - -// Config sets *memberlist.Config for configuring gossip -func Config(c *memberlist.Config) registry.Option { - return setRegistryOption(configKey{}, c) -} - -// The address to advertise for other gossip members to connect to - host:port -func Advertise(a string) registry.Option { - return setRegistryOption(advertiseKey{}, a) -} - -// ConnectTimeout sets the registry connect timeout. Use -1 to specify infinite timeout -func ConnectTimeout(td time.Duration) registry.Option { - return setRegistryOption(connectTimeoutKey{}, td) -} - -// ConnectRetry enables reconnect to registry then connection closed, -// use with ConnectTimeout to specify how long retry -func ConnectRetry(v bool) registry.Option { - return setRegistryOption(connectRetryKey{}, v) -} diff --git a/registry/gossip/proto/gossip.micro.go b/registry/gossip/proto/gossip.micro.go deleted file mode 100644 index fe9796d9..00000000 --- a/registry/gossip/proto/gossip.micro.go +++ /dev/null @@ -1,28 +0,0 @@ -// Code generated by protoc-gen-micro. DO NOT EDIT. -// source: github.com/micro/go-micro/registry/gossip/proto/gossip.proto - -/* -Package gossip is a generated protocol buffer package. - -It is generated from these files: - github.com/micro/go-micro/registry/gossip/proto/gossip.proto - -It has these top-level messages: - Update -*/ -package gossip - -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package diff --git a/registry/gossip/proto/gossip.pb.go b/registry/gossip/proto/gossip.pb.go deleted file mode 100644 index e5d692fc..00000000 --- a/registry/gossip/proto/gossip.pb.go +++ /dev/null @@ -1,127 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: github.com/micro/go-micro/registry/gossip/proto/gossip.proto - -package gossip - -import ( - fmt "fmt" - math "math" - - proto "github.com/golang/protobuf/proto" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package - -// Update is the message broadcast -type Update struct { - // time to live for entry - Expires uint64 `protobuf:"varint,1,opt,name=expires,proto3" json:"expires,omitempty"` - // type of update - Type int32 `protobuf:"varint,2,opt,name=type,proto3" json:"type,omitempty"` - // what action is taken - Action int32 `protobuf:"varint,3,opt,name=action,proto3" json:"action,omitempty"` - // any other associated metadata about the data - Metadata map[string]string `protobuf:"bytes,6,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - // the payload data; - Data []byte `protobuf:"bytes,7,opt,name=data,proto3" json:"data,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Update) Reset() { *m = Update{} } -func (m *Update) String() string { return proto.CompactTextString(m) } -func (*Update) ProtoMessage() {} -func (*Update) Descriptor() ([]byte, []int) { - return fileDescriptor_18cba623e76e57f3, []int{0} -} - -func (m *Update) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Update.Unmarshal(m, b) -} -func (m *Update) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Update.Marshal(b, m, deterministic) -} -func (m *Update) XXX_Merge(src proto.Message) { - xxx_messageInfo_Update.Merge(m, src) -} -func (m *Update) XXX_Size() int { - return xxx_messageInfo_Update.Size(m) -} -func (m *Update) XXX_DiscardUnknown() { - xxx_messageInfo_Update.DiscardUnknown(m) -} - -var xxx_messageInfo_Update proto.InternalMessageInfo - -func (m *Update) GetExpires() uint64 { - if m != nil { - return m.Expires - } - return 0 -} - -func (m *Update) GetType() int32 { - if m != nil { - return m.Type - } - return 0 -} - -func (m *Update) GetAction() int32 { - if m != nil { - return m.Action - } - return 0 -} - -func (m *Update) GetMetadata() map[string]string { - if m != nil { - return m.Metadata - } - return nil -} - -func (m *Update) GetData() []byte { - if m != nil { - return m.Data - } - return nil -} - -func init() { - proto.RegisterType((*Update)(nil), "gossip.Update") - proto.RegisterMapType((map[string]string)(nil), "gossip.Update.MetadataEntry") -} - -func init() { - proto.RegisterFile("github.com/micro/go-micro/registry/gossip/proto/gossip.proto", fileDescriptor_18cba623e76e57f3) -} - -var fileDescriptor_18cba623e76e57f3 = []byte{ - // 227 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x8f, 0xc1, 0x4a, 0x03, 0x31, - 0x14, 0x45, 0x49, 0xa7, 0x4d, 0xed, 0x53, 0x41, 0x1e, 0x22, 0x41, 0x5c, 0x0c, 0xae, 0x66, 0xe3, - 0x0c, 0xe8, 0xa6, 0xa8, 0x5b, 0x97, 0x6e, 0x02, 0x7e, 0x40, 0x3a, 0x0d, 0x31, 0xe8, 0x34, 0x21, - 0x79, 0x15, 0xf3, 0xa9, 0xfe, 0x8d, 0x34, 0x89, 0x42, 0x77, 0xe7, 0x24, 0x37, 0xdc, 0x1b, 0x78, - 0x36, 0x96, 0xde, 0xf7, 0x9b, 0x7e, 0x74, 0xd3, 0x30, 0xd9, 0x31, 0xb8, 0xc1, 0xb8, 0xbb, 0x02, - 0x41, 0x1b, 0x1b, 0x29, 0xa4, 0xc1, 0xb8, 0x18, 0xad, 0x1f, 0x7c, 0x70, 0xe4, 0xaa, 0xf4, 0x59, - 0x90, 0x17, 0xbb, 0xfd, 0x61, 0xc0, 0xdf, 0xfc, 0x56, 0x91, 0x46, 0x01, 0x4b, 0xfd, 0xed, 0x6d, - 0xd0, 0x51, 0xb0, 0x96, 0x75, 0x73, 0xf9, 0xa7, 0x88, 0x30, 0xa7, 0xe4, 0xb5, 0x98, 0xb5, 0xac, - 0x5b, 0xc8, 0xcc, 0x78, 0x05, 0x5c, 0x8d, 0x64, 0xdd, 0x4e, 0x34, 0xf9, 0xb4, 0x1a, 0xae, 0xe1, - 0x64, 0xd2, 0xa4, 0xb6, 0x8a, 0x94, 0xe0, 0x6d, 0xd3, 0x9d, 0xde, 0xdf, 0xf4, 0xb5, 0xb9, 0xf4, - 0xf4, 0xaf, 0xf5, 0xfa, 0x65, 0x47, 0x21, 0xc9, 0xff, 0xf4, 0xa1, 0x25, 0xbf, 0x5a, 0xb6, 0xac, - 0x3b, 0x93, 0x99, 0xaf, 0x9f, 0xe0, 0xfc, 0x28, 0x8e, 0x17, 0xd0, 0x7c, 0xe8, 0x94, 0x07, 0xae, - 0xe4, 0x01, 0xf1, 0x12, 0x16, 0x5f, 0xea, 0x73, 0x5f, 0xd6, 0xad, 0x64, 0x91, 0xc7, 0xd9, 0x9a, - 0x6d, 0x78, 0xfe, 0xea, 0xc3, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xd6, 0x63, 0x7b, 0x1b, 0x2a, - 0x01, 0x00, 0x00, -} diff --git a/registry/gossip/proto/gossip.proto b/registry/gossip/proto/gossip.proto deleted file mode 100644 index 21acaaa5..00000000 --- a/registry/gossip/proto/gossip.proto +++ /dev/null @@ -1,17 +0,0 @@ -syntax = "proto3"; - -package gossip; - -// Update is the message broadcast -message Update { - // time to live for entry - uint64 expires = 1; - // type of update - int32 type = 2; - // what action is taken - int32 action = 3; - // any other associated metadata about the data - map metadata = 6; - // the payload data; - bytes data = 7; -} diff --git a/registry/gossip/watcher.go b/registry/gossip/watcher.go deleted file mode 100644 index 57b35521..00000000 --- a/registry/gossip/watcher.go +++ /dev/null @@ -1,53 +0,0 @@ -package gossip - -import ( - "github.com/micro/go-micro/registry" -) - -type gossipWatcher struct { - wo registry.WatchOptions - next chan *registry.Result - stop chan bool -} - -func newGossipWatcher(ch chan *registry.Result, stop chan bool, opts ...registry.WatchOption) (registry.Watcher, error) { - var wo registry.WatchOptions - for _, o := range opts { - o(&wo) - } - - return &gossipWatcher{ - wo: wo, - next: ch, - stop: stop, - }, nil -} - -func (m *gossipWatcher) Next() (*registry.Result, error) { - for { - select { - case r, ok := <-m.next: - if !ok { - return nil, registry.ErrWatcherStopped - } - // check watch options - if len(m.wo.Service) > 0 && r.Service.Name != m.wo.Service { - continue - } - nr := ®istry.Result{} - *nr = *r - return nr, nil - case <-m.stop: - return nil, registry.ErrWatcherStopped - } - } -} - -func (m *gossipWatcher) Stop() { - select { - case <-m.stop: - return - default: - close(m.stop) - } -}