diff --git a/broker/memory.go b/broker/memory/memory.go similarity index 80% rename from broker/memory.go rename to broker/memory/memory.go index cd723118..d5609e1e 100644 --- a/broker/memory.go +++ b/broker/memory/memory.go @@ -4,6 +4,7 @@ import ( "context" "sync" + "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/metadata" maddr "go.unistack.org/micro/v3/util/addr" @@ -15,7 +16,7 @@ import ( type memoryBroker struct { subscribers map[string][]*memorySubscriber addr string - opts Options + opts broker.Options sync.RWMutex connected bool } @@ -24,20 +25,20 @@ type memoryEvent struct { err error message interface{} topic string - opts Options + opts broker.Options } type memorySubscriber struct { ctx context.Context exit chan bool - handler Handler - batchhandler BatchHandler + handler broker.Handler + batchhandler broker.BatchHandler id string topic string - opts SubscribeOptions + opts broker.SubscribeOptions } -func (m *memoryBroker) Options() Options { +func (m *memoryBroker) Options() broker.Options { return m.opts } @@ -46,6 +47,12 @@ func (m *memoryBroker) Address() string { } func (m *memoryBroker) Connect(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + m.Lock() defer m.Unlock() @@ -70,6 +77,12 @@ func (m *memoryBroker) Connect(ctx context.Context) error { } func (m *memoryBroker) Disconnect(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + m.Lock() defer m.Unlock() @@ -81,27 +94,27 @@ func (m *memoryBroker) Disconnect(ctx context.Context) error { return nil } -func (m *memoryBroker) Init(opts ...Option) error { +func (m *memoryBroker) Init(opts ...broker.Option) error { for _, o := range opts { o(&m.opts) } return nil } -func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error { +func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error { msg.Header.Set(metadata.HeaderTopic, topic) - return m.publish(ctx, []*Message{msg}, opts...) + return m.publish(ctx, []*broker.Message{msg}, opts...) } -func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error { +func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { return m.publish(ctx, msgs, opts...) } -func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...PublishOption) error { +func (m *memoryBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { m.RLock() if !m.connected { m.RUnlock() - return ErrNotConnected + return broker.ErrNotConnected } m.RUnlock() @@ -111,9 +124,9 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub case <-ctx.Done(): return ctx.Err() default: - options := NewPublishOptions(opts...) + options := broker.NewPublishOptions(opts...) - msgTopicMap := make(map[string]Events) + msgTopicMap := make(map[string]broker.Events) for _, v := range msgs { p := &memoryEvent{opts: m.opts} @@ -188,11 +201,11 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub return nil } -func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) { +func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { m.RLock() if !m.connected { m.RUnlock() - return nil, ErrNotConnected + return nil, broker.ErrNotConnected } m.RUnlock() @@ -201,7 +214,7 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler return nil, err } - options := NewSubscribeOptions(opts...) + options := broker.NewSubscribeOptions(opts...) sub := &memorySubscriber{ exit: make(chan bool, 1), @@ -233,11 +246,11 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler return sub, nil } -func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { +func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { m.RLock() if !m.connected { m.RUnlock() - return nil, ErrNotConnected + return nil, broker.ErrNotConnected } m.RUnlock() @@ -246,7 +259,7 @@ func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Hand return nil, err } - options := NewSubscribeOptions(opts...) + options := broker.NewSubscribeOptions(opts...) sub := &memorySubscriber{ exit: make(chan bool, 1), @@ -290,12 +303,12 @@ func (m *memoryEvent) Topic() string { return m.topic } -func (m *memoryEvent) Message() *Message { +func (m *memoryEvent) Message() *broker.Message { switch v := m.message.(type) { - case *Message: + case *broker.Message: return v case []byte: - msg := &Message{} + msg := &broker.Message{} if err := m.opts.Codec.Unmarshal(v, msg); err != nil { if m.opts.Logger.V(logger.ErrorLevel) { m.opts.Logger.Error(m.opts.Context, "[memory]: failed to unmarshal: %v", err) @@ -320,7 +333,7 @@ func (m *memoryEvent) SetError(err error) { m.err = err } -func (m *memorySubscriber) Options() SubscribeOptions { +func (m *memorySubscriber) Options() broker.SubscribeOptions { return m.opts } @@ -334,9 +347,9 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error { } // NewBroker return new memory broker -func NewBroker(opts ...Option) Broker { +func NewBroker(opts ...broker.Option) broker.Broker { return &memoryBroker{ - opts: NewOptions(opts...), + opts: broker.NewOptions(opts...), subscribers: make(map[string][]*memorySubscriber), } } diff --git a/broker/memory_test.go b/broker/memory/memory_test.go similarity index 88% rename from broker/memory_test.go rename to broker/memory/memory_test.go index af949ede..e558ef10 100644 --- a/broker/memory_test.go +++ b/broker/memory/memory_test.go @@ -5,6 +5,7 @@ import ( "fmt" "testing" + "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/metadata" ) @@ -19,7 +20,7 @@ func TestMemoryBatchBroker(t *testing.T) { topic := "test" count := 10 - fn := func(evts Events) error { + fn := func(evts broker.Events) error { return evts.Ack() } @@ -28,9 +29,9 @@ func TestMemoryBatchBroker(t *testing.T) { t.Fatalf("Unexpected error subscribing %v", err) } - msgs := make([]*Message, 0, count) + msgs := make([]*broker.Message, 0, count) for i := 0; i < count; i++ { - message := &Message{ + message := &broker.Message{ Header: map[string]string{ metadata.HeaderTopic: topic, "foo": "bar", @@ -65,7 +66,7 @@ func TestMemoryBroker(t *testing.T) { topic := "test" count := 10 - fn := func(p Event) error { + fn := func(p broker.Event) error { return nil } @@ -74,9 +75,9 @@ func TestMemoryBroker(t *testing.T) { t.Fatalf("Unexpected error subscribing %v", err) } - msgs := make([]*Message, 0, count) + msgs := make([]*broker.Message, 0, count) for i := 0; i < count; i++ { - message := &Message{ + message := &broker.Message{ Header: map[string]string{ metadata.HeaderTopic: topic, "foo": "bar", diff --git a/broker/noop.go b/broker/noop.go new file mode 100644 index 00000000..29142673 --- /dev/null +++ b/broker/noop.go @@ -0,0 +1,82 @@ +package broker + +import ( + "context" + "strings" +) + +type NoopBroker struct { + opts Options +} + +func NewBroker(opts ...Option) *NoopBroker { + b := &NoopBroker{opts: NewOptions(opts...)} + return b +} + +func (b *NoopBroker) Name() string { + return b.opts.Name +} + +func (b *NoopBroker) String() string { + return "noop" +} + +func (b *NoopBroker) Options() Options { + return b.opts +} + +func (b *NoopBroker) Init(opts ...Option) error { + for _, opt := range opts { + opt(&b.opts) + } + return nil +} + +func (b *NoopBroker) Connect(_ context.Context) error { + return nil +} + +func (b *NoopBroker) Disconnect(_ context.Context) error { + return nil +} + +func (b *NoopBroker) Address() string { + return strings.Join(b.opts.Addrs, ",") +} + +func (b *NoopBroker) BatchPublish(_ context.Context, _ []*Message, _ ...PublishOption) error { + return nil +} + +func (b *NoopBroker) Publish(_ context.Context, _ string, _ *Message, _ ...PublishOption) error { + return nil +} + +type NoopSubscriber struct { + ctx context.Context + topic string + handler Handler + batchHandler BatchHandler + opts SubscribeOptions +} + +func (b *NoopBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) { + return &NoopSubscriber{ctx: ctx, topic: topic, opts: NewSubscribeOptions(opts...), batchHandler: handler}, nil +} + +func (b *NoopBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { + return &NoopSubscriber{ctx: ctx, topic: topic, opts: NewSubscribeOptions(opts...), handler: handler}, nil +} + +func (s *NoopSubscriber) Options() SubscribeOptions { + return s.opts +} + +func (s *NoopSubscriber) Topic() string { + return s.topic +} + +func (s *NoopSubscriber) Unsubscribe(ctx context.Context) error { + return nil +} diff --git a/logger/logger.go b/logger/logger.go index 425bedfe..6448ccaf 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -57,7 +57,9 @@ type Logger interface { Log(ctx context.Context, level Level, args ...interface{}) // Logf logs message with needed level Logf(ctx context.Context, level Level, msg string, args ...interface{}) - // String returns the name of logger + // Name returns broker instance name + Name() string + // String returns the type of logger String() string } diff --git a/logger/noop.go b/logger/noop.go index 04f0515b..17f3608b 100644 --- a/logger/noop.go +++ b/logger/noop.go @@ -13,11 +13,15 @@ func NewLogger(opts ...Option) Logger { return &noopLogger{opts: options} } -func (l *noopLogger) V(lvl Level) bool { +func (l *noopLogger) V(_ Level) bool { return false } -func (l *noopLogger) Level(lvl Level) { +func (l *noopLogger) Level(_ Level) { +} + +func (l *noopLogger) Name() string { + return l.opts.Name } func (l *noopLogger) Init(opts ...Option) error { @@ -35,7 +39,7 @@ func (l *noopLogger) Clone(opts ...Option) Logger { return nl } -func (l *noopLogger) Fields(attrs ...interface{}) Logger { +func (l *noopLogger) Fields(_ ...interface{}) Logger { return l } diff --git a/logger/slog/slog.go b/logger/slog/slog.go index 6e12b65e..b2b756c3 100644 --- a/logger/slog/slog.go +++ b/logger/slog/slog.go @@ -368,6 +368,10 @@ func (s *slogLogger) Warnf(ctx context.Context, msg string, attrs ...interface{} _ = s.slog.Handler().Handle(ctx, r) } +func (s *slogLogger) Name() string { + return s.opts.Name +} + func (s *slogLogger) String() string { return "slog" } diff --git a/service.go b/service.go index 9ae899a3..22bcb74e 100644 --- a/service.go +++ b/service.go @@ -7,15 +7,11 @@ import ( "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/client" - "go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/config" - "go.unistack.org/micro/v3/flow" "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v3/register" - "go.unistack.org/micro/v3/resolver" "go.unistack.org/micro/v3/router" - "go.unistack.org/micro/v3/selector" "go.unistack.org/micro/v3/server" "go.unistack.org/micro/v3/store" "go.unistack.org/micro/v3/tracer" @@ -380,97 +376,15 @@ func (s *service) Run() error { return s.Stop() } -func getNameIndex(n string, ifaces interface{}) int { - type namer interface { - Name() string - } +type Namer interface { + Name() string +} - switch vt := ifaces.(type) { - case []broker.Broker: - for idx, iface := range vt { - if nm, ok := iface.(namer); ok && nm.Name() == n { - return idx - } - } - case []client.Client: - for idx, iface := range vt { - if nm, ok := iface.(namer); ok && nm.Name() == n { - return idx - } - } - case []codec.Codec: - for idx, iface := range vt { - if nm, ok := iface.(namer); ok && nm.Name() == n { - return idx - } - } - case []config.Config: - for idx, iface := range vt { - if nm, ok := iface.(namer); ok && nm.Name() == n { - return idx - } - } - case []flow.Flow: - for idx, iface := range vt { - if nm, ok := iface.(namer); ok && nm.Name() == n { - return idx - } - } - case []logger.Logger: - for idx, iface := range vt { - if nm, ok := iface.(namer); ok && nm.Name() == n { - return idx - } - } - case []meter.Meter: - for idx, iface := range vt { - if nm, ok := iface.(namer); ok && nm.Name() == n { - return idx - } - } - case []register.Register: - for idx, iface := range vt { - if nm, ok := iface.(namer); ok && nm.Name() == n { - return idx - } - } - case []resolver.Resolver: - for idx, iface := range vt { - if nm, ok := iface.(namer); ok && nm.Name() == n { - return idx - } - } - case []router.Router: - for idx, iface := range vt { - if nm, ok := iface.(namer); ok && nm.Name() == n { - return idx - } - } - case []selector.Selector: - for idx, iface := range vt { - if nm, ok := iface.(namer); ok && nm.Name() == n { - return idx - } - } - case []server.Server: - for idx, iface := range vt { - if nm, ok := iface.(namer); ok && nm.Name() == n { - return idx - } - } - case []store.Store: - for idx, iface := range vt { - if nm, ok := iface.(namer); ok && nm.Name() == n { - return idx - } - } - case []tracer.Tracer: - for idx, iface := range vt { - if nm, ok := iface.(namer); ok && nm.Name() == n { - return idx - } +func getNameIndex[T Namer](n string, ifaces []T) int { + for idx, iface := range ifaces { + if iface.Name() == n { + return idx } } - return 0 } diff --git a/service_test.go b/service_test.go index 97fd612b..b6bca118 100644 --- a/service_test.go +++ b/service_test.go @@ -41,6 +41,14 @@ func (ti *testItem) Name() string { return ti.name } +func Test_getNameIndex(t *testing.T) { + items := []*testItem{{name: "test1"}, {name: "test2"}} + idx := getNameIndex("test2", items) + if items[idx].Name() != "test2" { + t.Fatal("getNameIndex wrong") + } +} + func TestRegisterHandler(t *testing.T) { type args struct { s server.Server diff --git a/util/time/duration_test.go b/util/time/duration_test.go index 80b7289c..12447510 100644 --- a/util/time/duration_test.go +++ b/util/time/duration_test.go @@ -35,8 +35,8 @@ func TestUnmarshalJSON(t *testing.T) { err = json.Unmarshal([]byte(`{"ttl":"1y"}`), v) if err != nil { t.Fatal(err) - } else if v.TTL != 31536000000000000 { - t.Fatalf("invalid duration %v != 31536000000000000", v.TTL) + } else if v.TTL != 31622400000000000 { + t.Fatalf("invalid duration %v != 31622400000000000", v.TTL) } } @@ -55,7 +55,7 @@ func TestParseDuration(t *testing.T) { if err != nil { t.Fatalf("ParseDuration error: %v", err) } - if td.String() != "8760h0m0s" { - t.Fatalf("ParseDuration 1y != 8760h0m0s : %s", td.String()) + if td.String() != "8784h0m0s" { + t.Fatalf("ParseDuration 1y != 8784h0m0s : %s", td.String()) } }