Compare commits

...

42 Commits

Author SHA1 Message Date
Asim Aslam
378af01f77 update readme 2018-04-08 15:20:10 +01:00
Asim Aslam
c317547e4d bump travis 2018-04-08 12:53:57 +01:00
Asim Aslam
e55437698b misc moved to util 2018-04-08 12:37:45 +01:00
Asim Aslam
e365cad930 Merge pull request #245 from micro/register
add flags for register ttl and interval
2018-04-06 14:14:11 +01:00
Asim Aslam
56735b4427 add flags for register ttl and interval 2018-04-06 14:03:39 +01:00
Asim Aslam
73e22eb5b1 gofmt 2018-04-06 14:03:00 +01:00
Asim Aslam
c04b974311 nitpick the readme 2018-04-05 13:50:10 +01:00
Asim Aslam
75be57d6e4 syntax highlight code 2018-03-22 17:32:16 +00:00
Asim Aslam
270e9118c4 nitpick 2018-03-22 16:46:34 +00:00
Asim Aslam
5d3d61855c nitpick 2018-03-22 16:44:40 +00:00
Asim Aslam
7e0ee9ec08 include pubsub in the readme 2018-03-22 16:43:57 +00:00
Asim Aslam
2ae4214215 Merge pull request #238 from myabuyllc/registry-tcp-check
Add option to enable TCP check with Consul registry
2018-03-21 19:07:56 +00:00
Asim Aslam
edaa0a0719 Merge pull request #240 from Leon2012/master
fix bug #239
2018-03-21 18:54:52 +00:00
Shulhan
44b934d458 registry: rename context key "consul_register_tcp_check" to "consul_tcp_check" 2018-03-21 21:57:04 +07:00
Shulhan
65a90f5a21 registry.Register: use local variable to get context value 2018-03-21 18:18:48 +07:00
Shulhan
1eb4398b6c registry/consul: rename "RegisterTCPCheck" to "TCPCheck" 2018-03-21 18:17:56 +07:00
leon.peng
9b99d50396 fix bug #239 2018-03-21 03:17:38 +00:00
Shulhan
68ab671bd0 Use registry.options.Context to set Consul TCP check option 2018-03-19 20:34:56 +07:00
Shulhan
f4cdfaf27f Fix TCP address and port on service check registration 2018-03-19 20:34:12 +07:00
Asim Aslam
d486125d07 update readme 2018-03-19 10:21:46 +00:00
Shulhan
1599d717af Add option to enable TCP check with Consul registry
One disadvantage of using TTL based health check is the high network
traffic between Consul agent (either between servers, or between server
and client).

In order for the services considered alive by Consul, microservices must
send an update TTL to Consul every n seconds (currently 30 seconds).

Here is the explanation about TTL check from Consul documentation [1]

    Time to Live (TTL) - These checks retain their last known state for a
    given TTL. The state of the check must be updated periodically over
    the HTTP interface. If an external system fails to update the status
    within a given TTL, the check is set to the failed state. This
    mechanism, conceptually similar to a dead man's switch, relies on the
    application to directly report its health. For example, a healthy app
    can periodically PUT a status update to the HTTP endpoint; if the app
    fails, the TTL will expire and the health check enters a critical
    state. The endpoints used to update health information for a given
    check are the pass endpoint and the fail endpoint. TTL checks also
    persist their last known status to disk. This allows the Consul agent
    to restore the last known status of the check across restarts.
    Persisted check status is valid through the end of the TTL from the
    time of the last check.


Hint:

    TTL checks also persist their last known status to disk. This allows
    the Consul agent to restore the last known status of the check
    across restarts.

When microservices update the TTL, Consul will write to disk. Writing to
disk means all other slaves need to replicate it, which means master need
to inform other standby Consul to pull the new catalog. Hence, the
increased traffic.

More information about this issue can be viewed at Consul mailing list [2].

[1] https://www.consul.io/docs/agent/checks.html
[2] https://groups.google.com/forum/#!topic/consul-tool/84h7qmCCpjg
2018-03-14 19:40:59 +07:00
Asim Aslam
a941a4772b parallel test causes deadlock 2018-03-13 18:50:58 +00:00
Asim Aslam
dca078f30b Merge pull request #235 from shuLhan/dev-shulhan
Fix warnings from linter output
2018-03-13 18:25:37 +00:00
Shulhan
cbbf9f7e3b [test] service.TestService: run subtest in parallel
Reason: the t.Fatalf and t.Fatal must be invoked by test routine, not by
other routine, or the the test will not stopped [1].

[1] megacheck SA2002
2018-03-13 18:12:42 +07:00
Shulhan
a54dee31de [lint] service.Init: ignore error by assigning it to blank identifier 2018-03-13 17:51:33 +07:00
Shulhan
1bd541b69e service.Run: replace signal SIGKILL with SIGQUIT
According to "os/signal" documentation [1] and libc manual [2], SIGKILL
may not be caught by a program.

