Compare commits
4 Commits
2f818d389b
...
v4.1.4
Author | SHA1 | Date | |
---|---|---|---|
2b3c413adc | |||
20fb19fee9 | |||
8e3c56f4ed | |||
fcbae6f94a |
2
.github/workflows/job_coverage.yml
vendored
2
.github/workflows/job_coverage.yml
vendored
@@ -12,7 +12,7 @@ on:
|
||||
jobs:
|
||||
|
||||
build:
|
||||
if: github.server_url == 'zhttps://github.com'
|
||||
if: github.server_url != 'https://github.com'
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: checkout code
|
||||
|
42
.github/workflows/job_sync.yml
vendored
42
.github/workflows/job_sync.yml
vendored
@@ -18,7 +18,21 @@ jobs:
|
||||
echo "machine git.unistack.org login vtolstov password ${{ secrets.TOKEN_GITEA }}" >> /root/.netrc
|
||||
echo "machine github.com login vtolstov password ${{ secrets.TOKEN_GITHUB }}" >> /root/.netrc
|
||||
|
||||
- name: check master
|
||||
id: check_master
|
||||
run: |
|
||||
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/master | cut -f1)
|
||||
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/master | cut -f1)
|
||||
echo "src_hash=$src_hash"
|
||||
echo "dst_hash=$dst_hash"
|
||||
if [ "$src_hash" != "$dst_hash" ]; then
|
||||
echo "sync_needed=true" >> $GITHUB_OUTPUT
|
||||
else
|
||||
echo "sync_needed=false" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
- name: sync master
|
||||
if: steps.check_master.outputs.sync_needed == 'true'
|
||||
run: |
|
||||
git clone --filter=blob:none --filter=tree:0 --branch master --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
||||
cd repo
|
||||
@@ -29,7 +43,21 @@ jobs:
|
||||
cd ../
|
||||
rm -rf repo
|
||||
|
||||
- name: check v3
|
||||
id: check_v3
|
||||
run: |
|
||||
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/v3 | cut -f1)
|
||||
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/v3 | cut -f1)
|
||||
echo "src_hash=$src_hash"
|
||||
echo "dst_hash=$dst_hash"
|
||||
if [ "$src_hash" != "$dst_hash" ]; then
|
||||
echo "sync_needed=true" >> $GITHUB_OUTPUT
|
||||
else
|
||||
echo "sync_needed=false" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
- name: sync v3
|
||||
if: steps.check_v3.outputs.sync_needed == 'true'
|
||||
run: |
|
||||
git clone --filter=blob:none --filter=tree:0 --branch v3 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
||||
cd repo
|
||||
@@ -40,7 +68,21 @@ jobs:
|
||||
cd ../
|
||||
rm -rf repo
|
||||
|
||||
- name: check v4
|
||||
id: check_v4
|
||||
run: |
|
||||
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/v4 | cut -f1)
|
||||
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/v4 | cut -f1)
|
||||
echo "src_hash=$src_hash"
|
||||
echo "dst_hash=$dst_hash"
|
||||
if [ "$src_hash" != "$dst_hash" ]; then
|
||||
echo "sync_needed=true" >> $GITHUB_OUTPUT
|
||||
else
|
||||
echo "sync_needed=false" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
- name: sync v4
|
||||
if: steps.check_v4.outputs.sync_needed == 'true'
|
||||
run: |
|
||||
git clone --filter=blob:none --filter=tree:0 --branch v4 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
||||
cd repo
|
||||
|
97
grpc.go
97
grpc.go
@@ -63,12 +63,12 @@ type Server struct {
|
||||
rpc *rServer
|
||||
opts server.Options
|
||||
unknownHandler grpc.StreamHandler
|
||||
sync.RWMutex
|
||||
stateLive *atomic.Uint32
|
||||
stateReady *atomic.Uint32
|
||||
stateHealth *atomic.Uint32
|
||||
started bool
|
||||
registered bool
|
||||
mu sync.RWMutex
|
||||
stateLive *atomic.Uint32
|
||||
stateReady *atomic.Uint32
|
||||
stateHealth *atomic.Uint32
|
||||
started bool
|
||||
registered bool
|
||||
// reflection bool
|
||||
}
|
||||
|
||||
@@ -92,8 +92,8 @@ func newServer(opts ...server.Option) *Server {
|
||||
}
|
||||
|
||||
func (g *Server) configure(opts ...server.Option) error {
|
||||
g.Lock()
|
||||
defer g.Unlock()
|
||||
g.mu.Lock()
|
||||
defer g.mu.Unlock()
|
||||
|
||||
for _, o := range opts {
|
||||
o(&g.opts)
|
||||
@@ -404,7 +404,16 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
|
||||
}
|
||||
}
|
||||
if appErr != nil {
|
||||
var err error
|
||||
var errStatus *status.Status
|
||||
var ok bool
|
||||
errStatus, ok = status.FromError(appErr)
|
||||
if ok {
|
||||
return errStatus.Err()
|
||||
}
|
||||
if errStatus = status.FromContextError(appErr); errStatus.Code() != codes.Unknown {
|
||||
return errStatus.Err()
|
||||
}
|
||||
switch verr := appErr.(type) {
|
||||
case *errors.Error:
|
||||
statusCode = microError(verr)
|
||||
@@ -418,12 +427,10 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case (interface{ GRPCStatus() *status.Status }):
|
||||
errStatus = verr.GRPCStatus()
|
||||
default:
|
||||
g.RLock()
|
||||
g.mu.RLock()
|
||||
config := g.opts
|
||||
g.RUnlock()
|
||||
g.mu.RUnlock()
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Error(config.Context, "handler error will not be transferred properly, must return *errors.Error or proto.Message")
|
||||
}
|
||||
@@ -490,6 +497,14 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se
|
||||
if appErr != nil {
|
||||
var err error
|
||||
var errStatus *status.Status
|
||||
var ok bool
|
||||
errStatus, ok = status.FromError(appErr)
|
||||
if ok {
|
||||
return errStatus.Err()
|
||||
}
|
||||
if errStatus = status.FromContextError(appErr); errStatus.Code() != codes.Unknown {
|
||||
return errStatus.Err()
|
||||
}
|
||||
switch verr := appErr.(type) {
|
||||
case *errors.Error:
|
||||
statusCode = microError(verr)
|
||||
@@ -520,9 +535,9 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se
|
||||
}
|
||||
|
||||
func (g *Server) Options() server.Options {
|
||||
g.RLock()
|
||||
g.mu.RLock()
|
||||
opts := g.opts
|
||||
g.RUnlock()
|
||||
g.mu.RUnlock()
|
||||
|
||||
return opts
|
||||
}
|
||||
@@ -545,10 +560,10 @@ func (g *Server) Handle(h server.Handler) error {
|
||||
}
|
||||
|
||||
func (g *Server) Register() error {
|
||||
g.RLock()
|
||||
g.mu.RLock()
|
||||
rsvc := g.rsvc
|
||||
config := g.opts
|
||||
g.RUnlock()
|
||||
g.mu.RUnlock()
|
||||
|
||||
// if service already filled, reuse it and return early
|
||||
if rsvc != nil {
|
||||
@@ -563,7 +578,7 @@ func (g *Server) Register() error {
|
||||
return err
|
||||
}
|
||||
|
||||
g.RLock()
|
||||
g.mu.RLock()
|
||||
// Maps are ordered randomly, sort the keys for consistency
|
||||
handlerList := make([]string, 0, len(g.handlers))
|
||||
for n := range g.handlers {
|
||||
@@ -573,11 +588,11 @@ func (g *Server) Register() error {
|
||||
|
||||
sort.Strings(handlerList)
|
||||
|
||||
g.RUnlock()
|
||||
g.mu.RUnlock()
|
||||
|
||||
g.RLock()
|
||||
g.mu.RLock()
|
||||
registered := g.registered
|
||||
g.RUnlock()
|
||||
g.mu.RUnlock()
|
||||
|
||||
if !registered {
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
@@ -595,8 +610,8 @@ func (g *Server) Register() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
g.Lock()
|
||||
defer g.Unlock()
|
||||
g.mu.Lock()
|
||||
defer g.mu.Unlock()
|
||||
|
||||
g.registered = true
|
||||
g.rsvc = service
|
||||
@@ -607,9 +622,9 @@ func (g *Server) Register() error {
|
||||
func (g *Server) Deregister() error {
|
||||
var err error
|
||||
|
||||
g.RLock()
|
||||
g.mu.RLock()
|
||||
config := g.opts
|
||||
g.RUnlock()
|
||||
g.mu.RUnlock()
|
||||
|
||||
service, err := server.NewRegisterService(g)
|
||||
if err != nil {
|
||||
@@ -624,27 +639,27 @@ func (g *Server) Deregister() error {
|
||||
return err
|
||||
}
|
||||
|
||||
g.Lock()
|
||||
g.mu.Lock()
|
||||
g.rsvc = nil
|
||||
|
||||
if !g.registered {
|
||||
g.Unlock()
|
||||
g.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
g.registered = false
|
||||
|
||||
g.Unlock()
|
||||
g.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *Server) Start() error {
|
||||
g.RLock()
|
||||
g.mu.RLock()
|
||||
if g.started {
|
||||
g.RUnlock()
|
||||
g.mu.RUnlock()
|
||||
return nil
|
||||
}
|
||||
g.RUnlock()
|
||||
g.mu.RUnlock()
|
||||
|
||||
config := g.Options()
|
||||
|
||||
@@ -674,12 +689,12 @@ func (g *Server) Start() error {
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
config.Logger.Info(config.Context, "Server [grpc] Listening on "+ts.Addr().String())
|
||||
}
|
||||
g.Lock()
|
||||
g.mu.Lock()
|
||||
g.opts.Address = ts.Addr().String()
|
||||
if len(g.opts.Advertise) == 0 {
|
||||
g.opts.Advertise = ts.Addr().String()
|
||||
}
|
||||
g.Unlock()
|
||||
g.mu.Unlock()
|
||||
|
||||
// use RegisterCheck func before register
|
||||
// nolint: nestif
|
||||
@@ -730,9 +745,9 @@ func (g *Server) Start() error {
|
||||
select {
|
||||
// register self on interval
|
||||
case <-t.C:
|
||||
g.RLock()
|
||||
g.mu.RLock()
|
||||
registered := g.registered
|
||||
g.RUnlock()
|
||||
g.mu.RUnlock()
|
||||
rerr := g.opts.RegisterCheck(g.opts.Context)
|
||||
// nolint: nestif
|
||||
if rerr != nil && registered {
|
||||
@@ -809,29 +824,29 @@ func (g *Server) Start() error {
|
||||
}()
|
||||
|
||||
// mark the server as started
|
||||
g.Lock()
|
||||
g.mu.Lock()
|
||||
g.started = true
|
||||
g.Unlock()
|
||||
g.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *Server) Stop() error {
|
||||
g.RLock()
|
||||
g.mu.RLock()
|
||||
if !g.started {
|
||||
g.RUnlock()
|
||||
g.mu.RUnlock()
|
||||
return nil
|
||||
}
|
||||
g.RUnlock()
|
||||
g.mu.RUnlock()
|
||||
|
||||
ch := make(chan error)
|
||||
g.exit <- ch
|
||||
|
||||
err := <-ch
|
||||
g.Lock()
|
||||
g.mu.Lock()
|
||||
g.rsvc = nil
|
||||
g.started = false
|
||||
g.Unlock()
|
||||
g.mu.Unlock()
|
||||
|
||||
return err
|
||||
}
|
||||
|
Reference in New Issue
Block a user