From 5ec9d561a6a00b800049fcddcb122117f433a8e6 Mon Sep 17 00:00:00 2001 From: Asim Date: Tue, 26 Jan 2016 23:32:27 +0000 Subject: [PATCH 1/7] meh --- options.go | 18 ++++++++++ registry/consul_registry.go | 67 +++++++++++++++++++++++++++---------- registry/options.go | 13 +++++++ registry/registry.go | 8 +++-- server/options.go | 13 +++++++ server/rpc_server.go | 12 +++++-- server/server.go | 4 ++- service.go | 32 +++++++++++++++++- 8 files changed, 142 insertions(+), 25 deletions(-) diff --git a/options.go b/options.go index 9389ade6..ac077048 100644 --- a/options.go +++ b/options.go @@ -1,6 +1,8 @@ package micro import ( + "time" + "github.com/micro/cli" "github.com/micro/go-micro/broker" "github.com/micro/go-micro/client" @@ -20,6 +22,10 @@ type Options struct { Registry registry.Registry Transport transport.Transport + // Registration options + RegisterTTL time.Duration + RegisterInterval time.Duration + // Before and After funcs BeforeStart []func() error AfterStop []func() error @@ -117,6 +123,18 @@ func Action(a func(*cli.Context)) Option { } } +func RegisterTTL(t time.Duration) Option { + return func(o *Options) { + o.RegisterTTL = t + } +} + +func RegisterInterval(t time.Duration) Option { + return func(o *Options) { + o.RegisterInterval = t + } +} + // Before and Afters func BeforeStart(fn func() error) Option { diff --git a/registry/consul_registry.go b/registry/consul_registry.go index 843f2281..5a985e0a 100644 --- a/registry/consul_registry.go +++ b/registry/consul_registry.go @@ -161,19 +161,28 @@ func (c *consulRegistry) Deregister(s *Service) error { node := s.Nodes[0] - _, err := c.Client.Catalog().Deregister(&consul.CatalogDeregistration{ + if _, err := c.Client.Catalog().Deregister(&consul.CatalogDeregistration{ Node: node.Id, Address: node.Address, ServiceID: node.Id, - }, nil) - return err + CheckID: node.Id, + }, nil); err != nil { + return err + } + + return c.Client.Agent().ServiceDeregister(node.Id) } -func (c *consulRegistry) Register(s *Service) error { +func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error { if len(s.Nodes) == 0 { return errors.New("Require at least one node") } + var options RegisterOptions + for _, o := range opts { + o(&options) + } + node := s.Nodes[0] tags := encodeMetadata(node.Metadata) @@ -191,15 +200,37 @@ func (c *consulRegistry) Register(s *Service) error { Tags: tags, Address: node.Address, }, + Check: &consul.AgentCheck{ + Node: node.Id, + CheckID: node.Id, + Name: s.Name, + ServiceID: node.Id, + ServiceName: s.Name, + Status: "passing", + }, }, nil); err != nil { return err } - return nil + if options.TTL <= time.Duration(0) { + return nil + } + + // this is cruft + return c.Client.Agent().ServiceRegister(&consul.AgentServiceRegistration{ + ID: node.Id, + Name: s.Name, + Tags: tags, + Port: node.Port, + Address: node.Address, + Check: &consul.AgentServiceCheck{ + TTL: fmt.Sprintf("%v", options.TTL), + }, + }) } func (c *consulRegistry) GetService(name string) ([]*Service, error) { - rsp, _, err := c.Client.Catalog().Service(name, "", nil) + rsp, _, err := c.Client.Health().Service(name, "", true, nil) if err != nil { return nil, err } @@ -207,37 +238,37 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) { serviceMap := map[string]*Service{} for _, s := range rsp { - if s.ServiceName != name { + if s.Service.Service != name { continue } // version is now a tag - version, found := decodeVersion(s.ServiceTags) + version, found := decodeVersion(s.Service.Tags) // service ID is now the node id - id := s.ServiceID + id := s.Service.ID // key is always the version key := version // address is service address - address := s.ServiceAddress + address := s.Service.Address // if we can't get the new type of version // use old the old ways if !found { // id was set as node - id = s.Node + id = s.Node.Node // key was service id - key = s.ServiceID + key = s.Service.ID // version was service id - version = s.ServiceID + version = s.Service.ID // address was address - address = s.Address + address = s.Node.Address } svc, ok := serviceMap[key] if !ok { svc = &Service{ - Endpoints: decodeEndpoints(s.ServiceTags), - Name: s.ServiceName, + Endpoints: decodeEndpoints(s.Service.Tags), + Name: s.Service.Service, Version: version, } serviceMap[key] = svc @@ -246,8 +277,8 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) { svc.Nodes = append(svc.Nodes, &Node{ Id: id, Address: address, - Port: s.ServicePort, - Metadata: decodeMetadata(s.ServiceTags), + Port: s.Service.Port, + Metadata: decodeMetadata(s.Service.Tags), }) } diff --git a/registry/options.go b/registry/options.go index 5f69b961..27a8ced9 100644 --- a/registry/options.go +++ b/registry/options.go @@ -17,6 +17,13 @@ type Options struct { Context context.Context } +type RegisterOptions struct { + TTL time.Duration + // Other options for implementations of the interface + // can be stored in a context + Context context.Context +} + func Timeout(t time.Duration) Option { return func(o *Options) { o.Timeout = t @@ -36,3 +43,9 @@ func TLSConfig(t *tls.Config) Option { o.TLSConfig = t } } + +func WithTTL(t time.Duration) RegisterOption { + return func(o *RegisterOptions) { + o.TTL = t + } +} diff --git a/registry/registry.go b/registry/registry.go index 0644bb0d..5f8dd336 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -1,7 +1,7 @@ package registry type Registry interface { - Register(*Service) error + Register(*Service, ...RegisterOption) error Deregister(*Service) error GetService(string) ([]*Service, error) ListServices() ([]*Service, error) @@ -11,6 +11,8 @@ type Registry interface { type Option func(*Options) +type RegisterOption func(*RegisterOptions) + var ( DefaultRegistry = newConsulRegistry([]string{}) ) @@ -19,8 +21,8 @@ func NewRegistry(addrs []string, opt ...Option) Registry { return newConsulRegistry(addrs, opt...) } -func Register(s *Service) error { - return DefaultRegistry.Register(s) +func Register(s *Service, opts ...RegisterOption) error { + return DefaultRegistry.Register(s, opts...) } func Deregister(s *Service) error { diff --git a/server/options.go b/server/options.go index 621932d4..80f75c63 100644 --- a/server/options.go +++ b/server/options.go @@ -1,6 +1,8 @@ package server import ( + "time" + "github.com/micro/go-micro/broker" "github.com/micro/go-micro/codec" "github.com/micro/go-micro/registry" @@ -32,6 +34,10 @@ type Options struct { Context context.Context } +type RegisterOptions struct { + TTL time.Duration +} + func newOptions(opt ...Option) Options { opts := Options{ Codecs: make(map[string]codec.NewCodec), @@ -167,3 +173,10 @@ func WrapSubscriber(w SubscriberWrapper) Option { o.SubWrappers = append(o.SubWrappers, w) } } + +// Register the service with a TTL +func RegisterTTL(t time.Duration) RegisterOption { + return func(o *RegisterOptions) { + o.TTL = t + } +} diff --git a/server/rpc_server.go b/server/rpc_server.go index 4f9ee371..8e58f156 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -155,7 +155,15 @@ func (s *rpcServer) Subscribe(sb Subscriber) error { return nil } -func (s *rpcServer) Register() error { +func (s *rpcServer) Register(opts ...RegisterOption) error { + var options RegisterOptions + for _, o := range opts { + o(&options) + } + + // create registry options + rOpts := []registry.RegisterOption{registry.WithTTL(options.TTL)} + // parse address for host, port config := s.Options() var advt, host string @@ -220,7 +228,7 @@ func (s *rpcServer) Register() error { } log.Infof("Registering node: %s", node.Id) - if err := config.Registry.Register(service); err != nil { + if err := config.Registry.Register(service, rOpts...); err != nil { return err } diff --git a/server/server.go b/server/server.go index c74e392b..5afcf7e5 100644 --- a/server/server.go +++ b/server/server.go @@ -45,7 +45,7 @@ type Server interface { NewHandler(interface{}, ...HandlerOption) Handler NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber Subscribe(Subscriber) error - Register() error + Register(...RegisterOption) error Deregister() error Start() error Stop() error @@ -86,6 +86,8 @@ type HandlerOption func(*HandlerOptions) type SubscriberOption func(*SubscriberOptions) +type RegisterOption func(*RegisterOptions) + var ( DefaultAddress = ":0" DefaultName = "go-server" diff --git a/service.go b/service.go index 5bdf05df..50d171c9 100644 --- a/service.go +++ b/service.go @@ -1,9 +1,11 @@ package micro import ( + "fmt" "os" "os/signal" "syscall" + "time" "github.com/micro/go-micro/client" "github.com/micro/go-micro/cmd" @@ -30,6 +32,27 @@ func newService(opts ...Option) Service { } } +func (s *service) run(exit chan bool) { + if s.opts.RegisterInterval <= time.Duration(0) { + return + } + + t := time.NewTicker(s.opts.RegisterInterval) + + for { + select { + case <-t.C: + fmt.Println("heartbeat") + if err := s.opts.Server.Register(server.RegisterTTL(s.opts.RegisterTTL)); err != nil { + fmt.Println("FUCK", err) + } + case <-exit: + t.Stop() + return + } + } +} + func (s *service) Init(opts ...Option) { // We might get more command flags or the action here // This is pretty ugly, find a better way @@ -82,7 +105,7 @@ func (s *service) Start() error { return err } - if err := s.opts.Server.Register(); err != nil { + if err := s.opts.Server.Register(server.RegisterTTL(s.opts.RegisterTTL)); err != nil { return err } @@ -115,10 +138,17 @@ func (s *service) Run() error { return err } + // start reg loop + ex := make(chan bool) + go s.run(ex) + ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) <-ch + // exit reg loop + close(ex) + if err := s.Stop(); err != nil { return err } From 55145d08a1357a76d9d33b98ebe3295d99ae37f4 Mon Sep 17 00:00:00 2001 From: Asim Date: Wed, 27 Jan 2016 00:15:46 +0000 Subject: [PATCH 2/7] Use agent service for this --- registry/consul_registry.go | 54 +++++++------------------------------ 1 file changed, 10 insertions(+), 44 deletions(-) diff --git a/registry/consul_registry.go b/registry/consul_registry.go index 5a985e0a..57ee803f 100644 --- a/registry/consul_registry.go +++ b/registry/consul_registry.go @@ -160,16 +160,6 @@ func (c *consulRegistry) Deregister(s *Service) error { } node := s.Nodes[0] - - if _, err := c.Client.Catalog().Deregister(&consul.CatalogDeregistration{ - Node: node.Id, - Address: node.Address, - ServiceID: node.Id, - CheckID: node.Id, - }, nil); err != nil { - return err - } - return c.Client.Agent().ServiceDeregister(node.Id) } @@ -189,44 +179,20 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error { tags = append(tags, encodeEndpoints(s.Endpoints)...) tags = append(tags, encodeVersion(s.Version)) - if _, err := c.Client.Catalog().Register(&consul.CatalogRegistration{ - // TODO: remove setting node and address - Node: node.Id, - Address: node.Address, - Service: &consul.AgentService{ - ID: node.Id, - Service: s.Name, - Port: node.Port, - Tags: tags, - Address: node.Address, - }, - Check: &consul.AgentCheck{ - Node: node.Id, - CheckID: node.Id, - Name: s.Name, - ServiceID: node.Id, - ServiceName: s.Name, - Status: "passing", - }, - }, nil); err != nil { - return err - } - - if options.TTL <= time.Duration(0) { - return nil - } - - // this is cruft - return c.Client.Agent().ServiceRegister(&consul.AgentServiceRegistration{ - ID: node.Id, - Name: s.Name, - Tags: tags, - Port: node.Port, + if err := c.Client.Agent().ServiceRegister(&consul.AgentServiceRegistration{ + ID: node.Id, + Name: s.Name, + Tags: tags, + Port: node.Port, Address: node.Address, Check: &consul.AgentServiceCheck{ TTL: fmt.Sprintf("%v", options.TTL), }, - }) + }); err != nil { + return err + } + + return c.Client.Agent().PassTTL("service:"+node.Id, "") } func (c *consulRegistry) GetService(name string) ([]*Service, error) { From 47bfdbe49efa941209b587bf8eb3d7629875aa7f Mon Sep 17 00:00:00 2001 From: Asim Date: Wed, 27 Jan 2016 00:18:44 +0000 Subject: [PATCH 3/7] Fix mock --- registry/mock/mock.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/registry/mock/mock.go b/registry/mock/mock.go index 796bd006..8093ac58 100644 --- a/registry/mock/mock.go +++ b/registry/mock/mock.go @@ -53,7 +53,7 @@ func (m *MockRegistry) ListServices() ([]*registry.Service, error) { return []*registry.Service{}, nil } -func (m *MockRegistry) Register(s *registry.Service) error { +func (m *MockRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error { return nil } From 7183601d3b15dcb5d4fe16bbeba1984f7aac6a68 Mon Sep 17 00:00:00 2001 From: Asim Date: Wed, 27 Jan 2016 00:26:29 +0000 Subject: [PATCH 4/7] Strip the classic aslam comments --- service.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/service.go b/service.go index 50d171c9..a05c1e09 100644 --- a/service.go +++ b/service.go @@ -1,7 +1,6 @@ package micro import ( - "fmt" "os" "os/signal" "syscall" @@ -42,9 +41,7 @@ func (s *service) run(exit chan bool) { for { select { case <-t.C: - fmt.Println("heartbeat") if err := s.opts.Server.Register(server.RegisterTTL(s.opts.RegisterTTL)); err != nil { - fmt.Println("FUCK", err) } case <-exit: t.Stop() From 61094fefe83a0eebd17f4b2b26bff2e8066c736d Mon Sep 17 00:00:00 2001 From: Asim Date: Wed, 27 Jan 2016 00:32:16 +0000 Subject: [PATCH 5/7] If TTL is nil it might bail --- registry/consul_registry.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/registry/consul_registry.go b/registry/consul_registry.go index 57ee803f..607c24f9 100644 --- a/registry/consul_registry.go +++ b/registry/consul_registry.go @@ -179,19 +179,29 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error { tags = append(tags, encodeEndpoints(s.Endpoints)...) tags = append(tags, encodeVersion(s.Version)) + var check *consul.AgentServiceCheck + + if options.TTL > time.Duration(0) { + check = &consul.AgentServiceCheck{ + TTL: fmt.Sprintf("%v", options.TTL), + } + } + if err := c.Client.Agent().ServiceRegister(&consul.AgentServiceRegistration{ ID: node.Id, Name: s.Name, Tags: tags, Port: node.Port, Address: node.Address, - Check: &consul.AgentServiceCheck{ - TTL: fmt.Sprintf("%v", options.TTL), - }, + Check: check, }); err != nil { return err } + if options.TTL == time.Duration(0) { + return nil + } + return c.Client.Agent().PassTTL("service:"+node.Id, "") } From 013d1de2c4b3d7415bbdbf5302ba6560e6e7dee4 Mon Sep 17 00:00:00 2001 From: Asim Date: Wed, 27 Jan 2016 12:23:18 +0000 Subject: [PATCH 6/7] Prefer RegisterTTL set through Init --- options.go | 5 ++--- registry/options.go | 2 +- server/options.go | 20 +++++++++----------- server/rpc_server.go | 13 ++++--------- server/server.go | 4 +--- service.go | 5 ++--- 6 files changed, 19 insertions(+), 30 deletions(-) diff --git a/options.go b/options.go index ac077048..8efa3134 100644 --- a/options.go +++ b/options.go @@ -22,8 +22,7 @@ type Options struct { Registry registry.Registry Transport transport.Transport - // Registration options - RegisterTTL time.Duration + // Register loop interval RegisterInterval time.Duration // Before and After funcs @@ -125,7 +124,7 @@ func Action(a func(*cli.Context)) Option { func RegisterTTL(t time.Duration) Option { return func(o *Options) { - o.RegisterTTL = t + o.Server.Init(server.RegisterTTL(t)) } } diff --git a/registry/options.go b/registry/options.go index 27a8ced9..04e3525b 100644 --- a/registry/options.go +++ b/registry/options.go @@ -44,7 +44,7 @@ func TLSConfig(t *tls.Config) Option { } } -func WithTTL(t time.Duration) RegisterOption { +func RegisterTTL(t time.Duration) RegisterOption { return func(o *RegisterOptions) { o.TTL = t } diff --git a/server/options.go b/server/options.go index 80f75c63..edaeceea 100644 --- a/server/options.go +++ b/server/options.go @@ -26,6 +26,8 @@ type Options struct { HdlrWrappers []HandlerWrapper SubWrappers []SubscriberWrapper + RegisterTTL time.Duration + // Debug Handler which can be set by a user DebugHandler debug.DebugHandler @@ -34,10 +36,6 @@ type Options struct { Context context.Context } -type RegisterOptions struct { - TTL time.Duration -} - func newOptions(opt ...Option) Options { opts := Options{ Codecs: make(map[string]codec.NewCodec), @@ -160,6 +158,13 @@ func Metadata(md map[string]string) Option { } } +// Register the service with a TTL +func RegisterTTL(t time.Duration) Option { + return func(o *Options) { + o.RegisterTTL = t + } +} + // Adds a handler Wrapper to a list of options passed into the server func WrapHandler(w HandlerWrapper) Option { return func(o *Options) { @@ -173,10 +178,3 @@ func WrapSubscriber(w SubscriberWrapper) Option { o.SubWrappers = append(o.SubWrappers, w) } } - -// Register the service with a TTL -func RegisterTTL(t time.Duration) RegisterOption { - return func(o *RegisterOptions) { - o.TTL = t - } -} diff --git a/server/rpc_server.go b/server/rpc_server.go index 8e58f156..97f1f921 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -155,15 +155,7 @@ func (s *rpcServer) Subscribe(sb Subscriber) error { return nil } -func (s *rpcServer) Register(opts ...RegisterOption) error { - var options RegisterOptions - for _, o := range opts { - o(&options) - } - - // create registry options - rOpts := []registry.RegisterOption{registry.WithTTL(options.TTL)} - +func (s *rpcServer) Register() error { // parse address for host, port config := s.Options() var advt, host string @@ -228,6 +220,9 @@ func (s *rpcServer) Register(opts ...RegisterOption) error { } log.Infof("Registering node: %s", node.Id) + // create registry options + rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)} + if err := config.Registry.Register(service, rOpts...); err != nil { return err } diff --git a/server/server.go b/server/server.go index 5afcf7e5..c74e392b 100644 --- a/server/server.go +++ b/server/server.go @@ -45,7 +45,7 @@ type Server interface { NewHandler(interface{}, ...HandlerOption) Handler NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber Subscribe(Subscriber) error - Register(...RegisterOption) error + Register() error Deregister() error Start() error Stop() error @@ -86,8 +86,6 @@ type HandlerOption func(*HandlerOptions) type SubscriberOption func(*SubscriberOptions) -type RegisterOption func(*RegisterOptions) - var ( DefaultAddress = ":0" DefaultName = "go-server" diff --git a/service.go b/service.go index a05c1e09..f50fb254 100644 --- a/service.go +++ b/service.go @@ -41,8 +41,7 @@ func (s *service) run(exit chan bool) { for { select { case <-t.C: - if err := s.opts.Server.Register(server.RegisterTTL(s.opts.RegisterTTL)); err != nil { - } + s.opts.Server.Register() case <-exit: t.Stop() return @@ -102,7 +101,7 @@ func (s *service) Start() error { return err } - if err := s.opts.Server.Register(server.RegisterTTL(s.opts.RegisterTTL)); err != nil { + if err := s.opts.Server.Register(); err != nil { return err } From dd9067ff4e52f53913aec19cb67d2410454268c8 Mon Sep 17 00:00:00 2001 From: Asim Date: Wed, 27 Jan 2016 20:17:31 +0000 Subject: [PATCH 7/7] Ensure the broker also expires registry entries --- broker/http_broker.go | 76 +++++++++++++++++++++++++++++-------------- 1 file changed, 51 insertions(+), 25 deletions(-) diff --git a/broker/http_broker.go b/broker/http_broker.go index c1a249b7..9daab6ad 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -62,6 +62,8 @@ type httpPublication struct { var ( DefaultSubPath = "/_sub" broadcastVersion = "ff.http.broadcast" + registerTTL = time.Minute + registerInterval = time.Second * 30 ) func init() { @@ -148,6 +150,44 @@ func (h *httpSubscriber) Unsubscribe() error { return nil } +func (h *httpBroker) run(l net.Listener) { + t := time.NewTicker(registerInterval) + defer t.Stop() + + for { + select { + // heartbeat for each subscriber + case <-t.C: + h.RLock() + for _, subs := range h.subscribers { + for _, sub := range subs { + h.r.Register(sub.svc, registry.RegisterTTL(registerTTL)) + } + } + h.RUnlock() + // received exit signal + case ch := <-h.exit: + ch <- l.Close() + h.Lock() + h.running = false + h.Unlock() + return + // unsubscribe subscriber + case subscriber := <-h.unsubscribe: + h.Lock() + var subscribers []*httpSubscriber + for _, sub := range h.subscribers[subscriber.topic] { + if sub.id == subscriber.id { + h.r.Deregister(sub.svc) + } + subscribers = append(subscribers, sub) + } + h.subscribers[subscriber.topic] = subscribers + h.Unlock() + } + } +} + func (h *httpBroker) start() error { h.Lock() defer h.Unlock() @@ -181,39 +221,25 @@ func (h *httpBroker) start() error { h.address = l.Addr().String() go http.Serve(l, h) - - go func() { - for { - select { - case ch := <-h.exit: - ch <- l.Close() - h.Lock() - h.running = false - h.Unlock() - return - case subscriber := <-h.unsubscribe: - h.Lock() - var subscribers []*httpSubscriber - for _, sub := range h.subscribers[subscriber.topic] { - if sub.id == subscriber.id { - h.r.Deregister(sub.svc) - } - subscribers = append(subscribers, sub) - } - h.subscribers[subscriber.topic] = subscribers - h.Unlock() - } - } - }() + go h.run(l) h.running = true return nil } func (h *httpBroker) stop() error { + h.Lock() + defer h.Unlock() + + if !h.running { + return nil + } + ch := make(chan error) h.exit <- ch - return <-ch + err := <-ch + h.running = false + return err } func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {