Compare commits

...

63 Commits

Author SHA1 Message Date
Asim Aslam
4dc593eca3 Merge branch 'master' of github.com:micro/go-micro 2018-11-22 10:40:33 +00:00
Asim Aslam
f1984650f4 use the request header 2018-11-22 10:39:36 +00:00
Asim Aslam
33ae45ad65 Merge pull request #333 from sneat/tcpcheck-interval
Only check if the service is in Consul once every deregister interval
2018-11-22 08:38:11 +00:00
Blair McMillan
e3a2fe52cd Only check if the service is in Consul once every deregister interval 2018-11-22 13:34:08 +10:00
Asim Aslam
5fd7da9de7 update readme 2018-11-21 13:10:09 +00:00
Asim Aslam
edc8a8b771 nitpick 2018-11-21 11:15:34 +00:00
Asim Aslam
1ce0df4e63 Merge pull request #331 from shuLhan/master
all: replace "pborman/uuid" with "google/uuid"
2018-11-21 10:48:36 +00:00
Shulhan
415fb3a730 all: replace "pborman/uuid" with "google/uuid"
Internally, "pborman/uuid.NewUUID()" is calling "google/uuid.New()"
that return nil when there is an error [1].

Both package use the same license.

[1] https://github.com/pborman/uuid/blob/master/version1.go#L17
2018-11-21 17:29:21 +07:00
Asim Aslam
172ffee8c3 add rpc package comments 2018-11-20 10:30:53 +00:00
Asim Aslam
48c068d88d add codec package comments 2018-11-20 10:06:13 +00:00
Asim Aslam
3f7152a4f5 update doc link 2018-11-19 12:30:23 +00:00
Asim Aslam
54f58a15d3 update readme 2018-11-19 12:29:26 +00:00
Asim Aslam
bcb6c12aa1 Merge pull request #329 from micro/http
add http handler option for broker
2018-11-18 20:44:16 +00:00
Asim Aslam
1cb40831a4 add http handler option for broker 2018-11-18 20:40:43 +00:00
Asim Aslam
d9fc2c922d update package comment 2018-11-18 18:38:46 +00:00
Asim Aslam
212c6c5ae9 Merge pull request #328 from micro/http
add option to set http handlers
2018-11-18 18:32:01 +00:00
Asim Aslam
1d8047a272 add option to set http handlers 2018-11-18 16:32:53 +00:00
Asim Aslam
98bb4a69c2 Merge pull request #325 from micro/accept_loop
make accept loop
2018-11-15 21:23:16 +00:00
Asim Aslam
e69413b763 add continue 2018-11-15 21:13:33 +00:00
Asim Aslam
45f18042b7 make accept loop 2018-11-15 19:55:13 +00:00
Asim Aslam
0672b051cc Add Local/Remote ip to metadata 2018-11-14 20:27:58 +00:00
Asim Aslam
3c496720cc Merge pull request #324 from micro/ip
Ip
2018-11-14 20:15:56 +00:00
Asim Aslam
881cb570d5 reorder 2018-11-14 19:49:04 +00:00
Asim Aslam
c6a2c8de6c add local/remote to testsocket 2018-11-14 19:45:46 +00:00
Asim Aslam
71bacf6991 add local/remote ip to socket 2018-11-14 19:41:13 +00:00
Asim Aslam
a0b257b572 remove line 2018-11-14 15:19:23 +00:00
Asim Aslam
a082c151f0 remove example section 2018-11-14 15:18:47 +00:00
Asim Aslam
302ab42a97 strip down readme 2018-11-14 15:18:13 +00:00
Asim Aslam
531d4dd24a Merge pull request #322 from mgrachev/fix-linter-issues
Fix some linter issues
2018-11-13 09:38:19 +00:00
Asim Aslam
1c401a852e Merge pull request #321 from mgrachev/errors-check
Add errors check
2018-11-13 09:36:54 +00:00
Mikhail Grachev
25e6dcc9b6 Fix some linter issues 2018-11-13 11:57:42 +03:00
Mikhail Grachev
4006d9f102 Add errors check 2018-11-13 11:56:21 +03:00
Asim Aslam
4c821baab4 go fmt 2018-11-03 12:17:11 +00:00
Asim Aslam
c8a35afc92 Merge pull request #313 from lovelly/master
Fix tcp check no ttl error
2018-11-03 12:16:56 +00:00
Asim Aslam
54f67db275 Merge pull request #289 from micro/http2
http2 support
2018-11-03 12:07:23 +00:00
lovelly
fd04722706 Fix tcp check no ttl error 2018-10-09 10:40:24 +08:00
Asim Aslam
4cee1f19f6 Merge pull request #309 from fireyang/master
fix rpc client call WARNING: DATA RACE
2018-09-20 07:39:45 +01:00
fireyang
ef8b5e28b0 fix rpc client call WARNING: DATA RACE 2018-09-20 10:08:00 +08:00
Asim Aslam
818f150b25 Merge pull request #308 from fireyang/master
fix bug: loop variable i captured by func literal
2018-09-19 15:13:19 +01:00
fireyang
446d3fc72e fix bug: loop variable i captured by func literal 2018-09-19 21:58:20 +08:00
Asim Aslam
240052246f Merge pull request #306 from DexterHD/fix-go-config-panic
fix bug with go-config panic
2018-09-13 19:26:57 +01:00
Anton Kucherov
156a51ab10 fix bug with go-config panic
See: https://github.com/micro/go-config/issues/49
2018-09-13 19:02:08 +03:00
Asim Aslam
52a4beb072 Merge pull request #305 from ahmadnurus/add_method_not_allowed_error
errors: Added 405 Method Not Allowed helper function
2018-09-13 07:39:42 +01:00
ahmadnurus
395d70cf01 errors: Added 405 Method Not Allowed helper function 2018-09-12 21:42:34 +07:00
Asim Aslam
3732dc2f42 bump travis 2018-08-28 16:00:04 +01:00
Asim Aslam
9d3cb65daa set broker address on Init 2018-08-18 17:28:58 +01:00
Asim Aslam
a0d3917832 Merge pull request #292 from micro/init
Add Init to all things, use init in cmd package to initialise
2018-08-09 18:30:36 +01:00
Asim Aslam
9968c7d007 Add Init to all things, use init in cmd package to initialise 2018-08-08 18:57:29 +01:00
Asim Aslam
68f5e71153 Merge pull request #290 from micro/connect
Support connect native registration
2018-08-06 17:20:37 +01:00
Asim Aslam
af328ee7b4 Support connect native registration 2018-08-06 17:12:34 +01:00
Asim Aslam
eebaa64d8c phase 1 2018-07-29 10:55:46 +01:00
Asim Aslam
88505388c1 Add verbosity to errors 2018-07-26 09:33:50 +01:00
Asim Aslam
8a778644cf Merge pull request #281 from micro/retry
retry only on timeout or internal server error
2018-07-22 17:52:12 +01:00
Asim Aslam
d3a76e646a retry only on timeout or internal server error 2018-07-22 17:41:58 +01:00
Asim Aslam
cfa824bc5f Merge pull request #280 from jiyeyuran/master
Allow client_retries to be 0
2018-07-19 23:24:57 -07:00
武新飞
39be61685c client_retries can be 0 2018-07-20 14:20:23 +08:00
Asim Aslam
ac2106ced7 strip deadline from stream 2018-07-17 16:39:07 -07:00
Asim Aslam
1b4f7d8a68 a stream should not timeout 2018-07-17 16:32:35 -07:00
Asim Aslam
5eb2e79b86 go fmt 2018-07-17 16:31:09 -07:00
Asim Aslam
a2eff9918e Merge pull request #269 from Ak-Army/master
handle function in mock response
2018-06-13 16:54:10 +01:00
Hunyadvári Péter
52a470532d handle function in mock response 2018-06-13 17:46:30 +02:00
Asim Aslam
cd9441fafb Merge pull request #268 from jasimmk/fix-connect-lock
Fixing httpBroker dead lock; If publish is called from a subscription #267
2018-06-13 16:21:24 +01:00
Jasim Muhammed
356cf82af5 Fixing httpBroker dead lock; If publish is called from a subscription 2018-06-13 10:28:39 +04:00
37 changed files with 663 additions and 713 deletions

