broker/nats: remove embed nats server reference (#1527)

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-04-11 22:37:29 +03:00 committed by GitHub
parent 0a2363b49b
commit 3ce2ab88f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 19 additions and 211 deletions

View File

@ -4,19 +4,13 @@ package nats
import (
"context"
"errors"
"net"
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/micro/go-micro/v2/broker"
"github.com/micro/go-micro/v2/codec/json"
"github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/util/addr"
"github.com/nats-io/nats-server/v2/server"
nats "github.com/nats-io/nats.go"
)
@ -35,13 +29,6 @@ type natsBroker struct {
// should we drain the connection
drain bool
closeCh chan (error)
// embedded server
server *server.Server
// configure to use local server
local bool
// server exit channel
exit chan bool
}
type subscriber struct {
@ -108,186 +95,18 @@ func (n *natsBroker) setAddrs(addrs []string) []string {
}
cAddrs = append(cAddrs, addr)
}
// if there's no address and we weren't told to
// embed a local server then use the default url
if len(cAddrs) == 0 && !n.local {
if len(cAddrs) == 0 {
cAddrs = []string{nats.DefaultURL}
}
return cAddrs
}
// serve stats a local nats server if needed
func (n *natsBroker) serve(exit chan bool) error {
var host string
var port int
var local bool
// with no address we just default it
// this is a local client address
if len(n.addrs) == 0 {
// find an advertiseable ip
if h, err := addr.Extract(""); err != nil {
host = "127.0.0.1"
} else {
host = h
}
port = -1
local = true
} else {
address := n.addrs[0]
if strings.HasPrefix(address, "nats://") {
address = strings.TrimPrefix(address, "nats://")
}
// check if its a local address and only then embed
if addr.IsLocal(address) {
h, p, err := net.SplitHostPort(address)
if err == nil {
host = h
port, _ = strconv.Atoi(p)
local = true
}
}
}
// we only setup a server for local things
if !local {
return nil
}
// 1. create new server
// 2. register the server
// 3. connect to other servers
var cOpts server.ClusterOpts
var routes []*url.URL
// get existing nats servers to connect to
services, err := n.opts.Registry.GetService("go.micro.nats.broker")
if err == nil {
for _, service := range services {
for _, node := range service.Nodes {
u, err := url.Parse("nats://" + node.Address)
if err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
continue
}
// append to the cluster routes
routes = append(routes, u)
}
}
}
// try get existing server
s := n.server
// get a host address
caddr, err := addr.Extract("")
if err != nil {
caddr = "0.0.0.0"
}
// set cluster opts
cOpts = server.ClusterOpts{
Host: caddr,
Port: -1,
}
if s == nil {
var err error
s, err = server.NewServer(&server.Options{
// Specify the host
Host: host,
// Use a random port
Port: port,
// Set the cluster ops
Cluster: cOpts,
// Set the routes
Routes: routes,
NoLog: true,
NoSigs: true,
MaxControlLine: 2048,
TLSConfig: n.opts.TLSConfig,
})
if err != nil {
return err
}
// save the server
n.server = s
}
// start the server
go s.Start()
var ready bool
// wait till its ready for connections
for i := 0; i < 3; i++ {
if s.ReadyForConnections(time.Second) {
ready = true
break
}
}
if !ready {
return errors.New("server not ready")
}
// set the client address
n.addrs = []string{s.ClientURL()}
go func() {
// register the cluster address
for {
select {
case <-exit:
// deregister on exit
n.opts.Registry.Deregister(&registry.Service{
Name: "go.micro.nats.broker",
Version: "v2",
Nodes: []*registry.Node{
{Id: s.ID(), Address: s.ClusterAddr().String()},
},
})
s.Shutdown()
return
default:
// register the broker
n.opts.Registry.Register(&registry.Service{
Name: "go.micro.nats.broker",
Version: "v2",
Nodes: []*registry.Node{
{Id: s.ID(), Address: s.ClusterAddr().String()},
},
}, registry.RegisterTTL(time.Minute))
time.Sleep(time.Minute)
}
}
}()
return nil
}
func (n *natsBroker) Connect() error {
n.Lock()
defer n.Unlock()
if !n.connected {
// create exit chan
n.exit = make(chan bool)
// start embedded server if asked to
if n.local {
if err := n.serve(n.exit); err != nil {
return err
}
}
// set to connected
n.connected = true
if n.connected {
return nil
}
status := nats.CLOSED
@ -297,6 +116,7 @@ func (n *natsBroker) Connect() error {
switch status {
case nats.CONNECTED, nats.RECONNECTING, nats.CONNECTING:
n.connected = true
return nil
default: // DISCONNECTED or CLOSED or DRAINING
opts := n.nopts
@ -314,13 +134,14 @@ func (n *natsBroker) Connect() error {
return err
}
n.conn = c
n.connected = true
return nil
}
}
func (n *natsBroker) Disconnect() error {
n.RLock()
defer n.RUnlock()
n.Lock()
defer n.Unlock()
// drain the connection if specified
if n.drain {
@ -331,16 +152,6 @@ func (n *natsBroker) Disconnect() error {
// close the client connection
n.conn.Close()
// shutdown the local server
// and deregister
if n.server != nil {
select {
case <-n.exit:
default:
close(n.exit)
}
}
// set not connected
n.connected = false
@ -357,19 +168,27 @@ func (n *natsBroker) Options() broker.Options {
}
func (n *natsBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
n.RLock()
defer n.RUnlock()
if n.conn == nil {
return errors.New("not connected")
}
b, err := n.opts.Codec.Marshal(msg)
if err != nil {
return err
}
n.RLock()
defer n.RUnlock()
return n.conn.Publish(topic, b)
}
func (n *natsBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
n.RLock()
if n.conn == nil {
n.RUnlock()
return nil, errors.New("not connected")
}
n.RUnlock()
opt := broker.SubscribeOptions{
AutoAck: true,
@ -441,15 +260,10 @@ func (n *natsBroker) setOption(opts ...broker.Option) {
n.nopts = nopts
}
local, ok := n.opts.Context.Value(localServerKey{}).(bool)
if ok {
n.local = local
}
// broker.Options have higher priority than nats.Options
// only if Addrs, Secure or TLSConfig were not set through a broker.Option
// we read them from nats.Option
if len(n.opts.Addrs) == 0 && !n.local {
if len(n.opts.Addrs) == 0 {
n.opts.Addrs = n.nopts.Servers
}

View File

@ -7,18 +7,12 @@ import (
type optionsKey struct{}
type drainConnectionKey struct{}
type localServerKey struct{}
// Options accepts nats.Options
func Options(opts nats.Options) broker.Option {
return setBrokerOption(optionsKey{}, opts)
}
// LocalServer embeds a local server rather than connecting to one
func LocalServer() broker.Option {
return setBrokerOption(localServerKey{}, true)
}
// DrainConnection will drain subscription on close
func DrainConnection() broker.Option {
return setBrokerOption(drainConnectionKey{}, struct{}{})

2
go.mod
View File

@ -48,7 +48,7 @@ require (
github.com/micro/cli/v2 v2.1.2
github.com/miekg/dns v1.1.27
github.com/mitchellh/hashstructure v1.0.0
github.com/nats-io/nats-server/v2 v2.1.6
github.com/nats-io/nats-server/v2 v2.1.6 // indirect
github.com/nats-io/nats.go v1.9.2
github.com/nlopes/slack v0.6.1-0.20191106133607-d06c2a2b3249
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c