From a29676b86a540f2efa2c9b4cda95a8cb21d36346 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 20 May 2020 11:49:09 +0100 Subject: [PATCH] Registration Retry / Interval (#1651) * Change the default ttl to 90 seconds * add retries to registration * Add retry to web register --- server/grpc/grpc.go | 33 ++++++++++++++++++++++++++------- server/rpc_server.go | 36 ++++++++++++++++++++++++++++-------- server/server.go | 4 ++-- web/service.go | 20 +++++++++++++++++++- web/web.go | 2 +- 5 files changed, 76 insertions(+), 19 deletions(-) diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index a4501c05..1123fba8 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -22,6 +22,7 @@ import ( "github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/server" "github.com/micro/go-micro/v2/util/addr" + "github.com/micro/go-micro/v2/util/backoff" mgrpc "github.com/micro/go-micro/v2/util/grpc" mnet "github.com/micro/go-micro/v2/util/net" "golang.org/x/net/netutil" @@ -566,16 +567,36 @@ func (g *grpcServer) Subscribe(sb server.Subscriber) error { } func (g *grpcServer) Register() error { - g.RLock() rsvc := g.rsvc config := g.opts g.RUnlock() + regFunc := func(service *registry.Service) error { + var regErr error + + for i := 0; i < 3; i++ { + // set the ttl + rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)} + // attempt to register + if err := config.Registry.Register(service, rOpts...); err != nil { + // set the error + regErr = err + // backoff then retry + time.Sleep(backoff.Do(i + 1)) + continue + } + // success so nil error + regErr = nil + break + } + + return regErr + } + // if service already filled, reuse it and return early if rsvc != nil { - rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)} - if err := config.Registry.Register(rsvc, rOpts...); err != nil { + if err := regFunc(rsvc); err != nil { return err } return nil @@ -677,10 +698,8 @@ func (g *grpcServer) Register() error { } } - // create registry options - rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)} - - if err := config.Registry.Register(service, rOpts...); err != nil { + // register the service + if err := regFunc(service); err != nil { return err } diff --git a/server/rpc_server.go b/server/rpc_server.go index f1ce3ebb..212d539a 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -20,6 +20,7 @@ import ( "github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/transport" "github.com/micro/go-micro/v2/util/addr" + "github.com/micro/go-micro/v2/util/backoff" mnet "github.com/micro/go-micro/v2/util/net" "github.com/micro/go-micro/v2/util/socket" ) @@ -514,18 +515,39 @@ func (s *rpcServer) Subscribe(sb Subscriber) error { } func (s *rpcServer) Register() error { - s.RLock() rsvc := s.rsvc config := s.Options() s.RUnlock() - if rsvc != nil { + regFunc := func(service *registry.Service) error { + // create registry options rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)} - if err := config.Registry.Register(rsvc, rOpts...); err != nil { - return err + + var regErr error + + for i := 0; i < 3; i++ { + // attempt to register + if err := config.Registry.Register(service, rOpts...); err != nil { + // set the error + regErr = err + // backoff then retry + time.Sleep(backoff.Do(i + 1)) + continue + } + // success so nil error + regErr = nil + break } + return regErr + } + + // have we registered before? + if rsvc != nil { + if err := regFunc(rsvc); err != nil { + return err + } return nil } @@ -635,10 +657,8 @@ func (s *rpcServer) Register() error { } } - // create registry options - rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)} - - if err := config.Registry.Register(service, rOpts...); err != nil { + // register the service + if err := regFunc(service); err != nil { return err } diff --git a/server/server.go b/server/server.go index 136ae6f9..a7c00fca 100644 --- a/server/server.go +++ b/server/server.go @@ -125,7 +125,7 @@ type Handler interface { } // Subscriber interface represents a subscription to a given topic using -// a specific subscriber function or object with endpoints. It mirrors +// a specific subscriber function or object with endpoints. It mirrors // the handler in its behaviour. type Subscriber interface { Topic() string @@ -145,7 +145,7 @@ var ( DefaultRouter = newRpcRouter() DefaultRegisterCheck = func(context.Context) error { return nil } DefaultRegisterInterval = time.Second * 30 - DefaultRegisterTTL = time.Minute + DefaultRegisterTTL = time.Second * 90 // NewServer creates a new server NewServer func(...Option) Server = newRpcServer diff --git a/web/service.go b/web/service.go index aed52eda..bee94814 100644 --- a/web/service.go +++ b/web/service.go @@ -18,6 +18,7 @@ import ( "github.com/micro/go-micro/v2/registry" maddr "github.com/micro/go-micro/v2/util/addr" authutil "github.com/micro/go-micro/v2/util/auth" + "github.com/micro/go-micro/v2/util/backoff" mhttp "github.com/micro/go-micro/v2/util/http" mnet "github.com/micro/go-micro/v2/util/net" signalutil "github.com/micro/go-micro/v2/util/signal" @@ -138,7 +139,24 @@ func (s *service) register() error { return err } - return r.Register(s.srv, registry.RegisterTTL(s.opts.RegisterTTL)) + var regErr error + + // try three times if necessary + for i := 0; i < 3; i++ { + // attempt to register + if err := r.Register(s.srv, registry.RegisterTTL(s.opts.RegisterTTL)); err != nil { + // set the error + regErr = err + // backoff then retry + time.Sleep(backoff.Do(i + 1)) + continue + } + // success so nil error + regErr = nil + break + } + + return regErr } func (s *service) deregister() error { diff --git a/web/web.go b/web/web.go index 72f653b9..6bd9c82b 100644 --- a/web/web.go +++ b/web/web.go @@ -31,7 +31,7 @@ var ( DefaultAddress = ":0" // for registration - DefaultRegisterTTL = time.Minute + DefaultRegisterTTL = time.Second * 90 DefaultRegisterInterval = time.Second * 30 // static directory