broker noop implementation #307

Merged
vtolstov merged 2 commits from noops into v3 2024-03-04 01:15:17 +03:00
9 changed files with 161 additions and 133 deletions

View File

@ -4,6 +4,7 @@ import (
"context"
"sync"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
maddr "go.unistack.org/micro/v3/util/addr"
@ -15,7 +16,7 @@ import (
type memoryBroker struct {
subscribers map[string][]*memorySubscriber
addr string
opts Options
opts broker.Options
sync.RWMutex
connected bool
}
@ -24,20 +25,20 @@ type memoryEvent struct {
err error
message interface{}
topic string
opts Options
opts broker.Options
}
type memorySubscriber struct {
ctx context.Context
exit chan bool
handler Handler
batchhandler BatchHandler
handler broker.Handler
batchhandler broker.BatchHandler
id string
topic string
opts SubscribeOptions
opts broker.SubscribeOptions
}
func (m *memoryBroker) Options() Options {
func (m *memoryBroker) Options() broker.Options {
return m.opts
}
@ -46,6 +47,12 @@ func (m *memoryBroker) Address() string {
}
func (m *memoryBroker) Connect(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
m.Lock()
defer m.Unlock()
@ -70,6 +77,12 @@ func (m *memoryBroker) Connect(ctx context.Context) error {
}
func (m *memoryBroker) Disconnect(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
m.Lock()
defer m.Unlock()
@ -81,27 +94,27 @@ func (m *memoryBroker) Disconnect(ctx context.Context) error {
return nil
}
func (m *memoryBroker) Init(opts ...Option) error {
func (m *memoryBroker) Init(opts ...broker.Option) error {
for _, o := range opts {
o(&m.opts)
}
return nil
}
func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error {
func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
msg.Header.Set(metadata.HeaderTopic, topic)
return m.publish(ctx, []*Message{msg}, opts...)
return m.publish(ctx, []*broker.Message{msg}, opts...)
}
func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
return m.publish(ctx, msgs, opts...)
}
func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
func (m *memoryBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
m.RLock()
if !m.connected {
m.RUnlock()
return ErrNotConnected
return broker.ErrNotConnected
}
m.RUnlock()
@ -111,9 +124,9 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub
case <-ctx.Done():
return ctx.Err()
default:
options := NewPublishOptions(opts...)
options := broker.NewPublishOptions(opts...)
msgTopicMap := make(map[string]Events)
msgTopicMap := make(map[string]broker.Events)
for _, v := range msgs {
p := &memoryEvent{opts: m.opts}
@ -188,11 +201,11 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub
return nil
}
func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) {
func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
m.RLock()
if !m.connected {
m.RUnlock()
return nil, ErrNotConnected
return nil, broker.ErrNotConnected
}
m.RUnlock()
@ -201,7 +214,7 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler
return nil, err
}
options := NewSubscribeOptions(opts...)
options := broker.NewSubscribeOptions(opts...)
sub := &memorySubscriber{
exit: make(chan bool, 1),
@ -233,11 +246,11 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler
return sub, nil
}
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
m.RLock()
if !m.connected {
m.RUnlock()
return nil, ErrNotConnected
return nil, broker.ErrNotConnected
}
m.RUnlock()
@ -246,7 +259,7 @@ func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Hand
return nil, err
}
options := NewSubscribeOptions(opts...)
options := broker.NewSubscribeOptions(opts...)
sub := &memorySubscriber{
exit: make(chan bool, 1),
@ -290,12 +303,12 @@ func (m *memoryEvent) Topic() string {
return m.topic
}
func (m *memoryEvent) Message() *Message {
func (m *memoryEvent) Message() *broker.Message {
switch v := m.message.(type) {
case *Message:
case *broker.Message:
return v
case []byte:
msg := &Message{}
msg := &broker.Message{}
if err := m.opts.Codec.Unmarshal(v, msg); err != nil {
if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, "[memory]: failed to unmarshal: %v", err)
@ -320,7 +333,7 @@ func (m *memoryEvent) SetError(err error) {
m.err = err
}
func (m *memorySubscriber) Options() SubscribeOptions {
func (m *memorySubscriber) Options() broker.SubscribeOptions {
return m.opts
}
@ -334,9 +347,9 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error {
}
// NewBroker return new memory broker
func NewBroker(opts ...Option) Broker {
func NewBroker(opts ...broker.Option) broker.Broker {
return &memoryBroker{
opts: NewOptions(opts...),
opts: broker.NewOptions(opts...),
subscribers: make(map[string][]*memorySubscriber),
}
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"testing"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/metadata"
)
@ -19,7 +20,7 @@ func TestMemoryBatchBroker(t *testing.T) {
topic := "test"
count := 10
fn := func(evts Events) error {
fn := func(evts broker.Events) error {
return evts.Ack()
}
@ -28,9 +29,9 @@ func TestMemoryBatchBroker(t *testing.T) {
t.Fatalf("Unexpected error subscribing %v", err)
}
msgs := make([]*Message, 0, count)
msgs := make([]*broker.Message, 0, count)
for i := 0; i < count; i++ {
message := &Message{
message := &broker.Message{
Header: map[string]string{
metadata.HeaderTopic: topic,
"foo": "bar",
@ -65,7 +66,7 @@ func TestMemoryBroker(t *testing.T) {
topic := "test"
count := 10
fn := func(p Event) error {
fn := func(p broker.Event) error {
return nil
}
@ -74,9 +75,9 @@ func TestMemoryBroker(t *testing.T) {
t.Fatalf("Unexpected error subscribing %v", err)
}
msgs := make([]*Message, 0, count)
msgs := make([]*broker.Message, 0, count)
for i := 0; i < count; i++ {
message := &Message{
message := &broker.Message{
Header: map[string]string{
metadata.HeaderTopic: topic,
"foo": "bar",

82
broker/noop.go Normal file
View File

@ -0,0 +1,82 @@
package broker
import (
"context"
"strings"
)
type NoopBroker struct {
opts Options
}
func NewBroker(opts ...Option) *NoopBroker {
b := &NoopBroker{opts: NewOptions(opts...)}
return b
}
func (b *NoopBroker) Name() string {
return b.opts.Name
}
func (b *NoopBroker) String() string {
return "noop"
}
func (b *NoopBroker) Options() Options {
return b.opts
}
func (b *NoopBroker) Init(opts ...Option) error {
for _, opt := range opts {
opt(&b.opts)
}
return nil
}
func (b *NoopBroker) Connect(_ context.Context) error {
return nil
}
func (b *NoopBroker) Disconnect(_ context.Context) error {
return nil
}
func (b *NoopBroker) Address() string {
return strings.Join(b.opts.Addrs, ",")
}
func (b *NoopBroker) BatchPublish(_ context.Context, _ []*Message, _ ...PublishOption) error {
return nil
}
func (b *NoopBroker) Publish(_ context.Context, _ string, _ *Message, _ ...PublishOption) error {
return nil
}
type NoopSubscriber struct {
ctx context.Context
topic string
handler Handler
batchHandler BatchHandler
opts SubscribeOptions
}
func (b *NoopBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) {
return &NoopSubscriber{ctx: ctx, topic: topic, opts: NewSubscribeOptions(opts...), batchHandler: handler}, nil
}
func (b *NoopBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
return &NoopSubscriber{ctx: ctx, topic: topic, opts: NewSubscribeOptions(opts...), handler: handler}, nil
}
func (s *NoopSubscriber) Options() SubscribeOptions {
return s.opts
}
func (s *NoopSubscriber) Topic() string {
return s.topic
}
func (s *NoopSubscriber) Unsubscribe(ctx context.Context) error {
return nil
}

View File

@ -57,7 +57,9 @@ type Logger interface {
Log(ctx context.Context, level Level, args ...interface{})
// Logf logs message with needed level
Logf(ctx context.Context, level Level, msg string, args ...interface{})
// String returns the name of logger
// Name returns broker instance name
Name() string
// String returns the type of logger
String() string
}

View File

@ -13,11 +13,15 @@ func NewLogger(opts ...Option) Logger {
return &noopLogger{opts: options}
}
func (l *noopLogger) V(lvl Level) bool {
func (l *noopLogger) V(_ Level) bool {
return false
}
func (l *noopLogger) Level(lvl Level) {
func (l *noopLogger) Level(_ Level) {
}
func (l *noopLogger) Name() string {
return l.opts.Name
}
func (l *noopLogger) Init(opts ...Option) error {
@ -35,7 +39,7 @@ func (l *noopLogger) Clone(opts ...Option) Logger {
return nl
}
func (l *noopLogger) Fields(attrs ...interface{}) Logger {
func (l *noopLogger) Fields(_ ...interface{}) Logger {
return l
}

View File

@ -368,6 +368,10 @@ func (s *slogLogger) Warnf(ctx context.Context, msg string, attrs ...interface{}
_ = s.slog.Handler().Handle(ctx, r)
}
func (s *slogLogger) Name() string {
return s.opts.Name
}
func (s *slogLogger) String() string {
return "slog"
}

View File

@ -7,15 +7,11 @@ import (
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/config"
"go.unistack.org/micro/v3/flow"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/resolver"
"go.unistack.org/micro/v3/router"
"go.unistack.org/micro/v3/selector"
"go.unistack.org/micro/v3/server"
"go.unistack.org/micro/v3/store"
"go.unistack.org/micro/v3/tracer"
@ -380,97 +376,15 @@ func (s *service) Run() error {
return s.Stop()
}
func getNameIndex(n string, ifaces interface{}) int {
type namer interface {
type Namer interface {
Name() string
}
}
switch vt := ifaces.(type) {
case []broker.Broker:
for idx, iface := range vt {
if nm, ok := iface.(namer); ok && nm.Name() == n {
func getNameIndex[T Namer](n string, ifaces []T) int {
for idx, iface := range ifaces {
if iface.Name() == n {
return idx
}
}
case []client.Client:
for idx, iface := range vt {
if nm, ok := iface.(namer); ok && nm.Name() == n {
return idx
}
}
case []codec.Codec:
for idx, iface := range vt {
if nm, ok := iface.(namer); ok && nm.Name() == n {
return idx
}
}
case []config.Config:
for idx, iface := range vt {
if nm, ok := iface.(namer); ok && nm.Name() == n {
return idx
}
}
case []flow.Flow:
for idx, iface := range vt {
if nm, ok := iface.(namer); ok && nm.Name() == n {
return idx
}
}
case []logger.Logger:
for idx, iface := range vt {
if nm, ok := iface.(namer); ok && nm.Name() == n {
return idx
}
}
case []meter.Meter:
for idx, iface := range vt {
if nm, ok := iface.(namer); ok && nm.Name() == n {
return idx
}
}
case []register.Register:
for idx, iface := range vt {
if nm, ok := iface.(namer); ok && nm.Name() == n {
return idx
}
}
case []resolver.Resolver:
for idx, iface := range vt {
if nm, ok := iface.(namer); ok && nm.Name() == n {
return idx
}
}
case []router.Router:
for idx, iface := range vt {
if nm, ok := iface.(namer); ok && nm.Name() == n {
return idx
}
}
case []selector.Selector:
for idx, iface := range vt {
if nm, ok := iface.(namer); ok && nm.Name() == n {
return idx
}
}
case []server.Server:
for idx, iface := range vt {
if nm, ok := iface.(namer); ok && nm.Name() == n {
return idx
}
}
case []store.Store:
for idx, iface := range vt {
if nm, ok := iface.(namer); ok && nm.Name() == n {
return idx
}
}
case []tracer.Tracer:
for idx, iface := range vt {
if nm, ok := iface.(namer); ok && nm.Name() == n {
return idx
}
}
}
return 0
}

View File

@ -41,6 +41,14 @@ func (ti *testItem) Name() string {
return ti.name
}
func Test_getNameIndex(t *testing.T) {
items := []*testItem{{name: "test1"}, {name: "test2"}}
idx := getNameIndex("test2", items)
if items[idx].Name() != "test2" {
t.Fatal("getNameIndex wrong")
}
}
func TestRegisterHandler(t *testing.T) {
type args struct {
s server.Server

View File

@ -35,8 +35,8 @@ func TestUnmarshalJSON(t *testing.T) {
err = json.Unmarshal([]byte(`{"ttl":"1y"}`), v)
if err != nil {
t.Fatal(err)
} else if v.TTL != 31536000000000000 {
t.Fatalf("invalid duration %v != 31536000000000000", v.TTL)
} else if v.TTL != 31622400000000000 {
t.Fatalf("invalid duration %v != 31622400000000000", v.TTL)
}
}
@ -55,7 +55,7 @@ func TestParseDuration(t *testing.T) {
if err != nil {
t.Fatalf("ParseDuration error: %v", err)
}
if td.String() != "8760h0m0s" {
t.Fatalf("ParseDuration 1y != 8760h0m0s : %s", td.String())
if td.String() != "8784h0m0s" {
t.Fatalf("ParseDuration 1y != 8784h0m0s : %s", td.String())
}
}