diff --git a/broker/nats/nats.go b/broker/nats/nats.go index 01b59d18..b8bf6daf 100644 --- a/broker/nats/nats.go +++ b/broker/nats/nats.go @@ -4,19 +4,13 @@ package nats import ( "context" "errors" - "net" - "net/url" - "strconv" "strings" "sync" - "time" "github.com/micro/go-micro/v2/broker" "github.com/micro/go-micro/v2/codec/json" "github.com/micro/go-micro/v2/logger" "github.com/micro/go-micro/v2/registry" - "github.com/micro/go-micro/v2/util/addr" - "github.com/nats-io/nats-server/v2/server" nats "github.com/nats-io/nats.go" ) @@ -35,13 +29,6 @@ type natsBroker struct { // should we drain the connection drain bool closeCh chan (error) - - // embedded server - server *server.Server - // configure to use local server - local bool - // server exit channel - exit chan bool } type subscriber struct { @@ -108,186 +95,18 @@ func (n *natsBroker) setAddrs(addrs []string) []string { } cAddrs = append(cAddrs, addr) } - // if there's no address and we weren't told to - // embed a local server then use the default url - if len(cAddrs) == 0 && !n.local { + if len(cAddrs) == 0 { cAddrs = []string{nats.DefaultURL} } return cAddrs } -// serve stats a local nats server if needed -func (n *natsBroker) serve(exit chan bool) error { - var host string - var port int - var local bool - - // with no address we just default it - // this is a local client address - if len(n.addrs) == 0 { - // find an advertiseable ip - if h, err := addr.Extract(""); err != nil { - host = "127.0.0.1" - } else { - host = h - } - - port = -1 - local = true - } else { - address := n.addrs[0] - if strings.HasPrefix(address, "nats://") { - address = strings.TrimPrefix(address, "nats://") - } - - // check if its a local address and only then embed - if addr.IsLocal(address) { - h, p, err := net.SplitHostPort(address) - if err == nil { - host = h - port, _ = strconv.Atoi(p) - local = true - } - } - } - - // we only setup a server for local things - if !local { - return nil - } - - // 1. create new server - // 2. register the server - // 3. connect to other servers - var cOpts server.ClusterOpts - var routes []*url.URL - - // get existing nats servers to connect to - services, err := n.opts.Registry.GetService("go.micro.nats.broker") - if err == nil { - for _, service := range services { - for _, node := range service.Nodes { - u, err := url.Parse("nats://" + node.Address) - if err != nil { - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Error(err) - } - continue - } - // append to the cluster routes - routes = append(routes, u) - } - } - } - - // try get existing server - s := n.server - - // get a host address - caddr, err := addr.Extract("") - if err != nil { - caddr = "0.0.0.0" - } - - // set cluster opts - cOpts = server.ClusterOpts{ - Host: caddr, - Port: -1, - } - - if s == nil { - var err error - s, err = server.NewServer(&server.Options{ - // Specify the host - Host: host, - // Use a random port - Port: port, - // Set the cluster ops - Cluster: cOpts, - // Set the routes - Routes: routes, - NoLog: true, - NoSigs: true, - MaxControlLine: 2048, - TLSConfig: n.opts.TLSConfig, - }) - if err != nil { - return err - } - - // save the server - n.server = s - } - - // start the server - go s.Start() - - var ready bool - - // wait till its ready for connections - for i := 0; i < 3; i++ { - if s.ReadyForConnections(time.Second) { - ready = true - break - } - } - - if !ready { - return errors.New("server not ready") - } - - // set the client address - n.addrs = []string{s.ClientURL()} - - go func() { - // register the cluster address - for { - select { - case <-exit: - // deregister on exit - n.opts.Registry.Deregister(®istry.Service{ - Name: "go.micro.nats.broker", - Version: "v2", - Nodes: []*registry.Node{ - {Id: s.ID(), Address: s.ClusterAddr().String()}, - }, - }) - s.Shutdown() - return - default: - // register the broker - n.opts.Registry.Register(®istry.Service{ - Name: "go.micro.nats.broker", - Version: "v2", - Nodes: []*registry.Node{ - {Id: s.ID(), Address: s.ClusterAddr().String()}, - }, - }, registry.RegisterTTL(time.Minute)) - time.Sleep(time.Minute) - } - } - }() - - return nil -} - func (n *natsBroker) Connect() error { n.Lock() defer n.Unlock() - if !n.connected { - // create exit chan - n.exit = make(chan bool) - - // start embedded server if asked to - if n.local { - if err := n.serve(n.exit); err != nil { - return err - } - } - - // set to connected - n.connected = true + if n.connected { + return nil } status := nats.CLOSED @@ -297,6 +116,7 @@ func (n *natsBroker) Connect() error { switch status { case nats.CONNECTED, nats.RECONNECTING, nats.CONNECTING: + n.connected = true return nil default: // DISCONNECTED or CLOSED or DRAINING opts := n.nopts @@ -314,13 +134,14 @@ func (n *natsBroker) Connect() error { return err } n.conn = c + n.connected = true return nil } } func (n *natsBroker) Disconnect() error { - n.RLock() - defer n.RUnlock() + n.Lock() + defer n.Unlock() // drain the connection if specified if n.drain { @@ -331,16 +152,6 @@ func (n *natsBroker) Disconnect() error { // close the client connection n.conn.Close() - // shutdown the local server - // and deregister - if n.server != nil { - select { - case <-n.exit: - default: - close(n.exit) - } - } - // set not connected n.connected = false @@ -357,19 +168,27 @@ func (n *natsBroker) Options() broker.Options { } func (n *natsBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error { + n.RLock() + defer n.RUnlock() + + if n.conn == nil { + return errors.New("not connected") + } + b, err := n.opts.Codec.Marshal(msg) if err != nil { return err } - n.RLock() - defer n.RUnlock() return n.conn.Publish(topic, b) } func (n *natsBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + n.RLock() if n.conn == nil { + n.RUnlock() return nil, errors.New("not connected") } + n.RUnlock() opt := broker.SubscribeOptions{ AutoAck: true, @@ -441,15 +260,10 @@ func (n *natsBroker) setOption(opts ...broker.Option) { n.nopts = nopts } - local, ok := n.opts.Context.Value(localServerKey{}).(bool) - if ok { - n.local = local - } - // broker.Options have higher priority than nats.Options // only if Addrs, Secure or TLSConfig were not set through a broker.Option // we read them from nats.Option - if len(n.opts.Addrs) == 0 && !n.local { + if len(n.opts.Addrs) == 0 { n.opts.Addrs = n.nopts.Servers } diff --git a/broker/nats/options.go b/broker/nats/options.go index ba127e73..78708c0b 100644 --- a/broker/nats/options.go +++ b/broker/nats/options.go @@ -7,18 +7,12 @@ import ( type optionsKey struct{} type drainConnectionKey struct{} -type localServerKey struct{} // Options accepts nats.Options func Options(opts nats.Options) broker.Option { return setBrokerOption(optionsKey{}, opts) } -// LocalServer embeds a local server rather than connecting to one -func LocalServer() broker.Option { - return setBrokerOption(localServerKey{}, true) -} - // DrainConnection will drain subscription on close func DrainConnection() broker.Option { return setBrokerOption(drainConnectionKey{}, struct{}{}) diff --git a/go.mod b/go.mod index 8a445a11..ffed3e84 100644 --- a/go.mod +++ b/go.mod @@ -48,7 +48,7 @@ require ( github.com/micro/cli/v2 v2.1.2 github.com/miekg/dns v1.1.27 github.com/mitchellh/hashstructure v1.0.0 - github.com/nats-io/nats-server/v2 v2.1.6 + github.com/nats-io/nats-server/v2 v2.1.6 // indirect github.com/nats-io/nats.go v1.9.2 github.com/nlopes/slack v0.6.1-0.20191106133607-d06c2a2b3249 github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c