Merge pull request #1034 from micro/strip-handlers

Strip handlers
This commit is contained in:
Asim Aslam 2019-12-10 11:54:06 +00:00 committed by GitHub
commit 8289dbabc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 43 additions and 934 deletions

View File

@ -1,66 +0,0 @@
package handler
import (
"context"
"github.com/micro/go-micro/broker"
pb "github.com/micro/go-micro/broker/service/proto"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/util/log"
)
type Broker struct {
Broker broker.Broker
}
func (b *Broker) Publish(ctx context.Context, req *pb.PublishRequest, rsp *pb.Empty) error {
log.Debugf("Publishing message to %s topic", req.Topic)
err := b.Broker.Publish(req.Topic, &broker.Message{
Header: req.Message.Header,
Body: req.Message.Body,
})
log.Debugf("Published message to %s topic", req.Topic)
if err != nil {
return errors.InternalServerError("go.micro.broker", err.Error())
}
return nil
}
func (b *Broker) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream pb.Broker_SubscribeStream) error {
errChan := make(chan error, 1)
// message handler to stream back messages from broker
handler := func(p broker.Event) error {
if err := stream.Send(&pb.Message{
Header: p.Message().Header,
Body: p.Message().Body,
}); err != nil {
select {
case errChan <- err:
return err
default:
return err
}
}
return nil
}
log.Debugf("Subscribing to %s topic", req.Topic)
sub, err := b.Broker.Subscribe(req.Topic, handler, broker.Queue(req.Queue))
if err != nil {
return errors.InternalServerError("go.micro.broker", err.Error())
}
defer func() {
log.Debugf("Unsubscribing from topic %s", req.Topic)
sub.Unsubscribe()
}()
select {
case <-ctx.Done():
log.Debugf("Context done for subscription to topic %s", req.Topic)
return nil
case err := <-errChan:
log.Debugf("Subscription error for topic %s: %v", req.Topic, err)
return err
}
}

View File

