meh
This commit is contained in:
parent
ce0c5908a6
commit
5ec9d561a6
18
options.go
18
options.go
@ -1,6 +1,8 @@
|
||||
package micro
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/micro/cli"
|
||||
"github.com/micro/go-micro/broker"
|
||||
"github.com/micro/go-micro/client"
|
||||
@ -20,6 +22,10 @@ type Options struct {
|
||||
Registry registry.Registry
|
||||
Transport transport.Transport
|
||||
|
||||
// Registration options
|
||||
RegisterTTL time.Duration
|
||||
RegisterInterval time.Duration
|
||||
|
||||
// Before and After funcs
|
||||
BeforeStart []func() error
|
||||
AfterStop []func() error
|
||||
@ -117,6 +123,18 @@ func Action(a func(*cli.Context)) Option {
|
||||
}
|
||||
}
|
||||
|
||||
func RegisterTTL(t time.Duration) Option {
|
||||
return func(o *Options) {
|
||||
o.RegisterTTL = t
|
||||
}
|
||||
}
|
||||
|
||||
func RegisterInterval(t time.Duration) Option {
|
||||
return func(o *Options) {
|
||||
o.RegisterInterval = t
|
||||
}
|
||||
}
|
||||
|
||||
// Before and Afters
|
||||
|
||||
func BeforeStart(fn func() error) Option {
|
||||
|
@ -161,19 +161,28 @@ func (c *consulRegistry) Deregister(s *Service) error {
|
||||
|
||||
node := s.Nodes[0]
|
||||
|
||||
_, err := c.Client.Catalog().Deregister(&consul.CatalogDeregistration{
|
||||
if _, err := c.Client.Catalog().Deregister(&consul.CatalogDeregistration{
|
||||
Node: node.Id,
|
||||
Address: node.Address,
|
||||
ServiceID: node.Id,
|
||||
}, nil)
|
||||
return err
|
||||
CheckID: node.Id,
|
||||
}, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.Client.Agent().ServiceDeregister(node.Id)
|
||||
}
|
||||
|
||||
func (c *consulRegistry) Register(s *Service) error {
|
||||
func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
|
||||
if len(s.Nodes) == 0 {
|
||||
return errors.New("Require at least one node")
|
||||
}
|
||||
|
||||
var options RegisterOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
node := s.Nodes[0]
|
||||
|
||||
tags := encodeMetadata(node.Metadata)
|
||||
@ -191,15 +200,37 @@ func (c *consulRegistry) Register(s *Service) error {
|
||||
Tags: tags,
|
||||
Address: node.Address,
|
||||
},
|
||||
Check: &consul.AgentCheck{
|
||||
Node: node.Id,
|
||||
CheckID: node.Id,
|
||||
Name: s.Name,
|
||||
ServiceID: node.Id,
|
||||
ServiceName: s.Name,
|
||||
Status: "passing",
|
||||
},
|
||||
}, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
if options.TTL <= time.Duration(0) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// this is cruft
|
||||
return c.Client.Agent().ServiceRegister(&consul.AgentServiceRegistration{
|
||||
ID: node.Id,
|
||||
Name: s.Name,
|
||||
Tags: tags,
|
||||
Port: node.Port,
|
||||
Address: node.Address,
|
||||
Check: &consul.AgentServiceCheck{
|
||||
TTL: fmt.Sprintf("%v", options.TTL),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func (c *consulRegistry) GetService(name string) ([]*Service, error) {
|
||||
rsp, _, err := c.Client.Catalog().Service(name, "", nil)
|
||||
rsp, _, err := c.Client.Health().Service(name, "", true, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -207,37 +238,37 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) {
|
||||
serviceMap := map[string]*Service{}
|
||||
|
||||
for _, s := range rsp {
|
||||
if s.ServiceName != name {
|
||||
if s.Service.Service != name {
|
||||
continue
|
||||
}
|
||||
|
||||
// version is now a tag
|
||||
version, found := decodeVersion(s.ServiceTags)
|
||||
version, found := decodeVersion(s.Service.Tags)
|
||||
// service ID is now the node id
|
||||
id := s.ServiceID
|
||||
id := s.Service.ID
|
||||
// key is always the version
|
||||
key := version
|
||||
// address is service address
|
||||
address := s.ServiceAddress
|
||||
address := s.Service.Address
|
||||
|
||||
// if we can't get the new type of version
|
||||
// use old the old ways
|
||||
if !found {
|
||||
// id was set as node
|
||||
id = s.Node
|
||||
id = s.Node.Node
|
||||
// key was service id
|
||||
key = s.ServiceID
|
||||
key = s.Service.ID
|
||||
// version was service id
|
||||
version = s.ServiceID
|
||||
version = s.Service.ID
|
||||
// address was address
|
||||
address = s.Address
|
||||
address = s.Node.Address
|
||||
}
|
||||
|
||||
svc, ok := serviceMap[key]
|
||||
if !ok {
|
||||
svc = &Service{
|
||||
Endpoints: decodeEndpoints(s.ServiceTags),
|
||||
Name: s.ServiceName,
|
||||
Endpoints: decodeEndpoints(s.Service.Tags),
|
||||
Name: s.Service.Service,
|
||||
Version: version,
|
||||
}
|
||||
serviceMap[key] = svc
|
||||
@ -246,8 +277,8 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) {
|
||||
svc.Nodes = append(svc.Nodes, &Node{
|
||||
Id: id,
|
||||
Address: address,
|
||||
Port: s.ServicePort,
|
||||
Metadata: decodeMetadata(s.ServiceTags),
|
||||
Port: s.Service.Port,
|
||||
Metadata: decodeMetadata(s.Service.Tags),
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -17,6 +17,13 @@ type Options struct {
|
||||
Context context.Context
|
||||
}
|
||||
|
||||
type RegisterOptions struct {
|
||||
TTL time.Duration
|
||||
// Other options for implementations of the interface
|
||||
// can be stored in a context
|
||||
Context context.Context
|
||||
}
|
||||
|
||||
func Timeout(t time.Duration) Option {
|
||||
return func(o *Options) {
|
||||
o.Timeout = t
|
||||
@ -36,3 +43,9 @@ func TLSConfig(t *tls.Config) Option {
|
||||
o.TLSConfig = t
|
||||
}
|
||||
}
|
||||
|
||||
func WithTTL(t time.Duration) RegisterOption {
|
||||
return func(o *RegisterOptions) {
|
||||
o.TTL = t
|
||||
}
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
package registry
|
||||
|
||||
type Registry interface {
|
||||
Register(*Service) error
|
||||
Register(*Service, ...RegisterOption) error
|
||||
Deregister(*Service) error
|
||||
GetService(string) ([]*Service, error)
|
||||
ListServices() ([]*Service, error)
|
||||
@ -11,6 +11,8 @@ type Registry interface {
|
||||
|
||||
type Option func(*Options)
|
||||
|
||||
type RegisterOption func(*RegisterOptions)
|
||||
|
||||
var (
|
||||
DefaultRegistry = newConsulRegistry([]string{})
|
||||
)
|
||||
@ -19,8 +21,8 @@ func NewRegistry(addrs []string, opt ...Option) Registry {
|
||||
return newConsulRegistry(addrs, opt...)
|
||||
}
|
||||
|
||||
func Register(s *Service) error {
|
||||
return DefaultRegistry.Register(s)
|
||||
func Register(s *Service, opts ...RegisterOption) error {
|
||||
return DefaultRegistry.Register(s, opts...)
|
||||
}
|
||||
|
||||
func Deregister(s *Service) error {
|
||||
|
@ -1,6 +1,8 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/broker"
|
||||
"github.com/micro/go-micro/codec"
|
||||
"github.com/micro/go-micro/registry"
|
||||
@ -32,6 +34,10 @@ type Options struct {
|
||||
Context context.Context
|
||||
}
|
||||
|
||||
type RegisterOptions struct {
|
||||
TTL time.Duration
|
||||
}
|
||||
|
||||
func newOptions(opt ...Option) Options {
|
||||
opts := Options{
|
||||
Codecs: make(map[string]codec.NewCodec),
|
||||
@ -167,3 +173,10 @@ func WrapSubscriber(w SubscriberWrapper) Option {
|
||||
o.SubWrappers = append(o.SubWrappers, w)
|
||||
}
|
||||
}
|
||||
|
||||
// Register the service with a TTL
|
||||
func RegisterTTL(t time.Duration) RegisterOption {
|
||||
return func(o *RegisterOptions) {
|
||||
o.TTL = t
|
||||
}
|
||||
}
|
||||
|
@ -155,7 +155,15 @@ func (s *rpcServer) Subscribe(sb Subscriber) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *rpcServer) Register() error {
|
||||
func (s *rpcServer) Register(opts ...RegisterOption) error {
|
||||
var options RegisterOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
// create registry options
|
||||
rOpts := []registry.RegisterOption{registry.WithTTL(options.TTL)}
|
||||
|
||||
// parse address for host, port
|
||||
config := s.Options()
|
||||
var advt, host string
|
||||
@ -220,7 +228,7 @@ func (s *rpcServer) Register() error {
|
||||
}
|
||||
|
||||
log.Infof("Registering node: %s", node.Id)
|
||||
if err := config.Registry.Register(service); err != nil {
|
||||
if err := config.Registry.Register(service, rOpts...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -45,7 +45,7 @@ type Server interface {
|
||||
NewHandler(interface{}, ...HandlerOption) Handler
|
||||
NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
|
||||
Subscribe(Subscriber) error
|
||||
Register() error
|
||||
Register(...RegisterOption) error
|
||||
Deregister() error
|
||||
Start() error
|
||||
Stop() error
|
||||
@ -86,6 +86,8 @@ type HandlerOption func(*HandlerOptions)
|
||||
|
||||
type SubscriberOption func(*SubscriberOptions)
|
||||
|
||||
type RegisterOption func(*RegisterOptions)
|
||||
|
||||
var (
|
||||
DefaultAddress = ":0"
|
||||
DefaultName = "go-server"
|
||||
|
32
service.go
32
service.go
@ -1,9 +1,11 @@
|
||||
package micro
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/client"
|
||||
"github.com/micro/go-micro/cmd"
|
||||
@ -30,6 +32,27 @@ func newService(opts ...Option) Service {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *service) run(exit chan bool) {
|
||||
if s.opts.RegisterInterval <= time.Duration(0) {
|
||||
return
|
||||
}
|
||||
|
||||
t := time.NewTicker(s.opts.RegisterInterval)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
fmt.Println("heartbeat")
|
||||
if err := s.opts.Server.Register(server.RegisterTTL(s.opts.RegisterTTL)); err != nil {
|
||||
fmt.Println("FUCK", err)
|
||||
}
|
||||
case <-exit:
|
||||
t.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *service) Init(opts ...Option) {
|
||||
// We might get more command flags or the action here
|
||||
// This is pretty ugly, find a better way
|
||||
@ -82,7 +105,7 @@ func (s *service) Start() error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.opts.Server.Register(); err != nil {
|
||||
if err := s.opts.Server.Register(server.RegisterTTL(s.opts.RegisterTTL)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -115,10 +138,17 @@ func (s *service) Run() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// start reg loop
|
||||
ex := make(chan bool)
|
||||
go s.run(ex)
|
||||
|
||||
ch := make(chan os.Signal, 1)
|
||||
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
|
||||
<-ch
|
||||
|
||||
// exit reg loop
|
||||
close(ex)
|
||||
|
||||
if err := s.Stop(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user