Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-09-22 13:16:35 +03:00
parent f56df22fa3
commit 81a85041d3
5 changed files with 37 additions and 59 deletions

View File

@ -1,48 +0,0 @@
package segmentio
import (
"context"
"github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/server"
)
// setSubscribeOption returns a function to setup a context with given value
func setSubscribeOption(k, v interface{}) broker.SubscribeOption {
return func(o *broker.SubscribeOptions) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, k, v)
}
}
// setBrokerOption returns a function to setup a context with given value
func setBrokerOption(k, v interface{}) broker.Option {
return func(o *broker.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, k, v)
}
}
// setBrokerOption returns a function to setup a context with given value
func setServerSubscriberOption(k, v interface{}) server.SubscriberOption {
return func(o *server.SubscriberOptions) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, k, v)
}
}
// setPublishOption returns a function to setup a context with given value
func setPublishOption(k, v interface{}) broker.PublishOption {
return func(o *broker.PublishOptions) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, k, v)
}
}

6
go.mod
View File

@ -3,7 +3,11 @@ module github.com/unistack-org/micro-broker-segmentio
go 1.15
require (
github.com/google/go-cmp v0.5.1 // indirect
github.com/google/uuid v1.1.2
github.com/segmentio/kafka-go v0.3.8
github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200904234316-e7d418183b62
github.com/stretchr/testify v1.6.1 // indirect
github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200922101505-ec3c1a02fce6
golang.org/x/text v0.3.3 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
)

25
go.sum
View File

@ -132,6 +132,8 @@ github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.1 h1:JFrFEBb2xKufg6XkJsJr+WbKb4FQlURi5RUcBveYu9k=
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
@ -289,15 +291,23 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/timewasted/linode v0.0.0-20160829202747-37e84520dcf7/go.mod h1:imsgLplxEC/etjIhdr3dNzV3JeT27LbVu5pYWm0JCBY=
github.com/transip/gotransip v0.0.0-20190812104329-6d8d9179b66f/go.mod h1:i0f4R4o2HM0m3DZYQWsj6/MEowD57VzoH0v3d7igeFY=
github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g=
github.com/unistack-org/micro-codec-bytes v0.0.0-20200828083432-4e49e953d844 h1:5b1yuSllbsMm/9fUIlIXSr8DbsKT/sAKSCgOx6+SAfI=
github.com/unistack-org/micro-codec-bytes v0.0.0-20200828083432-4e49e953d844/go.mod h1:g5sOI8TWgGZiVHe8zoUPdtz7+0oLnqTnfBoai6Qb7jE=
github.com/unistack-org/micro-config-cmd v0.0.0-20200828075439-d859b9d7265b h1:v5Ak+Sr780jZclFDnx82g5biF0N5HRVKphEpJhbnVUs=
github.com/unistack-org/micro-config-cmd v0.0.0-20200828075439-d859b9d7265b/go.mod h1:6pm1cadbwsFcEW1ZbV5Fp0i3goR3TNfROMNSPih3I8k=
github.com/unistack-org/micro-config-cmd v0.0.0-20200909210346-ec89783dc46c/go.mod h1:6pm1cadbwsFcEW1ZbV5Fp0i3goR3TNfROMNSPih3I8k=
github.com/unistack-org/micro-config-cmd v0.0.0-20200909210755-6e7e85eeab34 h1:VHc98t4SoiCF/jbkFu2e/j+IyJ/+MFQ1T+INNL7LubU=
github.com/unistack-org/micro-config-cmd v0.0.0-20200909210755-6e7e85eeab34/go.mod h1:fT1gYn+TtfVZZ5tNx56bZIncJjmlji66g7GKdWua5hE=
github.com/unistack-org/micro-config-cmd v0.0.0-20200920140133-0853deb2e5dc h1:hHAU3rgeiA0LaudfNdMLf9/jkOBeFxvJdnwXevviZF8=
github.com/unistack-org/micro-config-cmd v0.0.0-20200920140133-0853deb2e5dc/go.mod h1:il8nz4ZEcX3Usyfrtwy+YtQcb7xSUSFJdSe8PBJ9gOA=
github.com/unistack-org/micro/v3 v3.0.0-20200827083227-aa99378adc6e/go.mod h1:rPQbnry3nboAnMczj8B1Gzlcyv/HYoMZLgd3/3nttJ4=
github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200904234316-e7d418183b62 h1:dElbxPX7dJU6p7kmllY1PqvI67fwl3lBBkZk3xDjtL4=
github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200904234316-e7d418183b62/go.mod h1:mB0h+i3Sa4jD8G2dv97cAAdyh01hVQWKw4xSdmTpyOo=
github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200909210629-caec730248b1/go.mod h1:mmqHR9WelHUXqg2mELjsQ+FJHcWs6mNmXg+wEYO2T3c=
github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200920135754-1cbd1d2bad83/go.mod h1:HUzMG4Mcy97958VxWTg8zuazZgwQ/aoLZ8wtBVONwRE=
github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200922101505-ec3c1a02fce6 h1:gDXQgBibadqZFbV+JZXxu3J2dK2kVJYmRuy/5Jh/xd8=
github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200922101505-ec3c1a02fce6/go.mod h1:aL+8VhSXpx0SuEeXPOWUo5BgS7kyvWYobeXFay90UUM=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/vultr/govultr v0.1.4/go.mod h1:9H008Uxr/C4vFNGLqKx232C206GL0PBHzOP0809bGNA=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
@ -414,6 +424,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190921001708-c4c64cad1fd0 h1:xQwXv67TxFo9nC1GJFyab5eq/5B590r6RlnL/G8Sz7w=
@ -443,6 +455,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
@ -464,6 +478,7 @@ google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBr
google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1 h1:aQktFqmDE2yjveXJlVIfslDFmFnUXSqG0i6KRcJAeMc=
google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d h1:92D1fum1bJLKSdr11OJ+54YeCMCGYIygTA7R/YZxH5M=
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
@ -512,6 +527,8 @@ gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View File

