// Package service provides the broker service client package service import ( "context" "time" "github.com/micro/go-micro/v2/broker" pb "github.com/micro/go-micro/v2/broker/service/proto" "github.com/micro/go-micro/v2/client" "github.com/micro/go-micro/v2/logger" ) type serviceBroker struct { Addrs []string Client pb.BrokerService options broker.Options } var ( DefaultName = "go.micro.broker" ) func (b *serviceBroker) Address() string { return b.Addrs[0] } func (b *serviceBroker) Connect() error { return nil } func (b *serviceBroker) Disconnect() error { return nil } func (b *serviceBroker) Init(opts ...broker.Option) error { for _, o := range opts { o(&b.options) } return nil } func (b *serviceBroker) Options() broker.Options { return b.options } func (b *serviceBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error { if logger.V(logger.DebugLevel, logger.DefaultLogger) { logger.Debugf("Publishing to topic %s broker %v", topic, b.Addrs) } _, err := b.Client.Publish(context.TODO(), &pb.PublishRequest{ Topic: topic, Message: &pb.Message{ Header: msg.Header, Body: msg.Body, }, }, client.WithAddress(b.Addrs...)) return err } func (b *serviceBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { var options broker.SubscribeOptions for _, o := range opts { o(&options) } if logger.V(logger.DebugLevel, logger.DefaultLogger) { logger.Debugf("Subscribing to topic %s queue %s broker %v", topic, options.Queue, b.Addrs) } stream, err := b.Client.Subscribe(context.TODO(), &pb.SubscribeRequest{ Topic: topic, Queue: options.Queue, }, client.WithAddress(b.Addrs...), client.WithRequestTimeout(time.Hour)) if err != nil { return nil, err } sub := &serviceSub{ topic: topic, queue: options.Queue, handler: handler, stream: stream, closed: make(chan bool), options: options, } go func() { for { select { case <-sub.closed: if logger.V(logger.DebugLevel, logger.DefaultLogger) { logger.Debugf("Unsubscribed from topic %s", topic) } return default: if logger.V(logger.DebugLevel, logger.DefaultLogger) { // run the subscriber logger.Debugf("Streaming from broker %v to topic [%s] queue [%s]", b.Addrs, topic, options.Queue) } if err := sub.run(); err != nil { if logger.V(logger.DebugLevel, logger.DefaultLogger) { logger.Debugf("Resubscribing to topic %s broker %v", topic, b.Addrs) } stream, err := b.Client.Subscribe(context.TODO(), &pb.SubscribeRequest{ Topic: topic, Queue: options.Queue, }, client.WithAddress(b.Addrs...), client.WithRequestTimeout(time.Hour)) if err != nil { if logger.V(logger.DebugLevel, logger.DefaultLogger) { logger.Debugf("Failed to resubscribe to topic %s: %v", topic, err) } time.Sleep(time.Second) continue } // new stream sub.stream = stream } } } }() return sub, nil } func (b *serviceBroker) String() string { return "service" } func NewBroker(opts ...broker.Option) broker.Broker { var options broker.Options for _, o := range opts { o(&options) } addrs := options.Addrs if len(addrs) == 0 { addrs = []string{"127.0.0.1:8001"} } cli := client.DefaultClient // get options client from the context. We set this in the context to prevent an import loop, as // the client depends on the broker if c, ok := options.Context.Value(clientKey{}).(client.Client); ok { cli = c } return &serviceBroker{ Addrs: addrs, Client: pb.NewBrokerService(DefaultName, cli), options: options, } }