broker: swap default broker from eats to http (#1524)

* broker: swap default broker from eats to http

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-04-11 03:46:54 +03:00 committed by GitHub
parent b979db6d9d
commit bc71640fd9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 1129 additions and 518 deletions

View File

@ -1,504 +0,0 @@
package broker
import (
"context"
"errors"
"net"
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/micro/go-micro/v2/codec/json"
"github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/util/addr"
"github.com/nats-io/nats-server/v2/server"
nats "github.com/nats-io/nats.go"
)
type natsBroker struct {
sync.Once
sync.RWMutex
// indicate if we're connected
connected bool
// address to bind routes to
addrs []string
// servers for the client
servers []string
// client connection and nats opts
conn *nats.Conn
opts Options
nopts nats.Options
// should we drain the connection
drain bool
closeCh chan (error)
// embedded server
server *server.Server
// configure to use local server
local bool
// server exit channel
exit chan bool
}
type subscriber struct {
s *nats.Subscription
opts SubscribeOptions
}
type publication struct {
t string
err error
m *Message
}
func (p *publication) Topic() string {
return p.t
}
func (p *publication) Message() *Message {
return p.m
}
func (p *publication) Ack() error {
// nats does not support acking
return nil
}
func (p *publication) Error() error {
return p.err
}
func (s *subscriber) Options() SubscribeOptions {
return s.opts
}
func (s *subscriber) Topic() string {
return s.s.Subject
}
func (s *subscriber) Unsubscribe() error {
return s.s.Unsubscribe()
}
func (n *natsBroker) Address() string {
n.RLock()
defer n.RUnlock()
if n.server != nil {
return n.server.ClusterAddr().String()
}
if n.conn != nil && n.conn.IsConnected() {
return n.conn.ConnectedUrl()
}
if len(n.addrs) > 0 {
return n.addrs[0]
}
return "127.0.0.1:-1"
}
func (n *natsBroker) setAddrs(addrs []string) []string {
//nolint:prealloc
var cAddrs []string
for _, addr := range addrs {
if len(addr) == 0 {
continue
}
if !strings.HasPrefix(addr, "nats://") {
addr = "nats://" + addr
}
cAddrs = append(cAddrs, addr)
}
// if there's no address and we weren't told to
// embed a local server then use the default url
if len(cAddrs) == 0 && !n.local {
cAddrs = []string{nats.DefaultURL}
}
return cAddrs
}
// serve stats a local nats server if needed
func (n *natsBroker) serve(exit chan bool) error {
// local server address
host := "127.0.0.1"
port := -1
// cluster address
caddr := "0.0.0.0"
cport := -1
// with no address we just default it
// this is a local client address
if len(n.addrs) > 0 {
address := n.addrs[0]
if strings.HasPrefix(address, "nats://") {
address = strings.TrimPrefix(address, "nats://")
}
// parse out the address
h, p, err := net.SplitHostPort(address)
if err == nil {
caddr = h
cport, _ = strconv.Atoi(p)
}
}
// 1. create new server
// 2. register the server
// 3. connect to other servers
// set cluster opts
cOpts := server.ClusterOpts{
Host: caddr,
Port: cport,
}
// get the routes for other nodes
var routes []*url.URL
// get existing nats servers to connect to
services, err := n.opts.Registry.GetService("go.micro.nats.broker")
if err == nil {
for _, service := range services {
for _, node := range service.Nodes {
u, err := url.Parse("nats://" + node.Address)
if err != nil {
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Info(err)
}
continue
}
// append to the cluster routes
routes = append(routes, u)
}
}
}
// try get existing server
s := n.server
if s != nil {
// stop the existing server
s.Shutdown()
}
s, err = server.NewServer(&server.Options{
// Specify the host
Host: host,
// Use a random port
Port: port,
// Set the cluster ops
Cluster: cOpts,
// Set the routes
Routes: routes,
NoLog: true,
NoSigs: true,
MaxControlLine: 2048,
TLSConfig: n.opts.TLSConfig,
})
if err != nil {
return err
}
// save the server
n.server = s
// start the server
go s.Start()
var ready bool
// wait till its ready for connections
for i := 0; i < 3; i++ {
if s.ReadyForConnections(time.Second) {
ready = true
break
}
}
if !ready {
return errors.New("server not ready")
}
// set the client address
n.servers = []string{s.ClientURL()}
go func() {
var advertise string
// parse out the address
_, port, err := net.SplitHostPort(s.ClusterAddr().String())
if err == nil {
addr, _ := addr.Extract("")
advertise = net.JoinHostPort(addr, port)
} else {
s.ClusterAddr().String()
}
// register the cluster address
for {
select {
case err := <-n.closeCh:
if err != nil {
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Info(err)
}
}
case <-exit:
// deregister on exit
n.opts.Registry.Deregister(&registry.Service{
Name: "go.micro.nats.broker",
Version: "v2",
Nodes: []*registry.Node{
{Id: s.ID(), Address: advertise},
},
})
s.Shutdown()
return
default:
// register the broker
n.opts.Registry.Register(&registry.Service{
Name: "go.micro.nats.broker",
Version: "v2",
Nodes: []*registry.Node{
{Id: s.ID(), Address: advertise},
},
}, registry.RegisterTTL(time.Minute))
time.Sleep(time.Minute)
}
}
}()
return nil
}
func (n *natsBroker) Connect() error {
n.Lock()
defer n.Unlock()
if !n.connected {
// create exit chan
n.exit = make(chan bool)
// start the local server
if err := n.serve(n.exit); err != nil {
return err
}
// set to connected
}
status := nats.CLOSED
if n.conn != nil {
status = n.conn.Status()
}
switch status {
case nats.CONNECTED, nats.RECONNECTING, nats.CONNECTING:
return nil
default: // DISCONNECTED or CLOSED or DRAINING
opts := n.nopts
opts.DrainTimeout = 1 * time.Second
opts.AsyncErrorCB = n.onAsyncError
opts.DisconnectedErrCB = n.onDisconnectedError
opts.ClosedCB = n.onClose
opts.Servers = n.servers
opts.Secure = n.opts.Secure
opts.TLSConfig = n.opts.TLSConfig
// secure might not be set
if n.opts.TLSConfig != nil {
opts.Secure = true
}
c, err := opts.Connect()
if err != nil {
return err
}
n.conn = c
n.connected = true
return nil
}
}
func (n *natsBroker) Disconnect() error {
n.RLock()
defer n.RUnlock()
if !n.connected {
return nil
}
// drain the connection if specified
if n.drain {
n.conn.Drain()
}
// close the client connection
n.conn.Close()
// shutdown the local server
// and deregister
if n.server != nil {
select {
case <-n.exit:
default:
close(n.exit)
}
}
// set not connected
n.connected = false
return nil
}
func (n *natsBroker) Init(opts ...Option) error {
n.setOption(opts...)
return nil
}
func (n *natsBroker) Options() Options {
return n.opts
}
func (n *natsBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
b, err := n.opts.Codec.Marshal(msg)
if err != nil {
return err
}
n.RLock()
defer n.RUnlock()
return n.conn.Publish(topic, b)
}
func (n *natsBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
if n.conn == nil {
return nil, errors.New("not connected")
}
opt := SubscribeOptions{
AutoAck: true,
Context: context.Background(),
}
for _, o := range opts {
o(&opt)
}
fn := func(msg *nats.Msg) {
var m Message
pub := &publication{t: msg.Subject}
eh := n.opts.ErrorHandler
err := n.opts.Codec.Unmarshal(msg.Data, &m)
pub.err = err
pub.m = &m
if err != nil {
m.Body = msg.Data
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
if eh != nil {
eh(pub)
}
return
}
if err := handler(pub); err != nil {
pub.err = err
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
if eh != nil {
eh(pub)
}
}
}
var sub *nats.Subscription
var err error
n.RLock()
if len(opt.Queue) > 0 {
sub, err = n.conn.QueueSubscribe(topic, opt.Queue, fn)
} else {
sub, err = n.conn.Subscribe(topic, fn)
}
n.RUnlock()
if err != nil {
return nil, err
}
return &subscriber{s: sub, opts: opt}, nil
}
func (n *natsBroker) String() string {
return "eats"
}
func (n *natsBroker) setOption(opts ...Option) {
for _, o := range opts {
o(&n.opts)
}
n.Once.Do(func() {
n.nopts = nats.GetDefaultOptions()
})
// local embedded server
n.local = true
// set to drain
n.drain = true
if !n.opts.Secure {
n.opts.Secure = n.nopts.Secure
}
if n.opts.TLSConfig == nil {
n.opts.TLSConfig = n.nopts.TLSConfig
}
n.addrs = n.setAddrs(n.opts.Addrs)
}
func (n *natsBroker) onClose(conn *nats.Conn) {
n.closeCh <- nil
}
func (n *natsBroker) onDisconnectedError(conn *nats.Conn, err error) {
n.closeCh <- err
}
func (n *natsBroker) onAsyncError(conn *nats.Conn, sub *nats.Subscription, err error) {
// There are kinds of different async error nats might callback, but we are interested
// in ErrDrainTimeout only here.
if err == nats.ErrDrainTimeout {
n.closeCh <- err
}
}
func NewBroker(opts ...Option) Broker {
options := Options{
// Default codec
Codec: json.Marshaler{},
Context: context.Background(),
Registry: registry.DefaultRegistry,
}
n := &natsBroker{
opts: options,
closeCh: make(chan error),
}
n.setOption(opts...)
return n
}

