add option to inject nats.Options
This commit is contained in:
parent
c67ac99441
commit
a78cdc8394
34
nats.go
34
nats.go
@ -2,6 +2,7 @@
|
|||||||
package nats
|
package nats
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/micro/go-micro/broker"
|
"github.com/micro/go-micro/broker"
|
||||||
@ -14,6 +15,7 @@ type nbroker struct {
|
|||||||
addrs []string
|
addrs []string
|
||||||
conn *nats.Conn
|
conn *nats.Conn
|
||||||
opts broker.Options
|
opts broker.Options
|
||||||
|
nopts nats.Options
|
||||||
}
|
}
|
||||||
|
|
||||||
type subscriber struct {
|
type subscriber struct {
|
||||||
@ -87,7 +89,7 @@ func (n *nbroker) Connect() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
opts := nats.DefaultOptions
|
opts := n.nopts
|
||||||
opts.Servers = n.addrs
|
opts.Servers = n.addrs
|
||||||
opts.Secure = n.opts.Secure
|
opts.Secure = n.opts.Secure
|
||||||
opts.TLSConfig = n.opts.TLSConfig
|
opts.TLSConfig = n.opts.TLSConfig
|
||||||
@ -166,17 +168,41 @@ func (n *nbroker) String() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewBroker(opts ...broker.Option) broker.Broker {
|
func NewBroker(opts ...broker.Option) broker.Broker {
|
||||||
|
|
||||||
|
bopts := &brokerOptions{
|
||||||
|
natsOptions: DefaultNatsOptions,
|
||||||
|
}
|
||||||
|
|
||||||
options := broker.Options{
|
options := broker.Options{
|
||||||
// Default codec
|
// Default codec
|
||||||
Codec: json.NewCodec(),
|
Codec: json.NewCodec(),
|
||||||
|
Context: context.WithValue(context.Background(), optionsKey, bopts),
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&options)
|
o(&options)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &nbroker{
|
// broker.Options have higher priority than nats.Options
|
||||||
addrs: setAddrs(options.Addrs),
|
// only if Addrs, Secure or TLSConfig were not set through a broker.Option
|
||||||
opts: options,
|
// we read them from nats.Option
|
||||||
|
if len(options.Addrs) == 0 {
|
||||||
|
options.Addrs = bopts.natsOptions.Servers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !options.Secure {
|
||||||
|
options.Secure = bopts.natsOptions.Secure
|
||||||
|
}
|
||||||
|
|
||||||
|
if options.TLSConfig == nil {
|
||||||
|
options.TLSConfig = bopts.natsOptions.TLSConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
nb := &nbroker{
|
||||||
|
opts: options,
|
||||||
|
nopts: bopts.natsOptions,
|
||||||
|
addrs: setAddrs(options.Addrs),
|
||||||
|
}
|
||||||
|
|
||||||
|
return nb
|
||||||
}
|
}
|
||||||
|
90
nats_test.go
90
nats_test.go
@ -1,36 +1,96 @@
|
|||||||
package nats
|
package nats
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/micro/go-micro/broker"
|
"github.com/micro/go-micro/broker"
|
||||||
|
"github.com/nats-io/nats"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var addrTestCases = []struct {
|
||||||
|
name string
|
||||||
|
description string
|
||||||
|
addrs map[string]string // expected address : set address
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"brokerOptionConstructor",
|
||||||
|
"set broker addresses through a broker.Option in constructor",
|
||||||
|
map[string]string{
|
||||||
|
"nats://192.168.10.1:5222": "192.168.10.1:5222",
|
||||||
|
"nats://10.20.10.0:4222": "10.20.10.0:4222"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"brokerOptionInit",
|
||||||
|
"set broker addresses through a broker.Option in broker.Init()",
|
||||||
|
map[string]string{
|
||||||
|
"nats://192.168.10.1:5222": "192.168.10.1:5222",
|
||||||
|
"nats://10.20.10.0:4222": "10.20.10.0:4222"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"natsOptionConstructor",
|
||||||
|
"set broker addresses through the nats.Option in constructor",
|
||||||
|
map[string]string{
|
||||||
|
"nats://192.168.10.1:5222": "192.168.10.1:5222",
|
||||||
|
"nats://10.20.10.0:4222": "10.20.10.0:4222"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"default",
|
||||||
|
"check if default Address is set correctly",
|
||||||
|
map[string]string{
|
||||||
|
"nats://localhost:4222": ""},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
// TestInitAddrs tests issue #100. Ensures that if the addrs is set by an option in init it will be used.
|
// TestInitAddrs tests issue #100. Ensures that if the addrs is set by an option in init it will be used.
|
||||||
func TestInitAddrs(t *testing.T) {
|
func TestInitAddrs(t *testing.T) {
|
||||||
nb := NewBroker()
|
|
||||||
|
|
||||||
addr1, addr2 := "192.168.10.1:5222", "10.20.10.0:4222"
|
for _, tc := range addrTestCases {
|
||||||
|
t.Run(fmt.Sprintf("%s: %s", tc.name, tc.description), func(t *testing.T) {
|
||||||
|
|
||||||
nb.Init(broker.Addrs(addr1, addr2))
|
var br broker.Broker
|
||||||
|
var addrs []string
|
||||||
|
|
||||||
if len(nb.Options().Addrs) != 2 {
|
for _, addr := range tc.addrs {
|
||||||
t.Errorf("Expected Addr count = 2, Actual Addr count = %d", len(nb.Options().Addrs))
|
addrs = append(addrs, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
natsBroker, ok := nb.(*nbroker)
|
switch tc.name {
|
||||||
|
case "brokerOptionConstructor":
|
||||||
|
// we know that there are just two addrs in the dict
|
||||||
|
br = NewBroker(broker.Addrs(addrs[0], addrs[1]))
|
||||||
|
br.Init()
|
||||||
|
case "brokerOptionInit":
|
||||||
|
br = NewBroker()
|
||||||
|
// we know that there are just two addrs in the dict
|
||||||
|
br.Init(broker.Addrs(addrs[0], addrs[1]))
|
||||||
|
case "natsOptionConstructor":
|
||||||
|
nopts := nats.GetDefaultOptions()
|
||||||
|
nopts.Servers = addrs
|
||||||
|
br = NewBroker(NatsOptions(nopts))
|
||||||
|
br.Init()
|
||||||
|
case "default":
|
||||||
|
br = NewBroker()
|
||||||
|
br.Init()
|
||||||
|
}
|
||||||
|
|
||||||
|
natsBroker, ok := br.(*nbroker)
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatal("Expected broker to be of types *nbroker")
|
t.Fatal("Expected broker to be of types *nbroker")
|
||||||
}
|
}
|
||||||
|
// check if the same amount of addrs we set has actually been set
|
||||||
addr1f := fmt.Sprintf("nats://%s", addr1)
|
if len(natsBroker.addrs) != len(tc.addrs) {
|
||||||
addr2f := fmt.Sprintf("nats://%s", addr2)
|
t.Errorf("Expected Addr count = %d, Actual Addr count = %d",
|
||||||
|
len(natsBroker.addrs), len(tc.addrs))
|
||||||
if natsBroker.addrs[0] != addr1f && natsBroker.addrs[1] != addr2f {
|
|
||||||
expAddr, actAddr := fmt.Sprintf("%s,%s", addr1f, addr2f), fmt.Sprintf("%s,%s", natsBroker.addrs[0], natsBroker.addrs[1])
|
|
||||||
t.Errorf("Expected = '%s', Actual = '%s'", expAddr, actAddr)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, addr := range natsBroker.addrs {
|
||||||
|
_, ok := tc.addrs[addr]
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("Expected '%s' has not been set", addr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
27
options.go
Normal file
27
options.go
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
package nats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/micro/go-micro/broker"
|
||||||
|
"github.com/nats-io/nats"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
DefaultNatsOptions = nats.GetDefaultOptions()
|
||||||
|
|
||||||
|
optionsKey = optionsKeyType{}
|
||||||
|
)
|
||||||
|
|
||||||
|
type optionsKeyType struct{}
|
||||||
|
|
||||||
|
type brokerOptions struct {
|
||||||
|
natsOptions nats.Options
|
||||||
|
}
|
||||||
|
|
||||||
|
// NatsOptions allow to inject a nats.Options struct for configuring
|
||||||
|
// the nats connection
|
||||||
|
func NatsOptions(nopts nats.Options) broker.Option {
|
||||||
|
return func(o *broker.Options) {
|
||||||
|
no := o.Context.Value(optionsKey).(*brokerOptions)
|
||||||
|
no.natsOptions = nopts
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user