Add service registry

This commit is contained in:
Asim Aslam
2019-09-09 08:57:57 -07:00
parent 5596407144
commit b076ef906a
6 changed files with 1482 additions and 0 deletions

155
registry/service/service.go Normal file
View File

@@ -0,0 +1,155 @@
// Package service uses the registry service
package service
import (
"context"
"time"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/registry"
pb "github.com/micro/go-micro/registry/proto"
)
var (
// The default service name
DefaultService = "go.micro.service"
)
type serviceRegistry struct {
opts registry.Options
// name of the registry
name string
// address
address []string
// client to call registry
client pb.RegistryService
}
func (s *serviceRegistry) callOpts() []client.CallOption {
var opts []client.CallOption
// set registry address
if len(s.address) > 0 {
opts = append(opts, client.WithAddress(s.address...))
}
// set timeout
if s.opts.Timeout > time.Duration(0) {
opts = append(opts, client.WithRequestTimeout(s.opts.Timeout))
}
return opts
}
func (s *serviceRegistry) Init(opts ...registry.Option) error {
for _, o := range opts {
o(&s.opts)
}
return nil
}
func (s *serviceRegistry) Options() registry.Options {
return s.opts
}
func (s *serviceRegistry) Register(srv *registry.Service, opts ...registry.RegisterOption) error {
var options registry.RegisterOptions
for _, o := range opts {
o(&options)
}
// register the service
_, err := s.client.Register(context.TODO(), toProto(srv), s.callOpts()...)
if err != nil {
return err
}
return nil
}
func (s *serviceRegistry) Deregister(srv *registry.Service) error {
// deregister the service
_, err := s.client.Deregister(context.TODO(), toProto(srv), s.callOpts()...)
if err != nil {
return err
}
return nil
}
func (s *serviceRegistry) GetService(name string) ([]*registry.Service, error) {
rsp, err := s.client.GetService(context.TODO(), &pb.GetRequest{
Service: name,
}, s.callOpts()...)
if err != nil {
return nil, err
}
var services []*registry.Service
for _, service := range rsp.Services {
services = append(services, toService(service))
}
return services, nil
}
func (s *serviceRegistry) ListServices() ([]*registry.Service, error) {
rsp, err := s.client.ListServices(context.TODO(), &pb.ListRequest{}, s.callOpts()...)
if err != nil {
return nil, err
}
var services []*registry.Service
for _, service := range rsp.Services {
services = append(services, toService(service))
}
return services, nil
}
func (s *serviceRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
var options registry.WatchOptions
for _, o := range opts {
o(&options)
}
stream, err := s.client.Watch(context.TODO(), &pb.WatchRequest{
Service: options.Service,
}, s.callOpts()...)
if err != nil {
return nil, err
}
return newWatcher(stream), nil
}
func (s *serviceRegistry) String() string {
return s.name
}
// NewRegistry returns a new registry service client
func NewRegistry(opts ...registry.Option) registry.Registry {
var options registry.Options
for _, o := range opts {
o(&options)
}
// use mdns to find the service registry
mReg := registry.NewRegistry()
// create new client with mdns
cli := client.NewClient(
client.Registry(mReg),
)
// service name
// TODO: accept option
name := DefaultService
return &serviceRegistry{
opts: options,
name: name,
address: options.Addrs,
client: pb.NewRegistryService(name, cli),
}
}

133
registry/service/util.go Normal file
View File

@@ -0,0 +1,133 @@
package service
import (
"github.com/micro/go-micro/registry"
pb "github.com/micro/go-micro/registry/proto"
)
func values(v []*registry.Value) []*pb.Value {
if len(v) == 0 {
return []*pb.Value{}
}
var vs []*pb.Value
for _, vi := range v {
vs = append(vs, &pb.Value{
Name: vi.Name,
Type: vi.Type,
Values: values(vi.Values),
})
}
return vs
}
func toValues(v []*pb.Value) []*registry.Value {
if len(v) == 0 {
return []*registry.Value{}
}
var vs []*registry.Value
for _, vi := range v {
vs = append(vs, &registry.Value{
Name: vi.Name,
Type: vi.Type,
Values: toValues(vi.Values),
})
}
return vs
}
func toProto(s *registry.Service) *pb.Service {
var endpoints []*pb.Endpoint
for _, ep := range s.Endpoints {
var request, response *pb.Value
if ep.Request != nil {
request = &pb.Value{
Name: ep.Request.Name,
Type: ep.Request.Type,
Values: values(ep.Request.Values),
}
}
if ep.Response != nil {
response = &pb.Value{
Name: ep.Response.Name,
Type: ep.Response.Type,
Values: values(ep.Response.Values),
}
}
endpoints = append(endpoints, &pb.Endpoint{
Name: ep.Name,
Request: request,
Response: response,
Metadata: ep.Metadata,
})
}
var nodes []*pb.Node
for _, node := range s.Nodes {
nodes = append(nodes, &pb.Node{
Id: node.Id,
Address: node.Address,
Metadata: node.Metadata,
})
}
return &pb.Service{
Name: s.Name,
Version: s.Version,
Metadata: s.Metadata,
Endpoints: endpoints,
Nodes: nodes,
}
}
func toService(s *pb.Service) *registry.Service {
var endpoints []*registry.Endpoint
for _, ep := range s.Endpoints {
var request, response *registry.Value
if ep.Request != nil {
request = &registry.Value{
Name: ep.Request.Name,
Type: ep.Request.Type,
Values: toValues(ep.Request.Values),
}
}
if ep.Response != nil {
response = &registry.Value{
Name: ep.Response.Name,
Type: ep.Response.Type,
Values: toValues(ep.Response.Values),
}
}
endpoints = append(endpoints, &registry.Endpoint{
Name: ep.Name,
Request: request,
Response: response,
Metadata: ep.Metadata,
})
}
var nodes []*registry.Node
for _, node := range s.Nodes {
nodes = append(nodes, &registry.Node{
Id: node.Id,
Address: node.Address,
Metadata: node.Metadata,
})
}
return &registry.Service{
Name: s.Name,
Version: s.Version,
Metadata: s.Metadata,
Endpoints: endpoints,
Nodes: nodes,
}
}

View File

@@ -0,0 +1,49 @@
package service
import (
"github.com/micro/go-micro/registry"
pb "github.com/micro/go-micro/registry/proto"
)
type serviceWatcher struct {
stream pb.Registry_WatchService
closed chan bool
}
func (s *serviceWatcher) Next() (*registry.Result, error) {
for {
// check if closed
select {
case <-s.closed:
return nil, registry.ErrWatcherStopped
default:
}
r, err := s.stream.Recv()
if err != nil {
return nil, err
}
return &registry.Result{
Action: r.Action,
Service: toService(r.Service),
}, nil
}
}
func (s *serviceWatcher) Stop() {
select {
case <-s.closed:
return
default:
close(s.closed)
s.stream.Close()
}
}
func newWatcher(stream pb.Registry_WatchService) registry.Watcher {
return &serviceWatcher{
stream: stream,
closed: make(chan bool),
}
}