diff --git a/broker/service/handler/handler.go b/broker/service/handler/handler.go deleted file mode 100644 index 3c96fd0d..00000000 --- a/broker/service/handler/handler.go +++ /dev/null @@ -1,66 +0,0 @@ -package handler - -import ( - "context" - - "github.com/micro/go-micro/broker" - pb "github.com/micro/go-micro/broker/service/proto" - "github.com/micro/go-micro/errors" - "github.com/micro/go-micro/util/log" -) - -type Broker struct { - Broker broker.Broker -} - -func (b *Broker) Publish(ctx context.Context, req *pb.PublishRequest, rsp *pb.Empty) error { - log.Debugf("Publishing message to %s topic", req.Topic) - err := b.Broker.Publish(req.Topic, &broker.Message{ - Header: req.Message.Header, - Body: req.Message.Body, - }) - log.Debugf("Published message to %s topic", req.Topic) - if err != nil { - return errors.InternalServerError("go.micro.broker", err.Error()) - } - return nil -} - -func (b *Broker) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream pb.Broker_SubscribeStream) error { - errChan := make(chan error, 1) - - // message handler to stream back messages from broker - handler := func(p broker.Event) error { - if err := stream.Send(&pb.Message{ - Header: p.Message().Header, - Body: p.Message().Body, - }); err != nil { - select { - case errChan <- err: - return err - default: - return err - } - } - return nil - } - - log.Debugf("Subscribing to %s topic", req.Topic) - sub, err := b.Broker.Subscribe(req.Topic, handler, broker.Queue(req.Queue)) - if err != nil { - return errors.InternalServerError("go.micro.broker", err.Error()) - } - defer func() { - log.Debugf("Unsubscribing from topic %s", req.Topic) - sub.Unsubscribe() - }() - - select { - case <-ctx.Done(): - log.Debugf("Context done for subscription to topic %s", req.Topic) - return nil - case err := <-errChan: - log.Debugf("Subscription error for topic %s: %v", req.Topic, err) - return err - } -} diff --git a/client/selector/router/router.go b/client/selector/router/router.go index ed7c10e6..d8e8aae3 100644 --- a/client/selector/router/router.go +++ b/client/selector/router/router.go @@ -11,7 +11,7 @@ import ( "github.com/micro/go-micro/client/selector" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/router" - pb "github.com/micro/go-micro/router/proto" + pb "github.com/micro/go-micro/router/service/proto" ) type routerSelector struct { diff --git a/go.mod b/go.mod index 47c95a32..6b438e41 100644 --- a/go.mod +++ b/go.mod @@ -34,7 +34,7 @@ require ( github.com/miekg/dns v1.1.22 github.com/mitchellh/hashstructure v1.0.0 github.com/nats-io/nats.go v1.9.1 - github.com/nlopes/slack v0.6.1-0.20191106133607-d06c2a2b3249 // indirect + github.com/nlopes/slack v0.6.1-0.20191106133607-d06c2a2b3249 github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c github.com/pkg/errors v0.8.1 github.com/stretchr/testify v1.4.0 diff --git a/network/default.go b/network/default.go index 19b9c8c6..914653ec 100644 --- a/network/default.go +++ b/network/default.go @@ -12,11 +12,11 @@ import ( "github.com/golang/protobuf/proto" "github.com/micro/go-micro/client" rtr "github.com/micro/go-micro/client/selector/router" - pbNet "github.com/micro/go-micro/network/proto" "github.com/micro/go-micro/network/resolver/dns" + pbNet "github.com/micro/go-micro/network/service/proto" "github.com/micro/go-micro/proxy" "github.com/micro/go-micro/router" - pbRtr "github.com/micro/go-micro/router/proto" + pbRtr "github.com/micro/go-micro/router/service/proto" "github.com/micro/go-micro/server" "github.com/micro/go-micro/transport" "github.com/micro/go-micro/tunnel" diff --git a/network/node.go b/network/node.go index 9964f347..658c0964 100644 --- a/network/node.go +++ b/network/node.go @@ -6,7 +6,7 @@ import ( "sync" "time" - pb "github.com/micro/go-micro/network/proto" + pb "github.com/micro/go-micro/network/service/proto" ) var ( diff --git a/network/node_test.go b/network/node_test.go index 71b21a19..51c4988f 100644 --- a/network/node_test.go +++ b/network/node_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - pb "github.com/micro/go-micro/network/proto" + pb "github.com/micro/go-micro/network/service/proto" ) var ( diff --git a/network/service/handler/handler.go b/network/service/handler/handler.go deleted file mode 100644 index d8538a7c..00000000 --- a/network/service/handler/handler.go +++ /dev/null @@ -1,207 +0,0 @@ -// Package handler implements network RPC handler -package handler - -import ( - "context" - - "github.com/micro/go-micro/errors" - "github.com/micro/go-micro/network" - pbNet "github.com/micro/go-micro/network/proto" - "github.com/micro/go-micro/router" - pbRtr "github.com/micro/go-micro/router/proto" - "github.com/micro/go-micro/util/log" -) - -// Network implements network handler -type Network struct { - Network network.Network -} - -func flatten(n network.Node, visited map[string]bool) []network.Node { - // if node is nil runaway - if n == nil { - return nil - } - - // set visisted - if visited == nil { - visited = make(map[string]bool) - } - - // create new list of nodes - //nolint:prealloc - var nodes []network.Node - - // check if already visited - if !visited[n.Id()] { - // append the current node - nodes = append(nodes, n) - } - - // set to visited - visited[n.Id()] = true - - // visit the list of peers - for _, node := range n.Peers() { - nodes = append(nodes, flatten(node, visited)...) - } - - return nodes -} - -func (n *Network) Connect(ctx context.Context, req *pbNet.ConnectRequest, resp *pbNet.ConnectResponse) error { - if len(req.Nodes) == 0 { - return nil - } - - // get list of existing nodes - nodes := n.Network.Options().Nodes - - // generate a node map - nodeMap := make(map[string]bool) - - for _, node := range nodes { - nodeMap[node] = true - } - - for _, node := range req.Nodes { - // TODO: we may have been provided a network only - // so process anad resolve node.Network - if len(node.Address) == 0 { - continue - } - - // already exists - if _, ok := nodeMap[node.Address]; ok { - continue - } - - nodeMap[node.Address] = true - nodes = append(nodes, node.Address) - } - - log.Infof("Network.Connect setting peers: %v", nodes) - - // reinitialise the peers - n.Network.Init( - network.Nodes(nodes...), - ) - - // call the connect method - n.Network.Connect() - - return nil -} - -// Nodes returns the list of nodes -func (n *Network) Nodes(ctx context.Context, req *pbNet.NodesRequest, resp *pbNet.NodesResponse) error { - // root node - nodes := map[string]network.Node{} - - // get peers encoded into protobuf - peers := flatten(n.Network, nil) - - // walk all the peers - for _, peer := range peers { - if peer == nil { - continue - } - if _, ok := nodes[peer.Id()]; ok { - continue - } - - // add to visited list - nodes[n.Network.Id()] = peer - - resp.Nodes = append(resp.Nodes, &pbNet.Node{ - Id: peer.Id(), - Address: peer.Address(), - }) - } - - return nil -} - -// Graph returns the network graph from this root node -func (n *Network) Graph(ctx context.Context, req *pbNet.GraphRequest, resp *pbNet.GraphResponse) error { - depth := uint(req.Depth) - if depth <= 0 || depth > network.MaxDepth { - depth = network.MaxDepth - } - - // get peers encoded into protobuf - peers := network.PeersToProto(n.Network, depth) - - // set the root node - resp.Root = peers - - return nil -} - -// Routes returns a list of routing table routes -func (n *Network) Routes(ctx context.Context, req *pbNet.RoutesRequest, resp *pbNet.RoutesResponse) error { - // build query - - var qOpts []router.QueryOption - - if q := req.Query; q != nil { - if len(q.Service) > 0 { - qOpts = append(qOpts, router.QueryService(q.Service)) - } - if len(q.Address) > 0 { - qOpts = append(qOpts, router.QueryAddress(q.Address)) - } - if len(q.Gateway) > 0 { - qOpts = append(qOpts, router.QueryGateway(q.Gateway)) - } - if len(q.Router) > 0 { - qOpts = append(qOpts, router.QueryRouter(q.Router)) - } - if len(q.Network) > 0 { - qOpts = append(qOpts, router.QueryNetwork(q.Network)) - } - } - - routes, err := n.Network.Options().Router.Table().Query(qOpts...) - if err != nil { - return errors.InternalServerError("go.micro.network", "failed to list routes: %s", err) - } - - respRoutes := make([]*pbRtr.Route, 0, len(routes)) - for _, route := range routes { - respRoute := &pbRtr.Route{ - Service: route.Service, - Address: route.Address, - Gateway: route.Gateway, - Network: route.Network, - Router: route.Router, - Link: route.Link, - Metric: int64(route.Metric), - } - respRoutes = append(respRoutes, respRoute) - } - - resp.Routes = respRoutes - - return nil -} - -// Services returns a list of services based on the routing table -func (n *Network) Services(ctx context.Context, req *pbNet.ServicesRequest, resp *pbNet.ServicesResponse) error { - routes, err := n.Network.Options().Router.Table().List() - if err != nil { - return errors.InternalServerError("go.micro.network", "failed to list services: %s", err) - } - - services := make(map[string]bool) - - for _, route := range routes { - if _, ok := services[route.Service]; ok { - continue - } - services[route.Service] = true - resp.Services = append(resp.Services, route.Service) - } - - return nil -} diff --git a/network/proto/network.pb.go b/network/service/proto/network.pb.go similarity index 88% rename from network/proto/network.pb.go rename to network/service/proto/network.pb.go index 97b38402..d94687f5 100644 --- a/network/proto/network.pb.go +++ b/network/service/proto/network.pb.go @@ -6,7 +6,7 @@ package go_micro_network import ( fmt "fmt" proto "github.com/golang/protobuf/proto" - proto1 "github.com/micro/go-micro/router/proto" + proto1 "github.com/micro/go-micro/router/service/proto" math "math" ) @@ -697,41 +697,41 @@ func init() { } var fileDescriptor_0b7953b26a7c4730 = []byte{ - // 573 bytes of a gzipped FileDescriptorProto + // 576 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0x61, 0x6a, 0xdb, 0x4c, 0x10, 0x8d, 0x2c, 0xcb, 0x76, 0xe6, 0x8b, 0xfd, 0xb9, 0x4b, 0x49, 0x85, 0x7e, 0xb4, 0xee, 0xe2, - 0x1f, 0xa1, 0x34, 0x32, 0xc4, 0x04, 0x4a, 0x43, 0x43, 0x20, 0x94, 0x42, 0x21, 0x21, 0x55, 0x2e, + 0x1f, 0xa1, 0x34, 0x32, 0x24, 0x04, 0x4a, 0x4d, 0x43, 0x20, 0x94, 0x42, 0x21, 0x21, 0x55, 0x2e, 0x50, 0xc5, 0x1a, 0x6c, 0x93, 0x58, 0xeb, 0xac, 0xd6, 0x09, 0x3e, 0x41, 0x8f, 0xd0, 0x33, 0xf5, 0x56, 0x65, 0x77, 0x47, 0x8a, 0x1d, 0xcb, 0xa2, 0xf9, 0xe7, 0xd1, 0xbc, 0xf7, 0x66, 0x67, 0xe6, - 0x8d, 0xe1, 0x78, 0x3c, 0x55, 0x93, 0xc5, 0x4d, 0x38, 0x12, 0xb3, 0xc1, 0x6c, 0x3a, 0x92, 0x62, + 0x8d, 0xe1, 0x64, 0x3c, 0x55, 0x93, 0xc5, 0x4d, 0x38, 0x12, 0xb3, 0xc1, 0x6c, 0x3a, 0x92, 0x62, 0x30, 0x16, 0x87, 0xf6, 0x47, 0x8a, 0xea, 0x51, 0xc8, 0xdb, 0xc1, 0x5c, 0x0a, 0x55, 0x44, 0xa1, 0x89, 0x58, 0x77, 0x2c, 0x42, 0x83, 0x0a, 0xe9, 0x7b, 0x30, 0xdc, 0x2e, 0x24, 0xc5, 0x42, 0xa1, - 0x24, 0x1d, 0x1b, 0x58, 0x19, 0xfe, 0xcb, 0x01, 0xef, 0xc7, 0x02, 0xe5, 0x92, 0xf9, 0xd0, 0xcc, - 0x50, 0x3e, 0x4c, 0x47, 0xe8, 0x3b, 0x3d, 0xe7, 0x60, 0x37, 0xca, 0x43, 0x9d, 0x89, 0x93, 0x44, - 0x62, 0x96, 0xf9, 0x35, 0x9b, 0xa1, 0x50, 0x67, 0xc6, 0xb1, 0xc2, 0xc7, 0x78, 0xe9, 0xbb, 0x36, - 0x43, 0x21, 0xdb, 0x87, 0x86, 0xad, 0xe3, 0xd7, 0x4d, 0x82, 0x22, 0xcd, 0xa0, 0xf7, 0xfa, 0x9e, - 0x65, 0x50, 0xc8, 0x4f, 0xa1, 0x73, 0x2e, 0xd2, 0x14, 0x47, 0x2a, 0xc2, 0xfb, 0x05, 0x66, 0x8a, - 0x7d, 0x04, 0x2f, 0x15, 0x09, 0x66, 0xbe, 0xd3, 0x73, 0x0f, 0xfe, 0x3b, 0xda, 0x0f, 0x9f, 0xb7, - 0x1c, 0x5e, 0x8a, 0x04, 0x23, 0x0b, 0xe2, 0xaf, 0xe0, 0xff, 0x82, 0x9f, 0xcd, 0x45, 0x9a, 0x21, - 0xef, 0xc3, 0x9e, 0x46, 0x64, 0xb9, 0xe0, 0x6b, 0xf0, 0x12, 0x9c, 0xab, 0x89, 0x69, 0xb0, 0x1d, - 0xd9, 0x80, 0x7f, 0x81, 0x36, 0xa1, 0x2c, 0xed, 0x85, 0x75, 0xfb, 0xb0, 0xf7, 0x4d, 0xc6, 0xf3, - 0x49, 0x75, 0x91, 0x13, 0x68, 0x13, 0x8a, 0x8a, 0x7c, 0x80, 0xba, 0x14, 0x42, 0x19, 0x54, 0x69, - 0x8d, 0x2b, 0x44, 0x19, 0x19, 0x0c, 0x3f, 0x85, 0x76, 0xa4, 0xc7, 0x57, 0x34, 0x72, 0x08, 0xde, - 0xbd, 0x5e, 0x1a, 0xb1, 0xdf, 0x6c, 0xb2, 0xcd, 0x4e, 0x23, 0x8b, 0xe2, 0x67, 0xd0, 0xc9, 0xf9, - 0x54, 0x3d, 0xa4, 0xf5, 0x94, 0xf4, 0x48, 0xf6, 0x30, 0x04, 0x5a, 0x9b, 0x19, 0xee, 0xb5, 0x75, - 0x43, 0xfe, 0x06, 0x1e, 0x42, 0xf7, 0xe9, 0x13, 0xc9, 0x06, 0xd0, 0x22, 0xd3, 0x58, 0xe1, 0xdd, - 0xa8, 0x88, 0xf9, 0x1f, 0x07, 0xea, 0x7a, 0x6e, 0xac, 0x03, 0xb5, 0x69, 0x42, 0x1e, 0xab, 0x4d, - 0x93, 0x6a, 0x7b, 0xe5, 0x66, 0x71, 0xd7, 0xcc, 0xc2, 0xce, 0xa0, 0x35, 0x43, 0x15, 0x27, 0xb1, - 0x8a, 0xfd, 0xba, 0xe9, 0xa0, 0x5f, 0xbe, 0xa5, 0xf0, 0x82, 0x60, 0x5f, 0x53, 0x25, 0x97, 0x51, - 0xc1, 0x0a, 0x4e, 0xa0, 0xbd, 0x96, 0x62, 0x5d, 0x70, 0x6f, 0x71, 0x49, 0xef, 0xd2, 0x3f, 0xf5, - 0x26, 0x1f, 0xe2, 0xbb, 0x05, 0xd2, 0xb3, 0x6c, 0xf0, 0xb9, 0xf6, 0xc9, 0xe1, 0xc7, 0xd0, 0x24, - 0xaf, 0xe9, 0x3d, 0x6a, 0x1f, 0x6c, 0xdf, 0xa3, 0xf1, 0x8a, 0xc1, 0xf0, 0x21, 0x78, 0xe7, 0x77, - 0xc2, 0x2e, 0xff, 0x9f, 0x49, 0x3f, 0xa1, 0xae, 0xad, 0xf0, 0x12, 0x8e, 0x76, 0xf0, 0x1c, 0x51, - 0xea, 0x81, 0xba, 0x15, 0xee, 0xb2, 0xa0, 0xa3, 0xdf, 0x2e, 0x34, 0x2f, 0x69, 0xb0, 0x57, 0x4f, - 0x9d, 0xf5, 0x36, 0x59, 0xeb, 0x07, 0x1a, 0xbc, 0xaf, 0x40, 0xd0, 0x09, 0xee, 0xb0, 0xef, 0xe0, - 0x19, 0xe7, 0xb3, 0xb7, 0x9b, 0xe8, 0xd5, 0xc3, 0x09, 0xde, 0x6d, 0xcd, 0xaf, 0x6a, 0x99, 0x53, - 0x2d, 0xd3, 0x5a, 0xbd, 0xf4, 0x32, 0xad, 0xb5, 0x1b, 0xe7, 0x3b, 0xec, 0x02, 0x1a, 0xf6, 0x28, - 0x58, 0x09, 0x78, 0xed, 0xdc, 0x82, 0xde, 0x76, 0x40, 0x21, 0x77, 0x0d, 0xad, 0xfc, 0x1c, 0x58, - 0xc9, 0x5c, 0x9e, 0x5d, 0x4f, 0xc0, 0xab, 0x20, 0xb9, 0xe8, 0x4d, 0xc3, 0xfc, 0x49, 0x0f, 0xff, - 0x06, 0x00, 0x00, 0xff, 0xff, 0x79, 0x8a, 0x5f, 0xf0, 0x24, 0x06, 0x00, 0x00, + 0x1c, 0x64, 0x28, 0x1f, 0xa6, 0x23, 0x24, 0x3d, 0xfb, 0xd1, 0xca, 0xf1, 0x5f, 0x0e, 0x78, 0x3f, + 0x16, 0x28, 0x97, 0xcc, 0x87, 0x26, 0xe1, 0x7c, 0xa7, 0xe7, 0x1c, 0xec, 0x46, 0x79, 0xa8, 0x33, + 0x71, 0x92, 0x48, 0xcc, 0x32, 0xbf, 0x66, 0x33, 0x14, 0xea, 0xcc, 0x38, 0x56, 0xf8, 0x18, 0x2f, + 0x7d, 0xd7, 0x66, 0x28, 0x64, 0xfb, 0xd0, 0xb0, 0x75, 0xfc, 0xba, 0x49, 0x50, 0xa4, 0x19, 0xf4, + 0x6e, 0xdf, 0xb3, 0x0c, 0x0a, 0xf9, 0x29, 0x74, 0xce, 0x45, 0x9a, 0xe2, 0x48, 0x45, 0x78, 0xbf, + 0xc0, 0x4c, 0xb1, 0x8f, 0xe0, 0xa5, 0x22, 0xc1, 0xcc, 0x77, 0x7a, 0xee, 0xc1, 0x7f, 0x47, 0xfb, + 0xe1, 0xf3, 0xd6, 0xc3, 0x4b, 0x91, 0x60, 0x64, 0x41, 0xfc, 0x15, 0xfc, 0x5f, 0xf0, 0xb3, 0xb9, + 0x48, 0x33, 0xe4, 0x7d, 0xd8, 0xd3, 0x88, 0x2c, 0x17, 0x7c, 0x0d, 0x5e, 0x82, 0x73, 0x35, 0x31, + 0x0d, 0xb6, 0x23, 0x1b, 0xf0, 0x2f, 0xd0, 0x26, 0x94, 0xa5, 0xbd, 0xb0, 0x6e, 0x1f, 0xf6, 0xbe, + 0xc9, 0x78, 0x3e, 0xa9, 0x2e, 0x32, 0x84, 0x36, 0xa1, 0xa8, 0xc8, 0x07, 0xa8, 0x4b, 0x21, 0x94, + 0x41, 0x95, 0xd6, 0xb8, 0x42, 0x94, 0x91, 0xc1, 0xf0, 0x53, 0x68, 0x47, 0x7a, 0x7c, 0x45, 0x23, + 0x87, 0xe0, 0xdd, 0xeb, 0xa5, 0x11, 0xfb, 0xcd, 0x26, 0xdb, 0xec, 0x34, 0xb2, 0x28, 0x7e, 0x06, + 0x9d, 0x9c, 0x4f, 0xd5, 0x43, 0x5a, 0x4f, 0x49, 0x8f, 0x64, 0x0f, 0x43, 0xa0, 0xb5, 0x99, 0xe1, + 0x5e, 0x5b, 0x37, 0xe4, 0x6f, 0xe0, 0x21, 0x74, 0x9f, 0x3e, 0x91, 0x6c, 0x00, 0x2d, 0x32, 0x8d, + 0x15, 0xde, 0x8d, 0x8a, 0x98, 0xff, 0x71, 0xa0, 0xae, 0xe7, 0xc6, 0x3a, 0x50, 0x9b, 0x26, 0xe4, + 0xb1, 0xda, 0x34, 0xa9, 0xb6, 0x57, 0x6e, 0x16, 0x77, 0xcd, 0x2c, 0xec, 0x0c, 0x5a, 0x33, 0x54, + 0x71, 0x12, 0xab, 0xd8, 0xaf, 0x9b, 0x0e, 0xfa, 0xe5, 0x5b, 0x0a, 0x2f, 0x08, 0xf6, 0x35, 0x55, + 0x72, 0x19, 0x15, 0xac, 0x60, 0x08, 0xed, 0xb5, 0x14, 0xeb, 0x82, 0x7b, 0x8b, 0x4b, 0x7a, 0x97, + 0xfe, 0xa9, 0x37, 0xf9, 0x10, 0xdf, 0x2d, 0x90, 0x9e, 0x65, 0x83, 0xcf, 0xb5, 0x4f, 0x0e, 0x3f, + 0x81, 0x26, 0x79, 0x4d, 0xef, 0x51, 0xfb, 0x60, 0xfb, 0x1e, 0x8d, 0x57, 0x0c, 0x86, 0x1f, 0x83, + 0x77, 0x7e, 0x27, 0xec, 0xf2, 0xff, 0x99, 0xf4, 0x13, 0xea, 0xda, 0x0a, 0x2f, 0xe1, 0x68, 0x07, + 0xcf, 0x11, 0xa5, 0x1e, 0xa8, 0x5b, 0xe1, 0x2e, 0x0b, 0x3a, 0xfa, 0xed, 0x42, 0xf3, 0x92, 0x06, + 0x7b, 0xf5, 0xd4, 0x59, 0x6f, 0x93, 0xb5, 0x7e, 0xa0, 0xc1, 0xfb, 0x0a, 0x04, 0x9d, 0xe0, 0x0e, + 0xfb, 0x0e, 0x9e, 0x71, 0x3e, 0x7b, 0xbb, 0x89, 0x5e, 0x3d, 0x9c, 0xe0, 0xdd, 0xd6, 0xfc, 0xaa, + 0x96, 0x39, 0xd5, 0x32, 0xad, 0xd5, 0x4b, 0x2f, 0xd3, 0x5a, 0xbb, 0x71, 0xbe, 0xc3, 0x2e, 0xa0, + 0x61, 0x8f, 0x82, 0x95, 0x80, 0xd7, 0xce, 0x2d, 0xe8, 0x6d, 0x07, 0x14, 0x72, 0xd7, 0xd0, 0xca, + 0xcf, 0x81, 0x95, 0xcc, 0xe5, 0xd9, 0xf5, 0x04, 0xbc, 0x0a, 0x92, 0x8b, 0xde, 0x34, 0xcc, 0x9f, + 0xf4, 0xf1, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xa7, 0x5b, 0x0a, 0x25, 0x2c, 0x06, 0x00, 0x00, } diff --git a/network/proto/network.pb.micro.go b/network/service/proto/network.pb.micro.go similarity index 99% rename from network/proto/network.pb.micro.go rename to network/service/proto/network.pb.micro.go index 28e473cb..488d0c97 100644 --- a/network/proto/network.pb.micro.go +++ b/network/service/proto/network.pb.micro.go @@ -6,7 +6,7 @@ package go_micro_network import ( fmt "fmt" proto "github.com/golang/protobuf/proto" - _ "github.com/micro/go-micro/router/proto" + _ "github.com/micro/go-micro/router/service/proto" math "math" ) diff --git a/network/proto/network.proto b/network/service/proto/network.proto similarity index 96% rename from network/proto/network.proto rename to network/service/proto/network.proto index d58ed5b8..b4dece64 100644 --- a/network/proto/network.proto +++ b/network/service/proto/network.proto @@ -2,7 +2,7 @@ syntax = "proto3"; package go.micro.network; -import "github.com/micro/go-micro/router/proto/router.proto"; +import "github.com/micro/go-micro/router/service/proto/router.proto"; // Network service is usesd to gain visibility into networks service Network { diff --git a/registry/service/handler/handler.go b/registry/service/handler/handler.go deleted file mode 100644 index e2042e5e..00000000 --- a/registry/service/handler/handler.go +++ /dev/null @@ -1,82 +0,0 @@ -package handler - -import ( - "context" - "time" - - "github.com/micro/go-micro/errors" - "github.com/micro/go-micro/registry" - "github.com/micro/go-micro/registry/service" - pb "github.com/micro/go-micro/registry/service/proto" -) - -type Registry struct { - // internal registry - Registry registry.Registry -} - -func (r *Registry) GetService(ctx context.Context, req *pb.GetRequest, rsp *pb.GetResponse) error { - services, err := r.Registry.GetService(req.Service) - if err != nil { - return errors.InternalServerError("go.micro.registry", err.Error()) - } - for _, srv := range services { - rsp.Services = append(rsp.Services, service.ToProto(srv)) - } - return nil -} - -func (r *Registry) Register(ctx context.Context, req *pb.Service, rsp *pb.EmptyResponse) error { - var regOpts []registry.RegisterOption - if req.Options != nil { - ttl := time.Duration(req.Options.Ttl) * time.Second - regOpts = append(regOpts, registry.RegisterTTL(ttl)) - } - - err := r.Registry.Register(service.ToService(req), regOpts...) - if err != nil { - return errors.InternalServerError("go.micro.registry", err.Error()) - } - - return nil -} - -func (r *Registry) Deregister(ctx context.Context, req *pb.Service, rsp *pb.EmptyResponse) error { - err := r.Registry.Deregister(service.ToService(req)) - if err != nil { - return errors.InternalServerError("go.micro.registry", err.Error()) - } - return nil -} - -func (r *Registry) ListServices(ctx context.Context, req *pb.ListRequest, rsp *pb.ListResponse) error { - services, err := r.Registry.ListServices() - if err != nil { - return errors.InternalServerError("go.micro.registry", err.Error()) - } - for _, srv := range services { - rsp.Services = append(rsp.Services, service.ToProto(srv)) - } - return nil -} - -func (r *Registry) Watch(ctx context.Context, req *pb.WatchRequest, rsp pb.Registry_WatchStream) error { - watcher, err := r.Registry.Watch(registry.WatchService(req.Service)) - if err != nil { - return errors.InternalServerError("go.micro.registry", err.Error()) - } - - for { - next, err := watcher.Next() - if err != nil { - return errors.InternalServerError("go.micro.registry", err.Error()) - } - err = rsp.Send(&pb.Result{ - Action: next.Action, - Service: service.ToProto(next.Service), - }) - if err != nil { - return errors.InternalServerError("go.micro.registry", err.Error()) - } - } -} diff --git a/router/handler/router.go b/router/handler/router.go deleted file mode 100644 index 971e2336..00000000 --- a/router/handler/router.go +++ /dev/null @@ -1,190 +0,0 @@ -package handler - -import ( - "context" - "io" - "time" - - "github.com/micro/go-micro/errors" - "github.com/micro/go-micro/router" - pb "github.com/micro/go-micro/router/proto" -) - -// Router implements router handler -type Router struct { - Router router.Router -} - -// Lookup looks up routes in the routing table and returns them -func (r *Router) Lookup(ctx context.Context, req *pb.LookupRequest, resp *pb.LookupResponse) error { - routes, err := r.Router.Lookup(router.QueryService(req.Query.Service)) - if err != nil { - return errors.InternalServerError("go.micro.router", "failed to lookup routes: %v", err) - } - - respRoutes := make([]*pb.Route, 0, len(routes)) - for _, route := range routes { - respRoute := &pb.Route{ - Service: route.Service, - Address: route.Address, - Gateway: route.Gateway, - Network: route.Network, - Router: route.Router, - Link: route.Link, - Metric: route.Metric, - } - respRoutes = append(respRoutes, respRoute) - } - - resp.Routes = respRoutes - - return nil -} - -// Solicit triggers full routing table advertisement -func (r *Router) Solicit(ctx context.Context, req *pb.Request, resp *pb.Response) error { - if err := r.Router.Solicit(); err != nil { - return err - } - - return nil -} - -// Advertise streams router advertisements -func (r *Router) Advertise(ctx context.Context, req *pb.Request, stream pb.Router_AdvertiseStream) error { - advertChan, err := r.Router.Advertise() - if err != nil { - return errors.InternalServerError("go.micro.router", "failed to get adverts: %v", err) - } - - for advert := range advertChan { - var events []*pb.Event - for _, event := range advert.Events { - route := &pb.Route{ - Service: event.Route.Service, - Address: event.Route.Address, - Gateway: event.Route.Gateway, - Network: event.Route.Network, - Router: event.Route.Router, - Link: event.Route.Link, - Metric: event.Route.Metric, - } - e := &pb.Event{ - Type: pb.EventType(event.Type), - Timestamp: event.Timestamp.UnixNano(), - Route: route, - } - events = append(events, e) - } - - advert := &pb.Advert{ - Id: advert.Id, - Type: pb.AdvertType(advert.Type), - Timestamp: advert.Timestamp.UnixNano(), - Events: events, - } - - // send the advert - err := stream.Send(advert) - if err == io.EOF { - return nil - } - if err != nil { - return errors.InternalServerError("go.micro.router", "error sending message %v", err) - } - } - - return nil -} - -// Process processes advertisements -func (r *Router) Process(ctx context.Context, req *pb.Advert, rsp *pb.ProcessResponse) error { - events := make([]*router.Event, len(req.Events)) - for i, event := range req.Events { - route := router.Route{ - Service: event.Route.Service, - Address: event.Route.Address, - Gateway: event.Route.Gateway, - Network: event.Route.Network, - Router: event.Route.Router, - Link: event.Route.Link, - Metric: event.Route.Metric, - } - - events[i] = &router.Event{ - Type: router.EventType(event.Type), - Timestamp: time.Unix(0, event.Timestamp), - Route: route, - } - } - - advert := &router.Advert{ - Id: req.Id, - Type: router.AdvertType(req.Type), - Timestamp: time.Unix(0, req.Timestamp), - TTL: time.Duration(req.Ttl), - Events: events, - } - - if err := r.Router.Process(advert); err != nil { - return errors.InternalServerError("go.micro.router", "error publishing advert: %v", err) - } - - return nil -} - -// Status returns router status -func (r *Router) Status(ctx context.Context, req *pb.Request, rsp *pb.StatusResponse) error { - status := r.Router.Status() - - rsp.Status = &pb.Status{ - Code: status.Code.String(), - } - - if status.Error != nil { - rsp.Status.Error = status.Error.Error() - } - - return nil -} - -// Watch streans routing table events -func (r *Router) Watch(ctx context.Context, req *pb.WatchRequest, stream pb.Router_WatchStream) error { - watcher, err := r.Router.Watch() - if err != nil { - return errors.InternalServerError("go.micro.router", "failed creating event watcher: %v", err) - } - - defer stream.Close() - - for { - event, err := watcher.Next() - if err == router.ErrWatcherStopped { - return errors.InternalServerError("go.micro.router", "watcher stopped") - } - - if err != nil { - return errors.InternalServerError("go.micro.router", "error watching events: %v", err) - } - - route := &pb.Route{ - Service: event.Route.Service, - Address: event.Route.Address, - Gateway: event.Route.Gateway, - Network: event.Route.Network, - Router: event.Route.Router, - Link: event.Route.Link, - Metric: event.Route.Metric, - } - - tableEvent := &pb.Event{ - Type: pb.EventType(event.Type), - Timestamp: event.Timestamp.UnixNano(), - Route: route, - } - - if err := stream.Send(tableEvent); err != nil { - return err - } - } -} diff --git a/router/handler/table.go b/router/handler/table.go deleted file mode 100644 index 42a824fe..00000000 --- a/router/handler/table.go +++ /dev/null @@ -1,115 +0,0 @@ -package handler - -import ( - "context" - - "github.com/micro/go-micro/errors" - "github.com/micro/go-micro/router" - pb "github.com/micro/go-micro/router/proto" -) - -type Table struct { - Router router.Router -} - -func (t *Table) Create(ctx context.Context, route *pb.Route, resp *pb.CreateResponse) error { - err := t.Router.Table().Create(router.Route{ - Service: route.Service, - Address: route.Address, - Gateway: route.Gateway, - Network: route.Network, - Router: route.Router, - Link: route.Link, - Metric: route.Metric, - }) - if err != nil { - return errors.InternalServerError("go.micro.router", "failed to create route: %s", err) - } - - return nil -} - -func (t *Table) Update(ctx context.Context, route *pb.Route, resp *pb.UpdateResponse) error { - err := t.Router.Table().Update(router.Route{ - Service: route.Service, - Address: route.Address, - Gateway: route.Gateway, - Network: route.Network, - Router: route.Router, - Link: route.Link, - Metric: route.Metric, - }) - if err != nil { - return errors.InternalServerError("go.micro.router", "failed to update route: %s", err) - } - - return nil -} - -func (t *Table) Delete(ctx context.Context, route *pb.Route, resp *pb.DeleteResponse) error { - err := t.Router.Table().Delete(router.Route{ - Service: route.Service, - Address: route.Address, - Gateway: route.Gateway, - Network: route.Network, - Router: route.Router, - Link: route.Link, - Metric: route.Metric, - }) - if err != nil { - return errors.InternalServerError("go.micro.router", "failed to delete route: %s", err) - } - - return nil -} - -// List returns all routes in the routing table -func (t *Table) List(ctx context.Context, req *pb.Request, resp *pb.ListResponse) error { - routes, err := t.Router.Table().List() - if err != nil { - return errors.InternalServerError("go.micro.router", "failed to list routes: %s", err) - } - - respRoutes := make([]*pb.Route, 0, len(routes)) - for _, route := range routes { - respRoute := &pb.Route{ - Service: route.Service, - Address: route.Address, - Gateway: route.Gateway, - Network: route.Network, - Router: route.Router, - Link: route.Link, - Metric: route.Metric, - } - respRoutes = append(respRoutes, respRoute) - } - - resp.Routes = respRoutes - - return nil -} - -func (t *Table) Query(ctx context.Context, req *pb.QueryRequest, resp *pb.QueryResponse) error { - routes, err := t.Router.Table().Query(router.QueryService(req.Query.Service)) - if err != nil { - return errors.InternalServerError("go.micro.router", "failed to lookup routes: %s", err) - } - - respRoutes := make([]*pb.Route, 0, len(routes)) - for _, route := range routes { - respRoute := &pb.Route{ - Service: route.Service, - Address: route.Address, - Gateway: route.Gateway, - Network: route.Network, - Router: route.Router, - Link: route.Link, - Metric: route.Metric, - } - respRoutes = append(respRoutes, respRoute) - } - - resp.Routes = respRoutes - - return nil -} diff --git a/router/proto/router.pb.go b/router/service/proto/router.pb.go similarity index 100% rename from router/proto/router.pb.go rename to router/service/proto/router.pb.go diff --git a/router/proto/router.pb.micro.go b/router/service/proto/router.pb.micro.go similarity index 100% rename from router/proto/router.pb.micro.go rename to router/service/proto/router.pb.micro.go diff --git a/router/proto/router.proto b/router/service/proto/router.proto similarity index 100% rename from router/proto/router.proto rename to router/service/proto/router.proto diff --git a/router/service/service.go b/router/service/service.go index 15a0ae37..1087da3d 100644 --- a/router/service/service.go +++ b/router/service/service.go @@ -10,7 +10,7 @@ import ( "github.com/micro/go-micro/client" "github.com/micro/go-micro/router" - pb "github.com/micro/go-micro/router/proto" + pb "github.com/micro/go-micro/router/service/proto" ) type svc struct { diff --git a/router/service/table.go b/router/service/table.go index 77e288db..bcfc8ef3 100644 --- a/router/service/table.go +++ b/router/service/table.go @@ -5,7 +5,7 @@ import ( "github.com/micro/go-micro/client" "github.com/micro/go-micro/router" - pb "github.com/micro/go-micro/router/proto" + pb "github.com/micro/go-micro/router/service/proto" ) type table struct { diff --git a/router/service/watcher.go b/router/service/watcher.go index 616dc2d4..663d0fce 100644 --- a/router/service/watcher.go +++ b/router/service/watcher.go @@ -6,7 +6,7 @@ import ( "time" "github.com/micro/go-micro/router" - pb "github.com/micro/go-micro/router/proto" + pb "github.com/micro/go-micro/router/service/proto" ) type watcher struct { diff --git a/runtime/service/handler/handler.go b/runtime/service/handler/handler.go deleted file mode 100644 index b545eab4..00000000 --- a/runtime/service/handler/handler.go +++ /dev/null @@ -1,142 +0,0 @@ -package handler - -import ( - "context" - - "github.com/micro/go-micro/errors" - "github.com/micro/go-micro/runtime" - pb "github.com/micro/go-micro/runtime/service/proto" -) - -type Runtime struct { - Runtime runtime.Runtime -} - -func toProto(s *runtime.Service) *pb.Service { - return &pb.Service{ - Name: s.Name, - Version: s.Version, - Source: s.Source, - Metadata: s.Metadata, - } -} - -func toService(s *pb.Service) *runtime.Service { - return &runtime.Service{ - Name: s.Name, - Version: s.Version, - Source: s.Source, - Metadata: s.Metadata, - } -} - -func toCreateOptions(opts *pb.CreateOptions) []runtime.CreateOption { - options := []runtime.CreateOption{} - // command options - if len(opts.Command) > 0 { - options = append(options, runtime.WithCommand(opts.Command...)) - } - // env options - if len(opts.Env) > 0 { - options = append(options, runtime.WithEnv(opts.Env)) - } - - // TODO: output options - - return options -} - -func toReadOptions(opts *pb.ReadOptions) []runtime.ReadOption { - options := []runtime.ReadOption{} - if len(opts.Service) > 0 { - options = append(options, runtime.ReadService(opts.Service)) - } - if len(opts.Version) > 0 { - options = append(options, runtime.ReadVersion(opts.Version)) - } - if len(opts.Type) > 0 { - options = append(options, runtime.ReadType(opts.Type)) - } - - return options -} - -func (r *Runtime) Create(ctx context.Context, req *pb.CreateRequest, rsp *pb.CreateResponse) error { - if req.Service == nil { - return errors.BadRequest("go.micro.runtime", "blank service") - } - - var options []runtime.CreateOption - if req.Options != nil { - options = toCreateOptions(req.Options) - } - - service := toService(req.Service) - err := r.Runtime.Create(service, options...) - if err != nil { - return errors.InternalServerError("go.micro.runtime", err.Error()) - } - - return nil -} - -func (r *Runtime) Read(ctx context.Context, req *pb.ReadRequest, rsp *pb.ReadResponse) error { - var options []runtime.ReadOption - if req.Options != nil { - options = toReadOptions(req.Options) - } - - services, err := r.Runtime.Read(options...) - if err != nil { - return errors.InternalServerError("go.micro.runtime", err.Error()) - } - - for _, service := range services { - rsp.Services = append(rsp.Services, toProto(service)) - } - - return nil -} - -func (r *Runtime) Update(ctx context.Context, req *pb.UpdateRequest, rsp *pb.UpdateResponse) error { - if req.Service == nil { - return errors.BadRequest("go.micro.runtime", "blank service") - } - - // TODO: add opts - service := toService(req.Service) - err := r.Runtime.Update(service) - if err != nil { - return errors.InternalServerError("go.micro.runtime", err.Error()) - } - - return nil -} - -func (r *Runtime) Delete(ctx context.Context, req *pb.DeleteRequest, rsp *pb.DeleteResponse) error { - if req.Service == nil { - return errors.BadRequest("go.micro.runtime", "blank service") - } - - // TODO: add opts - service := toService(req.Service) - err := r.Runtime.Delete(service) - if err != nil { - return errors.InternalServerError("go.micro.runtime", err.Error()) - } - - return nil -} - -func (r *Runtime) List(ctx context.Context, req *pb.ListRequest, rsp *pb.ListResponse) error { - services, err := r.Runtime.List() - if err != nil { - return errors.InternalServerError("go.micro.runtime", err.Error()) - } - - for _, service := range services { - rsp.Services = append(rsp.Services, toProto(service)) - } - - return nil -} diff --git a/store/service/handler/handler.go b/store/service/handler/handler.go deleted file mode 100644 index 3f546a9d..00000000 --- a/store/service/handler/handler.go +++ /dev/null @@ -1,89 +0,0 @@ -package handler - -import ( - "context" - "io" - "time" - - "github.com/micro/go-micro/errors" - "github.com/micro/go-micro/store" - pb "github.com/micro/go-micro/store/service/proto" -) - -type Store struct { - Store store.Store -} - -func (s *Store) Read(ctx context.Context, req *pb.ReadRequest, rsp *pb.ReadResponse) error { - vals, err := s.Store.Read(req.Keys...) - if err != nil { - return errors.InternalServerError("go.micro.store", err.Error()) - } - for _, val := range vals { - rsp.Records = append(rsp.Records, &pb.Record{ - Key: val.Key, - Value: val.Value, - Expiry: int64(val.Expiry.Seconds()), - }) - } - return nil -} - -func (s *Store) Write(ctx context.Context, req *pb.WriteRequest, rsp *pb.WriteResponse) error { - records := make([]*store.Record, 0, len(req.Records)) - - for _, record := range req.Records { - records = append(records, &store.Record{ - Key: record.Key, - Value: record.Value, - Expiry: time.Duration(record.Expiry) * time.Second, - }) - } - - err := s.Store.Write(records...) - if err != nil { - return errors.InternalServerError("go.micro.store", err.Error()) - } - return nil -} - -func (s *Store) Delete(ctx context.Context, req *pb.DeleteRequest, rsp *pb.DeleteResponse) error { - err := s.Store.Delete(req.Keys...) - if err != nil { - return errors.InternalServerError("go.micro.store", err.Error()) - } - return nil -} - -func (s *Store) List(ctx context.Context, req *pb.ListRequest, stream pb.Store_ListStream) error { - var vals []*store.Record - var err error - - if len(req.Key) > 0 { - vals, err = s.Store.Read(req.Key) - } else { - vals, err = s.Store.List() - } - if err != nil { - return errors.InternalServerError("go.micro.store", err.Error()) - } - rsp := new(pb.ListResponse) - - // TODO: batch sync - for _, val := range vals { - rsp.Records = append(rsp.Records, &pb.Record{ - Key: val.Key, - Value: val.Value, - Expiry: int64(val.Expiry.Seconds()), - }) - } - - err = stream.Send(rsp) - if err == io.EOF { - return nil - } - if err != nil { - return errors.InternalServerError("go.micro.store", err.Error()) - } - return nil -}