update for latest micro
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
		
							
								
								
									
										10
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										10
									
								
								go.mod
									
									
									
									
									
								
							| @@ -1,8 +1,8 @@ | ||||
| module go.unistack.org/micro-broker-kgo/v3 | ||||
|  | ||||
| go 1.22 | ||||
| go 1.22.7 | ||||
|  | ||||
| toolchain go1.23.1 | ||||
| toolchain go1.23.3 | ||||
|  | ||||
| require ( | ||||
| 	github.com/google/uuid v1.6.0 | ||||
| @@ -10,7 +10,7 @@ require ( | ||||
| 	github.com/twmb/franz-go/pkg/kadm v1.14.0 | ||||
| 	github.com/twmb/franz-go/pkg/kmsg v1.9.0 | ||||
| 	go.opentelemetry.io/otel v1.32.0 | ||||
| 	go.unistack.org/micro/v3 v3.10.101 | ||||
| 	go.unistack.org/micro/v3 v3.11.0 | ||||
| ) | ||||
|  | ||||
| require ( | ||||
| @@ -19,7 +19,7 @@ require ( | ||||
| 	go.unistack.org/micro-proto/v3 v3.4.1 // indirect | ||||
| 	golang.org/x/crypto v0.29.0 // indirect | ||||
| 	golang.org/x/sys v0.27.0 // indirect | ||||
| 	google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697 // indirect | ||||
| 	google.golang.org/grpc v1.67.1 // indirect | ||||
| 	google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect | ||||
| 	google.golang.org/grpc v1.68.0 // indirect | ||||
| 	google.golang.org/protobuf v1.35.2 // indirect | ||||
| ) | ||||
|   | ||||
							
								
								
									
										26
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										26
									
								
								go.sum
									
									
									
									
									
								
							| @@ -1,17 +1,9 @@ | ||||
| github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= | ||||
| github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | ||||
| github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= | ||||
| github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= | ||||
| github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= | ||||
| github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= | ||||
| github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= | ||||
| github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= | ||||
| github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= | ||||
| github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= | ||||
| github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= | ||||
| github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= | ||||
| github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= | ||||
| github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= | ||||
| github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw= | ||||
| github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I= | ||||
| github.com/twmb/franz-go/pkg/kadm v1.14.0 h1:nAn1co1lXzJQocpzyIyOFOjUBf4WHWs5/fTprXy2IZs= | ||||
| @@ -22,21 +14,15 @@ go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= | ||||
| go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= | ||||
| go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= | ||||
| go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= | ||||
| go.unistack.org/micro/v3 v3.10.101 h1:CwMg7f2Mnsy+tRcsY0RTcAMkTVp+GMUgPU6pPaG8gpw= | ||||
| go.unistack.org/micro/v3 v3.10.101/go.mod h1:YzMldzHN9Ei+zy5t/Psu7RUWDZwUfrNYiStSQtTz90g= | ||||
| go.unistack.org/micro/v3 v3.11.0 h1:usQ+8wQuOWpQd4+DGhFXSgZ+e+wOBjuT3W5GJZ02bSs= | ||||
| go.unistack.org/micro/v3 v3.11.0/go.mod h1:YzMldzHN9Ei+zy5t/Psu7RUWDZwUfrNYiStSQtTz90g= | ||||
| golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ= | ||||
| golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg= | ||||
| golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= | ||||
| golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= | ||||
| golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= | ||||
| golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= | ||||
| golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= | ||||
| golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= | ||||
| google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697 h1:LWZqQOEjDyONlF1H6afSWpAL/znlREo2tHfLoe+8LMA= | ||||
| google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU= | ||||
| google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= | ||||
| google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= | ||||
| google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a h1:hgh8P4EuoxpsuKMXX/To36nOFD7vixReXgn8lPGnt+o= | ||||
| google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU= | ||||
| google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0= | ||||
| google.golang.org/grpc v1.68.0/go.mod h1:fmSPC5AsjSBCK54MyHRx48kpOti1/jRfOlwEWywNjWA= | ||||
| google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= | ||||
| google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= | ||||
| gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= | ||||
| gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= | ||||
|   | ||||
							
								
								
									
										33
									
								
								kgo.go
									
									
									
									
									
								
							
							
						
						
									
										33
									
								
								kgo.go
									
									
									
									
									
								
							| @@ -9,6 +9,7 @@ import ( | ||||
| 	"net/http" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"sync/atomic" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/google/uuid" | ||||
| @@ -60,12 +61,24 @@ type Broker struct { | ||||
| 	init      bool | ||||
| 	c         *kgo.Client | ||||
| 	kopts     []kgo.Opt | ||||
| 	connected bool | ||||
| 	connected *atomic.Uint32 | ||||
| 	sync.RWMutex | ||||
| 	opts broker.Options | ||||
| 	subs []*Subscriber | ||||
| } | ||||
|  | ||||
| func (r *Broker) Live() bool { | ||||
| 	return r.connected.Load() == 1 | ||||
| } | ||||
|  | ||||
| func (r *Broker) Ready() bool { | ||||
| 	return r.connected.Load() == 1 | ||||
| } | ||||
|  | ||||
| func (r *Broker) Health() bool { | ||||
| 	return r.connected.Load() == 1 | ||||
| } | ||||
|  | ||||
| func (k *Broker) Address() string { | ||||
| 	return strings.Join(k.opts.Addrs, ",") | ||||
| } | ||||
| @@ -125,12 +138,9 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, *ho | ||||
| } | ||||
|  | ||||
| func (k *Broker) Connect(ctx context.Context) error { | ||||
| 	k.RLock() | ||||
| 	if k.connected { | ||||
| 		k.RUnlock() | ||||
| 	if k.connected.Load() == 1 { | ||||
| 		return nil | ||||
| 	} | ||||
| 	k.RUnlock() | ||||
|  | ||||
| 	nctx := k.opts.Context | ||||
| 	if ctx != nil { | ||||
| @@ -144,19 +154,16 @@ func (k *Broker) Connect(ctx context.Context) error { | ||||
|  | ||||
| 	k.Lock() | ||||
| 	k.c = c | ||||
| 	k.connected = true | ||||
| 	k.connected.Store(1) | ||||
| 	k.Unlock() | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (k *Broker) Disconnect(ctx context.Context) error { | ||||
| 	k.RLock() | ||||
| 	if !k.connected { | ||||
| 		k.RUnlock() | ||||
| 	if k.connected.Load() == 0 { | ||||
| 		return nil | ||||
| 	} | ||||
| 	k.RUnlock() | ||||
|  | ||||
| 	nctx := k.opts.Context | ||||
| 	if ctx != nil { | ||||
| @@ -186,7 +193,7 @@ func (k *Broker) Disconnect(ctx context.Context) error { | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	k.connected = false | ||||
| 	k.connected.Store(0) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @@ -241,14 +248,14 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message, | ||||
|  | ||||
| func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { | ||||
| 	k.Lock() | ||||
| 	if !k.connected { | ||||
| 	if k.connected.Load() == 0 { | ||||
| 		c, _, err := k.connect(ctx, k.kopts...) | ||||
| 		if err != nil { | ||||
| 			k.Unlock() | ||||
| 			return err | ||||
| 		} | ||||
| 		k.c = c | ||||
| 		k.connected = true | ||||
| 		k.connected.Store(1) | ||||
| 	} | ||||
| 	k.Unlock() | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user