initial import of stan - nats streaming broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
commit
c5fc37933f
37
context.go
Normal file
37
context.go
Normal file
@ -0,0 +1,37 @@
|
||||
package stan
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/micro/go-micro/broker"
|
||||
)
|
||||
|
||||
// setSubscribeOption returns a function to setup a context with given value
|
||||
func setSubscribeOption(k, v interface{}) broker.SubscribeOption {
|
||||
return func(o *broker.SubscribeOptions) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, k, v)
|
||||
}
|
||||
}
|
||||
|
||||
// setBrokerOption returns a function to setup a context with given value
|
||||
func setBrokerOption(k, v interface{}) broker.Option {
|
||||
return func(o *broker.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, k, v)
|
||||
}
|
||||
}
|
||||
|
||||
// setPublishOption returns a function to setup a context with given value
|
||||
func setPublishOption(k, v interface{}) broker.PublishOption {
|
||||
return func(o *broker.PublishOptions) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, k, v)
|
||||
}
|
||||
}
|
42
options.go
Normal file
42
options.go
Normal file
@ -0,0 +1,42 @@
|
||||
package stan
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/micro/go-micro/broker"
|
||||
stan "github.com/nats-io/go-nats-streaming"
|
||||
)
|
||||
|
||||
type optionsKey struct{}
|
||||
|
||||
// Options accepts stan.Options
|
||||
func Options(opts stan.Options) broker.Option {
|
||||
return setBrokerOption(optionsKey{}, opts)
|
||||
}
|
||||
|
||||
type clusterIDKey struct{}
|
||||
|
||||
// ClusterID specify cluster id to connect
|
||||
func ClusterID(clusterID string) broker.Option {
|
||||
return setBrokerOption(clusterIDKey{}, clusterID)
|
||||
}
|
||||
|
||||
type subscribeOptionKey struct{}
|
||||
|
||||
func SubscribeOption(opts ...stan.SubscriptionOption) broker.SubscribeOption {
|
||||
return setSubscribeOption(subscribeOptionKey{}, opts)
|
||||
}
|
||||
|
||||
type subscribeContextKey struct{}
|
||||
|
||||
// SubscribeContext set the context for broker.SubscribeOption
|
||||
func SubscribeContext(ctx context.Context) broker.SubscribeOption {
|
||||
return setSubscribeOption(subscribeContextKey{}, ctx)
|
||||
}
|
||||
|
||||
type successAutoAckKey struct{}
|
||||
|
||||
// SuccessAutoAck allow to AutoAck messages when handler returns without error
|
||||
func SuccessAutoAck() broker.SubscribeOption {
|
||||
return setSubscribeOption(successAutoAckKey{}, true)
|
||||
}
|
279
stan.go
Normal file
279
stan.go
Normal file
@ -0,0 +1,279 @@
|
||||
// Package stan provides a NATS Streaming broker
|
||||
package stan
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/micro/go-micro/broker"
|
||||
"github.com/micro/go-micro/cmd"
|
||||
"github.com/micro/go-micro/codec/json"
|
||||
stan "github.com/nats-io/go-nats-streaming"
|
||||
)
|
||||
|
||||
type stanBroker struct {
|
||||
sync.RWMutex
|
||||
addrs []string
|
||||
conn stan.Conn
|
||||
opts broker.Options
|
||||
nopts stan.Options
|
||||
}
|
||||
|
||||
type subscriber struct {
|
||||
t string
|
||||
s stan.Subscription
|
||||
dq bool
|
||||
opts broker.SubscribeOptions
|
||||
}
|
||||
|
||||
type publication struct {
|
||||
t string
|
||||
msg *stan.Msg
|
||||
m *broker.Message
|
||||
}
|
||||
|
||||
func init() {
|
||||
cmd.DefaultBrokers["stan"] = NewBroker
|
||||
}
|
||||
|
||||
func (n *publication) Topic() string {
|
||||
return n.t
|
||||
}
|
||||
|
||||
func (n *publication) Message() *broker.Message {
|
||||
return n.m
|
||||
}
|
||||
|
||||
func (n *publication) Ack() error {
|
||||
return n.msg.Ack()
|
||||
}
|
||||
|
||||
func (n *subscriber) Options() broker.SubscribeOptions {
|
||||
return n.opts
|
||||
}
|
||||
|
||||
func (n *subscriber) Topic() string {
|
||||
return n.t
|
||||
}
|
||||
|
||||
func (n *subscriber) Unsubscribe() error {
|
||||
if n.s == nil {
|
||||
return nil
|
||||
}
|
||||
// go-micro server Unsubscribe can't handle durable queues, so close as stan suggested
|
||||
// from nats streaming readme:
|
||||
// When a client disconnects, the streaming server is not notified, hence the importance of calling Close()
|
||||
if !n.dq {
|
||||
err := n.s.Unsubscribe()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return n.Close()
|
||||
}
|
||||
|
||||
func (n *subscriber) Close() error {
|
||||
if n.s != nil {
|
||||
return n.s.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *stanBroker) Address() string {
|
||||
// stan does not support connected server info
|
||||
if len(n.addrs) > 0 {
|
||||
return n.addrs[0]
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
func setAddrs(addrs []string) []string {
|
||||
var cAddrs []string
|
||||
for _, addr := range addrs {
|
||||
if len(addr) == 0 {
|
||||
continue
|
||||
}
|
||||
if !strings.HasPrefix(addr, "nats://") {
|
||||
addr = "nats://" + addr
|
||||
}
|
||||
cAddrs = append(cAddrs, addr)
|
||||
}
|
||||
if len(cAddrs) == 0 {
|
||||
cAddrs = []string{stan.DefaultNatsURL}
|
||||
}
|
||||
return cAddrs
|
||||
}
|
||||
|
||||
func (n *stanBroker) Connect() error {
|
||||
n.RLock()
|
||||
if n.conn != nil {
|
||||
n.RUnlock()
|
||||
return nil
|
||||
}
|
||||
n.RUnlock()
|
||||
|
||||
opts := n.nopts
|
||||
opts.NatsURL = strings.Join(n.addrs, ",")
|
||||
|
||||
clusterID, ok := n.opts.Context.Value(clusterIDKey{}).(string)
|
||||
if !ok || len(clusterID) == 0 {
|
||||
return errors.New("must specify ClusterID Option")
|
||||
}
|
||||
|
||||
clientID := uuid.New().String()
|
||||
|
||||
nopts := []stan.Option{
|
||||
stan.NatsURL(opts.NatsURL),
|
||||
stan.NatsConn(opts.NatsConn),
|
||||
stan.ConnectWait(opts.ConnectTimeout),
|
||||
stan.PubAckWait(opts.AckTimeout),
|
||||
stan.MaxPubAcksInflight(opts.MaxPubAcksInflight),
|
||||
stan.Pings(opts.PingIterval, opts.PingMaxOut),
|
||||
stan.SetConnectionLostHandler(opts.ConnectionLostCB),
|
||||
}
|
||||
|
||||
c, err := stan.Connect(clusterID, clientID, nopts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n.Lock()
|
||||
n.conn = c
|
||||
n.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *stanBroker) Disconnect() error {
|
||||
n.RLock()
|
||||
n.conn.Close()
|
||||
n.RUnlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *stanBroker) Init(opts ...broker.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&n.opts)
|
||||
}
|
||||
n.addrs = setAddrs(n.opts.Addrs)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *stanBroker) Options() broker.Options {
|
||||
return n.opts
|
||||
}
|
||||
|
||||
func (n *stanBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
|
||||
b, err := n.opts.Codec.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
return n.conn.Publish(topic, b)
|
||||
}
|
||||
|
||||
func (n *stanBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||
var successAutoAck bool
|
||||
|
||||
opt := broker.SubscribeOptions{
|
||||
AutoAck: true,
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&opt)
|
||||
}
|
||||
|
||||
// Make sure context is setup
|
||||
if opt.Context == nil {
|
||||
opt.Context = context.Background()
|
||||
}
|
||||
|
||||
ctx := opt.Context
|
||||
if subscribeContext, ok := ctx.Value(subscribeContextKey{}).(context.Context); ok && subscribeContext != nil {
|
||||
ctx = subscribeContext
|
||||
}
|
||||
|
||||
var stanOpts []stan.SubscriptionOption
|
||||
if opt.AutoAck {
|
||||
stanOpts = append(stanOpts, stan.SetManualAckMode())
|
||||
}
|
||||
|
||||
if subOpts, ok := ctx.Value(subscribeOptionKey{}).([]stan.SubscriptionOption); ok && len(subOpts) > 0 {
|
||||
stanOpts = append(stanOpts, subOpts...)
|
||||
}
|
||||
|
||||
if bval, ok := ctx.Value(successAutoAckKey{}).(bool); ok && bval {
|
||||
stanOpts = append(stanOpts, stan.SetManualAckMode())
|
||||
successAutoAck = true
|
||||
}
|
||||
|
||||
bopts := stan.DefaultSubscriptionOptions
|
||||
for _, bopt := range stanOpts {
|
||||
if err := bopt(&bopts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
fn := func(msg *stan.Msg) {
|
||||
var m broker.Message
|
||||
var err error
|
||||
if err = n.opts.Codec.Unmarshal(msg.Data, &m); err != nil {
|
||||
return
|
||||
}
|
||||
err = handler(&publication{m: &m, msg: msg, t: msg.Subject})
|
||||
if err == nil && successAutoAck && bopts.ManualAcks {
|
||||
msg.Ack()
|
||||
}
|
||||
}
|
||||
|
||||
var sub stan.Subscription
|
||||
var err error
|
||||
|
||||
n.RLock()
|
||||
if len(opt.Queue) > 0 {
|
||||
sub, err = n.conn.QueueSubscribe(topic, opt.Queue, fn, stanOpts...)
|
||||
} else {
|
||||
sub, err = n.conn.Subscribe(topic, fn, stanOpts...)
|
||||
}
|
||||
n.RUnlock()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &subscriber{dq: len(bopts.DurableName) > 0, s: sub, opts: opt, t: topic}, nil
|
||||
}
|
||||
|
||||
func (n *stanBroker) String() string {
|
||||
return "stan"
|
||||
}
|
||||
|
||||
func NewBroker(opts ...broker.Option) broker.Broker {
|
||||
options := broker.Options{
|
||||
// Default codec
|
||||
Codec: json.Marshaler{},
|
||||
Context: context.Background(),
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
stanOpts := stan.DefaultOptions
|
||||
if n, ok := options.Context.Value(optionsKey{}).(stan.Options); ok {
|
||||
stanOpts = n
|
||||
}
|
||||
|
||||
if len(options.Addrs) == 0 {
|
||||
options.Addrs = strings.Split(stanOpts.NatsURL, ",")
|
||||
}
|
||||
|
||||
nb := &stanBroker{
|
||||
opts: options,
|
||||
nopts: stanOpts,
|
||||
addrs: setAddrs(options.Addrs),
|
||||
}
|
||||
|
||||
return nb
|
||||
}
|
97
stan_test.go
Normal file
97
stan_test.go
Normal file
@ -0,0 +1,97 @@
|
||||
package stan
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/micro/go-micro/broker"
|
||||
stan "github.com/nats-io/go-nats-streaming"
|
||||
)
|
||||
|
||||
var addrTestCases = []struct {
|
||||
name string
|
||||
description string
|
||||
addrs map[string]string // expected address : set address
|
||||
}{
|
||||
{
|
||||
"brokerOpts",
|
||||
"set broker addresses through a broker.Option in constructor",
|
||||
map[string]string{
|
||||
"nats://192.168.10.1:5222": "192.168.10.1:5222",
|
||||
"nats://10.20.10.0:4222": "10.20.10.0:4222"},
|
||||
},
|
||||
{
|
||||
"brokerInit",
|
||||
"set broker addresses through a broker.Option in broker.Init()",
|
||||
map[string]string{
|
||||
"nats://192.168.10.1:5222": "192.168.10.1:5222",
|
||||
"nats://10.20.10.0:4222": "10.20.10.0:4222"},
|
||||
},
|
||||
{
|
||||
"natsOpts",
|
||||
"set broker addresses through the snats.Option in constructor",
|
||||
map[string]string{
|
||||
"nats://192.168.10.1:5222": "192.168.10.1:5222",
|
||||
"nats://10.20.10.0:4222": "10.20.10.0:4222"},
|
||||
},
|
||||
{
|
||||
"default",
|
||||
"check if default Address is set correctly",
|
||||
map[string]string{
|
||||
"nats://localhost:4222": ""},
|
||||
},
|
||||
}
|
||||
|
||||
// TestInitAddrs tests issue #100. Ensures that if the addrs is set by an option in init it will be used.
|
||||
func TestInitAddrs(t *testing.T) {
|
||||
|
||||
for _, tc := range addrTestCases {
|
||||
t.Run(fmt.Sprintf("%s: %s", tc.name, tc.description), func(t *testing.T) {
|
||||
|
||||
var br broker.Broker
|
||||
var addrs []string
|
||||
|
||||
for _, addr := range tc.addrs {
|
||||
addrs = append(addrs, addr)
|
||||
}
|
||||
|
||||
switch tc.name {
|
||||
case "brokerOpts":
|
||||
// we know that there are just two addrs in the dict
|
||||
br = NewBroker(broker.Addrs(addrs[0], addrs[1]))
|
||||
br.Init()
|
||||
case "brokerInit":
|
||||
br = NewBroker()
|
||||
// we know that there are just two addrs in the dict
|
||||
br.Init(broker.Addrs(addrs[0], addrs[1]))
|
||||
case "natsOpts":
|
||||
nopts := stan.DefaultOptions
|
||||
nopts.NatsURL = strings.Join(addrs, ",")
|
||||
br = NewBroker(Options(nopts))
|
||||
br.Init()
|
||||
case "default":
|
||||
br = NewBroker()
|
||||
br.Init()
|
||||
}
|
||||
|
||||
stanBroker, ok := br.(*stanBroker)
|
||||
if !ok {
|
||||
t.Fatal("Expected broker to be of types *stanBroker")
|
||||
}
|
||||
// check if the same amount of addrs we set has actually been set
|
||||
if len(stanBroker.addrs) != len(tc.addrs) {
|
||||
t.Errorf("Expected Addr count = %d, Actual Addr count = %d",
|
||||
len(stanBroker.addrs), len(tc.addrs))
|
||||
}
|
||||
|
||||
for _, addr := range stanBroker.addrs {
|
||||
_, ok := tc.addrs[addr]
|
||||
if !ok {
|
||||
t.Errorf("Expected '%s' has not been set", addr)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user