Merge pull request #790 from milosgajdos83/memreg-ttl

[WIP] Memory registry TTL expiry
This commit is contained in:
Asim Aslam 2019-09-30 15:35:57 +01:00 committed by GitHub
commit 57647772c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 293 additions and 76 deletions

View File

@ -2,6 +2,7 @@ package handler
import ( import (
"context" "context"
"time"
"github.com/micro/go-micro/errors" "github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
@ -26,10 +27,17 @@ func (r *Registry) GetService(ctx context.Context, req *pb.GetRequest, rsp *pb.G
} }
func (r *Registry) Register(ctx context.Context, req *pb.Service, rsp *pb.EmptyResponse) error { func (r *Registry) Register(ctx context.Context, req *pb.Service, rsp *pb.EmptyResponse) error {
err := r.Registry.Register(service.ToService(req)) var regOpts []registry.RegisterOption
if req.Options != nil {
ttl := time.Duration(req.Options.Ttl) * time.Second
regOpts = append(regOpts, registry.RegisterTTL(ttl))
}
err := r.Registry.Register(service.ToService(req), regOpts...)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.registry", err.Error()) return errors.InternalServerError("go.micro.registry", err.Error())
} }
return nil return nil
} }

View File

@ -3,22 +3,33 @@ package memory
import ( import (
"context" "context"
"strings"
"sync" "sync"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"github.com/micro/go-micro/util/log"
) )
var ( var (
timeout = time.Millisecond * 10 sendEventTime = 10 * time.Millisecond
ttlPruneTime = 1 * time.Minute
DefaultTTL = 1 * time.Minute
) )
// node tracks node registration timestamp and TTL
type node struct {
lastSeen time.Time
ttl time.Duration
}
type Registry struct { type Registry struct {
options registry.Options options registry.Options
sync.RWMutex sync.RWMutex
Services map[string][]*registry.Service Services map[string][]*registry.Service
nodes map[string]*node
Watchers map[string]*Watcher Watchers map[string]*Watcher
} }
@ -36,11 +47,67 @@ func NewRegistry(opts ...registry.Option) registry.Registry {
services = make(map[string][]*registry.Service) services = make(map[string][]*registry.Service)
} }
return &Registry{ reg := &Registry{
options: options, options: options,
Services: services, Services: services,
nodes: make(map[string]*node),
Watchers: make(map[string]*Watcher), Watchers: make(map[string]*Watcher),
} }
go reg.ttlPrune()
return reg
}
// nodeTrackId returns a string we use to track a node of a given service
func nodeTrackId(svcName, svcVersion, nodeId string) string {
return svcName + "+" + svcVersion + "+" + nodeId
}
func (m *Registry) ttlPrune() {
prune := time.NewTicker(ttlPruneTime)
defer prune.Stop()
for {
select {
case <-prune.C:
m.Lock()
for nodeTrackId, node := range m.nodes {
// if the TTL has been set and we exceed the hresholdset by it we stop tracking the node
if node.ttl.Seconds() != 0.0 && time.Since(node.lastSeen) > node.ttl {
// split nodeTrackID into service Name, Version and Node Id
trackIdSplit := strings.Split(nodeTrackId, "+")
svcName, svcVersion, nodeId := trackIdSplit[0], trackIdSplit[1], trackIdSplit[2]
log.Debugf("Registry TTL expired for service %s, node %s", svcName, nodeId)
// we need to find a node that expired and delete it from service nodes
if _, ok := m.Services[svcName]; ok {
for _, service := range m.Services[svcName] {
if service.Version != svcVersion {
continue
}
// find expired service node and delete it
var nodes []*registry.Node
for _, n := range service.Nodes {
var del bool
if n.Id == nodeId {
del = true
}
if !del {
nodes = append(nodes, n)
}
}
service.Nodes = nodes
}
}
// stop tracking the node
delete(m.nodes, nodeTrackId)
}
}
m.Unlock()
}
}
return
} }
func (m *Registry) sendEvent(r *registry.Result) { func (m *Registry) sendEvent(r *registry.Result) {
@ -61,7 +128,7 @@ func (m *Registry) sendEvent(r *registry.Result) {
default: default:
select { select {
case w.res <- r: case w.res <- r:
case <-time.After(timeout): case <-time.After(sendEventTime):
} }
} }
} }
@ -111,33 +178,75 @@ func (m *Registry) Register(s *registry.Service, opts ...registry.RegisterOption
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
log.Debugf("Registry deregistering service: %s", s.Name)
var options registry.RegisterOptions
for _, o := range opts {
o(&options)
}
if service, ok := m.Services[s.Name]; !ok { if service, ok := m.Services[s.Name]; !ok {
m.Services[s.Name] = []*registry.Service{s} m.Services[s.Name] = []*registry.Service{s}
// add all nodes into nodes map to track their TTL
for _, n := range s.Nodes {
log.Debugf("Registry tracking new service: %s, node %s", s.Name, n.Id)
m.nodes[nodeTrackId(s.Name, s.Version, n.Id)] = &node{
lastSeen: time.Now(),
ttl: options.TTL,
}
}
go m.sendEvent(&registry.Result{Action: "update", Service: s}) go m.sendEvent(&registry.Result{Action: "update", Service: s})
return nil
} else { } else {
svcCount := len(service) // svcCount keeps the count of all versions of particular service
svcNodeCounts := make(map[string]map[string]int) //svcCount := len(service)
// svcNodes maintains a list of node Ids per particular service version
svcNodes := make(map[string]map[string][]string)
// collect all service ids for all service versions
for _, s := range service { for _, s := range service {
if _, ok := svcNodeCounts[s.Name]; !ok { if _, ok := svcNodes[s.Name]; !ok {
svcNodeCounts[s.Name] = make(map[string]int) svcNodes[s.Name] = make(map[string][]string)
} }
if _, ok := svcNodeCounts[s.Name][s.Version]; !ok { if _, ok := svcNodes[s.Name][s.Version]; !ok {
svcNodeCounts[s.Name][s.Version] = len(s.Nodes) for _, n := range s.Nodes {
svcNodes[s.Name][s.Version] = append(svcNodes[s.Name][s.Version], n.Id)
} }
} }
// if merged count and original service counts changed we added new version of the service }
// if merged count and original service counts changed we know we are adding a new version of the service
merged := registry.Merge(service, []*registry.Service{s}) merged := registry.Merge(service, []*registry.Service{s})
if len(merged) != svcCount { // if the node count of any service [version] changed we know we are adding a new node to the service
for _, s := range merged {
// we know that if the node counts have changed we need to track new nodes
if len(s.Nodes) != len(svcNodes[s.Name][s.Version]) {
for _, n := range s.Nodes {
var found bool
for _, id := range svcNodes[s.Name][s.Version] {
if n.Id == id {
found = true
break
}
}
if !found {
log.Debugf("Registry tracking new node: %s for service %s", n.Id, s.Name)
m.nodes[nodeTrackId(s.Name, s.Version, n.Id)] = &node{
lastSeen: time.Now(),
ttl: options.TTL,
}
}
}
m.Services[s.Name] = merged m.Services[s.Name] = merged
go m.sendEvent(&registry.Result{Action: "update", Service: s}) go m.sendEvent(&registry.Result{Action: "update", Service: s})
return nil return nil
} }
// if the node count for a particular service has changed we added a new node to the service // refresh the timestamp and TTL of the service node
for _, s := range merged { for _, n := range s.Nodes {
if len(s.Nodes) != svcNodeCounts[s.Name][s.Version] { trackId := nodeTrackId(s.Name, s.Version, n.Id)
m.Services[s.Name] = merged log.Debugf("Registry refreshing TTL for node %s for service %s", n.Id, s.Name)
go m.sendEvent(&registry.Result{Action: "update", Service: s}) if trackedNode, ok := m.nodes[trackId]; ok {
return nil trackedNode.lastSeen = time.Now()
trackedNode.ttl = options.TTL
}
} }
} }
} }
@ -149,12 +258,51 @@ func (m *Registry) Deregister(s *registry.Service) error {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
log.Debugf("Registry deregistering service: %s", s.Name)
if service, ok := m.Services[s.Name]; ok { if service, ok := m.Services[s.Name]; ok {
// svcNodes collects the list of all node Ids for each service version
svcNodes := make(map[string]map[string][]string)
// collect all service node ids for all service versions
for _, svc := range service {
if _, ok := svcNodes[svc.Name]; !ok {
svcNodes[svc.Name] = make(map[string][]string)
}
if _, ok := svcNodes[svc.Name][svc.Version]; !ok {
for _, n := range svc.Nodes {
svcNodes[svc.Name][svc.Version] = append(svcNodes[svc.Name][svc.Version], n.Id)
}
}
}
// if there are no more services we know we have either removed all nodes or there were no nodes
if updatedService := registry.Remove(service, []*registry.Service{s}); len(updatedService) == 0 {
for _, id := range svcNodes[s.Name][s.Version] {
log.Debugf("Registry stopped tracking node %s for service %s", id, s.Name)
delete(m.nodes, nodeTrackId(s.Name, s.Version, id))
go m.sendEvent(&registry.Result{Action: "delete", Service: s}) go m.sendEvent(&registry.Result{Action: "delete", Service: s})
if service := registry.Remove(service, []*registry.Service{s}); len(service) == 0 { }
log.Debugf("Registry deleting service %s: no service nodes", s.Name)
delete(m.Services, s.Name) delete(m.Services, s.Name)
return nil
} else { } else {
m.Services[s.Name] = service // find out which nodes have been removed
for _, id := range svcNodes[s.Name][s.Version] {
for _, svc := range updatedService {
var found bool
for _, n := range svc.Nodes {
if id == n.Id {
found = true
break
}
}
if !found {
log.Debugf("Registry stopped tracking node %s for service %s", id, s.Name)
delete(m.nodes, nodeTrackId(s.Name, s.Version, id))
go m.sendEvent(&registry.Result{Action: "delete", Service: s})
}
}
m.Services[s.Name] = updatedService
}
} }
} }

View File

@ -56,6 +56,7 @@ type Service struct {
Metadata map[string]string `protobuf:"bytes,3,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` Metadata map[string]string `protobuf:"bytes,3,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Endpoints []*Endpoint `protobuf:"bytes,4,rep,name=endpoints,proto3" json:"endpoints,omitempty"` Endpoints []*Endpoint `protobuf:"bytes,4,rep,name=endpoints,proto3" json:"endpoints,omitempty"`
Nodes []*Node `protobuf:"bytes,5,rep,name=nodes,proto3" json:"nodes,omitempty"` Nodes []*Node `protobuf:"bytes,5,rep,name=nodes,proto3" json:"nodes,omitempty"`
Options *Options `protobuf:"bytes,6,opt,name=options,proto3" json:"options,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@ -121,6 +122,13 @@ func (m *Service) GetNodes() []*Node {
return nil return nil
} }
func (m *Service) GetOptions() *Options {
if m != nil {
return m.Options
}
return nil
}
// Node represents the node the service is on // Node represents the node the service is on
type Node struct { type Node struct {
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
@ -305,6 +313,46 @@ func (m *Value) GetValues() []*Value {
return nil return nil
} }
// Options are registry options
type Options struct {
Ttl int64 `protobuf:"varint,1,opt,name=ttl,proto3" json:"ttl,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Options) Reset() { *m = Options{} }
func (m *Options) String() string { return proto.CompactTextString(m) }
func (*Options) ProtoMessage() {}
func (*Options) Descriptor() ([]byte, []int) {
return fileDescriptor_41af05d40a615591, []int{4}
}
func (m *Options) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Options.Unmarshal(m, b)
}
func (m *Options) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Options.Marshal(b, m, deterministic)
}
func (m *Options) XXX_Merge(src proto.Message) {
xxx_messageInfo_Options.Merge(m, src)
}
func (m *Options) XXX_Size() int {
return xxx_messageInfo_Options.Size(m)
}
func (m *Options) XXX_DiscardUnknown() {
xxx_messageInfo_Options.DiscardUnknown(m)
}
var xxx_messageInfo_Options proto.InternalMessageInfo
func (m *Options) GetTtl() int64 {
if m != nil {
return m.Ttl
}
return 0
}
// Result is returns by the watcher // Result is returns by the watcher
type Result struct { type Result struct {
Action string `protobuf:"bytes,1,opt,name=action,proto3" json:"action,omitempty"` Action string `protobuf:"bytes,1,opt,name=action,proto3" json:"action,omitempty"`
@ -319,7 +367,7 @@ func (m *Result) Reset() { *m = Result{} }
func (m *Result) String() string { return proto.CompactTextString(m) } func (m *Result) String() string { return proto.CompactTextString(m) }
func (*Result) ProtoMessage() {} func (*Result) ProtoMessage() {}
func (*Result) Descriptor() ([]byte, []int) { func (*Result) Descriptor() ([]byte, []int) {
return fileDescriptor_41af05d40a615591, []int{4} return fileDescriptor_41af05d40a615591, []int{5}
} }
func (m *Result) XXX_Unmarshal(b []byte) error { func (m *Result) XXX_Unmarshal(b []byte) error {
@ -371,7 +419,7 @@ func (m *EmptyResponse) Reset() { *m = EmptyResponse{} }
func (m *EmptyResponse) String() string { return proto.CompactTextString(m) } func (m *EmptyResponse) String() string { return proto.CompactTextString(m) }
func (*EmptyResponse) ProtoMessage() {} func (*EmptyResponse) ProtoMessage() {}
func (*EmptyResponse) Descriptor() ([]byte, []int) { func (*EmptyResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_41af05d40a615591, []int{5} return fileDescriptor_41af05d40a615591, []int{6}
} }
func (m *EmptyResponse) XXX_Unmarshal(b []byte) error { func (m *EmptyResponse) XXX_Unmarshal(b []byte) error {
@ -403,7 +451,7 @@ func (m *GetRequest) Reset() { *m = GetRequest{} }
func (m *GetRequest) String() string { return proto.CompactTextString(m) } func (m *GetRequest) String() string { return proto.CompactTextString(m) }
func (*GetRequest) ProtoMessage() {} func (*GetRequest) ProtoMessage() {}
func (*GetRequest) Descriptor() ([]byte, []int) { func (*GetRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_41af05d40a615591, []int{6} return fileDescriptor_41af05d40a615591, []int{7}
} }
func (m *GetRequest) XXX_Unmarshal(b []byte) error { func (m *GetRequest) XXX_Unmarshal(b []byte) error {
@ -442,7 +490,7 @@ func (m *GetResponse) Reset() { *m = GetResponse{} }
func (m *GetResponse) String() string { return proto.CompactTextString(m) } func (m *GetResponse) String() string { return proto.CompactTextString(m) }
func (*GetResponse) ProtoMessage() {} func (*GetResponse) ProtoMessage() {}
func (*GetResponse) Descriptor() ([]byte, []int) { func (*GetResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_41af05d40a615591, []int{7} return fileDescriptor_41af05d40a615591, []int{8}
} }
func (m *GetResponse) XXX_Unmarshal(b []byte) error { func (m *GetResponse) XXX_Unmarshal(b []byte) error {
@ -480,7 +528,7 @@ func (m *ListRequest) Reset() { *m = ListRequest{} }
func (m *ListRequest) String() string { return proto.CompactTextString(m) } func (m *ListRequest) String() string { return proto.CompactTextString(m) }
func (*ListRequest) ProtoMessage() {} func (*ListRequest) ProtoMessage() {}
func (*ListRequest) Descriptor() ([]byte, []int) { func (*ListRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_41af05d40a615591, []int{8} return fileDescriptor_41af05d40a615591, []int{9}
} }
func (m *ListRequest) XXX_Unmarshal(b []byte) error { func (m *ListRequest) XXX_Unmarshal(b []byte) error {
@ -512,7 +560,7 @@ func (m *ListResponse) Reset() { *m = ListResponse{} }
func (m *ListResponse) String() string { return proto.CompactTextString(m) } func (m *ListResponse) String() string { return proto.CompactTextString(m) }
func (*ListResponse) ProtoMessage() {} func (*ListResponse) ProtoMessage() {}
func (*ListResponse) Descriptor() ([]byte, []int) { func (*ListResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_41af05d40a615591, []int{9} return fileDescriptor_41af05d40a615591, []int{10}
} }
func (m *ListResponse) XXX_Unmarshal(b []byte) error { func (m *ListResponse) XXX_Unmarshal(b []byte) error {
@ -552,7 +600,7 @@ func (m *WatchRequest) Reset() { *m = WatchRequest{} }
func (m *WatchRequest) String() string { return proto.CompactTextString(m) } func (m *WatchRequest) String() string { return proto.CompactTextString(m) }
func (*WatchRequest) ProtoMessage() {} func (*WatchRequest) ProtoMessage() {}
func (*WatchRequest) Descriptor() ([]byte, []int) { func (*WatchRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_41af05d40a615591, []int{10} return fileDescriptor_41af05d40a615591, []int{11}
} }
func (m *WatchRequest) XXX_Unmarshal(b []byte) error { func (m *WatchRequest) XXX_Unmarshal(b []byte) error {
@ -599,7 +647,7 @@ func (m *Event) Reset() { *m = Event{} }
func (m *Event) String() string { return proto.CompactTextString(m) } func (m *Event) String() string { return proto.CompactTextString(m) }
func (*Event) ProtoMessage() {} func (*Event) ProtoMessage() {}
func (*Event) Descriptor() ([]byte, []int) { func (*Event) Descriptor() ([]byte, []int) {
return fileDescriptor_41af05d40a615591, []int{11} return fileDescriptor_41af05d40a615591, []int{12}
} }
func (m *Event) XXX_Unmarshal(b []byte) error { func (m *Event) XXX_Unmarshal(b []byte) error {
@ -657,6 +705,7 @@ func init() {
proto.RegisterType((*Endpoint)(nil), "go.micro.registry.Endpoint") proto.RegisterType((*Endpoint)(nil), "go.micro.registry.Endpoint")
proto.RegisterMapType((map[string]string)(nil), "go.micro.registry.Endpoint.MetadataEntry") proto.RegisterMapType((map[string]string)(nil), "go.micro.registry.Endpoint.MetadataEntry")
proto.RegisterType((*Value)(nil), "go.micro.registry.Value") proto.RegisterType((*Value)(nil), "go.micro.registry.Value")
proto.RegisterType((*Options)(nil), "go.micro.registry.Options")
proto.RegisterType((*Result)(nil), "go.micro.registry.Result") proto.RegisterType((*Result)(nil), "go.micro.registry.Result")
proto.RegisterType((*EmptyResponse)(nil), "go.micro.registry.EmptyResponse") proto.RegisterType((*EmptyResponse)(nil), "go.micro.registry.EmptyResponse")
proto.RegisterType((*GetRequest)(nil), "go.micro.registry.GetRequest") proto.RegisterType((*GetRequest)(nil), "go.micro.registry.GetRequest")
@ -670,45 +719,47 @@ func init() {
func init() { proto.RegisterFile("registry.proto", fileDescriptor_41af05d40a615591) } func init() { proto.RegisterFile("registry.proto", fileDescriptor_41af05d40a615591) }
var fileDescriptor_41af05d40a615591 = []byte{ var fileDescriptor_41af05d40a615591 = []byte{
// 632 bytes of a gzipped FileDescriptorProto // 667 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x55, 0xdb, 0x6e, 0xd3, 0x4c, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x55, 0xdd, 0x6e, 0xd3, 0x4c,
0x10, 0x8e, 0xed, 0x38, 0x87, 0x49, 0xdb, 0xbf, 0xff, 0x08, 0x81, 0x31, 0x05, 0x22, 0x4b, 0xa0, 0x10, 0x8d, 0xed, 0xfc, 0x4e, 0xda, 0x7e, 0xfd, 0x46, 0x08, 0x8c, 0x5b, 0x20, 0xb2, 0x04, 0x0a,
0x80, 0x84, 0xa9, 0x42, 0x85, 0x38, 0x5c, 0x21, 0x62, 0x2a, 0xa1, 0x16, 0x09, 0x73, 0xba, 0x36, 0x48, 0x84, 0x2a, 0x54, 0x88, 0x9f, 0x2b, 0x44, 0x43, 0x25, 0xd4, 0x82, 0x58, 0xfe, 0xae, 0x4d,
0xf1, 0xa8, 0x58, 0xc4, 0x07, 0x76, 0x37, 0x91, 0xfc, 0x0e, 0x48, 0x3c, 0x01, 0x77, 0x3c, 0x0a, 0x3c, 0x2a, 0x16, 0x89, 0x6d, 0x76, 0xb7, 0x91, 0xf2, 0x0e, 0x48, 0x3c, 0x01, 0x6f, 0xc3, 0x53,
0x0f, 0x86, 0xbc, 0x5e, 0x27, 0xa9, 0x62, 0x07, 0xa4, 0xc2, 0xdd, 0x8c, 0xf7, 0x9b, 0x6f, 0x67, 0xf0, 0x34, 0x68, 0xd7, 0xeb, 0x24, 0x55, 0xd7, 0x01, 0xa9, 0x70, 0x37, 0xe3, 0x3d, 0x33, 0x3b,
0xbe, 0x6f, 0x36, 0x81, 0x3d, 0x46, 0x67, 0x11, 0x17, 0x2c, 0x77, 0x33, 0x96, 0x8a, 0x14, 0xff, 0x73, 0xce, 0x59, 0x19, 0xb6, 0x38, 0x9d, 0x24, 0x42, 0xf2, 0xf9, 0x20, 0xe7, 0x99, 0xcc, 0xf0,
0x3f, 0x4b, 0xdd, 0x38, 0x9a, 0xb2, 0xd4, 0xad, 0x0e, 0x9c, 0x1f, 0x3a, 0x74, 0xdf, 0x10, 0x5b, 0xff, 0x93, 0x6c, 0x30, 0x4d, 0xc6, 0x3c, 0x1b, 0x94, 0x07, 0xe1, 0x4f, 0x17, 0x5a, 0x6f, 0x88,
0x44, 0x53, 0x42, 0x84, 0x76, 0x12, 0xc4, 0x64, 0x69, 0x43, 0x6d, 0xd4, 0xf7, 0x65, 0x8c, 0x16, 0xcf, 0x92, 0x31, 0x21, 0x42, 0x3d, 0x8d, 0xa6, 0xe4, 0x3b, 0x3d, 0xa7, 0xdf, 0x61, 0x3a, 0x46,
0x74, 0x17, 0xc4, 0x78, 0x94, 0x26, 0x96, 0x2e, 0x3f, 0x57, 0x29, 0x4e, 0xa0, 0x17, 0x93, 0x08, 0x1f, 0x5a, 0x33, 0xe2, 0x22, 0xc9, 0x52, 0xdf, 0xd5, 0x9f, 0xcb, 0x14, 0x0f, 0xa0, 0x3d, 0x25,
0xc2, 0x40, 0x04, 0x96, 0x31, 0x34, 0x46, 0x83, 0xf1, 0xc8, 0xdd, 0xe0, 0x77, 0x15, 0xb7, 0x7b, 0x19, 0xc5, 0x91, 0x8c, 0x7c, 0xaf, 0xe7, 0xf5, 0xbb, 0xc3, 0xfe, 0xe0, 0x5c, 0xff, 0x81, 0xe9,
0xaa, 0xa0, 0x5e, 0x22, 0x58, 0xee, 0x2f, 0x2b, 0xf1, 0x31, 0xf4, 0x29, 0x09, 0xb3, 0x34, 0x4a, 0x3d, 0x38, 0x36, 0xd0, 0x51, 0x2a, 0xf9, 0x9c, 0x2d, 0x2a, 0xf1, 0x11, 0x74, 0x28, 0x8d, 0xf3,
0x04, 0xb7, 0xda, 0x92, 0xe6, 0x5a, 0x0d, 0x8d, 0xa7, 0x30, 0xfe, 0x0a, 0x8d, 0xf7, 0xc0, 0x4c, 0x2c, 0x49, 0xa5, 0xf0, 0xeb, 0xba, 0xcd, 0x8e, 0xa5, 0xcd, 0xc8, 0x60, 0xd8, 0x12, 0x8d, 0x77,
0xd2, 0x90, 0xb8, 0x65, 0xca, 0xb2, 0x2b, 0x35, 0x65, 0xaf, 0xd2, 0x90, 0xfc, 0x12, 0x65, 0x3f, 0xa1, 0x91, 0x66, 0x31, 0x09, 0xbf, 0xa1, 0xcb, 0xae, 0x58, 0xca, 0x5e, 0x66, 0x31, 0xb1, 0x02,
0x85, 0xdd, 0x73, 0x4d, 0xe0, 0x3e, 0x18, 0x9f, 0x29, 0x57, 0xd3, 0x16, 0x21, 0x5e, 0x02, 0x73, 0x85, 0xfb, 0xd0, 0xca, 0x72, 0x99, 0x64, 0xa9, 0xf0, 0x9b, 0x3d, 0xa7, 0xdf, 0x1d, 0x06, 0x96,
0x11, 0xcc, 0xe6, 0xa4, 0x46, 0x2d, 0x93, 0x27, 0xfa, 0x23, 0xcd, 0xf9, 0xa9, 0x41, 0xbb, 0x20, 0x82, 0x57, 0x05, 0x82, 0x95, 0xd0, 0xe0, 0x09, 0x6c, 0x9e, 0x19, 0x1d, 0xb7, 0xc1, 0xfb, 0x4c,
0xc3, 0x3d, 0xd0, 0xa3, 0x50, 0xd5, 0xe8, 0x51, 0x58, 0xe8, 0x13, 0x84, 0x21, 0x23, 0xce, 0x2b, 0x73, 0xc3, 0x91, 0x0a, 0xf1, 0x12, 0x34, 0x66, 0xd1, 0xe4, 0x94, 0x0c, 0x41, 0x45, 0xf2, 0xd8,
0x7d, 0x54, 0x5a, 0xa8, 0x99, 0xa5, 0x4c, 0x58, 0xc6, 0x50, 0x1b, 0x19, 0xbe, 0x8c, 0xf1, 0xd9, 0x7d, 0xe8, 0x84, 0x3f, 0x1c, 0xa8, 0xab, 0x11, 0x70, 0x0b, 0xdc, 0x24, 0x36, 0x35, 0x6e, 0x12,
0x9a, 0x66, 0xe5, 0xb0, 0xb7, 0x1a, 0xba, 0x6e, 0x12, 0xec, 0x62, 0x63, 0x7c, 0xd5, 0xa1, 0x57, 0x2b, 0x56, 0xa3, 0x38, 0xe6, 0x24, 0x44, 0xc9, 0xaa, 0x49, 0x95, 0x06, 0x79, 0xc6, 0xa5, 0xef,
0x49, 0x59, 0x6b, 0xf7, 0x18, 0xba, 0x8c, 0xbe, 0xcc, 0x89, 0x0b, 0x59, 0x3c, 0x18, 0x5b, 0x35, 0xf5, 0x9c, 0xbe, 0xc7, 0x74, 0x8c, 0x4f, 0x57, 0x98, 0x2e, 0x28, 0xba, 0x59, 0xb1, 0x6b, 0x15,
0xfd, 0xbd, 0x2f, 0xf8, 0xfc, 0x0a, 0x88, 0x47, 0xd0, 0x63, 0xc4, 0xb3, 0x34, 0xe1, 0x24, 0x87, 0xcd, 0x17, 0x5b, 0xe3, 0xab, 0x0b, 0xed, 0x52, 0x00, 0xab, 0x49, 0x86, 0xd0, 0xe2, 0xf4, 0xe5,
0xdd, 0x56, 0xb4, 0x44, 0xa2, 0xb7, 0x21, 0xc5, 0x9d, 0x2d, 0xbe, 0xff, 0x1b, 0x39, 0x02, 0x30, 0x94, 0x84, 0xd4, 0xc5, 0xdd, 0xa1, 0x6f, 0x99, 0xef, 0xbd, 0xea, 0xc7, 0x4a, 0x20, 0xee, 0x43,
0x65, 0x5b, 0xb5, 0x52, 0x20, 0xb4, 0x45, 0x9e, 0x55, 0x55, 0x32, 0xc6, 0x43, 0xe8, 0xc8, 0x6a, 0x9b, 0x93, 0xc8, 0xb3, 0x54, 0x90, 0x5e, 0x76, 0x5d, 0xd1, 0x02, 0x89, 0xa3, 0x73, 0x54, 0xdc,
0xae, 0x36, 0xbe, 0x79, 0x50, 0x85, 0x73, 0x04, 0x74, 0x7c, 0xe2, 0xf3, 0x99, 0xc0, 0xcb, 0xd0, 0x5e, 0xe3, 0x96, 0x7f, 0x43, 0x47, 0x04, 0x0d, 0x3d, 0x96, 0x95, 0x0a, 0x84, 0xba, 0x9c, 0xe7,
0x09, 0xa6, 0xa2, 0x78, 0x48, 0xe5, 0x2d, 0x2a, 0xc3, 0x23, 0xe8, 0xf2, 0xf2, 0x91, 0x28, 0xc9, 0x65, 0x95, 0x8e, 0x71, 0x0f, 0x9a, 0xba, 0x5a, 0x98, 0x77, 0x52, 0xbd, 0xa8, 0xc1, 0x85, 0x3b,
0xed, 0xe6, 0x67, 0xe4, 0x57, 0x50, 0x3c, 0x80, 0xbe, 0x88, 0x62, 0xe2, 0x22, 0x88, 0x33, 0xb5, 0xd0, 0x32, 0x4e, 0x54, 0x93, 0x49, 0x39, 0xd1, 0x77, 0x78, 0x4c, 0x85, 0xa1, 0x84, 0x26, 0x23,
0x62, 0xab, 0x0f, 0xce, 0x7f, 0xb0, 0xeb, 0xc5, 0x99, 0xc8, 0x7d, 0xa5, 0xb6, 0x73, 0x1b, 0xe0, 0x71, 0x3a, 0x91, 0x78, 0x19, 0x9a, 0xd1, 0x58, 0xc1, 0xcc, 0x08, 0x26, 0x53, 0x56, 0x17, 0xc5,
0x98, 0x84, 0xaf, 0x1c, 0xb3, 0x56, 0x57, 0x96, 0xbd, 0x54, 0xa9, 0xe3, 0xc1, 0x40, 0xe2, 0x94, 0xbb, 0x33, 0x7a, 0x04, 0xd5, 0x2f, 0x93, 0x95, 0x50, 0xdc, 0x85, 0x8e, 0x4c, 0xa6, 0x24, 0x64,
0x49, 0x0f, 0xa1, 0xa7, 0x4e, 0xb8, 0xa5, 0xc9, 0x89, 0xb7, 0x35, 0xb7, 0xc4, 0x3a, 0xbb, 0x30, 0x34, 0xcd, 0x8d, 0xff, 0x96, 0x1f, 0xc2, 0xff, 0x60, 0x73, 0x34, 0xcd, 0xe5, 0x9c, 0x19, 0x29,
0x38, 0x89, 0x78, 0x75, 0x9f, 0xf3, 0x02, 0x76, 0xca, 0xf4, 0x82, 0xb4, 0x23, 0xd8, 0xf9, 0x10, 0xc2, 0x5b, 0x00, 0x87, 0x24, 0x99, 0x91, 0xd3, 0x5f, 0x5e, 0x59, 0xcc, 0x52, 0xa6, 0xe1, 0x08,
0x88, 0xe9, 0xa7, 0xdf, 0xcf, 0xf1, 0x5d, 0x03, 0xd3, 0x5b, 0x50, 0x22, 0x36, 0x1e, 0xec, 0xe1, 0xba, 0x1a, 0x67, 0x14, 0x7c, 0x00, 0x6d, 0x73, 0x22, 0x7c, 0x47, 0xd3, 0xb1, 0x6e, 0xb8, 0x05,
0x9a, 0xad, 0x7b, 0xe3, 0x83, 0xba, 0x9d, 0x2b, 0xea, 0xde, 0xe6, 0x19, 0x29, 0xd3, 0xb7, 0x4a, 0x36, 0xdc, 0x84, 0xee, 0x51, 0x22, 0xca, 0xfb, 0xc2, 0xe7, 0xb0, 0x51, 0xa4, 0x17, 0x6c, 0xdb,
0xbd, 0x6e, 0x5f, 0xfb, 0x8f, 0xed, 0xbb, 0x7b, 0x1f, 0xfa, 0xcb, 0x6b, 0x10, 0xa0, 0xf3, 0x9c, 0x87, 0x8d, 0x0f, 0x91, 0x1c, 0x7f, 0xfa, 0xfd, 0x1e, 0xdf, 0x1d, 0x68, 0x8c, 0x66, 0x94, 0xca,
0x51, 0x20, 0x68, 0xbf, 0x55, 0xc4, 0x13, 0x9a, 0x91, 0xa0, 0x7d, 0xad, 0x88, 0xdf, 0x65, 0x61, 0x73, 0xaf, 0x79, 0x6f, 0x45, 0xf3, 0xad, 0xe1, 0xae, 0xcd, 0x90, 0xaa, 0xee, 0xed, 0x3c, 0x27,
0xf1, 0x5d, 0x1f, 0x7f, 0x33, 0xa0, 0xe7, 0x2b, 0x3a, 0x3c, 0x95, 0x6e, 0x56, 0x3f, 0xdb, 0xd7, 0xe3, 0x88, 0xb5, 0x54, 0xaf, 0xca, 0x57, 0xff, 0x63, 0xf9, 0xee, 0xdc, 0x83, 0xce, 0xe2, 0x1a,
0x6b, 0x2e, 0x5c, 0x99, 0x6d, 0xdf, 0x68, 0x3a, 0x56, 0xab, 0xd1, 0xc2, 0x97, 0x15, 0x35, 0x31, 0x04, 0x68, 0x3e, 0xe3, 0x14, 0x49, 0xda, 0xae, 0xa9, 0xf8, 0x80, 0x26, 0x24, 0x69, 0xdb, 0x51,
0xdc, 0xd2, 0xbd, 0x3d, 0xac, 0x13, 0xeb, 0xdc, 0x9a, 0xb5, 0xf0, 0x04, 0x60, 0x42, 0xec, 0x6f, 0xf1, 0xbb, 0x3c, 0x56, 0xdf, 0xdd, 0xe1, 0x37, 0x0f, 0xda, 0xcc, 0xb4, 0xc3, 0x63, 0xad, 0x66,
0xb1, 0xbd, 0x2e, 0x17, 0x47, 0x95, 0x70, 0xac, 0x9b, 0x65, 0x6d, 0xd1, 0xec, 0x9b, 0x8d, 0xe7, 0xf9, 0x27, 0xb8, 0x66, 0xb9, 0x70, 0x29, 0x76, 0x70, 0xbd, 0xea, 0xd8, 0x58, 0xa3, 0x86, 0x2f,
0x4b, 0xca, 0x63, 0x30, 0xe5, 0x0e, 0x61, 0x1d, 0x76, 0x7d, 0xbb, 0xec, 0xab, 0x35, 0x80, 0xf2, 0xca, 0xd6, 0xc4, 0x71, 0xcd, 0xf4, 0x41, 0xcf, 0x46, 0xd6, 0x19, 0x9b, 0xd5, 0xf0, 0x08, 0xe0,
0x2d, 0x3b, 0xad, 0x43, 0xed, 0x63, 0x47, 0xfe, 0xa7, 0x3e, 0xf8, 0x15, 0x00, 0x00, 0xff, 0xff, 0x80, 0xf8, 0xdf, 0xea, 0xf6, 0xba, 0x30, 0x8e, 0x29, 0x11, 0x68, 0xdb, 0x65, 0xc5, 0x68, 0xc1,
0x08, 0x0e, 0xe4, 0xae, 0x65, 0x07, 0x00, 0x00, 0x8d, 0xca, 0xf3, 0x45, 0xcb, 0x43, 0x68, 0x68, 0x0f, 0xa1, 0x0d, 0xbb, 0xea, 0xae, 0xe0, 0xaa,
0x05, 0x50, 0xbc, 0xe5, 0xb0, 0xb6, 0xe7, 0x7c, 0x6c, 0xea, 0xdf, 0xf4, 0xfd, 0x5f, 0x01, 0x00,
0x00, 0xff, 0xff, 0xfb, 0x3e, 0x7d, 0xa4, 0xb8, 0x07, 0x00, 0x00,
} }

View File

@ -17,6 +17,7 @@ message Service {
map<string,string> metadata = 3; map<string,string> metadata = 3;
repeated Endpoint endpoints = 4; repeated Endpoint endpoints = 4;
repeated Node nodes = 5; repeated Node nodes = 5;
Options options = 6;
} }
// Node represents the node the service is on // Node represents the node the service is on
@ -42,6 +43,11 @@ message Value {
repeated Value values = 3; repeated Value values = 3;
} }
// Options are registry options
message Options {
int64 ttl = 1;
}
// Result is returns by the watcher // Result is returns by the watcher
message Result { message Result {
string action = 1; // create, update, delete string action = 1; // create, update, delete

View File

@ -58,8 +58,12 @@ func (s *serviceRegistry) Register(srv *registry.Service, opts ...registry.Regis
o(&options) o(&options)
} }
// encode srv into protobuf and pack Register TTL into it
pbSrv := ToProto(srv)
pbSrv.Options.Ttl = int64(options.TTL.Seconds())
// register the service // register the service
_, err := s.client.Register(context.TODO(), ToProto(srv), s.callOpts()...) _, err := s.client.Register(context.TODO(), pbSrv, s.callOpts()...)
if err != nil { if err != nil {
return err return err
} }