From b343420af671836ef7d9570d1fdea7914232a5c9 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 6 Dec 2018 18:19:05 +0000 Subject: [PATCH] update the gossiper --- cmd/cmd.go | 30 +- registry/gossip/gossip.go | 554 +++++++++++++++++++---------- registry/gossip/options.go | 17 + registry/gossip/proto/gossip.pb.go | 109 +++--- registry/gossip/proto/gossip.proto | 10 +- registry/gossip/watcher.go | 43 ++- 6 files changed, 509 insertions(+), 254 deletions(-) create mode 100644 registry/gossip/options.go diff --git a/cmd/cmd.go b/cmd/cmd.go index 4f23743d..8c1aae19 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -10,6 +10,7 @@ import ( "time" "github.com/micro/cli" + "github.com/micro/go-log" "github.com/micro/go-micro/client" "github.com/micro/go-micro/server" @@ -301,10 +302,15 @@ func (c *cmd) Before(ctx *cli.Context) error { serverOpts = append(serverOpts, server.Registry(*c.opts.Registry)) clientOpts = append(clientOpts, client.Registry(*c.opts.Registry)) - (*c.opts.Selector).Init(selector.Registry(*c.opts.Registry)) + if err := (*c.opts.Selector).Init(selector.Registry(*c.opts.Registry)); err != nil { + log.Fatalf("Error configuring registry: %v", err) + } + clientOpts = append(clientOpts, client.Selector(*c.opts.Selector)) - (*c.opts.Broker).Init(broker.Registry(*c.opts.Registry)) + if err := (*c.opts.Broker).Init(broker.Registry(*c.opts.Registry)); err != nil { + log.Fatalf("Error configuring broker: %v", err) + } } // Set the selector @@ -349,15 +355,21 @@ func (c *cmd) Before(ctx *cli.Context) error { } if len(ctx.String("broker_address")) > 0 { - (*c.opts.Broker).Init(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...)) + if err := (*c.opts.Broker).Init(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...)); err != nil { + log.Fatalf("Error configuring broker: %v", err) + } } if len(ctx.String("registry_address")) > 0 { - (*c.opts.Registry).Init(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...)) + if err := (*c.opts.Registry).Init(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...)); err != nil { + log.Fatalf("Error configuring registry: %v", err) + } } if len(ctx.String("transport_address")) > 0 { - (*c.opts.Transport).Init(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...)) + if err := (*c.opts.Transport).Init(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...)); err != nil { + log.Fatalf("Error configuring transport: %v", err) + } } if len(ctx.String("server_name")) > 0 { @@ -412,12 +424,16 @@ func (c *cmd) Before(ctx *cli.Context) error { // We have some command line opts for the server. // Lets set it up if len(serverOpts) > 0 { - (*c.opts.Server).Init(serverOpts...) + if err := (*c.opts.Server).Init(serverOpts...); err != nil { + log.Fatalf("Error configuring server: %v", err) + } } // Use an init option? if len(clientOpts) > 0 { - (*c.opts.Client).Init(clientOpts...) + if err := (*c.opts.Client).Init(clientOpts...); err != nil { + log.Fatalf("Error configuring client: %v", err) + } } return nil diff --git a/registry/gossip/gossip.go b/registry/gossip/gossip.go index 71f55a7a..99972a24 100644 --- a/registry/gossip/gossip.go +++ b/registry/gossip/gossip.go @@ -1,4 +1,4 @@ -// Package gossip provides a zero dependency registry using the gossip protocol SWIM +// Package Gossip provides a gossip registry based on hashicorp/memberlist package gossip import ( @@ -15,22 +15,13 @@ import ( "github.com/micro/go-log" "github.com/micro/go-micro/registry" pb "github.com/micro/go-micro/registry/gossip/proto" + "github.com/mitchellh/hashstructure" ) -type gossipRegistry struct { - opts registry.Options - queue *memberlist.TransmitLimitedQueue - memberlist *memberlist.Memberlist - delegate *delegate - - sync.RWMutex - services map[string][]*registry.Service - watchers map[string]*watcher -} - -var ( - // defaults to random port - defaultPort = 0 +const ( + addAction = "update" + delAction = "delete" + syncAction = "sync" ) type broadcast struct { @@ -39,8 +30,134 @@ type broadcast struct { } type delegate struct { + queue *memberlist.TransmitLimitedQueue + updates chan *update +} + +type gossipRegistry struct { queue *memberlist.TransmitLimitedQueue - registry *gossipRegistry + updates chan *update + options registry.Options + member *memberlist.Memberlist + interval time.Duration + + sync.RWMutex + services map[string][]*registry.Service + + s sync.RWMutex + watchers map[string]chan *registry.Result +} + +type update struct { + Update *pb.Update + Service *registry.Service + sync chan *registry.Service +} + +var ( + // You should change this if using secure + DefaultSecret = []byte("gossip") + ExpiryTick = time.Second * 5 +) + +func configure(g *gossipRegistry, opts ...registry.Option) error { + // loop through address list and get valid entries + addrs := func(curAddrs []string) []string { + var newAddrs []string + for _, addr := range curAddrs { + if trimAddr := strings.TrimSpace(addr); len(trimAddr) > 0 { + newAddrs = append(newAddrs, trimAddr) + } + } + return newAddrs + } + + // current address list + curAddrs := addrs(g.options.Addrs) + + // parse options + for _, o := range opts { + o(&g.options) + } + + // new address list + newAddrs := addrs(g.options.Addrs) + + // no new nodes and existing member. no configure + if (len(newAddrs) == len(curAddrs)) && g.member != nil { + return nil + } + + // shutdown old member + if g.member != nil { + g.member.Shutdown() + } + + // replace addresses + curAddrs = newAddrs + + // create a queue + queue := &memberlist.TransmitLimitedQueue{ + NumNodes: func() int { + return len(curAddrs) + }, + RetransmitMult: 3, + } + + // machine hostname + hostname, _ := os.Hostname() + + // create a new default config + c := memberlist.DefaultLocalConfig() + + // set bind to random port + c.BindPort = 0 + + // set the name + c.Name = strings.Join([]string{"micro", hostname, uuid.New().String()}, "-") + + // set the delegate + c.Delegate = &delegate{ + updates: g.updates, + queue: queue, + } + + // log to dev null + c.LogOutput = ioutil.Discard + + // set a secret key if secure + if g.options.Secure { + k, ok := g.options.Context.Value(contextSecretKey{}).([]byte) + if !ok { + // use the default secret + k = DefaultSecret + } + c.SecretKey = k + } + + // TODO: set advertise addr to advertise behind nat + + // create the memberlist + m, err := memberlist.Create(c) + if err != nil { + return err + } + + // join the memberlist + if len(curAddrs) > 0 { + _, err := m.Join(curAddrs) + if err != nil { + return err + } + } + + // set internals + g.queue = queue + g.member = m + g.interval = c.GossipInterval + + log.Logf("Registry Listening on %s", m.LocalNode().Address()) + return nil } func (b *broadcast) Invalidates(other memberlist.Broadcast) bool { @@ -91,56 +208,35 @@ func (d *delegate) NotifyMsg(b []byte) { return } - up := new(pb.Update) - if err := proto.Unmarshal(b, up); err != nil { - return - } - - // only process service action - if up.Type != "service" { - return - } - - var service *registry.Service - - switch up.Metadata["Content-Type"] { - case "application/json": - if err := json.Unmarshal(up.Data, &service); err != nil { + go func() { + up := new(pb.Update) + if err := proto.Unmarshal(b, up); err != nil { return } - // no other content type - default: - return - } - d.registry.Lock() - defer d.registry.Unlock() - - // get existing service - s := d.registry.services[service.Name] - - // save update - switch up.Action { - case "update": - d.registry.services[service.Name] = addServices(s, []*registry.Service{service}) - case "delete": - services := delServices(s, []*registry.Service{service}) - if len(services) == 0 { - delete(d.registry.services, service.Name) + // only process service action + if up.Type != "service" { return } - d.registry.services[service.Name] = services - default: - return - } - // notify watchers - for _, w := range d.registry.watchers { - select { - case w.ch <- ®istry.Result{Action: up.Action, Service: service}: + var service *registry.Service + + switch up.Metadata["Content-Type"] { + case "application/json": + if err := json.Unmarshal(up.Data, &service); err != nil { + return + } + // no other content type default: + return } - } + + // send update + d.updates <- &update{ + Update: up, + Service: service, + } + }() } func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { @@ -148,9 +244,25 @@ func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { } func (d *delegate) LocalState(join bool) []byte { - d.registry.RLock() - b, _ := json.Marshal(d.registry.services) - d.registry.RUnlock() + if !join { + return []byte{} + } + + syncCh := make(chan *registry.Service, 1) + services := map[string][]*registry.Service{} + + d.updates <- &update{ + Update: &pb.Update{ + Action: syncAction, + }, + sync: syncCh, + } + + for srv := range syncCh { + services[srv.Name] = append(services[srv.Name], srv) + } + + b, _ := json.Marshal(services) return b } @@ -161,37 +273,164 @@ func (d *delegate) MergeRemoteState(buf []byte, join bool) { if !join { return } + var services map[string][]*registry.Service if err := json.Unmarshal(buf, &services); err != nil { return } - d.registry.Lock() - for k, v := range services { - d.registry.services[k] = addServices(d.registry.services[k], v) + for _, service := range services { + for _, srv := range service { + d.updates <- &update{ + Update: &pb.Update{Action: addAction}, + Service: srv, + sync: nil, + } + } + } +} + +func (g *gossipRegistry) publish(action string, services []*registry.Service) { + g.s.RLock() + for _, sub := range g.watchers { + go func(sub chan *registry.Result) { + for _, service := range services { + sub <- ®istry.Result{Action: action, Service: service} + } + }(sub) + } + g.s.RUnlock() +} + +func (g *gossipRegistry) subscribe() (chan *registry.Result, chan bool) { + next := make(chan *registry.Result, 10) + exit := make(chan bool) + + id := uuid.New().String() + + g.s.Lock() + g.watchers[id] = next + g.s.Unlock() + + go func() { + <-exit + g.s.Lock() + delete(g.watchers, id) + close(next) + g.s.Unlock() + }() + + return next, exit +} + +func (g *gossipRegistry) run() { + var mtx sync.Mutex + updates := map[uint64]*update{} + + // expiry loop + go func() { + t := time.NewTicker(ExpiryTick) + defer t.Stop() + + for _ = range t.C { + now := uint64(time.Now().UnixNano()) + + mtx.Lock() + + // process all the updates + for k, v := range updates { + // check if expiry time has passed + if d := (v.Update.Timestamp + v.Update.Expires); d < now { + // delete from records + delete(updates, k) + // set to delete + v.Update.Action = delAction + // fire a new update + g.updates <- v + } + } + + mtx.Unlock() + } + }() + + // process the updates + for u := range g.updates { + switch u.Update.Action { + case addAction: + g.Lock() + if service, ok := g.services[u.Service.Name]; !ok { + g.services[u.Service.Name] = []*registry.Service{u.Service} + + } else { + g.services[u.Service.Name] = addServices(service, []*registry.Service{u.Service}) + } + g.Unlock() + + // publish update to watchers + go g.publish(addAction, []*registry.Service{u.Service}) + + // we need to expire the node at some point in the future + if u.Update.Expires > 0 { + // create a hash of this service + if hash, err := hashstructure.Hash(u.Service, nil); err == nil { + mtx.Lock() + updates[hash] = u + mtx.Unlock() + } + } + case delAction: + g.Lock() + if service, ok := g.services[u.Service.Name]; ok { + if services := delServices(service, []*registry.Service{u.Service}); len(services) == 0 { + delete(g.services, u.Service.Name) + } else { + g.services[u.Service.Name] = services + } + } + g.Unlock() + + // publish update to watchers + go g.publish(delAction, []*registry.Service{u.Service}) + + // delete from expiry checks + if hash, err := hashstructure.Hash(u.Service, nil); err == nil { + mtx.Lock() + delete(updates, hash) + mtx.Unlock() + } + case syncAction: + // no sync channel provided + if u.sync == nil { + continue + } + + g.RLock() + + // push all services through the sync chan + for _, service := range g.services { + for _, srv := range service { + u.sync <- srv + } + + // publish to watchers + go g.publish(addAction, service) + } + + g.RUnlock() + + // close the sync chan + close(u.sync) + } } - d.registry.Unlock() } func (g *gossipRegistry) Init(opts ...registry.Option) error { - addrs := g.opts.Addrs - for _, o := range opts { - o(&g.opts) - } - - // if we have memberlist join it - if len(addrs) != len(g.opts.Addrs) { - _, err := g.memberlist.Join(g.opts.Addrs) - if err != nil { - return err - } - } - - return nil + return configure(g, opts...) } func (g *gossipRegistry) Options() registry.Options { - return g.opts + return g.options } func (g *gossipRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error { @@ -201,12 +440,22 @@ func (g *gossipRegistry) Register(s *registry.Service, opts ...registry.Register } g.Lock() - g.services[s.Name] = addServices(g.services[s.Name], []*registry.Service{s}) + if service, ok := g.services[s.Name]; !ok { + g.services[s.Name] = []*registry.Service{s} + } else { + g.services[s.Name] = addServices(service, []*registry.Service{s}) + } g.Unlock() + var options registry.RegisterOptions + for _, o := range opts { + o(&options) + } + up := &pb.Update{ Id: uuid.New().String(), Timestamp: uint64(time.Now().UnixNano()), + Expires: uint64(options.TTL.Nanoseconds()), Action: "update", Type: "service", Metadata: map[string]string{ @@ -220,6 +469,9 @@ func (g *gossipRegistry) Register(s *registry.Service, opts ...registry.Register notify: nil, }) + // wait + <-time.After(g.interval * 2) + return nil } @@ -230,7 +482,13 @@ func (g *gossipRegistry) Deregister(s *registry.Service) error { } g.Lock() - g.services[s.Name] = delServices(g.services[s.Name], []*registry.Service{s}) + if service, ok := g.services[s.Name]; ok { + if services := delServices(service, []*registry.Service{s}); len(services) == 0 { + delete(g.services, s.Name) + } else { + g.services[s.Name] = services + } + } g.Unlock() up := &pb.Update{ @@ -249,135 +507,59 @@ func (g *gossipRegistry) Deregister(s *registry.Service) error { notify: nil, }) + // wait + <-time.After(g.interval * 2) + return nil } func (g *gossipRegistry) GetService(name string) ([]*registry.Service, error) { g.RLock() - if s, ok := g.services[name]; ok { - service := cp(s) - g.RUnlock() - return service, nil - } + service, ok := g.services[name] g.RUnlock() - return nil, registry.ErrNotFound + if !ok { + return nil, registry.ErrNotFound + } + return service, nil } func (g *gossipRegistry) ListServices() ([]*registry.Service, error) { var services []*registry.Service g.RLock() - for name, _ := range g.services { - services = append(services, ®istry.Service{Name: name}) + for _, service := range g.services { + services = append(services, service...) } g.RUnlock() return services, nil } func (g *gossipRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) { - var options registry.WatchOptions - for _, o := range opts { - o(&options) - } - - // watcher id - id := uuid.New().String() - - // create watcher - w := &watcher{ - ch: make(chan *registry.Result, 1), - exit: make(chan bool), - id: id, - // filter service - srv: options.Service, - // delete self - fn: func() { - g.Lock() - delete(g.watchers, id) - g.Unlock() - }, - } - - // save watcher - g.Lock() - g.watchers[w.id] = w - g.Unlock() - - return w, nil + n, e := g.subscribe() + return newGossipWatcher(n, e, opts...) } func (g *gossipRegistry) String() string { return "gossip" } -func (g *gossipRegistry) run() error { - hostname, _ := os.Hostname() - - // delegates - d := new(delegate) - - // create a new default config - c := memberlist.DefaultLocalConfig() - - // assign the delegate - c.Delegate = d - - // Set the bind port - c.BindPort = defaultPort - - // set the name - c.Name = strings.Join([]string{"micro", hostname, uuid.New().String()}, "-") - - // log to dev null - c.LogOutput = ioutil.Discard - - // TODO: set advertise addr to advertise behind nat - - // create the memberlist - m, err := memberlist.Create(c) - if err != nil { - return err - } - - // if we have memberlist join it - if len(g.opts.Addrs) > 0 { - _, err := m.Join(g.opts.Addrs) - if err != nil { - return err - } - } - - // Set the broadcast limit and number of nodes - d.queue = &memberlist.TransmitLimitedQueue{ - NumNodes: func() int { - return m.NumMembers() - }, - RetransmitMult: 3, - } - - g.queue = d.queue - g.memberlist = m - g.delegate = d - d.registry = g - - return nil -} - -// NewRegistry returns a new gossip registry func NewRegistry(opts ...registry.Option) registry.Registry { - var options registry.Options - for _, o := range opts { - o(&options) - } - - g := &gossipRegistry{ - opts: options, + gossip := &gossipRegistry{ + options: registry.Options{}, + updates: make(chan *update, 100), services: make(map[string][]*registry.Service), - } - if err := g.run(); err != nil { - log.Fatal(err) + watchers: make(map[string]chan *registry.Result), } - log.Logf("Registry Listening on %s", g.memberlist.LocalNode().Address()) - // return gossip registry - return g + // configure the gossiper + if err := configure(gossip, opts...); err != nil { + log.Fatal("Error configuring registry: %v", err) + } + + // run the updater + go gossip.run() + + // wait for setup + <-time.After(gossip.interval * 2) + + return gossip } diff --git a/registry/gossip/options.go b/registry/gossip/options.go new file mode 100644 index 00000000..26e8cd9f --- /dev/null +++ b/registry/gossip/options.go @@ -0,0 +1,17 @@ +package gossip + +import ( + "context" + + "github.com/micro/go-micro/registry" +) + +type contextSecretKey struct{} + +// Secret specifies an encryption key. The value should be either +// 16, 24, or 32 bytes to select AES-128, AES-192, or AES-256. +func Secret(k []byte) registry.Option { + return func(o *registry.Options) { + o.Context = context.WithValue(o.Context, contextSecretKey{}, k) + } +} diff --git a/registry/gossip/proto/gossip.pb.go b/registry/gossip/proto/gossip.pb.go index 8f2d651c..89bdca55 100644 --- a/registry/gossip/proto/gossip.pb.go +++ b/registry/gossip/proto/gossip.pb.go @@ -1,20 +1,13 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // source: github.com/micro/go-micro/registry/gossip/proto/gossip.proto -/* -Package gossip is a generated protocol buffer package. - -It is generated from these files: - github.com/micro/go-micro/registry/gossip/proto/gossip.proto - -It has these top-level messages: - Update -*/ package gossip -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal @@ -25,28 +18,53 @@ var _ = math.Inf // is compatible with the proto package it is being compiled against. // A compilation error at this line likely means your copy of the // proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package // Update is the message broadcast type Update struct { // unique id of update - Id string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // unix nano timestamp of update - Timestamp uint64 `protobuf:"varint,2,opt,name=timestamp" json:"timestamp,omitempty"` + Timestamp uint64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + // time to live for entry + Expires uint64 `protobuf:"varint,3,opt,name=expires,proto3" json:"expires,omitempty"` // type of update; service - Type string `protobuf:"bytes,3,opt,name=type" json:"type,omitempty"` + Type string `protobuf:"bytes,4,opt,name=type,proto3" json:"type,omitempty"` // what action is taken; add, del, put - Action string `protobuf:"bytes,4,opt,name=action" json:"action,omitempty"` + Action string `protobuf:"bytes,5,opt,name=action,proto3" json:"action,omitempty"` // any other associated metadata about the data - Metadata map[string]string `protobuf:"bytes,5,rep,name=metadata" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + Metadata map[string]string `protobuf:"bytes,6,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` // the payload data; - Data []byte `protobuf:"bytes,6,opt,name=data,proto3" json:"data,omitempty"` + Data []byte `protobuf:"bytes,7,opt,name=data,proto3" json:"data,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } -func (m *Update) Reset() { *m = Update{} } -func (m *Update) String() string { return proto.CompactTextString(m) } -func (*Update) ProtoMessage() {} -func (*Update) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } +func (m *Update) Reset() { *m = Update{} } +func (m *Update) String() string { return proto.CompactTextString(m) } +func (*Update) ProtoMessage() {} +func (*Update) Descriptor() ([]byte, []int) { + return fileDescriptor_18cba623e76e57f3, []int{0} +} + +func (m *Update) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Update.Unmarshal(m, b) +} +func (m *Update) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Update.Marshal(b, m, deterministic) +} +func (m *Update) XXX_Merge(src proto.Message) { + xxx_messageInfo_Update.Merge(m, src) +} +func (m *Update) XXX_Size() int { + return xxx_messageInfo_Update.Size(m) +} +func (m *Update) XXX_DiscardUnknown() { + xxx_messageInfo_Update.DiscardUnknown(m) +} + +var xxx_messageInfo_Update proto.InternalMessageInfo func (m *Update) GetId() string { if m != nil { @@ -62,6 +80,13 @@ func (m *Update) GetTimestamp() uint64 { return 0 } +func (m *Update) GetExpires() uint64 { + if m != nil { + return m.Expires + } + return 0 +} + func (m *Update) GetType() string { if m != nil { return m.Type @@ -92,27 +117,29 @@ func (m *Update) GetData() []byte { func init() { proto.RegisterType((*Update)(nil), "gossip.Update") + proto.RegisterMapType((map[string]string)(nil), "gossip.Update.MetadataEntry") } func init() { - proto.RegisterFile("github.com/micro/go-micro/registry/gossip/proto/gossip.proto", fileDescriptor0) + proto.RegisterFile("github.com/micro/go-micro/registry/gossip/proto/gossip.proto", fileDescriptor_18cba623e76e57f3) } -var fileDescriptor0 = []byte{ - // 237 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x8f, 0xcf, 0x4a, 0xc4, 0x30, - 0x10, 0xc6, 0x49, 0xdb, 0x0d, 0x76, 0xfc, 0x83, 0x0c, 0x22, 0x41, 0xf6, 0x50, 0x3c, 0xf5, 0x62, - 0x0b, 0x7a, 0x59, 0xd4, 0xab, 0x47, 0x2f, 0x01, 0x1f, 0x20, 0xdb, 0x86, 0x1a, 0x34, 0x9b, 0x90, - 0xce, 0x0a, 0x7d, 0x68, 0xdf, 0x41, 0x9a, 0x04, 0xc5, 0xdb, 0xef, 0x37, 0xf9, 0xc2, 0x7c, 0x03, - 0xcf, 0x93, 0xa1, 0xf7, 0xe3, 0xbe, 0x1b, 0x9c, 0xed, 0xad, 0x19, 0x82, 0xeb, 0x27, 0x77, 0x97, - 0x20, 0xe8, 0xc9, 0xcc, 0x14, 0x96, 0x7e, 0x72, 0xf3, 0x6c, 0x7c, 0xef, 0x83, 0x23, 0x97, 0xa5, - 0x8b, 0x82, 0x3c, 0xd9, 0xed, 0x37, 0x03, 0xfe, 0xe6, 0x47, 0x45, 0x1a, 0x2f, 0xa0, 0x30, 0xa3, - 0x60, 0x0d, 0x6b, 0x6b, 0x59, 0x98, 0x11, 0xb7, 0x50, 0x93, 0xb1, 0x7a, 0x26, 0x65, 0xbd, 0x28, - 0x1a, 0xd6, 0x56, 0xf2, 0x6f, 0x80, 0x08, 0x15, 0x2d, 0x5e, 0x8b, 0x32, 0xe6, 0x23, 0xe3, 0x35, - 0x70, 0x35, 0x90, 0x71, 0x07, 0x51, 0xc5, 0x69, 0x36, 0xdc, 0xc1, 0x89, 0xd5, 0xa4, 0x46, 0x45, - 0x4a, 0x6c, 0x9a, 0xb2, 0x3d, 0xbd, 0xdf, 0x76, 0xb9, 0x4d, 0xda, 0xdd, 0xbd, 0xe6, 0xe7, 0x97, - 0x03, 0x85, 0x45, 0xfe, 0xa6, 0xd7, 0x2d, 0xf1, 0x17, 0x6f, 0x58, 0x7b, 0x26, 0x23, 0xdf, 0x3c, - 0xc1, 0xf9, 0xbf, 0x38, 0x5e, 0x42, 0xf9, 0xa1, 0x97, 0xdc, 0x7c, 0x45, 0xbc, 0x82, 0xcd, 0x97, - 0xfa, 0x3c, 0xea, 0x58, 0xbb, 0x96, 0x49, 0x1e, 0x8b, 0x1d, 0xdb, 0xf3, 0x78, 0xfe, 0xc3, 0x4f, - 0x00, 0x00, 0x00, 0xff, 0xff, 0xf0, 0x49, 0xa9, 0xd7, 0x3e, 0x01, 0x00, 0x00, +var fileDescriptor_18cba623e76e57f3 = []byte{ + // 251 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x90, 0xcf, 0x4a, 0xc4, 0x30, + 0x10, 0x87, 0x69, 0xb6, 0x9b, 0xb5, 0xe3, 0x1f, 0x64, 0x10, 0x09, 0xb2, 0x87, 0xe2, 0xa9, 0x17, + 0x5b, 0xd0, 0xcb, 0xa2, 0x5e, 0x3d, 0x7a, 0x09, 0xf8, 0x00, 0xd9, 0x36, 0xd4, 0xa0, 0xd9, 0x84, + 0x64, 0x56, 0xec, 0x13, 0xf8, 0xda, 0xb2, 0x69, 0x54, 0xbc, 0x7d, 0xdf, 0xcc, 0x24, 0x99, 0x5f, + 0xe0, 0x71, 0x34, 0xf4, 0xba, 0xdf, 0xb6, 0xbd, 0xb3, 0x9d, 0x35, 0x7d, 0x70, 0xdd, 0xe8, 0x6e, + 0x66, 0x08, 0x7a, 0x34, 0x91, 0xc2, 0xd4, 0x8d, 0x2e, 0x46, 0xe3, 0x3b, 0x1f, 0x1c, 0xb9, 0x2c, + 0x6d, 0x12, 0xe4, 0xb3, 0x5d, 0x7f, 0x31, 0xe0, 0x2f, 0x7e, 0x50, 0xa4, 0xf1, 0x0c, 0x98, 0x19, + 0x44, 0x51, 0x17, 0x4d, 0x25, 0x99, 0x19, 0x70, 0x0d, 0x15, 0x19, 0xab, 0x23, 0x29, 0xeb, 0x05, + 0xab, 0x8b, 0xa6, 0x94, 0x7f, 0x05, 0x14, 0xb0, 0xd2, 0x9f, 0xde, 0x04, 0x1d, 0xc5, 0x22, 0xf5, + 0x7e, 0x14, 0x11, 0x4a, 0x9a, 0xbc, 0x16, 0x65, 0xba, 0x29, 0x31, 0x5e, 0x02, 0x57, 0x3d, 0x19, + 0xb7, 0x13, 0xcb, 0x54, 0xcd, 0x86, 0x1b, 0x38, 0xb2, 0x9a, 0xd4, 0xa0, 0x48, 0x09, 0x5e, 0x2f, + 0x9a, 0xe3, 0xdb, 0x75, 0x9b, 0xf7, 0x9c, 0xb7, 0x6a, 0x9f, 0x73, 0xfb, 0x69, 0x47, 0x61, 0x92, + 0xbf, 0xd3, 0x87, 0x57, 0xd2, 0xa9, 0x55, 0x5d, 0x34, 0x27, 0x32, 0xf1, 0xd5, 0x03, 0x9c, 0xfe, + 0x1b, 0xc7, 0x73, 0x58, 0xbc, 0xe9, 0x29, 0x67, 0x3a, 0x20, 0x5e, 0xc0, 0xf2, 0x43, 0xbd, 0xef, + 0x75, 0x0a, 0x54, 0xc9, 0x59, 0xee, 0xd9, 0xa6, 0xd8, 0xf2, 0xf4, 0x31, 0x77, 0xdf, 0x01, 0x00, + 0x00, 0xff, 0xff, 0x06, 0x6e, 0x00, 0x3c, 0x58, 0x01, 0x00, 0x00, } diff --git a/registry/gossip/proto/gossip.proto b/registry/gossip/proto/gossip.proto index 75e39cd0..5ab9fd39 100644 --- a/registry/gossip/proto/gossip.proto +++ b/registry/gossip/proto/gossip.proto @@ -8,12 +8,14 @@ message Update { string id = 1; // unix nano timestamp of update uint64 timestamp = 2; + // time to live for entry + uint64 expires = 3; // type of update; service - string type = 3; + string type = 4; // what action is taken; add, del, put - string action = 4; + string action = 5; // any other associated metadata about the data - map metadata = 5; + map metadata = 6; // the payload data; - bytes data = 6; + bytes data = 7; } diff --git a/registry/gossip/watcher.go b/registry/gossip/watcher.go index b936ab2f..a43d875e 100644 --- a/registry/gossip/watcher.go +++ b/registry/gossip/watcher.go @@ -4,37 +4,48 @@ import ( "github.com/micro/go-micro/registry" ) -type watcher struct { - id string - srv string - ch chan *registry.Result - exit chan bool - fn func() +type gossipWatcher struct { + wo registry.WatchOptions + next chan *registry.Result + stop chan bool } -func (w *watcher) Next() (*registry.Result, error) { +func newGossipWatcher(ch chan *registry.Result, stop chan bool, opts ...registry.WatchOption) (registry.Watcher, error) { + var wo registry.WatchOptions + for _, o := range opts { + o(&wo) + } + + return &gossipWatcher{ + wo: wo, + next: ch, + stop: stop, + }, nil +} + +func (m *gossipWatcher) Next() (*registry.Result, error) { for { select { - case r := <-w.ch: - if r.Service == nil { - continue + case r, ok := <-m.next: + if !ok { + return nil, registry.ErrWatcherStopped } - if len(w.srv) > 0 && (r.Service.Name != w.srv) { + // check watch options + if len(m.wo.Service) > 0 && r.Service.Name != m.wo.Service { continue } return r, nil - case <-w.exit: + case <-m.stop: return nil, registry.ErrWatcherStopped } } } -func (w *watcher) Stop() { +func (m *gossipWatcher) Stop() { select { - case <-w.exit: + case <-m.stop: return default: - close(w.exit) - w.fn() + close(m.stop) } }