From 539b8c1a3b5e4ac2cee45166e7ab4e24beb202f1 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 24 Jan 2019 13:22:17 +0000 Subject: [PATCH] Move RegisterInterval into the server --- cmd/cmd.go | 4 +++ options.go | 5 +-- server/options.go | 9 +++++ server/rpc_server.go | 79 +++++++++++++++++++++++++++++++++----------- server/server.go | 20 ----------- service.go | 55 +----------------------------- 6 files changed, 74 insertions(+), 98 deletions(-) diff --git a/cmd/cmd.go b/cmd/cmd.go index 41dcdc43..f1e80a4a 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -404,6 +404,10 @@ func (c *cmd) Before(ctx *cli.Context) error { serverOpts = append(serverOpts, server.RegisterTTL(ttl*time.Second)) } + if val := time.Duration(ctx.GlobalInt("register_interval")); val > 0 { + serverOpts = append(serverOpts, server.RegisterInterval(val*time.Second)) + } + // client opts if r := ctx.Int("client_retries"); r >= 0 { clientOpts = append(clientOpts, client.Retries(r)) diff --git a/options.go b/options.go index ae2008a2..9afb7085 100644 --- a/options.go +++ b/options.go @@ -22,9 +22,6 @@ type Options struct { Registry registry.Registry Transport transport.Transport - // Register loop interval - RegisterInterval time.Duration - // Before and After funcs BeforeStart []func() error BeforeStop []func() error @@ -168,7 +165,7 @@ func RegisterTTL(t time.Duration) Option { // RegisterInterval specifies the interval on which to re-register func RegisterInterval(t time.Duration) Option { return func(o *Options) { - o.RegisterInterval = t + o.Server.Init(server.RegisterInterval(t)) } } diff --git a/server/options.go b/server/options.go index f1355122..6acb5960 100644 --- a/server/options.go +++ b/server/options.go @@ -27,6 +27,8 @@ type Options struct { // The register expiry time RegisterTTL time.Duration + // The interval on which to register + RegisterInterval time.Duration // The router for requests Router Router @@ -168,6 +170,13 @@ func RegisterTTL(t time.Duration) Option { } } +// Register the service with at interval +func RegisterInterval(t time.Duration) Option { + return func(o *Options) { + o.RegisterInterval = t + } +} + // WithRouter sets the request router func WithRouter(r Router) Option { return func(o *Options) { diff --git a/server/rpc_server.go b/server/rpc_server.go index 0ba11836..13638c67 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -436,6 +436,7 @@ func (s *rpcServer) Start() error { registerDebugHandler(s) config := s.Options() + // start listening on the transport ts, err := config.Transport.Listen(config.Address) if err != nil { return err @@ -443,30 +444,45 @@ func (s *rpcServer) Start() error { log.Logf("Transport [%s] Listening on %s", config.Transport.String(), ts.Addr()) - s.Lock() // swap address + s.Lock() addr := s.opts.Address s.opts.Address = ts.Addr() s.Unlock() - exit := make(chan bool, 1) + // connect to the broker + if err := config.Broker.Connect(); err != nil { + return err + } + + log.Logf("Broker [%s] Listening on %s", config.Broker.String(), config.Broker.Address()) + + // announce self to the world + if err := s.Register(); err != nil { + log.Log("Server register error: ", err) + } + + exit := make(chan bool) go func() { for { + // listen for connections err := ts.Accept(s.ServeConn) - // check if we're supposed to exit + // TODO: listen for messages + // msg := broker.Exchange(service).Consume() + select { + // check if we're supposed to exit case <-exit: return - default: - } - // check the error and backoff - if err != nil { - log.Logf("Accept error: %v", err) - time.Sleep(time.Second) - continue + default: + if err != nil { + log.Logf("Accept error: %v", err) + time.Sleep(time.Second) + continue + } } // no error just exit @@ -475,9 +491,38 @@ func (s *rpcServer) Start() error { }() go func() { - // wait for exit - ch := <-s.exit - exit <- true + + // new ticker + t := time.NewTicker(s.opts.RegisterInterval) + + // only process if it exists + if s.opts.RegisterInterval <= time.Duration(0) { + t.C = nil + } + + // return error chan + var ch chan error + + Loop: + for { + select { + // register self on interval + case <-t.C: + if err := s.Register(); err != nil { + log.Log("Server register error: ", err) + } + // wait for exit + case ch = <-s.exit: + t.Stop() + close(exit) + break Loop + } + } + + // deregister self + if err := s.Deregister(); err != nil { + log.Log("Server deregister error: ", err) + } // wait for requests to finish if wait(s.opts.Context) { @@ -490,18 +535,12 @@ func (s *rpcServer) Start() error { // disconnect the broker config.Broker.Disconnect() - s.Lock() // swap back address + s.Lock() s.opts.Address = addr s.Unlock() }() - // TODO: subscribe to cruft - if err := config.Broker.Connect(); err != nil { - return err - } - - log.Logf("Broker [%s] Listening on %s", config.Broker.String(), config.Broker.Address()) return nil } diff --git a/server/server.go b/server/server.go index 82113843..74d9146e 100644 --- a/server/server.go +++ b/server/server.go @@ -21,8 +21,6 @@ type Server interface { NewHandler(interface{}, ...HandlerOption) Handler NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber Subscribe(Subscriber) error - Register() error - Deregister() error Start() error Stop() error String() string @@ -177,16 +175,6 @@ func Subscribe(s Subscriber) error { return DefaultServer.Subscribe(s) } -// Register registers the default server with the discovery system -func Register() error { - return DefaultServer.Register() -} - -// Deregister deregisters the default server from the discovery system -func Deregister() error { - return DefaultServer.Deregister() -} - // Run starts the default server and waits for a kill // signal before exiting. Also registers/deregisters the server func Run() error { @@ -194,18 +182,10 @@ func Run() error { return err } - if err := DefaultServer.Register(); err != nil { - return err - } - ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) log.Logf("Received signal %s", <-ch) - if err := DefaultServer.Deregister(); err != nil { - return err - } - return Stop() } diff --git a/service.go b/service.go index e884283d..6b7c6508 100644 --- a/service.go +++ b/service.go @@ -5,10 +5,7 @@ import ( "os/signal" "sync" "syscall" - "time" - "github.com/micro/cli" - "github.com/micro/go-log" "github.com/micro/go-micro/client" "github.com/micro/go-micro/cmd" "github.com/micro/go-micro/metadata" @@ -36,27 +33,6 @@ func newService(opts ...Option) Service { } } -func (s *service) run(exit chan bool) { - if s.opts.RegisterInterval <= time.Duration(0) { - return - } - - t := time.NewTicker(s.opts.RegisterInterval) - - for { - select { - case <-t.C: - err := s.opts.Server.Register() - if err != nil { - log.Log("service run Server.Register error: ", err) - } - case <-exit: - t.Stop() - return - } - } -} - // Init initialises options. Additionally it calls cmd.Init // which parses command line flags. cmd.Init is only called // on first Init. @@ -67,20 +43,6 @@ func (s *service) Init(opts ...Option) { } s.once.Do(func() { - // save user action - action := s.opts.Cmd.App().Action - - // set service action - s.opts.Cmd.App().Action = func(c *cli.Context) { - // set register interval - if i := time.Duration(c.GlobalInt("register_interval")); i > 0 { - s.opts.RegisterInterval = i * time.Second - } - - // user action - action(c) - } - // Initialise the command flags, overriding new service _ = s.opts.Cmd.Init( cmd.Broker(&s.opts.Broker), @@ -105,7 +67,7 @@ func (s *service) Server() server.Server { } func (s *service) String() string { - return "go-micro" + return "micro" } func (s *service) Start() error { @@ -119,10 +81,6 @@ func (s *service) Start() error { return err } - if err := s.opts.Server.Register(); err != nil { - return err - } - for _, fn := range s.opts.AfterStart { if err := fn(); err != nil { return err @@ -141,10 +99,6 @@ func (s *service) Stop() error { } } - if err := s.opts.Server.Deregister(); err != nil { - return err - } - if err := s.opts.Server.Stop(); err != nil { return err } @@ -163,10 +117,6 @@ func (s *service) Run() error { return err } - // start reg loop - ex := make(chan bool) - go s.run(ex) - ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) @@ -177,8 +127,5 @@ func (s *service) Run() error { case <-s.opts.Context.Done(): } - // exit reg loop - close(ex) - return s.Stop() }