Compare commits
17 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
9d3cb65daa | ||
|
a0d3917832 | ||
|
9968c7d007 | ||
|
68f5e71153 | ||
|
af328ee7b4 | ||
|
88505388c1 | ||
|
8a778644cf | ||
|
d3a76e646a | ||
|
cfa824bc5f | ||
|
39be61685c | ||
|
ac2106ced7 | ||
|
1b4f7d8a68 | ||
|
5eb2e79b86 | ||
|
a2eff9918e | ||
|
52a470532d | ||
|
cd9441fafb | ||
|
356cf82af5 |
@@ -276,12 +276,15 @@ func (h *httpBroker) Address() string {
|
||||
}
|
||||
|
||||
func (h *httpBroker) Connect() error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
h.RLock()
|
||||
if h.running {
|
||||
h.RUnlock()
|
||||
return nil
|
||||
}
|
||||
h.RUnlock()
|
||||
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
var l net.Listener
|
||||
var err error
|
||||
@@ -351,12 +354,16 @@ func (h *httpBroker) Connect() error {
|
||||
}
|
||||
|
||||
func (h *httpBroker) Disconnect() error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
h.RLock()
|
||||
if !h.running {
|
||||
h.RUnlock()
|
||||
return nil
|
||||
}
|
||||
h.RUnlock()
|
||||
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
// stop rcache
|
||||
rc, ok := h.r.(rcache.Cache)
|
||||
@@ -375,17 +382,24 @@ func (h *httpBroker) Disconnect() error {
|
||||
}
|
||||
|
||||
func (h *httpBroker) Init(opts ...Option) error {
|
||||
h.RLock()
|
||||
if h.running {
|
||||
h.RUnlock()
|
||||
return errors.New("cannot init while connected")
|
||||
}
|
||||
h.RUnlock()
|
||||
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
if h.running {
|
||||
return errors.New("cannot init while connected")
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&h.opts)
|
||||
}
|
||||
|
||||
if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 {
|
||||
h.address = h.opts.Addrs[0]
|
||||
}
|
||||
|
||||
if len(h.id) == 0 {
|
||||
h.id = "broker-" + uuid.NewUUID().String()
|
||||
}
|
||||
|
@@ -68,7 +68,7 @@ var (
|
||||
// DefaultBackoff is the default backoff function for retries
|
||||
DefaultBackoff = exponentialBackoff
|
||||
// DefaultRetry is the default check-for-retry function for retries
|
||||
DefaultRetry = alwaysRetry
|
||||
DefaultRetry = RetryOnError
|
||||
// DefaultRetries is the default number of times a request is tried
|
||||
DefaultRetries = 1
|
||||
// DefaultRequestTimeout is the default request timeout
|
||||
|
@@ -80,8 +80,12 @@ func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface
|
||||
if t := reflect.TypeOf(rsp); t.Kind() == reflect.Ptr {
|
||||
v = reflect.Indirect(v)
|
||||
}
|
||||
response := r.Response
|
||||
if t := reflect.TypeOf(r.Response); t.Kind() == reflect.Func {
|
||||
response = reflect.ValueOf(r.Response).Call([]reflect.Value{})[0].Interface()
|
||||
}
|
||||
|
||||
v.Set(reflect.ValueOf(r.Response))
|
||||
v.Set(reflect.ValueOf(response))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@@ -16,6 +16,8 @@ func TestClient(t *testing.T) {
|
||||
{Method: "Foo.Bar", Response: map[string]interface{}{"foo": "bar"}},
|
||||
{Method: "Foo.Struct", Response: &TestResponse{Param: "aparam"}},
|
||||
{Method: "Foo.Fail", Error: errors.InternalServerError("go.mock", "failed")},
|
||||
{Method: "Foo.Func", Response: func() string { return "string" }},
|
||||
{Method: "Foo.FuncStruct", Response: func() *TestResponse { return &TestResponse{Param: "aparam"} }},
|
||||
}
|
||||
|
||||
c := NewClient(Response("go.mock", response))
|
||||
|
@@ -2,12 +2,34 @@ package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/micro/go-micro/errors"
|
||||
)
|
||||
|
||||
// note that returning either false or a non-nil error will result in the call not being retried
|
||||
type RetryFunc func(ctx context.Context, req Request, retryCount int, err error) (bool, error)
|
||||
|
||||
// always retry on error
|
||||
func alwaysRetry(ctx context.Context, req Request, retryCount int, err error) (bool, error) {
|
||||
// RetryAlways always retry on error
|
||||
func RetryAlways(ctx context.Context, req Request, retryCount int, err error) (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// RetryOnError retries a request on a 500 or timeout error
|
||||
func RetryOnError(ctx context.Context, req Request, retryCount int, err error) (bool, error) {
|
||||
if err == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
e := errors.Parse(err.Error())
|
||||
if e == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
switch e.Code {
|
||||
// retry on timeout or internal server error
|
||||
case 408, 500:
|
||||
return true, nil
|
||||
default:
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
@@ -159,7 +159,15 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt
|
||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
||||
}
|
||||
|
||||
c, err := r.opts.Transport.Dial(address, transport.WithStream(), transport.WithTimeout(opts.DialTimeout))
|
||||
dOpts := []transport.DialOption{
|
||||
transport.WithStream(),
|
||||
}
|
||||
|
||||
if opts.DialTimeout >= 0 {
|
||||
dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout))
|
||||
}
|
||||
|
||||
c, err := r.opts.Transport.Dial(address, dOpts...)
|
||||
if err != nil {
|
||||
return nil, errors.InternalServerError("go.micro.client", "connection error: %v", err)
|
||||
}
|
||||
@@ -230,9 +238,9 @@ func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, erro
|
||||
// get next nodes from the selector
|
||||
next, err := r.opts.Selector.Select(request.Service(), opts.SelectOptions...)
|
||||
if err != nil && err == selector.ErrNotFound {
|
||||
return nil, errors.NotFound("go.micro.client", err.Error())
|
||||
return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
|
||||
} else if err != nil {
|
||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
||||
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %v", request.Service(), err.Error())
|
||||
}
|
||||
|
||||
return next, nil
|
||||
@@ -282,7 +290,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
|
||||
// call backoff first. Someone may want an initial start delay
|
||||
t, err := callOpts.Backoff(ctx, request, i)
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
return errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
|
||||
}
|
||||
|
||||
// only sleep if greater than 0
|
||||
@@ -293,9 +301,9 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
|
||||
// select next node
|
||||
node, err := next()
|
||||
if err != nil && err == selector.ErrNotFound {
|
||||
return errors.NotFound("go.micro.client", err.Error())
|
||||
return errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
|
||||
} else if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
|
||||
}
|
||||
|
||||
// set the address
|
||||
@@ -355,18 +363,6 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// check if we already have a deadline
|
||||
d, ok := ctx.Deadline()
|
||||
if !ok {
|
||||
// no deadline so we create a new one
|
||||
ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
|
||||
} else {
|
||||
// got a deadline so no need to setup context
|
||||
// but we need to set the timeout we pass along
|
||||
opt := WithRequestTimeout(d.Sub(time.Now()))
|
||||
opt(&callOpts)
|
||||
}
|
||||
|
||||
// should we noop right here?
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -378,7 +374,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
|
||||
// call backoff first. Someone may want an initial start delay
|
||||
t, err := callOpts.Backoff(ctx, request, i)
|
||||
if err != nil {
|
||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
||||
return nil, errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
|
||||
}
|
||||
|
||||
// only sleep if greater than 0
|
||||
@@ -388,9 +384,9 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
|
||||
|
||||
node, err := next()
|
||||
if err != nil && err == selector.ErrNotFound {
|
||||
return nil, errors.NotFound("go.micro.client", err.Error())
|
||||
return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
|
||||
} else if err != nil {
|
||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
||||
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
|
||||
}
|
||||
|
||||
address := node.Address
|
||||
|
@@ -2,10 +2,10 @@ package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/micro/go-micro/errors"
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-micro/registry/mock"
|
||||
"github.com/micro/go-micro/selector"
|
||||
@@ -69,7 +69,7 @@ func TestCallRetry(t *testing.T) {
|
||||
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
|
||||
called++
|
||||
if called == 1 {
|
||||
return errors.New("retry request")
|
||||
return errors.InternalServerError("test.error", "retry request")
|
||||
}
|
||||
|
||||
// don't do the call
|
||||
|
69
cmd/cmd.go
69
cmd/cmd.go
@@ -262,48 +262,40 @@ func (c *cmd) Before(ctx *cli.Context) error {
|
||||
|
||||
// Set the client
|
||||
if name := ctx.String("client"); len(name) > 0 {
|
||||
if cl, ok := c.opts.Clients[name]; ok {
|
||||
// only change if we have the client and type differs
|
||||
if cl, ok := c.opts.Clients[name]; ok && (*c.opts.Client).String() != name {
|
||||
*c.opts.Client = cl()
|
||||
}
|
||||
}
|
||||
|
||||
// Set the server
|
||||
if name := ctx.String("server"); len(name) > 0 {
|
||||
if s, ok := c.opts.Servers[name]; ok {
|
||||
// only change if we have the server and type differs
|
||||
if s, ok := c.opts.Servers[name]; ok && (*c.opts.Server).String() != name {
|
||||
*c.opts.Server = s()
|
||||
}
|
||||
}
|
||||
|
||||
// Set the broker
|
||||
if name := ctx.String("broker"); len(name) > 0 || len(ctx.String("broker_address")) > 0 {
|
||||
if len(name) == 0 {
|
||||
name = defaultBroker
|
||||
}
|
||||
|
||||
if b, ok := c.opts.Brokers[name]; ok {
|
||||
n := b(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...))
|
||||
*c.opts.Broker = n
|
||||
} else {
|
||||
if name := ctx.String("broker"); len(name) > 0 && (*c.opts.Broker).String() != name {
|
||||
b, ok := c.opts.Brokers[name]
|
||||
if !ok {
|
||||
return fmt.Errorf("Broker %s not found", name)
|
||||
}
|
||||
|
||||
*c.opts.Broker = b()
|
||||
serverOpts = append(serverOpts, server.Broker(*c.opts.Broker))
|
||||
clientOpts = append(clientOpts, client.Broker(*c.opts.Broker))
|
||||
}
|
||||
|
||||
// Set the registry
|
||||
if name := ctx.String("registry"); len(name) > 0 || len(ctx.String("registry_address")) > 0 {
|
||||
if len(name) == 0 {
|
||||
name = defaultRegistry
|
||||
}
|
||||
|
||||
if r, ok := c.opts.Registries[name]; ok {
|
||||
n := r(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...))
|
||||
*c.opts.Registry = n
|
||||
} else {
|
||||
if name := ctx.String("registry"); len(name) > 0 && (*c.opts.Registry).String() != name {
|
||||
r, ok := c.opts.Registries[name]
|
||||
if !ok {
|
||||
return fmt.Errorf("Registry %s not found", name)
|
||||
}
|
||||
|
||||
*c.opts.Registry = r()
|
||||
serverOpts = append(serverOpts, server.Registry(*c.opts.Registry))
|
||||
clientOpts = append(clientOpts, client.Registry(*c.opts.Registry))
|
||||
|
||||
@@ -314,31 +306,26 @@ func (c *cmd) Before(ctx *cli.Context) error {
|
||||
}
|
||||
|
||||
// Set the selector
|
||||
if name := ctx.String("selector"); len(name) > 0 {
|
||||
if s, ok := c.opts.Selectors[name]; ok {
|
||||
n := s(selector.Registry(*c.opts.Registry))
|
||||
*c.opts.Selector = n
|
||||
} else {
|
||||
if name := ctx.String("selector"); len(name) > 0 && (*c.opts.Selector).String() != name {
|
||||
s, ok := c.opts.Selectors[name]
|
||||
if !ok {
|
||||
return fmt.Errorf("Selector %s not found", name)
|
||||
}
|
||||
|
||||
*c.opts.Selector = s(selector.Registry(*c.opts.Registry))
|
||||
|
||||
// No server option here. Should there be?
|
||||
clientOpts = append(clientOpts, client.Selector(*c.opts.Selector))
|
||||
}
|
||||
|
||||
// Set the transport
|
||||
if name := ctx.String("transport"); len(name) > 0 || len(ctx.String("transport_address")) > 0 {
|
||||
if len(name) == 0 {
|
||||
name = defaultTransport
|
||||
}
|
||||
|
||||
if t, ok := c.opts.Transports[name]; ok {
|
||||
n := t(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...))
|
||||
*c.opts.Transport = n
|
||||
} else {
|
||||
if name := ctx.String("transport"); len(name) > 0 && (*c.opts.Transport).String() != name {
|
||||
t, ok := c.opts.Transports[name]
|
||||
if !ok {
|
||||
return fmt.Errorf("Transport %s not found", name)
|
||||
}
|
||||
|
||||
*c.opts.Transport = t()
|
||||
serverOpts = append(serverOpts, server.Transport(*c.opts.Transport))
|
||||
clientOpts = append(clientOpts, client.Transport(*c.opts.Transport))
|
||||
}
|
||||
@@ -359,6 +346,18 @@ func (c *cmd) Before(ctx *cli.Context) error {
|
||||
serverOpts = append(serverOpts, server.Metadata(metadata))
|
||||
}
|
||||
|
||||
if len(ctx.String("broker_address")) > 0 {
|
||||
(*c.opts.Broker).Init(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...))
|
||||
}
|
||||
|
||||
if len(ctx.String("registry_address")) > 0 {
|
||||
(*c.opts.Registry).Init(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...))
|
||||
}
|
||||
|
||||
if len(ctx.String("transport_address")) > 0 {
|
||||
(*c.opts.Transport).Init(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...))
|
||||
}
|
||||
|
||||
if len(ctx.String("server_name")) > 0 {
|
||||
serverOpts = append(serverOpts, server.Name(ctx.String("server_name")))
|
||||
}
|
||||
@@ -384,7 +383,7 @@ func (c *cmd) Before(ctx *cli.Context) error {
|
||||
}
|
||||
|
||||
// client opts
|
||||
if r := ctx.Int("client_retries"); r > 0 {
|
||||
if r := ctx.Int("client_retries"); r >= 0 {
|
||||
clientOpts = append(clientOpts, client.Retries(r))
|
||||
}
|
||||
|
||||
|
@@ -8,6 +8,16 @@ import (
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
// Connect specifies services should be registered as Consul Connect services
|
||||
func Connect() registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, "consul_connect", true)
|
||||
}
|
||||
}
|
||||
|
||||
func Config(c *consul.Config) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
if o.Context == nil {
|
||||
|
@@ -19,10 +19,26 @@ type consulRegistry struct {
|
||||
Client *consul.Client
|
||||
opts Options
|
||||
|
||||
// connect enabled
|
||||
connect bool
|
||||
|
||||
sync.Mutex
|
||||
register map[string]uint64
|
||||
}
|
||||
|
||||
func getDeregisterTTL(t time.Duration) time.Duration {
|
||||
// splay slightly for the watcher?
|
||||
splay := time.Second * 5
|
||||
deregTTL := t + splay
|
||||
|
||||
// consul has a minimum timeout on deregistration of 1 minute.
|
||||
if t < time.Minute {
|
||||
deregTTL = time.Minute + splay
|
||||
}
|
||||
|
||||
return deregTTL
|
||||
}
|
||||
|
||||
func newTransport(config *tls.Config) *http.Transport {
|
||||
if config == nil {
|
||||
config = &tls.Config{
|
||||
@@ -45,27 +61,31 @@ func newTransport(config *tls.Config) *http.Transport {
|
||||
return t
|
||||
}
|
||||
|
||||
func newConsulRegistry(opts ...Option) Registry {
|
||||
var options Options
|
||||
func configure(c *consulRegistry, opts ...Option) {
|
||||
// set opts
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
o(&c.opts)
|
||||
}
|
||||
|
||||
// use default config
|
||||
config := consul.DefaultConfig()
|
||||
if options.Context != nil {
|
||||
|
||||
if c.opts.Context != nil {
|
||||
// Use the consul config passed in the options, if available
|
||||
if c, ok := options.Context.Value("consul_config").(*consul.Config); ok {
|
||||
config = c
|
||||
if co, ok := c.opts.Context.Value("consul_config").(*consul.Config); ok {
|
||||
config = co
|
||||
}
|
||||
if cn, ok := c.opts.Context.Value("consul_connect").(bool); ok {
|
||||
c.connect = cn
|
||||
}
|
||||
}
|
||||
|
||||
// check if there are any addrs
|
||||
if len(options.Addrs) > 0 {
|
||||
addr, port, err := net.SplitHostPort(options.Addrs[0])
|
||||
if len(c.opts.Addrs) > 0 {
|
||||
addr, port, err := net.SplitHostPort(c.opts.Addrs[0])
|
||||
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
|
||||
port = "8500"
|
||||
addr = options.Addrs[0]
|
||||
addr = c.opts.Addrs[0]
|
||||
config.Address = fmt.Sprintf("%s:%s", addr, port)
|
||||
} else if err == nil {
|
||||
config.Address = fmt.Sprintf("%s:%s", addr, port)
|
||||
@@ -73,34 +93,43 @@ func newConsulRegistry(opts ...Option) Registry {
|
||||
}
|
||||
|
||||
// requires secure connection?
|
||||
if options.Secure || options.TLSConfig != nil {
|
||||
if c.opts.Secure || c.opts.TLSConfig != nil {
|
||||
if config.HttpClient == nil {
|
||||
config.HttpClient = new(http.Client)
|
||||
}
|
||||
|
||||
config.Scheme = "https"
|
||||
// We're going to support InsecureSkipVerify
|
||||
config.HttpClient.Transport = newTransport(options.TLSConfig)
|
||||
config.HttpClient.Transport = newTransport(c.opts.TLSConfig)
|
||||
}
|
||||
|
||||
// set timeout
|
||||
if c.opts.Timeout > 0 {
|
||||
config.HttpClient.Timeout = c.opts.Timeout
|
||||
}
|
||||
|
||||
// create the client
|
||||
client, _ := consul.NewClient(config)
|
||||
|
||||
// set timeout
|
||||
if options.Timeout > 0 {
|
||||
config.HttpClient.Timeout = options.Timeout
|
||||
}
|
||||
// set address/client
|
||||
c.Address = config.Address
|
||||
c.Client = client
|
||||
}
|
||||
|
||||
func newConsulRegistry(opts ...Option) Registry {
|
||||
cr := &consulRegistry{
|
||||
Address: config.Address,
|
||||
Client: client,
|
||||
opts: options,
|
||||
opts: Options{},
|
||||
register: make(map[string]uint64),
|
||||
}
|
||||
|
||||
configure(cr, opts...)
|
||||
return cr
|
||||
}
|
||||
|
||||
func (c *consulRegistry) Init(opts ...Option) error {
|
||||
configure(c, opts...)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *consulRegistry) Deregister(s *Service) error {
|
||||
if len(s.Nodes) == 0 {
|
||||
return errors.New("Require at least one node")
|
||||
@@ -115,19 +144,6 @@ func (c *consulRegistry) Deregister(s *Service) error {
|
||||
return c.Client.Agent().ServiceDeregister(node.Id)
|
||||
}
|
||||
|
||||
func getDeregisterTTL(t time.Duration) time.Duration {
|
||||
// splay slightly for the watcher?
|
||||
splay := time.Second * 5
|
||||
deregTTL := t + splay
|
||||
|
||||
// consul has a minimum timeout on deregistration of 1 minute.
|
||||
if t < time.Minute {
|
||||
deregTTL = time.Minute + splay
|
||||
}
|
||||
|
||||
return deregTTL
|
||||
}
|
||||
|
||||
func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
|
||||
if len(s.Nodes) == 0 {
|
||||
return errors.New("Require at least one node")
|
||||
@@ -198,14 +214,23 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
|
||||
}
|
||||
|
||||
// register the service
|
||||
if err := c.Client.Agent().ServiceRegister(&consul.AgentServiceRegistration{
|
||||
asr := &consul.AgentServiceRegistration{
|
||||
ID: node.Id,
|
||||
Name: s.Name,
|
||||
Tags: tags,
|
||||
Port: node.Port,
|
||||
Address: node.Address,
|
||||
Check: check,
|
||||
}); err != nil {
|
||||
}
|
||||
|
||||
// Specify consul connect
|
||||
if c.connect {
|
||||
asr.Connect = &consul.AgentServiceConnect{
|
||||
Native: true,
|
||||
}
|
||||
}
|
||||
|
||||
if err := c.Client.Agent().ServiceRegister(asr); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -224,7 +249,15 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
|
||||
}
|
||||
|
||||
func (c *consulRegistry) GetService(name string) ([]*Service, error) {
|
||||
rsp, _, err := c.Client.Health().Service(name, "", false, nil)
|
||||
var rsp []*consul.ServiceEntry
|
||||
var err error
|
||||
|
||||
// if we're connect enabled only get connect services
|
||||
if c.connect {
|
||||
rsp, _, err = c.Client.Health().Connect(name, "", false, nil)
|
||||
} else {
|
||||
rsp, _, err = c.Client.Health().Service(name, "", false, nil)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@@ -49,6 +49,17 @@ func newRegistry(opts ...registry.Option) registry.Registry {
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) Init(opts ...registry.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&m.opts)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) Options() registry.Options {
|
||||
return m.opts
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) Register(service *registry.Service, opts ...registry.RegisterOption) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
@@ -322,10 +333,6 @@ func (m *mdnsRegistry) String() string {
|
||||
return "mdns"
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) Options() registry.Options {
|
||||
return m.opts
|
||||
}
|
||||
|
||||
func NewRegistry(opts ...registry.Option) registry.Registry {
|
||||
return newRegistry(opts...)
|
||||
}
|
||||
|
@@ -99,11 +99,15 @@ func (m *mockRegistry) String() string {
|
||||
return "mock"
|
||||
}
|
||||
|
||||
func (m *mockRegistry) Init(opts ...registry.Option) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockRegistry) Options() registry.Options {
|
||||
return registry.Options{}
|
||||
}
|
||||
|
||||
func NewRegistry() registry.Registry {
|
||||
func NewRegistry(opts ...registry.Options) registry.Registry {
|
||||
m := &mockRegistry{Services: make(map[string][]*registry.Service)}
|
||||
m.init()
|
||||
return m
|
||||
|
@@ -9,13 +9,14 @@ import (
|
||||
// and an abstraction over varying implementations
|
||||
// {consul, etcd, zookeeper, ...}
|
||||
type Registry interface {
|
||||
Init(...Option) error
|
||||
Options() Options
|
||||
Register(*Service, ...RegisterOption) error
|
||||
Deregister(*Service) error
|
||||
GetService(string) ([]*Service, error)
|
||||
ListServices() ([]*Service, error)
|
||||
Watch(...WatchOption) (Watcher, error)
|
||||
String() string
|
||||
Options() Options
|
||||
}
|
||||
|
||||
type Option func(*Options)
|
||||
|
@@ -423,6 +423,17 @@ func (h *httpTransport) Listen(addr string, opts ...ListenOption) (Listener, err
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (h *httpTransport) Init(opts ...Option) error {
|
||||
for _, o := range opts {
|
||||
o(&h.opts)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *httpTransport) Options() Options {
|
||||
return h.opts
|
||||
}
|
||||
|
||||
func (h *httpTransport) String() string {
|
||||
return "http"
|
||||
}
|
||||
|
@@ -171,6 +171,17 @@ func (m *mockTransport) Listen(addr string, opts ...transport.ListenOption) (tra
|
||||
return listener, nil
|
||||
}
|
||||
|
||||
func (m *mockTransport) Init(opts ...transport.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&m.opts)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockTransport) Options() transport.Options {
|
||||
return m.opts
|
||||
}
|
||||
|
||||
func (m *mockTransport) String() string {
|
||||
return "mock"
|
||||
}
|
||||
|
@@ -30,6 +30,8 @@ type Listener interface {
|
||||
// services. It uses socket send/recv semantics and had various
|
||||
// implementations {HTTP, RabbitMQ, NATS, ...}
|
||||
type Transport interface {
|
||||
Init(...Option) error
|
||||
Options() Options
|
||||
Dial(addr string, opts ...DialOption) (Client, error)
|
||||
Listen(addr string, opts ...ListenOption) (Listener, error)
|
||||
String() string
|
||||
|
Reference in New Issue
Block a user