diff --git a/broker/broker.go b/broker/broker.go index d2e60d2e..39db9f17 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -2,8 +2,6 @@ package broker // Broker is an interface used for asynchronous messaging. -// Its an abstraction over various message brokers -// {NATS, RabbitMQ, Kafka, ...} type Broker interface { Options() Options Address() string diff --git a/broker/http_broker.go b/broker/http_broker.go index 98abac97..2adee5b3 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -3,6 +3,7 @@ package broker import ( "bytes" "crypto/tls" + "errors" "fmt" "io" "io/ioutil" @@ -18,8 +19,9 @@ import ( "github.com/micro/go-log" "github.com/micro/go-micro/broker/codec/json" - "github.com/micro/go-micro/errors" + merr "github.com/micro/go-micro/errors" "github.com/micro/go-micro/registry" + "github.com/micro/go-rcache" maddr "github.com/micro/misc/lib/addr" mnet "github.com/micro/misc/lib/net" mls "github.com/micro/misc/lib/tls" @@ -28,15 +30,11 @@ import ( "golang.org/x/net/context" ) -// HTTP Broker is a placeholder for actual message brokers. -// This should not really be used in production but useful -// in developer where you want zero dependencies. - +// HTTP Broker is a point to point async broker type httpBroker struct { - id string - address string - unsubscribe chan *httpSubscriber - opts Options + id string + address string + opts Options mux *http.ServeMux @@ -53,9 +51,9 @@ type httpSubscriber struct { opts SubscribeOptions id string topic string - ch chan *httpSubscriber fn Handler svc *registry.Service + hb *httpBroker } type httpPublication struct { @@ -106,11 +104,13 @@ func newHttpBroker(opts ...Option) Broker { o(&options) } + // set address addr := ":0" if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 { addr = options.Addrs[0] } + // get registry reg, ok := options.Context.Value(registryKey).(registry.Registry) if !ok { reg = registry.DefaultRegistry @@ -123,7 +123,6 @@ func newHttpBroker(opts ...Option) Broker { r: reg, c: &http.Client{Transport: newTransport(options.TLSConfig)}, subscribers: make(map[string][]*httpSubscriber), - unsubscribe: make(chan *httpSubscriber), exit: make(chan chan error), mux: http.NewServeMux(), } @@ -153,9 +152,41 @@ func (h *httpSubscriber) Topic() string { } func (h *httpSubscriber) Unsubscribe() error { - h.ch <- h - // artificial delay - time.Sleep(time.Millisecond * 10) + return h.hb.unsubscribe(h) +} + +func (h *httpBroker) subscribe(s *httpSubscriber) error { + h.Lock() + defer h.Unlock() + + if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil { + return err + } + + h.subscribers[s.topic] = append(h.subscribers[s.topic], s) + return nil +} + +func (h *httpBroker) unsubscribe(s *httpSubscriber) error { + h.Lock() + defer h.Unlock() + + var subscribers []*httpSubscriber + + // look for subscriber + for _, sub := range h.subscribers[s.topic] { + // deregister and skip forward + if sub.id == s.id { + h.r.Deregister(sub.svc) + continue + } + // keep subscriber + subscribers = append(subscribers, sub) + } + + // set subscribers + h.subscribers[s.topic] = subscribers + return nil } @@ -177,29 +208,75 @@ func (h *httpBroker) run(l net.Listener) { // received exit signal case ch := <-h.exit: ch <- l.Close() - h.Lock() - h.running = false - h.Unlock() - return - // unsubscribe subscriber - case subscriber := <-h.unsubscribe: - h.Lock() - var subscribers []*httpSubscriber - for _, sub := range h.subscribers[subscriber.topic] { - // deregister and skip forward - if sub.id == subscriber.id { + h.RLock() + for _, subs := range h.subscribers { + for _, sub := range subs { h.r.Deregister(sub.svc) - continue } - subscribers = append(subscribers, sub) } - h.subscribers[subscriber.topic] = subscribers - h.Unlock() + h.RUnlock() + return } } } -func (h *httpBroker) start() error { +func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { + if req.Method != "POST" { + err := merr.BadRequest("go.micro.broker", "Method not allowed") + http.Error(w, err.Error(), http.StatusMethodNotAllowed) + return + } + defer req.Body.Close() + + req.ParseForm() + + b, err := ioutil.ReadAll(req.Body) + if err != nil { + errr := merr.InternalServerError("go.micro.broker", "Error reading request body: %v", err) + w.WriteHeader(500) + w.Write([]byte(errr.Error())) + return + } + + var m *Message + if err = h.opts.Codec.Unmarshal(b, &m); err != nil { + errr := merr.InternalServerError("go.micro.broker", "Error parsing request body: %v", err) + w.WriteHeader(500) + w.Write([]byte(errr.Error())) + return + } + + topic := m.Header[":topic"] + delete(m.Header, ":topic") + + if len(topic) == 0 { + errr := merr.InternalServerError("go.micro.broker", "Topic not found") + w.WriteHeader(500) + w.Write([]byte(errr.Error())) + return + } + + p := &httpPublication{m: m, t: topic} + id := req.Form.Get("id") + + h.RLock() + for _, subscriber := range h.subscribers[topic] { + if id == subscriber.id { + // sub is sync; crufty rate limiting + // so we don't hose the cpu + subscriber.fn(p) + } + } + h.RUnlock() +} + +func (h *httpBroker) Address() string { + h.RLock() + defer h.RUnlock() + return h.address +} + +func (h *httpBroker) Connect() error { h.Lock() defer h.Unlock() @@ -255,11 +332,20 @@ func (h *httpBroker) start() error { go http.Serve(l, h.mux) go h.run(l) + // get registry + reg, ok := h.opts.Context.Value(registryKey).(registry.Registry) + if !ok { + reg = registry.DefaultRegistry + } + // set rcache + h.r = rcache.New(reg) + + // set running h.running = true return nil } -func (h *httpBroker) stop() error { +func (h *httpBroker) Disconnect() error { h.Lock() defer h.Unlock() @@ -267,76 +353,30 @@ func (h *httpBroker) stop() error { return nil } + // stop rcache + rc, ok := h.r.(rcache.Cache) + if ok { + rc.Stop() + } + + // exit and return err ch := make(chan error) h.exit <- ch err := <-ch + + // set not running h.running = false return err } -func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { - if req.Method != "POST" { - err := errors.BadRequest("go.micro.broker", "Method not allowed") - http.Error(w, err.Error(), http.StatusMethodNotAllowed) - return - } - defer req.Body.Close() - - req.ParseForm() - - b, err := ioutil.ReadAll(req.Body) - if err != nil { - errr := errors.InternalServerError("go.micro.broker", "Error reading request body: %v", err) - w.WriteHeader(500) - w.Write([]byte(errr.Error())) - return - } - - var m *Message - if err = h.opts.Codec.Unmarshal(b, &m); err != nil { - errr := errors.InternalServerError("go.micro.broker", "Error parsing request body: %v", err) - w.WriteHeader(500) - w.Write([]byte(errr.Error())) - return - } - - topic := m.Header[":topic"] - delete(m.Header, ":topic") - - if len(topic) == 0 { - errr := errors.InternalServerError("go.micro.broker", "Topic not found") - w.WriteHeader(500) - w.Write([]byte(errr.Error())) - return - } - - p := &httpPublication{m: m, t: topic} - id := req.Form.Get("id") - - h.RLock() - for _, subscriber := range h.subscribers[topic] { - if id == subscriber.id { - // sub is sync; crufty rate limiting - // so we don't hose the cpu - subscriber.fn(p) - } - } - h.RUnlock() -} - -func (h *httpBroker) Address() string { - return h.address -} - -func (h *httpBroker) Connect() error { - return h.start() -} - -func (h *httpBroker) Disconnect() error { - return h.stop() -} - func (h *httpBroker) Init(opts ...Option) error { + h.Lock() + defer h.Unlock() + + if h.running { + return errors.New("cannot init while connected") + } + for _, o := range opts { o(&h.opts) } @@ -345,12 +385,19 @@ func (h *httpBroker) Init(opts ...Option) error { h.id = "broker-" + uuid.NewUUID().String() } + // get registry reg, ok := h.opts.Context.Value(registryKey).(registry.Registry) if !ok { reg = registry.DefaultRegistry } - h.r = reg + // get rcache + if rc, ok := h.r.(rcache.Cache); ok { + rc.Stop() + } + + // set registry + h.r = rcache.New(reg) return nil } @@ -360,10 +407,13 @@ func (h *httpBroker) Options() Options { } func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error { + h.RLock() s, err := h.r.GetService("topic:" + topic) if err != nil { + h.RUnlock() return err } + h.RUnlock() m := &Message{ Header: make(map[string]string), @@ -381,7 +431,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) return err } - fn := func(node *registry.Node, b []byte) { + pub := func(node *registry.Node, b []byte) { scheme := "http" // check if secure is added in metadata @@ -411,15 +461,14 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) case broadcastVersion: for _, node := range service.Nodes { // publish async - go fn(node, b) + go pub(node, b) } - default: // select node to publish to node := service.Nodes[rand.Int()%len(service.Nodes)] // publish async - go fn(node, b) + go pub(node, b) } } @@ -427,7 +476,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) } func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { - opt := newSubscribeOptions(opts...) + options := newSubscribeOptions(opts...) // parse address for host, port parts := strings.Split(h.Address(), ":") @@ -439,7 +488,8 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO return nil, err } - id := uuid.NewUUID().String() + // create unique id + id := h.id + "." + uuid.NewUUID().String() var secure bool @@ -449,7 +499,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO // register service node := ®istry.Node{ - Id: h.id + "." + id, + Id: id, Address: addr, Port: port, Metadata: map[string]string{ @@ -457,7 +507,8 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO }, } - version := opt.Queue + // check for queue group or broadcast queue + version := options.Queue if len(version) == 0 { version = broadcastVersion } @@ -468,22 +519,22 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO Nodes: []*registry.Node{node}, } + // generate subscriber subscriber := &httpSubscriber{ - opts: opt, - id: h.id + "." + id, + opts: options, + hb: h, + id: id, topic: topic, - ch: h.unsubscribe, fn: handler, svc: service, } - if err := h.r.Register(service, registry.RegisterTTL(registerTTL)); err != nil { + // subscribe now + if err := h.subscribe(subscriber); err != nil { return nil, err } - h.Lock() - h.subscribers[topic] = append(h.subscribers[topic], subscriber) - h.Unlock() + // return the subscriber return subscriber, nil } diff --git a/cmd/cmd.go b/cmd/cmd.go index 58d55880..abf93453 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -70,7 +70,8 @@ var ( cli.IntFlag{ Name: "client_pool_size", EnvVar: "MICRO_CLIENT_POOL_SIZE", - Usage: "Sets the client connection pool size. Default: 0", + Usage: "Sets the client connection pool size. Default: 1", + Value: 1, }, cli.StringFlag{ Name: "client_pool_ttl", @@ -132,6 +133,7 @@ var ( Name: "selector", EnvVar: "MICRO_SELECTOR", Usage: "Selector used to pick nodes for querying", + Value: "cache", }, cli.StringFlag{ Name: "server", @@ -181,7 +183,7 @@ var ( defaultServer = "rpc" defaultBroker = "http" defaultRegistry = "consul" - defaultSelector = "default" + defaultSelector = "cache" defaultTransport = "http" ) diff --git a/selector/cache/cache.go b/selector/cache/cache.go index ceec5120..5e61dce7 100644 --- a/selector/cache/cache.go +++ b/selector/cache/cache.go @@ -23,6 +23,8 @@ type cacheSelector struct { cache map[string][]*registry.Service ttls map[string]time.Time + once sync.Once + // used to close or reload watcher reload chan bool exit chan bool @@ -86,30 +88,53 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) { c.Lock() defer c.Unlock() + // get does the actual request for a service + // it also caches it + get := func(service string) ([]*registry.Service, error) { + // ask the registry + services, err := c.so.Registry.GetService(service) + if err != nil { + return nil, err + } + + // cache results + c.set(service, c.cp(services)) + return services, nil + } + // check the cache first services, ok := c.cache[service] + + // cache miss or no services + if !ok || len(services) == 0 { + return get(service) + } + + // got cache but lets check ttl ttl, kk := c.ttls[service] - // got results, copy and return - if ok && len(services) > 0 { - // only return if its less than the ttl - if kk && time.Since(ttl) < c.ttl { - return c.cp(services), nil - } + // within ttl so return cache + if kk && time.Since(ttl) < c.ttl { + return c.cp(services), nil } - // cache miss or ttl expired + // expired entry so get service + services, err := get(service) - // now ask the registry - services, err := c.so.Registry.GetService(service) - if err != nil { - return nil, err + // no error then return error + if err == nil { + return services, nil } - // we didn't have any results so cache - c.cache[service] = c.cp(services) - c.ttls[service] = time.Now().Add(c.ttl) - return services, nil + // not found error then return + if err == registry.ErrNotFound { + return nil, selector.ErrNotFound + } + + // other error + + // return expired cache as last resort + return c.cp(services), nil } func (c *cacheSelector) set(service string, services []*registry.Service) { @@ -230,8 +255,6 @@ func (c *cacheSelector) update(res *registry.Result) { // reloads the watcher if Init is called // and returns when Close is called func (c *cacheSelector) run() { - go c.tick() - for { // exit early if already dead if c.quit() { @@ -241,6 +264,9 @@ func (c *cacheSelector) run() { // create new watcher w, err := c.so.Registry.Watch() if err != nil { + if c.quit() { + return + } log.Log(err) time.Sleep(time.Second) continue @@ -248,33 +274,15 @@ func (c *cacheSelector) run() { // watch for events if err := c.watch(w); err != nil { + if c.quit() { + return + } log.Log(err) continue } } } -// check cache and expire on each tick -func (c *cacheSelector) tick() { - t := time.NewTicker(time.Minute) - - for { - select { - case <-t.C: - c.Lock() - for service, expiry := range c.ttls { - if d := time.Since(expiry); d > c.ttl { - // TODO: maybe refresh the cache rather than blowing it away - c.del(service) - } - } - c.Unlock() - case <-c.exit: - return - } - } -} - // watch loops the next event and calls update // it returns if there's an error func (c *cacheSelector) watch(w registry.Watcher) error { @@ -324,6 +332,10 @@ func (c *cacheSelector) Options() selector.Options { } func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) { + c.once.Do(func() { + go c.run() + }) + sopts := selector.SelectOptions{ Strategy: c.so.Strategy, } @@ -401,7 +413,7 @@ func NewSelector(opts ...selector.Option) selector.Selector { } } - c := &cacheSelector{ + return &cacheSelector{ so: sopts, ttl: ttl, cache: make(map[string][]*registry.Service), @@ -409,7 +421,4 @@ func NewSelector(opts ...selector.Option) selector.Selector { reload: make(chan bool, 1), exit: make(chan bool), } - - go c.run() - return c }