Shared queue distribution of messages

This commit is contained in:
Asim 2015-12-23 20:05:47 +00:00
parent 536216fd01
commit 6097c3296c
2 changed files with 55 additions and 9 deletions

View File

@ -4,12 +4,15 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"math/rand"
"net" "net"
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time"
log "github.com/golang/glog" log "github.com/golang/glog"
"github.com/micro/go-micro/errors" "github.com/micro/go-micro/errors"
@ -48,8 +51,13 @@ type httpPublication struct {
var ( var (
DefaultSubPath = "/_sub" DefaultSubPath = "/_sub"
broadcastVersion = "ff.http.broadcast"
) )
func init() {
rand.Seed(time.Now().Unix())
}
func newHttpBroker(addrs []string, opt ...Option) Broker { func newHttpBroker(addrs []string, opt ...Option) Broker {
addr := ":0" addr := ":0"
if len(addrs) > 0 && len(addrs[0]) > 0 { if len(addrs) > 0 && len(addrs[0]) > 0 {
@ -217,14 +225,36 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
return err return err
} }
for _, service := range s { fn := func(node *registry.Node, b io.Reader) {
for _, node := range service.Nodes { r, err := http.Post(fmt.Sprintf("http://%s:%d%s", node.Address, node.Port, DefaultSubPath), "application/json", b)
r, err := http.Post(fmt.Sprintf("http://%s:%d%s", node.Address, node.Port, DefaultSubPath), "application/json", bytes.NewBuffer(b))
if err == nil { if err == nil {
r.Body.Close() r.Body.Close()
} }
} }
buf := bytes.NewBuffer(nil)
for _, service := range s {
// broadcast version means broadcast to all nodes
if service.Version == broadcastVersion {
for _, node := range service.Nodes {
buf.Reset()
buf.Write(b)
fn(node, buf)
} }
return nil
}
node := service.Nodes[rand.Int()%len(service.Nodes)]
buf.Reset()
buf.Write(b)
fn(node, buf)
return nil
}
buf.Reset()
buf = nil
return nil return nil
} }
@ -243,8 +273,14 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
Port: port, Port: port,
} }
version := opt.Queue
if len(version) == 0 {
version = broadcastVersion
}
service := &registry.Service{ service := &registry.Service{
Name: "topic:" + topic, Name: "topic:" + topic,
Version: version,
Nodes: []*registry.Node{node}, Nodes: []*registry.Node{node},
} }

View File

@ -9,6 +9,10 @@ type SubscribeOptions struct {
AutoAck bool AutoAck bool
// NumHandlers defaults to 1 // NumHandlers defaults to 1
NumHandlers int NumHandlers int
// Subscribers with the same queue name
// will create a shared subscription where each
// receives a subset of messages.
Queue string
} }
type Option func(*Options) type Option func(*Options)
@ -33,6 +37,12 @@ func NumHandlers(i int) SubscribeOption {
} }
} }
func QueueName(name string) SubscribeOption {
return func(o *SubscribeOptions) {
o.Queue = name
}
}
func newSubscribeOptions(opts ...SubscribeOption) SubscribeOptions { func newSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
opt := SubscribeOptions{ opt := SubscribeOptions{
AutoAck: true, AutoAck: true,