add waitGroups for waiting finish all connects #131

Merged
vtolstov merged 7 commits from devstigneev/micro-server-tcp:issue_130 into v3 2024-03-13 15:36:35 +03:00
5 changed files with 73 additions and 21 deletions

24
.gitignore vendored Normal file
View File

@ -0,0 +1,24 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
bin
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work
# General
.DS_Store
.idea
.vscode

13
go.mod
View File

@ -1,8 +1,15 @@
module go.unistack.org/micro-server-tcp/v3 module go.unistack.org/micro-server-tcp/v3
go 1.16 go 1.20
require ( require (
go.unistack.org/micro/v3 v3.10.14 go.unistack.org/micro/v3 v3.10.51
golang.org/x/net v0.0.0-20220225172249-27dd8689420f golang.org/x/net v0.22.0
)
require (
github.com/golang/protobuf v1.5.3 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
) )

35
go.sum
View File

@ -1,15 +1,20 @@
github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
go.unistack.org/micro/v3 v3.10.14 h1:7fgLpwGlCN67twhwtngJDEQvrMkUBDSA5vzZqxIDqNE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
go.unistack.org/micro/v3 v3.10.14/go.mod h1:uMAc0U/x7dmtICCrblGf0ZLgYegu3VwQAquu+OFCw1Q= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc= go.unistack.org/micro/v3 v3.10.51 h1:7JlgbJDXA4+9zyk5EJ5KqvRCeMA4htu0OofntiN+hFE=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= go.unistack.org/micro/v3 v3.10.51/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e h1:NumxXLPfHSndr3wBBdeKiVHjGVFzi9RX2HwwQke94iY=
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw=
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=

View File

@ -184,11 +184,11 @@ func validateSubscriber(sub server.Subscriber) error {
return nil return nil
} }
func (s *tcpServer) createSubHandler(sb *tcpSubscriber, opts server.Options) broker.Handler { func (h *tcpServer) createSubHandler(sb *tcpSubscriber, opts server.Options) broker.Handler {
return func(p broker.Event) error { return func(p broker.Event) error {
msg := p.Message() msg := p.Message()
ct := msg.Header["Content-Type"] ct := msg.Header["Content-Type"]
cf, err := s.newCodec(ct) cf, err := h.newCodec(ct)
if err != nil { if err != nil {
return err return err
} }

18
tcp.go
View File

@ -2,6 +2,7 @@
package tcp // import "go.unistack.org/micro-server-tcp/v3" package tcp // import "go.unistack.org/micro-server-tcp/v3"
import ( import (
"context"
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"net" "net"
@ -14,6 +15,7 @@ import (
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/register" "go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/server" "go.unistack.org/micro/v3/server"
"golang.org/x/net/netutil" "golang.org/x/net/netutil"
) )
@ -401,6 +403,8 @@ func (h *tcpServer) Start() error {
} }
} }
h.gracefulStop()
ch <- ts.Close() ch <- ts.Close()
// deregister // deregister
@ -422,6 +426,13 @@ func (h *tcpServer) Stop() error {
return <-ch return <-ch
} }
func (h *tcpServer) gracefulStop() {
ctx, cancel := context.WithTimeout(context.Background(), h.opts.GracefulTimeout)
defer cancel()
h.opts.Wait.WaitContext(ctx)
}
func (h *tcpServer) String() string { func (h *tcpServer) String() string {
return "tcp" return "tcp"
} }
@ -469,7 +480,12 @@ func (h *tcpServer) serve(ln net.Listener, hd Handler) {
config.Logger.Errorf(config.Context, "tcp: accept err: %v", err) config.Logger.Errorf(config.Context, "tcp: accept err: %v", err)
return return
} }
go hd.Serve(c)
h.opts.Wait.Add(1)
go func() {
hd.Serve(c)
h.opts.Wait.Done()
}()
} }
} }