cleanup interfaces for v4
Some checks failed
lint / lint (pull_request) Successful in 1m9s
pr / test (pull_request) Failing after 1m4s

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2023-05-09 20:04:15 +03:00
parent a2a383606d
commit 819ad1117a
24 changed files with 368 additions and 1472 deletions

View File

@ -10,13 +10,15 @@ import (
) )
// DefaultBroker default memory broker // DefaultBroker default memory broker
var DefaultBroker = NewBroker() var DefaultBroker Broker // = NewBroker()
var ( var (
// ErrNotConnected returns when broker used but not connected yet // ErrNotConnected returns when broker used but not connected yet
ErrNotConnected = errors.New("broker not connected") ErrNotConnected = errors.New("broker not connected")
// ErrDisconnected returns when broker disconnected // ErrDisconnected returns when broker disconnected
ErrDisconnected = errors.New("broker disconnected") ErrDisconnected = errors.New("broker disconnected")
// ErrInvalidMessage returns when message has nvalid format
ErrInvalidMessage = errors.New("broker message has invalid format")
) )
// Broker is an interface used for asynchronous messaging. // Broker is an interface used for asynchronous messaging.
@ -33,61 +35,33 @@ type Broker interface {
Connect(ctx context.Context) error Connect(ctx context.Context) error
// Disconnect disconnect from broker // Disconnect disconnect from broker
Disconnect(ctx context.Context) error Disconnect(ctx context.Context) error
// NewMessage creates new broker message
NewMessage(endpoint string, req interface{}, opts ...MessageOption) Message
// Publish message to broker topic // Publish message to broker topic
Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error Publish(ctx context.Context, msg interface{}, opts ...PublishOption) error
// Subscribe subscribes to topic message via handler // Subscribe subscribes to topic message via handler
Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error) Subscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error)
// BatchPublish messages to broker with multiple topics
BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error
// BatchSubscribe subscribes to topic messages via handler
BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error)
// String type of broker // String type of broker
String() string String() string
} }
// Handler is used to process messages via a subscription of a topic. // Message is given to a subscription handler for processing
type Handler func(Event) error type Message interface {
// Context for the message
// Events contains multiple events Context() context.Context
type Events []Event
// Ack try to ack all events and return
func (evs Events) Ack() error {
var err error
for _, ev := range evs {
if err = ev.Ack(); err != nil {
return err
}
}
return nil
}
// SetError sets error on event
func (evs Events) SetError(err error) {
for _, ev := range evs {
ev.SetError(err)
}
}
// BatchHandler is used to process messages in batches via a subscription of a topic.
type BatchHandler func(Events) error
// Event is given to a subscription handler for processing
type Event interface {
// Topic returns event topic // Topic returns event topic
Topic() string Topic() string
// Message returns broker message // Body returns broker message
Message() *Message Body() interface{}
// Ack acknowledge message // Ack acknowledge message
Ack() error Ack() error
// Error returns message error (like decoding errors or some other) // Error returns message error (like decoding errors or some other)
// In this case Body contains raw []byte from broker
Error() error Error() error
// SetError set event processing error
SetError(err error)
} }
// Message is used to transfer data // RawMessage is used to transfer data
type Message struct { type RawMessage struct {
// Header contains message metadata // Header contains message metadata
Header metadata.Metadata Header metadata.Metadata
// Body contains message body // Body contains message body
@ -95,8 +69,8 @@ type Message struct {
} }
// NewMessage create broker message with topic filled // NewMessage create broker message with topic filled
func NewMessage(topic string) *Message { func NewRawMessage(topic string) *RawMessage {
m := &Message{Header: metadata.New(2)} m := &RawMessage{Header: metadata.New(2)}
m.Header.Set(metadata.HeaderTopic, topic) m.Header.Set(metadata.HeaderTopic, topic)
return m return m
} }

View File

@ -1,3 +1,5 @@
//go:build ignore
package broker package broker
import ( import (
@ -6,7 +8,6 @@ import (
"time" "time"
"go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/metadata"
maddr "go.unistack.org/micro/v4/util/addr" maddr "go.unistack.org/micro/v4/util/addr"
"go.unistack.org/micro/v4/util/id" "go.unistack.org/micro/v4/util/id"
mnet "go.unistack.org/micro/v4/util/net" mnet "go.unistack.org/micro/v4/util/net"
@ -21,23 +22,6 @@ type memoryBroker struct {
connected bool connected bool
} }
type memoryEvent struct {
err error
message interface{}
topic string
opts Options
}
type memorySubscriber struct {
ctx context.Context
exit chan bool
handler Handler
batchhandler BatchHandler
id string
topic string
opts SubscribeOptions
}
func (m *memoryBroker) Options() Options { func (m *memoryBroker) Options() Options {
return m.opts return m.opts
} }
@ -89,16 +73,11 @@ func (m *memoryBroker) Init(opts ...Option) error {
return nil return nil
} }
func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error { func (m *memoryBroker) NewMessage(endpoint string, req interface{}, opts ...MessageOption) Message {
msg.Header.Set(metadata.HeaderTopic, topic) return &memoryMessage{}
return m.publish(ctx, []*Message{msg}, opts...)
} }
func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error { func (m *memoryBroker) Publish(ctx context.Context, message interface{}, opts ...PublishOption) error {
return m.publish(ctx, msgs, opts...)
}
func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
m.RLock() m.RLock()
if !m.connected { if !m.connected {
m.RUnlock() m.RUnlock()
@ -113,13 +92,20 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub
return ctx.Err() return ctx.Err()
default: default:
options := NewPublishOptions(opts...) options := NewPublishOptions(opts...)
var msgs []*memoryMessage
msgTopicMap := make(map[string]Events) switch v := message.(type) {
for _, v := range msgs { case *memoryMessage:
p := &memoryEvent{opts: m.opts} msgs = []*memoryMessage{v}
case []*memoryMessage:
if m.opts.Codec == nil || options.BodyOnly { msgs = v
p.topic, _ = v.Header.Get(metadata.HeaderTopic) default:
return ErrInvalidMessage
}
msgTopicMap := make(map[string][]*memoryMessage)
for _, msg := range msgs {
p := &memoryMessage{opts: options}
/*
if mb, ok := msg.Body().(*codec.Frame); ok {
p.message = v.Body p.message = v.Body
} else { } else {
p.topic, _ = v.Header.Get(metadata.HeaderTopic) p.topic, _ = v.Header.Get(metadata.HeaderTopic)
@ -128,10 +114,10 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub
return err return err
} }
} }
msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p) */
msgTopicMap[msg.Topic()] = append(msgTopicMap[p.topic], p)
} }
beh := m.opts.BatchErrorHandler
eh := m.opts.ErrorHandler eh := m.opts.ErrorHandler
for t, ms := range msgTopicMap { for t, ms := range msgTopicMap {
@ -152,45 +138,13 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub
m.opts.Meter.Counter(PublishMessageTotal, "endpoint", t, "status", "success").Add(len(ms)) m.opts.Meter.Counter(PublishMessageTotal, "endpoint", t, "status", "success").Add(len(ms))
for _, sub := range subs { for _, sub := range subs {
if sub.opts.BatchErrorHandler != nil {
beh = sub.opts.BatchErrorHandler
}
if sub.opts.ErrorHandler != nil { if sub.opts.ErrorHandler != nil {
eh = sub.opts.ErrorHandler eh = sub.opts.ErrorHandler
} }
switch {
// batch processing
case sub.batchhandler != nil:
if err = sub.batchhandler(ms); err != nil {
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
ms.SetError(err)
if beh != nil {
_ = beh(ms)
} else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error())
}
} else {
if sub.opts.AutoAck {
if err = ms.Ack(); err != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
} else {
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
}
} else {
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
}
}
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-len(ms))
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-len(ms))
// single processing
case sub.handler != nil:
for _, p := range ms { for _, p := range ms {
if err = sub.handler(p); err != nil { if err = sub.handler(p); err != nil {
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc() m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
p.SetError(err)
if eh != nil { if eh != nil {
_ = eh(p) _ = eh(p)
} else if m.opts.Logger.V(logger.ErrorLevel) { } else if m.opts.Logger.V(logger.ErrorLevel) {
@ -211,7 +165,6 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-1) m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-1)
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-1) m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-1)
} }
}
} }
te := time.Since(ts) te := time.Since(ts)
@ -226,52 +179,7 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub
return nil return nil
} }
func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) { func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error) {
m.RLock()
if !m.connected {
m.RUnlock()
return nil, ErrNotConnected
}
m.RUnlock()
sid, err := id.New()
if err != nil {
return nil, err
}
options := NewSubscribeOptions(opts...)
sub := &memorySubscriber{
exit: make(chan bool, 1),
id: sid,
topic: topic,
batchhandler: handler,
opts: options,
ctx: ctx,
}
m.Lock()
m.subscribers[topic] = append(m.subscribers[topic], sub)
m.Unlock()
go func() {
<-sub.exit
m.Lock()
newSubscribers := make([]*memorySubscriber, 0, len(m.subscribers)-1)
for _, sb := range m.subscribers[topic] {
if sb.id == sub.id {
continue
}
newSubscribers = append(newSubscribers, sb)
}
m.subscribers[topic] = newSubscribers
m.Unlock()
}()
return sub, nil
}
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
m.RLock() m.RLock()
if !m.connected { if !m.connected {
m.RUnlock() m.RUnlock()
@ -324,38 +232,41 @@ func (m *memoryBroker) Name() string {
return m.opts.Name return m.opts.Name
} }
func (m *memoryEvent) Topic() string { type memoryMessage struct {
err error
body interface{}
topic string
opts PublishOptions
ctx context.Context
}
func (m *memoryMessage) Topic() string {
return m.topic return m.topic
} }
func (m *memoryEvent) Message() *Message { func (m *memoryMessage) Body() interface{} {
switch v := m.message.(type) { return m.body
case *Message:
return v
case []byte:
msg := &Message{}
if err := m.opts.Codec.Unmarshal(v, msg); err != nil {
if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, "[memory]: failed to unmarshal: %v", err)
}
return nil
}
return msg
} }
func (m *memoryMessage) Ack() error {
return nil return nil
} }
func (m *memoryEvent) Ack() error { func (m *memoryMessage) Error() error {
return nil
}
func (m *memoryEvent) Error() error {
return m.err return m.err
} }
func (m *memoryEvent) SetError(err error) { func (m *memoryMessage) Context() context.Context {
m.err = err return m.ctx
}
type memorySubscriber struct {
ctx context.Context
exit chan bool
handler interface{}
id string
topic string
opts SubscribeOptions
} }
func (m *memorySubscriber) Options() SubscribeOptions { func (m *memorySubscriber) Options() SubscribeOptions {
@ -372,7 +283,7 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error {
} }
// NewBroker return new memory broker // NewBroker return new memory broker
func NewBroker(opts ...Option) Broker { func NewBroker(opts ...Option) *memoryBroker {
return &memoryBroker{ return &memoryBroker{
opts: NewOptions(opts...), opts: NewOptions(opts...),
subscribers: make(map[string][]*memorySubscriber), subscribers: make(map[string][]*memorySubscriber),

View File

@ -7,6 +7,7 @@ import (
"go.unistack.org/micro/v4/codec" "go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/meter" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v4/register" "go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v4/tracer" "go.unistack.org/micro/v4/tracer"
@ -37,8 +38,8 @@ type Options struct {
Tracer tracer.Tracer Tracer tracer.Tracer
// Register can be used for clustering // Register can be used for clustering
Register register.Register Register register.Register
// Codec holds the codec for marshal/unmarshal // Codecs holds the codec for marshal/unmarshal
Codec codec.Codec Codecs map[string]codec.Codec
// Logger used for logging // Logger used for logging
Logger logger.Logger Logger logger.Logger
// Meter used for metrics // Meter used for metrics
@ -48,15 +49,16 @@ type Options struct {
// TLSConfig holds tls.TLSConfig options // TLSConfig holds tls.TLSConfig options
TLSConfig *tls.Config TLSConfig *tls.Config
// ErrorHandler used when broker can't unmarshal incoming message // ErrorHandler used when broker can't unmarshal incoming message
ErrorHandler Handler ErrorHandler func(Message)
// BatchErrorHandler used when broker can't unmashal incoming messages
BatchErrorHandler BatchHandler
// Name holds the broker name // Name holds the broker name
Name string Name string
// Addrs holds the broker address // Addrs holds the broker address
Addrs []string Addrs []string
} }
// Option func
type Option func(*Options)
// NewOptions create new Options // NewOptions create new Options
func NewOptions(opts ...Option) Options { func NewOptions(opts ...Option) Options {
options := Options{ options := Options{
@ -64,7 +66,7 @@ func NewOptions(opts ...Option) Options {
Logger: logger.DefaultLogger, Logger: logger.DefaultLogger,
Context: context.Background(), Context: context.Background(),
Meter: meter.DefaultMeter, Meter: meter.DefaultMeter,
Codec: codec.DefaultCodec, Codecs: make(map[string]codec.Codec),
Tracer: tracer.DefaultTracer, Tracer: tracer.DefaultTracer,
} }
for _, o := range opts { for _, o := range opts {
@ -80,6 +82,32 @@ func Context(ctx context.Context) Option {
} }
} }
// MessageOption func
type MessageOption func(*MessageOptions)
// MessageOptions struct
type MessageOptions struct {
Metadata metadata.Metadata
ContentType string
}
// MessageMetadata pass additional message metadata
func MessageMetadata(md metadata.Metadata) MessageOption {
return func(o *MessageOptions) {
o.Metadata = md
}
}
// MessageContentType pass ContentType for message data
func MessageContentType(ct string) MessageOption {
return func(o *MessageOptions) {
o.ContentType = ct
}
}
// PublishOption func
type PublishOption func(*PublishOptions)
// PublishOptions struct // PublishOptions struct
type PublishOptions struct { type PublishOptions struct {
// Context holds external options // Context holds external options
@ -104,11 +132,9 @@ type SubscribeOptions struct {
// Context holds external options // Context holds external options
Context context.Context Context context.Context
// ErrorHandler used when broker can't unmarshal incoming message // ErrorHandler used when broker can't unmarshal incoming message
ErrorHandler Handler ErrorHandler func(Message)
// BatchErrorHandler used when broker can't unmashal incoming messages // QueueGroup holds consumer group
BatchErrorHandler BatchHandler QueueGroup string
// Group holds consumer group
Group string
// AutoAck flag specifies auto ack of incoming message when no error happens // AutoAck flag specifies auto ack of incoming message when no error happens
AutoAck bool AutoAck bool
// BodyOnly flag specifies that message contains only body bytes without header // BodyOnly flag specifies that message contains only body bytes without header
@ -119,12 +145,6 @@ type SubscribeOptions struct {
BatchWait time.Duration BatchWait time.Duration
} }
// Option func
type Option func(*Options)
// PublishOption func
type PublishOption func(*PublishOptions)
// PublishBodyOnly publish only body of the message // PublishBodyOnly publish only body of the message
func PublishBodyOnly(b bool) PublishOption { func PublishBodyOnly(b bool) PublishOption {
return func(o *PublishOptions) { return func(o *PublishOptions) {
@ -148,59 +168,29 @@ func Addrs(addrs ...string) Option {
// Codec sets the codec used for encoding/decoding used where // Codec sets the codec used for encoding/decoding used where
// a broker does not support headers // a broker does not support headers
func Codec(c codec.Codec) Option { // Codec to be used to encode/decode requests for a given content type
func Codec(contentType string, c codec.Codec) Option {
return func(o *Options) { return func(o *Options) {
o.Codec = c o.Codecs[contentType] = c
} }
} }
// ErrorHandler will catch all broker errors that cant be handled // ErrorHandler will catch all broker errors that cant be handled
// in normal way, for example Codec errors // in normal way, for example Codec errors
func ErrorHandler(h Handler) Option { func ErrorHandler(h func(Message)) Option {
return func(o *Options) { return func(o *Options) {
o.ErrorHandler = h o.ErrorHandler = h
} }
} }
// BatchErrorHandler will catch all broker errors that cant be handled
// in normal way, for example Codec errors
func BatchErrorHandler(h BatchHandler) Option {
return func(o *Options) {
o.BatchErrorHandler = h
}
}
// SubscribeErrorHandler will catch all broker errors that cant be handled // SubscribeErrorHandler will catch all broker errors that cant be handled
// in normal way, for example Codec errors // in normal way, for example Codec errors
func SubscribeErrorHandler(h Handler) SubscribeOption { func SubscribeErrorHandler(h func(Message)) SubscribeOption {
return func(o *SubscribeOptions) { return func(o *SubscribeOptions) {
o.ErrorHandler = h o.ErrorHandler = h
} }
} }
// SubscribeBatchErrorHandler will catch all broker errors that cant be handled
// in normal way, for example Codec errors
func SubscribeBatchErrorHandler(h BatchHandler) SubscribeOption {
return func(o *SubscribeOptions) {
o.BatchErrorHandler = h
}
}
// Queue sets the subscribers queue
// Deprecated
func Queue(name string) SubscribeOption {
return func(o *SubscribeOptions) {
o.Group = name
}
}
// SubscribeGroup sets the name of the queue to share messages on
func SubscribeGroup(name string) SubscribeOption {
return func(o *SubscribeOptions) {
o.Group = name
}
}
// Register sets register option // Register sets register option
func Register(r register.Register) Option { func Register(r register.Register) Option {
return func(o *Options) { return func(o *Options) {
@ -243,6 +233,21 @@ func Name(n string) Option {
} }
} }
// SubscribeOption func signature
type SubscribeOption func(*SubscribeOptions)
// NewSubscribeOptions creates new SubscribeOptions
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
options := SubscribeOptions{
AutoAck: true,
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
return options
}
// SubscribeContext set context // SubscribeContext set context
func SubscribeContext(ctx context.Context) SubscribeOption { func SubscribeContext(ctx context.Context) SubscribeOption {
return func(o *SubscribeOptions) { return func(o *SubscribeOptions) {
@ -250,14 +255,6 @@ func SubscribeContext(ctx context.Context) SubscribeOption {
} }
} }
// DisableAutoAck disables auto ack
// Deprecated
func DisableAutoAck() SubscribeOption {
return func(o *SubscribeOptions) {
o.AutoAck = false
}
}
// SubscribeAutoAck contol auto acking of messages // SubscribeAutoAck contol auto acking of messages
// after they have been handled. // after they have been handled.
func SubscribeAutoAck(b bool) SubscribeOption { func SubscribeAutoAck(b bool) SubscribeOption {
@ -287,17 +284,16 @@ func SubscribeBatchWait(td time.Duration) SubscribeOption {
} }
} }
// SubscribeOption func // SubscribeQueueGroup sets the shared queue name distributed messages across subscribers
type SubscribeOption func(*SubscribeOptions) func SubscribeQueueGroup(n string) SubscribeOption {
return func(o *SubscribeOptions) {
o.QueueGroup = n
}
}
// NewSubscribeOptions creates new SubscribeOptions // SubscribeAutoAck control auto ack processing for handler
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions { func SubscribeAuthAck(b bool) SubscribeOption {
options := SubscribeOptions{ return func(o *SubscribeOptions) {
AutoAck: true, o.AutoAck = b
Context: context.Background(),
} }
for _, o := range opts {
o(&options)
}
return options
} }

98
broker/subscriber.go Normal file
View File

@ -0,0 +1,98 @@
package broker
import (
"fmt"
"reflect"
"strings"
"unicode"
"unicode/utf8"
)
const (
subSig = "func(context.Context, interface{}) error"
batchSubSig = "func([]context.Context, []interface{}) error"
)
// Precompute the reflect type for error. Can't use error directly
// because Typeof takes an empty interface value. This is annoying.
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
// Is this an exported - upper case - name?
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune)
}
// Is this type exported or a builtin?
func isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
return isExported(t.Name()) || t.PkgPath() == ""
}
// ValidateSubscriber func signature
func ValidateSubscriber(sub interface{}) error {
typ := reflect.TypeOf(sub)
var argType reflect.Type
switch typ.Kind() {
case reflect.Func:
name := "Func"
switch typ.NumIn() {
case 1: // func(Message) error
case 2: // func(context.Context, Message) error or func(context.Context, []Message) error
argType = typ.In(2)
// if sub.Options().Batch {
if argType.Kind() != reflect.Slice {
return fmt.Errorf("subscriber %v dont have required signature %s", name, batchSubSig)
}
if strings.Compare(fmt.Sprintf("%v", argType), "[]interface{}") == 0 {
return fmt.Errorf("subscriber %v dont have required signaure %s", name, batchSubSig)
}
// }
default:
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s or %s", name, typ.NumIn(), subSig, batchSubSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
}
if typ.NumOut() != 1 {
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s or %s",
name, typ.NumOut(), subSig, batchSubSig)
}
if returnType := typ.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
}
default:
hdlr := reflect.ValueOf(sub)
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
switch method.Type.NumIn() {
case 3:
argType = method.Type.In(2)
default:
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s or %s",
name, method.Name, method.Type.NumIn(), subSig, batchSubSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("%v argument type not exported: %v", name, argType)
}
if method.Type.NumOut() != 1 {
return fmt.Errorf(
"subscriber %v.%v has wrong number of return values: %v require signature %s or %s",
name, method.Name, method.Type.NumOut(), subSig, batchSubSig)
}
if returnType := method.Type.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
}
}
}
return nil
}

View File

@ -6,7 +6,6 @@ import (
"time" "time"
"go.unistack.org/micro/v4/codec" "go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/metadata"
) )
var ( var (
@ -35,23 +34,12 @@ type Client interface {
Name() string Name() string
Init(opts ...Option) error Init(opts ...Option) error
Options() Options Options() Options
NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
NewRequest(service string, endpoint string, req interface{}, opts ...RequestOption) Request NewRequest(service string, endpoint string, req interface{}, opts ...RequestOption) Request
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
Publish(ctx context.Context, msg Message, opts ...PublishOption) error
BatchPublish(ctx context.Context, msg []Message, opts ...PublishOption) error
String() string String() string
} }
// Message is the interface for publishing asynchronously
type Message interface {
Topic() string
Payload() interface{}
ContentType() string
Metadata() metadata.Metadata
}
// Request is the interface for a synchronous request used by Call or Stream // Request is the interface for a synchronous request used by Call or Stream
type Request interface { type Request interface {
// The service to call // The service to call
@ -68,16 +56,22 @@ type Request interface {
Codec() codec.Codec Codec() codec.Codec
// indicates whether the request will be a streaming one rather than unary // indicates whether the request will be a streaming one rather than unary
Stream() bool Stream() bool
// Header data
// Header() metadata.Metadata
} }
// Response is the response received from a service // Response is the response received from a service
type Response interface { type Response interface {
// Read the response // Read the response
Codec() codec.Codec Codec() codec.Codec
// The content type
// ContentType() string
// Header data // Header data
Header() metadata.Metadata // Header() metadata.Metadata
// Read the undecoded response // Read the undecoded response
Read() ([]byte, error) Read() ([]byte, error)
// The unencoded request body
// Body() interface{}
} }
// Stream is the interface for a bidirectional synchronous stream // Stream is the interface for a bidirectional synchronous stream

View File

@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"time" "time"
"go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/codec" "go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/errors" "go.unistack.org/micro/v4/errors"
"go.unistack.org/micro/v4/metadata" "go.unistack.org/micro/v4/metadata"
@ -285,6 +284,9 @@ func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opt
ch := make(chan error, callOpts.Retries) ch := make(chan error, callOpts.Retries)
var gerr error var gerr error
ts := time.Now()
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
n.opts.Meter.Counter(ClientRequestInflight, "endpoint", endpoint).Inc()
for i := 0; i <= callOpts.Retries; i++ { for i := 0; i <= callOpts.Retries; i++ {
go func() { go func() {
ch <- call(i) ch <- call(i)
@ -312,6 +314,16 @@ func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opt
} }
} }
if gerr != nil {
n.opts.Meter.Counter(ClientRequestTotal, "endpoint", endpoint, "status", "failure").Inc()
} else {
n.opts.Meter.Counter(ClientRequestTotal, "endpoint", endpoint, "status", "success").Inc()
}
n.opts.Meter.Counter(ClientRequestInflight, "endpoint", endpoint).Dec()
te := time.Since(ts)
n.opts.Meter.Summary(ClientRequestLatencyMicroseconds, "endpoint", endpoint).Update(te.Seconds())
n.opts.Meter.Histogram(ClientRequestDurationSeconds, "endpoint", endpoint).Update(te.Seconds())
return gerr return gerr
} }
@ -323,11 +335,6 @@ func (n *noopClient) NewRequest(service, endpoint string, req interface{}, opts
return &noopRequest{service: service, endpoint: endpoint} return &noopRequest{service: service, endpoint: endpoint}
} }
func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOption) Message {
options := NewMessageOptions(append([]MessageOption{MessageContentType(n.opts.ContentType)}, opts...)...)
return &noopMessage{topic: topic, payload: msg, opts: options}
}
func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) { func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) {
var err error var err error
@ -414,7 +421,15 @@ func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption
node := next() node := next()
// ts := time.Now()
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
n.opts.Meter.Counter(ClientRequestInflight, "endpoint", endpoint).Inc()
stream, cerr := n.stream(ctx, node, req, callOpts) stream, cerr := n.stream(ctx, node, req, callOpts)
if cerr != nil {
n.opts.Meter.Counter(ClientRequestTotal, "endpoint", endpoint, "status", "failure").Inc()
} else {
n.opts.Meter.Counter(ClientRequestTotal, "endpoint", endpoint, "status", "success").Inc()
}
// record the result of the call to inform future routing decisions // record the result of the call to inform future routing decisions
if verr := n.opts.Selector.Record(node, cerr); verr != nil { if verr := n.opts.Selector.Record(node, cerr); verr != nil {
@ -468,64 +483,6 @@ func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption
return nil, grr return nil, grr
} }
func (n *noopClient) stream(ctx context.Context, addr string, req Request, opts CallOptions) (Stream, error) { func (n *noopClient) stream(ctx context.Context, addr string, req Request, opts CallOptions) (*noopStream, error) {
return &noopStream{}, nil return &noopStream{}, nil
} }
func (n *noopClient) BatchPublish(ctx context.Context, ps []Message, opts ...PublishOption) error {
return n.publish(ctx, ps, opts...)
}
func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOption) error {
return n.publish(ctx, []Message{p}, opts...)
}
func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishOption) error {
options := NewPublishOptions(opts...)
msgs := make([]*broker.Message, 0, len(ps))
for _, p := range ps {
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(0)
}
md[metadata.HeaderContentType] = p.ContentType()
topic := p.Topic()
// get the exchange
if len(options.Exchange) > 0 {
topic = options.Exchange
}
md[metadata.HeaderTopic] = topic
var body []byte
// passed in raw data
if d, ok := p.Payload().(*codec.Frame); ok {
body = d.Data
} else {
// use codec for payload
cf, err := n.newCodec(p.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// set the body
b, err := cf.Marshal(p.Payload())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
body = b
}
msgs = append(msgs, &broker.Message{Header: md, Body: body})
}
return n.opts.Broker.BatchPublish(ctx, msgs,
broker.PublishContext(options.Context),
broker.PublishBodyOnly(options.BodyOnly),
)
}

View File

@ -6,7 +6,6 @@ import (
"net" "net"
"time" "time"
"go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/codec" "go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/metadata" "go.unistack.org/micro/v4/metadata"
@ -19,6 +18,17 @@ import (
"go.unistack.org/micro/v4/tracer" "go.unistack.org/micro/v4/tracer"
) )
var (
// ClientRequestDurationSeconds specifies meter metric name
ClientRequestDurationSeconds = "client_request_duration_seconds"
// ClientRequestLatencyMicroseconds specifies meter metric name
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
// ClientRequestTotal specifies meter metric name
ClientRequestTotal = "client_request_total"
// ClientRequestInflight specifies meter metric name
ClientRequestInflight = "client_request_inflight"
)
// Options holds client options // Options holds client options
type Options struct { type Options struct {
// Transport used for transfer messages // Transport used for transfer messages
@ -29,8 +39,6 @@ type Options struct {
Logger logger.Logger Logger logger.Logger
// Tracer used for tracing // Tracer used for tracing
Tracer tracer.Tracer Tracer tracer.Tracer
// Broker used to publish messages
Broker broker.Broker
// Meter used for metrics // Meter used for metrics
Meter meter.Meter Meter meter.Meter
// Context is used for external options // Context is used for external options
@ -199,7 +207,6 @@ func NewOptions(opts ...Option) Options {
PoolTTL: DefaultPoolTTL, PoolTTL: DefaultPoolTTL,
Selector: random.NewSelector(), Selector: random.NewSelector(),
Logger: logger.DefaultLogger, Logger: logger.DefaultLogger,
Broker: broker.DefaultBroker,
Meter: meter.DefaultMeter, Meter: meter.DefaultMeter,
Tracer: tracer.DefaultTracer, Tracer: tracer.DefaultTracer,
Router: router.DefaultRouter, Router: router.DefaultRouter,
@ -213,13 +220,6 @@ func NewOptions(opts ...Option) Options {
return options return options
} }
// Broker to be used for pub/sub
func Broker(b broker.Broker) Option {
return func(o *Options) {
o.Broker = b
}
}
// Tracer to be used for tracing // Tracer to be used for tracing
func Tracer(t tracer.Tracer) Option { func Tracer(t tracer.Tracer) Option {
return func(o *Options) { return func(o *Options) {

View File

@ -17,6 +17,7 @@ type cfg struct {
IntValue int `default:"99"` IntValue int `default:"99"`
DurationValue time.Duration `default:"10s"` DurationValue time.Duration `default:"10s"`
MDurationValue mtime.Duration `default:"10s"` MDurationValue mtime.Duration `default:"10s"`
MapValue map[string]bool `default:"key1=true,key2=false"`
} }
type cfgStructValue struct { type cfgStructValue struct {
@ -67,6 +68,9 @@ func TestDefault(t *testing.T) {
if conf.StringValue != "after_load" { if conf.StringValue != "after_load" {
t.Fatal("AfterLoad option not working") t.Fatal("AfterLoad option not working")
} }
if len(conf.MapValue) != 2 {
t.Fatalf("map value invalid: %#+v\n", conf.MapValue)
}
_ = conf _ = conf
// t.Logf("%#+v\n", conf) // t.Logf("%#+v\n", conf)
} }

View File

@ -1,27 +0,0 @@
package micro
import (
"context"
"go.unistack.org/micro/v4/client"
)
// Event is used to publish messages to a topic
type Event interface {
// Publish publishes a message to the event topic
Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error
}
type event struct {
c client.Client
topic string
}
// NewEvent creates a new event publisher
func NewEvent(topic string, c client.Client) Event {
return &event{c, topic}
}
func (e *event) Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error {
return e.c.Publish(ctx, e.c.NewMessage(e.topic, msg), opts...)
}

View File

@ -11,23 +11,6 @@ import (
) )
var ( var (
// ClientRequestDurationSeconds specifies meter metric name
ClientRequestDurationSeconds = "client_request_duration_seconds"
// ClientRequestLatencyMicroseconds specifies meter metric name
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
// ClientRequestTotal specifies meter metric name
ClientRequestTotal = "client_request_total"
// ClientRequestInflight specifies meter metric name
ClientRequestInflight = "client_request_inflight"
// ServerRequestDurationSeconds specifies meter metric name
ServerRequestDurationSeconds = "server_request_duration_seconds"
// ServerRequestLatencyMicroseconds specifies meter metric name
ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
// ServerRequestTotal specifies meter metric name
ServerRequestTotal = "server_request_total"
// ServerRequestInflight specifies meter metric name
ServerRequestInflight = "server_request_inflight"
labelSuccess = "success" labelSuccess = "success"
labelFailure = "failure" labelFailure = "failure"
labelStatus = "status" labelStatus = "status"

View File

@ -90,33 +90,7 @@ type Option func(*Options) error
// Broker to be used for client and server // Broker to be used for client and server
func Broker(b broker.Broker, opts ...BrokerOption) Option { func Broker(b broker.Broker, opts ...BrokerOption) Option {
return func(o *Options) error { return func(o *Options) error {
var err error o.Brokers = []broker.Broker{b}
bopts := brokerOptions{}
for _, opt := range opts {
opt(&bopts)
}
all := false
if len(opts) == 0 {
all = true
}
for _, srv := range o.Servers {
for _, os := range bopts.servers {
if srv.Name() == os || all {
if err = srv.Init(server.Broker(b)); err != nil {
return err
}
}
}
}
for _, cli := range o.Clients {
for _, oc := range bopts.clients {
if cli.Name() == oc || all {
if err = cli.Init(client.Broker(b)); err != nil {
return err
}
}
}
}
return nil return nil
} }
} }

View File

@ -33,16 +33,6 @@ func SetOption(k, v interface{}) Option {
} }
} }
// SetSubscriberOption returns a function to setup a context with given value
func SetSubscriberOption(k, v interface{}) SubscriberOption {
return func(o *SubscriberOptions) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, k, v)
}
}
// SetHandlerOption returns a function to setup a context with given value // SetHandlerOption returns a function to setup a context with given value
func SetHandlerOption(k, v interface{}) HandlerOption { func SetHandlerOption(k, v interface{}) HandlerOption {
return func(o *HandlerOptions) { return func(o *HandlerOptions) {

View File

@ -51,14 +51,3 @@ func TestSetOption(t *testing.T) {
t.Fatal("SetOption not works") t.Fatal("SetOption not works")
} }
} }
func TestSetSubscriberOption(t *testing.T) {
type key struct{}
o := SetSubscriberOption(key{}, "test")
opts := &SubscriberOptions{}
o(opts)
if v, ok := opts.Context.Value(key{}).(string); !ok || v == "" {
t.Fatal("SetSubscriberOption not works")
}
}

View File

@ -1,59 +0,0 @@
package server
import (
"reflect"
"go.unistack.org/micro/v4/register"
)
type rpcHandler struct {
opts HandlerOptions
handler interface{}
name string
endpoints []*register.Endpoint
}
func newRPCHandler(handler interface{}, opts ...HandlerOption) Handler {
options := NewHandlerOptions(opts...)
typ := reflect.TypeOf(handler)
hdlr := reflect.ValueOf(handler)
name := reflect.Indirect(hdlr).Type().Name()
var endpoints []*register.Endpoint
for m := 0; m < typ.NumMethod(); m++ {
if e := register.ExtractEndpoint(typ.Method(m)); e != nil {
e.Name = name + "." + e.Name
for k, v := range options.Metadata[e.Name] {
e.Metadata[k] = v
}
endpoints = append(endpoints, e)
}
}
return &rpcHandler{
name: name,
handler: handler,
endpoints: endpoints,
opts: options,
}
}
func (r *rpcHandler) Name() string {
return r.name
}
func (r *rpcHandler) Handler() interface{} {
return r.handler
}
func (r *rpcHandler) Endpoints() []*register.Endpoint {
return r.endpoints
}
func (r *rpcHandler) Options() HandlerOptions {
return r.opts
}

View File

@ -1,12 +1,11 @@
package server package server
import ( import (
"fmt" "reflect"
"sort" "sort"
"sync" "sync"
"time" "time"
"go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/codec" "go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/register" "go.unistack.org/micro/v4/register"
@ -29,7 +28,6 @@ type noopServer struct {
wg *sync.WaitGroup wg *sync.WaitGroup
rsvc *register.Service rsvc *register.Service
handlers map[string]Handler handlers map[string]Handler
subscribers map[*subscriber][]broker.Subscriber
exit chan chan error exit chan chan error
opts Options opts Options
sync.RWMutex sync.RWMutex
@ -43,9 +41,6 @@ func NewServer(opts ...Option) Server {
if n.handlers == nil { if n.handlers == nil {
n.handlers = make(map[string]Handler) n.handlers = make(map[string]Handler)
} }
if n.subscribers == nil {
n.subscribers = make(map[*subscriber][]broker.Subscriber)
}
if n.exit == nil { if n.exit == nil {
n.exit = make(chan chan error) n.exit = make(chan chan error)
} }
@ -71,37 +66,10 @@ func (n *noopServer) Name() string {
return n.opts.Name return n.opts.Name
} }
func (n *noopServer) Subscribe(sb Subscriber) error {
sub, ok := sb.(*subscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *subscriber")
} else if len(sub.handlers) == 0 {
return fmt.Errorf("invalid subscriber: no handler functions")
}
if err := ValidateSubscriber(sb); err != nil {
return err
}
n.Lock()
if _, ok = n.subscribers[sub]; ok {
n.Unlock()
return fmt.Errorf("subscriber %v already exists", sub)
}
n.subscribers[sub] = nil
n.Unlock()
return nil
}
func (n *noopServer) NewHandler(h interface{}, opts ...HandlerOption) Handler { func (n *noopServer) NewHandler(h interface{}, opts ...HandlerOption) Handler {
return newRPCHandler(h, opts...) return newRPCHandler(h, opts...)
} }
func (n *noopServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber {
return newSubscriber(topic, sb, opts...)
}
func (n *noopServer) Init(opts ...Option) error { func (n *noopServer) Init(opts ...Option) error {
for _, o := range opts { for _, o := range opts {
o(&n.opts) o(&n.opts)
@ -110,9 +78,6 @@ func (n *noopServer) Init(opts ...Option) error {
if n.handlers == nil { if n.handlers == nil {
n.handlers = make(map[string]Handler, 1) n.handlers = make(map[string]Handler, 1)
} }
if n.subscribers == nil {
n.subscribers = make(map[*subscriber][]broker.Subscriber, 1)
}
if n.exit == nil { if n.exit == nil {
n.exit = make(chan chan error) n.exit = make(chan chan error)
@ -158,21 +123,10 @@ func (n *noopServer) Register() error {
sort.Strings(handlerList) sort.Strings(handlerList)
subscriberList := make([]*subscriber, 0, len(n.subscribers)) endpoints := make([]*register.Endpoint, 0, len(handlerList))
for e := range n.subscribers {
subscriberList = append(subscriberList, e)
}
sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic
})
endpoints := make([]*register.Endpoint, 0, len(handlerList)+len(subscriberList))
for _, h := range handlerList { for _, h := range handlerList {
endpoints = append(endpoints, n.handlers[h].Endpoints()...) endpoints = append(endpoints, n.handlers[h].Endpoints()...)
} }
for _, e := range subscriberList {
endpoints = append(endpoints, e.Endpoints()...)
}
n.RUnlock() n.RUnlock()
service.Nodes[0].Metadata["protocol"] = "noop" service.Nodes[0].Metadata["protocol"] = "noop"
@ -202,39 +156,6 @@ func (n *noopServer) Register() error {
n.Lock() n.Lock()
defer n.Unlock() defer n.Unlock()
cx := config.Context
var sub broker.Subscriber
for sb := range n.subscribers {
if sb.Options().Context != nil {
cx = sb.Options().Context
}
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
if sb.Options().Batch {
// batch processing handler
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.newBatchSubHandler(sb, config), opts...)
} else {
// single processing handler
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.newSubHandler(sb, config), opts...)
}
if err != nil {
return err
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
}
n.subscribers[sb] = []broker.Subscriber{sub}
}
n.registered = true n.registered = true
if cacheService { if cacheService {
n.rsvc = service n.rsvc = service
@ -273,33 +194,6 @@ func (n *noopServer) Deregister() error {
n.registered = false n.registered = false
cx := config.Context
wg := sync.WaitGroup{}
for sb, subs := range n.subscribers {
for idx := range subs {
if sb.Options().Context != nil {
cx = sb.Options().Context
}
ncx := cx
wg.Add(1)
go func(s broker.Subscriber) {
defer wg.Done()
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "unsubscribing from topic: %s", s.Topic())
}
if err := s.Unsubscribe(ncx); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "unsubscribing from topic: %s err: %v", s.Topic(), err)
}
}
}(subs[idx])
}
n.subscribers[sb] = nil
}
wg.Wait()
n.Unlock() n.Unlock()
return nil return nil
} }
@ -336,21 +230,6 @@ func (n *noopServer) Start() error {
} }
n.Unlock() n.Unlock()
// only connect if we're subscribed
if len(n.subscribers) > 0 {
// connect to the broker
if err := config.Broker.Connect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "broker [%s] connect error: %v", config.Broker.String(), err)
}
return err
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
}
}
// use RegisterCheck func before register // use RegisterCheck func before register
// nolint: nestif // nolint: nestif
if err := config.RegisterCheck(config.Context); err != nil { if err := config.RegisterCheck(config.Context); err != nil {
@ -429,16 +308,6 @@ func (n *noopServer) Start() error {
// close transport // close transport
ch <- nil ch <- nil
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
}
// disconnect broker
if err := config.Broker.Disconnect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "broker [%s] disconnect error: %v", config.Broker.String(), err)
}
}
}() }()
// mark the server as started // mark the server as started
@ -468,3 +337,55 @@ func (n *noopServer) Stop() error {
return err return err
} }
type rpcHandler struct {
opts HandlerOptions
handler interface{}
name string
endpoints []*register.Endpoint
}
func newRPCHandler(handler interface{}, opts ...HandlerOption) Handler {
options := NewHandlerOptions(opts...)
typ := reflect.TypeOf(handler)
hdlr := reflect.ValueOf(handler)
name := reflect.Indirect(hdlr).Type().Name()
var endpoints []*register.Endpoint
for m := 0; m < typ.NumMethod(); m++ {
if e := register.ExtractEndpoint(typ.Method(m)); e != nil {
e.Name = name + "." + e.Name
for k, v := range options.Metadata[e.Name] {
e.Metadata[k] = v
}
endpoints = append(endpoints, e)
}
}
return &rpcHandler{
name: name,
handler: handler,
endpoints: endpoints,
opts: options,
}
}
func (r *rpcHandler) Name() string {
return r.name
}
func (r *rpcHandler) Handler() interface{} {
return r.handler
}
func (r *rpcHandler) Endpoints() []*register.Endpoint {
return r.endpoints
}
func (r *rpcHandler) Options() HandlerOptions {
return r.opts
}

View File

@ -1,104 +0,0 @@
package server_test
import (
"context"
"fmt"
"testing"
"go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/server"
)
type TestHandler struct {
t *testing.T
}
type TestMessage struct {
Name string
}
func (h *TestHandler) SingleSubHandler(ctx context.Context, msg *codec.Frame) error {
// fmt.Printf("msg %s\n", msg.Data)
return nil
}
func (h *TestHandler) BatchSubHandler(ctxs []context.Context, msgs []*codec.Frame) error {
if len(msgs) != 8 {
h.t.Fatal("invalid number of messages received")
}
for idx := 0; idx < len(msgs); idx++ {
md, _ := metadata.FromIncomingContext(ctxs[idx])
_ = md
// fmt.Printf("msg md %v\n", md)
}
return nil
}
func TestNoopSub(t *testing.T) {
ctx := context.Background()
b := broker.NewBroker()
if err := b.Init(); err != nil {
t.Fatal(err)
}
if err := b.Connect(ctx); err != nil {
t.Fatal(err)
}
logger.DefaultLogger.Init(logger.WithLevel(logger.ErrorLevel))
s := server.NewServer(
server.Broker(b),
server.Codec("application/octet-stream", codec.NewCodec()),
)
if err := s.Init(); err != nil {
t.Fatal(err)
}
c := client.NewClient(
client.Broker(b),
client.Codec("application/octet-stream", codec.NewCodec()),
client.ContentType("application/octet-stream"),
)
if err := c.Init(); err != nil {
t.Fatal(err)
}
h := &TestHandler{t: t}
if err := s.Subscribe(s.NewSubscriber("single_topic", h.SingleSubHandler,
server.SubscriberQueue("queue"),
)); err != nil {
t.Fatal(err)
}
if err := s.Subscribe(s.NewSubscriber("batch_topic", h.BatchSubHandler,
server.SubscriberQueue("queue"),
server.SubscriberBatch(true),
)); err != nil {
t.Fatal(err)
}
if err := s.Start(); err != nil {
t.Fatal(err)
}
msgs := make([]client.Message, 0, 8)
for i := 0; i < 8; i++ {
msgs = append(msgs, c.NewMessage("batch_topic", &codec.Frame{Data: []byte(fmt.Sprintf(`{"name": "test_name %d"}`, i))}))
}
if err := c.BatchPublish(ctx, msgs); err != nil {
t.Fatal(err)
}
defer func() {
if err := s.Stop(); err != nil {
t.Fatal(err)
}
}()
}

View File

@ -7,7 +7,6 @@ import (
"sync" "sync"
"time" "time"
"go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/codec" "go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/metadata" "go.unistack.org/micro/v4/metadata"
@ -19,6 +18,17 @@ import (
"go.unistack.org/micro/v4/util/id" "go.unistack.org/micro/v4/util/id"
) )
var (
// ServerRequestDurationSeconds specifies meter metric name
ServerRequestDurationSeconds = "server_request_duration_seconds"
// ServerRequestLatencyMicroseconds specifies meter metric name
ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
// ServerRequestTotal specifies meter metric name
ServerRequestTotal = "server_request_total"
// ServerRequestInflight specifies meter metric name
ServerRequestInflight = "server_request_inflight"
)
// Option func // Option func
type Option func(*Options) type Option func(*Options)
@ -26,8 +36,6 @@ type Option func(*Options)
type Options struct { type Options struct {
// Context holds the external options and can be used for server shutdown // Context holds the external options and can be used for server shutdown
Context context.Context Context context.Context
// Broker holds the server broker
Broker broker.Broker
// Register holds the register // Register holds the register
Register register.Register Register register.Register
// Tracer holds the tracer // Tracer holds the tracer
@ -38,12 +46,6 @@ type Options struct {
Meter meter.Meter Meter meter.Meter
// Transport holds the transport // Transport holds the transport
Transport transport.Transport Transport transport.Transport
/*
// Router for requests
Router Router
*/
// Listener may be passed if already created // Listener may be passed if already created
Listener net.Listener Listener net.Listener
// Wait group // Wait group
@ -68,10 +70,6 @@ type Options struct {
Advertise string Advertise string
// Version holds the server version // Version holds the server version
Version string Version string
// SubWrappers holds the server subscribe wrappers
SubWrappers []SubscriberWrapper
// BatchSubWrappers holds the server batch subscribe wrappers
BatchSubWrappers []BatchSubscriberWrapper
// HdlrWrappers holds the handler wrappers // HdlrWrappers holds the handler wrappers
HdlrWrappers []HandlerWrapper HdlrWrappers []HandlerWrapper
// RegisterAttempts holds the number of register attempts before error // RegisterAttempts holds the number of register attempts before error
@ -84,7 +82,7 @@ type Options struct {
MaxConn int MaxConn int
// DeregisterAttempts holds the number of deregister attempts before error // DeregisterAttempts holds the number of deregister attempts before error
DeregisterAttempts int DeregisterAttempts int
// Hooks may contains SubscriberWrapper, HandlerWrapper or Server func wrapper // Hooks may contains HandlerWrapper or Server func wrapper
Hooks options.Hooks Hooks options.Hooks
} }
@ -100,7 +98,6 @@ func NewOptions(opts ...Option) Options {
Logger: logger.DefaultLogger, Logger: logger.DefaultLogger,
Meter: meter.DefaultMeter, Meter: meter.DefaultMeter,
Tracer: tracer.DefaultTracer, Tracer: tracer.DefaultTracer,
Broker: broker.DefaultBroker,
Register: register.DefaultRegister, Register: register.DefaultRegister,
Transport: transport.DefaultTransport, Transport: transport.DefaultTransport,
Address: DefaultAddress, Address: DefaultAddress,
@ -173,13 +170,6 @@ func Advertise(a string) Option {
} }
} }
// Broker to use for pub/sub
func Broker(b broker.Broker) Option {
return func(o *Options) {
o.Broker = b
}
}
// Codec to use to encode/decode requests for a given content type // Codec to use to encode/decode requests for a given content type
func Codec(contentType string, c codec.Codec) Option { func Codec(contentType string, c codec.Codec) Option {
return func(o *Options) { return func(o *Options) {
@ -261,15 +251,6 @@ func TLSConfig(t *tls.Config) Option {
} }
} }
/*
// WithRouter sets the request router
func WithRouter(r Router) Option {
return func(o *Options) {
o.Router = r
}
}
*/
// Wait tells the server to wait for requests to finish before exiting // Wait tells the server to wait for requests to finish before exiting
// If `wg` is nil, server only wait for completion of rpc handler. // If `wg` is nil, server only wait for completion of rpc handler.
// For user need finer grained control, pass a concrete `wg` here, server will // For user need finer grained control, pass a concrete `wg` here, server will
@ -290,20 +271,6 @@ func WrapHandler(w HandlerWrapper) Option {
} }
} }
// WrapSubscriber adds a subscriber Wrapper to a list of options passed into the server
func WrapSubscriber(w SubscriberWrapper) Option {
return func(o *Options) {
o.SubWrappers = append(o.SubWrappers, w)
}
}
// WrapBatchSubscriber adds a batch subscriber Wrapper to a list of options passed into the server
func WrapBatchSubscriber(w BatchSubscriberWrapper) Option {
return func(o *Options) {
o.BatchSubWrappers = append(o.BatchSubWrappers, w)
}
}
// MaxConn specifies maximum number of max simultaneous connections to server // MaxConn specifies maximum number of max simultaneous connections to server
func MaxConn(n int) Option { func MaxConn(n int) Option {
return func(o *Options) { return func(o *Options) {
@ -343,41 +310,6 @@ func NewHandlerOptions(opts ...HandlerOption) HandlerOptions {
return options return options
} }
// SubscriberOption func
type SubscriberOption func(*SubscriberOptions)
// SubscriberOptions struct
type SubscriberOptions struct {
// Context holds the external options
Context context.Context
// Queue holds the subscription queue
Queue string
// AutoAck flag for auto ack messages after processing
AutoAck bool
// BodyOnly flag specifies that message without headers
BodyOnly bool
// Batch flag specifies that message processed in batches
Batch bool
// BatchSize flag specifies max size of batch
BatchSize int
// BatchWait flag specifies max wait time for batch filling
BatchWait time.Duration
}
// NewSubscriberOptions create new SubscriberOptions
func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions {
options := SubscriberOptions{
AutoAck: true,
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
return options
}
// EndpointMetadata is a Handler option that allows metadata to be added to // EndpointMetadata is a Handler option that allows metadata to be added to
// individual endpoints. // individual endpoints.
func EndpointMetadata(name string, md metadata.Metadata) HandlerOption { func EndpointMetadata(name string, md metadata.Metadata) HandlerOption {
@ -385,68 +317,3 @@ func EndpointMetadata(name string, md metadata.Metadata) HandlerOption {
o.Metadata[name] = metadata.Copy(md) o.Metadata[name] = metadata.Copy(md)
} }
} }
// DisableAutoAck will disable auto acking of messages
// after they have been handled.
func DisableAutoAck() SubscriberOption {
return func(o *SubscriberOptions) {
o.AutoAck = false
}
}
// SubscriberQueue sets the shared queue name distributed messages across subscribers
func SubscriberQueue(n string) SubscriberOption {
return func(o *SubscriberOptions) {
o.Queue = n
}
}
// SubscriberGroup sets the shared group name distributed messages across subscribers
func SubscriberGroup(n string) SubscriberOption {
return func(o *SubscriberOptions) {
o.Queue = n
}
}
// SubscriberBodyOnly says broker that message contains raw data with absence of micro broker.Message format
func SubscriberBodyOnly(b bool) SubscriberOption {
return func(o *SubscriberOptions) {
o.BodyOnly = b
}
}
// SubscriberContext set context options to allow broker SubscriberOption passed
func SubscriberContext(ctx context.Context) SubscriberOption {
return func(o *SubscriberOptions) {
o.Context = ctx
}
}
// SubscriberAck control auto ack processing for handler
func SubscriberAck(b bool) SubscriberOption {
return func(o *SubscriberOptions) {
o.AutoAck = b
}
}
// SubscriberBatch control batch processing for handler
func SubscriberBatch(b bool) SubscriberOption {
return func(o *SubscriberOptions) {
o.Batch = b
}
}
// SubscriberBatchSize control batch filling size for handler
// Batch filling max waiting time controlled by SubscriberBatchWait
func SubscriberBatchSize(n int) SubscriberOption {
return func(o *SubscriberOptions) {
o.BatchSize = n
}
}
// SubscriberBatchWait control batch filling wait time for handler
func SubscriberBatchWait(td time.Duration) SubscriberOption {
return func(o *SubscriberOptions) {
o.BatchWait = td
}
}

View File

@ -78,7 +78,6 @@ func NewRegisterService(s Server) (*register.Service, error) {
node.Metadata = metadata.Copy(opts.Metadata) node.Metadata = metadata.Copy(opts.Metadata)
node.Metadata["server"] = s.String() node.Metadata["server"] = s.String()
node.Metadata["broker"] = opts.Broker.String()
node.Metadata["register"] = opts.Register.String() node.Metadata["register"] = opts.Register.String()
return &register.Service{ return &register.Service{

View File

@ -1,35 +0,0 @@
package server
import (
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/metadata"
)
type rpcMessage struct {
payload interface{}
codec codec.Codec
header metadata.Metadata
topic string
contentType string
body []byte
}
func (r *rpcMessage) ContentType() string {
return r.contentType
}
func (r *rpcMessage) Topic() string {
return r.topic
}
func (r *rpcMessage) Body() interface{} {
return r.payload
}
func (r *rpcMessage) Header() metadata.Metadata {
return r.header
}
func (r *rpcMessage) Codec() codec.Codec {
return r.codec
}

View File

@ -48,10 +48,6 @@ type Server interface {
Handle(h Handler) error Handle(h Handler) error
// Create a new handler // Create a new handler
NewHandler(h interface{}, opts ...HandlerOption) Handler NewHandler(h interface{}, opts ...HandlerOption) Handler
// Create a new subscriber
NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber
// Register a subscriber
Subscribe(s Subscriber) error
// Start the server // Start the server
Start() error Start() error
// Stop the server // Stop the server
@ -60,30 +56,6 @@ type Server interface {
String() string String() string
} }
/*
// Router handle serving messages
type Router interface {
// ProcessMessage processes a message
ProcessMessage(ctx context.Context, msg Message) error
// ServeRequest processes a request to completion
ServeRequest(ctx context.Context, req Request, rsp Response) error
}
*/
// Message is an async message interface
type Message interface {
// Topic of the message
Topic() string
// The decoded payload value
Body() interface{}
// The content type of the payload
ContentType() string
// The raw headers of the message
Header() metadata.Metadata
// Codec used to decode the message
Codec() codec.Codec
}
// Request is a synchronous request interface // Request is a synchronous request interface
type Request interface { type Request interface {
// Service name requested // Service name requested
@ -156,13 +128,3 @@ type Handler interface {
Endpoints() []*register.Endpoint Endpoints() []*register.Endpoint
Options() HandlerOptions Options() HandlerOptions
} }
// Subscriber interface represents a subscription to a given topic using
// a specific subscriber function or object with endpoints. It mirrors
// the handler in its behaviour.
type Subscriber interface {
Topic() string
Subscriber() interface{}
Endpoints() []*register.Endpoint
Options() SubscriberOptions
}

View File

@ -1,440 +0,0 @@
package server
import (
"bytes"
"context"
"fmt"
"reflect"
"runtime/debug"
"strings"
"unicode"
"unicode/utf8"
"go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/errors"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/register"
)
const (
subSig = "func(context.Context, interface{}) error"
batchSubSig = "func([]context.Context, []interface{}) error"
)
// Precompute the reflect type for error. Can't use error directly
// because Typeof takes an empty interface value. This is annoying.
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
type handler struct {
reqType reflect.Type
ctxType reflect.Type
method reflect.Value
}
type subscriber struct {
typ reflect.Type
subscriber interface{}
topic string
endpoints []*register.Endpoint
handlers []*handler
opts SubscriberOptions
rcvr reflect.Value
}
// Is this an exported - upper case - name?
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune)
}
// Is this type exported or a builtin?
func isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
return isExported(t.Name()) || t.PkgPath() == ""
}
// ValidateSubscriber func signature
func ValidateSubscriber(sub Subscriber) error {
typ := reflect.TypeOf(sub.Subscriber())
var argType reflect.Type
switch typ.Kind() {
case reflect.Func:
name := "Func"
switch typ.NumIn() {
case 2:
argType = typ.In(1)
if sub.Options().Batch {
if argType.Kind() != reflect.Slice {
return fmt.Errorf("subscriber %v dont have required signature %s", name, batchSubSig)
}
if strings.Compare(fmt.Sprintf("%v", argType), "[]interface{}") == 0 {
return fmt.Errorf("subscriber %v dont have required signaure %s", name, batchSubSig)
}
}
default:
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s or %s", name, typ.NumIn(), subSig, batchSubSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
}
if typ.NumOut() != 1 {
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s or %s",
name, typ.NumOut(), subSig, batchSubSig)
}
if returnType := typ.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
}
default:
hdlr := reflect.ValueOf(sub.Subscriber())
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
switch method.Type.NumIn() {
case 3:
argType = method.Type.In(2)
default:
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s or %s",
name, method.Name, method.Type.NumIn(), subSig, batchSubSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("%v argument type not exported: %v", name, argType)
}
if method.Type.NumOut() != 1 {
return fmt.Errorf(
"subscriber %v.%v has wrong number of return values: %v require signature %s or %s",
name, method.Name, method.Type.NumOut(), subSig, batchSubSig)
}
if returnType := method.Type.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
}
}
}
return nil
}
func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber {
var endpoints []*register.Endpoint
var handlers []*handler
options := NewSubscriberOptions(opts...)
if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func {
h := &handler{
method: reflect.ValueOf(sub),
}
switch typ.NumIn() {
case 1:
h.reqType = typ.In(0)
case 2:
h.ctxType = typ.In(0)
h.reqType = typ.In(1)
}
handlers = append(handlers, h)
ep := &register.Endpoint{
Name: "Func",
Request: register.ExtractSubValue(typ),
Metadata: metadata.New(2),
}
ep.Metadata.Set("topic", topic)
ep.Metadata.Set("subscriber", "true")
endpoints = append(endpoints, ep)
} else {
hdlr := reflect.ValueOf(sub)
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
h := &handler{
method: method.Func,
}
switch method.Type.NumIn() {
case 2:
h.reqType = method.Type.In(1)
case 3:
h.ctxType = method.Type.In(1)
h.reqType = method.Type.In(2)
}
handlers = append(handlers, h)
ep := &register.Endpoint{
Name: name + "." + method.Name,
Request: register.ExtractSubValue(method.Type),
Metadata: metadata.New(2),
}
ep.Metadata.Set("topic", topic)
ep.Metadata.Set("subscriber", "true")
endpoints = append(endpoints, ep)
}
}
return &subscriber{
rcvr: reflect.ValueOf(sub),
typ: reflect.TypeOf(sub),
topic: topic,
subscriber: sub,
handlers: handlers,
endpoints: endpoints,
opts: options,
}
}
//nolint:gocyclo
func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
return func(ps broker.Events) (err error) {
defer func() {
if r := recover(); r != nil {
n.RLock()
config := n.opts
n.RUnlock()
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(n.opts.Context, "panic recovered: ", r)
config.Logger.Error(n.opts.Context, string(debug.Stack()))
}
err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r)
}
}()
msgs := make([]Message, 0, len(ps))
ctxs := make([]context.Context, 0, len(ps))
for _, p := range ps {
msg := p.Message()
// if we don't have headers, create empty map
if msg.Header == nil {
msg.Header = metadata.New(2)
}
ct, _ := msg.Header.Get(metadata.HeaderContentType)
if len(ct) == 0 {
msg.Header.Set(metadata.HeaderContentType, defaultContentType)
ct = defaultContentType
}
hdr := metadata.Copy(msg.Header)
topic, _ := msg.Header.Get(metadata.HeaderTopic)
ctxs = append(ctxs, metadata.NewIncomingContext(sb.opts.Context, hdr))
msgs = append(msgs, &rpcMessage{
topic: topic,
contentType: ct,
header: msg.Header,
body: msg.Body,
})
}
results := make(chan error, len(sb.handlers))
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
var req reflect.Value
switch handler.reqType.Kind() {
case reflect.Ptr:
req = reflect.New(handler.reqType.Elem())
default:
req = reflect.New(handler.reqType.Elem()).Elem()
}
reqType := handler.reqType
var cf codec.Codec
for _, msg := range msgs {
cf, err = n.newCodec(msg.ContentType())
if err != nil {
return err
}
rb := reflect.New(req.Type().Elem())
if err = cf.ReadBody(bytes.NewReader(msg.(*rpcMessage).body), rb.Interface()); err != nil {
return err
}
msg.(*rpcMessage).codec = cf
msg.(*rpcMessage).payload = rb.Interface()
}
fn := func(ctxs []context.Context, ms []Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
}
if handler.ctxType != nil {
vals = append(vals, reflect.ValueOf(ctxs))
}
payloads := reflect.MakeSlice(reqType, 0, len(ms))
for _, m := range ms {
payloads = reflect.Append(payloads, reflect.ValueOf(m.Body()))
}
vals = append(vals, payloads)
returnValues := handler.method.Call(vals)
if rerr := returnValues[0].Interface(); rerr != nil {
return rerr.(error)
}
return nil
}
for i := len(opts.BatchSubWrappers); i > 0; i-- {
fn = opts.BatchSubWrappers[i-1](fn)
}
if n.wg != nil {
n.wg.Add(1)
}
go func() {
if n.wg != nil {
defer n.wg.Done()
}
results <- fn(ctxs, msgs)
}()
}
var errors []string
for i := 0; i < len(sb.handlers); i++ {
if rerr := <-results; rerr != nil {
errors = append(errors, rerr.Error())
}
}
if len(errors) > 0 {
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
}
return err
}
}
//nolint:gocyclo
func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler {
return func(p broker.Event) (err error) {
defer func() {
if r := recover(); r != nil {
n.RLock()
config := n.opts
n.RUnlock()
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(n.opts.Context, "panic recovered: ", r)
config.Logger.Error(n.opts.Context, string(debug.Stack()))
}
err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r)
}
}()
msg := p.Message()
// if we don't have headers, create empty map
if msg.Header == nil {
msg.Header = metadata.New(2)
}
ct := msg.Header["Content-Type"]
if len(ct) == 0 {
msg.Header.Set(metadata.HeaderContentType, defaultContentType)
ct = defaultContentType
}
cf, err := n.newCodec(ct)
if err != nil {
return err
}
hdr := metadata.New(len(msg.Header))
for k, v := range msg.Header {
if k == "Content-Type" {
continue
}
hdr.Set(k, v)
}
ctx := metadata.NewIncomingContext(sb.opts.Context, hdr)
results := make(chan error, len(sb.handlers))
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
var isVal bool
var req reflect.Value
if handler.reqType.Kind() == reflect.Ptr {
req = reflect.New(handler.reqType.Elem())
} else {
req = reflect.New(handler.reqType)
isVal = true
}
if isVal {
req = req.Elem()
}
if err = cf.ReadBody(bytes.NewBuffer(msg.Body), req.Interface()); err != nil {
return err
}
fn := func(ctx context.Context, msg Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
}
if handler.ctxType != nil {
vals = append(vals, reflect.ValueOf(ctx))
}
vals = append(vals, reflect.ValueOf(msg.Body()))
returnValues := handler.method.Call(vals)
if rerr := returnValues[0].Interface(); rerr != nil {
return rerr.(error)
}
return nil
}
for i := len(opts.SubWrappers); i > 0; i-- {
fn = opts.SubWrappers[i-1](fn)
}
if n.wg != nil {
n.wg.Add(1)
}
go func() {
if n.wg != nil {
defer n.wg.Done()
}
cerr := fn(ctx, &rpcMessage{
topic: sb.topic,
contentType: ct,
payload: req.Interface(),
header: msg.Header,
})
results <- cerr
}()
}
var errors []string
for i := 0; i < len(sb.handlers); i++ {
if rerr := <-results; rerr != nil {
errors = append(errors, rerr.Error())
}
}
if len(errors) > 0 {
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
}
return err
}
}
func (s *subscriber) Topic() string {
return s.topic
}
func (s *subscriber) Subscriber() interface{} {
return s.subscriber
}
func (s *subscriber) Endpoints() []*register.Endpoint {
return s.endpoints
}
func (s *subscriber) Options() SubscriberOptions {
return s.opts
}

