102 lines
		
	
	
		
			1.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			102 lines
		
	
	
		
			1.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package service
 | 
						|
 | 
						|
import (
 | 
						|
	"github.com/micro/go-micro/broker"
 | 
						|
	pb "github.com/micro/go-micro/broker/service/proto"
 | 
						|
	"github.com/micro/go-micro/util/log"
 | 
						|
)
 | 
						|
 | 
						|
type serviceSub struct {
 | 
						|
	topic   string
 | 
						|
	queue   string
 | 
						|
	handler broker.Handler
 | 
						|
	stream  pb.Broker_SubscribeService
 | 
						|
	closed  chan bool
 | 
						|
	options broker.SubscribeOptions
 | 
						|
}
 | 
						|
 | 
						|
type serviceEvent struct {
 | 
						|
	topic   string
 | 
						|
	message *broker.Message
 | 
						|
}
 | 
						|
 | 
						|
func (s *serviceEvent) Topic() string {
 | 
						|
	return s.topic
 | 
						|
}
 | 
						|
 | 
						|
func (s *serviceEvent) Message() *broker.Message {
 | 
						|
	return s.message
 | 
						|
}
 | 
						|
 | 
						|
func (s *serviceEvent) Ack() error {
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *serviceSub) isClosed() bool {
 | 
						|
	select {
 | 
						|
	case <-s.closed:
 | 
						|
		return true
 | 
						|
	default:
 | 
						|
		return false
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *serviceSub) run() error {
 | 
						|
	exit := make(chan bool)
 | 
						|
	go func() {
 | 
						|
		select {
 | 
						|
		case <-exit:
 | 
						|
		case <-s.closed:
 | 
						|
		}
 | 
						|
 | 
						|
		// close the stream
 | 
						|
		s.stream.Close()
 | 
						|
	}()
 | 
						|
 | 
						|
	for {
 | 
						|
		// TODO: do not fail silently
 | 
						|
		msg, err := s.stream.Recv()
 | 
						|
		if err != nil {
 | 
						|
			log.Debugf("Streaming error for subcription to topic %s: %v", s.Topic(), err)
 | 
						|
 | 
						|
			// close the exit channel
 | 
						|
			close(exit)
 | 
						|
 | 
						|
			// don't return an error if we unsubscribed
 | 
						|
			if s.isClosed() {
 | 
						|
				return nil
 | 
						|
			}
 | 
						|
 | 
						|
			// return stream error
 | 
						|
			return err
 | 
						|
		}
 | 
						|
 | 
						|
		// TODO: handle error
 | 
						|
		s.handler(&serviceEvent{
 | 
						|
			topic: s.topic,
 | 
						|
			message: &broker.Message{
 | 
						|
				Header: msg.Header,
 | 
						|
				Body:   msg.Body,
 | 
						|
			},
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *serviceSub) Options() broker.SubscribeOptions {
 | 
						|
	return s.options
 | 
						|
}
 | 
						|
 | 
						|
func (s *serviceSub) Topic() string {
 | 
						|
	return s.topic
 | 
						|
}
 | 
						|
 | 
						|
func (s *serviceSub) Unsubscribe() error {
 | 
						|
	select {
 | 
						|
	case <-s.closed:
 | 
						|
		return nil
 | 
						|
	default:
 | 
						|
		close(s.closed)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 |