Embedded NATS Broker (#1110)
* if the address is produced by a default route don't hash it * embedded nats * fix url parsing * don't override help * add ready flag
This commit is contained in:
		| @@ -4,23 +4,44 @@ package nats | ||||
| import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"net" | ||||
| 	"net/url" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/micro/go-micro/broker" | ||||
| 	"github.com/micro/go-micro/codec/json" | ||||
| 	"github.com/micro/go-micro/registry" | ||||
| 	"github.com/micro/go-micro/util/addr" | ||||
| 	"github.com/micro/go-micro/util/log" | ||||
| 	"github.com/nats-io/nats-server/v2/server" | ||||
| 	nats "github.com/nats-io/nats.go" | ||||
| ) | ||||
|  | ||||
| type natsBroker struct { | ||||
| 	sync.Once | ||||
| 	sync.RWMutex | ||||
| 	addrs   []string | ||||
| 	conn    *nats.Conn | ||||
| 	opts    broker.Options | ||||
| 	nopts   nats.Options | ||||
|  | ||||
| 	// indicate if we're connected | ||||
| 	connected bool | ||||
|  | ||||
| 	addrs []string | ||||
| 	conn  *nats.Conn | ||||
| 	opts  broker.Options | ||||
| 	nopts nats.Options | ||||
|  | ||||
| 	// should we drain the connection | ||||
| 	drain   bool | ||||
| 	closeCh chan (error) | ||||
|  | ||||
| 	// embedded server | ||||
| 	server *server.Server | ||||
| 	// configure to use local server | ||||
| 	local bool | ||||
| 	// server exit channel | ||||
| 	exit chan bool | ||||
| } | ||||
|  | ||||
| type subscriber struct { | ||||
| @@ -62,6 +83,7 @@ func (n *natsBroker) Address() string { | ||||
| 	if n.conn != nil && n.conn.IsConnected() { | ||||
| 		return n.conn.ConnectedUrl() | ||||
| 	} | ||||
|  | ||||
| 	if len(n.addrs) > 0 { | ||||
| 		return n.addrs[0] | ||||
| 	} | ||||
| @@ -69,7 +91,7 @@ func (n *natsBroker) Address() string { | ||||
| 	return "" | ||||
| } | ||||
|  | ||||
| func setAddrs(addrs []string) []string { | ||||
| func (n *natsBroker) setAddrs(addrs []string) []string { | ||||
| 	//nolint:prealloc | ||||
| 	var cAddrs []string | ||||
| 	for _, addr := range addrs { | ||||
| @@ -81,16 +103,178 @@ func setAddrs(addrs []string) []string { | ||||
| 		} | ||||
| 		cAddrs = append(cAddrs, addr) | ||||
| 	} | ||||
| 	if len(cAddrs) == 0 { | ||||
| 	// if there's no address and we weren't told to | ||||
| 	// embed a local server then use the default url | ||||
| 	if len(cAddrs) == 0 && !n.local { | ||||
| 		cAddrs = []string{nats.DefaultURL} | ||||
| 	} | ||||
| 	return cAddrs | ||||
| } | ||||
|  | ||||
| // serve stats a local nats server if needed | ||||
| func (n *natsBroker) serve(exit chan bool) error { | ||||
| 	var host string | ||||
| 	var port int | ||||
| 	var local bool | ||||
|  | ||||
| 	// with no address we just default it | ||||
| 	// this is a local client address | ||||
| 	if len(n.addrs) == 0 || n.local { | ||||
| 		host = "127.0.0.1" | ||||
| 		port = -1 | ||||
| 		local = true | ||||
| 		// with a local address we parse it | ||||
| 	} else { | ||||
| 		address := n.addrs[0] | ||||
| 		if strings.HasPrefix(address, "nats://") { | ||||
| 			address = strings.TrimPrefix(address, "nats://") | ||||
| 		} | ||||
|  | ||||
| 		if addr.IsLocal(address) { | ||||
| 			h, p, err := net.SplitHostPort(address) | ||||
| 			if err == nil { | ||||
| 				host = h | ||||
| 				port, _ = strconv.Atoi(p) | ||||
| 				local = true | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// we only setup a server for local things | ||||
| 	if !local { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	// 1. create new server | ||||
| 	// 2. register the server | ||||
| 	// 3. connect to other servers | ||||
| 	var cOpts server.ClusterOpts | ||||
| 	var routes []*url.URL | ||||
|  | ||||
| 	// get existing nats servers to connect to | ||||
| 	services, err := n.opts.Registry.GetService("go.micro.nats.broker") | ||||
| 	if err == nil { | ||||
| 		for _, service := range services { | ||||
| 			for _, node := range service.Nodes { | ||||
| 				u, err := url.Parse("nats://" + node.Address) | ||||
| 				if err != nil { | ||||
| 					log.Log(err) | ||||
| 					continue | ||||
| 				} | ||||
| 				// append to the cluster routes | ||||
| 				routes = append(routes, u) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// try get existing server | ||||
| 	s := n.server | ||||
|  | ||||
| 	// get a host address | ||||
| 	caddr, err := addr.Extract("") | ||||
| 	if err != nil { | ||||
| 		caddr = "0.0.0.0" | ||||
| 	} | ||||
|  | ||||
| 	// set cluster opts | ||||
| 	cOpts = server.ClusterOpts{ | ||||
| 		Host: caddr, | ||||
| 		Port: -1, | ||||
| 	} | ||||
|  | ||||
| 	if s == nil { | ||||
| 		var err error | ||||
| 		s, err = server.NewServer(&server.Options{ | ||||
| 			// Specify the host | ||||
| 			Host: host, | ||||
| 			// Use a random port | ||||
| 			Port: port, | ||||
| 			// Set the cluster ops | ||||
| 			Cluster: cOpts, | ||||
| 			// Set the routes | ||||
| 			Routes:         routes, | ||||
| 			NoLog:          true, | ||||
| 			NoSigs:         true, | ||||
| 			MaxControlLine: 2048, | ||||
| 			TLSConfig:      n.opts.TLSConfig, | ||||
| 		}) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		// save the server | ||||
| 		n.server = s | ||||
| 	} | ||||
|  | ||||
| 	// start the server | ||||
| 	go s.Start() | ||||
|  | ||||
| 	var ready bool | ||||
|  | ||||
| 	// wait till its ready for connections | ||||
| 	for i := 0; i < 3; i++ { | ||||
| 		if s.ReadyForConnections(time.Second) { | ||||
| 			ready = true | ||||
| 			break | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if !ready { | ||||
| 		return errors.New("server not ready") | ||||
| 	} | ||||
|  | ||||
| 	// set the client address | ||||
| 	n.addrs = []string{s.ClientURL()} | ||||
|  | ||||
| 	go func() { | ||||
| 		// register the cluster address | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-exit: | ||||
| 				// deregister on exit | ||||
| 				n.opts.Registry.Deregister(®istry.Service{ | ||||
| 					Name:    "go.micro.nats.broker", | ||||
| 					Version: "v2", | ||||
| 					Nodes: []*registry.Node{ | ||||
| 						{Id: s.ID(), Address: s.ClusterAddr().String()}, | ||||
| 					}, | ||||
| 				}) | ||||
| 				s.Shutdown() | ||||
| 				return | ||||
| 			default: | ||||
| 				// register the broker | ||||
| 				n.opts.Registry.Register(®istry.Service{ | ||||
| 					Name:    "go.micro.nats.broker", | ||||
| 					Version: "v2", | ||||
| 					Nodes: []*registry.Node{ | ||||
| 						{Id: s.ID(), Address: s.ClusterAddr().String()}, | ||||
| 					}, | ||||
| 				}, registry.RegisterTTL(time.Minute)) | ||||
| 				time.Sleep(time.Minute) | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (n *natsBroker) Connect() error { | ||||
| 	n.Lock() | ||||
| 	defer n.Unlock() | ||||
|  | ||||
| 	if !n.connected { | ||||
| 		// create exit chan | ||||
| 		n.exit = make(chan bool) | ||||
|  | ||||
| 		// start the server if needed | ||||
| 		if err := n.serve(n.exit); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		// set to connected | ||||
| 		n.connected = true | ||||
| 	} | ||||
|  | ||||
| 	status := nats.CLOSED | ||||
| 	if n.conn != nil { | ||||
| 		status = n.conn.Status() | ||||
| @@ -122,11 +306,29 @@ func (n *natsBroker) Connect() error { | ||||
| func (n *natsBroker) Disconnect() error { | ||||
| 	n.RLock() | ||||
| 	defer n.RUnlock() | ||||
|  | ||||
| 	// drain the connection if specified | ||||
| 	if n.drain { | ||||
| 		n.conn.Drain() | ||||
| 		return <-n.closeCh | ||||
| 	} | ||||
|  | ||||
| 	// close the client connection | ||||
| 	n.conn.Close() | ||||
|  | ||||
| 	// shutdown the local server | ||||
| 	// and deregister | ||||
| 	if n.server != nil { | ||||
| 		select { | ||||
| 		case <-n.exit: | ||||
| 		default: | ||||
| 			close(n.exit) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// set not connected | ||||
| 	n.connected = false | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @@ -191,21 +393,6 @@ func (n *natsBroker) String() string { | ||||
| 	return "nats" | ||||
| } | ||||
|  | ||||
| func NewBroker(opts ...broker.Option) broker.Broker { | ||||
| 	options := broker.Options{ | ||||
| 		// Default codec | ||||
| 		Codec:   json.Marshaler{}, | ||||
| 		Context: context.Background(), | ||||
| 	} | ||||
|  | ||||
| 	n := &natsBroker{ | ||||
| 		opts: options, | ||||
| 	} | ||||
| 	n.setOption(opts...) | ||||
|  | ||||
| 	return n | ||||
| } | ||||
|  | ||||
| func (n *natsBroker) setOption(opts ...broker.Option) { | ||||
| 	for _, o := range opts { | ||||
| 		o(&n.opts) | ||||
| @@ -219,10 +406,15 @@ func (n *natsBroker) setOption(opts ...broker.Option) { | ||||
| 		n.nopts = nopts | ||||
| 	} | ||||
|  | ||||
| 	local, ok := n.opts.Context.Value(localServerKey{}).(bool) | ||||
| 	if ok { | ||||
| 		n.local = local | ||||
| 	} | ||||
|  | ||||
| 	// broker.Options have higher priority than nats.Options | ||||
| 	// only if Addrs, Secure or TLSConfig were not set through a broker.Option | ||||
| 	// we read them from nats.Option | ||||
| 	if len(n.opts.Addrs) == 0 { | ||||
| 	if len(n.opts.Addrs) == 0 && !n.local { | ||||
| 		n.opts.Addrs = n.nopts.Servers | ||||
| 	} | ||||
|  | ||||
| @@ -233,7 +425,7 @@ func (n *natsBroker) setOption(opts ...broker.Option) { | ||||
| 	if n.opts.TLSConfig == nil { | ||||
| 		n.opts.TLSConfig = n.nopts.TLSConfig | ||||
| 	} | ||||
| 	n.addrs = setAddrs(n.opts.Addrs) | ||||
| 	n.addrs = n.setAddrs(n.opts.Addrs) | ||||
|  | ||||
| 	if n.opts.Context.Value(drainConnectionKey{}) != nil { | ||||
| 		n.drain = true | ||||
| @@ -254,3 +446,19 @@ func (n *natsBroker) onAsyncError(conn *nats.Conn, sub *nats.Subscription, err e | ||||
| 		n.closeCh <- err | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func NewBroker(opts ...broker.Option) broker.Broker { | ||||
| 	options := broker.Options{ | ||||
| 		// Default codec | ||||
| 		Codec:    json.Marshaler{}, | ||||
| 		Context:  context.Background(), | ||||
| 		Registry: registry.DefaultRegistry, | ||||
| 	} | ||||
|  | ||||
| 	n := &natsBroker{ | ||||
| 		opts: options, | ||||
| 	} | ||||
| 	n.setOption(opts...) | ||||
|  | ||||
| 	return n | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user