67 lines
		
	
	
		
			1.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			67 lines
		
	
	
		
			1.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package handler
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
 | 
						|
	"github.com/micro/go-micro/broker"
 | 
						|
	pb "github.com/micro/go-micro/broker/service/proto"
 | 
						|
	"github.com/micro/go-micro/errors"
 | 
						|
	"github.com/micro/go-micro/util/log"
 | 
						|
)
 | 
						|
 | 
						|
type Broker struct {
 | 
						|
	Broker broker.Broker
 | 
						|
}
 | 
						|
 | 
						|
func (b *Broker) Publish(ctx context.Context, req *pb.PublishRequest, rsp *pb.Empty) error {
 | 
						|
	log.Debugf("Publishing message to %s topic", req.Topic)
 | 
						|
	err := b.Broker.Publish(req.Topic, &broker.Message{
 | 
						|
		Header: req.Message.Header,
 | 
						|
		Body:   req.Message.Body,
 | 
						|
	})
 | 
						|
	log.Debugf("Published message to %s topic", req.Topic)
 | 
						|
	if err != nil {
 | 
						|
		return errors.InternalServerError("go.micro.broker", err.Error())
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (b *Broker) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream pb.Broker_SubscribeStream) error {
 | 
						|
	errChan := make(chan error, 1)
 | 
						|
 | 
						|
	// message handler to stream back messages from broker
 | 
						|
	handler := func(p broker.Event) error {
 | 
						|
		if err := stream.Send(&pb.Message{
 | 
						|
			Header: p.Message().Header,
 | 
						|
			Body:   p.Message().Body,
 | 
						|
		}); err != nil {
 | 
						|
			select {
 | 
						|
			case errChan <- err:
 | 
						|
				return err
 | 
						|
			default:
 | 
						|
				return err
 | 
						|
			}
 | 
						|
		}
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	log.Debugf("Subscribing to %s topic", req.Topic)
 | 
						|
	sub, err := b.Broker.Subscribe(req.Topic, handler, broker.Queue(req.Queue))
 | 
						|
	if err != nil {
 | 
						|
		return errors.InternalServerError("go.micro.broker", err.Error())
 | 
						|
	}
 | 
						|
	defer func() {
 | 
						|
		log.Debugf("Unsubscribing from topic %s", req.Topic)
 | 
						|
		sub.Unsubscribe()
 | 
						|
	}()
 | 
						|
 | 
						|
	select {
 | 
						|
	case <-ctx.Done():
 | 
						|
		log.Debugf("Context done for subscription to topic %s", req.Topic)
 | 
						|
		return nil
 | 
						|
	case err := <-errChan:
 | 
						|
		log.Debugf("Subscription error for topic %s: %v", req.Topic, err)
 | 
						|
		return err
 | 
						|
	}
 | 
						|
}
 |