Compare commits
	
		
			33 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 98da69fbe8 | |||
| d6d2483d8d | |||
| 7676631737 | |||
| 6c2cf494ca | |||
| 6e5e2e0338 | |||
| 6dd3b4548a | |||
| fff768dc2a | |||
| 755bd187ba | |||
| 141c7fb848 | |||
|  | f347ca4e12 | ||
| c78bffdb8d | |||
| 49ba8880f2 | |||
|  | bbd840b96e | ||
| 1ee739de80 | |||
|  | 7f0265b6d1 | ||
| 4dc19dc63f | |||
|  | 5eb2718cd4 | ||
| b4c98e207f | |||
|  | 20652894b3 | ||
| 27bb8c50d1 | |||
|  | b90ef8cdf3 | ||
| 222370f96c | |||
|  | 1152be720d | ||
| f0b9665816 | |||
|  | 310e9a8ceb | ||
| cc155511e1 | |||
|  | 0a0c592b64 | ||
| 372c5c92f0 | |||
|  | 2601514578 | ||
| 8a1856e814 | |||
|  | 08e04cdb70 | ||
| 7286521472 | |||
|  | b01e2b5563 | 
							
								
								
									
										2
									
								
								event.go
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								event.go
									
									
									
									
									
								
							| @@ -3,7 +3,7 @@ package kgo | |||||||
