update go.mod
Some checks failed
build / test (push) Failing after 25s
build / lint (push) Successful in 22s
codeql / analyze (go) (push) Failing after 1m13s

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2024-12-12 09:36:12 +03:00
parent db00f3bb91
commit 1544956100
2 changed files with 70 additions and 54 deletions

View File

@ -1,7 +1,6 @@
package tcp
import (
"bytes"
"context"
"fmt"
"reflect"
@ -10,8 +9,8 @@ import (
"unicode/utf8"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/server"
)
@ -184,7 +183,7 @@ func validateSubscriber(sub server.Subscriber) error {
return nil
}
func (h *tcpServer) createSubHandler(sb *tcpSubscriber, opts server.Options) broker.Handler {
func (h *Server) createSubHandler(sb *tcpSubscriber, opts server.Options) broker.Handler {
return func(p broker.Event) error {
msg := p.Message()
ct := msg.Header["Content-Type"]
@ -220,16 +219,6 @@ func (h *tcpServer) createSubHandler(sb *tcpSubscriber, opts server.Options) bro
req = req.Elem()
}
buf := bytes.NewBuffer(msg.Body)
if err := cf.ReadHeader(buf, &codec.Message{}, codec.Event); err != nil {
return err
}
if err := cf.ReadBody(buf, req.Interface()); err != nil {
return err
}
fn := func(ctx context.Context, msg server.Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
@ -248,9 +237,11 @@ func (h *tcpServer) createSubHandler(sb *tcpSubscriber, opts server.Options) bro
return nil
}
for i := len(opts.SubWrappers); i > 0; i-- {
fn = opts.SubWrappers[i-1](fn)
}
opts.Hooks.EachNext(func(hook options.Hook) {
if h, ok := hook.(server.HookSubHandler); ok {
fn = h(fn)
}
})
go func() {
results <- fn(ctx, &tcpMessage{

101
tcp.go
View File

@ -8,6 +8,7 @@ import (
"net"
"sort"
"sync"
"sync/atomic"
"time"
"go.unistack.org/micro/v3/broker"
@ -19,31 +20,48 @@ import (
"golang.org/x/net/netutil"
)
type tcpServer struct {
var _ server.Server = (*Server)(nil)
type Server struct {
hd server.Handler
rsvc *register.Service
exit chan chan error
subscribers map[*tcpSubscriber][]broker.Subscriber
opts server.Options
sync.RWMutex
registered bool
init bool
registered bool
init bool
stateLive *atomic.Uint32
stateReady *atomic.Uint32
stateHealth *atomic.Uint32
}
func (h *tcpServer) newCodec(ct string) (codec.Codec, error) {
func (h *Server) Live() bool {
return h.stateLive.Load() == 1
}
func (h *Server) Ready() bool {
return h.stateReady.Load() == 1
}
func (h *Server) Health() bool {
return h.stateHealth.Load() == 1
}
func (h *Server) newCodec(ct string) (codec.Codec, error) {
if cf, ok := h.opts.Codecs[ct]; ok {
return cf, nil
}
return nil, codec.ErrUnknownContentType
}
func (h *tcpServer) Options() server.Options {
func (h *Server) Options() server.Options {
h.RLock()
defer h.RUnlock()
return h.opts
}
func (h *tcpServer) Init(opts ...server.Option) error {
func (h *Server) Init(opts ...server.Option) error {
if len(opts) == 0 && h.init {
return nil
}
@ -68,21 +86,18 @@ func (h *tcpServer) Init(opts ...server.Option) error {
if err := h.opts.Meter.Init(); err != nil {
return err
}
if err := h.opts.Transport.Init(); err != nil {
return err
}
return nil
}
func (h *tcpServer) Handle(handler server.Handler) error {
func (h *Server) Handle(handler server.Handler) error {
h.Lock()
h.hd = handler
h.Unlock()
return nil
}
func (h *tcpServer) NewHandler(handler interface{}, opts ...server.HandlerOption) server.Handler {
func (h *Server) NewHandler(handler interface{}, opts ...server.HandlerOption) server.Handler {
options := server.NewHandlerOptions(opts...)
eps := make([]*register.Endpoint, 0, len(options.Metadata))
@ -106,11 +121,11 @@ func (h *tcpServer) NewHandler(handler interface{}, opts ...server.HandlerOption
return th
}
func (h *tcpServer) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber {
func (h *Server) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber {
return newSubscriber(topic, handler, opts...)
}
func (h *tcpServer) Subscribe(sb server.Subscriber) error {
func (h *Server) Subscribe(sb server.Subscriber) error {
sub, ok := sb.(*tcpSubscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *tcpSubscriber")
@ -133,7 +148,7 @@ func (h *tcpServer) Subscribe(sb server.Subscriber) error {
return nil
}
func (h *tcpServer) Register() error {
func (h *Server) Register() error {
h.Lock()
config := h.opts
rsvc := h.rsvc
@ -178,7 +193,7 @@ func (h *tcpServer) Register() error {
if !registered {
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID)
config.Logger.Info(config.Context, fmt.Sprintf("Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID))
}
}
@ -214,7 +229,7 @@ func (h *tcpServer) Register() error {
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic())
config.Logger.Info(config.Context, "Subscribing to topic: "+sb.Topic())
}
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
@ -230,7 +245,7 @@ func (h *tcpServer) Register() error {
return nil
}
func (h *tcpServer) Deregister() error {
func (h *Server) Deregister() error {
h.Lock()
config := h.opts
h.Unlock()
@ -241,7 +256,7 @@ func (h *tcpServer) Deregister() error {
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Deregistering node: %s", service.Nodes[0].ID)
config.Logger.Info(config.Context, "Deregistering node: "+service.Nodes[0].ID)
}
if err := server.DefaultDeregisterFunc(service, config); err != nil {
@ -268,11 +283,11 @@ func (h *tcpServer) Deregister() error {
go func(s broker.Subscriber) {
defer wg.Done()
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Unsubscribing from topic: %s", s.Topic())
config.Logger.Info(config.Context, "Unsubscribing from topic: "+s.Topic())
}
if err := s.Unsubscribe(subCtx); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Unsubscribing from topic: %s err: %v", s.Topic(), err)
config.Logger.Error(config.Context, "Unsubscribing from errot topic: "+s.Topic(), err)
}
}
}(sub)
@ -285,7 +300,7 @@ func (h *tcpServer) Deregister() error {
return nil
}
func (h *tcpServer) getListener() net.Listener {
func (h *Server) getListener() net.Listener {
if h.opts.Context == nil {
return nil
}
@ -298,7 +313,7 @@ func (h *tcpServer) getListener() net.Listener {
return l
}
func (h *tcpServer) Start() error {
func (h *Server) Start() error {
h.RLock()
config := h.opts
hd := h.hd.Handler()
@ -332,7 +347,7 @@ func (h *tcpServer) Start() error {
}
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Infof(config.Context, "Listening on %s", ts.Addr().String())
config.Logger.Info(config.Context, "Listening on "+ts.Addr().String())
}
h.Lock()
@ -353,6 +368,9 @@ func (h *tcpServer) Start() error {
return fmt.Errorf("invalid handler %T", hd)
}
go h.serve(ts, handle)
h.stateLive.Store(1)
h.stateReady.Store(1)
h.stateHealth.Store(1)
go func() {
t := new(time.Ticker)
@ -378,23 +396,23 @@ func (h *tcpServer) Start() error {
// nolint: nestif
if rerr != nil && registered {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr)
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s deregister, check error", config.Name, config.ID), rerr)
}
// deregister self in case of error
if err := h.Deregister(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s deregister error: %s", config.Name, config.ID, err)
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s deregister error", config.Name, config.ID), err)
}
}
} else if rerr != nil && !registered {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, rerr)
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error", config.Name, config.ID), rerr)
}
continue
}
if err := h.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s register error: %s", config.Name, config.ID, err)
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register error", config.Name, config.ID), err)
}
}
// wait for exit
@ -407,41 +425,45 @@ func (h *tcpServer) Start() error {
ch <- ts.Close()
h.stateLive.Store(0)
h.stateReady.Store(0)
h.stateHealth.Store(0)
// deregister
if cerr := h.Deregister(); cerr != nil {
config.Logger.Errorf(config.Context, "Register deregister error: %v", cerr)
config.Logger.Error(config.Context, "Register deregister error", cerr)
}
if cerr := config.Broker.Disconnect(config.Context); cerr != nil {
config.Logger.Errorf(config.Context, "Broker disconnect error: %v", cerr)
config.Logger.Error(config.Context, "Broker disconnect error", cerr)
}
}()
return nil
}
func (h *tcpServer) Stop() error {
func (h *Server) Stop() error {
ch := make(chan error)
h.exit <- ch
return <-ch
}
func (h *tcpServer) gracefulStop() {
func (h *Server) gracefulStop() {
ctx, cancel := context.WithTimeout(context.Background(), h.opts.GracefulTimeout)
defer cancel()
h.opts.Wait.WaitContext(ctx)
}
func (h *tcpServer) String() string {
func (h *Server) String() string {
return "tcp"
}
func (h *tcpServer) Name() string {
func (h *Server) Name() string {
return h.opts.Name
}
func (h *tcpServer) serve(ln net.Listener, hd Handler) {
func (h *Server) serve(ln net.Listener, hd Handler) {
var tempDelay time.Duration // how long to sleep on accept failure
h.RLock()
config := h.opts
@ -465,19 +487,19 @@ func (h *tcpServer) serve(ln net.Listener, hd Handler) {
tempDelay = max
}
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "tcp: Accept error: %v; retrying in %v", err, tempDelay)
config.Logger.Error(config.Context, fmt.Sprintf("tcp: Accept error: %v; retrying in %v", err, tempDelay))
}
time.Sleep(tempDelay)
continue
}
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "tcp: Accept error: %v", err)
config.Logger.Error(config.Context, "tcp: Accept error", err)
}
return
}
if err != nil {
config.Logger.Errorf(config.Context, "tcp: accept err: %v", err)
config.Logger.Error(config.Context, "tcp: accept err", err)
return
}
@ -490,7 +512,10 @@ func (h *tcpServer) serve(ln net.Listener, hd Handler) {
}
func NewServer(opts ...server.Option) server.Server {
return &tcpServer{
return &Server{
stateLive: &atomic.Uint32{},
stateReady: &atomic.Uint32{},
stateHealth: &atomic.Uint32{},
opts: server.NewOptions(opts...),
exit: make(chan chan error),
subscribers: make(map[*tcpSubscriber][]broker.Subscriber),