@ -11,7 +11,7 @@ import (
"github.com/micro/go-micro/client/selector"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/router"
pb "github.com/micro/go-micro/router/proto"
pb "github.com/micro/go-micro/router/service/proto"
)
type routerSelector struct {

2
go.mod
View File

@ -34,7 +34,7 @@ require (
github.com/miekg/dns v1.1.22
github.com/mitchellh/hashstructure v1.0.0
github.com/nats-io/nats.go v1.9.1
github.com/nlopes/slack v0.6.1-0.20191106133607-d06c2a2b3249 // indirect
github.com/nlopes/slack v0.6.1-0.20191106133607-d06c2a2b3249
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c
github.com/pkg/errors v0.8.1
github.com/stretchr/testify v1.4.0

View File

@ -12,11 +12,11 @@ import (
"github.com/golang/protobuf/proto"
"github.com/micro/go-micro/client"
rtr "github.com/micro/go-micro/client/selector/router"
pbNet "github.com/micro/go-micro/network/proto"
"github.com/micro/go-micro/network/resolver/dns"
pbNet "github.com/micro/go-micro/network/service/proto"
"github.com/micro/go-micro/proxy"
"github.com/micro/go-micro/router"
pbRtr "github.com/micro/go-micro/router/proto"
pbRtr "github.com/micro/go-micro/router/service/proto"
"github.com/micro/go-micro/server"
"github.com/micro/go-micro/transport"
"github.com/micro/go-micro/tunnel"

View File

@ -6,7 +6,7 @@ import (
"sync"
"time"
pb "github.com/micro/go-micro/network/proto"
pb "github.com/micro/go-micro/network/service/proto"
)
var (

View File

@ -4,7 +4,7 @@ import (
"testing"
"time"
pb "github.com/micro/go-micro/network/proto"
pb "github.com/micro/go-micro/network/service/proto"
)
var (

View File

@ -1,207 +0,0 @@
// Package handler implements network RPC handler
package handler
import (
"context"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/network"
pbNet "github.com/micro/go-micro/network/proto"
"github.com/micro/go-micro/router"
pbRtr "github.com/micro/go-micro/router/proto"
"github.com/micro/go-micro/util/log"
)
// Network implements network handler
type Network struct {
Network network.Network
}
func flatten(n network.Node, visited map[string]bool) []network.Node {
// if node is nil runaway
if n == nil {
return nil
}
// set visisted
if visited == nil {
visited = make(map[string]bool)
}
// create new list of nodes
//nolint:prealloc
var nodes []network.Node
// check if already visited
if !visited[n.Id()] {
// append the current node
nodes = append(nodes, n)
}
// set to visited
visited[n.Id()] = true
// visit the list of peers
for _, node := range n.Peers() {
nodes = append(nodes, flatten(node, visited)...)
}
return nodes
}
func (n *Network) Connect(ctx context.Context, req *pbNet.ConnectRequest, resp *pbNet.ConnectResponse) error {
if len(req.Nodes) == 0 {
return nil
}
// get list of existing nodes
nodes := n.Network.Options().Nodes
// generate a node map
nodeMap := make(map[string]bool)
for _, node := range nodes {
nodeMap[node] = true
}
for _, node := range req.Nodes {
// TODO: we may have been provided a network only
// so process anad resolve node.Network
if len(node.Address) == 0 {
continue
}
// already exists
if _, ok := nodeMap[node.Address]; ok {
continue
}
nodeMap[node.Address] = true
nodes = append(nodes, node.Address)
}
log.Infof("Network.Connect setting peers: %v", nodes)
// reinitialise the peers
n.Network.Init(
network.Nodes(nodes...),
)
// call the connect method
n.Network.Connect()
return nil
}
// Nodes returns the list of nodes
func (n *Network) Nodes(ctx context.Context, req *pbNet.NodesRequest, resp *pbNet.NodesResponse) error {
// root node
nodes := map[string]network.Node{}
// get peers encoded into protobuf
peers := flatten(n.Network, nil)
// walk all the peers
for _, peer := range peers {
if peer == nil {
continue
}
if _, ok := nodes[peer.Id()]; ok {
continue
}
// add to visited list
nodes[n.Network.Id()] = peer
resp.Nodes = append(resp.Nodes, &pbNet.Node{
Id: peer.Id(),
Address: peer.Address(),
})
}
return nil
}
// Graph returns the network graph from this root node
func (n *Network) Graph(ctx context.Context, req *pbNet.GraphRequest, resp *pbNet.GraphResponse) error {
depth := uint(req.Depth)
if depth <= 0 || depth > network.MaxDepth {
depth = network.MaxDepth
}
// get peers encoded into protobuf
peers := network.PeersToProto(n.Network, depth)
// set the root node
resp.Root = peers
return nil
}
// Routes returns a list of routing table routes
func (n *Network) Routes(ctx context.Context, req *pbNet.RoutesRequest, resp *pbNet.RoutesResponse) error {
// build query
var qOpts []router.QueryOption
if q := req.Query; q != nil {
if len(q.Service) > 0 {
qOpts = append(qOpts, router.QueryService(q.Service))
}
if len(q.Address) > 0 {
qOpts = append(qOpts, router.QueryAddress(q.Address))
}
if len(q.Gateway) > 0 {
qOpts = append(qOpts, router.QueryGateway(q.Gateway))
}
if len(q.Router) > 0 {
qOpts = append(qOpts, router.QueryRouter(q.Router))
}
if len(q.Network) > 0 {
qOpts = append(qOpts, router.QueryNetwork(q.Network))
}
}
routes, err := n.Network.Options().Router.Table().Query(qOpts...)
if err != nil {
return errors.InternalServerError("go.micro.network", "failed to list routes: %s", err)
}
respRoutes := make([]*pbRtr.Route, 0, len(routes))
for _, route := range routes {
respRoute := &pbRtr.Route{
Service: route.Service,
Address: route.Address,
Gateway: route.Gateway,
Network: route.Network,
Router: route.Router,
Link: route.Link,
Metric: int64(route.Metric),
}
respRoutes = append(respRoutes, respRoute)
}
resp.Routes = respRoutes
return nil
}
// Services returns a list of services based on the routing table
func (n *Network) Services(ctx context.Context, req *pbNet.ServicesRequest, resp *pbNet.ServicesResponse) error {
routes, err := n.Network.Options().Router.Table().List()
if err != nil {
return errors.InternalServerError("go.micro.network", "failed to list services: %s", err)
}
services := make(map[string]bool)
for _, route := range routes {
if _, ok := services[route.Service]; ok {
continue
}
services[route.Service] = true
resp.Services = append(resp.Services, route.Service)
}
return nil
}

View File

@ -6,7 +6,7 @@ package go_micro_network
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
proto1 "github.com/micro/go-micro/router/proto"
proto1 "github.com/micro/go-micro/router/service/proto"
math "math"
)
@ -697,41 +697,41 @@ func init() {
}
var fileDescriptor_0b7953b26a7c4730 = []byte{
// 573 bytes of a gzipped FileDescriptorProto
// 576 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0x61, 0x6a, 0xdb, 0x4c,
0x10, 0x8d, 0x2c, 0xcb, 0x76, 0xe6, 0x8b, 0xfd, 0xb9, 0x4b, 0x49, 0x85, 0x7e, 0xb4, 0xee, 0xe2,
0x1f, 0xa1, 0x34, 0x32, 0xc4, 0x04, 0x4a, 0x43, 0x43, 0x20, 0x94, 0x42, 0x21, 0x21, 0x55, 0x2e,
0x1f, 0xa1, 0x34, 0x32, 0x24, 0x04, 0x4a, 0x4d, 0x43, 0x20, 0x94, 0x42, 0x21, 0x21, 0x55, 0x2e,
0x50, 0xc5, 0x1a, 0x6c, 0x93, 0x58, 0xeb, 0xac, 0xd6, 0x09, 0x3e, 0x41, 0x8f, 0xd0, 0x33, 0xf5,
0x56, 0x65, 0x77, 0x47, 0x8a, 0x1d, 0xcb, 0xa2, 0xf9, 0xe7, 0xd1, 0xbc, 0xf7, 0x66, 0x67, 0xe6,
0x8d, 0xe1, 0x78, 0x3c, 0x55, 0x93, 0xc5, 0x4d, 0x38, 0x12, 0xb3, 0xc1, 0x6c, 0x3a, 0x92, 0x62,
0x8d, 0xe1, 0x64, 0x3c, 0x55, 0x93, 0xc5, 0x4d, 0x38, 0x12, 0xb3, 0xc1, 0x6c, 0x3a, 0x92, 0x62,
0x30, 0x16, 0x87, 0xf6, 0x47, 0x8a, 0xea, 0x51, 0xc8, 0xdb, 0xc1, 0x5c, 0x0a, 0x55, 0x44, 0xa1,
0x89, 0x58, 0x77, 0x2c, 0x42, 0x83, 0x0a, 0xe9, 0x7b, 0x30, 0xdc, 0x2e, 0x24, 0xc5, 0x42, 0xa1,
0x24, 0x1d, 0x1b, 0x58, 0x19, 0xfe, 0xcb, 0x01, 0xef, 0xc7, 0x02, 0xe5, 0x92, 0xf9, 0xd0, 0xcc,
0x50, 0x3e, 0x4c, 0x47, 0xe8, 0x3b, 0x3d, 0xe7, 0x60, 0x37, 0xca, 0x43, 0x9d, 0x89, 0x93, 0x44,
0x62, 0x96, 0xf9, 0x35, 0x9b, 0xa1, 0x50, 0x67, 0xc6, 0xb1, 0xc2, 0xc7, 0x78, 0xe9, 0xbb, 0x36,
0x43, 0x21, 0xdb, 0x87, 0x86, 0xad, 0xe3, 0xd7, 0x4d, 0x82, 0x22, 0xcd, 0xa0, 0xf7, 0xfa, 0x9e,
0x65, 0x50, 0xc8, 0x4f, 0xa1, 0x73, 0x2e, 0xd2, 0x14, 0x47, 0x2a, 0xc2, 0xfb, 0x05, 0x66, 0x8a,
0x7d, 0x04, 0x2f, 0x15, 0x09, 0x66, 0xbe, 0xd3, 0x73, 0x0f, 0xfe, 0x3b, 0xda, 0x0f, 0x9f, 0xb7,
0x1c, 0x5e, 0x8a, 0x04, 0x23, 0x0b, 0xe2, 0xaf, 0xe0, 0xff, 0x82, 0x9f, 0xcd, 0x45, 0x9a, 0x21,
0xef, 0xc3, 0x9e, 0x46, 0x64, 0xb9, 0xe0, 0x6b, 0xf0, 0x12, 0x9c, 0xab, 0x89, 0x69, 0xb0, 0x1d,
0xd9, 0x80, 0x7f, 0x81, 0x36, 0xa1, 0x2c, 0xed, 0x85, 0x75, 0xfb, 0xb0, 0xf7, 0x4d, 0xc6, 0xf3,
0x49, 0x75, 0x91, 0x13, 0x68, 0x13, 0x8a, 0x8a, 0x7c, 0x80, 0xba, 0x14, 0x42, 0x19, 0x54, 0x69,
0x8d, 0x2b, 0x44, 0x19, 0x19, 0x0c, 0x3f, 0x85, 0x76, 0xa4, 0xc7, 0x57, 0x34, 0x72, 0x08, 0xde,
0xbd, 0x5e, 0x1a, 0xb1, 0xdf, 0x6c, 0xb2, 0xcd, 0x4e, 0x23, 0x8b, 0xe2, 0x67, 0xd0, 0xc9, 0xf9,
0x54, 0x3d, 0xa4, 0xf5, 0x94, 0xf4, 0x48, 0xf6, 0x30, 0x04, 0x5a, 0x9b, 0x19, 0xee, 0xb5, 0x75,
0x43, 0xfe, 0x06, 0x1e, 0x42, 0xf7, 0xe9, 0x13, 0xc9, 0x06, 0xd0, 0x22, 0xd3, 0x58, 0xe1, 0xdd,
0xa8, 0x88, 0xf9, 0x1f, 0x07, 0xea, 0x7a, 0x6e, 0xac, 0x03, 0xb5, 0x69, 0x42, 0x1e, 0xab, 0x4d,
0x93, 0x6a, 0x7b, 0xe5, 0x66, 0x71, 0xd7, 0xcc, 0xc2, 0xce, 0xa0, 0x35, 0x43, 0x15, 0x27, 0xb1,
0x8a, 0xfd, 0xba, 0xe9, 0xa0, 0x5f, 0xbe, 0xa5, 0xf0, 0x82, 0x60, 0x5f, 0x53, 0x25, 0x97, 0x51,
0xc1, 0x0a, 0x4e, 0xa0, 0xbd, 0x96, 0x62, 0x5d, 0x70, 0x6f, 0x71, 0x49, 0xef, 0xd2, 0x3f, 0xf5,
0x26, 0x1f, 0xe2, 0xbb, 0x05, 0xd2, 0xb3, 0x6c, 0xf0, 0xb9, 0xf6, 0xc9, 0xe1, 0xc7, 0xd0, 0x24,
0xaf, 0xe9, 0x3d, 0x6a, 0x1f, 0x6c, 0xdf, 0xa3, 0xf1, 0x8a, 0xc1, 0xf0, 0x21, 0x78, 0xe7, 0x77,
0xc2, 0x2e, 0xff, 0x9f, 0x49, 0x3f, 0xa1, 0xae, 0xad, 0xf0, 0x12, 0x8e, 0x76, 0xf0, 0x1c, 0x51,
0xea, 0x81, 0xba, 0x15, 0xee, 0xb2, 0xa0, 0xa3, 0xdf, 0x2e, 0x34, 0x2f, 0x69, 0xb0, 0x57, 0x4f,
0x9d, 0xf5, 0x36, 0x59, 0xeb, 0x07, 0x1a, 0xbc, 0xaf, 0x40, 0xd0, 0x09, 0xee, 0xb0, 0xef, 0xe0,
0x19, 0xe7, 0xb3, 0xb7, 0x9b, 0xe8, 0xd5, 0xc3, 0x09, 0xde, 0x6d, 0xcd, 0xaf, 0x6a, 0x99, 0x53,
0x2d, 0xd3, 0x5a, 0xbd, 0xf4, 0x32, 0xad, 0xb5, 0x1b, 0xe7, 0x3b, 0xec, 0x02, 0x1a, 0xf6, 0x28,
0x58, 0x09, 0x78, 0xed, 0xdc, 0x82, 0xde, 0x76, 0x40, 0x21, 0x77, 0x0d, 0xad, 0xfc, 0x1c, 0x58,
0xc9, 0x5c, 0x9e, 0x5d, 0x4f, 0xc0, 0xab, 0x20, 0xb9, 0xe8, 0x4d, 0xc3, 0xfc, 0x49, 0x0f, 0xff,
0x06, 0x00, 0x00, 0xff, 0xff, 0x79, 0x8a, 0x5f, 0xf0, 0x24, 0x06, 0x00, 0x00,
0x1c, 0x64, 0x28, 0x1f, 0xa6, 0x23, 0x24, 0x3d, 0xfb, 0xd1, 0xca, 0xf1, 0x5f, 0x0e, 0x78, 0x3f,
0x16, 0x28, 0x97, 0xcc, 0x87, 0x26, 0xe1, 0x7c, 0xa7, 0xe7, 0x1c, 0xec, 0x46, 0x79, 0xa8, 0x33,
0x71, 0x92, 0x48, 0xcc, 0x32, 0xbf, 0x66, 0x33, 0x14, 0xea, 0xcc, 0x38, 0x56, 0xf8, 0x18, 0x2f,
0x7d, 0xd7, 0x66, 0x28, 0x64, 0xfb, 0xd0, 0xb0, 0x75, 0xfc, 0xba, 0x49, 0x50, 0xa4, 0x19, 0xf4,
0x6e, 0xdf, 0xb3, 0x0c, 0x0a, 0xf9, 0x29, 0x74, 0xce, 0x45, 0x9a, 0xe2, 0x48, 0x45, 0x78, 0xbf,
0xc0, 0x4c, 0xb1, 0x8f, 0xe0, 0xa5, 0x22, 0xc1, 0xcc, 0x77, 0x7a, 0xee, 0xc1, 0x7f, 0x47, 0xfb,
0xe1, 0xf3, 0xd6, 0xc3, 0x4b, 0x91, 0x60, 0x64, 0x41, 0xfc, 0x15, 0xfc, 0x5f, 0xf0, 0xb3, 0xb9,
0x48, 0x33, 0xe4, 0x7d, 0xd8, 0xd3, 0x88, 0x2c, 0x17, 0x7c, 0x0d, 0x5e, 0x82, 0x73, 0x35, 0x31,
0x0d, 0xb6, 0x23, 0x1b, 0xf0, 0x2f, 0xd0, 0x26, 0x94, 0xa5, 0xbd, 0xb0, 0x6e, 0x1f, 0xf6, 0xbe,
0xc9, 0x78, 0x3e, 0xa9, 0x2e, 0x32, 0x84, 0x36, 0xa1, 0xa8, 0xc8, 0x07, 0xa8, 0x4b, 0x21, 0x94,
0x41, 0x95, 0xd6, 0xb8, 0x42, 0x94, 0x91, 0xc1, 0xf0, 0x53, 0x68, 0x47, 0x7a, 0x7c, 0x45, 0x23,
0x87, 0xe0, 0xdd, 0xeb, 0xa5, 0x11, 0xfb, 0xcd, 0x26, 0xdb, 0xec, 0x34, 0xb2, 0x28, 0x7e, 0x06,
0x9d, 0x9c, 0x4f, 0xd5, 0x43, 0x5a, 0x4f, 0x49, 0x8f, 0x64, 0x0f, 0x43, 0xa0, 0xb5, 0x99, 0xe1,
0x5e, 0x5b, 0x37, 0xe4, 0x6f, 0xe0, 0x21, 0x74, 0x9f, 0x3e, 0x91, 0x6c, 0x00, 0x2d, 0x32, 0x8d,
0x15, 0xde, 0x8d, 0x8a, 0x98, 0xff, 0x71, 0xa0, 0xae, 0xe7, 0xc6, 0x3a, 0x50, 0x9b, 0x26, 0xe4,
0xb1, 0xda, 0x34, 0xa9, 0xb6, 0x57, 0x6e, 0x16, 0x77, 0xcd, 0x2c, 0xec, 0x0c, 0x5a, 0x33, 0x54,
0x71, 0x12, 0xab, 0xd8, 0xaf, 0x9b, 0x0e, 0xfa, 0xe5, 0x5b, 0x0a, 0x2f, 0x08, 0xf6, 0x35, 0x55,
0x72, 0x19, 0x15, 0xac, 0x60, 0x08, 0xed, 0xb5, 0x14, 0xeb, 0x82, 0x7b, 0x8b, 0x4b, 0x7a, 0x97,
0xfe, 0xa9, 0x37, 0xf9, 0x10, 0xdf, 0x2d, 0x90, 0x9e, 0x65, 0x83, 0xcf, 0xb5, 0x4f, 0x0e, 0x3f,
0x81, 0x26, 0x79, 0x4d, 0xef, 0x51, 0xfb, 0x60, 0xfb, 0x1e, 0x8d, 0x57, 0x0c, 0x86, 0x1f, 0x83,
0x77, 0x7e, 0x27, 0xec, 0xf2, 0xff, 0x99, 0xf4, 0x13, 0xea, 0xda, 0x0a, 0x2f, 0xe1, 0x68, 0x07,
0xcf, 0x11, 0xa5, 0x1e, 0xa8, 0x5b, 0xe1, 0x2e, 0x0b, 0x3a, 0xfa, 0xed, 0x42, 0xf3, 0x92, 0x06,
0x7b, 0xf5, 0xd4, 0x59, 0x6f, 0x93, 0xb5, 0x7e, 0xa0, 0xc1, 0xfb, 0x0a, 0x04, 0x9d, 0xe0, 0x0e,
0xfb, 0x0e, 0x9e, 0x71, 0x3e, 0x7b, 0xbb, 0x89, 0x5e, 0x3d, 0x9c, 0xe0, 0xdd, 0xd6, 0xfc, 0xaa,
0x96, 0x39, 0xd5, 0x32, 0xad, 0xd5, 0x4b, 0x2f, 0xd3, 0x5a, 0xbb, 0x71, 0xbe, 0xc3, 0x2e, 0xa0,
0x61, 0x8f, 0x82, 0x95, 0x80, 0xd7, 0xce, 0x2d, 0xe8, 0x6d, 0x07, 0x14, 0x72, 0xd7, 0xd0, 0xca,
0xcf, 0x81, 0x95, 0xcc, 0xe5, 0xd9, 0xf5, 0x04, 0xbc, 0x0a, 0x92, 0x8b, 0xde, 0x34, 0xcc, 0x9f,
0xf4, 0xf1, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xa7, 0x5b, 0x0a, 0x25, 0x2c, 0x06, 0x00, 0x00,
}

View File

@ -6,7 +6,7 @@ package go_micro_network
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
_ "github.com/micro/go-micro/router/proto"
_ "github.com/micro/go-micro/router/service/proto"
math "math"
)

View File

@ -2,7 +2,7 @@ syntax = "proto3";
package go.micro.network;
import "github.com/micro/go-micro/router/proto/router.proto";
import "github.com/micro/go-micro/router/service/proto/router.proto";
// Network service is usesd to gain visibility into networks
service Network {

View File

@ -1,82 +0,0 @@
package handler
import (
"context"
"time"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/registry/service"
pb "github.com/micro/go-micro/registry/service/proto"
)
type Registry struct {
// internal registry
Registry registry.Registry
}
func (r *Registry) GetService(ctx context.Context, req *pb.GetRequest, rsp *pb.GetResponse) error {
services, err := r.Registry.GetService(req.Service)
if err != nil {
return errors.InternalServerError("go.micro.registry", err.Error())
}
for _, srv := range services {
rsp.Services = append(rsp.Services, service.ToProto(srv))
}
return nil
}
func (r *Registry) Register(ctx context.Context, req *pb.Service, rsp *pb.EmptyResponse) error {
var regOpts []registry.RegisterOption
if req.Options != nil {
ttl := time.Duration(req.Options.Ttl) * time.Second
regOpts = append(regOpts, registry.RegisterTTL(ttl))
}
err := r.Registry.Register(service.ToService(req), regOpts...)
if err != nil {
return errors.InternalServerError("go.micro.registry", err.Error())
}
return nil
}
func (r *Registry) Deregister(ctx context.Context, req *pb.Service, rsp *pb.EmptyResponse) error {
err := r.Registry.Deregister(service.ToService(req))
if err != nil {
return errors.InternalServerError("go.micro.registry", err.Error())
}
return nil
}
func (r *Registry) ListServices(ctx context.Context, req *pb.ListRequest, rsp *pb.ListResponse) error {
services, err := r.Registry.ListServices()
if err != nil {
return errors.InternalServerError("go.micro.registry", err.Error())
}
for _, srv := range services {
rsp.Services = append(rsp.Services, service.ToProto(srv))
}
return nil
}
func (r *Registry) Watch(ctx context.Context, req *pb.WatchRequest, rsp pb.Registry_WatchStream) error {
watcher, err := r.Registry.Watch(registry.WatchService(req.Service))
if err != nil {
return errors.InternalServerError("go.micro.registry", err.Error())
}
for {
next, err := watcher.Next()
if err != nil {
return errors.InternalServerError("go.micro.registry", err.Error())
}
err = rsp.Send(&pb.Result{
Action: next.Action,
Service: service.ToProto(next.Service),
})
if err != nil {
return errors.InternalServerError("go.micro.registry", err.Error())
}
}
}

View File

@ -1,190 +0,0 @@
package handler
import (
"context"
"io"
"time"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/router"
pb "github.com/micro/go-micro/router/proto"
)
// Router implements router handler
type Router struct {
Router router.Router
}
// Lookup looks up routes in the routing table and returns them
func (r *Router) Lookup(ctx context.Context, req *pb.LookupRequest, resp *pb.LookupResponse) error {
routes, err := r.Router.Lookup(router.QueryService(req.Query.Service))
if err != nil {
return errors.InternalServerError("go.micro.router", "failed to lookup routes: %v", err)
}
respRoutes := make([]*pb.Route, 0, len(routes))
for _, route := range routes {
respRoute := &pb.Route{
Service: route.Service,
Address: route.Address,
Gateway: route.Gateway,
Network: route.Network,
Router: route.Router,
Link: route.Link,
Metric: route.Metric,
}
respRoutes = append(respRoutes, respRoute)
}
resp.Routes = respRoutes
return nil
}
// Solicit triggers full routing table advertisement
func (r *Router) Solicit(ctx context.Context, req *pb.Request, resp *pb.Response) error {
if err := r.Router.Solicit(); err != nil {
return err
}
return nil
}
// Advertise streams router advertisements
func (r *Router) Advertise(ctx context.Context, req *pb.Request, stream pb.Router_AdvertiseStream) error {
advertChan, err := r.Router.Advertise()
if err != nil {
return errors.InternalServerError("go.micro.router", "failed to get adverts: %v", err)
}
for advert := range advertChan {
var events []*pb.Event
for _, event := range advert.Events {
route := &pb.Route{
Service: event.Route.Service,
Address: event.Route.Address,
Gateway: event.Route.Gateway,
Network: event.Route.Network,
Router: event.Route.Router,
Link: event.Route.Link,
Metric: event.Route.Metric,
}
e := &pb.Event{
Type: pb.EventType(event.Type),
Timestamp: event.Timestamp.UnixNano(),
Route: route,
}
events = append(events, e)
}
advert := &pb.Advert{
Id: advert.Id,
Type: pb.AdvertType(advert.Type),
Timestamp: advert.Timestamp.UnixNano(),
Events: events,
}
// send the advert
err := stream.Send(advert)
if err == io.EOF {
return nil
}
if err != nil {
return errors.InternalServerError("go.micro.router", "error sending message %v", err)
}
}
return nil
}
// Process processes advertisements
func (r *Router) Process(ctx context.Context, req *pb.Advert, rsp *pb.ProcessResponse) error {
events := make([]*router.Event, len(req.Events))
for i, event := range req.Events {
route := router.Route{
Service: event.Route.Service,
Address: event.Route.Address,
Gateway: event.Route.Gateway,
Network: event.Route.Network,
Router: event.Route.Router,
Link: event.Route.Link,
Metric: event.Route.Metric,
}
events[i] = &router.Event{
Type: router.EventType(event.Type),
Timestamp: time.Unix(0, event.Timestamp),
Route: route,
}
}
advert := &router.Advert{
Id: req.Id,
Type: router.AdvertType(req.Type),
Timestamp: time.Unix(0, req.Timestamp),
TTL: time.Duration(req.Ttl),
Events: events,
}
if err := r.Router.Process(advert); err != nil {
return errors.InternalServerError("go.micro.router", "error publishing advert: %v", err)
}
return nil
}
// Status returns router status
func (r *Router) Status(ctx context.Context, req *pb.Request, rsp *pb.StatusResponse) error {
status := r.Router.Status()
rsp.Status = &pb.Status{
Code: status.Code.String(),
}
if status.Error != nil {
rsp.Status.Error = status.Error.Error()
}
return nil
}
// Watch streans routing table events
func (r *Router) Watch(ctx context.Context, req *pb.WatchRequest, stream pb.Router_WatchStream) error {
watcher, err := r.Router.Watch()
if err != nil {
return errors.InternalServerError("go.micro.router", "failed creating event watcher: %v", err)
}
defer stream.Close()
for {
event, err := watcher.Next()
if err == router.ErrWatcherStopped {
return errors.InternalServerError("go.micro.router", "watcher stopped")
}
if err != nil {
return errors.InternalServerError("go.micro.router", "error watching events: %v", err)
}
route := &pb.Route{
Service: event.Route.Service,
Address: event.Route.Address,
Gateway: event.Route.Gateway,
Network: event.Route.Network,
Router: event.Route.Router,
Link: event.Route.Link,
Metric: event.Route.Metric,
}
tableEvent := &pb.Event{
Type: pb.EventType(event.Type),
Timestamp: event.Timestamp.UnixNano(),
Route: route,
}
if err := stream.Send(tableEvent); err != nil {
return err
}
}
}

View File

@ -1,115 +0,0 @@
package handler
import (
"context"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/router"
pb "github.com/micro/go-micro/router/proto"
)
type Table struct {
Router router.Router
}
func (t *Table) Create(ctx context.Context, route *pb.Route, resp *pb.CreateResponse) error {
err := t.Router.Table().Create(router.Route{
Service: route.Service,
Address: route.Address,
Gateway: route.Gateway,
Network: route.Network,
Router: route.Router,
Link: route.Link,
Metric: route.Metric,
})
if err != nil {
return errors.InternalServerError("go.micro.router", "failed to create route: %s", err)
}
return nil
}
func (t *Table) Update(ctx context.Context, route *pb.Route, resp *pb.UpdateResponse) error {
err := t.Router.Table().Update(router.Route{
Service: route.Service,
Address: route.Address,
Gateway: route.Gateway,
Network: route.Network,
Router: route.Router,
Link: route.Link,
Metric: route.Metric,
})
if err != nil {
return errors.InternalServerError("go.micro.router", "failed to update route: %s", err)
}
return nil
}
func (t *Table) Delete(ctx context.Context, route *pb.Route, resp *pb.DeleteResponse) error {
err := t.Router.Table().Delete(router.Route{
Service: route.Service,
Address: route.Address,
Gateway: route.Gateway,
Network: route.Network,
Router: route.Router,
Link: route.Link,
Metric: route.Metric,
})
if err != nil {
return errors.InternalServerError("go.micro.router", "failed to delete route: %s", err)
}
return nil
}
// List returns all routes in the routing table
func (t *Table) List(ctx context.Context, req *pb.Request, resp *pb.ListResponse) error {
routes, err := t.Router.Table().List()
if err != nil {
return errors.InternalServerError("go.micro.router", "failed to list routes: %s", err)
}
respRoutes := make([]*pb.Route, 0, len(routes))
for _, route := range routes {
respRoute := &pb.Route{
Service: route.Service,
Address: route.Address,
Gateway: route.Gateway,
Network: route.Network,
Router: route.Router,
Link: route.Link,
Metric: route.Metric,
}
respRoutes = append(respRoutes, respRoute)
}
resp.Routes = respRoutes
return nil
}
func (t *Table) Query(ctx context.Context, req *pb.QueryRequest, resp *pb.QueryResponse) error {
routes, err := t.Router.Table().Query(router.QueryService(req.Query.Service))
if err != nil {
return errors.InternalServerError("go.micro.router", "failed to lookup routes: %s", err)
}
respRoutes := make([]*pb.Route, 0, len(routes))
for _, route := range routes {
respRoute := &pb.Route{
Service: route.Service,
Address: route.Address,
Gateway: route.Gateway,
Network: route.Network,
Router: route.Router,
Link: route.Link,
Metric: route.Metric,
}
respRoutes = append(respRoutes, respRoute)
}
resp.Routes = respRoutes
return nil
}

View File

@ -10,7 +10,7 @@ import (
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/router"
pb "github.com/micro/go-micro/router/proto"
pb "github.com/micro/go-micro/router/service/proto"
)
type svc struct {

View File

@ -5,7 +5,7 @@ import (
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/router"
pb "github.com/micro/go-micro/router/proto"
pb "github.com/micro/go-micro/router/service/proto"
)
type table struct {

View File

@ -6,7 +6,7 @@ import (
"time"
"github.com/micro/go-micro/router"
pb "github.com/micro/go-micro/router/proto"
pb "github.com/micro/go-micro/router/service/proto"
)
type watcher struct {

View File

@ -1,142 +0,0 @@
package handler
import (
"context"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/runtime"
pb "github.com/micro/go-micro/runtime/service/proto"
)
type Runtime struct {
Runtime runtime.Runtime
}
func toProto(s *runtime.Service) *pb.Service {
return &pb.Service{
Name: s.Name,
Version: s.Version,
Source: s.Source,
Metadata: s.Metadata,
}
}
func toService(s *pb.Service) *runtime.Service {
return &runtime.Service{
Name: s.Name,
Version: s.Version,
Source: s.Source,
Metadata: s.Metadata,
}
}
func toCreateOptions(opts *pb.CreateOptions) []runtime.CreateOption {
options := []runtime.CreateOption{}
// command options
if len(opts.Command) > 0 {
options = append(options, runtime.WithCommand(opts.Command...))
}
// env options
if len(opts.Env) > 0 {
options = append(options, runtime.WithEnv(opts.Env))
}
// TODO: output options
return options
}
func toReadOptions(opts *pb.ReadOptions) []runtime.ReadOption {
options := []runtime.ReadOption{}
if len(opts.Service) > 0 {
options = append(options, runtime.ReadService(opts.Service))
}
if len(opts.Version) > 0 {
options = append(options, runtime.ReadVersion(opts.Version))
}
if len(opts.Type) > 0 {
options = append(options, runtime.ReadType(opts.Type))
}
return options
}
func (r *Runtime) Create(ctx context.Context, req *pb.CreateRequest, rsp *pb.CreateResponse) error {
if req.Service == nil {
return errors.BadRequest("go.micro.runtime", "blank service")
}
var options []runtime.CreateOption
if req.Options != nil {
options = toCreateOptions(req.Options)
}
service := toService(req.Service)
err := r.Runtime.Create(service, options...)
if err != nil {
return errors.InternalServerError("go.micro.runtime", err.Error())
}
return nil
}
func (r *Runtime) Read(ctx context.Context, req *pb.ReadRequest, rsp *pb.ReadResponse) error {
var options []runtime.ReadOption
if req.Options != nil {
options = toReadOptions(req.Options)
}
services, err := r.Runtime.Read(options...)
if err != nil {
return errors.InternalServerError("go.micro.runtime", err.Error())
}
for _, service := range services {
rsp.Services = append(rsp.Services, toProto(service))
}
return nil
}
func (r *Runtime) Update(ctx context.Context, req *pb.UpdateRequest, rsp *pb.UpdateResponse) error {
if req.Service == nil {
return errors.BadRequest("go.micro.runtime", "blank service")
}
// TODO: add opts
service := toService(req.Service)
err := r.Runtime.Update(service)
if err != nil {
return errors.InternalServerError("go.micro.runtime", err.Error())
}
return nil
}
func (r *Runtime) Delete(ctx context.Context, req *pb.DeleteRequest, rsp *pb.DeleteResponse) error {
if req.Service == nil {
return errors.BadRequest("go.micro.runtime", "blank service")
}
// TODO: add opts
service := toService(req.Service)
err := r.Runtime.Delete(service)
if err != nil {
return errors.InternalServerError("go.micro.runtime", err.Error())
}
return nil
}
func (r *Runtime) List(ctx context.Context, req *pb.ListRequest, rsp *pb.ListResponse) error {
services, err := r.Runtime.List()
if err != nil {
return errors.InternalServerError("go.micro.runtime", err.Error())
}
for _, service := range services {
rsp.Services = append(rsp.Services, toProto(service))
}
return nil
}

View File

@ -1,89 +0,0 @@
package handler
import (
"context"
"io"
"time"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/store"
pb "github.com/micro/go-micro/store/service/proto"
)
type Store struct {
Store store.Store
}
func (s *Store) Read(ctx context.Context, req *pb.ReadRequest, rsp *pb.ReadResponse) error {
vals, err := s.Store.Read(req.Keys...)
if err != nil {
return errors.InternalServerError("go.micro.store", err.Error())
}
for _, val := range vals {
rsp.Records = append(rsp.Records, &pb.Record{
Key: val.Key,
Value: val.Value,
Expiry: int64(val.Expiry.Seconds()),
})
}
return nil
}
func (s *Store) Write(ctx context.Context, req *pb.WriteRequest, rsp *pb.WriteResponse) error {
records := make([]*store.Record, 0, len(req.Records))
for _, record := range req.Records {
records = append(records, &store.Record{
Key: record.Key,
Value: record.Value,
Expiry: time.Duration(record.Expiry) * time.Second,
})
}
err := s.Store.Write(records...)
if err != nil {
return errors.InternalServerError("go.micro.store", err.Error())
}
return nil
}
func (s *Store) Delete(ctx context.Context, req *pb.DeleteRequest, rsp *pb.DeleteResponse) error {
err := s.Store.Delete(req.Keys...)
if err != nil {
return errors.InternalServerError("go.micro.store", err.Error())
}
return nil
}
func (s *Store) List(ctx context.Context, req *pb.ListRequest, stream pb.Store_ListStream) error {
var vals []*store.Record
var err error
if len(req.Key) > 0 {
vals, err = s.Store.Read(req.Key)
} else {
vals, err = s.Store.List()
}
if err != nil {
return errors.InternalServerError("go.micro.store", err.Error())
}
rsp := new(pb.ListResponse)
// TODO: batch sync
for _, val := range vals {
rsp.Records = append(rsp.Records, &pb.Record{
Key: val.Key,
Value: val.Value,
Expiry: int64(val.Expiry.Seconds()),
})
}
err = stream.Send(rsp)
if err == io.EOF {
return nil
}
if err != nil {
return errors.InternalServerError("go.micro.store", err.Error())
}
return nil
}