From 154495610061a5468b4d79878de7a0cd6725f196 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Thu, 12 Dec 2024 09:36:12 +0300 Subject: [PATCH] update go.mod Signed-off-by: Vasiliy Tolstov --- subscriber.go | 23 ++++-------- tcp.go | 101 +++++++++++++++++++++++++++++++------------------- 2 files changed, 70 insertions(+), 54 deletions(-) diff --git a/subscriber.go b/subscriber.go index 9359d8e..3afcbf7 100644 --- a/subscriber.go +++ b/subscriber.go @@ -1,7 +1,6 @@ package tcp import ( - "bytes" "context" "fmt" "reflect" @@ -10,8 +9,8 @@ import ( "unicode/utf8" "go.unistack.org/micro/v3/broker" - "go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/metadata" + "go.unistack.org/micro/v3/options" "go.unistack.org/micro/v3/register" "go.unistack.org/micro/v3/server" ) @@ -184,7 +183,7 @@ func validateSubscriber(sub server.Subscriber) error { return nil } -func (h *tcpServer) createSubHandler(sb *tcpSubscriber, opts server.Options) broker.Handler { +func (h *Server) createSubHandler(sb *tcpSubscriber, opts server.Options) broker.Handler { return func(p broker.Event) error { msg := p.Message() ct := msg.Header["Content-Type"] @@ -220,16 +219,6 @@ func (h *tcpServer) createSubHandler(sb *tcpSubscriber, opts server.Options) bro req = req.Elem() } - buf := bytes.NewBuffer(msg.Body) - - if err := cf.ReadHeader(buf, &codec.Message{}, codec.Event); err != nil { - return err - } - - if err := cf.ReadBody(buf, req.Interface()); err != nil { - return err - } - fn := func(ctx context.Context, msg server.Message) error { var vals []reflect.Value if sb.typ.Kind() != reflect.Func { @@ -248,9 +237,11 @@ func (h *tcpServer) createSubHandler(sb *tcpSubscriber, opts server.Options) bro return nil } - for i := len(opts.SubWrappers); i > 0; i-- { - fn = opts.SubWrappers[i-1](fn) - } + opts.Hooks.EachNext(func(hook options.Hook) { + if h, ok := hook.(server.HookSubHandler); ok { + fn = h(fn) + } + }) go func() { results <- fn(ctx, &tcpMessage{ diff --git a/tcp.go b/tcp.go index 4fe21bf..6d79c0f 100644 --- a/tcp.go +++ b/tcp.go @@ -8,6 +8,7 @@ import ( "net" "sort" "sync" + "sync/atomic" "time" "go.unistack.org/micro/v3/broker" @@ -19,31 +20,48 @@ import ( "golang.org/x/net/netutil" ) -type tcpServer struct { +var _ server.Server = (*Server)(nil) + +type Server struct { hd server.Handler rsvc *register.Service exit chan chan error subscribers map[*tcpSubscriber][]broker.Subscriber opts server.Options sync.RWMutex - registered bool - init bool + registered bool + init bool + stateLive *atomic.Uint32 + stateReady *atomic.Uint32 + stateHealth *atomic.Uint32 } -func (h *tcpServer) newCodec(ct string) (codec.Codec, error) { +func (h *Server) Live() bool { + return h.stateLive.Load() == 1 +} + +func (h *Server) Ready() bool { + return h.stateReady.Load() == 1 +} + +func (h *Server) Health() bool { + return h.stateHealth.Load() == 1 +} + +func (h *Server) newCodec(ct string) (codec.Codec, error) { if cf, ok := h.opts.Codecs[ct]; ok { return cf, nil } return nil, codec.ErrUnknownContentType } -func (h *tcpServer) Options() server.Options { +func (h *Server) Options() server.Options { h.RLock() defer h.RUnlock() return h.opts } -func (h *tcpServer) Init(opts ...server.Option) error { +func (h *Server) Init(opts ...server.Option) error { if len(opts) == 0 && h.init { return nil } @@ -68,21 +86,18 @@ func (h *tcpServer) Init(opts ...server.Option) error { if err := h.opts.Meter.Init(); err != nil { return err } - if err := h.opts.Transport.Init(); err != nil { - return err - } return nil } -func (h *tcpServer) Handle(handler server.Handler) error { +func (h *Server) Handle(handler server.Handler) error { h.Lock() h.hd = handler h.Unlock() return nil } -func (h *tcpServer) NewHandler(handler interface{}, opts ...server.HandlerOption) server.Handler { +func (h *Server) NewHandler(handler interface{}, opts ...server.HandlerOption) server.Handler { options := server.NewHandlerOptions(opts...) eps := make([]*register.Endpoint, 0, len(options.Metadata)) @@ -106,11 +121,11 @@ func (h *tcpServer) NewHandler(handler interface{}, opts ...server.HandlerOption return th } -func (h *tcpServer) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber { +func (h *Server) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber { return newSubscriber(topic, handler, opts...) } -func (h *tcpServer) Subscribe(sb server.Subscriber) error { +func (h *Server) Subscribe(sb server.Subscriber) error { sub, ok := sb.(*tcpSubscriber) if !ok { return fmt.Errorf("invalid subscriber: expected *tcpSubscriber") @@ -133,7 +148,7 @@ func (h *tcpServer) Subscribe(sb server.Subscriber) error { return nil } -func (h *tcpServer) Register() error { +func (h *Server) Register() error { h.Lock() config := h.opts rsvc := h.rsvc @@ -178,7 +193,7 @@ func (h *tcpServer) Register() error { if !registered { if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(config.Context, "Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID) + config.Logger.Info(config.Context, fmt.Sprintf("Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID)) } } @@ -214,7 +229,7 @@ func (h *tcpServer) Register() error { opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck)) if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic()) + config.Logger.Info(config.Context, "Subscribing to topic: "+sb.Topic()) } sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...) @@ -230,7 +245,7 @@ func (h *tcpServer) Register() error { return nil } -func (h *tcpServer) Deregister() error { +func (h *Server) Deregister() error { h.Lock() config := h.opts h.Unlock() @@ -241,7 +256,7 @@ func (h *tcpServer) Deregister() error { } if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(config.Context, "Deregistering node: %s", service.Nodes[0].ID) + config.Logger.Info(config.Context, "Deregistering node: "+service.Nodes[0].ID) } if err := server.DefaultDeregisterFunc(service, config); err != nil { @@ -268,11 +283,11 @@ func (h *tcpServer) Deregister() error { go func(s broker.Subscriber) { defer wg.Done() if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(config.Context, "Unsubscribing from topic: %s", s.Topic()) + config.Logger.Info(config.Context, "Unsubscribing from topic: "+s.Topic()) } if err := s.Unsubscribe(subCtx); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(config.Context, "Unsubscribing from topic: %s err: %v", s.Topic(), err) + config.Logger.Error(config.Context, "Unsubscribing from errot topic: "+s.Topic(), err) } } }(sub) @@ -285,7 +300,7 @@ func (h *tcpServer) Deregister() error { return nil } -func (h *tcpServer) getListener() net.Listener { +func (h *Server) getListener() net.Listener { if h.opts.Context == nil { return nil } @@ -298,7 +313,7 @@ func (h *tcpServer) getListener() net.Listener { return l } -func (h *tcpServer) Start() error { +func (h *Server) Start() error { h.RLock() config := h.opts hd := h.hd.Handler() @@ -332,7 +347,7 @@ func (h *tcpServer) Start() error { } if config.Logger.V(logger.ErrorLevel) { - config.Logger.Infof(config.Context, "Listening on %s", ts.Addr().String()) + config.Logger.Info(config.Context, "Listening on "+ts.Addr().String()) } h.Lock() @@ -353,6 +368,9 @@ func (h *tcpServer) Start() error { return fmt.Errorf("invalid handler %T", hd) } go h.serve(ts, handle) + h.stateLive.Store(1) + h.stateReady.Store(1) + h.stateHealth.Store(1) go func() { t := new(time.Ticker) @@ -378,23 +396,23 @@ func (h *tcpServer) Start() error { // nolint: nestif if rerr != nil && registered { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr) + config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s deregister, check error", config.Name, config.ID), rerr) } // deregister self in case of error if err := h.Deregister(); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(config.Context, "Server %s-%s deregister error: %s", config.Name, config.ID, err) + config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s deregister error", config.Name, config.ID), err) } } } else if rerr != nil && !registered { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, rerr) + config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error", config.Name, config.ID), rerr) } continue } if err := h.Register(); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(config.Context, "Server %s-%s register error: %s", config.Name, config.ID, err) + config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register error", config.Name, config.ID), err) } } // wait for exit @@ -407,41 +425,45 @@ func (h *tcpServer) Start() error { ch <- ts.Close() + h.stateLive.Store(0) + h.stateReady.Store(0) + h.stateHealth.Store(0) + // deregister if cerr := h.Deregister(); cerr != nil { - config.Logger.Errorf(config.Context, "Register deregister error: %v", cerr) + config.Logger.Error(config.Context, "Register deregister error", cerr) } if cerr := config.Broker.Disconnect(config.Context); cerr != nil { - config.Logger.Errorf(config.Context, "Broker disconnect error: %v", cerr) + config.Logger.Error(config.Context, "Broker disconnect error", cerr) } }() return nil } -func (h *tcpServer) Stop() error { +func (h *Server) Stop() error { ch := make(chan error) h.exit <- ch return <-ch } -func (h *tcpServer) gracefulStop() { +func (h *Server) gracefulStop() { ctx, cancel := context.WithTimeout(context.Background(), h.opts.GracefulTimeout) defer cancel() h.opts.Wait.WaitContext(ctx) } -func (h *tcpServer) String() string { +func (h *Server) String() string { return "tcp" } -func (h *tcpServer) Name() string { +func (h *Server) Name() string { return h.opts.Name } -func (h *tcpServer) serve(ln net.Listener, hd Handler) { +func (h *Server) serve(ln net.Listener, hd Handler) { var tempDelay time.Duration // how long to sleep on accept failure h.RLock() config := h.opts @@ -465,19 +487,19 @@ func (h *tcpServer) serve(ln net.Listener, hd Handler) { tempDelay = max } if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(config.Context, "tcp: Accept error: %v; retrying in %v", err, tempDelay) + config.Logger.Error(config.Context, fmt.Sprintf("tcp: Accept error: %v; retrying in %v", err, tempDelay)) } time.Sleep(tempDelay) continue } if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(config.Context, "tcp: Accept error: %v", err) + config.Logger.Error(config.Context, "tcp: Accept error", err) } return } if err != nil { - config.Logger.Errorf(config.Context, "tcp: accept err: %v", err) + config.Logger.Error(config.Context, "tcp: accept err", err) return } @@ -490,7 +512,10 @@ func (h *tcpServer) serve(ln net.Listener, hd Handler) { } func NewServer(opts ...server.Option) server.Server { - return &tcpServer{ + return &Server{ + stateLive: &atomic.Uint32{}, + stateReady: &atomic.Uint32{}, + stateHealth: &atomic.Uint32{}, opts: server.NewOptions(opts...), exit: make(chan chan error), subscribers: make(map[*tcpSubscriber][]broker.Subscriber),