[1] https://godoc.org/os/signal
[2] https://www.gnu.org/software/libc/manual/html_node/Termination-Signals.html
2018-03-13 17:45:34 +07:00
Shulhan
e769802939 service.Run: simplify return statement 2018-03-13 17:40:13 +07:00
Asim Aslam
a3741f8a11 strip namespace from readme 2018-03-09 19:11:42 +00:00
Asim Aslam
6246fa2bcb Merge pull request #233 from micro/context
switch to stdlib context
2018-03-04 09:15:25 +00:00
Asim Aslam
c9b40cb33b switch to stdlib context 2018-03-03 11:53:52 +00:00
Asim Aslam
982e6068cf support services without version 2018-03-01 17:35:13 +00:00
Asim Aslam
e8b050ffd5 update travis 2018-03-01 10:07:48 +00:00
Asim Aslam
13f8e4fef7 nitpick 2018-02-28 15:40:41 +00:00
Asim Aslam
1fe528c411 update readme 2018-02-28 15:38:50 +00:00
Asim Aslam
d0d9582b81 Merge pull request #206 from darren-west/master
Added Options() to registry interface
2018-02-19 20:52:28 +00:00
Asim Aslam
42bdca63da Merge pull request #230 from micro/watch
Add watch options
2018-02-19 20:29:17 +00:00
Asim Aslam
02260dcaa3 Add watch options 2018-02-19 17:12:37 +00:00
Asim Aslam
eb7788ce25 Merge pull request #229 from tudurom/add_conflict_error
errors: Added 409 Conflict helper function
2018-02-13 08:33:22 +00:00
Tudor Roman
3b6f38a45c errors: Added 409 Conflict helper function 2018-02-11 15:51:33 +02:00
Asim Aslam
94ea766c9e syntax highlight 2018-02-01 13:54:37 +00:00
Asim Aslam
ff9ad875af update readme 2018-01-30 16:18:11 +00:00
darren-west
d970586a29 Added Options() to registry interface 2017-09-28 11:16:56 +01:00
55 changed files with 413 additions and 221 deletions

View File

@@ -1,7 +1,7 @@
language: go
go:
- 1.8.5
- 1.9.2
- 1.9.5
- 1.10.x
notifications:
slack:
secure: aEvhLbhujaGaKSrOokiG3//PaVHTIrc3fBpoRbCRqfZpyq6WREoapJJhF+tIpWWOwaC9GmChbD6aHo/jMUgwKXVyPSaNjiEL87YzUUpL8B2zslNp1rgfTg/LrzthOx3Q1TYwpaAl3to0fuHUVFX4yMeC2vuThq7WSXgMMxFCtbc=

151
README.md
View File

