broker refactor
Some checks failed
coverage / build (pull_request) Failing after 36s
lint / lint (pull_request) Failing after 2m55s
test / test (pull_request) Successful in 4m28s

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2025-01-30 01:44:11 +03:00
parent c9066e0455
commit c5e6d4cddc
16 changed files with 785 additions and 688 deletions

View File

@@ -20,6 +20,8 @@ var (
ErrDisconnected = errors.New("broker disconnected")
// ErrInvalidMessage returns when invalid Message passed
ErrInvalidMessage = errors.New("invalid message")
// ErrInvalidHandler returns when subscriber passed to Subscribe
ErrInvalidHandler = errors.New("invalid handler")
// DefaultGracefulTimeout
DefaultGracefulTimeout = 5 * time.Second
)
@@ -87,9 +89,3 @@ type Subscriber interface {
// Unsubscribe from topic
Unsubscribe(ctx context.Context) error
}
// MessageHandler func signature for single message processing
type MessageHandler func(Message) error
// MessagesHandler func signature for batch message processing
type MessagesHandler func([]Message) error

View File

@@ -2,6 +2,7 @@ package broker
import (
"context"
"strings"
"sync"
"go.unistack.org/micro/v4/broker"
@@ -34,6 +35,30 @@ type memoryMessage struct {
opts broker.PublishOptions
}
func (m *memoryMessage) Ack() error {
return nil
}
func (m *memoryMessage) Body() []byte {
return m.body
}
func (m *memoryMessage) Header() metadata.Metadata {
return m.hdr
}
func (m *memoryMessage) Context() context.Context {
return m.ctx
}
func (m *memoryMessage) Topic() string {
return ""
}
func (m *memoryMessage) Unmarshal(dst interface{}, opts ...codec.Option) error {
return m.c.Unmarshal(m.body, dst)
}
type Subscriber struct {
ctx context.Context
exit chan bool
@@ -43,25 +68,38 @@ type Subscriber struct {
opts broker.SubscribeOptions
}
func (m *Broker) Options() broker.Options {
return m.opts
func (b *Broker) newCodec(ct string) (codec.Codec, error) {
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
ct = ct[:idx]
}
b.RLock()
c, ok := b.opts.Codecs[ct]
b.RUnlock()
if ok {
return c, nil
}
return nil, codec.ErrUnknownContentType
}
func (m *Broker) Address() string {
return m.addr
func (b *Broker) Options() broker.Options {
return b.opts
}
func (m *Broker) Connect(ctx context.Context) error {
func (b *Broker) Address() string {
return b.addr
}
func (b *Broker) Connect(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
m.Lock()
defer m.Unlock()
b.Lock()
defer b.Unlock()
if m.connected {
if b.connected {
return nil
}
@@ -75,65 +113,79 @@ func (m *Broker) Connect(ctx context.Context) error {
// set addr with port
addr = mnet.HostPort(addr, 10000+i)
m.addr = addr
m.connected = true
b.addr = addr
b.connected = true
return nil
}
func (m *Broker) Disconnect(ctx context.Context) error {
func (b *Broker) Disconnect(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
m.Lock()
defer m.Unlock()
b.Lock()
defer b.Unlock()
if !m.connected {
if !b.connected {
return nil
}
m.connected = false
b.connected = false
return nil
}
func (m *Broker) Init(opts ...broker.Option) error {
func (b *Broker) Init(opts ...broker.Option) error {
for _, o := range opts {
o(&m.opts)
o(&b.opts)
}
m.funcPublish = m.fnPublish
m.funcSubscribe = m.fnSubscribe
b.funcPublish = b.fnPublish
b.funcSubscribe = b.fnSubscribe
m.opts.Hooks.EachPrev(func(hook options.Hook) {
b.opts.Hooks.EachPrev(func(hook options.Hook) {
switch h := hook.(type) {
case broker.HookPublish:
m.funcPublish = h(m.funcPublish)
b.funcPublish = h(b.funcPublish)
case broker.HookSubscribe:
m.funcSubscribe = h(m.funcSubscribe)
b.funcSubscribe = h(b.funcSubscribe)
}
})
return nil
}
func (m *Broker) Publish(ctx context.Context, topic string, messages ...broker.Message) error {
return m.funcPublish(ctx, topic, messages...)
func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...broker.PublishOption) (broker.Message, error) {
options := broker.NewPublishOptions(opts...)
m := &memoryMessage{ctx: ctx, hdr: hdr, opts: options}
c, err := b.newCodec(m.opts.ContentType)
if err == nil {
m.body, err = c.Marshal(body)
}
if err != nil {
return nil, err
}
return m, nil
}
func (m *Broker) fnPublish(ctx context.Context, topic string, messages ...broker.Message) error {
return m.publish(ctx, topic, messages...)
func (b *Broker) Publish(ctx context.Context, topic string, messages ...broker.Message) error {
return b.funcPublish(ctx, topic, messages...)
}
func (m *Broker) publish(ctx context.Context, topic string, messages ...broker.Message) error {
m.RLock()
if !m.connected {
m.RUnlock()
func (b *Broker) fnPublish(ctx context.Context, topic string, messages ...broker.Message) error {
return b.publish(ctx, topic, messages...)
}
func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.Message) error {
b.RLock()
if !b.connected {
b.RUnlock()
return broker.ErrNotConnected
}
m.RUnlock()
b.RUnlock()
select {
case <-ctx.Done():
@@ -141,9 +193,9 @@ func (m *Broker) publish(ctx context.Context, topic string, messages ...broker.M
default:
}
m.RLock()
subs, ok := m.subscribers[topic]
m.RUnlock()
b.RLock()
subs, ok := b.subscribers[topic]
b.RUnlock()
if !ok {
return nil
}
@@ -152,24 +204,28 @@ func (m *Broker) publish(ctx context.Context, topic string, messages ...broker.M
for _, sub := range subs {
switch s := sub.handler.(type) {
case broker.MessageHandler:
default:
if b.opts.Logger.V(logger.ErrorLevel) {
b.opts.Logger.Error(ctx, "broker handler error", broker.ErrInvalidHandler)
}
case func(broker.Message) error:
for _, message := range messages {
if err = s(message); err == nil && sub.opts.AutoAck {
err = message.Ack()
}
if err != nil {
if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, "broker handler error", err)
if b.opts.Logger.V(logger.ErrorLevel) {
b.opts.Logger.Error(ctx, "broker handler error", err)
}
}
}
case broker.MessagesHandler:
case func([]broker.Message) error:
if err = s(messages); err == nil && sub.opts.AutoAck {
for _, message := range messages {
err = message.Ack()
if err != nil {
if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, "broker handler error", err)
if b.opts.Logger.V(logger.ErrorLevel) {
b.opts.Logger.Error(ctx, "broker handler error", err)
}
}
}
@@ -180,17 +236,21 @@ func (m *Broker) publish(ctx context.Context, topic string, messages ...broker.M
return nil
}
func (m *Broker) Subscribe(ctx context.Context, topic string, handler interface{}, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
return m.funcSubscribe(ctx, topic, handler, opts...)
func (b *Broker) Subscribe(ctx context.Context, topic string, handler interface{}, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
return b.funcSubscribe(ctx, topic, handler, opts...)
}
func (m *Broker) fnSubscribe(ctx context.Context, topic string, handler interface{}, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
m.RLock()
if !m.connected {
m.RUnlock()
func (b *Broker) fnSubscribe(ctx context.Context, topic string, handler interface{}, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
if err := broker.IsValidHandler(handler); err != nil {
return nil, err
}
b.RLock()
if !b.connected {
b.RUnlock()
return nil, broker.ErrNotConnected
}
m.RUnlock()
b.RUnlock()
sid, err := id.New()
if err != nil {
@@ -208,63 +268,47 @@ func (m *Broker) fnSubscribe(ctx context.Context, topic string, handler interfac
ctx: ctx,
}
m.Lock()
m.subscribers[topic] = append(m.subscribers[topic], sub)
m.Unlock()
b.Lock()
b.subscribers[topic] = append(b.subscribers[topic], sub)
b.Unlock()
go func() {
<-sub.exit
m.Lock()
newSubscribers := make([]*Subscriber, 0, len(m.subscribers)-1)
for _, sb := range m.subscribers[topic] {
b.Lock()
newSubscribers := make([]*Subscriber, 0, len(b.subscribers)-1)
for _, sb := range b.subscribers[topic] {
if sb.id == sub.id {
continue
}
newSubscribers = append(newSubscribers, sb)
}
m.subscribers[topic] = newSubscribers
m.Unlock()
b.subscribers[topic] = newSubscribers
b.Unlock()
}()
return sub, nil
}
func (m *Broker) String() string {
func (b *Broker) String() string {
return "memory"
}
func (m *Broker) Name() string {
return m.opts.Name
func (b *Broker) Name() string {
return b.opts.Name
}
func (m *Broker) Live() bool {
func (b *Broker) Live() bool {
return true
}
func (m *Broker) Ready() bool {
func (b *Broker) Ready() bool {
return true
}
func (m *Broker) Health() bool {
func (b *Broker) Health() bool {
return true
}
func (m *memoryMessage) Topic() string {
return m.topic
}
func (m *memoryMessage) Body() []byte {
return m.body
}
func (m *memoryMessage) Ack() error {
return nil
}
func (m *memoryMessage) Context() context.Context {
return m.ctx
}
func (m *Subscriber) Options() broker.SubscribeOptions {
return m.opts
}

View File

@@ -5,12 +5,23 @@ import (
"fmt"
"testing"
"go.uber.org/atomic"
"go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/metadata"
)
type hldr struct {
c atomic.Int64
}
func (h *hldr) Handler(m broker.Message) error {
h.c.Add(1)
return nil
}
func TestMemoryBroker(t *testing.T) {
b := NewBroker()
b := NewBroker(broker.Codec("application/octet-stream", codec.NewCodec()))
ctx := context.Background()
if err := b.Init(); err != nil {
@@ -22,28 +33,27 @@ func TestMemoryBroker(t *testing.T) {
}
topic := "test"
count := 10
count := int64(10)
fn := func(_ broker.Message) error {
return nil
}
h := &hldr{}
sub, err := b.Subscribe(ctx, topic, fn)
sub, err := b.Subscribe(ctx, topic, h.Handler)
if err != nil {
t.Fatalf("Unexpected error subscribing %v", err)
}
msgs := make([]*broker.Message, 0, count)
for i := 0; i < count; i++ {
message, err := b.NewMessage(ctx, metadata.Pairs()
Header: map[string]string{
metadata.HeaderTopic: topic,
"foo": "bar",
"id": fmt.Sprintf("%d", i),
},
for i := int64(0); i < count; i++ {
message, err := b.NewMessage(ctx,
metadata.Pairs(
"foo", "bar",
"id", fmt.Sprintf("%d", i),
),
[]byte(`"hello world"`),
broker.PublishContentType("application/octet-stream"),
)
if err != nil {
t.Fatal(err)
}
msgs = append(msgs, message)
if err := b.Publish(ctx, topic, message); err != nil {
t.Fatalf("Unexpected error publishing %d err: %v", i, err)
@@ -57,4 +67,8 @@ func TestMemoryBroker(t *testing.T) {
if err := b.Disconnect(ctx); err != nil {
t.Fatalf("Unexpected connect error %v", err)
}
if h.c.Load() != count {
t.Fatal("invalid messages count received")
}
}

View File

@@ -118,10 +118,6 @@ func (m *noopMessage) Context() context.Context {
return m.ctx
}
func (m *noopMessage) Error() error {
return nil
}
func (m *noopMessage) Topic() string {
return ""
}

View File

@@ -133,8 +133,8 @@ func Addrs(addrs ...string) Option {
}
}
// Codecs sets the codec used for encoding/decoding messages
func Codecs(ct string, c codec.Codec) Option {
// Codec sets the codec used for encoding/decoding messages
func Codec(ct string, c codec.Codec) Option {
return func(o *Options) {
o.Codecs[ct] = c
}

View File

@@ -1,6 +1,4 @@
//go:build ignore
package server
package broker
import (
"fmt"
@@ -10,7 +8,8 @@ import (
)
const (
subSig = "func(context.Context, interface{}) error"
messageSig = "func(broker.Message) error"
messagesSig = "func([]broker.Message) error"
)
// Precompute the reflect type for error. Can't use error directly
@@ -33,31 +32,31 @@ func isExportedOrBuiltinType(t reflect.Type) bool {
return isExported(t.Name()) || t.PkgPath() == ""
}
// ValidateSubscriber func signature
func ValidateSubscriber(sub Subscriber) error {
typ := reflect.TypeOf(sub.Subscriber())
// IsValidHandler func signature
func IsValidHandler(sub interface{}) error {
typ := reflect.TypeOf(sub)
var argType reflect.Type
switch typ.Kind() {
case reflect.Func:
name := "Func"
switch typ.NumIn() {
case 2:
argType = typ.In(1)
case 1:
argType = typ.In(0)
default:
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), subSig)
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), messageSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
}
if typ.NumOut() != 1 {
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s",
name, typ.NumOut(), subSig)
name, typ.NumOut(), messageSig)
}
if returnType := typ.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
}
default:
hdlr := reflect.ValueOf(sub.Subscriber())
hdlr := reflect.ValueOf(sub)
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
@@ -67,7 +66,7 @@ func ValidateSubscriber(sub Subscriber) error {
argType = method.Type.In(2)
default:
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s",
name, method.Name, method.Type.NumIn(), subSig)
name, method.Name, method.Type.NumIn(), messageSig)
}
if !isExportedOrBuiltinType(argType) {
@@ -76,7 +75,7 @@ func ValidateSubscriber(sub Subscriber) error {
if method.Type.NumOut() != 1 {
return fmt.Errorf(
"subscriber %v.%v has wrong number of return values: %v require signature %s",
name, method.Name, method.Type.NumOut(), subSig)
name, method.Name, method.Type.NumOut(), messageSig)
}
if returnType := method.Type.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())