From ec82a143b2bd168d2a1525b400016a87dad077d6 Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Tue, 5 Mar 2024 21:16:09 +0300 Subject: [PATCH 1/7] add waitGroups for waiting finish all connects --- .gitignore | 24 ++++++++++++++++++++++++ go.mod | 13 ++++++++++--- go.sum | 35 ++++++++++++++++++++--------------- tcp.go | 20 +++++++++++++++++++- 4 files changed, 73 insertions(+), 19 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9e16696 --- /dev/null +++ b/.gitignore @@ -0,0 +1,24 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib +bin + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# Go workspace file +go.work + +# General +.DS_Store +.idea +.vscode \ No newline at end of file diff --git a/go.mod b/go.mod index f37e02b..c85f092 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,15 @@ module go.unistack.org/micro-server-tcp/v3 -go 1.16 +go 1.20 require ( - go.unistack.org/micro/v3 v3.10.14 - golang.org/x/net v0.0.0-20220225172249-27dd8689420f + go.unistack.org/micro/v3 v3.10.42 + golang.org/x/net v0.22.0 +) + +require ( + github.com/golang/protobuf v1.5.3 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e // indirect + google.golang.org/grpc v1.57.0 // indirect + google.golang.org/protobuf v1.31.0 // indirect ) diff --git a/go.sum b/go.sum index ea93dcc..0a8577b 100644 --- a/go.sum +++ b/go.sum @@ -1,15 +1,20 @@ -github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg= -github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= -github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= -go.unistack.org/micro/v3 v3.10.14 h1:7fgLpwGlCN67twhwtngJDEQvrMkUBDSA5vzZqxIDqNE= -go.unistack.org/micro/v3 v3.10.14/go.mod h1:uMAc0U/x7dmtICCrblGf0ZLgYegu3VwQAquu+OFCw1Q= -golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc= -golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +go.unistack.org/micro/v3 v3.10.42 h1:A0nA6WT6wNq5fyQyzliX70Bj5/SGj5kadLSOySX4hro= +go.unistack.org/micro/v3 v3.10.42/go.mod h1:CSmEf5ddmft94MyKHnUSMM0W5dpmmTVbgImbgQWV5Ak= +golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= +golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e h1:NumxXLPfHSndr3wBBdeKiVHjGVFzi9RX2HwwQke94iY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= +google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= +google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= diff --git a/tcp.go b/tcp.go index dd6bd18..52e8555 100644 --- a/tcp.go +++ b/tcp.go @@ -2,6 +2,7 @@ package tcp // import "go.unistack.org/micro-server-tcp/v3" import ( + "context" "crypto/tls" "fmt" "net" @@ -26,6 +27,8 @@ type tcpServer struct { sync.RWMutex registered bool init bool + + wg sync.WaitGroup } func (h *tcpServer) newCodec(ct string) (codec.Codec, error) { @@ -417,6 +420,16 @@ func (h *tcpServer) Start() error { } func (h *tcpServer) Stop() error { + ctx, cancel := context.WithTimeout(context.Background(), h.opts.GracefulTimeout) + + go func() { + defer cancel() + h.wg.Wait() + }() + + // wait timeout or finish all connects + <-ctx.Done() + ch := make(chan error) h.exit <- ch return <-ch @@ -469,7 +482,12 @@ func (h *tcpServer) serve(ln net.Listener, hd Handler) { config.Logger.Errorf(config.Context, "tcp: accept err: %v", err) return } - go hd.Serve(c) + + h.wg.Add(1) + go func() { + defer h.wg.Done() + hd.Serve(c) + }() } } -- 2.45.2 From bbe2425174da0675deda652f0accf6aa6394789d Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Wed, 6 Mar 2024 13:00:49 +0300 Subject: [PATCH 2/7] use WG opts and add func gracefulStop --- subscriber.go | 4 ++-- tcp.go | 19 +++++++++++-------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/subscriber.go b/subscriber.go index a159c7b..9359d8e 100644 --- a/subscriber.go +++ b/subscriber.go @@ -184,11 +184,11 @@ func validateSubscriber(sub server.Subscriber) error { return nil } -func (s *tcpServer) createSubHandler(sb *tcpSubscriber, opts server.Options) broker.Handler { +func (h *tcpServer) createSubHandler(sb *tcpSubscriber, opts server.Options) broker.Handler { return func(p broker.Event) error { msg := p.Message() ct := msg.Header["Content-Type"] - cf, err := s.newCodec(ct) + cf, err := h.newCodec(ct) if err != nil { return err } diff --git a/tcp.go b/tcp.go index 52e8555..2efa88a 100644 --- a/tcp.go +++ b/tcp.go @@ -404,6 +404,7 @@ func (h *tcpServer) Start() error { } } + h.gracefulStop() ch <- ts.Close() // deregister @@ -420,19 +421,21 @@ func (h *tcpServer) Start() error { } func (h *tcpServer) Stop() error { + ch := make(chan error) + h.exit <- ch + return <-ch +} + +func (h *tcpServer) gracefulStop() { ctx, cancel := context.WithTimeout(context.Background(), h.opts.GracefulTimeout) go func() { - defer cancel() - h.wg.Wait() + h.opts.Wait.Wait() + cancel() }() // wait timeout or finish all connects <-ctx.Done() - - ch := make(chan error) - h.exit <- ch - return <-ch } func (h *tcpServer) String() string { @@ -483,9 +486,9 @@ func (h *tcpServer) serve(ln net.Listener, hd Handler) { return } - h.wg.Add(1) + h.opts.Wait.Add(1) go func() { - defer h.wg.Done() + defer h.opts.Wait.Done() hd.Serve(c) }() } -- 2.45.2 From 9d321b12b82b85713c8b353c681087fcfaac6a73 Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Wed, 6 Mar 2024 18:44:45 +0300 Subject: [PATCH 3/7] add errGroup --- tcp.go | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/tcp.go b/tcp.go index 2efa88a..eb558da 100644 --- a/tcp.go +++ b/tcp.go @@ -16,6 +16,7 @@ import ( "go.unistack.org/micro/v3/register" "go.unistack.org/micro/v3/server" "golang.org/x/net/netutil" + "golang.org/x/sync/errgroup" ) type tcpServer struct { @@ -427,15 +428,27 @@ func (h *tcpServer) Stop() error { } func (h *tcpServer) gracefulStop() { - ctx, cancel := context.WithTimeout(context.Background(), h.opts.GracefulTimeout) + tm := time.NewTimer(h.opts.GracefulTimeout) + defer tm.Stop() - go func() { + g, gctx := errgroup.WithContext(context.Background()) + + g.Go(func() error { h.opts.Wait.Wait() - cancel() - }() + return nil + }) - // wait timeout or finish all connects - <-ctx.Done() + g.Go(func() error { + select { + case <-tm.C: + return nil + case <-gctx.Done(): + return gctx.Err() + } + }) + + g.Wait() + return } func (h *tcpServer) String() string { -- 2.45.2 From 8661d62ce80dd98fb2313a0bca020b3d2412759f Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Thu, 7 Mar 2024 16:30:52 +0300 Subject: [PATCH 4/7] use done chan --- tcp.go | 31 ++++++++++--------------------- 1 file changed, 10 insertions(+), 21 deletions(-) diff --git a/tcp.go b/tcp.go index eb558da..94435d8 100644 --- a/tcp.go +++ b/tcp.go @@ -2,7 +2,6 @@ package tcp // import "go.unistack.org/micro-server-tcp/v3" import ( - "context" "crypto/tls" "fmt" "net" @@ -16,7 +15,6 @@ import ( "go.unistack.org/micro/v3/register" "go.unistack.org/micro/v3/server" "golang.org/x/net/netutil" - "golang.org/x/sync/errgroup" ) type tcpServer struct { @@ -28,8 +26,6 @@ type tcpServer struct { sync.RWMutex registered bool init bool - - wg sync.WaitGroup } func (h *tcpServer) newCodec(ct string) (codec.Codec, error) { @@ -431,24 +427,17 @@ func (h *tcpServer) gracefulStop() { tm := time.NewTimer(h.opts.GracefulTimeout) defer tm.Stop() - g, gctx := errgroup.WithContext(context.Background()) - - g.Go(func() error { + done := make(chan struct{}) + go func() { h.opts.Wait.Wait() - return nil - }) + done <- struct{}{} + }() - g.Go(func() error { - select { - case <-tm.C: - return nil - case <-gctx.Done(): - return gctx.Err() - } - }) - - g.Wait() - return + select { + case <-tm.C: + case <-done: + close(done) + } } func (h *tcpServer) String() string { @@ -501,8 +490,8 @@ func (h *tcpServer) serve(ln net.Listener, hd Handler) { h.opts.Wait.Add(1) go func() { - defer h.opts.Wait.Done() hd.Serve(c) + h.opts.Wait.Done() }() } } -- 2.45.2 From d7288c7130a9b6925e873be0c0e8ea76ce8f3bb7 Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Fri, 8 Mar 2024 16:49:49 +0300 Subject: [PATCH 5/7] remove sent to channel --- tcp.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tcp.go b/tcp.go index 94435d8..e6a8029 100644 --- a/tcp.go +++ b/tcp.go @@ -430,13 +430,12 @@ func (h *tcpServer) gracefulStop() { done := make(chan struct{}) go func() { h.opts.Wait.Wait() - done <- struct{}{} + close(done) }() select { case <-tm.C: case <-done: - close(done) } } -- 2.45.2 From 97625535d2158db4522ed1f25edde28595187d25 Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Wed, 13 Mar 2024 15:19:06 +0300 Subject: [PATCH 6/7] add using WaitingContext --- go.mod | 2 +- go.sum | 4 ++-- tcp.go | 17 +++++------------ 3 files changed, 8 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index c85f092..dd751b6 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module go.unistack.org/micro-server-tcp/v3 go 1.20 require ( - go.unistack.org/micro/v3 v3.10.42 + go.unistack.org/micro/v3 v3.10.51 golang.org/x/net v0.22.0 ) diff --git a/go.sum b/go.sum index 0a8577b..b52bf1a 100644 --- a/go.sum +++ b/go.sum @@ -3,8 +3,8 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -go.unistack.org/micro/v3 v3.10.42 h1:A0nA6WT6wNq5fyQyzliX70Bj5/SGj5kadLSOySX4hro= -go.unistack.org/micro/v3 v3.10.42/go.mod h1:CSmEf5ddmft94MyKHnUSMM0W5dpmmTVbgImbgQWV5Ak= +go.unistack.org/micro/v3 v3.10.51 h1:7JlgbJDXA4+9zyk5EJ5KqvRCeMA4htu0OofntiN+hFE= +go.unistack.org/micro/v3 v3.10.51/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= diff --git a/tcp.go b/tcp.go index e6a8029..89cf771 100644 --- a/tcp.go +++ b/tcp.go @@ -4,6 +4,7 @@ package tcp // import "go.unistack.org/micro-server-tcp/v3" import ( "crypto/tls" "fmt" + "golang.org/x/net/context" "net" "sort" "sync" @@ -402,6 +403,7 @@ func (h *tcpServer) Start() error { } h.gracefulStop() + ch <- ts.Close() // deregister @@ -424,19 +426,10 @@ func (h *tcpServer) Stop() error { } func (h *tcpServer) gracefulStop() { - tm := time.NewTimer(h.opts.GracefulTimeout) - defer tm.Stop() + ctx, cancel := context.WithTimeout(context.Background(), h.opts.GracefulTimeout) + defer cancel() - done := make(chan struct{}) - go func() { - h.opts.Wait.Wait() - close(done) - }() - - select { - case <-tm.C: - case <-done: - } + h.opts.Wait.WaitContext(ctx) } func (h *tcpServer) String() string { -- 2.45.2 From b8ef6db02c92b075f6163e1c0faf815d2d1d9bf8 Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Wed, 13 Mar 2024 15:20:57 +0300 Subject: [PATCH 7/7] del import googl ctx --- tcp.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tcp.go b/tcp.go index 89cf771..4fe21bf 100644 --- a/tcp.go +++ b/tcp.go @@ -2,9 +2,9 @@ package tcp // import "go.unistack.org/micro-server-tcp/v3" import ( + "context" "crypto/tls" "fmt" - "golang.org/x/net/context" "net" "sort" "sync" @@ -15,6 +15,7 @@ import ( "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/register" "go.unistack.org/micro/v3/server" + "golang.org/x/net/netutil" ) -- 2.45.2