From 7884e889f4c080333a4b6abb58b9a177e84c5fd5 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 31 Jul 2019 16:36:53 +0100 Subject: [PATCH] Don't publish the process rpc call and embed the router handler in the network --- network/default.go | 87 ++++++++++++++++++++++++++++++-- network/router/handler/router.go | 2 +- server/server.go | 5 ++ 3 files changed, 90 insertions(+), 4 deletions(-) diff --git a/network/default.go b/network/default.go index 42837536..838c35f5 100644 --- a/network/default.go +++ b/network/default.go @@ -1,19 +1,40 @@ package network import ( + "context" + "sync" + "github.com/micro/go-micro/client" "github.com/micro/go-micro/network/proxy/mucp" "github.com/micro/go-micro/network/router" + "github.com/micro/go-micro/network/router/handler" + pb "github.com/micro/go-micro/network/router/proto" "github.com/micro/go-micro/server" ) type network struct { - name string options Options + handler server.Router + router pb.RouterService + + sync.RWMutex + connected bool + exit chan bool } func (n *network) Name() string { - return n.name + return n.options.Name +} + +// Implements the server.ServeRequest method. +func (n *network) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { + // If we're being called then execute our handlers + if req.Service() == n.options.Name { + return n.handler.ServeRequest(ctx, req, rsp) + } + + // execute the proxy + return n.options.Proxy.ServeRequest(ctx, req, rsp) } func (n *network) Connect() error { @@ -21,9 +42,60 @@ func (n *network) Connect() error { } func (n *network) Close() error { + n.Lock() + defer n.Unlock() + + // check if we're connected + if !n.connected { + return nil + } + + advertChan, err := n.options.Router.Advertise() + if err != nil { + return err + } + + go n.run(advertChan) + + // stop the server return n.options.Server.Stop() } +func (n *network) run(advertChan <-chan *router.Advert) { + for { + select { + // process local adverts and randomly fire them at other nodes + case a := <-advertChan: + // create a proto advert + var events []*pb.Event + for _, event := range a.Events { + route := &pb.Route{ + Service: event.Route.Service, + Address: event.Route.Address, + Gateway: event.Route.Gateway, + Network: event.Route.Network, + Link: event.Route.Link, + Metric: int64(event.Route.Metric), + } + e := &pb.Event{ + Type: pb.EventType(event.Type), + Timestamp: event.Timestamp.UnixNano(), + Route: route, + } + events = append(events, e) + } + + // fire the advert to a random network node + n.router.Process(context.Background(), &pb.Advert{ + Id: n.options.Router.Options().Id, + Type: pb.AdvertType(a.Type), + Timestamp: a.Timestamp.UnixNano(), + Events: events, + }) + } + } +} + // newNetwork returns a new network node func newNetwork(opts ...Option) Network { options := Options{ @@ -39,15 +111,24 @@ func newNetwork(opts ...Option) Network { o(&options) } + // get the default server handler + sr := server.DefaultRouter + // create new router handler + hd := sr.NewHandler(&handler.Router{options.Router}) + // register the router handler + sr.Handle(hd) + // set the server name options.Server.Init( server.Name(options.Name), server.Address(options.Address), server.Advertise(options.Advertise), - server.WithRouter(options.Proxy), + server.WithRouter(sr), ) return &network{ options: options, + handler: sr, + router: pb.NewRouterService(options.Name, options.Client), } } diff --git a/network/router/handler/router.go b/network/router/handler/router.go index ab82ed09..26344c64 100644 --- a/network/router/handler/router.go +++ b/network/router/handler/router.go @@ -12,7 +12,7 @@ import ( // Router implements router handler type Router struct { - Router router.Router + Router router.Router } // Lookup looks up routes in the routing table and returns them diff --git a/server/server.go b/server/server.go index 2e742f95..b13048a3 100644 --- a/server/server.go +++ b/server/server.go @@ -142,6 +142,11 @@ func NewServer(opt ...Option) Server { return newRpcServer(opt...) } +// NewRouter returns a new router +func NewRouter() *router { + return newRpcRouter() +} + // NewSubscriber creates a new subscriber interface with the given topic // and handler using the default server func NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber {