181 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			181 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package handler
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"io"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/micro/go-micro/errors"
 | |
| 	"github.com/micro/go-micro/router"
 | |
| 	pb "github.com/micro/go-micro/router/proto"
 | |
| )
 | |
| 
 | |
| // Router implements router handler
 | |
| type Router struct {
 | |
| 	Router router.Router
 | |
| }
 | |
| 
 | |
| // Lookup looks up routes in the routing table and returns them
 | |
| func (r *Router) Lookup(ctx context.Context, req *pb.LookupRequest, resp *pb.LookupResponse) error {
 | |
| 	query := router.NewQuery(
 | |
| 		router.QueryService(req.Query.Service),
 | |
| 	)
 | |
| 
 | |
| 	routes, err := r.Router.Lookup(query)
 | |
| 	if err != nil {
 | |
| 		return errors.InternalServerError("go.micro.router", "failed to lookup routes: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	var respRoutes []*pb.Route
 | |
| 	for _, route := range routes {
 | |
| 		respRoute := &pb.Route{
 | |
| 			Service: route.Service,
 | |
| 			Address: route.Address,
 | |
| 			Gateway: route.Gateway,
 | |
| 			Network: route.Network,
 | |
| 			Link:    route.Link,
 | |
| 			Metric:  int64(route.Metric),
 | |
| 		}
 | |
| 		respRoutes = append(respRoutes, respRoute)
 | |
| 	}
 | |
| 
 | |
| 	resp.Routes = respRoutes
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (r *Router) Advertise(ctx context.Context, req *pb.Request, stream pb.Router_AdvertiseStream) error {
 | |
| 	advertChan, err := r.Router.Advertise()
 | |
| 	if err != nil {
 | |
| 		return errors.InternalServerError("go.micro.router", "failed to get adverts: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	for advert := range advertChan {
 | |
| 		var events []*pb.Event
 | |
| 		for _, event := range advert.Events {
 | |
| 			route := &pb.Route{
 | |
| 				Service: event.Route.Service,
 | |
| 				Address: event.Route.Address,
 | |
| 				Gateway: event.Route.Gateway,
 | |
| 				Network: event.Route.Network,
 | |
| 				Link:    event.Route.Link,
 | |
| 				Metric:  int64(event.Route.Metric),
 | |
| 			}
 | |
| 			e := &pb.Event{
 | |
| 				Type:      pb.EventType(event.Type),
 | |
| 				Timestamp: event.Timestamp.UnixNano(),
 | |
| 				Route:     route,
 | |
| 			}
 | |
| 			events = append(events, e)
 | |
| 		}
 | |
| 
 | |
| 		advert := &pb.Advert{
 | |
| 			Id:        advert.Id,
 | |
| 			Type:      pb.AdvertType(advert.Type),
 | |
| 			Timestamp: advert.Timestamp.UnixNano(),
 | |
| 			Events:    events,
 | |
| 		}
 | |
| 
 | |
| 		// send the advert
 | |
| 		err := stream.Send(advert)
 | |
| 		if err == io.EOF {
 | |
| 			return nil
 | |
| 		}
 | |
| 		if err != nil {
 | |
| 			return errors.InternalServerError("go.micro.router", "error sending message %v", err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (r *Router) Process(ctx context.Context, req *pb.Advert, rsp *pb.ProcessResponse) error {
 | |
| 	events := make([]*router.Event, len(req.Events))
 | |
| 	for i, event := range req.Events {
 | |
| 		route := router.Route{
 | |
| 			Service: event.Route.Service,
 | |
| 			Address: event.Route.Address,
 | |
| 			Gateway: event.Route.Gateway,
 | |
| 			Network: event.Route.Network,
 | |
| 			Link:    event.Route.Link,
 | |
| 			Metric:  int(event.Route.Metric),
 | |
| 		}
 | |
| 
 | |
| 		events[i] = &router.Event{
 | |
| 			Type:      router.EventType(event.Type),
 | |
| 			Timestamp: time.Unix(0, event.Timestamp),
 | |
| 			Route:     route,
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	advert := &router.Advert{
 | |
| 		Id:        req.Id,
 | |
| 		Type:      router.AdvertType(req.Type),
 | |
| 		Timestamp: time.Unix(0, req.Timestamp),
 | |
| 		TTL:       time.Duration(req.Ttl),
 | |
| 		Events:    events,
 | |
| 	}
 | |
| 
 | |
| 	if err := r.Router.Process(advert); err != nil {
 | |
| 		return errors.InternalServerError("go.micro.router", "error publishing advert: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (r *Router) Status(ctx context.Context, req *pb.Request, rsp *pb.StatusResponse) error {
 | |
| 	status := r.Router.Status()
 | |
| 
 | |
| 	rsp.Status = &pb.Status{
 | |
| 		Code: status.Code.String(),
 | |
| 	}
 | |
| 
 | |
| 	if status.Error != nil {
 | |
| 		rsp.Status.Error = status.Error.Error()
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Watch streans routing table events
 | |
| func (r *Router) Watch(ctx context.Context, req *pb.WatchRequest, stream pb.Router_WatchStream) error {
 | |
| 	watcher, err := r.Router.Watch()
 | |
| 	if err != nil {
 | |
| 		return errors.InternalServerError("go.micro.router", "failed creating event watcher: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	defer stream.Close()
 | |
| 
 | |
| 	for {
 | |
| 		event, err := watcher.Next()
 | |
| 		if err == router.ErrWatcherStopped {
 | |
| 			break
 | |
| 		}
 | |
| 
 | |
| 		if err != nil {
 | |
| 			return errors.InternalServerError("go.micro.router", "error watching events: %v", err)
 | |
| 		}
 | |
| 
 | |
| 		route := &pb.Route{
 | |
| 			Service: event.Route.Service,
 | |
| 			Address: event.Route.Address,
 | |
| 			Gateway: event.Route.Gateway,
 | |
| 			Network: event.Route.Network,
 | |
| 			Link:    event.Route.Link,
 | |
| 			Metric:  int64(event.Route.Metric),
 | |
| 		}
 | |
| 
 | |
| 		tableEvent := &pb.Event{
 | |
| 			Type:      pb.EventType(event.Type),
 | |
| 			Timestamp: event.Timestamp.UnixNano(),
 | |
| 			Route:     route,
 | |
| 		}
 | |
| 
 | |
| 		if err := stream.Send(tableEvent); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 |