broker noop implementation #307
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"go.unistack.org/micro/v3/broker"
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
maddr "go.unistack.org/micro/v3/util/addr"
|
||||
@ -15,7 +16,7 @@ import (
|
||||
type memoryBroker struct {
|
||||
subscribers map[string][]*memorySubscriber
|
||||
addr string
|
||||
opts Options
|
||||
opts broker.Options
|
||||
sync.RWMutex
|
||||
connected bool
|
||||
}
|
||||
@ -24,20 +25,20 @@ type memoryEvent struct {
|
||||
err error
|
||||
message interface{}
|
||||
topic string
|
||||
opts Options
|
||||
opts broker.Options
|
||||
}
|
||||
|
||||
type memorySubscriber struct {
|
||||
ctx context.Context
|
||||
exit chan bool
|
||||
handler Handler
|
||||
batchhandler BatchHandler
|
||||
handler broker.Handler
|
||||
batchhandler broker.BatchHandler
|
||||
id string
|
||||
topic string
|
||||
opts SubscribeOptions
|
||||
opts broker.SubscribeOptions
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Options() Options {
|
||||
func (m *memoryBroker) Options() broker.Options {
|
||||
return m.opts
|
||||
}
|
||||
|
||||
@ -46,6 +47,12 @@ func (m *memoryBroker) Address() string {
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Connect(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
@ -70,6 +77,12 @@ func (m *memoryBroker) Connect(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Disconnect(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
@ -81,27 +94,27 @@ func (m *memoryBroker) Disconnect(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Init(opts ...Option) error {
|
||||
func (m *memoryBroker) Init(opts ...broker.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&m.opts)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error {
|
||||
func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
|
||||
msg.Header.Set(metadata.HeaderTopic, topic)
|
||||
return m.publish(ctx, []*Message{msg}, opts...)
|
||||
return m.publish(ctx, []*broker.Message{msg}, opts...)
|
||||
}
|
||||
|
||||
func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
|
||||
func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
||||
return m.publish(ctx, msgs, opts...)
|
||||
}
|
||||
|
||||
func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
|
||||
func (m *memoryBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
||||
m.RLock()
|
||||
if !m.connected {
|
||||
m.RUnlock()
|
||||
return ErrNotConnected
|
||||
return broker.ErrNotConnected
|
||||
}
|
||||
m.RUnlock()
|
||||
|
||||
@ -111,9 +124,9 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
options := NewPublishOptions(opts...)
|
||||
options := broker.NewPublishOptions(opts...)
|
||||
|
||||
msgTopicMap := make(map[string]Events)
|
||||
msgTopicMap := make(map[string]broker.Events)
|
||||
for _, v := range msgs {
|
||||
p := &memoryEvent{opts: m.opts}
|
||||
|
||||
@ -188,11 +201,11 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) {
|
||||
func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||
m.RLock()
|
||||
if !m.connected {
|
||||
m.RUnlock()
|
||||
return nil, ErrNotConnected
|
||||
return nil, broker.ErrNotConnected
|
||||
}
|
||||
m.RUnlock()
|
||||
|
||||
@ -201,7 +214,7 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler
|
||||
return nil, err
|
||||
}
|
||||
|
||||
options := NewSubscribeOptions(opts...)
|
||||
options := broker.NewSubscribeOptions(opts...)
|
||||
|
||||
sub := &memorySubscriber{
|
||||
exit: make(chan bool, 1),
|
||||
@ -233,11 +246,11 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
|
||||
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||
m.RLock()
|
||||
if !m.connected {
|
||||
m.RUnlock()
|
||||
return nil, ErrNotConnected
|
||||
return nil, broker.ErrNotConnected
|
||||
}
|
||||
m.RUnlock()
|
||||
|
||||
@ -246,7 +259,7 @@ func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Hand
|
||||
return nil, err
|
||||
}
|
||||
|
||||
options := NewSubscribeOptions(opts...)
|
||||
options := broker.NewSubscribeOptions(opts...)
|
||||
|
||||
sub := &memorySubscriber{
|
||||
exit: make(chan bool, 1),
|
||||
@ -290,12 +303,12 @@ func (m *memoryEvent) Topic() string {
|
||||
return m.topic
|
||||
}
|
||||
|
||||
func (m *memoryEvent) Message() *Message {
|
||||
func (m *memoryEvent) Message() *broker.Message {
|
||||
switch v := m.message.(type) {
|
||||
case *Message:
|
||||
case *broker.Message:
|
||||
return v
|
||||
case []byte:
|
||||
msg := &Message{}
|
||||
msg := &broker.Message{}
|
||||
if err := m.opts.Codec.Unmarshal(v, msg); err != nil {
|
||||
if m.opts.Logger.V(logger.ErrorLevel) {
|
||||
m.opts.Logger.Error(m.opts.Context, "[memory]: failed to unmarshal: %v", err)
|
||||
@ -320,7 +333,7 @@ func (m *memoryEvent) SetError(err error) {
|
||||
m.err = err
|
||||
}
|
||||
|
||||
func (m *memorySubscriber) Options() SubscribeOptions {
|
||||
func (m *memorySubscriber) Options() broker.SubscribeOptions {
|
||||
return m.opts
|
||||
}
|
||||
|
||||
@ -334,9 +347,9 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// NewBroker return new memory broker
|
||||
func NewBroker(opts ...Option) Broker {
|
||||
func NewBroker(opts ...broker.Option) broker.Broker {
|
||||
return &memoryBroker{
|
||||
opts: NewOptions(opts...),
|
||||
opts: broker.NewOptions(opts...),
|
||||
subscribers: make(map[string][]*memorySubscriber),
|
||||
}
|
||||
}
|
@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"go.unistack.org/micro/v3/broker"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
)
|
||||
|
||||
@ -19,7 +20,7 @@ func TestMemoryBatchBroker(t *testing.T) {
|
||||
topic := "test"
|
||||
count := 10
|
||||
|
||||
fn := func(evts Events) error {
|
||||
fn := func(evts broker.Events) error {
|
||||
return evts.Ack()
|
||||
}
|
||||
|
||||
@ -28,9 +29,9 @@ func TestMemoryBatchBroker(t *testing.T) {
|
||||
t.Fatalf("Unexpected error subscribing %v", err)
|
||||
}
|
||||
|
||||
msgs := make([]*Message, 0, count)
|
||||
msgs := make([]*broker.Message, 0, count)
|
||||
for i := 0; i < count; i++ {
|
||||
message := &Message{
|
||||
message := &broker.Message{
|
||||
Header: map[string]string{
|
||||
metadata.HeaderTopic: topic,
|
||||
"foo": "bar",
|
||||
@ -65,7 +66,7 @@ func TestMemoryBroker(t *testing.T) {
|
||||
topic := "test"
|
||||
count := 10
|
||||
|
||||
fn := func(p Event) error {
|
||||
fn := func(p broker.Event) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -74,9 +75,9 @@ func TestMemoryBroker(t *testing.T) {
|
||||
t.Fatalf("Unexpected error subscribing %v", err)
|
||||
}
|
||||
|
||||
msgs := make([]*Message, 0, count)
|
||||
msgs := make([]*broker.Message, 0, count)
|
||||
for i := 0; i < count; i++ {
|
||||
message := &Message{
|
||||
message := &broker.Message{
|
||||
Header: map[string]string{
|
||||
metadata.HeaderTopic: topic,
|
||||
"foo": "bar",
|
82
broker/noop.go
Normal file
82
broker/noop.go
Normal file
@ -0,0 +1,82 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type NoopBroker struct {
|
||||
opts Options
|
||||
}
|
||||
|
||||
func NewBroker(opts ...Option) *NoopBroker {
|
||||
b := &NoopBroker{opts: NewOptions(opts...)}
|
||||
return b
|
||||
}
|
||||
|
||||
func (b *NoopBroker) Name() string {
|
||||
return b.opts.Name
|
||||
}
|
||||
|
||||
func (b *NoopBroker) String() string {
|
||||
return "noop"
|
||||
}
|
||||
|
||||
func (b *NoopBroker) Options() Options {
|
||||
return b.opts
|
||||
}
|
||||
|
||||
func (b *NoopBroker) Init(opts ...Option) error {
|
||||
for _, opt := range opts {
|
||||
opt(&b.opts)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *NoopBroker) Connect(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *NoopBroker) Disconnect(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *NoopBroker) Address() string {
|
||||
return strings.Join(b.opts.Addrs, ",")
|
||||
}
|
||||
|
||||
func (b *NoopBroker) BatchPublish(_ context.Context, _ []*Message, _ ...PublishOption) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *NoopBroker) Publish(_ context.Context, _ string, _ *Message, _ ...PublishOption) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type NoopSubscriber struct {
|
||||
ctx context.Context
|
||||
topic string
|
||||
handler Handler
|
||||
batchHandler BatchHandler
|
||||
opts SubscribeOptions
|
||||
}
|
||||
|
||||
func (b *NoopBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) {
|
||||
return &NoopSubscriber{ctx: ctx, topic: topic, opts: NewSubscribeOptions(opts...), batchHandler: handler}, nil
|
||||
}
|
||||
|
||||
func (b *NoopBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
|
||||
return &NoopSubscriber{ctx: ctx, topic: topic, opts: NewSubscribeOptions(opts...), handler: handler}, nil
|
||||
}
|
||||
|
||||
func (s *NoopSubscriber) Options() SubscribeOptions {
|
||||
return s.opts
|
||||
}
|
||||
|
||||
func (s *NoopSubscriber) Topic() string {
|
||||
return s.topic
|
||||
}
|
||||
|
||||
func (s *NoopSubscriber) Unsubscribe(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
@ -57,7 +57,9 @@ type Logger interface {
|
||||
Log(ctx context.Context, level Level, args ...interface{})
|
||||
// Logf logs message with needed level
|
||||
Logf(ctx context.Context, level Level, msg string, args ...interface{})
|
||||
// String returns the name of logger
|
||||
// Name returns broker instance name
|
||||
Name() string
|
||||
// String returns the type of logger
|
||||
String() string
|
||||
}
|
||||
|
||||
|
@ -13,11 +13,15 @@ func NewLogger(opts ...Option) Logger {
|
||||
return &noopLogger{opts: options}
|
||||
}
|
||||
|
||||
func (l *noopLogger) V(lvl Level) bool {
|
||||
func (l *noopLogger) V(_ Level) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (l *noopLogger) Level(lvl Level) {
|
||||
func (l *noopLogger) Level(_ Level) {
|
||||
}
|
||||
|
||||
func (l *noopLogger) Name() string {
|
||||
return l.opts.Name
|
||||
}
|
||||
|
||||
func (l *noopLogger) Init(opts ...Option) error {
|
||||
@ -35,7 +39,7 @@ func (l *noopLogger) Clone(opts ...Option) Logger {
|
||||
return nl
|
||||
}
|
||||
|
||||
func (l *noopLogger) Fields(attrs ...interface{}) Logger {
|
||||
func (l *noopLogger) Fields(_ ...interface{}) Logger {
|
||||
return l
|
||||
}
|
||||
|
||||
|
@ -368,6 +368,10 @@ func (s *slogLogger) Warnf(ctx context.Context, msg string, attrs ...interface{}
|
||||
_ = s.slog.Handler().Handle(ctx, r)
|
||||
}
|
||||
|
||||
func (s *slogLogger) Name() string {
|
||||
return s.opts.Name
|
||||
}
|
||||
|
||||
func (s *slogLogger) String() string {
|
||||
return "slog"
|
||||
}
|
||||
|
96
service.go
96
service.go
@ -7,15 +7,11 @@ import (
|
||||
|
||||
"go.unistack.org/micro/v3/broker"
|
||||
"go.unistack.org/micro/v3/client"
|
||||
"go.unistack.org/micro/v3/codec"
|
||||
"go.unistack.org/micro/v3/config"
|
||||
"go.unistack.org/micro/v3/flow"
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
"go.unistack.org/micro/v3/meter"
|
||||
"go.unistack.org/micro/v3/register"
|
||||
"go.unistack.org/micro/v3/resolver"
|
||||
"go.unistack.org/micro/v3/router"
|
||||
"go.unistack.org/micro/v3/selector"
|
||||
"go.unistack.org/micro/v3/server"
|
||||
"go.unistack.org/micro/v3/store"
|
||||
"go.unistack.org/micro/v3/tracer"
|
||||
@ -380,97 +376,15 @@ func (s *service) Run() error {
|
||||
return s.Stop()
|
||||
}
|
||||
|
||||
func getNameIndex(n string, ifaces interface{}) int {
|
||||
type namer interface {
|
||||
type Namer interface {
|
||||
Name() string
|
||||
}
|
||||
}
|
||||
|
||||
switch vt := ifaces.(type) {
|
||||
case []broker.Broker:
|
||||
for idx, iface := range vt {
|
||||
if nm, ok := iface.(namer); ok && nm.Name() == n {
|
||||
func getNameIndex[T Namer](n string, ifaces []T) int {
|
||||
for idx, iface := range ifaces {
|
||||
if iface.Name() == n {
|
||||
return idx
|
||||
}
|
||||
}
|
||||
case []client.Client:
|
||||
for idx, iface := range vt {
|
||||
if nm, ok := iface.(namer); ok && nm.Name() == n {
|
||||
return idx
|
||||
}
|
||||
}
|
||||
case []codec.Codec:
|
||||
for idx, iface := range vt {
|
||||
if nm, ok := iface.(namer); ok && nm.Name() == n {
|
||||
return idx
|
||||
}
|
||||
}
|
||||
case []config.Config:
|
||||
for idx, iface := range vt {
|
||||
if nm, ok := iface.(namer); ok && nm.Name() == n {
|
||||
return idx
|
||||
}
|
||||
}
|
||||
case []flow.Flow:
|
||||
for idx, iface := range vt {
|
||||
if nm, ok := iface.(namer); ok && nm.Name() == n {
|
||||
return idx
|
||||
}
|
||||
}
|
||||
case []logger.Logger:
|
||||
for idx, iface := range vt {
|
||||
if nm, ok := iface.(namer); ok && nm.Name() == n {
|
||||
return idx
|
||||
}
|
||||
}
|
||||
case []meter.Meter:
|
||||
for idx, iface := range vt {
|
||||
if nm, ok := iface.(namer); ok && nm.Name() == n {
|
||||
return idx
|
||||
}
|
||||
}
|
||||
case []register.Register:
|
||||
for idx, iface := range vt {
|
||||
if nm, ok := iface.(namer); ok && nm.Name() == n {
|
||||
return idx
|
||||
}
|
||||
}
|
||||
case []resolver.Resolver:
|
||||
for idx, iface := range vt {
|
||||
if nm, ok := iface.(namer); ok && nm.Name() == n {
|
||||
return idx
|
||||
}
|
||||
}
|
||||
case []router.Router:
|
||||
for idx, iface := range vt {
|
||||
if nm, ok := iface.(namer); ok && nm.Name() == n {
|
||||
return idx
|
||||
}
|
||||
}
|
||||
case []selector.Selector:
|
||||
for idx, iface := range vt {
|
||||
if nm, ok := iface.(namer); ok && nm.Name() == n {
|
||||
return idx
|
||||
}
|
||||
}
|
||||
case []server.Server:
|
||||
for idx, iface := range vt {
|
||||
if nm, ok := iface.(namer); ok && nm.Name() == n {
|
||||
return idx
|
||||
}
|
||||
}
|
||||
case []store.Store:
|
||||
for idx, iface := range vt {
|
||||
if nm, ok := iface.(namer); ok && nm.Name() == n {
|
||||
return idx
|
||||
}
|
||||
}
|
||||
case []tracer.Tracer:
|
||||
for idx, iface := range vt {
|
||||
if nm, ok := iface.(namer); ok && nm.Name() == n {
|
||||
return idx
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
@ -41,6 +41,14 @@ func (ti *testItem) Name() string {
|
||||
return ti.name
|
||||
}
|
||||
|
||||
func Test_getNameIndex(t *testing.T) {
|
||||
items := []*testItem{{name: "test1"}, {name: "test2"}}
|
||||
idx := getNameIndex("test2", items)
|
||||
if items[idx].Name() != "test2" {
|
||||
t.Fatal("getNameIndex wrong")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegisterHandler(t *testing.T) {
|
||||
type args struct {
|
||||
s server.Server
|
||||
|
@ -35,8 +35,8 @@ func TestUnmarshalJSON(t *testing.T) {
|
||||
err = json.Unmarshal([]byte(`{"ttl":"1y"}`), v)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if v.TTL != 31536000000000000 {
|
||||
t.Fatalf("invalid duration %v != 31536000000000000", v.TTL)
|
||||
} else if v.TTL != 31622400000000000 {
|
||||
t.Fatalf("invalid duration %v != 31622400000000000", v.TTL)
|
||||
}
|
||||
}
|
||||
|
||||
@ -55,7 +55,7 @@ func TestParseDuration(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("ParseDuration error: %v", err)
|
||||
}
|
||||
if td.String() != "8760h0m0s" {
|
||||
t.Fatalf("ParseDuration 1y != 8760h0m0s : %s", td.String())
|
||||
if td.String() != "8784h0m0s" {
|
||||
t.Fatalf("ParseDuration 1y != 8784h0m0s : %s", td.String())
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user