From df8b69c3ad27d98a58804f9e89e20e397e688681 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 4 Mar 2025 23:32:30 +0300 Subject: [PATCH] update to latest micro Signed-off-by: Vasiliy Tolstov --- go.mod | 22 ++--- go.sum | 39 ++++---- handler.go | 2 +- message.go | 38 -------- options.go | 4 +- request.go | 8 +- subscriber.go | 260 -------------------------------------------------- tcp.go | 116 ++-------------------- 8 files changed, 46 insertions(+), 443 deletions(-) delete mode 100644 message.go delete mode 100644 subscriber.go diff --git a/go.mod b/go.mod index a4cd21c..bc054ec 100644 --- a/go.mod +++ b/go.mod @@ -1,23 +1,23 @@ -module go.unistack.org/micro-server-tcp/v3 +module go.unistack.org/micro-server-tcp/v4 -go 1.22.0 +go 1.23.0 -toolchain go1.23.4 +toolchain go1.24.0 require ( - go.unistack.org/micro/v3 v3.11.37 - golang.org/x/net v0.34.0 + go.unistack.org/micro/v4 v4.1.2 + golang.org/x/net v0.36.0 ) require ( github.com/ash3in/uuidv8 v1.2.0 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/kr/pretty v0.3.1 // indirect github.com/matoous/go-nanoid v1.5.1 // indirect - go.unistack.org/micro-proto/v3 v3.4.1 // indirect - golang.org/x/sys v0.29.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422 // indirect - google.golang.org/grpc v1.69.4 // indirect - google.golang.org/protobuf v1.36.2 // indirect + github.com/spf13/cast v1.7.1 // indirect + go.unistack.org/micro-proto/v4 v4.1.0 // indirect + golang.org/x/sys v0.30.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb // indirect + google.golang.org/grpc v1.71.0 // indirect + google.golang.org/protobuf v1.36.5 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 70ac70f..067b6ca 100644 --- a/go.sum +++ b/go.sum @@ -2,7 +2,8 @@ github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7Oputl github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI= github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= @@ -15,26 +16,26 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/matoous/go-nanoid v1.5.1 h1:aCjdvTyO9LLnTIi0fgdXhOPPvOHjpXN6Ik9DaNjIct4= github.com/matoous/go-nanoid v1.5.1/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U= -github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= -go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= -go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= -go.unistack.org/micro/v3 v3.11.37 h1:ZcpnXAYEMcAwmnVb5b7o8/PylGnILxXMHaUlRrPmRI0= -go.unistack.org/micro/v3 v3.11.37/go.mod h1:POGU5hstnAT9LH70m8FalyQSNi2GfIew71K75JenIZk= -golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= -golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= -golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= -golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422 h1:3UsHvIr4Wc2aW4brOaSCmcxh9ksica6fHEr8P1XhkYw= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422/go.mod h1:3ENsm/5D1mzDyhpzeRi1NR784I0BcofWBoSc5QqqMK4= -google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A= -google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= -google.golang.org/protobuf v1.36.2 h1:R8FeyR1/eLmkutZOM5CWghmo5itiG9z0ktFlTVLuTmU= -google.golang.org/protobuf v1.36.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y= +github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= +go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk= +go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec= +go.unistack.org/micro/v4 v4.1.2 h1:9SOlPYyPNNFpg1A7BsvhDyQm3gysLH1AhWbDCp1hyoY= +go.unistack.org/micro/v4 v4.1.2/go.mod h1:lr3oYED8Ay1vjK68QqRw30QOtdk/ffpZqMFDasOUhKw= +golang.org/x/net v0.36.0 h1:vWF2fRbw4qslQsQzgFqZff+BItCvGFQqKzKIzx1rmoA= +golang.org/x/net v0.36.0/go.mod h1:bFmbeoIPfrw4sMHNhb4J9f6+tPziuGjq7Jk/38fxi1I= +golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= +golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= +golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb h1:TLPQVbx1GJ8VKZxz52VAxl1EBgKXXbTiU9Fc5fZeLn4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I= +google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg= +google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/handler.go b/handler.go index 2cdc820..78be6eb 100644 --- a/handler.go +++ b/handler.go @@ -3,7 +3,7 @@ package tcp import ( "net" - "go.unistack.org/micro/v3/server" + "go.unistack.org/micro/v4/server" ) type Handler interface { diff --git a/message.go b/message.go deleted file mode 100644 index 135d2ba..0000000 --- a/message.go +++ /dev/null @@ -1,38 +0,0 @@ -package tcp - -import ( - "go.unistack.org/micro/v3/codec" - "go.unistack.org/micro/v3/metadata" - "go.unistack.org/micro/v3/server" -) - -var _ server.Message = &tcpMessage{} - -type tcpMessage struct { - payload interface{} - codec codec.Codec - header metadata.Metadata - topic string - contentType string - body []byte -} - -func (r *tcpMessage) Topic() string { - return r.topic -} - -func (r *tcpMessage) ContentType() string { - return r.contentType -} - -func (r *tcpMessage) Header() metadata.Metadata { - return r.header -} - -func (r *tcpMessage) Body() interface{} { - return r.payload -} - -func (r *tcpMessage) Codec() codec.Codec { - return r.codec -} diff --git a/options.go b/options.go index 7ccd9a3..ee0af13 100644 --- a/options.go +++ b/options.go @@ -4,7 +4,7 @@ import ( "crypto/tls" "net" - "go.unistack.org/micro/v3/server" + "go.unistack.org/micro/v4/server" ) // DefaultMaxMsgSize define maximum message size that server can send @@ -18,10 +18,8 @@ type ( netListener struct{} ) -// // MaxMsgSize set the maximum message in bytes the server can receive and // send. Default maximum message size is 8K -// func MaxMsgSize(s int) server.Option { return server.SetOption(maxMsgSizeKey{}, s) } diff --git a/request.go b/request.go index 313ca06..07d3d6f 100644 --- a/request.go +++ b/request.go @@ -1,9 +1,9 @@ package tcp import ( - "go.unistack.org/micro/v3/codec" - "go.unistack.org/micro/v3/metadata" - "go.unistack.org/micro/v3/server" + "go.unistack.org/micro/v4/codec" + "go.unistack.org/micro/v4/metadata" + "go.unistack.org/micro/v4/server" ) var _ server.Request = &tcpRequest{} @@ -11,7 +11,7 @@ var _ server.Request = &tcpRequest{} type tcpRequest struct { codec codec.Codec body interface{} - header map[string]string + header metadata.Metadata method string endpoint string contentType string diff --git a/subscriber.go b/subscriber.go deleted file mode 100644 index fb7f075..0000000 --- a/subscriber.go +++ /dev/null @@ -1,260 +0,0 @@ -package tcp - -import ( - "context" - "fmt" - "reflect" - "strings" - "unicode" - "unicode/utf8" - - "go.unistack.org/micro/v3/broker" - "go.unistack.org/micro/v3/metadata" - "go.unistack.org/micro/v3/options" - "go.unistack.org/micro/v3/server" -) - -const ( - subSig = "func(context.Context, interface{}) error" -) - -var typeOfError = reflect.TypeOf((*error)(nil)).Elem() - -type handler struct { - reqType reflect.Type - ctxType reflect.Type - method reflect.Value -} - -type tcpSubscriber struct { - topic string - rcvr reflect.Value - typ reflect.Type - subscriber interface{} - handlers []*handler - opts server.SubscriberOptions -} - -// Is this an exported - upper case - name? -func isExported(name string) bool { - r, _ := utf8.DecodeRuneInString(name) - return unicode.IsUpper(r) -} - -// Is this type exported or a builtin? -func isExportedOrBuiltinType(t reflect.Type) bool { - for t.Kind() == reflect.Ptr { - t = t.Elem() - } - // PkgPath will be non-empty even for an exported type, - // so we need to check the type name as well. - return isExported(t.Name()) || t.PkgPath() == "" -} - -func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber { - options := server.SubscriberOptions{ - AutoAck: true, - } - - for _, o := range opts { - o(&options) - } - - var handlers []*handler - - if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func { - h := &handler{ - method: reflect.ValueOf(sub), - } - - switch typ.NumIn() { - case 1: - h.reqType = typ.In(0) - case 2: - h.ctxType = typ.In(0) - h.reqType = typ.In(1) - } - - handlers = append(handlers, h) - } else { - for m := 0; m < typ.NumMethod(); m++ { - method := typ.Method(m) - h := &handler{ - method: method.Func, - } - - switch method.Type.NumIn() { - case 2: - h.reqType = method.Type.In(1) - case 3: - h.ctxType = method.Type.In(1) - h.reqType = method.Type.In(2) - } - - handlers = append(handlers, h) - } - } - - return &tcpSubscriber{ - rcvr: reflect.ValueOf(sub), - typ: reflect.TypeOf(sub), - topic: topic, - subscriber: sub, - handlers: handlers, - opts: options, - } -} - -func validateSubscriber(sub server.Subscriber) error { - typ := reflect.TypeOf(sub.Subscriber()) - var argType reflect.Type - - switch typ.Kind() { - case reflect.Func: - name := "Func" - switch typ.NumIn() { - case 2: - argType = typ.In(1) - default: - return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), subSig) - } - if !isExportedOrBuiltinType(argType) { - return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType) - } - if typ.NumOut() != 1 { - return fmt.Errorf("subscriber %v has wrong number of outs: %v require signature %s", - name, typ.NumOut(), subSig) - } - if returnType := typ.Out(0); returnType != typeOfError { - return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String()) - } - default: - hdlr := reflect.ValueOf(sub.Subscriber()) - name := reflect.Indirect(hdlr).Type().Name() - - for m := 0; m < typ.NumMethod(); m++ { - method := typ.Method(m) - - switch method.Type.NumIn() { - case 3: - argType = method.Type.In(2) - default: - return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s", - name, method.Name, method.Type.NumIn(), subSig) - } - - if !isExportedOrBuiltinType(argType) { - return fmt.Errorf("%v argument type not exported: %v", name, argType) - } - if method.Type.NumOut() != 1 { - return fmt.Errorf( - "subscriber %v.%v has wrong number of outs: %v require signature %s", - name, method.Name, method.Type.NumOut(), subSig) - } - if returnType := method.Type.Out(0); returnType != typeOfError { - return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String()) - } - } - } - - return nil -} - -func (h *Server) createSubHandler(sb *tcpSubscriber, opts server.Options) broker.Handler { - return func(p broker.Event) error { - msg := p.Message() - ct := msg.Header["Content-Type"] - cf, err := h.newCodec(ct) - if err != nil { - return err - } - - hdr := make(map[string]string, len(msg.Header)-1) - for k, v := range msg.Header { - if k == "Content-Type" { - continue - } - hdr[k] = v - } - ctx := metadata.NewIncomingContext(context.Background(), hdr) - - results := make(chan error, len(sb.handlers)) - - for i := 0; i < len(sb.handlers); i++ { - handler := sb.handlers[i] - - var isVal bool - var req reflect.Value - - if handler.reqType.Kind() == reflect.Ptr { - req = reflect.New(handler.reqType.Elem()) - } else { - req = reflect.New(handler.reqType) - isVal = true - } - if isVal { - req = req.Elem() - } - - fn := func(ctx context.Context, msg server.Message) error { - var vals []reflect.Value - if sb.typ.Kind() != reflect.Func { - vals = append(vals, sb.rcvr) - } - if handler.ctxType != nil { - vals = append(vals, reflect.ValueOf(ctx)) - } - - vals = append(vals, reflect.ValueOf(msg.Body())) - - returnValues := handler.method.Call(vals) - if err := returnValues[0].Interface(); err != nil { - return err.(error) - } - return nil - } - - opts.Hooks.EachNext(func(hook options.Hook) { - if h, ok := hook.(server.HookSubHandler); ok { - fn = h(fn) - } - }) - - go func() { - results <- fn(ctx, &tcpMessage{ - topic: sb.topic, - contentType: ct, - payload: req.Interface(), - header: msg.Header, - codec: cf, - }) - }() - } - - var errors []string - - for i := 0; i < len(sb.handlers); i++ { - if err := <-results; err != nil { - errors = append(errors, err.Error()) - } - } - - if len(errors) > 0 { - return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) - } - - return nil - } -} - -func (s *tcpSubscriber) Topic() string { - return s.topic -} - -func (s *tcpSubscriber) Subscriber() interface{} { - return s.subscriber -} - -func (s *tcpSubscriber) Options() server.SubscriberOptions { - return s.opts -} diff --git a/tcp.go b/tcp.go index eac6839..2a3181e 100644 --- a/tcp.go +++ b/tcp.go @@ -6,28 +6,25 @@ import ( "crypto/tls" "fmt" "net" - "sort" "sync" "sync/atomic" "time" - "go.unistack.org/micro/v3/broker" - "go.unistack.org/micro/v3/codec" - "go.unistack.org/micro/v3/logger" - "go.unistack.org/micro/v3/register" - "go.unistack.org/micro/v3/server" - msync "go.unistack.org/micro/v3/sync" + "go.unistack.org/micro/v4/codec" + "go.unistack.org/micro/v4/logger" + "go.unistack.org/micro/v4/register" + "go.unistack.org/micro/v4/server" + msync "go.unistack.org/micro/v4/sync" "golang.org/x/net/netutil" ) var _ server.Server = (*Server)(nil) type Server struct { - hd server.Handler - rsvc *register.Service - exit chan chan error - subscribers map[*tcpSubscriber][]broker.Subscriber - opts server.Options + hd server.Handler + rsvc *register.Service + exit chan chan error + opts server.Options sync.RWMutex registered bool init bool @@ -115,33 +112,6 @@ func (h *Server) NewHandler(handler interface{}, opts ...server.HandlerOption) s return th } -func (h *Server) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber { - return newSubscriber(topic, handler, opts...) -} - -func (h *Server) Subscribe(sb server.Subscriber) error { - sub, ok := sb.(*tcpSubscriber) - if !ok { - return fmt.Errorf("invalid subscriber: expected *tcpSubscriber") - } - if len(sub.handlers) == 0 { - return fmt.Errorf("invalid subscriber: no handler functions") - } - - if err := validateSubscriber(sb); err != nil { - return err - } - - h.Lock() - defer h.Unlock() - _, ok = h.subscribers[sub] - if ok { - return fmt.Errorf("subscriber %v already exists", h) - } - h.subscribers[sub] = nil - return nil -} - func (h *Server) Register() error { h.Lock() config := h.opts @@ -162,22 +132,6 @@ func (h *Server) Register() error { return err } - service.Nodes[0].Metadata["protocol"] = "tcp" - service.Nodes[0].Metadata["transport"] = service.Nodes[0].Metadata["protocol"] - - h.Lock() - - subscriberList := make([]*tcpSubscriber, 0, len(h.subscribers)) - for e := range h.subscribers { - // Only advertise non internal subscribers - subscriberList = append(subscriberList, e) - } - sort.Slice(subscriberList, func(i, j int) bool { - return subscriberList[i].topic > subscriberList[j].topic - }) - - h.Unlock() - h.RLock() registered := h.registered h.RUnlock() @@ -205,31 +159,6 @@ func (h *Server) Register() error { return nil } - for sb := range h.subscribers { - handler := h.createSubHandler(sb, config) - var opts []broker.SubscribeOption - if queue := sb.Options().Queue; len(queue) > 0 { - opts = append(opts, broker.SubscribeGroup(queue)) - } - - subCtx := config.Context - if cx := sb.Options().Context; cx != nil { - subCtx = cx - } - opts = append(opts, broker.SubscribeContext(subCtx)) - opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck)) - - if config.Logger.V(logger.InfoLevel) { - config.Logger.Info(config.Context, "Subscribing to topic: "+sb.Topic()) - } - - sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...) - if err != nil { - return err - } - h.subscribers[sb] = []broker.Subscriber{sub} - } - h.registered = true h.rsvc = service @@ -261,32 +190,6 @@ func (h *Server) Deregister() error { } h.registered = false - wg := sync.WaitGroup{} - subCtx := h.opts.Context - - for sb, subs := range h.subscribers { - if cx := sb.Options().Context; cx != nil { - subCtx = cx - } - - for _, sub := range subs { - wg.Add(1) - go func(s broker.Subscriber) { - defer wg.Done() - if config.Logger.V(logger.InfoLevel) { - config.Logger.Info(config.Context, "Unsubscribing from topic: "+s.Topic()) - } - if err := s.Unsubscribe(subCtx); err != nil { - if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error(config.Context, "Unsubscribing from errot topic: "+s.Topic(), err) - } - } - }(sub) - } - h.subscribers[sb] = nil - } - wg.Wait() - h.Unlock() return nil } @@ -508,6 +411,5 @@ func NewServer(opts ...server.Option) server.Server { stateHealth: &atomic.Uint32{}, opts: server.NewOptions(opts...), exit: make(chan chan error), - subscribers: make(map[*tcpSubscriber][]broker.Subscriber), } }