move memory implementations to core micro repo

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2021-02-12 16:33:16 +03:00
parent ef664607b4
commit 6751060d05
21 changed files with 2094 additions and 531 deletions

247
broker/memory.go Normal file
View File

@@ -0,0 +1,247 @@
package broker
import (
"context"
"errors"
"math/rand"
"sync"
"time"
"github.com/google/uuid"
"github.com/unistack-org/micro/v3/logger"
maddr "github.com/unistack-org/micro/v3/util/addr"
mnet "github.com/unistack-org/micro/v3/util/net"
)
type memoryBroker struct {
opts Options
addr string
sync.RWMutex
connected bool
Subscribers map[string][]*memorySubscriber
}
type memoryEvent struct {
opts Options
topic string
err error
message interface{}
}
type memorySubscriber struct {
id string
topic string
exit chan bool
handler Handler
opts SubscribeOptions
ctx context.Context
}
func (m *memoryBroker) Options() Options {
return m.opts
}
func (m *memoryBroker) Address() string {
return m.addr
}
func (m *memoryBroker) Connect(ctx context.Context) error {
m.Lock()
defer m.Unlock()
if m.connected {
return nil
}
// use 127.0.0.1 to avoid scan of all network interfaces
addr, err := maddr.Extract("127.0.0.1")
if err != nil {
return err
}
i := rand.Intn(20000)
// set addr with port
addr = mnet.HostPort(addr, 10000+i)
m.addr = addr
m.connected = true
return nil
}
func (m *memoryBroker) Disconnect(ctx context.Context) error {
m.Lock()
defer m.Unlock()
if !m.connected {
return nil
}
m.connected = false
return nil
}
func (m *memoryBroker) Init(opts ...Option) error {
for _, o := range opts {
o(&m.opts)
}
return nil
}
func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error {
m.RLock()
if !m.connected {
m.RUnlock()
return errors.New("not connected")
}
subs, ok := m.Subscribers[topic]
m.RUnlock()
if !ok {
return nil
}
var v interface{}
if m.opts.Codec != nil {
buf, err := m.opts.Codec.Marshal(msg)
if err != nil {
return err
}
v = buf
} else {
v = msg
}
p := &memoryEvent{
topic: topic,
message: v,
opts: m.opts,
}
eh := m.opts.ErrorHandler
for _, sub := range subs {
if err := sub.handler(p); err != nil {
p.err = err
if sub.opts.ErrorHandler != nil {
eh = sub.opts.ErrorHandler
}
if eh != nil {
eh(p)
} else {
if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error())
}
}
continue
}
}
return nil
}
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
m.RLock()
if !m.connected {
m.RUnlock()
return nil, errors.New("not connected")
}
m.RUnlock()
options := NewSubscribeOptions(opts...)
id, err := uuid.NewRandom()
if err != nil {
return nil, err
}
sub := &memorySubscriber{
exit: make(chan bool, 1),
id: id.String(),
topic: topic,
handler: handler,
opts: options,
ctx: ctx,
}
m.Lock()
m.Subscribers[topic] = append(m.Subscribers[topic], sub)
m.Unlock()
go func() {
<-sub.exit
m.Lock()
var newSubscribers []*memorySubscriber
for _, sb := range m.Subscribers[topic] {
if sb.id == sub.id {
continue
}
newSubscribers = append(newSubscribers, sb)
}
m.Subscribers[topic] = newSubscribers
m.Unlock()
}()
return sub, nil
}
func (m *memoryBroker) String() string {
return "memory"
}
func (m *memoryBroker) Name() string {
return m.opts.Name
}
func (m *memoryEvent) Topic() string {
return m.topic
}
func (m *memoryEvent) Message() *Message {
switch v := m.message.(type) {
case *Message:
return v
case []byte:
msg := &Message{}
if err := m.opts.Codec.Unmarshal(v, msg); err != nil {
if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, "[memory]: failed to unmarshal: %v", err)
}
return nil
}
return msg
}
return nil
}
func (m *memoryEvent) Ack() error {
return nil
}
func (m *memoryEvent) Error() error {
return m.err
}
func (m *memorySubscriber) Options() SubscribeOptions {
return m.opts
}
func (m *memorySubscriber) Topic() string {
return m.topic
}
func (m *memorySubscriber) Unsubscribe(ctx context.Context) error {
m.exit <- true
return nil
}
func NewBroker(opts ...Option) Broker {
rand.Seed(time.Now().UnixNano())
return &memoryBroker{
opts: NewOptions(opts...),
Subscribers: make(map[string][]*memorySubscriber),
}
}

50
broker/memory_test.go Normal file
View File

@@ -0,0 +1,50 @@
package broker
import (
"context"
"fmt"
"testing"
)
func TestMemoryBroker(t *testing.T) {
b := NewBroker()
ctx := context.Background()
if err := b.Connect(ctx); err != nil {
t.Fatalf("Unexpected connect error %v", err)
}
topic := "test"
count := 10
fn := func(p Event) error {
return nil
}
sub, err := b.Subscribe(ctx, topic, fn)
if err != nil {
t.Fatalf("Unexpected error subscribing %v", err)
}
for i := 0; i < count; i++ {
message := &Message{
Header: map[string]string{
"foo": "bar",
"id": fmt.Sprintf("%d", i),
},
Body: []byte(`hello world`),
}
if err := b.Publish(ctx, topic, message); err != nil {
t.Fatalf("Unexpected error publishing %d", i)
}
}
if err := sub.Unsubscribe(ctx); err != nil {
t.Fatalf("Unexpected error unsubscribing from %s: %v", topic, err)
}
if err := b.Disconnect(ctx); err != nil {
t.Fatalf("Unexpected connect error %v", err)
}
}

View File

@@ -1,81 +0,0 @@
package broker
import "context"
type noopBroker struct {
opts Options
}
type noopSubscriber struct {
topic string
opts SubscribeOptions
}
// NewBroker returns new noop broker
func NewBroker(opts ...Option) Broker {
return &noopBroker{opts: NewOptions(opts...)}
}
func (n *noopBroker) Name() string {
return n.opts.Name
}
// Init initialize broker
func (n *noopBroker) Init(opts ...Option) error {
for _, o := range opts {
o(&n.opts)
}
return nil
}
// Options returns broker Options
func (n *noopBroker) Options() Options {
return n.opts
}
// Address returns broker address
func (n *noopBroker) Address() string {
return ""
}
// Connect connects to broker
func (n *noopBroker) Connect(ctx context.Context) error {
return nil
}
// Disconnect disconnects from broker
func (n *noopBroker) Disconnect(ctx context.Context) error {
return nil
}
// Publish publishes message to broker
func (n *noopBroker) Publish(ctx context.Context, topic string, m *Message, opts ...PublishOption) error {
return nil
}
// Subscribe subscribes to broker topic
func (n *noopBroker) Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error) {
options := NewSubscribeOptions(opts...)
return &noopSubscriber{topic: topic, opts: options}, nil
}
// String return broker string representation
func (n *noopBroker) String() string {
return "noop"
}
// Options returns subscriber options
func (n *noopSubscriber) Options() SubscribeOptions {
return n.opts
}
// TOpic returns subscriber topic
func (n *noopSubscriber) Topic() string {
return n.topic
}
// Unsubscribe unsbscribes from broker topic
func (n *noopSubscriber) Unsubscribe(ctx context.Context) error {
return nil
}