@@ -5,6 +5,7 @@ import (
|
||||
"sync"
|
||||
|
||||
"go.unistack.org/micro/v4/broker"
|
||||
"go.unistack.org/micro/v4/codec"
|
||||
"go.unistack.org/micro/v4/logger"
|
||||
"go.unistack.org/micro/v4/metadata"
|
||||
"go.unistack.org/micro/v4/options"
|
||||
@@ -14,44 +15,43 @@ import (
|
||||
"go.unistack.org/micro/v4/util/rand"
|
||||
)
|
||||
|
||||
type memoryBroker struct {
|
||||
funcPublish broker.FuncPublish
|
||||
funcBatchPublish broker.FuncBatchPublish
|
||||
funcSubscribe broker.FuncSubscribe
|
||||
funcBatchSubscribe broker.FuncBatchSubscribe
|
||||
subscribers map[string][]*memorySubscriber
|
||||
addr string
|
||||
opts broker.Options
|
||||
type Broker struct {
|
||||
funcPublish broker.FuncPublish
|
||||
funcSubscribe broker.FuncSubscribe
|
||||
subscribers map[string][]*Subscriber
|
||||
addr string
|
||||
opts broker.Options
|
||||
sync.RWMutex
|
||||
connected bool
|
||||
}
|
||||
|
||||
type memoryEvent struct {
|
||||
err error
|
||||
message interface{}
|
||||
type memoryMessage struct {
|
||||
c codec.Codec
|
||||
topic string
|
||||
ctx context.Context
|
||||
body []byte
|
||||
hdr metadata.Metadata
|
||||
opts broker.PublishOptions
|
||||
}
|
||||
|
||||
type Subscriber struct {
|
||||
ctx context.Context
|
||||
exit chan bool
|
||||
handler interface{}
|
||||
id string
|
||||
topic string
|
||||
opts broker.Options
|
||||
opts broker.SubscribeOptions
|
||||
}
|
||||
|
||||
type memorySubscriber struct {
|
||||
ctx context.Context
|
||||
exit chan bool
|
||||
handler broker.Handler
|
||||
batchhandler broker.BatchHandler
|
||||
id string
|
||||
topic string
|
||||
opts broker.SubscribeOptions
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Options() broker.Options {
|
||||
func (m *Broker) Options() broker.Options {
|
||||
return m.opts
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Address() string {
|
||||
func (m *Broker) Address() string {
|
||||
return m.addr
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Connect(ctx context.Context) error {
|
||||
func (m *Broker) Connect(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
@@ -81,7 +81,7 @@ func (m *memoryBroker) Connect(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Disconnect(ctx context.Context) error {
|
||||
func (m *Broker) Disconnect(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
@@ -99,50 +99,35 @@ func (m *memoryBroker) Disconnect(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Init(opts ...broker.Option) error {
|
||||
func (m *Broker) Init(opts ...broker.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&m.opts)
|
||||
}
|
||||
|
||||
m.funcPublish = m.fnPublish
|
||||
m.funcBatchPublish = m.fnBatchPublish
|
||||
m.funcSubscribe = m.fnSubscribe
|
||||
m.funcBatchSubscribe = m.fnBatchSubscribe
|
||||
|
||||
m.opts.Hooks.EachPrev(func(hook options.Hook) {
|
||||
switch h := hook.(type) {
|
||||
case broker.HookPublish:
|
||||
m.funcPublish = h(m.funcPublish)
|
||||
case broker.HookBatchPublish:
|
||||
m.funcBatchPublish = h(m.funcBatchPublish)
|
||||
case broker.HookSubscribe:
|
||||
m.funcSubscribe = h(m.funcSubscribe)
|
||||
case broker.HookBatchSubscribe:
|
||||
m.funcBatchSubscribe = h(m.funcBatchSubscribe)
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
|
||||
return m.funcPublish(ctx, topic, msg, opts...)
|
||||
func (m *Broker) Publish(ctx context.Context, topic string, messages ...broker.Message) error {
|
||||
return m.funcPublish(ctx, topic, messages...)
|
||||
}
|
||||
|
||||
func (m *memoryBroker) fnPublish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
|
||||
msg.Header.Set(metadata.HeaderTopic, topic)
|
||||
return m.publish(ctx, []*broker.Message{msg}, opts...)
|
||||
func (m *Broker) fnPublish(ctx context.Context, topic string, messages ...broker.Message) error {
|
||||
return m.publish(ctx, topic, messages...)
|
||||
}
|
||||
|
||||
func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
||||
return m.funcBatchPublish(ctx, msgs, opts...)
|
||||
}
|
||||
|
||||
func (m *memoryBroker) fnBatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
||||
return m.publish(ctx, msgs, opts...)
|
||||
}
|
||||
|
||||
func (m *memoryBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
||||
func (m *Broker) publish(ctx context.Context, topic string, messages ...broker.Message) error {
|
||||
m.RLock()
|
||||
if !m.connected {
|
||||
m.RUnlock()
|
||||
@@ -150,79 +135,41 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*broker.Message, opts
|
||||
}
|
||||
m.RUnlock()
|
||||
|
||||
var err error
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
options := broker.NewPublishOptions(opts...)
|
||||
}
|
||||
|
||||
msgTopicMap := make(map[string]broker.Events)
|
||||
for _, v := range msgs {
|
||||
p := &memoryEvent{opts: m.opts}
|
||||
m.RLock()
|
||||
subs, ok := m.subscribers[topic]
|
||||
m.RUnlock()
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
if m.opts.Codec == nil || options.BodyOnly {
|
||||
p.topic, _ = v.Header.Get(metadata.HeaderTopic)
|
||||
p.message = v.Body
|
||||
} else {
|
||||
p.topic, _ = v.Header.Get(metadata.HeaderTopic)
|
||||
p.message, err = m.opts.Codec.Marshal(v)
|
||||
var err error
|
||||
|
||||
for _, sub := range subs {
|
||||
switch s := sub.handler.(type) {
|
||||
case broker.MessageHandler:
|
||||
for _, message := range messages {
|
||||
if err = s(message); err == nil && sub.opts.AutoAck {
|
||||
err = message.Ack()
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p)
|
||||
}
|
||||
|
||||
beh := m.opts.BatchErrorHandler
|
||||
eh := m.opts.ErrorHandler
|
||||
|
||||
for t, ms := range msgTopicMap {
|
||||
m.RLock()
|
||||
subs, ok := m.subscribers[t]
|
||||
m.RUnlock()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, sub := range subs {
|
||||
if sub.opts.BatchErrorHandler != nil {
|
||||
beh = sub.opts.BatchErrorHandler
|
||||
}
|
||||
if sub.opts.ErrorHandler != nil {
|
||||
eh = sub.opts.ErrorHandler
|
||||
}
|
||||
|
||||
switch {
|
||||
// batch processing
|
||||
case sub.batchhandler != nil:
|
||||
if err = sub.batchhandler(ms); err != nil {
|
||||
ms.SetError(err)
|
||||
if beh != nil {
|
||||
_ = beh(ms)
|
||||
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
||||
m.opts.Logger.Error(m.opts.Context, err.Error())
|
||||
}
|
||||
} else if sub.opts.AutoAck {
|
||||
if err = ms.Ack(); err != nil {
|
||||
m.opts.Logger.Error(m.opts.Context, "broker ack error", err)
|
||||
}
|
||||
if m.opts.Logger.V(logger.ErrorLevel) {
|
||||
m.opts.Logger.Error(m.opts.Context, "broker handler error", err)
|
||||
}
|
||||
// single processing
|
||||
case sub.handler != nil:
|
||||
for _, p := range ms {
|
||||
if err = sub.handler(p); err != nil {
|
||||
p.SetError(err)
|
||||
if eh != nil {
|
||||
_ = eh(p)
|
||||
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
||||
m.opts.Logger.Error(m.opts.Context, "broker handler error", err)
|
||||
}
|
||||
} else if sub.opts.AutoAck {
|
||||
if err = p.Ack(); err != nil {
|
||||
m.opts.Logger.Error(m.opts.Context, "broker ack error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
case broker.MessagesHandler:
|
||||
if err = s(messages); err == nil && sub.opts.AutoAck {
|
||||
for _, message := range messages {
|
||||
err = message.Ack()
|
||||
if err != nil {
|
||||
if m.opts.Logger.V(logger.ErrorLevel) {
|
||||
m.opts.Logger.Error(m.opts.Context, "broker handler error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -233,60 +180,11 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*broker.Message, opts
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||
return m.funcBatchSubscribe(ctx, topic, handler, opts...)
|
||||
}
|
||||
|
||||
func (m *memoryBroker) fnBatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||
m.RLock()
|
||||
if !m.connected {
|
||||
m.RUnlock()
|
||||
return nil, broker.ErrNotConnected
|
||||
}
|
||||
m.RUnlock()
|
||||
|
||||
sid, err := id.New()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
options := broker.NewSubscribeOptions(opts...)
|
||||
|
||||
sub := &memorySubscriber{
|
||||
exit: make(chan bool, 1),
|
||||
id: sid,
|
||||
topic: topic,
|
||||
batchhandler: handler,
|
||||
opts: options,
|
||||
ctx: ctx,
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
m.subscribers[topic] = append(m.subscribers[topic], sub)
|
||||
m.Unlock()
|
||||
|
||||
go func() {
|
||||
<-sub.exit
|
||||
m.Lock()
|
||||
newSubscribers := make([]*memorySubscriber, 0, len(m.subscribers)-1)
|
||||
for _, sb := range m.subscribers[topic] {
|
||||
if sb.id == sub.id {
|
||||
continue
|
||||
}
|
||||
newSubscribers = append(newSubscribers, sb)
|
||||
}
|
||||
m.subscribers[topic] = newSubscribers
|
||||
m.Unlock()
|
||||
}()
|
||||
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||
func (m *Broker) Subscribe(ctx context.Context, topic string, handler interface{}, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||
return m.funcSubscribe(ctx, topic, handler, opts...)
|
||||
}
|
||||
|
||||
func (m *memoryBroker) fnSubscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||
func (m *Broker) fnSubscribe(ctx context.Context, topic string, handler interface{}, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||
m.RLock()
|
||||
if !m.connected {
|
||||
m.RUnlock()
|
||||
@@ -301,7 +199,7 @@ func (m *memoryBroker) fnSubscribe(ctx context.Context, topic string, handler br
|
||||
|
||||
options := broker.NewSubscribeOptions(opts...)
|
||||
|
||||
sub := &memorySubscriber{
|
||||
sub := &Subscriber{
|
||||
exit: make(chan bool, 1),
|
||||
id: sid,
|
||||
topic: topic,
|
||||
@@ -317,7 +215,7 @@ func (m *memoryBroker) fnSubscribe(ctx context.Context, topic string, handler br
|
||||
go func() {
|
||||
<-sub.exit
|
||||
m.Lock()
|
||||
newSubscribers := make([]*memorySubscriber, 0, len(m.subscribers)-1)
|
||||
newSubscribers := make([]*Subscriber, 0, len(m.subscribers)-1)
|
||||
for _, sb := range m.subscribers[topic] {
|
||||
if sb.id == sub.id {
|
||||
continue
|
||||
@@ -331,81 +229,59 @@ func (m *memoryBroker) fnSubscribe(ctx context.Context, topic string, handler br
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) String() string {
|
||||
func (m *Broker) String() string {
|
||||
return "memory"
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Name() string {
|
||||
func (m *Broker) Name() string {
|
||||
return m.opts.Name
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Live() bool {
|
||||
func (m *Broker) Live() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Ready() bool {
|
||||
func (m *Broker) Ready() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Health() bool {
|
||||
func (m *Broker) Health() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (m *memoryEvent) Topic() string {
|
||||
func (m *memoryMessage) Topic() string {
|
||||
return m.topic
|
||||
}
|
||||
|
||||
func (m *memoryEvent) Message() *broker.Message {
|
||||
switch v := m.message.(type) {
|
||||
case *broker.Message:
|
||||
return v
|
||||
case []byte:
|
||||
msg := &broker.Message{}
|
||||
if err := m.opts.Codec.Unmarshal(v, msg); err != nil {
|
||||
if m.opts.Logger.V(logger.ErrorLevel) {
|
||||
m.opts.Logger.Error(m.opts.Context, "[memory]: failed to unmarshal: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return msg
|
||||
}
|
||||
func (m *memoryMessage) Body() []byte {
|
||||
return m.body
|
||||
}
|
||||
|
||||
func (m *memoryMessage) Ack() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryEvent) Ack() error {
|
||||
return nil
|
||||
func (m *memoryMessage) Context() context.Context {
|
||||
return m.ctx
|
||||
}
|
||||
|
||||
func (m *memoryEvent) Error() error {
|
||||
return m.err
|
||||
}
|
||||
|
||||
func (m *memoryEvent) SetError(err error) {
|
||||
m.err = err
|
||||
}
|
||||
|
||||
func (m *memoryEvent) Context() context.Context {
|
||||
return m.opts.Context
|
||||
}
|
||||
|
||||
func (m *memorySubscriber) Options() broker.SubscribeOptions {
|
||||
func (m *Subscriber) Options() broker.SubscribeOptions {
|
||||
return m.opts
|
||||
}
|
||||
|
||||
func (m *memorySubscriber) Topic() string {
|
||||
func (m *Subscriber) Topic() string {
|
||||
return m.topic
|
||||
}
|
||||
|
||||
func (m *memorySubscriber) Unsubscribe(ctx context.Context) error {
|
||||
func (m *Subscriber) Unsubscribe(ctx context.Context) error {
|
||||
m.exit <- true
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewBroker return new memory broker
|
||||
func NewBroker(opts ...broker.Option) broker.Broker {
|
||||
return &memoryBroker{
|
||||
return &Broker{
|
||||
opts: broker.NewOptions(opts...),
|
||||
subscribers: make(map[string][]*memorySubscriber),
|
||||
subscribers: make(map[string][]*Subscriber),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,56 +9,6 @@ import (
|
||||
"go.unistack.org/micro/v4/metadata"
|
||||
)
|
||||
|
||||
func TestMemoryBatchBroker(t *testing.T) {
|
||||
b := NewBroker()
|
||||
ctx := context.Background()
|
||||
|
||||
if err := b.Init(); err != nil {
|
||||
t.Fatalf("Unexpected init error %v", err)
|
||||
}
|
||||
|
||||
if err := b.Connect(ctx); err != nil {
|
||||
t.Fatalf("Unexpected connect error %v", err)
|
||||
}
|
||||
|
||||
topic := "test"
|
||||
count := 10
|
||||
|
||||
fn := func(evts broker.Events) error {
|
||||
return evts.Ack()
|
||||
}
|
||||
|
||||
sub, err := b.BatchSubscribe(ctx, topic, fn)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error subscribing %v", err)
|
||||
}
|
||||
|
||||
msgs := make([]*broker.Message, 0, count)
|
||||
for i := 0; i < count; i++ {
|
||||
message := &broker.Message{
|
||||
Header: map[string]string{
|
||||
metadata.HeaderTopic: topic,
|
||||
"foo": "bar",
|
||||
"id": fmt.Sprintf("%d", i),
|
||||
},
|
||||
Body: []byte(`"hello world"`),
|
||||
}
|
||||
msgs = append(msgs, message)
|
||||
}
|
||||
|
||||
if err := b.BatchPublish(ctx, msgs); err != nil {
|
||||
t.Fatalf("Unexpected error publishing %v", err)
|
||||
}
|
||||
|
||||
if err := sub.Unsubscribe(ctx); err != nil {
|
||||
t.Fatalf("Unexpected error unsubscribing from %s: %v", topic, err)
|
||||
}
|
||||
|
||||
if err := b.Disconnect(ctx); err != nil {
|
||||
t.Fatalf("Unexpected connect error %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemoryBroker(t *testing.T) {
|
||||
b := NewBroker()
|
||||
ctx := context.Background()
|
||||
@@ -74,7 +24,7 @@ func TestMemoryBroker(t *testing.T) {
|
||||
topic := "test"
|
||||
count := 10
|
||||
|
||||
fn := func(_ broker.Event) error {
|
||||
fn := func(_ broker.Message) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -85,13 +35,13 @@ func TestMemoryBroker(t *testing.T) {
|
||||
|
||||
msgs := make([]*broker.Message, 0, count)
|
||||
for i := 0; i < count; i++ {
|
||||
message := &broker.Message{
|
||||
message, err := b.NewMessage(ctx, metadata.Pairs()
|
||||
Header: map[string]string{
|
||||
metadata.HeaderTopic: topic,
|
||||
"foo": "bar",
|
||||
"id": fmt.Sprintf("%d", i),
|
||||
},
|
||||
Body: []byte(`"hello world"`),
|
||||
[]byte(`"hello world"`),
|
||||
}
|
||||
msgs = append(msgs, message)
|
||||
|
||||
@@ -100,10 +50,6 @@ func TestMemoryBroker(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
if err := b.BatchPublish(ctx, msgs); err != nil {
|
||||
t.Fatalf("Unexpected error publishing %v", err)
|
||||
}
|
||||
|
||||
if err := sub.Unsubscribe(ctx); err != nil {
|
||||
t.Fatalf("Unexpected error unsubscribing from %s: %v", topic, err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user