@@ -2,9 +2,9 @@
Go Micro is a pluggable RPC framework for distributed systems development.
The **Micro** philosophy is sane defaults with a pluggable architecture. We provide defaults to get you started quickly but everything can be easily swapped out. It comes with built in support for {json,proto}-rpc encoding, consul or multicast dns for service discovery, http for communication and random hashed client side load balancing.
The **micro** philosophy is sane defaults with a pluggable architecture. We provide defaults to get you started quickly but everything can be easily swapped out. It comes with built in support for {json,proto}-rpc encoding, consul or multicast dns for service discovery, http for communication and random hashed client side load balancing.
Everything in go-micro is **pluggable**. You can find and contribute to plugins at [github.com/micro/go-plugins](https://github.com/micro/go-plugins).
Plugins are available at [github.com/micro/go-plugins](https://github.com/micro/go-plugins).
Follow us on [Twitter](https://twitter.com/microhq) or join the [Slack](http://slack.micro.mu/) community.
@@ -12,12 +12,11 @@ Follow us on [Twitter](https://twitter.com/microhq) or join the [Slack](http://s
Go Micro abstracts away the details of distributed systems. Here are the main features.
- **Service Discovery** - Automatic registration and name resolution with service discovery
- **Load Balancing** - Smart client side load balancing of services built on discovery
- **Synchronous Comms** - RPC based communication with support for bidirectional streaming
- **Asynchronous Comms** - PubSub interface built in for event driven architectures
- **Message Encoding** - Dynamic encoding based on content-type with protobuf and json out of the box
- **Service Interface** - All features are packaged in a simple high level interface for developing microservices
- **Service Discovery** - Automatic service registration and name resolution
- **Load Balancing** - Client side load balancing built on discovery
- **Message Encoding** - Dynamic encoding based on content-type with protobuf and json support
- **Sync Streaming** - RPC based communication with support for bidirectional streaming
- **Async Messaging** - Native PubSub messaging built in for event driven architectures
Go Micro supports both the Service and Function programming models. Read on to learn more.
@@ -35,15 +34,25 @@ Watch the [Golang UK Conf 2016](https://www.youtube.com/watch?v=xspaDovwk34) vid
## Getting started
- [Install Protobuf](#install-protobuf)
- [Service Discovery](#service-discovery)
- [Writing a Service](#writing-a-service)
- [Writing a Function](#writing-a-function)
- [Publish & Subscribe](#publish--subscribe)
- [Plugins](#plugins)
- [Wrappers](#wrappers)
## Install Protobuf
Protobuf is required for code generation
You'll need to install:
- [protoc-gen-micro](https://github.com/micro/protoc-gen-micro)
## Service Discovery
Service discovery is used to resolve service names to addresses. It's the only dependency of go-micro.
Service discovery is used to resolve service names to addresses.
### Consul
@@ -57,10 +66,10 @@ Discovery is pluggable. Find plugins for etcd, kubernetes, zookeeper and more in
[Multicast DNS](https://en.wikipedia.org/wiki/Multicast_DNS) is a built in service discovery plugin for a zero dependency configuration.
Pass `--registry=mdns` to any command or the enviroment variable MICRO_REGISTRY=mdns
Pass `--registry=mdns` to any command or the enviroment variable `MICRO_REGISTRY=mdns`
```
go run main.go --registry=mdns
MICRO_REGISTRY=mdns go run main.go
```
## Writing a service
@@ -91,23 +100,12 @@ message HelloResponse {
}
```
### Install protobuf
Install [protobuf](https://developers.google.com/protocol-buffers/)
Now install the micro fork of protoc-gen-go. The protobuf compiler for Go.
```shell
go get github.com/micro/protobuf/{proto,protoc-gen-go}
```
### Generate the proto
After writing the proto definition we must compile it using protoc with the micro plugin.
```shell
protoc -I$GOPATH/src --go_out=plugins=micro:$GOPATH/src \
$GOPATH/src/github.com/micro/examples/service/proto/greeter.proto
protoc --proto_path=$GOPATH/src:. --micro_out=. --go_out=. path/to/greeter.proto
```
### Write the service
@@ -125,11 +123,11 @@ It does the following:
package main
import (
"context"
"fmt"
micro "github.com/micro/go-micro"
proto "github.com/micro/examples/service/proto"
"golang.org/x/net/context"
)
type Greeter struct{}
@@ -180,20 +178,21 @@ The generated proto includes a greeter client to reduce boilerplate code.
package main
import (
"context"
"fmt"
micro "github.com/micro/go-micro"
proto "github.com/micro/examples/service/proto"
"golang.org/x/net/context"
)
func main() {
// Create a new service. Optionally include some options here.
service := micro.NewService(micro.Name("greeter.client"))
service.Init()
// Create new greeter client
greeter := proto.NewGreeterClient("greeter", service.Client())
greeter := proto.GreeterServiceClient("greeter", service.Client())
// Call the greeter
rsp, err := greeter.Hello(context.TODO(), &proto.HelloRequest{Name: "John"})
@@ -229,9 +228,10 @@ A Function is a one time executing Service which exits after completing a reques
package main
import (
"context"
proto "github.com/micro/examples/function/proto"
"github.com/micro/go-micro"
"golang.org/x/net/context"
)
type Greeter struct{}
@@ -244,7 +244,7 @@ func (g *Greeter) Hello(ctx context.Context, req *proto.HelloRequest, rsp *proto
func main() {
// create a new function
fnc := micro.NewFunction(
micro.Name("go.micro.fnc.greeter"),
micro.Name("greeter"),
)
// init the command line
@@ -260,6 +260,47 @@ func main() {
It's that simple.
## Publish & Subscribe
Go-micro has a built in message broker interface for event driven architectures.
PubSub operates on the same protobuf generated messages as RPC. They are encoded/decoded automatically and sent via the broker.
By default go-micro includes a point-to-point http broker but this can be swapped out via go-plugins.
### Publish
Create a new publisher with a `topic` name and service client
```go
p := micro.NewPublisher("events", service.Client())
```
Publish a proto message
```go
p.Publish(context.TODO(), &proto.Event{Name: "event"})
```
### Subscribe
Create a message handler. It's signature should be `func(context.Context, v interface{}) error`.
```go
func ProcessEvent(ctx context.Context, event *proto.Event) error {
fmt.Printf("Got event %+v\n", event)
return nil
}
```
Register the message handler with a `topic`
```go
micro.RegisterSubscriber("events", ProcessEvent)
```
See [examples/pubsub](https://github.com/micro/examples/tree/master/pubsub) for a complete example.
## Plugins
By default go-micro only provides a few implementation of each interface at the core but it's completely pluggable. There's already dozens of plugins which are available at [github.com/micro/go-plugins](https://github.com/micro/go-plugins). Contributions are welcome!
@@ -293,6 +334,60 @@ Flag usage of plugins
service --registry=etcdv3 --transport=nats --broker=kafka
```
### Plugin as option
Alternatively you can set the plugin as an option to a service
```go
import (
"github.com/micro/go-micro"
// etcd v3 registry
"github.com/micro/go-plugins/registry/etcdv3"
// nats transport
"github.com/micro/go-plugins/transport/nats"
// kafka broker
"github.com/micro/go-plugins/broker/kafka"
)
func main() {
registry := etcdv3.NewRegistry()
broker := kafka.NewBroker()
transport := nats.NewTransport()
service := micro.NewService(
micro.Name("greeter"),
micro.Registry(registry),
micro.Broker(broker),
micro.Transport(transport),
)
service.Init()
service.Run()
}
```
### Write plugins
Plugins are a concept built on Go's interface. Each package maintains a high level interface abstraction.
Simply implement the interface and pass it in as an option to the service.
The service discovery interface is called [Registry](https://godoc.org/github.com/micro/go-micro/registry#Registry).
Anything which implements this interface can be used as a registry. The same applies to the other packages.
```go
type Registry interface {
Register(*Service, ...RegisterOption) error
Deregister(*Service) error
GetService(string) ([]*Service, error)
ListServices() ([]*Service, error)
Watch() (Watcher, error)
String() string
}
```
Browse [go-plugins](https://github.com/micro/go-plugins) to get a better idea of implementation details.
## Wrappers
Go-micro includes the notion of middleware as wrappers. The client or handlers can be wrapped using the decorator pattern.

View File

@@ -2,6 +2,7 @@ package broker
import (
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
@@ -22,12 +23,10 @@ import (
merr "github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry"
"github.com/micro/go-rcache"
maddr "github.com/micro/misc/lib/addr"
mnet "github.com/micro/misc/lib/net"
mls "github.com/micro/misc/lib/tls"
maddr "github.com/micro/util/go/lib/addr"
mnet "github.com/micro/util/go/lib/net"
mls "github.com/micro/util/go/lib/tls"
"github.com/pborman/uuid"
"golang.org/x/net/context"
)
// HTTP Broker is a point to point async broker

View File

@@ -1,11 +1,11 @@
package broker
import (
"context"
"crypto/tls"
"github.com/micro/go-micro/broker/codec"
"github.com/micro/go-micro/registry"
"golang.org/x/net/context"
)
type Options struct {

View File

@@ -1,10 +1,9 @@
package client
import (
"context"
"math"
"time"
"golang.org/x/net/context"
)
type BackoffFunc func(ctx context.Context, req Request, attempts int) (time.Duration, error)

View File

@@ -1,11 +1,10 @@
package client
import (
"context"
"math"
"testing"
"time"
"golang.org/x/net/context"
)
func TestBackoff(t *testing.T) {

View File

@@ -2,9 +2,8 @@
package client
import (
"context"
"time"
"golang.org/x/net/context"
)
// Client is the interface used to make requests to services.

View File

@@ -1,41 +1,7 @@
package client
/*
Wrapper is a type of middleware for the go-micro client. It allows
the client to be "wrapped" so that requests and responses can be intercepted
to perform extra requirements such as auth, tracing, monitoring, logging, etc.
Example usage:
import (
"log"
"github.com/micro/go-micro/client"
)
type LogWrapper struct {
client.Client
}
func (l *LogWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
log.Println("Making request to service " + req.Service() + " method " + req.Method())
return w.Client.Call(ctx, req, rsp)
}
func Wrapper(c client.Client) client.Client {
return &LogWrapper{c}
}
func main() {
c := client.NewClient(client.Wrap(Wrapper))
}
*/
import (
"golang.org/x/net/context"
"context"
)
// CallFunc represents the individual call func

View File

@@ -1,7 +1,7 @@
package client
import (
"golang.org/x/net/context"
"context"
)
type clientKey struct{}

View File

@@ -1,7 +1,7 @@
package mock
import (
"golang.org/x/net/context"
"context"
)
type responseKey struct{}

View File

@@ -1,14 +1,13 @@
package mock
import (
"context"
"fmt"
"reflect"
"sync"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/errors"
"golang.org/x/net/context"
)
var (

View File

@@ -1,11 +1,10 @@
package mock
import (
"context"
"testing"
"github.com/micro/go-micro/errors"
"golang.org/x/net/context"
)
func TestClient(t *testing.T) {

View File

@@ -1,6 +1,7 @@
package client
import (
"context"
"time"
"github.com/micro/go-micro/broker"
@@ -8,8 +9,6 @@ import (
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/transport"
"golang.org/x/net/context"
)
type Options struct {

View File

@@ -1,6 +1,8 @@
package client
import "golang.org/x/net/context"
import (
"context"
)
// note that returning either false or a non-nil error will result in the call not being retried
type RetryFunc func(ctx context.Context, req Request, retryCount int, err error) (bool, error)

View File

@@ -2,6 +2,7 @@ package client
import (
"bytes"
"context"
"fmt"
"sync"
"time"
@@ -12,14 +13,14 @@ import (
"github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/transport"
"golang.org/x/net/context"
"sync/atomic"
)
type rpcClient struct {
once sync.Once
opts Options
pool *pool
seq uint64
}
func newRpcClient(opt ...Option) Client {
@@ -29,6 +30,7 @@ func newRpcClient(opt ...Option) Client {
once: sync.Once{},
opts: opts,
pool: newPool(opts.PoolSize, opts.PoolTTL),
seq: 0,
}
c := Client(rc)
@@ -85,11 +87,15 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
r.pool.release(address, c, grr)
}()
seq := r.seq
atomic.AddUint64(&r.seq, 1)
stream := &rpcStream{
context: ctx,
request: req,
closed: make(chan bool),
codec: newRpcPlusCodec(msg, c, cf),
seq: seq,
}
defer stream.Close()

View File

@@ -1,14 +1,13 @@
package client
import (
"context"
"fmt"
"testing"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/registry/mock"
"github.com/micro/go-micro/selector"
"golang.org/x/net/context"
)
func TestCallWrapper(t *testing.T) {

View File

@@ -1,10 +1,9 @@
package client
import (
"context"
"io"
"sync"
"golang.org/x/net/context"
)
// Implements the streamer interface
@@ -45,7 +44,6 @@ func (r *rpcStream) Send(msg interface{}) error {
}
seq := r.seq
r.seq++
req := request{
Service: r.request.Service(),

View File

@@ -78,6 +78,16 @@ var (
EnvVar: "MICRO_CLIENT_POOL_TTL",
Usage: "Sets the client connection pool ttl. e.g 500ms, 5s, 1m. Default: 1m",
},
cli.IntFlag{
Name: "register_ttl",
EnvVar: "MICRO_REGISTER_TTL",
Usage: "Register TTL in seconds",
},
cli.IntFlag{
Name: "register_interval",
EnvVar: "MICRO_REGISTER_INTERVAL",
Usage: "Register interval in seconds",
},
cli.StringFlag{
Name: "server_name",
EnvVar: "MICRO_SERVER_NAME",
@@ -370,6 +380,10 @@ func (c *cmd) Before(ctx *cli.Context) error {
serverOpts = append(serverOpts, server.Advertise(ctx.String("server_advertise")))
}
if ttl := time.Duration(ctx.GlobalInt("register_ttl")); ttl > 0 {
serverOpts = append(serverOpts, server.RegisterTTL(ttl*time.Second))
}
// client opts
if r := ctx.Int("client_retries"); r > 0 {
clientOpts = append(clientOpts, client.Retries(r))

View File

@@ -1,14 +1,14 @@
package cmd
import (
"context"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/server"
"github.com/micro/go-micro/transport"
"golang.org/x/net/context"
)
type Options struct {

View File

@@ -91,3 +91,13 @@ func InternalServerError(id, format string, a ...interface{}) error {
Status: http.StatusText(500),
}
}
// Conflict generates a 409 error.
func Conflict(id, format string, a ...interface{}) error {
return &Error{
Id: id,
Code: 409,
Detail: fmt.Sprintf(format, a...),
Status: http.StatusText(409),
}
}

View File

@@ -1,10 +1,10 @@
package micro
import (
"context"
"time"
"github.com/micro/go-micro/server"
"golang.org/x/net/context"
)
type function struct {

View File

@@ -1,13 +1,12 @@
package micro
import (
"context"
"sync"
"testing"
"github.com/micro/go-micro/registry/mock"
proto "github.com/micro/go-micro/server/debug/proto"
"golang.org/x/net/context"
)
func TestFunction(t *testing.T) {

View File

@@ -2,10 +2,10 @@
package micro
import (
"context"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/server"
"golang.org/x/net/context"
)
type serviceKey struct{}

View File

@@ -2,7 +2,7 @@
package metadata
import (
"golang.org/x/net/context"
"context"
)
type metaKey struct{}

View File

@@ -1,9 +1,8 @@
package metadata
import (
"context"
"testing"
"golang.org/x/net/context"
)
func TestMetadataContext(t *testing.T) {

View File

@@ -1,6 +1,7 @@
package micro
import (
"context"
"time"
"github.com/micro/cli"
@@ -11,8 +12,6 @@ import (
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/server"
"github.com/micro/go-micro/transport"
"golang.org/x/net/context"
)
type Options struct {

View File

@@ -1,8 +1,9 @@
package micro
import (
"context"
"github.com/micro/go-micro/client"
"golang.org/x/net/context"
)
type publisher struct {

View File

@@ -1,9 +1,11 @@
package consul
import (
"context"
"time"
consul "github.com/hashicorp/consul/api"
"github.com/micro/go-micro/registry"
"golang.org/x/net/context"
)
func Config(c *consul.Config) registry.Option {
@@ -14,3 +16,22 @@ func Config(c *consul.Config) registry.Option {
o.Context = context.WithValue(o.Context, "consul_config", c)
}
}
//
// TCPCheck will tell the service provider to check the service address
// and port every `t` interval. It will enabled only if `t` is greater than 0.
// See `TCP + Interval` for more information [1].
//
// [1] https://www.consul.io/docs/agent/checks.html
//
func TCPCheck(t time.Duration) registry.Option {
return func(o *registry.Options) {
if t <= time.Duration(0) {
return
}
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, "consul_tcp_check", t)
}
}

View File

@@ -17,7 +17,7 @@ import (
type consulRegistry struct {
Address string
Client *consul.Client
Options Options
opts Options
sync.Mutex
register map[string]uint64
@@ -94,7 +94,7 @@ func newConsulRegistry(opts ...Option) Registry {
cr := &consulRegistry{
Address: config.Address,
Client: client,
Options: options,
opts: options,
register: make(map[string]uint64),
}
@@ -115,16 +115,39 @@ func (c *consulRegistry) Deregister(s *Service) error {
return c.Client.Agent().ServiceDeregister(node.Id)
}
func getDeregisterTTL(t time.Duration) time.Duration {
// splay slightly for the watcher?
splay := time.Second * 5
deregTTL := t + splay
// consul has a minimum timeout on deregistration of 1 minute.
if t < time.Minute {
deregTTL = time.Minute + splay
}
return deregTTL
}
func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
if len(s.Nodes) == 0 {
return errors.New("Require at least one node")
}
var regTCPCheck bool
var regInterval time.Duration
var options RegisterOptions
for _, o := range opts {
o(&options)
}
if c.opts.Context != nil {
if tcpCheckInterval, ok := c.opts.Context.Value("consul_tcp_check").(time.Duration); ok {
regTCPCheck = true
regInterval = tcpCheckInterval
}
}
// create hash of service; uint64
h, err := hash.Hash(s, nil)
if err != nil {
@@ -155,16 +178,19 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
var check *consul.AgentServiceCheck
// if the TTL is greater than 0 create an associated check
if options.TTL > time.Duration(0) {
// splay slightly for the watcher?
splay := time.Second * 5
deregTTL := options.TTL + splay
// consul has a minimum timeout on deregistration of 1 minute.
if options.TTL < time.Minute {
deregTTL = time.Minute + splay
if regTCPCheck {
deregTTL := getDeregisterTTL(regInterval)
check = &consul.AgentServiceCheck{
TCP: fmt.Sprintf("%s:%d", node.Address, node.Port),
Interval: fmt.Sprintf("%v", regInterval),
DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL),
}
// if the TTL is greater than 0 create an associated check
} else if options.TTL > time.Duration(0) {
deregTTL := getDeregisterTTL(options.TTL)
check = &consul.AgentServiceCheck{
TTL: fmt.Sprintf("%v", options.TTL),
DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL),
@@ -211,18 +237,18 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) {
}
// version is now a tag
version, found := decodeVersion(s.Service.Tags)
version, _ := decodeVersion(s.Service.Tags)
// service ID is now the node id
id := s.Service.ID
// key is always the version
key := version
// address is service address
address := s.Service.Address
// if we can't get the version we bail
// use old the old ways
if !found {
continue
// use node address
if len(address) == 0 {
address = s.Node.Address
}
svc, ok := serviceMap[key]
@@ -236,6 +262,7 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) {
}
var del bool
for _, check := range s.Checks {
// delete the node if the status is critical
if check.Status == "critical" {
@@ -279,10 +306,14 @@ func (c *consulRegistry) ListServices() ([]*Service, error) {
return services, nil
}
func (c *consulRegistry) Watch() (Watcher, error) {
return newConsulWatcher(c)
func (c *consulRegistry) Watch(opts ...WatchOption) (Watcher, error) {
return newConsulWatcher(c, opts...)
}
func (c *consulRegistry) String() string {
return "consul"
}
func (c *consulRegistry) Options() Options {
return c.opts
}

View File

@@ -10,6 +10,7 @@ import (
type consulWatcher struct {
r *consulRegistry
wo WatchOptions
wp *watch.Plan
watchers map[string]*watch.Plan
@@ -20,9 +21,15 @@ type consulWatcher struct {
services map[string][]*Service
}
func newConsulWatcher(cr *consulRegistry) (Watcher, error) {
func newConsulWatcher(cr *consulRegistry, opts ...WatchOption) (Watcher, error) {
var wo WatchOptions
for _, o := range opts {
o(&wo)
}
cw := &consulWatcher{
r: cr,
wo: wo,
exit: make(chan bool),
next: make(chan *Result, 10),
watchers: make(map[string]*watch.Plan),
@@ -53,7 +60,7 @@ func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) {
for _, e := range entries {
serviceName = e.Service.Service
// version is now a tag
version, found := decodeVersion(e.Service.Tags)
version, _ := decodeVersion(e.Service.Tags)
// service ID is now the node id
id := e.Service.ID
// key is always the version
@@ -61,9 +68,9 @@ func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) {
// address is service address
address := e.Service.Address
// if we can't get the version we bail
if !found {
continue
// use node address
if len(address) == 0 {
address = e.Node.Address
}
svc, ok := serviceMap[key]
@@ -185,6 +192,12 @@ func (cw *consulWatcher) handle(idx uint64, data interface{}) {
// add new watchers
for service, _ := range services {
// Filter on watch options
// wo.Service: Only watch services we care about
if len(cw.wo.Service) > 0 && service != cw.wo.Service {
continue
}
if _, ok := cw.watchers[service]; ok {
continue
}

View File

@@ -297,8 +297,14 @@ func (m *mdnsRegistry) ListServices() ([]*registry.Service, error) {
return services, nil
}
func (m *mdnsRegistry) Watch() (registry.Watcher, error) {
func (m *mdnsRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
var wo registry.WatchOptions
for _, o := range opts {
o(&wo)
}
md := &mdnsWatcher{
wo: wo,
ch: make(chan *mdns.ServiceEntry, 32),
exit: make(chan struct{}),
}
@@ -316,6 +322,10 @@ func (m *mdnsRegistry) String() string {
return "mdns"
}
func (m *mdnsRegistry) Options() registry.Options {
return m.opts
}
func NewRegistry(opts ...registry.Option) registry.Registry {
return newRegistry(opts...)
}

View File

@@ -9,6 +9,7 @@ import (
)
type mdnsWatcher struct {
wo registry.WatchOptions
ch chan *mdns.ServiceEntry
exit chan struct{}
}
@@ -26,6 +27,12 @@ func (m *mdnsWatcher) Next() (*registry.Result, error) {
continue
}
// Filter watch options
// wo.Service: Only keep services we care about
if len(m.wo.Service) > 0 && txt.Service != m.wo.Service {
continue
}
var action string
if e.TTL == 0 {

View File

@@ -87,14 +87,22 @@ func (m *mockRegistry) Deregister(s *registry.Service) error {
return nil
}
func (m *mockRegistry) Watch() (registry.Watcher, error) {
return &mockWatcher{exit: make(chan bool)}, nil
func (m *mockRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
var wopts registry.WatchOptions
for _, o := range opts {
o(&wopts)
}
return &mockWatcher{exit: make(chan bool), opts: wopts}, nil
}
func (m *mockRegistry) String() string {
return "mock"
}
func (m *mockRegistry) Options() registry.Options {
return registry.Options{}
}
func NewRegistry() registry.Registry {
m := &mockRegistry{Services: make(map[string][]*registry.Service)}
m.init()

View File

@@ -8,6 +8,7 @@ import (
type mockWatcher struct {
exit chan bool
opts registry.WatchOptions
}
func (m *mockWatcher) Next() (*registry.Result, error) {

View File

@@ -1,10 +1,9 @@
package registry
import (
"context"
"crypto/tls"
"time"
"golang.org/x/net/context"
)
type Options struct {
@@ -25,6 +24,15 @@ type RegisterOptions struct {
Context context.Context
}
type WatchOptions struct {
// Specify a service to watch
// If blank, the watch is for all services
Service string
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
}
// Addrs is the registry addresses to use
func Addrs(addrs ...string) Option {
return func(o *Options) {
@@ -57,3 +65,10 @@ func RegisterTTL(t time.Duration) RegisterOption {
o.TTL = t
}
}
// Watch a service
func WatchService(name string) WatchOption {
return func(o *WatchOptions) {
o.Service = name
}
}

View File

@@ -13,14 +13,17 @@ type Registry interface {
Deregister(*Service) error
GetService(string) ([]*Service, error)
ListServices() ([]*Service, error)
Watch() (Watcher, error)
Watch(...WatchOption) (Watcher, error)
String() string
Options() Options
}
type Option func(*Options)
type RegisterOption func(*RegisterOptions)
type WatchOption func(*WatchOptions)
var (
DefaultRegistry = newConsulRegistry()
@@ -52,8 +55,8 @@ func ListServices() ([]*Service, error) {
}
// Watch returns a watcher which allows you to track updates to the registry.
func Watch() (Watcher, error) {
return DefaultRegistry.Watch()
func Watch(opts ...WatchOption) (Watcher, error) {
return DefaultRegistry.Watch(opts...)
}
func String() string {

View File

@@ -1,3 +1,4 @@
// Package cache is a caching selector. It uses the registry watcher.
package cache
import (
@@ -9,11 +10,6 @@ import (
"github.com/micro/go-micro/selector"
)
/*
Cache selector is a selector which uses the registry.Watcher to Cache service entries.
It defaults to a TTL for 1 minute and causes a cache miss on the next request.
*/
type cacheSelector struct {
so selector.Options
ttl time.Duration
@@ -23,7 +19,7 @@ type cacheSelector struct {
cache map[string][]*registry.Service
ttls map[string]time.Time
once sync.Once
watched map[string]bool
// used to close or reload watcher
reload chan bool
@@ -88,6 +84,12 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) {
c.Lock()
defer c.Unlock()
// watch service if not watched
if _, ok := c.watched[service]; !ok {
go c.run(service)
c.watched[service] = true
}
// get does the actual request for a service
// it also caches it
get := func(service string) ([]*registry.Service, error) {
@@ -254,7 +256,7 @@ func (c *cacheSelector) update(res *registry.Result) {
// it creates a new watcher if there's a problem
// reloads the watcher if Init is called
// and returns when Close is called
func (c *cacheSelector) run() {
func (c *cacheSelector) run(name string) {
for {
// exit early if already dead
if c.quit() {
@@ -262,7 +264,9 @@ func (c *cacheSelector) run() {
}
// create new watcher
w, err := c.so.Registry.Watch()
w, err := c.so.Registry.Watch(
registry.WatchService(name),
)
if err != nil {
if c.quit() {
return
@@ -332,10 +336,6 @@ func (c *cacheSelector) Options() selector.Options {
}
func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
c.once.Do(func() {
go c.run()
})
sopts := selector.SelectOptions{
Strategy: c.so.Strategy,
}
@@ -377,6 +377,7 @@ func (c *cacheSelector) Reset(service string) {
func (c *cacheSelector) Close() error {
c.Lock()
c.cache = make(map[string][]*registry.Service)
c.watched = make(map[string]bool)
c.Unlock()
select {
@@ -414,11 +415,12 @@ func NewSelector(opts ...selector.Option) selector.Selector {
}
return &cacheSelector{
so: sopts,
ttl: ttl,
cache: make(map[string][]*registry.Service),
ttls: make(map[string]time.Time),
reload: make(chan bool, 1),
exit: make(chan bool),
so: sopts,
ttl: ttl,
watched: make(map[string]bool),
cache: make(map[string][]*registry.Service),
ttls: make(map[string]time.Time),
reload: make(chan bool, 1),
exit: make(chan bool),
}
}

View File

@@ -1,10 +1,10 @@
package cache
import (
"context"
"time"
"github.com/micro/go-micro/selector"
"golang.org/x/net/context"
)
type ttlKey struct{}

View File

@@ -1,9 +1,9 @@
package selector
import (
"github.com/micro/go-micro/registry"
"context"
"golang.org/x/net/context"
"github.com/micro/go-micro/registry"
)
type Options struct {

View File

@@ -1,7 +1,7 @@
package server
import (
"golang.org/x/net/context"
"context"
)
type serverKey struct{}

View File

@@ -1,12 +1,11 @@
package debug
import (
"context"
"runtime"
"time"
proto "github.com/micro/go-micro/server/debug/proto"
"golang.org/x/net/context"
)
// The debug handler represents an internal server handler

View File

@@ -1,11 +1,11 @@
package server
import (
"context"
"reflect"
"testing"
"github.com/micro/go-micro/registry"
"golang.org/x/net/context"
)
type testHandler struct{}

View File

@@ -1,6 +1,7 @@
package server
import (
"context"
"time"
"github.com/micro/go-micro/broker"
@@ -8,8 +9,6 @@ import (
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/server/debug"
"github.com/micro/go-micro/transport"
"golang.org/x/net/context"
)
type Options struct {

View File

@@ -1,6 +1,7 @@
package server
import (
"context"
"fmt"
"runtime/debug"
"sort"
@@ -16,9 +17,7 @@ import (
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/transport"
"github.com/micro/misc/lib/addr"
"golang.org/x/net/context"
"github.com/micro/util/go/lib/addr"
)
type rpcServer struct {

View File

@@ -7,6 +7,7 @@ package server
// Meh, we need to get rid of this shit
import (
"context"
"errors"
"io"
"reflect"
@@ -16,7 +17,6 @@ import (
"unicode/utf8"
"github.com/micro/go-log"
"golang.org/x/net/context"
)
var (

View File

@@ -1,9 +1,8 @@
package server
import (
"context"
"sync"
"golang.org/x/net/context"
)
// Implements the Streamer interface

View File

@@ -2,13 +2,13 @@
package server
import (
"context"
"os"
"os/signal"
"syscall"
"github.com/micro/go-log"
"github.com/pborman/uuid"
"golang.org/x/net/context"
)
type Server interface {

View File

@@ -1,7 +1,7 @@
package server
import (
"golang.org/x/net/context"
"context"
)
// HandlerFunc represents a single method of a handler. It's used primarily

View File

@@ -2,6 +2,7 @@ package server
import (
"bytes"
"context"
"fmt"
"reflect"
@@ -9,7 +10,6 @@ import (
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/registry"
"golang.org/x/net/context"
)
const (

View File

@@ -7,7 +7,8 @@ import (
"syscall"
"time"
log "github.com/micro/go-log"
"github.com/micro/cli"
"github.com/micro/go-log"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/cmd"
"github.com/micro/go-micro/metadata"
@@ -66,8 +67,22 @@ func (s *service) Init(opts ...Option) {
}
s.once.Do(func() {
// save user action
action := s.opts.Cmd.App().Action
// set service action
s.opts.Cmd.App().Action = func(c *cli.Context) {
// set register interval
if i := time.Duration(c.GlobalInt("register_interval")); i > 0 {
s.opts.RegisterInterval = i * time.Second
}
// user action
action(c)
}
// Initialise the command flags, overriding new service
s.opts.Cmd.Init(
_ = s.opts.Cmd.Init(
cmd.Broker(&s.opts.Broker),
cmd.Registry(&s.opts.Registry),
cmd.Transport(&s.opts.Transport),
@@ -153,7 +168,7 @@ func (s *service) Run() error {
go s.run(ex)
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
select {
// wait on kill signal
@@ -165,9 +180,5 @@ func (s *service) Run() error {
// exit reg loop
close(ex)
if err := s.Stop(); err != nil {
return err
}
return nil
return s.Stop()
}

View File

@@ -1,13 +1,12 @@
package micro
import (
"context"
"sync"
"testing"
"github.com/micro/go-micro/registry/mock"
proto "github.com/micro/go-micro/server/debug/proto"
"golang.org/x/net/context"
)
func TestService(t *testing.T) {
@@ -31,35 +30,30 @@ func TestService(t *testing.T) {
// we can't test service.Init as it parses the command line
// service.Init()
// register handler
// do that later
go func() {
// wait for start
wg.Wait()
// test call debug
req := service.Client().NewRequest(
"test.service",
"Debug.Health",
new(proto.HealthRequest),
)
rsp := new(proto.HealthResponse)
err := service.Client().Call(context.TODO(), req, rsp)
if err != nil {
t.Fatal(err)
}
if rsp.Status != "ok" {
t.Fatalf("service response: %s", rsp.Status)
}
// shutdown the service
cancel()
}()
// run service
service.Run()
go service.Run()
// wait for start
wg.Wait()
// test call debug
req := service.Client().NewRequest(
"test.service",
"Debug.Health",
new(proto.HealthRequest),
)
rsp := new(proto.HealthResponse)
err := service.Client().Call(context.TODO(), req, rsp)
if err != nil {
t.Fatal(err)
}
if rsp.Status != "ok" {
t.Fatalf("service response: %s", rsp.Status)
}
// shutdown the service
cancel()
}

View File

@@ -14,9 +14,9 @@ import (
"time"
"github.com/micro/go-log"
maddr "github.com/micro/misc/lib/addr"
mnet "github.com/micro/misc/lib/net"
mls "github.com/micro/misc/lib/tls"
maddr "github.com/micro/util/go/lib/addr"
mnet "github.com/micro/util/go/lib/net"
mls "github.com/micro/util/go/lib/tls"
)
type buffer struct {

View File

@@ -1,11 +1,11 @@
package transport
import (
"context"
"crypto/tls"
"time"
"github.com/micro/go-micro/transport/codec"
"golang.org/x/net/context"
)
type Options struct {

View File

@@ -1,10 +1,10 @@
package micro
import (
"context"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/metadata"
"golang.org/x/net/context"
)
type clientWrapper struct {

View File

@@ -1,11 +1,10 @@
package micro
import (
"context"
"testing"
"github.com/micro/go-micro/metadata"
"golang.org/x/net/context"
)
func TestWrapper(t *testing.T) {