View File

@ -9,25 +9,9 @@ import (
// request and response types. // request and response types.
type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error
// SubscriberFunc represents a single method of a subscriber. It's used primarily
// for the wrappers. What's handed to the actual method is the concrete
// publication message.
type SubscriberFunc func(ctx context.Context, msg Message) error
// BatchSubscriberFunc represents a single method of a subscriber. It's used primarily
// for the wrappers. What's handed to the actual method is the concrete
// publication message. This func used by batch subscribers
type BatchSubscriberFunc func(ctxs []context.Context, msgs []Message) error
// HandlerWrapper wraps the HandlerFunc and returns the equivalent // HandlerWrapper wraps the HandlerFunc and returns the equivalent
type HandlerWrapper func(HandlerFunc) HandlerFunc type HandlerWrapper func(HandlerFunc) HandlerFunc
// SubscriberWrapper wraps the SubscriberFunc and returns the equivalent
type SubscriberWrapper func(SubscriberFunc) SubscriberFunc
// BatchSubscriberWrapper wraps the SubscriberFunc and returns the equivalent
type BatchSubscriberWrapper func(BatchSubscriberFunc) BatchSubscriberFunc
// StreamWrapper wraps a Stream interface and returns the equivalent. // StreamWrapper wraps a Stream interface and returns the equivalent.
// Because streams exist for the lifetime of a method invocation this // Because streams exist for the lifetime of a method invocation this
// is a convenient way to wrap a Stream as its in use for trace, monitoring, // is a convenient way to wrap a Stream as its in use for trace, monitoring,

View File

@ -66,11 +66,6 @@ func RegisterHandler(s server.Server, h interface{}, opts ...server.HandlerOptio
return s.Handle(s.NewHandler(h, opts...)) return s.Handle(s.NewHandler(h, opts...))
} }
// RegisterSubscriber is syntactic sugar for registering a subscriber
func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error {
return s.Subscribe(s.NewSubscriber(topic, h, opts...))
}
type service struct { type service struct {
sync.RWMutex sync.RWMutex
opts Options opts Options

View File

@ -1,7 +1,6 @@
package micro package micro
import ( import (
"context"
"reflect" "reflect"
"testing" "testing"
@ -65,41 +64,6 @@ func TestRegisterHandler(t *testing.T) {
} }
} }
func TestRegisterSubscriber(t *testing.T) {
type args struct {
topic string
s server.Server
h interface{}
opts []server.SubscriberOption
}
h := func(_ context.Context, _ interface{}) error {
return nil
}
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "RegisterSubscriber",
args: args{
topic: "test",
s: server.DefaultServer,
h: h,
opts: nil,
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := RegisterSubscriber(tt.args.topic, tt.args.s, tt.args.h, tt.args.opts...); (err != nil) != tt.wantErr {
t.Errorf("RegisterSubscriber() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestNewService(t *testing.T) { func TestNewService(t *testing.T) {
type args struct { type args struct {
opts []Option opts []Option
@ -222,7 +186,6 @@ func Test_service_Options(t *testing.T) {
} }
func Test_service_Broker(t *testing.T) { func Test_service_Broker(t *testing.T) {
b := broker.NewBroker()
type fields struct { type fields struct {
opts Options opts Options
} }
@ -238,12 +201,12 @@ func Test_service_Broker(t *testing.T) {
{ {
name: "service.Broker", name: "service.Broker",
fields: fields{ fields: fields{
opts: Options{Brokers: []broker.Broker{b}}, opts: Options{Brokers: []broker.Broker{broker.DefaultBroker}},
}, },
args: args{ args: args{
names: []string{"noop"}, names: []string{"noop"},
}, },
want: b, want: broker.DefaultBroker,
}, },
} }
for _, tt := range tests { for _, tt := range tests {