update micro, add Health/Live/Ready checks
Some checks failed
codeql / analyze (go) (push) Failing after 55s
build / test (push) Failing after 4m55s
build / lint (push) Successful in 9m33s

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2024-12-02 15:55:55 +03:00
parent 28234bcc3c
commit 66756f74dd
4 changed files with 57 additions and 519 deletions

40
grpc.go
View File

@@ -12,13 +12,13 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
greflection "google.golang.org/grpc/reflection"
reflectionv1pb "google.golang.org/grpc/reflection/grpc_reflection_v1"
// nolint: staticcheck
oldproto "github.com/golang/protobuf/proto"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/errors"
@@ -39,6 +39,7 @@ import (
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/protoadapt"
)
const (
@@ -75,9 +76,12 @@ type Server struct {
opts server.Options
unknownHandler grpc.StreamHandler
sync.RWMutex
started bool
registered bool
reflection bool
stateLive *atomic.Uint32
stateReady *atomic.Uint32
stateHealth *atomic.Uint32
started bool
registered bool
reflection bool
}
func newServer(opts ...server.Option) *Server {
@@ -90,6 +94,9 @@ func newServer(opts ...server.Option) *Server {
handlers: make(map[string]server.Handler),
subscribers: make(map[*subscriber][]broker.Subscriber),
exit: make(chan chan error),
stateLive: &atomic.Uint32{},
stateReady: &atomic.Uint32{},
stateHealth: &atomic.Uint32{},
}
g.opts.Meter = g.opts.Meter.Clone(meter.Labels("type", "grpc"))
@@ -438,7 +445,7 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
// user defined error that proto based we can attach it to grpc status
statusCode = convertCode(appErr)
statusDesc = appErr.Error()
errStatus, err = status.New(statusCode, statusDesc).WithDetails(oldproto.MessageV1(verr))
errStatus, err = status.New(statusCode, statusDesc).WithDetails(protoadapt.MessageV1Of(verr))
if err != nil {
return err
}
@@ -523,7 +530,7 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se
// user defined error that proto based we can attach it to grpc status
statusCode = convertCode(appErr)
statusDesc = appErr.Error()
errStatus, err = status.New(statusCode, statusDesc).WithDetails(oldproto.MessageV1(verr))
errStatus, err = status.New(statusCode, statusDesc).WithDetails(protoadapt.MessageV1Of(verr))
if err != nil {
return err
}
@@ -834,6 +841,9 @@ func (g *Server) Start() error {
}
}
}
g.stateLive.Store(1)
g.stateReady.Store(1)
g.stateHealth.Store(1)
}()
go func() {
@@ -903,12 +913,18 @@ func (g *Server) Start() error {
go func() {
g.srv.GracefulStop()
close(exit)
g.stateLive.Store(0)
g.stateReady.Store(0)
g.stateHealth.Store(0)
}()
select {
case <-exit:
case <-time.After(g.opts.GracefulTimeout):
g.srv.Stop()
g.stateLive.Store(0)
g.stateReady.Store(0)
g.stateHealth.Store(0)
}
// close transport
@@ -965,6 +981,18 @@ func (g *Server) GRPCServer() *grpc.Server {
return g.srv
}
func (g *Server) Live() bool {
return g.stateLive.Load() == 1
}
func (g *Server) Ready() bool {
return g.stateReady.Load() == 1
}
func (g *Server) Health() bool {
return g.stateHealth.Load() == 1
}
func NewServer(opts ...server.Option) *Server {
return newServer(opts...)
}