Fix the proxy watcher
This commit is contained in:
		| @@ -8,6 +8,7 @@ import ( | |||||||
| 	"sort" | 	"sort" | ||||||
| 	"strings" | 	"strings" | ||||||
| 	"sync" | 	"sync" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/micro/go-micro/client" | 	"github.com/micro/go-micro/client" | ||||||
| 	"github.com/micro/go-micro/codec" | 	"github.com/micro/go-micro/codec" | ||||||
| @@ -440,7 +441,16 @@ func NewProxy(opts ...options.Option) proxy.Proxy { | |||||||
|  |  | ||||||
| 	// watch router service routes | 	// watch router service routes | ||||||
| 	p.errChan = make(chan error, 1) | 	p.errChan = make(chan error, 1) | ||||||
| 	go p.watchRoutes() |  | ||||||
|  | 	go func() { | ||||||
|  | 		// continuously attempt to watch routes | ||||||
|  | 		for { | ||||||
|  | 			// watch the routes | ||||||
|  | 			p.watchRoutes() | ||||||
|  | 			// in case of failure just wait a second | ||||||
|  | 			time.Sleep(time.Second) | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  |  | ||||||
| 	return p | 	return p | ||||||
| } | } | ||||||
|   | |||||||
| @@ -164,7 +164,7 @@ func (r *Router) Watch(ctx context.Context, req *pb.WatchRequest, stream pb.Rout | |||||||
| 	for { | 	for { | ||||||
| 		event, err := watcher.Next() | 		event, err := watcher.Next() | ||||||
| 		if err == router.ErrWatcherStopped { | 		if err == router.ErrWatcherStopped { | ||||||
| 			break | 			return errors.InternalServerError("go.micro.router", "watcher stopped") | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
|   | |||||||
| @@ -357,7 +357,9 @@ func (s *svc) Watch(opts ...router.WatchOption) (router.Watcher, error) { | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 	var options router.WatchOptions | 	options := router.WatchOptions{ | ||||||
|  | 		Service: "*", | ||||||
|  | 	} | ||||||
| 	for _, o := range opts { | 	for _, o := range opts { | ||||||
| 		o(&options) | 		o(&options) | ||||||
| 	} | 	} | ||||||
|   | |||||||
| @@ -70,11 +70,9 @@ func (w *watcher) watch(stream pb.Router_WatchService) error { | |||||||
| 			Route:     route, | 			Route:     route, | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		for { | 		select { | ||||||
| 			select { | 		case w.resChan <- event: | ||||||
| 			case w.resChan <- event: | 		case <-w.done: | ||||||
| 			case <-w.done: |  | ||||||
| 			} |  | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user