709
broker/http.go Normal file
View File

@ -0,0 +1,709 @@
// Package http provides a http based message broker
package broker
import (
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net"
"net/http"
"net/url"
"runtime"
"sync"
"time"
"github.com/google/uuid"
"github.com/micro/go-micro/v2/codec/json"
merr "github.com/micro/go-micro/v2/errors"
"github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/registry/cache"
maddr "github.com/micro/go-micro/v2/util/addr"
mnet "github.com/micro/go-micro/v2/util/net"
mls "github.com/micro/go-micro/v2/util/tls"
"golang.org/x/net/http2"
)
// HTTP Broker is a point to point async broker
type httpBroker struct {
id string
address string
opts Options
mux *http.ServeMux
c *http.Client
r registry.Registry
sync.RWMutex
subscribers map[string][]*httpSubscriber
running bool
exit chan chan error
// offline message inbox
mtx sync.RWMutex
inbox map[string][][]byte
}
type httpSubscriber struct {
opts SubscribeOptions
id string
topic string
fn Handler
svc *registry.Service
hb *httpBroker
}
type httpEvent struct {
m *Message
t string
err error
}
var (
DefaultSubPath = "/"
serviceName = "micro.http.broker"
broadcastVersion = "ff.http.broadcast"
registerTTL = time.Minute
registerInterval = time.Second * 30
)
func init() {
rand.Seed(time.Now().Unix())
}
func newTransport(config *tls.Config) *http.Transport {
if config == nil {
config = &tls.Config{
InsecureSkipVerify: true,
}
}
dialTLS := func(network string, addr string) (net.Conn, error) {
return tls.Dial(network, addr, config)
}
t := &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
DialTLS: dialTLS,
}
runtime.SetFinalizer(&t, func(tr **http.Transport) {
(*tr).CloseIdleConnections()
})
// setup http2
http2.ConfigureTransport(t)
return t
}
func newHttpBroker(opts ...Option) Broker {
options := Options{
Codec: json.Marshaler{},
Context: context.TODO(),
Registry: registry.DefaultRegistry,
}
for _, o := range opts {
o(&options)
}
// set address
addr := ":0"
if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 {
addr = options.Addrs[0]
}
h := &httpBroker{
id: uuid.New().String(),
address: addr,
opts: options,
r: options.Registry,
c: &http.Client{Transport: newTransport(options.TLSConfig)},
subscribers: make(map[string][]*httpSubscriber),
exit: make(chan chan error),
mux: http.NewServeMux(),
inbox: make(map[string][][]byte),
}
// specify the message handler
h.mux.Handle(DefaultSubPath, h)
// get optional handlers
if h.opts.Context != nil {
handlers, ok := h.opts.Context.Value("http_handlers").(map[string]http.Handler)
if ok {
for pattern, handler := range handlers {
h.mux.Handle(pattern, handler)
}
}
}
return h
}
func (h *httpEvent) Ack() error {
return nil
}
func (h *httpEvent) Error() error {
return h.err
}
func (h *httpEvent) Message() *Message {
return h.m
}
func (h *httpEvent) Topic() string {
return h.t
}
func (h *httpSubscriber) Options() SubscribeOptions {
return h.opts
}
func (h *httpSubscriber) Topic() string {
return h.topic
}
func (h *httpSubscriber) Unsubscribe() error {
return h.hb.unsubscribe(h)
}
func (h *httpBroker) saveMessage(topic string, msg []byte) {
h.mtx.Lock()
defer h.mtx.Unlock()
// get messages
c := h.inbox[topic]
// save message
c = append(c, msg)
// max length 64
if len(c) > 64 {
c = c[:64]
}
// save inbox
h.inbox[topic] = c
}
func (h *httpBroker) getMessage(topic string, num int) [][]byte {
h.mtx.Lock()
defer h.mtx.Unlock()
// get messages
c, ok := h.inbox[topic]
if !ok {
return nil
}
// more message than requests
if len(c) >= num {
msg := c[:num]
h.inbox[topic] = c[num:]
return msg
}
// reset inbox
h.inbox[topic] = nil
// return all messages
return c
}
func (h *httpBroker) subscribe(s *httpSubscriber) error {
h.Lock()
defer h.Unlock()
if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil {
return err
}
h.subscribers[s.topic] = append(h.subscribers[s.topic], s)
return nil
}
func (h *httpBroker) unsubscribe(s *httpSubscriber) error {
h.Lock()
defer h.Unlock()
//nolint:prealloc
var subscribers []*httpSubscriber
// look for subscriber
for _, sub := range h.subscribers[s.topic] {
// deregister and skip forward
if sub == s {
_ = h.r.Deregister(sub.svc)
continue
}
// keep subscriber
subscribers = append(subscribers, sub)
}
// set subscribers
h.subscribers[s.topic] = subscribers
return nil
}
func (h *httpBroker) run(l net.Listener) {
t := time.NewTicker(registerInterval)
defer t.Stop()
for {
select {
// heartbeat for each subscriber
case <-t.C:
h.RLock()
for _, subs := range h.subscribers {
for _, sub := range subs {
_ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))
}
}
h.RUnlock()
// received exit signal
case ch := <-h.exit:
ch <- l.Close()
h.RLock()
for _, subs := range h.subscribers {
for _, sub := range subs {
_ = h.r.Deregister(sub.svc)
}
}
h.RUnlock()
return
}
}
}
func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "POST" {
err := merr.BadRequest("go.micro.broker", "Method not allowed")
http.Error(w, err.Error(), http.StatusMethodNotAllowed)
return
}
defer req.Body.Close()
req.ParseForm()
b, err := ioutil.ReadAll(req.Body)
if err != nil {
errr := merr.InternalServerError("go.micro.broker", "Error reading request body: %v", err)
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
var m *Message
if err = h.opts.Codec.Unmarshal(b, &m); err != nil {
errr := merr.InternalServerError("go.micro.broker", "Error parsing request body: %v", err)
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
topic := m.Header["Micro-Topic"]
//delete(m.Header, ":topic")
if len(topic) == 0 {
errr := merr.InternalServerError("go.micro.broker", "Topic not found")
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
p := &httpEvent{m: m, t: topic}
id := req.Form.Get("id")
//nolint:prealloc
var subs []Handler
h.RLock()
for _, subscriber := range h.subscribers[topic] {
if id != subscriber.id {
continue
}
subs = append(subs, subscriber.fn)
}
h.RUnlock()
// execute the handler
for _, fn := range subs {
p.err = fn(p)
}
}
func (h *httpBroker) Address() string {
h.RLock()
defer h.RUnlock()
return h.address
}
func (h *httpBroker) Connect() error {
h.RLock()
if h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
var l net.Listener
var err error
if h.opts.Secure || h.opts.TLSConfig != nil {
config := h.opts.TLSConfig
fn := func(addr string) (net.Listener, error) {
if config == nil {
hosts := []string{addr}
// check if its a valid host:port
if host, _, err := net.SplitHostPort(addr); err == nil {
if len(host) == 0 {
hosts = maddr.IPs()
} else {
hosts = []string{host}
}
}
// generate a certificate
cert, err := mls.Certificate(hosts...)
if err != nil {
return nil, err
}
config = &tls.Config{Certificates: []tls.Certificate{cert}}
}
return tls.Listen("tcp", addr, config)
}
l, err = mnet.Listen(h.address, fn)
} else {
fn := func(addr string) (net.Listener, error) {
return net.Listen("tcp", addr)
}
l, err = mnet.Listen(h.address, fn)
}
if err != nil {
return err
}
addr := h.address
h.address = l.Addr().String()
go http.Serve(l, h.mux)
go func() {
h.run(l)
h.Lock()
h.opts.Addrs = []string{addr}
h.address = addr
h.Unlock()
}()
// get registry
reg := h.opts.Registry
if reg == nil {
reg = registry.DefaultRegistry
}
// set cache
h.r = cache.New(reg)
// set running
h.running = true
return nil
}
func (h *httpBroker) Disconnect() error {
h.RLock()
if !h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
// stop cache
rc, ok := h.r.(cache.Cache)
if ok {
rc.Stop()
}
// exit and return err
ch := make(chan error)
h.exit <- ch
err := <-ch
// set not running
h.running = false
return err
}
func (h *httpBroker) Init(opts ...Option) error {
h.RLock()
if h.running {
h.RUnlock()
return errors.New("cannot init while connected")
}
h.RUnlock()
h.Lock()
defer h.Unlock()
for _, o := range opts {
o(&h.opts)
}
if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 {
h.address = h.opts.Addrs[0]
}
if len(h.id) == 0 {
h.id = "go.micro.http.broker-" + uuid.New().String()
}
// get registry
reg := h.opts.Registry
if reg == nil {
reg = registry.DefaultRegistry
}
// get cache
if rc, ok := h.r.(cache.Cache); ok {
rc.Stop()
}
// set registry
h.r = cache.New(reg)
// reconfigure tls config
if c := h.opts.TLSConfig; c != nil {
h.c = &http.Client{
Transport: newTransport(c),
}
}
return nil
}
func (h *httpBroker) Options() Options {
return h.opts
}
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
// create the message first
m := &Message{
Header: make(map[string]string),
Body: msg.Body,
}
for k, v := range msg.Header {
m.Header[k] = v
}
m.Header["Micro-Topic"] = topic
// encode the message
b, err := h.opts.Codec.Marshal(m)
if err != nil {
return err
}
// save the message
h.saveMessage(topic, b)
// now attempt to get the service
h.RLock()
s, err := h.r.GetService(serviceName)
if err != nil {
h.RUnlock()
return err
}
h.RUnlock()
pub := func(node *registry.Node, t string, b []byte) error {
scheme := "http"
// check if secure is added in metadata
if node.Metadata["secure"] == "true" {
scheme = "https"
}
vals := url.Values{}
vals.Add("id", node.Id)
uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultSubPath, vals.Encode())
r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
if err != nil {
return err
}
// discard response body
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
return nil
}
srv := func(s []*registry.Service, b []byte) {
for _, service := range s {
var nodes []*registry.Node
for _, node := range service.Nodes {
// only use nodes tagged with broker http
if node.Metadata["broker"] != "http" {
continue
}
// look for nodes for the topic
if node.Metadata["topic"] != topic {
continue
}
nodes = append(nodes, node)
}
// only process if we have nodes
if len(nodes) == 0 {
continue
}
switch service.Version {
// broadcast version means broadcast to all nodes
case broadcastVersion:
var success bool
// publish to all nodes
for _, node := range nodes {
// publish async
if err := pub(node, topic, b); err == nil {
success = true
}
}
// save if it failed to publish at least once
if !success {
h.saveMessage(topic, b)
}
default:
// select node to publish to
node := nodes[rand.Int()%len(nodes)]
// publish async to one node
if err := pub(node, topic, b); err != nil {
// if failed save it
h.saveMessage(topic, b)
}
}
}
}
// do the rest async
go func() {
// get a third of the backlog
messages := h.getMessage(topic, 8)
delay := (len(messages) > 1)
// publish all the messages
for _, msg := range messages {
// serialize here
srv(s, msg)
// sending a backlog of messages
if delay {
time.Sleep(time.Millisecond * 100)
}
}
}()
return nil
}
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
var err error
var host, port string
options := NewSubscribeOptions(opts...)
// parse address for host, port
host, port, err = net.SplitHostPort(h.Address())
if err != nil {
return nil, err
}
addr, err := maddr.Extract(host)
if err != nil {
return nil, err
}
var secure bool
if h.opts.Secure || h.opts.TLSConfig != nil {
secure = true
}
// register service
node := &registry.Node{
Id: topic + "-" + h.id,
Address: mnet.HostPort(addr, port),
Metadata: map[string]string{
"secure": fmt.Sprintf("%t", secure),
"broker": "http",
"topic": topic,
},
}
// check for queue group or broadcast queue
version := options.Queue
if len(version) == 0 {
version = broadcastVersion
}
service := &registry.Service{
Name: serviceName,
Version: version,
Nodes: []*registry.Node{node},
}
// generate subscriber
subscriber := &httpSubscriber{
opts: options,
hb: h,
id: node.Id,
topic: topic,
fn: handler,
svc: service,
}
// subscribe now
if err := h.subscribe(subscriber); err != nil {
return nil, err
}
// return the subscriber
return subscriber, nil
}
func (h *httpBroker) String() string {
return "http"
}
// NewBroker returns a new http broker
func NewBroker(opts ...Option) Broker {
return newHttpBroker(opts...)
}

