Compare commits

..

2 Commits

Author SHA1 Message Date
2b3c413adc fixup grpc error codes in unary and stream processing
All checks were successful
sync / sync (push) Successful in 26s
coverage / build (push) Successful in 1m51s
test / test (push) Successful in 2m40s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-06-08 11:16:43 +03:00
20fb19fee9 changed embedded mutex to private field (#278)
Some checks failed
coverage / build (push) Successful in 2m26s
test / test (push) Failing after 16m18s
sync / sync (push) Successful in 8s
2025-05-14 01:25:57 +03:00

97
grpc.go
View File

@@ -63,12 +63,12 @@ type Server struct {
rpc *rServer rpc *rServer
opts server.Options opts server.Options
unknownHandler grpc.StreamHandler unknownHandler grpc.StreamHandler
sync.RWMutex mu sync.RWMutex
stateLive *atomic.Uint32 stateLive *atomic.Uint32
stateReady *atomic.Uint32 stateReady *atomic.Uint32
stateHealth *atomic.Uint32 stateHealth *atomic.Uint32
started bool started bool
registered bool registered bool
// reflection bool // reflection bool
} }
@@ -92,8 +92,8 @@ func newServer(opts ...server.Option) *Server {
} }
func (g *Server) configure(opts ...server.Option) error { func (g *Server) configure(opts ...server.Option) error {
g.Lock() g.mu.Lock()
defer g.Unlock() defer g.mu.Unlock()
for _, o := range opts { for _, o := range opts {
o(&g.opts) o(&g.opts)
@@ -404,7 +404,16 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
} }
} }
if appErr != nil { if appErr != nil {
var err error
var errStatus *status.Status var errStatus *status.Status
var ok bool
errStatus, ok = status.FromError(appErr)
if ok {
return errStatus.Err()
}
if errStatus = status.FromContextError(appErr); errStatus.Code() != codes.Unknown {
return errStatus.Err()
}
switch verr := appErr.(type) { switch verr := appErr.(type) {
case *errors.Error: case *errors.Error:
statusCode = microError(verr) statusCode = microError(verr)
@@ -418,12 +427,10 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
if err != nil { if err != nil {
return err return err
} }
case (interface{ GRPCStatus() *status.Status }):
errStatus = verr.GRPCStatus()
default: default:
g.RLock() g.mu.RLock()
config := g.opts config := g.opts
g.RUnlock() g.mu.RUnlock()
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, "handler error will not be transferred properly, must return *errors.Error or proto.Message") config.Logger.Error(config.Context, "handler error will not be transferred properly, must return *errors.Error or proto.Message")
} }
@@ -490,6 +497,14 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se
if appErr != nil { if appErr != nil {
var err error var err error
var errStatus *status.Status var errStatus *status.Status
var ok bool
errStatus, ok = status.FromError(appErr)
if ok {
return errStatus.Err()
}
if errStatus = status.FromContextError(appErr); errStatus.Code() != codes.Unknown {
return errStatus.Err()
}
switch verr := appErr.(type) { switch verr := appErr.(type) {
case *errors.Error: case *errors.Error:
statusCode = microError(verr) statusCode = microError(verr)
@@ -520,9 +535,9 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se
} }
func (g *Server) Options() server.Options { func (g *Server) Options() server.Options {
g.RLock() g.mu.RLock()
opts := g.opts opts := g.opts
g.RUnlock() g.mu.RUnlock()
return opts return opts
} }
@@ -545,10 +560,10 @@ func (g *Server) Handle(h server.Handler) error {
} }
func (g *Server) Register() error { func (g *Server) Register() error {
g.RLock() g.mu.RLock()
rsvc := g.rsvc rsvc := g.rsvc
config := g.opts config := g.opts
g.RUnlock() g.mu.RUnlock()
// if service already filled, reuse it and return early // if service already filled, reuse it and return early
if rsvc != nil { if rsvc != nil {
@@ -563,7 +578,7 @@ func (g *Server) Register() error {
return err return err
} }
g.RLock() g.mu.RLock()
// Maps are ordered randomly, sort the keys for consistency // Maps are ordered randomly, sort the keys for consistency
handlerList := make([]string, 0, len(g.handlers)) handlerList := make([]string, 0, len(g.handlers))
for n := range g.handlers { for n := range g.handlers {
@@ -573,11 +588,11 @@ func (g *Server) Register() error {
sort.Strings(handlerList) sort.Strings(handlerList)
g.RUnlock() g.mu.RUnlock()
g.RLock() g.mu.RLock()
registered := g.registered registered := g.registered
g.RUnlock() g.mu.RUnlock()
if !registered { if !registered {
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
@@ -595,8 +610,8 @@ func (g *Server) Register() error {
return nil return nil
} }
g.Lock() g.mu.Lock()
defer g.Unlock() defer g.mu.Unlock()
g.registered = true g.registered = true
g.rsvc = service g.rsvc = service
@@ -607,9 +622,9 @@ func (g *Server) Register() error {
func (g *Server) Deregister() error { func (g *Server) Deregister() error {
var err error var err error
g.RLock() g.mu.RLock()
config := g.opts config := g.opts
g.RUnlock() g.mu.RUnlock()
service, err := server.NewRegisterService(g) service, err := server.NewRegisterService(g)
if err != nil { if err != nil {
@@ -624,27 +639,27 @@ func (g *Server) Deregister() error {
return err return err
} }
g.Lock() g.mu.Lock()
g.rsvc = nil g.rsvc = nil
if !g.registered { if !g.registered {
g.Unlock() g.mu.Unlock()
return nil return nil
} }
g.registered = false g.registered = false
g.Unlock() g.mu.Unlock()
return nil return nil
} }
func (g *Server) Start() error { func (g *Server) Start() error {
g.RLock() g.mu.RLock()
if g.started { if g.started {
g.RUnlock() g.mu.RUnlock()
return nil return nil
} }
g.RUnlock() g.mu.RUnlock()
config := g.Options() config := g.Options()
@@ -674,12 +689,12 @@ func (g *Server) Start() error {
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, "Server [grpc] Listening on "+ts.Addr().String()) config.Logger.Info(config.Context, "Server [grpc] Listening on "+ts.Addr().String())
} }
g.Lock() g.mu.Lock()
g.opts.Address = ts.Addr().String() g.opts.Address = ts.Addr().String()
if len(g.opts.Advertise) == 0 { if len(g.opts.Advertise) == 0 {
g.opts.Advertise = ts.Addr().String() g.opts.Advertise = ts.Addr().String()
} }
g.Unlock() g.mu.Unlock()
// use RegisterCheck func before register // use RegisterCheck func before register
// nolint: nestif // nolint: nestif
@@ -730,9 +745,9 @@ func (g *Server) Start() error {
select { select {
// register self on interval // register self on interval
case <-t.C: case <-t.C:
g.RLock() g.mu.RLock()
registered := g.registered registered := g.registered
g.RUnlock() g.mu.RUnlock()
rerr := g.opts.RegisterCheck(g.opts.Context) rerr := g.opts.RegisterCheck(g.opts.Context)
// nolint: nestif // nolint: nestif
if rerr != nil && registered { if rerr != nil && registered {
@@ -809,29 +824,29 @@ func (g *Server) Start() error {
}() }()
// mark the server as started // mark the server as started
g.Lock() g.mu.Lock()
g.started = true g.started = true
g.Unlock() g.mu.Unlock()
return nil return nil
} }
func (g *Server) Stop() error { func (g *Server) Stop() error {
g.RLock() g.mu.RLock()
if !g.started { if !g.started {
g.RUnlock() g.mu.RUnlock()
return nil return nil
} }
g.RUnlock() g.mu.RUnlock()
ch := make(chan error) ch := make(chan error)
g.exit <- ch g.exit <- ch
err := <-ch err := <-ch
g.Lock() g.mu.Lock()
g.rsvc = nil g.rsvc = nil
g.started = false g.started = false
g.Unlock() g.mu.Unlock()
return err return err
} }