diff --git a/broker/http_broker.go b/broker/http_broker.go index 5b362c14..4a22fe12 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -324,15 +324,21 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { p := &httpEvent{m: m, t: topic} id := req.Form.Get("id") + var subs []Handler + h.RLock() for _, subscriber := range h.subscribers[topic] { - if id == subscriber.id { - // sub is sync; crufty rate limiting - // so we don't hose the cpu - subscriber.fn(p) + if id != subscriber.id { + continue } + subs = append(subs, subscriber.fn) } h.RUnlock() + + // execute the handler + for _, fn := range subs { + fn(p) + } } func (h *httpBroker) Address() string { @@ -420,7 +426,6 @@ func (h *httpBroker) Connect() error { } func (h *httpBroker) Disconnect() error { - h.RLock() if !h.running { h.RUnlock() diff --git a/broker/service/handler/handler.go b/broker/service/handler/handler.go index 3d59dc2e..e0b5c3d4 100644 --- a/broker/service/handler/handler.go +++ b/broker/service/handler/handler.go @@ -19,6 +19,7 @@ func (b *Broker) Publish(ctx context.Context, req *pb.PublishRequest, rsp *pb.Em Header: req.Message.Header, Body: req.Message.Body, }) + log.Debugf("Published message to %s topic", req.Topic) if err != nil { return errors.InternalServerError("go.micro.broker", err.Error()) } @@ -49,12 +50,17 @@ func (b *Broker) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream if err != nil { return errors.InternalServerError("go.micro.broker", err.Error()) } - defer sub.Unsubscribe() + defer func() { + log.Debugf("Unsubscribing from topic %s", req.Topic) + sub.Unsubscribe() + }() select { case <-ctx.Done(): + log.Debugf("Context done for subscription to topic %s", req.Topic) return nil - case <-errChan: + case err := <-errChan: + log.Debugf("Subscription error for topic %s: %v", req.Topic, err) return err } } diff --git a/broker/service/service.go b/broker/service/service.go index 63c09624..58942fe3 100644 --- a/broker/service/service.go +++ b/broker/service/service.go @@ -3,7 +3,9 @@ package service import ( "context" + "time" + "github.com/micro/go-micro/util/log" "github.com/micro/go-micro/broker" pb "github.com/micro/go-micro/broker/service/proto" "github.com/micro/go-micro/client" @@ -43,6 +45,7 @@ func (b *serviceBroker) Options() broker.Options { } func (b *serviceBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error { + log.Debugf("Publishing to topic %s broker %v", topic, b.Addrs) _, err := b.Client.Publish(context.TODO(), &pb.PublishRequest{ Topic: topic, Message: &pb.Message{ @@ -58,6 +61,7 @@ func (b *serviceBroker) Subscribe(topic string, handler broker.Handler, opts ... for _, o := range opts { o(&options) } + log.Debugf("Subscribing to topic %s queue %s broker %v", topic, options.Queue, b.Addrs) stream, err := b.Client.Subscribe(context.TODO(), &pb.SubscribeRequest{ Topic: topic, Queue: options.Queue, @@ -74,7 +78,33 @@ func (b *serviceBroker) Subscribe(topic string, handler broker.Handler, opts ... closed: make(chan bool), options: options, } - go sub.run() + + go func() { + for { + select { + case <-sub.closed: + log.Debugf("Unsubscribed from topic %s", topic) + return + default: + // run the subscriber + log.Debugf("Streaming from broker %v to topic [%s] queue [%s]", b.Addrs, topic, options.Queue) + if err := sub.run(); err != nil { + log.Debugf("Resubscribing to topic %s broker %v", topic, b.Addrs) + stream, err := b.Client.Subscribe(context.TODO(), &pb.SubscribeRequest{ + Topic: topic, + Queue: options.Queue, + }, client.WithAddress(b.Addrs...)) + if err != nil { + log.Debugf("Failed to resubscribe to topic %s: %v", topic, err) + time.Sleep(time.Second) + continue + } + // new stream + sub.stream = stream + } + } + } + }() return sub, nil } diff --git a/broker/service/subscriber.go b/broker/service/subscriber.go index 09a942c8..02511196 100644 --- a/broker/service/subscriber.go +++ b/broker/service/subscriber.go @@ -1,6 +1,7 @@ package service import ( + "github.com/micro/go-micro/util/log" "github.com/micro/go-micro/broker" pb "github.com/micro/go-micro/broker/service/proto" ) @@ -31,24 +32,45 @@ func (s *serviceEvent) Ack() error { return nil } -func (s *serviceSub) run() { +func (s *serviceSub) isClosed() bool { + select { + case <-s.closed: + return true + default: + return false + } +} + +func (s *serviceSub) run() error { exit := make(chan bool) go func() { select { case <-exit: - return case <-s.closed: - s.stream.Close() } + + // close the stream + s.stream.Close() }() for { // TODO: do not fail silently msg, err := s.stream.Recv() if err != nil { + log.Debugf("Streaming error for subcription to topic %s: %v", s.Topic(), err) + + // close the exit channel close(exit) - return + + // don't return an error if we unsubscribed + if s.isClosed() { + return nil + } + + // return stream error + return err } + // TODO: handle error s.handler(&serviceEvent{ topic: s.topic,