From d750157d5382a37c744ff79497268cdf292ddc34 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 10 Jul 2021 23:41:21 +0300 Subject: [PATCH] initial import Signed-off-by: Vasiliy Tolstov --- go.mod | 8 ++ go.sum | 56 +++++++++++++ kgo.go | 236 ++++++++++++++++++++++++++++++++++++++++++++++++++++ kgo_test.go | 70 ++++++++++++++++ options.go | 113 +++++++++++++++++++++++++ 5 files changed, 483 insertions(+) create mode 100644 go.mod create mode 100644 go.sum create mode 100644 kgo.go create mode 100644 kgo_test.go create mode 100644 options.go diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..dbbcdd9 --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module github.com/unistack-org/micro-broker-kgo/v3 + +go 1.16 + +require ( + github.com/twmb/franz-go v0.8.6 + github.com/unistack-org/micro/v3 v3.4.8 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..1cd027c --- /dev/null +++ b/go.sum @@ -0,0 +1,56 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg= +github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/klauspost/compress v1.13.0 h1:2T7tUoQrQT+fQWdaY5rjWztFGAFwbGD04iPJg90ZiOs= +github.com/klauspost/compress v1.13.0/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +github.com/pierrec/lz4/v4 v4.1.7 h1:UDV9geJWhFIufAliH7HQlz9wP3JA0t748w+RwbWMLow= +github.com/pierrec/lz4/v4 v4.1.7/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/twmb/franz-go v0.8.6 h1:m49t7tcgUz70hvTpnzHrsvQ/38Q/VKCpEj1FvDetVTE= +github.com/twmb/franz-go v0.8.6/go.mod h1:v6QnB3abhlVAzlIEIO5L/1Emu8NlkreCI2HSps9utH0= +github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg= +github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc= +github.com/unistack-org/micro/v3 v3.4.8 h1:9+qGlNHgChC3aMuFrtTFUtG55PEAjneSvplg7phwoCI= +github.com/unistack-org/micro/v3 v3.4.8/go.mod h1:LXmPfbJnJNvL0kQs8HfnkV3Wya2Wb+C7keVq++RCZnk= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/kgo.go b/kgo.go new file mode 100644 index 0000000..45bf53f --- /dev/null +++ b/kgo.go @@ -0,0 +1,236 @@ +// Package kgo provides a kafka broker using kgo +package kgo + +import ( + "context" + "net" + "sync" + "time" + + kgo "github.com/twmb/franz-go/pkg/kgo" + sasl "github.com/twmb/franz-go/pkg/sasl" + "github.com/unistack-org/micro/v3/broker" +) + +type kBroker struct { + client *kgo.Client + connected bool + init bool + sync.RWMutex + opts broker.Options +} + +type subscriber struct { + topic string + opts broker.SubscribeOptions + handler broker.Handler + closed bool + done chan struct{} + sync.RWMutex +} + +type publication struct { + topic string + partition int + offset int64 + err error + sync.RWMutex + msg *broker.Message +} + +func (p *publication) Topic() string { + return p.topic +} + +func (p *publication) Message() *broker.Message { + return p.msg +} + +func (p *publication) Ack() error { + return nil +} + +func (p *publication) Error() error { + return p.err +} + +func (s *subscriber) Options() broker.SubscribeOptions { + return s.opts +} + +func (s *subscriber) Topic() string { + return s.topic +} + +func (s *subscriber) Unsubscribe(ctx context.Context) error { + return nil +} + +func (k *kBroker) Address() string { + if len(k.opts.Addrs) > 0 { + return k.opts.Addrs[0] + } + return "127.0.0.1:9092" +} + +func (k *kBroker) Name() string { + return k.opts.Name +} + +func (k *kBroker) Connect(ctx context.Context) error { + k.RLock() + if k.connected { + k.RUnlock() + return nil + } + k.RUnlock() + + nctx := k.opts.Context + if ctx != nil { + nctx = ctx + } + + opts := []kgo.Opt{kgo.SeedBrokers(k.opts.Addrs...)} + if k.opts.Context != nil { + if v, ok := k.opts.Context.Value(clientIDKey{}).(string); ok && v != "" { + opts = append(opts, kgo.ClientID(v)) + } + if v, ok := k.opts.Context.Value(maxReadBytesKey{}).(int32); ok { + opts = append(opts, kgo.BrokerMaxReadBytes(v)) + } + if v, ok := k.opts.Context.Value(maxWriteBytesKey{}).(int32); ok { + opts = append(opts, kgo.BrokerMaxWriteBytes(v)) + } + + if v, ok := k.opts.Context.Value(connIdleTimeoutKey{}).(time.Duration); ok { + opts = append(opts, kgo.ConnIdleTimeout(v)) + } + if v, ok := k.opts.Context.Value(connTimeoutOverheadKey{}).(time.Duration); ok { + opts = append(opts, kgo.ConnTimeoutOverhead(v)) + } + if v, ok := k.opts.Context.Value(dialerKey{}).(func(ctx context.Context, network, host string) (net.Conn, error)); ok { + opts = append(opts, kgo.Dialer(v)) + } + if v, ok := k.opts.Context.Value(metadataMaxAgeKey{}).(time.Duration); ok { + opts = append(opts, kgo.MetadataMaxAge(v)) + } + if v, ok := k.opts.Context.Value(metadataMinAgeKey{}).(time.Duration); ok { + opts = append(opts, kgo.MetadataMinAge(v)) + } + if v, ok := k.opts.Context.Value(produceRetriesKey{}).(int); ok { + opts = append(opts, kgo.ProduceRetries(v)) + } + if v, ok := k.opts.Context.Value(requestRetriesKey{}).(int); ok { + opts = append(opts, kgo.RequestRetries(v)) + } + if v, ok := k.opts.Context.Value(retryBackoffKey{}).(func(int) time.Duration); ok { + opts = append(opts, kgo.RetryBackoff(v)) + } + if v, ok := k.opts.Context.Value(retryTimeoutKey{}).(func(int16) time.Duration); ok { + opts = append(opts, kgo.RetryTimeout(v)) + } + if v, ok := k.opts.Context.Value(saslKey{}).([]sasl.Mechanism); ok { + opts = append(opts, kgo.SASL(v...)) + } + if v, ok := k.opts.Context.Value(hooksKey{}).([]kgo.Hook); ok { + opts = append(opts, kgo.WithHooks(v...)) + } + } + + var c *kgo.Client + var err error + + select { + case <-nctx.Done(): + return nctx.Err() + default: + c, err = kgo.NewClient(opts...) + if err != nil { + return err + } + } + + k.client = c + + return nil +} + +func (k *kBroker) Disconnect(ctx context.Context) error { + k.RLock() + if !k.connected { + k.RUnlock() + return nil + } + k.RUnlock() + + k.Lock() + defer k.Unlock() + + nctx := k.opts.Context + if ctx != nil { + nctx = ctx + } + + select { + case <-nctx.Done(): + return nctx.Err() + default: + k.client.Close() + } + + k.connected = false + return nil +} + +func (k *kBroker) Init(opts ...broker.Option) error { + k.Lock() + defer k.Unlock() + + if len(opts) == 0 && k.init { + return nil + } + for _, o := range opts { + o(&k.opts) + } + + if err := k.opts.Register.Init(); err != nil { + return err + } + if err := k.opts.Tracer.Init(); err != nil { + return err + } + if err := k.opts.Logger.Init(); err != nil { + return err + } + if err := k.opts.Meter.Init(); err != nil { + return err + } + + k.init = true + + return nil +} + +func (k *kBroker) Options() broker.Options { + return k.opts +} + +func (k *kBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error { + return nil +} + +func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + options := broker.NewSubscribeOptions(opts...) + sub := &subscriber{opts: options} + return sub, nil +} + +func (k *kBroker) String() string { + return "kgo" +} + +func NewBroker(opts ...broker.Option) broker.Broker { + return &kBroker{ + opts: broker.NewOptions(opts...), + } +} diff --git a/kgo_test.go b/kgo_test.go new file mode 100644 index 0000000..905ad64 --- /dev/null +++ b/kgo_test.go @@ -0,0 +1,70 @@ +package kgo_test + +import ( + "context" + "os" + "strings" + "testing" + + kgo "github.com/unistack-org/micro-broker-kgo/v3" + "github.com/unistack-org/micro/v3/broker" + "github.com/unistack-org/micro/v3/logger" +) + +var ( + bm = &broker.Message{ + Header: map[string]string{"hkey": "hval"}, + Body: []byte(`"body"`), + } +) + +func TestPubSub(t *testing.T) { + if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 { + t.Skip() + } + + logger.DefaultLogger.Init(logger.WithLevel(logger.TraceLevel)) + ctx := context.Background() + + var addrs []string + if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 { + addrs = []string{"127.0.0.1:9092"} + } else { + addrs = strings.Split(addr, ",") + } + + b := kgo.NewBroker(broker.Addrs(addrs...), kgo.ClientID("test")) + if err := b.Init(); err != nil { + t.Fatal(err) + } + + if err := b.Connect(ctx); err != nil { + t.Fatal(err) + } + + defer func() { + if err := b.Disconnect(ctx); err != nil { + t.Fatal(err) + } + }() + + done := make(chan bool, 1) + fn := func(msg broker.Event) error { + done <- true + return msg.Ack() + } + + sub, err := b.Subscribe(ctx, "test_topic", fn, broker.SubscribeGroup("test")) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := sub.Unsubscribe(ctx); err != nil { + t.Fatal(err) + } + }() + if err := b.Publish(ctx, "test_topic", bm); err != nil { + t.Fatal(err) + } + <-done +} diff --git a/options.go b/options.go new file mode 100644 index 0000000..cac3440 --- /dev/null +++ b/options.go @@ -0,0 +1,113 @@ +package kgo + +import ( + "context" + "net" + "time" + + kgo "github.com/twmb/franz-go/pkg/kgo" + sasl "github.com/twmb/franz-go/pkg/sasl" + "github.com/unistack-org/micro/v3/broker" + "github.com/unistack-org/micro/v3/client" +) + +type subscribeContextKey struct{} + +// SubscribeContext set the context for broker.SubscribeOption +func SubscribeContext(ctx context.Context) broker.SubscribeOption { + return broker.SetSubscribeOption(subscribeContextKey{}, ctx) +} + +type publishKey struct{} + +func PublishKey(key []byte) broker.PublishOption { + return broker.SetPublishOption(publishKey{}, key) +} + +func ClientPublishKey(key []byte) client.PublishOption { + return client.SetPublishOption(publishKey{}, key) +} + +type clientIDKey struct{} + +func ClientID(id string) broker.Option { + return broker.SetOption(clientIDKey{}, id) +} + +type maxReadBytesKey struct{} + +func MaxReadBytes(n int32) broker.Option { + return broker.SetOption(maxReadBytesKey{}, n) +} + +type maxWriteBytesKey struct{} + +func MaxWriteBytes(n int32) broker.Option { + return broker.SetOption(maxWriteBytesKey{}, n) +} + +type connIdleTimeoutKey struct{} + +func ConnIdleTimeout(td time.Duration) broker.Option { + return broker.SetOption(connIdleTimeoutKey{}, td) +} + +type connTimeoutOverheadKey struct{} + +func ConnTimeoutOverhead(td time.Duration) broker.Option { + return broker.SetOption(connTimeoutOverheadKey{}, td) +} + +type dialerKey struct{} + +func Dialer(fn func(ctx context.Context, network, host string) (net.Conn, error)) broker.Option { + return broker.SetOption(dialerKey{}, fn) +} + +type metadataMaxAgeKey struct{} + +func MetadataMaxAge(td time.Duration) broker.Option { + return broker.SetOption(metadataMaxAgeKey{}, td) +} + +type metadataMinAgeKey struct{} + +func MetadataMinAge(td time.Duration) broker.Option { + return broker.SetOption(metadataMinAgeKey{}, td) +} + +type produceRetriesKey struct{} + +func ProduceRetries(n int) broker.Option { + return broker.SetOption(produceRetriesKey{}, n) +} + +type requestRetriesKey struct{} + +func RequestRetries(n int) broker.Option { + return broker.SetOption(requestRetriesKey{}, n) +} + +type retryBackoffKey struct{} + +func RetryBackoff(fn func(int) time.Duration) broker.Option { + return broker.SetOption(retryBackoffKey{}, fn) +} + +type retryTimeoutKey struct{} + +func RetryTimeout(fn func(int16) time.Duration) broker.Option { + return broker.SetOption(retryTimeoutKey{}, fn) +} + +type saslKey struct{} + +func SASL(sasls ...sasl.Mechanism) broker.Option { + return broker.SetOption(saslKey{}, sasls) +} + +type hooksKey struct{} + +func Hooks(hooks ...kgo.Hook) broker.Option { + return broker.SetOption(hooksKey{}, hooks) +}