Compare commits
39 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
5cae330732 | ||
|
ff982b5fd1 | ||
|
28324412a4 | ||
|
5f2ce6fac4 | ||
|
8b54a850f7 | ||
|
fae8c5eb4c | ||
|
3bc6556d36 | ||
5bcdf189de | |||
|
f2efc685d3 | ||
|
67d10e5f39 | ||
|
770c16a66d | ||
|
c2cc03a472 | ||
|
b02e1e04fc | ||
|
cf8c059711 | ||
|
b343420af6 | ||
|
1ed2b589a2 | ||
|
72d8dc89fb | ||
|
8706aa4a46 | ||
|
57dcba666e | ||
|
489573afb9 | ||
|
a9593bad66 | ||
|
d0d8db7c45 | ||
|
a07150b6dd | ||
|
519f091fe8 | ||
|
9c2689301c | ||
|
a1665ab37a | ||
|
1be0e8776f | ||
|
92082ac927 | ||
|
c622f3a8d6 | ||
|
f1817c9c6b | ||
|
16e97cce9b | ||
|
bc404c9a82 | ||
|
29bb63b717 | ||
|
0d917bbf37 | ||
|
45c05c4e2b | ||
|
77b1a25faf | ||
|
34ed5235a3 | ||
|
5996a91dde | ||
|
7171c00e42 |
39
README.md
39
README.md
@@ -1,8 +1,15 @@
|
||||
# Go Micro [](https://opensource.org/licenses/Apache-2.0) [](https://godoc.org/github.com/micro/go-micro) [](https://travis-ci.org/micro/go-micro) [](https://goreportcard.com/report/github.com/micro/go-micro)
|
||||
|
||||
Go Micro is a pluggable framework for distributed systems development.
|
||||
Go Micro is a pluggable framework for micro service development.
|
||||
|
||||
## Overview
|
||||
|
||||
Go Micro provides the core requirements for distributed systems development including RPC and Event driven communication.
|
||||
The **micro** philosophy is sane defaults with a pluggable architecture. We provide defaults to get you started quickly
|
||||
but everything can be easily swapped out.
|
||||
|
||||
<img src="https://micro.mu/docs/images/go-micro.png" />
|
||||
|
||||
The **micro** philosophy is sane defaults with a pluggable architecture. We provide defaults to get you started quickly but everything can be swapped out.
|
||||
Plugins are available at [github.com/micro/go-plugins](https://github.com/micro/go-plugins).
|
||||
|
||||
Follow us on [Twitter](https://twitter.com/microhq) or join the [Slack](http://slack.micro.mu/) community.
|
||||
@@ -11,11 +18,29 @@ Follow us on [Twitter](https://twitter.com/microhq) or join the [Slack](http://s
|
||||
|
||||
Go Micro abstracts away the details of distributed systems. Here are the main features.
|
||||
|
||||
- **Service Discovery** - Automatic service registration and name resolution
|
||||
- **Load Balancing** - Client side load balancing built on discovery
|
||||
- **Message Encoding** - Dynamic encoding based on content-type with protobuf and json support
|
||||
- **Sync Streaming** - RPC based request/response with support for bidirectional streaming
|
||||
- **Async Messaging** - Native pubsub messaging built in for event driven architectures
|
||||
- **Service Discovery** - Automatic service registration and name resolution. Service discovery is at the core of micro service
|
||||
development. When service A needs to speak to service B it needs the location of that service. Consul is the default discovery
|
||||
system with multicast DNS (mdns) as a local option or the SWIM protocol (gossip) for zero dependency p2p networks.
|
||||
|
||||
- **Load Balancing** - Client side load balancing built on service discovery. Once we have the addresses of any number of instances
|
||||
of a service we now need a way to decide which node to route to. We use random hashed load balancing to provide even distribution
|
||||
across the services and retry a different node if there's a problem.
|
||||
|
||||
- **Message Encoding** - Dynamic message encoding based on content-type. The client and server will use codecs along with content-type
|
||||
to seamlessly encode and decode Go types for you. Any variety of messages could be encoded and sent from different clients. The client
|
||||
and server handle this by default. This includes protobuf and json by default.
|
||||
|
||||
- **Sync Streaming** - RPC based request/response with support for bidirectional streaming. We provide an abstraction for synchronous
|
||||
communication. A request made to a service will be automatically resolved, load balanced, dialled and streamed. The default
|
||||
transport is http/1.1 or http2 when tls is enabled.
|
||||
|
||||
- **Async Messaging** - PubSub is built in as a first class citizen for asynchronous communication and event driven architectures.
|
||||
Event notifications are a core pattern in micro service development. The default messaging is point-to-point http/1.1 or http2 when tls
|
||||
is enabled.
|
||||
|
||||
- **Pluggable Interfaces** - Go Micro makes use of Go interfaces for each distributed system abstraction. Because of this these interfaces
|
||||
are pluggable and allows Go Micro to be runtime agnostic. You can plugin any underlying technology. Find plugins in
|
||||
[github.com/micro/go-plugins](https://github.com/micro/go-plugins).
|
||||
|
||||
## Getting Started
|
||||
|
||||
|
@@ -27,6 +27,7 @@ import (
|
||||
maddr "github.com/micro/util/go/lib/addr"
|
||||
mnet "github.com/micro/util/go/lib/net"
|
||||
mls "github.com/micro/util/go/lib/tls"
|
||||
"golang.org/x/net/http2"
|
||||
)
|
||||
|
||||
// HTTP Broker is a point to point async broker
|
||||
@@ -78,6 +79,10 @@ func newTransport(config *tls.Config) *http.Transport {
|
||||
}
|
||||
}
|
||||
|
||||
dialTLS := func(network string, addr string) (net.Conn, error) {
|
||||
return tls.Dial(network, addr, config)
|
||||
}
|
||||
|
||||
t := &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
Dial: (&net.Dialer{
|
||||
@@ -85,11 +90,15 @@ func newTransport(config *tls.Config) *http.Transport {
|
||||
KeepAlive: 30 * time.Second,
|
||||
}).Dial,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
TLSClientConfig: config,
|
||||
DialTLS: dialTLS,
|
||||
}
|
||||
runtime.SetFinalizer(&t, func(tr **http.Transport) {
|
||||
(*tr).CloseIdleConnections()
|
||||
})
|
||||
|
||||
// setup http2
|
||||
http2.ConfigureTransport(t)
|
||||
|
||||
return t
|
||||
}
|
||||
|
||||
@@ -430,6 +439,13 @@ func (h *httpBroker) Init(opts ...Option) error {
|
||||
// set registry
|
||||
h.r = rcache.New(reg)
|
||||
|
||||
// reconfigure tls config
|
||||
if c := h.opts.TLSConfig; c != nil {
|
||||
h.c = &http.Client{
|
||||
Transport: newTransport(c),
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -507,7 +523,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
|
||||
}
|
||||
|
||||
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
|
||||
options := newSubscribeOptions(opts...)
|
||||
options := NewSubscribeOptions(opts...)
|
||||
|
||||
// parse address for host, port
|
||||
parts := strings.Split(h.Address(), ":")
|
||||
|
@@ -1,3 +1,4 @@
|
||||
// Package mock provides a mock broker for testing
|
||||
package mock
|
||||
|
||||
import (
|
||||
|
@@ -50,7 +50,7 @@ var (
|
||||
registryKey = contextKeyT("github.com/micro/go-micro/registry")
|
||||
)
|
||||
|
||||
func newSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
|
||||
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
|
||||
opt := SubscribeOptions{
|
||||
AutoAck: true,
|
||||
}
|
||||
|
@@ -1,3 +1,4 @@
|
||||
// Package mock provides a mock client for testing
|
||||
package mock
|
||||
|
||||
import (
|
||||
|
@@ -132,7 +132,7 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
grr = ctx.Err()
|
||||
return errors.New("go.micro.client", fmt.Sprintf("request timeout: %v", ctx.Err()), 408)
|
||||
return errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -192,7 +192,7 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt
|
||||
case err := <-ch:
|
||||
grr = err
|
||||
case <-ctx.Done():
|
||||
grr = errors.New("go.micro.client", fmt.Sprintf("request timeout: %v", ctx.Err()), 408)
|
||||
grr = errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err()))
|
||||
}
|
||||
|
||||
if grr != nil {
|
||||
@@ -274,7 +274,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
|
||||
// should we noop right here?
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
|
||||
return errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err()))
|
||||
default:
|
||||
}
|
||||
|
||||
@@ -329,7 +329,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return errors.New("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()), 408)
|
||||
return errors.Timeout("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()))
|
||||
case err := <-ch:
|
||||
// if the call succeeded lets bail early
|
||||
if err == nil {
|
||||
@@ -367,7 +367,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
|
||||
// should we noop right here?
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
|
||||
return nil, errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err()))
|
||||
default:
|
||||
}
|
||||
|
||||
@@ -416,7 +416,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, errors.New("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()), 408)
|
||||
return nil, errors.Timeout("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()))
|
||||
case rsp := <-ch:
|
||||
// if the call succeeded lets bail early
|
||||
if rsp.err == nil {
|
||||
|
@@ -105,12 +105,16 @@ func newRpcPlusCodec(req *transport.Message, client transport.Client, c codec.Ne
|
||||
|
||||
func (c *rpcPlusCodec) WriteRequest(req *request, body interface{}) error {
|
||||
c.buf.wbuf.Reset()
|
||||
|
||||
m := &codec.Message{
|
||||
Id: req.Seq,
|
||||
Target: req.Service,
|
||||
Method: req.ServiceMethod,
|
||||
Type: codec.Request,
|
||||
Header: map[string]string{},
|
||||
Header: map[string]string{
|
||||
"X-Micro-Target": req.Service,
|
||||
"X-Micro-Method": req.ServiceMethod,
|
||||
},
|
||||
}
|
||||
if err := c.codec.Write(m, body); err != nil {
|
||||
return errors.InternalServerError("go.micro.client.codec", err.Error())
|
||||
|
36
cmd/cmd.go
36
cmd/cmd.go
@@ -10,6 +10,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/micro/cli"
|
||||
"github.com/micro/go-log"
|
||||
"github.com/micro/go-micro/client"
|
||||
"github.com/micro/go-micro/server"
|
||||
|
||||
@@ -20,11 +21,11 @@ import (
|
||||
// registries
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-micro/registry/consul"
|
||||
"github.com/micro/go-micro/registry/gossip"
|
||||
"github.com/micro/go-micro/registry/mdns"
|
||||
|
||||
// selectors
|
||||
"github.com/micro/go-micro/selector"
|
||||
"github.com/micro/go-micro/selector/cache"
|
||||
|
||||
// transports
|
||||
"github.com/micro/go-micro/transport"
|
||||
@@ -147,7 +148,6 @@ var (
|
||||
Name: "selector",
|
||||
EnvVar: "MICRO_SELECTOR",
|
||||
Usage: "Selector used to pick nodes for querying",
|
||||
Value: "cache",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "transport",
|
||||
@@ -171,12 +171,13 @@ var (
|
||||
|
||||
DefaultRegistries = map[string]func(...registry.Option) registry.Registry{
|
||||
"consul": consul.NewRegistry,
|
||||
"gossip": gossip.NewRegistry,
|
||||
"mdns": mdns.NewRegistry,
|
||||
}
|
||||
|
||||
DefaultSelectors = map[string]func(...selector.Option) selector.Selector{
|
||||
"default": selector.NewSelector,
|
||||
"cache": cache.NewSelector,
|
||||
"cache": selector.NewSelector,
|
||||
}
|
||||
|
||||
DefaultServers = map[string]func(...server.Option) server.Server{
|
||||
@@ -299,10 +300,15 @@ func (c *cmd) Before(ctx *cli.Context) error {
|
||||
serverOpts = append(serverOpts, server.Registry(*c.opts.Registry))
|
||||
clientOpts = append(clientOpts, client.Registry(*c.opts.Registry))
|
||||
|
||||
(*c.opts.Selector).Init(selector.Registry(*c.opts.Registry))
|
||||
if err := (*c.opts.Selector).Init(selector.Registry(*c.opts.Registry)); err != nil {
|
||||
log.Fatalf("Error configuring registry: %v", err)
|
||||
}
|
||||
|
||||
clientOpts = append(clientOpts, client.Selector(*c.opts.Selector))
|
||||
|
||||
(*c.opts.Broker).Init(broker.Registry(*c.opts.Registry))
|
||||
if err := (*c.opts.Broker).Init(broker.Registry(*c.opts.Registry)); err != nil {
|
||||
log.Fatalf("Error configuring broker: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Set the selector
|
||||
@@ -347,15 +353,21 @@ func (c *cmd) Before(ctx *cli.Context) error {
|
||||
}
|
||||
|
||||
if len(ctx.String("broker_address")) > 0 {
|
||||
(*c.opts.Broker).Init(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...))
|
||||
if err := (*c.opts.Broker).Init(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...)); err != nil {
|
||||
log.Fatalf("Error configuring broker: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(ctx.String("registry_address")) > 0 {
|
||||
(*c.opts.Registry).Init(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...))
|
||||
if err := (*c.opts.Registry).Init(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...)); err != nil {
|
||||
log.Fatalf("Error configuring registry: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(ctx.String("transport_address")) > 0 {
|
||||
(*c.opts.Transport).Init(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...))
|
||||
if err := (*c.opts.Transport).Init(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...)); err != nil {
|
||||
log.Fatalf("Error configuring transport: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(ctx.String("server_name")) > 0 {
|
||||
@@ -410,12 +422,16 @@ func (c *cmd) Before(ctx *cli.Context) error {
|
||||
// We have some command line opts for the server.
|
||||
// Lets set it up
|
||||
if len(serverOpts) > 0 {
|
||||
(*c.opts.Server).Init(serverOpts...)
|
||||
if err := (*c.opts.Server).Init(serverOpts...); err != nil {
|
||||
log.Fatalf("Error configuring server: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Use an init option?
|
||||
if len(clientOpts) > 0 {
|
||||
(*c.opts.Client).Init(clientOpts...)
|
||||
if err := (*c.opts.Client).Init(clientOpts...); err != nil {
|
||||
log.Fatalf("Error configuring client: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@@ -92,13 +92,13 @@ func MethodNotAllowed(id, format string, a ...interface{}) error {
|
||||
}
|
||||
}
|
||||
|
||||
// InternalServerError generates a 500 error.
|
||||
func InternalServerError(id, format string, a ...interface{}) error {
|
||||
// Timeout generates a 408 error.
|
||||
func Timeout(id, format string, a ...interface{}) error {
|
||||
return &Error{
|
||||
Id: id,
|
||||
Code: 500,
|
||||
Code: 408,
|
||||
Detail: fmt.Sprintf(format, a...),
|
||||
Status: http.StatusText(500),
|
||||
Status: http.StatusText(408),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -111,3 +111,13 @@ func Conflict(id, format string, a ...interface{}) error {
|
||||
Status: http.StatusText(409),
|
||||
}
|
||||
}
|
||||
|
||||
// InternalServerError generates a 500 error.
|
||||
func InternalServerError(id, format string, a ...interface{}) error {
|
||||
return &Error{
|
||||
Id: id,
|
||||
Code: 500,
|
||||
Detail: fmt.Sprintf(format, a...),
|
||||
Status: http.StatusText(500),
|
||||
}
|
||||
}
|
||||
|
@@ -174,6 +174,7 @@ func RegisterInterval(t time.Duration) Option {
|
||||
|
||||
// WrapClient is a convenience method for wrapping a Client with
|
||||
// some middleware component. A list of wrappers can be provided.
|
||||
// Wrappers are applied in reverse order so the last is executed first.
|
||||
func WrapClient(w ...client.Wrapper) Option {
|
||||
return func(o *Options) {
|
||||
// apply in reverse
|
||||
|
@@ -1,9 +1,11 @@
|
||||
// Package consul provides a consul based registry and is the default discovery system
|
||||
package consul
|
||||
|
||||
import (
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
// NewRegistry returns a new consul registry
|
||||
func NewRegistry(opts ...registry.Option) registry.Registry {
|
||||
return registry.NewRegistry(opts...)
|
||||
}
|
||||
|
@@ -27,6 +27,40 @@ func Config(c *consul.Config) registry.Option {
|
||||
}
|
||||
}
|
||||
|
||||
// AllowStale sets whether any Consul server (non-leader) can service
|
||||
// a read. This allows for lower latency and higher throughput
|
||||
// at the cost of potentially stale data.
|
||||
// Works similar to Consul DNS Config option [1].
|
||||
// Defaults to true.
|
||||
//
|
||||
// [1] https://www.consul.io/docs/agent/options.html#allow_stale
|
||||
//
|
||||
func AllowStale(v bool) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, "consul_allow_stale", v)
|
||||
}
|
||||
}
|
||||
|
||||
// QueryOptions specifies the QueryOptions to be used when calling
|
||||
// Consul. See `Consul API` for more information [1].
|
||||
//
|
||||
// [1] https://godoc.org/github.com/hashicorp/consul/api#QueryOptions
|
||||
//
|
||||
func QueryOptions(q *consul.QueryOptions) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
if q == nil {
|
||||
return
|
||||
}
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, "consul_query_options", q)
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// TCPCheck will tell the service provider to check the service address
|
||||
// and port every `t` interval. It will enabled only if `t` is greater than 0.
|
||||
|
@@ -22,6 +22,8 @@ type consulRegistry struct {
|
||||
// connect enabled
|
||||
connect bool
|
||||
|
||||
queryOptions *consul.QueryOptions
|
||||
|
||||
sync.Mutex
|
||||
register map[string]uint64
|
||||
// lastChecked tracks when a node was last checked as existing in Consul
|
||||
@@ -80,6 +82,14 @@ func configure(c *consulRegistry, opts ...Option) {
|
||||
if cn, ok := c.opts.Context.Value("consul_connect").(bool); ok {
|
||||
c.connect = cn
|
||||
}
|
||||
|
||||
// Use the consul query options passed in the options, if available
|
||||
if qo, ok := c.opts.Context.Value("consul_query_options").(*consul.QueryOptions); ok && qo != nil {
|
||||
c.queryOptions = qo
|
||||
}
|
||||
if as, ok := c.opts.Context.Value("consul_allow_stale").(bool); ok {
|
||||
c.queryOptions.AllowStale = as
|
||||
}
|
||||
}
|
||||
|
||||
// check if there are any addrs
|
||||
@@ -123,6 +133,9 @@ func newConsulRegistry(opts ...Option) Registry {
|
||||
opts: Options{},
|
||||
register: make(map[string]uint64),
|
||||
lastChecked: make(map[string]time.Time),
|
||||
queryOptions: &consul.QueryOptions{
|
||||
AllowStale: true,
|
||||
},
|
||||
}
|
||||
configure(cr, opts...)
|
||||
return cr
|
||||
@@ -177,21 +190,20 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
|
||||
// use first node
|
||||
node := s.Nodes[0]
|
||||
|
||||
// get existing hash
|
||||
// get existing hash and last checked time
|
||||
c.Lock()
|
||||
v, ok := c.register[s.Name]
|
||||
lastChecked := c.lastChecked[s.Name]
|
||||
c.Unlock()
|
||||
|
||||
// if it's already registered and matches then just pass the check
|
||||
if ok && v == h {
|
||||
if options.TTL == time.Duration(0) {
|
||||
// ensure that our service hasn't been deregistered by Consul
|
||||
if time.Since(c.lastChecked[s.Name]) <= getDeregisterTTL(regInterval) {
|
||||
if time.Since(lastChecked) <= getDeregisterTTL(regInterval) {
|
||||
return nil
|
||||
}
|
||||
services, _, err := c.Client.Health().Checks(s.Name, &consul.QueryOptions{
|
||||
AllowStale: true,
|
||||
})
|
||||
services, _, err := c.Client.Health().Checks(s.Name, c.queryOptions)
|
||||
if err == nil {
|
||||
for _, v := range services {
|
||||
if v.ServiceID == node.Id {
|
||||
@@ -276,9 +288,9 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) {
|
||||
|
||||
// if we're connect enabled only get connect services
|
||||
if c.connect {
|
||||
rsp, _, err = c.Client.Health().Connect(name, "", false, nil)
|
||||
rsp, _, err = c.Client.Health().Connect(name, "", false, c.queryOptions)
|
||||
} else {
|
||||
rsp, _, err = c.Client.Health().Service(name, "", false, nil)
|
||||
rsp, _, err = c.Client.Health().Service(name, "", false, c.queryOptions)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -347,7 +359,7 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) {
|
||||
}
|
||||
|
||||
func (c *consulRegistry) ListServices() ([]*Service, error) {
|
||||
rsp, _, err := c.Client.Catalog().Services(nil)
|
||||
rsp, _, err := c.Client.Catalog().Services(c.queryOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@@ -7,6 +7,7 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
consul "github.com/hashicorp/consul/api"
|
||||
)
|
||||
@@ -53,9 +54,14 @@ func newConsulTestRegistry(r *mockRegistry) (*consulRegistry, func()) {
|
||||
go newMockServer(r, l)
|
||||
|
||||
return &consulRegistry{
|
||||
Address: cfg.Address,
|
||||
Client: cl,
|
||||
register: make(map[string]uint64),
|
||||
Address: cfg.Address,
|
||||
Client: cl,
|
||||
opts: Options{},
|
||||
register: make(map[string]uint64),
|
||||
lastChecked: make(map[string]time.Time),
|
||||
queryOptions: &consul.QueryOptions{
|
||||
AllowStale: true,
|
||||
},
|
||||
}, func() {
|
||||
l.Close()
|
||||
}
|
||||
|
24
registry/gossip/README.md
Normal file
24
registry/gossip/README.md
Normal file
@@ -0,0 +1,24 @@
|
||||
# Gossip Registry
|
||||
|
||||
Gossip is a zero dependency registry which uses hashicorp/memberlist to broadcast registry information
|
||||
via the SWIM protocol.
|
||||
|
||||
## Usage
|
||||
|
||||
Start with the registry flag or env var
|
||||
|
||||
```bash
|
||||
MICRO_REGISTRY=gossip go run service.go
|
||||
```
|
||||
|
||||
On startup you'll see something like
|
||||
|
||||
```bash
|
||||
2018/12/06 18:17:48 Registry Listening on 192.168.1.65:56390
|
||||
```
|
||||
|
||||
To join this gossip ring set the registry address using flag or env var
|
||||
|
||||
```bash
|
||||
MICRO_REGISTRY_ADDRESS= 192.168.1.65:56390
|
||||
```
|
594
registry/gossip/gossip.go
Normal file
594
registry/gossip/gossip.go
Normal file
@@ -0,0 +1,594 @@
|
||||
// Package Gossip provides a gossip registry based on hashicorp/memberlist
|
||||
package gossip
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/google/uuid"
|
||||
"github.com/hashicorp/memberlist"
|
||||
log "github.com/micro/go-log"
|
||||
"github.com/micro/go-micro/registry"
|
||||
pb "github.com/micro/go-micro/registry/gossip/proto"
|
||||
"github.com/mitchellh/hashstructure"
|
||||
)
|
||||
|
||||
const (
|
||||
addAction = "update"
|
||||
delAction = "delete"
|
||||
syncAction = "sync"
|
||||
)
|
||||
|
||||
type broadcast struct {
|
||||
update *pb.Update
|
||||
notify chan<- struct{}
|
||||
}
|
||||
|
||||
type delegate struct {
|
||||
queue *memberlist.TransmitLimitedQueue
|
||||
updates chan *update
|
||||
}
|
||||
|
||||
type gossipRegistry struct {
|
||||
queue *memberlist.TransmitLimitedQueue
|
||||
updates chan *update
|
||||
options registry.Options
|
||||
member *memberlist.Memberlist
|
||||
interval time.Duration
|
||||
|
||||
sync.RWMutex
|
||||
services map[string][]*registry.Service
|
||||
|
||||
s sync.RWMutex
|
||||
watchers map[string]chan *registry.Result
|
||||
}
|
||||
|
||||
type update struct {
|
||||
Update *pb.Update
|
||||
Service *registry.Service
|
||||
sync chan *registry.Service
|
||||
}
|
||||
|
||||
var (
|
||||
// You should change this if using secure
|
||||
DefaultSecret = []byte("micro-gossip-key") // exactly 16 bytes
|
||||
ExpiryTick = time.Second * 5
|
||||
)
|
||||
|
||||
func configure(g *gossipRegistry, opts ...registry.Option) error {
|
||||
// loop through address list and get valid entries
|
||||
addrs := func(curAddrs []string) []string {
|
||||
var newAddrs []string
|
||||
for _, addr := range curAddrs {
|
||||
if trimAddr := strings.TrimSpace(addr); len(trimAddr) > 0 {
|
||||
newAddrs = append(newAddrs, trimAddr)
|
||||
}
|
||||
}
|
||||
return newAddrs
|
||||
}
|
||||
|
||||
// current address list
|
||||
curAddrs := addrs(g.options.Addrs)
|
||||
|
||||
// parse options
|
||||
for _, o := range opts {
|
||||
o(&g.options)
|
||||
}
|
||||
|
||||
// new address list
|
||||
newAddrs := addrs(g.options.Addrs)
|
||||
|
||||
// no new nodes and existing member. no configure
|
||||
if (len(newAddrs) == len(curAddrs)) && g.member != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// shutdown old member
|
||||
if g.member != nil {
|
||||
g.member.Shutdown()
|
||||
}
|
||||
|
||||
// replace addresses
|
||||
curAddrs = newAddrs
|
||||
|
||||
// create a queue
|
||||
queue := &memberlist.TransmitLimitedQueue{
|
||||
NumNodes: func() int {
|
||||
return len(curAddrs)
|
||||
},
|
||||
RetransmitMult: 3,
|
||||
}
|
||||
|
||||
// create a new default config
|
||||
c := memberlist.DefaultLocalConfig()
|
||||
|
||||
if optConfig, ok := g.options.Context.Value(contextConfig{}).(*memberlist.Config); ok && optConfig != nil {
|
||||
c = optConfig
|
||||
}
|
||||
|
||||
if hostport, ok := g.options.Context.Value(contextAddress{}).(string); ok {
|
||||
host, port, err := net.SplitHostPort(hostport)
|
||||
if err == nil {
|
||||
pn, err := strconv.Atoi(port)
|
||||
if err == nil {
|
||||
c.BindPort = pn
|
||||
}
|
||||
c.BindAddr = host
|
||||
}
|
||||
} else {
|
||||
// set bind to random port
|
||||
c.BindPort = 0
|
||||
}
|
||||
|
||||
if hostport, ok := g.options.Context.Value(contextAdvertise{}).(string); ok {
|
||||
host, port, err := net.SplitHostPort(hostport)
|
||||
if err == nil {
|
||||
pn, err := strconv.Atoi(port)
|
||||
if err == nil {
|
||||
c.AdvertisePort = pn
|
||||
}
|
||||
c.AdvertiseAddr = host
|
||||
}
|
||||
}
|
||||
|
||||
// machine hostname
|
||||
hostname, _ := os.Hostname()
|
||||
|
||||
// set the name
|
||||
c.Name = strings.Join([]string{"micro", hostname, uuid.New().String()}, "-")
|
||||
|
||||
// set the delegate
|
||||
c.Delegate = &delegate{
|
||||
updates: g.updates,
|
||||
queue: queue,
|
||||
}
|
||||
|
||||
// log to dev null
|
||||
c.LogOutput = ioutil.Discard
|
||||
|
||||
// set a secret key if secure
|
||||
if g.options.Secure {
|
||||
k, ok := g.options.Context.Value(contextSecretKey{}).([]byte)
|
||||
if !ok {
|
||||
// use the default secret
|
||||
k = DefaultSecret
|
||||
}
|
||||
c.SecretKey = k
|
||||
}
|
||||
|
||||
// create the memberlist
|
||||
m, err := memberlist.Create(c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// join the memberlist
|
||||
if len(curAddrs) > 0 {
|
||||
_, err := m.Join(curAddrs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// set internals
|
||||
g.queue = queue
|
||||
g.member = m
|
||||
g.interval = c.GossipInterval
|
||||
|
||||
log.Logf("Registry Listening on %s", m.LocalNode().Address())
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *broadcast) Invalidates(other memberlist.Broadcast) bool {
|
||||
up := new(pb.Update)
|
||||
if err := proto.Unmarshal(other.Message(), up); err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// ids do not match
|
||||
if b.update.Id == up.Id {
|
||||
return false
|
||||
}
|
||||
|
||||
// timestamps do not match
|
||||
if b.update.Timestamp != up.Timestamp {
|
||||
return false
|
||||
}
|
||||
|
||||
// type does not match
|
||||
if b.update.Type != up.Type {
|
||||
return false
|
||||
}
|
||||
|
||||
// invalidates
|
||||
return true
|
||||
}
|
||||
|
||||
func (b *broadcast) Message() []byte {
|
||||
up, err := proto.Marshal(b.update)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return up
|
||||
}
|
||||
|
||||
func (b *broadcast) Finished() {
|
||||
if b.notify != nil {
|
||||
close(b.notify)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *delegate) NodeMeta(limit int) []byte {
|
||||
return []byte{}
|
||||
}
|
||||
|
||||
func (d *delegate) NotifyMsg(b []byte) {
|
||||
if len(b) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
up := new(pb.Update)
|
||||
if err := proto.Unmarshal(b, up); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// only process service action
|
||||
if up.Type != "service" {
|
||||
return
|
||||
}
|
||||
|
||||
var service *registry.Service
|
||||
|
||||
switch up.Metadata["Content-Type"] {
|
||||
case "application/json":
|
||||
if err := json.Unmarshal(up.Data, &service); err != nil {
|
||||
return
|
||||
}
|
||||
// no other content type
|
||||
default:
|
||||
return
|
||||
}
|
||||
|
||||
// send update
|
||||
d.updates <- &update{
|
||||
Update: up,
|
||||
Service: service,
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
|
||||
return d.queue.GetBroadcasts(overhead, limit)
|
||||
}
|
||||
|
||||
func (d *delegate) LocalState(join bool) []byte {
|
||||
if !join {
|
||||
return []byte{}
|
||||
}
|
||||
|
||||
syncCh := make(chan *registry.Service, 1)
|
||||
services := map[string][]*registry.Service{}
|
||||
|
||||
d.updates <- &update{
|
||||
Update: &pb.Update{
|
||||
Action: syncAction,
|
||||
},
|
||||
sync: syncCh,
|
||||
}
|
||||
|
||||
for srv := range syncCh {
|
||||
services[srv.Name] = append(services[srv.Name], srv)
|
||||
}
|
||||
|
||||
b, _ := json.Marshal(services)
|
||||
return b
|
||||
}
|
||||
|
||||
func (d *delegate) MergeRemoteState(buf []byte, join bool) {
|
||||
if len(buf) == 0 {
|
||||
return
|
||||
}
|
||||
if !join {
|
||||
return
|
||||
}
|
||||
|
||||
var services map[string][]*registry.Service
|
||||
if err := json.Unmarshal(buf, &services); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, service := range services {
|
||||
for _, srv := range service {
|
||||
d.updates <- &update{
|
||||
Update: &pb.Update{Action: addAction},
|
||||
Service: srv,
|
||||
sync: nil,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (g *gossipRegistry) publish(action string, services []*registry.Service) {
|
||||
g.s.RLock()
|
||||
for _, sub := range g.watchers {
|
||||
go func(sub chan *registry.Result) {
|
||||
for _, service := range services {
|
||||
sub <- ®istry.Result{Action: action, Service: service}
|
||||
}
|
||||
}(sub)
|
||||
}
|
||||
g.s.RUnlock()
|
||||
}
|
||||
|
||||
func (g *gossipRegistry) subscribe() (chan *registry.Result, chan bool) {
|
||||
next := make(chan *registry.Result, 10)
|
||||
exit := make(chan bool)
|
||||
|
||||
id := uuid.New().String()
|
||||
|
||||
g.s.Lock()
|
||||
g.watchers[id] = next
|
||||
g.s.Unlock()
|
||||
|
||||
go func() {
|
||||
<-exit
|
||||
g.s.Lock()
|
||||
delete(g.watchers, id)
|
||||
close(next)
|
||||
g.s.Unlock()
|
||||
}()
|
||||
|
||||
return next, exit
|
||||
}
|
||||
|
||||
func (g *gossipRegistry) run() {
|
||||
var mtx sync.Mutex
|
||||
updates := map[uint64]*update{}
|
||||
|
||||
// expiry loop
|
||||
go func() {
|
||||
t := time.NewTicker(ExpiryTick)
|
||||
defer t.Stop()
|
||||
|
||||
for _ = range t.C {
|
||||
now := uint64(time.Now().UnixNano())
|
||||
|
||||
mtx.Lock()
|
||||
|
||||
// process all the updates
|
||||
for k, v := range updates {
|
||||
// check if expiry time has passed
|
||||
if d := (v.Update.Timestamp + v.Update.Expires); d < now {
|
||||
// delete from records
|
||||
delete(updates, k)
|
||||
// set to delete
|
||||
v.Update.Action = delAction
|
||||
// fire a new update
|
||||
g.updates <- v
|
||||
}
|
||||
}
|
||||
|
||||
mtx.Unlock()
|
||||
}
|
||||
}()
|
||||
|
||||
// process the updates
|
||||
for u := range g.updates {
|
||||
switch u.Update.Action {
|
||||
case addAction:
|
||||
g.Lock()
|
||||
if service, ok := g.services[u.Service.Name]; !ok {
|
||||
g.services[u.Service.Name] = []*registry.Service{u.Service}
|
||||
|
||||
} else {
|
||||
g.services[u.Service.Name] = addServices(service, []*registry.Service{u.Service})
|
||||
}
|
||||
g.Unlock()
|
||||
|
||||
// publish update to watchers
|
||||
go g.publish(addAction, []*registry.Service{u.Service})
|
||||
|
||||
// we need to expire the node at some point in the future
|
||||
if u.Update.Expires > 0 {
|
||||
// create a hash of this service
|
||||
if hash, err := hashstructure.Hash(u.Service, nil); err == nil {
|
||||
mtx.Lock()
|
||||
updates[hash] = u
|
||||
mtx.Unlock()
|
||||
}
|
||||
}
|
||||
case delAction:
|
||||
g.Lock()
|
||||
if service, ok := g.services[u.Service.Name]; ok {
|
||||
if services := delServices(service, []*registry.Service{u.Service}); len(services) == 0 {
|
||||
delete(g.services, u.Service.Name)
|
||||
} else {
|
||||
g.services[u.Service.Name] = services
|
||||
}
|
||||
}
|
||||
g.Unlock()
|
||||
|
||||
// publish update to watchers
|
||||
go g.publish(delAction, []*registry.Service{u.Service})
|
||||
|
||||
// delete from expiry checks
|
||||
if hash, err := hashstructure.Hash(u.Service, nil); err == nil {
|
||||
mtx.Lock()
|
||||
delete(updates, hash)
|
||||
mtx.Unlock()
|
||||
}
|
||||
case syncAction:
|
||||
// no sync channel provided
|
||||
if u.sync == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
g.RLock()
|
||||
|
||||
// push all services through the sync chan
|
||||
for _, service := range g.services {
|
||||
for _, srv := range service {
|
||||
u.sync <- srv
|
||||
}
|
||||
|
||||
// publish to watchers
|
||||
go g.publish(addAction, service)
|
||||
}
|
||||
|
||||
g.RUnlock()
|
||||
|
||||
// close the sync chan
|
||||
close(u.sync)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (g *gossipRegistry) Init(opts ...registry.Option) error {
|
||||
return configure(g, opts...)
|
||||
}
|
||||
|
||||
func (g *gossipRegistry) Options() registry.Options {
|
||||
return g.options
|
||||
}
|
||||
|
||||
func (g *gossipRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
|
||||
b, err := json.Marshal(s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
g.Lock()
|
||||
if service, ok := g.services[s.Name]; !ok {
|
||||
g.services[s.Name] = []*registry.Service{s}
|
||||
} else {
|
||||
g.services[s.Name] = addServices(service, []*registry.Service{s})
|
||||
}
|
||||
g.Unlock()
|
||||
|
||||
var options registry.RegisterOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
up := &pb.Update{
|
||||
Id: uuid.New().String(),
|
||||
Timestamp: uint64(time.Now().UnixNano()),
|
||||
Expires: uint64(options.TTL.Nanoseconds()),
|
||||
Action: "update",
|
||||
Type: "service",
|
||||
Metadata: map[string]string{
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
Data: b,
|
||||
}
|
||||
|
||||
g.queue.QueueBroadcast(&broadcast{
|
||||
update: up,
|
||||
notify: nil,
|
||||
})
|
||||
|
||||
// wait
|
||||
<-time.After(g.interval * 2)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *gossipRegistry) Deregister(s *registry.Service) error {
|
||||
b, err := json.Marshal(s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
g.Lock()
|
||||
if service, ok := g.services[s.Name]; ok {
|
||||
if services := delServices(service, []*registry.Service{s}); len(services) == 0 {
|
||||
delete(g.services, s.Name)
|
||||
} else {
|
||||
g.services[s.Name] = services
|
||||
}
|
||||
}
|
||||
g.Unlock()
|
||||
|
||||
up := &pb.Update{
|
||||
Id: uuid.New().String(),
|
||||
Timestamp: uint64(time.Now().UnixNano()),
|
||||
Action: "delete",
|
||||
Type: "service",
|
||||
Metadata: map[string]string{
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
Data: b,
|
||||
}
|
||||
|
||||
g.queue.QueueBroadcast(&broadcast{
|
||||
update: up,
|
||||
notify: nil,
|
||||
})
|
||||
|
||||
// wait
|
||||
<-time.After(g.interval * 2)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *gossipRegistry) GetService(name string) ([]*registry.Service, error) {
|
||||
g.RLock()
|
||||
service, ok := g.services[name]
|
||||
g.RUnlock()
|
||||
if !ok {
|
||||
return nil, registry.ErrNotFound
|
||||
}
|
||||
return service, nil
|
||||
}
|
||||
|
||||
func (g *gossipRegistry) ListServices() ([]*registry.Service, error) {
|
||||
var services []*registry.Service
|
||||
g.RLock()
|
||||
for _, service := range g.services {
|
||||
services = append(services, service...)
|
||||
}
|
||||
g.RUnlock()
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (g *gossipRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
|
||||
n, e := g.subscribe()
|
||||
return newGossipWatcher(n, e, opts...)
|
||||
}
|
||||
|
||||
func (g *gossipRegistry) String() string {
|
||||
return "gossip"
|
||||
}
|
||||
|
||||
func NewRegistry(opts ...registry.Option) registry.Registry {
|
||||
gossip := &gossipRegistry{
|
||||
options: registry.Options{
|
||||
Context: context.Background(),
|
||||
},
|
||||
updates: make(chan *update, 100),
|
||||
services: make(map[string][]*registry.Service),
|
||||
watchers: make(map[string]chan *registry.Result),
|
||||
}
|
||||
|
||||
// run the updater
|
||||
go gossip.run()
|
||||
|
||||
// configure the gossiper
|
||||
if err := configure(gossip, opts...); err != nil {
|
||||
log.Fatalf("Error configuring registry: %v", err)
|
||||
}
|
||||
|
||||
// wait for setup
|
||||
<-time.After(gossip.interval * 2)
|
||||
|
||||
return gossip
|
||||
}
|
45
registry/gossip/options.go
Normal file
45
registry/gossip/options.go
Normal file
@@ -0,0 +1,45 @@
|
||||
package gossip
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/hashicorp/memberlist"
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
type contextSecretKey struct{}
|
||||
|
||||
// Secret specifies an encryption key. The value should be either
|
||||
// 16, 24, or 32 bytes to select AES-128, AES-192, or AES-256.
|
||||
func Secret(k []byte) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
o.Context = context.WithValue(o.Context, contextSecretKey{}, k)
|
||||
}
|
||||
}
|
||||
|
||||
type contextAddress struct{}
|
||||
|
||||
// Address to bind to - host:port
|
||||
func Address(a string) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
o.Context = context.WithValue(o.Context, contextAddress{}, a)
|
||||
}
|
||||
}
|
||||
|
||||
type contextConfig struct{}
|
||||
|
||||
// Config allow to inject a *memberlist.Config struct for configuring gossip
|
||||
func Config(c *memberlist.Config) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
o.Context = context.WithValue(o.Context, contextConfig{}, c)
|
||||
}
|
||||
}
|
||||
|
||||
type contextAdvertise struct{}
|
||||
|
||||
// The address to advertise for other gossip members - host:port
|
||||
func Advertise(a string) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
o.Context = context.WithValue(o.Context, contextAdvertise{}, a)
|
||||
}
|
||||
}
|
28
registry/gossip/proto/gossip.micro.go
Normal file
28
registry/gossip/proto/gossip.micro.go
Normal file
@@ -0,0 +1,28 @@
|
||||
// Code generated by protoc-gen-micro. DO NOT EDIT.
|
||||
// source: github.com/micro/go-micro/registry/gossip/proto/gossip.proto
|
||||
|
||||
/*
|
||||
Package gossip is a generated protocol buffer package.
|
||||
|
||||
It is generated from these files:
|
||||
github.com/micro/go-micro/registry/gossip/proto/gossip.proto
|
||||
|
||||
It has these top-level messages:
|
||||
Update
|
||||
*/
|
||||
package gossip
|
||||
|
||||
import proto "github.com/golang/protobuf/proto"
|
||||
import fmt "fmt"
|
||||
import math "math"
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = fmt.Errorf
|
||||
var _ = math.Inf
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the proto package it is being compiled against.
|
||||
// A compilation error at this line likely means your copy of the
|
||||
// proto package needs to be updated.
|
||||
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
|
145
registry/gossip/proto/gossip.pb.go
Normal file
145
registry/gossip/proto/gossip.pb.go
Normal file
@@ -0,0 +1,145 @@
|
||||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// source: github.com/micro/go-micro/registry/gossip/proto/gossip.proto
|
||||
|
||||
package gossip
|
||||
|
||||
import (
|
||||
fmt "fmt"
|
||||
proto "github.com/golang/protobuf/proto"
|
||||
math "math"
|
||||
)
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = fmt.Errorf
|
||||
var _ = math.Inf
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the proto package it is being compiled against.
|
||||
// A compilation error at this line likely means your copy of the
|
||||
// proto package needs to be updated.
|
||||
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
|
||||
|
||||
// Update is the message broadcast
|
||||
type Update struct {
|
||||
// unique id of update
|
||||
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
|
||||
// unix nano timestamp of update
|
||||
Timestamp uint64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
|
||||
// time to live for entry
|
||||
Expires uint64 `protobuf:"varint,3,opt,name=expires,proto3" json:"expires,omitempty"`
|
||||
// type of update; service
|
||||
Type string `protobuf:"bytes,4,opt,name=type,proto3" json:"type,omitempty"`
|
||||
// what action is taken; add, del, put
|
||||
Action string `protobuf:"bytes,5,opt,name=action,proto3" json:"action,omitempty"`
|
||||
// any other associated metadata about the data
|
||||
Metadata map[string]string `protobuf:"bytes,6,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
|
||||
// the payload data;
|
||||
Data []byte `protobuf:"bytes,7,opt,name=data,proto3" json:"data,omitempty"`
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
XXX_sizecache int32 `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Update) Reset() { *m = Update{} }
|
||||
func (m *Update) String() string { return proto.CompactTextString(m) }
|
||||
func (*Update) ProtoMessage() {}
|
||||
func (*Update) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_18cba623e76e57f3, []int{0}
|
||||
}
|
||||
|
||||
func (m *Update) XXX_Unmarshal(b []byte) error {
|
||||
return xxx_messageInfo_Update.Unmarshal(m, b)
|
||||
}
|
||||
func (m *Update) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||
return xxx_messageInfo_Update.Marshal(b, m, deterministic)
|
||||
}
|
||||
func (m *Update) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_Update.Merge(m, src)
|
||||
}
|
||||
func (m *Update) XXX_Size() int {
|
||||
return xxx_messageInfo_Update.Size(m)
|
||||
}
|
||||
func (m *Update) XXX_DiscardUnknown() {
|
||||
xxx_messageInfo_Update.DiscardUnknown(m)
|
||||
}
|
||||
|
||||
var xxx_messageInfo_Update proto.InternalMessageInfo
|
||||
|
||||
func (m *Update) GetId() string {
|
||||
if m != nil {
|
||||
return m.Id
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *Update) GetTimestamp() uint64 {
|
||||
if m != nil {
|
||||
return m.Timestamp
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *Update) GetExpires() uint64 {
|
||||
if m != nil {
|
||||
return m.Expires
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *Update) GetType() string {
|
||||
if m != nil {
|
||||
return m.Type
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *Update) GetAction() string {
|
||||
if m != nil {
|
||||
return m.Action
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *Update) GetMetadata() map[string]string {
|
||||
if m != nil {
|
||||
return m.Metadata
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Update) GetData() []byte {
|
||||
if m != nil {
|
||||
return m.Data
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterType((*Update)(nil), "gossip.Update")
|
||||
proto.RegisterMapType((map[string]string)(nil), "gossip.Update.MetadataEntry")
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterFile("github.com/micro/go-micro/registry/gossip/proto/gossip.proto", fileDescriptor_18cba623e76e57f3)
|
||||
}
|
||||
|
||||
var fileDescriptor_18cba623e76e57f3 = []byte{
|
||||
// 251 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x90, 0xcf, 0x4a, 0xc4, 0x30,
|
||||
0x10, 0x87, 0x69, 0xb6, 0x9b, 0xb5, 0xe3, 0x1f, 0x64, 0x10, 0x09, 0xb2, 0x87, 0xe2, 0xa9, 0x17,
|
||||
0x5b, 0xd0, 0xcb, 0xa2, 0x5e, 0x3d, 0x7a, 0x09, 0xf8, 0x00, 0xd9, 0x36, 0xd4, 0xa0, 0xd9, 0x84,
|
||||
0x64, 0x56, 0xec, 0x13, 0xf8, 0xda, 0xb2, 0x69, 0x54, 0xbc, 0x7d, 0xdf, 0xcc, 0x24, 0x99, 0x5f,
|
||||
0xe0, 0x71, 0x34, 0xf4, 0xba, 0xdf, 0xb6, 0xbd, 0xb3, 0x9d, 0x35, 0x7d, 0x70, 0xdd, 0xe8, 0x6e,
|
||||
0x66, 0x08, 0x7a, 0x34, 0x91, 0xc2, 0xd4, 0x8d, 0x2e, 0x46, 0xe3, 0x3b, 0x1f, 0x1c, 0xb9, 0x2c,
|
||||
0x6d, 0x12, 0xe4, 0xb3, 0x5d, 0x7f, 0x31, 0xe0, 0x2f, 0x7e, 0x50, 0xa4, 0xf1, 0x0c, 0x98, 0x19,
|
||||
0x44, 0x51, 0x17, 0x4d, 0x25, 0x99, 0x19, 0x70, 0x0d, 0x15, 0x19, 0xab, 0x23, 0x29, 0xeb, 0x05,
|
||||
0xab, 0x8b, 0xa6, 0x94, 0x7f, 0x05, 0x14, 0xb0, 0xd2, 0x9f, 0xde, 0x04, 0x1d, 0xc5, 0x22, 0xf5,
|
||||
0x7e, 0x14, 0x11, 0x4a, 0x9a, 0xbc, 0x16, 0x65, 0xba, 0x29, 0x31, 0x5e, 0x02, 0x57, 0x3d, 0x19,
|
||||
0xb7, 0x13, 0xcb, 0x54, 0xcd, 0x86, 0x1b, 0x38, 0xb2, 0x9a, 0xd4, 0xa0, 0x48, 0x09, 0x5e, 0x2f,
|
||||
0x9a, 0xe3, 0xdb, 0x75, 0x9b, 0xf7, 0x9c, 0xb7, 0x6a, 0x9f, 0x73, 0xfb, 0x69, 0x47, 0x61, 0x92,
|
||||
0xbf, 0xd3, 0x87, 0x57, 0xd2, 0xa9, 0x55, 0x5d, 0x34, 0x27, 0x32, 0xf1, 0xd5, 0x03, 0x9c, 0xfe,
|
||||
0x1b, 0xc7, 0x73, 0x58, 0xbc, 0xe9, 0x29, 0x67, 0x3a, 0x20, 0x5e, 0xc0, 0xf2, 0x43, 0xbd, 0xef,
|
||||
0x75, 0x0a, 0x54, 0xc9, 0x59, 0xee, 0xd9, 0xa6, 0xd8, 0xf2, 0xf4, 0x31, 0x77, 0xdf, 0x01, 0x00,
|
||||
0x00, 0xff, 0xff, 0x06, 0x6e, 0x00, 0x3c, 0x58, 0x01, 0x00, 0x00,
|
||||
}
|
21
registry/gossip/proto/gossip.proto
Normal file
21
registry/gossip/proto/gossip.proto
Normal file
@@ -0,0 +1,21 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package gossip;
|
||||
|
||||
// Update is the message broadcast
|
||||
message Update {
|
||||
// unique id of update
|
||||
string id = 1;
|
||||
// unix nano timestamp of update
|
||||
uint64 timestamp = 2;
|
||||
// time to live for entry
|
||||
uint64 expires = 3;
|
||||
// type of update; service
|
||||
string type = 4;
|
||||
// what action is taken; add, del, put
|
||||
string action = 5;
|
||||
// any other associated metadata about the data
|
||||
map<string, string> metadata = 6;
|
||||
// the payload data;
|
||||
bytes data = 7;
|
||||
}
|
132
registry/gossip/util.go
Normal file
132
registry/gossip/util.go
Normal file
@@ -0,0 +1,132 @@
|
||||
package gossip
|
||||
|
||||
import (
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
func cp(current []*registry.Service) []*registry.Service {
|
||||
var services []*registry.Service
|
||||
|
||||
for _, service := range current {
|
||||
// copy service
|
||||
s := new(registry.Service)
|
||||
*s = *service
|
||||
|
||||
// copy nodes
|
||||
var nodes []*registry.Node
|
||||
for _, node := range service.Nodes {
|
||||
n := new(registry.Node)
|
||||
*n = *node
|
||||
nodes = append(nodes, n)
|
||||
}
|
||||
s.Nodes = nodes
|
||||
|
||||
// copy endpoints
|
||||
var eps []*registry.Endpoint
|
||||
for _, ep := range service.Endpoints {
|
||||
e := new(registry.Endpoint)
|
||||
*e = *ep
|
||||
eps = append(eps, e)
|
||||
}
|
||||
s.Endpoints = eps
|
||||
|
||||
// append service
|
||||
services = append(services, s)
|
||||
}
|
||||
|
||||
return services
|
||||
}
|
||||
|
||||
func addNodes(old, neu []*registry.Node) []*registry.Node {
|
||||
var nodes []*registry.Node
|
||||
|
||||
// add all new nodes
|
||||
for _, n := range neu {
|
||||
node := *n
|
||||
nodes = append(nodes, &node)
|
||||
}
|
||||
|
||||
// look at old nodes
|
||||
for _, o := range old {
|
||||
var exists bool
|
||||
|
||||
// check against new nodes
|
||||
for _, n := range nodes {
|
||||
// ids match then skip
|
||||
if o.Id == n.Id {
|
||||
exists = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// keep old node
|
||||
if !exists {
|
||||
node := *o
|
||||
nodes = append(nodes, &node)
|
||||
}
|
||||
}
|
||||
|
||||
return nodes
|
||||
}
|
||||
|
||||
func addServices(old, neu []*registry.Service) []*registry.Service {
|
||||
for _, s := range neu {
|
||||
var seen bool
|
||||
for i, o := range old {
|
||||
if o.Version == s.Version {
|
||||
s.Nodes = addNodes(o.Nodes, s.Nodes)
|
||||
seen = true
|
||||
old[i] = s
|
||||
break
|
||||
}
|
||||
}
|
||||
if !seen {
|
||||
old = append(old, s)
|
||||
}
|
||||
}
|
||||
return old
|
||||
}
|
||||
|
||||
func delNodes(old, del []*registry.Node) []*registry.Node {
|
||||
var nodes []*registry.Node
|
||||
for _, o := range old {
|
||||
var rem bool
|
||||
for _, n := range del {
|
||||
if o.Id == n.Id {
|
||||
rem = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !rem {
|
||||
nodes = append(nodes, o)
|
||||
}
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
|
||||
func delServices(old, del []*registry.Service) []*registry.Service {
|
||||
var services []*registry.Service
|
||||
|
||||
for _, o := range old {
|
||||
srv := new(registry.Service)
|
||||
*srv = *o
|
||||
|
||||
var rem bool
|
||||
|
||||
for _, s := range del {
|
||||
if srv.Version == s.Version {
|
||||
srv.Nodes = delNodes(srv.Nodes, s.Nodes)
|
||||
|
||||
if len(srv.Nodes) == 0 {
|
||||
rem = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !rem {
|
||||
services = append(services, srv)
|
||||
}
|
||||
}
|
||||
|
||||
return services
|
||||
}
|
78
registry/gossip/util_test.go
Normal file
78
registry/gossip/util_test.go
Normal file
@@ -0,0 +1,78 @@
|
||||
package gossip
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
func TestDelServices(t *testing.T) {
|
||||
services := []*registry.Service{
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.0",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "foo-123",
|
||||
Address: "localhost",
|
||||
Port: 9999,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.0",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "foo-123",
|
||||
Address: "localhost",
|
||||
Port: 6666,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
servs := delServices([]*registry.Service{services[0]}, []*registry.Service{services[1]})
|
||||
if i := len(servs); i > 0 {
|
||||
t.Errorf("Expected 0 nodes, got %d: %+v", i, servs)
|
||||
}
|
||||
t.Logf("Services %+v", servs)
|
||||
}
|
||||
|
||||
func TestDelNodes(t *testing.T) {
|
||||
services := []*registry.Service{
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.0",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "foo-123",
|
||||
Address: "localhost",
|
||||
Port: 9999,
|
||||
},
|
||||
{
|
||||
Id: "foo-321",
|
||||
Address: "localhost",
|
||||
Port: 6666,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.0",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "foo-123",
|
||||
Address: "localhost",
|
||||
Port: 6666,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
nodes := delNodes(services[0].Nodes, services[1].Nodes)
|
||||
if i := len(nodes); i != 1 {
|
||||
t.Errorf("Expected only 1 node, got %d: %+v", i, nodes)
|
||||
}
|
||||
t.Logf("Nodes %+v", nodes)
|
||||
}
|
51
registry/gossip/watcher.go
Normal file
51
registry/gossip/watcher.go
Normal file
@@ -0,0 +1,51 @@
|
||||
package gossip
|
||||
|
||||
import (
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
type gossipWatcher struct {
|
||||
wo registry.WatchOptions
|
||||
next chan *registry.Result
|
||||
stop chan bool
|
||||
}
|
||||
|
||||
func newGossipWatcher(ch chan *registry.Result, stop chan bool, opts ...registry.WatchOption) (registry.Watcher, error) {
|
||||
var wo registry.WatchOptions
|
||||
for _, o := range opts {
|
||||
o(&wo)
|
||||
}
|
||||
|
||||
return &gossipWatcher{
|
||||
wo: wo,
|
||||
next: ch,
|
||||
stop: stop,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *gossipWatcher) Next() (*registry.Result, error) {
|
||||
for {
|
||||
select {
|
||||
case r, ok := <-m.next:
|
||||
if !ok {
|
||||
return nil, registry.ErrWatcherStopped
|
||||
}
|
||||
// check watch options
|
||||
if len(m.wo.Service) > 0 && r.Service.Name != m.wo.Service {
|
||||
continue
|
||||
}
|
||||
return r, nil
|
||||
case <-m.stop:
|
||||
return nil, registry.ErrWatcherStopped
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *gossipWatcher) Stop() {
|
||||
select {
|
||||
case <-m.stop:
|
||||
return
|
||||
default:
|
||||
close(m.stop)
|
||||
}
|
||||
}
|
@@ -1,3 +1,4 @@
|
||||
// Package mdns is a multicast dns registry
|
||||
package mdns
|
||||
|
||||
/*
|
||||
|
@@ -1,3 +1,4 @@
|
||||
// Package mock provides a mock registry for testing
|
||||
package mock
|
||||
|
||||
import (
|
||||
|
@@ -28,7 +28,10 @@ type WatchOption func(*WatchOptions)
|
||||
var (
|
||||
DefaultRegistry = newConsulRegistry()
|
||||
|
||||
// Not found error when GetService is called
|
||||
ErrNotFound = errors.New("not found")
|
||||
// Watcher stopped error when watcher is stopped
|
||||
ErrWatcherStopped = errors.New("watcher stopped")
|
||||
)
|
||||
|
||||
func NewRegistry(opts ...Option) Registry {
|
||||
|
424
selector/cache/cache.go
vendored
424
selector/cache/cache.go
vendored
@@ -1,424 +0,0 @@
|
||||
// Package cache is a caching selector. It uses the registry watcher.
|
||||
package cache
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-log"
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-micro/selector"
|
||||
)
|
||||
|
||||
type cacheSelector struct {
|
||||
so selector.Options
|
||||
ttl time.Duration
|
||||
|
||||
// registry cache
|
||||
sync.Mutex
|
||||
cache map[string][]*registry.Service
|
||||
ttls map[string]time.Time
|
||||
|
||||
watched map[string]bool
|
||||
|
||||
// used to close or reload watcher
|
||||
reload chan bool
|
||||
exit chan bool
|
||||
}
|
||||
|
||||
var (
|
||||
DefaultTTL = time.Minute
|
||||
)
|
||||
|
||||
func (c *cacheSelector) quit() bool {
|
||||
select {
|
||||
case <-c.exit:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// cp copies a service. Because we're caching handing back pointers would
|
||||
// create a race condition, so we do this instead
|
||||
// its fast enough
|
||||
func (c *cacheSelector) cp(current []*registry.Service) []*registry.Service {
|
||||
var services []*registry.Service
|
||||
|
||||
for _, service := range current {
|
||||
// copy service
|
||||
s := new(registry.Service)
|
||||
*s = *service
|
||||
|
||||
// copy nodes
|
||||
var nodes []*registry.Node
|
||||
for _, node := range service.Nodes {
|
||||
n := new(registry.Node)
|
||||
*n = *node
|
||||
nodes = append(nodes, n)
|
||||
}
|
||||
s.Nodes = nodes
|
||||
|
||||
// copy endpoints
|
||||
var eps []*registry.Endpoint
|
||||
for _, ep := range service.Endpoints {
|
||||
e := new(registry.Endpoint)
|
||||
*e = *ep
|
||||
eps = append(eps, e)
|
||||
}
|
||||
s.Endpoints = eps
|
||||
|
||||
// append service
|
||||
services = append(services, s)
|
||||
}
|
||||
|
||||
return services
|
||||
}
|
||||
|
||||
func (c *cacheSelector) del(service string) {
|
||||
delete(c.cache, service)
|
||||
delete(c.ttls, service)
|
||||
}
|
||||
|
||||
func (c *cacheSelector) get(service string) ([]*registry.Service, error) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
// watch service if not watched
|
||||
if _, ok := c.watched[service]; !ok {
|
||||
go c.run(service)
|
||||
c.watched[service] = true
|
||||
}
|
||||
|
||||
// get does the actual request for a service
|
||||
// it also caches it
|
||||
get := func(service string) ([]*registry.Service, error) {
|
||||
// ask the registry
|
||||
services, err := c.so.Registry.GetService(service)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// cache results
|
||||
c.set(service, c.cp(services))
|
||||
return services, nil
|
||||
}
|
||||
|
||||
// check the cache first
|
||||
services, ok := c.cache[service]
|
||||
|
||||
// cache miss or no services
|
||||
if !ok || len(services) == 0 {
|
||||
return get(service)
|
||||
}
|
||||
|
||||
// got cache but lets check ttl
|
||||
ttl, kk := c.ttls[service]
|
||||
|
||||
// within ttl so return cache
|
||||
if kk && time.Since(ttl) < c.ttl {
|
||||
return c.cp(services), nil
|
||||
}
|
||||
|
||||
// expired entry so get service
|
||||
services, err := get(service)
|
||||
|
||||
// no error then return error
|
||||
if err == nil {
|
||||
return services, nil
|
||||
}
|
||||
|
||||
// not found error then return
|
||||
if err == registry.ErrNotFound {
|
||||
return nil, selector.ErrNotFound
|
||||
}
|
||||
|
||||
// other error
|
||||
|
||||
// return expired cache as last resort
|
||||
return c.cp(services), nil
|
||||
}
|
||||
|
||||
func (c *cacheSelector) set(service string, services []*registry.Service) {
|
||||
c.cache[service] = services
|
||||
c.ttls[service] = time.Now().Add(c.ttl)
|
||||
}
|
||||
|
||||
func (c *cacheSelector) update(res *registry.Result) {
|
||||
if res == nil || res.Service == nil {
|
||||
return
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
services, ok := c.cache[res.Service.Name]
|
||||
if !ok {
|
||||
// we're not going to cache anything
|
||||
// unless there was already a lookup
|
||||
return
|
||||
}
|
||||
|
||||
if len(res.Service.Nodes) == 0 {
|
||||
switch res.Action {
|
||||
case "delete":
|
||||
c.del(res.Service.Name)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// existing service found
|
||||
var service *registry.Service
|
||||
var index int
|
||||
for i, s := range services {
|
||||
if s.Version == res.Service.Version {
|
||||
service = s
|
||||
index = i
|
||||
}
|
||||
}
|
||||
|
||||
switch res.Action {
|
||||
case "create", "update":
|
||||
if service == nil {
|
||||
c.set(res.Service.Name, append(services, res.Service))
|
||||
return
|
||||
}
|
||||
|
||||
// append old nodes to new service
|
||||
for _, cur := range service.Nodes {
|
||||
var seen bool
|
||||
for _, node := range res.Service.Nodes {
|
||||
if cur.Id == node.Id {
|
||||
seen = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !seen {
|
||||
res.Service.Nodes = append(res.Service.Nodes, cur)
|
||||
}
|
||||
}
|
||||
|
||||
services[index] = res.Service
|
||||
c.set(res.Service.Name, services)
|
||||
case "delete":
|
||||
if service == nil {
|
||||
return
|
||||
}
|
||||
|
||||
var nodes []*registry.Node
|
||||
|
||||
// filter cur nodes to remove the dead one
|
||||
for _, cur := range service.Nodes {
|
||||
var seen bool
|
||||
for _, del := range res.Service.Nodes {
|
||||
if del.Id == cur.Id {
|
||||
seen = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !seen {
|
||||
nodes = append(nodes, cur)
|
||||
}
|
||||
}
|
||||
|
||||
// still got nodes, save and return
|
||||
if len(nodes) > 0 {
|
||||
service.Nodes = nodes
|
||||
services[index] = service
|
||||
c.set(service.Name, services)
|
||||
return
|
||||
}
|
||||
|
||||
// zero nodes left
|
||||
|
||||
// only have one thing to delete
|
||||
// nuke the thing
|
||||
if len(services) == 1 {
|
||||
c.del(service.Name)
|
||||
return
|
||||
}
|
||||
|
||||
// still have more than 1 service
|
||||
// check the version and keep what we know
|
||||
var srvs []*registry.Service
|
||||
for _, s := range services {
|
||||
if s.Version != service.Version {
|
||||
srvs = append(srvs, s)
|
||||
}
|
||||
}
|
||||
|
||||
// save
|
||||
c.set(service.Name, srvs)
|
||||
}
|
||||
}
|
||||
|
||||
// run starts the cache watcher loop
|
||||
// it creates a new watcher if there's a problem
|
||||
// reloads the watcher if Init is called
|
||||
// and returns when Close is called
|
||||
func (c *cacheSelector) run(name string) {
|
||||
for {
|
||||
// exit early if already dead
|
||||
if c.quit() {
|
||||
return
|
||||
}
|
||||
|
||||
// create new watcher
|
||||
w, err := c.so.Registry.Watch(
|
||||
registry.WatchService(name),
|
||||
)
|
||||
if err != nil {
|
||||
if c.quit() {
|
||||
return
|
||||
}
|
||||
log.Log(err)
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
// watch for events
|
||||
if err := c.watch(w); err != nil {
|
||||
if c.quit() {
|
||||
return
|
||||
}
|
||||
log.Log(err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// watch loops the next event and calls update
|
||||
// it returns if there's an error
|
||||
func (c *cacheSelector) watch(w registry.Watcher) error {
|
||||
defer w.Stop()
|
||||
|
||||
// manage this loop
|
||||
go func() {
|
||||
// wait for exit or reload signal
|
||||
select {
|
||||
case <-c.exit:
|
||||
case <-c.reload:
|
||||
}
|
||||
|
||||
// stop the watcher
|
||||
w.Stop()
|
||||
}()
|
||||
|
||||
for {
|
||||
res, err := w.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.update(res)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cacheSelector) Init(opts ...selector.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&c.so)
|
||||
}
|
||||
|
||||
// reload the watcher
|
||||
go func() {
|
||||
select {
|
||||
case <-c.exit:
|
||||
return
|
||||
default:
|
||||
c.reload <- true
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cacheSelector) Options() selector.Options {
|
||||
return c.so
|
||||
}
|
||||
|
||||
func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
|
||||
sopts := selector.SelectOptions{
|
||||
Strategy: c.so.Strategy,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(&sopts)
|
||||
}
|
||||
|
||||
// get the service
|
||||
// try the cache first
|
||||
// if that fails go directly to the registry
|
||||
services, err := c.get(service)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// apply the filters
|
||||
for _, filter := range sopts.Filters {
|
||||
services = filter(services)
|
||||
}
|
||||
|
||||
// if there's nothing left, return
|
||||
if len(services) == 0 {
|
||||
return nil, selector.ErrNoneAvailable
|
||||
}
|
||||
|
||||
return sopts.Strategy(services), nil
|
||||
}
|
||||
|
||||
func (c *cacheSelector) Mark(service string, node *registry.Node, err error) {
|
||||
}
|
||||
|
||||
func (c *cacheSelector) Reset(service string) {
|
||||
}
|
||||
|
||||
// Close stops the watcher and destroys the cache
|
||||
func (c *cacheSelector) Close() error {
|
||||
c.Lock()
|
||||
c.cache = make(map[string][]*registry.Service)
|
||||
c.watched = make(map[string]bool)
|
||||
c.Unlock()
|
||||
|
||||
select {
|
||||
case <-c.exit:
|
||||
return nil
|
||||
default:
|
||||
close(c.exit)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cacheSelector) String() string {
|
||||
return "cache"
|
||||
}
|
||||
|
||||
func NewSelector(opts ...selector.Option) selector.Selector {
|
||||
sopts := selector.Options{
|
||||
Strategy: selector.Random,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(&sopts)
|
||||
}
|
||||
|
||||
if sopts.Registry == nil {
|
||||
sopts.Registry = registry.DefaultRegistry
|
||||
}
|
||||
|
||||
ttl := DefaultTTL
|
||||
|
||||
if sopts.Context != nil {
|
||||
if t, ok := sopts.Context.Value(ttlKey{}).(time.Duration); ok {
|
||||
ttl = t
|
||||
}
|
||||
}
|
||||
|
||||
return &cacheSelector{
|
||||
so: sopts,
|
||||
ttl: ttl,
|
||||
watched: make(map[string]bool),
|
||||
cache: make(map[string][]*registry.Service),
|
||||
ttls: make(map[string]time.Time),
|
||||
reload: make(chan bool, 1),
|
||||
exit: make(chan bool),
|
||||
}
|
||||
}
|
29
selector/cache/cache_test.go
vendored
29
selector/cache/cache_test.go
vendored
@@ -1,29 +0,0 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/micro/go-micro/registry/mock"
|
||||
"github.com/micro/go-micro/selector"
|
||||
)
|
||||
|
||||
func TestCacheSelector(t *testing.T) {
|
||||
counts := map[string]int{}
|
||||
|
||||
cache := NewSelector(selector.Registry(mock.NewRegistry()))
|
||||
|
||||
next, err := cache.Select("foo")
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error calling cache select: %v", err)
|
||||
}
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
node, err := next()
|
||||
if err != nil {
|
||||
t.Errorf("Expected node err, got err: %v", err)
|
||||
}
|
||||
counts[node.Id]++
|
||||
}
|
||||
|
||||
t.Logf("Cache Counts %v", counts)
|
||||
}
|
@@ -1,27 +1,341 @@
|
||||
package selector
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-log"
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
type defaultSelector struct {
|
||||
so Options
|
||||
type registrySelector struct {
|
||||
so Options
|
||||
ttl time.Duration
|
||||
|
||||
// registry cache
|
||||
sync.RWMutex
|
||||
cache map[string][]*registry.Service
|
||||
ttls map[string]time.Time
|
||||
|
||||
watched map[string]bool
|
||||
|
||||
// used to close or reload watcher
|
||||
reload chan bool
|
||||
exit chan bool
|
||||
}
|
||||
|
||||
func (r *defaultSelector) Init(opts ...Option) error {
|
||||
for _, o := range opts {
|
||||
o(&r.so)
|
||||
var (
|
||||
DefaultTTL = time.Minute
|
||||
)
|
||||
|
||||
func (c *registrySelector) quit() bool {
|
||||
select {
|
||||
case <-c.exit:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// cp copies a service. Because we're caching handing back pointers would
|
||||
// create a race condition, so we do this instead
|
||||
// its fast enough
|
||||
func (c *registrySelector) cp(current []*registry.Service) []*registry.Service {
|
||||
var services []*registry.Service
|
||||
|
||||
for _, service := range current {
|
||||
// copy service
|
||||
s := new(registry.Service)
|
||||
*s = *service
|
||||
|
||||
// copy nodes
|
||||
var nodes []*registry.Node
|
||||
for _, node := range service.Nodes {
|
||||
n := new(registry.Node)
|
||||
*n = *node
|
||||
nodes = append(nodes, n)
|
||||
}
|
||||
s.Nodes = nodes
|
||||
|
||||
// copy endpoints
|
||||
var eps []*registry.Endpoint
|
||||
for _, ep := range service.Endpoints {
|
||||
e := new(registry.Endpoint)
|
||||
*e = *ep
|
||||
eps = append(eps, e)
|
||||
}
|
||||
s.Endpoints = eps
|
||||
|
||||
// append service
|
||||
services = append(services, s)
|
||||
}
|
||||
|
||||
return services
|
||||
}
|
||||
|
||||
func (c *registrySelector) del(service string) {
|
||||
delete(c.cache, service)
|
||||
delete(c.ttls, service)
|
||||
}
|
||||
|
||||
func (c *registrySelector) get(service string) ([]*registry.Service, error) {
|
||||
// read lock
|
||||
c.RLock()
|
||||
|
||||
// check the cache first
|
||||
services, ok := c.cache[service]
|
||||
// get cache ttl
|
||||
ttl, kk := c.ttls[service]
|
||||
|
||||
// got services && within ttl so return cache
|
||||
if ok && kk && time.Since(ttl) < c.ttl {
|
||||
// make a copy
|
||||
cp := c.cp(services)
|
||||
// unlock the read
|
||||
c.RUnlock()
|
||||
// return servics
|
||||
return cp, nil
|
||||
}
|
||||
|
||||
// get does the actual request for a service and cache it
|
||||
get := func(service string) ([]*registry.Service, error) {
|
||||
// ask the registry
|
||||
services, err := c.so.Registry.GetService(service)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// cache results
|
||||
c.Lock()
|
||||
c.set(service, c.cp(services))
|
||||
c.Unlock()
|
||||
|
||||
return services, nil
|
||||
}
|
||||
|
||||
// watch service if not watched
|
||||
if _, ok := c.watched[service]; !ok {
|
||||
go c.run(service)
|
||||
}
|
||||
|
||||
// unlock the read lock
|
||||
c.RUnlock()
|
||||
|
||||
// get and return services
|
||||
return get(service)
|
||||
}
|
||||
|
||||
func (c *registrySelector) set(service string, services []*registry.Service) {
|
||||
c.cache[service] = services
|
||||
c.ttls[service] = time.Now().Add(c.ttl)
|
||||
}
|
||||
|
||||
func (c *registrySelector) update(res *registry.Result) {
|
||||
if res == nil || res.Service == nil {
|
||||
return
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
services, ok := c.cache[res.Service.Name]
|
||||
if !ok {
|
||||
// we're not going to cache anything
|
||||
// unless there was already a lookup
|
||||
return
|
||||
}
|
||||
|
||||
if len(res.Service.Nodes) == 0 {
|
||||
switch res.Action {
|
||||
case "delete":
|
||||
c.del(res.Service.Name)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// existing service found
|
||||
var service *registry.Service
|
||||
var index int
|
||||
for i, s := range services {
|
||||
if s.Version == res.Service.Version {
|
||||
service = s
|
||||
index = i
|
||||
}
|
||||
}
|
||||
|
||||
switch res.Action {
|
||||
case "create", "update":
|
||||
if service == nil {
|
||||
c.set(res.Service.Name, append(services, res.Service))
|
||||
return
|
||||
}
|
||||
|
||||
// append old nodes to new service
|
||||
for _, cur := range service.Nodes {
|
||||
var seen bool
|
||||
for _, node := range res.Service.Nodes {
|
||||
if cur.Id == node.Id {
|
||||
seen = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !seen {
|
||||
res.Service.Nodes = append(res.Service.Nodes, cur)
|
||||
}
|
||||
}
|
||||
|
||||
services[index] = res.Service
|
||||
c.set(res.Service.Name, services)
|
||||
case "delete":
|
||||
if service == nil {
|
||||
return
|
||||
}
|
||||
|
||||
var nodes []*registry.Node
|
||||
|
||||
// filter cur nodes to remove the dead one
|
||||
for _, cur := range service.Nodes {
|
||||
var seen bool
|
||||
for _, del := range res.Service.Nodes {
|
||||
if del.Id == cur.Id {
|
||||
seen = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !seen {
|
||||
nodes = append(nodes, cur)
|
||||
}
|
||||
}
|
||||
|
||||
// still got nodes, save and return
|
||||
if len(nodes) > 0 {
|
||||
service.Nodes = nodes
|
||||
services[index] = service
|
||||
c.set(service.Name, services)
|
||||
return
|
||||
}
|
||||
|
||||
// zero nodes left
|
||||
|
||||
// only have one thing to delete
|
||||
// nuke the thing
|
||||
if len(services) == 1 {
|
||||
c.del(service.Name)
|
||||
return
|
||||
}
|
||||
|
||||
// still have more than 1 service
|
||||
// check the version and keep what we know
|
||||
var srvs []*registry.Service
|
||||
for _, s := range services {
|
||||
if s.Version != service.Version {
|
||||
srvs = append(srvs, s)
|
||||
}
|
||||
}
|
||||
|
||||
// save
|
||||
c.set(service.Name, srvs)
|
||||
}
|
||||
}
|
||||
|
||||
// run starts the cache watcher loop
|
||||
// it creates a new watcher if there's a problem
|
||||
// reloads the watcher if Init is called
|
||||
// and returns when Close is called
|
||||
func (c *registrySelector) run(name string) {
|
||||
// set watcher
|
||||
c.Lock()
|
||||
c.watched[name] = true
|
||||
c.Unlock()
|
||||
|
||||
// delete watcher on exit
|
||||
defer func() {
|
||||
c.Lock()
|
||||
delete(c.watched, name)
|
||||
c.Unlock()
|
||||
}()
|
||||
|
||||
for {
|
||||
// exit early if already dead
|
||||
if c.quit() {
|
||||
return
|
||||
}
|
||||
|
||||
// create new watcher
|
||||
w, err := c.so.Registry.Watch(
|
||||
registry.WatchService(name),
|
||||
)
|
||||
if err != nil {
|
||||
if c.quit() {
|
||||
return
|
||||
}
|
||||
log.Log(err)
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
// watch for events
|
||||
if err := c.watch(w); err != nil {
|
||||
if c.quit() {
|
||||
return
|
||||
}
|
||||
log.Log(err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// watch loops the next event and calls update
|
||||
// it returns if there's an error
|
||||
func (c *registrySelector) watch(w registry.Watcher) error {
|
||||
defer w.Stop()
|
||||
|
||||
// manage this loop
|
||||
go func() {
|
||||
// wait for exit or reload signal
|
||||
select {
|
||||
case <-c.exit:
|
||||
case <-c.reload:
|
||||
}
|
||||
|
||||
// stop the watcher
|
||||
w.Stop()
|
||||
}()
|
||||
|
||||
for {
|
||||
res, err := w.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.update(res)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *registrySelector) Init(opts ...Option) error {
|
||||
for _, o := range opts {
|
||||
o(&c.so)
|
||||
}
|
||||
|
||||
// reload the watcher
|
||||
go func() {
|
||||
select {
|
||||
case <-c.exit:
|
||||
return
|
||||
default:
|
||||
c.reload <- true
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *defaultSelector) Options() Options {
|
||||
return r.so
|
||||
func (c *registrySelector) Options() Options {
|
||||
return c.so
|
||||
}
|
||||
|
||||
func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, error) {
|
||||
func (c *registrySelector) Select(service string, opts ...SelectOption) (Next, error) {
|
||||
sopts := SelectOptions{
|
||||
Strategy: r.so.Strategy,
|
||||
Strategy: c.so.Strategy,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
@@ -29,7 +343,9 @@ func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, er
|
||||
}
|
||||
|
||||
// get the service
|
||||
services, err := r.so.Registry.GetService(service)
|
||||
// try the cache first
|
||||
// if that fails go directly to the registry
|
||||
services, err := c.get(service)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -47,21 +363,33 @@ func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, er
|
||||
return sopts.Strategy(services), nil
|
||||
}
|
||||
|
||||
func (r *defaultSelector) Mark(service string, node *registry.Node, err error) {
|
||||
func (c *registrySelector) Mark(service string, node *registry.Node, err error) {
|
||||
}
|
||||
|
||||
func (r *defaultSelector) Reset(service string) {
|
||||
func (c *registrySelector) Reset(service string) {
|
||||
}
|
||||
|
||||
func (r *defaultSelector) Close() error {
|
||||
// Close stops the watcher and destroys the cache
|
||||
func (c *registrySelector) Close() error {
|
||||
c.Lock()
|
||||
c.cache = make(map[string][]*registry.Service)
|
||||
c.watched = make(map[string]bool)
|
||||
c.Unlock()
|
||||
|
||||
select {
|
||||
case <-c.exit:
|
||||
return nil
|
||||
default:
|
||||
close(c.exit)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *defaultSelector) String() string {
|
||||
return "default"
|
||||
func (c *registrySelector) String() string {
|
||||
return "registry"
|
||||
}
|
||||
|
||||
func newDefaultSelector(opts ...Option) Selector {
|
||||
func NewSelector(opts ...Option) Selector {
|
||||
sopts := Options{
|
||||
Strategy: Random,
|
||||
}
|
||||
@@ -74,7 +402,21 @@ func newDefaultSelector(opts ...Option) Selector {
|
||||
sopts.Registry = registry.DefaultRegistry
|
||||
}
|
||||
|
||||
return &defaultSelector{
|
||||
so: sopts,
|
||||
ttl := DefaultTTL
|
||||
|
||||
if sopts.Context != nil {
|
||||
if t, ok := sopts.Context.Value("selector_ttl").(time.Duration); ok {
|
||||
ttl = t
|
||||
}
|
||||
}
|
||||
|
||||
return ®istrySelector{
|
||||
so: sopts,
|
||||
ttl: ttl,
|
||||
watched: make(map[string]bool),
|
||||
cache: make(map[string][]*registry.Service),
|
||||
ttls: make(map[string]time.Time),
|
||||
reload: make(chan bool, 1),
|
||||
exit: make(chan bool),
|
||||
}
|
||||
}
|
||||
|
@@ -6,14 +6,14 @@ import (
|
||||
"github.com/micro/go-micro/registry/mock"
|
||||
)
|
||||
|
||||
func TestDefaultSelector(t *testing.T) {
|
||||
func TestRegistrySelector(t *testing.T) {
|
||||
counts := map[string]int{}
|
||||
|
||||
rs := newDefaultSelector(Registry(mock.NewRegistry()))
|
||||
cache := NewSelector(Registry(mock.NewRegistry()))
|
||||
|
||||
next, err := rs.Select("foo")
|
||||
next, err := cache.Select("foo")
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error calling default select: %v", err)
|
||||
t.Errorf("Unexpected error calling cache select: %v", err)
|
||||
}
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
@@ -24,5 +24,5 @@ func TestDefaultSelector(t *testing.T) {
|
||||
counts[node.Id]++
|
||||
}
|
||||
|
||||
t.Logf("Default Counts %v", counts)
|
||||
t.Logf("Selector Counts %v", counts)
|
||||
}
|
||||
|
@@ -1,4 +1,4 @@
|
||||
package cache
|
||||
package registry
|
||||
|
||||
import (
|
||||
"context"
|
||||
@@ -7,14 +7,12 @@ import (
|
||||
"github.com/micro/go-micro/selector"
|
||||
)
|
||||
|
||||
type ttlKey struct{}
|
||||
|
||||
// Set the cache ttl
|
||||
// Set the registry cache ttl
|
||||
func TTL(t time.Duration) selector.Option {
|
||||
return func(o *selector.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, ttlKey{}, t)
|
||||
o.Context = context.WithValue(o.Context, "selector_ttl", t)
|
||||
}
|
||||
}
|
11
selector/registry/registry.go
Normal file
11
selector/registry/registry.go
Normal file
@@ -0,0 +1,11 @@
|
||||
// Package registry is uses the go-micro registry for selection
|
||||
package registry
|
||||
|
||||
import (
|
||||
"github.com/micro/go-micro/selector"
|
||||
)
|
||||
|
||||
// NewSelector returns a new registry selector
|
||||
func NewSelector(opts ...selector.Option) selector.Selector {
|
||||
return selector.NewSelector(opts...)
|
||||
}
|
@@ -35,12 +35,8 @@ type Filter func([]*registry.Service) []*registry.Service
|
||||
type Strategy func([]*registry.Service) Next
|
||||
|
||||
var (
|
||||
DefaultSelector = newDefaultSelector()
|
||||
DefaultSelector = NewSelector()
|
||||
|
||||
ErrNotFound = errors.New("not found")
|
||||
ErrNoneAvailable = errors.New("none available")
|
||||
)
|
||||
|
||||
func NewSelector(opts ...Option) Selector {
|
||||
return newDefaultSelector(opts...)
|
||||
}
|
||||
|
@@ -1,37 +1,5 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
// Handler interface represents a Service request handler. It's generated
|
||||
// by passing any type of public concrete object with methods into server.NewHandler.
|
||||
// Most will pass in a struct.
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
// type Service struct {}
|
||||
//
|
||||
// func (s *Service) Method(context, request, response) error {
|
||||
// return nil
|
||||
// }
|
||||
//
|
||||
type Handler interface {
|
||||
Name() string
|
||||
Handler() interface{}
|
||||
Endpoints() []*registry.Endpoint
|
||||
Options() HandlerOptions
|
||||
}
|
||||
|
||||
// Subscriber interface represents a subscription to a given topic using
|
||||
// a specific subscriber function or object with methods.
|
||||
type Subscriber interface {
|
||||
Topic() string
|
||||
Subscriber() interface{}
|
||||
Endpoints() []*registry.Endpoint
|
||||
Options() SubscriberOptions
|
||||
}
|
||||
|
||||
type HandlerOptions struct {
|
||||
Internal bool
|
||||
Metadata map[string]map[string]string
|
||||
|
@@ -10,7 +10,7 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type rpcPlusCodec struct {
|
||||
type rpcCodec struct {
|
||||
socket transport.Socket
|
||||
codec codec.Codec
|
||||
|
||||
@@ -47,12 +47,12 @@ func (rwc *readWriteCloser) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func newRpcPlusCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) serverCodec {
|
||||
func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) serverCodec {
|
||||
rwc := &readWriteCloser{
|
||||
rbuf: bytes.NewBuffer(req.Body),
|
||||
wbuf: bytes.NewBuffer(nil),
|
||||
}
|
||||
r := &rpcPlusCodec{
|
||||
r := &rpcCodec{
|
||||
buf: rwc,
|
||||
codec: c(rwc),
|
||||
req: req,
|
||||
@@ -61,7 +61,7 @@ func newRpcPlusCodec(req *transport.Message, socket transport.Socket, c codec.Ne
|
||||
return r
|
||||
}
|
||||
|
||||
func (c *rpcPlusCodec) ReadRequestHeader(r *request, first bool) error {
|
||||
func (c *rpcCodec) ReadRequestHeader(r *request, first bool) error {
|
||||
m := codec.Message{Header: c.req.Header}
|
||||
|
||||
if !first {
|
||||
@@ -83,11 +83,11 @@ func (c *rpcPlusCodec) ReadRequestHeader(r *request, first bool) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *rpcPlusCodec) ReadRequestBody(b interface{}) error {
|
||||
func (c *rpcCodec) ReadRequestBody(b interface{}) error {
|
||||
return c.codec.ReadBody(b)
|
||||
}
|
||||
|
||||
func (c *rpcPlusCodec) WriteResponse(r *response, body interface{}, last bool) error {
|
||||
func (c *rpcCodec) WriteResponse(r *response, body interface{}, last bool) error {
|
||||
c.buf.wbuf.Reset()
|
||||
m := &codec.Message{
|
||||
Method: r.ServiceMethod,
|
||||
@@ -111,7 +111,7 @@ func (c *rpcPlusCodec) WriteResponse(r *response, body interface{}, last bool) e
|
||||
})
|
||||
}
|
||||
|
||||
func (c *rpcPlusCodec) Close() error {
|
||||
func (c *rpcCodec) Close() error {
|
||||
c.buf.Close()
|
||||
c.codec.Close()
|
||||
return c.socket.Close()
|
||||
|
@@ -38,7 +38,7 @@ func TestCodecWriteError(t *testing.T) {
|
||||
wbuf: new(bytes.Buffer),
|
||||
}
|
||||
|
||||
c := rpcPlusCodec{
|
||||
c := rpcCodec{
|
||||
buf: &rwc,
|
||||
codec: &testCodec{
|
||||
buf: rwc.wbuf,
|
||||
|
@@ -87,7 +87,7 @@ func (s *rpcServer) accept(sock transport.Socket) {
|
||||
return
|
||||
}
|
||||
|
||||
codec := newRpcPlusCodec(&msg, sock, cf)
|
||||
codec := newRpcCodec(&msg, sock, cf)
|
||||
|
||||
// strip our headers
|
||||
hdr := make(map[string]string)
|
||||
|
@@ -9,8 +9,10 @@ import (
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/micro/go-log"
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
// Server is a simple micro server abstraction
|
||||
type Server interface {
|
||||
Options() Options
|
||||
Init(...Option) error
|
||||
@@ -25,12 +27,14 @@ type Server interface {
|
||||
String() string
|
||||
}
|
||||
|
||||
// Message is an async message interface
|
||||
type Message interface {
|
||||
Topic() string
|
||||
Payload() interface{}
|
||||
ContentType() string
|
||||
}
|
||||
|
||||
// Request is a synchronous request interface
|
||||
type Request interface {
|
||||
Service() string
|
||||
Method() string
|
||||
@@ -43,7 +47,7 @@ type Request interface {
|
||||
// Stream represents a stream established with a client.
|
||||
// A stream can be bidirectional which is indicated by the request.
|
||||
// The last error will be left in Error().
|
||||
// EOF indicated end of the stream.
|
||||
// EOF indicates end of the stream.
|
||||
type Stream interface {
|
||||
Context() context.Context
|
||||
Request() Request
|
||||
@@ -53,6 +57,34 @@ type Stream interface {
|
||||
Close() error
|
||||
}
|
||||
|
||||
// Handler interface represents a request handler. It's generated
|
||||
// by passing any type of public concrete object with methods into server.NewHandler.
|
||||
// Most will pass in a struct.
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
// type Greeter struct {}
|
||||
//
|
||||
// func (g *Greeter) Hello(context, request, response) error {
|
||||
// return nil
|
||||
// }
|
||||
//
|
||||
type Handler interface {
|
||||
Name() string
|
||||
Handler() interface{}
|
||||
Endpoints() []*registry.Endpoint
|
||||
Options() HandlerOptions
|
||||
}
|
||||
|
||||
// Subscriber interface represents a subscription to a given topic using
|
||||
// a specific subscriber function or object with methods.
|
||||
type Subscriber interface {
|
||||
Topic() string
|
||||
Subscriber() interface{}
|
||||
Endpoints() []*registry.Endpoint
|
||||
Options() SubscriberOptions
|
||||
}
|
||||
|
||||
type Option func(*Options)
|
||||
|
||||
type HandlerOption func(*HandlerOptions)
|
||||
|
Reference in New Issue
Block a user