From 81a85041d324283772c828e6085c17a4bec76508 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 22 Sep 2020 13:16:35 +0300 Subject: [PATCH] cleanup Signed-off-by: Vasiliy Tolstov --- context.go | 48 ------------------------------------------------ go.mod | 6 +++++- go.sum | 25 +++++++++++++++++++++---- options.go | 10 +++++----- segmentio.go | 7 ++++++- 5 files changed, 37 insertions(+), 59 deletions(-) delete mode 100644 context.go diff --git a/context.go b/context.go deleted file mode 100644 index 20ceada..0000000 --- a/context.go +++ /dev/null @@ -1,48 +0,0 @@ -package segmentio - -import ( - "context" - - "github.com/unistack-org/micro/v3/broker" - "github.com/unistack-org/micro/v3/server" -) - -// setSubscribeOption returns a function to setup a context with given value -func setSubscribeOption(k, v interface{}) broker.SubscribeOption { - return func(o *broker.SubscribeOptions) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, k, v) - } -} - -// setBrokerOption returns a function to setup a context with given value -func setBrokerOption(k, v interface{}) broker.Option { - return func(o *broker.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, k, v) - } -} - -// setBrokerOption returns a function to setup a context with given value -func setServerSubscriberOption(k, v interface{}) server.SubscriberOption { - return func(o *server.SubscriberOptions) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, k, v) - } -} - -// setPublishOption returns a function to setup a context with given value -func setPublishOption(k, v interface{}) broker.PublishOption { - return func(o *broker.PublishOptions) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, k, v) - } -} diff --git a/go.mod b/go.mod index f731c68..62c21b6 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,11 @@ module github.com/unistack-org/micro-broker-segmentio go 1.15 require ( + github.com/google/go-cmp v0.5.1 // indirect github.com/google/uuid v1.1.2 github.com/segmentio/kafka-go v0.3.8 - github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200904234316-e7d418183b62 + github.com/stretchr/testify v1.6.1 // indirect + github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200922101505-ec3c1a02fce6 + golang.org/x/text v0.3.3 // indirect + golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect ) diff --git a/go.sum b/go.sum index 3809f5e..67701b2 100644 --- a/go.sum +++ b/go.sum @@ -132,6 +132,8 @@ github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.1 h1:JFrFEBb2xKufg6XkJsJr+WbKb4FQlURi5RUcBveYu9k= +github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= @@ -289,15 +291,23 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/timewasted/linode v0.0.0-20160829202747-37e84520dcf7/go.mod h1:imsgLplxEC/etjIhdr3dNzV3JeT27LbVu5pYWm0JCBY= github.com/transip/gotransip v0.0.0-20190812104329-6d8d9179b66f/go.mod h1:i0f4R4o2HM0m3DZYQWsj6/MEowD57VzoH0v3d7igeFY= github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g= +github.com/unistack-org/micro-codec-bytes v0.0.0-20200828083432-4e49e953d844 h1:5b1yuSllbsMm/9fUIlIXSr8DbsKT/sAKSCgOx6+SAfI= github.com/unistack-org/micro-codec-bytes v0.0.0-20200828083432-4e49e953d844/go.mod h1:g5sOI8TWgGZiVHe8zoUPdtz7+0oLnqTnfBoai6Qb7jE= -github.com/unistack-org/micro-config-cmd v0.0.0-20200828075439-d859b9d7265b h1:v5Ak+Sr780jZclFDnx82g5biF0N5HRVKphEpJhbnVUs= -github.com/unistack-org/micro-config-cmd v0.0.0-20200828075439-d859b9d7265b/go.mod h1:6pm1cadbwsFcEW1ZbV5Fp0i3goR3TNfROMNSPih3I8k= +github.com/unistack-org/micro-config-cmd v0.0.0-20200909210346-ec89783dc46c/go.mod h1:6pm1cadbwsFcEW1ZbV5Fp0i3goR3TNfROMNSPih3I8k= +github.com/unistack-org/micro-config-cmd v0.0.0-20200909210755-6e7e85eeab34 h1:VHc98t4SoiCF/jbkFu2e/j+IyJ/+MFQ1T+INNL7LubU= +github.com/unistack-org/micro-config-cmd v0.0.0-20200909210755-6e7e85eeab34/go.mod h1:fT1gYn+TtfVZZ5tNx56bZIncJjmlji66g7GKdWua5hE= +github.com/unistack-org/micro-config-cmd v0.0.0-20200920140133-0853deb2e5dc h1:hHAU3rgeiA0LaudfNdMLf9/jkOBeFxvJdnwXevviZF8= +github.com/unistack-org/micro-config-cmd v0.0.0-20200920140133-0853deb2e5dc/go.mod h1:il8nz4ZEcX3Usyfrtwy+YtQcb7xSUSFJdSe8PBJ9gOA= github.com/unistack-org/micro/v3 v3.0.0-20200827083227-aa99378adc6e/go.mod h1:rPQbnry3nboAnMczj8B1Gzlcyv/HYoMZLgd3/3nttJ4= -github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200904234316-e7d418183b62 h1:dElbxPX7dJU6p7kmllY1PqvI67fwl3lBBkZk3xDjtL4= -github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200904234316-e7d418183b62/go.mod h1:mB0h+i3Sa4jD8G2dv97cAAdyh01hVQWKw4xSdmTpyOo= +github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200909210629-caec730248b1/go.mod h1:mmqHR9WelHUXqg2mELjsQ+FJHcWs6mNmXg+wEYO2T3c= +github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200920135754-1cbd1d2bad83/go.mod h1:HUzMG4Mcy97958VxWTg8zuazZgwQ/aoLZ8wtBVONwRE= +github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200922101505-ec3c1a02fce6 h1:gDXQgBibadqZFbV+JZXxu3J2dK2kVJYmRuy/5Jh/xd8= +github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200922101505-ec3c1a02fce6/go.mod h1:aL+8VhSXpx0SuEeXPOWUo5BgS7kyvWYobeXFay90UUM= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/vultr/govultr v0.1.4/go.mod h1:9H008Uxr/C4vFNGLqKx232C206GL0PBHzOP0809bGNA= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= @@ -414,6 +424,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190921001708-c4c64cad1fd0 h1:xQwXv67TxFo9nC1GJFyab5eq/5B590r6RlnL/G8Sz7w= @@ -443,6 +455,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= @@ -464,6 +478,7 @@ google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBr google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1 h1:aQktFqmDE2yjveXJlVIfslDFmFnUXSqG0i6KRcJAeMc= google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d h1:92D1fum1bJLKSdr11OJ+54YeCMCGYIygTA7R/YZxH5M= google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -512,6 +527,8 @@ gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/options.go b/options.go index a4cce12..4759223 100644 --- a/options.go +++ b/options.go @@ -16,28 +16,28 @@ type readerConfigKey struct{} type writerConfigKey struct{} func ReaderConfig(c kafka.ReaderConfig) broker.Option { - return setBrokerOption(readerConfigKey{}, c) + return broker.SetBrokerOption(readerConfigKey{}, c) } func WriterConfig(c kafka.WriterConfig) broker.Option { - return setBrokerOption(writerConfigKey{}, c) + return broker.SetBrokerOption(writerConfigKey{}, c) } type subscribeContextKey struct{} // SubscribeContext set the context for broker.SubscribeOption func SubscribeContext(ctx context.Context) broker.SubscribeOption { - return setSubscribeOption(subscribeContextKey{}, ctx) + return broker.SetSubscribeOption(subscribeContextKey{}, ctx) } type subscribeReaderConfigKey struct{} func SubscribeReaderConfig(c kafka.ReaderConfig) broker.SubscribeOption { - return setSubscribeOption(subscribeReaderConfigKey{}, c) + return broker.SetSubscribeOption(subscribeReaderConfigKey{}, c) } type subscribeWriterConfigKey struct{} func SubscribeWriterConfig(c kafka.WriterConfig) broker.SubscribeOption { - return setSubscribeOption(subscribeWriterConfigKey{}, c) + return broker.SetSubscribeOption(subscribeWriterConfigKey{}, c) } diff --git a/segmentio.go b/segmentio.go index d02e548..f6f1a13 100644 --- a/segmentio.go +++ b/segmentio.go @@ -4,6 +4,7 @@ package segmentio import ( "context" "errors" + "fmt" "sync" "time" @@ -186,9 +187,10 @@ func (k *kBroker) Publish(topic string, msg *broker.Message, opts ...broker.Publ buf, err := k.opts.Codec.Marshal(msg) if err != nil { + fmt.Printf("TTTT %v\n", err) return err } - + fmt.Printf("FFFF\n") kmsg := kafka.Message{Value: buf} k.Lock() @@ -208,6 +210,9 @@ func (k *kBroker) Publish(topic string, msg *broker.Message, opts ...broker.Publ k.Unlock() err = writer.WriteMessages(k.opts.Context, kmsg) + if logger.V(logger.TraceLevel) { + logger.Tracef("write message err: %v", err) + } if err != nil { switch cached { case false: