Compare commits
32 Commits
755bd187ba
...
v4.0.1
Author | SHA1 | Date | |
---|---|---|---|
98da69fbe8 | |||
d6d2483d8d | |||
7676631737 | |||
6c2cf494ca | |||
6e5e2e0338 | |||
6dd3b4548a | |||
fff768dc2a | |||
141c7fb848 | |||
|
f347ca4e12 | ||
c78bffdb8d | |||
49ba8880f2 | |||
|
bbd840b96e | ||
1ee739de80 | |||
|
7f0265b6d1 | ||
4dc19dc63f | |||
|
5eb2718cd4 | ||
b4c98e207f | |||
|
20652894b3 | ||
27bb8c50d1 | |||
|
b90ef8cdf3 | ||
222370f96c | |||
|
1152be720d | ||
f0b9665816 | |||
|
310e9a8ceb | ||
cc155511e1 | |||
|
0a0c592b64 | ||
372c5c92f0 | |||
|
2601514578 | ||
8a1856e814 | |||
|
08e04cdb70 | ||
7286521472 | |||
|
b01e2b5563 |
27
kgo.go
27
kgo.go
@@ -11,13 +11,14 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/twmb/franz-go/pkg/kgo"
|
||||
"github.com/twmb/franz-go/pkg/kmsg"
|
||||
"go.unistack.org/micro/v4/broker"
|
||||
"go.unistack.org/micro/v4/metadata"
|
||||
id "go.unistack.org/micro/v4/util/id"
|
||||
mrand "go.unistack.org/micro/v4/util/rand"
|
||||
)
|
||||
|
||||
var _ broker.Broker = &Broker{}
|
||||
var _ broker.Broker = (*Broker)(nil)
|
||||
|
||||
var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration")
|
||||
|
||||
@@ -55,7 +56,6 @@ type Broker struct {
|
||||
c *kgo.Client
|
||||
kopts []kgo.Opt
|
||||
connected bool
|
||||
init bool
|
||||
sync.RWMutex
|
||||
opts broker.Options
|
||||
subs []*subscriber
|
||||
@@ -134,6 +134,9 @@ func (k *Broker) Disconnect(ctx context.Context) error {
|
||||
return nctx.Err()
|
||||
default:
|
||||
for _, sub := range k.subs {
|
||||
if sub.closed {
|
||||
continue
|
||||
}
|
||||
if err := sub.Unsubscribe(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -193,8 +196,10 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message,
|
||||
|
||||
func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
||||
k.RLock()
|
||||
if !k.connected {
|
||||
k.RUnlock()
|
||||
ok := k.connected
|
||||
k.RUnlock()
|
||||
|
||||
if !ok {
|
||||
k.Lock()
|
||||
c, err := k.connect(ctx, k.kopts...)
|
||||
if err != nil {
|
||||
@@ -205,7 +210,6 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
|
||||
k.connected = true
|
||||
k.Unlock()
|
||||
}
|
||||
k.RUnlock()
|
||||
|
||||
options := broker.NewPublishOptions(opts...)
|
||||
records := make([]*kgo.Record, 0, len(msgs))
|
||||
@@ -337,6 +341,18 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mdreq := kmsg.NewMetadataRequest()
|
||||
mdreq.Topics = []kmsg.MetadataRequestTopic{
|
||||
{Topic: &topic},
|
||||
}
|
||||
|
||||
mdrsp, err := mdreq.RequestWith(ctx, c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if mdrsp.Topics[0].ErrorCode != 0 {
|
||||
return nil, fmt.Errorf("topic %s not exists or permission error", topic)
|
||||
}
|
||||
|
||||
sub.c = c
|
||||
go sub.poll(ctx)
|
||||
|
||||
@@ -351,7 +367,6 @@ func (k *Broker) String() string {
|
||||
}
|
||||
|
||||
func NewBroker(opts ...broker.Option) *Broker {
|
||||
rand.Seed(time.Now().Unix())
|
||||
options := broker.NewOptions(opts...)
|
||||
|
||||
kaddrs := options.Addrs
|
||||
|
@@ -3,6 +3,7 @@ package kgo
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/twmb/franz-go/pkg/kgo"
|
||||
"go.unistack.org/micro/v4/broker"
|
||||
@@ -55,6 +56,12 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
s.c.PauseFetchTopics(s.topic)
|
||||
kc := make(map[string][]int32)
|
||||
for ctp := range s.consumers {
|
||||
kc[ctp.t] = append(kc[ctp.t], ctp.p)
|
||||
}
|
||||
s.killConsumers(ctx, kc)
|
||||
close(s.done)
|
||||
s.closed = true
|
||||
}
|
||||
@@ -71,15 +78,14 @@ func (s *subscriber) poll(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
s.c.Close()
|
||||
s.c.CloseAllowingRebalance()
|
||||
return
|
||||
case <-s.done:
|
||||
s.c.Close()
|
||||
s.c.CloseAllowingRebalance()
|
||||
return
|
||||
default:
|
||||
fetches := s.c.PollRecords(ctx, maxInflight)
|
||||
if fetches.IsClientClosed() {
|
||||
s.kopts.Logger.Errorf(ctx, "[kgo] client closed")
|
||||
if !s.closed && fetches.IsClientClosed() {
|
||||
s.closed = true
|
||||
return
|
||||
}
|
||||
@@ -163,6 +169,8 @@ func (pc *consumer) consume() {
|
||||
return
|
||||
case p := <-pc.recs:
|
||||
for _, record := range p.Records {
|
||||
ts := time.Now()
|
||||
pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Inc()
|
||||
p := eventPool.Get().(*event)
|
||||
p.msg.Header = nil
|
||||
p.msg.Body = nil
|
||||
@@ -179,30 +187,45 @@ func (pc *consumer) consume() {
|
||||
p.msg.Body = record.Value
|
||||
} else {
|
||||
if err := pc.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil {
|
||||
pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "failure").Inc()
|
||||
p.err = err
|
||||
p.msg.Body = record.Value
|
||||
if eh != nil {
|
||||
_ = eh(p)
|
||||
pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec()
|
||||
if p.ack {
|
||||
pc.c.MarkCommitRecords(record)
|
||||
} else {
|
||||
eventPool.Put(p)
|
||||
pc.kopts.Logger.Infof(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
|
||||
pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
|
||||
return
|
||||
}
|
||||
eventPool.Put(p)
|
||||
te := time.Since(ts)
|
||||
pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds())
|
||||
pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds())
|
||||
continue
|
||||
} else {
|
||||
if pc.kopts.Logger.V(logger.ErrorLevel) {
|
||||
pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo]: failed to unmarshal: %v", err)
|
||||
}
|
||||
}
|
||||
te := time.Since(ts)
|
||||
pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec()
|
||||
pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds())
|
||||
pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds())
|
||||
eventPool.Put(p)
|
||||
pc.kopts.Logger.Infof(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
|
||||
pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
|
||||
return
|
||||
}
|
||||
}
|
||||
err := pc.handler(p)
|
||||
if err == nil {
|
||||
pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "success").Inc()
|
||||
} else {
|
||||
pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "failure").Inc()
|
||||
}
|
||||
pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec()
|
||||
if err == nil && pc.opts.AutoAck {
|
||||
p.ack = true
|
||||
} else if err != nil {
|
||||
@@ -215,6 +238,9 @@ func (pc *consumer) consume() {
|
||||
}
|
||||
}
|
||||
}
|
||||
te := time.Since(ts)
|
||||
pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds())
|
||||
pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds())
|
||||
if p.ack {
|
||||
eventPool.Put(p)
|
||||
pc.c.MarkCommitRecords(record)
|
||||
|
Reference in New Issue
Block a user