@ -16,28 +16,28 @@ type readerConfigKey struct{}
type writerConfigKey struct{}
func ReaderConfig(c kafka.ReaderConfig) broker.Option {
return setBrokerOption(readerConfigKey{}, c)
return broker.SetBrokerOption(readerConfigKey{}, c)
}
func WriterConfig(c kafka.WriterConfig) broker.Option {
return setBrokerOption(writerConfigKey{}, c)
return broker.SetBrokerOption(writerConfigKey{}, c)
}
type subscribeContextKey struct{}
// SubscribeContext set the context for broker.SubscribeOption
func SubscribeContext(ctx context.Context) broker.SubscribeOption {
return setSubscribeOption(subscribeContextKey{}, ctx)
return broker.SetSubscribeOption(subscribeContextKey{}, ctx)
}
type subscribeReaderConfigKey struct{}
func SubscribeReaderConfig(c kafka.ReaderConfig) broker.SubscribeOption {
return setSubscribeOption(subscribeReaderConfigKey{}, c)
return broker.SetSubscribeOption(subscribeReaderConfigKey{}, c)
}
type subscribeWriterConfigKey struct{}
func SubscribeWriterConfig(c kafka.WriterConfig) broker.SubscribeOption {
return setSubscribeOption(subscribeWriterConfigKey{}, c)
return broker.SetSubscribeOption(subscribeWriterConfigKey{}, c)
}

View File

@ -4,6 +4,7 @@ package segmentio
import (
"context"
"errors"
"fmt"
"sync"
"time"
@ -186,9 +187,10 @@ func (k *kBroker) Publish(topic string, msg *broker.Message, opts ...broker.Publ
buf, err := k.opts.Codec.Marshal(msg)
if err != nil {
fmt.Printf("TTTT %v\n", err)
return err
}
fmt.Printf("FFFF\n")
kmsg := kafka.Message{Value: buf}
k.Lock()
@ -208,6 +210,9 @@ func (k *kBroker) Publish(topic string, msg *broker.Message, opts ...broker.Publ
k.Unlock()
err = writer.WriteMessages(k.opts.Context, kmsg)
if logger.V(logger.TraceLevel) {
logger.Tracef("write message err: %v", err)
}
if err != nil {
switch cached {
case false: