Merge pull request #473 from unistack-org/health
add RegisterCheck server option for internal health checks
This commit is contained in:
		| @@ -25,6 +25,8 @@ type Options struct { | |||||||
| 	HdlrWrappers []HandlerWrapper | 	HdlrWrappers []HandlerWrapper | ||||||
| 	SubWrappers  []SubscriberWrapper | 	SubWrappers  []SubscriberWrapper | ||||||
|  |  | ||||||
|  | 	// RegisterCheck runs a check function before registering the service | ||||||
|  | 	RegisterCheck func(context.Context) error | ||||||
| 	// The register expiry time | 	// The register expiry time | ||||||
| 	RegisterTTL time.Duration | 	RegisterTTL time.Duration | ||||||
| 	// The interval on which to register | 	// The interval on which to register | ||||||
| @@ -67,6 +69,10 @@ func newOptions(opt ...Option) Options { | |||||||
| 		opts.DebugHandler = debug.DefaultDebugHandler | 		opts.DebugHandler = debug.DefaultDebugHandler | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	if opts.RegisterCheck == nil { | ||||||
|  | 		opts.RegisterCheck = DefaultRegisterCheck | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	if len(opts.Address) == 0 { | 	if len(opts.Address) == 0 { | ||||||
| 		opts.Address = DefaultAddress | 		opts.Address = DefaultAddress | ||||||
| 	} | 	} | ||||||
| @@ -163,6 +169,13 @@ func Metadata(md map[string]string) Option { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // RegisterCheck run func before registry service | ||||||
|  | func RegisterCheck(fn func(context.Context) error) Option { | ||||||
|  | 	return func(o *Options) { | ||||||
|  | 		o.RegisterCheck = fn | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
| // Register the service with a TTL | // Register the service with a TTL | ||||||
| func RegisterTTL(t time.Duration) Option { | func RegisterTTL(t time.Duration) Option { | ||||||
| 	return func(o *Options) { | 	return func(o *Options) { | ||||||
|   | |||||||
| @@ -468,9 +468,14 @@ func (s *rpcServer) Start() error { | |||||||
|  |  | ||||||
| 	log.Logf("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) | 	log.Logf("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) | ||||||
|  |  | ||||||
|  | 	// use RegisterCheck func before register | ||||||
|  | 	if err = s.opts.RegisterCheck(s.opts.Context); err != nil { | ||||||
|  | 		log.Logf("Server %s-%s register check error: %s", config.Name, config.Id, err) | ||||||
|  | 	} else { | ||||||
| 		// announce self to the world | 		// announce self to the world | ||||||
| 	if err := s.Register(); err != nil { | 		if err = s.Register(); err != nil { | ||||||
| 		log.Log("Server register error: ", err) | 			log.Log("Server %s-%s register error: %s", config.Name, config.Id, err) | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	exit := make(chan bool) | 	exit := make(chan bool) | ||||||
| @@ -518,8 +523,19 @@ func (s *rpcServer) Start() error { | |||||||
| 			select { | 			select { | ||||||
| 			// register self on interval | 			// register self on interval | ||||||
| 			case <-t.C: | 			case <-t.C: | ||||||
|  | 				s.RLock() | ||||||
|  | 				registered := s.registered | ||||||
|  | 				s.RUnlock() | ||||||
|  | 				if err = s.opts.RegisterCheck(s.opts.Context); err != nil && registered { | ||||||
|  | 					log.Logf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, err) | ||||||
|  | 					// deregister self in case of error | ||||||
|  | 					if err := s.Deregister(); err != nil { | ||||||
|  | 						log.Logf("Server %s-%s deregister error: %s", config.Name, config.Id, err) | ||||||
|  | 					} | ||||||
|  | 				} else { | ||||||
| 					if err := s.Register(); err != nil { | 					if err := s.Register(); err != nil { | ||||||
| 					log.Log("Server register error: ", err) | 						log.Logf("Server %s-%s register error: %s", config.Name, config.Id, err) | ||||||
|  | 					} | ||||||
| 				} | 				} | ||||||
| 			// wait for exit | 			// wait for exit | ||||||
| 			case ch = <-s.exit: | 			case ch = <-s.exit: | ||||||
| @@ -531,7 +547,7 @@ func (s *rpcServer) Start() error { | |||||||
|  |  | ||||||
| 		// deregister self | 		// deregister self | ||||||
| 		if err := s.Deregister(); err != nil { | 		if err := s.Deregister(); err != nil { | ||||||
| 			log.Log("Server deregister error: ", err) | 			log.Logf("Server %s-%s deregister error: %s", config.Name, config.Id, err) | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		// wait for requests to finish | 		// wait for requests to finish | ||||||
|   | |||||||
| @@ -8,7 +8,7 @@ import ( | |||||||
| 	"syscall" | 	"syscall" | ||||||
|  |  | ||||||
| 	"github.com/google/uuid" | 	"github.com/google/uuid" | ||||||
| 	"github.com/micro/go-log" | 	log "github.com/micro/go-log" | ||||||
| 	"github.com/micro/go-micro/codec" | 	"github.com/micro/go-micro/codec" | ||||||
| 	"github.com/micro/go-micro/registry" | 	"github.com/micro/go-micro/registry" | ||||||
| ) | ) | ||||||
| @@ -121,6 +121,7 @@ var ( | |||||||
| 	DefaultId                   = uuid.New().String() | 	DefaultId                   = uuid.New().String() | ||||||
| 	DefaultServer        Server = newRpcServer() | 	DefaultServer        Server = newRpcServer() | ||||||
| 	DefaultRouter               = newRpcRouter() | 	DefaultRouter               = newRpcRouter() | ||||||
|  | 	DefaultRegisterCheck        = func(context.Context) error { return nil } | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // DefaultOptions returns config options for the default service | // DefaultOptions returns config options for the default service | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user