Merge pull request #792 from milosgajdos83/router-fixes
Simplified table code. Fixed event dedup
This commit is contained in:
		| @@ -622,7 +622,7 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste | |||||||
| 					Events:    events, | 					Events:    events, | ||||||
| 				} | 				} | ||||||
|  |  | ||||||
| 				log.Debugf("Network router processing advert: %s", advert.Id) | 				log.Debugf("Network router %s processing advert: %s", n.Id(), advert.Id) | ||||||
| 				if err := n.router.Process(advert); err != nil { | 				if err := n.router.Process(advert); err != nil { | ||||||
| 					log.Debugf("Network failed to process advert %s: %v", advert.Id, err) | 					log.Debugf("Network failed to process advert %s: %v", advert.Id, err) | ||||||
| 				} | 				} | ||||||
|   | |||||||
| @@ -113,19 +113,19 @@ func (r *router) manageRoute(route Route, action string) error { | |||||||
| 		if err := r.table.Create(route); err != nil && err != ErrDuplicateRoute { | 		if err := r.table.Create(route); err != nil && err != ErrDuplicateRoute { | ||||||
| 			return fmt.Errorf("failed adding route for service %s: %s", route.Service, err) | 			return fmt.Errorf("failed adding route for service %s: %s", route.Service, err) | ||||||
| 		} | 		} | ||||||
| 	case "update": |  | ||||||
| 		if err := r.table.Update(route); err != nil && err != ErrDuplicateRoute { |  | ||||||
| 			return fmt.Errorf("failed updating route for service %s: %s", route.Service, err) |  | ||||||
| 		} |  | ||||||
| 	case "delete": | 	case "delete": | ||||||
| 		if err := r.table.Delete(route); err != nil && err != ErrRouteNotFound { | 		if err := r.table.Delete(route); err != nil && err != ErrRouteNotFound { | ||||||
| 			return fmt.Errorf("failed deleting route for service %s: %s", route.Service, err) | 			return fmt.Errorf("failed deleting route for service %s: %s", route.Service, err) | ||||||
| 		} | 		} | ||||||
|  | 	case "update": | ||||||
|  | 		if err := r.table.Update(route); err != nil { | ||||||
|  | 			return fmt.Errorf("failed updating route for service %s: %s", route.Service, err) | ||||||
|  | 		} | ||||||
| 	case "solicit": | 	case "solicit": | ||||||
| 		// nothing to do here | 		// nothing to do here | ||||||
| 		return nil | 		return nil | ||||||
| 	default: | 	default: | ||||||
| 		return fmt.Errorf("failed to manage route for service %s. Unknown action: %s", route.Service, action) | 		return fmt.Errorf("failed to manage route for service %s: unknown action %s", route.Service, action) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return nil | 	return nil | ||||||
| @@ -426,6 +426,7 @@ func (r *router) advertiseEvents() error { | |||||||
| 			// advertise all Update events to subscribers | 			// advertise all Update events to subscribers | ||||||
| 			if len(events) > 0 { | 			if len(events) > 0 { | ||||||
| 				r.advertWg.Add(1) | 				r.advertWg.Add(1) | ||||||
|  | 				log.Debugf("Router publishing %d events", len(events)) | ||||||
| 				go r.publishAdvert(RouteUpdate, events) | 				go r.publishAdvert(RouteUpdate, events) | ||||||
| 			} | 			} | ||||||
| 		case e := <-r.eventChan: | 		case e := <-r.eventChan: | ||||||
| @@ -433,7 +434,7 @@ func (r *router) advertiseEvents() error { | |||||||
| 			if e == nil { | 			if e == nil { | ||||||
| 				continue | 				continue | ||||||
| 			} | 			} | ||||||
|  | 			log.Debugf("Router processing table event %s for service %s", e.Type, e.Route.Address) | ||||||
| 			// determine the event penalty | 			// determine the event penalty | ||||||
| 			var penalty float64 | 			var penalty float64 | ||||||
| 			switch e.Type { | 			switch e.Type { | ||||||
| @@ -460,7 +461,8 @@ func (r *router) advertiseEvents() error { | |||||||
|  |  | ||||||
| 			// attempt to squash last two events if possible | 			// attempt to squash last two events if possible | ||||||
| 			lastEvent := advert.events[len(advert.events)-1] | 			lastEvent := advert.events[len(advert.events)-1] | ||||||
| 			if lastEvent.Type == e.Type { | 			if lastEvent.Type == e.Type && lastEvent.Route.Hash() == hash { | ||||||
|  | 				log.Debugf("Router squashing event %s with hash %d for service %s", e.Type, hash, e.Route.Address) | ||||||
| 				advert.events[len(advert.events)-1] = e | 				advert.events[len(advert.events)-1] = e | ||||||
| 			} else { | 			} else { | ||||||
| 				advert.events = append(advert.events, e) | 				advert.events = append(advert.events, e) | ||||||
| @@ -666,6 +668,8 @@ func (r *router) Process(a *Advert) error { | |||||||
| 		return events[i].Timestamp.Before(events[j].Timestamp) | 		return events[i].Timestamp.Before(events[j].Timestamp) | ||||||
| 	}) | 	}) | ||||||
|  |  | ||||||
|  | 	log.Debugf("Router %s processing advert from: %s", r.options.Id, a.Id) | ||||||
|  |  | ||||||
| 	for _, event := range events { | 	for _, event := range events { | ||||||
| 		// skip if the router is the origin of this route | 		// skip if the router is the origin of this route | ||||||
| 		if event.Route.Router == r.options.Id { | 		if event.Route.Router == r.options.Id { | ||||||
| @@ -675,7 +679,7 @@ func (r *router) Process(a *Advert) error { | |||||||
| 		// create a copy of the route | 		// create a copy of the route | ||||||
| 		route := event.Route | 		route := event.Route | ||||||
| 		action := event.Type | 		action := event.Type | ||||||
| 		log.Debugf("Router processing route action %s: %s", action, r.options.Id) | 		log.Debugf("Router %s applying %s from router %s for address: %s", r.options.Id, action, route.Router, route.Address) | ||||||
| 		if err := r.manageRoute(route, fmt.Sprintf("%s", action)); err != nil { | 		if err := r.manageRoute(route, fmt.Sprintf("%s", action)); err != nil { | ||||||
| 			return fmt.Errorf("failed applying action %s to routing table: %s", action, err) | 			return fmt.Errorf("failed applying action %s to routing table: %s", action, err) | ||||||
| 		} | 		} | ||||||
|   | |||||||
| @@ -6,6 +6,7 @@ import ( | |||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/google/uuid" | 	"github.com/google/uuid" | ||||||
|  | 	"github.com/micro/go-micro/util/log" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| var ( | var ( | ||||||
| @@ -56,14 +57,12 @@ func (t *table) Create(r Route) error { | |||||||
| 	// check if there are any routes in the table for the route destination | 	// check if there are any routes in the table for the route destination | ||||||
| 	if _, ok := t.routes[service]; !ok { | 	if _, ok := t.routes[service]; !ok { | ||||||
| 		t.routes[service] = make(map[uint64]Route) | 		t.routes[service] = make(map[uint64]Route) | ||||||
| 		t.routes[service][sum] = r |  | ||||||
| 		go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r}) |  | ||||||
| 		return nil |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// add new route to the table for the route destination | 	// add new route to the table for the route destination | ||||||
| 	if _, ok := t.routes[service][sum]; !ok { | 	if _, ok := t.routes[service][sum]; !ok { | ||||||
| 		t.routes[service][sum] = r | 		t.routes[service][sum] = r | ||||||
|  | 		log.Debugf("Router emitting %s for route: %s", Create, r.Address) | ||||||
| 		go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r}) | 		go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r}) | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
| @@ -83,11 +82,14 @@ func (t *table) Delete(r Route) error { | |||||||
| 		return ErrRouteNotFound | 		return ErrRouteNotFound | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if _, ok := t.routes[service][sum]; ok { | 	if _, ok := t.routes[service][sum]; !ok { | ||||||
| 		delete(t.routes[service], sum) | 		return ErrRouteNotFound | ||||||
| 		go t.sendEvent(&Event{Type: Delete, Timestamp: time.Now(), Route: r}) |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	delete(t.routes[service], sum) | ||||||
|  | 	log.Debugf("Router emitting %s for route: %s", Delete, r.Address) | ||||||
|  | 	go t.sendEvent(&Event{Type: Delete, Timestamp: time.Now(), Route: r}) | ||||||
|  |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -102,19 +104,17 @@ func (t *table) Update(r Route) error { | |||||||
| 	// check if the route destination has any routes in the table | 	// check if the route destination has any routes in the table | ||||||
| 	if _, ok := t.routes[service]; !ok { | 	if _, ok := t.routes[service]; !ok { | ||||||
| 		t.routes[service] = make(map[uint64]Route) | 		t.routes[service] = make(map[uint64]Route) | ||||||
| 		t.routes[service][sum] = r |  | ||||||
| 		go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r}) |  | ||||||
| 		return nil |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if _, ok := t.routes[service][sum]; !ok { | 	if _, ok := t.routes[service][sum]; !ok { | ||||||
| 		t.routes[service][sum] = r | 		t.routes[service][sum] = r | ||||||
|  | 		log.Debugf("Router emitting %s for route: %s", Update, r.Address) | ||||||
| 		go t.sendEvent(&Event{Type: Update, Timestamp: time.Now(), Route: r}) | 		go t.sendEvent(&Event{Type: Update, Timestamp: time.Now(), Route: r}) | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// just update the route, but dont emit Update event | ||||||
| 	t.routes[service][sum] = r | 	t.routes[service][sum] = r | ||||||
| 	go t.sendEvent(&Event{Type: Update, Timestamp: time.Now(), Route: r}) |  | ||||||
|  |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user