Merge branch 'master' of https://github.com/micro/go-micro into auth-scopes

This commit is contained in:
Ben Toogood 2020-05-21 12:32:52 +01:00
commit c09b871a6b
6 changed files with 86 additions and 20 deletions

View File

@ -22,6 +22,7 @@ import (
"github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/server" "github.com/micro/go-micro/v2/server"
"github.com/micro/go-micro/v2/util/addr" "github.com/micro/go-micro/v2/util/addr"
"github.com/micro/go-micro/v2/util/backoff"
mgrpc "github.com/micro/go-micro/v2/util/grpc" mgrpc "github.com/micro/go-micro/v2/util/grpc"
mnet "github.com/micro/go-micro/v2/util/net" mnet "github.com/micro/go-micro/v2/util/net"
"golang.org/x/net/netutil" "golang.org/x/net/netutil"
@ -566,16 +567,36 @@ func (g *grpcServer) Subscribe(sb server.Subscriber) error {
} }
func (g *grpcServer) Register() error { func (g *grpcServer) Register() error {
g.RLock() g.RLock()
rsvc := g.rsvc rsvc := g.rsvc
config := g.opts config := g.opts
g.RUnlock() g.RUnlock()
regFunc := func(service *registry.Service) error {
var regErr error
for i := 0; i < 3; i++ {
// set the ttl
rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)}
// attempt to register
if err := config.Registry.Register(service, rOpts...); err != nil {
// set the error
regErr = err
// backoff then retry
time.Sleep(backoff.Do(i + 1))
continue
}
// success so nil error
regErr = nil
break
}
return regErr
}
// if service already filled, reuse it and return early // if service already filled, reuse it and return early
if rsvc != nil { if rsvc != nil {
rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)} if err := regFunc(rsvc); err != nil {
if err := config.Registry.Register(rsvc, rOpts...); err != nil {
return err return err
} }
return nil return nil
@ -677,10 +698,8 @@ func (g *grpcServer) Register() error {
} }
} }
// create registry options // register the service
rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)} if err := regFunc(service); err != nil {
if err := config.Registry.Register(service, rOpts...); err != nil {
return err return err
} }

View File

@ -20,6 +20,7 @@ import (
"github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/transport" "github.com/micro/go-micro/v2/transport"
"github.com/micro/go-micro/v2/util/addr" "github.com/micro/go-micro/v2/util/addr"
"github.com/micro/go-micro/v2/util/backoff"
mnet "github.com/micro/go-micro/v2/util/net" mnet "github.com/micro/go-micro/v2/util/net"
"github.com/micro/go-micro/v2/util/socket" "github.com/micro/go-micro/v2/util/socket"
) )
@ -514,18 +515,39 @@ func (s *rpcServer) Subscribe(sb Subscriber) error {
} }
func (s *rpcServer) Register() error { func (s *rpcServer) Register() error {
s.RLock() s.RLock()
rsvc := s.rsvc rsvc := s.rsvc
config := s.Options() config := s.Options()
s.RUnlock() s.RUnlock()
if rsvc != nil { regFunc := func(service *registry.Service) error {
// create registry options
rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)} rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)}
if err := config.Registry.Register(rsvc, rOpts...); err != nil {
return err var regErr error
for i := 0; i < 3; i++ {
// attempt to register
if err := config.Registry.Register(service, rOpts...); err != nil {
// set the error
regErr = err
// backoff then retry
time.Sleep(backoff.Do(i + 1))
continue
}
// success so nil error
regErr = nil
break
} }
return regErr
}
// have we registered before?
if rsvc != nil {
if err := regFunc(rsvc); err != nil {
return err
}
return nil return nil
} }
@ -635,10 +657,8 @@ func (s *rpcServer) Register() error {
} }
} }
// create registry options // register the service
rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)} if err := regFunc(service); err != nil {
if err := config.Registry.Register(service, rOpts...); err != nil {
return err return err
} }

View File

@ -125,7 +125,7 @@ type Handler interface {
} }
// Subscriber interface represents a subscription to a given topic using // Subscriber interface represents a subscription to a given topic using
// a specific subscriber function or object with endpoints. It mirrors // a specific subscriber function or object with endpoints. It mirrors
// the handler in its behaviour. // the handler in its behaviour.
type Subscriber interface { type Subscriber interface {
Topic() string Topic() string
@ -145,7 +145,7 @@ var (
DefaultRouter = newRpcRouter() DefaultRouter = newRpcRouter()
DefaultRegisterCheck = func(context.Context) error { return nil } DefaultRegisterCheck = func(context.Context) error { return nil }
DefaultRegisterInterval = time.Second * 30 DefaultRegisterInterval = time.Second * 30
DefaultRegisterTTL = time.Minute DefaultRegisterTTL = time.Second * 90
// NewServer creates a new server // NewServer creates a new server
NewServer func(...Option) Server = newRpcServer NewServer func(...Option) Server = newRpcServer

View File

@ -11,6 +11,7 @@ import (
"time" "time"
"github.com/lib/pq" "github.com/lib/pq"
"github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/store" "github.com/micro/go-micro/v2/store"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -87,6 +88,10 @@ func (s *sqlStore) createDB(database, table string) error {
} }
func (s *sqlStore) initDB(database, table string) error { func (s *sqlStore) initDB(database, table string) error {
if s.db == nil {
return errors.New("Database connection not initialised")
}
// Create the namespace's database // Create the namespace's database
_, err := s.db.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s;", database)) _, err := s.db.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s;", database))
if err != nil { if err != nil {
@ -456,7 +461,11 @@ func NewStore(opts ...store.Option) store.Store {
// mark known databases // mark known databases
s.databases = make(map[string]bool) s.databases = make(map[string]bool)
// best-effort configure the store // best-effort configure the store
s.configure() if err := s.configure(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error("Error configuring store ", err)
}
}
// return store // return store
return s return s

View File

@ -18,6 +18,7 @@ import (
"github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/registry"
maddr "github.com/micro/go-micro/v2/util/addr" maddr "github.com/micro/go-micro/v2/util/addr"
authutil "github.com/micro/go-micro/v2/util/auth" authutil "github.com/micro/go-micro/v2/util/auth"
"github.com/micro/go-micro/v2/util/backoff"
mhttp "github.com/micro/go-micro/v2/util/http" mhttp "github.com/micro/go-micro/v2/util/http"
mnet "github.com/micro/go-micro/v2/util/net" mnet "github.com/micro/go-micro/v2/util/net"
signalutil "github.com/micro/go-micro/v2/util/signal" signalutil "github.com/micro/go-micro/v2/util/signal"
@ -138,7 +139,24 @@ func (s *service) register() error {
return err return err
} }
return r.Register(s.srv, registry.RegisterTTL(s.opts.RegisterTTL)) var regErr error
// try three times if necessary
for i := 0; i < 3; i++ {
// attempt to register
if err := r.Register(s.srv, registry.RegisterTTL(s.opts.RegisterTTL)); err != nil {
// set the error
regErr = err
// backoff then retry
time.Sleep(backoff.Do(i + 1))
continue
}
// success so nil error
regErr = nil
break
}
return regErr
} }
func (s *service) deregister() error { func (s *service) deregister() error {

View File

@ -31,7 +31,7 @@ var (
DefaultAddress = ":0" DefaultAddress = ":0"
// for registration // for registration
DefaultRegisterTTL = time.Minute DefaultRegisterTTL = time.Second * 90
DefaultRegisterInterval = time.Second * 30 DefaultRegisterInterval = time.Second * 30
// static directory // static directory