optimise http broker with rcache

This commit is contained in:
Asim Aslam 2017-10-26 20:48:11 +01:00
parent 42235bc973
commit bd46e60c13
2 changed files with 163 additions and 113 deletions

View File

@ -2,8 +2,6 @@
package broker
// Broker is an interface used for asynchronous messaging.
// Its an abstraction over various message brokers
// {NATS, RabbitMQ, Kafka, ...}
type Broker interface {
Options() Options
Address() string

View File

@ -3,6 +3,7 @@ package broker
import (
"bytes"
"crypto/tls"
"errors"
"fmt"
"io"
"io/ioutil"
@ -18,8 +19,9 @@ import (
"github.com/micro/go-log"
"github.com/micro/go-micro/broker/codec/json"
"github.com/micro/go-micro/errors"
merr "github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry"
"github.com/micro/go-rcache"
maddr "github.com/micro/misc/lib/addr"
mnet "github.com/micro/misc/lib/net"
mls "github.com/micro/misc/lib/tls"
@ -28,20 +30,17 @@ import (
"golang.org/x/net/context"
)
// HTTP Broker is a placeholder for actual message brokers.
// This should not really be used in production but useful
// in developer where you want zero dependencies.
// HTTP Broker is a point to point async broker
type httpBroker struct {
id string
address string
unsubscribe chan *httpSubscriber
opts Options
id string
address string
opts Options
mux *http.ServeMux
c *http.Client
r registry.Registry
c *http.Client
r registry.Registry
rc rcache.Cache
sync.RWMutex
subscribers map[string][]*httpSubscriber
@ -53,9 +52,9 @@ type httpSubscriber struct {
opts SubscribeOptions
id string
topic string
ch chan *httpSubscriber
fn Handler
svc *registry.Service
hb *httpBroker
}
type httpPublication struct {
@ -106,11 +105,13 @@ func newHttpBroker(opts ...Option) Broker {
o(&options)
}
// set address
addr := ":0"
if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 {
addr = options.Addrs[0]
}
// get registry
reg, ok := options.Context.Value(registryKey).(registry.Registry)
if !ok {
reg = registry.DefaultRegistry
@ -123,7 +124,6 @@ func newHttpBroker(opts ...Option) Broker {
r: reg,
c: &http.Client{Transport: newTransport(options.TLSConfig)},
subscribers: make(map[string][]*httpSubscriber),
unsubscribe: make(chan *httpSubscriber),
exit: make(chan chan error),
mux: http.NewServeMux(),
}
@ -153,9 +153,41 @@ func (h *httpSubscriber) Topic() string {
}
func (h *httpSubscriber) Unsubscribe() error {
h.ch <- h
// artificial delay
time.Sleep(time.Millisecond * 10)
return h.hb.unsubscribe(h)
}
func (h *httpBroker) subscribe(s *httpSubscriber) error {
h.Lock()
defer h.Unlock()
if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil {
return err
}
h.subscribers[s.topic] = append(h.subscribers[s.topic], s)
return nil
}
func (h *httpBroker) unsubscribe(s *httpSubscriber) error {
h.Lock()
defer h.Unlock()
var subscribers []*httpSubscriber
// look for subscriber
for _, sub := range h.subscribers[s.topic] {
// deregister and skip forward
if sub.id == s.id {
h.r.Deregister(sub.svc)
continue
}
// keep subscriber
subscribers = append(subscribers, sub)
}
// set subscribers
h.subscribers[s.topic] = subscribers
return nil
}
@ -177,29 +209,75 @@ func (h *httpBroker) run(l net.Listener) {
// received exit signal
case ch := <-h.exit:
ch <- l.Close()
h.Lock()
h.running = false
h.Unlock()
return
// unsubscribe subscriber
case subscriber := <-h.unsubscribe:
h.Lock()
var subscribers []*httpSubscriber
for _, sub := range h.subscribers[subscriber.topic] {
// deregister and skip forward
if sub.id == subscriber.id {
h.RLock()
for _, subs := range h.subscribers {
for _, sub := range subs {
h.r.Deregister(sub.svc)
continue
}
subscribers = append(subscribers, sub)
}
h.subscribers[subscriber.topic] = subscribers
h.Unlock()
h.RUnlock()
return
}
}
}
func (h *httpBroker) start() error {
func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "POST" {
err := merr.BadRequest("go.micro.broker", "Method not allowed")
http.Error(w, err.Error(), http.StatusMethodNotAllowed)
return
}
defer req.Body.Close()
req.ParseForm()
b, err := ioutil.ReadAll(req.Body)
if err != nil {
errr := merr.InternalServerError("go.micro.broker", "Error reading request body: %v", err)
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
var m *Message
if err = h.opts.Codec.Unmarshal(b, &m); err != nil {
errr := merr.InternalServerError("go.micro.broker", "Error parsing request body: %v", err)
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
topic := m.Header[":topic"]
delete(m.Header, ":topic")
if len(topic) == 0 {
errr := merr.InternalServerError("go.micro.broker", "Topic not found")
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
p := &httpPublication{m: m, t: topic}
id := req.Form.Get("id")
h.RLock()
for _, subscriber := range h.subscribers[topic] {
if id == subscriber.id {
// sub is sync; crufty rate limiting
// so we don't hose the cpu
subscriber.fn(p)
}
}
h.RUnlock()
}
func (h *httpBroker) Address() string {
h.RLock()
defer h.RUnlock()
return h.address
}
func (h *httpBroker) Connect() error {
h.Lock()
defer h.Unlock()
@ -255,11 +333,20 @@ func (h *httpBroker) start() error {
go http.Serve(l, h.mux)
go h.run(l)
// get registry
reg, ok := h.opts.Context.Value(registryKey).(registry.Registry)
if !ok {
reg = registry.DefaultRegistry
}
// set rcache
h.r = rcache.New(reg)
// set running
h.running = true
return nil
}
func (h *httpBroker) stop() error {
func (h *httpBroker) Disconnect() error {
h.Lock()
defer h.Unlock()
@ -267,76 +354,30 @@ func (h *httpBroker) stop() error {
return nil
}
// stop rcache
rc, ok := h.r.(rcache.Cache)
if ok {
rc.Stop()
}
// exit and return err
ch := make(chan error)
h.exit <- ch
err := <-ch
// set not running
h.running = false
return err
}
func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "POST" {
err := errors.BadRequest("go.micro.broker", "Method not allowed")
http.Error(w, err.Error(), http.StatusMethodNotAllowed)
return
}
defer req.Body.Close()
req.ParseForm()
b, err := ioutil.ReadAll(req.Body)
if err != nil {
errr := errors.InternalServerError("go.micro.broker", "Error reading request body: %v", err)
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
var m *Message
if err = h.opts.Codec.Unmarshal(b, &m); err != nil {
errr := errors.InternalServerError("go.micro.broker", "Error parsing request body: %v", err)
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
topic := m.Header[":topic"]
delete(m.Header, ":topic")
if len(topic) == 0 {
errr := errors.InternalServerError("go.micro.broker", "Topic not found")
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
p := &httpPublication{m: m, t: topic}
id := req.Form.Get("id")
h.RLock()
for _, subscriber := range h.subscribers[topic] {
if id == subscriber.id {
// sub is sync; crufty rate limiting
// so we don't hose the cpu
subscriber.fn(p)
}
}
h.RUnlock()
}
func (h *httpBroker) Address() string {
return h.address
}
func (h *httpBroker) Connect() error {
return h.start()
}
func (h *httpBroker) Disconnect() error {
return h.stop()
}
func (h *httpBroker) Init(opts ...Option) error {
h.Lock()
defer h.Unlock()
if h.running {
return errors.New("cannot init while connected")
}
for _, o := range opts {
o(&h.opts)
}
@ -345,12 +386,19 @@ func (h *httpBroker) Init(opts ...Option) error {
h.id = "broker-" + uuid.NewUUID().String()
}
// get registry
reg, ok := h.opts.Context.Value(registryKey).(registry.Registry)
if !ok {
reg = registry.DefaultRegistry
}
h.r = reg
// get rcache
if rc, ok := h.r.(rcache.Cache); ok {
rc.Stop()
}
// set registry
h.r = rcache.New(reg)
return nil
}
@ -360,10 +408,13 @@ func (h *httpBroker) Options() Options {
}
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
h.RLock()
s, err := h.r.GetService("topic:" + topic)
if err != nil {
h.RUnlock()
return err
}
h.RUnlock()
m := &Message{
Header: make(map[string]string),
@ -381,7 +432,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
return err
}
fn := func(node *registry.Node, b []byte) {
pub := func(node *registry.Node, b []byte) {
scheme := "http"
// check if secure is added in metadata
@ -411,15 +462,14 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
case broadcastVersion:
for _, node := range service.Nodes {
// publish async
go fn(node, b)
go pub(node, b)
}
default:
// select node to publish to
node := service.Nodes[rand.Int()%len(service.Nodes)]
// publish async
go fn(node, b)
go pub(node, b)
}
}
@ -427,7 +477,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
}
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
opt := newSubscribeOptions(opts...)
options := newSubscribeOptions(opts...)
// parse address for host, port
parts := strings.Split(h.Address(), ":")
@ -439,7 +489,8 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
return nil, err
}
id := uuid.NewUUID().String()
// create unique id
id := h.id + "." + uuid.NewUUID().String()
var secure bool
@ -449,7 +500,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
// register service
node := &registry.Node{
Id: h.id + "." + id,
Id: id,
Address: addr,
Port: port,
Metadata: map[string]string{
@ -457,7 +508,8 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
},
}
version := opt.Queue
// check for queue group or broadcast queue
version := options.Queue
if len(version) == 0 {
version = broadcastVersion
}
@ -468,22 +520,22 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
Nodes: []*registry.Node{node},
}
// generate subscriber
subscriber := &httpSubscriber{
opts: opt,
id: h.id + "." + id,
opts: options,
hb: h,
id: id,
topic: topic,
ch: h.unsubscribe,
fn: handler,
svc: service,
}
if err := h.r.Register(service, registry.RegisterTTL(registerTTL)); err != nil {
// subscribe now
if err := h.subscribe(subscriber); err != nil {
return nil, err
}
h.Lock()
h.subscribers[topic] = append(h.subscribers[topic], subscriber)
h.Unlock()
// return the subscriber
return subscriber, nil
}