add waitGroups for waiting finish all connects #131
| @@ -184,11 +184,11 @@ func validateSubscriber(sub server.Subscriber) error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *tcpServer) createSubHandler(sb *tcpSubscriber, opts server.Options) broker.Handler { | func (h *tcpServer) createSubHandler(sb *tcpSubscriber, opts server.Options) broker.Handler { | ||||||
| 	return func(p broker.Event) error { | 	return func(p broker.Event) error { | ||||||
| 		msg := p.Message() | 		msg := p.Message() | ||||||
| 		ct := msg.Header["Content-Type"] | 		ct := msg.Header["Content-Type"] | ||||||
| 		cf, err := s.newCodec(ct) | 		cf, err := h.newCodec(ct) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
|   | |||||||
							
								
								
									
										19
									
								
								tcp.go
									
									
									
									
									
								
							
							
						
						
									
										19
									
								
								tcp.go
									
									
									
									
									
								
							| @@ -404,6 +404,7 @@ func (h *tcpServer) Start() error { | |||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | 		h.gracefulStop() | ||||||
| 		ch <- ts.Close() | 		ch <- ts.Close() | ||||||
|  |  | ||||||
| 		// deregister | 		// deregister | ||||||
| @@ -420,19 +421,21 @@ func (h *tcpServer) Start() error { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (h *tcpServer) Stop() error { | func (h *tcpServer) Stop() error { | ||||||
|  | 	ch := make(chan error) | ||||||
|  | 	h.exit <- ch | ||||||
|  | 	return <-ch | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (h *tcpServer) gracefulStop() { | ||||||
| 	ctx, cancel := context.WithTimeout(context.Background(), h.opts.GracefulTimeout) | 	ctx, cancel := context.WithTimeout(context.Background(), h.opts.GracefulTimeout) | ||||||
|  |  | ||||||
| 	go func() { | 	go func() { | ||||||
| 		defer cancel() | 		h.opts.Wait.Wait() | ||||||
| 		h.wg.Wait() | 		cancel() | ||||||
| 	}() | 	}() | ||||||
|  |  | ||||||
| 	// wait timeout or finish all connects | 	// wait timeout or finish all connects | ||||||
| 	<-ctx.Done() | 	<-ctx.Done() | ||||||
|  |  | ||||||
| 	ch := make(chan error) |  | ||||||
| 	h.exit <- ch |  | ||||||
| 	return <-ch |  | ||||||
| } | } | ||||||
|  |  | ||||||
| func (h *tcpServer) String() string { | func (h *tcpServer) String() string { | ||||||
| @@ -483,9 +486,9 @@ func (h *tcpServer) serve(ln net.Listener, hd Handler) { | |||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		h.wg.Add(1) | 		h.opts.Wait.Add(1) | ||||||
| 		go func() { | 		go func() { | ||||||
| 			defer h.wg.Done() | 			defer h.opts.Wait.Done() | ||||||
| 			hd.Serve(c) | 			hd.Serve(c) | ||||||
| 		}() | 		}() | ||||||
| 	} | 	} | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user