2 Commits

Author SHA1 Message Date
1544956100 update go.mod
Some checks failed
build / test (push) Failing after 25s
build / lint (push) Successful in 22s
codeql / analyze (go) (push) Failing after 1m13s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-12 09:36:12 +03:00
db00f3bb91 add waitGroups for waiting finish all connects (#131)
closes #130

Reviewed-on: #131
Co-authored-by: Evstigneev Denis <danteevstigneev@yandex.ru>
Co-committed-by: Evstigneev Denis <danteevstigneev@yandex.ru>
2024-03-13 15:36:34 +03:00
5 changed files with 141 additions and 73 deletions

24
.gitignore vendored Normal file
View File

@@ -0,0 +1,24 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
bin
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work
# General
.DS_Store
.idea
.vscode

13
go.mod
View File

@@ -1,8 +1,15 @@
module go.unistack.org/micro-server-tcp/v3
go 1.16
go 1.20
require (
go.unistack.org/micro/v3 v3.10.14
golang.org/x/net v0.0.0-20220225172249-27dd8689420f
go.unistack.org/micro/v3 v3.10.51
golang.org/x/net v0.22.0
)
require (
github.com/golang/protobuf v1.5.3 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
)

35
go.sum
View File

@@ -1,15 +1,20 @@
github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
go.unistack.org/micro/v3 v3.10.14 h1:7fgLpwGlCN67twhwtngJDEQvrMkUBDSA5vzZqxIDqNE=
go.unistack.org/micro/v3 v3.10.14/go.mod h1:uMAc0U/x7dmtICCrblGf0ZLgYegu3VwQAquu+OFCw1Q=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
go.unistack.org/micro/v3 v3.10.51 h1:7JlgbJDXA4+9zyk5EJ5KqvRCeMA4htu0OofntiN+hFE=
go.unistack.org/micro/v3 v3.10.51/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e h1:NumxXLPfHSndr3wBBdeKiVHjGVFzi9RX2HwwQke94iY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw=
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=

View File

@@ -1,7 +1,6 @@
package tcp
import (
"bytes"
"context"
"fmt"
"reflect"
@@ -10,8 +9,8 @@ import (
"unicode/utf8"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/server"
)
@@ -184,11 +183,11 @@ func validateSubscriber(sub server.Subscriber) error {
return nil
}
func (s *tcpServer) createSubHandler(sb *tcpSubscriber, opts server.Options) broker.Handler {
func (h *Server) createSubHandler(sb *tcpSubscriber, opts server.Options) broker.Handler {
return func(p broker.Event) error {
msg := p.Message()
ct := msg.Header["Content-Type"]
cf, err := s.newCodec(ct)
cf, err := h.newCodec(ct)
if err != nil {
return err
}
@@ -220,16 +219,6 @@ func (s *tcpServer) createSubHandler(sb *tcpSubscriber, opts server.Options) bro
req = req.Elem()
}
buf := bytes.NewBuffer(msg.Body)
if err := cf.ReadHeader(buf, &codec.Message{}, codec.Event); err != nil {
return err
}
if err := cf.ReadBody(buf, req.Interface()); err != nil {
return err
}
fn := func(ctx context.Context, msg server.Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
@@ -248,9 +237,11 @@ func (s *tcpServer) createSubHandler(sb *tcpSubscriber, opts server.Options) bro
return nil
}
for i := len(opts.SubWrappers); i > 0; i-- {
fn = opts.SubWrappers[i-1](fn)
}
opts.Hooks.EachNext(func(hook options.Hook) {
if h, ok := hook.(server.HookSubHandler); ok {
fn = h(fn)
}
})
go func() {
results <- fn(ctx, &tcpMessage{

117
tcp.go
View File

@@ -2,11 +2,13 @@
package tcp // import "go.unistack.org/micro-server-tcp/v3"
import (
"context"
"crypto/tls"
"fmt"
"net"
"sort"
"sync"
"sync/atomic"
"time"
"go.unistack.org/micro/v3/broker"
@@ -14,34 +16,52 @@ import (
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/server"
"golang.org/x/net/netutil"
)
type tcpServer struct {
var _ server.Server = (*Server)(nil)
type Server struct {
hd server.Handler
rsvc *register.Service
exit chan chan error
subscribers map[*tcpSubscriber][]broker.Subscriber
opts server.Options
sync.RWMutex
registered bool
init bool
registered bool
init bool
stateLive *atomic.Uint32
stateReady *atomic.Uint32
stateHealth *atomic.Uint32
}
func (h *tcpServer) newCodec(ct string) (codec.Codec, error) {
func (h *Server) Live() bool {
return h.stateLive.Load() == 1
}
func (h *Server) Ready() bool {
return h.stateReady.Load() == 1
}
func (h *Server) Health() bool {
return h.stateHealth.Load() == 1
}
func (h *Server) newCodec(ct string) (codec.Codec, error) {
if cf, ok := h.opts.Codecs[ct]; ok {
return cf, nil
}
return nil, codec.ErrUnknownContentType
}
func (h *tcpServer) Options() server.Options {
func (h *Server) Options() server.Options {
h.RLock()
defer h.RUnlock()
return h.opts
}
func (h *tcpServer) Init(opts ...server.Option) error {
func (h *Server) Init(opts ...server.Option) error {
if len(opts) == 0 && h.init {
return nil
}
@@ -66,21 +86,18 @@ func (h *tcpServer) Init(opts ...server.Option) error {
if err := h.opts.Meter.Init(); err != nil {
return err
}
if err := h.opts.Transport.Init(); err != nil {
return err
}
return nil
}
func (h *tcpServer) Handle(handler server.Handler) error {
func (h *Server) Handle(handler server.Handler) error {
h.Lock()
h.hd = handler
h.Unlock()
return nil
}
func (h *tcpServer) NewHandler(handler interface{}, opts ...server.HandlerOption) server.Handler {
func (h *Server) NewHandler(handler interface{}, opts ...server.HandlerOption) server.Handler {
options := server.NewHandlerOptions(opts...)
eps := make([]*register.Endpoint, 0, len(options.Metadata))
@@ -104,11 +121,11 @@ func (h *tcpServer) NewHandler(handler interface{}, opts ...server.HandlerOption
return th
}
func (h *tcpServer) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber {
func (h *Server) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber {
return newSubscriber(topic, handler, opts...)
}
func (h *tcpServer) Subscribe(sb server.Subscriber) error {
func (h *Server) Subscribe(sb server.Subscriber) error {
sub, ok := sb.(*tcpSubscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *tcpSubscriber")
@@ -131,7 +148,7 @@ func (h *tcpServer) Subscribe(sb server.Subscriber) error {
return nil
}
func (h *tcpServer) Register() error {
func (h *Server) Register() error {
h.Lock()
config := h.opts
rsvc := h.rsvc
@@ -176,7 +193,7 @@ func (h *tcpServer) Register() error {
if !registered {
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID)
config.Logger.Info(config.Context, fmt.Sprintf("Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID))
}
}
@@ -212,7 +229,7 @@ func (h *tcpServer) Register() error {
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic())
config.Logger.Info(config.Context, "Subscribing to topic: "+sb.Topic())
}
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
@@ -228,7 +245,7 @@ func (h *tcpServer) Register() error {
return nil
}
func (h *tcpServer) Deregister() error {
func (h *Server) Deregister() error {
h.Lock()
config := h.opts
h.Unlock()
@@ -239,7 +256,7 @@ func (h *tcpServer) Deregister() error {
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Deregistering node: %s", service.Nodes[0].ID)
config.Logger.Info(config.Context, "Deregistering node: "+service.Nodes[0].ID)
}
if err := server.DefaultDeregisterFunc(service, config); err != nil {
@@ -266,11 +283,11 @@ func (h *tcpServer) Deregister() error {
go func(s broker.Subscriber) {
defer wg.Done()
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Unsubscribing from topic: %s", s.Topic())
config.Logger.Info(config.Context, "Unsubscribing from topic: "+s.Topic())
}
if err := s.Unsubscribe(subCtx); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Unsubscribing from topic: %s err: %v", s.Topic(), err)
config.Logger.Error(config.Context, "Unsubscribing from errot topic: "+s.Topic(), err)
}
}
}(sub)
@@ -283,7 +300,7 @@ func (h *tcpServer) Deregister() error {
return nil
}
func (h *tcpServer) getListener() net.Listener {
func (h *Server) getListener() net.Listener {
if h.opts.Context == nil {
return nil
}
@@ -296,7 +313,7 @@ func (h *tcpServer) getListener() net.Listener {
return l
}
func (h *tcpServer) Start() error {
func (h *Server) Start() error {
h.RLock()
config := h.opts
hd := h.hd.Handler()
@@ -330,7 +347,7 @@ func (h *tcpServer) Start() error {
}
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Infof(config.Context, "Listening on %s", ts.Addr().String())
config.Logger.Info(config.Context, "Listening on "+ts.Addr().String())
}
h.Lock()
@@ -351,6 +368,9 @@ func (h *tcpServer) Start() error {
return fmt.Errorf("invalid handler %T", hd)
}
go h.serve(ts, handle)
h.stateLive.Store(1)
h.stateReady.Store(1)
h.stateHealth.Store(1)
go func() {
t := new(time.Ticker)
@@ -376,23 +396,23 @@ func (h *tcpServer) Start() error {
// nolint: nestif
if rerr != nil && registered {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr)
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s deregister, check error", config.Name, config.ID), rerr)
}
// deregister self in case of error
if err := h.Deregister(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s deregister error: %s", config.Name, config.ID, err)
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s deregister error", config.Name, config.ID), err)
}
}
} else if rerr != nil && !registered {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, rerr)
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error", config.Name, config.ID), rerr)
}
continue
}
if err := h.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s register error: %s", config.Name, config.ID, err)
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register error", config.Name, config.ID), err)
}
}
// wait for exit
@@ -401,36 +421,49 @@ func (h *tcpServer) Start() error {
}
}
h.gracefulStop()
ch <- ts.Close()
h.stateLive.Store(0)
h.stateReady.Store(0)
h.stateHealth.Store(0)
// deregister
if cerr := h.Deregister(); cerr != nil {
config.Logger.Errorf(config.Context, "Register deregister error: %v", cerr)
config.Logger.Error(config.Context, "Register deregister error", cerr)
}
if cerr := config.Broker.Disconnect(config.Context); cerr != nil {
config.Logger.Errorf(config.Context, "Broker disconnect error: %v", cerr)
config.Logger.Error(config.Context, "Broker disconnect error", cerr)
}
}()
return nil
}
func (h *tcpServer) Stop() error {
func (h *Server) Stop() error {
ch := make(chan error)
h.exit <- ch
return <-ch
}
func (h *tcpServer) String() string {
func (h *Server) gracefulStop() {
ctx, cancel := context.WithTimeout(context.Background(), h.opts.GracefulTimeout)
defer cancel()
h.opts.Wait.WaitContext(ctx)
}
func (h *Server) String() string {
return "tcp"
}
func (h *tcpServer) Name() string {
func (h *Server) Name() string {
return h.opts.Name
}
func (h *tcpServer) serve(ln net.Listener, hd Handler) {
func (h *Server) serve(ln net.Listener, hd Handler) {
var tempDelay time.Duration // how long to sleep on accept failure
h.RLock()
config := h.opts
@@ -454,27 +487,35 @@ func (h *tcpServer) serve(ln net.Listener, hd Handler) {
tempDelay = max
}
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "tcp: Accept error: %v; retrying in %v", err, tempDelay)
config.Logger.Error(config.Context, fmt.Sprintf("tcp: Accept error: %v; retrying in %v", err, tempDelay))
}
time.Sleep(tempDelay)
continue
}
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "tcp: Accept error: %v", err)
config.Logger.Error(config.Context, "tcp: Accept error", err)
}
return
}
if err != nil {
config.Logger.Errorf(config.Context, "tcp: accept err: %v", err)
config.Logger.Error(config.Context, "tcp: accept err", err)
return
}
go hd.Serve(c)
h.opts.Wait.Add(1)
go func() {
hd.Serve(c)
h.opts.Wait.Done()
}()
}
}
func NewServer(opts ...server.Option) server.Server {
return &tcpServer{
return &Server{
stateLive: &atomic.Uint32{},
stateReady: &atomic.Uint32{},
stateHealth: &atomic.Uint32{},
opts: server.NewOptions(opts...),
exit: make(chan chan error),
subscribers: make(map[*tcpSubscriber][]broker.Subscriber),