From bc71640fd942f1e27e58e68997dc001b7b53a549 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 11 Apr 2020 03:46:54 +0300 Subject: [PATCH] broker: swap default broker from eats to http (#1524) * broker: swap default broker from eats to http Signed-off-by: Vasiliy Tolstov --- broker/default.go | 504 ----------------------------- broker/http.go | 709 +++++++++++++++++++++++++++++++++++++++++ broker/http/http.go | 11 + broker/http/options.go | 23 ++ broker/http_test.go | 384 ++++++++++++++++++++++ config/cmd/cmd.go | 2 + go.mod | 2 - go.sum | 12 - 8 files changed, 1129 insertions(+), 518 deletions(-) delete mode 100644 broker/default.go create mode 100644 broker/http.go create mode 100644 broker/http/http.go create mode 100644 broker/http/options.go create mode 100644 broker/http_test.go diff --git a/broker/default.go b/broker/default.go deleted file mode 100644 index ae6562ce..00000000 --- a/broker/default.go +++ /dev/null @@ -1,504 +0,0 @@ -package broker - -import ( - "context" - "errors" - "net" - "net/url" - "strconv" - "strings" - "sync" - "time" - - "github.com/micro/go-micro/v2/codec/json" - "github.com/micro/go-micro/v2/logger" - "github.com/micro/go-micro/v2/registry" - "github.com/micro/go-micro/v2/util/addr" - "github.com/nats-io/nats-server/v2/server" - nats "github.com/nats-io/nats.go" -) - -type natsBroker struct { - sync.Once - sync.RWMutex - - // indicate if we're connected - connected bool - - // address to bind routes to - addrs []string - // servers for the client - servers []string - - // client connection and nats opts - conn *nats.Conn - opts Options - nopts nats.Options - - // should we drain the connection - drain bool - closeCh chan (error) - - // embedded server - server *server.Server - // configure to use local server - local bool - // server exit channel - exit chan bool -} - -type subscriber struct { - s *nats.Subscription - opts SubscribeOptions -} - -type publication struct { - t string - err error - m *Message -} - -func (p *publication) Topic() string { - return p.t -} - -func (p *publication) Message() *Message { - return p.m -} - -func (p *publication) Ack() error { - // nats does not support acking - return nil -} - -func (p *publication) Error() error { - return p.err -} - -func (s *subscriber) Options() SubscribeOptions { - return s.opts -} - -func (s *subscriber) Topic() string { - return s.s.Subject -} - -func (s *subscriber) Unsubscribe() error { - return s.s.Unsubscribe() -} - -func (n *natsBroker) Address() string { - n.RLock() - defer n.RUnlock() - - if n.server != nil { - return n.server.ClusterAddr().String() - } - - if n.conn != nil && n.conn.IsConnected() { - return n.conn.ConnectedUrl() - } - - if len(n.addrs) > 0 { - return n.addrs[0] - } - - return "127.0.0.1:-1" -} - -func (n *natsBroker) setAddrs(addrs []string) []string { - //nolint:prealloc - var cAddrs []string - for _, addr := range addrs { - if len(addr) == 0 { - continue - } - if !strings.HasPrefix(addr, "nats://") { - addr = "nats://" + addr - } - cAddrs = append(cAddrs, addr) - } - // if there's no address and we weren't told to - // embed a local server then use the default url - if len(cAddrs) == 0 && !n.local { - cAddrs = []string{nats.DefaultURL} - } - return cAddrs -} - -// serve stats a local nats server if needed -func (n *natsBroker) serve(exit chan bool) error { - // local server address - host := "127.0.0.1" - port := -1 - - // cluster address - caddr := "0.0.0.0" - cport := -1 - - // with no address we just default it - // this is a local client address - if len(n.addrs) > 0 { - address := n.addrs[0] - if strings.HasPrefix(address, "nats://") { - address = strings.TrimPrefix(address, "nats://") - } - - // parse out the address - h, p, err := net.SplitHostPort(address) - if err == nil { - caddr = h - cport, _ = strconv.Atoi(p) - } - } - - // 1. create new server - // 2. register the server - // 3. connect to other servers - - // set cluster opts - cOpts := server.ClusterOpts{ - Host: caddr, - Port: cport, - } - - // get the routes for other nodes - var routes []*url.URL - - // get existing nats servers to connect to - services, err := n.opts.Registry.GetService("go.micro.nats.broker") - if err == nil { - for _, service := range services { - for _, node := range service.Nodes { - u, err := url.Parse("nats://" + node.Address) - if err != nil { - if logger.V(logger.InfoLevel, logger.DefaultLogger) { - logger.Info(err) - } - continue - } - // append to the cluster routes - routes = append(routes, u) - } - } - } - - // try get existing server - s := n.server - - if s != nil { - // stop the existing server - s.Shutdown() - } - - s, err = server.NewServer(&server.Options{ - // Specify the host - Host: host, - // Use a random port - Port: port, - // Set the cluster ops - Cluster: cOpts, - // Set the routes - Routes: routes, - NoLog: true, - NoSigs: true, - MaxControlLine: 2048, - TLSConfig: n.opts.TLSConfig, - }) - if err != nil { - return err - } - - // save the server - n.server = s - - // start the server - go s.Start() - - var ready bool - - // wait till its ready for connections - for i := 0; i < 3; i++ { - if s.ReadyForConnections(time.Second) { - ready = true - break - } - } - - if !ready { - return errors.New("server not ready") - } - - // set the client address - n.servers = []string{s.ClientURL()} - - go func() { - var advertise string - - // parse out the address - _, port, err := net.SplitHostPort(s.ClusterAddr().String()) - if err == nil { - addr, _ := addr.Extract("") - advertise = net.JoinHostPort(addr, port) - } else { - s.ClusterAddr().String() - } - - // register the cluster address - for { - select { - case err := <-n.closeCh: - if err != nil { - if logger.V(logger.InfoLevel, logger.DefaultLogger) { - logger.Info(err) - } - } - case <-exit: - // deregister on exit - n.opts.Registry.Deregister(®istry.Service{ - Name: "go.micro.nats.broker", - Version: "v2", - Nodes: []*registry.Node{ - {Id: s.ID(), Address: advertise}, - }, - }) - s.Shutdown() - return - default: - // register the broker - n.opts.Registry.Register(®istry.Service{ - Name: "go.micro.nats.broker", - Version: "v2", - Nodes: []*registry.Node{ - {Id: s.ID(), Address: advertise}, - }, - }, registry.RegisterTTL(time.Minute)) - time.Sleep(time.Minute) - } - } - }() - - return nil -} - -func (n *natsBroker) Connect() error { - n.Lock() - defer n.Unlock() - - if !n.connected { - // create exit chan - n.exit = make(chan bool) - - // start the local server - if err := n.serve(n.exit); err != nil { - return err - } - - // set to connected - } - - status := nats.CLOSED - if n.conn != nil { - status = n.conn.Status() - } - - switch status { - case nats.CONNECTED, nats.RECONNECTING, nats.CONNECTING: - return nil - default: // DISCONNECTED or CLOSED or DRAINING - opts := n.nopts - opts.DrainTimeout = 1 * time.Second - opts.AsyncErrorCB = n.onAsyncError - opts.DisconnectedErrCB = n.onDisconnectedError - opts.ClosedCB = n.onClose - opts.Servers = n.servers - opts.Secure = n.opts.Secure - opts.TLSConfig = n.opts.TLSConfig - - // secure might not be set - if n.opts.TLSConfig != nil { - opts.Secure = true - } - - c, err := opts.Connect() - if err != nil { - return err - } - n.conn = c - - n.connected = true - - return nil - } -} - -func (n *natsBroker) Disconnect() error { - n.RLock() - defer n.RUnlock() - - if !n.connected { - return nil - } - - // drain the connection if specified - if n.drain { - n.conn.Drain() - } - - // close the client connection - n.conn.Close() - - // shutdown the local server - // and deregister - if n.server != nil { - select { - case <-n.exit: - default: - close(n.exit) - } - } - - // set not connected - n.connected = false - - return nil -} - -func (n *natsBroker) Init(opts ...Option) error { - n.setOption(opts...) - return nil -} - -func (n *natsBroker) Options() Options { - return n.opts -} - -func (n *natsBroker) Publish(topic string, msg *Message, opts ...PublishOption) error { - b, err := n.opts.Codec.Marshal(msg) - if err != nil { - return err - } - n.RLock() - defer n.RUnlock() - return n.conn.Publish(topic, b) -} - -func (n *natsBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { - if n.conn == nil { - return nil, errors.New("not connected") - } - - opt := SubscribeOptions{ - AutoAck: true, - Context: context.Background(), - } - - for _, o := range opts { - o(&opt) - } - - fn := func(msg *nats.Msg) { - var m Message - pub := &publication{t: msg.Subject} - eh := n.opts.ErrorHandler - err := n.opts.Codec.Unmarshal(msg.Data, &m) - pub.err = err - pub.m = &m - if err != nil { - m.Body = msg.Data - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Error(err) - } - if eh != nil { - eh(pub) - } - return - } - if err := handler(pub); err != nil { - pub.err = err - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Error(err) - } - if eh != nil { - eh(pub) - } - } - } - - var sub *nats.Subscription - var err error - - n.RLock() - if len(opt.Queue) > 0 { - sub, err = n.conn.QueueSubscribe(topic, opt.Queue, fn) - } else { - sub, err = n.conn.Subscribe(topic, fn) - } - n.RUnlock() - if err != nil { - return nil, err - } - return &subscriber{s: sub, opts: opt}, nil -} - -func (n *natsBroker) String() string { - return "eats" -} - -func (n *natsBroker) setOption(opts ...Option) { - for _, o := range opts { - o(&n.opts) - } - - n.Once.Do(func() { - n.nopts = nats.GetDefaultOptions() - }) - - // local embedded server - n.local = true - // set to drain - n.drain = true - - if !n.opts.Secure { - n.opts.Secure = n.nopts.Secure - } - - if n.opts.TLSConfig == nil { - n.opts.TLSConfig = n.nopts.TLSConfig - } - - n.addrs = n.setAddrs(n.opts.Addrs) -} - -func (n *natsBroker) onClose(conn *nats.Conn) { - n.closeCh <- nil -} - -func (n *natsBroker) onDisconnectedError(conn *nats.Conn, err error) { - n.closeCh <- err -} - -func (n *natsBroker) onAsyncError(conn *nats.Conn, sub *nats.Subscription, err error) { - // There are kinds of different async error nats might callback, but we are interested - // in ErrDrainTimeout only here. - if err == nats.ErrDrainTimeout { - n.closeCh <- err - } -} - -func NewBroker(opts ...Option) Broker { - options := Options{ - // Default codec - Codec: json.Marshaler{}, - Context: context.Background(), - Registry: registry.DefaultRegistry, - } - - n := &natsBroker{ - opts: options, - closeCh: make(chan error), - } - n.setOption(opts...) - - return n -} diff --git a/broker/http.go b/broker/http.go new file mode 100644 index 00000000..a4a90698 --- /dev/null +++ b/broker/http.go @@ -0,0 +1,709 @@ +// Package http provides a http based message broker +package broker + +import ( + "bytes" + "context" + "crypto/tls" + "errors" + "fmt" + "io" + "io/ioutil" + "math/rand" + "net" + "net/http" + "net/url" + "runtime" + "sync" + "time" + + "github.com/google/uuid" + "github.com/micro/go-micro/v2/codec/json" + merr "github.com/micro/go-micro/v2/errors" + "github.com/micro/go-micro/v2/registry" + "github.com/micro/go-micro/v2/registry/cache" + maddr "github.com/micro/go-micro/v2/util/addr" + mnet "github.com/micro/go-micro/v2/util/net" + mls "github.com/micro/go-micro/v2/util/tls" + "golang.org/x/net/http2" +) + +// HTTP Broker is a point to point async broker +type httpBroker struct { + id string + address string + opts Options + + mux *http.ServeMux + + c *http.Client + r registry.Registry + + sync.RWMutex + subscribers map[string][]*httpSubscriber + running bool + exit chan chan error + + // offline message inbox + mtx sync.RWMutex + inbox map[string][][]byte +} + +type httpSubscriber struct { + opts SubscribeOptions + id string + topic string + fn Handler + svc *registry.Service + hb *httpBroker +} + +type httpEvent struct { + m *Message + t string + err error +} + +var ( + DefaultSubPath = "/" + serviceName = "micro.http.broker" + broadcastVersion = "ff.http.broadcast" + registerTTL = time.Minute + registerInterval = time.Second * 30 +) + +func init() { + rand.Seed(time.Now().Unix()) +} + +func newTransport(config *tls.Config) *http.Transport { + if config == nil { + config = &tls.Config{ + InsecureSkipVerify: true, + } + } + + dialTLS := func(network string, addr string) (net.Conn, error) { + return tls.Dial(network, addr, config) + } + + t := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + Dial: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).Dial, + TLSHandshakeTimeout: 10 * time.Second, + DialTLS: dialTLS, + } + runtime.SetFinalizer(&t, func(tr **http.Transport) { + (*tr).CloseIdleConnections() + }) + + // setup http2 + http2.ConfigureTransport(t) + + return t +} + +func newHttpBroker(opts ...Option) Broker { + options := Options{ + Codec: json.Marshaler{}, + Context: context.TODO(), + Registry: registry.DefaultRegistry, + } + + for _, o := range opts { + o(&options) + } + + // set address + addr := ":0" + if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 { + addr = options.Addrs[0] + } + + h := &httpBroker{ + id: uuid.New().String(), + address: addr, + opts: options, + r: options.Registry, + c: &http.Client{Transport: newTransport(options.TLSConfig)}, + subscribers: make(map[string][]*httpSubscriber), + exit: make(chan chan error), + mux: http.NewServeMux(), + inbox: make(map[string][][]byte), + } + + // specify the message handler + h.mux.Handle(DefaultSubPath, h) + + // get optional handlers + if h.opts.Context != nil { + handlers, ok := h.opts.Context.Value("http_handlers").(map[string]http.Handler) + if ok { + for pattern, handler := range handlers { + h.mux.Handle(pattern, handler) + } + } + } + + return h +} + +func (h *httpEvent) Ack() error { + return nil +} + +func (h *httpEvent) Error() error { + return h.err +} + +func (h *httpEvent) Message() *Message { + return h.m +} + +func (h *httpEvent) Topic() string { + return h.t +} + +func (h *httpSubscriber) Options() SubscribeOptions { + return h.opts +} + +func (h *httpSubscriber) Topic() string { + return h.topic +} + +func (h *httpSubscriber) Unsubscribe() error { + return h.hb.unsubscribe(h) +} + +func (h *httpBroker) saveMessage(topic string, msg []byte) { + h.mtx.Lock() + defer h.mtx.Unlock() + + // get messages + c := h.inbox[topic] + + // save message + c = append(c, msg) + + // max length 64 + if len(c) > 64 { + c = c[:64] + } + + // save inbox + h.inbox[topic] = c +} + +func (h *httpBroker) getMessage(topic string, num int) [][]byte { + h.mtx.Lock() + defer h.mtx.Unlock() + + // get messages + c, ok := h.inbox[topic] + if !ok { + return nil + } + + // more message than requests + if len(c) >= num { + msg := c[:num] + h.inbox[topic] = c[num:] + return msg + } + + // reset inbox + h.inbox[topic] = nil + + // return all messages + return c +} + +func (h *httpBroker) subscribe(s *httpSubscriber) error { + h.Lock() + defer h.Unlock() + + if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil { + return err + } + + h.subscribers[s.topic] = append(h.subscribers[s.topic], s) + return nil +} + +func (h *httpBroker) unsubscribe(s *httpSubscriber) error { + h.Lock() + defer h.Unlock() + + //nolint:prealloc + var subscribers []*httpSubscriber + + // look for subscriber + for _, sub := range h.subscribers[s.topic] { + // deregister and skip forward + if sub == s { + _ = h.r.Deregister(sub.svc) + continue + } + // keep subscriber + subscribers = append(subscribers, sub) + } + + // set subscribers + h.subscribers[s.topic] = subscribers + + return nil +} + +func (h *httpBroker) run(l net.Listener) { + t := time.NewTicker(registerInterval) + defer t.Stop() + + for { + select { + // heartbeat for each subscriber + case <-t.C: + h.RLock() + for _, subs := range h.subscribers { + for _, sub := range subs { + _ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL)) + } + } + h.RUnlock() + // received exit signal + case ch := <-h.exit: + ch <- l.Close() + h.RLock() + for _, subs := range h.subscribers { + for _, sub := range subs { + _ = h.r.Deregister(sub.svc) + } + } + h.RUnlock() + return + } + } +} + +func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { + if req.Method != "POST" { + err := merr.BadRequest("go.micro.broker", "Method not allowed") + http.Error(w, err.Error(), http.StatusMethodNotAllowed) + return + } + defer req.Body.Close() + + req.ParseForm() + + b, err := ioutil.ReadAll(req.Body) + if err != nil { + errr := merr.InternalServerError("go.micro.broker", "Error reading request body: %v", err) + w.WriteHeader(500) + w.Write([]byte(errr.Error())) + return + } + + var m *Message + if err = h.opts.Codec.Unmarshal(b, &m); err != nil { + errr := merr.InternalServerError("go.micro.broker", "Error parsing request body: %v", err) + w.WriteHeader(500) + w.Write([]byte(errr.Error())) + return + } + + topic := m.Header["Micro-Topic"] + //delete(m.Header, ":topic") + + if len(topic) == 0 { + errr := merr.InternalServerError("go.micro.broker", "Topic not found") + w.WriteHeader(500) + w.Write([]byte(errr.Error())) + return + } + + p := &httpEvent{m: m, t: topic} + id := req.Form.Get("id") + + //nolint:prealloc + var subs []Handler + + h.RLock() + for _, subscriber := range h.subscribers[topic] { + if id != subscriber.id { + continue + } + subs = append(subs, subscriber.fn) + } + h.RUnlock() + + // execute the handler + for _, fn := range subs { + p.err = fn(p) + } +} + +func (h *httpBroker) Address() string { + h.RLock() + defer h.RUnlock() + return h.address +} + +func (h *httpBroker) Connect() error { + h.RLock() + if h.running { + h.RUnlock() + return nil + } + h.RUnlock() + + h.Lock() + defer h.Unlock() + + var l net.Listener + var err error + + if h.opts.Secure || h.opts.TLSConfig != nil { + config := h.opts.TLSConfig + + fn := func(addr string) (net.Listener, error) { + if config == nil { + hosts := []string{addr} + + // check if its a valid host:port + if host, _, err := net.SplitHostPort(addr); err == nil { + if len(host) == 0 { + hosts = maddr.IPs() + } else { + hosts = []string{host} + } + } + + // generate a certificate + cert, err := mls.Certificate(hosts...) + if err != nil { + return nil, err + } + config = &tls.Config{Certificates: []tls.Certificate{cert}} + } + return tls.Listen("tcp", addr, config) + } + + l, err = mnet.Listen(h.address, fn) + } else { + fn := func(addr string) (net.Listener, error) { + return net.Listen("tcp", addr) + } + + l, err = mnet.Listen(h.address, fn) + } + + if err != nil { + return err + } + + addr := h.address + h.address = l.Addr().String() + + go http.Serve(l, h.mux) + go func() { + h.run(l) + h.Lock() + h.opts.Addrs = []string{addr} + h.address = addr + h.Unlock() + }() + + // get registry + reg := h.opts.Registry + if reg == nil { + reg = registry.DefaultRegistry + } + // set cache + h.r = cache.New(reg) + + // set running + h.running = true + return nil +} + +func (h *httpBroker) Disconnect() error { + h.RLock() + if !h.running { + h.RUnlock() + return nil + } + h.RUnlock() + + h.Lock() + defer h.Unlock() + + // stop cache + rc, ok := h.r.(cache.Cache) + if ok { + rc.Stop() + } + + // exit and return err + ch := make(chan error) + h.exit <- ch + err := <-ch + + // set not running + h.running = false + return err +} + +func (h *httpBroker) Init(opts ...Option) error { + h.RLock() + if h.running { + h.RUnlock() + return errors.New("cannot init while connected") + } + h.RUnlock() + + h.Lock() + defer h.Unlock() + + for _, o := range opts { + o(&h.opts) + } + + if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 { + h.address = h.opts.Addrs[0] + } + + if len(h.id) == 0 { + h.id = "go.micro.http.broker-" + uuid.New().String() + } + + // get registry + reg := h.opts.Registry + if reg == nil { + reg = registry.DefaultRegistry + } + + // get cache + if rc, ok := h.r.(cache.Cache); ok { + rc.Stop() + } + + // set registry + h.r = cache.New(reg) + + // reconfigure tls config + if c := h.opts.TLSConfig; c != nil { + h.c = &http.Client{ + Transport: newTransport(c), + } + } + + return nil +} + +func (h *httpBroker) Options() Options { + return h.opts +} + +func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error { + // create the message first + m := &Message{ + Header: make(map[string]string), + Body: msg.Body, + } + + for k, v := range msg.Header { + m.Header[k] = v + } + + m.Header["Micro-Topic"] = topic + + // encode the message + b, err := h.opts.Codec.Marshal(m) + if err != nil { + return err + } + + // save the message + h.saveMessage(topic, b) + + // now attempt to get the service + h.RLock() + s, err := h.r.GetService(serviceName) + if err != nil { + h.RUnlock() + return err + } + h.RUnlock() + + pub := func(node *registry.Node, t string, b []byte) error { + scheme := "http" + + // check if secure is added in metadata + if node.Metadata["secure"] == "true" { + scheme = "https" + } + + vals := url.Values{} + vals.Add("id", node.Id) + + uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultSubPath, vals.Encode()) + r, err := h.c.Post(uri, "application/json", bytes.NewReader(b)) + if err != nil { + return err + } + + // discard response body + io.Copy(ioutil.Discard, r.Body) + r.Body.Close() + return nil + } + + srv := func(s []*registry.Service, b []byte) { + for _, service := range s { + var nodes []*registry.Node + + for _, node := range service.Nodes { + // only use nodes tagged with broker http + if node.Metadata["broker"] != "http" { + continue + } + + // look for nodes for the topic + if node.Metadata["topic"] != topic { + continue + } + + nodes = append(nodes, node) + } + + // only process if we have nodes + if len(nodes) == 0 { + continue + } + + switch service.Version { + // broadcast version means broadcast to all nodes + case broadcastVersion: + var success bool + + // publish to all nodes + for _, node := range nodes { + // publish async + if err := pub(node, topic, b); err == nil { + success = true + } + } + + // save if it failed to publish at least once + if !success { + h.saveMessage(topic, b) + } + default: + // select node to publish to + node := nodes[rand.Int()%len(nodes)] + + // publish async to one node + if err := pub(node, topic, b); err != nil { + // if failed save it + h.saveMessage(topic, b) + } + } + } + } + + // do the rest async + go func() { + // get a third of the backlog + messages := h.getMessage(topic, 8) + delay := (len(messages) > 1) + + // publish all the messages + for _, msg := range messages { + // serialize here + srv(s, msg) + + // sending a backlog of messages + if delay { + time.Sleep(time.Millisecond * 100) + } + } + }() + + return nil +} + +func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { + var err error + var host, port string + options := NewSubscribeOptions(opts...) + + // parse address for host, port + host, port, err = net.SplitHostPort(h.Address()) + if err != nil { + return nil, err + } + + addr, err := maddr.Extract(host) + if err != nil { + return nil, err + } + + var secure bool + + if h.opts.Secure || h.opts.TLSConfig != nil { + secure = true + } + + // register service + node := ®istry.Node{ + Id: topic + "-" + h.id, + Address: mnet.HostPort(addr, port), + Metadata: map[string]string{ + "secure": fmt.Sprintf("%t", secure), + "broker": "http", + "topic": topic, + }, + } + + // check for queue group or broadcast queue + version := options.Queue + if len(version) == 0 { + version = broadcastVersion + } + + service := ®istry.Service{ + Name: serviceName, + Version: version, + Nodes: []*registry.Node{node}, + } + + // generate subscriber + subscriber := &httpSubscriber{ + opts: options, + hb: h, + id: node.Id, + topic: topic, + fn: handler, + svc: service, + } + + // subscribe now + if err := h.subscribe(subscriber); err != nil { + return nil, err + } + + // return the subscriber + return subscriber, nil +} + +func (h *httpBroker) String() string { + return "http" +} + +// NewBroker returns a new http broker +func NewBroker(opts ...Option) Broker { + return newHttpBroker(opts...) +} diff --git a/broker/http/http.go b/broker/http/http.go new file mode 100644 index 00000000..ff28a41d --- /dev/null +++ b/broker/http/http.go @@ -0,0 +1,11 @@ +// Package http provides a http based message broker +package http + +import ( + "github.com/micro/go-micro/v2/broker" +) + +// NewBroker returns a new http broker +func NewBroker(opts ...broker.Option) broker.Broker { + return broker.NewBroker(opts...) +} diff --git a/broker/http/options.go b/broker/http/options.go new file mode 100644 index 00000000..c9825e1d --- /dev/null +++ b/broker/http/options.go @@ -0,0 +1,23 @@ +package http + +import ( + "context" + "net/http" + + "github.com/micro/go-micro/v2/broker" +) + +// Handle registers the handler for the given pattern. +func Handle(pattern string, handler http.Handler) broker.Option { + return func(o *broker.Options) { + if o.Context == nil { + o.Context = context.Background() + } + handlers, ok := o.Context.Value("http_handlers").(map[string]http.Handler) + if !ok { + handlers = make(map[string]http.Handler) + } + handlers[pattern] = handler + o.Context = context.WithValue(o.Context, "http_handlers", handlers) + } +} diff --git a/broker/http_test.go b/broker/http_test.go new file mode 100644 index 00000000..7d5309ae --- /dev/null +++ b/broker/http_test.go @@ -0,0 +1,384 @@ +package broker_test + +import ( + "sync" + "testing" + "time" + + "github.com/google/uuid" + "github.com/micro/go-micro/v2/broker" + "github.com/micro/go-micro/v2/registry" + "github.com/micro/go-micro/v2/registry/memory" +) + +var ( + // mock data + testData = map[string][]*registry.Service{ + "foo": { + { + Name: "foo", + Version: "1.0.0", + Nodes: []*registry.Node{ + { + Id: "foo-1.0.0-123", + Address: "localhost:9999", + }, + { + Id: "foo-1.0.0-321", + Address: "localhost:9999", + }, + }, + }, + { + Name: "foo", + Version: "1.0.1", + Nodes: []*registry.Node{ + { + Id: "foo-1.0.1-321", + Address: "localhost:6666", + }, + }, + }, + { + Name: "foo", + Version: "1.0.3", + Nodes: []*registry.Node{ + { + Id: "foo-1.0.3-345", + Address: "localhost:8888", + }, + }, + }, + }, + } +) + +func newTestRegistry() registry.Registry { + return memory.NewRegistry(memory.Services(testData)) +} + +func sub(be *testing.B, c int) { + be.StopTimer() + m := newTestRegistry() + + b := broker.NewBroker(broker.Registry(m)) + topic := uuid.New().String() + + if err := b.Init(); err != nil { + be.Fatalf("Unexpected init error: %v", err) + } + + if err := b.Connect(); err != nil { + be.Fatalf("Unexpected connect error: %v", err) + } + + msg := &broker.Message{ + Header: map[string]string{ + "Content-Type": "application/json", + }, + Body: []byte(`{"message": "Hello World"}`), + } + + var subs []broker.Subscriber + done := make(chan bool, c) + + for i := 0; i < c; i++ { + sub, err := b.Subscribe(topic, func(p broker.Event) error { + done <- true + m := p.Message() + + if string(m.Body) != string(msg.Body) { + be.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) + } + + return nil + }, broker.Queue("shared")) + if err != nil { + be.Fatalf("Unexpected subscribe error: %v", err) + } + subs = append(subs, sub) + } + + for i := 0; i < be.N; i++ { + be.StartTimer() + if err := b.Publish(topic, msg); err != nil { + be.Fatalf("Unexpected publish error: %v", err) + } + <-done + be.StopTimer() + } + + for _, sub := range subs { + sub.Unsubscribe() + } + + if err := b.Disconnect(); err != nil { + be.Fatalf("Unexpected disconnect error: %v", err) + } +} + +func pub(be *testing.B, c int) { + be.StopTimer() + m := newTestRegistry() + b := broker.NewBroker(broker.Registry(m)) + topic := uuid.New().String() + + if err := b.Init(); err != nil { + be.Fatalf("Unexpected init error: %v", err) + } + + if err := b.Connect(); err != nil { + be.Fatalf("Unexpected connect error: %v", err) + } + + msg := &broker.Message{ + Header: map[string]string{ + "Content-Type": "application/json", + }, + Body: []byte(`{"message": "Hello World"}`), + } + + done := make(chan bool, c*4) + + sub, err := b.Subscribe(topic, func(p broker.Event) error { + done <- true + m := p.Message() + if string(m.Body) != string(msg.Body) { + be.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) + } + return nil + }, broker.Queue("shared")) + if err != nil { + be.Fatalf("Unexpected subscribe error: %v", err) + } + + var wg sync.WaitGroup + ch := make(chan int, c*4) + be.StartTimer() + + for i := 0; i < c; i++ { + go func() { + for range ch { + if err := b.Publish(topic, msg); err != nil { + be.Fatalf("Unexpected publish error: %v", err) + } + select { + case <-done: + case <-time.After(time.Second): + } + wg.Done() + } + }() + } + + for i := 0; i < be.N; i++ { + wg.Add(1) + ch <- i + } + + wg.Wait() + be.StopTimer() + sub.Unsubscribe() + close(ch) + close(done) + + if err := b.Disconnect(); err != nil { + be.Fatalf("Unexpected disconnect error: %v", err) + } +} + +func TestBroker(t *testing.T) { + m := newTestRegistry() + b := broker.NewBroker(broker.Registry(m)) + + if err := b.Init(); err != nil { + t.Fatalf("Unexpected init error: %v", err) + } + + if err := b.Connect(); err != nil { + t.Fatalf("Unexpected connect error: %v", err) + } + + msg := &broker.Message{ + Header: map[string]string{ + "Content-Type": "application/json", + }, + Body: []byte(`{"message": "Hello World"}`), + } + + done := make(chan bool) + + sub, err := b.Subscribe("test", func(p broker.Event) error { + m := p.Message() + + if string(m.Body) != string(msg.Body) { + t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) + } + + close(done) + return nil + }) + if err != nil { + t.Fatalf("Unexpected subscribe error: %v", err) + } + + if err := b.Publish("test", msg); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + + <-done + sub.Unsubscribe() + + if err := b.Disconnect(); err != nil { + t.Fatalf("Unexpected disconnect error: %v", err) + } +} + +func TestConcurrentSubBroker(t *testing.T) { + m := newTestRegistry() + b := broker.NewBroker(broker.Registry(m)) + + if err := b.Init(); err != nil { + t.Fatalf("Unexpected init error: %v", err) + } + + if err := b.Connect(); err != nil { + t.Fatalf("Unexpected connect error: %v", err) + } + + msg := &broker.Message{ + Header: map[string]string{ + "Content-Type": "application/json", + }, + Body: []byte(`{"message": "Hello World"}`), + } + + var subs []broker.Subscriber + var wg sync.WaitGroup + + for i := 0; i < 10; i++ { + sub, err := b.Subscribe("test", func(p broker.Event) error { + defer wg.Done() + + m := p.Message() + + if string(m.Body) != string(msg.Body) { + t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) + } + + return nil + }) + if err != nil { + t.Fatalf("Unexpected subscribe error: %v", err) + } + + wg.Add(1) + subs = append(subs, sub) + } + + if err := b.Publish("test", msg); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + + wg.Wait() + + for _, sub := range subs { + sub.Unsubscribe() + } + + if err := b.Disconnect(); err != nil { + t.Fatalf("Unexpected disconnect error: %v", err) + } +} + +func TestConcurrentPubBroker(t *testing.T) { + m := newTestRegistry() + b := broker.NewBroker(broker.Registry(m)) + + if err := b.Init(); err != nil { + t.Fatalf("Unexpected init error: %v", err) + } + + if err := b.Connect(); err != nil { + t.Fatalf("Unexpected connect error: %v", err) + } + + msg := &broker.Message{ + Header: map[string]string{ + "Content-Type": "application/json", + }, + Body: []byte(`{"message": "Hello World"}`), + } + + var wg sync.WaitGroup + + sub, err := b.Subscribe("test", func(p broker.Event) error { + defer wg.Done() + + m := p.Message() + + if string(m.Body) != string(msg.Body) { + t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) + } + + return nil + }) + if err != nil { + t.Fatalf("Unexpected subscribe error: %v", err) + } + + for i := 0; i < 10; i++ { + wg.Add(1) + + if err := b.Publish("test", msg); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } + + wg.Wait() + + sub.Unsubscribe() + + if err := b.Disconnect(); err != nil { + t.Fatalf("Unexpected disconnect error: %v", err) + } +} + +func BenchmarkSub1(b *testing.B) { + sub(b, 1) +} +func BenchmarkSub8(b *testing.B) { + sub(b, 8) +} + +func BenchmarkSub32(b *testing.B) { + sub(b, 32) +} + +func BenchmarkSub64(b *testing.B) { + sub(b, 64) +} + +func BenchmarkSub128(b *testing.B) { + sub(b, 128) +} + +func BenchmarkPub1(b *testing.B) { + pub(b, 1) +} + +func BenchmarkPub8(b *testing.B) { + pub(b, 8) +} + +func BenchmarkPub32(b *testing.B) { + pub(b, 32) +} + +func BenchmarkPub64(b *testing.B) { + pub(b, 64) +} + +func BenchmarkPub128(b *testing.B) { + pub(b, 128) +} diff --git a/config/cmd/cmd.go b/config/cmd/cmd.go index 44071b71..b5cd79f8 100644 --- a/config/cmd/cmd.go +++ b/config/cmd/cmd.go @@ -36,6 +36,7 @@ import ( smucp "github.com/micro/go-micro/v2/server/mucp" // brokers + brokerHttp "github.com/micro/go-micro/v2/broker/http" "github.com/micro/go-micro/v2/broker/memory" "github.com/micro/go-micro/v2/broker/nats" brokerSrv "github.com/micro/go-micro/v2/broker/service" @@ -319,6 +320,7 @@ var ( "service": brokerSrv.NewBroker, "memory": memory.NewBroker, "nats": nats.NewBroker, + "http": brokerHttp.NewBroker, } DefaultClients = map[string]func(...client.Option) client.Client{ diff --git a/go.mod b/go.mod index 4ce7a293..9725ae12 100644 --- a/go.mod +++ b/go.mod @@ -50,8 +50,6 @@ require ( github.com/micro/mdns v0.3.0 github.com/miekg/dns v1.1.27 github.com/mitchellh/hashstructure v1.0.0 - github.com/nats-io/nats-server/v2 v2.1.4 - github.com/nats-io/nats.go v1.9.1 github.com/nlopes/slack v0.6.1-0.20191106133607-d06c2a2b3249 github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c github.com/patrickmn/go-cache v2.1.0+incompatible diff --git a/go.sum b/go.sum index 63e5e123..36dee0a2 100644 --- a/go.sum +++ b/go.sum @@ -310,18 +310,6 @@ github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c h1:nXxl5PrvVm2L/wCy8d github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8= -github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= -github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI= -github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= -github.com/nats-io/nats-server/v2 v2.1.4 h1:BILRnsJ2Yb/fefiFbBWADpViGF69uh4sxe8poVDQ06g= -github.com/nats-io/nats-server/v2 v2.1.4/go.mod h1:Jw1Z28soD/QasIA2uWjXyM9El1jly3YwyFOuR8tH1rg= -github.com/nats-io/nats.go v1.9.1 h1:ik3HbLhZ0YABLto7iX80pZLPw/6dx3T+++MZJwLnMrQ= -github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= -github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= -github.com/nats-io/nkeys v0.1.3 h1:6JrEfig+HzTH85yxzhSVbjHRJv9cn0p6n3IngIcM5/k= -github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= -github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= -github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms= github.com/nlopes/slack v0.6.1-0.20191106133607-d06c2a2b3249 h1:Pr5gZa2VcmktVwq0lyC39MsN5tz356vC/pQHKvq+QBo= github.com/nlopes/slack v0.6.1-0.20191106133607-d06c2a2b3249/go.mod h1:JzQ9m3PMAqcpeCam7UaHSuBuupz7CmpjehYMayT6YOk=