11
broker/http/http.go Normal file
View File

@ -0,0 +1,11 @@
// Package http provides a http based message broker
package http
import (
"github.com/micro/go-micro/v2/broker"
)
// NewBroker returns a new http broker
func NewBroker(opts ...broker.Option) broker.Broker {
return broker.NewBroker(opts...)
}

23
broker/http/options.go Normal file
View File

@ -0,0 +1,23 @@
package http
import (
"context"
"net/http"
"github.com/micro/go-micro/v2/broker"
)
// Handle registers the handler for the given pattern.
func Handle(pattern string, handler http.Handler) broker.Option {
return func(o *broker.Options) {
if o.Context == nil {
o.Context = context.Background()
}
handlers, ok := o.Context.Value("http_handlers").(map[string]http.Handler)
if !ok {
handlers = make(map[string]http.Handler)
}
handlers[pattern] = handler
o.Context = context.WithValue(o.Context, "http_handlers", handlers)
}
}

384
broker/http_test.go Normal file
View File

@ -0,0 +1,384 @@
package broker_test
import (
"sync"
"testing"
"time"
"github.com/google/uuid"
"github.com/micro/go-micro/v2/broker"
"github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/registry/memory"
)
var (
// mock data
testData = map[string][]*registry.Service{
"foo": {
{
Name: "foo",
Version: "1.0.0",
Nodes: []*registry.Node{
{
Id: "foo-1.0.0-123",
Address: "localhost:9999",
},
{
Id: "foo-1.0.0-321",
Address: "localhost:9999",
},
},
},
{
Name: "foo",
Version: "1.0.1",
Nodes: []*registry.Node{
{
Id: "foo-1.0.1-321",
Address: "localhost:6666",
},
},
},
{
Name: "foo",
Version: "1.0.3",
Nodes: []*registry.Node{
{
Id: "foo-1.0.3-345",
Address: "localhost:8888",
},
},
},
},
}
)
func newTestRegistry() registry.Registry {
return memory.NewRegistry(memory.Services(testData))
}
func sub(be *testing.B, c int) {
be.StopTimer()
m := newTestRegistry()
b := broker.NewBroker(broker.Registry(m))
topic := uuid.New().String()
if err := b.Init(); err != nil {
be.Fatalf("Unexpected init error: %v", err)
}
if err := b.Connect(); err != nil {
be.Fatalf("Unexpected connect error: %v", err)
}
msg := &broker.Message{
Header: map[string]string{
"Content-Type": "application/json",
},
Body: []byte(`{"message": "Hello World"}`),
}
var subs []broker.Subscriber
done := make(chan bool, c)
for i := 0; i < c; i++ {
sub, err := b.Subscribe(topic, func(p broker.Event) error {
done <- true
m := p.Message()
if string(m.Body) != string(msg.Body) {
be.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
}
return nil
}, broker.Queue("shared"))
if err != nil {
be.Fatalf("Unexpected subscribe error: %v", err)
}
subs = append(subs, sub)
}
for i := 0; i < be.N; i++ {
be.StartTimer()
if err := b.Publish(topic, msg); err != nil {
be.Fatalf("Unexpected publish error: %v", err)
}
<-done
be.StopTimer()
}
for _, sub := range subs {
sub.Unsubscribe()
}
if err := b.Disconnect(); err != nil {
be.Fatalf("Unexpected disconnect error: %v", err)
}
}
func pub(be *testing.B, c int) {
be.StopTimer()
m := newTestRegistry()
b := broker.NewBroker(broker.Registry(m))
topic := uuid.New().String()
if err := b.Init(); err != nil {
be.Fatalf("Unexpected init error: %v", err)
}
if err := b.Connect(); err != nil {
be.Fatalf("Unexpected connect error: %v", err)
}
msg := &broker.Message{
Header: map[string]string{
"Content-Type": "application/json",
},
Body: []byte(`{"message": "Hello World"}`),
}
done := make(chan bool, c*4)
sub, err := b.Subscribe(topic, func(p broker.Event) error {
done <- true
m := p.Message()
if string(m.Body) != string(msg.Body) {
be.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
}
return nil
}, broker.Queue("shared"))
if err != nil {
be.Fatalf("Unexpected subscribe error: %v", err)
}
var wg sync.WaitGroup
ch := make(chan int, c*4)
be.StartTimer()
for i := 0; i < c; i++ {
go func() {
for range ch {
if err := b.Publish(topic, msg); err != nil {
be.Fatalf("Unexpected publish error: %v", err)
}
select {
case <-done:
case <-time.After(time.Second):
}
wg.Done()
}
}()
}
for i := 0; i < be.N; i++ {
wg.Add(1)
ch <- i
}
wg.Wait()
be.StopTimer()
sub.Unsubscribe()
close(ch)
close(done)
if err := b.Disconnect(); err != nil {
be.Fatalf("Unexpected disconnect error: %v", err)
}
}
func TestBroker(t *testing.T) {
m := newTestRegistry()
b := broker.NewBroker(broker.Registry(m))
if err := b.Init(); err != nil {
t.Fatalf("Unexpected init error: %v", err)
}
if err := b.Connect(); err != nil {
t.Fatalf("Unexpected connect error: %v", err)
}
msg := &broker.Message{
Header: map[string]string{
"Content-Type": "application/json",
},
Body: []byte(`{"message": "Hello World"}`),
}
done := make(chan bool)
sub, err := b.Subscribe("test", func(p broker.Event) error {
m := p.Message()
if string(m.Body) != string(msg.Body) {
t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
}
close(done)
return nil
})
if err != nil {
t.Fatalf("Unexpected subscribe error: %v", err)
}
if err := b.Publish("test", msg); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
<-done
sub.Unsubscribe()
if err := b.Disconnect(); err != nil {
t.Fatalf("Unexpected disconnect error: %v", err)
}
}
func TestConcurrentSubBroker(t *testing.T) {
m := newTestRegistry()
b := broker.NewBroker(broker.Registry(m))
if err := b.Init(); err != nil {
t.Fatalf("Unexpected init error: %v", err)
}
if err := b.Connect(); err != nil {
t.Fatalf("Unexpected connect error: %v", err)
}
msg := &broker.Message{
Header: map[string]string{
"Content-Type": "application/json",
},
Body: []byte(`{"message": "Hello World"}`),
}
var subs []broker.Subscriber
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
sub, err := b.Subscribe("test", func(p broker.Event) error {
defer wg.Done()
m := p.Message()
if string(m.Body) != string(msg.Body) {
t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
}
return nil
})
if err != nil {
t.Fatalf("Unexpected subscribe error: %v", err)
}
wg.Add(1)
subs = append(subs, sub)
}
if err := b.Publish("test", msg); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
wg.Wait()
for _, sub := range subs {
sub.Unsubscribe()
}
if err := b.Disconnect(); err != nil {
t.Fatalf("Unexpected disconnect error: %v", err)
}
}
func TestConcurrentPubBroker(t *testing.T) {
m := newTestRegistry()
b := broker.NewBroker(broker.Registry(m))
if err := b.Init(); err != nil {
t.Fatalf("Unexpected init error: %v", err)
}
if err := b.Connect(); err != nil {
t.Fatalf("Unexpected connect error: %v", err)
}
msg := &broker.Message{
Header: map[string]string{
"Content-Type": "application/json",
},
Body: []byte(`{"message": "Hello World"}`),
}
var wg sync.WaitGroup
sub, err := b.Subscribe("test", func(p broker.Event) error {
defer wg.Done()
m := p.Message()
if string(m.Body) != string(msg.Body) {
t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
}
return nil
})
if err != nil {
t.Fatalf("Unexpected subscribe error: %v", err)
}
for i := 0; i < 10; i++ {
wg.Add(1)
if err := b.Publish("test", msg); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
wg.Wait()
sub.Unsubscribe()
if err := b.Disconnect(); err != nil {
t.Fatalf("Unexpected disconnect error: %v", err)
}
}
func BenchmarkSub1(b *testing.B) {
sub(b, 1)
}
func BenchmarkSub8(b *testing.B) {
sub(b, 8)
}
func BenchmarkSub32(b *testing.B) {
sub(b, 32)
}
func BenchmarkSub64(b *testing.B) {
sub(b, 64)
}
func BenchmarkSub128(b *testing.B) {
sub(b, 128)
}
func BenchmarkPub1(b *testing.B) {
pub(b, 1)
}
func BenchmarkPub8(b *testing.B) {
pub(b, 8)
}
func BenchmarkPub32(b *testing.B) {
pub(b, 32)
}
func BenchmarkPub64(b *testing.B) {
pub(b, 64)
}
func BenchmarkPub128(b *testing.B) {
pub(b, 128)
}

View File

@ -36,6 +36,7 @@ import (
smucp "github.com/micro/go-micro/v2/server/mucp"
// brokers
brokerHttp "github.com/micro/go-micro/v2/broker/http"
"github.com/micro/go-micro/v2/broker/memory"
"github.com/micro/go-micro/v2/broker/nats"
brokerSrv "github.com/micro/go-micro/v2/broker/service"
@ -319,6 +320,7 @@ var (
"service": brokerSrv.NewBroker,
"memory": memory.NewBroker,
"nats": nats.NewBroker,
"http": brokerHttp.NewBroker,
}
DefaultClients = map[string]func(...client.Option) client.Client{

2
go.mod
View File

@ -50,8 +50,6 @@ require (
github.com/micro/mdns v0.3.0
github.com/miekg/dns v1.1.27
github.com/mitchellh/hashstructure v1.0.0
github.com/nats-io/nats-server/v2 v2.1.4
github.com/nats-io/nats.go v1.9.1
github.com/nlopes/slack v0.6.1-0.20191106133607-d06c2a2b3249
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c
github.com/patrickmn/go-cache v2.1.0+incompatible

12
go.sum
View File

@ -310,18 +310,6 @@ github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c h1:nXxl5PrvVm2L/wCy8d
github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/nats-server/v2 v2.1.4 h1:BILRnsJ2Yb/fefiFbBWADpViGF69uh4sxe8poVDQ06g=
github.com/nats-io/nats-server/v2 v2.1.4/go.mod h1:Jw1Z28soD/QasIA2uWjXyM9El1jly3YwyFOuR8tH1rg=
github.com/nats-io/nats.go v1.9.1 h1:ik3HbLhZ0YABLto7iX80pZLPw/6dx3T+++MZJwLnMrQ=
github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w=
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.3 h1:6JrEfig+HzTH85yxzhSVbjHRJv9cn0p6n3IngIcM5/k=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms=
github.com/nlopes/slack v0.6.1-0.20191106133607-d06c2a2b3249 h1:Pr5gZa2VcmktVwq0lyC39MsN5tz356vC/pQHKvq+QBo=
github.com/nlopes/slack v0.6.1-0.20191106133607-d06c2a2b3249/go.mod h1:JzQ9m3PMAqcpeCam7UaHSuBuupz7CmpjehYMayT6YOk=