Compare commits

...

33 Commits

Author SHA1 Message Date
Asim Aslam
88505388c1 Add verbosity to errors 2018-07-26 09:33:50 +01:00
Asim Aslam
8a778644cf Merge pull request #281 from micro/retry
retry only on timeout or internal server error
2018-07-22 17:52:12 +01:00
Asim Aslam
d3a76e646a retry only on timeout or internal server error 2018-07-22 17:41:58 +01:00
Asim Aslam
cfa824bc5f Merge pull request #280 from jiyeyuran/master
Allow client_retries to be 0
2018-07-19 23:24:57 -07:00
武新飞
39be61685c client_retries can be 0 2018-07-20 14:20:23 +08:00
Asim Aslam
ac2106ced7 strip deadline from stream 2018-07-17 16:39:07 -07:00
Asim Aslam
1b4f7d8a68 a stream should not timeout 2018-07-17 16:32:35 -07:00
Asim Aslam
5eb2e79b86 go fmt 2018-07-17 16:31:09 -07:00
Asim Aslam
a2eff9918e Merge pull request #269 from Ak-Army/master
handle function in mock response
2018-06-13 16:54:10 +01:00
Hunyadvári Péter
52a470532d handle function in mock response 2018-06-13 17:46:30 +02:00
Asim Aslam
cd9441fafb Merge pull request #268 from jasimmk/fix-connect-lock
Fixing httpBroker dead lock; If publish is called from a subscription #267
2018-06-13 16:21:24 +01:00
Jasim Muhammed
356cf82af5 Fixing httpBroker dead lock; If publish is called from a subscription 2018-06-13 10:28:39 +04:00
Asim Aslam
5372707d0e Strip flag setting 2018-05-30 11:49:50 +01:00
Asim Aslam
a1deb5c44e Merge pull request #264 from micro/register
set register ttl and interval by default
2018-05-30 11:31:13 +01:00
Asim Aslam
956b1c6867 set register ttl and interval by default 2018-05-29 12:28:55 +01:00
Asim Aslam
55aca8b0bf Merge pull request #262 from micro/retries
Retry requests
2018-05-29 11:52:47 +01:00
Asim Aslam
ba8582a47a change retries to actually mean retries 2018-05-28 16:01:04 +01:00
Asim Aslam
f409468ccd Merge branch 'master' of github.com:micro/go-micro 2018-05-28 15:51:52 +01:00
Asim Aslam
d982225a54 restructure test 2018-05-28 15:40:28 +01:00
Asim Aslam
217190c4d6 Merge pull request #261 from micro/pool
Set the default pool size to 1
2018-05-26 09:58:16 +01:00
Asim Aslam
a56e97b47d Change waitgroup processing 2018-05-26 09:41:41 +01:00
Asim Aslam
b4f47b1cc9 Set the default pool size to 1 2018-05-26 09:10:29 +01:00
Asim Aslam
070cebd605 Merge pull request #260 from elebore/master
just update the pool configuration of rpcClient  if the options changed
2018-05-26 09:03:16 +01:00
bogle
541e894507 just update the pool configuration if the options changed, because recreating the pool,existed idleconnection, if any, will be dropped without closing 2018-05-26 15:38:41 +08:00
Asim Aslam
c666558f8c make the broker/transport listen on new addr when stop/started with addr :0 2018-05-25 15:19:25 +01:00
Asim Aslam
6444b7e24c context cancellation is not required 2018-05-25 15:03:15 +01:00
Asim Aslam
023245a7ba shutdown broker once done 2018-05-25 14:43:32 +01:00
Asim Aslam
2a2ad553a1 reorder testing functions 2018-05-25 14:39:50 +01:00
Asim Aslam
909e13a24a Merge pull request #254 from micro/message
add message options
2018-05-10 17:42:46 +01:00
Asim Aslam
b17a802675 update mock 2018-05-10 17:39:13 +01:00
Asim Aslam
c3c0543733 add message options 2018-05-10 17:33:54 +01:00
Asim Aslam
b39ec4472c Return subscriber errors 2018-04-26 10:47:13 +01:00
Asim Aslam
b33489e481 update readme 2018-04-25 16:03:22 +01:00
15 changed files with 233 additions and 98 deletions

