From eecc3854b232aa449b9697003f342cab33fc6c08 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 23 Apr 2024 08:01:44 +0300 Subject: [PATCH] add metrics and tracing Signed-off-by: Vasiliy Tolstov --- go.mod | 6 +-- go.sum | 45 +++--------------- grpc.go | 125 +++++++++++++++++++------------------------------- subscriber.go | 48 ++++++++++++++++--- 4 files changed, 99 insertions(+), 125 deletions(-) diff --git a/go.mod b/go.mod index d3d0cd7..d066612 100644 --- a/go.mod +++ b/go.mod @@ -4,14 +4,14 @@ go 1.20 require ( github.com/golang/protobuf v1.5.4 - go.unistack.org/micro/v3 v3.10.54 + go.unistack.org/micro/v3 v3.10.66 golang.org/x/net v0.24.0 - google.golang.org/grpc v1.63.0 + google.golang.org/grpc v1.63.2 google.golang.org/protobuf v1.33.0 ) require ( golang.org/x/sys v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect ) diff --git a/go.sum b/go.sum index d5dea2f..728db3a 100644 --- a/go.sum +++ b/go.sum @@ -1,48 +1,17 @@ -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -go.unistack.org/micro/v3 v3.10.42 h1:A0nA6WT6wNq5fyQyzliX70Bj5/SGj5kadLSOySX4hro= -go.unistack.org/micro/v3 v3.10.42/go.mod h1:CSmEf5ddmft94MyKHnUSMM0W5dpmmTVbgImbgQWV5Ak= -go.unistack.org/micro/v3 v3.10.52 h1:6LlAvLLlf+3JLCEQEVNQWi7DXCoI1ocuOqqoEPj5S+k= -go.unistack.org/micro/v3 v3.10.52/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= -go.unistack.org/micro/v3 v3.10.54 h1:3qbv7jg+wpcYG/nJXzE/GEIsM8i5UdpytL2cNE8i3y0= -go.unistack.org/micro/v3 v3.10.54/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= -golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= -golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +go.unistack.org/micro/v3 v3.10.66 h1:tiG8HnyTC71IZWSC2qT/DmLhJinZJL9qvw+4Fvpm3d4= +go.unistack.org/micro/v3 v3.10.66/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 h1:AB/lmRny7e2pLhFEYIbl5qkDAUt2h0ZRO4wGPhZf+ik= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405/go.mod h1:67X1fPuzjcrkymZzZV1vvkFeTn2Rvc6lYF9MYFGCcwE= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240314234333-6e1732d8331c h1:lfpJ/2rWPa/kJgxyyXM8PrNnfCzcmxJ265mADgwmvLI= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240314234333-6e1732d8331c/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= -google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= -google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= -google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= -google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= -google.golang.org/grpc v1.63.0 h1:WjKe+dnvABXyPJMD7KDNLxtoGk5tgk+YFWN6cBWjZE8= -google.golang.org/grpc v1.63.0/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be h1:LG9vZxsWGOmUKieR8wPAUR3u3MpnYFQZROPIMaXh7/A= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= +google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= +google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= diff --git a/grpc.go b/grpc.go index 9276c1f..af641a7 100644 --- a/grpc.go +++ b/grpc.go @@ -24,10 +24,12 @@ import ( "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/meter" + "go.unistack.org/micro/v3/options" "go.unistack.org/micro/v3/register" "go.unistack.org/micro/v3/semconv" "go.unistack.org/micro/v3/server" msync "go.unistack.org/micro/v3/sync" + "go.unistack.org/micro/v3/tracer" "golang.org/x/net/netutil" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -106,25 +108,6 @@ func (g *Server) configure(opts ...server.Option) error { o(&g.opts) } - if err := g.opts.Register.Init(); err != nil { - return err - } - if err := g.opts.Broker.Init(); err != nil { - return err - } - if err := g.opts.Tracer.Init(); err != nil { - return err - } - if err := g.opts.Logger.Init(); err != nil { - return err - } - if err := g.opts.Meter.Init(); err != nil { - return err - } - if err := g.opts.Transport.Init(); err != nil { - return err - } - if g.opts.Context != nil { if codecs, ok := g.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && codecs != nil { for k, v := range codecs { @@ -207,13 +190,25 @@ func (g *Server) getGrpcOptions() []grpc.ServerOption { func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error { var err error + + ctx := stream.Context() + fullMethod, ok := grpc.MethodFromServerStream(stream) if !ok { return status.Errorf(codes.Internal, "method does not exist in context") } + var sp tracer.Span + ctx, sp = g.opts.Tracer.Start(ctx, fullMethod+" rpc-server", + tracer.WithSpanKind(tracer.SpanKindServer), + tracer.WithSpanLabels( + "endpoint", fullMethod, + ), + ) + ts := time.Now() g.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", fullMethod).Inc() + defer func() { te := time.Since(ts) g.opts.Meter.Summary(semconv.ServerRequestLatencyMicroseconds, "endpoint", fullMethod).Update(te.Seconds()) @@ -224,8 +219,10 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error { if st == nil || st.Code() == codes.OK { g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "success", "code", strconv.Itoa(int(codes.OK))).Inc() } else { + sp.SetStatus(tracer.SpanStatusError, err.Error()) g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "failure", "code", strconv.Itoa(int(st.Code()))).Inc() } + sp.Finish() }() var serviceName, methodName string @@ -241,7 +238,7 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error { } // get grpc metadata - gmd, ok := gmetadata.FromIncomingContext(stream.Context()) + gmd, ok := gmetadata.FromIncomingContext(ctx) if !ok { gmd = gmetadata.MD{} } @@ -291,10 +288,10 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error { } // create new context - ctx := metadata.NewIncomingContext(stream.Context(), md) + ctx = metadata.NewIncomingContext(ctx, md) // get peer from context - if p, ok := peer.FromContext(stream.Context()); ok { + if p, ok := peer.FromContext(ctx); ok { md.Set("Remote", p.Addr.String()) ctx = peer.NewContext(ctx, p) } @@ -392,10 +389,11 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s return err } - // wrap the handler func - for i := len(g.opts.HdlrWrappers); i > 0; i-- { - fn = g.opts.HdlrWrappers[i-1](fn) - } + g.opts.Hooks.EachNext(func(hook options.Hook) { + if h, ok := hook.(server.HookHandler); ok { + fn = h(fn) + } + }) statusCode := codes.OK statusDesc := "" @@ -428,7 +426,7 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s config := g.opts g.RUnlock() if config.Logger.V(logger.ErrorLevel) { - config.Logger.Warn(config.Context, "handler error will not be transferred properly, must return *errors.Error or proto.Message") + config.Logger.Error(config.Context, "handler error will not be transferred properly, must return *errors.Error or proto.Message") } // default case user pass own error type that not proto based statusCode = convertCode(verr) @@ -475,9 +473,11 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se return nil } - for i := len(opts.HdlrWrappers); i > 0; i-- { - fn = opts.HdlrWrappers[i-1](fn) - } + opts.Hooks.EachNext(func(hook options.Hook) { + if h, ok := hook.(server.HookHandler); ok { + fn = h(fn) + } + }) statusCode := codes.OK statusDesc := "" @@ -645,7 +645,7 @@ func (g *Server) Register() error { if !registered { if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(config.Context, "Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID) + config.Logger.Info(config.Context, fmt.Sprintf("Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID)) } } @@ -681,7 +681,7 @@ func (g *Server) Deregister() error { } if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(config.Context, "Deregistering node: %s", service.Nodes[0].ID) + config.Logger.Info(config.Context, "Deregistering node: "+service.Nodes[0].ID) } if err := server.DefaultDeregisterFunc(service, config); err != nil { @@ -705,11 +705,11 @@ func (g *Server) Deregister() error { go func(s broker.Subscriber) { defer wg.Done() if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(config.Context, "Unsubscribing from topic: %s", s.Topic()) + config.Logger.Info(config.Context, "Unsubscribing from topic: "+s.Topic()) } if err := s.Unsubscribe(g.opts.Context); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(config.Context, "Unsubscribing from topic: %s err: %v", s.Topic(), err) + config.Logger.Error(config.Context, "Unsubscribing from topic: "+s.Topic(), err) } } }(sub) @@ -756,7 +756,7 @@ func (g *Server) Start() error { } if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(config.Context, "Server [grpc] Listening on %s", ts.Addr().String()) + config.Logger.Info(config.Context, "Server [grpc] Listening on "+ts.Addr().String()) } g.Lock() g.opts.Address = ts.Addr().String() @@ -770,13 +770,13 @@ func (g *Server) Start() error { // connect to the broker if err = config.Broker.Connect(config.Context); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(config.Context, "Broker [%s] connect error: %v", config.Broker.String(), err) + config.Logger.Error(config.Context, fmt.Sprintf("broker [%s] connect error", config.Broker.String()), err) } return err } if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(config.Context, "Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) + config.Logger.Info(config.Context, fmt.Sprintf("broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())) } } @@ -784,13 +784,13 @@ func (g *Server) Start() error { // nolint: nestif if err = g.opts.RegisterCheck(config.Context); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, err) + config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error", config.Name, config.ID), err) } } else { // announce self to the world if err = g.Register(); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(config.Context, "Server register error: %v", err) + config.Logger.Error(config.Context, "Server register error", err) } } } @@ -803,11 +803,11 @@ func (g *Server) Start() error { go func() { if err = g.srv.Serve(ts); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(config.Context, "gRPC Server start error: %v", err) + config.Logger.Error(config.Context, "gRPC Server start error", err) } if err = g.Stop(); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(config.Context, "gRPC Server stop error: %v", err) + config.Logger.Error(config.Context, "gRPC Server stop error", err) } } } @@ -837,23 +837,23 @@ func (g *Server) Start() error { // nolint: nestif if rerr != nil && registered { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr) + config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error, deregister it", config.Name, config.ID), rerr) } // deregister self in case of error if err = g.Deregister(); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(config.Context, "Server %s-%s deregister error: %s", config.Name, config.ID, err) + config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s deregister error", config.Name, config.ID), err) } } } else if rerr != nil && !registered { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, rerr) + config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error", config.Name, config.ID), rerr) } continue } if err = g.Register(); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(config.Context, "Server %s-%s register error: %s", config.Name, config.ID, err) + config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register error", config.Name, config.ID), err) } } // wait for exit @@ -892,12 +892,12 @@ func (g *Server) Start() error { ch <- nil if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(config.Context, "Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) + config.Logger.Info(config.Context, fmt.Sprintf("broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())) } // disconnect broker if err = config.Broker.Disconnect(config.Context); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(config.Context, "Broker [%s] disconnect error: %v", config.Broker.String(), err) + config.Logger.Error(config.Context, fmt.Sprintf("broker [%s] disconnect error", config.Broker.String()), err) } } }() @@ -910,37 +910,6 @@ func (g *Server) Start() error { return nil } -func (g *Server) subscribe() error { - config := g.opts - - for sb := range g.subscribers { - handler := g.createSubHandler(sb, config) - var opts []broker.SubscribeOption - if queue := sb.Options().Queue; len(queue) > 0 { - opts = append(opts, broker.SubscribeGroup(queue)) - } - - subCtx := config.Context - if cx := sb.Options().Context; cx != nil { - subCtx = cx - } - opts = append(opts, broker.SubscribeContext(subCtx)) - opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck)) - opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly)) - - if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic()) - } - sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...) - if err != nil { - return err - } - g.subscribers[sb] = []broker.Subscriber{sub} - } - - return nil -} - func (g *Server) Stop() error { g.RLock() if !g.started { diff --git a/subscriber.go b/subscriber.go index 79ada2a..4c05e7f 100644 --- a/subscriber.go +++ b/subscriber.go @@ -8,10 +8,9 @@ import ( "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/codec" - - // "go.unistack.org/micro/v3/errors" - // "go.unistack.org/micro/v3/logger" + "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/metadata" + "go.unistack.org/micro/v3/options" "go.unistack.org/micro/v3/register" "go.unistack.org/micro/v3/server" ) @@ -198,9 +197,11 @@ func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Ha return nil } - for i := len(opts.SubWrappers); i > 0; i-- { - fn = opts.SubWrappers[i-1](fn) - } + opts.Hooks.EachNext(func(hook options.Hook) { + if h, ok := hook.(server.HookSubHandler); ok { + fn = h(fn) + } + }) if g.wg != nil { g.wg.Add(1) @@ -247,3 +248,38 @@ func (s *subscriber) Endpoints() []*register.Endpoint { func (s *subscriber) Options() server.SubscriberOptions { return s.opts } + +func (g *Server) subscribe() error { + config := g.opts + subCtx := config.Context + + for sb := range g.subscribers { + + if cx := sb.Options().Context; cx != nil { + subCtx = cx + } + + opts := []broker.SubscribeOption{ + broker.SubscribeContext(subCtx), + broker.SubscribeAutoAck(sb.Options().AutoAck), + broker.SubscribeBodyOnly(sb.Options().BodyOnly), + } + + if queue := sb.Options().Queue; len(queue) > 0 { + opts = append(opts, broker.SubscribeGroup(queue)) + } + + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info(config.Context, "subscribing to topic: "+sb.Topic()) + } + + sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), g.createSubHandler(sb, config), opts...) + if err != nil { + return err + } + + g.subscribers[sb] = []broker.Subscriber{sub} + } + + return nil +}