move options to dedicated package
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
177
broker/memory.go
177
broker/memory.go
@@ -1,20 +1,23 @@
|
||||
//go:build ignore
|
||||
|
||||
package broker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.unistack.org/micro/v4/codec"
|
||||
"go.unistack.org/micro/v4/logger"
|
||||
"go.unistack.org/micro/v4/metadata"
|
||||
"go.unistack.org/micro/v4/options"
|
||||
"go.unistack.org/micro/v4/semconv"
|
||||
maddr "go.unistack.org/micro/v4/util/addr"
|
||||
"go.unistack.org/micro/v4/util/id"
|
||||
mnet "go.unistack.org/micro/v4/util/net"
|
||||
"go.unistack.org/micro/v4/util/rand"
|
||||
)
|
||||
|
||||
type memoryBroker struct {
|
||||
type MemoryBroker struct {
|
||||
subscribers map[string][]*memorySubscriber
|
||||
addr string
|
||||
opts Options
|
||||
@@ -22,15 +25,15 @@ type memoryBroker struct {
|
||||
connected bool
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Options() Options {
|
||||
func (m *MemoryBroker) Options() Options {
|
||||
return m.opts
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Address() string {
|
||||
func (m *MemoryBroker) Address() string {
|
||||
return m.addr
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Connect(ctx context.Context) error {
|
||||
func (m *MemoryBroker) Connect(ctx context.Context) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
@@ -54,30 +57,33 @@ func (m *memoryBroker) Connect(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Disconnect(ctx context.Context) error {
|
||||
func (m *MemoryBroker) Disconnect(ctx context.Context) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
if !m.connected {
|
||||
return nil
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
if m.connected {
|
||||
m.connected = false
|
||||
}
|
||||
}
|
||||
|
||||
m.connected = false
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Init(opts ...Option) error {
|
||||
func (m *MemoryBroker) Init(opts ...options.Option) error {
|
||||
var err error
|
||||
for _, o := range opts {
|
||||
o(&m.opts)
|
||||
if err = o(&m.opts); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) NewMessage(endpoint string, req interface{}, opts ...MessageOption) Message {
|
||||
return &memoryMessage{}
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Publish(ctx context.Context, message interface{}, opts ...PublishOption) error {
|
||||
func (m *MemoryBroker) Publish(ctx context.Context, message interface{}, opts ...options.Option) error {
|
||||
m.RLock()
|
||||
if !m.connected {
|
||||
m.RUnlock()
|
||||
@@ -92,30 +98,34 @@ func (m *memoryBroker) Publish(ctx context.Context, message interface{}, opts ..
|
||||
return ctx.Err()
|
||||
default:
|
||||
options := NewPublishOptions(opts...)
|
||||
var msgs []*memoryMessage
|
||||
var msgs []Message
|
||||
switch v := message.(type) {
|
||||
case *memoryMessage:
|
||||
msgs = []*memoryMessage{v}
|
||||
case []*memoryMessage:
|
||||
case []Message:
|
||||
msgs = v
|
||||
case Message:
|
||||
msgs = append(msgs, v)
|
||||
default:
|
||||
return ErrInvalidMessage
|
||||
}
|
||||
msgTopicMap := make(map[string][]*memoryMessage)
|
||||
for _, msg := range msgs {
|
||||
p := &memoryMessage{opts: options}
|
||||
/*
|
||||
if mb, ok := msg.Body().(*codec.Frame); ok {
|
||||
p.message = v.Body
|
||||
} else {
|
||||
p.topic, _ = v.Header.Get(metadata.HeaderTopic)
|
||||
p.message, err = m.opts.Codec.Marshal(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.topic, _ = msg.Header().Get(metadata.HeaderTopic)
|
||||
if v, ok := msg.Body().(*codec.Frame); ok {
|
||||
p.body = msg.Body()
|
||||
} else if len(m.opts.Codecs) == 0 {
|
||||
p.body = msg.Body()
|
||||
} else {
|
||||
cf, ok := m.opts.Codecs[options.ContentType]
|
||||
if !ok {
|
||||
return fmt.Errorf("%s: %s", codec.ErrUnknownContentType, options.ContentType)
|
||||
}
|
||||
*/
|
||||
msgTopicMap[msg.Topic()] = append(msgTopicMap[p.topic], p)
|
||||
p.body, err = cf.Marshal(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p)
|
||||
}
|
||||
|
||||
eh := m.opts.ErrorHandler
|
||||
@@ -123,55 +133,83 @@ func (m *memoryBroker) Publish(ctx context.Context, message interface{}, opts ..
|
||||
for t, ms := range msgTopicMap {
|
||||
ts := time.Now()
|
||||
|
||||
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(len(ms))
|
||||
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(len(ms))
|
||||
m.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", t).Add(len(ms))
|
||||
m.opts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", t).Add(len(ms))
|
||||
|
||||
m.RLock()
|
||||
subs, ok := m.subscribers[t]
|
||||
m.RUnlock()
|
||||
if !ok {
|
||||
m.opts.Meter.Counter(PublishMessageTotal, "endpoint", t, "status", "failure").Add(len(ms))
|
||||
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-len(ms))
|
||||
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-len(ms))
|
||||
m.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", t, "status", "failure").Add(len(ms))
|
||||
m.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", t).Add(-len(ms))
|
||||
m.opts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", t).Add(-len(ms))
|
||||
continue
|
||||
}
|
||||
|
||||
m.opts.Meter.Counter(PublishMessageTotal, "endpoint", t, "status", "success").Add(len(ms))
|
||||
m.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", t, "status", "success").Add(len(ms))
|
||||
for _, sub := range subs {
|
||||
if sub.opts.ErrorHandler != nil {
|
||||
eh = sub.opts.ErrorHandler
|
||||
}
|
||||
|
||||
for _, p := range ms {
|
||||
if err = sub.handler(p); err != nil {
|
||||
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
|
||||
switch mh := sub.handler.(type) {
|
||||
case MessagesHandler:
|
||||
mhs := make([]Message, 0, len(ms))
|
||||
for _, m := range ms {
|
||||
mhs = append(mhs, m)
|
||||
}
|
||||
if err = mh(mhs); err != nil {
|
||||
m.opts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", t, "status", "failure").Add(len(ms))
|
||||
if eh != nil {
|
||||
_ = eh(p)
|
||||
switch meh := eh.(type) {
|
||||
case MessagesHandler:
|
||||
_ = meh(mhs)
|
||||
case MessageHandler:
|
||||
for _, me := range mhs {
|
||||
_ = meh(me)
|
||||
}
|
||||
}
|
||||
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
||||
m.opts.Logger.Error(m.opts.Context, err.Error())
|
||||
}
|
||||
} else {
|
||||
if sub.opts.AutoAck {
|
||||
if err = p.Ack(); err != nil {
|
||||
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
|
||||
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
|
||||
} else {
|
||||
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
|
||||
}
|
||||
case MessageHandler:
|
||||
for _, p := range ms {
|
||||
if err = mh(p); err != nil {
|
||||
m.opts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
|
||||
if eh != nil {
|
||||
switch meh := eh.(type) {
|
||||
case MessageHandler:
|
||||
_ = meh(p)
|
||||
case MessagesHandler:
|
||||
_ = meh([]Message{p})
|
||||
}
|
||||
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
||||
m.opts.Logger.Error(m.opts.Context, err.Error())
|
||||
}
|
||||
} else {
|
||||
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
|
||||
if sub.opts.AutoAck {
|
||||
if err = p.Ack(); err != nil {
|
||||
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
|
||||
m.opts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
|
||||
} else {
|
||||
m.opts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
|
||||
}
|
||||
} else {
|
||||
m.opts.Meter.Counter(semconv.SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
|
||||
}
|
||||
}
|
||||
m.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", t).Add(-1)
|
||||
m.opts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", t).Add(-1)
|
||||
}
|
||||
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-1)
|
||||
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-1)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
te := time.Since(ts)
|
||||
m.opts.Meter.Summary(PublishMessageLatencyMicroseconds, "endpoint", t).Update(te.Seconds())
|
||||
m.opts.Meter.Histogram(PublishMessageDurationSeconds, "endpoint", t).Update(te.Seconds())
|
||||
m.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, "endpoint", t).Update(te.Seconds())
|
||||
m.opts.Meter.Histogram(SubscribeMessageDurationSeconds, "endpoint", t).Update(te.Seconds())
|
||||
m.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", t).Update(te.Seconds())
|
||||
m.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", t).Update(te.Seconds())
|
||||
m.opts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", t).Update(te.Seconds())
|
||||
m.opts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", t).Update(te.Seconds())
|
||||
}
|
||||
|
||||
}
|
||||
@@ -179,7 +217,7 @@ func (m *memoryBroker) Publish(ctx context.Context, message interface{}, opts ..
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error) {
|
||||
func (m *MemoryBroker) Subscribe(ctx context.Context, topic string, handler interface{}, opts ...options.Option) (Subscriber, error) {
|
||||
m.RLock()
|
||||
if !m.connected {
|
||||
m.RUnlock()
|
||||
@@ -224,26 +262,31 @@ func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler inte
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) String() string {
|
||||
func (m *MemoryBroker) String() string {
|
||||
return "memory"
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Name() string {
|
||||
func (m *MemoryBroker) Name() string {
|
||||
return m.opts.Name
|
||||
}
|
||||
|
||||
type memoryMessage struct {
|
||||
err error
|
||||
body interface{}
|
||||
topic string
|
||||
opts PublishOptions
|
||||
ctx context.Context
|
||||
err error
|
||||
body interface{}
|
||||
topic string
|
||||
header metadata.Metadata
|
||||
opts PublishOptions
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (m *memoryMessage) Topic() string {
|
||||
return m.topic
|
||||
}
|
||||
|
||||
func (m *memoryMessage) Header() metadata.Metadata {
|
||||
return m.header
|
||||
}
|
||||
|
||||
func (m *memoryMessage) Body() interface{} {
|
||||
return m.body
|
||||
}
|
||||
@@ -283,8 +326,8 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// NewBroker return new memory broker
|
||||
func NewBroker(opts ...Option) *memoryBroker {
|
||||
return &memoryBroker{
|
||||
func NewBroker(opts ...options.Option) *MemoryBroker {
|
||||
return &MemoryBroker{
|
||||
opts: NewOptions(opts...),
|
||||
subscribers: make(map[string][]*memorySubscriber),
|
||||
}
|
||||
|
Reference in New Issue
Block a user