update gobreaker wrapper to allow multiservice breaker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
78525d6849
commit
be54792f21
86
gobreaker.go
86
gobreaker.go
@ -1,28 +1,92 @@
|
|||||||
package gobreaker
|
package gobreaker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/micro/go-micro/client"
|
|
||||||
"github.com/sony/gobreaker"
|
|
||||||
|
|
||||||
"context"
|
"context"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/micro/go-micro/client"
|
||||||
|
"github.com/micro/go-micro/errors"
|
||||||
|
"github.com/sony/gobreaker"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BreakerMethod int
|
||||||
|
|
||||||
|
const (
|
||||||
|
BreakService BreakerMethod = iota
|
||||||
|
BreakServiceEndpoint
|
||||||
)
|
)
|
||||||
|
|
||||||
type clientWrapper struct {
|
type clientWrapper struct {
|
||||||
cb *gobreaker.CircuitBreaker
|
bs gobreaker.Settings
|
||||||
|
bm BreakerMethod
|
||||||
|
cbs map[string]*gobreaker.TwoStepCircuitBreaker
|
||||||
|
mu sync.Mutex
|
||||||
client.Client
|
client.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
func (c *clientWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||||
_, err := c.cb.Execute(func() (interface{}, error) {
|
var svc string
|
||||||
cerr := c.Client.Call(ctx, req, rsp, opts...)
|
|
||||||
return nil, cerr
|
switch c.bm {
|
||||||
})
|
case BreakService:
|
||||||
|
svc = req.Service()
|
||||||
|
case BreakServiceEndpoint:
|
||||||
|
svc = req.Service() + "." + req.Endpoint()
|
||||||
|
}
|
||||||
|
|
||||||
|
c.mu.Lock()
|
||||||
|
cb, ok := c.cbs[svc]
|
||||||
|
if !ok {
|
||||||
|
cb = gobreaker.NewTwoStepCircuitBreaker(c.bs)
|
||||||
|
c.cbs[svc] = cb
|
||||||
|
}
|
||||||
|
c.mu.Unlock()
|
||||||
|
|
||||||
|
cbAllow, err := cb.Allow()
|
||||||
|
if err != nil {
|
||||||
|
return errors.New(req.Service(), err.Error(), 502)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = c.Client.Call(ctx, req, rsp, opts...); err == nil {
|
||||||
|
cbAllow(true)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
switch err.(type) {
|
||||||
|
case *errors.Error:
|
||||||
|
break
|
||||||
|
default:
|
||||||
|
err = errors.New(req.Service(), err.Error(), 503)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err.(*errors.Error).Code >= 500 {
|
||||||
|
cbAllow(false)
|
||||||
|
} else {
|
||||||
|
cbAllow(true)
|
||||||
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClientWrapper takes a *gobreaker.CircuitBreaker and returns a client Wrapper.
|
// NewClientWrapper returns a client Wrapper.
|
||||||
func NewClientWrapper(cb *gobreaker.CircuitBreaker) client.Wrapper {
|
func NewClientWrapper() client.Wrapper {
|
||||||
return func(c client.Client) client.Client {
|
return func(c client.Client) client.Client {
|
||||||
return &clientWrapper{cb, c}
|
w := &clientWrapper{}
|
||||||
|
w.bs = gobreaker.Settings{}
|
||||||
|
w.cbs = make(map[string]*gobreaker.TwoStepCircuitBreaker)
|
||||||
|
w.Client = c
|
||||||
|
return w
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewCustomClientWrapper takes a gobreaker.Settings and BreakerMethod. Returns a client Wrapper.
|
||||||
|
func NewCustomClientWrapper(bs gobreaker.Settings, bm BreakerMethod) client.Wrapper {
|
||||||
|
return func(c client.Client) client.Client {
|
||||||
|
w := &clientWrapper{}
|
||||||
|
w.bm = bm
|
||||||
|
w.bs = bs
|
||||||
|
w.cbs = make(map[string]*gobreaker.TwoStepCircuitBreaker)
|
||||||
|
w.Client = c
|
||||||
|
return w
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,14 +1,14 @@
|
|||||||
package gobreaker
|
package gobreaker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/micro/go-micro/client"
|
"github.com/micro/go-micro/client"
|
||||||
|
"github.com/micro/go-micro/errors"
|
||||||
"github.com/micro/go-micro/registry/memory"
|
"github.com/micro/go-micro/registry/memory"
|
||||||
"github.com/micro/go-micro/selector"
|
"github.com/micro/go-micro/selector"
|
||||||
"github.com/sony/gobreaker"
|
"github.com/sony/gobreaker"
|
||||||
|
|
||||||
"context"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestBreaker(t *testing.T) {
|
func TestBreaker(t *testing.T) {
|
||||||
@ -20,8 +20,43 @@ func TestBreaker(t *testing.T) {
|
|||||||
// set the selector
|
// set the selector
|
||||||
client.Selector(s),
|
client.Selector(s),
|
||||||
// add the breaker wrapper
|
// add the breaker wrapper
|
||||||
client.Wrap(NewClientWrapper(
|
client.Wrap(NewClientWrapper()),
|
||||||
gobreaker.NewCircuitBreaker(gobreaker.Settings{}),
|
)
|
||||||
|
|
||||||
|
req := c.NewRequest("test.service", "Test.Method", map[string]string{
|
||||||
|
"foo": "bar",
|
||||||
|
}, client.WithContentType("application/json"))
|
||||||
|
|
||||||
|
var rsp map[string]interface{}
|
||||||
|
|
||||||
|
// Force to point of trip
|
||||||
|
for i := 0; i < 6; i++ {
|
||||||
|
c.Call(context.TODO(), req, rsp)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := c.Call(context.TODO(), req, rsp)
|
||||||
|
if err == nil {
|
||||||
|
t.Error("Expecting tripped breaker, got nil error")
|
||||||
|
}
|
||||||
|
|
||||||
|
merr := err.(*errors.Error)
|
||||||
|
if merr.Code != 502 {
|
||||||
|
t.Errorf("Expecting tripped breaker, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCustomBreaker(t *testing.T) {
|
||||||
|
// setup
|
||||||
|
r := memory.NewRegistry()
|
||||||
|
s := selector.NewSelector(selector.Registry(r))
|
||||||
|
|
||||||
|
c := client.NewClient(
|
||||||
|
// set the selector
|
||||||
|
client.Selector(s),
|
||||||
|
// add the breaker wrapper
|
||||||
|
client.Wrap(NewCustomClientWrapper(
|
||||||
|
gobreaker.Settings{},
|
||||||
|
BreakService,
|
||||||
)),
|
)),
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -41,7 +76,8 @@ func TestBreaker(t *testing.T) {
|
|||||||
t.Error("Expecting tripped breaker, got nil error")
|
t.Error("Expecting tripped breaker, got nil error")
|
||||||
}
|
}
|
||||||
|
|
||||||
if err.Error() != "circuit breaker is open" {
|
merr := err.(*errors.Error)
|
||||||
|
if merr.Code != 502 {
|
||||||
t.Errorf("Expecting tripped breaker, got %v", err)
|
t.Errorf("Expecting tripped breaker, got %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user