diff --git a/broker/broker.go b/broker/broker.go index 2871e818..97845f12 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -38,13 +38,9 @@ type Subscriber interface { } var ( - DefaultBroker Broker = newHttpBroker() + DefaultBroker Broker = NewBroker() ) -func NewBroker(opts ...Option) Broker { - return newHttpBroker(opts...) -} - func Init(opts ...Option) error { return DefaultBroker.Init(opts...) } diff --git a/broker/common_test.go b/broker/common_test.go deleted file mode 100644 index c01d42c7..00000000 --- a/broker/common_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package broker - -import ( - "github.com/micro/go-micro/registry" -) - -var ( - // mock data - testData = map[string][]*registry.Service{ - "foo": { - { - Name: "foo", - Version: "1.0.0", - Nodes: []*registry.Node{ - { - Id: "foo-1.0.0-123", - Address: "localhost:9999", - }, - { - Id: "foo-1.0.0-321", - Address: "localhost:9999", - }, - }, - }, - { - Name: "foo", - Version: "1.0.1", - Nodes: []*registry.Node{ - { - Id: "foo-1.0.1-321", - Address: "localhost:6666", - }, - }, - }, - { - Name: "foo", - Version: "1.0.3", - Nodes: []*registry.Node{ - { - Id: "foo-1.0.3-345", - Address: "localhost:8888", - }, - }, - }, - }, - } -) diff --git a/broker/default.go b/broker/default.go new file mode 100644 index 00000000..3f45c3ef --- /dev/null +++ b/broker/default.go @@ -0,0 +1,459 @@ +package broker + +import ( + "context" + "errors" + "net" + "net/url" + "strconv" + "strings" + "sync" + "time" + + "github.com/micro/go-micro/codec/json" + "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/util/log" + "github.com/micro/go-micro/util/addr" + "github.com/nats-io/nats-server/v2/server" + nats "github.com/nats-io/nats.go" +) + +type natsBroker struct { + sync.Once + sync.RWMutex + + // indicate if we're connected + connected bool + + // address to bind routes to + addrs []string + // servers for the client + servers []string + + // client connection and nats opts + conn *nats.Conn + opts Options + nopts nats.Options + + // should we drain the connection + drain bool + closeCh chan (error) + + // embedded server + server *server.Server + // configure to use local server + local bool + // server exit channel + exit chan bool +} + +type subscriber struct { + s *nats.Subscription + opts SubscribeOptions +} + +type publication struct { + t string + m *Message +} + +func (p *publication) Topic() string { + return p.t +} + +func (p *publication) Message() *Message { + return p.m +} + +func (p *publication) Ack() error { + // nats does not support acking + return nil +} + +func (s *subscriber) Options() SubscribeOptions { + return s.opts +} + +func (s *subscriber) Topic() string { + return s.s.Subject +} + +func (s *subscriber) Unsubscribe() error { + return s.s.Unsubscribe() +} + +func (n *natsBroker) Address() string { + n.RLock() + defer n.RUnlock() + + if n.server != nil { + return n.server.ClusterAddr().String() + } + + if n.conn != nil && n.conn.IsConnected() { + return n.conn.ConnectedUrl() + } + + if len(n.addrs) > 0 { + return n.addrs[0] + } + + return "127.0.0.1:-1" +} + +func (n *natsBroker) setAddrs(addrs []string) []string { + //nolint:prealloc + var cAddrs []string + for _, addr := range addrs { + if len(addr) == 0 { + continue + } + if !strings.HasPrefix(addr, "nats://") { + addr = "nats://" + addr + } + cAddrs = append(cAddrs, addr) + } + // if there's no address and we weren't told to + // embed a local server then use the default url + if len(cAddrs) == 0 && !n.local { + cAddrs = []string{nats.DefaultURL} + } + return cAddrs +} + +// serve stats a local nats server if needed +func (n *natsBroker) serve(exit chan bool) error { + // local server address + host := "127.0.0.1" + port := -1 + + // cluster address + caddr := "0.0.0.0" + cport := -1 + + // with no address we just default it + // this is a local client address + if len(n.addrs) > 0 { + address := n.addrs[0] + if strings.HasPrefix(address, "nats://") { + address = strings.TrimPrefix(address, "nats://") + } + + // parse out the address + h, p, err := net.SplitHostPort(address) + if err == nil { + caddr = h + cport, _ = strconv.Atoi(p) + } + } + + // 1. create new server + // 2. register the server + // 3. connect to other servers + + // set cluster opts + cOpts := server.ClusterOpts{ + Host: caddr, + Port: cport, + } + + // get the routes for other nodes + var routes []*url.URL + + // get existing nats servers to connect to + services, err := n.opts.Registry.GetService("go.micro.nats.broker") + if err == nil { + for _, service := range services { + for _, node := range service.Nodes { + u, err := url.Parse("nats://" + node.Address) + if err != nil { + log.Log(err) + continue + } + // append to the cluster routes + routes = append(routes, u) + } + } + } + + // try get existing server + s := n.server + + if s != nil { + // stop the existing server + s.Shutdown() + } + + s, err = server.NewServer(&server.Options{ + // Specify the host + Host: host, + // Use a random port + Port: port, + // Set the cluster ops + Cluster: cOpts, + // Set the routes + Routes: routes, + NoLog: true, + NoSigs: true, + MaxControlLine: 2048, + TLSConfig: n.opts.TLSConfig, + }) + if err != nil { + return err + } + + // save the server + n.server = s + + // start the server + go s.Start() + + var ready bool + + // wait till its ready for connections + for i := 0; i < 3; i++ { + if s.ReadyForConnections(time.Second) { + ready = true + break + } + } + + if !ready { + return errors.New("server not ready") + } + + // set the client address + n.servers = []string{s.ClientURL()} + + go func() { + var advertise string + + // parse out the address + _, port, err := net.SplitHostPort(s.ClusterAddr().String()) + if err == nil { + addr, _ := addr.Extract("") + advertise = net.JoinHostPort(addr, port) + } else { + s.ClusterAddr().String() + } + + // register the cluster address + for { + select { + case <-exit: + // deregister on exit + n.opts.Registry.Deregister(®istry.Service{ + Name: "go.micro.nats.broker", + Version: "v2", + Nodes: []*registry.Node{ + {Id: s.ID(), Address: advertise}, + }, + }) + s.Shutdown() + return + default: + // register the broker + n.opts.Registry.Register(®istry.Service{ + Name: "go.micro.nats.broker", + Version: "v2", + Nodes: []*registry.Node{ + {Id: s.ID(), Address: advertise}, + }, + }, registry.RegisterTTL(time.Minute)) + time.Sleep(time.Minute) + } + } + }() + + return nil +} + +func (n *natsBroker) Connect() error { + n.Lock() + defer n.Unlock() + + if !n.connected { + // create exit chan + n.exit = make(chan bool) + + // start the local server + if err := n.serve(n.exit); err != nil { + return err + } + + // set to connected + n.connected = true + } + + status := nats.CLOSED + if n.conn != nil { + status = n.conn.Status() + } + + switch status { + case nats.CONNECTED, nats.RECONNECTING, nats.CONNECTING: + return nil + default: // DISCONNECTED or CLOSED or DRAINING + opts := n.nopts + opts.Servers = n.servers + opts.Secure = n.opts.Secure + opts.TLSConfig = n.opts.TLSConfig + + // secure might not be set + if n.opts.TLSConfig != nil { + opts.Secure = true + } + + c, err := opts.Connect() + if err != nil { + return err + } + n.conn = c + return nil + } +} + +func (n *natsBroker) Disconnect() error { + n.RLock() + defer n.RUnlock() + + if !n.connected { + return nil + } + + // drain the connection if specified + if n.drain { + n.conn.Drain() + return <-n.closeCh + } + + // close the client connection + n.conn.Close() + + // shutdown the local server + // and deregister + select { + case <-n.exit: + default: + close(n.exit) + } + + // set not connected + n.connected = false + + return nil +} + +func (n *natsBroker) Init(opts ...Option) error { + n.setOption(opts...) + return nil +} + +func (n *natsBroker) Options() Options { + return n.opts +} + +func (n *natsBroker) Publish(topic string, msg *Message, opts ...PublishOption) error { + b, err := n.opts.Codec.Marshal(msg) + if err != nil { + return err + } + n.RLock() + defer n.RUnlock() + return n.conn.Publish(topic, b) +} + +func (n *natsBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { + if n.conn == nil { + return nil, errors.New("not connected") + } + + opt := SubscribeOptions{ + AutoAck: true, + Context: context.Background(), + } + + for _, o := range opts { + o(&opt) + } + + fn := func(msg *nats.Msg) { + var m Message + if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil { + return + } + handler(&publication{m: &m, t: msg.Subject}) + } + + var sub *nats.Subscription + var err error + + n.RLock() + if len(opt.Queue) > 0 { + sub, err = n.conn.QueueSubscribe(topic, opt.Queue, fn) + } else { + sub, err = n.conn.Subscribe(topic, fn) + } + n.RUnlock() + if err != nil { + return nil, err + } + return &subscriber{s: sub, opts: opt}, nil +} + +func (n *natsBroker) String() string { + return "nats" +} + +func (n *natsBroker) setOption(opts ...Option) { + for _, o := range opts { + o(&n.opts) + } + + n.Once.Do(func() { + n.nopts = nats.GetDefaultOptions() + }) + + // local embedded server + n.local = true + // set to drain + n.drain = true + + if !n.opts.Secure { + n.opts.Secure = n.nopts.Secure + } + + if n.opts.TLSConfig == nil { + n.opts.TLSConfig = n.nopts.TLSConfig + } + + n.addrs = n.setAddrs(n.opts.Addrs) +} + +func (n *natsBroker) onClose(conn *nats.Conn) { + n.closeCh <- nil +} + +func (n *natsBroker) onAsyncError(conn *nats.Conn, sub *nats.Subscription, err error) { + // There are kinds of different async error nats might callback, but we are interested + // in ErrDrainTimeout only here. + if err == nats.ErrDrainTimeout { + n.closeCh <- err + } +} + +func NewBroker(opts ...Option) Broker { + options := Options{ + // Default codec + Codec: json.Marshaler{}, + Context: context.Background(), + Registry: registry.DefaultRegistry, + } + + n := &natsBroker{ + opts: options, + } + n.setOption(opts...) + + return n +} diff --git a/broker/http/http.go b/broker/http/http.go index 7d09986a..7ae7ed7f 100644 --- a/broker/http/http.go +++ b/broker/http/http.go @@ -2,10 +2,704 @@ package http import ( + "bytes" + "context" + "crypto/tls" + "errors" + "fmt" + "io" + "io/ioutil" + "math/rand" + "net" + "net/http" + "net/url" + "runtime" + "sync" + "time" + + "github.com/google/uuid" "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/codec/json" + merr "github.com/micro/go-micro/errors" + "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/registry/cache" + maddr "github.com/micro/go-micro/util/addr" + mnet "github.com/micro/go-micro/util/net" + mls "github.com/micro/go-micro/util/tls" + "golang.org/x/net/http2" ) +// HTTP Broker is a point to point async broker +type httpBroker struct { + id string + address string + opts broker.Options + + mux *http.ServeMux + + c *http.Client + r registry.Registry + + sync.RWMutex + subscribers map[string][]*httpSubscriber + running bool + exit chan chan error + + // offline message inbox + mtx sync.RWMutex + inbox map[string][][]byte +} + +type httpSubscriber struct { + opts broker.SubscribeOptions + id string + topic string + fn broker.Handler + svc *registry.Service + hb *httpBroker +} + +type httpEvent struct { + m *broker.Message + t string +} + +var ( + DefaultSubPath = "/_sub" + serviceName = "go.micro.http.broker" + broadcastVersion = "ff.http.broadcast" + registerTTL = time.Minute + registerInterval = time.Second * 30 +) + +func init() { + rand.Seed(time.Now().Unix()) +} + +func newTransport(config *tls.Config) *http.Transport { + if config == nil { + config = &tls.Config{ + InsecureSkipVerify: true, + } + } + + dialTLS := func(network string, addr string) (net.Conn, error) { + return tls.Dial(network, addr, config) + } + + t := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + Dial: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).Dial, + TLSHandshakeTimeout: 10 * time.Second, + DialTLS: dialTLS, + } + runtime.SetFinalizer(&t, func(tr **http.Transport) { + (*tr).CloseIdleConnections() + }) + + // setup http2 + http2.ConfigureTransport(t) + + return t +} + +func newHttpBroker(opts ...broker.Option) broker.Broker { + options := broker.Options{ + Codec: json.Marshaler{}, + Context: context.TODO(), + Registry: registry.DefaultRegistry, + } + + for _, o := range opts { + o(&options) + } + + // set address + addr := ":0" + if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 { + addr = options.Addrs[0] + } + + h := &httpBroker{ + id: uuid.New().String(), + address: addr, + opts: options, + r: options.Registry, + c: &http.Client{Transport: newTransport(options.TLSConfig)}, + subscribers: make(map[string][]*httpSubscriber), + exit: make(chan chan error), + mux: http.NewServeMux(), + inbox: make(map[string][][]byte), + } + + // specify the message handler + h.mux.Handle(DefaultSubPath, h) + + // get optional handlers + if h.opts.Context != nil { + handlers, ok := h.opts.Context.Value("http_handlers").(map[string]http.Handler) + if ok { + for pattern, handler := range handlers { + h.mux.Handle(pattern, handler) + } + } + } + + return h +} + +func (h *httpEvent) Ack() error { + return nil +} + +func (h *httpEvent) Message() *broker.Message { + return h.m +} + +func (h *httpEvent) Topic() string { + return h.t +} + +func (h *httpSubscriber) Options() broker.SubscribeOptions { + return h.opts +} + +func (h *httpSubscriber) Topic() string { + return h.topic +} + +func (h *httpSubscriber) Unsubscribe() error { + return h.hb.unsubscribe(h) +} + +func (h *httpBroker) saveMessage(topic string, msg []byte) { + h.mtx.Lock() + defer h.mtx.Unlock() + + // get messages + c := h.inbox[topic] + + // save message + c = append(c, msg) + + // max length 64 + if len(c) > 64 { + c = c[:64] + } + + // save inbox + h.inbox[topic] = c +} + +func (h *httpBroker) getMessage(topic string, num int) [][]byte { + h.mtx.Lock() + defer h.mtx.Unlock() + + // get messages + c, ok := h.inbox[topic] + if !ok { + return nil + } + + // more message than requests + if len(c) >= num { + msg := c[:num] + h.inbox[topic] = c[num:] + return msg + } + + // reset inbox + h.inbox[topic] = nil + + // return all messages + return c +} + +func (h *httpBroker) subscribe(s *httpSubscriber) error { + h.Lock() + defer h.Unlock() + + if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil { + return err + } + + h.subscribers[s.topic] = append(h.subscribers[s.topic], s) + return nil +} + +func (h *httpBroker) unsubscribe(s *httpSubscriber) error { + h.Lock() + defer h.Unlock() + + //nolint:prealloc + var subscribers []*httpSubscriber + + // look for subscriber + for _, sub := range h.subscribers[s.topic] { + // deregister and skip forward + if sub == s { + _ = h.r.Deregister(sub.svc) + continue + } + // keep subscriber + subscribers = append(subscribers, sub) + } + + // set subscribers + h.subscribers[s.topic] = subscribers + + return nil +} + +func (h *httpBroker) run(l net.Listener) { + t := time.NewTicker(registerInterval) + defer t.Stop() + + for { + select { + // heartbeat for each subscriber + case <-t.C: + h.RLock() + for _, subs := range h.subscribers { + for _, sub := range subs { + _ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL)) + } + } + h.RUnlock() + // received exit signal + case ch := <-h.exit: + ch <- l.Close() + h.RLock() + for _, subs := range h.subscribers { + for _, sub := range subs { + _ = h.r.Deregister(sub.svc) + } + } + h.RUnlock() + return + } + } +} + +func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { + if req.Method != "POST" { + err := merr.BadRequest("go.micro.broker", "Method not allowed") + http.Error(w, err.Error(), http.StatusMethodNotAllowed) + return + } + defer req.Body.Close() + + req.ParseForm() + + b, err := ioutil.ReadAll(req.Body) + if err != nil { + errr := merr.InternalServerError("go.micro.broker", "Error reading request body: %v", err) + w.WriteHeader(500) + w.Write([]byte(errr.Error())) + return + } + + var m *broker.Message + if err = h.opts.Codec.Unmarshal(b, &m); err != nil { + errr := merr.InternalServerError("go.micro.broker", "Error parsing request body: %v", err) + w.WriteHeader(500) + w.Write([]byte(errr.Error())) + return + } + + topic := m.Header[":topic"] + delete(m.Header, ":topic") + + if len(topic) == 0 { + errr := merr.InternalServerError("go.micro.broker", "Topic not found") + w.WriteHeader(500) + w.Write([]byte(errr.Error())) + return + } + + p := &httpEvent{m: m, t: topic} + id := req.Form.Get("id") + + //nolint:prealloc + var subs []broker.Handler + + h.RLock() + for _, subscriber := range h.subscribers[topic] { + if id != subscriber.id { + continue + } + subs = append(subs, subscriber.fn) + } + h.RUnlock() + + // execute the handler + for _, fn := range subs { + fn(p) + } +} + +func (h *httpBroker) Address() string { + h.RLock() + defer h.RUnlock() + return h.address +} + +func (h *httpBroker) Connect() error { + h.RLock() + if h.running { + h.RUnlock() + return nil + } + h.RUnlock() + + h.Lock() + defer h.Unlock() + + var l net.Listener + var err error + + if h.opts.Secure || h.opts.TLSConfig != nil { + config := h.opts.TLSConfig + + fn := func(addr string) (net.Listener, error) { + if config == nil { + hosts := []string{addr} + + // check if its a valid host:port + if host, _, err := net.SplitHostPort(addr); err == nil { + if len(host) == 0 { + hosts = maddr.IPs() + } else { + hosts = []string{host} + } + } + + // generate a certificate + cert, err := mls.Certificate(hosts...) + if err != nil { + return nil, err + } + config = &tls.Config{Certificates: []tls.Certificate{cert}} + } + return tls.Listen("tcp", addr, config) + } + + l, err = mnet.Listen(h.address, fn) + } else { + fn := func(addr string) (net.Listener, error) { + return net.Listen("tcp", addr) + } + + l, err = mnet.Listen(h.address, fn) + } + + if err != nil { + return err + } + + addr := h.address + h.address = l.Addr().String() + + go http.Serve(l, h.mux) + go func() { + h.run(l) + h.Lock() + h.opts.Addrs = []string{addr} + h.address = addr + h.Unlock() + }() + + // get registry + reg := h.opts.Registry + if reg == nil { + reg = registry.DefaultRegistry + } + // set cache + h.r = cache.New(reg) + + // set running + h.running = true + return nil +} + +func (h *httpBroker) Disconnect() error { + h.RLock() + if !h.running { + h.RUnlock() + return nil + } + h.RUnlock() + + h.Lock() + defer h.Unlock() + + // stop cache + rc, ok := h.r.(cache.Cache) + if ok { + rc.Stop() + } + + // exit and return err + ch := make(chan error) + h.exit <- ch + err := <-ch + + // set not running + h.running = false + return err +} + +func (h *httpBroker) Init(opts ...broker.Option) error { + h.RLock() + if h.running { + h.RUnlock() + return errors.New("cannot init while connected") + } + h.RUnlock() + + h.Lock() + defer h.Unlock() + + for _, o := range opts { + o(&h.opts) + } + + if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 { + h.address = h.opts.Addrs[0] + } + + if len(h.id) == 0 { + h.id = "go.micro.http.broker-" + uuid.New().String() + } + + // get registry + reg := h.opts.Registry + if reg == nil { + reg = registry.DefaultRegistry + } + + // get cache + if rc, ok := h.r.(cache.Cache); ok { + rc.Stop() + } + + // set registry + h.r = cache.New(reg) + + // reconfigure tls config + if c := h.opts.TLSConfig; c != nil { + h.c = &http.Client{ + Transport: newTransport(c), + } + } + + return nil +} + +func (h *httpBroker) Options() broker.Options { + return h.opts +} + +func (h *httpBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error { + // create the message first + m := &broker.Message{ + Header: make(map[string]string), + Body: msg.Body, + } + + for k, v := range msg.Header { + m.Header[k] = v + } + + m.Header[":topic"] = topic + + // encode the message + b, err := h.opts.Codec.Marshal(m) + if err != nil { + return err + } + + // save the message + h.saveMessage(topic, b) + + // now attempt to get the service + h.RLock() + s, err := h.r.GetService(serviceName) + if err != nil { + h.RUnlock() + return err + } + h.RUnlock() + + pub := func(node *registry.Node, t string, b []byte) error { + scheme := "http" + + // check if secure is added in metadata + if node.Metadata["secure"] == "true" { + scheme = "https" + } + + vals := url.Values{} + vals.Add("id", node.Id) + + uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultSubPath, vals.Encode()) + r, err := h.c.Post(uri, "application/json", bytes.NewReader(b)) + if err != nil { + return err + } + + // discard response body + io.Copy(ioutil.Discard, r.Body) + r.Body.Close() + return nil + } + + srv := func(s []*registry.Service, b []byte) { + for _, service := range s { + var nodes []*registry.Node + + for _, node := range service.Nodes { + // only use nodes tagged with broker http + if node.Metadata["broker"] != "http" { + continue + } + + // look for nodes for the topic + if node.Metadata["topic"] != topic { + continue + } + + nodes = append(nodes, node) + } + + // only process if we have nodes + if len(nodes) == 0 { + continue + } + + switch service.Version { + // broadcast version means broadcast to all nodes + case broadcastVersion: + var success bool + + // publish to all nodes + for _, node := range nodes { + // publish async + if err := pub(node, topic, b); err == nil { + success = true + } + } + + // save if it failed to publish at least once + if !success { + h.saveMessage(topic, b) + } + default: + // select node to publish to + node := nodes[rand.Int()%len(nodes)] + + // publish async to one node + if err := pub(node, topic, b); err != nil { + // if failed save it + h.saveMessage(topic, b) + } + } + } + } + + // do the rest async + go func() { + // get a third of the backlog + messages := h.getMessage(topic, 8) + delay := (len(messages) > 1) + + // publish all the messages + for _, msg := range messages { + // serialize here + srv(s, msg) + + // sending a backlog of messages + if delay { + time.Sleep(time.Millisecond * 100) + } + } + }() + + return nil +} + +func (h *httpBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + var err error + var host, port string + options := broker.NewSubscribeOptions(opts...) + + // parse address for host, port + host, port, err = net.SplitHostPort(h.Address()) + if err != nil { + return nil, err + } + + addr, err := maddr.Extract(host) + if err != nil { + return nil, err + } + + var secure bool + + if h.opts.Secure || h.opts.TLSConfig != nil { + secure = true + } + + // register service + node := ®istry.Node{ + Id: topic + "-" + h.id, + Address: mnet.HostPort(addr, port), + Metadata: map[string]string{ + "secure": fmt.Sprintf("%t", secure), + "broker": "http", + "topic": topic, + }, + } + + // check for queue group or broadcast queue + version := options.Queue + if len(version) == 0 { + version = broadcastVersion + } + + service := ®istry.Service{ + Name: serviceName, + Version: version, + Nodes: []*registry.Node{node}, + } + + // generate subscriber + subscriber := &httpSubscriber{ + opts: options, + hb: h, + id: node.Id, + topic: topic, + fn: handler, + svc: service, + } + + // subscribe now + if err := h.subscribe(subscriber); err != nil { + return nil, err + } + + // return the subscriber + return subscriber, nil +} + +func (h *httpBroker) String() string { + return "http" +} + // NewBroker returns a new http broker func NewBroker(opts ...broker.Option) broker.Broker { - return broker.NewBroker(opts...) + return newHttpBroker(opts...) } diff --git a/broker/http_broker_test.go b/broker/http/http_test.go similarity index 80% rename from broker/http_broker_test.go rename to broker/http/http_test.go index 3312a7f3..f3aec647 100644 --- a/broker/http_broker_test.go +++ b/broker/http/http_test.go @@ -1,4 +1,4 @@ -package broker +package http import ( "sync" @@ -6,12 +6,55 @@ import ( "time" "github.com/google/uuid" + "github.com/micro/go-micro/broker" "github.com/micro/go-micro/debug/log/noop" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry/memory" "github.com/micro/go-micro/util/log" ) +var ( + // mock data + testData = map[string][]*registry.Service{ + "foo": { + { + Name: "foo", + Version: "1.0.0", + Nodes: []*registry.Node{ + { + Id: "foo-1.0.0-123", + Address: "localhost:9999", + }, + { + Id: "foo-1.0.0-321", + Address: "localhost:9999", + }, + }, + }, + { + Name: "foo", + Version: "1.0.1", + Nodes: []*registry.Node{ + { + Id: "foo-1.0.1-321", + Address: "localhost:6666", + }, + }, + }, + { + Name: "foo", + Version: "1.0.3", + Nodes: []*registry.Node{ + { + Id: "foo-1.0.3-345", + Address: "localhost:8888", + }, + }, + }, + }, + } +) + func newTestRegistry() registry.Registry { return memory.NewRegistry(memory.Services(testData)) } @@ -23,7 +66,7 @@ func sub(be *testing.B, c int) { be.StopTimer() m := newTestRegistry() - b := NewBroker(Registry(m)) + b := NewBroker(broker.Registry(m)) topic := uuid.New().String() if err := b.Init(); err != nil { @@ -34,18 +77,18 @@ func sub(be *testing.B, c int) { be.Fatalf("Unexpected connect error: %v", err) } - msg := &Message{ + msg := &broker.Message{ Header: map[string]string{ "Content-Type": "application/json", }, Body: []byte(`{"message": "Hello World"}`), } - var subs []Subscriber + var subs []broker.Subscriber done := make(chan bool, c) for i := 0; i < c; i++ { - sub, err := b.Subscribe(topic, func(p Event) error { + sub, err := b.Subscribe(topic, func(p broker.Event) error { done <- true m := p.Message() @@ -54,7 +97,7 @@ func sub(be *testing.B, c int) { } return nil - }, Queue("shared")) + }, broker.Queue("shared")) if err != nil { be.Fatalf("Unexpected subscribe error: %v", err) } @@ -85,7 +128,7 @@ func pub(be *testing.B, c int) { be.StopTimer() m := newTestRegistry() - b := NewBroker(Registry(m)) + b := NewBroker(broker.Registry(m)) topic := uuid.New().String() if err := b.Init(); err != nil { @@ -96,7 +139,7 @@ func pub(be *testing.B, c int) { be.Fatalf("Unexpected connect error: %v", err) } - msg := &Message{ + msg := &broker.Message{ Header: map[string]string{ "Content-Type": "application/json", }, @@ -105,14 +148,14 @@ func pub(be *testing.B, c int) { done := make(chan bool, c*4) - sub, err := b.Subscribe(topic, func(p Event) error { + sub, err := b.Subscribe(topic, func(p broker.Event) error { done <- true m := p.Message() if string(m.Body) != string(msg.Body) { be.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) } return nil - }, Queue("shared")) + }, broker.Queue("shared")) if err != nil { be.Fatalf("Unexpected subscribe error: %v", err) } @@ -154,7 +197,7 @@ func pub(be *testing.B, c int) { func TestBroker(t *testing.T) { m := newTestRegistry() - b := NewBroker(Registry(m)) + b := NewBroker(broker.Registry(m)) if err := b.Init(); err != nil { t.Fatalf("Unexpected init error: %v", err) @@ -164,7 +207,7 @@ func TestBroker(t *testing.T) { t.Fatalf("Unexpected connect error: %v", err) } - msg := &Message{ + msg := &broker.Message{ Header: map[string]string{ "Content-Type": "application/json", }, @@ -173,7 +216,7 @@ func TestBroker(t *testing.T) { done := make(chan bool) - sub, err := b.Subscribe("test", func(p Event) error { + sub, err := b.Subscribe("test", func(p broker.Event) error { m := p.Message() if string(m.Body) != string(msg.Body) { @@ -201,7 +244,7 @@ func TestBroker(t *testing.T) { func TestConcurrentSubBroker(t *testing.T) { m := newTestRegistry() - b := NewBroker(Registry(m)) + b := NewBroker(broker.Registry(m)) if err := b.Init(); err != nil { t.Fatalf("Unexpected init error: %v", err) @@ -211,18 +254,18 @@ func TestConcurrentSubBroker(t *testing.T) { t.Fatalf("Unexpected connect error: %v", err) } - msg := &Message{ + msg := &broker.Message{ Header: map[string]string{ "Content-Type": "application/json", }, Body: []byte(`{"message": "Hello World"}`), } - var subs []Subscriber + var subs []broker.Subscriber var wg sync.WaitGroup for i := 0; i < 10; i++ { - sub, err := b.Subscribe("test", func(p Event) error { + sub, err := b.Subscribe("test", func(p broker.Event) error { defer wg.Done() m := p.Message() @@ -258,7 +301,7 @@ func TestConcurrentSubBroker(t *testing.T) { func TestConcurrentPubBroker(t *testing.T) { m := newTestRegistry() - b := NewBroker(Registry(m)) + b := NewBroker(broker.Registry(m)) if err := b.Init(); err != nil { t.Fatalf("Unexpected init error: %v", err) @@ -268,7 +311,7 @@ func TestConcurrentPubBroker(t *testing.T) { t.Fatalf("Unexpected connect error: %v", err) } - msg := &Message{ + msg := &broker.Message{ Header: map[string]string{ "Content-Type": "application/json", }, @@ -277,7 +320,7 @@ func TestConcurrentPubBroker(t *testing.T) { var wg sync.WaitGroup - sub, err := b.Subscribe("test", func(p Event) error { + sub, err := b.Subscribe("test", func(p broker.Event) error { defer wg.Done() m := p.Message() diff --git a/broker/http_broker.go b/broker/http_broker.go deleted file mode 100644 index 98430dae..00000000 --- a/broker/http_broker.go +++ /dev/null @@ -1,698 +0,0 @@ -package broker - -import ( - "bytes" - "context" - "crypto/tls" - "errors" - "fmt" - "io" - "io/ioutil" - "math/rand" - "net" - "net/http" - "net/url" - "runtime" - "sync" - "time" - - "github.com/google/uuid" - "github.com/micro/go-micro/codec/json" - merr "github.com/micro/go-micro/errors" - "github.com/micro/go-micro/registry" - "github.com/micro/go-micro/registry/cache" - maddr "github.com/micro/go-micro/util/addr" - mnet "github.com/micro/go-micro/util/net" - mls "github.com/micro/go-micro/util/tls" - "golang.org/x/net/http2" -) - -// HTTP Broker is a point to point async broker -type httpBroker struct { - id string - address string - opts Options - - mux *http.ServeMux - - c *http.Client - r registry.Registry - - sync.RWMutex - subscribers map[string][]*httpSubscriber - running bool - exit chan chan error - - // offline message inbox - mtx sync.RWMutex - inbox map[string][][]byte -} - -type httpSubscriber struct { - opts SubscribeOptions - id string - topic string - fn Handler - svc *registry.Service - hb *httpBroker -} - -type httpEvent struct { - m *Message - t string -} - -var ( - DefaultSubPath = "/_sub" - serviceName = "go.micro.http.broker" - broadcastVersion = "ff.http.broadcast" - registerTTL = time.Minute - registerInterval = time.Second * 30 -) - -func init() { - rand.Seed(time.Now().Unix()) -} - -func newTransport(config *tls.Config) *http.Transport { - if config == nil { - config = &tls.Config{ - InsecureSkipVerify: true, - } - } - - dialTLS := func(network string, addr string) (net.Conn, error) { - return tls.Dial(network, addr, config) - } - - t := &http.Transport{ - Proxy: http.ProxyFromEnvironment, - Dial: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - }).Dial, - TLSHandshakeTimeout: 10 * time.Second, - DialTLS: dialTLS, - } - runtime.SetFinalizer(&t, func(tr **http.Transport) { - (*tr).CloseIdleConnections() - }) - - // setup http2 - http2.ConfigureTransport(t) - - return t -} - -func newHttpBroker(opts ...Option) Broker { - options := Options{ - Codec: json.Marshaler{}, - Context: context.TODO(), - Registry: registry.DefaultRegistry, - } - - for _, o := range opts { - o(&options) - } - - // set address - addr := ":0" - if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 { - addr = options.Addrs[0] - } - - h := &httpBroker{ - id: uuid.New().String(), - address: addr, - opts: options, - r: options.Registry, - c: &http.Client{Transport: newTransport(options.TLSConfig)}, - subscribers: make(map[string][]*httpSubscriber), - exit: make(chan chan error), - mux: http.NewServeMux(), - inbox: make(map[string][][]byte), - } - - // specify the message handler - h.mux.Handle(DefaultSubPath, h) - - // get optional handlers - if h.opts.Context != nil { - handlers, ok := h.opts.Context.Value("http_handlers").(map[string]http.Handler) - if ok { - for pattern, handler := range handlers { - h.mux.Handle(pattern, handler) - } - } - } - - return h -} - -func (h *httpEvent) Ack() error { - return nil -} - -func (h *httpEvent) Message() *Message { - return h.m -} - -func (h *httpEvent) Topic() string { - return h.t -} - -func (h *httpSubscriber) Options() SubscribeOptions { - return h.opts -} - -func (h *httpSubscriber) Topic() string { - return h.topic -} - -func (h *httpSubscriber) Unsubscribe() error { - return h.hb.unsubscribe(h) -} - -func (h *httpBroker) saveMessage(topic string, msg []byte) { - h.mtx.Lock() - defer h.mtx.Unlock() - - // get messages - c := h.inbox[topic] - - // save message - c = append(c, msg) - - // max length 64 - if len(c) > 64 { - c = c[:64] - } - - // save inbox - h.inbox[topic] = c -} - -func (h *httpBroker) getMessage(topic string, num int) [][]byte { - h.mtx.Lock() - defer h.mtx.Unlock() - - // get messages - c, ok := h.inbox[topic] - if !ok { - return nil - } - - // more message than requests - if len(c) >= num { - msg := c[:num] - h.inbox[topic] = c[num:] - return msg - } - - // reset inbox - h.inbox[topic] = nil - - // return all messages - return c -} - -func (h *httpBroker) subscribe(s *httpSubscriber) error { - h.Lock() - defer h.Unlock() - - if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil { - return err - } - - h.subscribers[s.topic] = append(h.subscribers[s.topic], s) - return nil -} - -func (h *httpBroker) unsubscribe(s *httpSubscriber) error { - h.Lock() - defer h.Unlock() - - //nolint:prealloc - var subscribers []*httpSubscriber - - // look for subscriber - for _, sub := range h.subscribers[s.topic] { - // deregister and skip forward - if sub == s { - _ = h.r.Deregister(sub.svc) - continue - } - // keep subscriber - subscribers = append(subscribers, sub) - } - - // set subscribers - h.subscribers[s.topic] = subscribers - - return nil -} - -func (h *httpBroker) run(l net.Listener) { - t := time.NewTicker(registerInterval) - defer t.Stop() - - for { - select { - // heartbeat for each subscriber - case <-t.C: - h.RLock() - for _, subs := range h.subscribers { - for _, sub := range subs { - _ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL)) - } - } - h.RUnlock() - // received exit signal - case ch := <-h.exit: - ch <- l.Close() - h.RLock() - for _, subs := range h.subscribers { - for _, sub := range subs { - _ = h.r.Deregister(sub.svc) - } - } - h.RUnlock() - return - } - } -} - -func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { - if req.Method != "POST" { - err := merr.BadRequest("go.micro.broker", "Method not allowed") - http.Error(w, err.Error(), http.StatusMethodNotAllowed) - return - } - defer req.Body.Close() - - req.ParseForm() - - b, err := ioutil.ReadAll(req.Body) - if err != nil { - errr := merr.InternalServerError("go.micro.broker", "Error reading request body: %v", err) - w.WriteHeader(500) - w.Write([]byte(errr.Error())) - return - } - - var m *Message - if err = h.opts.Codec.Unmarshal(b, &m); err != nil { - errr := merr.InternalServerError("go.micro.broker", "Error parsing request body: %v", err) - w.WriteHeader(500) - w.Write([]byte(errr.Error())) - return - } - - topic := m.Header[":topic"] - delete(m.Header, ":topic") - - if len(topic) == 0 { - errr := merr.InternalServerError("go.micro.broker", "Topic not found") - w.WriteHeader(500) - w.Write([]byte(errr.Error())) - return - } - - p := &httpEvent{m: m, t: topic} - id := req.Form.Get("id") - - //nolint:prealloc - var subs []Handler - - h.RLock() - for _, subscriber := range h.subscribers[topic] { - if id != subscriber.id { - continue - } - subs = append(subs, subscriber.fn) - } - h.RUnlock() - - // execute the handler - for _, fn := range subs { - fn(p) - } -} - -func (h *httpBroker) Address() string { - h.RLock() - defer h.RUnlock() - return h.address -} - -func (h *httpBroker) Connect() error { - h.RLock() - if h.running { - h.RUnlock() - return nil - } - h.RUnlock() - - h.Lock() - defer h.Unlock() - - var l net.Listener - var err error - - if h.opts.Secure || h.opts.TLSConfig != nil { - config := h.opts.TLSConfig - - fn := func(addr string) (net.Listener, error) { - if config == nil { - hosts := []string{addr} - - // check if its a valid host:port - if host, _, err := net.SplitHostPort(addr); err == nil { - if len(host) == 0 { - hosts = maddr.IPs() - } else { - hosts = []string{host} - } - } - - // generate a certificate - cert, err := mls.Certificate(hosts...) - if err != nil { - return nil, err - } - config = &tls.Config{Certificates: []tls.Certificate{cert}} - } - return tls.Listen("tcp", addr, config) - } - - l, err = mnet.Listen(h.address, fn) - } else { - fn := func(addr string) (net.Listener, error) { - return net.Listen("tcp", addr) - } - - l, err = mnet.Listen(h.address, fn) - } - - if err != nil { - return err - } - - addr := h.address - h.address = l.Addr().String() - - go http.Serve(l, h.mux) - go func() { - h.run(l) - h.Lock() - h.opts.Addrs = []string{addr} - h.address = addr - h.Unlock() - }() - - // get registry - reg, ok := h.opts.Context.Value(registryKey).(registry.Registry) - if !ok { - reg = registry.DefaultRegistry - } - // set cache - h.r = cache.New(reg) - - // set running - h.running = true - return nil -} - -func (h *httpBroker) Disconnect() error { - h.RLock() - if !h.running { - h.RUnlock() - return nil - } - h.RUnlock() - - h.Lock() - defer h.Unlock() - - // stop cache - rc, ok := h.r.(cache.Cache) - if ok { - rc.Stop() - } - - // exit and return err - ch := make(chan error) - h.exit <- ch - err := <-ch - - // set not running - h.running = false - return err -} - -func (h *httpBroker) Init(opts ...Option) error { - h.RLock() - if h.running { - h.RUnlock() - return errors.New("cannot init while connected") - } - h.RUnlock() - - h.Lock() - defer h.Unlock() - - for _, o := range opts { - o(&h.opts) - } - - if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 { - h.address = h.opts.Addrs[0] - } - - if len(h.id) == 0 { - h.id = "go.micro.http.broker-" + uuid.New().String() - } - - // get registry - reg, ok := h.opts.Context.Value(registryKey).(registry.Registry) - if !ok { - reg = registry.DefaultRegistry - } - - // get cache - if rc, ok := h.r.(cache.Cache); ok { - rc.Stop() - } - - // set registry - h.r = cache.New(reg) - - // reconfigure tls config - if c := h.opts.TLSConfig; c != nil { - h.c = &http.Client{ - Transport: newTransport(c), - } - } - - return nil -} - -func (h *httpBroker) Options() Options { - return h.opts -} - -func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error { - // create the message first - m := &Message{ - Header: make(map[string]string), - Body: msg.Body, - } - - for k, v := range msg.Header { - m.Header[k] = v - } - - m.Header[":topic"] = topic - - // encode the message - b, err := h.opts.Codec.Marshal(m) - if err != nil { - return err - } - - // save the message - h.saveMessage(topic, b) - - // now attempt to get the service - h.RLock() - s, err := h.r.GetService(serviceName) - if err != nil { - h.RUnlock() - return err - } - h.RUnlock() - - pub := func(node *registry.Node, t string, b []byte) error { - scheme := "http" - - // check if secure is added in metadata - if node.Metadata["secure"] == "true" { - scheme = "https" - } - - vals := url.Values{} - vals.Add("id", node.Id) - - uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultSubPath, vals.Encode()) - r, err := h.c.Post(uri, "application/json", bytes.NewReader(b)) - if err != nil { - return err - } - - // discard response body - io.Copy(ioutil.Discard, r.Body) - r.Body.Close() - return nil - } - - srv := func(s []*registry.Service, b []byte) { - for _, service := range s { - var nodes []*registry.Node - - for _, node := range service.Nodes { - // only use nodes tagged with broker http - if node.Metadata["broker"] != "http" { - continue - } - - // look for nodes for the topic - if node.Metadata["topic"] != topic { - continue - } - - nodes = append(nodes, node) - } - - // only process if we have nodes - if len(nodes) == 0 { - continue - } - - switch service.Version { - // broadcast version means broadcast to all nodes - case broadcastVersion: - var success bool - - // publish to all nodes - for _, node := range nodes { - // publish async - if err := pub(node, topic, b); err == nil { - success = true - } - } - - // save if it failed to publish at least once - if !success { - h.saveMessage(topic, b) - } - default: - // select node to publish to - node := nodes[rand.Int()%len(nodes)] - - // publish async to one node - if err := pub(node, topic, b); err != nil { - // if failed save it - h.saveMessage(topic, b) - } - } - } - } - - // do the rest async - go func() { - // get a third of the backlog - messages := h.getMessage(topic, 8) - delay := (len(messages) > 1) - - // publish all the messages - for _, msg := range messages { - // serialize here - srv(s, msg) - - // sending a backlog of messages - if delay { - time.Sleep(time.Millisecond * 100) - } - } - }() - - return nil -} - -func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { - var err error - var host, port string - options := NewSubscribeOptions(opts...) - - // parse address for host, port - host, port, err = net.SplitHostPort(h.Address()) - if err != nil { - return nil, err - } - - addr, err := maddr.Extract(host) - if err != nil { - return nil, err - } - - var secure bool - - if h.opts.Secure || h.opts.TLSConfig != nil { - secure = true - } - - // register service - node := ®istry.Node{ - Id: topic + "-" + h.id, - Address: mnet.HostPort(addr, port), - Metadata: map[string]string{ - "secure": fmt.Sprintf("%t", secure), - "broker": "http", - "topic": topic, - }, - } - - // check for queue group or broadcast queue - version := options.Queue - if len(version) == 0 { - version = broadcastVersion - } - - service := ®istry.Service{ - Name: serviceName, - Version: version, - Nodes: []*registry.Node{node}, - } - - // generate subscriber - subscriber := &httpSubscriber{ - opts: options, - hb: h, - id: node.Id, - topic: topic, - fn: handler, - svc: service, - } - - // subscribe now - if err := h.subscribe(subscriber); err != nil { - return nil, err - } - - // return the subscriber - return subscriber, nil -} - -func (h *httpBroker) String() string { - return "http" -} diff --git a/broker/service/service.go b/broker/service/service.go index fff207fd..00d6d0b5 100644 --- a/broker/service/service.go +++ b/broker/service/service.go @@ -124,9 +124,11 @@ func NewBroker(opts ...broker.Option) broker.Broker { addrs = []string{"127.0.0.1:8001"} } + cli := client.DefaultClient + return &serviceBroker{ Addrs: addrs, - Client: pb.NewBrokerService(DefaultName, client.DefaultClient), + Client: pb.NewBrokerService(DefaultName, cli), options: options, } } diff --git a/config/cmd/cmd.go b/config/cmd/cmd.go index f553f44b..43269291 100644 --- a/config/cmd/cmd.go +++ b/config/cmd/cmd.go @@ -8,23 +8,30 @@ import ( "time" "github.com/micro/cli" + "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/client/selector" + "github.com/micro/go-micro/registry" "github.com/micro/go-micro/client" + "github.com/micro/go-micro/server" + "github.com/micro/go-micro/store" + "github.com/micro/go-micro/util/log" + "github.com/micro/go-micro/runtime" + "github.com/micro/go-micro/transport" + + // clients cgrpc "github.com/micro/go-micro/client/grpc" cmucp "github.com/micro/go-micro/client/mucp" - "github.com/micro/go-micro/server" + + // servers sgrpc "github.com/micro/go-micro/server/grpc" smucp "github.com/micro/go-micro/server/mucp" - "github.com/micro/go-micro/util/log" // brokers - "github.com/micro/go-micro/broker" - "github.com/micro/go-micro/broker/http" "github.com/micro/go-micro/broker/memory" "github.com/micro/go-micro/broker/nats" brokerSrv "github.com/micro/go-micro/broker/service" // registries - "github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry/etcd" kreg "github.com/micro/go-micro/registry/kubernetes" "github.com/micro/go-micro/registry/mdns" @@ -32,27 +39,20 @@ import ( regSrv "github.com/micro/go-micro/registry/service" // selectors - "github.com/micro/go-micro/client/selector" "github.com/micro/go-micro/client/selector/dns" "github.com/micro/go-micro/client/selector/router" "github.com/micro/go-micro/client/selector/static" // transports - "github.com/micro/go-micro/transport" - tgrpc "github.com/micro/go-micro/transport/grpc" thttp "github.com/micro/go-micro/transport/http" tmem "github.com/micro/go-micro/transport/memory" - "github.com/micro/go-micro/transport/quic" // runtimes - "github.com/micro/go-micro/runtime" "github.com/micro/go-micro/runtime/kubernetes" // stores - "github.com/micro/go-micro/store" cfStore "github.com/micro/go-micro/store/cloudflare" ckStore "github.com/micro/go-micro/store/cockroach" - etcdStore "github.com/micro/go-micro/store/etcd" memStore "github.com/micro/go-micro/store/memory" svcStore "github.com/micro/go-micro/store/service" ) @@ -217,7 +217,6 @@ var ( DefaultBrokers = map[string]func(...broker.Option) broker.Broker{ "service": brokerSrv.NewBroker, - "http": http.NewBroker, "memory": memory.NewBroker, "nats": nats.NewBroker, } @@ -236,9 +235,7 @@ var ( } DefaultSelectors = map[string]func(...selector.Option) selector.Selector{ - "default": selector.NewSelector, "dns": dns.NewSelector, - "cache": selector.NewSelector, "router": router.NewSelector, "static": static.NewSelector, } @@ -251,8 +248,6 @@ var ( DefaultTransports = map[string]func(...transport.Option) transport.Transport{ "memory": tmem.NewTransport, "http": thttp.NewTransport, - "grpc": tgrpc.NewTransport, - "quic": quic.NewTransport, } DefaultRuntimes = map[string]func(...runtime.Option) runtime.Runtime{ @@ -263,7 +258,6 @@ var ( DefaultStores = map[string]func(...store.Option) store.Store{ "memory": memStore.NewStore, "cockroach": ckStore.NewStore, - "etcd": etcdStore.NewStore, "cloudflare": cfStore.NewStore, "service": svcStore.NewStore, } @@ -271,7 +265,7 @@ var ( // used for default selection as the fall back defaultClient = "grpc" defaultServer = "grpc" - defaultBroker = "nats" + defaultBroker = "enats" defaultRegistry = "mdns" defaultSelector = "registry" defaultTransport = "http" @@ -558,8 +552,12 @@ func (c *cmd) Init(opts ...Option) error { for _, o := range opts { o(&c.opts) } - c.app.Name = c.opts.Name - c.app.Version = c.opts.Version + if len(c.opts.Name) > 0 { + c.app.Name = c.opts.Name + } + if len(c.opts.Version) > 0 { + c.app.Version = c.opts.Version + } c.app.HideVersion = len(c.opts.Version) == 0 c.app.Usage = c.opts.Description c.app.RunAndExitOnError() diff --git a/defaults.go b/defaults.go index 1a83bcf0..0159f19c 100644 --- a/defaults.go +++ b/defaults.go @@ -1,28 +1,17 @@ package micro import ( - "github.com/micro/go-micro/broker" "github.com/micro/go-micro/client" "github.com/micro/go-micro/server" "github.com/micro/go-micro/store" // set defaults - "github.com/micro/go-micro/broker/nats" gcli "github.com/micro/go-micro/client/grpc" gsrv "github.com/micro/go-micro/server/grpc" memStore "github.com/micro/go-micro/store/memory" ) func init() { - // default broker - broker.DefaultBroker = nats.NewBroker( - // embedded nats server - nats.LocalServer(), - ) - // new client initialisation - client.NewClient = gcli.NewClient - // new server initialisation - server.NewServer = gsrv.NewServer // default client client.DefaultClient = gcli.NewClient() // default server diff --git a/runtime/service/service.go b/runtime/service/service.go index 91e62df8..4270d5ea 100644 --- a/runtime/service/service.go +++ b/runtime/service/service.go @@ -15,25 +15,6 @@ type svc struct { runtime pb.RuntimeService } -// NewRuntime creates new service runtime and returns it -func NewRuntime(opts ...runtime.Option) runtime.Runtime { - // get default options - options := runtime.Options{} - - // apply requested options - for _, o := range opts { - o(&options) - } - - // create default client - cli := client.DefaultClient - - return &svc{ - options: options, - runtime: pb.NewRuntimeService(runtime.DefaultName, cli), - } -} - // Init initializes runtime with given options func (s *svc) Init(opts ...runtime.Option) error { s.Lock() @@ -183,3 +164,23 @@ func (s *svc) Stop() error { func (s *svc) String() string { return "service" } + +// NewRuntime creates new service runtime and returns it +func NewRuntime(opts ...runtime.Option) runtime.Runtime { + // get default options + options := runtime.Options{} + + // apply requested options + for _, o := range opts { + o(&options) + } + + // create default client + cli := client.DefaultClient + + return &svc{ + options: options, + runtime: pb.NewRuntimeService(runtime.DefaultName, cli), + } +} + diff --git a/service.go b/service.go index f6844e33..abbf23c5 100644 --- a/service.go +++ b/service.go @@ -76,6 +76,11 @@ func (s *service) Init(opts ...Option) { } } + // set cmd name + if len(s.opts.Cmd.App().Name) == 0 { + s.opts.Cmd.App().Name = s.Server().Options().Name + } + // Initialise the command flags, overriding new service _ = s.opts.Cmd.Init( cmd.Broker(&s.opts.Broker),