Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
2b3c413adc | |||
20fb19fee9 |
97
grpc.go
97
grpc.go
@@ -63,12 +63,12 @@ type Server struct {
|
|||||||
rpc *rServer
|
rpc *rServer
|
||||||
opts server.Options
|
opts server.Options
|
||||||
unknownHandler grpc.StreamHandler
|
unknownHandler grpc.StreamHandler
|
||||||
sync.RWMutex
|
mu sync.RWMutex
|
||||||
stateLive *atomic.Uint32
|
stateLive *atomic.Uint32
|
||||||
stateReady *atomic.Uint32
|
stateReady *atomic.Uint32
|
||||||
stateHealth *atomic.Uint32
|
stateHealth *atomic.Uint32
|
||||||
started bool
|
started bool
|
||||||
registered bool
|
registered bool
|
||||||
// reflection bool
|
// reflection bool
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -92,8 +92,8 @@ func newServer(opts ...server.Option) *Server {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *Server) configure(opts ...server.Option) error {
|
func (g *Server) configure(opts ...server.Option) error {
|
||||||
g.Lock()
|
g.mu.Lock()
|
||||||
defer g.Unlock()
|
defer g.mu.Unlock()
|
||||||
|
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&g.opts)
|
o(&g.opts)
|
||||||
@@ -404,7 +404,16 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if appErr != nil {
|
if appErr != nil {
|
||||||
|
var err error
|
||||||
var errStatus *status.Status
|
var errStatus *status.Status
|
||||||
|
var ok bool
|
||||||
|
errStatus, ok = status.FromError(appErr)
|
||||||
|
if ok {
|
||||||
|
return errStatus.Err()
|
||||||
|
}
|
||||||
|
if errStatus = status.FromContextError(appErr); errStatus.Code() != codes.Unknown {
|
||||||
|
return errStatus.Err()
|
||||||
|
}
|
||||||
switch verr := appErr.(type) {
|
switch verr := appErr.(type) {
|
||||||
case *errors.Error:
|
case *errors.Error:
|
||||||
statusCode = microError(verr)
|
statusCode = microError(verr)
|
||||||
@@ -418,12 +427,10 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
case (interface{ GRPCStatus() *status.Status }):
|
|
||||||
errStatus = verr.GRPCStatus()
|
|
||||||
default:
|
default:
|
||||||
g.RLock()
|
g.mu.RLock()
|
||||||
config := g.opts
|
config := g.opts
|
||||||
g.RUnlock()
|
g.mu.RUnlock()
|
||||||
if config.Logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
config.Logger.Error(config.Context, "handler error will not be transferred properly, must return *errors.Error or proto.Message")
|
config.Logger.Error(config.Context, "handler error will not be transferred properly, must return *errors.Error or proto.Message")
|
||||||
}
|
}
|
||||||
@@ -490,6 +497,14 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se
|
|||||||
if appErr != nil {
|
if appErr != nil {
|
||||||
var err error
|
var err error
|
||||||
var errStatus *status.Status
|
var errStatus *status.Status
|
||||||
|
var ok bool
|
||||||
|
errStatus, ok = status.FromError(appErr)
|
||||||
|
if ok {
|
||||||
|
return errStatus.Err()
|
||||||
|
}
|
||||||
|
if errStatus = status.FromContextError(appErr); errStatus.Code() != codes.Unknown {
|
||||||
|
return errStatus.Err()
|
||||||
|
}
|
||||||
switch verr := appErr.(type) {
|
switch verr := appErr.(type) {
|
||||||
case *errors.Error:
|
case *errors.Error:
|
||||||
statusCode = microError(verr)
|
statusCode = microError(verr)
|
||||||
@@ -520,9 +535,9 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *Server) Options() server.Options {
|
func (g *Server) Options() server.Options {
|
||||||
g.RLock()
|
g.mu.RLock()
|
||||||
opts := g.opts
|
opts := g.opts
|
||||||
g.RUnlock()
|
g.mu.RUnlock()
|
||||||
|
|
||||||
return opts
|
return opts
|
||||||
}
|
}
|
||||||
@@ -545,10 +560,10 @@ func (g *Server) Handle(h server.Handler) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *Server) Register() error {
|
func (g *Server) Register() error {
|
||||||
g.RLock()
|
g.mu.RLock()
|
||||||
rsvc := g.rsvc
|
rsvc := g.rsvc
|
||||||
config := g.opts
|
config := g.opts
|
||||||
g.RUnlock()
|
g.mu.RUnlock()
|
||||||
|
|
||||||
// if service already filled, reuse it and return early
|
// if service already filled, reuse it and return early
|
||||||
if rsvc != nil {
|
if rsvc != nil {
|
||||||
@@ -563,7 +578,7 @@ func (g *Server) Register() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
g.RLock()
|
g.mu.RLock()
|
||||||
// Maps are ordered randomly, sort the keys for consistency
|
// Maps are ordered randomly, sort the keys for consistency
|
||||||
handlerList := make([]string, 0, len(g.handlers))
|
handlerList := make([]string, 0, len(g.handlers))
|
||||||
for n := range g.handlers {
|
for n := range g.handlers {
|
||||||
@@ -573,11 +588,11 @@ func (g *Server) Register() error {
|
|||||||
|
|
||||||
sort.Strings(handlerList)
|
sort.Strings(handlerList)
|
||||||
|
|
||||||
g.RUnlock()
|
g.mu.RUnlock()
|
||||||
|
|
||||||
g.RLock()
|
g.mu.RLock()
|
||||||
registered := g.registered
|
registered := g.registered
|
||||||
g.RUnlock()
|
g.mu.RUnlock()
|
||||||
|
|
||||||
if !registered {
|
if !registered {
|
||||||
if config.Logger.V(logger.InfoLevel) {
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
@@ -595,8 +610,8 @@ func (g *Server) Register() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
g.Lock()
|
g.mu.Lock()
|
||||||
defer g.Unlock()
|
defer g.mu.Unlock()
|
||||||
|
|
||||||
g.registered = true
|
g.registered = true
|
||||||
g.rsvc = service
|
g.rsvc = service
|
||||||
@@ -607,9 +622,9 @@ func (g *Server) Register() error {
|
|||||||
func (g *Server) Deregister() error {
|
func (g *Server) Deregister() error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
g.RLock()
|
g.mu.RLock()
|
||||||
config := g.opts
|
config := g.opts
|
||||||
g.RUnlock()
|
g.mu.RUnlock()
|
||||||
|
|
||||||
service, err := server.NewRegisterService(g)
|
service, err := server.NewRegisterService(g)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -624,27 +639,27 @@ func (g *Server) Deregister() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
g.Lock()
|
g.mu.Lock()
|
||||||
g.rsvc = nil
|
g.rsvc = nil
|
||||||
|
|
||||||
if !g.registered {
|
if !g.registered {
|
||||||
g.Unlock()
|
g.mu.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
g.registered = false
|
g.registered = false
|
||||||
|
|
||||||
g.Unlock()
|
g.mu.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *Server) Start() error {
|
func (g *Server) Start() error {
|
||||||
g.RLock()
|
g.mu.RLock()
|
||||||
if g.started {
|
if g.started {
|
||||||
g.RUnlock()
|
g.mu.RUnlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
g.RUnlock()
|
g.mu.RUnlock()
|
||||||
|
|
||||||
config := g.Options()
|
config := g.Options()
|
||||||
|
|
||||||
@@ -674,12 +689,12 @@ func (g *Server) Start() error {
|
|||||||
if config.Logger.V(logger.InfoLevel) {
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
config.Logger.Info(config.Context, "Server [grpc] Listening on "+ts.Addr().String())
|
config.Logger.Info(config.Context, "Server [grpc] Listening on "+ts.Addr().String())
|
||||||
}
|
}
|
||||||
g.Lock()
|
g.mu.Lock()
|
||||||
g.opts.Address = ts.Addr().String()
|
g.opts.Address = ts.Addr().String()
|
||||||
if len(g.opts.Advertise) == 0 {
|
if len(g.opts.Advertise) == 0 {
|
||||||
g.opts.Advertise = ts.Addr().String()
|
g.opts.Advertise = ts.Addr().String()
|
||||||
}
|
}
|
||||||
g.Unlock()
|
g.mu.Unlock()
|
||||||
|
|
||||||
// use RegisterCheck func before register
|
// use RegisterCheck func before register
|
||||||
// nolint: nestif
|
// nolint: nestif
|
||||||
@@ -730,9 +745,9 @@ func (g *Server) Start() error {
|
|||||||
select {
|
select {
|
||||||
// register self on interval
|
// register self on interval
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
g.RLock()
|
g.mu.RLock()
|
||||||
registered := g.registered
|
registered := g.registered
|
||||||
g.RUnlock()
|
g.mu.RUnlock()
|
||||||
rerr := g.opts.RegisterCheck(g.opts.Context)
|
rerr := g.opts.RegisterCheck(g.opts.Context)
|
||||||
// nolint: nestif
|
// nolint: nestif
|
||||||
if rerr != nil && registered {
|
if rerr != nil && registered {
|
||||||
@@ -809,29 +824,29 @@ func (g *Server) Start() error {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// mark the server as started
|
// mark the server as started
|
||||||
g.Lock()
|
g.mu.Lock()
|
||||||
g.started = true
|
g.started = true
|
||||||
g.Unlock()
|
g.mu.Unlock()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *Server) Stop() error {
|
func (g *Server) Stop() error {
|
||||||
g.RLock()
|
g.mu.RLock()
|
||||||
if !g.started {
|
if !g.started {
|
||||||
g.RUnlock()
|
g.mu.RUnlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
g.RUnlock()
|
g.mu.RUnlock()
|
||||||
|
|
||||||
ch := make(chan error)
|
ch := make(chan error)
|
||||||
g.exit <- ch
|
g.exit <- ch
|
||||||
|
|
||||||
err := <-ch
|
err := <-ch
|
||||||
g.Lock()
|
g.mu.Lock()
|
||||||
g.rsvc = nil
|
g.rsvc = nil
|
||||||
g.started = false
|
g.started = false
|
||||||
g.Unlock()
|
g.mu.Unlock()
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user