Merge pull request #248 from micro/rework

Rework Interfaces
This commit is contained in:
Asim Aslam 2018-04-17 11:25:25 +01:00 committed by GitHub
commit 8fb5e20a22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 207 additions and 212 deletions

View File

@ -10,8 +10,10 @@ import (
func TestBackoff(t *testing.T) { func TestBackoff(t *testing.T) {
delta := time.Duration(0) delta := time.Duration(0)
c := NewClient()
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
d, err := exponentialBackoff(context.TODO(), NewJsonRequest("test", "test", nil), i) d, err := exponentialBackoff(context.TODO(), c.NewRequest("test", "test", nil), i)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -12,22 +12,18 @@ import (
type Client interface { type Client interface {
Init(...Option) error Init(...Option) error
Options() Options Options() Options
NewPublication(topic string, msg interface{}) Publication NewMessage(topic string, msg interface{}) Message
NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
NewProtoRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
NewJsonRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
CallRemote(ctx context.Context, addr string, req Request, rsp interface{}, opts ...CallOption) error Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
Stream(ctx context.Context, req Request, opts ...CallOption) (Streamer, error) Publish(ctx context.Context, msg Message, opts ...PublishOption) error
StreamRemote(ctx context.Context, addr string, req Request, opts ...CallOption) (Streamer, error)
Publish(ctx context.Context, p Publication, opts ...PublishOption) error
String() string String() string
} }
// Publication is the interface for a message published asynchronously // Message is the interface for publishing asynchronously
type Publication interface { type Message interface {
Topic() string Topic() string
Message() interface{} Payload() interface{}
ContentType() string ContentType() string
} }
@ -41,8 +37,8 @@ type Request interface {
Stream() bool Stream() bool
} }
// Streamer is the inteface for a bidirectional synchronous stream // Stream is the inteface for a bidirectional synchronous stream
type Streamer interface { type Stream interface {
Context() context.Context Context() context.Context
Request() Request Request() Request
Send(interface{}) error Send(interface{}) error
@ -85,26 +81,15 @@ func Call(ctx context.Context, request Request, response interface{}, opts ...Ca
return DefaultClient.Call(ctx, request, response, opts...) return DefaultClient.Call(ctx, request, response, opts...)
} }
// Makes a synchronous call to the specified address using the default client
func CallRemote(ctx context.Context, address string, request Request, response interface{}, opts ...CallOption) error {
return DefaultClient.CallRemote(ctx, address, request, response, opts...)
}
// Creates a streaming connection with a service and returns responses on the
// channel passed in. It's up to the user to close the streamer.
func Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) {
return DefaultClient.Stream(ctx, request, opts...)
}
// Creates a streaming connection to the address specified.
func StreamRemote(ctx context.Context, address string, request Request, opts ...CallOption) (Streamer, error) {
return DefaultClient.StreamRemote(ctx, address, request, opts...)
}
// Publishes a publication using the default client. Using the underlying broker // Publishes a publication using the default client. Using the underlying broker
// set within the options. // set within the options.
func Publish(ctx context.Context, p Publication) error { func Publish(ctx context.Context, msg Message) error {
return DefaultClient.Publish(ctx, p) return DefaultClient.Publish(ctx, msg)
}
// Creates a new message using the default client
func NewMessage(topic string, payload interface{}) Message {
return DefaultClient.NewMessage(topic, payload)
} }
// Creates a new client with the options passed in // Creates a new client with the options passed in
@ -112,25 +97,16 @@ func NewClient(opt ...Option) Client {
return newRpcClient(opt...) return newRpcClient(opt...)
} }
// Creates a new publication using the default client
func NewPublication(topic string, message interface{}) Publication {
return DefaultClient.NewPublication(topic, message)
}
// Creates a new request using the default client. Content Type will // Creates a new request using the default client. Content Type will
// be set to the default within options and use the appropriate codec // be set to the default within options and use the appropriate codec
func NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { func NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return DefaultClient.NewRequest(service, method, request, reqOpts...) return DefaultClient.NewRequest(service, method, request, reqOpts...)
} }
// Creates a new protobuf request using the default client // Creates a streaming connection with a service and returns responses on the
func NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { // channel passed in. It's up to the user to close the streamer.
return DefaultClient.NewProtoRequest(service, method, request, reqOpts...) func NewStream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) {
} return DefaultClient.Stream(ctx, request, opts...)
// Creates a new json request using the default client
func NewJsonRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return DefaultClient.NewJsonRequest(service, method, request, reqOpts...)
} }
func String() string { func String() string {

View File

@ -49,22 +49,14 @@ func (m *MockClient) Options() client.Options {
return m.Opts return m.Opts
} }
func (m *MockClient) NewPublication(topic string, msg interface{}) client.Publication { func (m *MockClient) NewMessage(topic string, msg interface{}) client.Message {
return m.Client.NewPublication(topic, msg) return m.Client.NewMessage(topic, msg)
} }
func (m *MockClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { func (m *MockClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
return m.Client.NewRequest(service, method, req, reqOpts...) return m.Client.NewRequest(service, method, req, reqOpts...)
} }
func (m *MockClient) NewProtoRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
return m.Client.NewProtoRequest(service, method, req, reqOpts...)
}
func (m *MockClient) NewJsonRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
return m.Client.NewJsonRequest(service, method, req, reqOpts...)
}
func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
@ -97,39 +89,7 @@ func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface
return fmt.Errorf("rpc: can't find service %s", req.Method()) return fmt.Errorf("rpc: can't find service %s", req.Method())
} }
func (m *MockClient) CallRemote(ctx context.Context, addr string, req client.Request, rsp interface{}, opts ...client.CallOption) error { func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
m.Lock()
defer m.Unlock()
response, ok := m.Response[req.Service()]
if !ok {
return errors.NotFound("go.micro.client.mock", "service not found")
}
for _, r := range response {
if r.Method != req.Method() {
continue
}
if r.Error != nil {
return r.Error
}
v := reflect.ValueOf(rsp)
if t := reflect.TypeOf(rsp); t.Kind() == reflect.Ptr {
v = reflect.Indirect(v)
}
v.Set(reflect.ValueOf(r.Response))
return nil
}
return fmt.Errorf("rpc: can't find service %s", req.Method())
}
func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Streamer, error) {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
@ -137,15 +97,7 @@ func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...cli
return nil, nil return nil, nil
} }
func (m *MockClient) StreamRemote(ctx context.Context, addr string, req client.Request, opts ...client.CallOption) (client.Streamer, error) { func (m *MockClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
m.Lock()
defer m.Unlock()
// TODO: mock stream
return nil, nil
}
func (m *MockClient) Publish(ctx context.Context, p client.Publication, opts ...client.PublishOption) error {
return nil return nil
} }

