Move RegisterInterval into the server

This commit is contained in:
Asim Aslam 2019-01-24 13:22:17 +00:00
parent 8090f9968d
commit 539b8c1a3b
6 changed files with 74 additions and 98 deletions

View File

@ -404,6 +404,10 @@ func (c *cmd) Before(ctx *cli.Context) error {
serverOpts = append(serverOpts, server.RegisterTTL(ttl*time.Second)) serverOpts = append(serverOpts, server.RegisterTTL(ttl*time.Second))
} }
if val := time.Duration(ctx.GlobalInt("register_interval")); val > 0 {
serverOpts = append(serverOpts, server.RegisterInterval(val*time.Second))
}
// client opts // client opts
if r := ctx.Int("client_retries"); r >= 0 { if r := ctx.Int("client_retries"); r >= 0 {
clientOpts = append(clientOpts, client.Retries(r)) clientOpts = append(clientOpts, client.Retries(r))

View File

@ -22,9 +22,6 @@ type Options struct {
Registry registry.Registry Registry registry.Registry
Transport transport.Transport Transport transport.Transport
// Register loop interval
RegisterInterval time.Duration
// Before and After funcs // Before and After funcs
BeforeStart []func() error BeforeStart []func() error
BeforeStop []func() error BeforeStop []func() error
@ -168,7 +165,7 @@ func RegisterTTL(t time.Duration) Option {
// RegisterInterval specifies the interval on which to re-register // RegisterInterval specifies the interval on which to re-register
func RegisterInterval(t time.Duration) Option { func RegisterInterval(t time.Duration) Option {
return func(o *Options) { return func(o *Options) {
o.RegisterInterval = t o.Server.Init(server.RegisterInterval(t))
} }
} }

View File

@ -27,6 +27,8 @@ type Options struct {
// The register expiry time // The register expiry time
RegisterTTL time.Duration RegisterTTL time.Duration
// The interval on which to register
RegisterInterval time.Duration
// The router for requests // The router for requests
Router Router Router Router
@ -168,6 +170,13 @@ func RegisterTTL(t time.Duration) Option {
} }
} }
// Register the service with at interval
func RegisterInterval(t time.Duration) Option {
return func(o *Options) {
o.RegisterInterval = t
}
}
// WithRouter sets the request router // WithRouter sets the request router
func WithRouter(r Router) Option { func WithRouter(r Router) Option {
return func(o *Options) { return func(o *Options) {

View File

@ -436,6 +436,7 @@ func (s *rpcServer) Start() error {
registerDebugHandler(s) registerDebugHandler(s)
config := s.Options() config := s.Options()
// start listening on the transport
ts, err := config.Transport.Listen(config.Address) ts, err := config.Transport.Listen(config.Address)
if err != nil { if err != nil {
return err return err
@ -443,30 +444,45 @@ func (s *rpcServer) Start() error {
log.Logf("Transport [%s] Listening on %s", config.Transport.String(), ts.Addr()) log.Logf("Transport [%s] Listening on %s", config.Transport.String(), ts.Addr())
s.Lock()
// swap address // swap address
s.Lock()
addr := s.opts.Address addr := s.opts.Address
s.opts.Address = ts.Addr() s.opts.Address = ts.Addr()
s.Unlock() s.Unlock()
exit := make(chan bool, 1) // connect to the broker
if err := config.Broker.Connect(); err != nil {
return err
}
log.Logf("Broker [%s] Listening on %s", config.Broker.String(), config.Broker.Address())
// announce self to the world
if err := s.Register(); err != nil {
log.Log("Server register error: ", err)
}
exit := make(chan bool)
go func() { go func() {
for { for {
// listen for connections
err := ts.Accept(s.ServeConn) err := ts.Accept(s.ServeConn)
// check if we're supposed to exit // TODO: listen for messages
// msg := broker.Exchange(service).Consume()
select { select {
// check if we're supposed to exit
case <-exit: case <-exit:
return return
default:
}
// check the error and backoff // check the error and backoff
if err != nil { default:
log.Logf("Accept error: %v", err) if err != nil {
time.Sleep(time.Second) log.Logf("Accept error: %v", err)
continue time.Sleep(time.Second)
continue
}
} }
// no error just exit // no error just exit
@ -475,9 +491,38 @@ func (s *rpcServer) Start() error {
}() }()
go func() { go func() {
// wait for exit
ch := <-s.exit // new ticker
exit <- true t := time.NewTicker(s.opts.RegisterInterval)
// only process if it exists
if s.opts.RegisterInterval <= time.Duration(0) {
t.C = nil
}
// return error chan
var ch chan error
Loop:
for {
select {
// register self on interval
case <-t.C:
if err := s.Register(); err != nil {
log.Log("Server register error: ", err)
}
// wait for exit
case ch = <-s.exit:
t.Stop()
close(exit)
break Loop
}
}
// deregister self
if err := s.Deregister(); err != nil {
log.Log("Server deregister error: ", err)
}
// wait for requests to finish // wait for requests to finish
if wait(s.opts.Context) { if wait(s.opts.Context) {
@ -490,18 +535,12 @@ func (s *rpcServer) Start() error {
// disconnect the broker // disconnect the broker
config.Broker.Disconnect() config.Broker.Disconnect()
s.Lock()
// swap back address // swap back address
s.Lock()
s.opts.Address = addr s.opts.Address = addr
s.Unlock() s.Unlock()
}() }()
// TODO: subscribe to cruft
if err := config.Broker.Connect(); err != nil {
return err
}
log.Logf("Broker [%s] Listening on %s", config.Broker.String(), config.Broker.Address())
return nil return nil
} }

View File

@ -21,8 +21,6 @@ type Server interface {
NewHandler(interface{}, ...HandlerOption) Handler NewHandler(interface{}, ...HandlerOption) Handler
NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
Subscribe(Subscriber) error Subscribe(Subscriber) error
Register() error
Deregister() error
Start() error Start() error
Stop() error Stop() error
String() string String() string
@ -177,16 +175,6 @@ func Subscribe(s Subscriber) error {
return DefaultServer.Subscribe(s) return DefaultServer.Subscribe(s)
} }
// Register registers the default server with the discovery system
func Register() error {
return DefaultServer.Register()
}
// Deregister deregisters the default server from the discovery system
func Deregister() error {
return DefaultServer.Deregister()
}
// Run starts the default server and waits for a kill // Run starts the default server and waits for a kill
// signal before exiting. Also registers/deregisters the server // signal before exiting. Also registers/deregisters the server
func Run() error { func Run() error {
@ -194,18 +182,10 @@ func Run() error {
return err return err
} }
if err := DefaultServer.Register(); err != nil {
return err
}
ch := make(chan os.Signal, 1) ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
log.Logf("Received signal %s", <-ch) log.Logf("Received signal %s", <-ch)
if err := DefaultServer.Deregister(); err != nil {
return err
}
return Stop() return Stop()
} }

View File

@ -5,10 +5,7 @@ import (
"os/signal" "os/signal"
"sync" "sync"
"syscall" "syscall"
"time"
"github.com/micro/cli"
"github.com/micro/go-log"
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
"github.com/micro/go-micro/cmd" "github.com/micro/go-micro/cmd"
"github.com/micro/go-micro/metadata" "github.com/micro/go-micro/metadata"
@ -36,27 +33,6 @@ func newService(opts ...Option) Service {
} }
} }
func (s *service) run(exit chan bool) {
if s.opts.RegisterInterval <= time.Duration(0) {
return
}
t := time.NewTicker(s.opts.RegisterInterval)
for {
select {
case <-t.C:
err := s.opts.Server.Register()
if err != nil {
log.Log("service run Server.Register error: ", err)
}
case <-exit:
t.Stop()
return
}
}
}
// Init initialises options. Additionally it calls cmd.Init // Init initialises options. Additionally it calls cmd.Init
// which parses command line flags. cmd.Init is only called // which parses command line flags. cmd.Init is only called
// on first Init. // on first Init.
@ -67,20 +43,6 @@ func (s *service) Init(opts ...Option) {
} }
s.once.Do(func() { s.once.Do(func() {
// save user action
action := s.opts.Cmd.App().Action
// set service action
s.opts.Cmd.App().Action = func(c *cli.Context) {
// set register interval
if i := time.Duration(c.GlobalInt("register_interval")); i > 0 {
s.opts.RegisterInterval = i * time.Second
}
// user action
action(c)
}
// Initialise the command flags, overriding new service // Initialise the command flags, overriding new service
_ = s.opts.Cmd.Init( _ = s.opts.Cmd.Init(
cmd.Broker(&s.opts.Broker), cmd.Broker(&s.opts.Broker),
@ -105,7 +67,7 @@ func (s *service) Server() server.Server {
} }
func (s *service) String() string { func (s *service) String() string {
return "go-micro" return "micro"
} }
func (s *service) Start() error { func (s *service) Start() error {
@ -119,10 +81,6 @@ func (s *service) Start() error {
return err return err
} }
if err := s.opts.Server.Register(); err != nil {
return err
}
for _, fn := range s.opts.AfterStart { for _, fn := range s.opts.AfterStart {
if err := fn(); err != nil { if err := fn(); err != nil {
return err return err
@ -141,10 +99,6 @@ func (s *service) Stop() error {
} }
} }
if err := s.opts.Server.Deregister(); err != nil {
return err
}
if err := s.opts.Server.Stop(); err != nil { if err := s.opts.Server.Stop(); err != nil {
return err return err
} }
@ -163,10 +117,6 @@ func (s *service) Run() error {
return err return err
} }
// start reg loop
ex := make(chan bool)
go s.run(ex)
ch := make(chan os.Signal, 1) ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
@ -177,8 +127,5 @@ func (s *service) Run() error {
case <-s.opts.Context.Done(): case <-s.opts.Context.Done():
} }
// exit reg loop
close(ex)
return s.Stop() return s.Stop()
} }