Asim Aslam 2020-01-19 17:21:55 +00:00
// Package http provides a http based message broker
package http
import (
merr ""
maddr ""
mnet ""
mls ""
func init() {
cmd.DefaultBrokers["http"] = NewBroker
// HTTP Broker is a point to point async broker
type httpBroker struct {
id string
address string
opts broker.Options
mux *http.ServeMux
c *http.Client
r registry.Registry
subscribers map[string][]*httpSubscriber
running bool
exit chan chan error
// offline message inbox
mtx sync.RWMutex
inbox map[string][][]byte
type httpSubscriber struct {
opts broker.SubscribeOptions
id string
topic string
fn broker.Handler
svc *registry.Service
hb *httpBroker
type httpEvent struct {
m *broker.Message
t string
var (
DefaultSubPath = "/_sub"
serviceName = ""
broadcastVersion = "ff.http.broadcast"
registerTTL = time.Minute
registerInterval = time.Second * 30
func init() {
func newTransport(config *tls.Config) *http.Transport {
if config == nil {
config = &tls.Config{
InsecureSkipVerify: true,
dialTLS := func(network string, addr string) (net.Conn, error) {
return tls.Dial(network, addr, config)
t := &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
DialTLS: dialTLS,
runtime.SetFinalizer(&t, func(tr **http.Transport) {
// setup http2
return t
func newHttpBroker(opts broker.Broker {
options := broker.Options{
Codec: json.Marshaler{},
Context: context.TODO(),
Registry: registry.DefaultRegistry,
for _, o := range opts {
// set address
addr := ":0"
if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 {
addr = options.Addrs[0]
h := &httpBroker{
id: uuid.New().String(),
address: addr,
opts: options,
r: options.Registry,
c: &http.Client{Transport: newTransport(options.TLSConfig)},
subscribers: make(map[string][]*httpSubscriber),
exit: make(chan chan error),
mux: http.NewServeMux(),
inbox: make(map[string][][]byte),
// specify the message handler
h.mux.Handle(DefaultSubPath, h)
// get optional handlers
if h.opts.Context != nil {
handlers, ok := h.opts.Context.Value("http_handlers").(map[string]http.Handler)
if ok {
for pattern, handler := range handlers {
h.mux.Handle(pattern, handler)
return h
func (h *httpEvent) Ack() error {
return nil
func (h *httpEvent) Message() *broker.Message {
return h.m
func (h *httpEvent) Topic() string {
return h.t
func (h *httpSubscriber) Options() broker.SubscribeOptions {
return h.opts
func (h *httpSubscriber) Topic() string {
return h.topic
func (h *httpSubscriber) Unsubscribe() error {
return h.hb.unsubscribe(h)
func (h *httpBroker) saveMessage(topic string, msg []byte) {
defer h.mtx.Unlock()
// get messages
c := h.inbox[topic]
// save message
c = append(c, msg)
// max length 64
if len(c) > 64 {
c = c[:64]
// save inbox
h.inbox[topic] = c
func (h *httpBroker) getMessage(topic string, num int) [][]byte {
defer h.mtx.Unlock()
// get messages
c, ok := h.inbox[topic]
if !ok {
return nil
// more message than requests
if len(c) >= num {
msg := c[:num]
h.inbox[topic] = c[num:]
return msg
// reset inbox
h.inbox[topic] = nil
// return all messages
return c
func (h *httpBroker) subscribe(s *httpSubscriber) error {
defer h.Unlock()
if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil {
return err
h.subscribers[s.topic] = append(h.subscribers[s.topic], s)
return nil
func (h *httpBroker) unsubscribe(s *httpSubscriber) error {
defer h.Unlock()
var subscribers []*httpSubscriber
// look for subscriber
for _, sub := range h.subscribers[s.topic] {
// deregister and skip forward
if sub == s {
_ = h.r.Deregister(sub.svc)
// keep subscriber
subscribers = append(subscribers, sub)
// set subscribers
h.subscribers[s.topic] = subscribers
return nil
func (h *httpBroker) run(l net.Listener) {
t := time.NewTicker(registerInterval)
defer t.Stop()
for {
select {
// heartbeat for each subscriber
case <-t.C:
for _, subs := range h.subscribers {
for _, sub := range subs {
_ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))
// received exit signal
case ch := <-h.exit:
ch <- l.Close()
for _, subs := range h.subscribers {
for _, sub := range subs {
_ = h.r.Deregister(sub.svc)
func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "POST" {
err := merr.BadRequest("", "Method not allowed")
http.Error(w, err.Error(), http.StatusMethodNotAllowed)
defer req.Body.Close()
b, err := ioutil.ReadAll(req.Body)
if err != nil {
errr := merr.InternalServerError("", "Error reading request body: %v", err)
var m *broker.Message
if err = h.opts.Codec.Unmarshal(b, &m); err != nil {
errr := merr.InternalServerError("", "Error parsing request body: %v", err)
topic := m.Header[":topic"]
delete(m.Header, ":topic")
if len(topic) == 0 {
errr := merr.InternalServerError("", "Topic not found")
p := &httpEvent{m: m, t: topic}
id := req.Form.Get("id")
var subs []broker.Handler
for _, subscriber := range h.subscribers[topic] {
if id != {
subs = append(subs, subscriber.fn)
// execute the handler
for _, fn := range subs {
func (h *httpBroker) Address() string {
defer h.RUnlock()
return h.address
func (h *httpBroker) Connect() error {
if h.running {
return nil
defer h.Unlock()
var l net.Listener
var err error
if h.opts.Secure || h.opts.TLSConfig != nil {
config := h.opts.TLSConfig
fn := func(addr string) (net.Listener, error) {
if config == nil {
hosts := []string{addr}
// check if its a valid host:port
if host, _, err := net.SplitHostPort(addr); err == nil {
if len(host) == 0 {
hosts = maddr.IPs()
} else {
hosts = []string{host}
// generate a certificate
cert, err := mls.Certificate(hosts...)
if err != nil {
return nil, err
config = &tls.Config{Certificates: []tls.Certificate{cert}}
return tls.Listen("tcp", addr, config)
l, err = mnet.Listen(h.address, fn)
} else {
fn := func(addr string) (net.Listener, error) {
return net.Listen("tcp", addr)
l, err = mnet.Listen(h.address, fn)
if err != nil {
return err
addr := h.address
h.address = l.Addr().String()
go http.Serve(l, h.mux)
go func() {
h.opts.Addrs = []string{addr}
h.address = addr
// get registry
reg := h.opts.Registry
if reg == nil {
reg = registry.DefaultRegistry
// set cache
h.r = cache.New(reg)
// set running
h.running = true
return nil
func (h *httpBroker) Disconnect() error {
if !h.running {
return nil
defer h.Unlock()
// stop cache
rc, ok := h.r.(cache.Cache)
if ok {
// exit and return err
ch := make(chan error)
h.exit <- ch
err := <-ch
// set not running
h.running = false
return err
func (h *httpBroker) Init(opts error {
if h.running {
return errors.New("cannot init while connected")
defer h.Unlock()
for _, o := range opts {
if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 {
h.address = h.opts.Addrs[0]
if len( == 0 { = "" + uuid.New().String()
// get registry
reg := h.opts.Registry
if reg == nil {
reg = registry.DefaultRegistry
// get cache
if rc, ok := h.r.(cache.Cache); ok {
// set registry
h.r = cache.New(reg)
// reconfigure tls config
if c := h.opts.TLSConfig; c != nil {
h.c = &http.Client{
Transport: newTransport(c),
return nil
func (h *httpBroker) Options() broker.Options {
return h.opts
func (h *httpBroker) Publish(topic string, msg *broker.Message, opts error {
// create the message first
m := &broker.Message{
Header: make(map[string]string),
Body: msg.Body,
for k, v := range msg.Header {
m.Header[k] = v
m.Header[":topic"] = topic
// encode the message
b, err := h.opts.Codec.Marshal(m)
if err != nil {
return err
// save the message
h.saveMessage(topic, b)
// now attempt to get the service
s, err := h.r.GetService(serviceName)
if err != nil {
return err
pub := func(node *registry.Node, t string, b []byte) error {
scheme := "http"
// check if secure is added in metadata
if node.Metadata["secure"] == "true" {
scheme = "https"
vals := url.Values{}
vals.Add("id", node.Id)
uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultSubPath, vals.Encode())
r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
if err != nil {
return err
// discard response body
io.Copy(ioutil.Discard, r.Body)
return nil
srv := func(s []*registry.Service, b []byte) {
for _, service := range s {
var nodes []*registry.Node
for _, node := range service.Nodes {
// only use nodes tagged with broker http
if node.Metadata["broker"] != "http" {
// look for nodes for the topic
if node.Metadata["topic"] != topic {
nodes = append(nodes, node)
// only process if we have nodes
if len(nodes) == 0 {
switch service.Version {
// broadcast version means broadcast to all nodes
case broadcastVersion:
var success bool
// publish to all nodes
for _, node := range nodes {
// publish async
if err := pub(node, topic, b); err == nil {
success = true
// save if it failed to publish at least once
if !success {
h.saveMessage(topic, b)
// select node to publish to
node := nodes[rand.Int()%len(nodes)]
// publish async to one node
if err := pub(node, topic, b); err != nil {
// if failed save it
h.saveMessage(topic, b)
// do the rest async
go func() {
// get a third of the backlog
messages := h.getMessage(topic, 8)
delay := (len(messages) > 1)
// publish all the messages
for _, msg := range messages {
// serialize here
srv(s, msg)
// sending a backlog of messages
if delay {
time.Sleep(time.Millisecond * 100)
return nil
func (h *httpBroker) Subscribe(topic string, handler broker.Handler, opts (broker.Subscriber, error) {
var err error
var host, port string
options := broker.NewSubscribeOptions(opts...)
// parse address for host, port
host, port, err = net.SplitHostPort(h.Address())
if err != nil {
return nil, err
addr, err := maddr.Extract(host)
if err != nil {
return nil, err
var secure bool
if h.opts.Secure || h.opts.TLSConfig != nil {
secure = true
// register service
node := &registry.Node{
Id: topic + "-" +,
Address: mnet.HostPort(addr, port),
Metadata: map[string]string{
"secure": fmt.Sprintf("%t", secure),
"broker": "http",
"topic": topic,
// check for queue group or broadcast queue
version := options.Queue
if len(version) == 0 {
version = broadcastVersion
service := &registry.Service{
Name: serviceName,
Version: version,
Nodes: []*registry.Node{node},
// generate subscriber
subscriber := &httpSubscriber{
opts: options,
hb: h,
id: node.Id,
topic: topic,
fn: handler,
svc: service,
// subscribe now
if err := h.subscribe(subscriber); err != nil {
return nil, err
// return the subscriber
return subscriber, nil
func (h *httpBroker) String() string {
return "http"
// NewBroker returns a new http broker
func NewBroker(opts broker.Broker {
return newHttpBroker(opts...)

package http
import (
var (
// mock data
testData = map[string][]*registry.Service{
"foo": {
Name: "foo",
Version: "1.0.0",
Nodes: []*registry.Node{
Id: "foo-1.0.0-123",
Address: "localhost:9999",
Id: "foo-1.0.0-321",
Address: "localhost:9999",
Name: "foo",
Version: "1.0.1",
Nodes: []*registry.Node{
Id: "foo-1.0.1-321",
Address: "localhost:6666",
Name: "foo",
Version: "1.0.3",
Nodes: []*registry.Node{
Id: "foo-1.0.3-345",
Address: "localhost:8888",
func newTestRegistry() registry.Registry {
return memory.NewRegistry(memory.Services(testData))
func sub(be *testing.B, c int) {
// set no op logger
m := newTestRegistry()
b := NewBroker(broker.Registry(m))
topic := uuid.New().String()
if err := b.Init(); err != nil {
be.Fatalf("Unexpected init error: %v", err)
if err := b.Connect(); err != nil {
be.Fatalf("Unexpected connect error: %v", err)
msg := &broker.Message{
Header: map[string]string{
"Content-Type": "application/json",
Body: []byte(`{"message": "Hello World"}`),
var subs []broker.Subscriber
done := make(chan bool, c)
for i := 0; i < c; i++ {
sub, err := b.Subscribe(topic, func(p broker.Event) error {
done <- true
m := p.Message()
if string(m.Body) != string(msg.Body) {
be.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
return nil
}, broker.Queue("shared"))
if err != nil {
be.Fatalf("Unexpected subscribe error: %v", err)
subs = append(subs, sub)
for i := 0; i < be.N; i++ {
if err := b.Publish(topic, msg); err != nil {
be.Fatalf("Unexpected publish error: %v", err)
for _, sub := range subs {
if err := b.Disconnect(); err != nil {
be.Fatalf("Unexpected disconnect error: %v", err)
func pub(be *testing.B, c int) {
// set no op logger
m := newTestRegistry()
b := NewBroker(broker.Registry(m))
topic := uuid.New().String()
if err := b.Init(); err != nil {
be.Fatalf("Unexpected init error: %v", err)
if err := b.Connect(); err != nil {
be.Fatalf("Unexpected connect error: %v", err)
msg := &broker.Message{
Header: map[string]string{
"Content-Type": "application/json",
Body: []byte(`{"message": "Hello World"}`),
done := make(chan bool, c*4)
sub, err := b.Subscribe(topic, func(p broker.Event) error {
done <- true
m := p.Message()
if string(m.Body) != string(msg.Body) {
be.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
return nil
}, broker.Queue("shared"))
if err != nil {
be.Fatalf("Unexpected subscribe error: %v", err)
var wg sync.WaitGroup
ch := make(chan int, c*4)
for i := 0; i < c; i++ {
go func() {
for range ch {
if err := b.Publish(topic, msg); err != nil {
be.Fatalf("Unexpected publish error: %v", err)
select {
case <-done:
case <-time.After(time.Second):
for i := 0; i < be.N; i++ {
ch <- i
if err := b.Disconnect(); err != nil {
be.Fatalf("Unexpected disconnect error: %v", err)
func TestBroker(t *testing.T) {
m := newTestRegistry()
b := NewBroker(broker.Registry(m))
if err := b.Init(); err != nil {
t.Fatalf("Unexpected init error: %v", err)
if err := b.Connect(); err != nil {
t.Fatalf("Unexpected connect error: %v", err)
msg := &broker.Message{
Header: map[string]string{
"Content-Type": "application/json",
Body: []byte(`{"message": "Hello World"}`),
done := make(chan bool)
sub, err := b.Subscribe("test", func(p broker.Event) error {
m := p.Message()
if string(m.Body) != string(msg.Body) {
t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
return nil
if err != nil {
t.Fatalf("Unexpected subscribe error: %v", err)
if err := b.Publish("test", msg); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
if err := b.Disconnect(); err != nil {
t.Fatalf("Unexpected disconnect error: %v", err)
func TestConcurrentSubBroker(t *testing.T) {
m := newTestRegistry()
b := NewBroker(broker.Registry(m))
if err := b.Init(); err != nil {
t.Fatalf("Unexpected init error: %v", err)
if err := b.Connect(); err != nil {
t.Fatalf("Unexpected connect error: %v", err)
msg := &broker.Message{
Header: map[string]string{
"Content-Type": "application/json",
Body: []byte(`{"message": "Hello World"}`),
var subs []broker.Subscriber
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
sub, err := b.Subscribe("test", func(p broker.Event) error {
defer wg.Done()
m := p.Message()
if string(m.Body) != string(msg.Body) {
t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
return nil
if err != nil {
t.Fatalf("Unexpected subscribe error: %v", err)
subs = append(subs, sub)
if err := b.Publish("test", msg); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
for _, sub := range subs {
if err := b.Disconnect(); err != nil {
t.Fatalf("Unexpected disconnect error: %v", err)
func TestConcurrentPubBroker(t *testing.T) {
m := newTestRegistry()
b := NewBroker(broker.Registry(m))
if err := b.Init(); err != nil {
t.Fatalf("Unexpected init error: %v", err)
if err := b.Connect(); err != nil {
t.Fatalf("Unexpected connect error: %v", err)
msg := &broker.Message{
Header: map[string]string{
"Content-Type": "application/json",
Body: []byte(`{"message": "Hello World"}`),
var wg sync.WaitGroup
sub, err := b.Subscribe("test", func(p broker.Event) error {
defer wg.Done()
m := p.Message()
if string(m.Body) != string(msg.Body) {
t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
return nil
if err != nil {
t.Fatalf("Unexpected subscribe error: %v", err)
for i := 0; i < 10; i++ {
if err := b.Publish("test", msg); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
if err := b.Disconnect(); err != nil {
t.Fatalf("Unexpected disconnect error: %v", err)
func BenchmarkSub1(b *testing.B) {
sub(b, 1)
func BenchmarkSub8(b *testing.B) {
sub(b, 8)
func BenchmarkSub32(b *testing.B) {
sub(b, 32)
func BenchmarkSub64(b *testing.B) {
sub(b, 64)
func BenchmarkSub128(b *testing.B) {
sub(b, 128)
func BenchmarkPub1(b *testing.B) {
pub(b, 1)
func BenchmarkPub8(b *testing.B) {
pub(b, 8)
func BenchmarkPub32(b *testing.B) {
pub(b, 32)
func BenchmarkPub64(b *testing.B) {
pub(b, 64)
func BenchmarkPub128(b *testing.B) {
pub(b, 128)

package http
import (
// Handle registers the handler for the given pattern.
func Handle(pattern string, handler http.Handler) broker.Option {
return func(o *broker.Options) {
if o.Context == nil {
o.Context = context.Background()
handlers, ok := o.Context.Value("http_handlers").(map[string]http.Handler)
if !ok {
handlers = make(map[string]http.Handler)
handlers[pattern] = handler
o.Context = context.WithValue(o.Context, "http_handlers", handlers)