View File

@ -21,7 +21,7 @@ func TestClient(t *testing.T) {
c := NewClient(Response("go.mock", response)) c := NewClient(Response("go.mock", response))
for _, r := range response { for _, r := range response {
req := c.NewJsonRequest("go.mock", r.Method, map[string]interface{}{"foo": "bar"}) req := c.NewRequest("go.mock", r.Method, map[string]interface{}{"foo": "bar"})
var rsp interface{} var rsp interface{}
err := c.Call(context.TODO(), req, &rsp) err := c.Call(context.TODO(), req, &rsp)

View File

@ -40,6 +40,8 @@ type Options struct {
type CallOptions struct { type CallOptions struct {
SelectOptions []selector.SelectOption SelectOptions []selector.SelectOption
// Address of remote host
Address string
// Backoff func // Backoff func
Backoff BackoffFunc Backoff BackoffFunc
// Check if retriable func // Check if retriable func
@ -66,7 +68,8 @@ type PublishOptions struct {
} }
type RequestOptions struct { type RequestOptions struct {
Stream bool ContentType string
Stream bool
// Other options for implementations of the interface // Other options for implementations of the interface
// can be stored in a context // can be stored in a context
@ -226,6 +229,13 @@ func DialTimeout(d time.Duration) Option {
// Call Options // Call Options
// WithAddress sets the remote address to use rather than using service discovery
func WithAddress(a string) CallOption {
return func(o *CallOptions) {
o.Address = a
}
}
func WithSelectOption(so ...selector.SelectOption) CallOption { func WithSelectOption(so ...selector.SelectOption) CallOption {
return func(o *CallOptions) { return func(o *CallOptions) {
o.SelectOptions = append(o.SelectOptions, so...) o.SelectOptions = append(o.SelectOptions, so...)
@ -281,6 +291,12 @@ func WithDialTimeout(d time.Duration) CallOption {
// Request Options // Request Options
func WithContentType(ct string) RequestOption {
return func(o *RequestOptions) {
o.ContentType = ct
}
}
func StreamingRequest() RequestOption { func StreamingRequest() RequestOption {
return func(o *RequestOptions) { return func(o *RequestOptions) {
o.Stream = true o.Stream = true

View File

@ -11,6 +11,7 @@ import (
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/errors" "github.com/micro/go-micro/errors"
"github.com/micro/go-micro/metadata" "github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector" "github.com/micro/go-micro/selector"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
"sync/atomic" "sync/atomic"
@ -134,7 +135,7 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
} }
} }
func (r *rpcClient) stream(ctx context.Context, address string, req Request, opts CallOptions) (Streamer, error) { func (r *rpcClient) stream(ctx context.Context, address string, req Request, opts CallOptions) (Stream, error) {
msg := &transport.Message{ msg := &transport.Message{
Header: make(map[string]string), Header: make(map[string]string),
} }
@ -213,13 +214,25 @@ func (r *rpcClient) Options() Options {
return r.opts return r.opts
} }
func (r *rpcClient) CallRemote(ctx context.Context, address string, request Request, response interface{}, opts ...CallOption) error { func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) {
// make a copy of call opts // return remote address
callOpts := r.opts.CallOptions if len(opts.Address) > 0 {
for _, opt := range opts { return func() (*registry.Node, error) {
opt(&callOpts) return &registry.Node{
Address: opts.Address,
}, nil
}, nil
} }
return r.call(ctx, address, request, response, callOpts)
// get next nodes from the selector
next, err := r.opts.Selector.Select(request.Service(), opts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
return next, nil
} }
func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error { func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {
@ -229,12 +242,9 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
opt(&callOpts) opt(&callOpts)
} }
// get next nodes from the selector next, err := r.next(request, callOpts)
next, err := r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...) if err != nil {
if err != nil && err == selector.ErrNotFound { return err
return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
} }
// check if we already have a deadline // check if we already have a deadline
@ -330,28 +340,16 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
return gerr return gerr
} }
func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Request, opts ...CallOption) (Streamer, error) { func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) {
// make a copy of call opts
callOpts := r.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
}
return r.stream(ctx, address, request, callOpts)
}
func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) {
// make a copy of call opts // make a copy of call opts
callOpts := r.opts.CallOptions callOpts := r.opts.CallOptions
for _, opt := range opts { for _, opt := range opts {
opt(&callOpts) opt(&callOpts)
} }
// get next nodes from the selector next, err := r.next(request, callOpts)
next, err := r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...) if err != nil {
if err != nil && err == selector.ErrNotFound { return nil, err
return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
} }
// check if we already have a deadline // check if we already have a deadline
@ -373,7 +371,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
default: default:
} }
call := func(i int) (Streamer, error) { call := func(i int) (Stream, error) {
// call backoff first. Someone may want an initial start delay // call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, request, i) t, err := callOpts.Backoff(ctx, request, i)
if err != nil { if err != nil {
@ -403,7 +401,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
} }
type response struct { type response struct {
stream Streamer stream Stream
err error err error
} }
@ -441,49 +439,38 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
return nil, grr return nil, grr
} }
func (r *rpcClient) Publish(ctx context.Context, p Publication, opts ...PublishOption) error { func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
md, ok := metadata.FromContext(ctx) md, ok := metadata.FromContext(ctx)
if !ok { if !ok {
md = make(map[string]string) md = make(map[string]string)
} }
md["Content-Type"] = p.ContentType() md["Content-Type"] = msg.ContentType()
// encode message body // encode message body
cf, err := r.newCodec(p.ContentType()) cf, err := r.newCodec(msg.ContentType())
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", err.Error())
} }
b := &buffer{bytes.NewBuffer(nil)} b := &buffer{bytes.NewBuffer(nil)}
if err := cf(b).Write(&codec.Message{Type: codec.Publication}, p.Message()); err != nil { if err := cf(b).Write(&codec.Message{Type: codec.Publication}, msg.Payload()); err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", err.Error())
} }
r.once.Do(func() { r.once.Do(func() {
r.opts.Broker.Connect() r.opts.Broker.Connect()
}) })
return r.opts.Broker.Publish(p.Topic(), &broker.Message{ return r.opts.Broker.Publish(msg.Topic(), &broker.Message{
Header: md, Header: md,
Body: b.Bytes(), Body: b.Bytes(),
}) })
} }
func (r *rpcClient) NewPublication(topic string, message interface{}) Publication { func (r *rpcClient) NewMessage(topic string, message interface{}) Message {
return newRpcPublication(topic, message, r.opts.ContentType) return newMessage(topic, message, r.opts.ContentType)
} }
func (r *rpcClient) NewProtoPublication(topic string, message interface{}) Publication {
return newRpcPublication(topic, message, "application/octet-stream")
}
func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, r.opts.ContentType, reqOpts...) return newRequest(service, method, request, r.opts.ContentType, reqOpts...)
}
func (r *rpcClient) NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, "application/octet-stream", reqOpts...)
}
func (r *rpcClient) NewJsonRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, "application/json", reqOpts...)
} }
func (r *rpcClient) String() string { func (r *rpcClient) String() string {

View File

@ -10,6 +10,52 @@ import (
"github.com/micro/go-micro/selector" "github.com/micro/go-micro/selector"
) )
func TestCallAddress(t *testing.T) {
var called bool
service := "test.service"
method := "Test.Method"
address := "10.1.10.1:8080"
wrap := func(cf CallFunc) CallFunc {
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
called = true
if req.Service() != service {
return fmt.Errorf("expected service: %s got %s", service, req.Service())
}
if req.Method() != method {
return fmt.Errorf("expected service: %s got %s", method, req.Method())
}
if addr != address {
return fmt.Errorf("expected address: %s got %s", address, addr)
}
// don't do the call
return nil
}
}
r := mock.NewRegistry()
c := NewClient(
Registry(r),
WrapCall(wrap),
)
c.Options().Selector.Init(selector.Registry(r))
req := c.NewRequest(service, method, nil)
// test calling remote address
if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil {
t.Fatal("call with address error", err)
}
if !called {
t.Fatal("wrapper not called")
}
}
func TestCallWrapper(t *testing.T) { func TestCallWrapper(t *testing.T) {
var called bool var called bool
id := "test.1" id := "test.1"

27
client/rpc_message.go Normal file
View File

@ -0,0 +1,27 @@
package client
type message struct {
topic string
contentType string
payload interface{}
}
func newMessage(topic string, payload interface{}, contentType string) Message {
return &message{
payload: payload,
topic: topic,
contentType: contentType,
}
}
func (m *message) ContentType() string {
return m.contentType
}
func (m *message) Topic() string {
return m.topic
}
func (m *message) Payload() interface{} {
return m.payload
}

View File

@ -1,27 +0,0 @@
package client
type rpcPublication struct {
topic string
contentType string
message interface{}
}
func newRpcPublication(topic string, message interface{}, contentType string) Publication {
return &rpcPublication{
message: message,
topic: topic,
contentType: contentType,
}
}
func (r *rpcPublication) ContentType() string {
return r.contentType
}
func (r *rpcPublication) Topic() string {
return r.topic
}
func (r *rpcPublication) Message() interface{} {
return r.message
}

View File

@ -8,13 +8,18 @@ type rpcRequest struct {
opts RequestOptions opts RequestOptions
} }
func newRpcRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request { func newRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request {
var opts RequestOptions var opts RequestOptions
for _, o := range reqOpts { for _, o := range reqOpts {
o(&opts) o(&opts)
} }
// set the content-type specified
if len(opts.ContentType) > 0 {
contentType = opts.ContentType
}
return &rpcRequest{ return &rpcRequest{
service: service, service: service,
method: method, method: method,

View File

@ -0,0 +1,23 @@
package client
import (
"testing"
)
func TestRequestOptions(t *testing.T) {
r := newRequest("service", "method", nil, "application/json")
if r.Service() != "service" {
t.Fatalf("expected 'service' got %s", r.Service())
}
if r.Method() != "method" {
t.Fatalf("expected 'method' got %s", r.Method())
}
if r.ContentType() != "application/json" {
t.Fatalf("expected 'method' got %s", r.ContentType())
}
r2 := newRequest("service", "method", nil, "application/json", WithContentType("application/protobuf"))
if r2.ContentType() != "application/protobuf" {
t.Fatalf("expected 'method' got %s", r2.ContentType())
}
}

View File

@ -14,4 +14,4 @@ type CallWrapper func(CallFunc) CallFunc
type Wrapper func(Client) Client type Wrapper func(Client) Client
// StreamWrapper wraps a Stream and returns the equivalent // StreamWrapper wraps a Stream and returns the equivalent
type StreamWrapper func(Streamer) Streamer type StreamWrapper func(Stream) Stream

View File

@ -23,7 +23,7 @@ func fnHandlerWrapper(f Function) server.HandlerWrapper {
func fnSubWrapper(f Function) server.SubscriberWrapper { func fnSubWrapper(f Function) server.SubscriberWrapper {
return func(s server.SubscriberFunc) server.SubscriberFunc { return func(s server.SubscriberFunc) server.SubscriberFunc {
return func(ctx context.Context, msg server.Publication) error { return func(ctx context.Context, msg server.Message) error {
defer f.Done() defer f.Done()
return s(ctx, msg) return s(ctx, msg)
} }

View File

@ -12,5 +12,5 @@ type publisher struct {
} }
func (p *publisher) Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error { func (p *publisher) Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error {
return p.c.Publish(ctx, p.c.NewPublication(p.topic, msg)) return p.c.Publish(ctx, p.c.NewMessage(p.topic, msg))
} }

View File

@ -8,10 +8,10 @@ type rpcRequest struct {
stream bool stream bool
} }
type rpcPublication struct { type rpcMessage struct {
topic string topic string
contentType string contentType string
message interface{} payload interface{}
} }
func (r *rpcRequest) ContentType() string { func (r *rpcRequest) ContentType() string {
@ -34,14 +34,14 @@ func (r *rpcRequest) Stream() bool {
return r.stream return r.stream
} }
func (r *rpcPublication) ContentType() string { func (r *rpcMessage) ContentType() string {
return r.contentType return r.contentType
} }
func (r *rpcPublication) Topic() string { func (r *rpcMessage) Topic() string {
return r.topic return r.topic
} }
func (r *rpcPublication) Message() interface{} { func (r *rpcMessage) Payload() interface{} {
return r.message return r.payload
} }

View File

@ -119,9 +119,9 @@ func prepareMethod(method reflect.Method) *methodType {
if stream { if stream {
// check stream type // check stream type
streamType := reflect.TypeOf((*Streamer)(nil)).Elem() streamType := reflect.TypeOf((*Stream)(nil)).Elem()
if !argType.Implements(streamType) { if !argType.Implements(streamType) {
log.Log(mname, "argument does not implement Streamer interface:", argType) log.Log(mname, "argument does not implement Stream interface:", argType)
return nil return nil
} }
} else { } else {

View File

@ -25,9 +25,9 @@ type Server interface {
String() string String() string
} }
type Publication interface { type Message interface {
Topic() string Topic() string
Message() interface{} Payload() interface{}
ContentType() string ContentType() string
} }
@ -40,11 +40,11 @@ type Request interface {
Stream() bool Stream() bool
} }
// Streamer represents a stream established with a client. // Stream represents a stream established with a client.
// A stream can be bidirectional which is indicated by the request. // A stream can be bidirectional which is indicated by the request.
// The last error will be left in Error(). // The last error will be left in Error().
// EOF indicated end of the stream. // EOF indicated end of the stream.
type Streamer interface { type Stream interface {
Context() context.Context Context() context.Context
Request() Request Request() Request
Send(interface{}) error Send(interface{}) error

View File

@ -204,7 +204,7 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle
return err return err
} }
fn := func(ctx context.Context, msg Publication) error { fn := func(ctx context.Context, msg Message) error {
var vals []reflect.Value var vals []reflect.Value
if sb.typ.Kind() != reflect.Func { if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr) vals = append(vals, sb.rcvr)
@ -213,7 +213,7 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle
vals = append(vals, reflect.ValueOf(ctx)) vals = append(vals, reflect.ValueOf(ctx))
} }
vals = append(vals, reflect.ValueOf(msg.Message())) vals = append(vals, reflect.ValueOf(msg.Payload()))
returnValues := handler.method.Call(vals) returnValues := handler.method.Call(vals)
if err := returnValues[0].Interface(); err != nil { if err := returnValues[0].Interface(); err != nil {
@ -229,10 +229,10 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle
s.wg.Add(1) s.wg.Add(1)
go func() { go func() {
defer s.wg.Done() defer s.wg.Done()
fn(ctx, &rpcPublication{ fn(ctx, &rpcMessage{
topic: sb.topic, topic: sb.topic,
contentType: ct, contentType: ct,
message: req.Interface(), payload: req.Interface(),
}) })
}() }()
} }

View File

@ -12,7 +12,7 @@ type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error
// SubscriberFunc represents a single method of a subscriber. It's used primarily // SubscriberFunc represents a single method of a subscriber. It's used primarily
// for the wrappers. What's handed to the actual method is the concrete // for the wrappers. What's handed to the actual method is the concrete
// publication message. // publication message.
type SubscriberFunc func(ctx context.Context, msg Publication) error type SubscriberFunc func(ctx context.Context, msg Message) error
// HandlerWrapper wraps the HandlerFunc and returns the equivalent // HandlerWrapper wraps the HandlerFunc and returns the equivalent
type HandlerWrapper func(HandlerFunc) HandlerFunc type HandlerWrapper func(HandlerFunc) HandlerFunc
@ -20,8 +20,8 @@ type HandlerWrapper func(HandlerFunc) HandlerFunc
// SubscriberWrapper wraps the SubscriberFunc and returns the equivalent // SubscriberWrapper wraps the SubscriberFunc and returns the equivalent
type SubscriberWrapper func(SubscriberFunc) SubscriberFunc type SubscriberWrapper func(SubscriberFunc) SubscriberFunc
// StreamerWrapper wraps a Streamer interface and returns the equivalent. // StreamWrapper wraps a Stream interface and returns the equivalent.
// Because streams exist for the lifetime of a method invocation this // Because streams exist for the lifetime of a method invocation this
// is a convenient way to wrap a Stream as its in use for trace, monitoring, // is a convenient way to wrap a Stream as its in use for trace, monitoring,
// metrics, etc. // metrics, etc.
type StreamerWrapper func(Streamer) Streamer type StreamWrapper func(Stream) Stream

View File

@ -50,15 +50,3 @@ var (
func NewTransport(opts ...Option) Transport { func NewTransport(opts ...Option) Transport {
return newHTTPTransport(opts...) return newHTTPTransport(opts...)
} }
func Dial(addr string, opts ...DialOption) (Client, error) {
return DefaultTransport.Dial(addr, opts...)
}
func Listen(addr string, opts ...ListenOption) (Listener, error) {
return DefaultTransport.Listen(addr, opts...)
}
func String() string {
return DefaultTransport.String()
}

View File

@ -36,12 +36,12 @@ func (c *clientWrapper) Call(ctx context.Context, req client.Request, rsp interf
return c.Client.Call(ctx, req, rsp, opts...) return c.Client.Call(ctx, req, rsp, opts...)
} }
func (c *clientWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Streamer, error) { func (c *clientWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
ctx = c.setHeaders(ctx) ctx = c.setHeaders(ctx)
return c.Client.Stream(ctx, req, opts...) return c.Client.Stream(ctx, req, opts...)
} }
func (c *clientWrapper) Publish(ctx context.Context, p client.Publication, opts ...client.PublishOption) error { func (c *clientWrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
ctx = c.setHeaders(ctx) ctx = c.setHeaders(ctx)
return c.Client.Publish(ctx, p, opts...) return c.Client.Publish(ctx, p, opts...)
} }