From d3bb2f7236e31bd12486f1bd944a0bd694d1603e Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Mon, 4 Mar 2024 01:05:40 +0300 Subject: [PATCH] broker/noop: add initial implementation Signed-off-by: Vasiliy Tolstov --- broker/{ => memory}/memory.go | 65 +++++++++++++---------- broker/{ => memory}/memory_test.go | 13 ++--- broker/noop.go | 82 ++++++++++++++++++++++++++++++ 3 files changed, 128 insertions(+), 32 deletions(-) rename broker/{ => memory}/memory.go (80%) rename broker/{ => memory}/memory_test.go (88%) create mode 100644 broker/noop.go diff --git a/broker/memory.go b/broker/memory/memory.go similarity index 80% rename from broker/memory.go rename to broker/memory/memory.go index cd723118..d5609e1e 100644 --- a/broker/memory.go +++ b/broker/memory/memory.go @@ -4,6 +4,7 @@ import ( "context" "sync" + "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/metadata" maddr "go.unistack.org/micro/v3/util/addr" @@ -15,7 +16,7 @@ import ( type memoryBroker struct { subscribers map[string][]*memorySubscriber addr string - opts Options + opts broker.Options sync.RWMutex connected bool } @@ -24,20 +25,20 @@ type memoryEvent struct { err error message interface{} topic string - opts Options + opts broker.Options } type memorySubscriber struct { ctx context.Context exit chan bool - handler Handler - batchhandler BatchHandler + handler broker.Handler + batchhandler broker.BatchHandler id string topic string - opts SubscribeOptions + opts broker.SubscribeOptions } -func (m *memoryBroker) Options() Options { +func (m *memoryBroker) Options() broker.Options { return m.opts } @@ -46,6 +47,12 @@ func (m *memoryBroker) Address() string { } func (m *memoryBroker) Connect(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + m.Lock() defer m.Unlock() @@ -70,6 +77,12 @@ func (m *memoryBroker) Connect(ctx context.Context) error { } func (m *memoryBroker) Disconnect(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + m.Lock() defer m.Unlock() @@ -81,27 +94,27 @@ func (m *memoryBroker) Disconnect(ctx context.Context) error { return nil } -func (m *memoryBroker) Init(opts ...Option) error { +func (m *memoryBroker) Init(opts ...broker.Option) error { for _, o := range opts { o(&m.opts) } return nil } -func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error { +func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error { msg.Header.Set(metadata.HeaderTopic, topic) - return m.publish(ctx, []*Message{msg}, opts...) + return m.publish(ctx, []*broker.Message{msg}, opts...) } -func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error { +func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { return m.publish(ctx, msgs, opts...) } -func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...PublishOption) error { +func (m *memoryBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { m.RLock() if !m.connected { m.RUnlock() - return ErrNotConnected + return broker.ErrNotConnected } m.RUnlock() @@ -111,9 +124,9 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub case <-ctx.Done(): return ctx.Err() default: - options := NewPublishOptions(opts...) + options := broker.NewPublishOptions(opts...) - msgTopicMap := make(map[string]Events) + msgTopicMap := make(map[string]broker.Events) for _, v := range msgs { p := &memoryEvent{opts: m.opts} @@ -188,11 +201,11 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub return nil } -func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) { +func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { m.RLock() if !m.connected { m.RUnlock() - return nil, ErrNotConnected + return nil, broker.ErrNotConnected } m.RUnlock() @@ -201,7 +214,7 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler return nil, err } - options := NewSubscribeOptions(opts...) + options := broker.NewSubscribeOptions(opts...) sub := &memorySubscriber{ exit: make(chan bool, 1), @@ -233,11 +246,11 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler return sub, nil } -func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { +func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { m.RLock() if !m.connected { m.RUnlock() - return nil, ErrNotConnected + return nil, broker.ErrNotConnected } m.RUnlock() @@ -246,7 +259,7 @@ func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Hand return nil, err } - options := NewSubscribeOptions(opts...) + options := broker.NewSubscribeOptions(opts...) sub := &memorySubscriber{ exit: make(chan bool, 1), @@ -290,12 +303,12 @@ func (m *memoryEvent) Topic() string { return m.topic } -func (m *memoryEvent) Message() *Message { +func (m *memoryEvent) Message() *broker.Message { switch v := m.message.(type) { - case *Message: + case *broker.Message: return v case []byte: - msg := &Message{} + msg := &broker.Message{} if err := m.opts.Codec.Unmarshal(v, msg); err != nil { if m.opts.Logger.V(logger.ErrorLevel) { m.opts.Logger.Error(m.opts.Context, "[memory]: failed to unmarshal: %v", err) @@ -320,7 +333,7 @@ func (m *memoryEvent) SetError(err error) { m.err = err } -func (m *memorySubscriber) Options() SubscribeOptions { +func (m *memorySubscriber) Options() broker.SubscribeOptions { return m.opts } @@ -334,9 +347,9 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error { } // NewBroker return new memory broker -func NewBroker(opts ...Option) Broker { +func NewBroker(opts ...broker.Option) broker.Broker { return &memoryBroker{ - opts: NewOptions(opts...), + opts: broker.NewOptions(opts...), subscribers: make(map[string][]*memorySubscriber), } } diff --git a/broker/memory_test.go b/broker/memory/memory_test.go similarity index 88% rename from broker/memory_test.go rename to broker/memory/memory_test.go index af949ede..e558ef10 100644 --- a/broker/memory_test.go +++ b/broker/memory/memory_test.go @@ -5,6 +5,7 @@ import ( "fmt" "testing" + "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/metadata" ) @@ -19,7 +20,7 @@ func TestMemoryBatchBroker(t *testing.T) { topic := "test" count := 10 - fn := func(evts Events) error { + fn := func(evts broker.Events) error { return evts.Ack() } @@ -28,9 +29,9 @@ func TestMemoryBatchBroker(t *testing.T) { t.Fatalf("Unexpected error subscribing %v", err) } - msgs := make([]*Message, 0, count) + msgs := make([]*broker.Message, 0, count) for i := 0; i < count; i++ { - message := &Message{ + message := &broker.Message{ Header: map[string]string{ metadata.HeaderTopic: topic, "foo": "bar", @@ -65,7 +66,7 @@ func TestMemoryBroker(t *testing.T) { topic := "test" count := 10 - fn := func(p Event) error { + fn := func(p broker.Event) error { return nil } @@ -74,9 +75,9 @@ func TestMemoryBroker(t *testing.T) { t.Fatalf("Unexpected error subscribing %v", err) } - msgs := make([]*Message, 0, count) + msgs := make([]*broker.Message, 0, count) for i := 0; i < count; i++ { - message := &Message{ + message := &broker.Message{ Header: map[string]string{ metadata.HeaderTopic: topic, "foo": "bar", diff --git a/broker/noop.go b/broker/noop.go new file mode 100644 index 00000000..29142673 --- /dev/null +++ b/broker/noop.go @@ -0,0 +1,82 @@ +package broker + +import ( + "context" + "strings" +) + +type NoopBroker struct { + opts Options +} + +func NewBroker(opts ...Option) *NoopBroker { + b := &NoopBroker{opts: NewOptions(opts...)} + return b +} + +func (b *NoopBroker) Name() string { + return b.opts.Name +} + +func (b *NoopBroker) String() string { + return "noop" +} + +func (b *NoopBroker) Options() Options { + return b.opts +} + +func (b *NoopBroker) Init(opts ...Option) error { + for _, opt := range opts { + opt(&b.opts) + } + return nil +} + +func (b *NoopBroker) Connect(_ context.Context) error { + return nil +} + +func (b *NoopBroker) Disconnect(_ context.Context) error { + return nil +} + +func (b *NoopBroker) Address() string { + return strings.Join(b.opts.Addrs, ",") +} + +func (b *NoopBroker) BatchPublish(_ context.Context, _ []*Message, _ ...PublishOption) error { + return nil +} + +func (b *NoopBroker) Publish(_ context.Context, _ string, _ *Message, _ ...PublishOption) error { + return nil +} + +type NoopSubscriber struct { + ctx context.Context + topic string + handler Handler + batchHandler BatchHandler + opts SubscribeOptions +} + +func (b *NoopBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) { + return &NoopSubscriber{ctx: ctx, topic: topic, opts: NewSubscribeOptions(opts...), batchHandler: handler}, nil +} + +func (b *NoopBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { + return &NoopSubscriber{ctx: ctx, topic: topic, opts: NewSubscribeOptions(opts...), handler: handler}, nil +} + +func (s *NoopSubscriber) Options() SubscribeOptions { + return s.opts +} + +func (s *NoopSubscriber) Topic() string { + return s.topic +} + +func (s *NoopSubscriber) Unsubscribe(ctx context.Context) error { + return nil +}