commit
aeeb2b0010
37
broker/nats/context.go
Normal file
37
broker/nats/context.go
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
package nats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/micro/go-micro/broker"
|
||||||
|
)
|
||||||
|
|
||||||
|
// setSubscribeOption returns a function to setup a context with given value
|
||||||
|
func setSubscribeOption(k, v interface{}) broker.SubscribeOption {
|
||||||
|
return func(o *broker.SubscribeOptions) {
|
||||||
|
if o.Context == nil {
|
||||||
|
o.Context = context.Background()
|
||||||
|
}
|
||||||
|
o.Context = context.WithValue(o.Context, k, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// setBrokerOption returns a function to setup a context with given value
|
||||||
|
func setBrokerOption(k, v interface{}) broker.Option {
|
||||||
|
return func(o *broker.Options) {
|
||||||
|
if o.Context == nil {
|
||||||
|
o.Context = context.Background()
|
||||||
|
}
|
||||||
|
o.Context = context.WithValue(o.Context, k, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// setPublishOption returns a function to setup a context with given value
|
||||||
|
func setPublishOption(k, v interface{}) broker.PublishOption {
|
||||||
|
return func(o *broker.PublishOptions) {
|
||||||
|
if o.Context == nil {
|
||||||
|
o.Context = context.Background()
|
||||||
|
}
|
||||||
|
o.Context = context.WithValue(o.Context, k, v)
|
||||||
|
}
|
||||||
|
}
|
245
broker/nats/nats.go
Normal file
245
broker/nats/nats.go
Normal file
@ -0,0 +1,245 @@
|
|||||||
|
// Package nats provides a NATS broker
|
||||||
|
package nats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/micro/go-micro/broker"
|
||||||
|
"github.com/micro/go-micro/codec/json"
|
||||||
|
nats "github.com/nats-io/nats.go"
|
||||||
|
)
|
||||||
|
|
||||||
|
type natsBroker struct {
|
||||||
|
sync.RWMutex
|
||||||
|
addrs []string
|
||||||
|
conn *nats.Conn
|
||||||
|
opts broker.Options
|
||||||
|
nopts nats.Options
|
||||||
|
drain bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type subscriber struct {
|
||||||
|
s *nats.Subscription
|
||||||
|
opts broker.SubscribeOptions
|
||||||
|
drain bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type publication struct {
|
||||||
|
t string
|
||||||
|
m *broker.Message
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *publication) Topic() string {
|
||||||
|
return p.t
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *publication) Message() *broker.Message {
|
||||||
|
return p.m
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *publication) Ack() error {
|
||||||
|
// nats does not support acking
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *subscriber) Options() broker.SubscribeOptions {
|
||||||
|
return s.opts
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *subscriber) Topic() string {
|
||||||
|
return s.s.Subject
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *subscriber) Unsubscribe() error {
|
||||||
|
if s.drain {
|
||||||
|
return s.s.Drain()
|
||||||
|
}
|
||||||
|
return s.s.Unsubscribe()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *natsBroker) Address() string {
|
||||||
|
if n.conn != nil && n.conn.IsConnected() {
|
||||||
|
return n.conn.ConnectedUrl()
|
||||||
|
}
|
||||||
|
if len(n.addrs) > 0 {
|
||||||
|
return n.addrs[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func setAddrs(addrs []string) []string {
|
||||||
|
var cAddrs []string
|
||||||
|
for _, addr := range addrs {
|
||||||
|
if len(addr) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !strings.HasPrefix(addr, "nats://") {
|
||||||
|
addr = "nats://" + addr
|
||||||
|
}
|
||||||
|
cAddrs = append(cAddrs, addr)
|
||||||
|
}
|
||||||
|
if len(cAddrs) == 0 {
|
||||||
|
cAddrs = []string{nats.DefaultURL}
|
||||||
|
}
|
||||||
|
return cAddrs
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *natsBroker) Connect() error {
|
||||||
|
n.Lock()
|
||||||
|
defer n.Unlock()
|
||||||
|
|
||||||
|
status := nats.CLOSED
|
||||||
|
if n.conn != nil {
|
||||||
|
status = n.conn.Status()
|
||||||
|
}
|
||||||
|
|
||||||
|
switch status {
|
||||||
|
case nats.CONNECTED, nats.RECONNECTING, nats.CONNECTING:
|
||||||
|
return nil
|
||||||
|
default: // DISCONNECTED or CLOSED or DRAINING
|
||||||
|
opts := n.nopts
|
||||||
|
opts.Servers = n.addrs
|
||||||
|
opts.Secure = n.opts.Secure
|
||||||
|
opts.TLSConfig = n.opts.TLSConfig
|
||||||
|
|
||||||
|
// secure might not be set
|
||||||
|
if n.opts.TLSConfig != nil {
|
||||||
|
opts.Secure = true
|
||||||
|
}
|
||||||
|
|
||||||
|
c, err := opts.Connect()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
n.conn = c
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *natsBroker) Disconnect() error {
|
||||||
|
n.RLock()
|
||||||
|
if n.drain {
|
||||||
|
n.conn.Drain()
|
||||||
|
} else {
|
||||||
|
n.conn.Close()
|
||||||
|
}
|
||||||
|
n.RUnlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *natsBroker) Init(opts ...broker.Option) error {
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&n.opts)
|
||||||
|
}
|
||||||
|
n.addrs = setAddrs(n.opts.Addrs)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *natsBroker) Options() broker.Options {
|
||||||
|
return n.opts
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *natsBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
|
||||||
|
b, err := n.opts.Codec.Marshal(msg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
n.RLock()
|
||||||
|
defer n.RUnlock()
|
||||||
|
return n.conn.Publish(topic, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *natsBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||||
|
if n.conn == nil {
|
||||||
|
return nil, errors.New("not connected")
|
||||||
|
}
|
||||||
|
|
||||||
|
opt := broker.SubscribeOptions{
|
||||||
|
AutoAck: true,
|
||||||
|
Context: context.Background(),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&opt)
|
||||||
|
}
|
||||||
|
|
||||||
|
var drain bool
|
||||||
|
if _, ok := opt.Context.Value(drainSubscriptionKey{}).(bool); ok {
|
||||||
|
drain = true
|
||||||
|
}
|
||||||
|
|
||||||
|
fn := func(msg *nats.Msg) {
|
||||||
|
var m broker.Message
|
||||||
|
if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
handler(&publication{m: &m, t: msg.Subject})
|
||||||
|
}
|
||||||
|
|
||||||
|
var sub *nats.Subscription
|
||||||
|
var err error
|
||||||
|
|
||||||
|
n.RLock()
|
||||||
|
if len(opt.Queue) > 0 {
|
||||||
|
sub, err = n.conn.QueueSubscribe(topic, opt.Queue, fn)
|
||||||
|
} else {
|
||||||
|
sub, err = n.conn.Subscribe(topic, fn)
|
||||||
|
}
|
||||||
|
n.RUnlock()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &subscriber{s: sub, opts: opt, drain: drain}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *natsBroker) String() string {
|
||||||
|
return "nats"
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBroker(opts ...broker.Option) broker.Broker {
|
||||||
|
options := broker.Options{
|
||||||
|
// Default codec
|
||||||
|
Codec: json.Marshaler{},
|
||||||
|
Context: context.Background(),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&options)
|
||||||
|
}
|
||||||
|
|
||||||
|
natsOpts := nats.GetDefaultOptions()
|
||||||
|
if n, ok := options.Context.Value(optionsKey{}).(nats.Options); ok {
|
||||||
|
natsOpts = n
|
||||||
|
}
|
||||||
|
|
||||||
|
var drain bool
|
||||||
|
if _, ok := options.Context.Value(drainSubscriptionKey{}).(bool); ok {
|
||||||
|
drain = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// broker.Options have higher priority than nats.Options
|
||||||
|
// only if Addrs, Secure or TLSConfig were not set through a broker.Option
|
||||||
|
// we read them from nats.Option
|
||||||
|
if len(options.Addrs) == 0 {
|
||||||
|
options.Addrs = natsOpts.Servers
|
||||||
|
}
|
||||||
|
|
||||||
|
if !options.Secure {
|
||||||
|
options.Secure = natsOpts.Secure
|
||||||
|
}
|
||||||
|
|
||||||
|
if options.TLSConfig == nil {
|
||||||
|
options.TLSConfig = natsOpts.TLSConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
return &natsBroker{
|
||||||
|
opts: options,
|
||||||
|
nopts: natsOpts,
|
||||||
|
addrs: setAddrs(options.Addrs),
|
||||||
|
drain: drain,
|
||||||
|
}
|
||||||
|
}
|
99
broker/nats/nats_test.go
Normal file
99
broker/nats/nats_test.go
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
package nats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/micro/go-micro/broker"
|
||||||
|
nats "github.com/nats-io/nats.go"
|
||||||
|
)
|
||||||
|
|
||||||
|
var addrTestCases = []struct {
|
||||||
|
name string
|
||||||
|
description string
|
||||||
|
addrs map[string]string // expected address : set address
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"brokerOpts",
|
||||||
|
"set broker addresses through a broker.Option in constructor",
|
||||||
|
map[string]string{
|
||||||
|
"nats://192.168.10.1:5222": "192.168.10.1:5222",
|
||||||
|
"nats://10.20.10.0:4222": "10.20.10.0:4222"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"brokerInit",
|
||||||
|
"set broker addresses through a broker.Option in broker.Init()",
|
||||||
|
map[string]string{
|
||||||
|
"nats://192.168.10.1:5222": "192.168.10.1:5222",
|
||||||
|
"nats://10.20.10.0:4222": "10.20.10.0:4222"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"natsOpts",
|
||||||
|
"set broker addresses through the nats.Option in constructor",
|
||||||
|
map[string]string{
|
||||||
|
"nats://192.168.10.1:5222": "192.168.10.1:5222",
|
||||||
|
"nats://10.20.10.0:4222": "10.20.10.0:4222"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"default",
|
||||||
|
"check if default Address is set correctly",
|
||||||
|
map[string]string{
|
||||||
|
"nats://127.0.0.1:4222": "",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestInitAddrs tests issue #100. Ensures that if the addrs is set by an option in init it will be used.
|
||||||
|
func TestInitAddrs(t *testing.T) {
|
||||||
|
|
||||||
|
for _, tc := range addrTestCases {
|
||||||
|
t.Run(fmt.Sprintf("%s: %s", tc.name, tc.description), func(t *testing.T) {
|
||||||
|
|
||||||
|
var br broker.Broker
|
||||||
|
var addrs []string
|
||||||
|
|
||||||
|
for _, addr := range tc.addrs {
|
||||||
|
addrs = append(addrs, addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch tc.name {
|
||||||
|
case "brokerOpts":
|
||||||
|
// we know that there are just two addrs in the dict
|
||||||
|
br = NewBroker(broker.Addrs(addrs[0], addrs[1]))
|
||||||
|
br.Init()
|
||||||
|
case "brokerInit":
|
||||||
|
br = NewBroker()
|
||||||
|
// we know that there are just two addrs in the dict
|
||||||
|
br.Init(broker.Addrs(addrs[0], addrs[1]))
|
||||||
|
case "natsOpts":
|
||||||
|
nopts := nats.GetDefaultOptions()
|
||||||
|
nopts.Servers = addrs
|
||||||
|
br = NewBroker(Options(nopts))
|
||||||
|
br.Init()
|
||||||
|
case "default":
|
||||||
|
br = NewBroker()
|
||||||
|
br.Init()
|
||||||
|
}
|
||||||
|
|
||||||
|
natsBroker, ok := br.(*natsBroker)
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("Expected broker to be of types *natsBroker")
|
||||||
|
}
|
||||||
|
// check if the same amount of addrs we set has actually been set, default
|
||||||
|
// have only 1 address nats://127.0.0.1:4222 (current nats code) or
|
||||||
|
// nats://localhost:4222 (older code version)
|
||||||
|
if len(natsBroker.addrs) != len(tc.addrs) && tc.name != "default" {
|
||||||
|
t.Errorf("Expected Addr count = %d, Actual Addr count = %d",
|
||||||
|
len(natsBroker.addrs), len(tc.addrs))
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, addr := range natsBroker.addrs {
|
||||||
|
_, ok := tc.addrs[addr]
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("Expected '%s' has not been set", addr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
25
broker/nats/options.go
Normal file
25
broker/nats/options.go
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
package nats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/micro/go-micro/broker"
|
||||||
|
nats "github.com/nats-io/nats.go"
|
||||||
|
)
|
||||||
|
|
||||||
|
type optionsKey struct{}
|
||||||
|
type drainConnectionKey struct{}
|
||||||
|
type drainSubscriptionKey struct{}
|
||||||
|
|
||||||
|
// Options accepts nats.Options
|
||||||
|
func Options(opts nats.Options) broker.Option {
|
||||||
|
return setBrokerOption(optionsKey{}, opts)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DrainConnection will drain subscription on close
|
||||||
|
func DrainConnection() broker.Option {
|
||||||
|
return setBrokerOption(drainConnectionKey{}, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DrainSubscription will drain pending messages when unsubscribe
|
||||||
|
func DrainSubscription() broker.SubscribeOption {
|
||||||
|
return setSubscribeOption(drainSubscriptionKey{}, true)
|
||||||
|
}
|
@ -18,6 +18,7 @@ import (
|
|||||||
"github.com/micro/go-micro/broker"
|
"github.com/micro/go-micro/broker"
|
||||||
"github.com/micro/go-micro/broker/http"
|
"github.com/micro/go-micro/broker/http"
|
||||||
"github.com/micro/go-micro/broker/memory"
|
"github.com/micro/go-micro/broker/memory"
|
||||||
|
"github.com/micro/go-micro/broker/nats"
|
||||||
|
|
||||||
// registries
|
// registries
|
||||||
"github.com/micro/go-micro/registry"
|
"github.com/micro/go-micro/registry"
|
||||||
@ -170,6 +171,7 @@ var (
|
|||||||
DefaultBrokers = map[string]func(...broker.Option) broker.Broker{
|
DefaultBrokers = map[string]func(...broker.Option) broker.Broker{
|
||||||
"http": http.NewBroker,
|
"http": http.NewBroker,
|
||||||
"memory": memory.NewBroker,
|
"memory": memory.NewBroker,
|
||||||
|
"nats": nats.NewBroker,
|
||||||
}
|
}
|
||||||
|
|
||||||
DefaultClients = map[string]func(...client.Option) client.Client{
|
DefaultClients = map[string]func(...client.Option) client.Client{
|
||||||
|
Loading…
x
Reference in New Issue
Block a user