micro/broker/http_broker.go

681 lines
12 KiB
Go
Raw Normal View History

package broker
import (
"bytes"
2018-03-03 11:53:52 +00:00
"context"
2016-01-16 22:13:02 +00:00
"crypto/tls"
2017-10-26 20:48:11 +01:00
"errors"
"fmt"
2015-12-23 20:05:47 +00:00
"io"
"io/ioutil"
2015-12-23 20:05:47 +00:00
"math/rand"
"net"
"net/http"
2016-01-16 22:13:02 +00:00
"net/url"
"runtime"
"strconv"
"strings"
"sync"
2015-12-23 20:05:47 +00:00
"time"
"github.com/google/uuid"
2019-01-10 09:42:02 +00:00
"github.com/micro/go-micro/codec/json"
2017-10-26 20:48:11 +01:00
merr "github.com/micro/go-micro/errors"
2015-11-20 16:17:33 +00:00
"github.com/micro/go-micro/registry"
2019-05-30 23:52:10 +01:00
maddr "github.com/micro/go-micro/util/addr"
mnet "github.com/micro/go-micro/util/net"
mls "github.com/micro/go-micro/util/tls"
2017-10-26 20:48:11 +01:00
"github.com/micro/go-rcache"
2018-11-29 12:10:33 +00:00
"golang.org/x/net/http2"
)
2017-10-26 20:48:11 +01:00
// HTTP Broker is a point to point async broker
type httpBroker struct {
2017-10-26 20:48:11 +01:00
id string
address string
opts Options
mux *http.ServeMux
2017-10-28 13:55:59 +01:00
c *http.Client
r registry.Registry
2016-01-16 22:13:02 +00:00
sync.RWMutex
subscribers map[string][]*httpSubscriber
running bool
exit chan chan error
2019-01-02 19:27:46 +00:00
// offline message inbox
mtx sync.RWMutex
inbox map[string][][]byte
}
type httpSubscriber struct {
2015-12-23 19:07:26 +00:00
opts SubscribeOptions
id string
topic string
fn Handler
svc *registry.Service
2017-10-26 20:48:11 +01:00
hb *httpBroker
}
2015-12-23 19:07:26 +00:00
// httpPublication pairs a received message with the topic it arrived on.
type httpPublication struct {
	// m is the message handed to subscribers
	m *Message
	// t is the topic name
	t string
}
var (
	// DefaultSubPath is the HTTP path on which the broker receives messages.
	DefaultSubPath = "/_sub"

	// broadcastVersion is the service version used for subscribers without
	// a queue; publishing to it delivers to every node.
	broadcastVersion = "ff.http.broadcast"

	// registerTTL is the TTL passed to the registry for subscriber entries.
	registerTTL = time.Minute

	// registerInterval is how often subscriber entries are re-registered.
	registerInterval = 30 * time.Second
)
2015-12-23 20:05:47 +00:00
// init seeds the package-level random source, which Publish uses to pick
// a node within a queue group.
func init() {
	seed := time.Now().Unix()
	rand.Seed(seed)
}
func newTransport(config *tls.Config) *http.Transport {
if config == nil {
config = &tls.Config{
InsecureSkipVerify: true,
}
}
2018-11-29 12:10:33 +00:00
dialTLS := func(network string, addr string) (net.Conn, error) {
return tls.Dial(network, addr, config)
}
2016-01-16 22:13:02 +00:00
t := &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
2018-11-29 12:10:33 +00:00
DialTLS: dialTLS,
2016-01-16 22:13:02 +00:00
}
runtime.SetFinalizer(&t, func(tr **http.Transport) {
(*tr).CloseIdleConnections()
})
2018-11-29 12:10:33 +00:00
// setup http2
http2.ConfigureTransport(t)
2016-01-16 22:13:02 +00:00
return t
}
2016-03-15 22:12:28 +00:00
func newHttpBroker(opts ...Option) Broker {
2016-01-20 15:22:44 +00:00
options := Options{
2019-01-10 09:42:02 +00:00
Codec: json.Marshaler{},
2016-01-20 15:22:44 +00:00
Context: context.TODO(),
}
for _, o := range opts {
o(&options)
}
2017-10-26 20:48:11 +01:00
// set address
addr := ":0"
2016-03-15 22:12:28 +00:00
if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 {
addr = options.Addrs[0]
}
2017-10-26 20:48:11 +01:00
// get registry
2016-01-20 15:22:44 +00:00
reg, ok := options.Context.Value(registryKey).(registry.Registry)
if !ok {
reg = registry.DefaultRegistry
}
h := &httpBroker{
id: "broker-" + uuid.New().String(),
address: addr,
opts: options,
2016-01-20 15:22:44 +00:00
r: reg,
c: &http.Client{Transport: newTransport(options.TLSConfig)},
subscribers: make(map[string][]*httpSubscriber),
exit: make(chan chan error),
mux: http.NewServeMux(),
2019-01-02 19:27:46 +00:00
inbox: make(map[string][][]byte),
}
2018-11-18 20:40:43 +00:00
// specify the message handler
h.mux.Handle(DefaultSubPath, h)
2018-11-18 20:40:43 +00:00
// get optional handlers
if h.opts.Context != nil {
handlers, ok := h.opts.Context.Value("http_handlers").(map[string]http.Handler)
if ok {
for pattern, handler := range handlers {
h.mux.Handle(pattern, handler)
}
}
}
return h
}
2015-12-23 19:07:26 +00:00
// Ack is a no-op for the HTTP broker and always returns nil.
func (h *httpPublication) Ack() error {
	return nil
}
// Message returns the message carried by this publication.
func (h *httpPublication) Message() *Message {
	return h.m
}
// Topic returns the topic this publication was received on.
func (h *httpPublication) Topic() string {
	return h.t
}
func (h *httpSubscriber) Options() SubscribeOptions {
2015-12-23 19:07:26 +00:00
return h.opts
}
// Topic returns the topic this subscriber is attached to.
func (h *httpSubscriber) Topic() string {
	return h.topic
}
func (h *httpSubscriber) Unsubscribe() error {
2017-10-26 20:48:11 +01:00
return h.hb.unsubscribe(h)
}
2019-01-02 19:27:46 +00:00
// saveMessage appends an encoded message to the inbox for topic.
//
// The inbox is capped at 64 entries per topic: the slice is truncated to
// its first 64 elements after the append, so once the inbox is full the
// message just added is the one discarded.
func (h *httpBroker) saveMessage(topic string, msg []byte) {
	h.mtx.Lock()
	defer h.mtx.Unlock()

	queue := append(h.inbox[topic], msg)

	// max length 64
	if len(queue) > 64 {
		queue = queue[:64]
	}

	h.inbox[topic] = queue
}
// getMessage removes and returns up to num messages from the front of the
// inbox for topic. It returns nil when the topic has no inbox entry.
func (h *httpBroker) getMessage(topic string, num int) [][]byte {
	h.mtx.Lock()
	defer h.mtx.Unlock()

	queued, found := h.inbox[topic]
	if !found {
		return nil
	}

	// fewer messages than requested: hand back everything and reset
	if len(queued) < num {
		h.inbox[topic] = nil
		return queued
	}

	// otherwise split off the first num messages
	h.inbox[topic] = queued[num:]
	return queued[:num]
}
2017-10-26 20:48:11 +01:00
// subscribe registers the subscriber's service in the registry with
// registerTTL and, on success, records the subscriber under its topic.
// The registry error is returned unchanged when registration fails.
func (h *httpBroker) subscribe(s *httpSubscriber) error {
	h.Lock()
	defer h.Unlock()

	err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL))
	if err != nil {
		return err
	}

	current := h.subscribers[s.topic]
	h.subscribers[s.topic] = append(current, s)
	return nil
}
func (h *httpBroker) unsubscribe(s *httpSubscriber) error {
h.Lock()
defer h.Unlock()
var subscribers []*httpSubscriber
// look for subscriber
for _, sub := range h.subscribers[s.topic] {
// deregister and skip forward
if sub.id == s.id {
2018-11-13 11:56:21 +03:00
_ = h.r.Deregister(sub.svc)
2017-10-26 20:48:11 +01:00
continue
}
// keep subscriber
subscribers = append(subscribers, sub)
}
// set subscribers
h.subscribers[s.topic] = subscribers
return nil
}
func (h *httpBroker) run(l net.Listener) {
t := time.NewTicker(registerInterval)
defer t.Stop()
for {
select {
// heartbeat for each subscriber
case <-t.C:
h.RLock()
for _, subs := range h.subscribers {
for _, sub := range subs {
2018-11-13 11:56:21 +03:00
_ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))
}
}
h.RUnlock()
// received exit signal
case ch := <-h.exit:
ch <- l.Close()
2017-10-26 20:48:11 +01:00
h.RLock()
for _, subs := range h.subscribers {
for _, sub := range subs {
2018-11-13 11:56:21 +03:00
_ = h.r.Deregister(sub.svc)
}
}
2017-10-26 20:48:11 +01:00
h.RUnlock()
return
}
}
}
func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "POST" {
err := merr.BadRequest("go.micro.broker", "Method not allowed")
http.Error(w, err.Error(), http.StatusMethodNotAllowed)
return
}
defer req.Body.Close()
req.ParseForm()
b, err := ioutil.ReadAll(req.Body)
if err != nil {
errr := merr.InternalServerError("go.micro.broker", "Error reading request body: %v", err)
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
var m *Message
if err = h.opts.Codec.Unmarshal(b, &m); err != nil {
errr := merr.InternalServerError("go.micro.broker", "Error parsing request body: %v", err)
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
topic := m.Header[":topic"]
delete(m.Header, ":topic")
if len(topic) == 0 {
errr := merr.InternalServerError("go.micro.broker", "Topic not found")
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
p := &httpPublication{m: m, t: topic}
id := req.Form.Get("id")
h.RLock()
for _, subscriber := range h.subscribers[topic] {
if id == subscriber.id {
// sub is sync; crufty rate limiting
// so we don't hose the cpu
subscriber.fn(p)
}
}
2017-10-26 20:48:11 +01:00
h.RUnlock()
}
// Address returns the broker's current address, read under the read lock.
func (h *httpBroker) Address() string {
	h.RLock()
	addr := h.address
	h.RUnlock()
	return addr
}
2017-10-26 20:48:11 +01:00
func (h *httpBroker) Connect() error {
h.RLock()
if h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
var l net.Listener
var err error
if h.opts.Secure || h.opts.TLSConfig != nil {
2016-01-16 23:39:47 +00:00
config := h.opts.TLSConfig
fn := func(addr string) (net.Listener, error) {
if config == nil {
hosts := []string{addr}
// check if its a valid host:port
if host, _, err := net.SplitHostPort(addr); err == nil {
if len(host) == 0 {
hosts = maddr.IPs()
} else {
hosts = []string{host}
}
}
// generate a certificate
cert, err := mls.Certificate(hosts...)
if err != nil {
return nil, err
}
config = &tls.Config{Certificates: []tls.Certificate{cert}}
2016-01-16 23:39:47 +00:00
}
return tls.Listen("tcp", addr, config)
}
l, err = mnet.Listen(h.address, fn)
} else {
fn := func(addr string) (net.Listener, error) {
return net.Listen("tcp", addr)
}
l, err = mnet.Listen(h.address, fn)
}
if err != nil {
return err
}
addr := h.address
h.address = l.Addr().String()
go http.Serve(l, h.mux)
go func() {
h.run(l)
h.Lock()
h.address = addr
h.Unlock()
}()
2017-10-26 20:48:11 +01:00
// get registry
reg, ok := h.opts.Context.Value(registryKey).(registry.Registry)
if !ok {
reg = registry.DefaultRegistry
}
// set rcache
h.r = rcache.New(reg)
// set running
h.running = true
return nil
}
2017-10-26 20:48:11 +01:00
func (h *httpBroker) Disconnect() error {
h.RLock()
if !h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
2017-10-26 20:48:11 +01:00
// stop rcache
rc, ok := h.r.(rcache.Cache)
if ok {
rc.Stop()
}
// exit and return err
ch := make(chan error)
h.exit <- ch
err := <-ch
2017-10-26 20:48:11 +01:00
// set not running
h.running = false
return err
}
2017-10-26 20:48:11 +01:00
func (h *httpBroker) Init(opts ...Option) error {
h.RLock()
2017-10-26 20:48:11 +01:00
if h.running {
h.RUnlock()
2017-10-26 20:48:11 +01:00
return errors.New("cannot init while connected")
}
h.RUnlock()
h.Lock()
defer h.Unlock()
for _, o := range opts {
2016-01-16 23:10:14 +00:00
o(&h.opts)
}
2018-08-18 17:28:58 +01:00
if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 {
h.address = h.opts.Addrs[0]
}
if len(h.id) == 0 {
h.id = "broker-" + uuid.New().String()
}
2017-10-26 20:48:11 +01:00
// get registry
2016-01-20 15:22:44 +00:00
reg, ok := h.opts.Context.Value(registryKey).(registry.Registry)
if !ok {
reg = registry.DefaultRegistry
}
2017-10-26 20:48:11 +01:00
// get rcache
if rc, ok := h.r.(rcache.Cache); ok {
rc.Stop()
}
// set registry
h.r = rcache.New(reg)
2016-01-20 15:22:44 +00:00
2018-11-29 12:10:33 +00:00
// reconfigure tls config
if c := h.opts.TLSConfig; c != nil {
h.c = &http.Client{
Transport: newTransport(c),
}
}
return nil
}
// Options returns a copy of the broker's current options.
func (h *httpBroker) Options() Options {
	return h.opts
}
2015-12-23 19:07:26 +00:00
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
2019-01-02 19:27:46 +00:00
// create the message first
m := &Message{
Header: make(map[string]string),
Body: msg.Body,
}
for k, v := range msg.Header {
m.Header[k] = v
2016-02-21 23:52:08 +00:00
}
m.Header[":topic"] = topic
2019-01-02 19:27:46 +00:00
// encode the message
b, err := h.opts.Codec.Marshal(m)
if err != nil {
return err
}
2019-01-02 19:27:46 +00:00
// save the message
h.saveMessage(topic, b)
// now attempt to get the service
h.RLock()
s, err := h.r.GetService("topic:" + topic)
if err != nil {
h.RUnlock()
// ignore error
return nil
}
h.RUnlock()
2019-01-03 11:23:06 +00:00
pub := func(node *registry.Node, t string, b []byte) error {
2016-01-16 22:13:02 +00:00
scheme := "http"
2016-01-16 22:13:02 +00:00
// check if secure is added in metadata
if node.Metadata["secure"] == "true" {
scheme = "https"
}
vals := url.Values{}
vals.Add("id", node.Id)
2016-01-16 22:13:02 +00:00
uri := fmt.Sprintf("%s://%s:%d%s?%s", scheme, node.Address, node.Port, DefaultSubPath, vals.Encode())
r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
2019-01-02 19:27:46 +00:00
if err != nil {
2019-01-03 11:23:06 +00:00
return err
2015-12-23 20:05:47 +00:00
}
2019-01-02 19:27:46 +00:00
// discard response body
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
2019-01-03 11:23:06 +00:00
return nil
2015-12-23 20:05:47 +00:00
}
2019-01-02 19:27:46 +00:00
srv := func(s []*registry.Service, b []byte) {
for _, service := range s {
// only process if we have nodes
if len(service.Nodes) == 0 {
continue
}
switch service.Version {
// broadcast version means broadcast to all nodes
case broadcastVersion:
2019-01-03 11:23:06 +00:00
var success bool
// publish to all nodes
2019-01-02 19:27:46 +00:00
for _, node := range service.Nodes {
// publish async
2019-01-03 11:23:06 +00:00
if err := pub(node, topic, b); err == nil {
success = true
}
}
// save if it failed to publish at least once
if !success {
h.saveMessage(topic, b)
2019-01-02 19:27:46 +00:00
}
default:
// select node to publish to
node := service.Nodes[rand.Int()%len(service.Nodes)]
2019-01-03 11:23:06 +00:00
// publish async to one node
if err := pub(node, topic, b); err != nil {
// if failed save it
h.saveMessage(topic, b)
}
2015-11-08 01:48:48 +00:00
}
}
}
2015-12-23 20:05:47 +00:00
2019-01-02 19:27:46 +00:00
// do the rest async
go func() {
// get a third of the backlog
messages := h.getMessage(topic, 8)
delay := (len(messages) > 1)
// publish all the messages
for _, msg := range messages {
// serialize here
srv(s, msg)
// sending a backlog of messages
if delay {
time.Sleep(time.Millisecond * 100)
}
}
}()
return nil
}
2015-12-23 19:07:26 +00:00
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
2018-11-30 17:32:48 +00:00
options := NewSubscribeOptions(opts...)
2015-12-23 19:07:26 +00:00
// parse address for host, port
parts := strings.Split(h.Address(), ":")
host := strings.Join(parts[:len(parts)-1], ":")
port, _ := strconv.Atoi(parts[len(parts)-1])
addr, err := maddr.Extract(host)
2016-02-15 21:57:17 +00:00
if err != nil {
return nil, err
}
2017-10-26 20:48:11 +01:00
// create unique id
id := h.id + "." + uuid.New().String()
var secure bool
if h.opts.Secure || h.opts.TLSConfig != nil {
secure = true
}
// register service
node := &registry.Node{
2017-10-26 20:48:11 +01:00
Id: id,
2016-02-15 21:57:17 +00:00
Address: addr,
Port: port,
2016-01-16 22:13:02 +00:00
Metadata: map[string]string{
"secure": fmt.Sprintf("%t", secure),
2016-01-16 22:13:02 +00:00
},
}
2017-10-26 20:48:11 +01:00
// check for queue group or broadcast queue
version := options.Queue
2015-12-23 20:05:47 +00:00
if len(version) == 0 {
version = broadcastVersion
}
service := &registry.Service{
2015-12-23 20:05:47 +00:00
Name: "topic:" + topic,
Version: version,
Nodes: []*registry.Node{node},
}
2017-10-26 20:48:11 +01:00
// generate subscriber
subscriber := &httpSubscriber{
2017-10-26 20:48:11 +01:00
opts: options,
hb: h,
id: id,
topic: topic,
fn: handler,
svc: service,
}
2017-10-26 20:48:11 +01:00
// subscribe now
if err := h.subscribe(subscriber); err != nil {
return nil, err
}
2017-10-26 20:48:11 +01:00
// return the subscriber
return subscriber, nil
}
2015-12-19 21:56:14 +00:00
// String returns the broker implementation name, "http".
func (h *httpBroker) String() string {
	return "http"
}