Merge pull request #32 from micro/broker
Update the broker interface to support acking and queue distribution
This commit is contained in:
		| @@ -4,28 +4,35 @@ type Broker interface { | |||||||
| 	Address() string | 	Address() string | ||||||
| 	Connect() error | 	Connect() error | ||||||
| 	Disconnect() error | 	Disconnect() error | ||||||
| 	Init() error | 	Init(...Option) error | ||||||
| 	Publish(string, *Message) error | 	Publish(string, *Message, ...PublishOption) error | ||||||
| 	Subscribe(string, Handler) (Subscriber, error) | 	Subscribe(string, Handler, ...SubscribeOption) (Subscriber, error) | ||||||
| 	String() string | 	String() string | ||||||
| } | } | ||||||
|  |  | ||||||
| type Handler func(*Message) | // Handler is used to process messages via a subscription of a topic. | ||||||
|  | // The handler is passed a publication interface which contains the | ||||||
|  | // message and optional Ack method to acknowledge receipt of the message. | ||||||
|  | type Handler func(Publication) error | ||||||
|  |  | ||||||
| type Message struct { | type Message struct { | ||||||
| 	Header map[string]string | 	Header map[string]string | ||||||
| 	Body   []byte | 	Body   []byte | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // Publication is given to a subscription handler for processing | ||||||
|  | type Publication interface { | ||||||
|  | 	Topic() string | ||||||
|  | 	Message() *Message | ||||||
|  | 	Ack() error | ||||||
|  | } | ||||||
|  |  | ||||||
| type Subscriber interface { | type Subscriber interface { | ||||||
|  | 	Config() SubscribeOptions | ||||||
| 	Topic() string | 	Topic() string | ||||||
| 	Unsubscribe() error | 	Unsubscribe() error | ||||||
| } | } | ||||||
|  |  | ||||||
| type options struct{} |  | ||||||
|  |  | ||||||
| type Option func(*options) |  | ||||||
|  |  | ||||||
| var ( | var ( | ||||||
| 	DefaultBroker Broker = newHttpBroker([]string{}) | 	DefaultBroker Broker = newHttpBroker([]string{}) | ||||||
| ) | ) | ||||||
| @@ -34,8 +41,8 @@ func NewBroker(addrs []string, opt ...Option) Broker { | |||||||
| 	return newHttpBroker(addrs, opt...) | 	return newHttpBroker(addrs, opt...) | ||||||
| } | } | ||||||
|  |  | ||||||
| func Init() error { | func Init(opts ...Option) error { | ||||||
| 	return DefaultBroker.Init() | 	return DefaultBroker.Init(opts...) | ||||||
| } | } | ||||||
|  |  | ||||||
| func Connect() error { | func Connect() error { | ||||||
| @@ -46,12 +53,12 @@ func Disconnect() error { | |||||||
| 	return DefaultBroker.Disconnect() | 	return DefaultBroker.Disconnect() | ||||||
| } | } | ||||||
|  |  | ||||||
| func Publish(topic string, msg *Message) error { | func Publish(topic string, msg *Message, opts ...PublishOption) error { | ||||||
| 	return DefaultBroker.Publish(topic, msg) | 	return DefaultBroker.Publish(topic, msg, opts...) | ||||||
| } | } | ||||||
|  |  | ||||||
| func Subscribe(topic string, handler Handler) (Subscriber, error) { | func Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { | ||||||
| 	return DefaultBroker.Subscribe(topic, handler) | 	return DefaultBroker.Subscribe(topic, handler, opts...) | ||||||
| } | } | ||||||
|  |  | ||||||
| func String() string { | func String() string { | ||||||
|   | |||||||
| @@ -4,12 +4,15 @@ import ( | |||||||
| 	"bytes" | 	"bytes" | ||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  | 	"io" | ||||||
| 	"io/ioutil" | 	"io/ioutil" | ||||||
|  | 	"math/rand" | ||||||
| 	"net" | 	"net" | ||||||
| 	"net/http" | 	"net/http" | ||||||
| 	"strconv" | 	"strconv" | ||||||
| 	"strings" | 	"strings" | ||||||
| 	"sync" | 	"sync" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
| 	log "github.com/golang/glog" | 	log "github.com/golang/glog" | ||||||
| 	"github.com/micro/go-micro/errors" | 	"github.com/micro/go-micro/errors" | ||||||
| @@ -17,6 +20,10 @@ import ( | |||||||
| 	"github.com/pborman/uuid" | 	"github.com/pborman/uuid" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | // HTTP Broker is a placeholder for actual message brokers. | ||||||
|  | // This should not really be used in production but useful | ||||||
|  | // in developer where you want zero dependencies. | ||||||
|  |  | ||||||
| type httpBroker struct { | type httpBroker struct { | ||||||
| 	id          string | 	id          string | ||||||
| 	address     string | 	address     string | ||||||
| @@ -29,6 +36,7 @@ type httpBroker struct { | |||||||
| } | } | ||||||
|  |  | ||||||
| type httpSubscriber struct { | type httpSubscriber struct { | ||||||
|  | 	opts  SubscribeOptions | ||||||
| 	id    string | 	id    string | ||||||
| 	topic string | 	topic string | ||||||
| 	ch    chan *httpSubscriber | 	ch    chan *httpSubscriber | ||||||
| @@ -36,10 +44,20 @@ type httpSubscriber struct { | |||||||
| 	svc   *registry.Service | 	svc   *registry.Service | ||||||
| } | } | ||||||
|  |  | ||||||
|  | type httpPublication struct { | ||||||
|  | 	m *Message | ||||||
|  | 	t string | ||||||
|  | } | ||||||
|  |  | ||||||
| var ( | var ( | ||||||
| 	DefaultSubPath = "/_sub" | 	DefaultSubPath   = "/_sub" | ||||||
|  | 	broadcastVersion = "ff.http.broadcast" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | func init() { | ||||||
|  | 	rand.Seed(time.Now().Unix()) | ||||||
|  | } | ||||||
|  |  | ||||||
| func newHttpBroker(addrs []string, opt ...Option) Broker { | func newHttpBroker(addrs []string, opt ...Option) Broker { | ||||||
| 	addr := ":0" | 	addr := ":0" | ||||||
| 	if len(addrs) > 0 && len(addrs[0]) > 0 { | 	if len(addrs) > 0 && len(addrs[0]) > 0 { | ||||||
| @@ -55,6 +73,22 @@ func newHttpBroker(addrs []string, opt ...Option) Broker { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (h *httpPublication) Ack() error { | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (h *httpPublication) Message() *Message { | ||||||
|  | 	return h.m | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (h *httpPublication) Topic() string { | ||||||
|  | 	return h.t | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (h *httpSubscriber) Config() SubscribeOptions { | ||||||
|  | 	return h.opts | ||||||
|  | } | ||||||
|  |  | ||||||
| func (h *httpSubscriber) Topic() string { | func (h *httpSubscriber) Topic() string { | ||||||
| 	return h.topic | 	return h.topic | ||||||
| } | } | ||||||
| @@ -150,9 +184,10 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { | |||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	p := &httpPublication{m: m, t: topic} | ||||||
| 	h.RLock() | 	h.RLock() | ||||||
| 	for _, subscriber := range h.subscribers[topic] { | 	for _, subscriber := range h.subscribers[topic] { | ||||||
| 		subscriber.fn(m) | 		subscriber.fn(p) | ||||||
| 	} | 	} | ||||||
| 	h.RUnlock() | 	h.RUnlock() | ||||||
| } | } | ||||||
| @@ -169,7 +204,7 @@ func (h *httpBroker) Disconnect() error { | |||||||
| 	return h.stop() | 	return h.stop() | ||||||
| } | } | ||||||
|  |  | ||||||
| func (h *httpBroker) Init() error { | func (h *httpBroker) Init(opts ...Option) error { | ||||||
| 	if len(h.id) == 0 { | 	if len(h.id) == 0 { | ||||||
| 		h.id = "broker-" + uuid.NewUUID().String() | 		h.id = "broker-" + uuid.NewUUID().String() | ||||||
| 	} | 	} | ||||||
| @@ -178,7 +213,7 @@ func (h *httpBroker) Init() error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (h *httpBroker) Publish(topic string, msg *Message) error { | func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error { | ||||||
| 	s, err := registry.GetService("topic:" + topic) | 	s, err := registry.GetService("topic:" + topic) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| @@ -190,18 +225,42 @@ func (h *httpBroker) Publish(topic string, msg *Message) error { | |||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	for _, service := range s { | 	fn := func(node *registry.Node, b io.Reader) { | ||||||
| 		for _, node := range service.Nodes { | 		r, err := http.Post(fmt.Sprintf("http://%s:%d%s", node.Address, node.Port, DefaultSubPath), "application/json", b) | ||||||
| 			r, err := http.Post(fmt.Sprintf("http://%s:%d%s", node.Address, node.Port, DefaultSubPath), "application/json", bytes.NewBuffer(b)) | 		if err == nil { | ||||||
| 			if err == nil { | 			r.Body.Close() | ||||||
| 				r.Body.Close() |  | ||||||
| 			} |  | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	buf := bytes.NewBuffer(nil) | ||||||
|  |  | ||||||
|  | 	for _, service := range s { | ||||||
|  | 		// broadcast version means broadcast to all nodes | ||||||
|  | 		if service.Version == broadcastVersion { | ||||||
|  | 			for _, node := range service.Nodes { | ||||||
|  | 				buf.Reset() | ||||||
|  | 				buf.Write(b) | ||||||
|  | 				fn(node, buf) | ||||||
|  | 			} | ||||||
|  | 			return nil | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		node := service.Nodes[rand.Int()%len(service.Nodes)] | ||||||
|  | 		buf.Reset() | ||||||
|  | 		buf.Write(b) | ||||||
|  | 		fn(node, buf) | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	buf.Reset() | ||||||
|  | 	buf = nil | ||||||
|  |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (h *httpBroker) Subscribe(topic string, handler Handler) (Subscriber, error) { | func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { | ||||||
|  | 	opt := newSubscribeOptions(opts...) | ||||||
|  |  | ||||||
| 	// parse address for host, port | 	// parse address for host, port | ||||||
| 	parts := strings.Split(h.Address(), ":") | 	parts := strings.Split(h.Address(), ":") | ||||||
| 	host := strings.Join(parts[:len(parts)-1], ":") | 	host := strings.Join(parts[:len(parts)-1], ":") | ||||||
| @@ -214,12 +273,19 @@ func (h *httpBroker) Subscribe(topic string, handler Handler) (Subscriber, error | |||||||
| 		Port:    port, | 		Port:    port, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	version := opt.Queue | ||||||
|  | 	if len(version) == 0 { | ||||||
|  | 		version = broadcastVersion | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	service := ®istry.Service{ | 	service := ®istry.Service{ | ||||||
| 		Name:  "topic:" + topic, | 		Name:    "topic:" + topic, | ||||||
| 		Nodes: []*registry.Node{node}, | 		Version: version, | ||||||
|  | 		Nodes:   []*registry.Node{node}, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	subscriber := &httpSubscriber{ | 	subscriber := &httpSubscriber{ | ||||||
|  | 		opts:  opt, | ||||||
| 		id:    uuid.NewUUID().String(), | 		id:    uuid.NewUUID().String(), | ||||||
| 		topic: topic, | 		topic: topic, | ||||||
| 		ch:    h.unsubscribe, | 		ch:    h.unsubscribe, | ||||||
|   | |||||||
							
								
								
									
										48
									
								
								broker/options.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										48
									
								
								broker/options.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,48 @@ | |||||||
|  | package broker | ||||||
|  |  | ||||||
|  | type Options struct{} | ||||||
|  |  | ||||||
|  | type PublishOptions struct{} | ||||||
|  |  | ||||||
|  | type SubscribeOptions struct { | ||||||
|  | 	// AutoAck defaults to true. When a handler returns | ||||||
|  | 	// with a nil error the message is acked. | ||||||
|  | 	AutoAck bool | ||||||
|  | 	// Subscribers with the same queue name | ||||||
|  | 	// will create a shared subscription where each | ||||||
|  | 	// receives a subset of messages. | ||||||
|  | 	Queue string | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type Option func(*Options) | ||||||
|  |  | ||||||
|  | type PublishOption func(*PublishOptions) | ||||||
|  |  | ||||||
|  | type SubscribeOption func(*SubscribeOptions) | ||||||
|  |  | ||||||
|  | // DisableAutoAck will disable auto acking of messages | ||||||
|  | // after they have been handled. | ||||||
|  | func DisableAutoAck() SubscribeOption { | ||||||
|  | 	return func(o *SubscribeOptions) { | ||||||
|  | 		o.AutoAck = false | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // QueueName sets the name of the queue to share messages on | ||||||
|  | func QueueName(name string) SubscribeOption { | ||||||
|  | 	return func(o *SubscribeOptions) { | ||||||
|  | 		o.Queue = name | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func newSubscribeOptions(opts ...SubscribeOption) SubscribeOptions { | ||||||
|  | 	opt := SubscribeOptions{ | ||||||
|  | 		AutoAck: true, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		o(&opt) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return opt | ||||||
|  | } | ||||||
							
								
								
									
										52
									
								
								examples/pubsub/consumer/consumer.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										52
									
								
								examples/pubsub/consumer/consumer.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,52 @@ | |||||||
|  | package main | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  |  | ||||||
|  | 	log "github.com/golang/glog" | ||||||
|  | 	"github.com/micro/go-micro/broker" | ||||||
|  | 	"github.com/micro/go-micro/cmd" | ||||||
|  |  | ||||||
|  | 	// To enable rabbitmq plugin uncomment | ||||||
|  | 	//_ "github.com/micro/go-plugins/broker/rabbitmq" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | var ( | ||||||
|  | 	topic = "go.micro.topic.foo" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // Example of a shared subscription which receives a subset of messages | ||||||
|  | func sharedSub() { | ||||||
|  | 	_, err := broker.Subscribe(topic, func(p broker.Publication) error { | ||||||
|  | 		fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header) | ||||||
|  | 		return nil | ||||||
|  | 	}, broker.QueueName("consumer")) | ||||||
|  | 	if err != nil { | ||||||
|  | 		fmt.Println(err) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Example of a subscription which receives all the messages | ||||||
|  | func sub() { | ||||||
|  | 	_, err := broker.Subscribe(topic, func(p broker.Publication) error { | ||||||
|  | 		fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header) | ||||||
|  | 		return nil | ||||||
|  | 	}) | ||||||
|  | 	if err != nil { | ||||||
|  | 		fmt.Println(err) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func main() { | ||||||
|  | 	cmd.Init() | ||||||
|  |  | ||||||
|  | 	if err := broker.Init(); err != nil { | ||||||
|  | 		log.Fatalf("Broker Init error: %v", err) | ||||||
|  | 	} | ||||||
|  | 	if err := broker.Connect(); err != nil { | ||||||
|  | 		log.Fatalf("Broker Connect error: %v", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	sub() | ||||||
|  | 	select {} | ||||||
|  | } | ||||||
| @@ -33,8 +33,9 @@ func pub() { | |||||||
| } | } | ||||||
|  |  | ||||||
| func sub() { | func sub() { | ||||||
| 	_, err := broker.Subscribe(topic, func(msg *broker.Message) { | 	_, err := broker.Subscribe(topic, func(p broker.Publication) error { | ||||||
| 		fmt.Println("[sub] received message:", string(msg.Body), "header", msg.Header) | 		fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header) | ||||||
|  | 		return nil | ||||||
| 	}) | 	}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		fmt.Println(err) | 		fmt.Println(err) | ||||||
|   | |||||||
							
								
								
									
										50
									
								
								examples/pubsub/producer/producer.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										50
									
								
								examples/pubsub/producer/producer.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,50 @@ | |||||||
|  | package main | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
|  | 	log "github.com/golang/glog" | ||||||
|  | 	"github.com/micro/go-micro/broker" | ||||||
|  | 	"github.com/micro/go-micro/cmd" | ||||||
|  |  | ||||||
|  | 	// To enable rabbitmq plugin uncomment | ||||||
|  | 	//_ "github.com/micro/go-plugins/broker/rabbitmq" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | var ( | ||||||
|  | 	topic = "go.micro.topic.foo" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func pub() { | ||||||
|  | 	tick := time.NewTicker(time.Second) | ||||||
|  | 	i := 0 | ||||||
|  | 	for _ = range tick.C { | ||||||
|  | 		msg := &broker.Message{ | ||||||
|  | 			Header: map[string]string{ | ||||||
|  | 				"id": fmt.Sprintf("%d", i), | ||||||
|  | 			}, | ||||||
|  | 			Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())), | ||||||
|  | 		} | ||||||
|  | 		if err := broker.Publish(topic, msg); err != nil { | ||||||
|  | 			log.Errorf("[pub] failed: %v", err) | ||||||
|  | 		} else { | ||||||
|  | 			fmt.Println("[pub] pubbed message:", string(msg.Body)) | ||||||
|  | 		} | ||||||
|  | 		i++ | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func main() { | ||||||
|  | 	cmd.Init() | ||||||
|  |  | ||||||
|  | 	if err := broker.Init(); err != nil { | ||||||
|  | 		log.Fatalf("Broker Init error: %v", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if err := broker.Connect(); err != nil { | ||||||
|  | 		log.Fatalf("Broker Connect error: %v", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	pub() | ||||||
|  | } | ||||||
| @@ -155,11 +155,12 @@ func validateSubscriber(sub Subscriber) error { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (s *rpcServer) createSubHandler(sb *subscriber, opts options) broker.Handler { | func (s *rpcServer) createSubHandler(sb *subscriber, opts options) broker.Handler { | ||||||
| 	return func(msg *broker.Message) { | 	return func(p broker.Publication) error { | ||||||
|  | 		msg := p.Message() | ||||||
| 		ct := msg.Header["Content-Type"] | 		ct := msg.Header["Content-Type"] | ||||||
| 		cf, err := s.newCodec(ct) | 		cf, err := s.newCodec(ct) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return | 			return err | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		hdr := make(map[string]string) | 		hdr := make(map[string]string) | ||||||
| @@ -190,11 +191,11 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts options) broker.Handle | |||||||
| 			defer co.Close() | 			defer co.Close() | ||||||
|  |  | ||||||
| 			if err := co.ReadHeader(&codec.Message{}, codec.Publication); err != nil { | 			if err := co.ReadHeader(&codec.Message{}, codec.Publication); err != nil { | ||||||
| 				continue | 				return err | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			if err := co.ReadBody(req.Interface()); err != nil { | 			if err := co.ReadBody(req.Interface()); err != nil { | ||||||
| 				continue | 				return err | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			fn := func(ctx context.Context, msg Publication) error { | 			fn := func(ctx context.Context, msg Publication) error { | ||||||
| @@ -225,6 +226,7 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts options) broker.Handle | |||||||
| 				message:     req.Interface(), | 				message:     req.Interface(), | ||||||
| 			}) | 			}) | ||||||
| 		} | 		} | ||||||
|  | 		return nil | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user