| import ( | import ( | ||||||
| 	"sync" | 	"sync" | ||||||
|  |  | ||||||
| 	"go.unistack.org/micro/v3/broker" | 	"go.unistack.org/micro/v4/broker" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| type event struct { | type event struct { | ||||||
|   | |||||||
							
								
								
									
										8
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										8
									
								
								go.mod
									
									
									
									
									
								
							| @@ -1,14 +1,14 @@ | |||||||
| module go.unistack.org/micro-broker-kgo/v3 | module go.unistack.org/micro-broker-kgo/v4 | ||||||
|  |  | ||||||
| go 1.17 | go 1.19 | ||||||
|  |  | ||||||
| require ( | require ( | ||||||
| 	github.com/twmb/franz-go v1.11.5 | 	github.com/twmb/franz-go v1.11.5 | ||||||
| 	github.com/twmb/franz-go/pkg/kmsg v1.3.0 | 	go.unistack.org/micro/v4 v4.0.1 | ||||||
| 	go.unistack.org/micro/v3 v3.10.14 |  | ||||||
| ) | ) | ||||||
|  |  | ||||||
| require ( | require ( | ||||||
| 	github.com/klauspost/compress v1.15.9 // indirect | 	github.com/klauspost/compress v1.15.9 // indirect | ||||||
| 	github.com/pierrec/lz4/v4 v4.1.15 // indirect | 	github.com/pierrec/lz4/v4 v4.1.15 // indirect | ||||||
|  | 	github.com/twmb/franz-go/pkg/kmsg v1.3.0 // indirect | ||||||
| ) | ) | ||||||
|   | |||||||
							
								
								
									
										10
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										10
									
								
								go.sum
									
									
									
									
									
								
							| @@ -1,16 +1,13 @@ | |||||||
| github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg= |  | ||||||
| github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= | github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= | ||||||
| github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= | github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= | ||||||
| github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= |  | ||||||
| github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= | github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= | ||||||
| github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= | github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= | ||||||
| github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= |  | ||||||
| github.com/twmb/franz-go v1.11.5 h1:TTv5lVJd+87XkmP9dWN9Jgpf7IUUr7a7jee+byR8LBE= | github.com/twmb/franz-go v1.11.5 h1:TTv5lVJd+87XkmP9dWN9Jgpf7IUUr7a7jee+byR8LBE= | ||||||
| github.com/twmb/franz-go v1.11.5/go.mod h1:FvaHNlpT6woVYIl6LAuIeL7yHol1Fp6Gv2Dn21AvH78= | github.com/twmb/franz-go v1.11.5/go.mod h1:FvaHNlpT6woVYIl6LAuIeL7yHol1Fp6Gv2Dn21AvH78= | ||||||
| github.com/twmb/franz-go/pkg/kmsg v1.3.0 h1:ouBETB7nTqRxiO5E8/pySoFZtVEW2VWw55z3/bsUzTw= | github.com/twmb/franz-go/pkg/kmsg v1.3.0 h1:ouBETB7nTqRxiO5E8/pySoFZtVEW2VWw55z3/bsUzTw= | ||||||
| github.com/twmb/franz-go/pkg/kmsg v1.3.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= | github.com/twmb/franz-go/pkg/kmsg v1.3.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= | ||||||
| go.unistack.org/micro/v3 v3.10.14 h1:7fgLpwGlCN67twhwtngJDEQvrMkUBDSA5vzZqxIDqNE= | go.unistack.org/micro/v4 v4.0.1 h1:xo1IxbVfgh8i0eY0VeYa3cbb13u5n/Mxnp3FOgWD4Jo= | ||||||
| go.unistack.org/micro/v3 v3.10.14/go.mod h1:uMAc0U/x7dmtICCrblGf0ZLgYegu3VwQAquu+OFCw1Q= | go.unistack.org/micro/v4 v4.0.1/go.mod h1:p/J5UcSJjfHsWGT31uKoghQ5rUQZzQJBAFy+Z4+ZVMs= | ||||||
| golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= | golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= | ||||||
| golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= | golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= | ||||||
| golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= | golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= | ||||||
| @@ -19,6 +16,3 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc | |||||||
| golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= | golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= | ||||||
| golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= | golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= | ||||||
| golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= | golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= | ||||||
| gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= |  | ||||||
| gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= |  | ||||||
| gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= |  | ||||||
|   | |||||||
							
								
								
									
										68
									
								
								kgo.go
									
									
									
									
									
								
							
							
						
						
									
										68
									
								
								kgo.go
									
									
									
									
									
								
							| @@ -1,5 +1,5 @@ | |||||||
| // Package kgo provides a kafka broker using kgo | // Package kgo provides a kafka broker using kgo | ||||||
| package kgo // import "go.unistack.org/micro-broker-kgo/v3" | package kgo // import "go.unistack.org/micro-broker-kgo/v4" | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| @@ -11,13 +11,14 @@ import ( | |||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/twmb/franz-go/pkg/kgo" | 	"github.com/twmb/franz-go/pkg/kgo" | ||||||
| 	"go.unistack.org/micro/v3/broker" | 	"github.com/twmb/franz-go/pkg/kmsg" | ||||||
| 	"go.unistack.org/micro/v3/metadata" | 	"go.unistack.org/micro/v4/broker" | ||||||
| 	id "go.unistack.org/micro/v3/util/id" | 	"go.unistack.org/micro/v4/metadata" | ||||||
| 	mrand "go.unistack.org/micro/v3/util/rand" | 	id "go.unistack.org/micro/v4/util/id" | ||||||
|  | 	mrand "go.unistack.org/micro/v4/util/rand" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| var _ broker.Broker = &Broker{} | var _ broker.Broker = (*Broker)(nil) | ||||||
|  |  | ||||||
| var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration") | var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration") | ||||||
|  |  | ||||||
| @@ -55,7 +56,6 @@ type Broker struct { | |||||||
| 	c         *kgo.Client | 	c         *kgo.Client | ||||||
| 	kopts     []kgo.Opt | 	kopts     []kgo.Opt | ||||||
| 	connected bool | 	connected bool | ||||||
| 	init      bool |  | ||||||
| 	sync.RWMutex | 	sync.RWMutex | ||||||
| 	opts broker.Options | 	opts broker.Options | ||||||
| 	subs []*subscriber | 	subs []*subscriber | ||||||
| @@ -134,6 +134,9 @@ func (k *Broker) Disconnect(ctx context.Context) error { | |||||||
| 		return nctx.Err() | 		return nctx.Err() | ||||||
| 	default: | 	default: | ||||||
| 		for _, sub := range k.subs { | 		for _, sub := range k.subs { | ||||||
|  | 			if sub.closed { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
| 			if err := sub.Unsubscribe(ctx); err != nil { | 			if err := sub.Unsubscribe(ctx); err != nil { | ||||||
| 				return err | 				return err | ||||||
| 			} | 			} | ||||||
| @@ -193,8 +196,10 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message, | |||||||
|  |  | ||||||
| func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { | func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { | ||||||
| 	k.RLock() | 	k.RLock() | ||||||
| 	if !k.connected { | 	ok := k.connected | ||||||
| 		k.RUnlock() | 	k.RUnlock() | ||||||
|  |  | ||||||
|  | 	if !ok { | ||||||
| 		k.Lock() | 		k.Lock() | ||||||
| 		c, err := k.connect(ctx, k.kopts...) | 		c, err := k.connect(ctx, k.kopts...) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| @@ -205,23 +210,27 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br | |||||||
| 		k.connected = true | 		k.connected = true | ||||||
| 		k.Unlock() | 		k.Unlock() | ||||||
| 	} | 	} | ||||||
| 	k.RUnlock() |  | ||||||
|  |  | ||||||
| 	options := broker.NewPublishOptions(opts...) | 	options := broker.NewPublishOptions(opts...) | ||||||
| 	records := make([]*kgo.Record, 0, len(msgs)) | 	records := make([]*kgo.Record, 0, len(msgs)) | ||||||
| 	var errs []string | 	var errs []string | ||||||
| 	var err error | 	var err error | ||||||
| 	var key []byte | 	var key []byte | ||||||
|  | 	var promise func(*kgo.Record, error) | ||||||
|  |  | ||||||
| 	if options.Context != nil { | 	if options.Context != nil { | ||||||
| 		if k, ok := options.Context.Value(publishKey{}).([]byte); ok && k != nil { | 		if k, ok := options.Context.Value(publishKey{}).([]byte); ok && k != nil { | ||||||
| 			key = k | 			key = k | ||||||
| 		} | 		} | ||||||
|  | 		if p, ok := options.Context.Value(publishPromiseKey{}).(func(*kgo.Record, error)); ok && p != nil { | ||||||
|  | 			promise = p | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	for _, msg := range msgs { | 	for _, msg := range msgs { | ||||||
| 		rec := &kgo.Record{Context: ctx, Key: key} | 		rec := &kgo.Record{Context: ctx, Key: key} | ||||||
| 		rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic) | 		rec.Topic, _ = msg.Header.Get(metadata.HeaderTopic) | ||||||
|  | 		k.opts.Meter.Counter(broker.PublishMessageInflight, "endpoint", rec.Topic).Inc() | ||||||
| 		if options.BodyOnly { | 		if options.BodyOnly { | ||||||
| 			rec.Value = msg.Body | 			rec.Value = msg.Body | ||||||
| 		} else if k.opts.Codec.String() == "noop" { | 		} else if k.opts.Codec.String() == "noop" { | ||||||
| @@ -238,10 +247,36 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br | |||||||
| 		records = append(records, rec) | 		records = append(records, rec) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	if promise != nil { | ||||||
|  | 		ts := time.Now() | ||||||
|  | 		for _, rec := range records { | ||||||
|  | 			k.c.Produce(ctx, rec, func(r *kgo.Record, err error) { | ||||||
|  | 				te := time.Since(ts) | ||||||
|  | 				k.opts.Meter.Counter(broker.PublishMessageInflight, "endpoint", rec.Topic).Dec() | ||||||
|  | 				k.opts.Meter.Summary(broker.PublishMessageLatencyMicroseconds, "endpoint", r.Topic).Update(te.Seconds()) | ||||||
|  | 				k.opts.Meter.Histogram(broker.PublishMessageDurationSeconds, "endpoint", r.Topic).Update(te.Seconds()) | ||||||
|  | 				if err != nil { | ||||||
|  | 					k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", r.Topic, "status", "failure").Inc() | ||||||
|  | 				} else { | ||||||
|  | 					k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", r.Topic, "status", "success").Inc() | ||||||
|  | 				} | ||||||
|  | 				promise(r, err) | ||||||
|  | 			}) | ||||||
|  | 		} | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	ts := time.Now() | ||||||
| 	results := k.c.ProduceSync(ctx, records...) | 	results := k.c.ProduceSync(ctx, records...) | ||||||
|  | 	te := time.Since(ts) | ||||||
| 	for _, result := range results { | 	for _, result := range results { | ||||||
|  | 		k.opts.Meter.Summary(broker.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic).Update(te.Seconds()) | ||||||
|  | 		k.opts.Meter.Histogram(broker.PublishMessageDurationSeconds, "endpoint", result.Record.Topic).Update(te.Seconds()) | ||||||
|  | 		k.opts.Meter.Counter(broker.PublishMessageInflight, "endpoint", result.Record.Topic).Dec() | ||||||
| 		if result.Err != nil { | 		if result.Err != nil { | ||||||
|  | 			k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", result.Record.Topic, "status", "failure").Inc() | ||||||
| 			errs = append(errs, result.Err.Error()) | 			errs = append(errs, result.Err.Error()) | ||||||
|  | 		} else { | ||||||
|  | 			k.opts.Meter.Counter(broker.PublishMessageTotal, "endpoint", result.Record.Topic, "status", "success").Inc() | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -306,6 +341,18 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han | |||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	mdreq := kmsg.NewMetadataRequest() | ||||||
|  | 	mdreq.Topics = []kmsg.MetadataRequestTopic{ | ||||||
|  | 		{Topic: &topic}, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	mdrsp, err := mdreq.RequestWith(ctx, c) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} else if mdrsp.Topics[0].ErrorCode != 0 { | ||||||
|  | 		return nil, fmt.Errorf("topic %s not exists or permission error", topic) | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	sub.c = c | 	sub.c = c | ||||||
| 	go sub.poll(ctx) | 	go sub.poll(ctx) | ||||||
|  |  | ||||||
| @@ -320,7 +367,6 @@ func (k *Broker) String() string { | |||||||
| } | } | ||||||
|  |  | ||||||
| func NewBroker(opts ...broker.Option) *Broker { | func NewBroker(opts ...broker.Option) *Broker { | ||||||
| 	rand.Seed(time.Now().Unix()) |  | ||||||
| 	options := broker.NewOptions(opts...) | 	options := broker.NewOptions(opts...) | ||||||
|  |  | ||||||
| 	kaddrs := options.Addrs | 	kaddrs := options.Addrs | ||||||
|   | |||||||
| @@ -10,10 +10,10 @@ import ( | |||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	kg "github.com/twmb/franz-go/pkg/kgo" | 	kg "github.com/twmb/franz-go/pkg/kgo" | ||||||
| 	kgo "go.unistack.org/micro-broker-kgo/v3" | 	kgo "go.unistack.org/micro-broker-kgo/v4" | ||||||
| 	"go.unistack.org/micro/v3/broker" | 	"go.unistack.org/micro/v4/broker" | ||||||
| 	"go.unistack.org/micro/v3/logger" | 	"go.unistack.org/micro/v4/logger" | ||||||
| 	"go.unistack.org/micro/v3/metadata" | 	"go.unistack.org/micro/v4/metadata" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| var ( | var ( | ||||||
|   | |||||||
| @@ -5,7 +5,7 @@ import ( | |||||||
| 	"fmt" | 	"fmt" | ||||||
|  |  | ||||||
| 	"github.com/twmb/franz-go/pkg/kgo" | 	"github.com/twmb/franz-go/pkg/kgo" | ||||||
| 	"go.unistack.org/micro/v3/logger" | 	"go.unistack.org/micro/v4/logger" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| type mlogger struct { | type mlogger struct { | ||||||
|   | |||||||
							
								
								
									
										37
									
								
								metrics.go
									
									
									
									
									
								
							
							
						
						
									
										37
									
								
								metrics.go
									
									
									
									
									
								
							| @@ -6,9 +6,44 @@ import ( | |||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/twmb/franz-go/pkg/kgo" | 	"github.com/twmb/franz-go/pkg/kgo" | ||||||
| 	"go.unistack.org/micro/v3/meter" | 	"go.unistack.org/micro/v4/meter" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | /* | ||||||
|  | 	func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper { | ||||||
|  | 		handler := &wrapper{ | ||||||
|  | 			opts: NewOptions(opts...), | ||||||
|  | 		} | ||||||
|  | 		return handler.SubscriberFunc | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc { | ||||||
|  | 		return func(ctx context.Context, msg server.Message) error { | ||||||
|  | 			endpoint := msg.Topic() | ||||||
|  |  | ||||||
|  | 			labels := make([]string, 0, 4) | ||||||
|  | 			labels = append(labels, labelEndpoint, endpoint) | ||||||
|  |  | ||||||
|  | 			w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Inc() | ||||||
|  | 			ts := time.Now() | ||||||
|  | 			err := fn(ctx, msg) | ||||||
|  | 			te := time.Since(ts) | ||||||
|  | 			w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Dec() | ||||||
|  |  | ||||||
|  | 			w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, labels...).Update(te.Seconds()) | ||||||
|  | 			w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, labels...).Update(te.Seconds()) | ||||||
|  |  | ||||||
|  | 			if err == nil { | ||||||
|  | 				labels = append(labels, labelStatus, labelSuccess) | ||||||
|  | 			} else { | ||||||
|  | 				labels = append(labels, labelStatus, labelFailure) | ||||||
|  | 			} | ||||||
|  | 			w.opts.Meter.Counter(SubscribeMessageTotal, labels...).Inc() | ||||||
|  |  | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | */ | ||||||
| type metrics struct { | type metrics struct { | ||||||
| 	meter meter.Meter | 	meter meter.Meter | ||||||
| } | } | ||||||
|   | |||||||
							
								
								
									
										16
									
								
								options.go
									
									
									
									
									
								
							
							
						
						
									
										16
									
								
								options.go
									
									
									
									
									
								
							| @@ -5,8 +5,8 @@ import ( | |||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/twmb/franz-go/pkg/kgo" | 	"github.com/twmb/franz-go/pkg/kgo" | ||||||
| 	"go.unistack.org/micro/v3/broker" | 	"go.unistack.org/micro/v4/broker" | ||||||
| 	"go.unistack.org/micro/v3/client" | 	"go.unistack.org/micro/v4/client" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // DefaultCommitInterval specifies how fast send commit offsets to kafka | // DefaultCommitInterval specifies how fast send commit offsets to kafka | ||||||
| @@ -78,3 +78,15 @@ type subscribeMaxInflightKey struct{} | |||||||
| func SubscribeMaxInFlight(n int) broker.SubscribeOption { | func SubscribeMaxInFlight(n int) broker.SubscribeOption { | ||||||
| 	return broker.SetSubscribeOption(subscribeMaxInflightKey{}, n) | 	return broker.SetSubscribeOption(subscribeMaxInflightKey{}, n) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | type publishPromiseKey struct{} | ||||||
|  |  | ||||||
|  | // PublishPromise set the kafka promise func for Produce | ||||||
|  | func PublishPromise(fn func(*kgo.Record, error)) broker.PublishOption { | ||||||
|  | 	return broker.SetPublishOption(publishPromiseKey{}, fn) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // ClientPublishKey set the kafka message key (client option) | ||||||
|  | func ClientPublishPromise(fn func(*kgo.Record, error)) client.PublishOption { | ||||||
|  | 	return client.SetPublishOption(publishPromiseKey{}, fn) | ||||||
|  | } | ||||||
|   | |||||||
| @@ -3,11 +3,12 @@ package kgo | |||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"sync" | 	"sync" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/twmb/franz-go/pkg/kgo" | 	"github.com/twmb/franz-go/pkg/kgo" | ||||||
| 	"go.unistack.org/micro/v3/broker" | 	"go.unistack.org/micro/v4/broker" | ||||||
| 	"go.unistack.org/micro/v3/logger" | 	"go.unistack.org/micro/v4/logger" | ||||||
| 	"go.unistack.org/micro/v3/metadata" | 	"go.unistack.org/micro/v4/metadata" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| type tp struct { | type tp struct { | ||||||
| @@ -55,6 +56,12 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error { | |||||||
| 	case <-ctx.Done(): | 	case <-ctx.Done(): | ||||||
| 		return ctx.Err() | 		return ctx.Err() | ||||||
| 	default: | 	default: | ||||||
|  | 		s.c.PauseFetchTopics(s.topic) | ||||||
|  | 		kc := make(map[string][]int32) | ||||||
|  | 		for ctp := range s.consumers { | ||||||
|  | 			kc[ctp.t] = append(kc[ctp.t], ctp.p) | ||||||
|  | 		} | ||||||
|  | 		s.killConsumers(ctx, kc) | ||||||
| 		close(s.done) | 		close(s.done) | ||||||
| 		s.closed = true | 		s.closed = true | ||||||
| 	} | 	} | ||||||
| @@ -71,15 +78,14 @@ func (s *subscriber) poll(ctx context.Context) { | |||||||
| 	for { | 	for { | ||||||
| 		select { | 		select { | ||||||
| 		case <-ctx.Done(): | 		case <-ctx.Done(): | ||||||
| 			s.c.Close() | 			s.c.CloseAllowingRebalance() | ||||||
| 			return | 			return | ||||||
| 		case <-s.done: | 		case <-s.done: | ||||||
| 			s.c.Close() | 			s.c.CloseAllowingRebalance() | ||||||
| 			return | 			return | ||||||
| 		default: | 		default: | ||||||
| 			fetches := s.c.PollRecords(ctx, maxInflight) | 			fetches := s.c.PollRecords(ctx, maxInflight) | ||||||
| 			if fetches.IsClientClosed() { | 			if !s.closed && fetches.IsClientClosed() { | ||||||
| 				s.kopts.Logger.Errorf(ctx, "[kgo] client closed") |  | ||||||
| 				s.closed = true | 				s.closed = true | ||||||
| 				return | 				return | ||||||
| 			} | 			} | ||||||
| @@ -163,6 +169,8 @@ func (pc *consumer) consume() { | |||||||
| 			return | 			return | ||||||
| 		case p := <-pc.recs: | 		case p := <-pc.recs: | ||||||
| 			for _, record := range p.Records { | 			for _, record := range p.Records { | ||||||
|  | 				ts := time.Now() | ||||||
|  | 				pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Inc() | ||||||
| 				p := eventPool.Get().(*event) | 				p := eventPool.Get().(*event) | ||||||
| 				p.msg.Header = nil | 				p.msg.Header = nil | ||||||
| 				p.msg.Body = nil | 				p.msg.Body = nil | ||||||
| @@ -179,30 +187,45 @@ func (pc *consumer) consume() { | |||||||
| 					p.msg.Body = record.Value | 					p.msg.Body = record.Value | ||||||
| 				} else { | 				} else { | ||||||
| 					if err := pc.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil { | 					if err := pc.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil { | ||||||
|  | 						pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "failure").Inc() | ||||||
| 						p.err = err | 						p.err = err | ||||||
| 						p.msg.Body = record.Value | 						p.msg.Body = record.Value | ||||||
| 						if eh != nil { | 						if eh != nil { | ||||||
| 							_ = eh(p) | 							_ = eh(p) | ||||||
|  | 							pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec() | ||||||
| 							if p.ack { | 							if p.ack { | ||||||
| 								pc.c.MarkCommitRecords(record) | 								pc.c.MarkCommitRecords(record) | ||||||
| 							} else { | 							} else { | ||||||
| 								eventPool.Put(p) | 								eventPool.Put(p) | ||||||
| 								pc.kopts.Logger.Infof(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") | 								pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") | ||||||
| 								return | 								return | ||||||
| 							} | 							} | ||||||
| 							eventPool.Put(p) | 							eventPool.Put(p) | ||||||
|  | 							te := time.Since(ts) | ||||||
|  | 							pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds()) | ||||||
|  | 							pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds()) | ||||||
| 							continue | 							continue | ||||||
| 						} else { | 						} else { | ||||||
| 							if pc.kopts.Logger.V(logger.ErrorLevel) { | 							if pc.kopts.Logger.V(logger.ErrorLevel) { | ||||||
| 								pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo]: failed to unmarshal: %v", err) | 								pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo]: failed to unmarshal: %v", err) | ||||||
| 							} | 							} | ||||||
| 						} | 						} | ||||||
|  | 						te := time.Since(ts) | ||||||
|  | 						pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec() | ||||||
|  | 						pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds()) | ||||||
|  | 						pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds()) | ||||||
| 						eventPool.Put(p) | 						eventPool.Put(p) | ||||||
| 						pc.kopts.Logger.Infof(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?") | 						pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?") | ||||||
| 						return | 						return | ||||||
| 					} | 					} | ||||||
| 				} | 				} | ||||||
| 				err := pc.handler(p) | 				err := pc.handler(p) | ||||||
|  | 				if err == nil { | ||||||
|  | 					pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "success").Inc() | ||||||
|  | 				} else { | ||||||
|  | 					pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "failure").Inc() | ||||||
|  | 				} | ||||||
|  | 				pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec() | ||||||
| 				if err == nil && pc.opts.AutoAck { | 				if err == nil && pc.opts.AutoAck { | ||||||
| 					p.ack = true | 					p.ack = true | ||||||
| 				} else if err != nil { | 				} else if err != nil { | ||||||
| @@ -215,6 +238,9 @@ func (pc *consumer) consume() { | |||||||
| 						} | 						} | ||||||
| 					} | 					} | ||||||
| 				} | 				} | ||||||
|  | 				te := time.Since(ts) | ||||||
|  | 				pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds()) | ||||||
|  | 				pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds()) | ||||||
| 				if p.ack { | 				if p.ack { | ||||||
| 					eventPool.Put(p) | 					eventPool.Put(p) | ||||||
| 					pc.c.MarkCommitRecords(record) | 					pc.c.MarkCommitRecords(record) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user