add waitGroups for waiting finish all connects #131
							
								
								
									
										24
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										24
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @@ -0,0 +1,24 @@ | |||||||
|  | # Binaries for programs and plugins | ||||||
|  | *.exe | ||||||
|  | *.exe~ | ||||||
|  | *.dll | ||||||
|  | *.so | ||||||
|  | *.dylib | ||||||
|  | bin | ||||||
|  |  | ||||||
|  | # Test binary, built with `go test -c` | ||||||
|  | *.test | ||||||
|  |  | ||||||
|  | # Output of the go coverage tool, specifically when used with LiteIDE | ||||||
|  | *.out | ||||||
|  |  | ||||||
|  | # Dependency directories (remove the comment below to include it) | ||||||
|  | # vendor/ | ||||||
|  |  | ||||||
|  | # Go workspace file | ||||||
|  | go.work | ||||||
|  |  | ||||||
|  | # General | ||||||
|  | .DS_Store | ||||||
|  | .idea | ||||||
|  | .vscode | ||||||
							
								
								
									
										13
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										13
									
								
								go.mod
									
									
									
									
									
								
							| @@ -1,8 +1,15 @@ | |||||||
| module go.unistack.org/micro-server-tcp/v3 | module go.unistack.org/micro-server-tcp/v3 | ||||||
|  |  | ||||||
| go 1.16 | go 1.20 | ||||||
|  |  | ||||||
| require ( | require ( | ||||||
| 	go.unistack.org/micro/v3 v3.10.14 | 	go.unistack.org/micro/v3 v3.10.51 | ||||||
| 	golang.org/x/net v0.0.0-20220225172249-27dd8689420f | 	golang.org/x/net v0.22.0 | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | require ( | ||||||
|  | 	github.com/golang/protobuf v1.5.3 // indirect | ||||||
|  | 	google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e // indirect | ||||||
|  | 	google.golang.org/grpc v1.57.0 // indirect | ||||||
|  | 	google.golang.org/protobuf v1.31.0 // indirect | ||||||
| ) | ) | ||||||
|   | |||||||
							
								
								
									
										35
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										35
									
								
								go.sum
									
									
									
									
									
								
							| @@ -1,15 +1,20 @@ | |||||||
| github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg= | github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= | ||||||
| github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= | github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= | ||||||
| github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= | github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= | ||||||
| go.unistack.org/micro/v3 v3.10.14 h1:7fgLpwGlCN67twhwtngJDEQvrMkUBDSA5vzZqxIDqNE= | github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= | ||||||
| go.unistack.org/micro/v3 v3.10.14/go.mod h1:uMAc0U/x7dmtICCrblGf0ZLgYegu3VwQAquu+OFCw1Q= | github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= | ||||||
| golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc= | go.unistack.org/micro/v3 v3.10.51 h1:7JlgbJDXA4+9zyk5EJ5KqvRCeMA4htu0OofntiN+hFE= | ||||||
| golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= | go.unistack.org/micro/v3 v3.10.51/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= | ||||||
| golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= | golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= | ||||||
| golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= | golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= | ||||||
| golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= | golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= | ||||||
| golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= | golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= | ||||||
| golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= | golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | ||||||
| gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= | google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e h1:NumxXLPfHSndr3wBBdeKiVHjGVFzi9RX2HwwQke94iY= | ||||||
| gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= | google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= | ||||||
| gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= | google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= | ||||||
|  | google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= | ||||||
|  | google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= | ||||||
|  | google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= | ||||||
|  | google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= | ||||||
|  | google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= | ||||||
|   | |||||||
| @@ -184,11 +184,11 @@ func validateSubscriber(sub server.Subscriber) error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *tcpServer) createSubHandler(sb *tcpSubscriber, opts server.Options) broker.Handler { | func (h *tcpServer) createSubHandler(sb *tcpSubscriber, opts server.Options) broker.Handler { | ||||||
| 	return func(p broker.Event) error { | 	return func(p broker.Event) error { | ||||||
| 		msg := p.Message() | 		msg := p.Message() | ||||||
| 		ct := msg.Header["Content-Type"] | 		ct := msg.Header["Content-Type"] | ||||||
| 		cf, err := s.newCodec(ct) | 		cf, err := h.newCodec(ct) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
|   | |||||||
							
								
								
									
										18
									
								
								tcp.go
									
									
									
									
									
								
							
							
						
						
									
										18
									
								
								tcp.go
									
									
									
									
									
								
							| @@ -2,6 +2,7 @@ | |||||||
| package tcp // import "go.unistack.org/micro-server-tcp/v3" | package tcp // import "go.unistack.org/micro-server-tcp/v3" | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"context" | ||||||
| 	"crypto/tls" | 	"crypto/tls" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"net" | 	"net" | ||||||
| @@ -14,6 +15,7 @@ import ( | |||||||
| 	"go.unistack.org/micro/v3/logger" | 	"go.unistack.org/micro/v3/logger" | ||||||
| 	"go.unistack.org/micro/v3/register" | 	"go.unistack.org/micro/v3/register" | ||||||
| 	"go.unistack.org/micro/v3/server" | 	"go.unistack.org/micro/v3/server" | ||||||
|  |  | ||||||
| 	"golang.org/x/net/netutil" | 	"golang.org/x/net/netutil" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -401,6 +403,8 @@ func (h *tcpServer) Start() error { | |||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | 		h.gracefulStop() | ||||||
|  |  | ||||||
| 		ch <- ts.Close() | 		ch <- ts.Close() | ||||||
|  |  | ||||||
| 		// deregister | 		// deregister | ||||||
| @@ -422,6 +426,13 @@ func (h *tcpServer) Stop() error { | |||||||
| 	return <-ch | 	return <-ch | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (h *tcpServer) gracefulStop() { | ||||||
|  | 	ctx, cancel := context.WithTimeout(context.Background(), h.opts.GracefulTimeout) | ||||||
|  | 	defer cancel() | ||||||
|  |  | ||||||
| 
					
					vtolstov marked this conversation as resolved
					
						
						
							Outdated
						
					
				 | |||||||
|  | 	h.opts.Wait.WaitContext(ctx) | ||||||
|  | } | ||||||
|  |  | ||||||
| func (h *tcpServer) String() string { | func (h *tcpServer) String() string { | ||||||
| 	return "tcp" | 	return "tcp" | ||||||
| } | } | ||||||
| 
				
					
						devstigneev
						commented  Тут мы ждем либо у нас концертны все закрылись и вызвался cancel() Тут мы ждем либо у нас концертны все закрылись и вызвался cancel()
либо ctx.Done выполнился по GracefulTimeout 
				
					
						devstigneev
						commented  если я использую time.Timer, не совсем смог прийти к решению, что я его в ручную остановил и мог без препятствий завершить метод если я использую time.Timer, не совсем смог прийти к решению, что я его в ручную остановил и мог без препятствий завершить метод 
				
					
						devstigneev
						commented  но возникает другая проблема, если Wait - залочился, то горутина с ней утекла но возникает другая проблема, если Wait - залочился, то горутина с ней утекла | |||||||
| @@ -469,7 +480,12 @@ func (h *tcpServer) serve(ln net.Listener, hd Handler) { | |||||||
| 			config.Logger.Errorf(config.Context, "tcp: accept err: %v", err) | 			config.Logger.Errorf(config.Context, "tcp: accept err: %v", err) | ||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
| 		go hd.Serve(c) |  | ||||||
|  | 		h.opts.Wait.Add(1) | ||||||
|  | 		go func() { | ||||||
|  | 			hd.Serve(c) | ||||||
|  | 			h.opts.Wait.Done() | ||||||
|  | 		}() | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	
смотри, лучше сделать канал и если вейтгруппа кончилась- прямо там закрывать канал, тогда в селекте мы сразу попадем на нужный кейс
да, я прикинул потом, что закрытие нужно сместить