fix build, add comments
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
aad8a578fc
commit
fc9af8c63e
7
go.mod
7
go.mod
@ -3,7 +3,10 @@ module github.com/unistack-org/micro-broker-kgo/v3
|
||||
go 1.16
|
||||
|
||||
require (
|
||||
github.com/golang/snappy v0.0.4 // indirect
|
||||
github.com/google/uuid v1.3.0
|
||||
github.com/twmb/franz-go v0.8.6
|
||||
github.com/unistack-org/micro/v3 v3.5.4
|
||||
github.com/klauspost/compress v1.13.3 // indirect
|
||||
github.com/pierrec/lz4/v4 v4.1.8 // indirect
|
||||
github.com/twmb/franz-go v0.9.0
|
||||
github.com/unistack-org/micro/v3 v3.6.1
|
||||
)
|
||||
|
22
go.sum
22
go.sum
@ -1,8 +1,10 @@
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
|
||||
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
|
||||
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
|
||||
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
|
||||
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
|
||||
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg=
|
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
|
||||
@ -10,6 +12,7 @@ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
|
||||
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
|
||||
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
|
||||
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
|
||||
github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
|
||||
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
|
||||
github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
|
||||
github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM=
|
||||
@ -19,9 +22,14 @@ github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aW
|
||||
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
|
||||
github.com/klauspost/compress v1.13.0 h1:2T7tUoQrQT+fQWdaY5rjWztFGAFwbGD04iPJg90ZiOs=
|
||||
github.com/klauspost/compress v1.13.0/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
|
||||
github.com/klauspost/compress v1.13.3 h1:BtAvtV1+h0YwSVwWoYXMREPpYu9VzTJ9QDI1TEg/iQQ=
|
||||
github.com/klauspost/compress v1.13.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
||||
github.com/pierrec/lz4/v4 v4.1.7 h1:UDV9geJWhFIufAliH7HQlz9wP3JA0t748w+RwbWMLow=
|
||||
github.com/pierrec/lz4/v4 v4.1.7/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
||||
github.com/pierrec/lz4/v4 v4.1.8 h1:ieHkV+i2BRzngO4Wd/3HGowuZStgq6QkPsD1eolNAO4=
|
||||
github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
@ -29,10 +37,18 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
|
||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/twmb/franz-go v0.8.6 h1:m49t7tcgUz70hvTpnzHrsvQ/38Q/VKCpEj1FvDetVTE=
|
||||
github.com/twmb/franz-go v0.8.6/go.mod h1:v6QnB3abhlVAzlIEIO5L/1Emu8NlkreCI2HSps9utH0=
|
||||
github.com/twmb/franz-go v0.9.0 h1:mI8VvdLeyJh//X8KX9pWOZcp272eR41O8jI7MfGNhFU=
|
||||
github.com/twmb/franz-go v0.9.0/go.mod h1:Qa6npC7EIi4WoLmnUz9Ue/sPL0k+ex4OyHCYJ2pCAqw=
|
||||
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210726202344-c376dbc9081f h1:Opx7EKsXb4IOPj1ammVNksPFpbXx6aaxdIn4hGjiIpk=
|
||||
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210726202344-c376dbc9081f/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
|
||||
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210804073901-278488bb4ea0 h1:mQ6E/Y7o6p5OPWkaDV5Lsy5EBZd2oBWhsTij6I0/Rh4=
|
||||
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210804073901-278488bb4ea0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
|
||||
github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg=
|
||||
github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc=
|
||||
github.com/unistack-org/micro/v3 v3.5.4 h1:6nIljqND355f+Fhc2mtCxYb5IRwer6nsMoAXpN8kka0=
|
||||
github.com/unistack-org/micro/v3 v3.5.4/go.mod h1:1ZkwpEqpiHiVhM2hiF9DamtpsF04oFybFhEQ4zEMcro=
|
||||
github.com/unistack-org/micro/v3 v3.6.0 h1:atxcH6C5JWVjXPDQiT8N9SALf1yWaVtpVvvxVdz7Y7s=
|
||||
github.com/unistack-org/micro/v3 v3.6.0/go.mod h1:zQnZPEy842kQNcyjmVys6tdMjty4PHdyUUKYm1wrg1s=
|
||||
github.com/unistack-org/micro/v3 v3.6.1 h1:plaRQHatJeI1iaE21rtU7Kl6tbcbK08gQuw0Fj9Bq/4=
|
||||
github.com/unistack-org/micro/v3 v3.6.1/go.mod h1:zQnZPEy842kQNcyjmVys6tdMjty4PHdyUUKYm1wrg1s=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
|
||||
|
2
kgo.go
2
kgo.go
@ -289,7 +289,7 @@ func (l *mlogger) Log(lvl kgo.LogLevel, msg string, args ...interface{}) {
|
||||
default:
|
||||
return
|
||||
}
|
||||
l.l.Fields(l.ctx, mlvl, msg, args...)
|
||||
l.l.Fields(args...).Log(l.ctx, mlvl, msg)
|
||||
}
|
||||
|
||||
func (l *mlogger) Level() kgo.LogLevel {
|
||||
|
17
options.go
17
options.go
@ -20,100 +20,117 @@ func SubscribeContext(ctx context.Context) broker.SubscribeOption {
|
||||
|
||||
type publishKey struct{}
|
||||
|
||||
// PublishKey set the kafka message key (broker option)
|
||||
func PublishKey(key []byte) broker.PublishOption {
|
||||
return broker.SetPublishOption(publishKey{}, key)
|
||||
}
|
||||
|
||||
// ClientPublishKey set the kafka message key (client option)
|
||||
func ClientPublishKey(key []byte) client.PublishOption {
|
||||
return client.SetPublishOption(publishKey{}, key)
|
||||
}
|
||||
|
||||
type clientIDKey struct{}
|
||||
|
||||
// ClientID sets the kafka client id
|
||||
func ClientID(id string) broker.Option {
|
||||
return broker.SetOption(clientIDKey{}, id)
|
||||
}
|
||||
|
||||
type maxReadBytesKey struct{}
|
||||
|
||||
// MaxReadBytes limit max bytes to read
|
||||
func MaxReadBytes(n int32) broker.Option {
|
||||
return broker.SetOption(maxReadBytesKey{}, n)
|
||||
}
|
||||
|
||||
type maxWriteBytesKey struct{}
|
||||
|
||||
// MaxWriteBytes limit max bytes to write
|
||||
func MaxWriteBytes(n int32) broker.Option {
|
||||
return broker.SetOption(maxWriteBytesKey{}, n)
|
||||
}
|
||||
|
||||
type connIdleTimeoutKey struct{}
|
||||
|
||||
// ConnIdleTimeout limit timeout for connection
|
||||
func ConnIdleTimeout(td time.Duration) broker.Option {
|
||||
return broker.SetOption(connIdleTimeoutKey{}, td)
|
||||
}
|
||||
|
||||
type connTimeoutOverheadKey struct{}
|
||||
|
||||
// ConnTimeoutOverhead ...
|
||||
func ConnTimeoutOverhead(td time.Duration) broker.Option {
|
||||
return broker.SetOption(connTimeoutOverheadKey{}, td)
|
||||
}
|
||||
|
||||
type dialerKey struct{}
|
||||
|
||||
// Dialer pass dialer
|
||||
func Dialer(fn func(ctx context.Context, network, host string) (net.Conn, error)) broker.Option {
|
||||
return broker.SetOption(dialerKey{}, fn)
|
||||
}
|
||||
|
||||
type metadataMaxAgeKey struct{}
|
||||
|
||||
// MetadataMaxAge limit metadata max age
|
||||
func MetadataMaxAge(td time.Duration) broker.Option {
|
||||
return broker.SetOption(metadataMaxAgeKey{}, td)
|
||||
}
|
||||
|
||||
type metadataMinAgeKey struct{}
|
||||
|
||||
// MetadataMinAge limit metadata min age
|
||||
func MetadataMinAge(td time.Duration) broker.Option {
|
||||
return broker.SetOption(metadataMinAgeKey{}, td)
|
||||
}
|
||||
|
||||
type produceRetriesKey struct{}
|
||||
|
||||
// ProduceRetries limit number of retries
|
||||
func ProduceRetries(n int) broker.Option {
|
||||
return broker.SetOption(produceRetriesKey{}, n)
|
||||
}
|
||||
|
||||
type requestRetriesKey struct{}
|
||||
|
||||
// RequestRetries limit number of retries
|
||||
func RequestRetries(n int) broker.Option {
|
||||
return broker.SetOption(requestRetriesKey{}, n)
|
||||
}
|
||||
|
||||
type retryBackoffKey struct{}
|
||||
|
||||
// RetryBackoff set backoff func for retry
|
||||
func RetryBackoff(fn func(int) time.Duration) broker.Option {
|
||||
return broker.SetOption(retryBackoffKey{}, fn)
|
||||
}
|
||||
|
||||
type retryTimeoutKey struct{}
|
||||
|
||||
// RetryTimeout limit retry timeout
|
||||
func RetryTimeout(fn func(int16) time.Duration) broker.Option {
|
||||
return broker.SetOption(retryTimeoutKey{}, fn)
|
||||
}
|
||||
|
||||
type saslKey struct{}
|
||||
|
||||
// SASL pass sasl mechanism to auth
|
||||
func SASL(sasls ...sasl.Mechanism) broker.Option {
|
||||
return broker.SetOption(saslKey{}, sasls)
|
||||
}
|
||||
|
||||
type hooksKey struct{}
|
||||
|
||||
// Hooks pass hooks (useful for tracing and logging)
|
||||
func Hooks(hooks ...kgo.Hook) broker.Option {
|
||||
return broker.SetOption(hooksKey{}, hooks)
|
||||
}
|
||||
|
||||
type optionsKey struct{}
|
||||
|
||||
// Options pass additional options to broker
|
||||
func Options(opts ...kgo.Opt) broker.Option {
|
||||
return broker.SetOption(optionsKey{}, opts)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user