From 32928bfb6763861c210b79f87551fb88938fb501 Mon Sep 17 00:00:00 2001 From: Asim Date: Sat, 23 May 2015 23:16:26 +0100 Subject: [PATCH] Publish/Subscribe with context --- broker/broker.go | 13 +++++++------ broker/http_broker.go | 39 +++++++++++++++++++++++++++++---------- broker/nats/nats.go | 31 ++++++++++++++++++++++++------- examples/pub_sub.go | 14 +++++++++++--- 4 files changed, 71 insertions(+), 26 deletions(-) diff --git a/broker/broker.go b/broker/broker.go index 55eefa23..2d02eae5 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -2,6 +2,7 @@ package broker import ( "code.google.com/p/go-uuid/uuid" + "golang.org/x/net/context" ) type Broker interface { @@ -9,15 +10,15 @@ type Broker interface { Connect() error Disconnect() error Init() error - Publish(string, []byte) error - Subscribe(string, func(*Message)) (Subscriber, error) + Publish(context.Context, string, []byte) error + Subscribe(string, func(context.Context, *Message)) (Subscriber, error) } type Message struct { Id string Timestamp int64 Topic string - Data []byte + Body []byte } type Subscriber interface { @@ -59,10 +60,10 @@ func Disconnect() error { return DefaultBroker.Disconnect() } -func Publish(topic string, data []byte) error { - return DefaultBroker.Publish(topic, data) +func Publish(ctx context.Context, topic string, body []byte) error { + return DefaultBroker.Publish(ctx, topic, body) } -func Subscribe(topic string, function func(*Message)) (Subscriber, error) { +func Subscribe(topic string, function func(context.Context, *Message)) (Subscriber, error) { return DefaultBroker.Subscribe(topic, function) } diff --git a/broker/http_broker.go b/broker/http_broker.go index 6eb30769..9907cd58 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -17,8 +17,11 @@ import ( "code.google.com/p/go-uuid/uuid" log "github.com/golang/glog" + c "github.com/myodc/go-micro/context" "github.com/myodc/go-micro/errors" "github.com/myodc/go-micro/registry" + + "golang.org/x/net/context" ) type httpBroker struct { @@ -36,10 +39,16 @@ type httpSubscriber struct { id string topic string ch chan *httpSubscriber - fn func(*Message) + fn func(context.Context, *Message) svc registry.Service } +// used in brokers where there is no support for headers +type envelope struct { + Header map[string]string + Message *Message +} + var ( DefaultSubPath = "/_sub" ) @@ -141,24 +150,26 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - var msg *Message - if err = json.Unmarshal(b, &msg); err != nil { + var e *envelope + if err = json.Unmarshal(b, &e); err != nil { errr := errors.InternalServerError("go.micro.broker", fmt.Sprintf("Error parsing request body: %v", err)) w.WriteHeader(500) w.Write([]byte(errr.Error())) return } - if len(msg.Topic) == 0 { + if len(e.Message.Topic) == 0 { errr := errors.InternalServerError("go.micro.broker", "Topic not found") w.WriteHeader(500) w.Write([]byte(errr.Error())) return } + ctx := c.WithMetaData(context.Background(), e.Header) + h.RLock() - for _, subscriber := range h.subscribers[msg.Topic] { - subscriber.fn(msg) + for _, subscriber := range h.subscribers[e.Message.Topic] { + subscriber.fn(ctx, e.Message) } h.RUnlock() } @@ -184,18 +195,26 @@ func (h *httpBroker) Init() error { return nil } -func (h *httpBroker) Publish(topic string, data []byte) error { +func (h *httpBroker) Publish(ctx context.Context, topic string, body []byte) error { s, err := registry.GetService("topic:" + topic) if err != nil { return err } - b, err := json.Marshal(&Message{ + message := &Message{ Id: uuid.NewUUID().String(), Timestamp: time.Now().Unix(), Topic: topic, - Data: data, + Body: body, + } + + header, _ := c.GetMetaData(ctx) + + b, err := json.Marshal(&envelope{ + header, + message, }) + if err != nil { return err } @@ -210,7 +229,7 @@ func (h *httpBroker) Publish(topic string, data []byte) error { return nil } -func (h *httpBroker) Subscribe(topic string, function func(*Message)) (Subscriber, error) { +func (h *httpBroker) Subscribe(topic string, function func(context.Context, *Message)) (Subscriber, error) { // parse address for host, port parts := strings.Split(h.Address(), ":") host := strings.Join(parts[:len(parts)-1], ":") diff --git a/broker/nats/nats.go b/broker/nats/nats.go index 520b45a8..f19078fc 100644 --- a/broker/nats/nats.go +++ b/broker/nats/nats.go @@ -8,6 +8,9 @@ import ( "code.google.com/p/go-uuid/uuid" "github.com/apcera/nats" "github.com/myodc/go-micro/broker" + c "github.com/myodc/go-micro/context" + + "golang.org/x/net/context" ) type nbroker struct { @@ -19,6 +22,12 @@ type subscriber struct { s *nats.Subscription } +// used in brokers where there is no support for headers +type envelope struct { + Header map[string]string + Message *broker.Message +} + func (n *subscriber) Topic() string { return n.s.Subject } @@ -58,12 +67,19 @@ func (n *nbroker) Init() error { return nil } -func (n *nbroker) Publish(topic string, data []byte) error { - b, err := json.Marshal(&broker.Message{ +func (n *nbroker) Publish(ctx context.Context, topic string, body []byte) error { + header, _ := c.GetMetaData(ctx) + + message := &broker.Message{ Id: uuid.NewUUID().String(), Timestamp: time.Now().Unix(), Topic: topic, - Data: data, + Body: body, + } + + b, err := json.Marshal(&envelope{ + header, + message, }) if err != nil { return err @@ -71,13 +87,14 @@ func (n *nbroker) Publish(topic string, data []byte) error { return n.conn.Publish(topic, b) } -func (n *nbroker) Subscribe(topic string, function func(*broker.Message)) (broker.Subscriber, error) { +func (n *nbroker) Subscribe(topic string, function func(context.Context, *broker.Message)) (broker.Subscriber, error) { sub, err := n.conn.Subscribe(topic, func(msg *nats.Msg) { - var data *broker.Message - if err := json.Unmarshal(msg.Data, &data); err != nil { + var e *envelope + if err := json.Unmarshal(msg.Data, &e); err != nil { return } - function(data) + ctx := c.WithMetaData(context.Background(), e.Header) + function(ctx, e.Message) }) if err != nil { return nil, err diff --git a/examples/pub_sub.go b/examples/pub_sub.go index 6b22ac73..2c6d2059 100644 --- a/examples/pub_sub.go +++ b/examples/pub_sub.go @@ -7,6 +7,9 @@ import ( log "github.com/golang/glog" "github.com/myodc/go-micro/broker" "github.com/myodc/go-micro/cmd" + c "github.com/myodc/go-micro/context" + + "golang.org/x/net/context" ) var ( @@ -17,8 +20,12 @@ func pub() { tick := time.NewTicker(time.Second) i := 0 for _ = range tick.C { + ctx := c.WithMetaData(context.Background(), map[string]string{ + "id": fmt.Sprintf("%d", i), + }) + msg := fmt.Sprintf("%d: %s", i, time.Now().String()) - if err := broker.Publish(topic, []byte(msg)); err != nil { + if err := broker.Publish(ctx, topic, []byte(msg)); err != nil { log.Errorf("[pub] failed: %v", err) } else { fmt.Println("[pub] pubbed message:", msg) @@ -28,8 +35,9 @@ func pub() { } func sub() { - _, err := broker.Subscribe(topic, func(msg *broker.Message) { - fmt.Println("[sub] received message:", string(msg.Data)) + _, err := broker.Subscribe(topic, func(ctx context.Context, msg *broker.Message) { + md, _ := c.GetMetaData(ctx) + fmt.Println("[sub] received message:", string(msg.Body), "context", md) }) if err != nil { fmt.Println(err)