Removed debug logs
This commit is contained in:
parent
70665e5a7d
commit
c5fb409760
@ -8,7 +8,6 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/micro/go-log"
|
|
||||||
"github.com/micro/go-micro/network/router/table"
|
"github.com/micro/go-micro/network/router/table"
|
||||||
"github.com/micro/go-micro/registry"
|
"github.com/micro/go-micro/registry"
|
||||||
"github.com/olekukonko/tablewriter"
|
"github.com/olekukonko/tablewriter"
|
||||||
@ -141,7 +140,6 @@ func (r *router) manageRegistryRoutes(reg registry.Registry, action string) erro
|
|||||||
// get the service to retrieve all its info
|
// get the service to retrieve all its info
|
||||||
srvs, err := reg.GetService(service.Name)
|
srvs, err := reg.GetService(service.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Logf("r.manageRegistryRoutes() GetService() error: %v", err)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// manage the routes for all returned services
|
// manage the routes for all returned services
|
||||||
@ -178,8 +176,6 @@ func (r *router) watchServices(w registry.Watcher) error {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Logf("r.watchServices() new service event: Action: %s Service: %v", res.Action, res.Service)
|
|
||||||
|
|
||||||
if err := r.manageServiceRoutes(res.Service, res.Action); err != nil {
|
if err := r.manageServiceRoutes(res.Service, res.Action); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -211,8 +207,6 @@ func (r *router) watchTable(w table.Watcher) error {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Logf("r.watchTable() new table event: %s", event)
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-r.exit:
|
case <-r.exit:
|
||||||
close(r.eventChan)
|
close(r.eventChan)
|
||||||
@ -230,8 +224,6 @@ func (r *router) watchTable(w table.Watcher) error {
|
|||||||
func (r *router) advertEvents(advType AdvertType, events []*table.Event) {
|
func (r *router) advertEvents(advType AdvertType, events []*table.Event) {
|
||||||
defer r.advertWg.Done()
|
defer r.advertWg.Done()
|
||||||
|
|
||||||
log.Logf("r.advertEvents(): start event: %s", advType)
|
|
||||||
|
|
||||||
a := &Advert{
|
a := &Advert{
|
||||||
ID: r.ID(),
|
ID: r.ID(),
|
||||||
Type: advType,
|
Type: advType,
|
||||||
@ -241,24 +233,19 @@ func (r *router) advertEvents(advType AdvertType, events []*table.Event) {
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case r.advertChan <- a:
|
case r.advertChan <- a:
|
||||||
log.Logf("r.advertEvents(): advertised event: %s", advType)
|
|
||||||
case <-r.exit:
|
case <-r.exit:
|
||||||
log.Logf("r.advertEvents(): DONE exit")
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Logf("r.advertEvents(): REGULAR exit")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// isFlapping detects if the event is flapping based on the current and previous event status.
|
// isFlapping detects if the event is flapping based on the current and previous event status.
|
||||||
func isFlapping(curr, prev *table.Event) bool {
|
func isFlapping(curr, prev *table.Event) bool {
|
||||||
if curr.Type == table.Update && prev.Type == table.Update {
|
if curr.Type == table.Update && prev.Type == table.Update {
|
||||||
log.Logf("isFlapping(): Update flap")
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
if curr.Type == table.Insert && prev.Type == table.Delete || curr.Type == table.Delete && prev.Type == table.Insert {
|
if curr.Type == table.Insert && prev.Type == table.Delete || curr.Type == table.Delete && prev.Type == table.Insert {
|
||||||
log.Logf("isFlapping(): Create/Delete flap")
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -312,7 +299,7 @@ func (r *router) processEvents() error {
|
|||||||
if e == nil {
|
if e == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
log.Logf("r.processEvents(): event received:\n%s", e)
|
|
||||||
// determine the event penalty
|
// determine the event penalty
|
||||||
var penalty float64
|
var penalty float64
|
||||||
switch e.Type {
|
switch e.Type {
|
||||||
@ -337,6 +324,7 @@ func (r *router) processEvents() error {
|
|||||||
delta := time.Since(event.timestamp).Seconds()
|
delta := time.Since(event.timestamp).Seconds()
|
||||||
event.penalty = event.penalty*math.Exp(-delta) + penalty
|
event.penalty = event.penalty*math.Exp(-delta) + penalty
|
||||||
event.timestamp = now
|
event.timestamp = now
|
||||||
|
|
||||||
// suppress or recover the event based on its current penalty
|
// suppress or recover the event based on its current penalty
|
||||||
if !event.isSuppressed && event.penalty > AdvertSuppress {
|
if !event.isSuppressed && event.penalty > AdvertSuppress {
|
||||||
event.isSuppressed = true
|
event.isSuppressed = true
|
||||||
@ -353,13 +341,11 @@ func (r *router) processEvents() error {
|
|||||||
r.advertWg.Wait()
|
r.advertWg.Wait()
|
||||||
// close the advert channel
|
// close the advert channel
|
||||||
close(r.advertChan)
|
close(r.advertChan)
|
||||||
log.Logf("r.processEvents(): event processor stopped")
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// we probably never reach this place
|
// we probably never reach this place
|
||||||
log.Logf("r.processEvents(): event processor stopped")
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -368,8 +354,6 @@ func (r *router) processEvents() error {
|
|||||||
func (r *router) watchErrors(errChan <-chan error) {
|
func (r *router) watchErrors(errChan <-chan error) {
|
||||||
defer r.wg.Done()
|
defer r.wg.Done()
|
||||||
|
|
||||||
log.Logf("r.manage(): manage start")
|
|
||||||
|
|
||||||
var code StatusCode
|
var code StatusCode
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
@ -380,8 +364,6 @@ func (r *router) watchErrors(errChan <-chan error) {
|
|||||||
code = Error
|
code = Error
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Logf("r.watchErrors(): watchErrors exiting")
|
|
||||||
|
|
||||||
r.Lock()
|
r.Lock()
|
||||||
defer r.Unlock()
|
defer r.Unlock()
|
||||||
status := Status{
|
status := Status{
|
||||||
@ -397,14 +379,11 @@ func (r *router) watchErrors(errChan <-chan error) {
|
|||||||
// drain the advertise channel
|
// drain the advertise channel
|
||||||
for range r.advertChan {
|
for range r.advertChan {
|
||||||
}
|
}
|
||||||
log.Logf("r.watchErrors(): advert channel drained")
|
|
||||||
// drain the event channel
|
// drain the event channel
|
||||||
for range r.eventChan {
|
for range r.eventChan {
|
||||||
}
|
}
|
||||||
log.Logf("r.watchErrors(): event channel drained")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Logf("r.watchErrors(): watchErrors exit")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Advertise advertises the routes to the network.
|
// Advertise advertises the routes to the network.
|
||||||
@ -418,7 +397,6 @@ func (r *router) Advertise() (<-chan *Advert, error) {
|
|||||||
if err := r.manageRegistryRoutes(r.opts.Registry, "insert"); err != nil {
|
if err := r.manageRegistryRoutes(r.opts.Registry, "insert"); err != nil {
|
||||||
return nil, fmt.Errorf("failed adding routes: %s", err)
|
return nil, fmt.Errorf("failed adding routes: %s", err)
|
||||||
}
|
}
|
||||||
log.Logf("Routing table:\n%s", r.opts.Table)
|
|
||||||
|
|
||||||
// list routing table routes to announce
|
// list routing table routes to announce
|
||||||
routes, err := r.opts.Table.List()
|
routes, err := r.opts.Table.List()
|
||||||
@ -476,28 +454,22 @@ func (r *router) Advertise() (<-chan *Advert, error) {
|
|||||||
r.wg.Add(1)
|
r.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer r.wg.Done()
|
defer r.wg.Done()
|
||||||
log.Logf("r.Advertise(): r.watchServices() start")
|
|
||||||
// watch local registry and register routes in routine table
|
// watch local registry and register routes in routine table
|
||||||
errChan <- r.watchServices(svcWatcher)
|
errChan <- r.watchServices(svcWatcher)
|
||||||
log.Logf("r.Advertise(): r.watchServices() exit")
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
r.wg.Add(1)
|
r.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer r.wg.Done()
|
defer r.wg.Done()
|
||||||
log.Logf("r.Advertise(): r.watchTable() start")
|
|
||||||
// watch local registry and register routes in routing table
|
// watch local registry and register routes in routing table
|
||||||
errChan <- r.watchTable(tableWatcher)
|
errChan <- r.watchTable(tableWatcher)
|
||||||
log.Logf("r.Advertise(): r.watchTable() exit")
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
r.wg.Add(1)
|
r.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer r.wg.Done()
|
defer r.wg.Done()
|
||||||
log.Logf("r.Advertise(): r.processEvents() start")
|
|
||||||
// listen to routing table events and process them
|
// listen to routing table events and process them
|
||||||
errChan <- r.processEvents()
|
errChan <- r.processEvents()
|
||||||
log.Logf("r.Advertise(): r.processEvents() exit")
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// watch for errors and cleanup
|
// watch for errors and cleanup
|
||||||
@ -554,28 +526,23 @@ func (r *router) Status() Status {
|
|||||||
|
|
||||||
// Stop stops the router
|
// Stop stops the router
|
||||||
func (r *router) Stop() error {
|
func (r *router) Stop() error {
|
||||||
log.Logf("r.Stop(): Stopping router")
|
|
||||||
r.RLock()
|
r.RLock()
|
||||||
// only close the channel if the router is running
|
// only close the channel if the router is running
|
||||||
if r.status.Code == Running {
|
if r.status.Code == Running {
|
||||||
// notify all goroutines to finish
|
// notify all goroutines to finish
|
||||||
close(r.exit)
|
close(r.exit)
|
||||||
log.Logf("r.Stop(): exit closed")
|
|
||||||
// drain the advertise channel
|
// drain the advertise channel
|
||||||
for range r.advertChan {
|
for range r.advertChan {
|
||||||
}
|
}
|
||||||
log.Logf("r.Stop(): advert channel drained")
|
|
||||||
// drain the event channel
|
// drain the event channel
|
||||||
for range r.eventChan {
|
for range r.eventChan {
|
||||||
}
|
}
|
||||||
log.Logf("r.Stop(): event channel drained")
|
|
||||||
}
|
}
|
||||||
r.RUnlock()
|
r.RUnlock()
|
||||||
|
|
||||||
// wait for all goroutines to finish
|
// wait for all goroutines to finish
|
||||||
r.wg.Wait()
|
r.wg.Wait()
|
||||||
|
|
||||||
log.Logf("r.Stop(): Router stopped")
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6,7 +6,6 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/micro/go-log"
|
|
||||||
"github.com/olekukonko/tablewriter"
|
"github.com/olekukonko/tablewriter"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -209,16 +208,12 @@ func (t *table) sendEvent(r *Event) {
|
|||||||
t.RLock()
|
t.RLock()
|
||||||
defer t.RUnlock()
|
defer t.RUnlock()
|
||||||
|
|
||||||
log.Logf("sending event to %d registered table watchers", len(t.w))
|
|
||||||
|
|
||||||
for _, w := range t.w {
|
for _, w := range t.w {
|
||||||
select {
|
select {
|
||||||
case w.resChan <- r:
|
case w.resChan <- r:
|
||||||
case <-w.done:
|
case <-w.done:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Logf("sending event done")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Size returns the size of the routing table
|
// Size returns the size of the routing table
|
||||||
|
@ -6,7 +6,6 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/micro/go-log"
|
|
||||||
"github.com/olekukonko/tablewriter"
|
"github.com/olekukonko/tablewriter"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -101,7 +100,6 @@ func (w *tableWatcher) Next() (*Event, error) {
|
|||||||
case res.Route.Service, "*":
|
case res.Route.Service, "*":
|
||||||
return res, nil
|
return res, nil
|
||||||
default:
|
default:
|
||||||
log.Logf("no table watcher available to receive the event")
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
case <-w.done:
|
case <-w.done:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user