add additional options and metadata fixes #25
@@ -1,2 +1,9 @@
 | 
				
			|||||||
# micro-broker-kgo
 | 
					# micro-broker-kgo
 | 
				
			||||||
yet another micro kafka broker alternative
 | 
					yet another micro kafka broker alternative
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					TODO:
 | 
				
			||||||
 | 
					* dont always append options from context on Init and New
 | 
				
			||||||
 | 
					* add SubscriberOptions(...kgo.Opt)
 | 
				
			||||||
 | 
					* add ServerSubscribeOptions(...kgo.Opt)
 | 
				
			||||||
 | 
					* check PublisherOptions(...kgo.Opt)
 | 
				
			||||||
 | 
					* check ClientPublisherOptions(...kgo.Opt)
 | 
				
			||||||
							
								
								
									
										7
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										7
									
								
								go.mod
									
									
									
									
									
								
							@@ -3,7 +3,8 @@ module go.unistack.org/micro-broker-kgo/v3
 | 
				
			|||||||
go 1.16
 | 
					go 1.16
 | 
				
			||||||
 | 
					
 | 
				
			||||||
require (
 | 
					require (
 | 
				
			||||||
	github.com/twmb/franz-go v1.2.5
 | 
						github.com/pierrec/lz4/v4 v4.1.12 // indirect
 | 
				
			||||||
	github.com/twmb/franz-go/pkg/kmsg v0.0.0-20211127185622-3b34db0c6d1e
 | 
						github.com/twmb/franz-go v1.2.6
 | 
				
			||||||
	go.unistack.org/micro/v3 v3.8.11
 | 
						github.com/twmb/franz-go/pkg/kmsg v0.0.0-20211207071611-6a03ca9e400b
 | 
				
			||||||
 | 
						go.unistack.org/micro/v3 v3.8.12
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										15
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										15
									
								
								go.sum
									
									
									
									
									
								
							@@ -17,24 +17,27 @@ github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aW
 | 
				
			|||||||
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
 | 
					github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
 | 
				
			||||||
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
 | 
					github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
 | 
				
			||||||
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
 | 
					github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
 | 
				
			||||||
 | 
					github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
 | 
				
			||||||
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
 | 
					github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
 | 
				
			||||||
github.com/pierrec/lz4/v4 v4.1.11 h1:LVs17FAZJFOjgmJXl9Tf13WfLUvZq7/RjfEJrnwZ9OE=
 | 
					 | 
				
			||||||
github.com/pierrec/lz4/v4 v4.1.11/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
 | 
					github.com/pierrec/lz4/v4 v4.1.11/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
 | 
				
			||||||
 | 
					github.com/pierrec/lz4/v4 v4.1.12 h1:44l88ehTZAUGW4VlO1QC4zkilL99M6Y9MXNwEs0uzP8=
 | 
				
			||||||
 | 
					github.com/pierrec/lz4/v4 v4.1.12/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
 | 
				
			||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 | 
					github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 | 
				
			||||||
github.com/silas/dag v0.0.0-20210626123444-3804bac2d6d4/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
 | 
					github.com/silas/dag v0.0.0-20210626123444-3804bac2d6d4/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
 | 
				
			||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 | 
					github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 | 
				
			||||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 | 
					github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 | 
				
			||||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 | 
					github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 | 
				
			||||||
github.com/twmb/franz-go v1.2.5 h1:dWSTPbn1npb873T5C7D1PeIoimDPdTJFv8h1TjjCkD8=
 | 
					github.com/twmb/franz-go v1.2.6 h1:WVub2Sml7LqER9VU0WxsiOTom4LBK7YMj+7jbqadE3U=
 | 
				
			||||||
github.com/twmb/franz-go v1.2.5/go.mod h1:P+i2DnBaec1o0z9EI8CyAM/WAjG99CHI3oCAhZDoy48=
 | 
					github.com/twmb/franz-go v1.2.6/go.mod h1:P+i2DnBaec1o0z9EI8CyAM/WAjG99CHI3oCAhZDoy48=
 | 
				
			||||||
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20211127185622-3b34db0c6d1e h1:ZMTL30cZwBstwP838Xmk6biMB27j51tZaKXdEhuyrw0=
 | 
					 | 
				
			||||||
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20211127185622-3b34db0c6d1e/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
 | 
					github.com/twmb/franz-go/pkg/kmsg v0.0.0-20211127185622-3b34db0c6d1e/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
 | 
				
			||||||
 | 
					github.com/twmb/franz-go/pkg/kmsg v0.0.0-20211207071611-6a03ca9e400b h1:rCe006NN/89GvtdXqRbgsqtL/nVTj83/dQ9ok8DJFcM=
 | 
				
			||||||
 | 
					github.com/twmb/franz-go/pkg/kmsg v0.0.0-20211207071611-6a03ca9e400b/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
 | 
				
			||||||
github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg=
 | 
					github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg=
 | 
				
			||||||
github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc=
 | 
					github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc=
 | 
				
			||||||
go.unistack.org/micro-proto/v3 v3.1.0 h1:q39FwjFiRZn+Ux/tt+d3bJTmDtsQQWa+3SLYVo1vLfA=
 | 
					go.unistack.org/micro-proto/v3 v3.1.0 h1:q39FwjFiRZn+Ux/tt+d3bJTmDtsQQWa+3SLYVo1vLfA=
 | 
				
			||||||
go.unistack.org/micro-proto/v3 v3.1.0/go.mod h1:DpRhYCBXlmSJ/AAXTmntvlh7kQkYU6eFvlmYAx4BQS8=
 | 
					go.unistack.org/micro-proto/v3 v3.1.0/go.mod h1:DpRhYCBXlmSJ/AAXTmntvlh7kQkYU6eFvlmYAx4BQS8=
 | 
				
			||||||
go.unistack.org/micro/v3 v3.8.11 h1:Wv1YopcYNcsN3bW8Mv8v6AF99s0uKBtWQ1M/Ag8QLec=
 | 
					go.unistack.org/micro/v3 v3.8.12 h1:ACaHE8ZIHFXqEGPSRvXzND4hcqCSQf04WkzOFY6Y1gQ=
 | 
				
			||||||
go.unistack.org/micro/v3 v3.8.11/go.mod h1:KMMmOmbgo/D52/rCAbqeKbBsgEEbSKM69he54J3ZIuA=
 | 
					go.unistack.org/micro/v3 v3.8.12/go.mod h1:KMMmOmbgo/D52/rCAbqeKbBsgEEbSKM69he54J3ZIuA=
 | 
				
			||||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 | 
					golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 | 
				
			||||||
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 | 
					golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 | 
				
			||||||
golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
 | 
					golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										31
									
								
								options.go
									
									
									
									
									
								
							
							
						
						
									
										31
									
								
								options.go
									
									
									
									
									
								
							@@ -7,6 +7,7 @@ import (
 | 
				
			|||||||
	kgo "github.com/twmb/franz-go/pkg/kgo"
 | 
						kgo "github.com/twmb/franz-go/pkg/kgo"
 | 
				
			||||||
	"go.unistack.org/micro/v3/broker"
 | 
						"go.unistack.org/micro/v3/broker"
 | 
				
			||||||
	"go.unistack.org/micro/v3/client"
 | 
						"go.unistack.org/micro/v3/client"
 | 
				
			||||||
 | 
						"go.unistack.org/micro/v3/server"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// DefaultCommitInterval specifies how fast send commit offsets to kafka
 | 
					// DefaultCommitInterval specifies how fast send commit offsets to kafka
 | 
				
			||||||
@@ -48,6 +49,36 @@ func Options(opts ...kgo.Opt) broker.Option {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// SubscribeOptions pass additional options to broker
 | 
				
			||||||
 | 
					func SubscribeOptions(opts ...kgo.Opt) broker.SubscribeOption {
 | 
				
			||||||
 | 
						return func(o *broker.SubscribeOptions) {
 | 
				
			||||||
 | 
							if o.Context == nil {
 | 
				
			||||||
 | 
								o.Context = context.Background()
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							options, ok := o.Context.Value(optionsKey{}).([]kgo.Opt)
 | 
				
			||||||
 | 
							if !ok {
 | 
				
			||||||
 | 
								options = make([]kgo.Opt, 0, len(opts))
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							options = append(options, opts...)
 | 
				
			||||||
 | 
							o.Context = context.WithValue(o.Context, optionsKey{}, options)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// SubscriberOptions pass additional options to broker
 | 
				
			||||||
 | 
					func SubscriberOptions(opts ...kgo.Opt) server.SubscriberOption {
 | 
				
			||||||
 | 
						return func(o *server.SubscriberOptions) {
 | 
				
			||||||
 | 
							if o.Context == nil {
 | 
				
			||||||
 | 
								o.Context = context.Background()
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							options, ok := o.Context.Value(optionsKey{}).([]kgo.Opt)
 | 
				
			||||||
 | 
							if !ok {
 | 
				
			||||||
 | 
								options = make([]kgo.Opt, 0, len(opts))
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							options = append(options, opts...)
 | 
				
			||||||
 | 
							o.Context = context.WithValue(o.Context, optionsKey{}, options)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type commitIntervalKey struct{}
 | 
					type commitIntervalKey struct{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// CommitInterval specifies interval to send commits
 | 
					// CommitInterval specifies interval to send commits
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										24
									
								
								util.go
									
									
									
									
									
								
							
							
						
						
									
										24
									
								
								util.go
									
									
									
									
									
								
							@@ -158,6 +158,14 @@ func (w *worker) handle() {
 | 
				
			|||||||
				p.ack = false
 | 
									p.ack = false
 | 
				
			||||||
				if w.opts.BodyOnly {
 | 
									if w.opts.BodyOnly {
 | 
				
			||||||
					p.msg.Body = record.Value
 | 
										p.msg.Body = record.Value
 | 
				
			||||||
 | 
										if l := len(record.Headers); l > 0 {
 | 
				
			||||||
 | 
											if p.msg.Header == nil {
 | 
				
			||||||
 | 
												p.msg.Header = metadata.New(l)
 | 
				
			||||||
 | 
											}
 | 
				
			||||||
 | 
											for _, h := range record.Headers {
 | 
				
			||||||
 | 
												p.msg.Header.Set(h.Key, string(h.Value))
 | 
				
			||||||
 | 
											}
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
				} else if w.kopts.Codec.String() == "noop" {
 | 
									} else if w.kopts.Codec.String() == "noop" {
 | 
				
			||||||
					p.msg.Body = record.Value
 | 
										p.msg.Body = record.Value
 | 
				
			||||||
					p.msg.Header = metadata.New(len(record.Headers))
 | 
										p.msg.Header = metadata.New(len(record.Headers))
 | 
				
			||||||
@@ -168,6 +176,14 @@ func (w *worker) handle() {
 | 
				
			|||||||
					if err := w.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil {
 | 
										if err := w.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil {
 | 
				
			||||||
						p.err = err
 | 
											p.err = err
 | 
				
			||||||
						p.msg.Body = record.Value
 | 
											p.msg.Body = record.Value
 | 
				
			||||||
 | 
											if l := len(record.Headers); l > 0 {
 | 
				
			||||||
 | 
												if p.msg.Header == nil {
 | 
				
			||||||
 | 
													p.msg.Header = metadata.New(l)
 | 
				
			||||||
 | 
												}
 | 
				
			||||||
 | 
												for _, h := range record.Headers {
 | 
				
			||||||
 | 
													p.msg.Header.Set(h.Key, string(h.Value))
 | 
				
			||||||
 | 
												}
 | 
				
			||||||
 | 
											}
 | 
				
			||||||
						if eh != nil {
 | 
											if eh != nil {
 | 
				
			||||||
							_ = eh(p)
 | 
												_ = eh(p)
 | 
				
			||||||
							if p.ack {
 | 
												if p.ack {
 | 
				
			||||||
@@ -188,6 +204,14 @@ func (w *worker) handle() {
 | 
				
			|||||||
						w.cherr <- err
 | 
											w.cherr <- err
 | 
				
			||||||
						return
 | 
											return
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
 | 
										if l := len(record.Headers); l > 0 {
 | 
				
			||||||
 | 
											if p.msg.Header == nil {
 | 
				
			||||||
 | 
												p.msg.Header = metadata.New(l)
 | 
				
			||||||
 | 
											}
 | 
				
			||||||
 | 
											for _, h := range record.Headers {
 | 
				
			||||||
 | 
												p.msg.Header.Set(h.Key, string(h.Value))
 | 
				
			||||||
 | 
											}
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
				err = w.handler(p)
 | 
									err = w.handler(p)
 | 
				
			||||||
				if err == nil && w.opts.AutoAck {
 | 
									if err == nil && w.opts.AutoAck {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user