From b1c49a0ddc0da479e84e9cab41254db5f8115908 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 31 Jul 2019 16:10:04 +0100 Subject: [PATCH] Add router handler --- network/router/handler/router.go | 180 +++++++++++++++++++++++++++++++ network/router/handler/table.go | 114 ++++++++++++++++++++ 2 files changed, 294 insertions(+) create mode 100644 network/router/handler/router.go create mode 100644 network/router/handler/table.go diff --git a/network/router/handler/router.go b/network/router/handler/router.go new file mode 100644 index 00000000..ab82ed09 --- /dev/null +++ b/network/router/handler/router.go @@ -0,0 +1,180 @@ +package handler + +import ( + "context" + "io" + "time" + + "github.com/micro/go-micro/errors" + "github.com/micro/go-micro/network/router" + pb "github.com/micro/go-micro/network/router/proto" +) + +// Router implements router handler +type Router struct { + Router router.Router +} + +// Lookup looks up routes in the routing table and returns them +func (r *Router) Lookup(ctx context.Context, req *pb.LookupRequest, resp *pb.LookupResponse) error { + query := router.NewQuery( + router.QueryService(req.Query.Service), + ) + + routes, err := r.Router.Lookup(query) + if err != nil { + return errors.InternalServerError("go.micro.router", "failed to lookup routes: %v", err) + } + + var respRoutes []*pb.Route + for _, route := range routes { + respRoute := &pb.Route{ + Service: route.Service, + Address: route.Address, + Gateway: route.Gateway, + Network: route.Network, + Link: route.Link, + Metric: int64(route.Metric), + } + respRoutes = append(respRoutes, respRoute) + } + + resp.Routes = respRoutes + + return nil +} + +func (r *Router) Advertise(ctx context.Context, req *pb.AdvertiseRequest, stream pb.Router_AdvertiseStream) error { + advertChan, err := r.Router.Advertise() + if err != nil { + return errors.InternalServerError("go.micro.router", "failed to get adverts: %v", err) + } + + for advert := range advertChan { + var events []*pb.Event + for _, event := range advert.Events { + route := &pb.Route{ + Service: event.Route.Service, + Address: event.Route.Address, + Gateway: event.Route.Gateway, + Network: event.Route.Network, + Link: event.Route.Link, + Metric: int64(event.Route.Metric), + } + e := &pb.Event{ + Type: pb.EventType(event.Type), + Timestamp: event.Timestamp.UnixNano(), + Route: route, + } + events = append(events, e) + } + + advert := &pb.Advert{ + Id: advert.Id, + Type: pb.AdvertType(advert.Type), + Timestamp: advert.Timestamp.UnixNano(), + Events: events, + } + + // send the advert + err := stream.Send(advert) + if err == io.EOF { + return nil + } + if err != nil { + return errors.InternalServerError("go.micro.router", "error sending message %v", err) + } + } + + return nil +} + +func (r *Router) Process(ctx context.Context, req *pb.Advert, rsp *pb.ProcessResponse) error { + events := make([]*router.Event, len(req.Events)) + for i, event := range req.Events { + route := router.Route{ + Service: event.Route.Service, + Address: event.Route.Address, + Gateway: event.Route.Gateway, + Network: event.Route.Network, + Link: event.Route.Link, + Metric: int(event.Route.Metric), + } + + events[i] = &router.Event{ + Type: router.EventType(event.Type), + Timestamp: time.Unix(0, event.Timestamp), + Route: route, + } + } + + advert := &router.Advert{ + Id: req.Id, + Type: router.AdvertType(req.Type), + Timestamp: time.Unix(0, req.Timestamp), + TTL: time.Duration(req.Ttl), + Events: events, + } + + if err := r.Router.Process(advert); err != nil { + return errors.InternalServerError("go.micro.router", "error publishing advert: %v", err) + } + + return nil +} + +func (r *Router) Status(ctx context.Context, req *pb.Request, rsp *pb.StatusResponse) error { + status := r.Router.Status() + + rsp.Status = &pb.Status{ + Code: status.Code.String(), + } + + if status.Error != nil { + rsp.Status.Error = status.Error.Error() + } + + return nil +} + +// Watch streans routing table events +func (r *Router) Watch(ctx context.Context, req *pb.WatchRequest, stream pb.Router_WatchStream) error { + watcher, err := r.Router.Watch() + if err != nil { + return errors.InternalServerError("go.micro.router", "failed creating event watcher: %v", err) + } + + defer stream.Close() + + for { + event, err := watcher.Next() + if err == router.ErrWatcherStopped { + break + } + + if err != nil { + return errors.InternalServerError("go.micro.router", "error watching events: %v", err) + } + + route := &pb.Route{ + Service: event.Route.Service, + Address: event.Route.Address, + Gateway: event.Route.Gateway, + Network: event.Route.Network, + Link: event.Route.Link, + Metric: int64(event.Route.Metric), + } + + tableEvent := &pb.Event{ + Type: pb.EventType(event.Type), + Timestamp: event.Timestamp.UnixNano(), + Route: route, + } + + if err := stream.Send(tableEvent); err != nil { + return err + } + } + + return nil +} diff --git a/network/router/handler/table.go b/network/router/handler/table.go new file mode 100644 index 00000000..a1435511 --- /dev/null +++ b/network/router/handler/table.go @@ -0,0 +1,114 @@ +package handler + +import ( + "context" + + "github.com/micro/go-micro/errors" + "github.com/micro/go-micro/network/router" + pb "github.com/micro/go-micro/network/router/proto" +) + +type Table struct { + Router router.Router +} + +func (t *Table) Create(ctx context.Context, route *pb.Route, resp *pb.CreateResponse) error { + err := t.Router.Table().Create(router.Route{ + Service: route.Service, + Address: route.Address, + Gateway: route.Gateway, + Network: route.Network, + Link: route.Link, + Metric: int(route.Metric), + }) + if err != nil { + return errors.InternalServerError("go.micro.router", "failed to create route: %s", err) + } + + return nil +} + +func (t *Table) Update(ctx context.Context, route *pb.Route, resp *pb.UpdateResponse) error { + err := t.Router.Table().Update(router.Route{ + Service: route.Service, + Address: route.Address, + Gateway: route.Gateway, + Network: route.Network, + Link: route.Link, + Metric: int(route.Metric), + }) + if err != nil { + return errors.InternalServerError("go.micro.router", "failed to update route: %s", err) + } + + return nil +} + +func (t *Table) Delete(ctx context.Context, route *pb.Route, resp *pb.DeleteResponse) error { + err := t.Router.Table().Delete(router.Route{ + Service: route.Service, + Address: route.Address, + Gateway: route.Gateway, + Network: route.Network, + Link: route.Link, + Metric: int(route.Metric), + }) + if err != nil { + return errors.InternalServerError("go.micro.router", "failed to delete route: %s", err) + } + + return nil +} + +// List returns all routes in the routing table +func (t *Table) List(ctx context.Context, req *pb.Request, resp *pb.ListResponse) error { + routes, err := t.Router.Table().List() + if err != nil { + return errors.InternalServerError("go.micro.router", "failed to list routes: %s", err) + } + + var respRoutes []*pb.Route + for _, route := range routes { + respRoute := &pb.Route{ + Service: route.Service, + Address: route.Address, + Gateway: route.Gateway, + Network: route.Network, + Link: route.Link, + Metric: int64(route.Metric), + } + respRoutes = append(respRoutes, respRoute) + } + + resp.Routes = respRoutes + + return nil +} + +func (t *Table) Query(ctx context.Context, req *pb.QueryRequest, resp *pb.QueryResponse) error { + query := router.NewQuery( + router.QueryService(req.Query.Service), + ) + + routes, err := t.Router.Table().Query(query) + if err != nil { + return errors.InternalServerError("go.micro.router", "failed to lookup routes: %s", err) + } + + var respRoutes []*pb.Route + for _, route := range routes { + respRoute := &pb.Route{ + Service: route.Service, + Address: route.Address, + Gateway: route.Gateway, + Network: route.Network, + Link: route.Link, + Metric: int64(route.Metric), + } + respRoutes = append(respRoutes, respRoute) + } + + resp.Routes = respRoutes + + return nil +}