micro/broker/service/service.go

153 lines
3.6 KiB
Go
Raw Normal View History

2019-10-03 16:19:02 +01:00
// Package service provides the broker service client
package service
import (
"context"
2019-10-04 16:30:03 +01:00
"time"
2019-10-03 16:19:02 +01:00
"github.com/micro/go-micro/v2/broker"
pb "github.com/micro/go-micro/v2/broker/service/proto"
"github.com/micro/go-micro/v2/client"
"github.com/micro/go-micro/v2/logger"
2019-10-03 16:19:02 +01:00
)
type serviceBroker struct {
Addrs []string
Client pb.BrokerService
options broker.Options
}
var (
DefaultName = "go.micro.broker"
)
func (b *serviceBroker) Address() string {
return b.Addrs[0]
}
func (b *serviceBroker) Connect() error {
return nil
}
func (b *serviceBroker) Disconnect() error {
return nil
}
func (b *serviceBroker) Init(opts ...broker.Option) error {
for _, o := range opts {
o(&b.options)
}
return nil
}
func (b *serviceBroker) Options() broker.Options {
return b.options
}
func (b *serviceBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Publishing to topic %s broker %v", topic, b.Addrs)
}
2019-10-03 16:19:02 +01:00
_, err := b.Client.Publish(context.TODO(), &pb.PublishRequest{
Topic: topic,
Message: &pb.Message{
Header: msg.Header,
Body: msg.Body,
},
}, client.WithAddress(b.Addrs...))
return err
}
func (b *serviceBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
var options broker.SubscribeOptions
for _, o := range opts {
o(&options)
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Subscribing to topic %s queue %s broker %v", topic, options.Queue, b.Addrs)
}
2019-10-03 16:19:02 +01:00
stream, err := b.Client.Subscribe(context.TODO(), &pb.SubscribeRequest{
Topic: topic,
Queue: options.Queue,
2019-10-04 16:44:21 +01:00
}, client.WithAddress(b.Addrs...), client.WithRequestTimeout(time.Hour))
2019-10-03 16:19:02 +01:00
if err != nil {
return nil, err
}
sub := &serviceSub{
topic: topic,
queue: options.Queue,
handler: handler,
stream: stream,
closed: make(chan bool),
options: options,
}
2019-10-04 16:30:03 +01:00
go func() {
for {
select {
case <-sub.closed:
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Unsubscribed from topic %s", topic)
}
2019-10-04 16:30:03 +01:00
return
default:
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
// run the subscriber
logger.Debugf("Streaming from broker %v to topic [%s] queue [%s]", b.Addrs, topic, options.Queue)
}
2019-10-04 16:30:03 +01:00
if err := sub.run(); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Resubscribing to topic %s broker %v", topic, b.Addrs)
}
2019-10-04 16:30:03 +01:00
stream, err := b.Client.Subscribe(context.TODO(), &pb.SubscribeRequest{
Topic: topic,
Queue: options.Queue,
2019-10-04 16:44:21 +01:00
}, client.WithAddress(b.Addrs...), client.WithRequestTimeout(time.Hour))
2019-10-04 16:30:03 +01:00
if err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Failed to resubscribe to topic %s: %v", topic, err)
}
2019-10-04 16:30:03 +01:00
time.Sleep(time.Second)
continue
}
// new stream
sub.stream = stream
}
}
}
}()
2019-10-03 16:19:02 +01:00
return sub, nil
}
func (b *serviceBroker) String() string {
return "service"
}
func NewBroker(opts ...broker.Option) broker.Broker {
var options broker.Options
for _, o := range opts {
o(&options)
}
addrs := options.Addrs
if len(addrs) == 0 {
addrs = []string{"127.0.0.1:8001"}
}
2020-01-19 00:55:01 +00:00
cli := client.DefaultClient
// get options client from the context. We set this in the context to prevent an import loop, as
// the client depends on the broker
if c, ok := options.Context.Value(clientKey{}).(client.Client); ok {
cli = c
}
2019-10-03 16:19:02 +01:00
return &serviceBroker{
Addrs: addrs,
2020-01-19 00:55:01 +00:00
Client: pb.NewBrokerService(DefaultName, cli),
2019-10-03 16:19:02 +01:00
options: options,
}
}