311 lines
5.9 KiB
Go
311 lines
5.9 KiB
Go
package broker
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"math/rand"
|
|
"net"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
log "github.com/golang/glog"
|
|
"github.com/micro/go-micro/errors"
|
|
"github.com/micro/go-micro/registry"
|
|
"github.com/pborman/uuid"
|
|
)
|
|
|
|
// HTTP Broker is a placeholder for actual message brokers.
|
|
// This should not really be used in production but useful
|
|
// in developer where you want zero dependencies.
|
|
|
|
type httpBroker struct {
|
|
id string
|
|
address string
|
|
unsubscribe chan *httpSubscriber
|
|
|
|
sync.RWMutex
|
|
subscribers map[string][]*httpSubscriber
|
|
running bool
|
|
exit chan chan error
|
|
}
|
|
|
|
type httpSubscriber struct {
|
|
opts SubscribeOptions
|
|
id string
|
|
topic string
|
|
ch chan *httpSubscriber
|
|
fn Handler
|
|
svc *registry.Service
|
|
}
|
|
|
|
type httpPublication struct {
|
|
m *Message
|
|
t string
|
|
}
|
|
|
|
var (
|
|
DefaultSubPath = "/_sub"
|
|
broadcastVersion = "ff.http.broadcast"
|
|
)
|
|
|
|
func init() {
|
|
rand.Seed(time.Now().Unix())
|
|
}
|
|
|
|
func newHttpBroker(addrs []string, opt ...Option) Broker {
|
|
addr := ":0"
|
|
if len(addrs) > 0 && len(addrs[0]) > 0 {
|
|
addr = addrs[0]
|
|
}
|
|
|
|
return &httpBroker{
|
|
id: "broker-" + uuid.NewUUID().String(),
|
|
address: addr,
|
|
subscribers: make(map[string][]*httpSubscriber),
|
|
unsubscribe: make(chan *httpSubscriber),
|
|
exit: make(chan chan error),
|
|
}
|
|
}
|
|
|
|
func (h *httpPublication) Ack() error {
|
|
return nil
|
|
}
|
|
|
|
func (h *httpPublication) Message() *Message {
|
|
return h.m
|
|
}
|
|
|
|
func (h *httpPublication) Topic() string {
|
|
return h.t
|
|
}
|
|
|
|
func (h *httpSubscriber) Config() SubscribeOptions {
|
|
return h.opts
|
|
}
|
|
|
|
func (h *httpSubscriber) Topic() string {
|
|
return h.topic
|
|
}
|
|
|
|
func (h *httpSubscriber) Unsubscribe() error {
|
|
h.ch <- h
|
|
return nil
|
|
}
|
|
|
|
func (h *httpBroker) start() error {
|
|
h.Lock()
|
|
defer h.Unlock()
|
|
|
|
if h.running {
|
|
return nil
|
|
}
|
|
|
|
l, err := net.Listen("tcp", h.address)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Infof("Broker Listening on %s", l.Addr().String())
|
|
h.address = l.Addr().String()
|
|
|
|
go http.Serve(l, h)
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case ch := <-h.exit:
|
|
ch <- l.Close()
|
|
h.Lock()
|
|
h.running = false
|
|
h.Unlock()
|
|
return
|
|
case subscriber := <-h.unsubscribe:
|
|
h.Lock()
|
|
var subscribers []*httpSubscriber
|
|
for _, sub := range h.subscribers[subscriber.topic] {
|
|
if sub.id == subscriber.id {
|
|
registry.Deregister(sub.svc)
|
|
}
|
|
subscribers = append(subscribers, sub)
|
|
}
|
|
h.subscribers[subscriber.topic] = subscribers
|
|
h.Unlock()
|
|
}
|
|
}
|
|
}()
|
|
|
|
h.running = true
|
|
return nil
|
|
}
|
|
|
|
func (h *httpBroker) stop() error {
|
|
ch := make(chan error)
|
|
h.exit <- ch
|
|
return <-ch
|
|
}
|
|
|
|
func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|
if req.Method != "POST" {
|
|
err := errors.BadRequest("go.micro.broker", "Method not allowed")
|
|
http.Error(w, err.Error(), http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
defer req.Body.Close()
|
|
|
|
b, err := ioutil.ReadAll(req.Body)
|
|
if err != nil {
|
|
errr := errors.InternalServerError("go.micro.broker", fmt.Sprintf("Error reading request body: %v", err))
|
|
w.WriteHeader(500)
|
|
w.Write([]byte(errr.Error()))
|
|
return
|
|
}
|
|
|
|
var m *Message
|
|
if err = json.Unmarshal(b, &m); err != nil {
|
|
errr := errors.InternalServerError("go.micro.broker", fmt.Sprintf("Error parsing request body: %v", err))
|
|
w.WriteHeader(500)
|
|
w.Write([]byte(errr.Error()))
|
|
return
|
|
}
|
|
|
|
topic := m.Header[":topic"]
|
|
delete(m.Header, ":topic")
|
|
|
|
if len(topic) == 0 {
|
|
errr := errors.InternalServerError("go.micro.broker", "Topic not found")
|
|
w.WriteHeader(500)
|
|
w.Write([]byte(errr.Error()))
|
|
return
|
|
}
|
|
|
|
p := &httpPublication{m: m, t: topic}
|
|
h.RLock()
|
|
for _, subscriber := range h.subscribers[topic] {
|
|
subscriber.fn(p)
|
|
}
|
|
h.RUnlock()
|
|
}
|
|
|
|
func (h *httpBroker) Address() string {
|
|
return h.address
|
|
}
|
|
|
|
func (h *httpBroker) Connect() error {
|
|
return h.start()
|
|
}
|
|
|
|
func (h *httpBroker) Disconnect() error {
|
|
return h.stop()
|
|
}
|
|
|
|
func (h *httpBroker) Init(opts ...Option) error {
|
|
if len(h.id) == 0 {
|
|
h.id = "broker-" + uuid.NewUUID().String()
|
|
}
|
|
|
|
http.Handle(DefaultSubPath, h)
|
|
return nil
|
|
}
|
|
|
|
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
|
|
s, err := registry.GetService("topic:" + topic)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
msg.Header[":topic"] = topic
|
|
b, err := json.Marshal(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
fn := func(node *registry.Node, b io.Reader) {
|
|
r, err := http.Post(fmt.Sprintf("http://%s:%d%s", node.Address, node.Port, DefaultSubPath), "application/json", b)
|
|
if err == nil {
|
|
r.Body.Close()
|
|
}
|
|
}
|
|
|
|
buf := bytes.NewBuffer(nil)
|
|
|
|
for _, service := range s {
|
|
// broadcast version means broadcast to all nodes
|
|
if service.Version == broadcastVersion {
|
|
for _, node := range service.Nodes {
|
|
buf.Reset()
|
|
buf.Write(b)
|
|
fn(node, buf)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
node := service.Nodes[rand.Int()%len(service.Nodes)]
|
|
buf.Reset()
|
|
buf.Write(b)
|
|
fn(node, buf)
|
|
return nil
|
|
}
|
|
|
|
buf.Reset()
|
|
buf = nil
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
|
|
opt := newSubscribeOptions(opts...)
|
|
|
|
// parse address for host, port
|
|
parts := strings.Split(h.Address(), ":")
|
|
host := strings.Join(parts[:len(parts)-1], ":")
|
|
port, _ := strconv.Atoi(parts[len(parts)-1])
|
|
|
|
id := uuid.NewUUID().String()
|
|
|
|
// register service
|
|
node := ®istry.Node{
|
|
Id: topic + "." + h.id + "." + id,
|
|
Address: host,
|
|
Port: port,
|
|
}
|
|
|
|
version := opt.Queue
|
|
if len(version) == 0 {
|
|
version = broadcastVersion
|
|
}
|
|
|
|
service := ®istry.Service{
|
|
Name: "topic:" + topic,
|
|
Version: version,
|
|
Nodes: []*registry.Node{node},
|
|
}
|
|
|
|
subscriber := &httpSubscriber{
|
|
opts: opt,
|
|
id: id,
|
|
topic: topic,
|
|
ch: h.unsubscribe,
|
|
fn: handler,
|
|
svc: service,
|
|
}
|
|
|
|
if err := registry.Register(service); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
h.Lock()
|
|
h.subscribers[topic] = append(h.subscribers[topic], subscriber)
|
|
h.Unlock()
|
|
return subscriber, nil
|
|
}
|
|
|
|
func (h *httpBroker) String() string {
|
|
return "http"
|
|
}
|