* prune util/log and user logger Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * plaintext logger Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * add newline Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
		
			
				
	
	
		
			102 lines
		
	
	
		
			1.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			102 lines
		
	
	
		
			1.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package service
 | |
| 
 | |
| import (
 | |
| 	"github.com/micro/go-micro/v2/broker"
 | |
| 	pb "github.com/micro/go-micro/v2/broker/service/proto"
 | |
| 	log "github.com/micro/go-micro/v2/logger"
 | |
| )
 | |
| 
 | |
| type serviceSub struct {
 | |
| 	topic   string
 | |
| 	queue   string
 | |
| 	handler broker.Handler
 | |
| 	stream  pb.Broker_SubscribeService
 | |
| 	closed  chan bool
 | |
| 	options broker.SubscribeOptions
 | |
| }
 | |
| 
 | |
| type serviceEvent struct {
 | |
| 	topic   string
 | |
| 	message *broker.Message
 | |
| }
 | |
| 
 | |
| func (s *serviceEvent) Topic() string {
 | |
| 	return s.topic
 | |
| }
 | |
| 
 | |
| func (s *serviceEvent) Message() *broker.Message {
 | |
| 	return s.message
 | |
| }
 | |
| 
 | |
| func (s *serviceEvent) Ack() error {
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (s *serviceSub) isClosed() bool {
 | |
| 	select {
 | |
| 	case <-s.closed:
 | |
| 		return true
 | |
| 	default:
 | |
| 		return false
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (s *serviceSub) run() error {
 | |
| 	exit := make(chan bool)
 | |
| 	go func() {
 | |
| 		select {
 | |
| 		case <-exit:
 | |
| 		case <-s.closed:
 | |
| 		}
 | |
| 
 | |
| 		// close the stream
 | |
| 		s.stream.Close()
 | |
| 	}()
 | |
| 
 | |
| 	for {
 | |
| 		// TODO: do not fail silently
 | |
| 		msg, err := s.stream.Recv()
 | |
| 		if err != nil {
 | |
| 			log.Debugf("Streaming error for subcription to topic %s: %v", s.Topic(), err)
 | |
| 
 | |
| 			// close the exit channel
 | |
| 			close(exit)
 | |
| 
 | |
| 			// don't return an error if we unsubscribed
 | |
| 			if s.isClosed() {
 | |
| 				return nil
 | |
| 			}
 | |
| 
 | |
| 			// return stream error
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		// TODO: handle error
 | |
| 		s.handler(&serviceEvent{
 | |
| 			topic: s.topic,
 | |
| 			message: &broker.Message{
 | |
| 				Header: msg.Header,
 | |
| 				Body:   msg.Body,
 | |
| 			},
 | |
| 		})
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (s *serviceSub) Options() broker.SubscribeOptions {
 | |
| 	return s.options
 | |
| }
 | |
| 
 | |
| func (s *serviceSub) Topic() string {
 | |
| 	return s.topic
 | |
| }
 | |
| 
 | |
| func (s *serviceSub) Unsubscribe() error {
 | |
| 	select {
 | |
| 	case <-s.closed:
 | |
| 		return nil
 | |
| 	default:
 | |
| 		close(s.closed)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 |