59
tcp.go
59
tcp.go
@@ -18,17 +18,14 @@ import (
|
||||
)
|
||||
|
||||
type tcpServer struct {
|
||||
hd server.Handler
|
||||
rsvc *register.Service
|
||||
exit chan chan error
|
||||
subscribers map[*tcpSubscriber][]broker.Subscriber
|
||||
opts server.Options
|
||||
sync.RWMutex
|
||||
opts server.Options
|
||||
hd server.Handler
|
||||
exit chan chan error
|
||||
registerOnce sync.Once
|
||||
subscribers map[*tcpSubscriber][]broker.Subscriber
|
||||
// used for first registration
|
||||
registered bool
|
||||
init bool
|
||||
// register service instance
|
||||
rsvc *register.Service
|
||||
}
|
||||
|
||||
func (h *tcpServer) newCodec(ct string) (codec.Codec, error) {
|
||||
@@ -89,8 +86,7 @@ func (h *tcpServer) Handle(handler server.Handler) error {
|
||||
func (h *tcpServer) NewHandler(handler interface{}, opts ...server.HandlerOption) server.Handler {
|
||||
options := server.NewHandlerOptions(opts...)
|
||||
|
||||
var eps []*register.Endpoint
|
||||
|
||||
eps := make([]*register.Endpoint, 0, len(options.Metadata))
|
||||
for name, metadata := range options.Metadata {
|
||||
eps = append(eps, ®ister.Endpoint{
|
||||
Name: name,
|
||||
@@ -157,12 +153,14 @@ func (h *tcpServer) Register() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
service.Nodes[0].Metadata["protocol"] = "tcp"
|
||||
service.Nodes[0].Metadata["transport"] = "tcp"
|
||||
service.Nodes[0].Metadata["transport"] = service.Nodes[0].Metadata["protocol"]
|
||||
service.Endpoints = eps
|
||||
|
||||
h.Lock()
|
||||
var subscriberList []*tcpSubscriber
|
||||
|
||||
subscriberList := make([]*tcpSubscriber, 0, len(h.subscribers))
|
||||
for e := range h.subscribers {
|
||||
// Only advertise non internal subscribers
|
||||
subscriberList = append(subscriberList, e)
|
||||
@@ -312,10 +310,13 @@ func (h *tcpServer) Start() error {
|
||||
|
||||
if l := h.getListener(); l != nil {
|
||||
ts = l
|
||||
} else {
|
||||
}
|
||||
|
||||
// nolint: nestif
|
||||
if ts == nil {
|
||||
// check the tls config for secure connect
|
||||
if tc := config.TLSConfig; tc != nil {
|
||||
ts, err = tls.Listen("tcp", config.Address, tc)
|
||||
if config.TLSConfig != nil {
|
||||
ts, err = tls.Listen("tcp", config.Address, config.TLSConfig)
|
||||
// otherwise just plain tcp listener
|
||||
} else {
|
||||
ts, err = net.Listen("tcp", config.Address)
|
||||
@@ -375,6 +376,7 @@ func (h *tcpServer) Start() error {
|
||||
registered := h.registered
|
||||
h.RUnlock()
|
||||
rerr := h.opts.RegisterCheck(h.opts.Context)
|
||||
// nolint: nestif
|
||||
if rerr != nil && registered {
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr)
|
||||
@@ -405,9 +407,13 @@ func (h *tcpServer) Start() error {
|
||||
ch <- ts.Close()
|
||||
|
||||
// deregister
|
||||
h.Deregister()
|
||||
if cerr := h.Deregister(); cerr != nil {
|
||||
config.Logger.Errorf(config.Context, "Register deregister error: %v", cerr)
|
||||
}
|
||||
|
||||
config.Broker.Disconnect(config.Context)
|
||||
if cerr := config.Broker.Disconnect(config.Context); cerr != nil {
|
||||
config.Logger.Errorf(config.Context, "Broker disconnect error: %v", cerr)
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
@@ -419,24 +425,25 @@ func (h *tcpServer) Stop() error {
|
||||
return <-ch
|
||||
}
|
||||
|
||||
func (s *tcpServer) String() string {
|
||||
func (h *tcpServer) String() string {
|
||||
return "tcp"
|
||||
}
|
||||
|
||||
func (s *tcpServer) Name() string {
|
||||
return s.opts.Name
|
||||
func (h *tcpServer) Name() string {
|
||||
return h.opts.Name
|
||||
}
|
||||
|
||||
func (s *tcpServer) serve(ln net.Listener, h Handler) {
|
||||
func (h *tcpServer) serve(ln net.Listener, hd Handler) {
|
||||
var tempDelay time.Duration // how long to sleep on accept failure
|
||||
s.RLock()
|
||||
config := s.opts
|
||||
s.RUnlock()
|
||||
h.RLock()
|
||||
config := h.opts
|
||||
h.RUnlock()
|
||||
for {
|
||||
c, err := ln.Accept()
|
||||
// nolint: nestif
|
||||
if err != nil {
|
||||
select {
|
||||
case <-s.exit:
|
||||
case <-h.exit:
|
||||
return
|
||||
default:
|
||||
}
|
||||
@@ -465,7 +472,7 @@ func (s *tcpServer) serve(ln net.Listener, h Handler) {
|
||||
config.Logger.Errorf(config.Context, "tcp: accept err: %v", err)
|
||||
return
|
||||
}
|
||||
go h.Serve(c)
|
||||
go hd.Serve(c)
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user