Compare commits
46 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
98bb4a69c2 | ||
|
e69413b763 | ||
|
45f18042b7 | ||
|
0672b051cc | ||
|
3c496720cc | ||
|
881cb570d5 | ||
|
c6a2c8de6c | ||
|
71bacf6991 | ||
|
a0b257b572 | ||
|
a082c151f0 | ||
|
302ab42a97 | ||
|
531d4dd24a | ||
|
1c401a852e | ||
|
25e6dcc9b6 | ||
|
4006d9f102 | ||
|
4c821baab4 | ||
|
c8a35afc92 | ||
|
54f67db275 | ||
|
fd04722706 | ||
|
4cee1f19f6 | ||
|
ef8b5e28b0 | ||
|
818f150b25 | ||
|
446d3fc72e | ||
|
240052246f | ||
|
156a51ab10 | ||
|
52a4beb072 | ||
|
395d70cf01 | ||
|
3732dc2f42 | ||
|
9d3cb65daa | ||
|
a0d3917832 | ||
|
9968c7d007 | ||
|
68f5e71153 | ||
|
af328ee7b4 | ||
|
eebaa64d8c | ||
|
88505388c1 | ||
|
8a778644cf | ||
|
d3a76e646a | ||
|
cfa824bc5f | ||
|
39be61685c | ||
|
ac2106ced7 | ||
|
1b4f7d8a68 | ||
|
5eb2e79b86 | ||
|
a2eff9918e | ||
|
52a470532d | ||
|
cd9441fafb | ||
|
356cf82af5 |
@@ -1,7 +1,7 @@
|
||||
language: go
|
||||
go:
|
||||
- 1.9.5
|
||||
- 1.10.x
|
||||
- 1.11.x
|
||||
notifications:
|
||||
slack:
|
||||
secure: aEvhLbhujaGaKSrOokiG3//PaVHTIrc3fBpoRbCRqfZpyq6WREoapJJhF+tIpWWOwaC9GmChbD6aHo/jMUgwKXVyPSaNjiEL87YzUUpL8B2zslNp1rgfTg/LrzthOx3Q1TYwpaAl3to0fuHUVFX4yMeC2vuThq7WSXgMMxFCtbc=
|
||||
|
432
README.md
432
README.md
@@ -18,437 +18,9 @@ Go Micro abstracts away the details of distributed systems. Here are the main fe
|
||||
- **Sync Streaming** - RPC based communication with support for bidirectional streaming
|
||||
- **Async Messaging** - Native PubSub messaging built in for event driven architectures
|
||||
|
||||
Go Micro supports both the Service and Function programming models. Read on to learn more.
|
||||
## Getting Started
|
||||
|
||||
## Docs
|
||||
|
||||
For more detailed information on the architecture, installation and use of go-micro checkout the [docs](https://micro.mu/docs).
|
||||
|
||||
## Learn By Example
|
||||
|
||||
An example service can be found in [**examples/service**](https://github.com/micro/examples/tree/master/service) and function in [**examples/function**](https://github.com/micro/examples/tree/master/function).
|
||||
|
||||
The [**examples**](https://github.com/micro/examples) directory contains examples for using things such as middleware/wrappers, selector filters, pub/sub, grpc, plugins and much more. For the complete greeter example look at [**examples/greeter**](https://github.com/micro/examples/tree/master/greeter). Other examples can be found throughout the GitHub repository.
|
||||
|
||||
Watch the [Golang UK Conf 2016](https://www.youtube.com/watch?v=xspaDovwk34) video for a high level overview.
|
||||
|
||||
## Getting started
|
||||
|
||||
- [Install Protobuf](#install-protobuf)
|
||||
- [Service Discovery](#service-discovery)
|
||||
- [Writing a Service](#writing-a-service)
|
||||
- [Writing a Function](#writing-a-function)
|
||||
- [Publish & Subscribe](#publish--subscribe)
|
||||
- [Plugins](#plugins)
|
||||
- [Wrappers](#wrappers)
|
||||
|
||||
## Install Protobuf
|
||||
|
||||
Protobuf is required for code generation
|
||||
|
||||
You'll need to install:
|
||||
|
||||
- [protoc-gen-micro](https://github.com/micro/protoc-gen-micro)
|
||||
|
||||
## Service Discovery
|
||||
|
||||
Service discovery is used to resolve service names to addresses.
|
||||
|
||||
### Consul
|
||||
|
||||
[Consul](https://www.consul.io/) is used as the default service discovery system.
|
||||
|
||||
Discovery is pluggable. Find plugins for etcd, kubernetes, zookeeper and more in the [micro/go-plugins](https://github.com/micro/go-plugins) repo.
|
||||
|
||||
[Install guide](https://www.consul.io/intro/getting-started/install.html)
|
||||
|
||||
### Multicast DNS
|
||||
|
||||
[Multicast DNS](https://en.wikipedia.org/wiki/Multicast_DNS) is a built in service discovery plugin for a zero dependency configuration.
|
||||
|
||||
Pass `--registry=mdns` to any command or the enviroment variable `MICRO_REGISTRY=mdns`
|
||||
|
||||
```
|
||||
MICRO_REGISTRY=mdns go run main.go
|
||||
```
|
||||
|
||||
## Writing a service
|
||||
|
||||
This is a simple greeter RPC service example
|
||||
|
||||
Find this example at [examples/service](https://github.com/micro/examples/tree/master/service).
|
||||
|
||||
### Create service proto
|
||||
|
||||
One of the key requirements of microservices is strongly defined interfaces. Micro uses protobuf to achieve this.
|
||||
|
||||
Here we define the Greeter handler with the method Hello. It takes a HelloRequest and HelloResponse both with one string arguments.
|
||||
|
||||
```proto
|
||||
syntax = "proto3";
|
||||
|
||||
service Greeter {
|
||||
rpc Hello(HelloRequest) returns (HelloResponse) {}
|
||||
}
|
||||
|
||||
message HelloRequest {
|
||||
string name = 1;
|
||||
}
|
||||
|
||||
message HelloResponse {
|
||||
string greeting = 2;
|
||||
}
|
||||
```
|
||||
|
||||
### Generate the proto
|
||||
|
||||
After writing the proto definition we must compile it using protoc with the micro plugin.
|
||||
|
||||
```shell
|
||||
protoc --proto_path=$GOPATH/src:. --micro_out=. --go_out=. path/to/greeter.proto
|
||||
```
|
||||
|
||||
### Write the service
|
||||
|
||||
Below is the code for the greeter service.
|
||||
|
||||
It does the following:
|
||||
|
||||
1. Implements the interface defined for the Greeter handler
|
||||
2. Initialises a micro.Service
|
||||
3. Registers the Greeter handler
|
||||
4. Runs the service
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
micro "github.com/micro/go-micro"
|
||||
proto "github.com/micro/examples/service/proto"
|
||||
)
|
||||
|
||||
type Greeter struct{}
|
||||
|
||||
func (g *Greeter) Hello(ctx context.Context, req *proto.HelloRequest, rsp *proto.HelloResponse) error {
|
||||
rsp.Greeting = "Hello " + req.Name
|
||||
return nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
// Create a new service. Optionally include some options here.
|
||||
service := micro.NewService(
|
||||
micro.Name("greeter"),
|
||||
)
|
||||
|
||||
// Init will parse the command line flags.
|
||||
service.Init()
|
||||
|
||||
// Register handler
|
||||
proto.RegisterGreeterHandler(service.Server(), new(Greeter))
|
||||
|
||||
// Run the server
|
||||
if err := service.Run(); err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Run service
|
||||
```
|
||||
go run examples/service/main.go
|
||||
```
|
||||
|
||||
Output
|
||||
```
|
||||
2016/03/14 10:59:14 Listening on [::]:50137
|
||||
2016/03/14 10:59:14 Broker Listening on [::]:50138
|
||||
2016/03/14 10:59:14 Registering node: greeter-ca62b017-e9d3-11e5-9bbb-68a86d0d36b6
|
||||
```
|
||||
|
||||
### Define a client
|
||||
|
||||
Below is the client code to query the greeter service.
|
||||
|
||||
The generated proto includes a greeter client to reduce boilerplate code.
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
micro "github.com/micro/go-micro"
|
||||
proto "github.com/micro/examples/service/proto"
|
||||
)
|
||||
|
||||
|
||||
func main() {
|
||||
// Create a new service. Optionally include some options here.
|
||||
service := micro.NewService(micro.Name("greeter.client"))
|
||||
service.Init()
|
||||
|
||||
// Create new greeter client
|
||||
greeter := proto.NewGreeterService("greeter", service.Client())
|
||||
|
||||
// Call the greeter
|
||||
rsp, err := greeter.Hello(context.TODO(), &proto.HelloRequest{Name: "John"})
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
|
||||
// Print response
|
||||
fmt.Println(rsp.Greeting)
|
||||
}
|
||||
```
|
||||
|
||||
### Run the client
|
||||
|
||||
```shell
|
||||
go run client.go
|
||||
```
|
||||
|
||||
Output
|
||||
```
|
||||
Hello John
|
||||
```
|
||||
|
||||
## Writing a Function
|
||||
|
||||
Go Micro includes the Function programming model.
|
||||
|
||||
A Function is a one time executing Service which exits after completing a request.
|
||||
|
||||
### Defining a Function
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
proto "github.com/micro/examples/function/proto"
|
||||
"github.com/micro/go-micro"
|
||||
)
|
||||
|
||||
type Greeter struct{}
|
||||
|
||||
func (g *Greeter) Hello(ctx context.Context, req *proto.HelloRequest, rsp *proto.HelloResponse) error {
|
||||
rsp.Greeting = "Hello " + req.Name
|
||||
return nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
// create a new function
|
||||
fnc := micro.NewFunction(
|
||||
micro.Name("greeter"),
|
||||
)
|
||||
|
||||
// init the command line
|
||||
fnc.Init()
|
||||
|
||||
// register a handler
|
||||
fnc.Handle(new(Greeter))
|
||||
|
||||
// run the function
|
||||
fnc.Run()
|
||||
}
|
||||
```
|
||||
|
||||
It's that simple.
|
||||
|
||||
## Publish & Subscribe
|
||||
|
||||
Go-micro has a built in message broker interface for event driven architectures.
|
||||
|
||||
PubSub operates on the same protobuf generated messages as RPC. They are encoded/decoded automatically and sent via the broker.
|
||||
By default go-micro includes a point-to-point http broker but this can be swapped out via go-plugins.
|
||||
|
||||
### Publish
|
||||
|
||||
|
||||
Create a new publisher with a `topic` name and service client
|
||||
|
||||
```go
|
||||
p := micro.NewPublisher("events", service.Client())
|
||||
```
|
||||
|
||||
Publish a proto message
|
||||
|
||||
```go
|
||||
p.Publish(context.TODO(), &proto.Event{Name: "event"})
|
||||
```
|
||||
|
||||
### Subscribe
|
||||
|
||||
Create a message handler. It's signature should be `func(context.Context, v interface{}) error`.
|
||||
|
||||
```go
|
||||
func ProcessEvent(ctx context.Context, event *proto.Event) error {
|
||||
fmt.Printf("Got event %+v\n", event)
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
Register the message handler with a `topic`
|
||||
|
||||
```go
|
||||
micro.RegisterSubscriber("events", ProcessEvent)
|
||||
```
|
||||
|
||||
See [examples/pubsub](https://github.com/micro/examples/tree/master/pubsub) for a complete example.
|
||||
|
||||
## Plugins
|
||||
|
||||
By default go-micro only provides a few implementation of each interface at the core but it's completely pluggable. There's already dozens of plugins which are available at [github.com/micro/go-plugins](https://github.com/micro/go-plugins). Contributions are welcome!
|
||||
|
||||
### Build with plugins
|
||||
|
||||
If you want to integrate plugins simply link them in a separate file and rebuild
|
||||
|
||||
Create a plugins.go file
|
||||
|
||||
```go
|
||||
import (
|
||||
// etcd v3 registry
|
||||
_ "github.com/micro/go-plugins/registry/etcdv3"
|
||||
// nats transport
|
||||
_ "github.com/micro/go-plugins/transport/nats"
|
||||
// kafka broker
|
||||
_ "github.com/micro/go-plugins/broker/kafka"
|
||||
)
|
||||
```
|
||||
|
||||
Build binary
|
||||
|
||||
```shell
|
||||
// For local use
|
||||
go build -i -o service ./main.go ./plugins.go
|
||||
```
|
||||
|
||||
Flag usage of plugins
|
||||
```shell
|
||||
service --registry=etcdv3 --transport=nats --broker=kafka
|
||||
```
|
||||
|
||||
### Plugin as option
|
||||
|
||||
Alternatively you can set the plugin as an option to a service
|
||||
|
||||
```go
|
||||
|
||||
import (
|
||||
"github.com/micro/go-micro"
|
||||
// etcd v3 registry
|
||||
"github.com/micro/go-plugins/registry/etcdv3"
|
||||
// nats transport
|
||||
"github.com/micro/go-plugins/transport/nats"
|
||||
// kafka broker
|
||||
"github.com/micro/go-plugins/broker/kafka"
|
||||
)
|
||||
|
||||
func main() {
|
||||
registry := etcdv3.NewRegistry()
|
||||
broker := kafka.NewBroker()
|
||||
transport := nats.NewTransport()
|
||||
|
||||
service := micro.NewService(
|
||||
micro.Name("greeter"),
|
||||
micro.Registry(registry),
|
||||
micro.Broker(broker),
|
||||
micro.Transport(transport),
|
||||
)
|
||||
|
||||
service.Init()
|
||||
service.Run()
|
||||
}
|
||||
```
|
||||
|
||||
### Write plugins
|
||||
|
||||
Plugins are a concept built on Go's interface. Each package maintains a high level interface abstraction.
|
||||
Simply implement the interface and pass it in as an option to the service.
|
||||
|
||||
The service discovery interface is called [Registry](https://godoc.org/github.com/micro/go-micro/registry#Registry).
|
||||
Anything which implements this interface can be used as a registry. The same applies to the other packages.
|
||||
|
||||
```go
|
||||
type Registry interface {
|
||||
Register(*Service, ...RegisterOption) error
|
||||
Deregister(*Service) error
|
||||
GetService(string) ([]*Service, error)
|
||||
ListServices() ([]*Service, error)
|
||||
Watch() (Watcher, error)
|
||||
String() string
|
||||
}
|
||||
```
|
||||
|
||||
Browse [go-plugins](https://github.com/micro/go-plugins) to get a better idea of implementation details.
|
||||
|
||||
## Wrappers
|
||||
|
||||
Go-micro includes the notion of middleware as wrappers. The client or handlers can be wrapped using the decorator pattern.
|
||||
|
||||
### Handler
|
||||
|
||||
Here's an example service handler wrapper which logs the incoming request
|
||||
|
||||
```go
|
||||
// implements the server.HandlerWrapper
|
||||
func logWrapper(fn server.HandlerFunc) server.HandlerFunc {
|
||||
return func(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||
fmt.Printf("[%v] server request: %s", time.Now(), req.Method())
|
||||
return fn(ctx, req, rsp)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
It can be initialised when creating the service
|
||||
|
||||
```go
|
||||
service := micro.NewService(
|
||||
micro.Name("greeter"),
|
||||
// wrap the handler
|
||||
micro.WrapHandler(logWrapper),
|
||||
)
|
||||
```
|
||||
|
||||
### Client
|
||||
|
||||
Here's an example of a client wrapper which logs requests made
|
||||
|
||||
```go
|
||||
type logWrapper struct {
|
||||
client.Client
|
||||
}
|
||||
|
||||
func (l *logWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||
fmt.Printf("[wrapper] client request to service: %s method: %s\n", req.Service(), req.Method())
|
||||
return l.Client.Call(ctx, req, rsp)
|
||||
}
|
||||
|
||||
// implements client.Wrapper as logWrapper
|
||||
func logWrap(c client.Client) client.Client {
|
||||
return &logWrapper{c}
|
||||
}
|
||||
```
|
||||
|
||||
It can be initialised when creating the service
|
||||
|
||||
```go
|
||||
service := micro.NewService(
|
||||
micro.Name("greeter"),
|
||||
// wrap the client
|
||||
micro.WrapClient(logWrap),
|
||||
)
|
||||
```
|
||||
|
||||
## Other Languages
|
||||
|
||||
Check out [ja-micro](https://github.com/Sixt/ja-micro) to write services in Java
|
||||
For detailed information on the architecture, installation and use of go-micro checkout the [docs](https://micro.mu/docs).
|
||||
|
||||
## Sponsors
|
||||
|
||||
|
@@ -176,7 +176,7 @@ func (h *httpBroker) unsubscribe(s *httpSubscriber) error {
|
||||
for _, sub := range h.subscribers[s.topic] {
|
||||
// deregister and skip forward
|
||||
if sub.id == s.id {
|
||||
h.r.Deregister(sub.svc)
|
||||
_ = h.r.Deregister(sub.svc)
|
||||
continue
|
||||
}
|
||||
// keep subscriber
|
||||
@@ -200,7 +200,7 @@ func (h *httpBroker) run(l net.Listener) {
|
||||
h.RLock()
|
||||
for _, subs := range h.subscribers {
|
||||
for _, sub := range subs {
|
||||
h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))
|
||||
_ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))
|
||||
}
|
||||
}
|
||||
h.RUnlock()
|
||||
@@ -210,7 +210,7 @@ func (h *httpBroker) run(l net.Listener) {
|
||||
h.RLock()
|
||||
for _, subs := range h.subscribers {
|
||||
for _, sub := range subs {
|
||||
h.r.Deregister(sub.svc)
|
||||
_ = h.r.Deregister(sub.svc)
|
||||
}
|
||||
}
|
||||
h.RUnlock()
|
||||
@@ -276,12 +276,15 @@ func (h *httpBroker) Address() string {
|
||||
}
|
||||
|
||||
func (h *httpBroker) Connect() error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
h.RLock()
|
||||
if h.running {
|
||||
h.RUnlock()
|
||||
return nil
|
||||
}
|
||||
h.RUnlock()
|
||||
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
var l net.Listener
|
||||
var err error
|
||||
@@ -351,12 +354,16 @@ func (h *httpBroker) Connect() error {
|
||||
}
|
||||
|
||||
func (h *httpBroker) Disconnect() error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
h.RLock()
|
||||
if !h.running {
|
||||
h.RUnlock()
|
||||
return nil
|
||||
}
|
||||
h.RUnlock()
|
||||
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
// stop rcache
|
||||
rc, ok := h.r.(rcache.Cache)
|
||||
@@ -375,17 +382,24 @@ func (h *httpBroker) Disconnect() error {
|
||||
}
|
||||
|
||||
func (h *httpBroker) Init(opts ...Option) error {
|
||||
h.RLock()
|
||||
if h.running {
|
||||
h.RUnlock()
|
||||
return errors.New("cannot init while connected")
|
||||
}
|
||||
h.RUnlock()
|
||||
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
if h.running {
|
||||
return errors.New("cannot init while connected")
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&h.opts)
|
||||
}
|
||||
|
||||
if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 {
|
||||
h.address = h.opts.Addrs[0]
|
||||
}
|
||||
|
||||
if len(h.id) == 0 {
|
||||
h.id = "broker-" + uuid.NewUUID().String()
|
||||
}
|
||||
|
@@ -68,7 +68,7 @@ var (
|
||||
// DefaultBackoff is the default backoff function for retries
|
||||
DefaultBackoff = exponentialBackoff
|
||||
// DefaultRetry is the default check-for-retry function for retries
|
||||
DefaultRetry = alwaysRetry
|
||||
DefaultRetry = RetryOnError
|
||||
// DefaultRetries is the default number of times a request is tried
|
||||
DefaultRetries = 1
|
||||
// DefaultRequestTimeout is the default request timeout
|
||||
|
@@ -80,8 +80,12 @@ func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface
|
||||
if t := reflect.TypeOf(rsp); t.Kind() == reflect.Ptr {
|
||||
v = reflect.Indirect(v)
|
||||
}
|
||||
response := r.Response
|
||||
if t := reflect.TypeOf(r.Response); t.Kind() == reflect.Func {
|
||||
response = reflect.ValueOf(r.Response).Call([]reflect.Value{})[0].Interface()
|
||||
}
|
||||
|
||||
v.Set(reflect.ValueOf(r.Response))
|
||||
v.Set(reflect.ValueOf(response))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@@ -16,6 +16,8 @@ func TestClient(t *testing.T) {
|
||||
{Method: "Foo.Bar", Response: map[string]interface{}{"foo": "bar"}},
|
||||
{Method: "Foo.Struct", Response: &TestResponse{Param: "aparam"}},
|
||||
{Method: "Foo.Fail", Error: errors.InternalServerError("go.mock", "failed")},
|
||||
{Method: "Foo.Func", Response: func() string { return "string" }},
|
||||
{Method: "Foo.FuncStruct", Response: func() *TestResponse { return &TestResponse{Param: "aparam"} }},
|
||||
}
|
||||
|
||||
c := NewClient(Response("go.mock", response))
|
||||
|
@@ -2,12 +2,34 @@ package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/micro/go-micro/errors"
|
||||
)
|
||||
|
||||
// note that returning either false or a non-nil error will result in the call not being retried
|
||||
type RetryFunc func(ctx context.Context, req Request, retryCount int, err error) (bool, error)
|
||||
|
||||
// always retry on error
|
||||
func alwaysRetry(ctx context.Context, req Request, retryCount int, err error) (bool, error) {
|
||||
// RetryAlways always retry on error
|
||||
func RetryAlways(ctx context.Context, req Request, retryCount int, err error) (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// RetryOnError retries a request on a 500 or timeout error
|
||||
func RetryOnError(ctx context.Context, req Request, retryCount int, err error) (bool, error) {
|
||||
if err == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
e := errors.Parse(err.Error())
|
||||
if e == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
switch e.Code {
|
||||
// retry on timeout or internal server error
|
||||
case 408, 500:
|
||||
return true, nil
|
||||
default:
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
@@ -7,6 +7,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/micro/go-micro/broker"
|
||||
"github.com/micro/go-micro/codec"
|
||||
"github.com/micro/go-micro/errors"
|
||||
@@ -14,7 +16,6 @@ import (
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-micro/selector"
|
||||
"github.com/micro/go-micro/transport"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
type rpcClient struct {
|
||||
@@ -88,7 +89,7 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
|
||||
r.pool.release(address, c, grr)
|
||||
}()
|
||||
|
||||
seq := r.seq
|
||||
seq := atomic.LoadUint64(&r.seq)
|
||||
atomic.AddUint64(&r.seq, 1)
|
||||
|
||||
stream := &rpcStream{
|
||||
@@ -159,7 +160,15 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt
|
||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
||||
}
|
||||
|
||||
c, err := r.opts.Transport.Dial(address, transport.WithStream(), transport.WithTimeout(opts.DialTimeout))
|
||||
dOpts := []transport.DialOption{
|
||||
transport.WithStream(),
|
||||
}
|
||||
|
||||
if opts.DialTimeout >= 0 {
|
||||
dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout))
|
||||
}
|
||||
|
||||
c, err := r.opts.Transport.Dial(address, dOpts...)
|
||||
if err != nil {
|
||||
return nil, errors.InternalServerError("go.micro.client", "connection error: %v", err)
|
||||
}
|
||||
@@ -230,9 +239,9 @@ func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, erro
|
||||
// get next nodes from the selector
|
||||
next, err := r.opts.Selector.Select(request.Service(), opts.SelectOptions...)
|
||||
if err != nil && err == selector.ErrNotFound {
|
||||
return nil, errors.NotFound("go.micro.client", err.Error())
|
||||
return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
|
||||
} else if err != nil {
|
||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
||||
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %v", request.Service(), err.Error())
|
||||
}
|
||||
|
||||
return next, nil
|
||||
@@ -282,7 +291,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
|
||||
// call backoff first. Someone may want an initial start delay
|
||||
t, err := callOpts.Backoff(ctx, request, i)
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
return errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
|
||||
}
|
||||
|
||||
// only sleep if greater than 0
|
||||
@@ -293,9 +302,9 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
|
||||
// select next node
|
||||
node, err := next()
|
||||
if err != nil && err == selector.ErrNotFound {
|
||||
return errors.NotFound("go.micro.client", err.Error())
|
||||
return errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
|
||||
} else if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
|
||||
}
|
||||
|
||||
// set the address
|
||||
@@ -314,9 +323,9 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
|
||||
var gerr error
|
||||
|
||||
for i := 0; i <= callOpts.Retries; i++ {
|
||||
go func() {
|
||||
go func(i int) {
|
||||
ch <- call(i)
|
||||
}()
|
||||
}(i)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -355,18 +364,6 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// check if we already have a deadline
|
||||
d, ok := ctx.Deadline()
|
||||
if !ok {
|
||||
// no deadline so we create a new one
|
||||
ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
|
||||
} else {
|
||||
// got a deadline so no need to setup context
|
||||
// but we need to set the timeout we pass along
|
||||
opt := WithRequestTimeout(d.Sub(time.Now()))
|
||||
opt(&callOpts)
|
||||
}
|
||||
|
||||
// should we noop right here?
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -378,7 +375,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
|
||||
// call backoff first. Someone may want an initial start delay
|
||||
t, err := callOpts.Backoff(ctx, request, i)
|
||||
if err != nil {
|
||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
||||
return nil, errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
|
||||
}
|
||||
|
||||
// only sleep if greater than 0
|
||||
@@ -388,9 +385,9 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
|
||||
|
||||
node, err := next()
|
||||
if err != nil && err == selector.ErrNotFound {
|
||||
return nil, errors.NotFound("go.micro.client", err.Error())
|
||||
return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
|
||||
} else if err != nil {
|
||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
||||
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
|
||||
}
|
||||
|
||||
address := node.Address
|
||||
|
@@ -2,10 +2,10 @@ package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/micro/go-micro/errors"
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-micro/registry/mock"
|
||||
"github.com/micro/go-micro/selector"
|
||||
@@ -69,7 +69,7 @@ func TestCallRetry(t *testing.T) {
|
||||
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
|
||||
called++
|
||||
if called == 1 {
|
||||
return errors.New("retry request")
|
||||
return errors.InternalServerError("test.error", "retry request")
|
||||
}
|
||||
|
||||
// don't do the call
|
||||
|
79
cmd/cmd.go
79
cmd/cmd.go
@@ -87,6 +87,11 @@ var (
|
||||
EnvVar: "MICRO_REGISTER_INTERVAL",
|
||||
Usage: "Register interval in seconds",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "server",
|
||||
EnvVar: "MICRO_SERVER",
|
||||
Usage: "Server for go-micro; rpc",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "server_name",
|
||||
EnvVar: "MICRO_SERVER_NAME",
|
||||
@@ -144,11 +149,6 @@ var (
|
||||
Usage: "Selector used to pick nodes for querying",
|
||||
Value: "cache",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "server",
|
||||
EnvVar: "MICRO_SERVER",
|
||||
Usage: "Server for go-micro; rpc",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "transport",
|
||||
EnvVar: "MICRO_TRANSPORT",
|
||||
@@ -262,48 +262,40 @@ func (c *cmd) Before(ctx *cli.Context) error {
|
||||
|
||||
// Set the client
|
||||
if name := ctx.String("client"); len(name) > 0 {
|
||||
if cl, ok := c.opts.Clients[name]; ok {
|
||||
// only change if we have the client and type differs
|
||||
if cl, ok := c.opts.Clients[name]; ok && (*c.opts.Client).String() != name {
|
||||
*c.opts.Client = cl()
|
||||
}
|
||||
}
|
||||
|
||||
// Set the server
|
||||
if name := ctx.String("server"); len(name) > 0 {
|
||||
if s, ok := c.opts.Servers[name]; ok {
|
||||
// only change if we have the server and type differs
|
||||
if s, ok := c.opts.Servers[name]; ok && (*c.opts.Server).String() != name {
|
||||
*c.opts.Server = s()
|
||||
}
|
||||
}
|
||||
|
||||
// Set the broker
|
||||
if name := ctx.String("broker"); len(name) > 0 || len(ctx.String("broker_address")) > 0 {
|
||||
if len(name) == 0 {
|
||||
name = defaultBroker
|
||||
}
|
||||
|
||||
if b, ok := c.opts.Brokers[name]; ok {
|
||||
n := b(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...))
|
||||
*c.opts.Broker = n
|
||||
} else {
|
||||
if name := ctx.String("broker"); len(name) > 0 && (*c.opts.Broker).String() != name {
|
||||
b, ok := c.opts.Brokers[name]
|
||||
if !ok {
|
||||
return fmt.Errorf("Broker %s not found", name)
|
||||
}
|
||||
|
||||
*c.opts.Broker = b()
|
||||
serverOpts = append(serverOpts, server.Broker(*c.opts.Broker))
|
||||
clientOpts = append(clientOpts, client.Broker(*c.opts.Broker))
|
||||
}
|
||||
|
||||
// Set the registry
|
||||
if name := ctx.String("registry"); len(name) > 0 || len(ctx.String("registry_address")) > 0 {
|
||||
if len(name) == 0 {
|
||||
name = defaultRegistry
|
||||
}
|
||||
|
||||
if r, ok := c.opts.Registries[name]; ok {
|
||||
n := r(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...))
|
||||
*c.opts.Registry = n
|
||||
} else {
|
||||
if name := ctx.String("registry"); len(name) > 0 && (*c.opts.Registry).String() != name {
|
||||
r, ok := c.opts.Registries[name]
|
||||
if !ok {
|
||||
return fmt.Errorf("Registry %s not found", name)
|
||||
}
|
||||
|
||||
*c.opts.Registry = r()
|
||||
serverOpts = append(serverOpts, server.Registry(*c.opts.Registry))
|
||||
clientOpts = append(clientOpts, client.Registry(*c.opts.Registry))
|
||||
|
||||
@@ -314,31 +306,26 @@ func (c *cmd) Before(ctx *cli.Context) error {
|
||||
}
|
||||
|
||||
// Set the selector
|
||||
if name := ctx.String("selector"); len(name) > 0 {
|
||||
if s, ok := c.opts.Selectors[name]; ok {
|
||||
n := s(selector.Registry(*c.opts.Registry))
|
||||
*c.opts.Selector = n
|
||||
} else {
|
||||
if name := ctx.String("selector"); len(name) > 0 && (*c.opts.Selector).String() != name {
|
||||
s, ok := c.opts.Selectors[name]
|
||||
if !ok {
|
||||
return fmt.Errorf("Selector %s not found", name)
|
||||
}
|
||||
|
||||
*c.opts.Selector = s(selector.Registry(*c.opts.Registry))
|
||||
|
||||
// No server option here. Should there be?
|
||||
clientOpts = append(clientOpts, client.Selector(*c.opts.Selector))
|
||||
}
|
||||
|
||||
// Set the transport
|
||||
if name := ctx.String("transport"); len(name) > 0 || len(ctx.String("transport_address")) > 0 {
|
||||
if len(name) == 0 {
|
||||
name = defaultTransport
|
||||
}
|
||||
|
||||
if t, ok := c.opts.Transports[name]; ok {
|
||||
n := t(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...))
|
||||
*c.opts.Transport = n
|
||||
} else {
|
||||
if name := ctx.String("transport"); len(name) > 0 && (*c.opts.Transport).String() != name {
|
||||
t, ok := c.opts.Transports[name]
|
||||
if !ok {
|
||||
return fmt.Errorf("Transport %s not found", name)
|
||||
}
|
||||
|
||||
*c.opts.Transport = t()
|
||||
serverOpts = append(serverOpts, server.Transport(*c.opts.Transport))
|
||||
clientOpts = append(clientOpts, client.Transport(*c.opts.Transport))
|
||||
}
|
||||
@@ -359,6 +346,18 @@ func (c *cmd) Before(ctx *cli.Context) error {
|
||||
serverOpts = append(serverOpts, server.Metadata(metadata))
|
||||
}
|
||||
|
||||
if len(ctx.String("broker_address")) > 0 {
|
||||
(*c.opts.Broker).Init(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...))
|
||||
}
|
||||
|
||||
if len(ctx.String("registry_address")) > 0 {
|
||||
(*c.opts.Registry).Init(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...))
|
||||
}
|
||||
|
||||
if len(ctx.String("transport_address")) > 0 {
|
||||
(*c.opts.Transport).Init(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...))
|
||||
}
|
||||
|
||||
if len(ctx.String("server_name")) > 0 {
|
||||
serverOpts = append(serverOpts, server.Name(ctx.String("server_name")))
|
||||
}
|
||||
@@ -384,7 +383,7 @@ func (c *cmd) Before(ctx *cli.Context) error {
|
||||
}
|
||||
|
||||
// client opts
|
||||
if r := ctx.Int("client_retries"); r > 0 {
|
||||
if r := ctx.Int("client_retries"); r >= 0 {
|
||||
clientOpts = append(clientOpts, client.Retries(r))
|
||||
}
|
||||
|
||||
|
@@ -54,7 +54,8 @@ func (j *jsonCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
|
||||
case codec.Response:
|
||||
return j.c.ReadHeader(m)
|
||||
case codec.Publication:
|
||||
io.Copy(j.buf, j.rwc)
|
||||
_, err := io.Copy(j.buf, j.rwc)
|
||||
return err
|
||||
default:
|
||||
return fmt.Errorf("Unrecognised message type: %v", mt)
|
||||
}
|
||||
|
@@ -55,7 +55,9 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
|
||||
return err
|
||||
}
|
||||
if flusher, ok := c.rwc.(flusher); ok {
|
||||
err = flusher.Flush()
|
||||
if err = flusher.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
case codec.Response:
|
||||
c.Lock()
|
||||
@@ -82,7 +84,9 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
|
||||
return err
|
||||
}
|
||||
if flusher, ok := c.rwc.(flusher); ok {
|
||||
err = flusher.Flush()
|
||||
if err = flusher.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
case codec.Publication:
|
||||
data, err := proto.Marshal(b.(proto.Message))
|
||||
@@ -127,7 +131,8 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
|
||||
m.Id = rtmp.GetSeq()
|
||||
m.Error = rtmp.GetError()
|
||||
case codec.Publication:
|
||||
io.Copy(c.buf, c.rwc)
|
||||
_, err := io.Copy(c.buf, c.rwc)
|
||||
return err
|
||||
default:
|
||||
return fmt.Errorf("Unrecognised message type: %v", mt)
|
||||
}
|
||||
|
@@ -82,6 +82,16 @@ func NotFound(id, format string, a ...interface{}) error {
|
||||
}
|
||||
}
|
||||
|
||||
// MethodNotAllowed generates a 405 error.
|
||||
func MethodNotAllowed(id, format string, a ...interface{}) error {
|
||||
return &Error{
|
||||
Id: id,
|
||||
Code: 405,
|
||||
Detail: fmt.Sprintf(format, a...),
|
||||
Status: http.StatusText(405),
|
||||
}
|
||||
}
|
||||
|
||||
// InternalServerError generates a 500 error.
|
||||
func InternalServerError(id, format string, a ...interface{}) error {
|
||||
return &Error{
|
||||
|
@@ -8,6 +8,16 @@ import (
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
// Connect specifies services should be registered as Consul Connect services
|
||||
func Connect() registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, "consul_connect", true)
|
||||
}
|
||||
}
|
||||
|
||||
func Config(c *consul.Config) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
if o.Context == nil {
|
||||
|
@@ -19,10 +19,26 @@ type consulRegistry struct {
|
||||
Client *consul.Client
|
||||
opts Options
|
||||
|
||||
// connect enabled
|
||||
connect bool
|
||||
|
||||
sync.Mutex
|
||||
register map[string]uint64
|
||||
}
|
||||
|
||||
func getDeregisterTTL(t time.Duration) time.Duration {
|
||||
// splay slightly for the watcher?
|
||||
splay := time.Second * 5
|
||||
deregTTL := t + splay
|
||||
|
||||
// consul has a minimum timeout on deregistration of 1 minute.
|
||||
if t < time.Minute {
|
||||
deregTTL = time.Minute + splay
|
||||
}
|
||||
|
||||
return deregTTL
|
||||
}
|
||||
|
||||
func newTransport(config *tls.Config) *http.Transport {
|
||||
if config == nil {
|
||||
config = &tls.Config{
|
||||
@@ -45,27 +61,31 @@ func newTransport(config *tls.Config) *http.Transport {
|
||||
return t
|
||||
}
|
||||
|
||||
func newConsulRegistry(opts ...Option) Registry {
|
||||
var options Options
|
||||
func configure(c *consulRegistry, opts ...Option) {
|
||||
// set opts
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
o(&c.opts)
|
||||
}
|
||||
|
||||
// use default config
|
||||
config := consul.DefaultConfig()
|
||||
if options.Context != nil {
|
||||
|
||||
if c.opts.Context != nil {
|
||||
// Use the consul config passed in the options, if available
|
||||
if c, ok := options.Context.Value("consul_config").(*consul.Config); ok {
|
||||
config = c
|
||||
if co, ok := c.opts.Context.Value("consul_config").(*consul.Config); ok {
|
||||
config = co
|
||||
}
|
||||
if cn, ok := c.opts.Context.Value("consul_connect").(bool); ok {
|
||||
c.connect = cn
|
||||
}
|
||||
}
|
||||
|
||||
// check if there are any addrs
|
||||
if len(options.Addrs) > 0 {
|
||||
addr, port, err := net.SplitHostPort(options.Addrs[0])
|
||||
if len(c.opts.Addrs) > 0 {
|
||||
addr, port, err := net.SplitHostPort(c.opts.Addrs[0])
|
||||
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
|
||||
port = "8500"
|
||||
addr = options.Addrs[0]
|
||||
addr = c.opts.Addrs[0]
|
||||
config.Address = fmt.Sprintf("%s:%s", addr, port)
|
||||
} else if err == nil {
|
||||
config.Address = fmt.Sprintf("%s:%s", addr, port)
|
||||
@@ -73,34 +93,43 @@ func newConsulRegistry(opts ...Option) Registry {
|
||||
}
|
||||
|
||||
// requires secure connection?
|
||||
if options.Secure || options.TLSConfig != nil {
|
||||
if c.opts.Secure || c.opts.TLSConfig != nil {
|
||||
if config.HttpClient == nil {
|
||||
config.HttpClient = new(http.Client)
|
||||
}
|
||||
|
||||
config.Scheme = "https"
|
||||
// We're going to support InsecureSkipVerify
|
||||
config.HttpClient.Transport = newTransport(options.TLSConfig)
|
||||
config.HttpClient.Transport = newTransport(c.opts.TLSConfig)
|
||||
}
|
||||
|
||||
// set timeout
|
||||
if c.opts.Timeout > 0 {
|
||||
config.HttpClient.Timeout = c.opts.Timeout
|
||||
}
|
||||
|
||||
// create the client
|
||||
client, _ := consul.NewClient(config)
|
||||
|
||||
// set timeout
|
||||
if options.Timeout > 0 {
|
||||
config.HttpClient.Timeout = options.Timeout
|
||||
}
|
||||
// set address/client
|
||||
c.Address = config.Address
|
||||
c.Client = client
|
||||
}
|
||||
|
||||
func newConsulRegistry(opts ...Option) Registry {
|
||||
cr := &consulRegistry{
|
||||
Address: config.Address,
|
||||
Client: client,
|
||||
opts: options,
|
||||
opts: Options{},
|
||||
register: make(map[string]uint64),
|
||||
}
|
||||
|
||||
configure(cr, opts...)
|
||||
return cr
|
||||
}
|
||||
|
||||
func (c *consulRegistry) Init(opts ...Option) error {
|
||||
configure(c, opts...)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *consulRegistry) Deregister(s *Service) error {
|
||||
if len(s.Nodes) == 0 {
|
||||
return errors.New("Require at least one node")
|
||||
@@ -115,19 +144,6 @@ func (c *consulRegistry) Deregister(s *Service) error {
|
||||
return c.Client.Agent().ServiceDeregister(node.Id)
|
||||
}
|
||||
|
||||
func getDeregisterTTL(t time.Duration) time.Duration {
|
||||
// splay slightly for the watcher?
|
||||
splay := time.Second * 5
|
||||
deregTTL := t + splay
|
||||
|
||||
// consul has a minimum timeout on deregistration of 1 minute.
|
||||
if t < time.Minute {
|
||||
deregTTL = time.Minute + splay
|
||||
}
|
||||
|
||||
return deregTTL
|
||||
}
|
||||
|
||||
func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
|
||||
if len(s.Nodes) == 0 {
|
||||
return errors.New("Require at least one node")
|
||||
@@ -164,10 +180,21 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
|
||||
|
||||
// if it's already registered and matches then just pass the check
|
||||
if ok && v == h {
|
||||
// if the err is nil we're all good, bail out
|
||||
// if not, we don't know what the state is, so full re-register
|
||||
if err := c.Client.Agent().PassTTL("service:"+node.Id, ""); err == nil {
|
||||
return nil
|
||||
if options.TTL == time.Duration(0) {
|
||||
services, _, err := c.Client.Health().Checks(s.Name, nil)
|
||||
if err == nil {
|
||||
for _, v := range services {
|
||||
if v.ServiceID == node.Id {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// if the err is nil we're all good, bail out
|
||||
// if not, we don't know what the state is, so full re-register
|
||||
if err := c.Client.Agent().PassTTL("service:"+node.Id, ""); err == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -192,20 +219,29 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
|
||||
deregTTL := getDeregisterTTL(options.TTL)
|
||||
|
||||
check = &consul.AgentServiceCheck{
|
||||
TTL: fmt.Sprintf("%v", options.TTL),
|
||||
TTL: fmt.Sprintf("%v", options.TTL),
|
||||
DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL),
|
||||
}
|
||||
}
|
||||
|
||||
// register the service
|
||||
if err := c.Client.Agent().ServiceRegister(&consul.AgentServiceRegistration{
|
||||
asr := &consul.AgentServiceRegistration{
|
||||
ID: node.Id,
|
||||
Name: s.Name,
|
||||
Tags: tags,
|
||||
Port: node.Port,
|
||||
Address: node.Address,
|
||||
Check: check,
|
||||
}); err != nil {
|
||||
}
|
||||
|
||||
// Specify consul connect
|
||||
if c.connect {
|
||||
asr.Connect = &consul.AgentServiceConnect{
|
||||
Native: true,
|
||||
}
|
||||
}
|
||||
|
||||
if err := c.Client.Agent().ServiceRegister(asr); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -224,7 +260,15 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
|
||||
}
|
||||
|
||||
func (c *consulRegistry) GetService(name string) ([]*Service, error) {
|
||||
rsp, _, err := c.Client.Health().Service(name, "", false, nil)
|
||||
var rsp []*consul.ServiceEntry
|
||||
var err error
|
||||
|
||||
// if we're connect enabled only get connect services
|
||||
if c.connect {
|
||||
rsp, _, err = c.Client.Health().Connect(name, "", false, nil)
|
||||
} else {
|
||||
rsp, _, err = c.Client.Health().Service(name, "", false, nil)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@@ -49,6 +49,17 @@ func newRegistry(opts ...registry.Option) registry.Registry {
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) Init(opts ...registry.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&m.opts)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) Options() registry.Options {
|
||||
return m.opts
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) Register(service *registry.Service, opts ...registry.RegisterOption) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
@@ -322,10 +333,6 @@ func (m *mdnsRegistry) String() string {
|
||||
return "mdns"
|
||||
}
|
||||
|
||||
func (m *mdnsRegistry) Options() registry.Options {
|
||||
return m.opts
|
||||
}
|
||||
|
||||
func NewRegistry(opts ...registry.Option) registry.Registry {
|
||||
return newRegistry(opts...)
|
||||
}
|
||||
|
@@ -99,11 +99,15 @@ func (m *mockRegistry) String() string {
|
||||
return "mock"
|
||||
}
|
||||
|
||||
func (m *mockRegistry) Init(opts ...registry.Option) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockRegistry) Options() registry.Options {
|
||||
return registry.Options{}
|
||||
}
|
||||
|
||||
func NewRegistry() registry.Registry {
|
||||
func NewRegistry(opts ...registry.Options) registry.Registry {
|
||||
m := &mockRegistry{Services: make(map[string][]*registry.Service)}
|
||||
m.init()
|
||||
return m
|
||||
|
@@ -9,13 +9,14 @@ import (
|
||||
// and an abstraction over varying implementations
|
||||
// {consul, etcd, zookeeper, ...}
|
||||
type Registry interface {
|
||||
Init(...Option) error
|
||||
Options() Options
|
||||
Register(*Service, ...RegisterOption) error
|
||||
Deregister(*Service) error
|
||||
GetService(string) ([]*Service, error)
|
||||
ListServices() ([]*Service, error)
|
||||
Watch(...WatchOption) (Watcher, error)
|
||||
String() string
|
||||
Options() Options
|
||||
}
|
||||
|
||||
type Option func(*Options)
|
||||
|
2
selector/cache/cache.go
vendored
2
selector/cache/cache.go
vendored
@@ -366,11 +366,9 @@ func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (s
|
||||
}
|
||||
|
||||
func (c *cacheSelector) Mark(service string, node *registry.Node, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (c *cacheSelector) Reset(service string) {
|
||||
return
|
||||
}
|
||||
|
||||
// Close stops the watcher and destroys the cache
|
||||
|
@@ -48,11 +48,9 @@ func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, er
|
||||
}
|
||||
|
||||
func (r *defaultSelector) Mark(service string, node *registry.Node, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (r *defaultSelector) Reset(service string) {
|
||||
return
|
||||
}
|
||||
|
||||
func (r *defaultSelector) Close() error {
|
||||
|
@@ -80,7 +80,7 @@ func TestFilterEndpoint(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
if seen == false && data.count > 0 {
|
||||
if !seen && data.count > 0 {
|
||||
t.Fatalf("Expected %d services but seen is %t; result %+v", data.count, seen, services)
|
||||
}
|
||||
}
|
||||
@@ -232,7 +232,7 @@ func TestFilterVersion(t *testing.T) {
|
||||
seen = true
|
||||
}
|
||||
|
||||
if seen == false && data.count > 0 {
|
||||
if !seen && data.count > 0 {
|
||||
t.Fatalf("Expected %d services but seen is %t; result %+v", data.count, seen, services)
|
||||
}
|
||||
}
|
||||
|
@@ -15,6 +15,8 @@ type testCodec struct {
|
||||
}
|
||||
|
||||
type testSocket struct {
|
||||
local string
|
||||
remote string
|
||||
}
|
||||
|
||||
// TestCodecWriteError simulates what happens when a codec is unable
|
||||
@@ -87,6 +89,14 @@ func (c *testCodec) String() string {
|
||||
return "string"
|
||||
}
|
||||
|
||||
func (s testSocket) Local() string {
|
||||
return s.local
|
||||
}
|
||||
|
||||
func (s testSocket) Remote() string {
|
||||
return s.remote
|
||||
}
|
||||
|
||||
func (s testSocket) Recv(message *transport.Message) error {
|
||||
return nil
|
||||
}
|
||||
|
@@ -97,6 +97,10 @@ func (s *rpcServer) accept(sock transport.Socket) {
|
||||
delete(hdr, "Content-Type")
|
||||
delete(hdr, "Timeout")
|
||||
|
||||
// set local/remote ips
|
||||
hdr["Local"] = sock.Local()
|
||||
hdr["Remote"] = sock.Remote()
|
||||
|
||||
ctx := metadata.NewContext(context.Background(), hdr)
|
||||
|
||||
// set the timeout if we have it
|
||||
@@ -393,11 +397,35 @@ func (s *rpcServer) Start() error {
|
||||
s.opts.Address = ts.Addr()
|
||||
s.Unlock()
|
||||
|
||||
go ts.Accept(s.accept)
|
||||
exit := make(chan bool, 1)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
err := ts.Accept(s.accept)
|
||||
|
||||
// check if we're supposed to exit
|
||||
select {
|
||||
case <-exit:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// check the error and backoff
|
||||
if err != nil {
|
||||
log.Logf("Accept error: %v", err)
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
// no error just exit
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
// wait for exit
|
||||
ch := <-s.exit
|
||||
exit <- true
|
||||
|
||||
// wait for requests to finish
|
||||
if wait(s.opts.Context) {
|
||||
|
@@ -199,7 +199,7 @@ func (server *server) register(rcvr interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (server *server) sendResponse(sending *sync.Mutex, req *request, reply interface{}, codec serverCodec, errmsg string, last bool) (err error) {
|
||||
func (server *server) sendResponse(sending sync.Locker, req *request, reply interface{}, codec serverCodec, errmsg string, last bool) (err error) {
|
||||
resp := server.getResponse()
|
||||
// Encode the response header
|
||||
resp.ServiceMethod = req.ServiceMethod
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
//"fmt"
|
||||
"bufio"
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
@@ -13,10 +14,11 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-log"
|
||||
"github.com/micro/h2c"
|
||||
maddr "github.com/micro/util/go/lib/addr"
|
||||
mnet "github.com/micro/util/go/lib/net"
|
||||
mls "github.com/micro/util/go/lib/tls"
|
||||
"golang.org/x/net/http2"
|
||||
)
|
||||
|
||||
type buffer struct {
|
||||
@@ -38,16 +40,25 @@ type httpTransportClient struct {
|
||||
r chan *http.Request
|
||||
bl []*http.Request
|
||||
buff *bufio.Reader
|
||||
|
||||
// local/remote ip
|
||||
local string
|
||||
remote string
|
||||
}
|
||||
|
||||
type httpTransportSocket struct {
|
||||
ht *httpTransport
|
||||
r chan *http.Request
|
||||
conn net.Conn
|
||||
once sync.Once
|
||||
ht *httpTransport
|
||||
w http.ResponseWriter
|
||||
r *http.Request
|
||||
rw *bufio.ReadWriter
|
||||
|
||||
sync.Mutex
|
||||
buff *bufio.Reader
|
||||
conn net.Conn
|
||||
// for the first request
|
||||
ch chan *http.Request
|
||||
|
||||
// local/remote ip
|
||||
local string
|
||||
remote string
|
||||
}
|
||||
|
||||
type httpTransportListener struct {
|
||||
@@ -59,6 +70,14 @@ func (b *buffer) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *httpTransportClient) Local() string {
|
||||
return h.local
|
||||
}
|
||||
|
||||
func (h *httpTransportClient) Remote() string {
|
||||
return h.remote
|
||||
}
|
||||
|
||||
func (h *httpTransportClient) Send(m *Message) error {
|
||||
header := make(http.Header)
|
||||
|
||||
@@ -170,33 +189,87 @@ func (h *httpTransportClient) Close() error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (h *httpTransportSocket) Local() string {
|
||||
return h.local
|
||||
}
|
||||
|
||||
func (h *httpTransportSocket) Remote() string {
|
||||
return h.remote
|
||||
}
|
||||
|
||||
func (h *httpTransportSocket) Recv(m *Message) error {
|
||||
if m == nil {
|
||||
return errors.New("message passed in is nil")
|
||||
}
|
||||
|
||||
// set timeout if its greater than 0
|
||||
if h.ht.opts.Timeout > time.Duration(0) {
|
||||
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
|
||||
}
|
||||
|
||||
r, err := http.ReadRequest(h.buff)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.Body.Close()
|
||||
m.Body = b
|
||||
|
||||
if m.Header == nil {
|
||||
m.Header = make(map[string]string)
|
||||
}
|
||||
|
||||
for k, v := range r.Header {
|
||||
// process http 1
|
||||
if h.r.ProtoMajor == 1 {
|
||||
// set timeout if its greater than 0
|
||||
if h.ht.opts.Timeout > time.Duration(0) {
|
||||
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
|
||||
}
|
||||
|
||||
var r *http.Request
|
||||
|
||||
select {
|
||||
// get first request
|
||||
case r = <-h.ch:
|
||||
// read next request
|
||||
default:
|
||||
rr, err := http.ReadRequest(h.rw.Reader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r = rr
|
||||
}
|
||||
|
||||
// read body
|
||||
b, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// set body
|
||||
r.Body.Close()
|
||||
m.Body = b
|
||||
|
||||
// set headers
|
||||
for k, v := range h.r.Header {
|
||||
if len(v) > 0 {
|
||||
m.Header[k] = v[0]
|
||||
} else {
|
||||
m.Header[k] = ""
|
||||
}
|
||||
}
|
||||
|
||||
// return early early
|
||||
return nil
|
||||
}
|
||||
|
||||
// processing http2 request
|
||||
// read streaming body
|
||||
|
||||
// set max buffer size
|
||||
buf := make([]byte, 4*1024)
|
||||
|
||||
// read the request body
|
||||
n, err := h.r.Body.Read(buf)
|
||||
// not an eof error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// check if we have data
|
||||
if n > 0 {
|
||||
m.Body = buf[:n]
|
||||
}
|
||||
|
||||
// set headers
|
||||
for k, v := range h.r.Header {
|
||||
if len(v) > 0 {
|
||||
m.Header[k] = v[0]
|
||||
} else {
|
||||
@@ -204,78 +277,73 @@ func (h *httpTransportSocket) Recv(m *Message) error {
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case h.r <- r:
|
||||
default:
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *httpTransportSocket) Send(m *Message) error {
|
||||
b := bytes.NewBuffer(m.Body)
|
||||
defer b.Reset()
|
||||
if h.r.ProtoMajor == 1 {
|
||||
rsp := &http.Response{
|
||||
Header: h.r.Header,
|
||||
Body: ioutil.NopCloser(bytes.NewReader(m.Body)),
|
||||
Status: "200 OK",
|
||||
StatusCode: 200,
|
||||
Proto: "HTTP/1.1",
|
||||
ProtoMajor: 1,
|
||||
ProtoMinor: 1,
|
||||
ContentLength: int64(len(m.Body)),
|
||||
}
|
||||
|
||||
r := <-h.r
|
||||
for k, v := range m.Header {
|
||||
rsp.Header.Set(k, v)
|
||||
}
|
||||
|
||||
rsp := &http.Response{
|
||||
Header: r.Header,
|
||||
Body: &buffer{b},
|
||||
Status: "200 OK",
|
||||
StatusCode: 200,
|
||||
Proto: "HTTP/1.1",
|
||||
ProtoMajor: 1,
|
||||
ProtoMinor: 1,
|
||||
ContentLength: int64(len(m.Body)),
|
||||
// set timeout if its greater than 0
|
||||
if h.ht.opts.Timeout > time.Duration(0) {
|
||||
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
|
||||
}
|
||||
|
||||
return rsp.Write(h.conn)
|
||||
}
|
||||
|
||||
// http2 request
|
||||
|
||||
// set headers
|
||||
for k, v := range m.Header {
|
||||
rsp.Header.Set(k, v)
|
||||
h.w.Header().Set(k, v)
|
||||
}
|
||||
|
||||
select {
|
||||
case h.r <- r:
|
||||
default:
|
||||
}
|
||||
|
||||
// set timeout if its greater than 0
|
||||
if h.ht.opts.Timeout > time.Duration(0) {
|
||||
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
|
||||
}
|
||||
|
||||
return rsp.Write(h.conn)
|
||||
// write request
|
||||
_, err := h.w.Write(m.Body)
|
||||
return err
|
||||
}
|
||||
|
||||
func (h *httpTransportSocket) error(m *Message) error {
|
||||
b := bytes.NewBuffer(m.Body)
|
||||
defer b.Reset()
|
||||
rsp := &http.Response{
|
||||
Header: make(http.Header),
|
||||
Body: &buffer{b},
|
||||
Status: "500 Internal Server Error",
|
||||
StatusCode: 500,
|
||||
Proto: "HTTP/1.1",
|
||||
ProtoMajor: 1,
|
||||
ProtoMinor: 1,
|
||||
ContentLength: int64(len(m.Body)),
|
||||
}
|
||||
if h.r.ProtoMajor == 1 {
|
||||
rsp := &http.Response{
|
||||
Header: make(http.Header),
|
||||
Body: ioutil.NopCloser(bytes.NewReader(m.Body)),
|
||||
Status: "500 Internal Server Error",
|
||||
StatusCode: 500,
|
||||
Proto: "HTTP/1.1",
|
||||
ProtoMajor: 1,
|
||||
ProtoMinor: 1,
|
||||
ContentLength: int64(len(m.Body)),
|
||||
}
|
||||
|
||||
for k, v := range m.Header {
|
||||
rsp.Header.Set(k, v)
|
||||
}
|
||||
for k, v := range m.Header {
|
||||
rsp.Header.Set(k, v)
|
||||
}
|
||||
|
||||
return rsp.Write(h.conn)
|
||||
return rsp.Write(h.conn)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *httpTransportSocket) Close() error {
|
||||
err := h.conn.Close()
|
||||
h.once.Do(func() {
|
||||
h.Lock()
|
||||
h.buff.Reset(nil)
|
||||
h.buff = nil
|
||||
h.Unlock()
|
||||
})
|
||||
return err
|
||||
if h.r.ProtoMajor == 1 {
|
||||
return h.conn.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *httpTransportListener) Addr() string {
|
||||
@@ -287,46 +355,69 @@ func (h *httpTransportListener) Close() error {
|
||||
}
|
||||
|
||||
func (h *httpTransportListener) Accept(fn func(Socket)) error {
|
||||
var tempDelay time.Duration
|
||||
// create handler mux
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||
var buf *bufio.ReadWriter
|
||||
var con net.Conn
|
||||
|
||||
for {
|
||||
c, err := h.listener.Accept()
|
||||
if err != nil {
|
||||
if ne, ok := err.(net.Error); ok && ne.Temporary() {
|
||||
if tempDelay == 0 {
|
||||
tempDelay = 5 * time.Millisecond
|
||||
} else {
|
||||
tempDelay *= 2
|
||||
}
|
||||
if max := 1 * time.Second; tempDelay > max {
|
||||
tempDelay = max
|
||||
}
|
||||
log.Logf("http: Accept error: %v; retrying in %v\n", err, tempDelay)
|
||||
time.Sleep(tempDelay)
|
||||
continue
|
||||
// read a regular request
|
||||
if r.ProtoMajor == 1 {
|
||||
b, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
return err
|
||||
r.Body = ioutil.NopCloser(bytes.NewReader(b))
|
||||
// hijack the conn
|
||||
hj, ok := w.(http.Hijacker)
|
||||
if !ok {
|
||||
// we're screwed
|
||||
http.Error(w, "cannot serve conn", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
conn, bufrw, err := hj.Hijack()
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
buf = bufrw
|
||||
con = conn
|
||||
}
|
||||
|
||||
sock := &httpTransportSocket{
|
||||
ht: h.ht,
|
||||
conn: c,
|
||||
buff: bufio.NewReader(c),
|
||||
r: make(chan *http.Request, 1),
|
||||
}
|
||||
// save the request
|
||||
ch := make(chan *http.Request, 1)
|
||||
ch <- r
|
||||
|
||||
go func() {
|
||||
// TODO: think of a better error response strategy
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Log("panic recovered: ", r)
|
||||
sock.Close()
|
||||
}
|
||||
}()
|
||||
fn(&httpTransportSocket{
|
||||
ht: h.ht,
|
||||
w: w,
|
||||
r: r,
|
||||
rw: buf,
|
||||
ch: ch,
|
||||
conn: con,
|
||||
local: h.Addr(),
|
||||
remote: r.RemoteAddr,
|
||||
})
|
||||
})
|
||||
|
||||
fn(sock)
|
||||
}()
|
||||
// default http2 server
|
||||
srv := &http.Server{
|
||||
Handler: mux,
|
||||
}
|
||||
|
||||
// insecure connection use h2c
|
||||
if !(h.ht.opts.Secure || h.ht.opts.TLSConfig != nil) {
|
||||
srv.Handler = &h2c.HandlerH2C{
|
||||
Handler: mux,
|
||||
H2Server: &http2.Server{},
|
||||
}
|
||||
}
|
||||
|
||||
// begin serving
|
||||
return srv.Serve(h.listener)
|
||||
}
|
||||
|
||||
func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) {
|
||||
@@ -365,6 +456,8 @@ func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) {
|
||||
buff: bufio.NewReader(conn),
|
||||
dialOpts: dopts,
|
||||
r: make(chan *http.Request, 1),
|
||||
local: conn.LocalAddr().String(),
|
||||
remote: conn.RemoteAddr().String(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -423,6 +516,17 @@ func (h *httpTransport) Listen(addr string, opts ...ListenOption) (Listener, err
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (h *httpTransport) Init(opts ...Option) error {
|
||||
for _, o := range opts {
|
||||
o(&h.opts)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *httpTransport) Options() Options {
|
||||
return h.opts
|
||||
}
|
||||
|
||||
func (h *httpTransport) String() string {
|
||||
return "http"
|
||||
}
|
||||
|
@@ -18,6 +18,9 @@ type mockSocket struct {
|
||||
exit chan bool
|
||||
// listener exit
|
||||
lexit chan bool
|
||||
|
||||
local string
|
||||
remote string
|
||||
}
|
||||
|
||||
type mockClient struct {
|
||||
@@ -51,6 +54,14 @@ func (ms *mockSocket) Recv(m *transport.Message) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ms *mockSocket) Local() string {
|
||||
return ms.local
|
||||
}
|
||||
|
||||
func (ms *mockSocket) Remote() string {
|
||||
return ms.remote
|
||||
}
|
||||
|
||||
func (ms *mockSocket) Send(m *transport.Message) error {
|
||||
select {
|
||||
case <-ms.exit:
|
||||
@@ -93,10 +104,12 @@ func (m *mockListener) Accept(fn func(transport.Socket)) error {
|
||||
return nil
|
||||
case c := <-m.conn:
|
||||
go fn(&mockSocket{
|
||||
lexit: c.lexit,
|
||||
exit: c.exit,
|
||||
send: c.recv,
|
||||
recv: c.send,
|
||||
lexit: c.lexit,
|
||||
exit: c.exit,
|
||||
send: c.recv,
|
||||
recv: c.send,
|
||||
local: c.Remote(),
|
||||
remote: c.Local(),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -118,10 +131,12 @@ func (m *mockTransport) Dial(addr string, opts ...transport.DialOption) (transpo
|
||||
|
||||
client := &mockClient{
|
||||
&mockSocket{
|
||||
send: make(chan *transport.Message),
|
||||
recv: make(chan *transport.Message),
|
||||
exit: make(chan bool),
|
||||
lexit: listener.exit,
|
||||
send: make(chan *transport.Message),
|
||||
recv: make(chan *transport.Message),
|
||||
exit: make(chan bool),
|
||||
lexit: listener.exit,
|
||||
local: addr,
|
||||
remote: addr,
|
||||
},
|
||||
options,
|
||||
}
|
||||
@@ -171,6 +186,17 @@ func (m *mockTransport) Listen(addr string, opts ...transport.ListenOption) (tra
|
||||
return listener, nil
|
||||
}
|
||||
|
||||
func (m *mockTransport) Init(opts ...transport.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&m.opts)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockTransport) Options() transport.Options {
|
||||
return m.opts
|
||||
}
|
||||
|
||||
func (m *mockTransport) String() string {
|
||||
return "mock"
|
||||
}
|
||||
|
@@ -14,6 +14,8 @@ type Socket interface {
|
||||
Recv(*Message) error
|
||||
Send(*Message) error
|
||||
Close() error
|
||||
Local() string
|
||||
Remote() string
|
||||
}
|
||||
|
||||
type Client interface {
|
||||
@@ -30,6 +32,8 @@ type Listener interface {
|
||||
// services. It uses socket send/recv semantics and had various
|
||||
// implementations {HTTP, RabbitMQ, NATS, ...}
|
||||
type Transport interface {
|
||||
Init(...Option) error
|
||||
Options() Options
|
||||
Dial(addr string, opts ...DialOption) (Client, error)
|
||||
Listen(addr string, opts ...ListenOption) (Listener, error)
|
||||
String() string
|
||||
|
Reference in New Issue
Block a user