Compare commits
33 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
88505388c1 | ||
|
8a778644cf | ||
|
d3a76e646a | ||
|
cfa824bc5f | ||
|
39be61685c | ||
|
ac2106ced7 | ||
|
1b4f7d8a68 | ||
|
5eb2e79b86 | ||
|
a2eff9918e | ||
|
52a470532d | ||
|
cd9441fafb | ||
|
356cf82af5 | ||
|
5372707d0e | ||
|
a1deb5c44e | ||
|
956b1c6867 | ||
|
55aca8b0bf | ||
|
ba8582a47a | ||
|
f409468ccd | ||
|
d982225a54 | ||
|
217190c4d6 | ||
|
a56e97b47d | ||
|
b4f47b1cc9 | ||
|
070cebd605 | ||
|
541e894507 | ||
|
c666558f8c | ||
|
6444b7e24c | ||
|
023245a7ba | ||
|
2a2ad553a1 | ||
|
909e13a24a | ||
|
b17a802675 | ||
|
c3c0543733 | ||
|
b39ec4472c | ||
|
b33489e481 |
@@ -192,7 +192,7 @@ func main() {
|
||||
service.Init()
|
||||
|
||||
// Create new greeter client
|
||||
greeter := proto.GreeterServiceClient("greeter", service.Client())
|
||||
greeter := proto.NewGreeterService("greeter", service.Client())
|
||||
|
||||
// Call the greeter
|
||||
rsp, err := greeter.Hello(context.TODO(), &proto.HelloRequest{Name: "John"})
|
||||
|
@@ -276,12 +276,16 @@ func (h *httpBroker) Address() string {
|
||||
}
|
||||
|
||||
func (h *httpBroker) Connect() error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
h.RLock()
|
||||
if h.running {
|
||||
h.RUnlock()
|
||||
return nil
|
||||
}
|
||||
h.RUnlock()
|
||||
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
var l net.Listener
|
||||
var err error
|
||||
@@ -326,10 +330,16 @@ func (h *httpBroker) Connect() error {
|
||||
}
|
||||
|
||||
log.Logf("Broker Listening on %s", l.Addr().String())
|
||||
addr := h.address
|
||||
h.address = l.Addr().String()
|
||||
|
||||
go http.Serve(l, h.mux)
|
||||
go h.run(l)
|
||||
go func() {
|
||||
h.run(l)
|
||||
h.Lock()
|
||||
h.address = addr
|
||||
h.Unlock()
|
||||
}()
|
||||
|
||||
// get registry
|
||||
reg, ok := h.opts.Context.Value(registryKey).(registry.Registry)
|
||||
@@ -345,12 +355,16 @@ func (h *httpBroker) Connect() error {
|
||||
}
|
||||
|
||||
func (h *httpBroker) Disconnect() error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
h.RLock()
|
||||
if !h.running {
|
||||
h.RUnlock()
|
||||
return nil
|
||||
}
|
||||
h.RUnlock()
|
||||
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
// stop rcache
|
||||
rc, ok := h.r.(rcache.Cache)
|
||||
@@ -369,12 +383,15 @@ func (h *httpBroker) Disconnect() error {
|
||||
}
|
||||
|
||||
func (h *httpBroker) Init(opts ...Option) error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
h.RLock()
|
||||
if h.running {
|
||||
h.RUnlock()
|
||||
return errors.New("cannot init while connected")
|
||||
}
|
||||
h.RUnlock()
|
||||
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
for _, o := range opts {
|
||||
o(&h.opts)
|
||||
|
@@ -12,7 +12,7 @@ import (
|
||||
type Client interface {
|
||||
Init(...Option) error
|
||||
Options() Options
|
||||
NewMessage(topic string, msg interface{}) Message
|
||||
NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
|
||||
NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
|
||||
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
|
||||
Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
|
||||
@@ -56,6 +56,9 @@ type CallOption func(*CallOptions)
|
||||
// PublishOption used by Publish
|
||||
type PublishOption func(*PublishOptions)
|
||||
|
||||
// MessageOption used by NewMessage
|
||||
type MessageOption func(*MessageOptions)
|
||||
|
||||
// RequestOption used by NewRequest
|
||||
type RequestOption func(*RequestOptions)
|
||||
|
||||
@@ -65,13 +68,13 @@ var (
|
||||
// DefaultBackoff is the default backoff function for retries
|
||||
DefaultBackoff = exponentialBackoff
|
||||
// DefaultRetry is the default check-for-retry function for retries
|
||||
DefaultRetry = alwaysRetry
|
||||
DefaultRetry = RetryOnError
|
||||
// DefaultRetries is the default number of times a request is tried
|
||||
DefaultRetries = 1
|
||||
// DefaultRequestTimeout is the default request timeout
|
||||
DefaultRequestTimeout = time.Second * 5
|
||||
// DefaultPoolSize sets the connection pool size
|
||||
DefaultPoolSize = 0
|
||||
DefaultPoolSize = 1
|
||||
// DefaultPoolTTL sets the connection pool ttl
|
||||
DefaultPoolTTL = time.Minute
|
||||
)
|
||||
@@ -83,13 +86,13 @@ func Call(ctx context.Context, request Request, response interface{}, opts ...Ca
|
||||
|
||||
// Publishes a publication using the default client. Using the underlying broker
|
||||
// set within the options.
|
||||
func Publish(ctx context.Context, msg Message) error {
|
||||
return DefaultClient.Publish(ctx, msg)
|
||||
func Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
|
||||
return DefaultClient.Publish(ctx, msg, opts...)
|
||||
}
|
||||
|
||||
// Creates a new message using the default client
|
||||
func NewMessage(topic string, payload interface{}) Message {
|
||||
return DefaultClient.NewMessage(topic, payload)
|
||||
func NewMessage(topic string, payload interface{}, opts ...MessageOption) Message {
|
||||
return DefaultClient.NewMessage(topic, payload, opts...)
|
||||
}
|
||||
|
||||
// Creates a new client with the options passed in
|
||||
|
@@ -49,8 +49,8 @@ func (m *MockClient) Options() client.Options {
|
||||
return m.Opts
|
||||
}
|
||||
|
||||
func (m *MockClient) NewMessage(topic string, msg interface{}) client.Message {
|
||||
return m.Client.NewMessage(topic, msg)
|
||||
func (m *MockClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message {
|
||||
return m.Client.NewMessage(topic, msg, opts...)
|
||||
}
|
||||
|
||||
func (m *MockClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
|
||||
@@ -80,8 +80,12 @@ func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface
|
||||
if t := reflect.TypeOf(rsp); t.Kind() == reflect.Ptr {
|
||||
v = reflect.Indirect(v)
|
||||
}
|
||||
response := r.Response
|
||||
if t := reflect.TypeOf(r.Response); t.Kind() == reflect.Func {
|
||||
response = reflect.ValueOf(r.Response).Call([]reflect.Value{})[0].Interface()
|
||||
}
|
||||
|
||||
v.Set(reflect.ValueOf(r.Response))
|
||||
v.Set(reflect.ValueOf(response))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@@ -16,6 +16,8 @@ func TestClient(t *testing.T) {
|
||||
{Method: "Foo.Bar", Response: map[string]interface{}{"foo": "bar"}},
|
||||
{Method: "Foo.Struct", Response: &TestResponse{Param: "aparam"}},
|
||||
{Method: "Foo.Fail", Error: errors.InternalServerError("go.mock", "failed")},
|
||||
{Method: "Foo.Func", Response: func() string { return "string" }},
|
||||
{Method: "Foo.FuncStruct", Response: func() *TestResponse { return &TestResponse{Param: "aparam"} }},
|
||||
}
|
||||
|
||||
c := NewClient(Response("go.mock", response))
|
||||
|
@@ -67,6 +67,10 @@ type PublishOptions struct {
|
||||
Context context.Context
|
||||
}
|
||||
|
||||
type MessageOptions struct {
|
||||
ContentType string
|
||||
}
|
||||
|
||||
type RequestOptions struct {
|
||||
ContentType string
|
||||
Stream bool
|
||||
|
@@ -2,12 +2,34 @@ package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/micro/go-micro/errors"
|
||||
)
|
||||
|
||||
// note that returning either false or a non-nil error will result in the call not being retried
|
||||
type RetryFunc func(ctx context.Context, req Request, retryCount int, err error) (bool, error)
|
||||
|
||||
// always retry on error
|
||||
func alwaysRetry(ctx context.Context, req Request, retryCount int, err error) (bool, error) {
|
||||
// RetryAlways always retry on error
|
||||
func RetryAlways(ctx context.Context, req Request, retryCount int, err error) (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// RetryOnError retries a request on a 500 or timeout error
|
||||
func RetryOnError(ctx context.Context, req Request, retryCount int, err error) (bool, error) {
|
||||
if err == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
e := errors.Parse(err.Error())
|
||||
if e == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
switch e.Code {
|
||||
// retry on timeout or internal server error
|
||||
case 408, 500:
|
||||
return true, nil
|
||||
default:
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
@@ -159,7 +159,15 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt
|
||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
||||
}
|
||||
|
||||
c, err := r.opts.Transport.Dial(address, transport.WithStream(), transport.WithTimeout(opts.DialTimeout))
|
||||
dOpts := []transport.DialOption{
|
||||
transport.WithStream(),
|
||||
}
|
||||
|
||||
if opts.DialTimeout >= 0 {
|
||||
dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout))
|
||||
}
|
||||
|
||||
c, err := r.opts.Transport.Dial(address, dOpts...)
|
||||
if err != nil {
|
||||
return nil, errors.InternalServerError("go.micro.client", "connection error: %v", err)
|
||||
}
|
||||
@@ -202,9 +210,12 @@ func (r *rpcClient) Init(opts ...Option) error {
|
||||
o(&r.opts)
|
||||
}
|
||||
|
||||
// recreate the pool if the options changed
|
||||
// update pool configuration if the options changed
|
||||
if size != r.opts.PoolSize || ttl != r.opts.PoolTTL {
|
||||
r.pool = newPool(r.opts.PoolSize, r.opts.PoolTTL)
|
||||
r.pool.Lock()
|
||||
r.pool.size = r.opts.PoolSize
|
||||
r.pool.ttl = int64(r.opts.PoolTTL.Seconds())
|
||||
r.pool.Unlock()
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -227,9 +238,9 @@ func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, erro
|
||||
// get next nodes from the selector
|
||||
next, err := r.opts.Selector.Select(request.Service(), opts.SelectOptions...)
|
||||
if err != nil && err == selector.ErrNotFound {
|
||||
return nil, errors.NotFound("go.micro.client", err.Error())
|
||||
return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
|
||||
} else if err != nil {
|
||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
||||
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %v", request.Service(), err.Error())
|
||||
}
|
||||
|
||||
return next, nil
|
||||
@@ -279,7 +290,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
|
||||
// call backoff first. Someone may want an initial start delay
|
||||
t, err := callOpts.Backoff(ctx, request, i)
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
return errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
|
||||
}
|
||||
|
||||
// only sleep if greater than 0
|
||||
@@ -290,9 +301,9 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
|
||||
// select next node
|
||||
node, err := next()
|
||||
if err != nil && err == selector.ErrNotFound {
|
||||
return errors.NotFound("go.micro.client", err.Error())
|
||||
return errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
|
||||
} else if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
|
||||
}
|
||||
|
||||
// set the address
|
||||
@@ -310,7 +321,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
|
||||
ch := make(chan error, callOpts.Retries)
|
||||
var gerr error
|
||||
|
||||
for i := 0; i < callOpts.Retries; i++ {
|
||||
for i := 0; i <= callOpts.Retries; i++ {
|
||||
go func() {
|
||||
ch <- call(i)
|
||||
}()
|
||||
@@ -352,18 +363,6 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// check if we already have a deadline
|
||||
d, ok := ctx.Deadline()
|
||||
if !ok {
|
||||
// no deadline so we create a new one
|
||||
ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
|
||||
} else {
|
||||
// got a deadline so no need to setup context
|
||||
// but we need to set the timeout we pass along
|
||||
opt := WithRequestTimeout(d.Sub(time.Now()))
|
||||
opt(&callOpts)
|
||||
}
|
||||
|
||||
// should we noop right here?
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -375,7 +374,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
|
||||
// call backoff first. Someone may want an initial start delay
|
||||
t, err := callOpts.Backoff(ctx, request, i)
|
||||
if err != nil {
|
||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
||||
return nil, errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
|
||||
}
|
||||
|
||||
// only sleep if greater than 0
|
||||
@@ -385,9 +384,9 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
|
||||
|
||||
node, err := next()
|
||||
if err != nil && err == selector.ErrNotFound {
|
||||
return nil, errors.NotFound("go.micro.client", err.Error())
|
||||
return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
|
||||
} else if err != nil {
|
||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
||||
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
|
||||
}
|
||||
|
||||
address := node.Address
|
||||
@@ -408,7 +407,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
|
||||
ch := make(chan response, callOpts.Retries)
|
||||
var grr error
|
||||
|
||||
for i := 0; i < callOpts.Retries; i++ {
|
||||
for i := 0; i <= callOpts.Retries; i++ {
|
||||
go func() {
|
||||
s, err := call(i)
|
||||
ch <- response{s, err}
|
||||
@@ -465,8 +464,8 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
|
||||
})
|
||||
}
|
||||
|
||||
func (r *rpcClient) NewMessage(topic string, message interface{}) Message {
|
||||
return newMessage(topic, message, r.opts.ContentType)
|
||||
func (r *rpcClient) NewMessage(topic string, message interface{}, opts ...MessageOption) Message {
|
||||
return newMessage(topic, message, r.opts.ContentType, opts...)
|
||||
}
|
||||
|
||||
func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
|
||||
|
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/micro/go-micro/errors"
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-micro/registry/mock"
|
||||
"github.com/micro/go-micro/selector"
|
||||
@@ -56,6 +57,46 @@ func TestCallAddress(t *testing.T) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestCallRetry(t *testing.T) {
|
||||
service := "test.service"
|
||||
method := "Test.Method"
|
||||
address := "10.1.10.1:8080"
|
||||
|
||||
var called int
|
||||
|
||||
wrap := func(cf CallFunc) CallFunc {
|
||||
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
|
||||
called++
|
||||
if called == 1 {
|
||||
return errors.InternalServerError("test.error", "retry request")
|
||||
}
|
||||
|
||||
// don't do the call
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
r := mock.NewRegistry()
|
||||
c := NewClient(
|
||||
Registry(r),
|
||||
WrapCall(wrap),
|
||||
)
|
||||
c.Options().Selector.Init(selector.Registry(r))
|
||||
|
||||
req := c.NewRequest(service, method, nil)
|
||||
|
||||
// test calling remote address
|
||||
if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil {
|
||||
t.Fatal("call with address error", err)
|
||||
}
|
||||
|
||||
// num calls
|
||||
if called < c.Options().CallOptions.Retries+1 {
|
||||
t.Fatal("request not retried")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCallWrapper(t *testing.T) {
|
||||
var called bool
|
||||
id := "test.1"
|
||||
|
@@ -6,7 +6,16 @@ type message struct {
|
||||
payload interface{}
|
||||
}
|
||||
|
||||
func newMessage(topic string, payload interface{}, contentType string) Message {
|
||||
func newMessage(topic string, payload interface{}, contentType string, opts ...MessageOption) Message {
|
||||
var options MessageOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
if len(options.ContentType) > 0 {
|
||||
contentType = options.ContentType
|
||||
}
|
||||
|
||||
return &message{
|
||||
payload: payload,
|
||||
topic: topic,
|
||||
|
@@ -71,7 +71,6 @@ var (
|
||||
Name: "client_pool_size",
|
||||
EnvVar: "MICRO_CLIENT_POOL_SIZE",
|
||||
Usage: "Sets the client connection pool size. Default: 1",
|
||||
Value: 1,
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "client_pool_ttl",
|
||||
@@ -385,7 +384,7 @@ func (c *cmd) Before(ctx *cli.Context) error {
|
||||
}
|
||||
|
||||
// client opts
|
||||
if r := ctx.Int("client_retries"); r > 0 {
|
||||
if r := ctx.Int("client_retries"); r >= 0 {
|
||||
clientOpts = append(clientOpts, client.Retries(r))
|
||||
}
|
||||
|
||||
|
@@ -15,8 +15,8 @@ func TestFunction(t *testing.T) {
|
||||
|
||||
// create service
|
||||
fn := NewFunction(
|
||||
Name("test.function"),
|
||||
Registry(mock.NewRegistry()),
|
||||
Name("test.function"),
|
||||
AfterStart(func() error {
|
||||
wg.Done()
|
||||
return nil
|
||||
@@ -26,29 +26,35 @@ func TestFunction(t *testing.T) {
|
||||
// we can't test fn.Init as it parses the command line
|
||||
// fn.Init()
|
||||
|
||||
ch := make(chan error, 2)
|
||||
|
||||
go func() {
|
||||
// wait for start
|
||||
wg.Wait()
|
||||
|
||||
// test call debug
|
||||
req := fn.Client().NewRequest(
|
||||
"test.function",
|
||||
"Debug.Health",
|
||||
new(proto.HealthRequest),
|
||||
)
|
||||
|
||||
rsp := new(proto.HealthResponse)
|
||||
|
||||
err := fn.Client().Call(context.TODO(), req, rsp)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if rsp.Status != "ok" {
|
||||
t.Fatalf("function response: %s", rsp.Status)
|
||||
}
|
||||
// run service
|
||||
ch <- fn.Run()
|
||||
}()
|
||||
|
||||
// run service
|
||||
fn.Run()
|
||||
// wait for start
|
||||
wg.Wait()
|
||||
|
||||
// test call debug
|
||||
req := fn.Client().NewRequest(
|
||||
"test.function",
|
||||
"Debug.Health",
|
||||
new(proto.HealthRequest),
|
||||
)
|
||||
|
||||
rsp := new(proto.HealthResponse)
|
||||
|
||||
err := fn.Client().Call(context.TODO(), req, rsp)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if rsp.Status != "ok" {
|
||||
t.Fatalf("function response: %s", rsp.Status)
|
||||
}
|
||||
|
||||
if err := <-ch; err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
@@ -66,6 +66,9 @@ func (s *rpcServer) accept(sock transport.Socket) {
|
||||
return
|
||||
}
|
||||
|
||||
// add to wait group
|
||||
s.wg.Add(1)
|
||||
|
||||
// we use this Timeout header to set a server deadline
|
||||
to := msg.Header["Timeout"]
|
||||
// we use this Content-Type header to identify the codec needed
|
||||
@@ -80,6 +83,7 @@ func (s *rpcServer) accept(sock transport.Socket) {
|
||||
},
|
||||
Body: []byte(err.Error()),
|
||||
})
|
||||
s.wg.Done()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -102,15 +106,13 @@ func (s *rpcServer) accept(sock transport.Socket) {
|
||||
}
|
||||
}
|
||||
|
||||
// add to wait group
|
||||
s.wg.Add(1)
|
||||
defer s.wg.Done()
|
||||
|
||||
// TODO: needs better error handling
|
||||
if err := s.rpc.serveRequest(ctx, codec, ct); err != nil {
|
||||
s.wg.Done()
|
||||
log.Logf("Unexpected error serving request, closing socket: %v", err)
|
||||
return
|
||||
}
|
||||
s.wg.Done()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -386,6 +388,8 @@ func (s *rpcServer) Start() error {
|
||||
|
||||
log.Logf("Listening on %s", ts.Addr())
|
||||
s.Lock()
|
||||
// swap address
|
||||
addr := s.opts.Address
|
||||
s.opts.Address = ts.Addr()
|
||||
s.Unlock()
|
||||
|
||||
@@ -405,6 +409,11 @@ func (s *rpcServer) Start() error {
|
||||
|
||||
// disconnect the broker
|
||||
config.Broker.Disconnect()
|
||||
|
||||
s.Lock()
|
||||
// swap back address
|
||||
s.opts.Address = addr
|
||||
s.Unlock()
|
||||
}()
|
||||
|
||||
// TODO: subscribe to cruft
|
||||
|
@@ -5,6 +5,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"github.com/micro/go-micro/broker"
|
||||
"github.com/micro/go-micro/codec"
|
||||
@@ -176,6 +177,8 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle
|
||||
delete(hdr, "Content-Type")
|
||||
ctx := metadata.NewContext(context.Background(), hdr)
|
||||
|
||||
results := make(chan error, len(sb.handlers))
|
||||
|
||||
for i := 0; i < len(sb.handlers); i++ {
|
||||
handler := sb.handlers[i]
|
||||
|
||||
@@ -229,13 +232,26 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
fn(ctx, &rpcMessage{
|
||||
results <- fn(ctx, &rpcMessage{
|
||||
topic: sb.topic,
|
||||
contentType: ct,
|
||||
payload: req.Interface(),
|
||||
})
|
||||
}()
|
||||
}
|
||||
|
||||
var errors []string
|
||||
|
||||
for i := 0; i < len(sb.handlers); i++ {
|
||||
if err := <-results; err != nil {
|
||||
errors = append(errors, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
if len(errors) > 0 {
|
||||
return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
@@ -31,29 +31,33 @@ func TestService(t *testing.T) {
|
||||
// service.Init()
|
||||
|
||||
// run service
|
||||
go service.Run()
|
||||
go func() {
|
||||
// wait for start
|
||||
wg.Wait()
|
||||
|
||||
// wait for start
|
||||
wg.Wait()
|
||||
// test call debug
|
||||
req := service.Client().NewRequest(
|
||||
"test.service",
|
||||
"Debug.Health",
|
||||
new(proto.HealthRequest),
|
||||
)
|
||||
|
||||
// test call debug
|
||||
req := service.Client().NewRequest(
|
||||
"test.service",
|
||||
"Debug.Health",
|
||||
new(proto.HealthRequest),
|
||||
)
|
||||
rsp := new(proto.HealthResponse)
|
||||
|
||||
rsp := new(proto.HealthResponse)
|
||||
err := service.Client().Call(context.TODO(), req, rsp)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err := service.Client().Call(context.TODO(), req, rsp)
|
||||
if err != nil {
|
||||
if rsp.Status != "ok" {
|
||||
t.Fatalf("service response: %s", rsp.Status)
|
||||
}
|
||||
|
||||
// shutdown the service
|
||||
cancel()
|
||||
}()
|
||||
|
||||
if err := service.Run(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if rsp.Status != "ok" {
|
||||
t.Fatalf("service response: %s", rsp.Status)
|
||||
}
|
||||
|
||||
// shutdown the service
|
||||
cancel()
|
||||
}
|
||||
|
Reference in New Issue
Block a user