From bd46e60c137af4300e0ce6ea76baed02166f79ea Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 26 Oct 2017 20:48:11 +0100 Subject: [PATCH] optimise http broker with rcache --- broker/broker.go | 2 - broker/http_broker.go | 274 +++++++++++++++++++++++++----------------- 2 files changed, 163 insertions(+), 113 deletions(-) diff --git a/broker/broker.go b/broker/broker.go index d2e60d2e..39db9f17 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -2,8 +2,6 @@ package broker // Broker is an interface used for asynchronous messaging. -// Its an abstraction over various message brokers -// {NATS, RabbitMQ, Kafka, ...} type Broker interface { Options() Options Address() string diff --git a/broker/http_broker.go b/broker/http_broker.go index 98abac97..cbccd08a 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -3,6 +3,7 @@ package broker import ( "bytes" "crypto/tls" + "errors" "fmt" "io" "io/ioutil" @@ -18,8 +19,9 @@ import ( "github.com/micro/go-log" "github.com/micro/go-micro/broker/codec/json" - "github.com/micro/go-micro/errors" + merr "github.com/micro/go-micro/errors" "github.com/micro/go-micro/registry" + "github.com/micro/go-rcache" maddr "github.com/micro/misc/lib/addr" mnet "github.com/micro/misc/lib/net" mls "github.com/micro/misc/lib/tls" @@ -28,20 +30,17 @@ import ( "golang.org/x/net/context" ) -// HTTP Broker is a placeholder for actual message brokers. -// This should not really be used in production but useful -// in developer where you want zero dependencies. - +// HTTP Broker is a point to point async broker type httpBroker struct { - id string - address string - unsubscribe chan *httpSubscriber - opts Options + id string + address string + opts Options mux *http.ServeMux - c *http.Client - r registry.Registry + c *http.Client + r registry.Registry + rc rcache.Cache sync.RWMutex subscribers map[string][]*httpSubscriber @@ -53,9 +52,9 @@ type httpSubscriber struct { opts SubscribeOptions id string topic string - ch chan *httpSubscriber fn Handler svc *registry.Service + hb *httpBroker } type httpPublication struct { @@ -106,11 +105,13 @@ func newHttpBroker(opts ...Option) Broker { o(&options) } + // set address addr := ":0" if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 { addr = options.Addrs[0] } + // get registry reg, ok := options.Context.Value(registryKey).(registry.Registry) if !ok { reg = registry.DefaultRegistry @@ -123,7 +124,6 @@ func newHttpBroker(opts ...Option) Broker { r: reg, c: &http.Client{Transport: newTransport(options.TLSConfig)}, subscribers: make(map[string][]*httpSubscriber), - unsubscribe: make(chan *httpSubscriber), exit: make(chan chan error), mux: http.NewServeMux(), } @@ -153,9 +153,41 @@ func (h *httpSubscriber) Topic() string { } func (h *httpSubscriber) Unsubscribe() error { - h.ch <- h - // artificial delay - time.Sleep(time.Millisecond * 10) + return h.hb.unsubscribe(h) +} + +func (h *httpBroker) subscribe(s *httpSubscriber) error { + h.Lock() + defer h.Unlock() + + if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil { + return err + } + + h.subscribers[s.topic] = append(h.subscribers[s.topic], s) + return nil +} + +func (h *httpBroker) unsubscribe(s *httpSubscriber) error { + h.Lock() + defer h.Unlock() + + var subscribers []*httpSubscriber + + // look for subscriber + for _, sub := range h.subscribers[s.topic] { + // deregister and skip forward + if sub.id == s.id { + h.r.Deregister(sub.svc) + continue + } + // keep subscriber + subscribers = append(subscribers, sub) + } + + // set subscribers + h.subscribers[s.topic] = subscribers + return nil } @@ -177,29 +209,75 @@ func (h *httpBroker) run(l net.Listener) { // received exit signal case ch := <-h.exit: ch <- l.Close() - h.Lock() - h.running = false - h.Unlock() - return - // unsubscribe subscriber - case subscriber := <-h.unsubscribe: - h.Lock() - var subscribers []*httpSubscriber - for _, sub := range h.subscribers[subscriber.topic] { - // deregister and skip forward - if sub.id == subscriber.id { + h.RLock() + for _, subs := range h.subscribers { + for _, sub := range subs { h.r.Deregister(sub.svc) - continue } - subscribers = append(subscribers, sub) } - h.subscribers[subscriber.topic] = subscribers - h.Unlock() + h.RUnlock() + return } } } -func (h *httpBroker) start() error { +func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { + if req.Method != "POST" { + err := merr.BadRequest("go.micro.broker", "Method not allowed") + http.Error(w, err.Error(), http.StatusMethodNotAllowed) + return + } + defer req.Body.Close() + + req.ParseForm() + + b, err := ioutil.ReadAll(req.Body) + if err != nil { + errr := merr.InternalServerError("go.micro.broker", "Error reading request body: %v", err) + w.WriteHeader(500) + w.Write([]byte(errr.Error())) + return + } + + var m *Message + if err = h.opts.Codec.Unmarshal(b, &m); err != nil { + errr := merr.InternalServerError("go.micro.broker", "Error parsing request body: %v", err) + w.WriteHeader(500) + w.Write([]byte(errr.Error())) + return + } + + topic := m.Header[":topic"] + delete(m.Header, ":topic") + + if len(topic) == 0 { + errr := merr.InternalServerError("go.micro.broker", "Topic not found") + w.WriteHeader(500) + w.Write([]byte(errr.Error())) + return + } + + p := &httpPublication{m: m, t: topic} + id := req.Form.Get("id") + + h.RLock() + for _, subscriber := range h.subscribers[topic] { + if id == subscriber.id { + // sub is sync; crufty rate limiting + // so we don't hose the cpu + subscriber.fn(p) + } + } + h.RUnlock() +} + +func (h *httpBroker) Address() string { + h.RLock() + defer h.RUnlock() + return h.address +} + +func (h *httpBroker) Connect() error { h.Lock() defer h.Unlock() @@ -255,11 +333,20 @@ func (h *httpBroker) start() error { go http.Serve(l, h.mux) go h.run(l) + // get registry + reg, ok := h.opts.Context.Value(registryKey).(registry.Registry) + if !ok { + reg = registry.DefaultRegistry + } + // set rcache + h.r = rcache.New(reg) + + // set running h.running = true return nil } -func (h *httpBroker) stop() error { +func (h *httpBroker) Disconnect() error { h.Lock() defer h.Unlock() @@ -267,76 +354,30 @@ func (h *httpBroker) stop() error { return nil } + // stop rcache + rc, ok := h.r.(rcache.Cache) + if ok { + rc.Stop() + } + + // exit and return err ch := make(chan error) h.exit <- ch err := <-ch + + // set not running h.running = false return err } -func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { - if req.Method != "POST" { - err := errors.BadRequest("go.micro.broker", "Method not allowed") - http.Error(w, err.Error(), http.StatusMethodNotAllowed) - return - } - defer req.Body.Close() - - req.ParseForm() - - b, err := ioutil.ReadAll(req.Body) - if err != nil { - errr := errors.InternalServerError("go.micro.broker", "Error reading request body: %v", err) - w.WriteHeader(500) - w.Write([]byte(errr.Error())) - return - } - - var m *Message - if err = h.opts.Codec.Unmarshal(b, &m); err != nil { - errr := errors.InternalServerError("go.micro.broker", "Error parsing request body: %v", err) - w.WriteHeader(500) - w.Write([]byte(errr.Error())) - return - } - - topic := m.Header[":topic"] - delete(m.Header, ":topic") - - if len(topic) == 0 { - errr := errors.InternalServerError("go.micro.broker", "Topic not found") - w.WriteHeader(500) - w.Write([]byte(errr.Error())) - return - } - - p := &httpPublication{m: m, t: topic} - id := req.Form.Get("id") - - h.RLock() - for _, subscriber := range h.subscribers[topic] { - if id == subscriber.id { - // sub is sync; crufty rate limiting - // so we don't hose the cpu - subscriber.fn(p) - } - } - h.RUnlock() -} - -func (h *httpBroker) Address() string { - return h.address -} - -func (h *httpBroker) Connect() error { - return h.start() -} - -func (h *httpBroker) Disconnect() error { - return h.stop() -} - func (h *httpBroker) Init(opts ...Option) error { + h.Lock() + defer h.Unlock() + + if h.running { + return errors.New("cannot init while connected") + } + for _, o := range opts { o(&h.opts) } @@ -345,12 +386,19 @@ func (h *httpBroker) Init(opts ...Option) error { h.id = "broker-" + uuid.NewUUID().String() } + // get registry reg, ok := h.opts.Context.Value(registryKey).(registry.Registry) if !ok { reg = registry.DefaultRegistry } - h.r = reg + // get rcache + if rc, ok := h.r.(rcache.Cache); ok { + rc.Stop() + } + + // set registry + h.r = rcache.New(reg) return nil } @@ -360,10 +408,13 @@ func (h *httpBroker) Options() Options { } func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error { + h.RLock() s, err := h.r.GetService("topic:" + topic) if err != nil { + h.RUnlock() return err } + h.RUnlock() m := &Message{ Header: make(map[string]string), @@ -381,7 +432,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) return err } - fn := func(node *registry.Node, b []byte) { + pub := func(node *registry.Node, b []byte) { scheme := "http" // check if secure is added in metadata @@ -411,15 +462,14 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) case broadcastVersion: for _, node := range service.Nodes { // publish async - go fn(node, b) + go pub(node, b) } - default: // select node to publish to node := service.Nodes[rand.Int()%len(service.Nodes)] // publish async - go fn(node, b) + go pub(node, b) } } @@ -427,7 +477,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) } func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { - opt := newSubscribeOptions(opts...) + options := newSubscribeOptions(opts...) // parse address for host, port parts := strings.Split(h.Address(), ":") @@ -439,7 +489,8 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO return nil, err } - id := uuid.NewUUID().String() + // create unique id + id := h.id + "." + uuid.NewUUID().String() var secure bool @@ -449,7 +500,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO // register service node := ®istry.Node{ - Id: h.id + "." + id, + Id: id, Address: addr, Port: port, Metadata: map[string]string{ @@ -457,7 +508,8 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO }, } - version := opt.Queue + // check for queue group or broadcast queue + version := options.Queue if len(version) == 0 { version = broadcastVersion } @@ -468,22 +520,22 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO Nodes: []*registry.Node{node}, } + // generate subscriber subscriber := &httpSubscriber{ - opts: opt, - id: h.id + "." + id, + opts: options, + hb: h, + id: id, topic: topic, - ch: h.unsubscribe, fn: handler, svc: service, } - if err := h.r.Register(service, registry.RegisterTTL(registerTTL)); err != nil { + // subscribe now + if err := h.subscribe(subscriber); err != nil { return nil, err } - h.Lock() - h.subscribers[topic] = append(h.subscribers[topic], subscriber) - h.Unlock() + // return the subscriber return subscriber, nil }