Compare commits

...

5 Commits

Author SHA1 Message Date
98da69fbe8 Merge pull request 'rwfix' (#127) from rwfix into master
Some checks failed
build / test (push) Failing after 1m27s
build / lint (push) Failing after 2m45s
codeql / analyze (go) (push) Failing after 2m41s
Reviewed-on: #127
2023-12-20 22:57:32 +03:00
d6d2483d8d fixup panic
Some checks failed
codeql / analyze (go) (pull_request) Failing after 2m41s
prbuild / test (pull_request) Failing after 1m28s
prbuild / lint (pull_request) Failing after 2m37s
autoapprove / autoapprove (pull_request) Failing after 1m25s
automerge / automerge (pull_request) Failing after 4s
dependabot-automerge / automerge (pull_request) Has been skipped
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-12-20 22:56:28 +03:00
7676631737 fix graceful shutdown
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-13 15:15:58 +03:00
6c2cf494ca Merge pull request 'meter: write metrics inside broker implementation' (#123) from improve into master
Reviewed-on: #123
2023-04-29 21:32:45 +03:00
6e5e2e0338 fix metrics
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-04-29 21:32:19 +03:00
2 changed files with 53 additions and 12 deletions

27
kgo.go
View File

@@ -11,13 +11,14 @@ import (
"time"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/metadata"
id "go.unistack.org/micro/v4/util/id"
mrand "go.unistack.org/micro/v4/util/rand"
)
var _ broker.Broker = &Broker{}
var _ broker.Broker = (*Broker)(nil)
var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration")
@@ -55,7 +56,6 @@ type Broker struct {
c *kgo.Client
kopts []kgo.Opt
connected bool
init bool
sync.RWMutex
opts broker.Options
subs []*subscriber
@@ -134,6 +134,9 @@ func (k *Broker) Disconnect(ctx context.Context) error {
return nctx.Err()
default:
for _, sub := range k.subs {
if sub.closed {
continue
}
if err := sub.Unsubscribe(ctx); err != nil {
return err
}
@@ -193,8 +196,10 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message,
func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
k.RLock()
if !k.connected {
k.RUnlock()
ok := k.connected
k.RUnlock()
if !ok {
k.Lock()
c, err := k.connect(ctx, k.kopts...)
if err != nil {
@@ -205,7 +210,6 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
k.connected = true
k.Unlock()
}
k.RUnlock()
options := broker.NewPublishOptions(opts...)
records := make([]*kgo.Record, 0, len(msgs))
@@ -337,6 +341,18 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
return nil, err
}
mdreq := kmsg.NewMetadataRequest()
mdreq.Topics = []kmsg.MetadataRequestTopic{
{Topic: &topic},
}
mdrsp, err := mdreq.RequestWith(ctx, c)
if err != nil {
return nil, err
} else if mdrsp.Topics[0].ErrorCode != 0 {
return nil, fmt.Errorf("topic %s not exists or permission error", topic)
}
sub.c = c
go sub.poll(ctx)
@@ -351,7 +367,6 @@ func (k *Broker) String() string {
}
func NewBroker(opts ...broker.Option) *Broker {
rand.Seed(time.Now().Unix())
options := broker.NewOptions(opts...)
kaddrs := options.Addrs

View File

@@ -3,6 +3,7 @@ package kgo
import (
"context"
"sync"
"time"
"github.com/twmb/franz-go/pkg/kgo"
"go.unistack.org/micro/v4/broker"
@@ -55,6 +56,12 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
default:
s.c.PauseFetchTopics(s.topic)
kc := make(map[string][]int32)
for ctp := range s.consumers {
kc[ctp.t] = append(kc[ctp.t], ctp.p)
}
s.killConsumers(ctx, kc)
close(s.done)
s.closed = true
}
@@ -71,15 +78,14 @@ func (s *subscriber) poll(ctx context.Context) {
for {
select {
case <-ctx.Done():
s.c.Close()
s.c.CloseAllowingRebalance()
return
case <-s.done:
s.c.Close()
s.c.CloseAllowingRebalance()
return
default:
fetches := s.c.PollRecords(ctx, maxInflight)
if fetches.IsClientClosed() {
s.kopts.Logger.Errorf(ctx, "[kgo] client closed")
if !s.closed && fetches.IsClientClosed() {
s.closed = true
return
}
@@ -163,6 +169,8 @@ func (pc *consumer) consume() {
return
case p := <-pc.recs:
for _, record := range p.Records {
ts := time.Now()
pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Inc()
p := eventPool.Get().(*event)
p.msg.Header = nil
p.msg.Body = nil
@@ -179,30 +187,45 @@ func (pc *consumer) consume() {
p.msg.Body = record.Value
} else {
if err := pc.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil {
pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "failure").Inc()
p.err = err
p.msg.Body = record.Value
if eh != nil {
_ = eh(p)
pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec()
if p.ack {
pc.c.MarkCommitRecords(record)
} else {
eventPool.Put(p)
pc.kopts.Logger.Infof(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
return
}
eventPool.Put(p)
te := time.Since(ts)
pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds())
pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds())
continue
} else {
if pc.kopts.Logger.V(logger.ErrorLevel) {
pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo]: failed to unmarshal: %v", err)
}
}
te := time.Since(ts)
pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec()
pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds())
pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds())
eventPool.Put(p)
pc.kopts.Logger.Infof(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
return
}
}
err := pc.handler(p)
if err == nil {
pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "success").Inc()
} else {
pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "failure").Inc()
}
pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec()
if err == nil && pc.opts.AutoAck {
p.ack = true
} else if err != nil {
@@ -215,6 +238,9 @@ func (pc *consumer) consume() {
}
}
}
te := time.Since(ts)
pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds())
pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds())
if p.ack {
eventPool.Put(p)
pc.c.MarkCommitRecords(record)