View File

@@ -1,7 +1,7 @@
language: go
go:
- 1.9.5
- 1.10.x
- 1.11.x
notifications:
slack:
secure: aEvhLbhujaGaKSrOokiG3//PaVHTIrc3fBpoRbCRqfZpyq6WREoapJJhF+tIpWWOwaC9GmChbD6aHo/jMUgwKXVyPSaNjiEL87YzUUpL8B2zslNp1rgfTg/LrzthOx3Q1TYwpaAl3to0fuHUVFX4yMeC2vuThq7WSXgMMxFCtbc=

441
README.md
View File

@@ -1,9 +1,8 @@
# Go Micro [![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![GoDoc](https://godoc.org/github.com/micro/go-micro?status.svg)](https://godoc.org/github.com/micro/go-micro) [![Travis CI](https://api.travis-ci.org/micro/go-micro.svg?branch=master)](https://travis-ci.org/micro/go-micro) [![Go Report Card](https://goreportcard.com/badge/micro/go-micro)](https://goreportcard.com/report/github.com/micro/go-micro)
Go Micro is a pluggable RPC framework for distributed systems development.
The **micro** philosophy is sane defaults with a pluggable architecture. We provide defaults to get you started quickly but everything can be easily swapped out. It comes with built in support for {json,proto}-rpc encoding, consul or multicast dns for service discovery, http for communication and random hashed client side load balancing.
Go Micro is a pluggable framework for distributed systems development.
The **micro** philosophy is sane defaults with a pluggable architecture. We provide defaults to get you started quickly but everything can be swapped out.
Plugins are available at [github.com/micro/go-plugins](https://github.com/micro/go-plugins).
Follow us on [Twitter](https://twitter.com/microhq) or join the [Slack](http://slack.micro.mu/) community.
@@ -15,440 +14,12 @@ Go Micro abstracts away the details of distributed systems. Here are the main fe
- **Service Discovery** - Automatic service registration and name resolution
- **Load Balancing** - Client side load balancing built on discovery
- **Message Encoding** - Dynamic encoding based on content-type with protobuf and json support
- **Sync Streaming** - RPC based communication with support for bidirectional streaming
- **Async Messaging** - Native PubSub messaging built in for event driven architectures
- **Sync Streaming** - RPC based request/response with support for bidirectional streaming
- **Async Messaging** - Native pubsub messaging built in for event driven architectures
Go Micro supports both the Service and Function programming models. Read on to learn more.
## Getting Started
## Docs
For more detailed information on the architecture, installation and use of go-micro checkout the [docs](https://micro.mu/docs).
## Learn By Example
An example service can be found in [**examples/service**](https://github.com/micro/examples/tree/master/service) and function in [**examples/function**](https://github.com/micro/examples/tree/master/function).
The [**examples**](https://github.com/micro/examples) directory contains examples for using things such as middleware/wrappers, selector filters, pub/sub, grpc, plugins and much more. For the complete greeter example look at [**examples/greeter**](https://github.com/micro/examples/tree/master/greeter). Other examples can be found throughout the GitHub repository.
Watch the [Golang UK Conf 2016](https://www.youtube.com/watch?v=xspaDovwk34) video for a high level overview.
## Getting started
- [Install Protobuf](#install-protobuf)
- [Service Discovery](#service-discovery)
- [Writing a Service](#writing-a-service)
- [Writing a Function](#writing-a-function)
- [Publish & Subscribe](#publish--subscribe)
- [Plugins](#plugins)
- [Wrappers](#wrappers)
## Install Protobuf
Protobuf is required for code generation
You'll need to install:
- [protoc-gen-micro](https://github.com/micro/protoc-gen-micro)
## Service Discovery
Service discovery is used to resolve service names to addresses.
### Consul
[Consul](https://www.consul.io/) is used as the default service discovery system.
Discovery is pluggable. Find plugins for etcd, kubernetes, zookeeper and more in the [micro/go-plugins](https://github.com/micro/go-plugins) repo.
[Install guide](https://www.consul.io/intro/getting-started/install.html)
### Multicast DNS
[Multicast DNS](https://en.wikipedia.org/wiki/Multicast_DNS) is a built in service discovery plugin for a zero dependency configuration.
Pass `--registry=mdns` to any command or the enviroment variable `MICRO_REGISTRY=mdns`
```
MICRO_REGISTRY=mdns go run main.go
```
## Writing a service
This is a simple greeter RPC service example
Find this example at [examples/service](https://github.com/micro/examples/tree/master/service).
### Create service proto
One of the key requirements of microservices is strongly defined interfaces. Micro uses protobuf to achieve this.
Here we define the Greeter handler with the method Hello. It takes a HelloRequest and HelloResponse both with one string arguments.
```proto
syntax = "proto3";
service Greeter {
rpc Hello(HelloRequest) returns (HelloResponse) {}
}
message HelloRequest {
string name = 1;
}
message HelloResponse {
string greeting = 2;
}
```
### Generate the proto
After writing the proto definition we must compile it using protoc with the micro plugin.
```shell
protoc --proto_path=$GOPATH/src:. --micro_out=. --go_out=. path/to/greeter.proto
```
### Write the service
Below is the code for the greeter service.
It does the following:
1. Implements the interface defined for the Greeter handler
2. Initialises a micro.Service
3. Registers the Greeter handler
4. Runs the service
```go
package main
import (
"context"
"fmt"
micro "github.com/micro/go-micro"
proto "github.com/micro/examples/service/proto"
)
type Greeter struct{}
func (g *Greeter) Hello(ctx context.Context, req *proto.HelloRequest, rsp *proto.HelloResponse) error {
rsp.Greeting = "Hello " + req.Name
return nil
}
func main() {
// Create a new service. Optionally include some options here.
service := micro.NewService(
micro.Name("greeter"),
)
// Init will parse the command line flags.
service.Init()
// Register handler
proto.RegisterGreeterHandler(service.Server(), new(Greeter))
// Run the server
if err := service.Run(); err != nil {
fmt.Println(err)
}
}
```
### Run service
```
go run examples/service/main.go
```
Output
```
2016/03/14 10:59:14 Listening on [::]:50137
2016/03/14 10:59:14 Broker Listening on [::]:50138
2016/03/14 10:59:14 Registering node: greeter-ca62b017-e9d3-11e5-9bbb-68a86d0d36b6
```
### Define a client
Below is the client code to query the greeter service.
The generated proto includes a greeter client to reduce boilerplate code.
```go
package main
import (
"context"
"fmt"
micro "github.com/micro/go-micro"
proto "github.com/micro/examples/service/proto"
)
func main() {
// Create a new service. Optionally include some options here.
service := micro.NewService(micro.Name("greeter.client"))
service.Init()
// Create new greeter client
greeter := proto.NewGreeterService("greeter", service.Client())
// Call the greeter
rsp, err := greeter.Hello(context.TODO(), &proto.HelloRequest{Name: "John"})
if err != nil {
fmt.Println(err)
}
// Print response
fmt.Println(rsp.Greeting)
}
```
### Run the client
```shell
go run client.go
```
Output
```
Hello John
```
## Writing a Function
Go Micro includes the Function programming model.
A Function is a one time executing Service which exits after completing a request.
### Defining a Function
```go
package main
import (
"context"
proto "github.com/micro/examples/function/proto"
"github.com/micro/go-micro"
)
type Greeter struct{}
func (g *Greeter) Hello(ctx context.Context, req *proto.HelloRequest, rsp *proto.HelloResponse) error {
rsp.Greeting = "Hello " + req.Name
return nil
}
func main() {
// create a new function
fnc := micro.NewFunction(
micro.Name("greeter"),
)
// init the command line
fnc.Init()
// register a handler
fnc.Handle(new(Greeter))
// run the function
fnc.Run()
}
```
It's that simple.
## Publish & Subscribe
Go-micro has a built in message broker interface for event driven architectures.
PubSub operates on the same protobuf generated messages as RPC. They are encoded/decoded automatically and sent via the broker.
By default go-micro includes a point-to-point http broker but this can be swapped out via go-plugins.
### Publish
Create a new publisher with a `topic` name and service client
```go
p := micro.NewPublisher("events", service.Client())
```
Publish a proto message
```go
p.Publish(context.TODO(), &proto.Event{Name: "event"})
```
### Subscribe
Create a message handler. It's signature should be `func(context.Context, v interface{}) error`.
```go
func ProcessEvent(ctx context.Context, event *proto.Event) error {
fmt.Printf("Got event %+v\n", event)
return nil
}
```
Register the message handler with a `topic`
```go
micro.RegisterSubscriber("events", ProcessEvent)
```
See [examples/pubsub](https://github.com/micro/examples/tree/master/pubsub) for a complete example.
## Plugins
By default go-micro only provides a few implementation of each interface at the core but it's completely pluggable. There's already dozens of plugins which are available at [github.com/micro/go-plugins](https://github.com/micro/go-plugins). Contributions are welcome!
### Build with plugins
If you want to integrate plugins simply link them in a separate file and rebuild
Create a plugins.go file
```go
import (
// etcd v3 registry
_ "github.com/micro/go-plugins/registry/etcdv3"
// nats transport
_ "github.com/micro/go-plugins/transport/nats"
// kafka broker
_ "github.com/micro/go-plugins/broker/kafka"
)
```
Build binary
```shell
// For local use
go build -i -o service ./main.go ./plugins.go
```
Flag usage of plugins
```shell
service --registry=etcdv3 --transport=nats --broker=kafka
```
### Plugin as option
Alternatively you can set the plugin as an option to a service
```go
import (
"github.com/micro/go-micro"
// etcd v3 registry
"github.com/micro/go-plugins/registry/etcdv3"
// nats transport
"github.com/micro/go-plugins/transport/nats"
// kafka broker
"github.com/micro/go-plugins/broker/kafka"
)
func main() {
registry := etcdv3.NewRegistry()
broker := kafka.NewBroker()
transport := nats.NewTransport()
service := micro.NewService(
micro.Name("greeter"),
micro.Registry(registry),
micro.Broker(broker),
micro.Transport(transport),
)
service.Init()
service.Run()
}
```
### Write plugins
Plugins are a concept built on Go's interface. Each package maintains a high level interface abstraction.
Simply implement the interface and pass it in as an option to the service.
The service discovery interface is called [Registry](https://godoc.org/github.com/micro/go-micro/registry#Registry).
Anything which implements this interface can be used as a registry. The same applies to the other packages.
```go
type Registry interface {
Register(*Service, ...RegisterOption) error
Deregister(*Service) error
GetService(string) ([]*Service, error)
ListServices() ([]*Service, error)
Watch() (Watcher, error)
String() string
}
```
Browse [go-plugins](https://github.com/micro/go-plugins) to get a better idea of implementation details.
## Wrappers
Go-micro includes the notion of middleware as wrappers. The client or handlers can be wrapped using the decorator pattern.
### Handler
Here's an example service handler wrapper which logs the incoming request
```go
// implements the server.HandlerWrapper
func logWrapper(fn server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
fmt.Printf("[%v] server request: %s", time.Now(), req.Method())
return fn(ctx, req, rsp)
}
}
```
It can be initialised when creating the service
```go
service := micro.NewService(
micro.Name("greeter"),
// wrap the handler
micro.WrapHandler(logWrapper),
)
```
### Client
Here's an example of a client wrapper which logs requests made
```go
type logWrapper struct {
client.Client
}
func (l *logWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
fmt.Printf("[wrapper] client request to service: %s method: %s\n", req.Service(), req.Method())
return l.Client.Call(ctx, req, rsp)
}
// implements client.Wrapper as logWrapper
func logWrap(c client.Client) client.Client {
return &logWrapper{c}
}
```
It can be initialised when creating the service
```go
service := micro.NewService(
micro.Name("greeter"),
// wrap the client
micro.WrapClient(logWrap),
)
```
## Other Languages
Check out [ja-micro](https://github.com/Sixt/ja-micro) to write services in Java
See the [docs](https://micro.mu/docs/go-micro.html) for detailed information on the architecture, installation and use of go-micro.
## Sponsors

View File

@@ -1,9 +1,11 @@
// Package http provides a http based message broker
package http
import (
"github.com/micro/go-micro/broker"
)
// NewBroker returns a new http broker
func NewBroker(opts ...broker.Option) broker.Broker {
return broker.NewBroker(opts...)
}

23
broker/http/options.go Normal file
View File

@@ -0,0 +1,23 @@
package http
import (
"context"
"net/http"
"github.com/micro/go-micro/broker"
)
// Handle registers the handler for the given pattern.
func Handle(pattern string, handler http.Handler) broker.Option {
return func(o *broker.Options) {
if o.Context == nil {
o.Context = context.Background()
}
handlers, ok := o.Context.Value("http_handlers").(map[string]http.Handler)
if !ok {
handlers = make(map[string]http.Handler)
}
handlers[pattern] = handler
o.Context = context.WithValue(o.Context, "http_handlers", handlers)
}
}

View File

@@ -18,6 +18,7 @@ import (
"sync"
"time"
"github.com/google/uuid"
"github.com/micro/go-log"
"github.com/micro/go-micro/broker/codec/json"
merr "github.com/micro/go-micro/errors"
@@ -26,7 +27,6 @@ import (
maddr "github.com/micro/util/go/lib/addr"
mnet "github.com/micro/util/go/lib/net"
mls "github.com/micro/util/go/lib/tls"
"github.com/pborman/uuid"
)
// HTTP Broker is a point to point async broker
@@ -116,7 +116,7 @@ func newHttpBroker(opts ...Option) Broker {
}
h := &httpBroker{
id: "broker-" + uuid.NewUUID().String(),
id: "broker-" + uuid.New().String(),
address: addr,
opts: options,
r: reg,
@@ -126,7 +126,19 @@ func newHttpBroker(opts ...Option) Broker {
mux: http.NewServeMux(),
}
// specify the message handler
h.mux.Handle(DefaultSubPath, h)
// get optional handlers
if h.opts.Context != nil {
handlers, ok := h.opts.Context.Value("http_handlers").(map[string]http.Handler)
if ok {
for pattern, handler := range handlers {
h.mux.Handle(pattern, handler)
}
}
}
return h
}
@@ -176,7 +188,7 @@ func (h *httpBroker) unsubscribe(s *httpSubscriber) error {
for _, sub := range h.subscribers[s.topic] {
// deregister and skip forward
if sub.id == s.id {
h.r.Deregister(sub.svc)
_ = h.r.Deregister(sub.svc)
continue
}
// keep subscriber
@@ -200,7 +212,7 @@ func (h *httpBroker) run(l net.Listener) {
h.RLock()
for _, subs := range h.subscribers {
for _, sub := range subs {
h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))
_ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))
}
}
h.RUnlock()
@@ -210,7 +222,7 @@ func (h *httpBroker) run(l net.Listener) {
h.RLock()
for _, subs := range h.subscribers {
for _, sub := range subs {
h.r.Deregister(sub.svc)
_ = h.r.Deregister(sub.svc)
}
}
h.RUnlock()
@@ -276,12 +288,15 @@ func (h *httpBroker) Address() string {
}
func (h *httpBroker) Connect() error {
h.Lock()
defer h.Unlock()
h.RLock()
if h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
var l net.Listener
var err error
@@ -351,12 +366,16 @@ func (h *httpBroker) Connect() error {
}
func (h *httpBroker) Disconnect() error {
h.Lock()
defer h.Unlock()
h.RLock()
if !h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
// stop rcache
rc, ok := h.r.(rcache.Cache)
@@ -375,19 +394,26 @@ func (h *httpBroker) Disconnect() error {
}
func (h *httpBroker) Init(opts ...Option) error {
h.Lock()
defer h.Unlock()
h.RLock()
if h.running {
h.RUnlock()
return errors.New("cannot init while connected")
}
h.RUnlock()
h.Lock()
defer h.Unlock()
for _, o := range opts {
o(&h.opts)
}
if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 {
h.address = h.opts.Addrs[0]
}
if len(h.id) == 0 {
h.id = "broker-" + uuid.NewUUID().String()
h.id = "broker-" + uuid.New().String()
}
// get registry
@@ -494,7 +520,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
}
// create unique id
id := h.id + "." + uuid.NewUUID().String()
id := h.id + "." + uuid.New().String()
var secure bool

View File

@@ -5,15 +5,15 @@ import (
"testing"
"time"
"github.com/google/uuid"
"github.com/micro/go-micro/registry/mock"
"github.com/pborman/uuid"
)
func sub(be *testing.B, c int) {
be.StopTimer()
m := mock.NewRegistry()
b := NewBroker(Registry(m))
topic := uuid.NewUUID().String()
topic := uuid.New().String()
if err := b.Init(); err != nil {
be.Fatalf("Unexpected init error: %v", err)
@@ -72,7 +72,7 @@ func pub(be *testing.B, c int) {
be.StopTimer()
m := mock.NewRegistry()
b := NewBroker(Registry(m))
topic := uuid.NewUUID().String()
topic := uuid.New().String()
if err := b.Init(); err != nil {
be.Fatalf("Unexpected init error: %v", err)

View File

@@ -4,8 +4,8 @@ import (
"errors"
"sync"
"github.com/google/uuid"
"github.com/micro/go-micro/broker"
"github.com/pborman/uuid"
)
type mockBroker struct {
@@ -112,7 +112,7 @@ func (m *mockBroker) Subscribe(topic string, handler broker.Handler, opts ...bro
sub := &mockSubscriber{
exit: make(chan bool, 1),
id: uuid.NewUUID().String(),
id: uuid.New().String(),
topic: topic,
handler: handler,
opts: options,

View File

@@ -68,7 +68,7 @@ var (
// DefaultBackoff is the default backoff function for retries
DefaultBackoff = exponentialBackoff
// DefaultRetry is the default check-for-retry function for retries
DefaultRetry = alwaysRetry
DefaultRetry = RetryOnError
// DefaultRetries is the default number of times a request is tried
DefaultRetries = 1
// DefaultRequestTimeout is the default request timeout

View File

@@ -80,8 +80,12 @@ func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface
if t := reflect.TypeOf(rsp); t.Kind() == reflect.Ptr {
v = reflect.Indirect(v)
}
response := r.Response
if t := reflect.TypeOf(r.Response); t.Kind() == reflect.Func {
response = reflect.ValueOf(r.Response).Call([]reflect.Value{})[0].Interface()
}
v.Set(reflect.ValueOf(r.Response))
v.Set(reflect.ValueOf(response))
return nil
}

View File

@@ -16,6 +16,8 @@ func TestClient(t *testing.T) {
{Method: "Foo.Bar", Response: map[string]interface{}{"foo": "bar"}},
{Method: "Foo.Struct", Response: &TestResponse{Param: "aparam"}},
{Method: "Foo.Fail", Error: errors.InternalServerError("go.mock", "failed")},
{Method: "Foo.Func", Response: func() string { return "string" }},
{Method: "Foo.FuncStruct", Response: func() *TestResponse { return &TestResponse{Param: "aparam"} }},
}
c := NewClient(Response("go.mock", response))

View File

@@ -2,12 +2,34 @@ package client
import (
"context"
"github.com/micro/go-micro/errors"
)
// note that returning either false or a non-nil error will result in the call not being retried
type RetryFunc func(ctx context.Context, req Request, retryCount int, err error) (bool, error)
// always retry on error
func alwaysRetry(ctx context.Context, req Request, retryCount int, err error) (bool, error) {
// RetryAlways always retry on error
func RetryAlways(ctx context.Context, req Request, retryCount int, err error) (bool, error) {
return true, nil
}
// RetryOnError retries a request on a 500 or timeout error
func RetryOnError(ctx context.Context, req Request, retryCount int, err error) (bool, error) {
if err == nil {
return false, nil
}
e := errors.Parse(err.Error())
if e == nil {
return false, nil
}
switch e.Code {
// retry on timeout or internal server error
case 408, 500:
return true, nil
default:
return false, nil
}
}

View File

@@ -1,9 +1,11 @@
// Package rpc provides an rpc client
package rpc
import (
"github.com/micro/go-micro/client"
)
// NewClient returns a new micro client interface
func NewClient(opts ...client.Option) client.Client {
return client.NewClient(opts...)
}

View File

@@ -7,6 +7,8 @@ import (
"sync"
"time"
"sync/atomic"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/errors"
@@ -14,7 +16,6 @@ import (
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/transport"
"sync/atomic"
)
type rpcClient struct {
@@ -88,7 +89,7 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
r.pool.release(address, c, grr)
}()
seq := r.seq
seq := atomic.LoadUint64(&r.seq)
atomic.AddUint64(&r.seq, 1)
stream := &rpcStream{
@@ -159,7 +160,15 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
c, err := r.opts.Transport.Dial(address, transport.WithStream(), transport.WithTimeout(opts.DialTimeout))
dOpts := []transport.DialOption{
transport.WithStream(),
}
if opts.DialTimeout >= 0 {
dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout))
}
c, err := r.opts.Transport.Dial(address, dOpts...)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", "connection error: %v", err)
}
@@ -230,9 +239,9 @@ func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, erro
// get next nodes from the selector
next, err := r.opts.Selector.Select(request.Service(), opts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %v", request.Service(), err.Error())
}
return next, nil
@@ -282,7 +291,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, request, i)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
return errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
}
// only sleep if greater than 0
@@ -293,9 +302,9 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
// select next node
node, err := next()
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
return errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
}
// set the address
@@ -314,9 +323,9 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
var gerr error
for i := 0; i <= callOpts.Retries; i++ {
go func() {
go func(i int) {
ch <- call(i)
}()
}(i)
select {
case <-ctx.Done():
@@ -355,18 +364,6 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
return nil, err
}
// check if we already have a deadline
d, ok := ctx.Deadline()
if !ok {
// no deadline so we create a new one
ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
} else {
// got a deadline so no need to setup context
// but we need to set the timeout we pass along
opt := WithRequestTimeout(d.Sub(time.Now()))
opt(&callOpts)
}
// should we noop right here?
select {
case <-ctx.Done():
@@ -378,7 +375,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, request, i)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
return nil, errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
}
// only sleep if greater than 0
@@ -388,9 +385,9 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
node, err := next()
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
}
address := node.Address

View File

@@ -2,10 +2,10 @@ package client
import (
"context"
"errors"
"fmt"
"testing"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/registry/mock"
"github.com/micro/go-micro/selector"
@@ -69,7 +69,7 @@ func TestCallRetry(t *testing.T) {
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
called++
if called == 1 {
return errors.New("retry request")
return errors.InternalServerError("test.error", "retry request")
}
// don't do the call

View File

@@ -87,6 +87,11 @@ var (
EnvVar: "MICRO_REGISTER_INTERVAL",
Usage: "Register interval in seconds",
},
cli.StringFlag{
Name: "server",
EnvVar: "MICRO_SERVER",
Usage: "Server for go-micro; rpc",
},
cli.StringFlag{
Name: "server_name",
EnvVar: "MICRO_SERVER_NAME",
@@ -144,11 +149,6 @@ var (
Usage: "Selector used to pick nodes for querying",
Value: "cache",
},
cli.StringFlag{
Name: "server",
EnvVar: "MICRO_SERVER",
Usage: "Server for go-micro; rpc",
},
cli.StringFlag{
Name: "transport",
EnvVar: "MICRO_TRANSPORT",
@@ -262,48 +262,40 @@ func (c *cmd) Before(ctx *cli.Context) error {
// Set the client
if name := ctx.String("client"); len(name) > 0 {
if cl, ok := c.opts.Clients[name]; ok {
// only change if we have the client and type differs
if cl, ok := c.opts.Clients[name]; ok && (*c.opts.Client).String() != name {
*c.opts.Client = cl()
}
}
// Set the server
if name := ctx.String("server"); len(name) > 0 {
if s, ok := c.opts.Servers[name]; ok {
// only change if we have the server and type differs
if s, ok := c.opts.Servers[name]; ok && (*c.opts.Server).String() != name {
*c.opts.Server = s()
}
}
// Set the broker
if name := ctx.String("broker"); len(name) > 0 || len(ctx.String("broker_address")) > 0 {
if len(name) == 0 {
name = defaultBroker
}
if b, ok := c.opts.Brokers[name]; ok {
n := b(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...))
*c.opts.Broker = n
} else {
if name := ctx.String("broker"); len(name) > 0 && (*c.opts.Broker).String() != name {
b, ok := c.opts.Brokers[name]
if !ok {
return fmt.Errorf("Broker %s not found", name)
}
*c.opts.Broker = b()
serverOpts = append(serverOpts, server.Broker(*c.opts.Broker))
clientOpts = append(clientOpts, client.Broker(*c.opts.Broker))
}
// Set the registry
if name := ctx.String("registry"); len(name) > 0 || len(ctx.String("registry_address")) > 0 {
if len(name) == 0 {
name = defaultRegistry
}
if r, ok := c.opts.Registries[name]; ok {
n := r(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...))
*c.opts.Registry = n
} else {
if name := ctx.String("registry"); len(name) > 0 && (*c.opts.Registry).String() != name {
r, ok := c.opts.Registries[name]
if !ok {
return fmt.Errorf("Registry %s not found", name)
}
*c.opts.Registry = r()
serverOpts = append(serverOpts, server.Registry(*c.opts.Registry))
clientOpts = append(clientOpts, client.Registry(*c.opts.Registry))
@@ -314,31 +306,26 @@ func (c *cmd) Before(ctx *cli.Context) error {
}
// Set the selector
if name := ctx.String("selector"); len(name) > 0 {
if s, ok := c.opts.Selectors[name]; ok {
n := s(selector.Registry(*c.opts.Registry))
*c.opts.Selector = n
} else {
if name := ctx.String("selector"); len(name) > 0 && (*c.opts.Selector).String() != name {
s, ok := c.opts.Selectors[name]
if !ok {
return fmt.Errorf("Selector %s not found", name)
}
*c.opts.Selector = s(selector.Registry(*c.opts.Registry))
// No server option here. Should there be?
clientOpts = append(clientOpts, client.Selector(*c.opts.Selector))
}
// Set the transport
if name := ctx.String("transport"); len(name) > 0 || len(ctx.String("transport_address")) > 0 {
if len(name) == 0 {
name = defaultTransport
}
if t, ok := c.opts.Transports[name]; ok {
n := t(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...))
*c.opts.Transport = n
} else {
if name := ctx.String("transport"); len(name) > 0 && (*c.opts.Transport).String() != name {
t, ok := c.opts.Transports[name]
if !ok {
return fmt.Errorf("Transport %s not found", name)
}
*c.opts.Transport = t()
serverOpts = append(serverOpts, server.Transport(*c.opts.Transport))
clientOpts = append(clientOpts, client.Transport(*c.opts.Transport))
}
@@ -359,6 +346,18 @@ func (c *cmd) Before(ctx *cli.Context) error {
serverOpts = append(serverOpts, server.Metadata(metadata))
}
if len(ctx.String("broker_address")) > 0 {
(*c.opts.Broker).Init(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...))
}
if len(ctx.String("registry_address")) > 0 {
(*c.opts.Registry).Init(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...))
}
if len(ctx.String("transport_address")) > 0 {
(*c.opts.Transport).Init(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...))
}
if len(ctx.String("server_name")) > 0 {
serverOpts = append(serverOpts, server.Name(ctx.String("server_name")))
}
@@ -384,7 +383,7 @@ func (c *cmd) Before(ctx *cli.Context) error {
}
// client opts
if r := ctx.Int("client_retries"); r > 0 {
if r := ctx.Int("client_retries"); r >= 0 {
clientOpts = append(clientOpts, client.Retries(r))
}

View File

@@ -1,3 +1,4 @@
// Package jsonrpc provides a json-rpc 1.0 codec
package jsonrpc
import (
@@ -54,7 +55,8 @@ func (j *jsonCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
case codec.Response:
return j.c.ReadHeader(m)
case codec.Publication:
io.Copy(j.buf, j.rwc)
_, err := io.Copy(j.buf, j.rwc)
return err
default:
return fmt.Errorf("Unrecognised message type: %v", mt)
}

View File

@@ -1,3 +1,4 @@
// Protorpc provides a net/rpc proto-rpc codec. See envelope.proto for the format.
package protorpc
import (
@@ -55,7 +56,9 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
return err
}
if flusher, ok := c.rwc.(flusher); ok {
err = flusher.Flush()
if err = flusher.Flush(); err != nil {
return err
}
}
case codec.Response:
c.Lock()
@@ -82,7 +85,9 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
return err
}
if flusher, ok := c.rwc.(flusher); ok {
err = flusher.Flush()
if err = flusher.Flush(); err != nil {
return err
}
}
case codec.Publication:
data, err := proto.Marshal(b.(proto.Message))
@@ -127,7 +132,8 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
m.Id = rtmp.GetSeq()
m.Error = rtmp.GetError()
case codec.Publication:
io.Copy(c.buf, c.rwc)
_, err := io.Copy(c.buf, c.rwc)
return err
default:
return fmt.Errorf("Unrecognised message type: %v", mt)
}

View File

@@ -82,6 +82,16 @@ func NotFound(id, format string, a ...interface{}) error {
}
}
// MethodNotAllowed generates a 405 error.
func MethodNotAllowed(id, format string, a ...interface{}) error {
return &Error{
Id: id,
Code: 405,
Detail: fmt.Sprintf(format, a...),
Status: http.StatusText(405),
}
}
// InternalServerError generates a 500 error.
func InternalServerError(id, format string, a ...interface{}) error {
return &Error{

View File

@@ -8,6 +8,16 @@ import (
"github.com/micro/go-micro/registry"
)
// Connect specifies services should be registered as Consul Connect services
func Connect() registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, "consul_connect", true)
}
}
func Config(c *consul.Config) registry.Option {
return func(o *registry.Options) {
if o.Context == nil {

View File

@@ -19,8 +19,26 @@ type consulRegistry struct {
Client *consul.Client
opts Options
// connect enabled
connect bool
sync.Mutex
register map[string]uint64
// lastChecked tracks when a node was last checked as existing in Consul
lastChecked map[string]time.Time
}
func getDeregisterTTL(t time.Duration) time.Duration {
// splay slightly for the watcher?
splay := time.Second * 5
deregTTL := t + splay
// consul has a minimum timeout on deregistration of 1 minute.
if t < time.Minute {
deregTTL = time.Minute + splay
}
return deregTTL
}
func newTransport(config *tls.Config) *http.Transport {
@@ -45,27 +63,31 @@ func newTransport(config *tls.Config) *http.Transport {
return t
}
func newConsulRegistry(opts ...Option) Registry {
var options Options
func configure(c *consulRegistry, opts ...Option) {
// set opts
for _, o := range opts {
o(&options)
o(&c.opts)
}
// use default config
config := consul.DefaultConfig()
if options.Context != nil {
if c.opts.Context != nil {
// Use the consul config passed in the options, if available
if c, ok := options.Context.Value("consul_config").(*consul.Config); ok {
config = c
if co, ok := c.opts.Context.Value("consul_config").(*consul.Config); ok {
config = co
}
if cn, ok := c.opts.Context.Value("consul_connect").(bool); ok {
c.connect = cn
}
}
// check if there are any addrs
if len(options.Addrs) > 0 {
addr, port, err := net.SplitHostPort(options.Addrs[0])
if len(c.opts.Addrs) > 0 {
addr, port, err := net.SplitHostPort(c.opts.Addrs[0])
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
port = "8500"
addr = options.Addrs[0]
addr = c.opts.Addrs[0]
config.Address = fmt.Sprintf("%s:%s", addr, port)
} else if err == nil {
config.Address = fmt.Sprintf("%s:%s", addr, port)
@@ -73,61 +95,59 @@ func newConsulRegistry(opts ...Option) Registry {
}
// requires secure connection?
if options.Secure || options.TLSConfig != nil {
if c.opts.Secure || c.opts.TLSConfig != nil {
if config.HttpClient == nil {
config.HttpClient = new(http.Client)
}
config.Scheme = "https"
// We're going to support InsecureSkipVerify
config.HttpClient.Transport = newTransport(options.TLSConfig)
config.HttpClient.Transport = newTransport(c.opts.TLSConfig)
}
// set timeout
if c.opts.Timeout > 0 {
config.HttpClient.Timeout = c.opts.Timeout
}
// create the client
client, _ := consul.NewClient(config)
// set timeout
if options.Timeout > 0 {
config.HttpClient.Timeout = options.Timeout
}
// set address/client
c.Address = config.Address
c.Client = client
}
func newConsulRegistry(opts ...Option) Registry {
cr := &consulRegistry{
Address: config.Address,
Client: client,
opts: options,
register: make(map[string]uint64),
opts: Options{},
register: make(map[string]uint64),
lastChecked: make(map[string]time.Time),
}
configure(cr, opts...)
return cr
}
func (c *consulRegistry) Init(opts ...Option) error {
configure(c, opts...)
return nil
}
func (c *consulRegistry) Deregister(s *Service) error {
if len(s.Nodes) == 0 {
return errors.New("Require at least one node")
}
// delete our hash of the service
// delete our hash and time check of the service
c.Lock()
delete(c.register, s.Name)
delete(c.lastChecked, s.Name)
c.Unlock()
node := s.Nodes[0]
return c.Client.Agent().ServiceDeregister(node.Id)
}
func getDeregisterTTL(t time.Duration) time.Duration {
// splay slightly for the watcher?
splay := time.Second * 5
deregTTL := t + splay
// consul has a minimum timeout on deregistration of 1 minute.
if t < time.Minute {
deregTTL = time.Minute + splay
}
return deregTTL
}
func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
if len(s.Nodes) == 0 {
return errors.New("Require at least one node")
@@ -164,10 +184,27 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
// if it's already registered and matches then just pass the check
if ok && v == h {
// if the err is nil we're all good, bail out
// if not, we don't know what the state is, so full re-register
if err := c.Client.Agent().PassTTL("service:"+node.Id, ""); err == nil {
return nil
if options.TTL == time.Duration(0) {
// ensure that our service hasn't been deregistered by Consul
if time.Since(c.lastChecked[s.Name]) <= getDeregisterTTL(regInterval) {
return nil
}
services, _, err := c.Client.Health().Checks(s.Name, &consul.QueryOptions{
AllowStale: true,
})
if err == nil {
for _, v := range services {
if v.ServiceID == node.Id {
return nil
}
}
}
} else {
// if the err is nil we're all good, bail out
// if not, we don't know what the state is, so full re-register
if err := c.Client.Agent().PassTTL("service:"+node.Id, ""); err == nil {
return nil
}
}
}
@@ -192,26 +229,36 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
deregTTL := getDeregisterTTL(options.TTL)
check = &consul.AgentServiceCheck{
TTL: fmt.Sprintf("%v", options.TTL),
TTL: fmt.Sprintf("%v", options.TTL),
DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL),
}
}
// register the service
if err := c.Client.Agent().ServiceRegister(&consul.AgentServiceRegistration{
asr := &consul.AgentServiceRegistration{
ID: node.Id,
Name: s.Name,
Tags: tags,
Port: node.Port,
Address: node.Address,
Check: check,
}); err != nil {
}
// Specify consul connect
if c.connect {
asr.Connect = &consul.AgentServiceConnect{
Native: true,
}
}
if err := c.Client.Agent().ServiceRegister(asr); err != nil {
return err
}
// save our hash of the service
// save our hash and time check of the service
c.Lock()
c.register[s.Name] = h
c.lastChecked[s.Name] = time.Now()
c.Unlock()
// if the TTL is 0 we don't mess with the checks
@@ -224,7 +271,15 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
}
func (c *consulRegistry) GetService(name string) ([]*Service, error) {
rsp, _, err := c.Client.Health().Service(name, "", false, nil)
var rsp []*consul.ServiceEntry
var err error
// if we're connect enabled only get connect services
if c.connect {
rsp, _, err = c.Client.Health().Connect(name, "", false, nil)
} else {
rsp, _, err = c.Client.Health().Service(name, "", false, nil)
}
if err != nil {
return nil, err
}

View File

@@ -49,6 +49,17 @@ func newRegistry(opts ...registry.Option) registry.Registry {
}
}
func (m *mdnsRegistry) Init(opts ...registry.Option) error {
for _, o := range opts {
o(&m.opts)
}
return nil
}
func (m *mdnsRegistry) Options() registry.Options {
return m.opts
}
func (m *mdnsRegistry) Register(service *registry.Service, opts ...registry.RegisterOption) error {
m.Lock()
defer m.Unlock()
@@ -322,10 +333,6 @@ func (m *mdnsRegistry) String() string {
return "mdns"
}
func (m *mdnsRegistry) Options() registry.Options {
return m.opts
}
func NewRegistry(opts ...registry.Option) registry.Registry {
return newRegistry(opts...)
}

View File

@@ -99,11 +99,15 @@ func (m *mockRegistry) String() string {
return "mock"
}
func (m *mockRegistry) Init(opts ...registry.Option) error {
return nil
}
func (m *mockRegistry) Options() registry.Options {
return registry.Options{}
}
func NewRegistry() registry.Registry {
func NewRegistry(opts ...registry.Options) registry.Registry {
m := &mockRegistry{Services: make(map[string][]*registry.Service)}
m.init()
return m

View File

@@ -9,13 +9,14 @@ import (
// and an abstraction over varying implementations
// {consul, etcd, zookeeper, ...}
type Registry interface {
Init(...Option) error
Options() Options
Register(*Service, ...RegisterOption) error
Deregister(*Service) error
GetService(string) ([]*Service, error)
ListServices() ([]*Service, error)
Watch(...WatchOption) (Watcher, error)
String() string
Options() Options
}
type Option func(*Options)

View File

@@ -366,11 +366,9 @@ func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (s
}
func (c *cacheSelector) Mark(service string, node *registry.Node, err error) {
return
}
func (c *cacheSelector) Reset(service string) {
return
}
// Close stops the watcher and destroys the cache

View File

@@ -48,11 +48,9 @@ func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, er
}
func (r *defaultSelector) Mark(service string, node *registry.Node, err error) {
return
}
func (r *defaultSelector) Reset(service string) {
return
}
func (r *defaultSelector) Close() error {

View File

@@ -80,7 +80,7 @@ func TestFilterEndpoint(t *testing.T) {
}
}
if seen == false && data.count > 0 {
if !seen && data.count > 0 {
t.Fatalf("Expected %d services but seen is %t; result %+v", data.count, seen, services)
}
}
@@ -232,7 +232,7 @@ func TestFilterVersion(t *testing.T) {
seen = true
}
if seen == false && data.count > 0 {
if !seen && data.count > 0 {
t.Fatalf("Expected %d services but seen is %t; result %+v", data.count, seen, services)
}
}

View File

@@ -4,8 +4,8 @@ import (
"errors"
"sync"
"github.com/google/uuid"
"github.com/micro/go-micro/server"
"github.com/pborman/uuid"
)
type MockServer struct {
@@ -69,7 +69,7 @@ func (m *MockServer) NewHandler(h interface{}, opts ...server.HandlerOption) ser
}
return &MockHandler{
Id: uuid.NewUUID().String(),
Id: uuid.New().String(),
Hdlr: h,
Opts: options,
}

View File

@@ -1,9 +1,11 @@
// Package rpc provides an rpc server
package rpc
import (
"github.com/micro/go-micro/server"
)
// NewServer returns a micro server interface
func NewServer(opts ...server.Option) server.Server {
return server.NewServer(opts...)
}

View File

@@ -15,6 +15,8 @@ type testCodec struct {
}
type testSocket struct {
local string
remote string
}
// TestCodecWriteError simulates what happens when a codec is unable
@@ -87,6 +89,14 @@ func (c *testCodec) String() string {
return "string"
}
func (s testSocket) Local() string {
return s.local
}
func (s testSocket) Remote() string {
return s.remote
}
func (s testSocket) Recv(message *transport.Message) error {
return nil
}

View File

@@ -97,6 +97,10 @@ func (s *rpcServer) accept(sock transport.Socket) {
delete(hdr, "Content-Type")
delete(hdr, "Timeout")
// set local/remote ips
hdr["Local"] = sock.Local()
hdr["Remote"] = sock.Remote()
ctx := metadata.NewContext(context.Background(), hdr)
// set the timeout if we have it
@@ -393,11 +397,35 @@ func (s *rpcServer) Start() error {
s.opts.Address = ts.Addr()
s.Unlock()
go ts.Accept(s.accept)
exit := make(chan bool, 1)
go func() {
for {
err := ts.Accept(s.accept)
// check if we're supposed to exit
select {
case <-exit:
return
default:
}
// check the error and backoff
if err != nil {
log.Logf("Accept error: %v", err)
time.Sleep(time.Second)
continue
}
// no error just exit
return
}
}()
go func() {
// wait for exit
ch := <-s.exit
exit <- true
// wait for requests to finish
if wait(s.opts.Context) {

View File

@@ -199,7 +199,7 @@ func (server *server) register(rcvr interface{}) error {
return nil
}
func (server *server) sendResponse(sending *sync.Mutex, req *request, reply interface{}, codec serverCodec, errmsg string, last bool) (err error) {
func (server *server) sendResponse(sending sync.Locker, req *request, reply interface{}, codec serverCodec, errmsg string, last bool) (err error) {
resp := server.getResponse()
// Encode the response header
resp.ServiceMethod = req.ServiceMethod

View File

@@ -7,8 +7,8 @@ import (
"os/signal"
"syscall"
"github.com/google/uuid"
"github.com/micro/go-log"
"github.com/pborman/uuid"
)
type Server interface {
@@ -63,7 +63,7 @@ var (
DefaultAddress = ":0"
DefaultName = "go-server"
DefaultVersion = "1.0.0"
DefaultId = uuid.NewUUID().String()
DefaultId = uuid.New().String()
DefaultServer Server = newRpcServer()
)

View File

@@ -1,9 +1,11 @@
// Package http returns a http2 transport using net/http
package http
import (
"github.com/micro/go-micro/transport"
)
// NewTransport returns a new http transport using net/http and supporting http2
func NewTransport(opts ...transport.Option) transport.Transport {
return transport.NewTransport(opts...)
}

23
transport/http/options.go Normal file
View File

@@ -0,0 +1,23 @@
package http
import (
"context"
"net/http"
"github.com/micro/go-micro/transport"
)
// Handle registers the handler for the given pattern.
func Handle(pattern string, handler http.Handler) transport.Option {
return func(o *transport.Options) {
if o.Context == nil {
o.Context = context.Background()
}
handlers, ok := o.Context.Value("http_handlers").(map[string]http.Handler)
if !ok {
handlers = make(map[string]http.Handler)
}
handlers[pattern] = handler
o.Context = context.WithValue(o.Context, "http_handlers", handlers)
}
}

View File

@@ -1,6 +1,7 @@
package transport
import (
//"fmt"
"bufio"
"bytes"
"crypto/tls"
@@ -13,10 +14,11 @@ import (
"sync"
"time"
"github.com/micro/go-log"
"github.com/micro/h2c"
maddr "github.com/micro/util/go/lib/addr"
mnet "github.com/micro/util/go/lib/net"
mls "github.com/micro/util/go/lib/tls"
"golang.org/x/net/http2"
)
type buffer struct {
@@ -38,16 +40,25 @@ type httpTransportClient struct {
r chan *http.Request
bl []*http.Request
buff *bufio.Reader
// local/remote ip
local string
remote string
}
type httpTransportSocket struct {
ht *httpTransport
r chan *http.Request
conn net.Conn
once sync.Once
ht *httpTransport
w http.ResponseWriter
r *http.Request
rw *bufio.ReadWriter
sync.Mutex
buff *bufio.Reader
conn net.Conn
// for the first request
ch chan *http.Request
// local/remote ip
local string
remote string
}
type httpTransportListener struct {
@@ -59,6 +70,14 @@ func (b *buffer) Close() error {
return nil
}
func (h *httpTransportClient) Local() string {
return h.local
}
func (h *httpTransportClient) Remote() string {
return h.remote
}
func (h *httpTransportClient) Send(m *Message) error {
header := make(http.Header)
@@ -170,33 +189,87 @@ func (h *httpTransportClient) Close() error {
return err
}
func (h *httpTransportSocket) Local() string {
return h.local
}
func (h *httpTransportSocket) Remote() string {
return h.remote
}
func (h *httpTransportSocket) Recv(m *Message) error {
if m == nil {
return errors.New("message passed in is nil")
}
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
r, err := http.ReadRequest(h.buff)
if err != nil {
return err
}
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return err
}
r.Body.Close()
m.Body = b
if m.Header == nil {
m.Header = make(map[string]string)
}
for k, v := range r.Header {
// process http 1
if h.r.ProtoMajor == 1 {
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
var r *http.Request
select {
// get first request
case r = <-h.ch:
// read next request
default:
rr, err := http.ReadRequest(h.rw.Reader)
if err != nil {
return err
}
r = rr
}
// read body
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return err
}
// set body
r.Body.Close()
m.Body = b
// set headers
for k, v := range r.Header {
if len(v) > 0 {
m.Header[k] = v[0]
} else {
m.Header[k] = ""
}
}
// return early early
return nil
}
// processing http2 request
// read streaming body
// set max buffer size
buf := make([]byte, 4*1024)
// read the request body
n, err := h.r.Body.Read(buf)
// not an eof error
if err != nil {
return err
}
// check if we have data
if n > 0 {
m.Body = buf[:n]
}
// set headers
for k, v := range h.r.Header {
if len(v) > 0 {
m.Header[k] = v[0]
} else {
@@ -204,78 +277,73 @@ func (h *httpTransportSocket) Recv(m *Message) error {
}
}
select {
case h.r <- r:
default:
}
return nil
}
func (h *httpTransportSocket) Send(m *Message) error {
b := bytes.NewBuffer(m.Body)
defer b.Reset()
if h.r.ProtoMajor == 1 {
rsp := &http.Response{
Header: h.r.Header,
Body: ioutil.NopCloser(bytes.NewReader(m.Body)),
Status: "200 OK",
StatusCode: 200,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(m.Body)),
}
r := <-h.r
for k, v := range m.Header {
rsp.Header.Set(k, v)
}
rsp := &http.Response{
Header: r.Header,
Body: &buffer{b},
Status: "200 OK",
StatusCode: 200,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(m.Body)),
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
return rsp.Write(h.conn)
}
// http2 request
// set headers
for k, v := range m.Header {
rsp.Header.Set(k, v)
h.w.Header().Set(k, v)
}
select {
case h.r <- r:
default:
}
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
return rsp.Write(h.conn)
// write request
_, err := h.w.Write(m.Body)
return err
}
func (h *httpTransportSocket) error(m *Message) error {
b := bytes.NewBuffer(m.Body)
defer b.Reset()
rsp := &http.Response{
Header: make(http.Header),
Body: &buffer{b},
Status: "500 Internal Server Error",
StatusCode: 500,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(m.Body)),
}
if h.r.ProtoMajor == 1 {
rsp := &http.Response{
Header: make(http.Header),
Body: ioutil.NopCloser(bytes.NewReader(m.Body)),
Status: "500 Internal Server Error",
StatusCode: 500,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(m.Body)),
}
for k, v := range m.Header {
rsp.Header.Set(k, v)
}
for k, v := range m.Header {
rsp.Header.Set(k, v)
}
return rsp.Write(h.conn)
return rsp.Write(h.conn)
}
return nil
}
func (h *httpTransportSocket) Close() error {
err := h.conn.Close()
h.once.Do(func() {
h.Lock()
h.buff.Reset(nil)
h.buff = nil
h.Unlock()
})
return err
if h.r.ProtoMajor == 1 {
return h.conn.Close()
}
return nil
}
func (h *httpTransportListener) Addr() string {
@@ -287,46 +355,81 @@ func (h *httpTransportListener) Close() error {
}
func (h *httpTransportListener) Accept(fn func(Socket)) error {
var tempDelay time.Duration
// create handler mux
mux := http.NewServeMux()
for {
c, err := h.listener.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
log.Logf("http: Accept error: %v; retrying in %v\n", err, tempDelay)
time.Sleep(tempDelay)
continue
// register our transport handler
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
var buf *bufio.ReadWriter
var con net.Conn
// read a regular request
if r.ProtoMajor == 1 {
b, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
return err
r.Body = ioutil.NopCloser(bytes.NewReader(b))
// hijack the conn
hj, ok := w.(http.Hijacker)
if !ok {
// we're screwed
http.Error(w, "cannot serve conn", http.StatusInternalServerError)
return
}
conn, bufrw, err := hj.Hijack()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer conn.Close()
buf = bufrw
con = conn
}
sock := &httpTransportSocket{
ht: h.ht,
conn: c,
buff: bufio.NewReader(c),
r: make(chan *http.Request, 1),
// save the request
ch := make(chan *http.Request, 1)
ch <- r
fn(&httpTransportSocket{
ht: h.ht,
w: w,
r: r,
rw: buf,
ch: ch,
conn: con,
local: h.Addr(),
remote: r.RemoteAddr,
})
})
// get optional handlers
if h.ht.opts.Context != nil {
handlers, ok := h.ht.opts.Context.Value("http_handlers").(map[string]http.Handler)
if ok {
for pattern, handler := range handlers {
mux.Handle(pattern, handler)
}
}
go func() {
// TODO: think of a better error response strategy
defer func() {
if r := recover(); r != nil {
log.Log("panic recovered: ", r)
sock.Close()
}
}()
fn(sock)
}()
}
// default http2 server
srv := &http.Server{
Handler: mux,
}
// insecure connection use h2c
if !(h.ht.opts.Secure || h.ht.opts.TLSConfig != nil) {
srv.Handler = &h2c.HandlerH2C{
Handler: mux,
H2Server: &http2.Server{},
}
}
// begin serving
return srv.Serve(h.listener)
}
func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) {
@@ -365,6 +468,8 @@ func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) {
buff: bufio.NewReader(conn),
dialOpts: dopts,
r: make(chan *http.Request, 1),
local: conn.LocalAddr().String(),
remote: conn.RemoteAddr().String(),
}, nil
}
@@ -423,6 +528,17 @@ func (h *httpTransport) Listen(addr string, opts ...ListenOption) (Listener, err
}, nil
}
func (h *httpTransport) Init(opts ...Option) error {
for _, o := range opts {
o(&h.opts)
}
return nil
}
func (h *httpTransport) Options() Options {
return h.opts
}
func (h *httpTransport) String() string {
return "http"
}

View File

@@ -18,6 +18,9 @@ type mockSocket struct {
exit chan bool
// listener exit
lexit chan bool
local string
remote string
}
type mockClient struct {
@@ -51,6 +54,14 @@ func (ms *mockSocket) Recv(m *transport.Message) error {
return nil
}
func (ms *mockSocket) Local() string {
return ms.local
}
func (ms *mockSocket) Remote() string {
return ms.remote
}
func (ms *mockSocket) Send(m *transport.Message) error {
select {
case <-ms.exit:
@@ -93,10 +104,12 @@ func (m *mockListener) Accept(fn func(transport.Socket)) error {
return nil
case c := <-m.conn:
go fn(&mockSocket{
lexit: c.lexit,
exit: c.exit,
send: c.recv,
recv: c.send,
lexit: c.lexit,
exit: c.exit,
send: c.recv,
recv: c.send,
local: c.Remote(),
remote: c.Local(),
})
}
}
@@ -118,10 +131,12 @@ func (m *mockTransport) Dial(addr string, opts ...transport.DialOption) (transpo
client := &mockClient{
&mockSocket{
send: make(chan *transport.Message),
recv: make(chan *transport.Message),
exit: make(chan bool),
lexit: listener.exit,
send: make(chan *transport.Message),
recv: make(chan *transport.Message),
exit: make(chan bool),
lexit: listener.exit,
local: addr,
remote: addr,
},
options,
}
@@ -171,6 +186,17 @@ func (m *mockTransport) Listen(addr string, opts ...transport.ListenOption) (tra
return listener, nil
}
func (m *mockTransport) Init(opts ...transport.Option) error {
for _, o := range opts {
o(&m.opts)
}
return nil
}
func (m *mockTransport) Options() transport.Options {
return m.opts
}
func (m *mockTransport) String() string {
return "mock"
}

View File

@@ -1,4 +1,4 @@
// Package is an interface for synchronous communication
// Package transport is an interface for synchronous communication
package transport
import (
@@ -14,6 +14,8 @@ type Socket interface {
Recv(*Message) error
Send(*Message) error
Close() error
Local() string
Remote() string
}
type Client interface {
@@ -30,6 +32,8 @@ type Listener interface {
// services. It uses socket send/recv semantics and had various
// implementations {HTTP, RabbitMQ, NATS, ...}
type Transport interface {
Init(...Option) error
Options() Options
Dial(addr string, opts ...DialOption) (Client, error)
Listen(addr string, opts ...ListenOption) (Listener, error)
String() string