Merge pull request #44 from micro/ttl

TTL for Service Registration
This commit is contained in:
Asim 2016-01-27 20:25:29 +00:00
commit cd9801891e
9 changed files with 168 additions and 63 deletions

View File

@ -62,6 +62,8 @@ type httpPublication struct {
var ( var (
DefaultSubPath = "/_sub" DefaultSubPath = "/_sub"
broadcastVersion = "ff.http.broadcast" broadcastVersion = "ff.http.broadcast"
registerTTL = time.Minute
registerInterval = time.Second * 30
) )
func init() { func init() {
@ -148,6 +150,44 @@ func (h *httpSubscriber) Unsubscribe() error {
return nil return nil
} }
func (h *httpBroker) run(l net.Listener) {
t := time.NewTicker(registerInterval)
defer t.Stop()
for {
select {
// heartbeat for each subscriber
case <-t.C:
h.RLock()
for _, subs := range h.subscribers {
for _, sub := range subs {
h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))
}
}
h.RUnlock()
// received exit signal
case ch := <-h.exit:
ch <- l.Close()
h.Lock()
h.running = false
h.Unlock()
return
// unsubscribe subscriber
case subscriber := <-h.unsubscribe:
h.Lock()
var subscribers []*httpSubscriber
for _, sub := range h.subscribers[subscriber.topic] {
if sub.id == subscriber.id {
h.r.Deregister(sub.svc)
}
subscribers = append(subscribers, sub)
}
h.subscribers[subscriber.topic] = subscribers
h.Unlock()
}
}
}
func (h *httpBroker) start() error { func (h *httpBroker) start() error {
h.Lock() h.Lock()
defer h.Unlock() defer h.Unlock()
@ -181,39 +221,25 @@ func (h *httpBroker) start() error {
h.address = l.Addr().String() h.address = l.Addr().String()
go http.Serve(l, h) go http.Serve(l, h)
go h.run(l)
go func() {
for {
select {
case ch := <-h.exit:
ch <- l.Close()
h.Lock()
h.running = false
h.Unlock()
return
case subscriber := <-h.unsubscribe:
h.Lock()
var subscribers []*httpSubscriber
for _, sub := range h.subscribers[subscriber.topic] {
if sub.id == subscriber.id {
h.r.Deregister(sub.svc)
}
subscribers = append(subscribers, sub)
}
h.subscribers[subscriber.topic] = subscribers
h.Unlock()
}
}
}()
h.running = true h.running = true
return nil return nil
} }
func (h *httpBroker) stop() error { func (h *httpBroker) stop() error {
h.Lock()
defer h.Unlock()
if !h.running {
return nil
}
ch := make(chan error) ch := make(chan error)
h.exit <- ch h.exit <- ch
return <-ch err := <-ch
h.running = false
return err
} }
func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {

View File

@ -1,6 +1,8 @@
package micro package micro
import ( import (
"time"
"github.com/micro/cli" "github.com/micro/cli"
"github.com/micro/go-micro/broker" "github.com/micro/go-micro/broker"
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
@ -20,6 +22,9 @@ type Options struct {
Registry registry.Registry Registry registry.Registry
Transport transport.Transport Transport transport.Transport
// Register loop interval
RegisterInterval time.Duration
// Before and After funcs // Before and After funcs
BeforeStart []func() error BeforeStart []func() error
AfterStop []func() error AfterStop []func() error
@ -117,6 +122,18 @@ func Action(a func(*cli.Context)) Option {
} }
} }
func RegisterTTL(t time.Duration) Option {
return func(o *Options) {
o.Server.Init(server.RegisterTTL(t))
}
}
func RegisterInterval(t time.Duration) Option {
return func(o *Options) {
o.RegisterInterval = t
}
}
// Before and Afters // Before and Afters
func BeforeStart(fn func() error) Option { func BeforeStart(fn func() error) Option {

View File

@ -160,46 +160,53 @@ func (c *consulRegistry) Deregister(s *Service) error {
} }
node := s.Nodes[0] node := s.Nodes[0]
return c.Client.Agent().ServiceDeregister(node.Id)
_, err := c.Client.Catalog().Deregister(&consul.CatalogDeregistration{
Node: node.Id,
Address: node.Address,
ServiceID: node.Id,
}, nil)
return err
} }
func (c *consulRegistry) Register(s *Service) error { func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
if len(s.Nodes) == 0 { if len(s.Nodes) == 0 {
return errors.New("Require at least one node") return errors.New("Require at least one node")
} }
var options RegisterOptions
for _, o := range opts {
o(&options)
}
node := s.Nodes[0] node := s.Nodes[0]
tags := encodeMetadata(node.Metadata) tags := encodeMetadata(node.Metadata)
tags = append(tags, encodeEndpoints(s.Endpoints)...) tags = append(tags, encodeEndpoints(s.Endpoints)...)
tags = append(tags, encodeVersion(s.Version)) tags = append(tags, encodeVersion(s.Version))
if _, err := c.Client.Catalog().Register(&consul.CatalogRegistration{ var check *consul.AgentServiceCheck
// TODO: remove setting node and address
Node: node.Id, if options.TTL > time.Duration(0) {
Address: node.Address, check = &consul.AgentServiceCheck{
Service: &consul.AgentService{ TTL: fmt.Sprintf("%v", options.TTL),
}
}
if err := c.Client.Agent().ServiceRegister(&consul.AgentServiceRegistration{
ID: node.Id, ID: node.Id,
Service: s.Name, Name: s.Name,
Port: node.Port,
Tags: tags, Tags: tags,
Port: node.Port,
Address: node.Address, Address: node.Address,
}, Check: check,
}, nil); err != nil { }); err != nil {
return err return err
} }
if options.TTL == time.Duration(0) {
return nil return nil
} }
return c.Client.Agent().PassTTL("service:"+node.Id, "")
}
func (c *consulRegistry) GetService(name string) ([]*Service, error) { func (c *consulRegistry) GetService(name string) ([]*Service, error) {
rsp, _, err := c.Client.Catalog().Service(name, "", nil) rsp, _, err := c.Client.Health().Service(name, "", true, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -207,37 +214,37 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) {
serviceMap := map[string]*Service{} serviceMap := map[string]*Service{}
for _, s := range rsp { for _, s := range rsp {
if s.ServiceName != name { if s.Service.Service != name {
continue continue
} }
// version is now a tag // version is now a tag
version, found := decodeVersion(s.ServiceTags) version, found := decodeVersion(s.Service.Tags)
// service ID is now the node id // service ID is now the node id
id := s.ServiceID id := s.Service.ID
// key is always the version // key is always the version
key := version key := version
// address is service address // address is service address
address := s.ServiceAddress address := s.Service.Address
// if we can't get the new type of version // if we can't get the new type of version
// use old the old ways // use old the old ways
if !found { if !found {
// id was set as node // id was set as node
id = s.Node id = s.Node.Node
// key was service id // key was service id
key = s.ServiceID key = s.Service.ID
// version was service id // version was service id
version = s.ServiceID version = s.Service.ID
// address was address // address was address
address = s.Address address = s.Node.Address
} }
svc, ok := serviceMap[key] svc, ok := serviceMap[key]
if !ok { if !ok {
svc = &Service{ svc = &Service{
Endpoints: decodeEndpoints(s.ServiceTags), Endpoints: decodeEndpoints(s.Service.Tags),
Name: s.ServiceName, Name: s.Service.Service,
Version: version, Version: version,
} }
serviceMap[key] = svc serviceMap[key] = svc
@ -246,8 +253,8 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) {
svc.Nodes = append(svc.Nodes, &Node{ svc.Nodes = append(svc.Nodes, &Node{
Id: id, Id: id,
Address: address, Address: address,
Port: s.ServicePort, Port: s.Service.Port,
Metadata: decodeMetadata(s.ServiceTags), Metadata: decodeMetadata(s.Service.Tags),
}) })
} }

View File

@ -53,7 +53,7 @@ func (m *MockRegistry) ListServices() ([]*registry.Service, error) {
return []*registry.Service{}, nil return []*registry.Service{}, nil
} }
func (m *MockRegistry) Register(s *registry.Service) error { func (m *MockRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
return nil return nil
} }

View File

@ -17,6 +17,13 @@ type Options struct {
Context context.Context Context context.Context
} }
type RegisterOptions struct {
TTL time.Duration
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
}
func Timeout(t time.Duration) Option { func Timeout(t time.Duration) Option {
return func(o *Options) { return func(o *Options) {
o.Timeout = t o.Timeout = t
@ -36,3 +43,9 @@ func TLSConfig(t *tls.Config) Option {
o.TLSConfig = t o.TLSConfig = t
} }
} }
func RegisterTTL(t time.Duration) RegisterOption {
return func(o *RegisterOptions) {
o.TTL = t
}
}

View File

@ -1,7 +1,7 @@
package registry package registry
type Registry interface { type Registry interface {
Register(*Service) error Register(*Service, ...RegisterOption) error
Deregister(*Service) error Deregister(*Service) error
GetService(string) ([]*Service, error) GetService(string) ([]*Service, error)
ListServices() ([]*Service, error) ListServices() ([]*Service, error)
@ -11,6 +11,8 @@ type Registry interface {
type Option func(*Options) type Option func(*Options)
type RegisterOption func(*RegisterOptions)
var ( var (
DefaultRegistry = newConsulRegistry([]string{}) DefaultRegistry = newConsulRegistry([]string{})
) )
@ -19,8 +21,8 @@ func NewRegistry(addrs []string, opt ...Option) Registry {
return newConsulRegistry(addrs, opt...) return newConsulRegistry(addrs, opt...)
} }
func Register(s *Service) error { func Register(s *Service, opts ...RegisterOption) error {
return DefaultRegistry.Register(s) return DefaultRegistry.Register(s, opts...)
} }
func Deregister(s *Service) error { func Deregister(s *Service) error {

View File

@ -1,6 +1,8 @@
package server package server
import ( import (
"time"
"github.com/micro/go-micro/broker" "github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
@ -24,6 +26,8 @@ type Options struct {
HdlrWrappers []HandlerWrapper HdlrWrappers []HandlerWrapper
SubWrappers []SubscriberWrapper SubWrappers []SubscriberWrapper
RegisterTTL time.Duration
// Debug Handler which can be set by a user // Debug Handler which can be set by a user
DebugHandler debug.DebugHandler DebugHandler debug.DebugHandler
@ -154,6 +158,13 @@ func Metadata(md map[string]string) Option {
} }
} }
// Register the service with a TTL
func RegisterTTL(t time.Duration) Option {
return func(o *Options) {
o.RegisterTTL = t
}
}
// Adds a handler Wrapper to a list of options passed into the server // Adds a handler Wrapper to a list of options passed into the server
func WrapHandler(w HandlerWrapper) Option { func WrapHandler(w HandlerWrapper) Option {
return func(o *Options) { return func(o *Options) {

View File

@ -220,7 +220,10 @@ func (s *rpcServer) Register() error {
} }
log.Infof("Registering node: %s", node.Id) log.Infof("Registering node: %s", node.Id)
if err := config.Registry.Register(service); err != nil { // create registry options
rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)}
if err := config.Registry.Register(service, rOpts...); err != nil {
return err return err
} }

View File

@ -4,6 +4,7 @@ import (
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
"time"
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
"github.com/micro/go-micro/cmd" "github.com/micro/go-micro/cmd"
@ -30,6 +31,24 @@ func newService(opts ...Option) Service {
} }
} }
func (s *service) run(exit chan bool) {
if s.opts.RegisterInterval <= time.Duration(0) {
return
}
t := time.NewTicker(s.opts.RegisterInterval)
for {
select {
case <-t.C:
s.opts.Server.Register()
case <-exit:
t.Stop()
return
}
}
}
func (s *service) Init(opts ...Option) { func (s *service) Init(opts ...Option) {
// We might get more command flags or the action here // We might get more command flags or the action here
// This is pretty ugly, find a better way // This is pretty ugly, find a better way
@ -115,10 +134,17 @@ func (s *service) Run() error {
return err return err
} }
// start reg loop
ex := make(chan bool)
go s.run(ex)
ch := make(chan os.Signal, 1) ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
<-ch <-ch
// exit reg loop
close(ex)
if err := s.Stop(); err != nil { if err := s.Stop(); err != nil {
return err return err
} }