fixup for never micro
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
31
tcp.go
31
tcp.go
@@ -16,7 +16,6 @@ import (
|
||||
"github.com/unistack-org/micro/v3/logger"
|
||||
"github.com/unistack-org/micro/v3/registry"
|
||||
"github.com/unistack-org/micro/v3/server"
|
||||
regutil "github.com/unistack-org/micro/v3/util/registry"
|
||||
"golang.org/x/net/netutil"
|
||||
)
|
||||
|
||||
@@ -139,12 +138,12 @@ func (h *tcpServer) Register() error {
|
||||
eps := h.hd.Endpoints()
|
||||
h.Unlock()
|
||||
|
||||
service, err := regutil.NewService(h)
|
||||
service, err := server.NewRegistryService(h)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
service.Metadata["protocol"] = "tcp"
|
||||
service.Metadata["transport"] = "tcp"
|
||||
service.Nodes[0].Metadata["protocol"] = "tcp"
|
||||
service.Nodes[0].Metadata["transport"] = "tcp"
|
||||
service.Endpoints = eps
|
||||
|
||||
h.Lock()
|
||||
@@ -183,18 +182,22 @@ func (h *tcpServer) Register() error {
|
||||
}
|
||||
h.registered = true
|
||||
|
||||
subCtx := h.opts.Context
|
||||
|
||||
for sb := range h.subscribers {
|
||||
handler := h.createSubHandler(sb, opts)
|
||||
var subOpts []broker.SubscribeOption
|
||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||
subOpts = append(subOpts, broker.Queue(queue))
|
||||
}
|
||||
|
||||
if cx := sb.Options().Context; cx != nil {
|
||||
subCtx = cx
|
||||
}
|
||||
if !sb.Options().AutoAck {
|
||||
subOpts = append(subOpts, broker.DisableAutoAck())
|
||||
}
|
||||
|
||||
sub, err := opts.Broker.Subscribe(sb.Topic(), handler, subOpts...)
|
||||
sub, err := opts.Broker.Subscribe(subCtx, sb.Topic(), handler, subOpts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -210,7 +213,7 @@ func (h *tcpServer) Deregister() error {
|
||||
|
||||
logger.Infof("Deregistering node: %s", opts.Name+"-"+opts.Id)
|
||||
|
||||
service, err := regutil.NewService(h)
|
||||
service, err := server.NewRegistryService(h)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -225,10 +228,18 @@ func (h *tcpServer) Deregister() error {
|
||||
}
|
||||
h.registered = false
|
||||
|
||||
subCtx := h.opts.Context
|
||||
|
||||
for sb, subs := range h.subscribers {
|
||||
if cx := sb.Options().Context; cx != nil {
|
||||
subCtx = cx
|
||||
}
|
||||
for _, sub := range subs {
|
||||
logger.Infof("Unsubscribing from topic: %s", sub.Topic())
|
||||
sub.Unsubscribe()
|
||||
if err := sub.Unsubscribe(subCtx); err != nil {
|
||||
logger.Errorf("failed to unsubscribe topic: %s error: %v", sb.Topic(), err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
h.subscribers[sb] = nil
|
||||
}
|
||||
@@ -285,7 +296,7 @@ func (h *tcpServer) Start() error {
|
||||
h.opts.Address = ts.Addr().String()
|
||||
h.Unlock()
|
||||
|
||||
if err = opts.Broker.Connect(); err != nil {
|
||||
if err = opts.Broker.Connect(h.opts.Context); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -355,7 +366,7 @@ func (h *tcpServer) Start() error {
|
||||
// deregister
|
||||
h.Deregister()
|
||||
|
||||
opts.Broker.Disconnect()
|
||||
opts.Broker.Disconnect(h.opts.Context)
|
||||
}()
|
||||
|
||||
return nil
|
||||
|
Reference in New Issue
Block a user