Compare commits
42 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
378af01f77 | ||
|
c317547e4d | ||
|
e55437698b | ||
|
e365cad930 | ||
|
56735b4427 | ||
|
73e22eb5b1 | ||
|
c04b974311 | ||
|
75be57d6e4 | ||
|
270e9118c4 | ||
|
5d3d61855c | ||
|
7e0ee9ec08 | ||
|
2ae4214215 | ||
|
edaa0a0719 | ||
|
44b934d458 | ||
|
65a90f5a21 | ||
|
1eb4398b6c | ||
|
9b99d50396 | ||
|
68ab671bd0 | ||
|
f4cdfaf27f | ||
|
d486125d07 | ||
|
1599d717af | ||
|
a941a4772b | ||
|
dca078f30b | ||
|
cbbf9f7e3b | ||
|
a54dee31de | ||
|
1bd541b69e | ||
|
e769802939 | ||
|
a3741f8a11 | ||
|
6246fa2bcb | ||
|
c9b40cb33b | ||
|
982e6068cf | ||
|
e8b050ffd5 | ||
|
13f8e4fef7 | ||
|
1fe528c411 | ||
|
d0d9582b81 | ||
|
42bdca63da | ||
|
02260dcaa3 | ||
|
eb7788ce25 | ||
|
3b6f38a45c | ||
|
94ea766c9e | ||
|
ff9ad875af | ||
|
d970586a29 |
@@ -1,7 +1,7 @@
|
||||
language: go
|
||||
go:
|
||||
- 1.8.5
|
||||
- 1.9.2
|
||||
- 1.9.5
|
||||
- 1.10.x
|
||||
notifications:
|
||||
slack:
|
||||
secure: aEvhLbhujaGaKSrOokiG3//PaVHTIrc3fBpoRbCRqfZpyq6WREoapJJhF+tIpWWOwaC9GmChbD6aHo/jMUgwKXVyPSaNjiEL87YzUUpL8B2zslNp1rgfTg/LrzthOx3Q1TYwpaAl3to0fuHUVFX4yMeC2vuThq7WSXgMMxFCtbc=
|
||||
|
151
README.md
151
README.md
@@ -2,9 +2,9 @@
|
||||
|
||||
Go Micro is a pluggable RPC framework for distributed systems development.
|
||||
|
||||
The **Micro** philosophy is sane defaults with a pluggable architecture. We provide defaults to get you started quickly but everything can be easily swapped out. It comes with built in support for {json,proto}-rpc encoding, consul or multicast dns for service discovery, http for communication and random hashed client side load balancing.
|
||||
The **micro** philosophy is sane defaults with a pluggable architecture. We provide defaults to get you started quickly but everything can be easily swapped out. It comes with built in support for {json,proto}-rpc encoding, consul or multicast dns for service discovery, http for communication and random hashed client side load balancing.
|
||||
|
||||
Everything in go-micro is **pluggable**. You can find and contribute to plugins at [github.com/micro/go-plugins](https://github.com/micro/go-plugins).
|
||||
Plugins are available at [github.com/micro/go-plugins](https://github.com/micro/go-plugins).
|
||||
|
||||
Follow us on [Twitter](https://twitter.com/microhq) or join the [Slack](http://slack.micro.mu/) community.
|
||||
|
||||
@@ -12,12 +12,11 @@ Follow us on [Twitter](https://twitter.com/microhq) or join the [Slack](http://s
|
||||
|
||||
Go Micro abstracts away the details of distributed systems. Here are the main features.
|
||||
|
||||
- **Service Discovery** - Automatic registration and name resolution with service discovery
|
||||
- **Load Balancing** - Smart client side load balancing of services built on discovery
|
||||
- **Synchronous Comms** - RPC based communication with support for bidirectional streaming
|
||||
- **Asynchronous Comms** - PubSub interface built in for event driven architectures
|
||||
- **Message Encoding** - Dynamic encoding based on content-type with protobuf and json out of the box
|
||||
- **Service Interface** - All features are packaged in a simple high level interface for developing microservices
|
||||
- **Service Discovery** - Automatic service registration and name resolution
|
||||
- **Load Balancing** - Client side load balancing built on discovery
|
||||
- **Message Encoding** - Dynamic encoding based on content-type with protobuf and json support
|
||||
- **Sync Streaming** - RPC based communication with support for bidirectional streaming
|
||||
- **Async Messaging** - Native PubSub messaging built in for event driven architectures
|
||||
|
||||
Go Micro supports both the Service and Function programming models. Read on to learn more.
|
||||
|
||||
@@ -35,15 +34,25 @@ Watch the [Golang UK Conf 2016](https://www.youtube.com/watch?v=xspaDovwk34) vid
|
||||
|
||||
## Getting started
|
||||
|
||||
- [Install Protobuf](#install-protobuf)
|
||||
- [Service Discovery](#service-discovery)
|
||||
- [Writing a Service](#writing-a-service)
|
||||
- [Writing a Function](#writing-a-function)
|
||||
- [Publish & Subscribe](#publish--subscribe)
|
||||
- [Plugins](#plugins)
|
||||
- [Wrappers](#wrappers)
|
||||
|
||||
## Install Protobuf
|
||||
|
||||
Protobuf is required for code generation
|
||||
|
||||
You'll need to install:
|
||||
|
||||
- [protoc-gen-micro](https://github.com/micro/protoc-gen-micro)
|
||||
|
||||
## Service Discovery
|
||||
|
||||
Service discovery is used to resolve service names to addresses. It's the only dependency of go-micro.
|
||||
Service discovery is used to resolve service names to addresses.
|
||||
|
||||
### Consul
|
||||
|
||||
@@ -57,10 +66,10 @@ Discovery is pluggable. Find plugins for etcd, kubernetes, zookeeper and more in
|
||||
|
||||
[Multicast DNS](https://en.wikipedia.org/wiki/Multicast_DNS) is a built in service discovery plugin for a zero dependency configuration.
|
||||
|
||||
Pass `--registry=mdns` to any command or the enviroment variable MICRO_REGISTRY=mdns
|
||||
Pass `--registry=mdns` to any command or the enviroment variable `MICRO_REGISTRY=mdns`
|
||||
|
||||
```
|
||||
go run main.go --registry=mdns
|
||||
MICRO_REGISTRY=mdns go run main.go
|
||||
```
|
||||
|
||||
## Writing a service
|
||||
@@ -91,23 +100,12 @@ message HelloResponse {
|
||||
}
|
||||
```
|
||||
|
||||
### Install protobuf
|
||||
|
||||
Install [protobuf](https://developers.google.com/protocol-buffers/)
|
||||
|
||||
Now install the micro fork of protoc-gen-go. The protobuf compiler for Go.
|
||||
|
||||
```shell
|
||||
go get github.com/micro/protobuf/{proto,protoc-gen-go}
|
||||
```
|
||||
|
||||
### Generate the proto
|
||||
|
||||
After writing the proto definition we must compile it using protoc with the micro plugin.
|
||||
|
||||
```shell
|
||||
protoc -I$GOPATH/src --go_out=plugins=micro:$GOPATH/src \
|
||||
$GOPATH/src/github.com/micro/examples/service/proto/greeter.proto
|
||||
protoc --proto_path=$GOPATH/src:. --micro_out=. --go_out=. path/to/greeter.proto
|
||||
```
|
||||
|
||||
### Write the service
|
||||
@@ -125,11 +123,11 @@ It does the following:
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
micro "github.com/micro/go-micro"
|
||||
proto "github.com/micro/examples/service/proto"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type Greeter struct{}
|
||||
@@ -180,20 +178,21 @@ The generated proto includes a greeter client to reduce boilerplate code.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
micro "github.com/micro/go-micro"
|
||||
proto "github.com/micro/examples/service/proto"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
|
||||
func main() {
|
||||
// Create a new service. Optionally include some options here.
|
||||
service := micro.NewService(micro.Name("greeter.client"))
|
||||
service.Init()
|
||||
|
||||
// Create new greeter client
|
||||
greeter := proto.NewGreeterClient("greeter", service.Client())
|
||||
greeter := proto.GreeterServiceClient("greeter", service.Client())
|
||||
|
||||
// Call the greeter
|
||||
rsp, err := greeter.Hello(context.TODO(), &proto.HelloRequest{Name: "John"})
|
||||
@@ -229,9 +228,10 @@ A Function is a one time executing Service which exits after completing a reques
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
proto "github.com/micro/examples/function/proto"
|
||||
"github.com/micro/go-micro"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type Greeter struct{}
|
||||
@@ -244,7 +244,7 @@ func (g *Greeter) Hello(ctx context.Context, req *proto.HelloRequest, rsp *proto
|
||||
func main() {
|
||||
// create a new function
|
||||
fnc := micro.NewFunction(
|
||||
micro.Name("go.micro.fnc.greeter"),
|
||||
micro.Name("greeter"),
|
||||
)
|
||||
|
||||
// init the command line
|
||||
@@ -260,6 +260,47 @@ func main() {
|
||||
|
||||
It's that simple.
|
||||
|
||||
## Publish & Subscribe
|
||||
|
||||
Go-micro has a built in message broker interface for event driven architectures.
|
||||
|
||||
PubSub operates on the same protobuf generated messages as RPC. They are encoded/decoded automatically and sent via the broker.
|
||||
By default go-micro includes a point-to-point http broker but this can be swapped out via go-plugins.
|
||||
|
||||
### Publish
|
||||
|
||||
|
||||
Create a new publisher with a `topic` name and service client
|
||||
|
||||
```go
|
||||
p := micro.NewPublisher("events", service.Client())
|
||||
```
|
||||
|
||||
Publish a proto message
|
||||
|
||||
```go
|
||||
p.Publish(context.TODO(), &proto.Event{Name: "event"})
|
||||
```
|
||||
|
||||
### Subscribe
|
||||
|
||||
Create a message handler. It's signature should be `func(context.Context, v interface{}) error`.
|
||||
|
||||
```go
|
||||
func ProcessEvent(ctx context.Context, event *proto.Event) error {
|
||||
fmt.Printf("Got event %+v\n", event)
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
Register the message handler with a `topic`
|
||||
|
||||
```go
|
||||
micro.RegisterSubscriber("events", ProcessEvent)
|
||||
```
|
||||
|
||||
See [examples/pubsub](https://github.com/micro/examples/tree/master/pubsub) for a complete example.
|
||||
|
||||
## Plugins
|
||||
|
||||
By default go-micro only provides a few implementation of each interface at the core but it's completely pluggable. There's already dozens of plugins which are available at [github.com/micro/go-plugins](https://github.com/micro/go-plugins). Contributions are welcome!
|
||||
@@ -293,6 +334,60 @@ Flag usage of plugins
|
||||
service --registry=etcdv3 --transport=nats --broker=kafka
|
||||
```
|
||||
|
||||
### Plugin as option
|
||||
|
||||
Alternatively you can set the plugin as an option to a service
|
||||
|
||||
```go
|
||||
|
||||
import (
|
||||
"github.com/micro/go-micro"
|
||||
// etcd v3 registry
|
||||
"github.com/micro/go-plugins/registry/etcdv3"
|
||||
// nats transport
|
||||
"github.com/micro/go-plugins/transport/nats"
|
||||
// kafka broker
|
||||
"github.com/micro/go-plugins/broker/kafka"
|
||||
)
|
||||
|
||||
func main() {
|
||||
registry := etcdv3.NewRegistry()
|
||||
broker := kafka.NewBroker()
|
||||
transport := nats.NewTransport()
|
||||
|
||||
service := micro.NewService(
|
||||
micro.Name("greeter"),
|
||||
micro.Registry(registry),
|
||||
micro.Broker(broker),
|
||||
micro.Transport(transport),
|
||||
)
|
||||
|
||||
service.Init()
|
||||
service.Run()
|
||||
}
|
||||
```
|
||||
|
||||
### Write plugins
|
||||
|
||||
Plugins are a concept built on Go's interface. Each package maintains a high level interface abstraction.
|
||||
Simply implement the interface and pass it in as an option to the service.
|
||||
|
||||
The service discovery interface is called [Registry](https://godoc.org/github.com/micro/go-micro/registry#Registry).
|
||||
Anything which implements this interface can be used as a registry. The same applies to the other packages.
|
||||
|
||||
```go
|
||||
type Registry interface {
|
||||
Register(*Service, ...RegisterOption) error
|
||||
Deregister(*Service) error
|
||||
GetService(string) ([]*Service, error)
|
||||
ListServices() ([]*Service, error)
|
||||
Watch() (Watcher, error)
|
||||
String() string
|
||||
}
|
||||
```
|
||||
|
||||
Browse [go-plugins](https://github.com/micro/go-plugins) to get a better idea of implementation details.
|
||||
|
||||
## Wrappers
|
||||
|
||||
Go-micro includes the notion of middleware as wrappers. The client or handlers can be wrapped using the decorator pattern.
|
||||
|
@@ -2,6 +2,7 @@ package broker
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -22,12 +23,10 @@ import (
|
||||
merr "github.com/micro/go-micro/errors"
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-rcache"
|
||||
maddr "github.com/micro/misc/lib/addr"
|
||||
mnet "github.com/micro/misc/lib/net"
|
||||
mls "github.com/micro/misc/lib/tls"
|
||||
maddr "github.com/micro/util/go/lib/addr"
|
||||
mnet "github.com/micro/util/go/lib/net"
|
||||
mls "github.com/micro/util/go/lib/tls"
|
||||
"github.com/pborman/uuid"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// HTTP Broker is a point to point async broker
|
||||
|
@@ -1,11 +1,11 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
|
||||
"github.com/micro/go-micro/broker/codec"
|
||||
"github.com/micro/go-micro/registry"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
|
@@ -1,10 +1,9 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type BackoffFunc func(ctx context.Context, req Request, attempts int) (time.Duration, error)
|
||||
|
@@ -1,11 +1,10 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func TestBackoff(t *testing.T) {
|
||||
|
@@ -2,9 +2,8 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// Client is the interface used to make requests to services.
|
||||
|
@@ -1,41 +1,7 @@
|
||||
package client
|
||||
|
||||
/*
|
||||
Wrapper is a type of middleware for the go-micro client. It allows
|
||||
the client to be "wrapped" so that requests and responses can be intercepted
|
||||
to perform extra requirements such as auth, tracing, monitoring, logging, etc.
|
||||
|
||||
Example usage:
|
||||
|
||||
import (
|
||||
"log"
|
||||
"github.com/micro/go-micro/client"
|
||||
|
||||
)
|
||||
|
||||
type LogWrapper struct {
|
||||
client.Client
|
||||
}
|
||||
|
||||
func (l *LogWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||
log.Println("Making request to service " + req.Service() + " method " + req.Method())
|
||||
return w.Client.Call(ctx, req, rsp)
|
||||
}
|
||||
|
||||
func Wrapper(c client.Client) client.Client {
|
||||
return &LogWrapper{c}
|
||||
}
|
||||
|
||||
func main() {
|
||||
c := client.NewClient(client.Wrap(Wrapper))
|
||||
|
||||
}
|
||||
|
||||
|
||||
*/
|
||||
|
||||
import (
|
||||
"golang.org/x/net/context"
|
||||
"context"
|
||||
)
|
||||
|
||||
// CallFunc represents the individual call func
|
||||
|
@@ -1,7 +1,7 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"golang.org/x/net/context"
|
||||
"context"
|
||||
)
|
||||
|
||||
type clientKey struct{}
|
||||
|
@@ -1,7 +1,7 @@
|
||||
package mock
|
||||
|
||||
import (
|
||||
"golang.org/x/net/context"
|
||||
"context"
|
||||
)
|
||||
|
||||
type responseKey struct{}
|
||||
|
@@ -1,14 +1,13 @@
|
||||
package mock
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
|
||||
"github.com/micro/go-micro/client"
|
||||
"github.com/micro/go-micro/errors"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@@ -1,11 +1,10 @@
|
||||
package mock
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/micro/go-micro/errors"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func TestClient(t *testing.T) {
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/broker"
|
||||
@@ -8,8 +9,6 @@ import (
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-micro/selector"
|
||||
"github.com/micro/go-micro/transport"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
|
@@ -1,6 +1,8 @@
|
||||
package client
|
||||
|
||||
import "golang.org/x/net/context"
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
// note that returning either false or a non-nil error will result in the call not being retried
|
||||
type RetryFunc func(ctx context.Context, req Request, retryCount int, err error) (bool, error)
|
||||
|
@@ -2,6 +2,7 @@ package client
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -12,14 +13,14 @@ import (
|
||||
"github.com/micro/go-micro/metadata"
|
||||
"github.com/micro/go-micro/selector"
|
||||
"github.com/micro/go-micro/transport"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
type rpcClient struct {
|
||||
once sync.Once
|
||||
opts Options
|
||||
pool *pool
|
||||
seq uint64
|
||||
}
|
||||
|
||||
func newRpcClient(opt ...Option) Client {
|
||||
@@ -29,6 +30,7 @@ func newRpcClient(opt ...Option) Client {
|
||||
once: sync.Once{},
|
||||
opts: opts,
|
||||
pool: newPool(opts.PoolSize, opts.PoolTTL),
|
||||
seq: 0,
|
||||
}
|
||||
|
||||
c := Client(rc)
|
||||
@@ -85,11 +87,15 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
|
||||
r.pool.release(address, c, grr)
|
||||
}()
|
||||
|
||||
seq := r.seq
|
||||
atomic.AddUint64(&r.seq, 1)
|
||||
|
||||
stream := &rpcStream{
|
||||
context: ctx,
|
||||
request: req,
|
||||
closed: make(chan bool),
|
||||
codec: newRpcPlusCodec(msg, c, cf),
|
||||
seq: seq,
|
||||
}
|
||||
defer stream.Close()
|
||||
|
||||
|
@@ -1,14 +1,13 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-micro/registry/mock"
|
||||
"github.com/micro/go-micro/selector"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func TestCallWrapper(t *testing.T) {
|
||||
|
@@ -1,10 +1,9 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// Implements the streamer interface
|
||||
@@ -45,7 +44,6 @@ func (r *rpcStream) Send(msg interface{}) error {
|
||||
}
|
||||
|
||||
seq := r.seq
|
||||
r.seq++
|
||||
|
||||
req := request{
|
||||
Service: r.request.Service(),
|
||||
|
14
cmd/cmd.go
14
cmd/cmd.go
@@ -78,6 +78,16 @@ var (
|
||||
EnvVar: "MICRO_CLIENT_POOL_TTL",
|
||||
Usage: "Sets the client connection pool ttl. e.g 500ms, 5s, 1m. Default: 1m",
|
||||
},
|
||||
cli.IntFlag{
|
||||
Name: "register_ttl",
|
||||
EnvVar: "MICRO_REGISTER_TTL",
|
||||
Usage: "Register TTL in seconds",
|
||||
},
|
||||
cli.IntFlag{
|
||||
Name: "register_interval",
|
||||
EnvVar: "MICRO_REGISTER_INTERVAL",
|
||||
Usage: "Register interval in seconds",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "server_name",
|
||||
EnvVar: "MICRO_SERVER_NAME",
|
||||
@@ -370,6 +380,10 @@ func (c *cmd) Before(ctx *cli.Context) error {
|
||||
serverOpts = append(serverOpts, server.Advertise(ctx.String("server_advertise")))
|
||||
}
|
||||
|
||||
if ttl := time.Duration(ctx.GlobalInt("register_ttl")); ttl > 0 {
|
||||
serverOpts = append(serverOpts, server.RegisterTTL(ttl*time.Second))
|
||||
}
|
||||
|
||||
// client opts
|
||||
if r := ctx.Int("client_retries"); r > 0 {
|
||||
clientOpts = append(clientOpts, client.Retries(r))
|
||||
|
@@ -1,14 +1,14 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/micro/go-micro/broker"
|
||||
"github.com/micro/go-micro/client"
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-micro/selector"
|
||||
"github.com/micro/go-micro/server"
|
||||
"github.com/micro/go-micro/transport"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
|
@@ -91,3 +91,13 @@ func InternalServerError(id, format string, a ...interface{}) error {
|
||||
Status: http.StatusText(500),
|
||||
}
|
||||
}
|
||||
|
||||
// Conflict generates a 409 error.
|
||||
func Conflict(id, format string, a ...interface{}) error {
|
||||
return &Error{
|
||||
Id: id,
|
||||
Code: 409,
|
||||
Detail: fmt.Sprintf(format, a...),
|
||||
Status: http.StatusText(409),
|
||||
}
|
||||
}
|
||||
|
@@ -1,10 +1,10 @@
|
||||
package micro
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/server"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type function struct {
|
||||
|
@@ -1,13 +1,12 @@
|
||||
package micro
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/micro/go-micro/registry/mock"
|
||||
proto "github.com/micro/go-micro/server/debug/proto"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func TestFunction(t *testing.T) {
|
||||
|
@@ -2,10 +2,10 @@
|
||||
package micro
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/micro/go-micro/client"
|
||||
"github.com/micro/go-micro/server"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type serviceKey struct{}
|
||||
|
@@ -2,7 +2,7 @@
|
||||
package metadata
|
||||
|
||||
import (
|
||||
"golang.org/x/net/context"
|
||||
"context"
|
||||
)
|
||||
|
||||
type metaKey struct{}
|
||||
|
@@ -1,9 +1,8 @@
|
||||
package metadata
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func TestMetadataContext(t *testing.T) {
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package micro
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/micro/cli"
|
||||
@@ -11,8 +12,6 @@ import (
|
||||
"github.com/micro/go-micro/selector"
|
||||
"github.com/micro/go-micro/server"
|
||||
"github.com/micro/go-micro/transport"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
|
@@ -1,8 +1,9 @@
|
||||
package micro
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/micro/go-micro/client"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type publisher struct {
|
||||
|
@@ -1,9 +1,11 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
consul "github.com/hashicorp/consul/api"
|
||||
"github.com/micro/go-micro/registry"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func Config(c *consul.Config) registry.Option {
|
||||
@@ -14,3 +16,22 @@ func Config(c *consul.Config) registry.Option {
|
||||
o.Context = context.WithValue(o.Context, "consul_config", c)
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// TCPCheck will tell the service provider to check the service address
|
||||
// and port every `t` interval. It will enabled only if `t` is greater than 0.
|
||||
// See `TCP + Interval` for more information [1].
|
||||
//
|
||||
// [1] https://www.consul.io/docs/agent/checks.html
|
||||
//
|
||||
func TCPCheck(t time.Duration) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
if t <= time.Duration(0) {
|
||||
return
|
||||
}
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, "consul_tcp_check", t)
|
||||
}
|
||||
}
|
||||
|
@@ -17,7 +17,7 @@ import (
|
||||
type consulRegistry struct {
|
||||
Address string
|
||||
Client *consul.Client
|
||||
Options Options
|
||||
opts Options
|
||||
|
||||
sync.Mutex
|
||||
register map[string]uint64
|
||||
@@ -94,7 +94,7 @@ func newConsulRegistry(opts ...Option) Registry {
|
||||
cr := &consulRegistry{
|
||||
Address: config.Address,
|
||||
Client: client,
|
||||
Options: options,
|
||||
opts: options,
|
||||
register: make(map[string]uint64),
|
||||
}
|
||||
|
||||
@@ -115,16 +115,39 @@ func (c *consulRegistry) Deregister(s *Service) error {
|
||||
return c.Client.Agent().ServiceDeregister(node.Id)
|
||||
}
|
||||
|
||||
func getDeregisterTTL(t time.Duration) time.Duration {
|
||||
// splay slightly for the watcher?
|
||||
splay := time.Second * 5
|
||||
deregTTL := t + splay
|
||||
|
||||
// consul has a minimum timeout on deregistration of 1 minute.
|
||||
if t < time.Minute {
|
||||
deregTTL = time.Minute + splay
|
||||
}
|
||||
|
||||
return deregTTL
|
||||
}
|
||||
|
||||
func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
|
||||
if len(s.Nodes) == 0 {
|
||||
return errors.New("Require at least one node")
|
||||
}
|
||||
|
||||
var regTCPCheck bool
|
||||
var regInterval time.Duration
|
||||
|
||||
var options RegisterOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
if c.opts.Context != nil {
|
||||
if tcpCheckInterval, ok := c.opts.Context.Value("consul_tcp_check").(time.Duration); ok {
|
||||
regTCPCheck = true
|
||||
regInterval = tcpCheckInterval
|
||||
}
|
||||
}
|
||||
|
||||
// create hash of service; uint64
|
||||
h, err := hash.Hash(s, nil)
|
||||
if err != nil {
|
||||
@@ -155,16 +178,19 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
|
||||
|
||||
var check *consul.AgentServiceCheck
|
||||
|
||||
// if the TTL is greater than 0 create an associated check
|
||||
if options.TTL > time.Duration(0) {
|
||||
// splay slightly for the watcher?
|
||||
splay := time.Second * 5
|
||||
deregTTL := options.TTL + splay
|
||||
// consul has a minimum timeout on deregistration of 1 minute.
|
||||
if options.TTL < time.Minute {
|
||||
deregTTL = time.Minute + splay
|
||||
if regTCPCheck {
|
||||
deregTTL := getDeregisterTTL(regInterval)
|
||||
|
||||
check = &consul.AgentServiceCheck{
|
||||
TCP: fmt.Sprintf("%s:%d", node.Address, node.Port),
|
||||
Interval: fmt.Sprintf("%v", regInterval),
|
||||
DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL),
|
||||
}
|
||||
|
||||
// if the TTL is greater than 0 create an associated check
|
||||
} else if options.TTL > time.Duration(0) {
|
||||
deregTTL := getDeregisterTTL(options.TTL)
|
||||
|
||||
check = &consul.AgentServiceCheck{
|
||||
TTL: fmt.Sprintf("%v", options.TTL),
|
||||
DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL),
|
||||
@@ -211,18 +237,18 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) {
|
||||
}
|
||||
|
||||
// version is now a tag
|
||||
version, found := decodeVersion(s.Service.Tags)
|
||||
version, _ := decodeVersion(s.Service.Tags)
|
||||
// service ID is now the node id
|
||||
id := s.Service.ID
|
||||
// key is always the version
|
||||
key := version
|
||||
|
||||
// address is service address
|
||||
address := s.Service.Address
|
||||
|
||||
// if we can't get the version we bail
|
||||
// use old the old ways
|
||||
if !found {
|
||||
continue
|
||||
// use node address
|
||||
if len(address) == 0 {
|
||||
address = s.Node.Address
|
||||
}
|
||||
|
||||
svc, ok := serviceMap[key]
|
||||
@@ -236,6 +262,7 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) {
|
||||
}
|
||||
|
||||
var del bool
|
||||
|
||||
for _, check := range s.Checks {
|
||||
// delete the node if the status is critical
|
||||
if check.Status == "critical" {
|
||||
@@ -279,10 +306,14 @@ func (c *consulRegistry) ListServices() ([]*Service, error) {
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (c *consulRegistry) Watch() (Watcher, error) {
|
||||
return newConsulWatcher(c)
|
||||
func (c *consulRegistry) Watch(opts ...WatchOption) (Watcher, error) {
|
||||
return newConsulWatcher(c, opts...)
|
||||
}
|
||||
|
||||
func (c *consulRegistry) String() string {
|
||||
return "consul"
|
||||
}
|
||||
|
||||
func (c *consulRegistry) Options() Options {
|
||||
return c.opts
|
||||
}
|
||||
|
@@ -10,6 +10,7 @@ import (
|
||||
|
||||
type consulWatcher struct {
|
||||
r *consulRegistry
|
||||
wo WatchOptions
|
||||
wp *watch.Plan
|
||||
watchers map[string]*watch.Plan
|
||||
|
||||
@@ -20,9 +21,15 @@ type consulWatcher struct {
|
||||
services map[string][]*Service
|
||||
}
|
||||
|
||||
func newConsulWatcher(cr *consulRegistry) (Watcher, error) {
|
||||
func newConsulWatcher(cr *consulRegistry, opts ...WatchOption) (Watcher, error) {
|
||||
var wo WatchOptions
|
||||
for _, o := range opts {
|
||||
o(&wo)
|
||||
}
|
||||
|
||||
cw := &consulWatcher{
|
||||
r: cr,
|
||||
wo: wo,
|
||||
exit: make(chan bool),
|
||||
next: make(chan *Result, 10),
|
||||
watchers: make(map[string]*watch.Plan),
|
||||
@@ -53,7 +60,7 @@ func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) {
|
||||
for _, e := range entries {
|
||||
serviceName = e.Service.Service
|
||||
// version is now a tag
|
||||
version, found := decodeVersion(e.Service.Tags)
|
||||
version, _ := decodeVersion(e.Service.Tags)
|
||||
// service ID is now the node id
|
||||
id := e.Service.ID
|
||||
// key is always the version
|
||||
@@ -61,9 +68,9 @@ func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) {
|
||||
// address is service address
|
||||
address := e.Service.Address
|
||||
|
||||
// if we can't get the version we bail
|
||||
if !found {
|
||||
continue
|
||||
// use node address
|
||||
if len(address) == 0 {
|
||||
address = e.Node.Address
|
||||
}
|
||||
|
||||
svc, ok := serviceMap[key]
|
||||
@@ -185,6 +192,12 @@ func (cw *consulWatcher) handle(idx uint64, data interface{}) {
|
||||
|
||||
// add new watchers
|
||||
for service, _ := range services {
|
||||
// Filter on watch options
|
||||
// wo.Service: Only watch services we care about
|
||||
if len(cw.wo.Service) > 0 && service != cw.wo.Service {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, ok := cw.watchers[service]; ok {
|
||||
continue
|
||||
}
|
||||
|
@@ -297,8 +297,14 @@ func (m *mdnsRegistry) ListServices() ([]*registry.Service, error) {
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) Watch() (registry.Watcher, error) {
|
||||
func (m *mdnsRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
|
||||
var wo registry.WatchOptions
|
||||
for _, o := range opts {
|
||||
o(&wo)
|
||||
}
|
||||
|
||||
md := &mdnsWatcher{
|
||||
wo: wo,
|
||||
ch: make(chan *mdns.ServiceEntry, 32),
|
||||
exit: make(chan struct{}),
|
||||
}
|
||||
@@ -316,6 +322,10 @@ func (m *mdnsRegistry) String() string {
|
||||
return "mdns"
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) Options() registry.Options {
|
||||
return m.opts
|
||||
}
|
||||
|
||||
func NewRegistry(opts ...registry.Option) registry.Registry {
|
||||
return newRegistry(opts...)
|
||||
}
|
||||
|
@@ -9,6 +9,7 @@ import (
|
||||
)
|
||||
|
||||
type mdnsWatcher struct {
|
||||
wo registry.WatchOptions
|
||||
ch chan *mdns.ServiceEntry
|
||||
exit chan struct{}
|
||||
}
|
||||
@@ -26,6 +27,12 @@ func (m *mdnsWatcher) Next() (*registry.Result, error) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Filter watch options
|
||||
// wo.Service: Only keep services we care about
|
||||
if len(m.wo.Service) > 0 && txt.Service != m.wo.Service {
|
||||
continue
|
||||
}
|
||||
|
||||
var action string
|
||||
|
||||
if e.TTL == 0 {
|
||||
|
@@ -87,14 +87,22 @@ func (m *mockRegistry) Deregister(s *registry.Service) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockRegistry) Watch() (registry.Watcher, error) {
|
||||
return &mockWatcher{exit: make(chan bool)}, nil
|
||||
func (m *mockRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
|
||||
var wopts registry.WatchOptions
|
||||
for _, o := range opts {
|
||||
o(&wopts)
|
||||
}
|
||||
return &mockWatcher{exit: make(chan bool), opts: wopts}, nil
|
||||
}
|
||||
|
||||
func (m *mockRegistry) String() string {
|
||||
return "mock"
|
||||
}
|
||||
|
||||
func (m *mockRegistry) Options() registry.Options {
|
||||
return registry.Options{}
|
||||
}
|
||||
|
||||
func NewRegistry() registry.Registry {
|
||||
m := &mockRegistry{Services: make(map[string][]*registry.Service)}
|
||||
m.init()
|
||||
|
@@ -8,6 +8,7 @@ import (
|
||||
|
||||
type mockWatcher struct {
|
||||
exit chan bool
|
||||
opts registry.WatchOptions
|
||||
}
|
||||
|
||||
func (m *mockWatcher) Next() (*registry.Result, error) {
|
||||
|
@@ -1,10 +1,9 @@
|
||||
package registry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
@@ -25,6 +24,15 @@ type RegisterOptions struct {
|
||||
Context context.Context
|
||||
}
|
||||
|
||||
type WatchOptions struct {
|
||||
// Specify a service to watch
|
||||
// If blank, the watch is for all services
|
||||
Service string
|
||||
// Other options for implementations of the interface
|
||||
// can be stored in a context
|
||||
Context context.Context
|
||||
}
|
||||
|
||||
// Addrs is the registry addresses to use
|
||||
func Addrs(addrs ...string) Option {
|
||||
return func(o *Options) {
|
||||
@@ -57,3 +65,10 @@ func RegisterTTL(t time.Duration) RegisterOption {
|
||||
o.TTL = t
|
||||
}
|
||||
}
|
||||
|
||||
// Watch a service
|
||||
func WatchService(name string) WatchOption {
|
||||
return func(o *WatchOptions) {
|
||||
o.Service = name
|
||||
}
|
||||
}
|
||||
|
@@ -13,14 +13,17 @@ type Registry interface {
|
||||
Deregister(*Service) error
|
||||
GetService(string) ([]*Service, error)
|
||||
ListServices() ([]*Service, error)
|
||||
Watch() (Watcher, error)
|
||||
Watch(...WatchOption) (Watcher, error)
|
||||
String() string
|
||||
Options() Options
|
||||
}
|
||||
|
||||
type Option func(*Options)
|
||||
|
||||
type RegisterOption func(*RegisterOptions)
|
||||
|
||||
type WatchOption func(*WatchOptions)
|
||||
|
||||
var (
|
||||
DefaultRegistry = newConsulRegistry()
|
||||
|
||||
@@ -52,8 +55,8 @@ func ListServices() ([]*Service, error) {
|
||||
}
|
||||
|
||||
// Watch returns a watcher which allows you to track updates to the registry.
|
||||
func Watch() (Watcher, error) {
|
||||
return DefaultRegistry.Watch()
|
||||
func Watch(opts ...WatchOption) (Watcher, error) {
|
||||
return DefaultRegistry.Watch(opts...)
|
||||
}
|
||||
|
||||
func String() string {
|
||||
|
38
selector/cache/cache.go
vendored
38
selector/cache/cache.go
vendored
@@ -1,3 +1,4 @@
|
||||
// Package cache is a caching selector. It uses the registry watcher.
|
||||
package cache
|
||||
|
||||
import (
|
||||
@@ -9,11 +10,6 @@ import (
|
||||
"github.com/micro/go-micro/selector"
|
||||
)
|
||||
|
||||
/*
|
||||
Cache selector is a selector which uses the registry.Watcher to Cache service entries.
|
||||
It defaults to a TTL for 1 minute and causes a cache miss on the next request.
|
||||
*/
|
||||
|
||||
type cacheSelector struct {
|
||||
so selector.Options
|
||||
ttl time.Duration
|
||||
@@ -23,7 +19,7 @@ type cacheSelector struct {
|
||||
cache map[string][]*registry.Service
|
||||
ttls map[string]time.Time
|
||||
|
||||
once sync.Once
|
||||
watched map[string]bool
|
||||
|
||||
// used to close or reload watcher
|
||||
reload chan bool
|
||||
@@ -88,6 +84,12 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
// watch service if not watched
|
||||
if _, ok := c.watched[service]; !ok {
|
||||
go c.run(service)
|
||||
c.watched[service] = true
|
||||
}
|
||||
|
||||
// get does the actual request for a service
|
||||
// it also caches it
|
||||
get := func(service string) ([]*registry.Service, error) {
|
||||
@@ -254,7 +256,7 @@ func (c *cacheSelector) update(res *registry.Result) {
|
||||
// it creates a new watcher if there's a problem
|
||||
// reloads the watcher if Init is called
|
||||
// and returns when Close is called
|
||||
func (c *cacheSelector) run() {
|
||||
func (c *cacheSelector) run(name string) {
|
||||
for {
|
||||
// exit early if already dead
|
||||
if c.quit() {
|
||||
@@ -262,7 +264,9 @@ func (c *cacheSelector) run() {
|
||||
}
|
||||
|
||||
// create new watcher
|
||||
w, err := c.so.Registry.Watch()
|
||||
w, err := c.so.Registry.Watch(
|
||||
registry.WatchService(name),
|
||||
)
|
||||
if err != nil {
|
||||
if c.quit() {
|
||||
return
|
||||
@@ -332,10 +336,6 @@ func (c *cacheSelector) Options() selector.Options {
|
||||
}
|
||||
|
||||
func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
|
||||
c.once.Do(func() {
|
||||
go c.run()
|
||||
})
|
||||
|
||||
sopts := selector.SelectOptions{
|
||||
Strategy: c.so.Strategy,
|
||||
}
|
||||
@@ -377,6 +377,7 @@ func (c *cacheSelector) Reset(service string) {
|
||||
func (c *cacheSelector) Close() error {
|
||||
c.Lock()
|
||||
c.cache = make(map[string][]*registry.Service)
|
||||
c.watched = make(map[string]bool)
|
||||
c.Unlock()
|
||||
|
||||
select {
|
||||
@@ -414,11 +415,12 @@ func NewSelector(opts ...selector.Option) selector.Selector {
|
||||
}
|
||||
|
||||
return &cacheSelector{
|
||||
so: sopts,
|
||||
ttl: ttl,
|
||||
cache: make(map[string][]*registry.Service),
|
||||
ttls: make(map[string]time.Time),
|
||||
reload: make(chan bool, 1),
|
||||
exit: make(chan bool),
|
||||
so: sopts,
|
||||
ttl: ttl,
|
||||
watched: make(map[string]bool),
|
||||
cache: make(map[string][]*registry.Service),
|
||||
ttls: make(map[string]time.Time),
|
||||
reload: make(chan bool, 1),
|
||||
exit: make(chan bool),
|
||||
}
|
||||
}
|
||||
|
2
selector/cache/options.go
vendored
2
selector/cache/options.go
vendored
@@ -1,10 +1,10 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/selector"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type ttlKey struct{}
|
||||
|
@@ -1,9 +1,9 @@
|
||||
package selector
|
||||
|
||||
import (
|
||||
"github.com/micro/go-micro/registry"
|
||||
"context"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
|
@@ -1,7 +1,7 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"golang.org/x/net/context"
|
||||
"context"
|
||||
)
|
||||
|
||||
type serverKey struct{}
|
||||
|
@@ -1,12 +1,11 @@
|
||||
package debug
|
||||
|
||||
import (
|
||||
"context"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
proto "github.com/micro/go-micro/server/debug/proto"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// The debug handler represents an internal server handler
|
||||
|
@@ -1,11 +1,11 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/micro/go-micro/registry"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type testHandler struct{}
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/broker"
|
||||
@@ -8,8 +9,6 @@ import (
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-micro/server/debug"
|
||||
"github.com/micro/go-micro/transport"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"runtime/debug"
|
||||
"sort"
|
||||
@@ -16,9 +17,7 @@ import (
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-micro/transport"
|
||||
|
||||
"github.com/micro/misc/lib/addr"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"github.com/micro/util/go/lib/addr"
|
||||
)
|
||||
|
||||
type rpcServer struct {
|
||||
|
@@ -7,6 +7,7 @@ package server
|
||||
// Meh, we need to get rid of this shit
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"reflect"
|
||||
@@ -16,7 +17,6 @@ import (
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/micro/go-log"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@@ -1,9 +1,8 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// Implements the Streamer interface
|
||||
|
@@ -2,13 +2,13 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/micro/go-log"
|
||||
"github.com/pborman/uuid"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type Server interface {
|
||||
|
@@ -1,7 +1,7 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"golang.org/x/net/context"
|
||||
"context"
|
||||
)
|
||||
|
||||
// HandlerFunc represents a single method of a handler. It's used primarily
|
||||
|
@@ -2,6 +2,7 @@ package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
@@ -9,7 +10,6 @@ import (
|
||||
"github.com/micro/go-micro/codec"
|
||||
"github.com/micro/go-micro/metadata"
|
||||
"github.com/micro/go-micro/registry"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
const (
|
||||
|
27
service.go
27
service.go
@@ -7,7 +7,8 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
log "github.com/micro/go-log"
|
||||
"github.com/micro/cli"
|
||||
"github.com/micro/go-log"
|
||||
"github.com/micro/go-micro/client"
|
||||
"github.com/micro/go-micro/cmd"
|
||||
"github.com/micro/go-micro/metadata"
|
||||
@@ -66,8 +67,22 @@ func (s *service) Init(opts ...Option) {
|
||||
}
|
||||
|
||||
s.once.Do(func() {
|
||||
// save user action
|
||||
action := s.opts.Cmd.App().Action
|
||||
|
||||
// set service action
|
||||
s.opts.Cmd.App().Action = func(c *cli.Context) {
|
||||
// set register interval
|
||||
if i := time.Duration(c.GlobalInt("register_interval")); i > 0 {
|
||||
s.opts.RegisterInterval = i * time.Second
|
||||
}
|
||||
|
||||
// user action
|
||||
action(c)
|
||||
}
|
||||
|
||||
// Initialise the command flags, overriding new service
|
||||
s.opts.Cmd.Init(
|
||||
_ = s.opts.Cmd.Init(
|
||||
cmd.Broker(&s.opts.Broker),
|
||||
cmd.Registry(&s.opts.Registry),
|
||||
cmd.Transport(&s.opts.Transport),
|
||||
@@ -153,7 +168,7 @@ func (s *service) Run() error {
|
||||
go s.run(ex)
|
||||
|
||||
ch := make(chan os.Signal, 1)
|
||||
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
|
||||
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
|
||||
|
||||
select {
|
||||
// wait on kill signal
|
||||
@@ -165,9 +180,5 @@ func (s *service) Run() error {
|
||||
// exit reg loop
|
||||
close(ex)
|
||||
|
||||
if err := s.Stop(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
return s.Stop()
|
||||
}
|
||||
|
@@ -1,13 +1,12 @@
|
||||
package micro
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/micro/go-micro/registry/mock"
|
||||
proto "github.com/micro/go-micro/server/debug/proto"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func TestService(t *testing.T) {
|
||||
@@ -31,35 +30,30 @@ func TestService(t *testing.T) {
|
||||
// we can't test service.Init as it parses the command line
|
||||
// service.Init()
|
||||
|
||||
// register handler
|
||||
// do that later
|
||||
|
||||
go func() {
|
||||
// wait for start
|
||||
wg.Wait()
|
||||
|
||||
// test call debug
|
||||
req := service.Client().NewRequest(
|
||||
"test.service",
|
||||
"Debug.Health",
|
||||
new(proto.HealthRequest),
|
||||
)
|
||||
|
||||
rsp := new(proto.HealthResponse)
|
||||
|
||||
err := service.Client().Call(context.TODO(), req, rsp)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if rsp.Status != "ok" {
|
||||
t.Fatalf("service response: %s", rsp.Status)
|
||||
}
|
||||
|
||||
// shutdown the service
|
||||
cancel()
|
||||
}()
|
||||
|
||||
// run service
|
||||
service.Run()
|
||||
go service.Run()
|
||||
|
||||
// wait for start
|
||||
wg.Wait()
|
||||
|
||||
// test call debug
|
||||
req := service.Client().NewRequest(
|
||||
"test.service",
|
||||
"Debug.Health",
|
||||
new(proto.HealthRequest),
|
||||
)
|
||||
|
||||
rsp := new(proto.HealthResponse)
|
||||
|
||||
err := service.Client().Call(context.TODO(), req, rsp)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if rsp.Status != "ok" {
|
||||
t.Fatalf("service response: %s", rsp.Status)
|
||||
}
|
||||
|
||||
// shutdown the service
|
||||
cancel()
|
||||
}
|
||||
|
@@ -14,9 +14,9 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-log"
|
||||
maddr "github.com/micro/misc/lib/addr"
|
||||
mnet "github.com/micro/misc/lib/net"
|
||||
mls "github.com/micro/misc/lib/tls"
|
||||
maddr "github.com/micro/util/go/lib/addr"
|
||||
mnet "github.com/micro/util/go/lib/net"
|
||||
mls "github.com/micro/util/go/lib/tls"
|
||||
)
|
||||
|
||||
type buffer struct {
|
||||
|
@@ -1,11 +1,11 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/transport/codec"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
|
@@ -1,10 +1,10 @@
|
||||
package micro
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/micro/go-micro/client"
|
||||
"github.com/micro/go-micro/metadata"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type clientWrapper struct {
|
||||
|
@@ -1,11 +1,10 @@
|
||||
package micro
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/micro/go-micro/metadata"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func TestWrapper(t *testing.T) {
|
||||
|
Reference in New Issue
Block a user