diff --git a/broker/http_broker.go b/broker/http_broker.go index c1a249b7..9daab6ad 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -62,6 +62,8 @@ type httpPublication struct { var ( DefaultSubPath = "/_sub" broadcastVersion = "ff.http.broadcast" + registerTTL = time.Minute + registerInterval = time.Second * 30 ) func init() { @@ -148,6 +150,44 @@ func (h *httpSubscriber) Unsubscribe() error { return nil } +func (h *httpBroker) run(l net.Listener) { + t := time.NewTicker(registerInterval) + defer t.Stop() + + for { + select { + // heartbeat for each subscriber + case <-t.C: + h.RLock() + for _, subs := range h.subscribers { + for _, sub := range subs { + h.r.Register(sub.svc, registry.RegisterTTL(registerTTL)) + } + } + h.RUnlock() + // received exit signal + case ch := <-h.exit: + ch <- l.Close() + h.Lock() + h.running = false + h.Unlock() + return + // unsubscribe subscriber + case subscriber := <-h.unsubscribe: + h.Lock() + var subscribers []*httpSubscriber + for _, sub := range h.subscribers[subscriber.topic] { + if sub.id == subscriber.id { + h.r.Deregister(sub.svc) + } + subscribers = append(subscribers, sub) + } + h.subscribers[subscriber.topic] = subscribers + h.Unlock() + } + } +} + func (h *httpBroker) start() error { h.Lock() defer h.Unlock() @@ -181,39 +221,25 @@ func (h *httpBroker) start() error { h.address = l.Addr().String() go http.Serve(l, h) - - go func() { - for { - select { - case ch := <-h.exit: - ch <- l.Close() - h.Lock() - h.running = false - h.Unlock() - return - case subscriber := <-h.unsubscribe: - h.Lock() - var subscribers []*httpSubscriber - for _, sub := range h.subscribers[subscriber.topic] { - if sub.id == subscriber.id { - h.r.Deregister(sub.svc) - } - subscribers = append(subscribers, sub) - } - h.subscribers[subscriber.topic] = subscribers - h.Unlock() - } - } - }() + go h.run(l) h.running = true return nil } func (h *httpBroker) stop() error { + h.Lock() + defer h.Unlock() + + if !h.running { + return nil + } + ch := make(chan error) h.exit <- ch - return <-ch + err := <-ch + h.running = false + return err } func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { diff --git a/options.go b/options.go index 9389ade6..8efa3134 100644 --- a/options.go +++ b/options.go @@ -1,6 +1,8 @@ package micro import ( + "time" + "github.com/micro/cli" "github.com/micro/go-micro/broker" "github.com/micro/go-micro/client" @@ -20,6 +22,9 @@ type Options struct { Registry registry.Registry Transport transport.Transport + // Register loop interval + RegisterInterval time.Duration + // Before and After funcs BeforeStart []func() error AfterStop []func() error @@ -117,6 +122,18 @@ func Action(a func(*cli.Context)) Option { } } +func RegisterTTL(t time.Duration) Option { + return func(o *Options) { + o.Server.Init(server.RegisterTTL(t)) + } +} + +func RegisterInterval(t time.Duration) Option { + return func(o *Options) { + o.RegisterInterval = t + } +} + // Before and Afters func BeforeStart(fn func() error) Option { diff --git a/registry/consul_registry.go b/registry/consul_registry.go index 843f2281..607c24f9 100644 --- a/registry/consul_registry.go +++ b/registry/consul_registry.go @@ -160,46 +160,53 @@ func (c *consulRegistry) Deregister(s *Service) error { } node := s.Nodes[0] - - _, err := c.Client.Catalog().Deregister(&consul.CatalogDeregistration{ - Node: node.Id, - Address: node.Address, - ServiceID: node.Id, - }, nil) - return err + return c.Client.Agent().ServiceDeregister(node.Id) } -func (c *consulRegistry) Register(s *Service) error { +func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error { if len(s.Nodes) == 0 { return errors.New("Require at least one node") } + var options RegisterOptions + for _, o := range opts { + o(&options) + } + node := s.Nodes[0] tags := encodeMetadata(node.Metadata) tags = append(tags, encodeEndpoints(s.Endpoints)...) tags = append(tags, encodeVersion(s.Version)) - if _, err := c.Client.Catalog().Register(&consul.CatalogRegistration{ - // TODO: remove setting node and address - Node: node.Id, + var check *consul.AgentServiceCheck + + if options.TTL > time.Duration(0) { + check = &consul.AgentServiceCheck{ + TTL: fmt.Sprintf("%v", options.TTL), + } + } + + if err := c.Client.Agent().ServiceRegister(&consul.AgentServiceRegistration{ + ID: node.Id, + Name: s.Name, + Tags: tags, + Port: node.Port, Address: node.Address, - Service: &consul.AgentService{ - ID: node.Id, - Service: s.Name, - Port: node.Port, - Tags: tags, - Address: node.Address, - }, - }, nil); err != nil { + Check: check, + }); err != nil { return err } - return nil + if options.TTL == time.Duration(0) { + return nil + } + + return c.Client.Agent().PassTTL("service:"+node.Id, "") } func (c *consulRegistry) GetService(name string) ([]*Service, error) { - rsp, _, err := c.Client.Catalog().Service(name, "", nil) + rsp, _, err := c.Client.Health().Service(name, "", true, nil) if err != nil { return nil, err } @@ -207,37 +214,37 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) { serviceMap := map[string]*Service{} for _, s := range rsp { - if s.ServiceName != name { + if s.Service.Service != name { continue } // version is now a tag - version, found := decodeVersion(s.ServiceTags) + version, found := decodeVersion(s.Service.Tags) // service ID is now the node id - id := s.ServiceID + id := s.Service.ID // key is always the version key := version // address is service address - address := s.ServiceAddress + address := s.Service.Address // if we can't get the new type of version // use old the old ways if !found { // id was set as node - id = s.Node + id = s.Node.Node // key was service id - key = s.ServiceID + key = s.Service.ID // version was service id - version = s.ServiceID + version = s.Service.ID // address was address - address = s.Address + address = s.Node.Address } svc, ok := serviceMap[key] if !ok { svc = &Service{ - Endpoints: decodeEndpoints(s.ServiceTags), - Name: s.ServiceName, + Endpoints: decodeEndpoints(s.Service.Tags), + Name: s.Service.Service, Version: version, } serviceMap[key] = svc @@ -246,8 +253,8 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) { svc.Nodes = append(svc.Nodes, &Node{ Id: id, Address: address, - Port: s.ServicePort, - Metadata: decodeMetadata(s.ServiceTags), + Port: s.Service.Port, + Metadata: decodeMetadata(s.Service.Tags), }) } diff --git a/registry/mock/mock.go b/registry/mock/mock.go index 796bd006..8093ac58 100644 --- a/registry/mock/mock.go +++ b/registry/mock/mock.go @@ -53,7 +53,7 @@ func (m *MockRegistry) ListServices() ([]*registry.Service, error) { return []*registry.Service{}, nil } -func (m *MockRegistry) Register(s *registry.Service) error { +func (m *MockRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error { return nil } diff --git a/registry/options.go b/registry/options.go index 5f69b961..04e3525b 100644 --- a/registry/options.go +++ b/registry/options.go @@ -17,6 +17,13 @@ type Options struct { Context context.Context } +type RegisterOptions struct { + TTL time.Duration + // Other options for implementations of the interface + // can be stored in a context + Context context.Context +} + func Timeout(t time.Duration) Option { return func(o *Options) { o.Timeout = t @@ -36,3 +43,9 @@ func TLSConfig(t *tls.Config) Option { o.TLSConfig = t } } + +func RegisterTTL(t time.Duration) RegisterOption { + return func(o *RegisterOptions) { + o.TTL = t + } +} diff --git a/registry/registry.go b/registry/registry.go index 0644bb0d..5f8dd336 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -1,7 +1,7 @@ package registry type Registry interface { - Register(*Service) error + Register(*Service, ...RegisterOption) error Deregister(*Service) error GetService(string) ([]*Service, error) ListServices() ([]*Service, error) @@ -11,6 +11,8 @@ type Registry interface { type Option func(*Options) +type RegisterOption func(*RegisterOptions) + var ( DefaultRegistry = newConsulRegistry([]string{}) ) @@ -19,8 +21,8 @@ func NewRegistry(addrs []string, opt ...Option) Registry { return newConsulRegistry(addrs, opt...) } -func Register(s *Service) error { - return DefaultRegistry.Register(s) +func Register(s *Service, opts ...RegisterOption) error { + return DefaultRegistry.Register(s, opts...) } func Deregister(s *Service) error { diff --git a/server/options.go b/server/options.go index 621932d4..edaeceea 100644 --- a/server/options.go +++ b/server/options.go @@ -1,6 +1,8 @@ package server import ( + "time" + "github.com/micro/go-micro/broker" "github.com/micro/go-micro/codec" "github.com/micro/go-micro/registry" @@ -24,6 +26,8 @@ type Options struct { HdlrWrappers []HandlerWrapper SubWrappers []SubscriberWrapper + RegisterTTL time.Duration + // Debug Handler which can be set by a user DebugHandler debug.DebugHandler @@ -154,6 +158,13 @@ func Metadata(md map[string]string) Option { } } +// Register the service with a TTL +func RegisterTTL(t time.Duration) Option { + return func(o *Options) { + o.RegisterTTL = t + } +} + // Adds a handler Wrapper to a list of options passed into the server func WrapHandler(w HandlerWrapper) Option { return func(o *Options) { diff --git a/server/rpc_server.go b/server/rpc_server.go index 4f9ee371..97f1f921 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -220,7 +220,10 @@ func (s *rpcServer) Register() error { } log.Infof("Registering node: %s", node.Id) - if err := config.Registry.Register(service); err != nil { + // create registry options + rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)} + + if err := config.Registry.Register(service, rOpts...); err != nil { return err } diff --git a/service.go b/service.go index 5bdf05df..f50fb254 100644 --- a/service.go +++ b/service.go @@ -4,6 +4,7 @@ import ( "os" "os/signal" "syscall" + "time" "github.com/micro/go-micro/client" "github.com/micro/go-micro/cmd" @@ -30,6 +31,24 @@ func newService(opts ...Option) Service { } } +func (s *service) run(exit chan bool) { + if s.opts.RegisterInterval <= time.Duration(0) { + return + } + + t := time.NewTicker(s.opts.RegisterInterval) + + for { + select { + case <-t.C: + s.opts.Server.Register() + case <-exit: + t.Stop() + return + } + } +} + func (s *service) Init(opts ...Option) { // We might get more command flags or the action here // This is pretty ugly, find a better way @@ -115,10 +134,17 @@ func (s *service) Run() error { return err } + // start reg loop + ex := make(chan bool) + go s.run(ex) + ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) <-ch + // exit reg loop + close(ex) + if err := s.Stop(); err != nil { return err }