update for latest micro
Some checks failed
build / test (push) Failing after 2s
build / lint (push) Failing after 1s
codeql / analyze (go) (push) Failing after 2s

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2024-12-04 14:54:13 +03:00
parent 6a218ca7b2
commit 90365a455c
3 changed files with 31 additions and 38 deletions

33
kgo.go
View File

@@ -9,6 +9,7 @@ import (
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
@@ -60,12 +61,24 @@ type Broker struct {
init bool
c *kgo.Client
kopts []kgo.Opt
connected bool
connected *atomic.Uint32
sync.RWMutex
opts broker.Options
subs []*Subscriber
}
func (r *Broker) Live() bool {
return r.connected.Load() == 1
}
func (r *Broker) Ready() bool {
return r.connected.Load() == 1
}
func (r *Broker) Health() bool {
return r.connected.Load() == 1
}
func (k *Broker) Address() string {
return strings.Join(k.opts.Addrs, ",")
}
@@ -125,12 +138,9 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *ho
}
func (k *Broker) Connect(ctx context.Context) error {
k.RLock()
if k.connected {
k.RUnlock()
if k.connected.Load() == 1 {
return nil
}
k.RUnlock()
nctx := k.opts.Context
if ctx != nil {
@@ -144,19 +154,16 @@ func (k *Broker) Connect(ctx context.Context) error {
k.Lock()
k.c = c
k.connected = true
k.connected.Store(1)
k.Unlock()
return nil
}
func (k *Broker) Disconnect(ctx context.Context) error {
k.RLock()
if !k.connected {
k.RUnlock()
if k.connected.Load() == 0 {
return nil
}
k.RUnlock()
nctx := k.opts.Context
if ctx != nil {
@@ -186,7 +193,7 @@ func (k *Broker) Disconnect(ctx context.Context) error {
}
}
k.connected = false
k.connected.Store(0)
return nil
}
@@ -241,14 +248,14 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message,
func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
k.Lock()
if !k.connected {
if k.connected.Load() == 0 {
c, _, err := k.connect(ctx, k.kopts...)
if err != nil {
k.Unlock()
return err
}
k.c = c
k.connected = true
k.connected.Store(1)
}
k.Unlock()