Compare commits

...

46 Commits

Author SHA1 Message Date
Asim Aslam
98bb4a69c2 Merge pull request #325 from micro/accept_loop
make accept loop
2018-11-15 21:23:16 +00:00
Asim Aslam
e69413b763 add continue 2018-11-15 21:13:33 +00:00
Asim Aslam
45f18042b7 make accept loop 2018-11-15 19:55:13 +00:00
Asim Aslam
0672b051cc Add Local/Remote ip to metadata 2018-11-14 20:27:58 +00:00
Asim Aslam
3c496720cc Merge pull request #324 from micro/ip
Ip
2018-11-14 20:15:56 +00:00
Asim Aslam
881cb570d5 reorder 2018-11-14 19:49:04 +00:00
Asim Aslam
c6a2c8de6c add local/remote to testsocket 2018-11-14 19:45:46 +00:00
Asim Aslam
71bacf6991 add local/remote ip to socket 2018-11-14 19:41:13 +00:00
Asim Aslam
a0b257b572 remove line 2018-11-14 15:19:23 +00:00
Asim Aslam
a082c151f0 remove example section 2018-11-14 15:18:47 +00:00
Asim Aslam
302ab42a97 strip down readme 2018-11-14 15:18:13 +00:00
Asim Aslam
531d4dd24a Merge pull request #322 from mgrachev/fix-linter-issues
Fix some linter issues
2018-11-13 09:38:19 +00:00
Asim Aslam
1c401a852e Merge pull request #321 from mgrachev/errors-check
Add errors check
2018-11-13 09:36:54 +00:00
Mikhail Grachev
25e6dcc9b6 Fix some linter issues 2018-11-13 11:57:42 +03:00
Mikhail Grachev
4006d9f102 Add errors check 2018-11-13 11:56:21 +03:00
Asim Aslam
4c821baab4 go fmt 2018-11-03 12:17:11 +00:00
Asim Aslam
c8a35afc92 Merge pull request #313 from lovelly/master
Fix tcp check no ttl error
2018-11-03 12:16:56 +00:00
Asim Aslam
54f67db275 Merge pull request #289 from micro/http2
http2 support
2018-11-03 12:07:23 +00:00
lovelly
fd04722706 Fix tcp check no ttl error 2018-10-09 10:40:24 +08:00
Asim Aslam
4cee1f19f6 Merge pull request #309 from fireyang/master
fix rpc client call WARNING: DATA RACE
2018-09-20 07:39:45 +01:00
fireyang
ef8b5e28b0 fix rpc client call WARNING: DATA RACE 2018-09-20 10:08:00 +08:00
Asim Aslam
818f150b25 Merge pull request #308 from fireyang/master
fix bug: loop variable i captured by func literal
2018-09-19 15:13:19 +01:00
fireyang
446d3fc72e fix bug: loop variable i captured by func literal 2018-09-19 21:58:20 +08:00
Asim Aslam
240052246f Merge pull request #306 from DexterHD/fix-go-config-panic
fix bug with go-config panic
2018-09-13 19:26:57 +01:00
Anton Kucherov
156a51ab10 fix bug with go-config panic
See: https://github.com/micro/go-config/issues/49
2018-09-13 19:02:08 +03:00
Asim Aslam
52a4beb072 Merge pull request #305 from ahmadnurus/add_method_not_allowed_error
errors: Added 405 Method Not Allowed helper function
2018-09-13 07:39:42 +01:00
ahmadnurus
395d70cf01 errors: Added 405 Method Not Allowed helper function 2018-09-12 21:42:34 +07:00
Asim Aslam
3732dc2f42 bump travis 2018-08-28 16:00:04 +01:00
Asim Aslam
9d3cb65daa set broker address on Init 2018-08-18 17:28:58 +01:00
Asim Aslam
a0d3917832 Merge pull request #292 from micro/init
Add Init to all things, use init in cmd package to initialise
2018-08-09 18:30:36 +01:00
Asim Aslam
9968c7d007 Add Init to all things, use init in cmd package to initialise 2018-08-08 18:57:29 +01:00
Asim Aslam
68f5e71153 Merge pull request #290 from micro/connect
Support connect native registration
2018-08-06 17:20:37 +01:00
Asim Aslam
af328ee7b4 Support connect native registration 2018-08-06 17:12:34 +01:00
Asim Aslam
eebaa64d8c phase 1 2018-07-29 10:55:46 +01:00
Asim Aslam
88505388c1 Add verbosity to errors 2018-07-26 09:33:50 +01:00
Asim Aslam
8a778644cf Merge pull request #281 from micro/retry
retry only on timeout or internal server error
2018-07-22 17:52:12 +01:00
Asim Aslam
d3a76e646a retry only on timeout or internal server error 2018-07-22 17:41:58 +01:00
Asim Aslam
cfa824bc5f Merge pull request #280 from jiyeyuran/master
Allow client_retries to be 0
2018-07-19 23:24:57 -07:00
武新飞
39be61685c client_retries can be 0 2018-07-20 14:20:23 +08:00
Asim Aslam
ac2106ced7 strip deadline from stream 2018-07-17 16:39:07 -07:00
Asim Aslam
1b4f7d8a68 a stream should not timeout 2018-07-17 16:32:35 -07:00
Asim Aslam
5eb2e79b86 go fmt 2018-07-17 16:31:09 -07:00
Asim Aslam
a2eff9918e Merge pull request #269 from Ak-Army/master
handle function in mock response
2018-06-13 16:54:10 +01:00
Hunyadvári Péter
52a470532d handle function in mock response 2018-06-13 17:46:30 +02:00
Asim Aslam
cd9441fafb Merge pull request #268 from jasimmk/fix-connect-lock
Fixing httpBroker dead lock; If publish is called from a subscription #267
2018-06-13 16:21:24 +01:00
Jasim Muhammed
356cf82af5 Fixing httpBroker dead lock; If publish is called from a subscription 2018-06-13 10:28:39 +04:00
27 changed files with 551 additions and 691 deletions

View File

@@ -1,7 +1,7 @@
language: go
go:
- 1.9.5
- 1.10.x
- 1.11.x
notifications:
slack:
secure: aEvhLbhujaGaKSrOokiG3//PaVHTIrc3fBpoRbCRqfZpyq6WREoapJJhF+tIpWWOwaC9GmChbD6aHo/jMUgwKXVyPSaNjiEL87YzUUpL8B2zslNp1rgfTg/LrzthOx3Q1TYwpaAl3to0fuHUVFX4yMeC2vuThq7WSXgMMxFCtbc=

