commit
78da1fde94
@ -2,8 +2,6 @@
|
||||
package broker
|
||||
|
||||
// Broker is an interface used for asynchronous messaging.
|
||||
// Its an abstraction over various message brokers
|
||||
// {NATS, RabbitMQ, Kafka, ...}
|
||||
type Broker interface {
|
||||
Options() Options
|
||||
Address() string
|
||||
|
@ -3,6 +3,7 @@ package broker
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
@ -18,8 +19,9 @@ import (
|
||||
|
||||
"github.com/micro/go-log"
|
||||
"github.com/micro/go-micro/broker/codec/json"
|
||||
"github.com/micro/go-micro/errors"
|
||||
merr "github.com/micro/go-micro/errors"
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-rcache"
|
||||
maddr "github.com/micro/misc/lib/addr"
|
||||
mnet "github.com/micro/misc/lib/net"
|
||||
mls "github.com/micro/misc/lib/tls"
|
||||
@ -28,15 +30,11 @@ import (
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// HTTP Broker is a placeholder for actual message brokers.
|
||||
// This should not really be used in production but useful
|
||||
// in developer where you want zero dependencies.
|
||||
|
||||
// HTTP Broker is a point to point async broker
|
||||
type httpBroker struct {
|
||||
id string
|
||||
address string
|
||||
unsubscribe chan *httpSubscriber
|
||||
opts Options
|
||||
id string
|
||||
address string
|
||||
opts Options
|
||||
|
||||
mux *http.ServeMux
|
||||
|
||||
@ -53,9 +51,9 @@ type httpSubscriber struct {
|
||||
opts SubscribeOptions
|
||||
id string
|
||||
topic string
|
||||
ch chan *httpSubscriber
|
||||
fn Handler
|
||||
svc *registry.Service
|
||||
hb *httpBroker
|
||||
}
|
||||
|
||||
type httpPublication struct {
|
||||
@ -106,11 +104,13 @@ func newHttpBroker(opts ...Option) Broker {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
// set address
|
||||
addr := ":0"
|
||||
if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 {
|
||||
addr = options.Addrs[0]
|
||||
}
|
||||
|
||||
// get registry
|
||||
reg, ok := options.Context.Value(registryKey).(registry.Registry)
|
||||
if !ok {
|
||||
reg = registry.DefaultRegistry
|
||||
@ -123,7 +123,6 @@ func newHttpBroker(opts ...Option) Broker {
|
||||
r: reg,
|
||||
c: &http.Client{Transport: newTransport(options.TLSConfig)},
|
||||
subscribers: make(map[string][]*httpSubscriber),
|
||||
unsubscribe: make(chan *httpSubscriber),
|
||||
exit: make(chan chan error),
|
||||
mux: http.NewServeMux(),
|
||||
}
|
||||
@ -153,9 +152,41 @@ func (h *httpSubscriber) Topic() string {
|
||||
}
|
||||
|
||||
func (h *httpSubscriber) Unsubscribe() error {
|
||||
h.ch <- h
|
||||
// artificial delay
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
return h.hb.unsubscribe(h)
|
||||
}
|
||||
|
||||
func (h *httpBroker) subscribe(s *httpSubscriber) error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
h.subscribers[s.topic] = append(h.subscribers[s.topic], s)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *httpBroker) unsubscribe(s *httpSubscriber) error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
var subscribers []*httpSubscriber
|
||||
|
||||
// look for subscriber
|
||||
for _, sub := range h.subscribers[s.topic] {
|
||||
// deregister and skip forward
|
||||
if sub.id == s.id {
|
||||
h.r.Deregister(sub.svc)
|
||||
continue
|
||||
}
|
||||
// keep subscriber
|
||||
subscribers = append(subscribers, sub)
|
||||
}
|
||||
|
||||
// set subscribers
|
||||
h.subscribers[s.topic] = subscribers
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -177,29 +208,75 @@ func (h *httpBroker) run(l net.Listener) {
|
||||
// received exit signal
|
||||
case ch := <-h.exit:
|
||||
ch <- l.Close()
|
||||
h.Lock()
|
||||
h.running = false
|
||||
h.Unlock()
|
||||
return
|
||||
// unsubscribe subscriber
|
||||
case subscriber := <-h.unsubscribe:
|
||||
h.Lock()
|
||||
var subscribers []*httpSubscriber
|
||||
for _, sub := range h.subscribers[subscriber.topic] {
|
||||
// deregister and skip forward
|
||||
if sub.id == subscriber.id {
|
||||
h.RLock()
|
||||
for _, subs := range h.subscribers {
|
||||
for _, sub := range subs {
|
||||
h.r.Deregister(sub.svc)
|
||||
continue
|
||||
}
|
||||
subscribers = append(subscribers, sub)
|
||||
}
|
||||
h.subscribers[subscriber.topic] = subscribers
|
||||
h.Unlock()
|
||||
h.RUnlock()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *httpBroker) start() error {
|
||||
func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
if req.Method != "POST" {
|
||||
err := merr.BadRequest("go.micro.broker", "Method not allowed")
|
||||
http.Error(w, err.Error(), http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
defer req.Body.Close()
|
||||
|
||||
req.ParseForm()
|
||||
|
||||
b, err := ioutil.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
errr := merr.InternalServerError("go.micro.broker", "Error reading request body: %v", err)
|
||||
w.WriteHeader(500)
|
||||
w.Write([]byte(errr.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
var m *Message
|
||||
if err = h.opts.Codec.Unmarshal(b, &m); err != nil {
|
||||
errr := merr.InternalServerError("go.micro.broker", "Error parsing request body: %v", err)
|
||||
w.WriteHeader(500)
|
||||
w.Write([]byte(errr.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
topic := m.Header[":topic"]
|
||||
delete(m.Header, ":topic")
|
||||
|
||||
if len(topic) == 0 {
|
||||
errr := merr.InternalServerError("go.micro.broker", "Topic not found")
|
||||
w.WriteHeader(500)
|
||||
w.Write([]byte(errr.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
p := &httpPublication{m: m, t: topic}
|
||||
id := req.Form.Get("id")
|
||||
|
||||
h.RLock()
|
||||
for _, subscriber := range h.subscribers[topic] {
|
||||
if id == subscriber.id {
|
||||
// sub is sync; crufty rate limiting
|
||||
// so we don't hose the cpu
|
||||
subscriber.fn(p)
|
||||
}
|
||||
}
|
||||
h.RUnlock()
|
||||
}
|
||||
|
||||
func (h *httpBroker) Address() string {
|
||||
h.RLock()
|
||||
defer h.RUnlock()
|
||||
return h.address
|
||||
}
|
||||
|
||||
func (h *httpBroker) Connect() error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
@ -255,11 +332,20 @@ func (h *httpBroker) start() error {
|
||||
go http.Serve(l, h.mux)
|
||||
go h.run(l)
|
||||
|
||||
// get registry
|
||||
reg, ok := h.opts.Context.Value(registryKey).(registry.Registry)
|
||||
if !ok {
|
||||
reg = registry.DefaultRegistry
|
||||
}
|
||||
// set rcache
|
||||
h.r = rcache.New(reg)
|
||||
|
||||
// set running
|
||||
h.running = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *httpBroker) stop() error {
|
||||
func (h *httpBroker) Disconnect() error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
@ -267,76 +353,30 @@ func (h *httpBroker) stop() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// stop rcache
|
||||
rc, ok := h.r.(rcache.Cache)
|
||||
if ok {
|
||||
rc.Stop()
|
||||
}
|
||||
|
||||
// exit and return err
|
||||
ch := make(chan error)
|
||||
h.exit <- ch
|
||||
err := <-ch
|
||||
|
||||
// set not running
|
||||
h.running = false
|
||||
return err
|
||||
}
|
||||
|
||||
func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
if req.Method != "POST" {
|
||||
err := errors.BadRequest("go.micro.broker", "Method not allowed")
|
||||
http.Error(w, err.Error(), http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
defer req.Body.Close()
|
||||
|
||||
req.ParseForm()
|
||||
|
||||
b, err := ioutil.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
errr := errors.InternalServerError("go.micro.broker", "Error reading request body: %v", err)
|
||||
w.WriteHeader(500)
|
||||
w.Write([]byte(errr.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
var m *Message
|
||||
if err = h.opts.Codec.Unmarshal(b, &m); err != nil {
|
||||
errr := errors.InternalServerError("go.micro.broker", "Error parsing request body: %v", err)
|
||||
w.WriteHeader(500)
|
||||
w.Write([]byte(errr.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
topic := m.Header[":topic"]
|
||||
delete(m.Header, ":topic")
|
||||
|
||||
if len(topic) == 0 {
|
||||
errr := errors.InternalServerError("go.micro.broker", "Topic not found")
|
||||
w.WriteHeader(500)
|
||||
w.Write([]byte(errr.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
p := &httpPublication{m: m, t: topic}
|
||||
id := req.Form.Get("id")
|
||||
|
||||
h.RLock()
|
||||
for _, subscriber := range h.subscribers[topic] {
|
||||
if id == subscriber.id {
|
||||
// sub is sync; crufty rate limiting
|
||||
// so we don't hose the cpu
|
||||
subscriber.fn(p)
|
||||
}
|
||||
}
|
||||
h.RUnlock()
|
||||
}
|
||||
|
||||
func (h *httpBroker) Address() string {
|
||||
return h.address
|
||||
}
|
||||
|
||||
func (h *httpBroker) Connect() error {
|
||||
return h.start()
|
||||
}
|
||||
|
||||
func (h *httpBroker) Disconnect() error {
|
||||
return h.stop()
|
||||
}
|
||||
|
||||
func (h *httpBroker) Init(opts ...Option) error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
if h.running {
|
||||
return errors.New("cannot init while connected")
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&h.opts)
|
||||
}
|
||||
@ -345,12 +385,19 @@ func (h *httpBroker) Init(opts ...Option) error {
|
||||
h.id = "broker-" + uuid.NewUUID().String()
|
||||
}
|
||||
|
||||
// get registry
|
||||
reg, ok := h.opts.Context.Value(registryKey).(registry.Registry)
|
||||
if !ok {
|
||||
reg = registry.DefaultRegistry
|
||||
}
|
||||
|
||||
h.r = reg
|
||||
// get rcache
|
||||
if rc, ok := h.r.(rcache.Cache); ok {
|
||||
rc.Stop()
|
||||
}
|
||||
|
||||
// set registry
|
||||
h.r = rcache.New(reg)
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -360,10 +407,13 @@ func (h *httpBroker) Options() Options {
|
||||
}
|
||||
|
||||
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
|
||||
h.RLock()
|
||||
s, err := h.r.GetService("topic:" + topic)
|
||||
if err != nil {
|
||||
h.RUnlock()
|
||||
return err
|
||||
}
|
||||
h.RUnlock()
|
||||
|
||||
m := &Message{
|
||||
Header: make(map[string]string),
|
||||
@ -381,7 +431,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
|
||||
return err
|
||||
}
|
||||
|
||||
fn := func(node *registry.Node, b []byte) {
|
||||
pub := func(node *registry.Node, b []byte) {
|
||||
scheme := "http"
|
||||
|
||||
// check if secure is added in metadata
|
||||
@ -411,15 +461,14 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
|
||||
case broadcastVersion:
|
||||
for _, node := range service.Nodes {
|
||||
// publish async
|
||||
go fn(node, b)
|
||||
go pub(node, b)
|
||||
}
|
||||
|
||||
default:
|
||||
// select node to publish to
|
||||
node := service.Nodes[rand.Int()%len(service.Nodes)]
|
||||
|
||||
// publish async
|
||||
go fn(node, b)
|
||||
go pub(node, b)
|
||||
}
|
||||
}
|
||||
|
||||
@ -427,7 +476,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
|
||||
}
|
||||
|
||||
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
|
||||
opt := newSubscribeOptions(opts...)
|
||||
options := newSubscribeOptions(opts...)
|
||||
|
||||
// parse address for host, port
|
||||
parts := strings.Split(h.Address(), ":")
|
||||
@ -439,7 +488,8 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
|
||||
return nil, err
|
||||
}
|
||||
|
||||
id := uuid.NewUUID().String()
|
||||
// create unique id
|
||||
id := h.id + "." + uuid.NewUUID().String()
|
||||
|
||||
var secure bool
|
||||
|
||||
@ -449,7 +499,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
|
||||
|
||||
// register service
|
||||
node := ®istry.Node{
|
||||
Id: h.id + "." + id,
|
||||
Id: id,
|
||||
Address: addr,
|
||||
Port: port,
|
||||
Metadata: map[string]string{
|
||||
@ -457,7 +507,8 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
|
||||
},
|
||||
}
|
||||
|
||||
version := opt.Queue
|
||||
// check for queue group or broadcast queue
|
||||
version := options.Queue
|
||||
if len(version) == 0 {
|
||||
version = broadcastVersion
|
||||
}
|
||||
@ -468,22 +519,22 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
|
||||
Nodes: []*registry.Node{node},
|
||||
}
|
||||
|
||||
// generate subscriber
|
||||
subscriber := &httpSubscriber{
|
||||
opts: opt,
|
||||
id: h.id + "." + id,
|
||||
opts: options,
|
||||
hb: h,
|
||||
id: id,
|
||||
topic: topic,
|
||||
ch: h.unsubscribe,
|
||||
fn: handler,
|
||||
svc: service,
|
||||
}
|
||||
|
||||
if err := h.r.Register(service, registry.RegisterTTL(registerTTL)); err != nil {
|
||||
// subscribe now
|
||||
if err := h.subscribe(subscriber); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
h.Lock()
|
||||
h.subscribers[topic] = append(h.subscribers[topic], subscriber)
|
||||
h.Unlock()
|
||||
// return the subscriber
|
||||
return subscriber, nil
|
||||
}
|
||||
|
||||
|
@ -70,7 +70,8 @@ var (
|
||||
cli.IntFlag{
|
||||
Name: "client_pool_size",
|
||||
EnvVar: "MICRO_CLIENT_POOL_SIZE",
|
||||
Usage: "Sets the client connection pool size. Default: 0",
|
||||
Usage: "Sets the client connection pool size. Default: 1",
|
||||
Value: 1,
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "client_pool_ttl",
|
||||
@ -132,6 +133,7 @@ var (
|
||||
Name: "selector",
|
||||
EnvVar: "MICRO_SELECTOR",
|
||||
Usage: "Selector used to pick nodes for querying",
|
||||
Value: "cache",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "server",
|
||||
@ -181,7 +183,7 @@ var (
|
||||
defaultServer = "rpc"
|
||||
defaultBroker = "http"
|
||||
defaultRegistry = "consul"
|
||||
defaultSelector = "default"
|
||||
defaultSelector = "cache"
|
||||
defaultTransport = "http"
|
||||
)
|
||||
|
||||
|
93
selector/cache/cache.go
vendored
93
selector/cache/cache.go
vendored
@ -23,6 +23,8 @@ type cacheSelector struct {
|
||||
cache map[string][]*registry.Service
|
||||
ttls map[string]time.Time
|
||||
|
||||
once sync.Once
|
||||
|
||||
// used to close or reload watcher
|
||||
reload chan bool
|
||||
exit chan bool
|
||||
@ -86,30 +88,53 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
// get does the actual request for a service
|
||||
// it also caches it
|
||||
get := func(service string) ([]*registry.Service, error) {
|
||||
// ask the registry
|
||||
services, err := c.so.Registry.GetService(service)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// cache results
|
||||
c.set(service, c.cp(services))
|
||||
return services, nil
|
||||
}
|
||||
|
||||
// check the cache first
|
||||
services, ok := c.cache[service]
|
||||
|
||||
// cache miss or no services
|
||||
if !ok || len(services) == 0 {
|
||||
return get(service)
|
||||
}
|
||||
|
||||
// got cache but lets check ttl
|
||||
ttl, kk := c.ttls[service]
|
||||
|
||||
// got results, copy and return
|
||||
if ok && len(services) > 0 {
|
||||
// only return if its less than the ttl
|
||||
if kk && time.Since(ttl) < c.ttl {
|
||||
return c.cp(services), nil
|
||||
}
|
||||
// within ttl so return cache
|
||||
if kk && time.Since(ttl) < c.ttl {
|
||||
return c.cp(services), nil
|
||||
}
|
||||
|
||||
// cache miss or ttl expired
|
||||
// expired entry so get service
|
||||
services, err := get(service)
|
||||
|
||||
// now ask the registry
|
||||
services, err := c.so.Registry.GetService(service)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// no error then return error
|
||||
if err == nil {
|
||||
return services, nil
|
||||
}
|
||||
|
||||
// we didn't have any results so cache
|
||||
c.cache[service] = c.cp(services)
|
||||
c.ttls[service] = time.Now().Add(c.ttl)
|
||||
return services, nil
|
||||
// not found error then return
|
||||
if err == registry.ErrNotFound {
|
||||
return nil, selector.ErrNotFound
|
||||
}
|
||||
|
||||
// other error
|
||||
|
||||
// return expired cache as last resort
|
||||
return c.cp(services), nil
|
||||
}
|
||||
|
||||
func (c *cacheSelector) set(service string, services []*registry.Service) {
|
||||
@ -230,8 +255,6 @@ func (c *cacheSelector) update(res *registry.Result) {
|
||||
// reloads the watcher if Init is called
|
||||
// and returns when Close is called
|
||||
func (c *cacheSelector) run() {
|
||||
go c.tick()
|
||||
|
||||
for {
|
||||
// exit early if already dead
|
||||
if c.quit() {
|
||||
@ -241,6 +264,9 @@ func (c *cacheSelector) run() {
|
||||
// create new watcher
|
||||
w, err := c.so.Registry.Watch()
|
||||
if err != nil {
|
||||
if c.quit() {
|
||||
return
|
||||
}
|
||||
log.Log(err)
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
@ -248,33 +274,15 @@ func (c *cacheSelector) run() {
|
||||
|
||||
// watch for events
|
||||
if err := c.watch(w); err != nil {
|
||||
if c.quit() {
|
||||
return
|
||||
}
|
||||
log.Log(err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check cache and expire on each tick
|
||||
func (c *cacheSelector) tick() {
|
||||
t := time.NewTicker(time.Minute)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
c.Lock()
|
||||
for service, expiry := range c.ttls {
|
||||
if d := time.Since(expiry); d > c.ttl {
|
||||
// TODO: maybe refresh the cache rather than blowing it away
|
||||
c.del(service)
|
||||
}
|
||||
}
|
||||
c.Unlock()
|
||||
case <-c.exit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// watch loops the next event and calls update
|
||||
// it returns if there's an error
|
||||
func (c *cacheSelector) watch(w registry.Watcher) error {
|
||||
@ -324,6 +332,10 @@ func (c *cacheSelector) Options() selector.Options {
|
||||
}
|
||||
|
||||
func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
|
||||
c.once.Do(func() {
|
||||
go c.run()
|
||||
})
|
||||
|
||||
sopts := selector.SelectOptions{
|
||||
Strategy: c.so.Strategy,
|
||||
}
|
||||
@ -401,7 +413,7 @@ func NewSelector(opts ...selector.Option) selector.Selector {
|
||||
}
|
||||
}
|
||||
|
||||
c := &cacheSelector{
|
||||
return &cacheSelector{
|
||||
so: sopts,
|
||||
ttl: ttl,
|
||||
cache: make(map[string][]*registry.Service),
|
||||
@ -409,7 +421,4 @@ func NewSelector(opts ...selector.Option) selector.Selector {
|
||||
reload: make(chan bool, 1),
|
||||
exit: make(chan bool),
|
||||
}
|
||||
|
||||
go c.run()
|
||||
return c
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user