Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
116
tcp.go
116
tcp.go
@@ -6,28 +6,25 @@ import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"go.unistack.org/micro/v3/broker"
|
||||
"go.unistack.org/micro/v3/codec"
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
"go.unistack.org/micro/v3/register"
|
||||
"go.unistack.org/micro/v3/server"
|
||||
msync "go.unistack.org/micro/v3/sync"
|
||||
"go.unistack.org/micro/v4/codec"
|
||||
"go.unistack.org/micro/v4/logger"
|
||||
"go.unistack.org/micro/v4/register"
|
||||
"go.unistack.org/micro/v4/server"
|
||||
msync "go.unistack.org/micro/v4/sync"
|
||||
"golang.org/x/net/netutil"
|
||||
)
|
||||
|
||||
var _ server.Server = (*Server)(nil)
|
||||
|
||||
type Server struct {
|
||||
hd server.Handler
|
||||
rsvc *register.Service
|
||||
exit chan chan error
|
||||
subscribers map[*tcpSubscriber][]broker.Subscriber
|
||||
opts server.Options
|
||||
hd server.Handler
|
||||
rsvc *register.Service
|
||||
exit chan chan error
|
||||
opts server.Options
|
||||
sync.RWMutex
|
||||
registered bool
|
||||
init bool
|
||||
@@ -115,33 +112,6 @@ func (h *Server) NewHandler(handler interface{}, opts ...server.HandlerOption) s
|
||||
return th
|
||||
}
|
||||
|
||||
func (h *Server) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber {
|
||||
return newSubscriber(topic, handler, opts...)
|
||||
}
|
||||
|
||||
func (h *Server) Subscribe(sb server.Subscriber) error {
|
||||
sub, ok := sb.(*tcpSubscriber)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid subscriber: expected *tcpSubscriber")
|
||||
}
|
||||
if len(sub.handlers) == 0 {
|
||||
return fmt.Errorf("invalid subscriber: no handler functions")
|
||||
}
|
||||
|
||||
if err := validateSubscriber(sb); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
_, ok = h.subscribers[sub]
|
||||
if ok {
|
||||
return fmt.Errorf("subscriber %v already exists", h)
|
||||
}
|
||||
h.subscribers[sub] = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *Server) Register() error {
|
||||
h.Lock()
|
||||
config := h.opts
|
||||
@@ -162,22 +132,6 @@ func (h *Server) Register() error {
|
||||
return err
|
||||
}
|
||||
|
||||
service.Nodes[0].Metadata["protocol"] = "tcp"
|
||||
service.Nodes[0].Metadata["transport"] = service.Nodes[0].Metadata["protocol"]
|
||||
|
||||
h.Lock()
|
||||
|
||||
subscriberList := make([]*tcpSubscriber, 0, len(h.subscribers))
|
||||
for e := range h.subscribers {
|
||||
// Only advertise non internal subscribers
|
||||
subscriberList = append(subscriberList, e)
|
||||
}
|
||||
sort.Slice(subscriberList, func(i, j int) bool {
|
||||
return subscriberList[i].topic > subscriberList[j].topic
|
||||
})
|
||||
|
||||
h.Unlock()
|
||||
|
||||
h.RLock()
|
||||
registered := h.registered
|
||||
h.RUnlock()
|
||||
@@ -205,31 +159,6 @@ func (h *Server) Register() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
for sb := range h.subscribers {
|
||||
handler := h.createSubHandler(sb, config)
|
||||
var opts []broker.SubscribeOption
|
||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||
opts = append(opts, broker.SubscribeGroup(queue))
|
||||
}
|
||||
|
||||
subCtx := config.Context
|
||||
if cx := sb.Options().Context; cx != nil {
|
||||
subCtx = cx
|
||||
}
|
||||
opts = append(opts, broker.SubscribeContext(subCtx))
|
||||
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
|
||||
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
config.Logger.Info(config.Context, "Subscribing to topic: "+sb.Topic())
|
||||
}
|
||||
|
||||
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
h.subscribers[sb] = []broker.Subscriber{sub}
|
||||
}
|
||||
|
||||
h.registered = true
|
||||
h.rsvc = service
|
||||
|
||||
@@ -261,32 +190,6 @@ func (h *Server) Deregister() error {
|
||||
}
|
||||
h.registered = false
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
subCtx := h.opts.Context
|
||||
|
||||
for sb, subs := range h.subscribers {
|
||||
if cx := sb.Options().Context; cx != nil {
|
||||
subCtx = cx
|
||||
}
|
||||
|
||||
for _, sub := range subs {
|
||||
wg.Add(1)
|
||||
go func(s broker.Subscriber) {
|
||||
defer wg.Done()
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
config.Logger.Info(config.Context, "Unsubscribing from topic: "+s.Topic())
|
||||
}
|
||||
if err := s.Unsubscribe(subCtx); err != nil {
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Error(config.Context, "Unsubscribing from errot topic: "+s.Topic(), err)
|
||||
}
|
||||
}
|
||||
}(sub)
|
||||
}
|
||||
h.subscribers[sb] = nil
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
h.Unlock()
|
||||
return nil
|
||||
}
|
||||
@@ -508,6 +411,5 @@ func NewServer(opts ...server.Option) server.Server {
|
||||
stateHealth: &atomic.Uint32{},
|
||||
opts: server.NewOptions(opts...),
|
||||
exit: make(chan chan error),
|
||||
subscribers: make(map[*tcpSubscriber][]broker.Subscriber),
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user