From 819ad1117a171d77a8d735c7f052c1e082d87c71 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 9 May 2023 20:04:15 +0300 Subject: [PATCH] cleanup interfaces for v4 Signed-off-by: Vasiliy Tolstov --- broker/broker.go | 62 ++---- broker/memory.go | 213 ++++++------------- broker/options.go | 140 ++++++------- broker/subscriber.go | 98 +++++++++ client/client.go | 20 +- client/noop.go | 87 ++------ client/options.go | 22 +- config/default_test.go | 10 +- event.go | 27 --- meter/wrapper/wrapper.go | 17 -- options.go | 28 +-- server/context.go | 10 - server/context_test.go | 11 - server/handler.go | 59 ------ server/noop.go | 199 ++++++------------ server/noop_test.go | 104 --------- server/options.go | 157 ++------------ server/registry.go | 1 - server/request.go | 35 ---- server/server.go | 38 ---- server/subscriber.go | 440 --------------------------------------- server/wrapper.go | 16 -- service.go | 5 - service_test.go | 41 +--- 24 files changed, 368 insertions(+), 1472 deletions(-) create mode 100644 broker/subscriber.go delete mode 100644 event.go delete mode 100644 server/handler.go delete mode 100644 server/noop_test.go delete mode 100644 server/request.go delete mode 100644 server/subscriber.go diff --git a/broker/broker.go b/broker/broker.go index 35fc1eb7..9761bf5e 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -10,13 +10,15 @@ import ( ) // DefaultBroker default memory broker -var DefaultBroker = NewBroker() +var DefaultBroker Broker // = NewBroker() var ( // ErrNotConnected returns when broker used but not connected yet ErrNotConnected = errors.New("broker not connected") // ErrDisconnected returns when broker disconnected ErrDisconnected = errors.New("broker disconnected") + // ErrInvalidMessage returns when message has nvalid format + ErrInvalidMessage = errors.New("broker message has invalid format") ) // Broker is an interface used for asynchronous messaging. @@ -33,61 +35,33 @@ type Broker interface { Connect(ctx context.Context) error // Disconnect disconnect from broker Disconnect(ctx context.Context) error + // NewMessage creates new broker message + NewMessage(endpoint string, req interface{}, opts ...MessageOption) Message // Publish message to broker topic - Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error + Publish(ctx context.Context, msg interface{}, opts ...PublishOption) error // Subscribe subscribes to topic message via handler - Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error) - // BatchPublish messages to broker with multiple topics - BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error - // BatchSubscribe subscribes to topic messages via handler - BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error) + Subscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error) // String type of broker String() string } -// Handler is used to process messages via a subscription of a topic. -type Handler func(Event) error - -// Events contains multiple events -type Events []Event - -// Ack try to ack all events and return -func (evs Events) Ack() error { - var err error - for _, ev := range evs { - if err = ev.Ack(); err != nil { - return err - } - } - return nil -} - -// SetError sets error on event -func (evs Events) SetError(err error) { - for _, ev := range evs { - ev.SetError(err) - } -} - -// BatchHandler is used to process messages in batches via a subscription of a topic. -type BatchHandler func(Events) error - -// Event is given to a subscription handler for processing -type Event interface { +// Message is given to a subscription handler for processing +type Message interface { + // Context for the message + Context() context.Context // Topic returns event topic Topic() string - // Message returns broker message - Message() *Message + // Body returns broker message + Body() interface{} // Ack acknowledge message Ack() error // Error returns message error (like decoding errors or some other) + // In this case Body contains raw []byte from broker Error() error - // SetError set event processing error - SetError(err error) } -// Message is used to transfer data -type Message struct { +// RawMessage is used to transfer data +type RawMessage struct { // Header contains message metadata Header metadata.Metadata // Body contains message body @@ -95,8 +69,8 @@ type Message struct { } // NewMessage create broker message with topic filled -func NewMessage(topic string) *Message { - m := &Message{Header: metadata.New(2)} +func NewRawMessage(topic string) *RawMessage { + m := &RawMessage{Header: metadata.New(2)} m.Header.Set(metadata.HeaderTopic, topic) return m } diff --git a/broker/memory.go b/broker/memory.go index 08d55d55..8add9780 100644 --- a/broker/memory.go +++ b/broker/memory.go @@ -1,3 +1,5 @@ +//go:build ignore + package broker import ( @@ -6,7 +8,6 @@ import ( "time" "go.unistack.org/micro/v4/logger" - "go.unistack.org/micro/v4/metadata" maddr "go.unistack.org/micro/v4/util/addr" "go.unistack.org/micro/v4/util/id" mnet "go.unistack.org/micro/v4/util/net" @@ -21,23 +22,6 @@ type memoryBroker struct { connected bool } -type memoryEvent struct { - err error - message interface{} - topic string - opts Options -} - -type memorySubscriber struct { - ctx context.Context - exit chan bool - handler Handler - batchhandler BatchHandler - id string - topic string - opts SubscribeOptions -} - func (m *memoryBroker) Options() Options { return m.opts } @@ -89,16 +73,11 @@ func (m *memoryBroker) Init(opts ...Option) error { return nil } -func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error { - msg.Header.Set(metadata.HeaderTopic, topic) - return m.publish(ctx, []*Message{msg}, opts...) +func (m *memoryBroker) NewMessage(endpoint string, req interface{}, opts ...MessageOption) Message { + return &memoryMessage{} } -func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error { - return m.publish(ctx, msgs, opts...) -} - -func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...PublishOption) error { +func (m *memoryBroker) Publish(ctx context.Context, message interface{}, opts ...PublishOption) error { m.RLock() if !m.connected { m.RUnlock() @@ -113,25 +92,32 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub return ctx.Err() default: options := NewPublishOptions(opts...) - - msgTopicMap := make(map[string]Events) - for _, v := range msgs { - p := &memoryEvent{opts: m.opts} - - if m.opts.Codec == nil || options.BodyOnly { - p.topic, _ = v.Header.Get(metadata.HeaderTopic) - p.message = v.Body - } else { - p.topic, _ = v.Header.Get(metadata.HeaderTopic) - p.message, err = m.opts.Codec.Marshal(v) - if err != nil { - return err + var msgs []*memoryMessage + switch v := message.(type) { + case *memoryMessage: + msgs = []*memoryMessage{v} + case []*memoryMessage: + msgs = v + default: + return ErrInvalidMessage + } + msgTopicMap := make(map[string][]*memoryMessage) + for _, msg := range msgs { + p := &memoryMessage{opts: options} + /* + if mb, ok := msg.Body().(*codec.Frame); ok { + p.message = v.Body + } else { + p.topic, _ = v.Header.Get(metadata.HeaderTopic) + p.message, err = m.opts.Codec.Marshal(v) + if err != nil { + return err + } } - } - msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p) + */ + msgTopicMap[msg.Topic()] = append(msgTopicMap[p.topic], p) } - beh := m.opts.BatchErrorHandler eh := m.opts.ErrorHandler for t, ms := range msgTopicMap { @@ -152,28 +138,21 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub m.opts.Meter.Counter(PublishMessageTotal, "endpoint", t, "status", "success").Add(len(ms)) for _, sub := range subs { - if sub.opts.BatchErrorHandler != nil { - beh = sub.opts.BatchErrorHandler - } if sub.opts.ErrorHandler != nil { eh = sub.opts.ErrorHandler } - switch { - // batch processing - case sub.batchhandler != nil: - - if err = sub.batchhandler(ms); err != nil { + for _, p := range ms { + if err = sub.handler(p); err != nil { m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc() - ms.SetError(err) - if beh != nil { - _ = beh(ms) + if eh != nil { + _ = eh(p) } else if m.opts.Logger.V(logger.ErrorLevel) { m.opts.Logger.Error(m.opts.Context, err.Error()) } } else { if sub.opts.AutoAck { - if err = ms.Ack(); err != nil { + if err = p.Ack(); err != nil { m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err) m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc() } else { @@ -183,34 +162,8 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc() } } - m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-len(ms)) - m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-len(ms)) - // single processing - case sub.handler != nil: - for _, p := range ms { - if err = sub.handler(p); err != nil { - m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc() - p.SetError(err) - if eh != nil { - _ = eh(p) - } else if m.opts.Logger.V(logger.ErrorLevel) { - m.opts.Logger.Error(m.opts.Context, err.Error()) - } - } else { - if sub.opts.AutoAck { - if err = p.Ack(); err != nil { - m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err) - m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc() - } else { - m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc() - } - } else { - m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc() - } - } - m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-1) - m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-1) - } + m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-1) + m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-1) } } @@ -226,52 +179,7 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub return nil } -func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) { - m.RLock() - if !m.connected { - m.RUnlock() - return nil, ErrNotConnected - } - m.RUnlock() - - sid, err := id.New() - if err != nil { - return nil, err - } - - options := NewSubscribeOptions(opts...) - - sub := &memorySubscriber{ - exit: make(chan bool, 1), - id: sid, - topic: topic, - batchhandler: handler, - opts: options, - ctx: ctx, - } - - m.Lock() - m.subscribers[topic] = append(m.subscribers[topic], sub) - m.Unlock() - - go func() { - <-sub.exit - m.Lock() - newSubscribers := make([]*memorySubscriber, 0, len(m.subscribers)-1) - for _, sb := range m.subscribers[topic] { - if sb.id == sub.id { - continue - } - newSubscribers = append(newSubscribers, sb) - } - m.subscribers[topic] = newSubscribers - m.Unlock() - }() - - return sub, nil -} - -func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { +func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error) { m.RLock() if !m.connected { m.RUnlock() @@ -324,38 +232,41 @@ func (m *memoryBroker) Name() string { return m.opts.Name } -func (m *memoryEvent) Topic() string { +type memoryMessage struct { + err error + body interface{} + topic string + opts PublishOptions + ctx context.Context +} + +func (m *memoryMessage) Topic() string { return m.topic } -func (m *memoryEvent) Message() *Message { - switch v := m.message.(type) { - case *Message: - return v - case []byte: - msg := &Message{} - if err := m.opts.Codec.Unmarshal(v, msg); err != nil { - if m.opts.Logger.V(logger.ErrorLevel) { - m.opts.Logger.Error(m.opts.Context, "[memory]: failed to unmarshal: %v", err) - } - return nil - } - return msg - } +func (m *memoryMessage) Body() interface{} { + return m.body +} +func (m *memoryMessage) Ack() error { return nil } -func (m *memoryEvent) Ack() error { - return nil -} - -func (m *memoryEvent) Error() error { +func (m *memoryMessage) Error() error { return m.err } -func (m *memoryEvent) SetError(err error) { - m.err = err +func (m *memoryMessage) Context() context.Context { + return m.ctx +} + +type memorySubscriber struct { + ctx context.Context + exit chan bool + handler interface{} + id string + topic string + opts SubscribeOptions } func (m *memorySubscriber) Options() SubscribeOptions { @@ -372,7 +283,7 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error { } // NewBroker return new memory broker -func NewBroker(opts ...Option) Broker { +func NewBroker(opts ...Option) *memoryBroker { return &memoryBroker{ opts: NewOptions(opts...), subscribers: make(map[string][]*memorySubscriber), diff --git a/broker/options.go b/broker/options.go index b5b8d933..07234d9e 100644 --- a/broker/options.go +++ b/broker/options.go @@ -7,6 +7,7 @@ import ( "go.unistack.org/micro/v4/codec" "go.unistack.org/micro/v4/logger" + "go.unistack.org/micro/v4/metadata" "go.unistack.org/micro/v4/meter" "go.unistack.org/micro/v4/register" "go.unistack.org/micro/v4/tracer" @@ -37,8 +38,8 @@ type Options struct { Tracer tracer.Tracer // Register can be used for clustering Register register.Register - // Codec holds the codec for marshal/unmarshal - Codec codec.Codec + // Codecs holds the codec for marshal/unmarshal + Codecs map[string]codec.Codec // Logger used for logging Logger logger.Logger // Meter used for metrics @@ -48,15 +49,16 @@ type Options struct { // TLSConfig holds tls.TLSConfig options TLSConfig *tls.Config // ErrorHandler used when broker can't unmarshal incoming message - ErrorHandler Handler - // BatchErrorHandler used when broker can't unmashal incoming messages - BatchErrorHandler BatchHandler + ErrorHandler func(Message) // Name holds the broker name Name string // Addrs holds the broker address Addrs []string } +// Option func +type Option func(*Options) + // NewOptions create new Options func NewOptions(opts ...Option) Options { options := Options{ @@ -64,7 +66,7 @@ func NewOptions(opts ...Option) Options { Logger: logger.DefaultLogger, Context: context.Background(), Meter: meter.DefaultMeter, - Codec: codec.DefaultCodec, + Codecs: make(map[string]codec.Codec), Tracer: tracer.DefaultTracer, } for _, o := range opts { @@ -80,6 +82,32 @@ func Context(ctx context.Context) Option { } } +// MessageOption func +type MessageOption func(*MessageOptions) + +// MessageOptions struct +type MessageOptions struct { + Metadata metadata.Metadata + ContentType string +} + +// MessageMetadata pass additional message metadata +func MessageMetadata(md metadata.Metadata) MessageOption { + return func(o *MessageOptions) { + o.Metadata = md + } +} + +// MessageContentType pass ContentType for message data +func MessageContentType(ct string) MessageOption { + return func(o *MessageOptions) { + o.ContentType = ct + } +} + +// PublishOption func +type PublishOption func(*PublishOptions) + // PublishOptions struct type PublishOptions struct { // Context holds external options @@ -104,11 +132,9 @@ type SubscribeOptions struct { // Context holds external options Context context.Context // ErrorHandler used when broker can't unmarshal incoming message - ErrorHandler Handler - // BatchErrorHandler used when broker can't unmashal incoming messages - BatchErrorHandler BatchHandler - // Group holds consumer group - Group string + ErrorHandler func(Message) + // QueueGroup holds consumer group + QueueGroup string // AutoAck flag specifies auto ack of incoming message when no error happens AutoAck bool // BodyOnly flag specifies that message contains only body bytes without header @@ -119,12 +145,6 @@ type SubscribeOptions struct { BatchWait time.Duration } -// Option func -type Option func(*Options) - -// PublishOption func -type PublishOption func(*PublishOptions) - // PublishBodyOnly publish only body of the message func PublishBodyOnly(b bool) PublishOption { return func(o *PublishOptions) { @@ -148,59 +168,29 @@ func Addrs(addrs ...string) Option { // Codec sets the codec used for encoding/decoding used where // a broker does not support headers -func Codec(c codec.Codec) Option { +// Codec to be used to encode/decode requests for a given content type +func Codec(contentType string, c codec.Codec) Option { return func(o *Options) { - o.Codec = c + o.Codecs[contentType] = c } } // ErrorHandler will catch all broker errors that cant be handled // in normal way, for example Codec errors -func ErrorHandler(h Handler) Option { +func ErrorHandler(h func(Message)) Option { return func(o *Options) { o.ErrorHandler = h } } -// BatchErrorHandler will catch all broker errors that cant be handled -// in normal way, for example Codec errors -func BatchErrorHandler(h BatchHandler) Option { - return func(o *Options) { - o.BatchErrorHandler = h - } -} - // SubscribeErrorHandler will catch all broker errors that cant be handled // in normal way, for example Codec errors -func SubscribeErrorHandler(h Handler) SubscribeOption { +func SubscribeErrorHandler(h func(Message)) SubscribeOption { return func(o *SubscribeOptions) { o.ErrorHandler = h } } -// SubscribeBatchErrorHandler will catch all broker errors that cant be handled -// in normal way, for example Codec errors -func SubscribeBatchErrorHandler(h BatchHandler) SubscribeOption { - return func(o *SubscribeOptions) { - o.BatchErrorHandler = h - } -} - -// Queue sets the subscribers queue -// Deprecated -func Queue(name string) SubscribeOption { - return func(o *SubscribeOptions) { - o.Group = name - } -} - -// SubscribeGroup sets the name of the queue to share messages on -func SubscribeGroup(name string) SubscribeOption { - return func(o *SubscribeOptions) { - o.Group = name - } -} - // Register sets register option func Register(r register.Register) Option { return func(o *Options) { @@ -243,6 +233,21 @@ func Name(n string) Option { } } +// SubscribeOption func signature +type SubscribeOption func(*SubscribeOptions) + +// NewSubscribeOptions creates new SubscribeOptions +func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions { + options := SubscribeOptions{ + AutoAck: true, + Context: context.Background(), + } + for _, o := range opts { + o(&options) + } + return options +} + // SubscribeContext set context func SubscribeContext(ctx context.Context) SubscribeOption { return func(o *SubscribeOptions) { @@ -250,14 +255,6 @@ func SubscribeContext(ctx context.Context) SubscribeOption { } } -// DisableAutoAck disables auto ack -// Deprecated -func DisableAutoAck() SubscribeOption { - return func(o *SubscribeOptions) { - o.AutoAck = false - } -} - // SubscribeAutoAck contol auto acking of messages // after they have been handled. func SubscribeAutoAck(b bool) SubscribeOption { @@ -287,17 +284,16 @@ func SubscribeBatchWait(td time.Duration) SubscribeOption { } } -// SubscribeOption func -type SubscribeOption func(*SubscribeOptions) - -// NewSubscribeOptions creates new SubscribeOptions -func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions { - options := SubscribeOptions{ - AutoAck: true, - Context: context.Background(), +// SubscribeQueueGroup sets the shared queue name distributed messages across subscribers +func SubscribeQueueGroup(n string) SubscribeOption { + return func(o *SubscribeOptions) { + o.QueueGroup = n + } +} + +// SubscribeAutoAck control auto ack processing for handler +func SubscribeAuthAck(b bool) SubscribeOption { + return func(o *SubscribeOptions) { + o.AutoAck = b } - for _, o := range opts { - o(&options) - } - return options } diff --git a/broker/subscriber.go b/broker/subscriber.go new file mode 100644 index 00000000..c330c022 --- /dev/null +++ b/broker/subscriber.go @@ -0,0 +1,98 @@ +package broker + +import ( + "fmt" + "reflect" + "strings" + "unicode" + "unicode/utf8" +) + +const ( + subSig = "func(context.Context, interface{}) error" + batchSubSig = "func([]context.Context, []interface{}) error" +) + +// Precompute the reflect type for error. Can't use error directly +// because Typeof takes an empty interface value. This is annoying. +var typeOfError = reflect.TypeOf((*error)(nil)).Elem() + +// Is this an exported - upper case - name? +func isExported(name string) bool { + rune, _ := utf8.DecodeRuneInString(name) + return unicode.IsUpper(rune) +} + +// Is this type exported or a builtin? +func isExportedOrBuiltinType(t reflect.Type) bool { + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + // PkgPath will be non-empty even for an exported type, + // so we need to check the type name as well. + return isExported(t.Name()) || t.PkgPath() == "" +} + +// ValidateSubscriber func signature +func ValidateSubscriber(sub interface{}) error { + typ := reflect.TypeOf(sub) + var argType reflect.Type + switch typ.Kind() { + case reflect.Func: + name := "Func" + switch typ.NumIn() { + case 1: // func(Message) error + + case 2: // func(context.Context, Message) error or func(context.Context, []Message) error + argType = typ.In(2) + // if sub.Options().Batch { + if argType.Kind() != reflect.Slice { + return fmt.Errorf("subscriber %v dont have required signature %s", name, batchSubSig) + } + if strings.Compare(fmt.Sprintf("%v", argType), "[]interface{}") == 0 { + return fmt.Errorf("subscriber %v dont have required signaure %s", name, batchSubSig) + } + // } + default: + return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s or %s", name, typ.NumIn(), subSig, batchSubSig) + } + if !isExportedOrBuiltinType(argType) { + return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType) + } + if typ.NumOut() != 1 { + return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s or %s", + name, typ.NumOut(), subSig, batchSubSig) + } + if returnType := typ.Out(0); returnType != typeOfError { + return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String()) + } + default: + hdlr := reflect.ValueOf(sub) + name := reflect.Indirect(hdlr).Type().Name() + + for m := 0; m < typ.NumMethod(); m++ { + method := typ.Method(m) + switch method.Type.NumIn() { + case 3: + argType = method.Type.In(2) + default: + return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s or %s", + name, method.Name, method.Type.NumIn(), subSig, batchSubSig) + } + + if !isExportedOrBuiltinType(argType) { + return fmt.Errorf("%v argument type not exported: %v", name, argType) + } + if method.Type.NumOut() != 1 { + return fmt.Errorf( + "subscriber %v.%v has wrong number of return values: %v require signature %s or %s", + name, method.Name, method.Type.NumOut(), subSig, batchSubSig) + } + if returnType := method.Type.Out(0); returnType != typeOfError { + return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String()) + } + } + } + + return nil +} diff --git a/client/client.go b/client/client.go index 5f9e195d..d52e0bec 100644 --- a/client/client.go +++ b/client/client.go @@ -6,7 +6,6 @@ import ( "time" "go.unistack.org/micro/v4/codec" - "go.unistack.org/micro/v4/metadata" ) var ( @@ -35,23 +34,12 @@ type Client interface { Name() string Init(opts ...Option) error Options() Options - NewMessage(topic string, msg interface{}, opts ...MessageOption) Message NewRequest(service string, endpoint string, req interface{}, opts ...RequestOption) Request Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) - Publish(ctx context.Context, msg Message, opts ...PublishOption) error - BatchPublish(ctx context.Context, msg []Message, opts ...PublishOption) error String() string } -// Message is the interface for publishing asynchronously -type Message interface { - Topic() string - Payload() interface{} - ContentType() string - Metadata() metadata.Metadata -} - // Request is the interface for a synchronous request used by Call or Stream type Request interface { // The service to call @@ -68,16 +56,22 @@ type Request interface { Codec() codec.Codec // indicates whether the request will be a streaming one rather than unary Stream() bool + // Header data + // Header() metadata.Metadata } // Response is the response received from a service type Response interface { // Read the response Codec() codec.Codec + // The content type + // ContentType() string // Header data - Header() metadata.Metadata + // Header() metadata.Metadata // Read the undecoded response Read() ([]byte, error) + // The unencoded request body + // Body() interface{} } // Stream is the interface for a bidirectional synchronous stream diff --git a/client/noop.go b/client/noop.go index 64e15da2..83393a71 100644 --- a/client/noop.go +++ b/client/noop.go @@ -5,7 +5,6 @@ import ( "fmt" "time" - "go.unistack.org/micro/v4/broker" "go.unistack.org/micro/v4/codec" "go.unistack.org/micro/v4/errors" "go.unistack.org/micro/v4/metadata" @@ -285,6 +284,9 @@ func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opt ch := make(chan error, callOpts.Retries) var gerr error + ts := time.Now() + endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) + n.opts.Meter.Counter(ClientRequestInflight, "endpoint", endpoint).Inc() for i := 0; i <= callOpts.Retries; i++ { go func() { ch <- call(i) @@ -312,6 +314,16 @@ func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opt } } + if gerr != nil { + n.opts.Meter.Counter(ClientRequestTotal, "endpoint", endpoint, "status", "failure").Inc() + } else { + n.opts.Meter.Counter(ClientRequestTotal, "endpoint", endpoint, "status", "success").Inc() + } + n.opts.Meter.Counter(ClientRequestInflight, "endpoint", endpoint).Dec() + te := time.Since(ts) + n.opts.Meter.Summary(ClientRequestLatencyMicroseconds, "endpoint", endpoint).Update(te.Seconds()) + n.opts.Meter.Histogram(ClientRequestDurationSeconds, "endpoint", endpoint).Update(te.Seconds()) + return gerr } @@ -323,11 +335,6 @@ func (n *noopClient) NewRequest(service, endpoint string, req interface{}, opts return &noopRequest{service: service, endpoint: endpoint} } -func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOption) Message { - options := NewMessageOptions(append([]MessageOption{MessageContentType(n.opts.ContentType)}, opts...)...) - return &noopMessage{topic: topic, payload: msg, opts: options} -} - func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) { var err error @@ -414,7 +421,15 @@ func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption node := next() + // ts := time.Now() + endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) + n.opts.Meter.Counter(ClientRequestInflight, "endpoint", endpoint).Inc() stream, cerr := n.stream(ctx, node, req, callOpts) + if cerr != nil { + n.opts.Meter.Counter(ClientRequestTotal, "endpoint", endpoint, "status", "failure").Inc() + } else { + n.opts.Meter.Counter(ClientRequestTotal, "endpoint", endpoint, "status", "success").Inc() + } // record the result of the call to inform future routing decisions if verr := n.opts.Selector.Record(node, cerr); verr != nil { @@ -468,64 +483,6 @@ func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption return nil, grr } -func (n *noopClient) stream(ctx context.Context, addr string, req Request, opts CallOptions) (Stream, error) { +func (n *noopClient) stream(ctx context.Context, addr string, req Request, opts CallOptions) (*noopStream, error) { return &noopStream{}, nil } - -func (n *noopClient) BatchPublish(ctx context.Context, ps []Message, opts ...PublishOption) error { - return n.publish(ctx, ps, opts...) -} - -func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOption) error { - return n.publish(ctx, []Message{p}, opts...) -} - -func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishOption) error { - options := NewPublishOptions(opts...) - - msgs := make([]*broker.Message, 0, len(ps)) - - for _, p := range ps { - md, ok := metadata.FromOutgoingContext(ctx) - if !ok { - md = metadata.New(0) - } - md[metadata.HeaderContentType] = p.ContentType() - - topic := p.Topic() - - // get the exchange - if len(options.Exchange) > 0 { - topic = options.Exchange - } - - md[metadata.HeaderTopic] = topic - - var body []byte - - // passed in raw data - if d, ok := p.Payload().(*codec.Frame); ok { - body = d.Data - } else { - // use codec for payload - cf, err := n.newCodec(p.ContentType()) - if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) - } - - // set the body - b, err := cf.Marshal(p.Payload()) - if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) - } - body = b - } - - msgs = append(msgs, &broker.Message{Header: md, Body: body}) - } - - return n.opts.Broker.BatchPublish(ctx, msgs, - broker.PublishContext(options.Context), - broker.PublishBodyOnly(options.BodyOnly), - ) -} diff --git a/client/options.go b/client/options.go index 977b4a44..82994ec7 100644 --- a/client/options.go +++ b/client/options.go @@ -6,7 +6,6 @@ import ( "net" "time" - "go.unistack.org/micro/v4/broker" "go.unistack.org/micro/v4/codec" "go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/metadata" @@ -19,6 +18,17 @@ import ( "go.unistack.org/micro/v4/tracer" ) +var ( + // ClientRequestDurationSeconds specifies meter metric name + ClientRequestDurationSeconds = "client_request_duration_seconds" + // ClientRequestLatencyMicroseconds specifies meter metric name + ClientRequestLatencyMicroseconds = "client_request_latency_microseconds" + // ClientRequestTotal specifies meter metric name + ClientRequestTotal = "client_request_total" + // ClientRequestInflight specifies meter metric name + ClientRequestInflight = "client_request_inflight" +) + // Options holds client options type Options struct { // Transport used for transfer messages @@ -29,8 +39,6 @@ type Options struct { Logger logger.Logger // Tracer used for tracing Tracer tracer.Tracer - // Broker used to publish messages - Broker broker.Broker // Meter used for metrics Meter meter.Meter // Context is used for external options @@ -199,7 +207,6 @@ func NewOptions(opts ...Option) Options { PoolTTL: DefaultPoolTTL, Selector: random.NewSelector(), Logger: logger.DefaultLogger, - Broker: broker.DefaultBroker, Meter: meter.DefaultMeter, Tracer: tracer.DefaultTracer, Router: router.DefaultRouter, @@ -213,13 +220,6 @@ func NewOptions(opts ...Option) Options { return options } -// Broker to be used for pub/sub -func Broker(b broker.Broker) Option { - return func(o *Options) { - o.Broker = b - } -} - // Tracer to be used for tracing func Tracer(t tracer.Tracer) Option { return func(o *Options) { diff --git a/config/default_test.go b/config/default_test.go index 4ee8df7d..4d2dc810 100644 --- a/config/default_test.go +++ b/config/default_test.go @@ -14,9 +14,10 @@ type cfg struct { StringValue string `default:"string_value"` IgnoreValue string `json:"-"` StructValue *cfgStructValue - IntValue int `default:"99"` - DurationValue time.Duration `default:"10s"` - MDurationValue mtime.Duration `default:"10s"` + IntValue int `default:"99"` + DurationValue time.Duration `default:"10s"` + MDurationValue mtime.Duration `default:"10s"` + MapValue map[string]bool `default:"key1=true,key2=false"` } type cfgStructValue struct { @@ -67,6 +68,9 @@ func TestDefault(t *testing.T) { if conf.StringValue != "after_load" { t.Fatal("AfterLoad option not working") } + if len(conf.MapValue) != 2 { + t.Fatalf("map value invalid: %#+v\n", conf.MapValue) + } _ = conf // t.Logf("%#+v\n", conf) } diff --git a/event.go b/event.go deleted file mode 100644 index 1e4116d6..00000000 --- a/event.go +++ /dev/null @@ -1,27 +0,0 @@ -package micro - -import ( - "context" - - "go.unistack.org/micro/v4/client" -) - -// Event is used to publish messages to a topic -type Event interface { - // Publish publishes a message to the event topic - Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error -} - -type event struct { - c client.Client - topic string -} - -// NewEvent creates a new event publisher -func NewEvent(topic string, c client.Client) Event { - return &event{c, topic} -} - -func (e *event) Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error { - return e.c.Publish(ctx, e.c.NewMessage(e.topic, msg), opts...) -} diff --git a/meter/wrapper/wrapper.go b/meter/wrapper/wrapper.go index c5a5e6a9..8bb48e02 100644 --- a/meter/wrapper/wrapper.go +++ b/meter/wrapper/wrapper.go @@ -11,23 +11,6 @@ import ( ) var ( - // ClientRequestDurationSeconds specifies meter metric name - ClientRequestDurationSeconds = "client_request_duration_seconds" - // ClientRequestLatencyMicroseconds specifies meter metric name - ClientRequestLatencyMicroseconds = "client_request_latency_microseconds" - // ClientRequestTotal specifies meter metric name - ClientRequestTotal = "client_request_total" - // ClientRequestInflight specifies meter metric name - ClientRequestInflight = "client_request_inflight" - // ServerRequestDurationSeconds specifies meter metric name - ServerRequestDurationSeconds = "server_request_duration_seconds" - // ServerRequestLatencyMicroseconds specifies meter metric name - ServerRequestLatencyMicroseconds = "server_request_latency_microseconds" - // ServerRequestTotal specifies meter metric name - ServerRequestTotal = "server_request_total" - // ServerRequestInflight specifies meter metric name - ServerRequestInflight = "server_request_inflight" - labelSuccess = "success" labelFailure = "failure" labelStatus = "status" diff --git a/options.go b/options.go index c5746152..e564a536 100644 --- a/options.go +++ b/options.go @@ -90,33 +90,7 @@ type Option func(*Options) error // Broker to be used for client and server func Broker(b broker.Broker, opts ...BrokerOption) Option { return func(o *Options) error { - var err error - bopts := brokerOptions{} - for _, opt := range opts { - opt(&bopts) - } - all := false - if len(opts) == 0 { - all = true - } - for _, srv := range o.Servers { - for _, os := range bopts.servers { - if srv.Name() == os || all { - if err = srv.Init(server.Broker(b)); err != nil { - return err - } - } - } - } - for _, cli := range o.Clients { - for _, oc := range bopts.clients { - if cli.Name() == oc || all { - if err = cli.Init(client.Broker(b)); err != nil { - return err - } - } - } - } + o.Brokers = []broker.Broker{b} return nil } } diff --git a/server/context.go b/server/context.go index 2ab80425..4fe23bed 100644 --- a/server/context.go +++ b/server/context.go @@ -33,16 +33,6 @@ func SetOption(k, v interface{}) Option { } } -// SetSubscriberOption returns a function to setup a context with given value -func SetSubscriberOption(k, v interface{}) SubscriberOption { - return func(o *SubscriberOptions) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, k, v) - } -} - // SetHandlerOption returns a function to setup a context with given value func SetHandlerOption(k, v interface{}) HandlerOption { return func(o *HandlerOptions) { diff --git a/server/context_test.go b/server/context_test.go index a479c203..351dc17f 100644 --- a/server/context_test.go +++ b/server/context_test.go @@ -51,14 +51,3 @@ func TestSetOption(t *testing.T) { t.Fatal("SetOption not works") } } - -func TestSetSubscriberOption(t *testing.T) { - type key struct{} - o := SetSubscriberOption(key{}, "test") - opts := &SubscriberOptions{} - o(opts) - - if v, ok := opts.Context.Value(key{}).(string); !ok || v == "" { - t.Fatal("SetSubscriberOption not works") - } -} diff --git a/server/handler.go b/server/handler.go deleted file mode 100644 index 58bf56fb..00000000 --- a/server/handler.go +++ /dev/null @@ -1,59 +0,0 @@ -package server - -import ( - "reflect" - - "go.unistack.org/micro/v4/register" -) - -type rpcHandler struct { - opts HandlerOptions - handler interface{} - name string - endpoints []*register.Endpoint -} - -func newRPCHandler(handler interface{}, opts ...HandlerOption) Handler { - options := NewHandlerOptions(opts...) - - typ := reflect.TypeOf(handler) - hdlr := reflect.ValueOf(handler) - name := reflect.Indirect(hdlr).Type().Name() - - var endpoints []*register.Endpoint - - for m := 0; m < typ.NumMethod(); m++ { - if e := register.ExtractEndpoint(typ.Method(m)); e != nil { - e.Name = name + "." + e.Name - - for k, v := range options.Metadata[e.Name] { - e.Metadata[k] = v - } - - endpoints = append(endpoints, e) - } - } - - return &rpcHandler{ - name: name, - handler: handler, - endpoints: endpoints, - opts: options, - } -} - -func (r *rpcHandler) Name() string { - return r.name -} - -func (r *rpcHandler) Handler() interface{} { - return r.handler -} - -func (r *rpcHandler) Endpoints() []*register.Endpoint { - return r.endpoints -} - -func (r *rpcHandler) Options() HandlerOptions { - return r.opts -} diff --git a/server/noop.go b/server/noop.go index e2be9a13..be6ed396 100644 --- a/server/noop.go +++ b/server/noop.go @@ -1,12 +1,11 @@ package server import ( - "fmt" + "reflect" "sort" "sync" "time" - "go.unistack.org/micro/v4/broker" "go.unistack.org/micro/v4/codec" "go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/register" @@ -25,13 +24,12 @@ const ( ) type noopServer struct { - h Handler - wg *sync.WaitGroup - rsvc *register.Service - handlers map[string]Handler - subscribers map[*subscriber][]broker.Subscriber - exit chan chan error - opts Options + h Handler + wg *sync.WaitGroup + rsvc *register.Service + handlers map[string]Handler + exit chan chan error + opts Options sync.RWMutex registered bool started bool @@ -43,9 +41,6 @@ func NewServer(opts ...Option) Server { if n.handlers == nil { n.handlers = make(map[string]Handler) } - if n.subscribers == nil { - n.subscribers = make(map[*subscriber][]broker.Subscriber) - } if n.exit == nil { n.exit = make(chan chan error) } @@ -71,37 +66,10 @@ func (n *noopServer) Name() string { return n.opts.Name } -func (n *noopServer) Subscribe(sb Subscriber) error { - sub, ok := sb.(*subscriber) - if !ok { - return fmt.Errorf("invalid subscriber: expected *subscriber") - } else if len(sub.handlers) == 0 { - return fmt.Errorf("invalid subscriber: no handler functions") - } - - if err := ValidateSubscriber(sb); err != nil { - return err - } - - n.Lock() - if _, ok = n.subscribers[sub]; ok { - n.Unlock() - return fmt.Errorf("subscriber %v already exists", sub) - } - - n.subscribers[sub] = nil - n.Unlock() - return nil -} - func (n *noopServer) NewHandler(h interface{}, opts ...HandlerOption) Handler { return newRPCHandler(h, opts...) } -func (n *noopServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber { - return newSubscriber(topic, sb, opts...) -} - func (n *noopServer) Init(opts ...Option) error { for _, o := range opts { o(&n.opts) @@ -110,9 +78,6 @@ func (n *noopServer) Init(opts ...Option) error { if n.handlers == nil { n.handlers = make(map[string]Handler, 1) } - if n.subscribers == nil { - n.subscribers = make(map[*subscriber][]broker.Subscriber, 1) - } if n.exit == nil { n.exit = make(chan chan error) @@ -158,21 +123,10 @@ func (n *noopServer) Register() error { sort.Strings(handlerList) - subscriberList := make([]*subscriber, 0, len(n.subscribers)) - for e := range n.subscribers { - subscriberList = append(subscriberList, e) - } - sort.Slice(subscriberList, func(i, j int) bool { - return subscriberList[i].topic > subscriberList[j].topic - }) - - endpoints := make([]*register.Endpoint, 0, len(handlerList)+len(subscriberList)) + endpoints := make([]*register.Endpoint, 0, len(handlerList)) for _, h := range handlerList { endpoints = append(endpoints, n.handlers[h].Endpoints()...) } - for _, e := range subscriberList { - endpoints = append(endpoints, e.Endpoints()...) - } n.RUnlock() service.Nodes[0].Metadata["protocol"] = "noop" @@ -202,39 +156,6 @@ func (n *noopServer) Register() error { n.Lock() defer n.Unlock() - cx := config.Context - - var sub broker.Subscriber - - for sb := range n.subscribers { - if sb.Options().Context != nil { - cx = sb.Options().Context - } - - opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)} - if queue := sb.Options().Queue; len(queue) > 0 { - opts = append(opts, broker.SubscribeGroup(queue)) - } - - if sb.Options().Batch { - // batch processing handler - sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.newBatchSubHandler(sb, config), opts...) - } else { - // single processing handler - sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.newSubHandler(sb, config), opts...) - } - - if err != nil { - return err - } - - if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic()) - } - - n.subscribers[sb] = []broker.Subscriber{sub} - } - n.registered = true if cacheService { n.rsvc = service @@ -273,33 +194,6 @@ func (n *noopServer) Deregister() error { n.registered = false - cx := config.Context - - wg := sync.WaitGroup{} - for sb, subs := range n.subscribers { - for idx := range subs { - if sb.Options().Context != nil { - cx = sb.Options().Context - } - - ncx := cx - wg.Add(1) - go func(s broker.Subscriber) { - defer wg.Done() - if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(n.opts.Context, "unsubscribing from topic: %s", s.Topic()) - } - if err := s.Unsubscribe(ncx); err != nil { - if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(n.opts.Context, "unsubscribing from topic: %s err: %v", s.Topic(), err) - } - } - }(subs[idx]) - } - n.subscribers[sb] = nil - } - wg.Wait() - n.Unlock() return nil } @@ -336,21 +230,6 @@ func (n *noopServer) Start() error { } n.Unlock() - // only connect if we're subscribed - if len(n.subscribers) > 0 { - // connect to the broker - if err := config.Broker.Connect(config.Context); err != nil { - if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(n.opts.Context, "broker [%s] connect error: %v", config.Broker.String(), err) - } - return err - } - - if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(n.opts.Context, "broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) - } - } - // use RegisterCheck func before register // nolint: nestif if err := config.RegisterCheck(config.Context); err != nil { @@ -429,16 +308,6 @@ func (n *noopServer) Start() error { // close transport ch <- nil - - if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(n.opts.Context, "broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) - } - // disconnect broker - if err := config.Broker.Disconnect(config.Context); err != nil { - if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(n.opts.Context, "broker [%s] disconnect error: %v", config.Broker.String(), err) - } - } }() // mark the server as started @@ -468,3 +337,55 @@ func (n *noopServer) Stop() error { return err } + +type rpcHandler struct { + opts HandlerOptions + handler interface{} + name string + endpoints []*register.Endpoint +} + +func newRPCHandler(handler interface{}, opts ...HandlerOption) Handler { + options := NewHandlerOptions(opts...) + + typ := reflect.TypeOf(handler) + hdlr := reflect.ValueOf(handler) + name := reflect.Indirect(hdlr).Type().Name() + + var endpoints []*register.Endpoint + + for m := 0; m < typ.NumMethod(); m++ { + if e := register.ExtractEndpoint(typ.Method(m)); e != nil { + e.Name = name + "." + e.Name + + for k, v := range options.Metadata[e.Name] { + e.Metadata[k] = v + } + + endpoints = append(endpoints, e) + } + } + + return &rpcHandler{ + name: name, + handler: handler, + endpoints: endpoints, + opts: options, + } +} + +func (r *rpcHandler) Name() string { + return r.name +} + +func (r *rpcHandler) Handler() interface{} { + return r.handler +} + +func (r *rpcHandler) Endpoints() []*register.Endpoint { + return r.endpoints +} + +func (r *rpcHandler) Options() HandlerOptions { + return r.opts +} diff --git a/server/noop_test.go b/server/noop_test.go deleted file mode 100644 index a5f659ae..00000000 --- a/server/noop_test.go +++ /dev/null @@ -1,104 +0,0 @@ -package server_test - -import ( - "context" - "fmt" - "testing" - - "go.unistack.org/micro/v4/broker" - "go.unistack.org/micro/v4/client" - "go.unistack.org/micro/v4/codec" - "go.unistack.org/micro/v4/logger" - "go.unistack.org/micro/v4/metadata" - "go.unistack.org/micro/v4/server" -) - -type TestHandler struct { - t *testing.T -} - -type TestMessage struct { - Name string -} - -func (h *TestHandler) SingleSubHandler(ctx context.Context, msg *codec.Frame) error { - // fmt.Printf("msg %s\n", msg.Data) - return nil -} - -func (h *TestHandler) BatchSubHandler(ctxs []context.Context, msgs []*codec.Frame) error { - if len(msgs) != 8 { - h.t.Fatal("invalid number of messages received") - } - for idx := 0; idx < len(msgs); idx++ { - md, _ := metadata.FromIncomingContext(ctxs[idx]) - _ = md - // fmt.Printf("msg md %v\n", md) - } - return nil -} - -func TestNoopSub(t *testing.T) { - ctx := context.Background() - - b := broker.NewBroker() - - if err := b.Init(); err != nil { - t.Fatal(err) - } - - if err := b.Connect(ctx); err != nil { - t.Fatal(err) - } - - logger.DefaultLogger.Init(logger.WithLevel(logger.ErrorLevel)) - s := server.NewServer( - server.Broker(b), - server.Codec("application/octet-stream", codec.NewCodec()), - ) - if err := s.Init(); err != nil { - t.Fatal(err) - } - - c := client.NewClient( - client.Broker(b), - client.Codec("application/octet-stream", codec.NewCodec()), - client.ContentType("application/octet-stream"), - ) - if err := c.Init(); err != nil { - t.Fatal(err) - } - h := &TestHandler{t: t} - - if err := s.Subscribe(s.NewSubscriber("single_topic", h.SingleSubHandler, - server.SubscriberQueue("queue"), - )); err != nil { - t.Fatal(err) - } - - if err := s.Subscribe(s.NewSubscriber("batch_topic", h.BatchSubHandler, - server.SubscriberQueue("queue"), - server.SubscriberBatch(true), - )); err != nil { - t.Fatal(err) - } - - if err := s.Start(); err != nil { - t.Fatal(err) - } - - msgs := make([]client.Message, 0, 8) - for i := 0; i < 8; i++ { - msgs = append(msgs, c.NewMessage("batch_topic", &codec.Frame{Data: []byte(fmt.Sprintf(`{"name": "test_name %d"}`, i))})) - } - - if err := c.BatchPublish(ctx, msgs); err != nil { - t.Fatal(err) - } - - defer func() { - if err := s.Stop(); err != nil { - t.Fatal(err) - } - }() -} diff --git a/server/options.go b/server/options.go index efd441ac..26b3a42d 100644 --- a/server/options.go +++ b/server/options.go @@ -7,7 +7,6 @@ import ( "sync" "time" - "go.unistack.org/micro/v4/broker" "go.unistack.org/micro/v4/codec" "go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/metadata" @@ -19,6 +18,17 @@ import ( "go.unistack.org/micro/v4/util/id" ) +var ( + // ServerRequestDurationSeconds specifies meter metric name + ServerRequestDurationSeconds = "server_request_duration_seconds" + // ServerRequestLatencyMicroseconds specifies meter metric name + ServerRequestLatencyMicroseconds = "server_request_latency_microseconds" + // ServerRequestTotal specifies meter metric name + ServerRequestTotal = "server_request_total" + // ServerRequestInflight specifies meter metric name + ServerRequestInflight = "server_request_inflight" +) + // Option func type Option func(*Options) @@ -26,8 +36,6 @@ type Option func(*Options) type Options struct { // Context holds the external options and can be used for server shutdown Context context.Context - // Broker holds the server broker - Broker broker.Broker // Register holds the register Register register.Register // Tracer holds the tracer @@ -38,12 +46,6 @@ type Options struct { Meter meter.Meter // Transport holds the transport Transport transport.Transport - - /* - // Router for requests - Router Router - */ - // Listener may be passed if already created Listener net.Listener // Wait group @@ -68,10 +70,6 @@ type Options struct { Advertise string // Version holds the server version Version string - // SubWrappers holds the server subscribe wrappers - SubWrappers []SubscriberWrapper - // BatchSubWrappers holds the server batch subscribe wrappers - BatchSubWrappers []BatchSubscriberWrapper // HdlrWrappers holds the handler wrappers HdlrWrappers []HandlerWrapper // RegisterAttempts holds the number of register attempts before error @@ -84,7 +82,7 @@ type Options struct { MaxConn int // DeregisterAttempts holds the number of deregister attempts before error DeregisterAttempts int - // Hooks may contains SubscriberWrapper, HandlerWrapper or Server func wrapper + // Hooks may contains HandlerWrapper or Server func wrapper Hooks options.Hooks } @@ -100,7 +98,6 @@ func NewOptions(opts ...Option) Options { Logger: logger.DefaultLogger, Meter: meter.DefaultMeter, Tracer: tracer.DefaultTracer, - Broker: broker.DefaultBroker, Register: register.DefaultRegister, Transport: transport.DefaultTransport, Address: DefaultAddress, @@ -173,13 +170,6 @@ func Advertise(a string) Option { } } -// Broker to use for pub/sub -func Broker(b broker.Broker) Option { - return func(o *Options) { - o.Broker = b - } -} - // Codec to use to encode/decode requests for a given content type func Codec(contentType string, c codec.Codec) Option { return func(o *Options) { @@ -261,15 +251,6 @@ func TLSConfig(t *tls.Config) Option { } } -/* -// WithRouter sets the request router -func WithRouter(r Router) Option { - return func(o *Options) { - o.Router = r - } -} -*/ - // Wait tells the server to wait for requests to finish before exiting // If `wg` is nil, server only wait for completion of rpc handler. // For user need finer grained control, pass a concrete `wg` here, server will @@ -290,20 +271,6 @@ func WrapHandler(w HandlerWrapper) Option { } } -// WrapSubscriber adds a subscriber Wrapper to a list of options passed into the server -func WrapSubscriber(w SubscriberWrapper) Option { - return func(o *Options) { - o.SubWrappers = append(o.SubWrappers, w) - } -} - -// WrapBatchSubscriber adds a batch subscriber Wrapper to a list of options passed into the server -func WrapBatchSubscriber(w BatchSubscriberWrapper) Option { - return func(o *Options) { - o.BatchSubWrappers = append(o.BatchSubWrappers, w) - } -} - // MaxConn specifies maximum number of max simultaneous connections to server func MaxConn(n int) Option { return func(o *Options) { @@ -343,41 +310,6 @@ func NewHandlerOptions(opts ...HandlerOption) HandlerOptions { return options } -// SubscriberOption func -type SubscriberOption func(*SubscriberOptions) - -// SubscriberOptions struct -type SubscriberOptions struct { - // Context holds the external options - Context context.Context - // Queue holds the subscription queue - Queue string - // AutoAck flag for auto ack messages after processing - AutoAck bool - // BodyOnly flag specifies that message without headers - BodyOnly bool - // Batch flag specifies that message processed in batches - Batch bool - // BatchSize flag specifies max size of batch - BatchSize int - // BatchWait flag specifies max wait time for batch filling - BatchWait time.Duration -} - -// NewSubscriberOptions create new SubscriberOptions -func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions { - options := SubscriberOptions{ - AutoAck: true, - Context: context.Background(), - } - - for _, o := range opts { - o(&options) - } - - return options -} - // EndpointMetadata is a Handler option that allows metadata to be added to // individual endpoints. func EndpointMetadata(name string, md metadata.Metadata) HandlerOption { @@ -385,68 +317,3 @@ func EndpointMetadata(name string, md metadata.Metadata) HandlerOption { o.Metadata[name] = metadata.Copy(md) } } - -// DisableAutoAck will disable auto acking of messages -// after they have been handled. -func DisableAutoAck() SubscriberOption { - return func(o *SubscriberOptions) { - o.AutoAck = false - } -} - -// SubscriberQueue sets the shared queue name distributed messages across subscribers -func SubscriberQueue(n string) SubscriberOption { - return func(o *SubscriberOptions) { - o.Queue = n - } -} - -// SubscriberGroup sets the shared group name distributed messages across subscribers -func SubscriberGroup(n string) SubscriberOption { - return func(o *SubscriberOptions) { - o.Queue = n - } -} - -// SubscriberBodyOnly says broker that message contains raw data with absence of micro broker.Message format -func SubscriberBodyOnly(b bool) SubscriberOption { - return func(o *SubscriberOptions) { - o.BodyOnly = b - } -} - -// SubscriberContext set context options to allow broker SubscriberOption passed -func SubscriberContext(ctx context.Context) SubscriberOption { - return func(o *SubscriberOptions) { - o.Context = ctx - } -} - -// SubscriberAck control auto ack processing for handler -func SubscriberAck(b bool) SubscriberOption { - return func(o *SubscriberOptions) { - o.AutoAck = b - } -} - -// SubscriberBatch control batch processing for handler -func SubscriberBatch(b bool) SubscriberOption { - return func(o *SubscriberOptions) { - o.Batch = b - } -} - -// SubscriberBatchSize control batch filling size for handler -// Batch filling max waiting time controlled by SubscriberBatchWait -func SubscriberBatchSize(n int) SubscriberOption { - return func(o *SubscriberOptions) { - o.BatchSize = n - } -} - -// SubscriberBatchWait control batch filling wait time for handler -func SubscriberBatchWait(td time.Duration) SubscriberOption { - return func(o *SubscriberOptions) { - o.BatchWait = td - } -} diff --git a/server/registry.go b/server/registry.go index 024aaf52..0d247dab 100644 --- a/server/registry.go +++ b/server/registry.go @@ -78,7 +78,6 @@ func NewRegisterService(s Server) (*register.Service, error) { node.Metadata = metadata.Copy(opts.Metadata) node.Metadata["server"] = s.String() - node.Metadata["broker"] = opts.Broker.String() node.Metadata["register"] = opts.Register.String() return ®ister.Service{ diff --git a/server/request.go b/server/request.go deleted file mode 100644 index b032dcfb..00000000 --- a/server/request.go +++ /dev/null @@ -1,35 +0,0 @@ -package server - -import ( - "go.unistack.org/micro/v4/codec" - "go.unistack.org/micro/v4/metadata" -) - -type rpcMessage struct { - payload interface{} - codec codec.Codec - header metadata.Metadata - topic string - contentType string - body []byte -} - -func (r *rpcMessage) ContentType() string { - return r.contentType -} - -func (r *rpcMessage) Topic() string { - return r.topic -} - -func (r *rpcMessage) Body() interface{} { - return r.payload -} - -func (r *rpcMessage) Header() metadata.Metadata { - return r.header -} - -func (r *rpcMessage) Codec() codec.Codec { - return r.codec -} diff --git a/server/server.go b/server/server.go index 9ad24907..07c65854 100644 --- a/server/server.go +++ b/server/server.go @@ -48,10 +48,6 @@ type Server interface { Handle(h Handler) error // Create a new handler NewHandler(h interface{}, opts ...HandlerOption) Handler - // Create a new subscriber - NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber - // Register a subscriber - Subscribe(s Subscriber) error // Start the server Start() error // Stop the server @@ -60,30 +56,6 @@ type Server interface { String() string } -/* -// Router handle serving messages -type Router interface { - // ProcessMessage processes a message - ProcessMessage(ctx context.Context, msg Message) error - // ServeRequest processes a request to completion - ServeRequest(ctx context.Context, req Request, rsp Response) error -} -*/ - -// Message is an async message interface -type Message interface { - // Topic of the message - Topic() string - // The decoded payload value - Body() interface{} - // The content type of the payload - ContentType() string - // The raw headers of the message - Header() metadata.Metadata - // Codec used to decode the message - Codec() codec.Codec -} - // Request is a synchronous request interface type Request interface { // Service name requested @@ -156,13 +128,3 @@ type Handler interface { Endpoints() []*register.Endpoint Options() HandlerOptions } - -// Subscriber interface represents a subscription to a given topic using -// a specific subscriber function or object with endpoints. It mirrors -// the handler in its behaviour. -type Subscriber interface { - Topic() string - Subscriber() interface{} - Endpoints() []*register.Endpoint - Options() SubscriberOptions -} diff --git a/server/subscriber.go b/server/subscriber.go deleted file mode 100644 index 7d30c8f8..00000000 --- a/server/subscriber.go +++ /dev/null @@ -1,440 +0,0 @@ -package server - -import ( - "bytes" - "context" - "fmt" - "reflect" - "runtime/debug" - "strings" - "unicode" - "unicode/utf8" - - "go.unistack.org/micro/v4/broker" - "go.unistack.org/micro/v4/codec" - "go.unistack.org/micro/v4/errors" - "go.unistack.org/micro/v4/logger" - "go.unistack.org/micro/v4/metadata" - "go.unistack.org/micro/v4/register" -) - -const ( - subSig = "func(context.Context, interface{}) error" - batchSubSig = "func([]context.Context, []interface{}) error" -) - -// Precompute the reflect type for error. Can't use error directly -// because Typeof takes an empty interface value. This is annoying. -var typeOfError = reflect.TypeOf((*error)(nil)).Elem() - -type handler struct { - reqType reflect.Type - ctxType reflect.Type - method reflect.Value -} - -type subscriber struct { - typ reflect.Type - subscriber interface{} - topic string - endpoints []*register.Endpoint - handlers []*handler - opts SubscriberOptions - rcvr reflect.Value -} - -// Is this an exported - upper case - name? -func isExported(name string) bool { - rune, _ := utf8.DecodeRuneInString(name) - return unicode.IsUpper(rune) -} - -// Is this type exported or a builtin? -func isExportedOrBuiltinType(t reflect.Type) bool { - for t.Kind() == reflect.Ptr { - t = t.Elem() - } - // PkgPath will be non-empty even for an exported type, - // so we need to check the type name as well. - return isExported(t.Name()) || t.PkgPath() == "" -} - -// ValidateSubscriber func signature -func ValidateSubscriber(sub Subscriber) error { - typ := reflect.TypeOf(sub.Subscriber()) - var argType reflect.Type - switch typ.Kind() { - case reflect.Func: - name := "Func" - switch typ.NumIn() { - case 2: - argType = typ.In(1) - if sub.Options().Batch { - if argType.Kind() != reflect.Slice { - return fmt.Errorf("subscriber %v dont have required signature %s", name, batchSubSig) - } - if strings.Compare(fmt.Sprintf("%v", argType), "[]interface{}") == 0 { - return fmt.Errorf("subscriber %v dont have required signaure %s", name, batchSubSig) - } - } - default: - return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s or %s", name, typ.NumIn(), subSig, batchSubSig) - } - if !isExportedOrBuiltinType(argType) { - return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType) - } - if typ.NumOut() != 1 { - return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s or %s", - name, typ.NumOut(), subSig, batchSubSig) - } - if returnType := typ.Out(0); returnType != typeOfError { - return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String()) - } - default: - hdlr := reflect.ValueOf(sub.Subscriber()) - name := reflect.Indirect(hdlr).Type().Name() - - for m := 0; m < typ.NumMethod(); m++ { - method := typ.Method(m) - switch method.Type.NumIn() { - case 3: - argType = method.Type.In(2) - default: - return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s or %s", - name, method.Name, method.Type.NumIn(), subSig, batchSubSig) - } - - if !isExportedOrBuiltinType(argType) { - return fmt.Errorf("%v argument type not exported: %v", name, argType) - } - if method.Type.NumOut() != 1 { - return fmt.Errorf( - "subscriber %v.%v has wrong number of return values: %v require signature %s or %s", - name, method.Name, method.Type.NumOut(), subSig, batchSubSig) - } - if returnType := method.Type.Out(0); returnType != typeOfError { - return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String()) - } - } - } - - return nil -} - -func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber { - var endpoints []*register.Endpoint - var handlers []*handler - - options := NewSubscriberOptions(opts...) - - if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func { - h := &handler{ - method: reflect.ValueOf(sub), - } - - switch typ.NumIn() { - case 1: - h.reqType = typ.In(0) - case 2: - h.ctxType = typ.In(0) - h.reqType = typ.In(1) - } - - handlers = append(handlers, h) - ep := ®ister.Endpoint{ - Name: "Func", - Request: register.ExtractSubValue(typ), - Metadata: metadata.New(2), - } - ep.Metadata.Set("topic", topic) - ep.Metadata.Set("subscriber", "true") - endpoints = append(endpoints, ep) - } else { - hdlr := reflect.ValueOf(sub) - name := reflect.Indirect(hdlr).Type().Name() - - for m := 0; m < typ.NumMethod(); m++ { - method := typ.Method(m) - h := &handler{ - method: method.Func, - } - - switch method.Type.NumIn() { - case 2: - h.reqType = method.Type.In(1) - case 3: - h.ctxType = method.Type.In(1) - h.reqType = method.Type.In(2) - } - - handlers = append(handlers, h) - ep := ®ister.Endpoint{ - Name: name + "." + method.Name, - Request: register.ExtractSubValue(method.Type), - Metadata: metadata.New(2), - } - ep.Metadata.Set("topic", topic) - ep.Metadata.Set("subscriber", "true") - endpoints = append(endpoints, ep) - } - } - - return &subscriber{ - rcvr: reflect.ValueOf(sub), - typ: reflect.TypeOf(sub), - topic: topic, - subscriber: sub, - handlers: handlers, - endpoints: endpoints, - opts: options, - } -} - -//nolint:gocyclo -func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler { - return func(ps broker.Events) (err error) { - defer func() { - if r := recover(); r != nil { - n.RLock() - config := n.opts - n.RUnlock() - if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error(n.opts.Context, "panic recovered: ", r) - config.Logger.Error(n.opts.Context, string(debug.Stack())) - } - err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r) - } - }() - - msgs := make([]Message, 0, len(ps)) - ctxs := make([]context.Context, 0, len(ps)) - for _, p := range ps { - msg := p.Message() - // if we don't have headers, create empty map - if msg.Header == nil { - msg.Header = metadata.New(2) - } - - ct, _ := msg.Header.Get(metadata.HeaderContentType) - if len(ct) == 0 { - msg.Header.Set(metadata.HeaderContentType, defaultContentType) - ct = defaultContentType - } - hdr := metadata.Copy(msg.Header) - topic, _ := msg.Header.Get(metadata.HeaderTopic) - ctxs = append(ctxs, metadata.NewIncomingContext(sb.opts.Context, hdr)) - msgs = append(msgs, &rpcMessage{ - topic: topic, - contentType: ct, - header: msg.Header, - body: msg.Body, - }) - } - results := make(chan error, len(sb.handlers)) - - for i := 0; i < len(sb.handlers); i++ { - handler := sb.handlers[i] - - var req reflect.Value - - switch handler.reqType.Kind() { - case reflect.Ptr: - req = reflect.New(handler.reqType.Elem()) - default: - req = reflect.New(handler.reqType.Elem()).Elem() - } - - reqType := handler.reqType - var cf codec.Codec - for _, msg := range msgs { - cf, err = n.newCodec(msg.ContentType()) - if err != nil { - return err - } - rb := reflect.New(req.Type().Elem()) - if err = cf.ReadBody(bytes.NewReader(msg.(*rpcMessage).body), rb.Interface()); err != nil { - return err - } - msg.(*rpcMessage).codec = cf - msg.(*rpcMessage).payload = rb.Interface() - } - - fn := func(ctxs []context.Context, ms []Message) error { - var vals []reflect.Value - if sb.typ.Kind() != reflect.Func { - vals = append(vals, sb.rcvr) - } - if handler.ctxType != nil { - vals = append(vals, reflect.ValueOf(ctxs)) - } - payloads := reflect.MakeSlice(reqType, 0, len(ms)) - for _, m := range ms { - payloads = reflect.Append(payloads, reflect.ValueOf(m.Body())) - } - vals = append(vals, payloads) - - returnValues := handler.method.Call(vals) - if rerr := returnValues[0].Interface(); rerr != nil { - return rerr.(error) - } - return nil - } - - for i := len(opts.BatchSubWrappers); i > 0; i-- { - fn = opts.BatchSubWrappers[i-1](fn) - } - - if n.wg != nil { - n.wg.Add(1) - } - go func() { - if n.wg != nil { - defer n.wg.Done() - } - results <- fn(ctxs, msgs) - }() - } - - var errors []string - for i := 0; i < len(sb.handlers); i++ { - if rerr := <-results; rerr != nil { - errors = append(errors, rerr.Error()) - } - } - if len(errors) > 0 { - err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) - } - return err - } -} - -//nolint:gocyclo -func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler { - return func(p broker.Event) (err error) { - defer func() { - if r := recover(); r != nil { - n.RLock() - config := n.opts - n.RUnlock() - if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error(n.opts.Context, "panic recovered: ", r) - config.Logger.Error(n.opts.Context, string(debug.Stack())) - } - err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r) - } - }() - - msg := p.Message() - // if we don't have headers, create empty map - if msg.Header == nil { - msg.Header = metadata.New(2) - } - - ct := msg.Header["Content-Type"] - if len(ct) == 0 { - msg.Header.Set(metadata.HeaderContentType, defaultContentType) - ct = defaultContentType - } - cf, err := n.newCodec(ct) - if err != nil { - return err - } - - hdr := metadata.New(len(msg.Header)) - for k, v := range msg.Header { - if k == "Content-Type" { - continue - } - hdr.Set(k, v) - } - - ctx := metadata.NewIncomingContext(sb.opts.Context, hdr) - - results := make(chan error, len(sb.handlers)) - - for i := 0; i < len(sb.handlers); i++ { - handler := sb.handlers[i] - - var isVal bool - var req reflect.Value - - if handler.reqType.Kind() == reflect.Ptr { - req = reflect.New(handler.reqType.Elem()) - } else { - req = reflect.New(handler.reqType) - isVal = true - } - if isVal { - req = req.Elem() - } - - if err = cf.ReadBody(bytes.NewBuffer(msg.Body), req.Interface()); err != nil { - return err - } - - fn := func(ctx context.Context, msg Message) error { - var vals []reflect.Value - if sb.typ.Kind() != reflect.Func { - vals = append(vals, sb.rcvr) - } - if handler.ctxType != nil { - vals = append(vals, reflect.ValueOf(ctx)) - } - - vals = append(vals, reflect.ValueOf(msg.Body())) - - returnValues := handler.method.Call(vals) - if rerr := returnValues[0].Interface(); rerr != nil { - return rerr.(error) - } - return nil - } - - for i := len(opts.SubWrappers); i > 0; i-- { - fn = opts.SubWrappers[i-1](fn) - } - - if n.wg != nil { - n.wg.Add(1) - } - go func() { - if n.wg != nil { - defer n.wg.Done() - } - cerr := fn(ctx, &rpcMessage{ - topic: sb.topic, - contentType: ct, - payload: req.Interface(), - header: msg.Header, - }) - results <- cerr - }() - } - var errors []string - for i := 0; i < len(sb.handlers); i++ { - if rerr := <-results; rerr != nil { - errors = append(errors, rerr.Error()) - } - } - if len(errors) > 0 { - err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) - } - return err - } -} - -func (s *subscriber) Topic() string { - return s.topic -} - -func (s *subscriber) Subscriber() interface{} { - return s.subscriber -} - -func (s *subscriber) Endpoints() []*register.Endpoint { - return s.endpoints -} - -func (s *subscriber) Options() SubscriberOptions { - return s.opts -} diff --git a/server/wrapper.go b/server/wrapper.go index b4596e63..3500dc78 100644 --- a/server/wrapper.go +++ b/server/wrapper.go @@ -9,25 +9,9 @@ import ( // request and response types. type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error -// SubscriberFunc represents a single method of a subscriber. It's used primarily -// for the wrappers. What's handed to the actual method is the concrete -// publication message. -type SubscriberFunc func(ctx context.Context, msg Message) error - -// BatchSubscriberFunc represents a single method of a subscriber. It's used primarily -// for the wrappers. What's handed to the actual method is the concrete -// publication message. This func used by batch subscribers -type BatchSubscriberFunc func(ctxs []context.Context, msgs []Message) error - // HandlerWrapper wraps the HandlerFunc and returns the equivalent type HandlerWrapper func(HandlerFunc) HandlerFunc -// SubscriberWrapper wraps the SubscriberFunc and returns the equivalent -type SubscriberWrapper func(SubscriberFunc) SubscriberFunc - -// BatchSubscriberWrapper wraps the SubscriberFunc and returns the equivalent -type BatchSubscriberWrapper func(BatchSubscriberFunc) BatchSubscriberFunc - // StreamWrapper wraps a Stream interface and returns the equivalent. // Because streams exist for the lifetime of a method invocation this // is a convenient way to wrap a Stream as its in use for trace, monitoring, diff --git a/service.go b/service.go index 559ea5e0..23e6f064 100644 --- a/service.go +++ b/service.go @@ -66,11 +66,6 @@ func RegisterHandler(s server.Server, h interface{}, opts ...server.HandlerOptio return s.Handle(s.NewHandler(h, opts...)) } -// RegisterSubscriber is syntactic sugar for registering a subscriber -func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error { - return s.Subscribe(s.NewSubscriber(topic, h, opts...)) -} - type service struct { sync.RWMutex opts Options diff --git a/service_test.go b/service_test.go index c6df0a3a..025e44c8 100644 --- a/service_test.go +++ b/service_test.go @@ -1,7 +1,6 @@ package micro import ( - "context" "reflect" "testing" @@ -65,41 +64,6 @@ func TestRegisterHandler(t *testing.T) { } } -func TestRegisterSubscriber(t *testing.T) { - type args struct { - topic string - s server.Server - h interface{} - opts []server.SubscriberOption - } - h := func(_ context.Context, _ interface{}) error { - return nil - } - tests := []struct { - name string - args args - wantErr bool - }{ - { - name: "RegisterSubscriber", - args: args{ - topic: "test", - s: server.DefaultServer, - h: h, - opts: nil, - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if err := RegisterSubscriber(tt.args.topic, tt.args.s, tt.args.h, tt.args.opts...); (err != nil) != tt.wantErr { - t.Errorf("RegisterSubscriber() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} - func TestNewService(t *testing.T) { type args struct { opts []Option @@ -222,7 +186,6 @@ func Test_service_Options(t *testing.T) { } func Test_service_Broker(t *testing.T) { - b := broker.NewBroker() type fields struct { opts Options } @@ -238,12 +201,12 @@ func Test_service_Broker(t *testing.T) { { name: "service.Broker", fields: fields{ - opts: Options{Brokers: []broker.Broker{b}}, + opts: Options{Brokers: []broker.Broker{broker.DefaultBroker}}, }, args: args{ names: []string{"noop"}, }, - want: b, + want: broker.DefaultBroker, }, } for _, tt := range tests {