add locking
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
c89df95fdc
commit
b445a7eeb7
3
util.go
3
util.go
@ -50,8 +50,11 @@ func (s *subscriber) run(ctx context.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.kopts.Logger.Infof(ctx, "handle fetches")
|
||||||
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
|
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
|
||||||
|
s.Lock()
|
||||||
consumers := s.consumers[p.Topic]
|
consumers := s.consumers[p.Topic]
|
||||||
|
s.Unlock()
|
||||||
if consumers == nil {
|
if consumers == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user