432
README.md
View File

@@ -18,437 +18,9 @@ Go Micro abstracts away the details of distributed systems. Here are the main fe
- **Sync Streaming** - RPC based communication with support for bidirectional streaming
- **Async Messaging** - Native PubSub messaging built in for event driven architectures
Go Micro supports both the Service and Function programming models. Read on to learn more.
## Getting Started
## Docs
For more detailed information on the architecture, installation and use of go-micro checkout the [docs](https://micro.mu/docs).
## Learn By Example
An example service can be found in [**examples/service**](https://github.com/micro/examples/tree/master/service) and function in [**examples/function**](https://github.com/micro/examples/tree/master/function).
The [**examples**](https://github.com/micro/examples) directory contains examples for using things such as middleware/wrappers, selector filters, pub/sub, grpc, plugins and much more. For the complete greeter example look at [**examples/greeter**](https://github.com/micro/examples/tree/master/greeter). Other examples can be found throughout the GitHub repository.
Watch the [Golang UK Conf 2016](https://www.youtube.com/watch?v=xspaDovwk34) video for a high level overview.
## Getting started
- [Install Protobuf](#install-protobuf)
- [Service Discovery](#service-discovery)
- [Writing a Service](#writing-a-service)
- [Writing a Function](#writing-a-function)
- [Publish & Subscribe](#publish--subscribe)
- [Plugins](#plugins)
- [Wrappers](#wrappers)
## Install Protobuf
Protobuf is required for code generation
You'll need to install:
- [protoc-gen-micro](https://github.com/micro/protoc-gen-micro)
## Service Discovery
Service discovery is used to resolve service names to addresses.
### Consul
[Consul](https://www.consul.io/) is used as the default service discovery system.
Discovery is pluggable. Find plugins for etcd, kubernetes, zookeeper and more in the [micro/go-plugins](https://github.com/micro/go-plugins) repo.
[Install guide](https://www.consul.io/intro/getting-started/install.html)
### Multicast DNS
[Multicast DNS](https://en.wikipedia.org/wiki/Multicast_DNS) is a built in service discovery plugin for a zero dependency configuration.
Pass `--registry=mdns` to any command or the enviroment variable `MICRO_REGISTRY=mdns`
```
MICRO_REGISTRY=mdns go run main.go
```
## Writing a service
This is a simple greeter RPC service example
Find this example at [examples/service](https://github.com/micro/examples/tree/master/service).
### Create service proto
One of the key requirements of microservices is strongly defined interfaces. Micro uses protobuf to achieve this.
Here we define the Greeter handler with the method Hello. It takes a HelloRequest and HelloResponse both with one string arguments.
```proto
syntax = "proto3";
service Greeter {
rpc Hello(HelloRequest) returns (HelloResponse) {}
}
message HelloRequest {
string name = 1;
}
message HelloResponse {
string greeting = 2;
}
```
### Generate the proto
After writing the proto definition we must compile it using protoc with the micro plugin.
```shell
protoc --proto_path=$GOPATH/src:. --micro_out=. --go_out=. path/to/greeter.proto
```
### Write the service
Below is the code for the greeter service.
It does the following:
1. Implements the interface defined for the Greeter handler
2. Initialises a micro.Service
3. Registers the Greeter handler
4. Runs the service
```go
package main
import (
"context"
"fmt"
micro "github.com/micro/go-micro"
proto "github.com/micro/examples/service/proto"
)
type Greeter struct{}
func (g *Greeter) Hello(ctx context.Context, req *proto.HelloRequest, rsp *proto.HelloResponse) error {
rsp.Greeting = "Hello " + req.Name
return nil
}
func main() {
// Create a new service. Optionally include some options here.
service := micro.NewService(
micro.Name("greeter"),
)
// Init will parse the command line flags.
service.Init()
// Register handler
proto.RegisterGreeterHandler(service.Server(), new(Greeter))
// Run the server
if err := service.Run(); err != nil {
fmt.Println(err)
}
}
```
### Run service
```
go run examples/service/main.go
```
Output
```
2016/03/14 10:59:14 Listening on [::]:50137
2016/03/14 10:59:14 Broker Listening on [::]:50138
2016/03/14 10:59:14 Registering node: greeter-ca62b017-e9d3-11e5-9bbb-68a86d0d36b6
```
### Define a client
Below is the client code to query the greeter service.
The generated proto includes a greeter client to reduce boilerplate code.
```go
package main
import (
"context"
"fmt"
micro "github.com/micro/go-micro"
proto "github.com/micro/examples/service/proto"
)
func main() {
// Create a new service. Optionally include some options here.
service := micro.NewService(micro.Name("greeter.client"))
service.Init()
// Create new greeter client
greeter := proto.NewGreeterService("greeter", service.Client())
// Call the greeter
rsp, err := greeter.Hello(context.TODO(), &proto.HelloRequest{Name: "John"})
if err != nil {
fmt.Println(err)
}
// Print response
fmt.Println(rsp.Greeting)
}
```
### Run the client
```shell
go run client.go
```
Output
```
Hello John
```
## Writing a Function
Go Micro includes the Function programming model.
A Function is a one time executing Service which exits after completing a request.
### Defining a Function
```go
package main
import (
"context"
proto "github.com/micro/examples/function/proto"
"github.com/micro/go-micro"
)
type Greeter struct{}
func (g *Greeter) Hello(ctx context.Context, req *proto.HelloRequest, rsp *proto.HelloResponse) error {
rsp.Greeting = "Hello " + req.Name
return nil
}
func main() {
// create a new function
fnc := micro.NewFunction(
micro.Name("greeter"),
)
// init the command line
fnc.Init()
// register a handler
fnc.Handle(new(Greeter))
// run the function
fnc.Run()
}
```
It's that simple.
## Publish & Subscribe
Go-micro has a built in message broker interface for event driven architectures.
PubSub operates on the same protobuf generated messages as RPC. They are encoded/decoded automatically and sent via the broker.
By default go-micro includes a point-to-point http broker but this can be swapped out via go-plugins.
### Publish
Create a new publisher with a `topic` name and service client
```go
p := micro.NewPublisher("events", service.Client())
```
Publish a proto message
```go
p.Publish(context.TODO(), &proto.Event{Name: "event"})
```
### Subscribe
Create a message handler. It's signature should be `func(context.Context, v interface{}) error`.
```go
func ProcessEvent(ctx context.Context, event *proto.Event) error {
fmt.Printf("Got event %+v\n", event)
return nil
}
```
Register the message handler with a `topic`
```go
micro.RegisterSubscriber("events", ProcessEvent)
```
See [examples/pubsub](https://github.com/micro/examples/tree/master/pubsub) for a complete example.
## Plugins
By default go-micro only provides a few implementation of each interface at the core but it's completely pluggable. There's already dozens of plugins which are available at [github.com/micro/go-plugins](https://github.com/micro/go-plugins). Contributions are welcome!
### Build with plugins
If you want to integrate plugins simply link them in a separate file and rebuild
Create a plugins.go file
```go
import (
// etcd v3 registry
_ "github.com/micro/go-plugins/registry/etcdv3"
// nats transport
_ "github.com/micro/go-plugins/transport/nats"
// kafka broker
_ "github.com/micro/go-plugins/broker/kafka"
)
```
Build binary
```shell
// For local use
go build -i -o service ./main.go ./plugins.go
```
Flag usage of plugins
```shell
service --registry=etcdv3 --transport=nats --broker=kafka
```
### Plugin as option
Alternatively you can set the plugin as an option to a service
```go
import (
"github.com/micro/go-micro"
// etcd v3 registry
"github.com/micro/go-plugins/registry/etcdv3"
// nats transport
"github.com/micro/go-plugins/transport/nats"
// kafka broker
"github.com/micro/go-plugins/broker/kafka"
)
func main() {
registry := etcdv3.NewRegistry()
broker := kafka.NewBroker()
transport := nats.NewTransport()
service := micro.NewService(
micro.Name("greeter"),
micro.Registry(registry),
micro.Broker(broker),
micro.Transport(transport),
)
service.Init()
service.Run()
}
```
### Write plugins
Plugins are a concept built on Go's interface. Each package maintains a high level interface abstraction.
Simply implement the interface and pass it in as an option to the service.
The service discovery interface is called [Registry](https://godoc.org/github.com/micro/go-micro/registry#Registry).
Anything which implements this interface can be used as a registry. The same applies to the other packages.
```go
type Registry interface {
Register(*Service, ...RegisterOption) error
Deregister(*Service) error
GetService(string) ([]*Service, error)
ListServices() ([]*Service, error)
Watch() (Watcher, error)
String() string
}
```
Browse [go-plugins](https://github.com/micro/go-plugins) to get a better idea of implementation details.
## Wrappers
Go-micro includes the notion of middleware as wrappers. The client or handlers can be wrapped using the decorator pattern.
### Handler
Here's an example service handler wrapper which logs the incoming request
```go
// implements the server.HandlerWrapper
func logWrapper(fn server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
fmt.Printf("[%v] server request: %s", time.Now(), req.Method())
return fn(ctx, req, rsp)
}
}
```
It can be initialised when creating the service
```go
service := micro.NewService(
micro.Name("greeter"),
// wrap the handler
micro.WrapHandler(logWrapper),
)
```
### Client
Here's an example of a client wrapper which logs requests made
```go
type logWrapper struct {
client.Client
}
func (l *logWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
fmt.Printf("[wrapper] client request to service: %s method: %s\n", req.Service(), req.Method())
return l.Client.Call(ctx, req, rsp)
}
// implements client.Wrapper as logWrapper
func logWrap(c client.Client) client.Client {
return &logWrapper{c}
}
```
It can be initialised when creating the service
```go
service := micro.NewService(
micro.Name("greeter"),
// wrap the client
micro.WrapClient(logWrap),
)
```
## Other Languages
Check out [ja-micro](https://github.com/Sixt/ja-micro) to write services in Java
For detailed information on the architecture, installation and use of go-micro checkout the [docs](https://micro.mu/docs).
## Sponsors

View File

@@ -176,7 +176,7 @@ func (h *httpBroker) unsubscribe(s *httpSubscriber) error {
for _, sub := range h.subscribers[s.topic] {
// deregister and skip forward
if sub.id == s.id {
h.r.Deregister(sub.svc)
_ = h.r.Deregister(sub.svc)
continue
}
// keep subscriber
@@ -200,7 +200,7 @@ func (h *httpBroker) run(l net.Listener) {
h.RLock()
for _, subs := range h.subscribers {
for _, sub := range subs {
h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))
_ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))
}
}
h.RUnlock()
@@ -210,7 +210,7 @@ func (h *httpBroker) run(l net.Listener) {
h.RLock()
for _, subs := range h.subscribers {
for _, sub := range subs {
h.r.Deregister(sub.svc)
_ = h.r.Deregister(sub.svc)
}
}
h.RUnlock()
@@ -276,12 +276,15 @@ func (h *httpBroker) Address() string {
}
func (h *httpBroker) Connect() error {
h.Lock()
defer h.Unlock()
h.RLock()
if h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
var l net.Listener
var err error
@@ -351,12 +354,16 @@ func (h *httpBroker) Connect() error {
}
func (h *httpBroker) Disconnect() error {
h.Lock()
defer h.Unlock()
h.RLock()
if !h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
// stop rcache
rc, ok := h.r.(rcache.Cache)
@@ -375,17 +382,24 @@ func (h *httpBroker) Disconnect() error {
}
func (h *httpBroker) Init(opts ...Option) error {
h.RLock()
if h.running {
h.RUnlock()
return errors.New("cannot init while connected")
}
h.RUnlock()
h.Lock()
defer h.Unlock()
if h.running {
return errors.New("cannot init while connected")
}
for _, o := range opts {
o(&h.opts)
}
if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 {
h.address = h.opts.Addrs[0]
}
if len(h.id) == 0 {
h.id = "broker-" + uuid.NewUUID().String()
}

View File

@@ -68,7 +68,7 @@ var (
// DefaultBackoff is the default backoff function for retries
DefaultBackoff = exponentialBackoff
// DefaultRetry is the default check-for-retry function for retries
DefaultRetry = alwaysRetry
DefaultRetry = RetryOnError
// DefaultRetries is the default number of times a request is tried
DefaultRetries = 1
// DefaultRequestTimeout is the default request timeout

View File

@@ -80,8 +80,12 @@ func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface
if t := reflect.TypeOf(rsp); t.Kind() == reflect.Ptr {
v = reflect.Indirect(v)
}
response := r.Response
if t := reflect.TypeOf(r.Response); t.Kind() == reflect.Func {
response = reflect.ValueOf(r.Response).Call([]reflect.Value{})[0].Interface()
}
v.Set(reflect.ValueOf(r.Response))
v.Set(reflect.ValueOf(response))
return nil
}

View File

@@ -16,6 +16,8 @@ func TestClient(t *testing.T) {
{Method: "Foo.Bar", Response: map[string]interface{}{"foo": "bar"}},
{Method: "Foo.Struct", Response: &TestResponse{Param: "aparam"}},
{Method: "Foo.Fail", Error: errors.InternalServerError("go.mock", "failed")},
{Method: "Foo.Func", Response: func() string { return "string" }},
{Method: "Foo.FuncStruct", Response: func() *TestResponse { return &TestResponse{Param: "aparam"} }},
}
c := NewClient(Response("go.mock", response))

View File

@@ -2,12 +2,34 @@ package client
import (
"context"
"github.com/micro/go-micro/errors"
)
// note that returning either false or a non-nil error will result in the call not being retried
type RetryFunc func(ctx context.Context, req Request, retryCount int, err error) (bool, error)
// always retry on error
func alwaysRetry(ctx context.Context, req Request, retryCount int, err error) (bool, error) {
// RetryAlways always retry on error
func RetryAlways(ctx context.Context, req Request, retryCount int, err error) (bool, error) {
return true, nil
}
// RetryOnError retries a request on a 500 or timeout error
func RetryOnError(ctx context.Context, req Request, retryCount int, err error) (bool, error) {
if err == nil {
return false, nil
}
e := errors.Parse(err.Error())
if e == nil {
return false, nil
}
switch e.Code {
// retry on timeout or internal server error
case 408, 500:
return true, nil
default:
return false, nil
}
}

View File

@@ -7,6 +7,8 @@ import (
"sync"
"time"
"sync/atomic"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/errors"
@@ -14,7 +16,6 @@ import (
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/transport"
"sync/atomic"
)
type rpcClient struct {
@@ -88,7 +89,7 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
r.pool.release(address, c, grr)
}()
seq := r.seq
seq := atomic.LoadUint64(&r.seq)
atomic.AddUint64(&r.seq, 1)
stream := &rpcStream{
@@ -159,7 +160,15 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
c, err := r.opts.Transport.Dial(address, transport.WithStream(), transport.WithTimeout(opts.DialTimeout))
dOpts := []transport.DialOption{
transport.WithStream(),
}
if opts.DialTimeout >= 0 {
dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout))
}
c, err := r.opts.Transport.Dial(address, dOpts...)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", "connection error: %v", err)
}
@@ -230,9 +239,9 @@ func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, erro
// get next nodes from the selector
next, err := r.opts.Selector.Select(request.Service(), opts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %v", request.Service(), err.Error())
}
return next, nil
@@ -282,7 +291,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, request, i)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
return errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
}
// only sleep if greater than 0
@@ -293,9 +302,9 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
// select next node
node, err := next()
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
return errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
}
// set the address
@@ -314,9 +323,9 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
var gerr error
for i := 0; i <= callOpts.Retries; i++ {
go func() {
go func(i int) {
ch <- call(i)
}()
}(i)
select {
case <-ctx.Done():
@@ -355,18 +364,6 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
return nil, err
}
// check if we already have a deadline
d, ok := ctx.Deadline()
if !ok {
// no deadline so we create a new one
ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
} else {
// got a deadline so no need to setup context
// but we need to set the timeout we pass along
opt := WithRequestTimeout(d.Sub(time.Now()))
opt(&callOpts)
}
// should we noop right here?
select {
case <-ctx.Done():
@@ -378,7 +375,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, request, i)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
return nil, errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
}
// only sleep if greater than 0
@@ -388,9 +385,9 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
node, err := next()
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
}
address := node.Address

View File

@@ -2,10 +2,10 @@ package client
import (
"context"
"errors"
"fmt"
"testing"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/registry/mock"
"github.com/micro/go-micro/selector"
@@ -69,7 +69,7 @@ func TestCallRetry(t *testing.T) {
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
called++
if called == 1 {
return errors.New("retry request")
return errors.InternalServerError("test.error", "retry request")
}
// don't do the call

View File

@@ -87,6 +87,11 @@ var (
EnvVar: "MICRO_REGISTER_INTERVAL",
Usage: "Register interval in seconds",
},
cli.StringFlag{
Name: "server",
EnvVar: "MICRO_SERVER",
Usage: "Server for go-micro; rpc",
},
cli.StringFlag{
Name: "server_name",
EnvVar: "MICRO_SERVER_NAME",
@@ -144,11 +149,6 @@ var (
Usage: "Selector used to pick nodes for querying",
Value: "cache",
},
cli.StringFlag{
Name: "server",
EnvVar: "MICRO_SERVER",
Usage: "Server for go-micro; rpc",
},
cli.StringFlag{
Name: "transport",
EnvVar: "MICRO_TRANSPORT",
@@ -262,48 +262,40 @@ func (c *cmd) Before(ctx *cli.Context) error {
// Set the client
if name := ctx.String("client"); len(name) > 0 {
if cl, ok := c.opts.Clients[name]; ok {
// only change if we have the client and type differs
if cl, ok := c.opts.Clients[name]; ok && (*c.opts.Client).String() != name {
*c.opts.Client = cl()
}
}
// Set the server
if name := ctx.String("server"); len(name) > 0 {
if s, ok := c.opts.Servers[name]; ok {
// only change if we have the server and type differs
if s, ok := c.opts.Servers[name]; ok && (*c.opts.Server).String() != name {
*c.opts.Server = s()
}
}
// Set the broker
if name := ctx.String("broker"); len(name) > 0 || len(ctx.String("broker_address")) > 0 {
if len(name) == 0 {
name = defaultBroker
}
if b, ok := c.opts.Brokers[name]; ok {
n := b(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...))
*c.opts.Broker = n
} else {
if name := ctx.String("broker"); len(name) > 0 && (*c.opts.Broker).String() != name {
b, ok := c.opts.Brokers[name]
if !ok {
return fmt.Errorf("Broker %s not found", name)
}
*c.opts.Broker = b()
serverOpts = append(serverOpts, server.Broker(*c.opts.Broker))
clientOpts = append(clientOpts, client.Broker(*c.opts.Broker))
}
// Set the registry
if name := ctx.String("registry"); len(name) > 0 || len(ctx.String("registry_address")) > 0 {
if len(name) == 0 {
name = defaultRegistry
}
if r, ok := c.opts.Registries[name]; ok {
n := r(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...))
*c.opts.Registry = n
} else {
if name := ctx.String("registry"); len(name) > 0 && (*c.opts.Registry).String() != name {
r, ok := c.opts.Registries[name]
if !ok {
return fmt.Errorf("Registry %s not found", name)
}
*c.opts.Registry = r()
serverOpts = append(serverOpts, server.Registry(*c.opts.Registry))
clientOpts = append(clientOpts, client.Registry(*c.opts.Registry))
@@ -314,31 +306,26 @@ func (c *cmd) Before(ctx *cli.Context) error {
}
// Set the selector
if name := ctx.String("selector"); len(name) > 0 {
if s, ok := c.opts.Selectors[name]; ok {
n := s(selector.Registry(*c.opts.Registry))
*c.opts.Selector = n
} else {
if name := ctx.String("selector"); len(name) > 0 && (*c.opts.Selector).String() != name {
s, ok := c.opts.Selectors[name]
if !ok {
return fmt.Errorf("Selector %s not found", name)
}
*c.opts.Selector = s(selector.Registry(*c.opts.Registry))
// No server option here. Should there be?
clientOpts = append(clientOpts, client.Selector(*c.opts.Selector))
}
// Set the transport
if name := ctx.String("transport"); len(name) > 0 || len(ctx.String("transport_address")) > 0 {
if len(name) == 0 {
name = defaultTransport
}
if t, ok := c.opts.Transports[name]; ok {
n := t(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...))
*c.opts.Transport = n
} else {
if name := ctx.String("transport"); len(name) > 0 && (*c.opts.Transport).String() != name {
t, ok := c.opts.Transports[name]
if !ok {
return fmt.Errorf("Transport %s not found", name)
}
*c.opts.Transport = t()
serverOpts = append(serverOpts, server.Transport(*c.opts.Transport))
clientOpts = append(clientOpts, client.Transport(*c.opts.Transport))
}
@@ -359,6 +346,18 @@ func (c *cmd) Before(ctx *cli.Context) error {
serverOpts = append(serverOpts, server.Metadata(metadata))
}
if len(ctx.String("broker_address")) > 0 {
(*c.opts.Broker).Init(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...))
}
if len(ctx.String("registry_address")) > 0 {
(*c.opts.Registry).Init(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...))
}
if len(ctx.String("transport_address")) > 0 {
(*c.opts.Transport).Init(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...))
}
if len(ctx.String("server_name")) > 0 {
serverOpts = append(serverOpts, server.Name(ctx.String("server_name")))
}
@@ -384,7 +383,7 @@ func (c *cmd) Before(ctx *cli.Context) error {
}
// client opts
if r := ctx.Int("client_retries"); r > 0 {
if r := ctx.Int("client_retries"); r >= 0 {
clientOpts = append(clientOpts, client.Retries(r))
}

View File

@@ -54,7 +54,8 @@ func (j *jsonCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
case codec.Response:
return j.c.ReadHeader(m)
case codec.Publication:
io.Copy(j.buf, j.rwc)
_, err := io.Copy(j.buf, j.rwc)
return err
default:
return fmt.Errorf("Unrecognised message type: %v", mt)
}

View File

@@ -55,7 +55,9 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
return err
}
if flusher, ok := c.rwc.(flusher); ok {
err = flusher.Flush()
if err = flusher.Flush(); err != nil {
return err
}
}
case codec.Response:
c.Lock()
@@ -82,7 +84,9 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
return err
}
if flusher, ok := c.rwc.(flusher); ok {
err = flusher.Flush()
if err = flusher.Flush(); err != nil {
return err
}
}
case codec.Publication:
data, err := proto.Marshal(b.(proto.Message))
@@ -127,7 +131,8 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
m.Id = rtmp.GetSeq()
m.Error = rtmp.GetError()
case codec.Publication:
io.Copy(c.buf, c.rwc)
_, err := io.Copy(c.buf, c.rwc)
return err
default:
return fmt.Errorf("Unrecognised message type: %v", mt)
}

View File

@@ -82,6 +82,16 @@ func NotFound(id, format string, a ...interface{}) error {
}
}
// MethodNotAllowed generates a 405 error.
func MethodNotAllowed(id, format string, a ...interface{}) error {
return &Error{
Id: id,
Code: 405,
Detail: fmt.Sprintf(format, a...),
Status: http.StatusText(405),
}
}
// InternalServerError generates a 500 error.
func InternalServerError(id, format string, a ...interface{}) error {
return &Error{

View File

@@ -8,6 +8,16 @@ import (
"github.com/micro/go-micro/registry"
)
// Connect specifies services should be registered as Consul Connect services
func Connect() registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, "consul_connect", true)
}
}
func Config(c *consul.Config) registry.Option {
return func(o *registry.Options) {
if o.Context == nil {

View File

@@ -19,10 +19,26 @@ type consulRegistry struct {
Client *consul.Client
opts Options
// connect enabled
connect bool
sync.Mutex
register map[string]uint64
}
func getDeregisterTTL(t time.Duration) time.Duration {
// splay slightly for the watcher?
splay := time.Second * 5
deregTTL := t + splay
// consul has a minimum timeout on deregistration of 1 minute.
if t < time.Minute {
deregTTL = time.Minute + splay
}
return deregTTL
}
func newTransport(config *tls.Config) *http.Transport {
if config == nil {
config = &tls.Config{
@@ -45,27 +61,31 @@ func newTransport(config *tls.Config) *http.Transport {
return t
}
func newConsulRegistry(opts ...Option) Registry {
var options Options
func configure(c *consulRegistry, opts ...Option) {
// set opts
for _, o := range opts {
o(&options)
o(&c.opts)
}
// use default config
config := consul.DefaultConfig()
if options.Context != nil {
if c.opts.Context != nil {
// Use the consul config passed in the options, if available
if c, ok := options.Context.Value("consul_config").(*consul.Config); ok {
config = c
if co, ok := c.opts.Context.Value("consul_config").(*consul.Config); ok {
config = co
}
if cn, ok := c.opts.Context.Value("consul_connect").(bool); ok {
c.connect = cn
}
}
// check if there are any addrs
if len(options.Addrs) > 0 {
addr, port, err := net.SplitHostPort(options.Addrs[0])
if len(c.opts.Addrs) > 0 {
addr, port, err := net.SplitHostPort(c.opts.Addrs[0])
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
port = "8500"
addr = options.Addrs[0]
addr = c.opts.Addrs[0]
config.Address = fmt.Sprintf("%s:%s", addr, port)
} else if err == nil {
config.Address = fmt.Sprintf("%s:%s", addr, port)
@@ -73,34 +93,43 @@ func newConsulRegistry(opts ...Option) Registry {
}
// requires secure connection?
if options.Secure || options.TLSConfig != nil {
if c.opts.Secure || c.opts.TLSConfig != nil {
if config.HttpClient == nil {
config.HttpClient = new(http.Client)
}
config.Scheme = "https"
// We're going to support InsecureSkipVerify
config.HttpClient.Transport = newTransport(options.TLSConfig)
config.HttpClient.Transport = newTransport(c.opts.TLSConfig)
}
// set timeout
if c.opts.Timeout > 0 {
config.HttpClient.Timeout = c.opts.Timeout
}
// create the client
client, _ := consul.NewClient(config)
// set timeout
if options.Timeout > 0 {
config.HttpClient.Timeout = options.Timeout
}
// set address/client
c.Address = config.Address
c.Client = client
}
func newConsulRegistry(opts ...Option) Registry {
cr := &consulRegistry{
Address: config.Address,
Client: client,
opts: options,
opts: Options{},
register: make(map[string]uint64),
}
configure(cr, opts...)
return cr
}
func (c *consulRegistry) Init(opts ...Option) error {
configure(c, opts...)
return nil
}
func (c *consulRegistry) Deregister(s *Service) error {
if len(s.Nodes) == 0 {
return errors.New("Require at least one node")
@@ -115,19 +144,6 @@ func (c *consulRegistry) Deregister(s *Service) error {
return c.Client.Agent().ServiceDeregister(node.Id)
}
func getDeregisterTTL(t time.Duration) time.Duration {
// splay slightly for the watcher?
splay := time.Second * 5
deregTTL := t + splay
// consul has a minimum timeout on deregistration of 1 minute.
if t < time.Minute {
deregTTL = time.Minute + splay
}
return deregTTL
}
func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
if len(s.Nodes) == 0 {
return errors.New("Require at least one node")
@@ -164,10 +180,21 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
// if it's already registered and matches then just pass the check
if ok && v == h {
// if the err is nil we're all good, bail out
// if not, we don't know what the state is, so full re-register
if err := c.Client.Agent().PassTTL("service:"+node.Id, ""); err == nil {
return nil
if options.TTL == time.Duration(0) {
services, _, err := c.Client.Health().Checks(s.Name, nil)
if err == nil {
for _, v := range services {
if v.ServiceID == node.Id {
return nil
}
}
}
} else {
// if the err is nil we're all good, bail out
// if not, we don't know what the state is, so full re-register
if err := c.Client.Agent().PassTTL("service:"+node.Id, ""); err == nil {
return nil
}
}
}
@@ -192,20 +219,29 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
deregTTL := getDeregisterTTL(options.TTL)
check = &consul.AgentServiceCheck{
TTL: fmt.Sprintf("%v", options.TTL),
TTL: fmt.Sprintf("%v", options.TTL),
DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL),
}
}
// register the service
if err := c.Client.Agent().ServiceRegister(&consul.AgentServiceRegistration{
asr := &consul.AgentServiceRegistration{
ID: node.Id,
Name: s.Name,
Tags: tags,
Port: node.Port,
Address: node.Address,
Check: check,
}); err != nil {
}
// Specify consul connect
if c.connect {
asr.Connect = &consul.AgentServiceConnect{
Native: true,
}
}
if err := c.Client.Agent().ServiceRegister(asr); err != nil {
return err
}
@@ -224,7 +260,15 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
}
func (c *consulRegistry) GetService(name string) ([]*Service, error) {
rsp, _, err := c.Client.Health().Service(name, "", false, nil)
var rsp []*consul.ServiceEntry
var err error
// if we're connect enabled only get connect services
if c.connect {
rsp, _, err = c.Client.Health().Connect(name, "", false, nil)
} else {
rsp, _, err = c.Client.Health().Service(name, "", false, nil)
}
if err != nil {
return nil, err
}

View File

@@ -49,6 +49,17 @@ func newRegistry(opts ...registry.Option) registry.Registry {
}
}
func (m *mdnsRegistry) Init(opts ...registry.Option) error {
for _, o := range opts {
o(&m.opts)
}
return nil
}
func (m *mdnsRegistry) Options() registry.Options {
return m.opts
}
func (m *mdnsRegistry) Register(service *registry.Service, opts ...registry.RegisterOption) error {
m.Lock()
defer m.Unlock()
@@ -322,10 +333,6 @@ func (m *mdnsRegistry) String() string {
return "mdns"
}
func (m *mdnsRegistry) Options() registry.Options {
return m.opts
}
func NewRegistry(opts ...registry.Option) registry.Registry {
return newRegistry(opts...)
}

View File

@@ -99,11 +99,15 @@ func (m *mockRegistry) String() string {
return "mock"
}
func (m *mockRegistry) Init(opts ...registry.Option) error {
return nil
}
func (m *mockRegistry) Options() registry.Options {
return registry.Options{}
}
func NewRegistry() registry.Registry {
func NewRegistry(opts ...registry.Options) registry.Registry {
m := &mockRegistry{Services: make(map[string][]*registry.Service)}
m.init()
return m

View File

@@ -9,13 +9,14 @@ import (
// and an abstraction over varying implementations
// {consul, etcd, zookeeper, ...}
type Registry interface {
Init(...Option) error
Options() Options
Register(*Service, ...RegisterOption) error
Deregister(*Service) error
GetService(string) ([]*Service, error)
ListServices() ([]*Service, error)
Watch(...WatchOption) (Watcher, error)
String() string
Options() Options
}
type Option func(*Options)

View File

@@ -366,11 +366,9 @@ func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (s
}
func (c *cacheSelector) Mark(service string, node *registry.Node, err error) {
return
}
func (c *cacheSelector) Reset(service string) {
return
}
// Close stops the watcher and destroys the cache

View File

@@ -48,11 +48,9 @@ func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, er
}
func (r *defaultSelector) Mark(service string, node *registry.Node, err error) {
return
}
func (r *defaultSelector) Reset(service string) {
return
}
func (r *defaultSelector) Close() error {

View File

@@ -80,7 +80,7 @@ func TestFilterEndpoint(t *testing.T) {
}
}
if seen == false && data.count > 0 {
if !seen && data.count > 0 {
t.Fatalf("Expected %d services but seen is %t; result %+v", data.count, seen, services)
}
}
@@ -232,7 +232,7 @@ func TestFilterVersion(t *testing.T) {
seen = true
}
if seen == false && data.count > 0 {
if !seen && data.count > 0 {
t.Fatalf("Expected %d services but seen is %t; result %+v", data.count, seen, services)
}
}

View File

@@ -15,6 +15,8 @@ type testCodec struct {
}
type testSocket struct {
local string
remote string
}
// TestCodecWriteError simulates what happens when a codec is unable
@@ -87,6 +89,14 @@ func (c *testCodec) String() string {
return "string"
}
func (s testSocket) Local() string {
return s.local
}
func (s testSocket) Remote() string {
return s.remote
}
func (s testSocket) Recv(message *transport.Message) error {
return nil
}

View File

@@ -97,6 +97,10 @@ func (s *rpcServer) accept(sock transport.Socket) {
delete(hdr, "Content-Type")
delete(hdr, "Timeout")
// set local/remote ips
hdr["Local"] = sock.Local()
hdr["Remote"] = sock.Remote()
ctx := metadata.NewContext(context.Background(), hdr)
// set the timeout if we have it
@@ -393,11 +397,35 @@ func (s *rpcServer) Start() error {
s.opts.Address = ts.Addr()
s.Unlock()
go ts.Accept(s.accept)
exit := make(chan bool, 1)
go func() {
for {
err := ts.Accept(s.accept)
// check if we're supposed to exit
select {
case <-exit:
return
default:
}
// check the error and backoff
if err != nil {
log.Logf("Accept error: %v", err)
time.Sleep(time.Second)
continue
}
// no error just exit
return
}
}()
go func() {
// wait for exit
ch := <-s.exit
exit <- true
// wait for requests to finish
if wait(s.opts.Context) {

View File

@@ -199,7 +199,7 @@ func (server *server) register(rcvr interface{}) error {
return nil
}
func (server *server) sendResponse(sending *sync.Mutex, req *request, reply interface{}, codec serverCodec, errmsg string, last bool) (err error) {
func (server *server) sendResponse(sending sync.Locker, req *request, reply interface{}, codec serverCodec, errmsg string, last bool) (err error) {
resp := server.getResponse()
// Encode the response header
resp.ServiceMethod = req.ServiceMethod

View File

@@ -1,6 +1,7 @@
package transport
import (
//"fmt"
"bufio"
"bytes"
"crypto/tls"
@@ -13,10 +14,11 @@ import (
"sync"
"time"
"github.com/micro/go-log"
"github.com/micro/h2c"
maddr "github.com/micro/util/go/lib/addr"
mnet "github.com/micro/util/go/lib/net"
mls "github.com/micro/util/go/lib/tls"
"golang.org/x/net/http2"
)
type buffer struct {
@@ -38,16 +40,25 @@ type httpTransportClient struct {
r chan *http.Request
bl []*http.Request
buff *bufio.Reader
// local/remote ip
local string
remote string
}
type httpTransportSocket struct {
ht *httpTransport
r chan *http.Request
conn net.Conn
once sync.Once
ht *httpTransport
w http.ResponseWriter
r *http.Request
rw *bufio.ReadWriter
sync.Mutex
buff *bufio.Reader
conn net.Conn
// for the first request
ch chan *http.Request
// local/remote ip
local string
remote string
}
type httpTransportListener struct {
@@ -59,6 +70,14 @@ func (b *buffer) Close() error {
return nil
}
func (h *httpTransportClient) Local() string {
return h.local
}
func (h *httpTransportClient) Remote() string {
return h.remote
}
func (h *httpTransportClient) Send(m *Message) error {
header := make(http.Header)
@@ -170,33 +189,87 @@ func (h *httpTransportClient) Close() error {
return err
}
func (h *httpTransportSocket) Local() string {
return h.local
}
func (h *httpTransportSocket) Remote() string {
return h.remote
}
func (h *httpTransportSocket) Recv(m *Message) error {
if m == nil {
return errors.New("message passed in is nil")
}
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
r, err := http.ReadRequest(h.buff)
if err != nil {
return err
}
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return err
}
r.Body.Close()
m.Body = b
if m.Header == nil {
m.Header = make(map[string]string)
}
for k, v := range r.Header {
// process http 1
if h.r.ProtoMajor == 1 {
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
var r *http.Request
select {
// get first request
case r = <-h.ch:
// read next request
default:
rr, err := http.ReadRequest(h.rw.Reader)
if err != nil {
return err
}
r = rr
}
// read body
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return err
}
// set body
r.Body.Close()
m.Body = b
// set headers
for k, v := range h.r.Header {
if len(v) > 0 {
m.Header[k] = v[0]
} else {
m.Header[k] = ""
}
}
// return early early
return nil
}
// processing http2 request
// read streaming body
// set max buffer size
buf := make([]byte, 4*1024)
// read the request body
n, err := h.r.Body.Read(buf)
// not an eof error
if err != nil {
return err
}
// check if we have data
if n > 0 {
m.Body = buf[:n]
}
// set headers
for k, v := range h.r.Header {
if len(v) > 0 {
m.Header[k] = v[0]
} else {
@@ -204,78 +277,73 @@ func (h *httpTransportSocket) Recv(m *Message) error {
}
}
select {
case h.r <- r:
default:
}
return nil
}
func (h *httpTransportSocket) Send(m *Message) error {
b := bytes.NewBuffer(m.Body)
defer b.Reset()
if h.r.ProtoMajor == 1 {
rsp := &http.Response{
Header: h.r.Header,
Body: ioutil.NopCloser(bytes.NewReader(m.Body)),
Status: "200 OK",
StatusCode: 200,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(m.Body)),
}
r := <-h.r
for k, v := range m.Header {
rsp.Header.Set(k, v)
}
rsp := &http.Response{
Header: r.Header,
Body: &buffer{b},
Status: "200 OK",
StatusCode: 200,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(m.Body)),
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
return rsp.Write(h.conn)
}
// http2 request
// set headers
for k, v := range m.Header {
rsp.Header.Set(k, v)
h.w.Header().Set(k, v)
}
select {
case h.r <- r:
default:
}
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
return rsp.Write(h.conn)
// write request
_, err := h.w.Write(m.Body)
return err
}
func (h *httpTransportSocket) error(m *Message) error {
b := bytes.NewBuffer(m.Body)
defer b.Reset()
rsp := &http.Response{
Header: make(http.Header),
Body: &buffer{b},
Status: "500 Internal Server Error",
StatusCode: 500,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(m.Body)),
}
if h.r.ProtoMajor == 1 {
rsp := &http.Response{
Header: make(http.Header),
Body: ioutil.NopCloser(bytes.NewReader(m.Body)),
Status: "500 Internal Server Error",
StatusCode: 500,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(m.Body)),
}
for k, v := range m.Header {
rsp.Header.Set(k, v)
}
for k, v := range m.Header {
rsp.Header.Set(k, v)
}
return rsp.Write(h.conn)
return rsp.Write(h.conn)
}
return nil
}
func (h *httpTransportSocket) Close() error {
err := h.conn.Close()
h.once.Do(func() {
h.Lock()
h.buff.Reset(nil)
h.buff = nil
h.Unlock()
})
return err
if h.r.ProtoMajor == 1 {
return h.conn.Close()
}
return nil
}
func (h *httpTransportListener) Addr() string {
@@ -287,46 +355,69 @@ func (h *httpTransportListener) Close() error {
}
func (h *httpTransportListener) Accept(fn func(Socket)) error {
var tempDelay time.Duration
// create handler mux
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
var buf *bufio.ReadWriter
var con net.Conn
for {
c, err := h.listener.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
log.Logf("http: Accept error: %v; retrying in %v\n", err, tempDelay)
time.Sleep(tempDelay)
continue
// read a regular request
if r.ProtoMajor == 1 {
b, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
return err
r.Body = ioutil.NopCloser(bytes.NewReader(b))
// hijack the conn
hj, ok := w.(http.Hijacker)
if !ok {
// we're screwed
http.Error(w, "cannot serve conn", http.StatusInternalServerError)
return
}
conn, bufrw, err := hj.Hijack()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer conn.Close()
buf = bufrw
con = conn
}
sock := &httpTransportSocket{
ht: h.ht,
conn: c,
buff: bufio.NewReader(c),
r: make(chan *http.Request, 1),
}
// save the request
ch := make(chan *http.Request, 1)
ch <- r
go func() {
// TODO: think of a better error response strategy
defer func() {
if r := recover(); r != nil {
log.Log("panic recovered: ", r)
sock.Close()
}
}()
fn(&httpTransportSocket{
ht: h.ht,
w: w,
r: r,
rw: buf,
ch: ch,
conn: con,
local: h.Addr(),
remote: r.RemoteAddr,
})
})
fn(sock)
}()
// default http2 server
srv := &http.Server{
Handler: mux,
}
// insecure connection use h2c
if !(h.ht.opts.Secure || h.ht.opts.TLSConfig != nil) {
srv.Handler = &h2c.HandlerH2C{
Handler: mux,
H2Server: &http2.Server{},
}
}
// begin serving
return srv.Serve(h.listener)
}
func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) {
@@ -365,6 +456,8 @@ func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) {
buff: bufio.NewReader(conn),
dialOpts: dopts,
r: make(chan *http.Request, 1),
local: conn.LocalAddr().String(),
remote: conn.RemoteAddr().String(),
}, nil
}
@@ -423,6 +516,17 @@ func (h *httpTransport) Listen(addr string, opts ...ListenOption) (Listener, err
}, nil
}
func (h *httpTransport) Init(opts ...Option) error {
for _, o := range opts {
o(&h.opts)
}
return nil
}
func (h *httpTransport) Options() Options {
return h.opts
}
func (h *httpTransport) String() string {
return "http"
}

View File

@@ -18,6 +18,9 @@ type mockSocket struct {
exit chan bool
// listener exit
lexit chan bool
local string
remote string
}
type mockClient struct {
@@ -51,6 +54,14 @@ func (ms *mockSocket) Recv(m *transport.Message) error {
return nil
}
func (ms *mockSocket) Local() string {
return ms.local
}
func (ms *mockSocket) Remote() string {
return ms.remote
}
func (ms *mockSocket) Send(m *transport.Message) error {
select {
case <-ms.exit:
@@ -93,10 +104,12 @@ func (m *mockListener) Accept(fn func(transport.Socket)) error {
return nil
case c := <-m.conn:
go fn(&mockSocket{
lexit: c.lexit,
exit: c.exit,
send: c.recv,
recv: c.send,
lexit: c.lexit,
exit: c.exit,
send: c.recv,
recv: c.send,
local: c.Remote(),
remote: c.Local(),
})
}
}
@@ -118,10 +131,12 @@ func (m *mockTransport) Dial(addr string, opts ...transport.DialOption) (transpo
client := &mockClient{
&mockSocket{
send: make(chan *transport.Message),
recv: make(chan *transport.Message),
exit: make(chan bool),
lexit: listener.exit,
send: make(chan *transport.Message),
recv: make(chan *transport.Message),
exit: make(chan bool),
lexit: listener.exit,
local: addr,
remote: addr,
},
options,
}
@@ -171,6 +186,17 @@ func (m *mockTransport) Listen(addr string, opts ...transport.ListenOption) (tra
return listener, nil
}
func (m *mockTransport) Init(opts ...transport.Option) error {
for _, o := range opts {
o(&m.opts)
}
return nil
}
func (m *mockTransport) Options() transport.Options {
return m.opts
}
func (m *mockTransport) String() string {
return "mock"
}

View File

@@ -14,6 +14,8 @@ type Socket interface {
Recv(*Message) error
Send(*Message) error
Close() error
Local() string
Remote() string
}
type Client interface {
@@ -30,6 +32,8 @@ type Listener interface {
// services. It uses socket send/recv semantics and had various
// implementations {HTTP, RabbitMQ, NATS, ...}
type Transport interface {
Init(...Option) error
Options() Options
Dial(addr string, opts ...DialOption) (Client, error)
Listen(addr string, opts ...ListenOption) (Listener, error)
String() string