Compare commits

..

32 Commits

Author SHA1 Message Date
Asim Aslam
5372707d0e Strip flag setting 2018-05-30 11:49:50 +01:00
Asim Aslam
a1deb5c44e Merge pull request #264 from micro/register
set register ttl and interval by default
2018-05-30 11:31:13 +01:00
Asim Aslam
956b1c6867 set register ttl and interval by default 2018-05-29 12:28:55 +01:00
Asim Aslam
55aca8b0bf Merge pull request #262 from micro/retries
Retry requests
2018-05-29 11:52:47 +01:00
Asim Aslam
ba8582a47a change retries to actually mean retries 2018-05-28 16:01:04 +01:00
Asim Aslam
f409468ccd Merge branch 'master' of github.com:micro/go-micro 2018-05-28 15:51:52 +01:00
Asim Aslam
d982225a54 restructure test 2018-05-28 15:40:28 +01:00
Asim Aslam
217190c4d6 Merge pull request #261 from micro/pool
Set the default pool size to 1
2018-05-26 09:58:16 +01:00
Asim Aslam
a56e97b47d Change waitgroup processing 2018-05-26 09:41:41 +01:00
Asim Aslam
b4f47b1cc9 Set the default pool size to 1 2018-05-26 09:10:29 +01:00
Asim Aslam
070cebd605 Merge pull request #260 from elebore/master
just update the pool configuration of rpcClient  if the options changed
2018-05-26 09:03:16 +01:00
bogle
541e894507 just update the pool configuration if the options changed, because recreating the pool,existed idleconnection, if any, will be dropped without closing 2018-05-26 15:38:41 +08:00
Asim Aslam
c666558f8c make the broker/transport listen on new addr when stop/started with addr :0 2018-05-25 15:19:25 +01:00
Asim Aslam
6444b7e24c context cancellation is not required 2018-05-25 15:03:15 +01:00
Asim Aslam
023245a7ba shutdown broker once done 2018-05-25 14:43:32 +01:00
Asim Aslam
2a2ad553a1 reorder testing functions 2018-05-25 14:39:50 +01:00
Asim Aslam
909e13a24a Merge pull request #254 from micro/message
add message options
2018-05-10 17:42:46 +01:00
Asim Aslam
b17a802675 update mock 2018-05-10 17:39:13 +01:00
Asim Aslam
c3c0543733 add message options 2018-05-10 17:33:54 +01:00
Asim Aslam
b39ec4472c Return subscriber errors 2018-04-26 10:47:13 +01:00
Asim Aslam
b33489e481 update readme 2018-04-25 16:03:22 +01:00
Asim Aslam
8fb5e20a22 Merge pull request #248 from micro/rework
Rework Interfaces
2018-04-17 11:25:25 +01:00
Asim Aslam
0315b4480f revert some changes 2018-04-17 11:00:22 +01:00
Asim Aslam
ccbc1b9cf3 Fix broker registry issue 2018-04-17 08:30:36 +01:00
Asim Aslam
19fdfba0bf move wrapper files 2018-04-14 19:24:17 +01:00
Asim Aslam
d00ac200dd remove registry and transport default funcs 2018-04-14 18:43:54 +01:00
Asim Aslam
173f7107e2 remove broker default funcs 2018-04-14 18:26:54 +01:00
Asim Aslam
d00d76bf7c Move publication to message 2018-04-14 18:21:02 +01:00
Asim Aslam
65068e8b82 rename Streamer to Stream 2018-04-14 18:15:09 +01:00
Asim Aslam
c2cfe5310c Rework client interface 2018-04-14 18:06:52 +01:00
Asim Aslam
07068379c6 remove remote func methods 2018-04-14 16:16:58 +01:00
Asim Aslam
528b5f58de update sponsor area 2018-04-12 12:09:36 +01:00
28 changed files with 366 additions and 267 deletions

View File

