router/registry: fix rlock bug when registry errors (#1788)

* client: add select options

* router/registry: fix rlock bug when registry errors

* Revert "client: add select options"

This reverts commit 4d5283452e183f7387b604b51bde1deaf87ee391.

* router/registry: findRoutes under rlock

* add test

Co-authored-by: Dominic Wong <dom@micro.mu>
This commit is contained in:
ben-toogood 2020-07-02 18:29:11 +01:00 committed by GitHub
parent c58ac35dfc
commit 4ff114e798
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 37 additions and 9 deletions

View File

@ -240,9 +240,6 @@ func findRoutes(routes map[uint64]Route, address, gateway, network, router strin
// Lookup queries routing table and returns all routes that match the lookup query // Lookup queries routing table and returns all routes that match the lookup query
func (t *table) Query(q ...QueryOption) ([]Route, error) { func (t *table) Query(q ...QueryOption) ([]Route, error) {
t.RLock()
defer t.RUnlock()
// create new query options // create new query options
opts := NewQuery(q...) opts := NewQuery(q...)
@ -254,29 +251,44 @@ func (t *table) Query(q ...QueryOption) ([]Route, error) {
return results, nil return results, nil
} }
// readAndFilter routes for this service under read lock.
readAndFilter := func() ([]Route, bool) {
t.RLock()
defer t.RUnlock()
routes, ok := t.routes[opts.Service]
if !ok || len(routes) == 0 {
return nil, false
}
return findRoutes(routes, opts.Address, opts.Gateway, opts.Network, opts.Router, opts.Strategy), true
}
if opts.Service != "*" { if opts.Service != "*" {
// try and load services from the cache // try and load services from the cache
if routes, ok := t.routes[opts.Service]; ok && len(routes) > 0 { if routes, ok := readAndFilter(); ok {
return findRoutes(routes, opts.Address, opts.Gateway, opts.Network, opts.Router, opts.Strategy), nil return routes, nil
} }
// load the cache and try again // load the cache and try again
t.RUnlock()
if err := t.fetchRoutes(opts.Service); err != nil { if err := t.fetchRoutes(opts.Service); err != nil {
return nil, err return nil, err
} }
t.RLock()
if routes, ok := t.routes[opts.Service]; ok && len(routes) > 0 { // try again
return findRoutes(routes, opts.Address, opts.Gateway, opts.Network, opts.Router, opts.Strategy), nil if routes, ok := readAndFilter(); ok {
return routes, nil
} }
return nil, ErrRouteNotFound return nil, ErrRouteNotFound
} }
// search through all destinations // search through all destinations
t.RLock()
for _, routes := range t.routes { for _, routes := range t.routes {
results = append(results, findRoutes(routes, opts.Address, opts.Gateway, opts.Network, opts.Router, opts.Strategy)...) results = append(results, findRoutes(routes, opts.Address, opts.Gateway, opts.Network, opts.Router, opts.Strategy)...)
} }
t.RUnlock()
return results, nil return results, nil
} }

View File

@ -330,3 +330,19 @@ func TestFallback(t *testing.T) {
} }
} }
func TestFallbackError(t *testing.T) {
r := &router{
subscribers: make(map[string]chan *Advert),
options: DefaultOptions(),
}
r.table = newTable(func(s string) error {
return fmt.Errorf("ERROR")
})
r.start()
_, err := r.Lookup(QueryService("go.micro.service.foo"))
if err == nil {
t.Errorf("expected error looking up service but none returned")
}
}