changed embedded mutex to private field (#209)
Some checks failed
coverage / build (push) Failing after 16m47s
test / test (push) Successful in 2m56s
sync / sync (push) Successful in 9s

This commit is contained in:
2025-05-25 03:14:42 +05:00
committed by GitHub
parent abd66c79df
commit 43e095a00a

70
tcp.go
View File

@@ -10,7 +10,6 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/register" "go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v4/server" "go.unistack.org/micro/v4/server"
@@ -21,11 +20,11 @@ import (
var _ server.Server = (*Server)(nil) var _ server.Server = (*Server)(nil)
type Server struct { type Server struct {
hd server.Handler hd server.Handler
rsvc *register.Service rsvc *register.Service
exit chan chan error exit chan chan error
opts server.Options opts server.Options
sync.RWMutex mu sync.RWMutex
registered bool registered bool
init bool init bool
stateLive *atomic.Uint32 stateLive *atomic.Uint32
@@ -45,16 +44,9 @@ func (h *Server) Health() bool {
return h.stateHealth.Load() == 1 return h.stateHealth.Load() == 1
} }
func (h *Server) newCodec(ct string) (codec.Codec, error) {
if cf, ok := h.opts.Codecs[ct]; ok {
return cf, nil
}
return nil, codec.ErrUnknownContentType
}
func (h *Server) Options() server.Options { func (h *Server) Options() server.Options {
h.RLock() h.mu.RLock()
defer h.RUnlock() defer h.mu.RUnlock()
return h.opts return h.opts
} }
@@ -65,11 +57,11 @@ func (h *Server) Init(opts ...server.Option) error {
if len(opts) == 0 && h.init { if len(opts) == 0 && h.init {
return nil return nil
} }
h.Lock() h.mu.Lock()
for _, o := range opts { for _, o := range opts {
o(&h.opts) o(&h.opts)
} }
h.Unlock() h.mu.Unlock()
if err := h.opts.Register.Init(); err != nil { if err := h.opts.Register.Init(); err != nil {
return err return err
@@ -91,9 +83,9 @@ func (h *Server) Init(opts ...server.Option) error {
} }
func (h *Server) Handle(handler server.Handler) error { func (h *Server) Handle(handler server.Handler) error {
h.Lock() h.mu.Lock()
h.hd = handler h.hd = handler
h.Unlock() h.mu.Unlock()
return nil return nil
} }
@@ -113,11 +105,11 @@ func (h *Server) NewHandler(handler interface{}, opts ...server.HandlerOption) s
} }
func (h *Server) Register() error { func (h *Server) Register() error {
h.Lock() h.mu.Lock()
config := h.opts config := h.opts
rsvc := h.rsvc rsvc := h.rsvc
h.Unlock() h.mu.Unlock()
// if service already filled, reuse it and return early // if service already filled, reuse it and return early
if rsvc != nil { if rsvc != nil {
@@ -132,9 +124,9 @@ func (h *Server) Register() error {
return err return err
} }
h.RLock() h.mu.RLock()
registered := h.registered registered := h.registered
h.RUnlock() h.mu.RUnlock()
if !registered { if !registered {
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
@@ -152,8 +144,8 @@ func (h *Server) Register() error {
return nil return nil
} }
h.Lock() h.mu.Lock()
defer h.Unlock() defer h.mu.Unlock()
if h.registered { if h.registered {
return nil return nil
@@ -166,9 +158,9 @@ func (h *Server) Register() error {
} }
func (h *Server) Deregister() error { func (h *Server) Deregister() error {
h.Lock() h.mu.Lock()
config := h.opts config := h.opts
h.Unlock() h.mu.Unlock()
service, err := server.NewRegisterService(h) service, err := server.NewRegisterService(h)
if err != nil { if err != nil {
@@ -183,14 +175,14 @@ func (h *Server) Deregister() error {
return err return err
} }
h.Lock() h.mu.Lock()
if !h.registered { if !h.registered {
h.Unlock() h.mu.Unlock()
return nil return nil
} }
h.registered = false h.registered = false
h.Unlock() h.mu.Unlock()
return nil return nil
} }
@@ -208,10 +200,10 @@ func (h *Server) getListener() net.Listener {
} }
func (h *Server) Start() error { func (h *Server) Start() error {
h.RLock() h.mu.RLock()
config := h.opts config := h.opts
hd := h.hd.Handler() hd := h.hd.Handler()
h.RUnlock() h.mu.RUnlock()
var err error var err error
var ts net.Listener var ts net.Listener
@@ -244,9 +236,9 @@ func (h *Server) Start() error {
config.Logger.Info(config.Context, "Listening on "+ts.Addr().String()) config.Logger.Info(config.Context, "Listening on "+ts.Addr().String())
} }
h.Lock() h.mu.Lock()
h.opts.Address = ts.Addr().String() h.opts.Address = ts.Addr().String()
h.Unlock() h.mu.Unlock()
if err = config.Broker.Connect(config.Context); err != nil { if err = config.Broker.Connect(config.Context); err != nil {
return err return err
@@ -283,9 +275,9 @@ func (h *Server) Start() error {
select { select {
// register self on interval // register self on interval
case <-t.C: case <-t.C:
h.RLock() h.mu.RLock()
registered := h.registered registered := h.registered
h.RUnlock() h.mu.RUnlock()
rerr := h.opts.RegisterCheck(h.opts.Context) rerr := h.opts.RegisterCheck(h.opts.Context)
// nolint: nestif // nolint: nestif
if rerr != nil && registered { if rerr != nil && registered {
@@ -359,9 +351,9 @@ func (h *Server) Name() string {
func (h *Server) serve(ln net.Listener, hd Handler) { func (h *Server) serve(ln net.Listener, hd Handler) {
var tempDelay time.Duration // how long to sleep on accept failure var tempDelay time.Duration // how long to sleep on accept failure
h.RLock() h.mu.RLock()
config := h.opts config := h.opts
h.RUnlock() h.mu.RUnlock()
for { for {
c, err := ln.Accept() c, err := ln.Accept()
// nolint: nestif // nolint: nestif
@@ -371,7 +363,7 @@ func (h *Server) serve(ln net.Listener, hd Handler) {
return return
default: default:
} }
if ne, ok := err.(net.Error); ok && ne.Temporary() { if ne, ok := err.(net.Error); ok && ne.Temporary() { // nolint:staticcheck
if tempDelay == 0 { if tempDelay == 0 {
tempDelay = 5 * time.Millisecond tempDelay = 5 * time.Millisecond
} else { } else {