add RegisterCheck server option for internal health checks

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2019-05-13 01:39:42 +03:00
parent 4f5ff076d4
commit a13cdfcc34
3 changed files with 43 additions and 13 deletions

View File

@ -25,6 +25,8 @@ type Options struct {
HdlrWrappers []HandlerWrapper HdlrWrappers []HandlerWrapper
SubWrappers []SubscriberWrapper SubWrappers []SubscriberWrapper
// RegisterCheck runs a check function before registering the service
RegisterCheck func(context.Context) error
// The register expiry time // The register expiry time
RegisterTTL time.Duration RegisterTTL time.Duration
// The interval on which to register // The interval on which to register
@ -67,6 +69,10 @@ func newOptions(opt ...Option) Options {
opts.DebugHandler = debug.DefaultDebugHandler opts.DebugHandler = debug.DefaultDebugHandler
} }
if opts.RegisterCheck == nil {
opts.RegisterCheck = DefaultRegisterCheck
}
if len(opts.Address) == 0 { if len(opts.Address) == 0 {
opts.Address = DefaultAddress opts.Address = DefaultAddress
} }
@ -163,6 +169,13 @@ func Metadata(md map[string]string) Option {
} }
} }
// RegisterCheck run func before registry service
func RegisterCheck(fn func(context.Context) error) Option {
return func(o *Options) {
o.RegisterCheck = fn
}
}
// Register the service with a TTL // Register the service with a TTL
func RegisterTTL(t time.Duration) Option { func RegisterTTL(t time.Duration) Option {
return func(o *Options) { return func(o *Options) {

View File

@ -468,9 +468,14 @@ func (s *rpcServer) Start() error {
log.Logf("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) log.Logf("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
// announce self to the world // use RegisterCheck func before register
if err := s.Register(); err != nil { if err = s.opts.RegisterCheck(s.opts.Context); err != nil {
log.Log("Server register error: ", err) log.Logf("Server %s-%s register check error: %s", config.Name, config.Id, err)
} else {
// announce self to the world
if err = s.Register(); err != nil {
log.Log("Server %s-%s register error: %s", config.Name, config.Id, err)
}
} }
exit := make(chan bool) exit := make(chan bool)
@ -518,8 +523,19 @@ func (s *rpcServer) Start() error {
select { select {
// register self on interval // register self on interval
case <-t.C: case <-t.C:
if err := s.Register(); err != nil { s.RLock()
log.Log("Server register error: ", err) registered := s.registered
s.RUnlock()
if err = s.opts.RegisterCheck(s.opts.Context); err != nil && registered {
log.Logf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, err)
// deregister self in case of error
if err := s.Deregister(); err != nil {
log.Logf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
}
} else {
if err := s.Register(); err != nil {
log.Logf("Server %s-%s register error: %s", config.Name, config.Id, err)
}
} }
// wait for exit // wait for exit
case ch = <-s.exit: case ch = <-s.exit:
@ -531,7 +547,7 @@ func (s *rpcServer) Start() error {
// deregister self // deregister self
if err := s.Deregister(); err != nil { if err := s.Deregister(); err != nil {
log.Log("Server deregister error: ", err) log.Logf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
} }
// wait for requests to finish // wait for requests to finish

View File

@ -8,7 +8,7 @@ import (
"syscall" "syscall"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/micro/go-log" log "github.com/micro/go-log"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
) )
@ -115,12 +115,13 @@ type Subscriber interface {
type Option func(*Options) type Option func(*Options)
var ( var (
DefaultAddress = ":0" DefaultAddress = ":0"
DefaultName = "server" DefaultName = "server"
DefaultVersion = "latest" DefaultVersion = "latest"
DefaultId = uuid.New().String() DefaultId = uuid.New().String()
DefaultServer Server = newRpcServer() DefaultServer Server = newRpcServer()
DefaultRouter = newRpcRouter() DefaultRouter = newRpcRouter()
DefaultRegisterCheck = func(context.Context) error { return nil }
) )
// DefaultOptions returns config options for the default service // DefaultOptions returns config options for the default service