// Source: micro/broker/http_broker.go (387 lines, 7.3 KiB, Go).
// Page header captured from a repository web view (Raw / Normal View / History).

package broker
import (
"bytes"
2016-01-17 01:13:02 +03:00
"crypto/tls"
"encoding/json"
"fmt"
2015-12-23 23:05:47 +03:00
"io"
"io/ioutil"
2015-12-23 23:05:47 +03:00
"math/rand"
"net"
"net/http"
2016-01-17 01:13:02 +03:00
"net/url"
"runtime"
"strconv"
"strings"
"sync"
2015-12-23 23:05:47 +03:00
"time"
log "github.com/golang/glog"
2015-11-20 19:17:33 +03:00
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry"
mls "github.com/micro/misc/lib/tls"
2015-08-26 14:15:37 +03:00
"github.com/pborman/uuid"
)
2015-12-23 22:07:26 +03:00
// httpBroker is a placeholder for actual message brokers.
// It should not really be used in production, but it is useful
// in development, where you want zero dependencies.
type httpBroker struct {
id string
address string
unsubscribe chan *httpSubscriber
opts Options
2016-01-17 01:13:02 +03:00
c *http.Client
sync.RWMutex
subscribers map[string][]*httpSubscriber
running bool
exit chan chan error
}
type httpSubscriber struct {
2015-12-23 22:07:26 +03:00
opts SubscribeOptions
id string
topic string
ch chan *httpSubscriber
fn Handler
svc *registry.Service
}
2015-12-23 22:07:26 +03:00
// httpPublication pairs a received message with the topic it arrived on.
type httpPublication struct {
	// m is the decoded message.
	m *Message
	// t is the topic taken from the message's ":topic" header.
	t string
}
var (
	// DefaultSubPath is the URL path messages are posted to.
	DefaultSubPath = "/_sub"

	// broadcastVersion is the registry service version used for
	// subscriptions without a queue; Publish sends to every node of a
	// service carrying this version.
	broadcastVersion = "ff.http.broadcast"
)
2015-12-23 23:05:47 +03:00
// init seeds math/rand from the wall clock (second resolution) so node
// selection in Publish differs between runs.
func init() {
	seed := time.Now().Unix()
	rand.Seed(seed)
}
2016-01-17 01:13:02 +03:00
// newTransport builds the HTTP transport used by the broker's client.
//
// It honours proxy settings from the environment, uses 30s dial and
// keep-alive timeouts and a 10s TLS handshake timeout. A finalizer closes
// idle connections once the transport itself is no longer reachable.
func newTransport() *http.Transport {
	t := &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		Dial: (&net.Dialer{
			Timeout:   30 * time.Second,
			KeepAlive: 30 * time.Second,
		}).Dial,
		TLSHandshakeTimeout: 10 * time.Second,
		// NOTE(review): certificate verification is disabled, so peers are
		// not authenticated. Presumably this is to accept the certificates
		// generated for secure brokers; confirm before production use.
		TLSClientConfig: &tls.Config{
			InsecureSkipVerify: true,
		},
	}

	// The finalizer must be attached to the transport object. Attaching it
	// to &t (the address of the local variable) ties it to a value that is
	// unreachable as soon as this function returns, not to the transport.
	runtime.SetFinalizer(t, func(tr *http.Transport) {
		tr.CloseIdleConnections()
	})
	return t
}
func newHttpBroker(addrs []string, opts ...Option) Broker {
var options Options
for _, o := range opts {
o(&options)
}
addr := ":0"
if len(addrs) > 0 && len(addrs[0]) > 0 {
addr = addrs[0]
}
return &httpBroker{
id: "broker-" + uuid.NewUUID().String(),
address: addr,
opts: options,
2016-01-17 01:13:02 +03:00
c: &http.Client{Transport: newTransport()},
subscribers: make(map[string][]*httpSubscriber),
unsubscribe: make(chan *httpSubscriber),
exit: make(chan chan error),
}
}
2015-12-23 22:07:26 +03:00
// Ack is a no-op for the HTTP broker and always reports success.
func (h *httpPublication) Ack() error {
	return nil
}
// Message returns the message carried by this publication.
func (h *httpPublication) Message() *Message {
	msg := h.m
	return msg
}
// Topic returns the topic this publication was received on.
func (h *httpPublication) Topic() string {
	topic := h.t
	return topic
}
func (h *httpSubscriber) Options() SubscribeOptions {
2015-12-23 22:07:26 +03:00
return h.opts
}
// Topic returns the topic this subscriber is attached to.
func (h *httpSubscriber) Topic() string {
	topic := h.topic
	return topic
}
// Unsubscribe hands the subscriber to its unsubscribe channel and then
// pauses for 10ms before returning. It always returns nil.
func (h *httpSubscriber) Unsubscribe() error {
	// artificial delay applied after the request has been handed over
	const delay = 10 * time.Millisecond

	h.ch <- h
	time.Sleep(delay)
	return nil
}
// start brings the broker's listener up if it is not already running.
//
// It listens on h.address (TLS when h.opts.Secure is set, using a
// certificate obtained from mls.Certificate), records the bound address,
// serves HTTP with the broker as handler, and launches a goroutine that
// processes unsubscribe and exit requests. It returns any error from
// obtaining the certificate or opening the listener.
func (h *httpBroker) start() error {
	h.Lock()
	defer h.Unlock()

	if h.running {
		return nil
	}

	var (
		l   net.Listener
		err error
	)
	if h.opts.Secure {
		// Use a separate error name here: declaring err with := in this
		// scope would shadow the outer err and hide a tls.Listen failure.
		cert, cerr := mls.Certificate(h.address)
		if cerr != nil {
			return cerr
		}
		l, err = tls.Listen("tcp", h.address, &tls.Config{Certificates: []tls.Certificate{cert}})
	} else {
		l, err = net.Listen("tcp", h.address)
	}
	if err != nil {
		return err
	}

	log.Infof("Broker Listening on %s", l.Addr().String())
	h.address = l.Addr().String()

	go http.Serve(l, h)

	go func() {
		for {
			select {
			case ch := <-h.exit:
				closeErr := l.Close()
				// Clear the flag before replying so a start call made
				// right after stop returns does not see a stale value.
				h.Lock()
				h.running = false
				h.Unlock()
				ch <- closeErr
				return
			case subscriber := <-h.unsubscribe:
				h.Lock()
				var subscribers []*httpSubscriber
				for _, sub := range h.subscribers[subscriber.topic] {
					if sub.id == subscriber.id {
						// Deregister and drop it; everything else is kept.
						registry.Deregister(sub.svc)
						continue
					}
					subscribers = append(subscribers, sub)
				}
				h.subscribers[subscriber.topic] = subscribers
				h.Unlock()
			}
		}
	}()

	h.running = true
	return nil
}
// stop asks the goroutine launched by start to close the listener and
// returns the result of that close.
//
// When the broker is not running there is nothing receiving on h.exit, so
// stop returns nil immediately instead of blocking forever.
func (h *httpBroker) stop() error {
	h.RLock()
	running := h.running
	h.RUnlock()
	if !running {
		return nil
	}

	ch := make(chan error)
	h.exit <- ch
	return <-ch
}
func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "POST" {
err := errors.BadRequest("go.micro.broker", "Method not allowed")
http.Error(w, err.Error(), http.StatusMethodNotAllowed)
return
}
defer req.Body.Close()
2016-01-17 01:13:02 +03:00
req.ParseForm()
b, err := ioutil.ReadAll(req.Body)
if err != nil {
errr := errors.InternalServerError("go.micro.broker", fmt.Sprintf("Error reading request body: %v", err))
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
var m *Message
if err = json.Unmarshal(b, &m); err != nil {
errr := errors.InternalServerError("go.micro.broker", fmt.Sprintf("Error parsing request body: %v", err))
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
topic := m.Header[":topic"]
delete(m.Header, ":topic")
if len(topic) == 0 {
errr := errors.InternalServerError("go.micro.broker", "Topic not found")
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
2015-12-23 22:07:26 +03:00
p := &httpPublication{m: m, t: topic}
2016-01-17 01:13:02 +03:00
id := req.Form.Get("id")
h.RLock()
for _, subscriber := range h.subscribers[topic] {
2016-01-17 01:13:02 +03:00
if id == subscriber.id {
subscriber.fn(p)
}
}
h.RUnlock()
}
// Address returns the broker's address field: the configured listen
// address, or the bound address once start has run.
func (h *httpBroker) Address() string {
	addr := h.address
	return addr
}
// Connect starts the broker and returns whatever start returns.
func (h *httpBroker) Connect() error {
	err := h.start()
	return err
}
// Disconnect stops the broker and returns whatever stop returns.
func (h *httpBroker) Disconnect() error {
	err := h.stop()
	return err
}
2015-12-23 22:07:26 +03:00
// Init applies opts to the broker, makes sure it has an id, and registers
// the broker as handler for DefaultSubPath on the default HTTP mux.
//
// Options are applied on top of the broker's current options, so anything
// configured at construction and not overridden here is kept. Init always
// returns nil.
//
// NOTE(review): http.Handle panics when a pattern is registered twice, so
// calling Init more than once per process for the same DefaultSubPath will
// panic. The listener created by start serves the broker directly and does
// not go through the default mux.
func (h *httpBroker) Init(opts ...Option) error {
	for _, o := range opts {
		o(&h.opts)
	}

	if len(h.id) == 0 {
		h.id = "broker-" + uuid.NewUUID().String()
	}

	http.Handle(DefaultSubPath, h)
	return nil
}
// Options returns the broker's current options.
func (h *httpBroker) Options() Options {
	options := h.opts
	return options
}
2015-12-23 22:07:26 +03:00
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
s, err := registry.GetService("topic:" + topic)
if err != nil {
return err
}
msg.Header[":topic"] = topic
b, err := json.Marshal(msg)
if err != nil {
return err
}
2015-12-23 23:05:47 +03:00
fn := func(node *registry.Node, b io.Reader) {
2016-01-17 01:13:02 +03:00
scheme := "http"
2016-01-17 01:13:02 +03:00
// check if secure is added in metadata
if node.Metadata["secure"] == "true" {
scheme = "https"
}
vals := url.Values{}
vals.Add("id", node.Id)
uri := fmt.Sprintf("%s://%s:%d%s?%s", scheme, node.Address, node.Port, DefaultSubPath, vals.Encode())
r, err := h.c.Post(uri, "application/json", b)
2015-12-23 23:05:47 +03:00
if err == nil {
r.Body.Close()
}
}
buf := bytes.NewBuffer(nil)
2015-11-08 04:48:48 +03:00
for _, service := range s {
2015-12-23 23:05:47 +03:00
// broadcast version means broadcast to all nodes
if service.Version == broadcastVersion {
for _, node := range service.Nodes {
buf.Reset()
buf.Write(b)
fn(node, buf)
2015-11-08 04:48:48 +03:00
}
2015-12-23 23:05:47 +03:00
return nil
}
2015-12-23 23:05:47 +03:00
node := service.Nodes[rand.Int()%len(service.Nodes)]
buf.Reset()
buf.Write(b)
fn(node, buf)
return nil
}
2015-12-23 23:05:47 +03:00
buf.Reset()
buf = nil
return nil
}
2015-12-23 22:07:26 +03:00
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
opt := newSubscribeOptions(opts...)
// parse address for host, port
parts := strings.Split(h.Address(), ":")
host := strings.Join(parts[:len(parts)-1], ":")
port, _ := strconv.Atoi(parts[len(parts)-1])
id := uuid.NewUUID().String()
// register service
node := &registry.Node{
2016-01-17 01:13:02 +03:00
Id: h.id + "." + id,
Address: host,
Port: port,
2016-01-17 01:13:02 +03:00
Metadata: map[string]string{
"secure": fmt.Sprintf("%t", h.opts.Secure),
},
}
2015-12-23 23:05:47 +03:00
version := opt.Queue
if len(version) == 0 {
version = broadcastVersion
}
service := &registry.Service{
2015-12-23 23:05:47 +03:00
Name: "topic:" + topic,
Version: version,
Nodes: []*registry.Node{node},
}
subscriber := &httpSubscriber{
2015-12-23 22:07:26 +03:00
opts: opt,
2016-01-17 01:13:02 +03:00
id: h.id + "." + id,
topic: topic,
ch: h.unsubscribe,
fn: handler,
svc: service,
}
if err := registry.Register(service); err != nil {
return nil, err
}
h.Lock()
h.subscribers[topic] = append(h.subscribers[topic], subscriber)
h.Unlock()
return subscriber, nil
}
2015-12-20 00:56:14 +03:00
// String returns the broker's name, "http".
func (h *httpBroker) String() string {
	const name = "http"
	return name
}