diff --git a/broker/http_broker.go b/broker/http_broker.go index 41d80f5d..98430dae 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -106,8 +106,9 @@ func newTransport(config *tls.Config) *http.Transport { func newHttpBroker(opts ...Option) Broker { options := Options{ - Codec: json.Marshaler{}, - Context: context.TODO(), + Codec: json.Marshaler{}, + Context: context.TODO(), + Registry: registry.DefaultRegistry, } for _, o := range opts { @@ -120,17 +121,11 @@ func newHttpBroker(opts ...Option) Broker { addr = options.Addrs[0] } - // get registry - reg, ok := options.Context.Value(registryKey).(registry.Registry) - if !ok { - reg = registry.DefaultRegistry - } - h := &httpBroker{ id: uuid.New().String(), address: addr, opts: options, - r: reg, + r: options.Registry, c: &http.Client{Transport: newTransport(options.TLSConfig)}, subscribers: make(map[string][]*httpSubscriber), exit: make(chan chan error), diff --git a/broker/nats/nats.go b/broker/nats/nats.go index a72f7682..775c92f0 100644 --- a/broker/nats/nats.go +++ b/broker/nats/nats.go @@ -4,23 +4,44 @@ package nats import ( "context" "errors" + "net" + "net/url" + "strconv" "strings" "sync" + "time" "github.com/micro/go-micro/broker" "github.com/micro/go-micro/codec/json" + "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/util/addr" + "github.com/micro/go-micro/util/log" + "github.com/nats-io/nats-server/v2/server" nats "github.com/nats-io/nats.go" ) type natsBroker struct { sync.Once sync.RWMutex - addrs []string - conn *nats.Conn - opts broker.Options - nopts nats.Options + + // indicate if we're connected + connected bool + + addrs []string + conn *nats.Conn + opts broker.Options + nopts nats.Options + + // should we drain the connection drain bool closeCh chan (error) + + // embedded server + server *server.Server + // configure to use local server + local bool + // server exit channel + exit chan bool } type subscriber struct { @@ -62,6 +83,7 @@ func (n *natsBroker) Address() string { if n.conn != nil && n.conn.IsConnected() { return n.conn.ConnectedUrl() } + if len(n.addrs) > 0 { return n.addrs[0] } @@ -69,7 +91,7 @@ func (n *natsBroker) Address() string { return "" } -func setAddrs(addrs []string) []string { +func (n *natsBroker) setAddrs(addrs []string) []string { //nolint:prealloc var cAddrs []string for _, addr := range addrs { @@ -81,16 +103,178 @@ func setAddrs(addrs []string) []string { } cAddrs = append(cAddrs, addr) } - if len(cAddrs) == 0 { + // if there's no address and we weren't told to + // embed a local server then use the default url + if len(cAddrs) == 0 && !n.local { cAddrs = []string{nats.DefaultURL} } return cAddrs } +// serve stats a local nats server if needed +func (n *natsBroker) serve(exit chan bool) error { + var host string + var port int + var local bool + + // with no address we just default it + // this is a local client address + if len(n.addrs) == 0 || n.local { + host = "127.0.0.1" + port = -1 + local = true + // with a local address we parse it + } else { + address := n.addrs[0] + if strings.HasPrefix(address, "nats://") { + address = strings.TrimPrefix(address, "nats://") + } + + if addr.IsLocal(address) { + h, p, err := net.SplitHostPort(address) + if err == nil { + host = h + port, _ = strconv.Atoi(p) + local = true + } + } + } + + // we only setup a server for local things + if !local { + return nil + } + + // 1. create new server + // 2. register the server + // 3. connect to other servers + var cOpts server.ClusterOpts + var routes []*url.URL + + // get existing nats servers to connect to + services, err := n.opts.Registry.GetService("go.micro.nats.broker") + if err == nil { + for _, service := range services { + for _, node := range service.Nodes { + u, err := url.Parse("nats://" + node.Address) + if err != nil { + log.Log(err) + continue + } + // append to the cluster routes + routes = append(routes, u) + } + } + } + + // try get existing server + s := n.server + + // get a host address + caddr, err := addr.Extract("") + if err != nil { + caddr = "0.0.0.0" + } + + // set cluster opts + cOpts = server.ClusterOpts{ + Host: caddr, + Port: -1, + } + + if s == nil { + var err error + s, err = server.NewServer(&server.Options{ + // Specify the host + Host: host, + // Use a random port + Port: port, + // Set the cluster ops + Cluster: cOpts, + // Set the routes + Routes: routes, + NoLog: true, + NoSigs: true, + MaxControlLine: 2048, + TLSConfig: n.opts.TLSConfig, + }) + if err != nil { + return err + } + + // save the server + n.server = s + } + + // start the server + go s.Start() + + var ready bool + + // wait till its ready for connections + for i := 0; i < 3; i++ { + if s.ReadyForConnections(time.Second) { + ready = true + break + } + } + + if !ready { + return errors.New("server not ready") + } + + // set the client address + n.addrs = []string{s.ClientURL()} + + go func() { + // register the cluster address + for { + select { + case <-exit: + // deregister on exit + n.opts.Registry.Deregister(®istry.Service{ + Name: "go.micro.nats.broker", + Version: "v2", + Nodes: []*registry.Node{ + {Id: s.ID(), Address: s.ClusterAddr().String()}, + }, + }) + s.Shutdown() + return + default: + // register the broker + n.opts.Registry.Register(®istry.Service{ + Name: "go.micro.nats.broker", + Version: "v2", + Nodes: []*registry.Node{ + {Id: s.ID(), Address: s.ClusterAddr().String()}, + }, + }, registry.RegisterTTL(time.Minute)) + time.Sleep(time.Minute) + } + } + }() + + return nil +} + func (n *natsBroker) Connect() error { n.Lock() defer n.Unlock() + if !n.connected { + // create exit chan + n.exit = make(chan bool) + + // start the server if needed + if err := n.serve(n.exit); err != nil { + return err + } + + // set to connected + n.connected = true + } + status := nats.CLOSED if n.conn != nil { status = n.conn.Status() @@ -122,11 +306,29 @@ func (n *natsBroker) Connect() error { func (n *natsBroker) Disconnect() error { n.RLock() defer n.RUnlock() + + // drain the connection if specified if n.drain { n.conn.Drain() return <-n.closeCh } + + // close the client connection n.conn.Close() + + // shutdown the local server + // and deregister + if n.server != nil { + select { + case <-n.exit: + default: + close(n.exit) + } + } + + // set not connected + n.connected = false + return nil } @@ -191,21 +393,6 @@ func (n *natsBroker) String() string { return "nats" } -func NewBroker(opts ...broker.Option) broker.Broker { - options := broker.Options{ - // Default codec - Codec: json.Marshaler{}, - Context: context.Background(), - } - - n := &natsBroker{ - opts: options, - } - n.setOption(opts...) - - return n -} - func (n *natsBroker) setOption(opts ...broker.Option) { for _, o := range opts { o(&n.opts) @@ -219,10 +406,15 @@ func (n *natsBroker) setOption(opts ...broker.Option) { n.nopts = nopts } + local, ok := n.opts.Context.Value(localServerKey{}).(bool) + if ok { + n.local = local + } + // broker.Options have higher priority than nats.Options // only if Addrs, Secure or TLSConfig were not set through a broker.Option // we read them from nats.Option - if len(n.opts.Addrs) == 0 { + if len(n.opts.Addrs) == 0 && !n.local { n.opts.Addrs = n.nopts.Servers } @@ -233,7 +425,7 @@ func (n *natsBroker) setOption(opts ...broker.Option) { if n.opts.TLSConfig == nil { n.opts.TLSConfig = n.nopts.TLSConfig } - n.addrs = setAddrs(n.opts.Addrs) + n.addrs = n.setAddrs(n.opts.Addrs) if n.opts.Context.Value(drainConnectionKey{}) != nil { n.drain = true @@ -254,3 +446,19 @@ func (n *natsBroker) onAsyncError(conn *nats.Conn, sub *nats.Subscription, err e n.closeCh <- err } } + +func NewBroker(opts ...broker.Option) broker.Broker { + options := broker.Options{ + // Default codec + Codec: json.Marshaler{}, + Context: context.Background(), + Registry: registry.DefaultRegistry, + } + + n := &natsBroker{ + opts: options, + } + n.setOption(opts...) + + return n +} diff --git a/broker/nats/options.go b/broker/nats/options.go index b5b106c0..ae8ca80d 100644 --- a/broker/nats/options.go +++ b/broker/nats/options.go @@ -7,12 +7,18 @@ import ( type optionsKey struct{} type drainConnectionKey struct{} +type localServerKey struct{} // Options accepts nats.Options func Options(opts nats.Options) broker.Option { return setBrokerOption(optionsKey{}, opts) } +// LocalServer embeds a local server rather than connecting to one +func LocalServer() broker.Option { + return setBrokerOption(localServerKey{}, true) +} + // DrainConnection will drain subscription on close func DrainConnection() broker.Option { return setBrokerOption(drainConnectionKey{}, struct{}{}) diff --git a/broker/options.go b/broker/options.go index 9a60e150..f0877a27 100644 --- a/broker/options.go +++ b/broker/options.go @@ -13,6 +13,8 @@ type Options struct { Secure bool Codec codec.Marshaler TLSConfig *tls.Config + // Registry used for clustering + Registry registry.Registry // Other options for implementations of the interface // can be stored in a context Context context.Context @@ -92,7 +94,7 @@ func Queue(name string) SubscribeOption { func Registry(r registry.Registry) Option { return func(o *Options) { - o.Context = context.WithValue(o.Context, registryKey, r) + o.Registry = r } } diff --git a/config/cmd/cmd.go b/config/cmd/cmd.go index 83dfcc46..f553f44b 100644 --- a/config/cmd/cmd.go +++ b/config/cmd/cmd.go @@ -3,9 +3,7 @@ package cmd import ( "fmt" - "io" "math/rand" - "os" "strings" "time" @@ -273,7 +271,7 @@ var ( // used for default selection as the fall back defaultClient = "grpc" defaultServer = "grpc" - defaultBroker = "http" + defaultBroker = "nats" defaultRegistry = "mdns" defaultSelector = "registry" defaultTransport = "http" @@ -283,11 +281,6 @@ var ( func init() { rand.Seed(time.Now().Unix()) - help := cli.HelpPrinter - cli.HelpPrinter = func(writer io.Writer, templ string, data interface{}) { - help(writer, templ, data) - os.Exit(0) - } } func newCmd(opts ...Option) Cmd { diff --git a/defaults.go b/defaults.go index 0159f19c..65844c23 100644 --- a/defaults.go +++ b/defaults.go @@ -1,17 +1,24 @@ package micro import ( + "github.com/micro/go-micro/broker" "github.com/micro/go-micro/client" "github.com/micro/go-micro/server" "github.com/micro/go-micro/store" // set defaults + "github.com/micro/go-micro/broker/nats" gcli "github.com/micro/go-micro/client/grpc" gsrv "github.com/micro/go-micro/server/grpc" memStore "github.com/micro/go-micro/store/memory" ) func init() { + // default broker + broker.DefaultBroker = nats.NewBroker( + // embedded nats server + nats.LocalServer(), + ) // default client client.DefaultClient = gcli.NewClient() // default server diff --git a/go.mod b/go.mod index 58b17fb5..8308c9d4 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,9 @@ require ( github.com/micro/mdns v0.3.0 github.com/miekg/dns v1.1.26 github.com/mitchellh/hashstructure v1.0.0 + github.com/nats-io/gnatsd v1.4.1 // indirect + github.com/nats-io/nats-server v1.4.1 + github.com/nats-io/nats-server/v2 v2.1.2 github.com/nats-io/nats.go v1.9.1 github.com/nlopes/slack v0.6.0 github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c diff --git a/go.sum b/go.sum index 71f5b50f..23a96d27 100644 --- a/go.sum +++ b/go.sum @@ -253,12 +253,22 @@ github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c h1:nXxl5PrvVm2L/wCy8d github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8= +github.com/nats-io/gnatsd v1.4.1 h1:RconcfDeWpKCD6QIIwiVFcvForlXpWeJP7i5/lDLy44= +github.com/nats-io/gnatsd v1.4.1/go.mod h1:nqco77VO78hLCJpIcVfygDP2rPGfsEHkGTUk94uh5DQ= github.com/nats-io/jwt v0.3.0 h1:xdnzwFETV++jNc4W1mw//qFyJGb2ABOombmZJQS4+Qo= github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= +github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI= +github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= +github.com/nats-io/nats-server v1.4.1 h1:Ul1oSOGNV/L8kjr4v6l2f9Yet6WY+LevH1/7cRZ/qyA= +github.com/nats-io/nats-server v1.4.1/go.mod h1:c8f/fHd2B6Hgms3LtCaI7y6pC4WD1f4SUxcCud5vhBc= +github.com/nats-io/nats-server/v2 v2.1.2 h1:i2Ly0B+1+rzNZHHWtD4ZwKi+OU5l+uQo1iDHZ2PmiIc= +github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k= github.com/nats-io/nats.go v1.9.1 h1:ik3HbLhZ0YABLto7iX80pZLPw/6dx3T+++MZJwLnMrQ= github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= github.com/nats-io/nkeys v0.1.0 h1:qMd4+pRHgdr1nAClu+2h/2a5F2TmKcCzjCDazVgRoX4= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= +github.com/nats-io/nkeys v0.1.3 h1:6JrEfig+HzTH85yxzhSVbjHRJv9cn0p6n3IngIcM5/k= +github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms= diff --git a/util/addr/addr.go b/util/addr/addr.go index eb0a5972..fafe8b56 100644 --- a/util/addr/addr.go +++ b/util/addr/addr.go @@ -27,6 +27,29 @@ func isPrivateIP(ipAddr string) bool { return false } +// IsLocal tells us whether an ip is local +func IsLocal(addr string) bool { + // extract the host + host, _, err := net.SplitHostPort(addr) + if err == nil { + addr = host + } + + // check if its localhost + if addr == "localhost" { + return true + } + + // check against all local ips + for _, ip := range IPs() { + if addr == ip { + return true + } + } + + return false +} + // Extract returns a real ip func Extract(addr string) (string, error) { // if addr specified then its returned diff --git a/util/addr/addr_test.go b/util/addr/addr_test.go index ad23b128..06aae9a6 100644 --- a/util/addr/addr_test.go +++ b/util/addr/addr_test.go @@ -5,6 +5,26 @@ import ( "testing" ) +func TestIsLocal(t *testing.T) { + testData := []struct { + addr string + expect bool + }{ + {"localhost", true}, + {"localhost:8080", true}, + {"127.0.0.1", true}, + {"127.0.0.1:1001", true}, + {"80.1.1.1", false}, + } + + for _, d := range testData { + res := IsLocal(d.addr) + if res != d.expect { + t.Fatalf("expected %t got %t", d.expect, res) + } + } +} + func TestExtractor(t *testing.T) { testData := []struct { addr string