diff --git a/options.go b/options.go index 9389ade6..ac077048 100644 --- a/options.go +++ b/options.go @@ -1,6 +1,8 @@ package micro import ( + "time" + "github.com/micro/cli" "github.com/micro/go-micro/broker" "github.com/micro/go-micro/client" @@ -20,6 +22,10 @@ type Options struct { Registry registry.Registry Transport transport.Transport + // Registration options + RegisterTTL time.Duration + RegisterInterval time.Duration + // Before and After funcs BeforeStart []func() error AfterStop []func() error @@ -117,6 +123,18 @@ func Action(a func(*cli.Context)) Option { } } +func RegisterTTL(t time.Duration) Option { + return func(o *Options) { + o.RegisterTTL = t + } +} + +func RegisterInterval(t time.Duration) Option { + return func(o *Options) { + o.RegisterInterval = t + } +} + // Before and Afters func BeforeStart(fn func() error) Option { diff --git a/registry/consul_registry.go b/registry/consul_registry.go index 843f2281..5a985e0a 100644 --- a/registry/consul_registry.go +++ b/registry/consul_registry.go @@ -161,19 +161,28 @@ func (c *consulRegistry) Deregister(s *Service) error { node := s.Nodes[0] - _, err := c.Client.Catalog().Deregister(&consul.CatalogDeregistration{ + if _, err := c.Client.Catalog().Deregister(&consul.CatalogDeregistration{ Node: node.Id, Address: node.Address, ServiceID: node.Id, - }, nil) - return err + CheckID: node.Id, + }, nil); err != nil { + return err + } + + return c.Client.Agent().ServiceDeregister(node.Id) } -func (c *consulRegistry) Register(s *Service) error { +func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error { if len(s.Nodes) == 0 { return errors.New("Require at least one node") } + var options RegisterOptions + for _, o := range opts { + o(&options) + } + node := s.Nodes[0] tags := encodeMetadata(node.Metadata) @@ -191,15 +200,37 @@ func (c *consulRegistry) Register(s *Service) error { Tags: tags, Address: node.Address, }, + Check: &consul.AgentCheck{ + Node: node.Id, + CheckID: node.Id, + Name: s.Name, + ServiceID: node.Id, + ServiceName: s.Name, + Status: "passing", + }, }, nil); err != nil { return err } - return nil + if options.TTL <= time.Duration(0) { + return nil + } + + // this is cruft + return c.Client.Agent().ServiceRegister(&consul.AgentServiceRegistration{ + ID: node.Id, + Name: s.Name, + Tags: tags, + Port: node.Port, + Address: node.Address, + Check: &consul.AgentServiceCheck{ + TTL: fmt.Sprintf("%v", options.TTL), + }, + }) } func (c *consulRegistry) GetService(name string) ([]*Service, error) { - rsp, _, err := c.Client.Catalog().Service(name, "", nil) + rsp, _, err := c.Client.Health().Service(name, "", true, nil) if err != nil { return nil, err } @@ -207,37 +238,37 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) { serviceMap := map[string]*Service{} for _, s := range rsp { - if s.ServiceName != name { + if s.Service.Service != name { continue } // version is now a tag - version, found := decodeVersion(s.ServiceTags) + version, found := decodeVersion(s.Service.Tags) // service ID is now the node id - id := s.ServiceID + id := s.Service.ID // key is always the version key := version // address is service address - address := s.ServiceAddress + address := s.Service.Address // if we can't get the new type of version // use old the old ways if !found { // id was set as node - id = s.Node + id = s.Node.Node // key was service id - key = s.ServiceID + key = s.Service.ID // version was service id - version = s.ServiceID + version = s.Service.ID // address was address - address = s.Address + address = s.Node.Address } svc, ok := serviceMap[key] if !ok { svc = &Service{ - Endpoints: decodeEndpoints(s.ServiceTags), - Name: s.ServiceName, + Endpoints: decodeEndpoints(s.Service.Tags), + Name: s.Service.Service, Version: version, } serviceMap[key] = svc @@ -246,8 +277,8 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) { svc.Nodes = append(svc.Nodes, &Node{ Id: id, Address: address, - Port: s.ServicePort, - Metadata: decodeMetadata(s.ServiceTags), + Port: s.Service.Port, + Metadata: decodeMetadata(s.Service.Tags), }) } diff --git a/registry/options.go b/registry/options.go index 5f69b961..27a8ced9 100644 --- a/registry/options.go +++ b/registry/options.go @@ -17,6 +17,13 @@ type Options struct { Context context.Context } +type RegisterOptions struct { + TTL time.Duration + // Other options for implementations of the interface + // can be stored in a context + Context context.Context +} + func Timeout(t time.Duration) Option { return func(o *Options) { o.Timeout = t @@ -36,3 +43,9 @@ func TLSConfig(t *tls.Config) Option { o.TLSConfig = t } } + +func WithTTL(t time.Duration) RegisterOption { + return func(o *RegisterOptions) { + o.TTL = t + } +} diff --git a/registry/registry.go b/registry/registry.go index 0644bb0d..5f8dd336 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -1,7 +1,7 @@ package registry type Registry interface { - Register(*Service) error + Register(*Service, ...RegisterOption) error Deregister(*Service) error GetService(string) ([]*Service, error) ListServices() ([]*Service, error) @@ -11,6 +11,8 @@ type Registry interface { type Option func(*Options) +type RegisterOption func(*RegisterOptions) + var ( DefaultRegistry = newConsulRegistry([]string{}) ) @@ -19,8 +21,8 @@ func NewRegistry(addrs []string, opt ...Option) Registry { return newConsulRegistry(addrs, opt...) } -func Register(s *Service) error { - return DefaultRegistry.Register(s) +func Register(s *Service, opts ...RegisterOption) error { + return DefaultRegistry.Register(s, opts...) } func Deregister(s *Service) error { diff --git a/server/options.go b/server/options.go index 621932d4..80f75c63 100644 --- a/server/options.go +++ b/server/options.go @@ -1,6 +1,8 @@ package server import ( + "time" + "github.com/micro/go-micro/broker" "github.com/micro/go-micro/codec" "github.com/micro/go-micro/registry" @@ -32,6 +34,10 @@ type Options struct { Context context.Context } +type RegisterOptions struct { + TTL time.Duration +} + func newOptions(opt ...Option) Options { opts := Options{ Codecs: make(map[string]codec.NewCodec), @@ -167,3 +173,10 @@ func WrapSubscriber(w SubscriberWrapper) Option { o.SubWrappers = append(o.SubWrappers, w) } } + +// Register the service with a TTL +func RegisterTTL(t time.Duration) RegisterOption { + return func(o *RegisterOptions) { + o.TTL = t + } +} diff --git a/server/rpc_server.go b/server/rpc_server.go index 4f9ee371..8e58f156 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -155,7 +155,15 @@ func (s *rpcServer) Subscribe(sb Subscriber) error { return nil } -func (s *rpcServer) Register() error { +func (s *rpcServer) Register(opts ...RegisterOption) error { + var options RegisterOptions + for _, o := range opts { + o(&options) + } + + // create registry options + rOpts := []registry.RegisterOption{registry.WithTTL(options.TTL)} + // parse address for host, port config := s.Options() var advt, host string @@ -220,7 +228,7 @@ func (s *rpcServer) Register() error { } log.Infof("Registering node: %s", node.Id) - if err := config.Registry.Register(service); err != nil { + if err := config.Registry.Register(service, rOpts...); err != nil { return err } diff --git a/server/server.go b/server/server.go index c74e392b..5afcf7e5 100644 --- a/server/server.go +++ b/server/server.go @@ -45,7 +45,7 @@ type Server interface { NewHandler(interface{}, ...HandlerOption) Handler NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber Subscribe(Subscriber) error - Register() error + Register(...RegisterOption) error Deregister() error Start() error Stop() error @@ -86,6 +86,8 @@ type HandlerOption func(*HandlerOptions) type SubscriberOption func(*SubscriberOptions) +type RegisterOption func(*RegisterOptions) + var ( DefaultAddress = ":0" DefaultName = "go-server" diff --git a/service.go b/service.go index 5bdf05df..50d171c9 100644 --- a/service.go +++ b/service.go @@ -1,9 +1,11 @@ package micro import ( + "fmt" "os" "os/signal" "syscall" + "time" "github.com/micro/go-micro/client" "github.com/micro/go-micro/cmd" @@ -30,6 +32,27 @@ func newService(opts ...Option) Service { } } +func (s *service) run(exit chan bool) { + if s.opts.RegisterInterval <= time.Duration(0) { + return + } + + t := time.NewTicker(s.opts.RegisterInterval) + + for { + select { + case <-t.C: + fmt.Println("heartbeat") + if err := s.opts.Server.Register(server.RegisterTTL(s.opts.RegisterTTL)); err != nil { + fmt.Println("FUCK", err) + } + case <-exit: + t.Stop() + return + } + } +} + func (s *service) Init(opts ...Option) { // We might get more command flags or the action here // This is pretty ugly, find a better way @@ -82,7 +105,7 @@ func (s *service) Start() error { return err } - if err := s.opts.Server.Register(); err != nil { + if err := s.opts.Server.Register(server.RegisterTTL(s.opts.RegisterTTL)); err != nil { return err } @@ -115,10 +138,17 @@ func (s *service) Run() error { return err } + // start reg loop + ex := make(chan bool) + go s.run(ex) + ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) <-ch + // exit reg loop + close(ex) + if err := s.Stop(); err != nil { return err }