Merge pull request #400 from micro/interval
Move RegisterInterval into the server
This commit is contained in:
		| @@ -404,6 +404,10 @@ func (c *cmd) Before(ctx *cli.Context) error { | |||||||
| 		serverOpts = append(serverOpts, server.RegisterTTL(ttl*time.Second)) | 		serverOpts = append(serverOpts, server.RegisterTTL(ttl*time.Second)) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	if val := time.Duration(ctx.GlobalInt("register_interval")); val > 0 { | ||||||
|  | 		serverOpts = append(serverOpts, server.RegisterInterval(val*time.Second)) | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	// client opts | 	// client opts | ||||||
| 	if r := ctx.Int("client_retries"); r >= 0 { | 	if r := ctx.Int("client_retries"); r >= 0 { | ||||||
| 		clientOpts = append(clientOpts, client.Retries(r)) | 		clientOpts = append(clientOpts, client.Retries(r)) | ||||||
|   | |||||||
| @@ -22,9 +22,6 @@ type Options struct { | |||||||
| 	Registry  registry.Registry | 	Registry  registry.Registry | ||||||
| 	Transport transport.Transport | 	Transport transport.Transport | ||||||
|  |  | ||||||
| 	// Register loop interval |  | ||||||
| 	RegisterInterval time.Duration |  | ||||||
|  |  | ||||||
| 	// Before and After funcs | 	// Before and After funcs | ||||||
| 	BeforeStart []func() error | 	BeforeStart []func() error | ||||||
| 	BeforeStop  []func() error | 	BeforeStop  []func() error | ||||||
| @@ -168,7 +165,7 @@ func RegisterTTL(t time.Duration) Option { | |||||||
| // RegisterInterval specifies the interval on which to re-register | // RegisterInterval specifies the interval on which to re-register | ||||||
| func RegisterInterval(t time.Duration) Option { | func RegisterInterval(t time.Duration) Option { | ||||||
| 	return func(o *Options) { | 	return func(o *Options) { | ||||||
| 		o.RegisterInterval = t | 		o.Server.Init(server.RegisterInterval(t)) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -27,6 +27,8 @@ type Options struct { | |||||||
|  |  | ||||||
| 	// The register expiry time | 	// The register expiry time | ||||||
| 	RegisterTTL time.Duration | 	RegisterTTL time.Duration | ||||||
|  | 	// The interval on which to register | ||||||
|  | 	RegisterInterval time.Duration | ||||||
|  |  | ||||||
| 	// The router for requests | 	// The router for requests | ||||||
| 	Router Router | 	Router Router | ||||||
| @@ -168,6 +170,13 @@ func RegisterTTL(t time.Duration) Option { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // Register the service with at interval | ||||||
|  | func RegisterInterval(t time.Duration) Option { | ||||||
|  | 	return func(o *Options) { | ||||||
|  | 		o.RegisterInterval = t | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
| // WithRouter sets the request router | // WithRouter sets the request router | ||||||
| func WithRouter(r Router) Option { | func WithRouter(r Router) Option { | ||||||
| 	return func(o *Options) { | 	return func(o *Options) { | ||||||
|   | |||||||
| @@ -439,6 +439,7 @@ func (s *rpcServer) Start() error { | |||||||
| 	registerDebugHandler(s) | 	registerDebugHandler(s) | ||||||
| 	config := s.Options() | 	config := s.Options() | ||||||
|  |  | ||||||
|  | 	// start listening on the transport | ||||||
| 	ts, err := config.Transport.Listen(config.Address) | 	ts, err := config.Transport.Listen(config.Address) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| @@ -446,30 +447,45 @@ func (s *rpcServer) Start() error { | |||||||
|  |  | ||||||
| 	log.Logf("Transport [%s] Listening on %s", config.Transport.String(), ts.Addr()) | 	log.Logf("Transport [%s] Listening on %s", config.Transport.String(), ts.Addr()) | ||||||
|  |  | ||||||
| 	s.Lock() |  | ||||||
| 	// swap address | 	// swap address | ||||||
|  | 	s.Lock() | ||||||
| 	addr := s.opts.Address | 	addr := s.opts.Address | ||||||
| 	s.opts.Address = ts.Addr() | 	s.opts.Address = ts.Addr() | ||||||
| 	s.Unlock() | 	s.Unlock() | ||||||
|  |  | ||||||
| 	exit := make(chan bool, 1) | 	// connect to the broker | ||||||
|  | 	if err := config.Broker.Connect(); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	log.Logf("Broker [%s] Listening on %s", config.Broker.String(), config.Broker.Address()) | ||||||
|  |  | ||||||
|  | 	// announce self to the world | ||||||
|  | 	if err := s.Register(); err != nil { | ||||||
|  | 		log.Log("Server register error: ", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	exit := make(chan bool) | ||||||
|  |  | ||||||
| 	go func() { | 	go func() { | ||||||
| 		for { | 		for { | ||||||
|  | 			// listen for connections | ||||||
| 			err := ts.Accept(s.ServeConn) | 			err := ts.Accept(s.ServeConn) | ||||||
|  |  | ||||||
| 			// check if we're supposed to exit | 			// TODO: listen for messages | ||||||
|  | 			// msg := broker.Exchange(service).Consume() | ||||||
|  |  | ||||||
| 			select { | 			select { | ||||||
|  | 			// check if we're supposed to exit | ||||||
| 			case <-exit: | 			case <-exit: | ||||||
| 				return | 				return | ||||||
| 			default: |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			// check the error and backoff | 			// check the error and backoff | ||||||
| 			if err != nil { | 			default: | ||||||
| 				log.Logf("Accept error: %v", err) | 				if err != nil { | ||||||
| 				time.Sleep(time.Second) | 					log.Logf("Accept error: %v", err) | ||||||
| 				continue | 					time.Sleep(time.Second) | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			// no error just exit | 			// no error just exit | ||||||
| @@ -478,9 +494,38 @@ func (s *rpcServer) Start() error { | |||||||
| 	}() | 	}() | ||||||
|  |  | ||||||
| 	go func() { | 	go func() { | ||||||
| 		// wait for exit |  | ||||||
| 		ch := <-s.exit | 		// new ticker | ||||||
| 		exit <- true | 		t := time.NewTicker(s.opts.RegisterInterval) | ||||||
|  |  | ||||||
|  | 		// only process if it exists | ||||||
|  | 		if s.opts.RegisterInterval <= time.Duration(0) { | ||||||
|  | 			t.C = nil | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		// return error chan | ||||||
|  | 		var ch chan error | ||||||
|  |  | ||||||
|  | 	Loop: | ||||||
|  | 		for { | ||||||
|  | 			select { | ||||||
|  | 			// register self on interval | ||||||
|  | 			case <-t.C: | ||||||
|  | 				if err := s.Register(); err != nil { | ||||||
|  | 					log.Log("Server register error: ", err) | ||||||
|  | 				} | ||||||
|  | 			// wait for exit | ||||||
|  | 			case ch = <-s.exit: | ||||||
|  | 				t.Stop() | ||||||
|  | 				close(exit) | ||||||
|  | 				break Loop | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		// deregister self | ||||||
|  | 		if err := s.Deregister(); err != nil { | ||||||
|  | 			log.Log("Server deregister error: ", err) | ||||||
|  | 		} | ||||||
|  |  | ||||||
| 		// wait for requests to finish | 		// wait for requests to finish | ||||||
| 		if wait(s.opts.Context) { | 		if wait(s.opts.Context) { | ||||||
| @@ -493,18 +538,12 @@ func (s *rpcServer) Start() error { | |||||||
| 		// disconnect the broker | 		// disconnect the broker | ||||||
| 		config.Broker.Disconnect() | 		config.Broker.Disconnect() | ||||||
|  |  | ||||||
| 		s.Lock() |  | ||||||
| 		// swap back address | 		// swap back address | ||||||
|  | 		s.Lock() | ||||||
| 		s.opts.Address = addr | 		s.opts.Address = addr | ||||||
| 		s.Unlock() | 		s.Unlock() | ||||||
| 	}() | 	}() | ||||||
|  |  | ||||||
| 	// TODO: subscribe to cruft |  | ||||||
| 	if err := config.Broker.Connect(); err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	log.Logf("Broker [%s] Listening on %s", config.Broker.String(), config.Broker.Address()) |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -21,8 +21,6 @@ type Server interface { | |||||||
| 	NewHandler(interface{}, ...HandlerOption) Handler | 	NewHandler(interface{}, ...HandlerOption) Handler | ||||||
| 	NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber | 	NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber | ||||||
| 	Subscribe(Subscriber) error | 	Subscribe(Subscriber) error | ||||||
| 	Register() error |  | ||||||
| 	Deregister() error |  | ||||||
| 	Start() error | 	Start() error | ||||||
| 	Stop() error | 	Stop() error | ||||||
| 	String() string | 	String() string | ||||||
| @@ -173,16 +171,6 @@ func Subscribe(s Subscriber) error { | |||||||
| 	return DefaultServer.Subscribe(s) | 	return DefaultServer.Subscribe(s) | ||||||
| } | } | ||||||
|  |  | ||||||
| // Register registers the default server with the discovery system |  | ||||||
| func Register() error { |  | ||||||
| 	return DefaultServer.Register() |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Deregister deregisters the default server from the discovery system |  | ||||||
| func Deregister() error { |  | ||||||
| 	return DefaultServer.Deregister() |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Run starts the default server and waits for a kill | // Run starts the default server and waits for a kill | ||||||
| // signal before exiting. Also registers/deregisters the server | // signal before exiting. Also registers/deregisters the server | ||||||
| func Run() error { | func Run() error { | ||||||
| @@ -190,18 +178,10 @@ func Run() error { | |||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if err := DefaultServer.Register(); err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	ch := make(chan os.Signal, 1) | 	ch := make(chan os.Signal, 1) | ||||||
| 	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) | 	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) | ||||||
| 	log.Logf("Received signal %s", <-ch) | 	log.Logf("Received signal %s", <-ch) | ||||||
|  |  | ||||||
| 	if err := DefaultServer.Deregister(); err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return Stop() | 	return Stop() | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
							
								
								
									
										55
									
								
								service.go
									
									
									
									
									
								
							
							
						
						
									
										55
									
								
								service.go
									
									
									
									
									
								
							| @@ -5,10 +5,7 @@ import ( | |||||||
| 	"os/signal" | 	"os/signal" | ||||||
| 	"sync" | 	"sync" | ||||||
| 	"syscall" | 	"syscall" | ||||||
| 	"time" |  | ||||||
|  |  | ||||||
| 	"github.com/micro/cli" |  | ||||||
| 	"github.com/micro/go-log" |  | ||||||
| 	"github.com/micro/go-micro/client" | 	"github.com/micro/go-micro/client" | ||||||
| 	"github.com/micro/go-micro/cmd" | 	"github.com/micro/go-micro/cmd" | ||||||
| 	"github.com/micro/go-micro/metadata" | 	"github.com/micro/go-micro/metadata" | ||||||
| @@ -36,27 +33,6 @@ func newService(opts ...Option) Service { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *service) run(exit chan bool) { |  | ||||||
| 	if s.opts.RegisterInterval <= time.Duration(0) { |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	t := time.NewTicker(s.opts.RegisterInterval) |  | ||||||
|  |  | ||||||
| 	for { |  | ||||||
| 		select { |  | ||||||
| 		case <-t.C: |  | ||||||
| 			err := s.opts.Server.Register() |  | ||||||
| 			if err != nil { |  | ||||||
| 				log.Log("service run Server.Register error: ", err) |  | ||||||
| 			} |  | ||||||
| 		case <-exit: |  | ||||||
| 			t.Stop() |  | ||||||
| 			return |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Init initialises options. Additionally it calls cmd.Init | // Init initialises options. Additionally it calls cmd.Init | ||||||
| // which parses command line flags. cmd.Init is only called | // which parses command line flags. cmd.Init is only called | ||||||
| // on first Init. | // on first Init. | ||||||
| @@ -67,20 +43,6 @@ func (s *service) Init(opts ...Option) { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	s.once.Do(func() { | 	s.once.Do(func() { | ||||||
| 		// save user action |  | ||||||
| 		action := s.opts.Cmd.App().Action |  | ||||||
|  |  | ||||||
| 		// set service action |  | ||||||
| 		s.opts.Cmd.App().Action = func(c *cli.Context) { |  | ||||||
| 			// set register interval |  | ||||||
| 			if i := time.Duration(c.GlobalInt("register_interval")); i > 0 { |  | ||||||
| 				s.opts.RegisterInterval = i * time.Second |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			// user action |  | ||||||
| 			action(c) |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		// Initialise the command flags, overriding new service | 		// Initialise the command flags, overriding new service | ||||||
| 		_ = s.opts.Cmd.Init( | 		_ = s.opts.Cmd.Init( | ||||||
| 			cmd.Broker(&s.opts.Broker), | 			cmd.Broker(&s.opts.Broker), | ||||||
| @@ -105,7 +67,7 @@ func (s *service) Server() server.Server { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (s *service) String() string { | func (s *service) String() string { | ||||||
| 	return "go-micro" | 	return "micro" | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *service) Start() error { | func (s *service) Start() error { | ||||||
| @@ -119,10 +81,6 @@ func (s *service) Start() error { | |||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if err := s.opts.Server.Register(); err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	for _, fn := range s.opts.AfterStart { | 	for _, fn := range s.opts.AfterStart { | ||||||
| 		if err := fn(); err != nil { | 		if err := fn(); err != nil { | ||||||
| 			return err | 			return err | ||||||
| @@ -141,10 +99,6 @@ func (s *service) Stop() error { | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if err := s.opts.Server.Deregister(); err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if err := s.opts.Server.Stop(); err != nil { | 	if err := s.opts.Server.Stop(); err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| @@ -163,10 +117,6 @@ func (s *service) Run() error { | |||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// start reg loop |  | ||||||
| 	ex := make(chan bool) |  | ||||||
| 	go s.run(ex) |  | ||||||
|  |  | ||||||
| 	ch := make(chan os.Signal, 1) | 	ch := make(chan os.Signal, 1) | ||||||
| 	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) | 	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) | ||||||
|  |  | ||||||
| @@ -177,8 +127,5 @@ func (s *service) Run() error { | |||||||
| 	case <-s.opts.Context.Done(): | 	case <-s.opts.Context.Done(): | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// exit reg loop |  | ||||||
| 	close(ex) |  | ||||||
|  |  | ||||||
| 	return s.Stop() | 	return s.Stop() | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user