Update the broker interface

This commit is contained in:
Asim 2015-12-23 19:07:26 +00:00
parent 6226a80e78
commit 02aca819d7
3 changed files with 102 additions and 18 deletions

View File

@ -4,28 +4,35 @@ type Broker interface {
Address() string Address() string
Connect() error Connect() error
Disconnect() error Disconnect() error
Init() error Init(...Option) error
Publish(string, *Message) error Publish(string, *Message, ...PublishOption) error
Subscribe(string, Handler) (Subscriber, error) Subscribe(string, Handler, ...SubscribeOption) (Subscriber, error)
String() string String() string
} }
type Handler func(*Message) // Handler is used to process messages via a subscription of a topic.
// The handler is passed a publication interface which contains the
// message and optional Ack method to acknowledge receipt of the message.
type Handler func(Publication) error
type Message struct { type Message struct {
Header map[string]string Header map[string]string
Body []byte Body []byte
} }
// Publication is given to a subscription handler for processing
type Publication interface {
Topic() string
Message() *Message
Ack() error
}
type Subscriber interface { type Subscriber interface {
Config() SubscribeOptions
Topic() string Topic() string
Unsubscribe() error Unsubscribe() error
} }
type options struct{}
type Option func(*options)
var ( var (
DefaultBroker Broker = newHttpBroker([]string{}) DefaultBroker Broker = newHttpBroker([]string{})
) )
@ -34,8 +41,8 @@ func NewBroker(addrs []string, opt ...Option) Broker {
return newHttpBroker(addrs, opt...) return newHttpBroker(addrs, opt...)
} }
func Init() error { func Init(opts ...Option) error {
return DefaultBroker.Init() return DefaultBroker.Init(opts...)
} }
func Connect() error { func Connect() error {
@ -46,12 +53,12 @@ func Disconnect() error {
return DefaultBroker.Disconnect() return DefaultBroker.Disconnect()
} }
func Publish(topic string, msg *Message) error { func Publish(topic string, msg *Message, opts ...PublishOption) error {
return DefaultBroker.Publish(topic, msg) return DefaultBroker.Publish(topic, msg, opts...)
} }
func Subscribe(topic string, handler Handler) (Subscriber, error) { func Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
return DefaultBroker.Subscribe(topic, handler) return DefaultBroker.Subscribe(topic, handler, opts...)
} }
func String() string { func String() string {

View File

@ -17,6 +17,10 @@ import (
"github.com/pborman/uuid" "github.com/pborman/uuid"
) )
// HTTP Broker is a placeholder for actual message brokers.
// This should not really be used in production but useful
// in developer where you want zero dependencies.
type httpBroker struct { type httpBroker struct {
id string id string
address string address string
@ -29,6 +33,7 @@ type httpBroker struct {
} }
type httpSubscriber struct { type httpSubscriber struct {
opts SubscribeOptions
id string id string
topic string topic string
ch chan *httpSubscriber ch chan *httpSubscriber
@ -36,6 +41,11 @@ type httpSubscriber struct {
svc *registry.Service svc *registry.Service
} }
type httpPublication struct {
m *Message
t string
}
var ( var (
DefaultSubPath = "/_sub" DefaultSubPath = "/_sub"
) )
@ -55,6 +65,22 @@ func newHttpBroker(addrs []string, opt ...Option) Broker {
} }
} }
func (h *httpPublication) Ack() error {
return nil
}
func (h *httpPublication) Message() *Message {
return h.m
}
func (h *httpPublication) Topic() string {
return h.t
}
func (h *httpSubscriber) Config() SubscribeOptions {
return h.opts
}
func (h *httpSubscriber) Topic() string { func (h *httpSubscriber) Topic() string {
return h.topic return h.topic
} }
@ -150,9 +176,10 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return return
} }
p := &httpPublication{m: m, t: topic}
h.RLock() h.RLock()
for _, subscriber := range h.subscribers[topic] { for _, subscriber := range h.subscribers[topic] {
subscriber.fn(m) subscriber.fn(p)
} }
h.RUnlock() h.RUnlock()
} }
@ -169,7 +196,7 @@ func (h *httpBroker) Disconnect() error {
return h.stop() return h.stop()
} }
func (h *httpBroker) Init() error { func (h *httpBroker) Init(opts ...Option) error {
if len(h.id) == 0 { if len(h.id) == 0 {
h.id = "broker-" + uuid.NewUUID().String() h.id = "broker-" + uuid.NewUUID().String()
} }
@ -178,7 +205,7 @@ func (h *httpBroker) Init() error {
return nil return nil
} }
func (h *httpBroker) Publish(topic string, msg *Message) error { func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
s, err := registry.GetService("topic:" + topic) s, err := registry.GetService("topic:" + topic)
if err != nil { if err != nil {
return err return err
@ -201,7 +228,9 @@ func (h *httpBroker) Publish(topic string, msg *Message) error {
return nil return nil
} }
func (h *httpBroker) Subscribe(topic string, handler Handler) (Subscriber, error) { func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
opt := newSubscribeOptions(opts...)
// parse address for host, port // parse address for host, port
parts := strings.Split(h.Address(), ":") parts := strings.Split(h.Address(), ":")
host := strings.Join(parts[:len(parts)-1], ":") host := strings.Join(parts[:len(parts)-1], ":")
@ -220,6 +249,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler) (Subscriber, error
} }
subscriber := &httpSubscriber{ subscriber := &httpSubscriber{
opts: opt,
id: uuid.NewUUID().String(), id: uuid.NewUUID().String(),
topic: topic, topic: topic,
ch: h.unsubscribe, ch: h.unsubscribe,

47
broker/options.go Normal file
View File

@ -0,0 +1,47 @@
package broker
type Options struct{}
type PublishOptions struct{}
type SubscribeOptions struct {
// AutoAck defaults to true
AutoAck bool
// NumHandlers defaults to 1
NumHandlers int
}
type Option func(*Options)
type PublishOption func(*PublishOptions)
type SubscribeOption func(*SubscribeOptions)
// DisableAutoAck will disable auto acking of messages
// after they have been handled.
func DisableAutoAck() SubscribeOption {
return func(o *SubscribeOptions) {
o.AutoAck = false
}
}
// NumHandlers sets the number of concurrent handlers to create
// for a subscriber.
func NumHandlers(i int) SubscribeOption {
return func(o *SubscribeOptions) {
o.NumHandlers = i
}
}
func newSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
opt := SubscribeOptions{
AutoAck: true,
NumHandlers: 1,
}
for _, o := range opts {
o(&opt)
}
return opt
}