Fix and comment broker service
This commit is contained in:
		| @@ -1,6 +1,7 @@ | ||||
| package service | ||||
|  | ||||
| import ( | ||||
| 	"github.com/micro/go-micro/util/log" | ||||
| 	"github.com/micro/go-micro/broker" | ||||
| 	pb "github.com/micro/go-micro/broker/service/proto" | ||||
| ) | ||||
| @@ -31,24 +32,45 @@ func (s *serviceEvent) Ack() error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (s *serviceSub) run() { | ||||
| func (s *serviceSub) isClosed() bool { | ||||
| 	select { | ||||
| 	case <-s.closed: | ||||
| 		return true | ||||
| 	default: | ||||
| 		return false | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (s *serviceSub) run() error { | ||||
| 	exit := make(chan bool) | ||||
| 	go func() { | ||||
| 		select { | ||||
| 		case <-exit: | ||||
| 			return | ||||
| 		case <-s.closed: | ||||
| 			s.stream.Close() | ||||
| 		} | ||||
|  | ||||
| 		// close the stream | ||||
| 		s.stream.Close() | ||||
| 	}() | ||||
|  | ||||
| 	for { | ||||
| 		// TODO: do not fail silently | ||||
| 		msg, err := s.stream.Recv() | ||||
| 		if err != nil { | ||||
| 			log.Debugf("Streaming error for subcription to topic %s: %v", s.Topic(), err) | ||||
|  | ||||
| 			// close the exit channel | ||||
| 			close(exit) | ||||
| 			return | ||||
|  | ||||
| 			// don't return an error if we unsubscribed | ||||
| 			if s.isClosed() { | ||||
| 				return nil | ||||
| 			} | ||||
|  | ||||
| 			// return stream error | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		// TODO: handle error | ||||
| 		s.handler(&serviceEvent{ | ||||
| 			topic: s.topic, | ||||
|   | ||||
		Reference in New Issue
	
	Block a user