@@ -192,7 +192,7 @@ func main() {
service.Init()
// Create new greeter client
greeter := proto.GreeterServiceClient("greeter", service.Client())
greeter := proto.NewGreeterService("greeter", service.Client())
// Call the greeter
rsp, err := greeter.Hello(context.TODO(), &proto.HelloRequest{Name: "John"})
@@ -452,6 +452,8 @@ Check out [ja-micro](https://github.com/Sixt/ja-micro) to write services in Java
## Sponsors
Open source development of Micro is sponsored by Sixt
Sixt is an Enterprise Sponsor of Micro
<a href="https://micro.mu/blog/2016/04/25/announcing-sixt-sponsorship.html"><img src="https://micro.mu/sixt_logo.png" width=150px height="auto" /></a>
Become a sponsor by backing micro on [Patreon](https://www.patreon.com/microhq)

View File

@@ -326,10 +326,16 @@ func (h *httpBroker) Connect() error {
}
log.Logf("Broker Listening on %s", l.Addr().String())
addr := h.address
h.address = l.Addr().String()
go http.Serve(l, h.mux)
go h.run(l)
go func() {
h.run(l)
h.Lock()
h.address = addr
h.Unlock()
}()
// get registry
reg, ok := h.opts.Context.Value(registryKey).(registry.Registry)

View File

@@ -10,8 +10,10 @@ import (
func TestBackoff(t *testing.T) {
delta := time.Duration(0)
c := NewClient()
for i := 0; i < 5; i++ {
d, err := exponentialBackoff(context.TODO(), NewJsonRequest("test", "test", nil), i)
d, err := exponentialBackoff(context.TODO(), c.NewRequest("test", "test", nil), i)
if err != nil {
t.Fatal(err)
}

View File

@@ -12,22 +12,18 @@ import (
type Client interface {
Init(...Option) error
Options() Options
NewPublication(topic string, msg interface{}) Publication
NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
NewProtoRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
NewJsonRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
CallRemote(ctx context.Context, addr string, req Request, rsp interface{}, opts ...CallOption) error
Stream(ctx context.Context, req Request, opts ...CallOption) (Streamer, error)
StreamRemote(ctx context.Context, addr string, req Request, opts ...CallOption) (Streamer, error)
Publish(ctx context.Context, p Publication, opts ...PublishOption) error
Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
Publish(ctx context.Context, msg Message, opts ...PublishOption) error
String() string
}
// Publication is the interface for a message published asynchronously
type Publication interface {
// Message is the interface for publishing asynchronously
type Message interface {
Topic() string
Message() interface{}
Payload() interface{}
ContentType() string
}
@@ -41,8 +37,8 @@ type Request interface {
Stream() bool
}
// Streamer is the inteface for a bidirectional synchronous stream
type Streamer interface {
// Stream is the inteface for a bidirectional synchronous stream
type Stream interface {
Context() context.Context
Request() Request
Send(interface{}) error
@@ -60,6 +56,9 @@ type CallOption func(*CallOptions)
// PublishOption used by Publish
type PublishOption func(*PublishOptions)
// MessageOption used by NewMessage
type MessageOption func(*MessageOptions)
// RequestOption used by NewRequest
type RequestOption func(*RequestOptions)
@@ -75,7 +74,7 @@ var (
// DefaultRequestTimeout is the default request timeout
DefaultRequestTimeout = time.Second * 5
// DefaultPoolSize sets the connection pool size
DefaultPoolSize = 0
DefaultPoolSize = 1
// DefaultPoolTTL sets the connection pool ttl
DefaultPoolTTL = time.Minute
)
@@ -85,26 +84,15 @@ func Call(ctx context.Context, request Request, response interface{}, opts ...Ca
return DefaultClient.Call(ctx, request, response, opts...)
}
// Makes a synchronous call to the specified address using the default client
func CallRemote(ctx context.Context, address string, request Request, response interface{}, opts ...CallOption) error {
return DefaultClient.CallRemote(ctx, address, request, response, opts...)
}
// Creates a streaming connection with a service and returns responses on the
// channel passed in. It's up to the user to close the streamer.
func Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) {
return DefaultClient.Stream(ctx, request, opts...)
}
// Creates a streaming connection to the address specified.
func StreamRemote(ctx context.Context, address string, request Request, opts ...CallOption) (Streamer, error) {
return DefaultClient.StreamRemote(ctx, address, request, opts...)
}
// Publishes a publication using the default client. Using the underlying broker
// set within the options.
func Publish(ctx context.Context, p Publication) error {
return DefaultClient.Publish(ctx, p)
func Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
return DefaultClient.Publish(ctx, msg, opts...)
}
// Creates a new message using the default client
func NewMessage(topic string, payload interface{}, opts ...MessageOption) Message {
return DefaultClient.NewMessage(topic, payload, opts...)
}
// Creates a new client with the options passed in
@@ -112,25 +100,16 @@ func NewClient(opt ...Option) Client {
return newRpcClient(opt...)
}
// Creates a new publication using the default client
func NewPublication(topic string, message interface{}) Publication {
return DefaultClient.NewPublication(topic, message)
}
// Creates a new request using the default client. Content Type will
// be set to the default within options and use the appropriate codec
func NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return DefaultClient.NewRequest(service, method, request, reqOpts...)
}
// Creates a new protobuf request using the default client
func NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return DefaultClient.NewProtoRequest(service, method, request, reqOpts...)
}
// Creates a new json request using the default client
func NewJsonRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return DefaultClient.NewJsonRequest(service, method, request, reqOpts...)
// Creates a streaming connection with a service and returns responses on the
// channel passed in. It's up to the user to close the streamer.
func NewStream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) {
return DefaultClient.Stream(ctx, request, opts...)
}
func String() string {

View File

@@ -49,22 +49,14 @@ func (m *MockClient) Options() client.Options {
return m.Opts
}
func (m *MockClient) NewPublication(topic string, msg interface{}) client.Publication {
return m.Client.NewPublication(topic, msg)
func (m *MockClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message {
return m.Client.NewMessage(topic, msg, opts...)
}
func (m *MockClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
return m.Client.NewRequest(service, method, req, reqOpts...)
}
func (m *MockClient) NewProtoRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
return m.Client.NewProtoRequest(service, method, req, reqOpts...)
}
func (m *MockClient) NewJsonRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
return m.Client.NewJsonRequest(service, method, req, reqOpts...)
}
func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
m.Lock()
defer m.Unlock()
@@ -97,39 +89,7 @@ func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface
return fmt.Errorf("rpc: can't find service %s", req.Method())
}
func (m *MockClient) CallRemote(ctx context.Context, addr string, req client.Request, rsp interface{}, opts ...client.CallOption) error {
m.Lock()
defer m.Unlock()
response, ok := m.Response[req.Service()]
if !ok {
return errors.NotFound("go.micro.client.mock", "service not found")
}
for _, r := range response {
if r.Method != req.Method() {
continue
}
if r.Error != nil {
return r.Error
}
v := reflect.ValueOf(rsp)
if t := reflect.TypeOf(rsp); t.Kind() == reflect.Ptr {
v = reflect.Indirect(v)
}
v.Set(reflect.ValueOf(r.Response))
return nil
}
return fmt.Errorf("rpc: can't find service %s", req.Method())
}
func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Streamer, error) {
func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
m.Lock()
defer m.Unlock()
@@ -137,15 +97,7 @@ func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...cli
return nil, nil
}
func (m *MockClient) StreamRemote(ctx context.Context, addr string, req client.Request, opts ...client.CallOption) (client.Streamer, error) {
m.Lock()
defer m.Unlock()
// TODO: mock stream
return nil, nil
}
func (m *MockClient) Publish(ctx context.Context, p client.Publication, opts ...client.PublishOption) error {
func (m *MockClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
return nil
}

View File

@@ -21,7 +21,7 @@ func TestClient(t *testing.T) {
c := NewClient(Response("go.mock", response))
for _, r := range response {
req := c.NewJsonRequest("go.mock", r.Method, map[string]interface{}{"foo": "bar"})
req := c.NewRequest("go.mock", r.Method, map[string]interface{}{"foo": "bar"})
var rsp interface{}
err := c.Call(context.TODO(), req, &rsp)

View File

@@ -40,6 +40,8 @@ type Options struct {
type CallOptions struct {
SelectOptions []selector.SelectOption
// Address of remote host
Address string
// Backoff func
Backoff BackoffFunc
// Check if retriable func
@@ -65,8 +67,13 @@ type PublishOptions struct {
Context context.Context
}
type MessageOptions struct {
ContentType string
}
type RequestOptions struct {
Stream bool
ContentType string
Stream bool
// Other options for implementations of the interface
// can be stored in a context
@@ -226,6 +233,13 @@ func DialTimeout(d time.Duration) Option {
// Call Options
// WithAddress sets the remote address to use rather than using service discovery
func WithAddress(a string) CallOption {
return func(o *CallOptions) {
o.Address = a
}
}
func WithSelectOption(so ...selector.SelectOption) CallOption {
return func(o *CallOptions) {
o.SelectOptions = append(o.SelectOptions, so...)
@@ -281,6 +295,12 @@ func WithDialTimeout(d time.Duration) CallOption {
// Request Options
func WithContentType(ct string) RequestOption {
return func(o *RequestOptions) {
o.ContentType = ct
}
}
func StreamingRequest() RequestOption {
return func(o *RequestOptions) {
o.Stream = true

View File

@@ -11,6 +11,7 @@ import (
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/transport"
"sync/atomic"
@@ -134,7 +135,7 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
}
}
func (r *rpcClient) stream(ctx context.Context, address string, req Request, opts CallOptions) (Streamer, error) {
func (r *rpcClient) stream(ctx context.Context, address string, req Request, opts CallOptions) (Stream, error) {
msg := &transport.Message{
Header: make(map[string]string),
}
@@ -201,9 +202,12 @@ func (r *rpcClient) Init(opts ...Option) error {
o(&r.opts)
}
// recreate the pool if the options changed
// update pool configuration if the options changed
if size != r.opts.PoolSize || ttl != r.opts.PoolTTL {
r.pool = newPool(r.opts.PoolSize, r.opts.PoolTTL)
r.pool.Lock()
r.pool.size = r.opts.PoolSize
r.pool.ttl = int64(r.opts.PoolTTL.Seconds())
r.pool.Unlock()
}
return nil
@@ -213,13 +217,25 @@ func (r *rpcClient) Options() Options {
return r.opts
}
func (r *rpcClient) CallRemote(ctx context.Context, address string, request Request, response interface{}, opts ...CallOption) error {
// make a copy of call opts
callOpts := r.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) {
// return remote address
if len(opts.Address) > 0 {
return func() (*registry.Node, error) {
return &registry.Node{
Address: opts.Address,
}, nil
}, nil
}
return r.call(ctx, address, request, response, callOpts)
// get next nodes from the selector
next, err := r.opts.Selector.Select(request.Service(), opts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
return next, nil
}
func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {
@@ -229,12 +245,9 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
opt(&callOpts)
}
// get next nodes from the selector
next, err := r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
next, err := r.next(request, callOpts)
if err != nil {
return err
}
// check if we already have a deadline
@@ -300,7 +313,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
ch := make(chan error, callOpts.Retries)
var gerr error
for i := 0; i < callOpts.Retries; i++ {
for i := 0; i <= callOpts.Retries; i++ {
go func() {
ch <- call(i)
}()
@@ -330,28 +343,16 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
return gerr
}
func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Request, opts ...CallOption) (Streamer, error) {
// make a copy of call opts
callOpts := r.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
}
return r.stream(ctx, address, request, callOpts)
}
func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) {
func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) {
// make a copy of call opts
callOpts := r.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
}
// get next nodes from the selector
next, err := r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
next, err := r.next(request, callOpts)
if err != nil {
return nil, err
}
// check if we already have a deadline
@@ -373,7 +374,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
default:
}
call := func(i int) (Streamer, error) {
call := func(i int) (Stream, error) {
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, request, i)
if err != nil {
@@ -403,14 +404,14 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
}
type response struct {
stream Streamer
stream Stream
err error
}
ch := make(chan response, callOpts.Retries)
var grr error
for i := 0; i < callOpts.Retries; i++ {
for i := 0; i <= callOpts.Retries; i++ {
go func() {
s, err := call(i)
ch <- response{s, err}
@@ -441,49 +442,38 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
return nil, grr
}
func (r *rpcClient) Publish(ctx context.Context, p Publication, opts ...PublishOption) error {
func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
md, ok := metadata.FromContext(ctx)
if !ok {
md = make(map[string]string)
}
md["Content-Type"] = p.ContentType()
md["Content-Type"] = msg.ContentType()
// encode message body
cf, err := r.newCodec(p.ContentType())
cf, err := r.newCodec(msg.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
b := &buffer{bytes.NewBuffer(nil)}
if err := cf(b).Write(&codec.Message{Type: codec.Publication}, p.Message()); err != nil {
if err := cf(b).Write(&codec.Message{Type: codec.Publication}, msg.Payload()); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
r.once.Do(func() {
r.opts.Broker.Connect()
})
return r.opts.Broker.Publish(p.Topic(), &broker.Message{
return r.opts.Broker.Publish(msg.Topic(), &broker.Message{
Header: md,
Body: b.Bytes(),
})
}
func (r *rpcClient) NewPublication(topic string, message interface{}) Publication {
return newRpcPublication(topic, message, r.opts.ContentType)
func (r *rpcClient) NewMessage(topic string, message interface{}, opts ...MessageOption) Message {
return newMessage(topic, message, r.opts.ContentType, opts...)
}
func (r *rpcClient) NewProtoPublication(topic string, message interface{}) Publication {
return newRpcPublication(topic, message, "application/octet-stream")
}
func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, r.opts.ContentType, reqOpts...)
}
func (r *rpcClient) NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, "application/octet-stream", reqOpts...)
}
func (r *rpcClient) NewJsonRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, "application/json", reqOpts...)
return newRequest(service, method, request, r.opts.ContentType, reqOpts...)
}
func (r *rpcClient) String() string {

View File

@@ -2,6 +2,7 @@ package client
import (
"context"
"errors"
"fmt"
"testing"
@@ -10,6 +11,92 @@ import (
"github.com/micro/go-micro/selector"
)
func TestCallAddress(t *testing.T) {
var called bool
service := "test.service"
method := "Test.Method"
address := "10.1.10.1:8080"
wrap := func(cf CallFunc) CallFunc {
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
called = true
if req.Service() != service {
return fmt.Errorf("expected service: %s got %s", service, req.Service())
}
if req.Method() != method {
return fmt.Errorf("expected service: %s got %s", method, req.Method())
}
if addr != address {
return fmt.Errorf("expected address: %s got %s", address, addr)
}
// don't do the call
return nil
}
}
r := mock.NewRegistry()
c := NewClient(
Registry(r),
WrapCall(wrap),
)
c.Options().Selector.Init(selector.Registry(r))
req := c.NewRequest(service, method, nil)
// test calling remote address
if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil {
t.Fatal("call with address error", err)
}
if !called {
t.Fatal("wrapper not called")
}
}
func TestCallRetry(t *testing.T) {
service := "test.service"
method := "Test.Method"
address := "10.1.10.1:8080"
var called int
wrap := func(cf CallFunc) CallFunc {
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
called++
if called == 1 {
return errors.New("retry request")
}
// don't do the call
return nil
}
}
r := mock.NewRegistry()
c := NewClient(
Registry(r),
WrapCall(wrap),
)
c.Options().Selector.Init(selector.Registry(r))
req := c.NewRequest(service, method, nil)
// test calling remote address
if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil {
t.Fatal("call with address error", err)
}
// num calls
if called < c.Options().CallOptions.Retries+1 {
t.Fatal("request not retried")
}
}
func TestCallWrapper(t *testing.T) {
var called bool
id := "test.1"

36
client/rpc_message.go Normal file
View File

@@ -0,0 +1,36 @@
package client
type message struct {
topic string
contentType string
payload interface{}
}
func newMessage(topic string, payload interface{}, contentType string, opts ...MessageOption) Message {
var options MessageOptions
for _, o := range opts {
o(&options)
}
if len(options.ContentType) > 0 {
contentType = options.ContentType
}
return &message{
payload: payload,
topic: topic,
contentType: contentType,
}
}
func (m *message) ContentType() string {
return m.contentType
}
func (m *message) Topic() string {
return m.topic
}
func (m *message) Payload() interface{} {
return m.payload
}

View File

@@ -1,27 +0,0 @@
package client
type rpcPublication struct {
topic string
contentType string
message interface{}
}
func newRpcPublication(topic string, message interface{}, contentType string) Publication {
return &rpcPublication{
message: message,
topic: topic,
contentType: contentType,
}
}
func (r *rpcPublication) ContentType() string {
return r.contentType
}
func (r *rpcPublication) Topic() string {
return r.topic
}
func (r *rpcPublication) Message() interface{} {
return r.message
}

View File

@@ -8,13 +8,18 @@ type rpcRequest struct {
opts RequestOptions
}
func newRpcRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request {
func newRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request {
var opts RequestOptions
for _, o := range reqOpts {
o(&opts)
}
// set the content-type specified
if len(opts.ContentType) > 0 {
contentType = opts.ContentType
}
return &rpcRequest{
service: service,
method: method,

View File

@@ -0,0 +1,23 @@
package client
import (
"testing"
)
func TestRequestOptions(t *testing.T) {
r := newRequest("service", "method", nil, "application/json")
if r.Service() != "service" {
t.Fatalf("expected 'service' got %s", r.Service())
}
if r.Method() != "method" {
t.Fatalf("expected 'method' got %s", r.Method())
}
if r.ContentType() != "application/json" {
t.Fatalf("expected 'method' got %s", r.ContentType())
}
r2 := newRequest("service", "method", nil, "application/json", WithContentType("application/protobuf"))
if r2.ContentType() != "application/protobuf" {
t.Fatalf("expected 'method' got %s", r2.ContentType())
}
}

View File

@@ -14,4 +14,4 @@ type CallWrapper func(CallFunc) CallFunc
type Wrapper func(Client) Client
// StreamWrapper wraps a Stream and returns the equivalent
type StreamWrapper func(Streamer) Streamer
type StreamWrapper func(Stream) Stream

View File

@@ -71,7 +71,6 @@ var (
Name: "client_pool_size",
EnvVar: "MICRO_CLIENT_POOL_SIZE",
Usage: "Sets the client connection pool size. Default: 1",
Value: 1,
},
cli.StringFlag{
Name: "client_pool_ttl",

View File

@@ -23,7 +23,7 @@ func fnHandlerWrapper(f Function) server.HandlerWrapper {
func fnSubWrapper(f Function) server.SubscriberWrapper {
return func(s server.SubscriberFunc) server.SubscriberFunc {
return func(ctx context.Context, msg server.Publication) error {
return func(ctx context.Context, msg server.Message) error {
defer f.Done()
return s(ctx, msg)
}

View File

@@ -15,8 +15,8 @@ func TestFunction(t *testing.T) {
// create service
fn := NewFunction(
Name("test.function"),
Registry(mock.NewRegistry()),
Name("test.function"),
AfterStart(func() error {
wg.Done()
return nil
@@ -26,29 +26,35 @@ func TestFunction(t *testing.T) {
// we can't test fn.Init as it parses the command line
// fn.Init()
ch := make(chan error, 2)
go func() {
// wait for start
wg.Wait()
// test call debug
req := fn.Client().NewRequest(
"test.function",
"Debug.Health",
new(proto.HealthRequest),
)
rsp := new(proto.HealthResponse)
err := fn.Client().Call(context.TODO(), req, rsp)
if err != nil {
t.Fatal(err)
}
if rsp.Status != "ok" {
t.Fatalf("function response: %s", rsp.Status)
}
// run service
ch <- fn.Run()
}()
// run service
fn.Run()
// wait for start
wg.Wait()
// test call debug
req := fn.Client().NewRequest(
"test.function",
"Debug.Health",
new(proto.HealthRequest),
)
rsp := new(proto.HealthResponse)
err := fn.Client().Call(context.TODO(), req, rsp)
if err != nil {
t.Fatal(err)
}
if rsp.Status != "ok" {
t.Fatalf("function response: %s", rsp.Status)
}
if err := <-ch; err != nil {
t.Fatal(err)
}
}

View File

@@ -100,6 +100,8 @@ func Registry(r registry.Registry) Option {
o.Server.Init(server.Registry(r))
// Update Selector
o.Client.Options().Selector.Init(selector.Registry(r))
// Update Broker
o.Broker.Init(broker.Registry(r))
}
}

View File

@@ -12,5 +12,5 @@ type publisher struct {
}
func (p *publisher) Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error {
return p.c.Publish(ctx, p.c.NewPublication(p.topic, msg))
return p.c.Publish(ctx, p.c.NewMessage(p.topic, msg))
}

View File

@@ -8,10 +8,10 @@ type rpcRequest struct {
stream bool
}
type rpcPublication struct {
type rpcMessage struct {
topic string
contentType string
message interface{}
payload interface{}
}
func (r *rpcRequest) ContentType() string {
@@ -34,14 +34,14 @@ func (r *rpcRequest) Stream() bool {
return r.stream
}
func (r *rpcPublication) ContentType() string {
func (r *rpcMessage) ContentType() string {
return r.contentType
}
func (r *rpcPublication) Topic() string {
func (r *rpcMessage) Topic() string {
return r.topic
}
func (r *rpcPublication) Message() interface{} {
return r.message
func (r *rpcMessage) Payload() interface{} {
return r.payload
}

View File

@@ -66,6 +66,9 @@ func (s *rpcServer) accept(sock transport.Socket) {
return
}
// add to wait group
s.wg.Add(1)
// we use this Timeout header to set a server deadline
to := msg.Header["Timeout"]
// we use this Content-Type header to identify the codec needed
@@ -80,6 +83,7 @@ func (s *rpcServer) accept(sock transport.Socket) {
},
Body: []byte(err.Error()),
})
s.wg.Done()
return
}
@@ -102,15 +106,13 @@ func (s *rpcServer) accept(sock transport.Socket) {
}
}
// add to wait group
s.wg.Add(1)
defer s.wg.Done()
// TODO: needs better error handling
if err := s.rpc.serveRequest(ctx, codec, ct); err != nil {
s.wg.Done()
log.Logf("Unexpected error serving request, closing socket: %v", err)
return
}
s.wg.Done()
}
}
@@ -386,6 +388,8 @@ func (s *rpcServer) Start() error {
log.Logf("Listening on %s", ts.Addr())
s.Lock()
// swap address
addr := s.opts.Address
s.opts.Address = ts.Addr()
s.Unlock()
@@ -405,6 +409,11 @@ func (s *rpcServer) Start() error {
// disconnect the broker
config.Broker.Disconnect()
s.Lock()
// swap back address
s.opts.Address = addr
s.Unlock()
}()
// TODO: subscribe to cruft

View File

@@ -119,9 +119,9 @@ func prepareMethod(method reflect.Method) *methodType {
if stream {
// check stream type
streamType := reflect.TypeOf((*Streamer)(nil)).Elem()
streamType := reflect.TypeOf((*Stream)(nil)).Elem()
if !argType.Implements(streamType) {
log.Log(mname, "argument does not implement Streamer interface:", argType)
log.Log(mname, "argument does not implement Stream interface:", argType)
return nil
}
} else {

View File

@@ -25,9 +25,9 @@ type Server interface {
String() string
}
type Publication interface {
type Message interface {
Topic() string
Message() interface{}
Payload() interface{}
ContentType() string
}
@@ -40,11 +40,11 @@ type Request interface {
Stream() bool
}
// Streamer represents a stream established with a client.
// Stream represents a stream established with a client.
// A stream can be bidirectional which is indicated by the request.
// The last error will be left in Error().
// EOF indicated end of the stream.
type Streamer interface {
type Stream interface {
Context() context.Context
Request() Request
Send(interface{}) error

View File

@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"reflect"
"strings"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
@@ -176,6 +177,8 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle
delete(hdr, "Content-Type")
ctx := metadata.NewContext(context.Background(), hdr)
results := make(chan error, len(sb.handlers))
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
@@ -204,7 +207,7 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle
return err
}
fn := func(ctx context.Context, msg Publication) error {
fn := func(ctx context.Context, msg Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
@@ -213,7 +216,7 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle
vals = append(vals, reflect.ValueOf(ctx))
}
vals = append(vals, reflect.ValueOf(msg.Message()))
vals = append(vals, reflect.ValueOf(msg.Payload()))
returnValues := handler.method.Call(vals)
if err := returnValues[0].Interface(); err != nil {
@@ -229,13 +232,26 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle
s.wg.Add(1)
go func() {
defer s.wg.Done()
fn(ctx, &rpcPublication{
results <- fn(ctx, &rpcMessage{
topic: sb.topic,
contentType: ct,
message: req.Interface(),
payload: req.Interface(),
})
}()
}
var errors []string
for i := 0; i < len(sb.handlers); i++ {
if err := <-results; err != nil {
errors = append(errors, err.Error())
}
}
if len(errors) > 0 {
return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
}
return nil
}
}

View File

@@ -12,7 +12,7 @@ type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error
// SubscriberFunc represents a single method of a subscriber. It's used primarily
// for the wrappers. What's handed to the actual method is the concrete
// publication message.
type SubscriberFunc func(ctx context.Context, msg Publication) error
type SubscriberFunc func(ctx context.Context, msg Message) error
// HandlerWrapper wraps the HandlerFunc and returns the equivalent
type HandlerWrapper func(HandlerFunc) HandlerFunc
@@ -20,8 +20,8 @@ type HandlerWrapper func(HandlerFunc) HandlerFunc
// SubscriberWrapper wraps the SubscriberFunc and returns the equivalent
type SubscriberWrapper func(SubscriberFunc) SubscriberFunc
// StreamerWrapper wraps a Streamer interface and returns the equivalent.
// StreamWrapper wraps a Stream interface and returns the equivalent.
// Because streams exist for the lifetime of a method invocation this
// is a convenient way to wrap a Stream as its in use for trace, monitoring,
// metrics, etc.
type StreamerWrapper func(Streamer) Streamer
type StreamWrapper func(Stream) Stream

View File

@@ -31,29 +31,33 @@ func TestService(t *testing.T) {
// service.Init()
// run service
go service.Run()
go func() {
// wait for start
wg.Wait()
// wait for start
wg.Wait()
// test call debug
req := service.Client().NewRequest(
"test.service",
"Debug.Health",
new(proto.HealthRequest),
)
// test call debug
req := service.Client().NewRequest(
"test.service",
"Debug.Health",
new(proto.HealthRequest),
)
rsp := new(proto.HealthResponse)
rsp := new(proto.HealthResponse)
err := service.Client().Call(context.TODO(), req, rsp)
if err != nil {
t.Fatal(err)
}
err := service.Client().Call(context.TODO(), req, rsp)
if err != nil {
if rsp.Status != "ok" {
t.Fatalf("service response: %s", rsp.Status)
}
// shutdown the service
cancel()
}()
if err := service.Run(); err != nil {
t.Fatal(err)
}
if rsp.Status != "ok" {
t.Fatalf("service response: %s", rsp.Status)
}
// shutdown the service
cancel()
}

View File

@@ -50,15 +50,3 @@ var (
func NewTransport(opts ...Option) Transport {
return newHTTPTransport(opts...)
}
func Dial(addr string, opts ...DialOption) (Client, error) {
return DefaultTransport.Dial(addr, opts...)
}
func Listen(addr string, opts ...ListenOption) (Listener, error) {
return DefaultTransport.Listen(addr, opts...)
}
func String() string {
return DefaultTransport.String()
}

View File

@@ -36,12 +36,12 @@ func (c *clientWrapper) Call(ctx context.Context, req client.Request, rsp interf
return c.Client.Call(ctx, req, rsp, opts...)
}
func (c *clientWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Streamer, error) {
func (c *clientWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
ctx = c.setHeaders(ctx)
return c.Client.Stream(ctx, req, opts...)
}
func (c *clientWrapper) Publish(ctx context.Context, p client.Publication, opts ...client.PublishOption) error {
func (c *clientWrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
ctx = c.setHeaders(ctx)
return c.Client.Publish(ctx, p, opts...)
}