[v4] hide access to internal mutex (#185)
* changed embedded mutex to private field * update ci * fix tests
This commit is contained in:
22
kgo.go
22
kgo.go
@@ -74,7 +74,7 @@ type Broker struct {
|
|||||||
|
|
||||||
opts broker.Options
|
opts broker.Options
|
||||||
|
|
||||||
sync.RWMutex
|
mu sync.RWMutex
|
||||||
init bool
|
init bool
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -141,9 +141,9 @@ func (b *Broker) newCodec(ct string) (codec.Codec, error) {
|
|||||||
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
|
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
|
||||||
ct = ct[:idx]
|
ct = ct[:idx]
|
||||||
}
|
}
|
||||||
b.RLock()
|
b.mu.RLock()
|
||||||
c, ok := b.opts.Codecs[ct]
|
c, ok := b.opts.Codecs[ct]
|
||||||
b.RUnlock()
|
b.mu.RUnlock()
|
||||||
if ok {
|
if ok {
|
||||||
return c, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
@@ -238,10 +238,10 @@ func (k *Broker) Connect(ctx context.Context) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
k.Lock()
|
k.mu.Lock()
|
||||||
k.c = c
|
k.c = c
|
||||||
k.connected.Store(1)
|
k.connected.Store(1)
|
||||||
k.Unlock()
|
k.mu.Unlock()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -259,8 +259,8 @@ func (k *Broker) Disconnect(ctx context.Context) error {
|
|||||||
ctx, span = k.opts.Tracer.Start(ctx, "Disconnect")
|
ctx, span = k.opts.Tracer.Start(ctx, "Disconnect")
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
k.Lock()
|
k.mu.Lock()
|
||||||
defer k.Unlock()
|
defer k.mu.Unlock()
|
||||||
select {
|
select {
|
||||||
case <-nctx.Done():
|
case <-nctx.Done():
|
||||||
return nctx.Err()
|
return nctx.Err()
|
||||||
@@ -284,8 +284,8 @@ func (k *Broker) Disconnect(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (k *Broker) Init(opts ...broker.Option) error {
|
func (k *Broker) Init(opts ...broker.Option) error {
|
||||||
k.Lock()
|
k.mu.Lock()
|
||||||
defer k.Unlock()
|
defer k.mu.Unlock()
|
||||||
|
|
||||||
if len(opts) == 0 && k.init {
|
if len(opts) == 0 && k.init {
|
||||||
return nil
|
return nil
|
||||||
@@ -538,9 +538,9 @@ func (b *Broker) fnSubscribe(ctx context.Context, topic string, handler interfac
|
|||||||
|
|
||||||
go sub.poll(ctx)
|
go sub.poll(ctx)
|
||||||
|
|
||||||
b.Lock()
|
b.mu.Lock()
|
||||||
b.subs = append(b.subs, sub)
|
b.subs = append(b.subs, sub)
|
||||||
b.Unlock()
|
b.mu.Unlock()
|
||||||
|
|
||||||
return sub, nil
|
return sub, nil
|
||||||
}
|
}
|
||||||
|
@@ -49,8 +49,8 @@ type Subscriber struct {
|
|||||||
kopts broker.Options
|
kopts broker.Options
|
||||||
opts broker.SubscribeOptions
|
opts broker.SubscribeOptions
|
||||||
|
|
||||||
connected *atomic.Uint32
|
connected *atomic.Uint32
|
||||||
sync.RWMutex
|
mu sync.RWMutex
|
||||||
closed bool
|
closed bool
|
||||||
fatalOnError bool
|
fatalOnError bool
|
||||||
}
|
}
|
||||||
@@ -118,11 +118,11 @@ func (s *Subscriber) poll(ctx context.Context) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
s.Lock()
|
s.mu.Lock()
|
||||||
for p, l := range lmap {
|
for p, l := range lmap {
|
||||||
s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p))).Set(uint64(l.Lag))
|
s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p))).Set(uint64(l.Lag))
|
||||||
}
|
}
|
||||||
s.Unlock()
|
s.mu.Unlock()
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -230,9 +230,9 @@ func (s *Subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[str
|
|||||||
opts: s.opts,
|
opts: s.opts,
|
||||||
connected: s.connected,
|
connected: s.connected,
|
||||||
}
|
}
|
||||||
s.Lock()
|
s.mu.Lock()
|
||||||
s.consumers[tp{topic, partition}] = pc
|
s.consumers[tp{topic, partition}] = pc
|
||||||
s.Unlock()
|
s.mu.Unlock()
|
||||||
go pc.consume()
|
go pc.consume()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user