Fix the broker and add secure option
This commit is contained in:
parent
60ee085cbc
commit
36e709c9f7
@ -2,6 +2,7 @@ package broker
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
@ -9,6 +10,8 @@ import (
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@ -30,6 +33,8 @@ type httpBroker struct {
|
||||
unsubscribe chan *httpSubscriber
|
||||
opts Options
|
||||
|
||||
c *http.Client
|
||||
|
||||
sync.RWMutex
|
||||
subscribers map[string][]*httpSubscriber
|
||||
running bool
|
||||
@ -59,6 +64,24 @@ func init() {
|
||||
rand.Seed(time.Now().Unix())
|
||||
}
|
||||
|
||||
func newTransport() *http.Transport {
|
||||
t := &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
Dial: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
}).Dial,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
}
|
||||
runtime.SetFinalizer(&t, func(tr **http.Transport) {
|
||||
(*tr).CloseIdleConnections()
|
||||
})
|
||||
return t
|
||||
}
|
||||
|
||||
func newHttpBroker(addrs []string, opt ...Option) Broker {
|
||||
addr := ":0"
|
||||
if len(addrs) > 0 && len(addrs[0]) > 0 {
|
||||
@ -68,6 +91,7 @@ func newHttpBroker(addrs []string, opt ...Option) Broker {
|
||||
return &httpBroker{
|
||||
id: "broker-" + uuid.NewUUID().String(),
|
||||
address: addr,
|
||||
c: &http.Client{Transport: newTransport()},
|
||||
subscribers: make(map[string][]*httpSubscriber),
|
||||
unsubscribe: make(chan *httpSubscriber),
|
||||
exit: make(chan chan error),
|
||||
@ -159,6 +183,8 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
}
|
||||
defer req.Body.Close()
|
||||
|
||||
req.ParseForm()
|
||||
|
||||
b, err := ioutil.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
errr := errors.InternalServerError("go.micro.broker", fmt.Sprintf("Error reading request body: %v", err))
|
||||
@ -186,9 +212,13 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
}
|
||||
|
||||
p := &httpPublication{m: m, t: topic}
|
||||
id := req.Form.Get("id")
|
||||
|
||||
h.RLock()
|
||||
for _, subscriber := range h.subscribers[topic] {
|
||||
subscriber.fn(p)
|
||||
if id == subscriber.id {
|
||||
subscriber.fn(p)
|
||||
}
|
||||
}
|
||||
h.RUnlock()
|
||||
}
|
||||
@ -231,7 +261,16 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
|
||||
}
|
||||
|
||||
fn := func(node *registry.Node, b io.Reader) {
|
||||
r, err := http.Post(fmt.Sprintf("http://%s:%d%s", node.Address, node.Port, DefaultSubPath), "application/json", b)
|
||||
scheme := "http"
|
||||
// check if secure is added in metadata
|
||||
if node.Metadata["secure"] == "true" {
|
||||
scheme = "https"
|
||||
}
|
||||
|
||||
vals := url.Values{}
|
||||
vals.Add("id", node.Id)
|
||||
uri := fmt.Sprintf("%s://%s:%d%s?%s", scheme, node.Address, node.Port, DefaultSubPath, vals.Encode())
|
||||
r, err := h.c.Post(uri, "application/json", b)
|
||||
if err == nil {
|
||||
r.Body.Close()
|
||||
}
|
||||
@ -275,9 +314,12 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
|
||||
|
||||
// register service
|
||||
node := ®istry.Node{
|
||||
Id: topic + "." + h.id + "." + id,
|
||||
Id: h.id + "." + id,
|
||||
Address: host,
|
||||
Port: port,
|
||||
Metadata: map[string]string{
|
||||
"secure": fmt.Sprintf("%t", h.opts.Secure),
|
||||
},
|
||||
}
|
||||
|
||||
version := opt.Queue
|
||||
@ -293,7 +335,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
|
||||
|
||||
subscriber := &httpSubscriber{
|
||||
opts: opt,
|
||||
id: id,
|
||||
id: h.id + "." + id,
|
||||
topic: topic,
|
||||
ch: h.unsubscribe,
|
||||
fn: handler,
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
Secure bool
|
||||
|
||||
// Other options for implementations of the interface
|
||||
// can be stored in a context
|
||||
@ -37,6 +38,18 @@ type PublishOption func(*PublishOptions)
|
||||
|
||||
type SubscribeOption func(*SubscribeOptions)
|
||||
|
||||
func newSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
|
||||
opt := SubscribeOptions{
|
||||
AutoAck: true,
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&opt)
|
||||
}
|
||||
|
||||
return opt
|
||||
}
|
||||
|
||||
// DisableAutoAck will disable auto acking of messages
|
||||
// after they have been handled.
|
||||
func DisableAutoAck() SubscribeOption {
|
||||
@ -52,14 +65,9 @@ func QueueName(name string) SubscribeOption {
|
||||
}
|
||||
}
|
||||
|
||||
func newSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
|
||||
opt := SubscribeOptions{
|
||||
AutoAck: true,
|
||||
// Secure communication with the broker
|
||||
func Secure(b bool) Option {
|
||||
return func(o *Options) {
|
||||
o.Secure = b
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&opt)
|
||||
}
|
||||
|
||||
return opt
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user