Compare commits
13 Commits
Author | SHA1 | Date | |
---|---|---|---|
af8d81f3c6 | |||
5c9b3dae33 | |||
9f3957d101 | |||
8fd8bdcb39 | |||
80e3d239ab | |||
419cd486cf | |||
e64269b2a8 | |||
d18429e024 | |||
675e121049 | |||
d357fb1e0d | |||
e4673bcc50 | |||
a839f75a2f | |||
a7e6d61b95 |
@@ -8,31 +8,79 @@ import (
|
||||
"github.com/unistack-org/micro/v3/metadata"
|
||||
)
|
||||
|
||||
// DefaultBroker default broker
|
||||
// DefaultBroker default memory broker
|
||||
var DefaultBroker Broker = NewBroker()
|
||||
|
||||
var (
|
||||
// ErrNotConnected returns when broker used but not connected yet
|
||||
ErrNotConnected = errors.New("broker not connected")
|
||||
// ErrDisconnected returns when broker disconnected
|
||||
ErrDisconnected = errors.New("broker disconnected")
|
||||
)
|
||||
|
||||
// Broker is an interface used for asynchronous messaging.
|
||||
type Broker interface {
|
||||
// Name returns broker instance name
|
||||
Name() string
|
||||
Init(...Option) error
|
||||
// Init initilize broker
|
||||
Init(opts ...Option) error
|
||||
// Options returns broker options
|
||||
Options() Options
|
||||
// Address return configured address
|
||||
Address() string
|
||||
Connect(context.Context) error
|
||||
Disconnect(context.Context) error
|
||||
Publish(context.Context, string, *Message, ...PublishOption) error
|
||||
Subscribe(context.Context, string, Handler, ...SubscribeOption) (Subscriber, error)
|
||||
// Connect connects to broker
|
||||
Connect(ctx context.Context) error
|
||||
// Disconnect disconnect from broker
|
||||
Disconnect(ctx context.Context) error
|
||||
// Publish message to broker topic
|
||||
Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error
|
||||
// Subscribe subscribes to topic message via handler
|
||||
Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
|
||||
// BatchPublish messages to broker with multiple topics
|
||||
BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error
|
||||
// BatchSubscribe subscribes to topic messages via handler
|
||||
BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error)
|
||||
// String type of broker
|
||||
String() string
|
||||
}
|
||||
|
||||
// Handler is used to process messages via a subscription of a topic.
|
||||
type Handler func(Event) error
|
||||
|
||||
// Events contains multiple events
|
||||
type Events []Event
|
||||
|
||||
func (evs Events) Ack() error {
|
||||
var err error
|
||||
for _, ev := range evs {
|
||||
if err = ev.Ack(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (evs Events) SetError(err error) {
|
||||
for _, ev := range evs {
|
||||
ev.SetError(err)
|
||||
}
|
||||
}
|
||||
|
||||
// BatchHandler is used to process messages in batches via a subscription of a topic.
|
||||
type BatchHandler func(Events) error
|
||||
|
||||
// Event is given to a subscription handler for processing
|
||||
type Event interface {
|
||||
// Topic returns event topic
|
||||
Topic() string
|
||||
// Message returns broker message
|
||||
Message() *Message
|
||||
// Ack acknowledge message
|
||||
Ack() error
|
||||
// Error returns message error (like decoding errors or some other)
|
||||
Error() error
|
||||
// SetError set event processing error
|
||||
SetError(err error)
|
||||
}
|
||||
|
||||
// RawMessage is a raw encoded JSON value.
|
||||
@@ -58,13 +106,25 @@ func (m *RawMessage) UnmarshalJSON(data []byte) error {
|
||||
|
||||
// Message is used to transfer data
|
||||
type Message struct {
|
||||
Header metadata.Metadata // contains message metadata
|
||||
Body RawMessage // contains message body
|
||||
// Header contains message metadata
|
||||
Header metadata.Metadata
|
||||
// Body contains message body
|
||||
Body RawMessage
|
||||
}
|
||||
|
||||
// NewMessage create broker message with topic filled
|
||||
func NewMessage(topic string) *Message {
|
||||
m := &Message{Header: metadata.New(2)}
|
||||
m.Header.Set(metadata.HeaderTopic, topic)
|
||||
return m
|
||||
}
|
||||
|
||||
// Subscriber is a convenience return type for the Subscribe method
|
||||
type Subscriber interface {
|
||||
// Options returns subscriber options
|
||||
Options() SubscribeOptions
|
||||
// Topic returns topic for subscription
|
||||
Topic() string
|
||||
Unsubscribe(context.Context) error
|
||||
// Unsubscribe from topic
|
||||
Unsubscribe(ctx context.Context) error
|
||||
}
|
||||
|
213
broker/memory.go
213
broker/memory.go
@@ -2,18 +2,18 @@ package broker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/unistack-org/micro/v3/logger"
|
||||
"github.com/unistack-org/micro/v3/metadata"
|
||||
maddr "github.com/unistack-org/micro/v3/util/addr"
|
||||
mnet "github.com/unistack-org/micro/v3/util/net"
|
||||
"github.com/unistack-org/micro/v3/util/rand"
|
||||
)
|
||||
|
||||
type memoryBroker struct {
|
||||
Subscribers map[string][]*memorySubscriber
|
||||
subscribers map[string][]*memorySubscriber
|
||||
addr string
|
||||
opts Options
|
||||
sync.RWMutex
|
||||
@@ -28,12 +28,13 @@ type memoryEvent struct {
|
||||
}
|
||||
|
||||
type memorySubscriber struct {
|
||||
ctx context.Context
|
||||
exit chan bool
|
||||
handler Handler
|
||||
id string
|
||||
topic string
|
||||
opts SubscribeOptions
|
||||
ctx context.Context
|
||||
exit chan bool
|
||||
handler Handler
|
||||
batchhandler BatchHandler
|
||||
id string
|
||||
topic string
|
||||
opts SubscribeOptions
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Options() Options {
|
||||
@@ -77,7 +78,6 @@ func (m *memoryBroker) Disconnect(ctx context.Context) error {
|
||||
}
|
||||
|
||||
m.connected = false
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -92,67 +92,190 @@ func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message,
|
||||
m.RLock()
|
||||
if !m.connected {
|
||||
m.RUnlock()
|
||||
return errors.New("not connected")
|
||||
return ErrNotConnected
|
||||
}
|
||||
|
||||
subs, ok := m.Subscribers[topic]
|
||||
m.RUnlock()
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
var v interface{}
|
||||
if m.opts.Codec != nil {
|
||||
options := NewPublishOptions(opts...)
|
||||
vs := make([]msgWrapper, 0, 1)
|
||||
if m.opts.Codec == nil || options.BodyOnly {
|
||||
topic, _ := msg.Header.Get(metadata.HeaderTopic)
|
||||
vs = append(vs, msgWrapper{topic: topic, body: msg})
|
||||
} else {
|
||||
topic, _ := msg.Header.Get(metadata.HeaderTopic)
|
||||
buf, err := m.opts.Codec.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
v = buf
|
||||
vs = append(vs, msgWrapper{topic: topic, body: buf})
|
||||
}
|
||||
|
||||
return m.publish(ctx, vs, opts...)
|
||||
}
|
||||
|
||||
type msgWrapper struct {
|
||||
topic string
|
||||
body interface{}
|
||||
}
|
||||
|
||||
func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
|
||||
m.RLock()
|
||||
if !m.connected {
|
||||
m.RUnlock()
|
||||
return ErrNotConnected
|
||||
}
|
||||
m.RUnlock()
|
||||
|
||||
options := NewPublishOptions(opts...)
|
||||
vs := make([]msgWrapper, 0, len(msgs))
|
||||
if m.opts.Codec == nil || options.BodyOnly {
|
||||
for _, msg := range msgs {
|
||||
topic, _ := msg.Header.Get(metadata.HeaderTopic)
|
||||
vs = append(vs, msgWrapper{topic: topic, body: msg})
|
||||
}
|
||||
} else {
|
||||
v = msg
|
||||
for _, msg := range msgs {
|
||||
topic, _ := msg.Header.Get(metadata.HeaderTopic)
|
||||
buf, err := m.opts.Codec.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
vs = append(vs, msgWrapper{topic: topic, body: buf})
|
||||
}
|
||||
}
|
||||
|
||||
p := &memoryEvent{
|
||||
topic: topic,
|
||||
message: v,
|
||||
opts: m.opts,
|
||||
return m.publish(ctx, vs, opts...)
|
||||
}
|
||||
|
||||
func (m *memoryBroker) publish(ctx context.Context, vs []msgWrapper, opts ...PublishOption) error {
|
||||
var err error
|
||||
|
||||
msgTopicMap := make(map[string]Events)
|
||||
for _, v := range vs {
|
||||
p := &memoryEvent{
|
||||
topic: v.topic,
|
||||
message: v.body,
|
||||
opts: m.opts,
|
||||
}
|
||||
msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p)
|
||||
}
|
||||
|
||||
beh := m.opts.BatchErrorHandler
|
||||
eh := m.opts.ErrorHandler
|
||||
|
||||
for _, sub := range subs {
|
||||
if err := sub.handler(p); err != nil {
|
||||
p.err = err
|
||||
if sub.opts.ErrorHandler != nil {
|
||||
eh = sub.opts.ErrorHandler
|
||||
}
|
||||
if eh != nil {
|
||||
eh(p)
|
||||
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
||||
m.opts.Logger.Error(m.opts.Context, err.Error())
|
||||
}
|
||||
for t, ms := range msgTopicMap {
|
||||
m.RLock()
|
||||
subs, ok := m.subscribers[t]
|
||||
m.RUnlock()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, sub := range subs {
|
||||
// batch processing
|
||||
if sub.batchhandler != nil {
|
||||
if err = sub.batchhandler(ms); err != nil {
|
||||
ms.SetError(err)
|
||||
if sub.opts.BatchErrorHandler != nil {
|
||||
beh = sub.opts.BatchErrorHandler
|
||||
}
|
||||
if beh != nil {
|
||||
beh(ms)
|
||||
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
||||
m.opts.Logger.Error(m.opts.Context, err.Error())
|
||||
}
|
||||
} else if sub.opts.AutoAck {
|
||||
if err = ms.Ack(); err != nil {
|
||||
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
// single processing
|
||||
if sub.handler != nil {
|
||||
for _, p := range ms {
|
||||
if err = sub.handler(p); err != nil {
|
||||
p.SetError(err)
|
||||
if sub.opts.ErrorHandler != nil {
|
||||
eh = sub.opts.ErrorHandler
|
||||
}
|
||||
if eh != nil {
|
||||
eh(p)
|
||||
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
||||
m.opts.Logger.Error(m.opts.Context, err.Error())
|
||||
}
|
||||
} else if sub.opts.AutoAck {
|
||||
if err = p.Ack(); err != nil {
|
||||
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
|
||||
func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) {
|
||||
m.RLock()
|
||||
if !m.connected {
|
||||
m.RUnlock()
|
||||
return nil, errors.New("not connected")
|
||||
return nil, ErrNotConnected
|
||||
}
|
||||
m.RUnlock()
|
||||
|
||||
options := NewSubscribeOptions(opts...)
|
||||
|
||||
id, err := uuid.NewRandom()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
options := NewSubscribeOptions(opts...)
|
||||
|
||||
sub := &memorySubscriber{
|
||||
exit: make(chan bool, 1),
|
||||
id: id.String(),
|
||||
topic: topic,
|
||||
batchhandler: handler,
|
||||
opts: options,
|
||||
ctx: ctx,
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
m.subscribers[topic] = append(m.subscribers[topic], sub)
|
||||
m.Unlock()
|
||||
|
||||
go func() {
|
||||
<-sub.exit
|
||||
m.Lock()
|
||||
newSubscribers := make([]*memorySubscriber, 0, len(m.subscribers)-1)
|
||||
for _, sb := range m.subscribers[topic] {
|
||||
if sb.id == sub.id {
|
||||
continue
|
||||
}
|
||||
newSubscribers = append(newSubscribers, sb)
|
||||
}
|
||||
m.subscribers[topic] = newSubscribers
|
||||
m.Unlock()
|
||||
}()
|
||||
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
|
||||
m.RLock()
|
||||
if !m.connected {
|
||||
m.RUnlock()
|
||||
return nil, ErrNotConnected
|
||||
}
|
||||
m.RUnlock()
|
||||
|
||||
id, err := uuid.NewRandom()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
options := NewSubscribeOptions(opts...)
|
||||
|
||||
sub := &memorySubscriber{
|
||||
exit: make(chan bool, 1),
|
||||
id: id.String(),
|
||||
@@ -163,20 +286,20 @@ func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Hand
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
m.Subscribers[topic] = append(m.Subscribers[topic], sub)
|
||||
m.subscribers[topic] = append(m.subscribers[topic], sub)
|
||||
m.Unlock()
|
||||
|
||||
go func() {
|
||||
<-sub.exit
|
||||
m.Lock()
|
||||
var newSubscribers []*memorySubscriber
|
||||
for _, sb := range m.Subscribers[topic] {
|
||||
newSubscribers := make([]*memorySubscriber, 0, len(m.subscribers)-1)
|
||||
for _, sb := range m.subscribers[topic] {
|
||||
if sb.id == sub.id {
|
||||
continue
|
||||
}
|
||||
newSubscribers = append(newSubscribers, sb)
|
||||
}
|
||||
m.Subscribers[topic] = newSubscribers
|
||||
m.subscribers[topic] = newSubscribers
|
||||
m.Unlock()
|
||||
}()
|
||||
|
||||
@@ -221,6 +344,10 @@ func (m *memoryEvent) Error() error {
|
||||
return m.err
|
||||
}
|
||||
|
||||
func (m *memoryEvent) SetError(err error) {
|
||||
m.err = err
|
||||
}
|
||||
|
||||
func (m *memorySubscriber) Options() SubscribeOptions {
|
||||
return m.opts
|
||||
}
|
||||
@@ -238,6 +365,6 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error {
|
||||
func NewBroker(opts ...Option) Broker {
|
||||
return &memoryBroker{
|
||||
opts: NewOptions(opts...),
|
||||
Subscribers: make(map[string][]*memorySubscriber),
|
||||
subscribers: make(map[string][]*memorySubscriber),
|
||||
}
|
||||
}
|
||||
|
@@ -4,8 +4,55 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/unistack-org/micro/v3/metadata"
|
||||
)
|
||||
|
||||
func TestMemoryBatchBroker(t *testing.T) {
|
||||
b := NewBroker()
|
||||
ctx := context.Background()
|
||||
|
||||
if err := b.Connect(ctx); err != nil {
|
||||
t.Fatalf("Unexpected connect error %v", err)
|
||||
}
|
||||
|
||||
topic := "test"
|
||||
count := 10
|
||||
|
||||
fn := func(evts Events) error {
|
||||
return evts.Ack()
|
||||
}
|
||||
|
||||
sub, err := b.BatchSubscribe(ctx, topic, fn)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error subscribing %v", err)
|
||||
}
|
||||
|
||||
msgs := make([]*Message, 0, 0)
|
||||
for i := 0; i < count; i++ {
|
||||
message := &Message{
|
||||
Header: map[string]string{
|
||||
metadata.HeaderTopic: topic,
|
||||
"foo": "bar",
|
||||
"id": fmt.Sprintf("%d", i),
|
||||
},
|
||||
Body: []byte(`"hello world"`),
|
||||
}
|
||||
msgs = append(msgs, message)
|
||||
}
|
||||
|
||||
if err := b.BatchPublish(ctx, msgs); err != nil {
|
||||
t.Fatalf("Unexpected error publishing %v", err)
|
||||
}
|
||||
|
||||
if err := sub.Unsubscribe(ctx); err != nil {
|
||||
t.Fatalf("Unexpected error unsubscribing from %s: %v", topic, err)
|
||||
}
|
||||
|
||||
if err := b.Disconnect(ctx); err != nil {
|
||||
t.Fatalf("Unexpected connect error %v", err)
|
||||
}
|
||||
}
|
||||
func TestMemoryBroker(t *testing.T) {
|
||||
b := NewBroker()
|
||||
ctx := context.Background()
|
||||
@@ -26,20 +73,27 @@ func TestMemoryBroker(t *testing.T) {
|
||||
t.Fatalf("Unexpected error subscribing %v", err)
|
||||
}
|
||||
|
||||
msgs := make([]*Message, 0, 0)
|
||||
for i := 0; i < count; i++ {
|
||||
message := &Message{
|
||||
Header: map[string]string{
|
||||
"foo": "bar",
|
||||
"id": fmt.Sprintf("%d", i),
|
||||
metadata.HeaderTopic: topic,
|
||||
"foo": "bar",
|
||||
"id": fmt.Sprintf("%d", i),
|
||||
},
|
||||
Body: []byte(`"hello world"`),
|
||||
}
|
||||
msgs = append(msgs, message)
|
||||
|
||||
if err := b.Publish(ctx, topic, message); err != nil {
|
||||
t.Fatalf("Unexpected error publishing %d", i)
|
||||
t.Fatalf("Unexpected error publishing %d err: %v", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := b.BatchPublish(ctx, msgs); err != nil {
|
||||
t.Fatalf("Unexpected error publishing %v", err)
|
||||
}
|
||||
|
||||
if err := sub.Unsubscribe(ctx); err != nil {
|
||||
t.Fatalf("Unexpected error unsubscribing from %s: %v", topic, err)
|
||||
}
|
||||
|
@@ -3,6 +3,7 @@ package broker
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"time"
|
||||
|
||||
"github.com/unistack-org/micro/v3/codec"
|
||||
"github.com/unistack-org/micro/v3/logger"
|
||||
@@ -29,6 +30,8 @@ type Options struct {
|
||||
TLSConfig *tls.Config
|
||||
// ErrorHandler used when broker can't unmarshal incoming message
|
||||
ErrorHandler Handler
|
||||
// BatchErrorHandler used when broker can't unmashal incoming messages
|
||||
BatchErrorHandler BatchHandler
|
||||
// Name holds the broker name
|
||||
Name string
|
||||
// Addrs holds the broker address
|
||||
@@ -71,11 +74,9 @@ func NewPublishOptions(opts ...PublishOption) PublishOptions {
|
||||
options := PublishOptions{
|
||||
Context: context.Background(),
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
return options
|
||||
}
|
||||
|
||||
@@ -85,12 +86,18 @@ type SubscribeOptions struct {
|
||||
Context context.Context
|
||||
// ErrorHandler used when broker can't unmarshal incoming message
|
||||
ErrorHandler Handler
|
||||
// BatchErrorHandler used when broker can't unmashal incoming messages
|
||||
BatchErrorHandler BatchHandler
|
||||
// Group holds consumer group
|
||||
Group string
|
||||
// AutoAck flag specifies auto ack of incoming message when no error happens
|
||||
AutoAck bool
|
||||
// BodyOnly flag specifies that message contains only body bytes without header
|
||||
BodyOnly bool
|
||||
// BatchSize flag specifies max batch size
|
||||
BatchSize int
|
||||
// BatchWait flag specifies max wait time for batch filling
|
||||
BatchWait time.Duration
|
||||
}
|
||||
|
||||
// Option func
|
||||
@@ -113,23 +120,6 @@ func PublishContext(ctx context.Context) PublishOption {
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeOption func
|
||||
type SubscribeOption func(*SubscribeOptions)
|
||||
|
||||
// NewSubscribeOptions creates new SubscribeOptions
|
||||
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
|
||||
options := SubscribeOptions{
|
||||
AutoAck: true,
|
||||
Context: context.Background(),
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
return options
|
||||
}
|
||||
|
||||
// Addrs sets the host addresses to be used by the broker
|
||||
func Addrs(addrs ...string) Option {
|
||||
return func(o *Options) {
|
||||
@@ -145,28 +135,6 @@ func Codec(c codec.Codec) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// DisableAutoAck disables auto ack
|
||||
func DisableAutoAck() SubscribeOption {
|
||||
return func(o *SubscribeOptions) {
|
||||
o.AutoAck = false
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeAutoAck will disable auto acking of messages
|
||||
// after they have been handled.
|
||||
func SubscribeAutoAck(b bool) SubscribeOption {
|
||||
return func(o *SubscribeOptions) {
|
||||
o.AutoAck = b
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeBodyOnly consumes only body of the message
|
||||
func SubscribeBodyOnly(b bool) SubscribeOption {
|
||||
return func(o *SubscribeOptions) {
|
||||
o.BodyOnly = b
|
||||
}
|
||||
}
|
||||
|
||||
// ErrorHandler will catch all broker errors that cant be handled
|
||||
// in normal way, for example Codec errors
|
||||
func ErrorHandler(h Handler) Option {
|
||||
@@ -175,6 +143,14 @@ func ErrorHandler(h Handler) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// BatchErrorHandler will catch all broker errors that cant be handled
|
||||
// in normal way, for example Codec errors
|
||||
func BatchErrorHandler(h BatchHandler) Option {
|
||||
return func(o *Options) {
|
||||
o.BatchErrorHandler = h
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeErrorHandler will catch all broker errors that cant be handled
|
||||
// in normal way, for example Codec errors
|
||||
func SubscribeErrorHandler(h Handler) SubscribeOption {
|
||||
@@ -183,6 +159,14 @@ func SubscribeErrorHandler(h Handler) SubscribeOption {
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeBatchErrorHandler will catch all broker errors that cant be handled
|
||||
// in normal way, for example Codec errors
|
||||
func SubscribeBatchErrorHandler(h BatchHandler) SubscribeOption {
|
||||
return func(o *SubscribeOptions) {
|
||||
o.BatchErrorHandler = h
|
||||
}
|
||||
}
|
||||
|
||||
// Queue sets the subscribers queue
|
||||
// Deprecated
|
||||
func Queue(name string) SubscribeOption {
|
||||
@@ -246,3 +230,55 @@ func SubscribeContext(ctx context.Context) SubscribeOption {
|
||||
o.Context = ctx
|
||||
}
|
||||
}
|
||||
|
||||
// DisableAutoAck disables auto ack
|
||||
// DEPRECATED
|
||||
func DisableAutoAck() SubscribeOption {
|
||||
return func(o *SubscribeOptions) {
|
||||
o.AutoAck = false
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeAutoAck contol auto acking of messages
|
||||
// after they have been handled.
|
||||
func SubscribeAutoAck(b bool) SubscribeOption {
|
||||
return func(o *SubscribeOptions) {
|
||||
o.AutoAck = b
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeBodyOnly consumes only body of the message
|
||||
func SubscribeBodyOnly(b bool) SubscribeOption {
|
||||
return func(o *SubscribeOptions) {
|
||||
o.BodyOnly = b
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeBatchSize specifies max batch size
|
||||
func SubscribeBatchSize(n int) SubscribeOption {
|
||||
return func(o *SubscribeOptions) {
|
||||
o.BatchSize = n
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeBatchWait specifies max batch wait time
|
||||
func SubscribeBatchWait(td time.Duration) SubscribeOption {
|
||||
return func(o *SubscribeOptions) {
|
||||
o.BatchWait = td
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeOption func
|
||||
type SubscribeOption func(*SubscribeOptions)
|
||||
|
||||
// NewSubscribeOptions creates new SubscribeOptions
|
||||
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
|
||||
options := SubscribeOptions{
|
||||
AutoAck: true,
|
||||
Context: context.Background(),
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return options
|
||||
}
|
||||
|
@@ -40,6 +40,7 @@ type Client interface {
|
||||
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
|
||||
Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
|
||||
Publish(ctx context.Context, msg Message, opts ...PublishOption) error
|
||||
BatchPublish(ctx context.Context, msg []Message, opts ...PublishOption) error
|
||||
String() string
|
||||
}
|
||||
|
||||
|
@@ -181,47 +181,59 @@ func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption
|
||||
return &noopStream{}, nil
|
||||
}
|
||||
|
||||
func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOption) error {
|
||||
var body []byte
|
||||
func (n *noopClient) BatchPublish(ctx context.Context, ps []Message, opts ...PublishOption) error {
|
||||
return n.publish(ctx, ps, opts...)
|
||||
}
|
||||
|
||||
func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOption) error {
|
||||
return n.publish(ctx, []Message{p}, opts...)
|
||||
}
|
||||
|
||||
func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishOption) error {
|
||||
options := NewPublishOptions(opts...)
|
||||
|
||||
md, ok := metadata.FromOutgoingContext(ctx)
|
||||
if !ok {
|
||||
md = metadata.New(0)
|
||||
}
|
||||
md["Content-Type"] = p.ContentType()
|
||||
md["Micro-Topic"] = p.Topic()
|
||||
msgs := make([]*broker.Message, 0, len(ps))
|
||||
|
||||
// passed in raw data
|
||||
if d, ok := p.Payload().(*codec.Frame); ok {
|
||||
body = d.Data
|
||||
} else {
|
||||
// use codec for payload
|
||||
cf, err := n.newCodec(p.ContentType())
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
for _, p := range ps {
|
||||
md, ok := metadata.FromOutgoingContext(ctx)
|
||||
if !ok {
|
||||
md = metadata.New(0)
|
||||
}
|
||||
md[metadata.HeaderContentType] = p.ContentType()
|
||||
|
||||
topic := p.Topic()
|
||||
|
||||
// get the exchange
|
||||
if len(options.Exchange) > 0 {
|
||||
topic = options.Exchange
|
||||
}
|
||||
|
||||
// set the body
|
||||
b, err := cf.Marshal(p.Payload())
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
md[metadata.HeaderTopic] = topic
|
||||
|
||||
var body []byte
|
||||
|
||||
// passed in raw data
|
||||
if d, ok := p.Payload().(*codec.Frame); ok {
|
||||
body = d.Data
|
||||
} else {
|
||||
// use codec for payload
|
||||
cf, err := n.newCodec(p.ContentType())
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
}
|
||||
|
||||
// set the body
|
||||
b, err := cf.Marshal(p.Payload())
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
}
|
||||
body = b
|
||||
}
|
||||
body = b
|
||||
|
||||
msgs = append(msgs, &broker.Message{Header: md, Body: body})
|
||||
}
|
||||
|
||||
topic := p.Topic()
|
||||
|
||||
// get the exchange
|
||||
if len(options.Exchange) > 0 {
|
||||
topic = options.Exchange
|
||||
}
|
||||
|
||||
return n.opts.Broker.Publish(ctx, topic, &broker.Message{
|
||||
Header: md,
|
||||
Body: body,
|
||||
},
|
||||
return n.opts.Broker.BatchPublish(ctx, msgs,
|
||||
broker.PublishContext(options.Context),
|
||||
broker.PublishBodyOnly(options.BodyOnly),
|
||||
)
|
||||
|
@@ -373,19 +373,35 @@ func DialTimeout(d time.Duration) Option {
|
||||
}
|
||||
|
||||
// WithExchange sets the exchange to route a message through
|
||||
// DEPRECATED
|
||||
func WithExchange(e string) PublishOption {
|
||||
return func(o *PublishOptions) {
|
||||
o.Exchange = e
|
||||
}
|
||||
}
|
||||
|
||||
// PublishExchange sets the exchange to route a message through
|
||||
func PublishExchange(e string) PublishOption {
|
||||
return func(o *PublishOptions) {
|
||||
o.Exchange = e
|
||||
}
|
||||
}
|
||||
|
||||
// WithBodyOnly publish only message body
|
||||
// DERECATED
|
||||
func WithBodyOnly(b bool) PublishOption {
|
||||
return func(o *PublishOptions) {
|
||||
o.BodyOnly = b
|
||||
}
|
||||
}
|
||||
|
||||
// PublishBodyOnly publish only message body
|
||||
func PublishBodyOnly(b bool) PublishOption {
|
||||
return func(o *PublishOptions) {
|
||||
o.BodyOnly = b
|
||||
}
|
||||
}
|
||||
|
||||
// PublishContext sets the context in publish options
|
||||
func PublishContext(ctx context.Context) PublishOption {
|
||||
return func(o *PublishOptions) {
|
||||
@@ -498,12 +514,20 @@ func WithSelectOptions(sops ...selector.SelectOption) CallOption {
|
||||
}
|
||||
|
||||
// WithMessageContentType sets the message content type
|
||||
// DEPRECATED
|
||||
func WithMessageContentType(ct string) MessageOption {
|
||||
return func(o *MessageOptions) {
|
||||
o.ContentType = ct
|
||||
}
|
||||
}
|
||||
|
||||
// MessageContentType sets the message content type
|
||||
func MessageContentType(ct string) MessageOption {
|
||||
return func(o *MessageOptions) {
|
||||
o.ContentType = ct
|
||||
}
|
||||
}
|
||||
|
||||
// StreamingRequest specifies that request is streaming
|
||||
func StreamingRequest(b bool) RequestOption {
|
||||
return func(o *RequestOptions) {
|
||||
|
@@ -35,9 +35,6 @@ func (l *defaultLogger) Init(opts ...Option) error {
|
||||
o(&l.opts)
|
||||
}
|
||||
l.enc = json.NewEncoder(l.opts.Out)
|
||||
|
||||
l.logFunc = l.Log
|
||||
l.logfFunc = l.Logf
|
||||
// wrap the Log func
|
||||
for i := len(l.opts.Wrappers); i > 0; i-- {
|
||||
l.logFunc = l.opts.Wrappers[i-1].Log(l.logFunc)
|
||||
@@ -218,7 +215,11 @@ func (l *defaultLogger) Options() Options {
|
||||
|
||||
// NewLogger builds a new logger based on options
|
||||
func NewLogger(opts ...Option) Logger {
|
||||
l := &defaultLogger{opts: NewOptions(opts...)}
|
||||
l := &defaultLogger{
|
||||
opts: NewOptions(opts...),
|
||||
}
|
||||
l.logFunc = l.Log
|
||||
l.logfFunc = l.Logf
|
||||
l.enc = json.NewEncoder(l.opts.Out)
|
||||
return l
|
||||
}
|
||||
|
@@ -8,6 +8,8 @@ var (
|
||||
DefaultLogger Logger = NewLogger()
|
||||
// DefaultLevel used by logger
|
||||
DefaultLevel Level = InfoLevel
|
||||
// DefaultCallerSkipCount used by logger
|
||||
DefaultCallerSkipCount = 2
|
||||
)
|
||||
|
||||
// Logger is a generic logging interface
|
||||
|
@@ -33,7 +33,7 @@ func NewOptions(opts ...Option) Options {
|
||||
Level: DefaultLevel,
|
||||
Fields: make(map[string]interface{}),
|
||||
Out: os.Stderr,
|
||||
CallerSkipCount: 0,
|
||||
CallerSkipCount: DefaultCallerSkipCount,
|
||||
Context: context.Background(),
|
||||
}
|
||||
for _, o := range opts {
|
||||
|
@@ -6,8 +6,20 @@ import (
|
||||
"sort"
|
||||
)
|
||||
|
||||
// HeaderPrefix for all headers passed
|
||||
var HeaderPrefix = "Micro-"
|
||||
var (
|
||||
// HeaderTopic is the header name that contains topic name
|
||||
HeaderTopic = "Micro-Topic"
|
||||
// HeaderContentType specifies content type of message
|
||||
HeaderContentType = "Content-Type"
|
||||
// HeaderEndpoint specifies endpoint in service
|
||||
HeaderEndpoint = "Micro-Endpoint"
|
||||
// HeaderService specifies service
|
||||
HeaderService = "Micro-Service"
|
||||
// HeaderTimeout specifies timeout of operation
|
||||
HeaderTimeout = "Micro-Timeout"
|
||||
// HeaderAuthorization specifies Authorization header
|
||||
HeaderAuthorization = "Authorization"
|
||||
)
|
||||
|
||||
// Metadata is our way of representing request headers internally.
|
||||
// They're used at the RPC level and translate back and forth
|
||||
|
@@ -105,18 +105,20 @@ func BuildName(name string, labels ...string) string {
|
||||
labels = labels[:len(labels)-1]
|
||||
}
|
||||
|
||||
sort.Sort(byKey(labels))
|
||||
if len(labels) > 2 {
|
||||
sort.Sort(byKey(labels))
|
||||
|
||||
idx := 0
|
||||
for {
|
||||
if labels[idx] == labels[idx+2] {
|
||||
copy(labels[idx:], labels[idx+2:])
|
||||
labels = labels[:len(labels)-2]
|
||||
} else {
|
||||
idx += 2
|
||||
}
|
||||
if idx+2 >= len(labels) {
|
||||
break
|
||||
idx := 0
|
||||
for {
|
||||
if labels[idx] == labels[idx+2] {
|
||||
copy(labels[idx:], labels[idx+2:])
|
||||
labels = labels[:len(labels)-2]
|
||||
} else {
|
||||
idx += 2
|
||||
}
|
||||
if idx+2 >= len(labels) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -56,6 +56,10 @@ func TestBuildName(t *testing.T) {
|
||||
"my_metric",
|
||||
"broker", "broker1", "broker", "broker2", "server", "http", "server", "tcp", "register", "mdns",
|
||||
},
|
||||
`my_metric{aaa="aaa"}`: []string{
|
||||
"my_metric",
|
||||
"aaa", "aaa",
|
||||
},
|
||||
}
|
||||
|
||||
for e, d := range data {
|
||||
|
@@ -24,9 +24,18 @@ type tunSubscriber struct {
|
||||
opts broker.SubscribeOptions
|
||||
}
|
||||
|
||||
type tunBatchSubscriber struct {
|
||||
listener tunnel.Listener
|
||||
handler broker.BatchHandler
|
||||
closed chan bool
|
||||
topic string
|
||||
opts broker.SubscribeOptions
|
||||
}
|
||||
|
||||
type tunEvent struct {
|
||||
message *broker.Message
|
||||
topic string
|
||||
err error
|
||||
}
|
||||
|
||||
// used to access tunnel from options context
|
||||
@@ -62,6 +71,36 @@ func (t *tunBroker) Disconnect(ctx context.Context) error {
|
||||
return t.tunnel.Close(ctx)
|
||||
}
|
||||
|
||||
func (t *tunBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
||||
// TODO: this is probably inefficient, we might want to just maintain an open connection
|
||||
// it may be easier to add broadcast to the tunnel
|
||||
topicMap := make(map[string]tunnel.Session)
|
||||
|
||||
var err error
|
||||
for _, msg := range msgs {
|
||||
topic, _ := msg.Header.Get(metadata.HeaderTopic)
|
||||
c, ok := topicMap[topic]
|
||||
if !ok {
|
||||
c, err := t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer c.Close()
|
||||
topicMap[topic] = c
|
||||
}
|
||||
|
||||
if err = c.Send(&transport.Message{
|
||||
Header: msg.Header,
|
||||
Body: msg.Body,
|
||||
}); err != nil {
|
||||
// msg.SetError(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message, opts ...broker.PublishOption) error {
|
||||
// TODO: this is probably inefficient, we might want to just maintain an open connection
|
||||
// it may be easier to add broadcast to the tunnel
|
||||
@@ -77,6 +116,26 @@ func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message
|
||||
})
|
||||
}
|
||||
|
||||
func (t *tunBroker) BatchSubscribe(ctx context.Context, topic string, h broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||
l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tunSub := &tunBatchSubscriber{
|
||||
topic: topic,
|
||||
handler: h,
|
||||
opts: broker.NewSubscribeOptions(opts...),
|
||||
closed: make(chan bool),
|
||||
listener: l,
|
||||
}
|
||||
|
||||
// start processing
|
||||
go tunSub.run()
|
||||
|
||||
return tunSub, nil
|
||||
}
|
||||
|
||||
func (t *tunBroker) Subscribe(ctx context.Context, topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||
l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast))
|
||||
if err != nil {
|
||||
@@ -101,6 +160,49 @@ func (t *tunBroker) String() string {
|
||||
return "tunnel"
|
||||
}
|
||||
|
||||
func (t *tunBatchSubscriber) run() {
|
||||
for {
|
||||
// accept a new connection
|
||||
c, err := t.listener.Accept()
|
||||
if err != nil {
|
||||
select {
|
||||
case <-t.closed:
|
||||
return
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// receive message
|
||||
m := new(transport.Message)
|
||||
if err := c.Recv(m); err != nil {
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Error(t.opts.Context, err.Error())
|
||||
}
|
||||
if err = c.Close(); err != nil {
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Error(t.opts.Context, err.Error())
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// close the connection
|
||||
c.Close()
|
||||
|
||||
evts := broker.Events{&tunEvent{
|
||||
topic: t.topic,
|
||||
message: &broker.Message{
|
||||
Header: m.Header,
|
||||
Body: m.Body,
|
||||
},
|
||||
}}
|
||||
// handle the message
|
||||
go t.handler(evts)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func (t *tunSubscriber) run() {
|
||||
for {
|
||||
// accept a new connection
|
||||
@@ -142,6 +244,24 @@ func (t *tunSubscriber) run() {
|
||||
}
|
||||
}
|
||||
|
||||
func (t *tunBatchSubscriber) Options() broker.SubscribeOptions {
|
||||
return t.opts
|
||||
}
|
||||
|
||||
func (t *tunBatchSubscriber) Topic() string {
|
||||
return t.topic
|
||||
}
|
||||
|
||||
func (t *tunBatchSubscriber) Unsubscribe(ctx context.Context) error {
|
||||
select {
|
||||
case <-t.closed:
|
||||
return nil
|
||||
default:
|
||||
close(t.closed)
|
||||
return t.listener.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (t *tunSubscriber) Options() broker.SubscribeOptions {
|
||||
return t.opts
|
||||
}
|
||||
@@ -173,7 +293,11 @@ func (t *tunEvent) Ack() error {
|
||||
}
|
||||
|
||||
func (t *tunEvent) Error() error {
|
||||
return nil
|
||||
return t.err
|
||||
}
|
||||
|
||||
func (t *tunEvent) SetError(err error) {
|
||||
t.err = err
|
||||
}
|
||||
|
||||
// NewBroker returns new tunnel broker
|
||||
|
@@ -1,15 +0,0 @@
|
||||
package fn
|
||||
|
||||
type Initer interface {
|
||||
Init(opts ...interface{}) error
|
||||
}
|
||||
|
||||
func Init(ifaces ...Initer) error {
|
||||
var err error
|
||||
for _, iface := range ifaces {
|
||||
if err = iface.Init(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
@@ -7,7 +7,6 @@ import (
|
||||
"reflect"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ErrInvalidParam specifies invalid url query params
|
||||
@@ -15,11 +14,12 @@ var ErrInvalidParam = errors.New("invalid url query param provided")
|
||||
|
||||
var bracketSplitter = regexp.MustCompile(`\[|\]`)
|
||||
|
||||
var timeKind = reflect.TypeOf(time.Time{}).Kind()
|
||||
//var timeKind = reflect.ValueOf(time.Time{}).Kind()
|
||||
|
||||
type StructField struct {
|
||||
Field reflect.StructField
|
||||
Value reflect.Value
|
||||
Path string
|
||||
}
|
||||
|
||||
func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, error) {
|
||||
@@ -35,7 +35,7 @@ func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, e
|
||||
for idx := 0; idx < typ.NumField(); idx++ {
|
||||
fld := typ.Field(idx)
|
||||
val := sv.Field(idx)
|
||||
if !val.CanSet() || len(fld.PkgPath) != 0 {
|
||||
if len(fld.PkgPath) != 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -66,6 +66,17 @@ func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, e
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
func StructFieldByPath(src interface{}, path string) (interface{}, error) {
|
||||
var err error
|
||||
for _, p := range strings.Split(path, ".") {
|
||||
src, err = StructFieldByName(src, p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return src, err
|
||||
}
|
||||
|
||||
func StructFieldByName(src interface{}, tkey string) (interface{}, error) {
|
||||
sv := reflect.ValueOf(src)
|
||||
if sv.Kind() == reflect.Ptr {
|
||||
@@ -79,7 +90,7 @@ func StructFieldByName(src interface{}, tkey string) (interface{}, error) {
|
||||
for idx := 0; idx < typ.NumField(); idx++ {
|
||||
fld := typ.Field(idx)
|
||||
val := sv.Field(idx)
|
||||
if !val.CanSet() || len(fld.PkgPath) != 0 {
|
||||
if len(fld.PkgPath) != 0 {
|
||||
continue
|
||||
}
|
||||
if fld.Name == tkey {
|
||||
@@ -105,6 +116,19 @@ func StructFieldByName(src interface{}, tkey string) (interface{}, error) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
// StructFieldsMap returns map[string]interface{} or error
|
||||
func StructFieldsMap(src interface{}) (map[string]interface{}, error) {
|
||||
fields, err := StructFields(src)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mp := make(map[string]interface{}, len(fields))
|
||||
for _, field := range fields {
|
||||
mp[field.Path] = field.Value.Interface()
|
||||
}
|
||||
return mp, nil
|
||||
}
|
||||
|
||||
// StructFields returns slice of struct fields
|
||||
func StructFields(src interface{}) ([]StructField, error) {
|
||||
var fields []StructField
|
||||
@@ -116,25 +140,29 @@ func StructFields(src interface{}) ([]StructField, error) {
|
||||
if sv.Kind() != reflect.Struct {
|
||||
return nil, ErrInvalidStruct
|
||||
}
|
||||
|
||||
typ := sv.Type()
|
||||
for idx := 0; idx < typ.NumField(); idx++ {
|
||||
fld := typ.Field(idx)
|
||||
val := sv.Field(idx)
|
||||
if !val.CanSet() || len(fld.PkgPath) != 0 {
|
||||
if !val.IsValid() || len(fld.PkgPath) != 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
switch val.Kind() {
|
||||
case timeKind:
|
||||
fields = append(fields, StructField{Field: fld, Value: val})
|
||||
//case timeKind:
|
||||
//fmt.Printf("GGG\n")
|
||||
//fields = append(fields, StructField{Field: fld, Value: val, Path: fld.Name})
|
||||
case reflect.Struct:
|
||||
infields, err := StructFields(val.Interface())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fields = append(fields, infields...)
|
||||
for _, infield := range infields {
|
||||
infield.Path = fmt.Sprintf("%s.%s", fld.Name, infield.Path)
|
||||
fields = append(fields, infield)
|
||||
}
|
||||
default:
|
||||
fields = append(fields, StructField{Field: fld, Value: val})
|
||||
fields = append(fields, StructField{Field: fld, Value: val, Path: fld.Name})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -194,7 +222,7 @@ func StructURLValues(src interface{}, pref string, tags []string) (url.Values, e
|
||||
for idx := 0; idx < typ.NumField(); idx++ {
|
||||
fld := typ.Field(idx)
|
||||
val := sv.Field(idx)
|
||||
if !val.CanSet() || len(fld.PkgPath) != 0 || !val.IsValid() {
|
||||
if len(fld.PkgPath) != 0 || !val.IsValid() {
|
||||
continue
|
||||
}
|
||||
|
||||
|
@@ -2,9 +2,80 @@ package reflect
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"reflect"
|
||||
rfl "reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestStructFieldsMap(t *testing.T) {
|
||||
type NestedStr struct {
|
||||
BBB string
|
||||
CCC int
|
||||
}
|
||||
type Str struct {
|
||||
Name []string `json:"name" codec:"flatten"`
|
||||
XXX string `json:"xxx"`
|
||||
Nested NestedStr
|
||||
}
|
||||
|
||||
val := &Str{Name: []string{"first", "second"}, XXX: "ttt", Nested: NestedStr{BBB: "ddd", CCC: 9}}
|
||||
fields, err := StructFieldsMap(val)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if v, ok := fields["Nested.BBB"]; !ok || v != "ddd" {
|
||||
t.Fatalf("invalid field from %v", fields)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStructFields(t *testing.T) {
|
||||
type NestedStr struct {
|
||||
BBB string
|
||||
CCC int
|
||||
}
|
||||
type Str struct {
|
||||
Name []string `json:"name" codec:"flatten"`
|
||||
XXX string `json:"xxx"`
|
||||
Nested NestedStr
|
||||
}
|
||||
|
||||
val := &Str{Name: []string{"first", "second"}, XXX: "ttt", Nested: NestedStr{BBB: "ddd", CCC: 9}}
|
||||
fields, err := StructFields(val)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var ok bool
|
||||
for _, field := range fields {
|
||||
if field.Path == "Nested.CCC" {
|
||||
ok = true
|
||||
}
|
||||
}
|
||||
if !ok {
|
||||
t.Fatalf("struct fields returns invalid path: %v", fields)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStructByPath(t *testing.T) {
|
||||
type NestedStr struct {
|
||||
BBB string
|
||||
CCC int
|
||||
}
|
||||
type Str struct {
|
||||
Name []string `json:"name" codec:"flatten"`
|
||||
XXX string `json:"xxx"`
|
||||
Nested NestedStr
|
||||
}
|
||||
|
||||
val := &Str{Name: []string{"first", "second"}, XXX: "ttt", Nested: NestedStr{BBB: "ddd", CCC: 9}}
|
||||
field, err := StructFieldByPath(val, "Nested.CCC")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if rfl.Indirect(reflect.ValueOf(field)).Int() != 9 {
|
||||
t.Fatalf("invalid elem returned: %v", field)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStructByTag(t *testing.T) {
|
||||
type Str struct {
|
||||
Name []string `json:"name" codec:"flatten"`
|
||||
|
Reference in New Issue
Block a user