View File

@@ -192,7 +192,7 @@ func main() {
service.Init()
// Create new greeter client
greeter := proto.GreeterServiceClient("greeter", service.Client())
greeter := proto.NewGreeterService("greeter", service.Client())
// Call the greeter
rsp, err := greeter.Hello(context.TODO(), &proto.HelloRequest{Name: "John"})

View File

@@ -276,12 +276,16 @@ func (h *httpBroker) Address() string {
}
func (h *httpBroker) Connect() error {
h.Lock()
defer h.Unlock()
h.RLock()
if h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
var l net.Listener
var err error
@@ -326,10 +330,16 @@ func (h *httpBroker) Connect() error {
}
log.Logf("Broker Listening on %s", l.Addr().String())
addr := h.address
h.address = l.Addr().String()
go http.Serve(l, h.mux)
go h.run(l)
go func() {
h.run(l)
h.Lock()
h.address = addr
h.Unlock()
}()
// get registry
reg, ok := h.opts.Context.Value(registryKey).(registry.Registry)
@@ -345,12 +355,16 @@ func (h *httpBroker) Connect() error {
}
func (h *httpBroker) Disconnect() error {
h.Lock()
defer h.Unlock()
h.RLock()
if !h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
// stop rcache
rc, ok := h.r.(rcache.Cache)
@@ -369,12 +383,15 @@ func (h *httpBroker) Disconnect() error {
}
func (h *httpBroker) Init(opts ...Option) error {
h.Lock()
defer h.Unlock()
h.RLock()
if h.running {
h.RUnlock()
return errors.New("cannot init while connected")
}
h.RUnlock()
h.Lock()
defer h.Unlock()
for _, o := range opts {
o(&h.opts)

View File

@@ -12,7 +12,7 @@ import (
type Client interface {
Init(...Option) error
Options() Options
NewMessage(topic string, msg interface{}) Message
NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
@@ -56,6 +56,9 @@ type CallOption func(*CallOptions)
// PublishOption used by Publish
type PublishOption func(*PublishOptions)
// MessageOption used by NewMessage
type MessageOption func(*MessageOptions)
// RequestOption used by NewRequest
type RequestOption func(*RequestOptions)
@@ -65,13 +68,13 @@ var (
// DefaultBackoff is the default backoff function for retries
DefaultBackoff = exponentialBackoff
// DefaultRetry is the default check-for-retry function for retries
DefaultRetry = alwaysRetry
DefaultRetry = RetryOnError
// DefaultRetries is the default number of times a request is tried
DefaultRetries = 1
// DefaultRequestTimeout is the default request timeout
DefaultRequestTimeout = time.Second * 5
// DefaultPoolSize sets the connection pool size
DefaultPoolSize = 0
DefaultPoolSize = 1
// DefaultPoolTTL sets the connection pool ttl
DefaultPoolTTL = time.Minute
)
@@ -83,13 +86,13 @@ func Call(ctx context.Context, request Request, response interface{}, opts ...Ca
// Publishes a publication using the default client. Using the underlying broker
// set within the options.
func Publish(ctx context.Context, msg Message) error {
return DefaultClient.Publish(ctx, msg)
func Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
return DefaultClient.Publish(ctx, msg, opts...)
}
// Creates a new message using the default client
func NewMessage(topic string, payload interface{}) Message {
return DefaultClient.NewMessage(topic, payload)
func NewMessage(topic string, payload interface{}, opts ...MessageOption) Message {
return DefaultClient.NewMessage(topic, payload, opts...)
}
// Creates a new client with the options passed in

View File

@@ -49,8 +49,8 @@ func (m *MockClient) Options() client.Options {
return m.Opts
}
func (m *MockClient) NewMessage(topic string, msg interface{}) client.Message {
return m.Client.NewMessage(topic, msg)
func (m *MockClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message {
return m.Client.NewMessage(topic, msg, opts...)
}
func (m *MockClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
@@ -80,8 +80,12 @@ func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface
if t := reflect.TypeOf(rsp); t.Kind() == reflect.Ptr {
v = reflect.Indirect(v)
}
response := r.Response
if t := reflect.TypeOf(r.Response); t.Kind() == reflect.Func {
response = reflect.ValueOf(r.Response).Call([]reflect.Value{})[0].Interface()
}
v.Set(reflect.ValueOf(r.Response))
v.Set(reflect.ValueOf(response))
return nil
}

View File

@@ -16,6 +16,8 @@ func TestClient(t *testing.T) {
{Method: "Foo.Bar", Response: map[string]interface{}{"foo": "bar"}},
{Method: "Foo.Struct", Response: &TestResponse{Param: "aparam"}},
{Method: "Foo.Fail", Error: errors.InternalServerError("go.mock", "failed")},
{Method: "Foo.Func", Response: func() string { return "string" }},
{Method: "Foo.FuncStruct", Response: func() *TestResponse { return &TestResponse{Param: "aparam"} }},
}
c := NewClient(Response("go.mock", response))

View File

@@ -67,6 +67,10 @@ type PublishOptions struct {
Context context.Context
}
type MessageOptions struct {
ContentType string
}
type RequestOptions struct {
ContentType string
Stream bool

View File

@@ -2,12 +2,34 @@ package client
import (
"context"
"github.com/micro/go-micro/errors"
)
// note that returning either false or a non-nil error will result in the call not being retried
type RetryFunc func(ctx context.Context, req Request, retryCount int, err error) (bool, error)
// always retry on error
func alwaysRetry(ctx context.Context, req Request, retryCount int, err error) (bool, error) {
// RetryAlways always retry on error
func RetryAlways(ctx context.Context, req Request, retryCount int, err error) (bool, error) {
return true, nil
}
// RetryOnError retries a request on a 500 or timeout error
func RetryOnError(ctx context.Context, req Request, retryCount int, err error) (bool, error) {
if err == nil {
return false, nil
}
e := errors.Parse(err.Error())
if e == nil {
return false, nil
}
switch e.Code {
// retry on timeout or internal server error
case 408, 500:
return true, nil
default:
return false, nil
}
}

View File

@@ -159,7 +159,15 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
c, err := r.opts.Transport.Dial(address, transport.WithStream(), transport.WithTimeout(opts.DialTimeout))
dOpts := []transport.DialOption{
transport.WithStream(),
}
if opts.DialTimeout >= 0 {
dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout))
}
c, err := r.opts.Transport.Dial(address, dOpts...)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", "connection error: %v", err)
}
@@ -202,9 +210,12 @@ func (r *rpcClient) Init(opts ...Option) error {
o(&r.opts)
}
// recreate the pool if the options changed
// update pool configuration if the options changed
if size != r.opts.PoolSize || ttl != r.opts.PoolTTL {
r.pool = newPool(r.opts.PoolSize, r.opts.PoolTTL)
r.pool.Lock()
r.pool.size = r.opts.PoolSize
r.pool.ttl = int64(r.opts.PoolTTL.Seconds())
r.pool.Unlock()
}
return nil
@@ -227,9 +238,9 @@ func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, erro
// get next nodes from the selector
next, err := r.opts.Selector.Select(request.Service(), opts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %v", request.Service(), err.Error())
}
return next, nil
@@ -279,7 +290,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, request, i)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
return errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
}
// only sleep if greater than 0
@@ -290,9 +301,9 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
// select next node
node, err := next()
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
return errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
}
// set the address
@@ -310,7 +321,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
ch := make(chan error, callOpts.Retries)
var gerr error
for i := 0; i < callOpts.Retries; i++ {
for i := 0; i <= callOpts.Retries; i++ {
go func() {
ch <- call(i)
}()
@@ -352,18 +363,6 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
return nil, err
}
// check if we already have a deadline
d, ok := ctx.Deadline()
if !ok {
// no deadline so we create a new one
ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
} else {
// got a deadline so no need to setup context
// but we need to set the timeout we pass along
opt := WithRequestTimeout(d.Sub(time.Now()))
opt(&callOpts)
}
// should we noop right here?
select {
case <-ctx.Done():
@@ -375,7 +374,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, request, i)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
return nil, errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
}
// only sleep if greater than 0
@@ -385,9 +384,9 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
node, err := next()
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
}
address := node.Address
@@ -408,7 +407,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
ch := make(chan response, callOpts.Retries)
var grr error
for i := 0; i < callOpts.Retries; i++ {
for i := 0; i <= callOpts.Retries; i++ {
go func() {
s, err := call(i)
ch <- response{s, err}
@@ -465,8 +464,8 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
})
}
func (r *rpcClient) NewMessage(topic string, message interface{}) Message {
return newMessage(topic, message, r.opts.ContentType)
func (r *rpcClient) NewMessage(topic string, message interface{}, opts ...MessageOption) Message {
return newMessage(topic, message, r.opts.ContentType, opts...)
}
func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"testing"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/registry/mock"
"github.com/micro/go-micro/selector"
@@ -56,6 +57,46 @@ func TestCallAddress(t *testing.T) {
}
}
func TestCallRetry(t *testing.T) {
service := "test.service"
method := "Test.Method"
address := "10.1.10.1:8080"
var called int
wrap := func(cf CallFunc) CallFunc {
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
called++
if called == 1 {
return errors.InternalServerError("test.error", "retry request")
}
// don't do the call
return nil
}
}
r := mock.NewRegistry()
c := NewClient(
Registry(r),
WrapCall(wrap),
)
c.Options().Selector.Init(selector.Registry(r))
req := c.NewRequest(service, method, nil)
// test calling remote address
if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil {
t.Fatal("call with address error", err)
}
// num calls
if called < c.Options().CallOptions.Retries+1 {
t.Fatal("request not retried")
}
}
func TestCallWrapper(t *testing.T) {
var called bool
id := "test.1"

View File

@@ -6,7 +6,16 @@ type message struct {
payload interface{}
}
func newMessage(topic string, payload interface{}, contentType string) Message {
func newMessage(topic string, payload interface{}, contentType string, opts ...MessageOption) Message {
var options MessageOptions
for _, o := range opts {
o(&options)
}
if len(options.ContentType) > 0 {
contentType = options.ContentType
}
return &message{
payload: payload,
topic: topic,

View File

@@ -71,7 +71,6 @@ var (
Name: "client_pool_size",
EnvVar: "MICRO_CLIENT_POOL_SIZE",
Usage: "Sets the client connection pool size. Default: 1",
Value: 1,
},
cli.StringFlag{
Name: "client_pool_ttl",
@@ -385,7 +384,7 @@ func (c *cmd) Before(ctx *cli.Context) error {
}
// client opts
if r := ctx.Int("client_retries"); r > 0 {
if r := ctx.Int("client_retries"); r >= 0 {
clientOpts = append(clientOpts, client.Retries(r))
}

View File

@@ -15,8 +15,8 @@ func TestFunction(t *testing.T) {
// create service
fn := NewFunction(
Name("test.function"),
Registry(mock.NewRegistry()),
Name("test.function"),
AfterStart(func() error {
wg.Done()
return nil
@@ -26,29 +26,35 @@ func TestFunction(t *testing.T) {
// we can't test fn.Init as it parses the command line
// fn.Init()
ch := make(chan error, 2)
go func() {
// wait for start
wg.Wait()
// test call debug
req := fn.Client().NewRequest(
"test.function",
"Debug.Health",
new(proto.HealthRequest),
)
rsp := new(proto.HealthResponse)
err := fn.Client().Call(context.TODO(), req, rsp)
if err != nil {
t.Fatal(err)
}
if rsp.Status != "ok" {
t.Fatalf("function response: %s", rsp.Status)
}
// run service
ch <- fn.Run()
}()
// run service
fn.Run()
// wait for start
wg.Wait()
// test call debug
req := fn.Client().NewRequest(
"test.function",
"Debug.Health",
new(proto.HealthRequest),
)
rsp := new(proto.HealthResponse)
err := fn.Client().Call(context.TODO(), req, rsp)
if err != nil {
t.Fatal(err)
}
if rsp.Status != "ok" {
t.Fatalf("function response: %s", rsp.Status)
}
if err := <-ch; err != nil {
t.Fatal(err)
}
}

View File

@@ -66,6 +66,9 @@ func (s *rpcServer) accept(sock transport.Socket) {
return
}
// add to wait group
s.wg.Add(1)
// we use this Timeout header to set a server deadline
to := msg.Header["Timeout"]
// we use this Content-Type header to identify the codec needed
@@ -80,6 +83,7 @@ func (s *rpcServer) accept(sock transport.Socket) {
},
Body: []byte(err.Error()),
})
s.wg.Done()
return
}
@@ -102,15 +106,13 @@ func (s *rpcServer) accept(sock transport.Socket) {
}
}
// add to wait group
s.wg.Add(1)
defer s.wg.Done()
// TODO: needs better error handling
if err := s.rpc.serveRequest(ctx, codec, ct); err != nil {
s.wg.Done()
log.Logf("Unexpected error serving request, closing socket: %v", err)
return
}
s.wg.Done()
}
}
@@ -386,6 +388,8 @@ func (s *rpcServer) Start() error {
log.Logf("Listening on %s", ts.Addr())
s.Lock()
// swap address
addr := s.opts.Address
s.opts.Address = ts.Addr()
s.Unlock()
@@ -405,6 +409,11 @@ func (s *rpcServer) Start() error {
// disconnect the broker
config.Broker.Disconnect()
s.Lock()
// swap back address
s.opts.Address = addr
s.Unlock()
}()
// TODO: subscribe to cruft

View File

@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"reflect"
"strings"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
@@ -176,6 +177,8 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle
delete(hdr, "Content-Type")
ctx := metadata.NewContext(context.Background(), hdr)
results := make(chan error, len(sb.handlers))
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
@@ -229,13 +232,26 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle
s.wg.Add(1)
go func() {
defer s.wg.Done()
fn(ctx, &rpcMessage{
results <- fn(ctx, &rpcMessage{
topic: sb.topic,
contentType: ct,
payload: req.Interface(),
})
}()
}
var errors []string
for i := 0; i < len(sb.handlers); i++ {
if err := <-results; err != nil {
errors = append(errors, err.Error())
}
}
if len(errors) > 0 {
return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
}
return nil
}
}

View File

@@ -31,29 +31,33 @@ func TestService(t *testing.T) {
// service.Init()
// run service
go service.Run()
go func() {
// wait for start
wg.Wait()
// wait for start
wg.Wait()
// test call debug
req := service.Client().NewRequest(
"test.service",
"Debug.Health",
new(proto.HealthRequest),
)
// test call debug
req := service.Client().NewRequest(
"test.service",
"Debug.Health",
new(proto.HealthRequest),
)
rsp := new(proto.HealthResponse)
rsp := new(proto.HealthResponse)
err := service.Client().Call(context.TODO(), req, rsp)
if err != nil {
t.Fatal(err)
}
err := service.Client().Call(context.TODO(), req, rsp)
if err != nil {
if rsp.Status != "ok" {
t.Fatalf("service response: %s", rsp.Status)
}
// shutdown the service
cancel()
}()
if err := service.Run(); err != nil {
t.Fatal(err)
}
if rsp.Status != "ok" {
t.Fatalf("service response: %s", rsp.Status)
}
// shutdown the service
cancel()
}