Add router handler
This commit is contained in:
parent
318367cd71
commit
b1c49a0ddc
180
network/router/handler/router.go
Normal file
180
network/router/handler/router.go
Normal file
@ -0,0 +1,180 @@
|
|||||||
|
package handler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/micro/go-micro/errors"
|
||||||
|
"github.com/micro/go-micro/network/router"
|
||||||
|
pb "github.com/micro/go-micro/network/router/proto"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Router implements router handler
|
||||||
|
type Router struct {
|
||||||
|
Router router.Router
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lookup looks up routes in the routing table and returns them
|
||||||
|
func (r *Router) Lookup(ctx context.Context, req *pb.LookupRequest, resp *pb.LookupResponse) error {
|
||||||
|
query := router.NewQuery(
|
||||||
|
router.QueryService(req.Query.Service),
|
||||||
|
)
|
||||||
|
|
||||||
|
routes, err := r.Router.Lookup(query)
|
||||||
|
if err != nil {
|
||||||
|
return errors.InternalServerError("go.micro.router", "failed to lookup routes: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var respRoutes []*pb.Route
|
||||||
|
for _, route := range routes {
|
||||||
|
respRoute := &pb.Route{
|
||||||
|
Service: route.Service,
|
||||||
|
Address: route.Address,
|
||||||
|
Gateway: route.Gateway,
|
||||||
|
Network: route.Network,
|
||||||
|
Link: route.Link,
|
||||||
|
Metric: int64(route.Metric),
|
||||||
|
}
|
||||||
|
respRoutes = append(respRoutes, respRoute)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp.Routes = respRoutes
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Router) Advertise(ctx context.Context, req *pb.AdvertiseRequest, stream pb.Router_AdvertiseStream) error {
|
||||||
|
advertChan, err := r.Router.Advertise()
|
||||||
|
if err != nil {
|
||||||
|
return errors.InternalServerError("go.micro.router", "failed to get adverts: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for advert := range advertChan {
|
||||||
|
var events []*pb.Event
|
||||||
|
for _, event := range advert.Events {
|
||||||
|
route := &pb.Route{
|
||||||
|
Service: event.Route.Service,
|
||||||
|
Address: event.Route.Address,
|
||||||
|
Gateway: event.Route.Gateway,
|
||||||
|
Network: event.Route.Network,
|
||||||
|
Link: event.Route.Link,
|
||||||
|
Metric: int64(event.Route.Metric),
|
||||||
|
}
|
||||||
|
e := &pb.Event{
|
||||||
|
Type: pb.EventType(event.Type),
|
||||||
|
Timestamp: event.Timestamp.UnixNano(),
|
||||||
|
Route: route,
|
||||||
|
}
|
||||||
|
events = append(events, e)
|
||||||
|
}
|
||||||
|
|
||||||
|
advert := &pb.Advert{
|
||||||
|
Id: advert.Id,
|
||||||
|
Type: pb.AdvertType(advert.Type),
|
||||||
|
Timestamp: advert.Timestamp.UnixNano(),
|
||||||
|
Events: events,
|
||||||
|
}
|
||||||
|
|
||||||
|
// send the advert
|
||||||
|
err := stream.Send(advert)
|
||||||
|
if err == io.EOF {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return errors.InternalServerError("go.micro.router", "error sending message %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Router) Process(ctx context.Context, req *pb.Advert, rsp *pb.ProcessResponse) error {
|
||||||
|
events := make([]*router.Event, len(req.Events))
|
||||||
|
for i, event := range req.Events {
|
||||||
|
route := router.Route{
|
||||||
|
Service: event.Route.Service,
|
||||||
|
Address: event.Route.Address,
|
||||||
|
Gateway: event.Route.Gateway,
|
||||||
|
Network: event.Route.Network,
|
||||||
|
Link: event.Route.Link,
|
||||||
|
Metric: int(event.Route.Metric),
|
||||||
|
}
|
||||||
|
|
||||||
|
events[i] = &router.Event{
|
||||||
|
Type: router.EventType(event.Type),
|
||||||
|
Timestamp: time.Unix(0, event.Timestamp),
|
||||||
|
Route: route,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
advert := &router.Advert{
|
||||||
|
Id: req.Id,
|
||||||
|
Type: router.AdvertType(req.Type),
|
||||||
|
Timestamp: time.Unix(0, req.Timestamp),
|
||||||
|
TTL: time.Duration(req.Ttl),
|
||||||
|
Events: events,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := r.Router.Process(advert); err != nil {
|
||||||
|
return errors.InternalServerError("go.micro.router", "error publishing advert: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Router) Status(ctx context.Context, req *pb.Request, rsp *pb.StatusResponse) error {
|
||||||
|
status := r.Router.Status()
|
||||||
|
|
||||||
|
rsp.Status = &pb.Status{
|
||||||
|
Code: status.Code.String(),
|
||||||
|
}
|
||||||
|
|
||||||
|
if status.Error != nil {
|
||||||
|
rsp.Status.Error = status.Error.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Watch streans routing table events
|
||||||
|
func (r *Router) Watch(ctx context.Context, req *pb.WatchRequest, stream pb.Router_WatchStream) error {
|
||||||
|
watcher, err := r.Router.Watch()
|
||||||
|
if err != nil {
|
||||||
|
return errors.InternalServerError("go.micro.router", "failed creating event watcher: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer stream.Close()
|
||||||
|
|
||||||
|
for {
|
||||||
|
event, err := watcher.Next()
|
||||||
|
if err == router.ErrWatcherStopped {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return errors.InternalServerError("go.micro.router", "error watching events: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
route := &pb.Route{
|
||||||
|
Service: event.Route.Service,
|
||||||
|
Address: event.Route.Address,
|
||||||
|
Gateway: event.Route.Gateway,
|
||||||
|
Network: event.Route.Network,
|
||||||
|
Link: event.Route.Link,
|
||||||
|
Metric: int64(event.Route.Metric),
|
||||||
|
}
|
||||||
|
|
||||||
|
tableEvent := &pb.Event{
|
||||||
|
Type: pb.EventType(event.Type),
|
||||||
|
Timestamp: event.Timestamp.UnixNano(),
|
||||||
|
Route: route,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := stream.Send(tableEvent); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
114
network/router/handler/table.go
Normal file
114
network/router/handler/table.go
Normal file
@ -0,0 +1,114 @@
|
|||||||
|
package handler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/micro/go-micro/errors"
|
||||||
|
"github.com/micro/go-micro/network/router"
|
||||||
|
pb "github.com/micro/go-micro/network/router/proto"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Table struct {
|
||||||
|
Router router.Router
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Table) Create(ctx context.Context, route *pb.Route, resp *pb.CreateResponse) error {
|
||||||
|
err := t.Router.Table().Create(router.Route{
|
||||||
|
Service: route.Service,
|
||||||
|
Address: route.Address,
|
||||||
|
Gateway: route.Gateway,
|
||||||
|
Network: route.Network,
|
||||||
|
Link: route.Link,
|
||||||
|
Metric: int(route.Metric),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return errors.InternalServerError("go.micro.router", "failed to create route: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Table) Update(ctx context.Context, route *pb.Route, resp *pb.UpdateResponse) error {
|
||||||
|
err := t.Router.Table().Update(router.Route{
|
||||||
|
Service: route.Service,
|
||||||
|
Address: route.Address,
|
||||||
|
Gateway: route.Gateway,
|
||||||
|
Network: route.Network,
|
||||||
|
Link: route.Link,
|
||||||
|
Metric: int(route.Metric),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return errors.InternalServerError("go.micro.router", "failed to update route: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Table) Delete(ctx context.Context, route *pb.Route, resp *pb.DeleteResponse) error {
|
||||||
|
err := t.Router.Table().Delete(router.Route{
|
||||||
|
Service: route.Service,
|
||||||
|
Address: route.Address,
|
||||||
|
Gateway: route.Gateway,
|
||||||
|
Network: route.Network,
|
||||||
|
Link: route.Link,
|
||||||
|
Metric: int(route.Metric),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return errors.InternalServerError("go.micro.router", "failed to delete route: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// List returns all routes in the routing table
|
||||||
|
func (t *Table) List(ctx context.Context, req *pb.Request, resp *pb.ListResponse) error {
|
||||||
|
routes, err := t.Router.Table().List()
|
||||||
|
if err != nil {
|
||||||
|
return errors.InternalServerError("go.micro.router", "failed to list routes: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var respRoutes []*pb.Route
|
||||||
|
for _, route := range routes {
|
||||||
|
respRoute := &pb.Route{
|
||||||
|
Service: route.Service,
|
||||||
|
Address: route.Address,
|
||||||
|
Gateway: route.Gateway,
|
||||||
|
Network: route.Network,
|
||||||
|
Link: route.Link,
|
||||||
|
Metric: int64(route.Metric),
|
||||||
|
}
|
||||||
|
respRoutes = append(respRoutes, respRoute)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp.Routes = respRoutes
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Table) Query(ctx context.Context, req *pb.QueryRequest, resp *pb.QueryResponse) error {
|
||||||
|
query := router.NewQuery(
|
||||||
|
router.QueryService(req.Query.Service),
|
||||||
|
)
|
||||||
|
|
||||||
|
routes, err := t.Router.Table().Query(query)
|
||||||
|
if err != nil {
|
||||||
|
return errors.InternalServerError("go.micro.router", "failed to lookup routes: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var respRoutes []*pb.Route
|
||||||
|
for _, route := range routes {
|
||||||
|
respRoute := &pb.Route{
|
||||||
|
Service: route.Service,
|
||||||
|
Address: route.Address,
|
||||||
|
Gateway: route.Gateway,
|
||||||
|
Network: route.Network,
|
||||||
|
Link: route.Link,
|
||||||
|
Metric: int64(route.Metric),
|
||||||
|
}
|
||||||
|
respRoutes = append(respRoutes, respRoute)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp.Routes = respRoutes
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user