Compare commits
18 Commits
Author | SHA1 | Date | |
---|---|---|---|
d357fb1e0d | |||
e4673bcc50 | |||
a839f75a2f | |||
a7e6d61b95 | |||
650d167313 | |||
c6ba2a91e6 | |||
7ece08896f | |||
|
57f6f23294 | ||
09e6fa2fed | |||
10a09a5c6f | |||
b4e5d9462a | |||
96aa0b6906 | |||
f54658830d | |||
1e43122660 | |||
42800fa247 | |||
5b9c810653 | |||
c3def24bf4 | |||
0d1ef31764 |
@@ -8,31 +8,79 @@ import (
|
||||
"github.com/unistack-org/micro/v3/metadata"
|
||||
)
|
||||
|
||||
// DefaultBroker default broker
|
||||
// DefaultBroker default memory broker
|
||||
var DefaultBroker Broker = NewBroker()
|
||||
|
||||
var (
|
||||
// ErrNotConnected returns when broker used but not connected yet
|
||||
ErrNotConnected = errors.New("broker not connected")
|
||||
// ErrDisconnected returns when broker disconnected
|
||||
ErrDisconnected = errors.New("broker disconnected")
|
||||
)
|
||||
|
||||
// Broker is an interface used for asynchronous messaging.
|
||||
type Broker interface {
|
||||
// Name returns broker instance name
|
||||
Name() string
|
||||
Init(...Option) error
|
||||
// Init initilize broker
|
||||
Init(opts ...Option) error
|
||||
// Options returns broker options
|
||||
Options() Options
|
||||
// Address return configured address
|
||||
Address() string
|
||||
Connect(context.Context) error
|
||||
Disconnect(context.Context) error
|
||||
Publish(context.Context, string, *Message, ...PublishOption) error
|
||||
Subscribe(context.Context, string, Handler, ...SubscribeOption) (Subscriber, error)
|
||||
// Connect connects to broker
|
||||
Connect(ctx context.Context) error
|
||||
// Disconnect disconnect from broker
|
||||
Disconnect(ctx context.Context) error
|
||||
// Publish message to broker topic
|
||||
Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error
|
||||
// BatchPublish messages to broker with multiple topics
|
||||
BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error
|
||||
// Subscribe subscribes to topic message via handler
|
||||
Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
|
||||
// BatchSubscribe subscribes to topic messages via handler
|
||||
BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error)
|
||||
// String type of broker
|
||||
String() string
|
||||
}
|
||||
|
||||
// Handler is used to process messages via a subscription of a topic.
|
||||
type Handler func(Event) error
|
||||
|
||||
// Events contains multiple events
|
||||
type Events []Event
|
||||
|
||||
func (evs Events) Ack() error {
|
||||
var err error
|
||||
for _, ev := range evs {
|
||||
if err = ev.Ack(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (evs Events) SetError(err error) {
|
||||
for _, ev := range evs {
|
||||
ev.SetError(err)
|
||||
}
|
||||
}
|
||||
|
||||
// BatchHandler is used to process messages in batches via a subscription of a topic.
|
||||
type BatchHandler func(Events) error
|
||||
|
||||
// Event is given to a subscription handler for processing
|
||||
type Event interface {
|
||||
// Topic returns event topic
|
||||
Topic() string
|
||||
// Message returns broker message
|
||||
Message() *Message
|
||||
// Ack acknowledge message
|
||||
Ack() error
|
||||
// Error returns message error (like decoding errors or some other)
|
||||
Error() error
|
||||
// SetError set event processing error
|
||||
SetError(err error)
|
||||
}
|
||||
|
||||
// RawMessage is a raw encoded JSON value.
|
||||
@@ -58,13 +106,18 @@ func (m *RawMessage) UnmarshalJSON(data []byte) error {
|
||||
|
||||
// Message is used to transfer data
|
||||
type Message struct {
|
||||
Header metadata.Metadata // contains message metadata
|
||||
Body RawMessage // contains message body
|
||||
// Header contains message metadata
|
||||
Header metadata.Metadata
|
||||
// Body contains message body
|
||||
Body RawMessage
|
||||
}
|
||||
|
||||
// Subscriber is a convenience return type for the Subscribe method
|
||||
type Subscriber interface {
|
||||
// Options returns subscriber options
|
||||
Options() SubscribeOptions
|
||||
// Topic returns topic for subscription
|
||||
Topic() string
|
||||
Unsubscribe(context.Context) error
|
||||
// Unsubscribe from topic
|
||||
Unsubscribe(ctx context.Context) error
|
||||
}
|
||||
|
212
broker/memory.go
212
broker/memory.go
@@ -2,7 +2,6 @@ package broker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"github.com/google/uuid"
|
||||
@@ -13,9 +12,10 @@ import (
|
||||
)
|
||||
|
||||
type memoryBroker struct {
|
||||
Subscribers map[string][]*memorySubscriber
|
||||
addr string
|
||||
opts Options
|
||||
subscribers map[string][]*memorySubscriber
|
||||
batchsubscribers map[string][]*memoryBatchSubscriber
|
||||
addr string
|
||||
opts Options
|
||||
sync.RWMutex
|
||||
connected bool
|
||||
}
|
||||
@@ -36,6 +36,15 @@ type memorySubscriber struct {
|
||||
opts SubscribeOptions
|
||||
}
|
||||
|
||||
type memoryBatchSubscriber struct {
|
||||
ctx context.Context
|
||||
exit chan bool
|
||||
handler BatchHandler
|
||||
id string
|
||||
topic string
|
||||
opts SubscribeOptions
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Options() Options {
|
||||
return m.opts
|
||||
}
|
||||
@@ -77,7 +86,6 @@ func (m *memoryBroker) Disconnect(ctx context.Context) error {
|
||||
}
|
||||
|
||||
m.connected = false
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -88,14 +96,127 @@ func (m *memoryBroker) Init(opts ...Option) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
|
||||
m.RLock()
|
||||
if !m.connected {
|
||||
m.RUnlock()
|
||||
return ErrNotConnected
|
||||
}
|
||||
m.RUnlock()
|
||||
|
||||
type msgWrapper struct {
|
||||
topic string
|
||||
body interface{}
|
||||
}
|
||||
|
||||
vs := make([]msgWrapper, 0, len(msgs))
|
||||
if m.opts.Codec == nil {
|
||||
m.RLock()
|
||||
for _, msg := range msgs {
|
||||
topic, _ := msg.Header.Get("Micro-Topic")
|
||||
vs = append(vs, msgWrapper{topic: topic, body: m})
|
||||
}
|
||||
m.RUnlock()
|
||||
} else {
|
||||
m.RLock()
|
||||
for _, msg := range msgs {
|
||||
topic, _ := msg.Header.Get("Micro-Topic")
|
||||
buf, err := m.opts.Codec.Marshal(msg)
|
||||
if err != nil {
|
||||
m.RUnlock()
|
||||
return err
|
||||
}
|
||||
vs = append(vs, msgWrapper{topic: topic, body: buf})
|
||||
}
|
||||
m.RUnlock()
|
||||
}
|
||||
|
||||
if len(m.batchsubscribers) > 0 {
|
||||
eh := m.opts.BatchErrorHandler
|
||||
|
||||
msgTopicMap := make(map[string]Events)
|
||||
for _, v := range vs {
|
||||
p := &memoryEvent{
|
||||
topic: v.topic,
|
||||
message: v.body,
|
||||
opts: m.opts,
|
||||
}
|
||||
msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p)
|
||||
}
|
||||
|
||||
for t, ms := range msgTopicMap {
|
||||
m.RLock()
|
||||
subs, ok := m.batchsubscribers[t]
|
||||
m.RUnlock()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
for _, sub := range subs {
|
||||
if err := sub.handler(ms); err != nil {
|
||||
ms.SetError(err)
|
||||
if sub.opts.BatchErrorHandler != nil {
|
||||
eh = sub.opts.BatchErrorHandler
|
||||
}
|
||||
if eh != nil {
|
||||
eh(ms)
|
||||
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
||||
m.opts.Logger.Error(m.opts.Context, err.Error())
|
||||
}
|
||||
} else if sub.opts.AutoAck {
|
||||
if err := ms.Ack(); err != nil {
|
||||
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
eh := m.opts.ErrorHandler
|
||||
|
||||
for _, v := range vs {
|
||||
p := &memoryEvent{
|
||||
topic: v.topic,
|
||||
message: v.body,
|
||||
opts: m.opts,
|
||||
}
|
||||
|
||||
m.RLock()
|
||||
subs, ok := m.subscribers[p.topic]
|
||||
m.RUnlock()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
for _, sub := range subs {
|
||||
if err := sub.handler(p); err != nil {
|
||||
p.SetError(err)
|
||||
if sub.opts.ErrorHandler != nil {
|
||||
eh = sub.opts.ErrorHandler
|
||||
}
|
||||
if eh != nil {
|
||||
eh(p)
|
||||
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
||||
m.opts.Logger.Error(m.opts.Context, err.Error())
|
||||
}
|
||||
} else if sub.opts.AutoAck {
|
||||
if err := p.Ack(); err != nil {
|
||||
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error {
|
||||
m.RLock()
|
||||
if !m.connected {
|
||||
m.RUnlock()
|
||||
return errors.New("not connected")
|
||||
return ErrNotConnected
|
||||
}
|
||||
|
||||
subs, ok := m.Subscribers[topic]
|
||||
subs, ok := m.subscribers[topic]
|
||||
m.RUnlock()
|
||||
if !ok {
|
||||
return nil
|
||||
@@ -138,11 +259,58 @@ func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) {
|
||||
m.RLock()
|
||||
if !m.connected {
|
||||
m.RUnlock()
|
||||
return nil, ErrNotConnected
|
||||
}
|
||||
m.RUnlock()
|
||||
|
||||
options := NewSubscribeOptions(opts...)
|
||||
|
||||
id, err := uuid.NewRandom()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sub := &memoryBatchSubscriber{
|
||||
exit: make(chan bool, 1),
|
||||
id: id.String(),
|
||||
topic: topic,
|
||||
handler: handler,
|
||||
opts: options,
|
||||
ctx: ctx,
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
m.batchsubscribers[topic] = append(m.batchsubscribers[topic], sub)
|
||||
m.Unlock()
|
||||
|
||||
go func() {
|
||||
<-sub.exit
|
||||
m.Lock()
|
||||
var newSubscribers []*memoryBatchSubscriber
|
||||
for _, sb := range m.batchsubscribers[topic] {
|
||||
if sb.id == sub.id {
|
||||
continue
|
||||
}
|
||||
newSubscribers = append(newSubscribers, sb)
|
||||
}
|
||||
m.batchsubscribers[topic] = newSubscribers
|
||||
m.Unlock()
|
||||
}()
|
||||
|
||||
return sub, nil
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
|
||||
m.RLock()
|
||||
if !m.connected {
|
||||
m.RUnlock()
|
||||
return nil, errors.New("not connected")
|
||||
return nil, ErrNotConnected
|
||||
}
|
||||
m.RUnlock()
|
||||
|
||||
@@ -163,20 +331,20 @@ func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Hand
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
m.Subscribers[topic] = append(m.Subscribers[topic], sub)
|
||||
m.subscribers[topic] = append(m.subscribers[topic], sub)
|
||||
m.Unlock()
|
||||
|
||||
go func() {
|
||||
<-sub.exit
|
||||
m.Lock()
|
||||
var newSubscribers []*memorySubscriber
|
||||
for _, sb := range m.Subscribers[topic] {
|
||||
for _, sb := range m.subscribers[topic] {
|
||||
if sb.id == sub.id {
|
||||
continue
|
||||
}
|
||||
newSubscribers = append(newSubscribers, sb)
|
||||
}
|
||||
m.Subscribers[topic] = newSubscribers
|
||||
m.subscribers[topic] = newSubscribers
|
||||
m.Unlock()
|
||||
}()
|
||||
|
||||
@@ -221,6 +389,23 @@ func (m *memoryEvent) Error() error {
|
||||
return m.err
|
||||
}
|
||||
|
||||
func (m *memoryEvent) SetError(err error) {
|
||||
m.err = err
|
||||
}
|
||||
|
||||
func (m *memoryBatchSubscriber) Options() SubscribeOptions {
|
||||
return m.opts
|
||||
}
|
||||
|
||||
func (m *memoryBatchSubscriber) Topic() string {
|
||||
return m.topic
|
||||
}
|
||||
|
||||
func (m *memoryBatchSubscriber) Unsubscribe(ctx context.Context) error {
|
||||
m.exit <- true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memorySubscriber) Options() SubscribeOptions {
|
||||
return m.opts
|
||||
}
|
||||
@@ -237,7 +422,8 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error {
|
||||
// NewBroker return new memory broker
|
||||
func NewBroker(opts ...Option) Broker {
|
||||
return &memoryBroker{
|
||||
opts: NewOptions(opts...),
|
||||
Subscribers: make(map[string][]*memorySubscriber),
|
||||
opts: NewOptions(opts...),
|
||||
subscribers: make(map[string][]*memorySubscriber),
|
||||
batchsubscribers: make(map[string][]*memoryBatchSubscriber),
|
||||
}
|
||||
}
|
||||
|
@@ -6,6 +6,51 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestMemoryBatchBroker(t *testing.T) {
|
||||
b := NewBroker()
|
||||
ctx := context.Background()
|
||||
|
||||
if err := b.Connect(ctx); err != nil {
|
||||
t.Fatalf("Unexpected connect error %v", err)
|
||||
}
|
||||
|
||||
topic := "test"
|
||||
count := 10
|
||||
|
||||
fn := func(evts Events) error {
|
||||
return evts.Ack()
|
||||
}
|
||||
|
||||
sub, err := b.BatchSubscribe(ctx, topic, fn)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error subscribing %v", err)
|
||||
}
|
||||
|
||||
msgs := make([]*Message, 0, 0)
|
||||
for i := 0; i < count; i++ {
|
||||
message := &Message{
|
||||
Header: map[string]string{
|
||||
"Micro-Topic": topic,
|
||||
"foo": "bar",
|
||||
"id": fmt.Sprintf("%d", i),
|
||||
},
|
||||
Body: []byte(`"hello world"`),
|
||||
}
|
||||
msgs = append(msgs, message)
|
||||
}
|
||||
|
||||
if err := b.BatchPublish(ctx, msgs); err != nil {
|
||||
t.Fatalf("Unexpected error publishing %v", err)
|
||||
}
|
||||
|
||||
if err := sub.Unsubscribe(ctx); err != nil {
|
||||
t.Fatalf("Unexpected error unsubscribing from %s: %v", topic, err)
|
||||
}
|
||||
|
||||
if err := b.Disconnect(ctx); err != nil {
|
||||
t.Fatalf("Unexpected connect error %v", err)
|
||||
}
|
||||
}
|
||||
func TestMemoryBroker(t *testing.T) {
|
||||
b := NewBroker()
|
||||
ctx := context.Background()
|
||||
@@ -26,20 +71,27 @@ func TestMemoryBroker(t *testing.T) {
|
||||
t.Fatalf("Unexpected error subscribing %v", err)
|
||||
}
|
||||
|
||||
msgs := make([]*Message, 0, 0)
|
||||
for i := 0; i < count; i++ {
|
||||
message := &Message{
|
||||
Header: map[string]string{
|
||||
"foo": "bar",
|
||||
"id": fmt.Sprintf("%d", i),
|
||||
"Micro-Topic": topic,
|
||||
"foo": "bar",
|
||||
"id": fmt.Sprintf("%d", i),
|
||||
},
|
||||
Body: []byte(`"hello world"`),
|
||||
}
|
||||
msgs = append(msgs, message)
|
||||
|
||||
if err := b.Publish(ctx, topic, message); err != nil {
|
||||
t.Fatalf("Unexpected error publishing %d", i)
|
||||
t.Fatalf("Unexpected error publishing %d err: %v", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := b.BatchPublish(ctx, msgs); err != nil {
|
||||
t.Fatalf("Unexpected error publishing %v", err)
|
||||
}
|
||||
|
||||
if err := sub.Unsubscribe(ctx); err != nil {
|
||||
t.Fatalf("Unexpected error unsubscribing from %s: %v", topic, err)
|
||||
}
|
||||
|
@@ -29,6 +29,8 @@ type Options struct {
|
||||
TLSConfig *tls.Config
|
||||
// ErrorHandler used when broker can't unmarshal incoming message
|
||||
ErrorHandler Handler
|
||||
// BatchErrorHandler used when broker can't unmashal incoming messages
|
||||
BatchErrorHandler BatchHandler
|
||||
// Name holds the broker name
|
||||
Name string
|
||||
// Addrs holds the broker address
|
||||
@@ -85,6 +87,8 @@ type SubscribeOptions struct {
|
||||
Context context.Context
|
||||
// ErrorHandler used when broker can't unmarshal incoming message
|
||||
ErrorHandler Handler
|
||||
// BatchErrorHandler used when broker can't unmashal incoming messages
|
||||
BatchErrorHandler BatchHandler
|
||||
// Group holds consumer group
|
||||
Group string
|
||||
// AutoAck flag specifies auto ack of incoming message when no error happens
|
||||
@@ -175,6 +179,14 @@ func ErrorHandler(h Handler) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// BatchErrorHandler will catch all broker errors that cant be handled
|
||||
// in normal way, for example Codec errors
|
||||
func BatchErrorHandler(h BatchHandler) Option {
|
||||
return func(o *Options) {
|
||||
o.BatchErrorHandler = h
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeErrorHandler will catch all broker errors that cant be handled
|
||||
// in normal way, for example Codec errors
|
||||
func SubscribeErrorHandler(h Handler) SubscribeOption {
|
||||
@@ -183,6 +195,14 @@ func SubscribeErrorHandler(h Handler) SubscribeOption {
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeBatchErrorHandler will catch all broker errors that cant be handled
|
||||
// in normal way, for example Codec errors
|
||||
func SubscribeBatchErrorHandler(h BatchHandler) SubscribeOption {
|
||||
return func(o *SubscribeOptions) {
|
||||
o.BatchErrorHandler = h
|
||||
}
|
||||
}
|
||||
|
||||
// Queue sets the subscribers queue
|
||||
// Deprecated
|
||||
func Queue(name string) SubscribeOption {
|
||||
|
@@ -96,8 +96,8 @@ type CallOptions struct {
|
||||
RequestTimeout time.Duration
|
||||
// DialTimeout dial timeout
|
||||
DialTimeout time.Duration
|
||||
// AuthToken flag
|
||||
AuthToken bool
|
||||
// AuthToken string
|
||||
AuthToken string
|
||||
}
|
||||
|
||||
// Context pass context to client
|
||||
@@ -463,9 +463,9 @@ func WithDialTimeout(d time.Duration) CallOption {
|
||||
|
||||
// WithAuthToken is a CallOption which overrides the
|
||||
// authorization header with the services own auth token
|
||||
func WithAuthToken() CallOption {
|
||||
func WithAuthToken(t string) CallOption {
|
||||
return func(o *CallOptions) {
|
||||
o.AuthToken = true
|
||||
o.AuthToken = t
|
||||
}
|
||||
}
|
||||
|
||||
|
395
flow/default.go
395
flow/default.go
@@ -3,12 +3,16 @@ package flow
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/silas/dag"
|
||||
"github.com/unistack-org/micro/v3/client"
|
||||
"github.com/unistack-org/micro/v3/codec"
|
||||
"github.com/unistack-org/micro/v3/logger"
|
||||
"github.com/unistack-org/micro/v3/metadata"
|
||||
"github.com/unistack-org/micro/v3/store"
|
||||
)
|
||||
|
||||
type microFlow struct {
|
||||
@@ -20,27 +24,147 @@ type microWorkflow struct {
|
||||
g *dag.AcyclicGraph
|
||||
init bool
|
||||
sync.RWMutex
|
||||
opts Options
|
||||
steps map[string]Step
|
||||
opts Options
|
||||
steps map[string]Step
|
||||
status Status
|
||||
}
|
||||
|
||||
func (w *microWorkflow) ID() string {
|
||||
return w.id
|
||||
}
|
||||
|
||||
func (w *microWorkflow) Steps() [][]Step {
|
||||
func (w *microWorkflow) Steps() ([][]Step, error) {
|
||||
return w.getSteps("", false)
|
||||
}
|
||||
|
||||
func (w *microWorkflow) Status() Status {
|
||||
return w.status
|
||||
}
|
||||
|
||||
func (w *microWorkflow) AppendSteps(steps ...Step) error {
|
||||
w.Lock()
|
||||
|
||||
for _, s := range steps {
|
||||
w.steps[s.String()] = s
|
||||
w.g.Add(s)
|
||||
}
|
||||
|
||||
for _, dst := range steps {
|
||||
for _, req := range dst.Requires() {
|
||||
src, ok := w.steps[req]
|
||||
if !ok {
|
||||
return ErrStepNotExists
|
||||
}
|
||||
w.g.Connect(dag.BasicEdge(src, dst))
|
||||
}
|
||||
}
|
||||
|
||||
if err := w.g.Validate(); err != nil {
|
||||
w.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
w.g.TransitiveReduction()
|
||||
|
||||
w.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *microWorkflow) AppendSteps(ctx context.Context, steps ...Step) error {
|
||||
func (w *microWorkflow) RemoveSteps(steps ...Step) error {
|
||||
// TODO: handle case when some step requires or required by removed step
|
||||
|
||||
w.Lock()
|
||||
|
||||
for _, s := range steps {
|
||||
delete(w.steps, s.String())
|
||||
w.g.Remove(s)
|
||||
}
|
||||
|
||||
for _, dst := range steps {
|
||||
for _, req := range dst.Requires() {
|
||||
src, ok := w.steps[req]
|
||||
if !ok {
|
||||
return ErrStepNotExists
|
||||
}
|
||||
w.g.Connect(dag.BasicEdge(src, dst))
|
||||
}
|
||||
}
|
||||
|
||||
if err := w.g.Validate(); err != nil {
|
||||
w.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
w.g.TransitiveReduction()
|
||||
|
||||
w.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *microWorkflow) RemoveSteps(ctx context.Context, steps ...Step) error {
|
||||
return nil
|
||||
func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) {
|
||||
var steps [][]Step
|
||||
var root dag.Vertex
|
||||
var err error
|
||||
|
||||
fn := func(n dag.Vertex, idx int) error {
|
||||
if idx == 0 {
|
||||
steps = make([][]Step, 1)
|
||||
steps[0] = make([]Step, 0, 1)
|
||||
} else if idx >= len(steps) {
|
||||
tsteps := make([][]Step, idx+1)
|
||||
copy(tsteps, steps)
|
||||
steps = tsteps
|
||||
steps[idx] = make([]Step, 0, 1)
|
||||
}
|
||||
steps[idx] = append(steps[idx], n.(Step))
|
||||
return nil
|
||||
}
|
||||
|
||||
if start != "" {
|
||||
var ok bool
|
||||
w.RLock()
|
||||
root, ok = w.steps[start]
|
||||
w.RUnlock()
|
||||
if !ok {
|
||||
return nil, ErrStepNotExists
|
||||
}
|
||||
} else {
|
||||
root, err = w.g.Root()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if reverse {
|
||||
err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn)
|
||||
} else {
|
||||
err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return steps, nil
|
||||
}
|
||||
|
||||
func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error) {
|
||||
func (w *microWorkflow) Abort(ctx context.Context, eid string) error {
|
||||
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
|
||||
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())})
|
||||
}
|
||||
|
||||
func (w *microWorkflow) Suspend(ctx context.Context, eid string) error {
|
||||
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
|
||||
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusSuspend.String())})
|
||||
}
|
||||
|
||||
func (w *microWorkflow) Resume(ctx context.Context, eid string) error {
|
||||
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
|
||||
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusRunning.String())})
|
||||
}
|
||||
|
||||
func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error) {
|
||||
w.Lock()
|
||||
if !w.init {
|
||||
if err := w.g.Validate(); err != nil {
|
||||
@@ -56,74 +180,176 @@ func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...Ex
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
eid := uid.String()
|
||||
|
||||
stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid))
|
||||
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
|
||||
|
||||
options := NewExecuteOptions(opts...)
|
||||
var steps [][]Step
|
||||
fn := func(n dag.Vertex, idx int) error {
|
||||
if idx == 0 {
|
||||
steps = make([][]Step, 1)
|
||||
steps[0] = make([]Step, 0, 1)
|
||||
} else if idx >= len(steps) {
|
||||
tsteps := make([][]Step, idx+1)
|
||||
copy(tsteps, steps)
|
||||
steps = tsteps
|
||||
steps[idx] = make([]Step, 0, 1)
|
||||
}
|
||||
steps[idx] = append(steps[idx], n.(Step))
|
||||
return nil
|
||||
}
|
||||
|
||||
var root dag.Vertex
|
||||
if options.Start != "" {
|
||||
var ok bool
|
||||
w.RLock()
|
||||
root, ok = w.steps[options.Start]
|
||||
w.RUnlock()
|
||||
if !ok {
|
||||
return "", ErrStepNotExists
|
||||
}
|
||||
} else {
|
||||
root, err = w.g.Root()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
if options.Reverse {
|
||||
err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn)
|
||||
} else {
|
||||
err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn)
|
||||
}
|
||||
steps, err := w.getSteps(options.Start, options.Reverse)
|
||||
if err != nil {
|
||||
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
|
||||
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
cherr := make(chan error, 1)
|
||||
defer close(cherr)
|
||||
chstatus := make(chan Status, 1)
|
||||
|
||||
nctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
nopts := make([]ExecuteOption, 0, len(opts)+5)
|
||||
nopts = append(nopts, ExecuteClient(w.opts.Client), ExecuteTracer(w.opts.Tracer), ExecuteLogger(w.opts.Logger), ExecuteMeter(w.opts.Meter), ExecuteStore(w.opts.Store))
|
||||
|
||||
nopts = append(nopts,
|
||||
ExecuteClient(w.opts.Client),
|
||||
ExecuteTracer(w.opts.Tracer),
|
||||
ExecuteLogger(w.opts.Logger),
|
||||
ExecuteMeter(w.opts.Meter),
|
||||
)
|
||||
nopts = append(nopts, opts...)
|
||||
done := make(chan struct{})
|
||||
|
||||
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
||||
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
||||
return eid, werr
|
||||
}
|
||||
for idx := range steps {
|
||||
for nidx := range steps[idx] {
|
||||
cstep := steps[idx][nidx]
|
||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
|
||||
return eid, werr
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
for idx := range steps {
|
||||
wg.Add(len(steps[idx]))
|
||||
for nidx := range steps[idx] {
|
||||
go func(step Step) {
|
||||
defer wg.Done()
|
||||
if err = step.Execute(nctx, req, nopts...); err != nil {
|
||||
cherr <- err
|
||||
cancel()
|
||||
wStatus := &codec.Frame{}
|
||||
if werr := workflowStore.Read(w.opts.Context, "status", wStatus); werr != nil {
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
if status := StringStatus[string(wStatus.Data)]; status != StatusRunning {
|
||||
chstatus <- status
|
||||
return
|
||||
}
|
||||
if w.opts.Logger.V(logger.TraceLevel) {
|
||||
w.opts.Logger.Tracef(nctx, "will be executed %v", steps[idx][nidx])
|
||||
}
|
||||
cstep := steps[idx][nidx]
|
||||
if len(cstep.Requires()) == 0 {
|
||||
wg.Add(1)
|
||||
go func(step Step) {
|
||||
defer wg.Done()
|
||||
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "req"), req); werr != nil {
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
rsp, serr := step.Execute(nctx, req, nopts...)
|
||||
if serr != nil {
|
||||
step.SetStatus(StatusFailure)
|
||||
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
||||
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||
}
|
||||
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
||||
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||
}
|
||||
cherr <- serr
|
||||
return
|
||||
} else {
|
||||
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), rsp); werr != nil {
|
||||
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
|
||||
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
}
|
||||
}(cstep)
|
||||
wg.Wait()
|
||||
} else {
|
||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "req"), req); werr != nil {
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
}(steps[idx][nidx])
|
||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
rsp, serr := cstep.Execute(nctx, req, nopts...)
|
||||
if serr != nil {
|
||||
cstep.SetStatus(StatusFailure)
|
||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
||||
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||
}
|
||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
||||
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||
}
|
||||
cherr <- serr
|
||||
return
|
||||
} else {
|
||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), rsp); werr != nil {
|
||||
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
|
||||
cherr <- werr
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
cherr <- nil
|
||||
close(done)
|
||||
}()
|
||||
|
||||
err = <-cherr
|
||||
if options.Async {
|
||||
return eid, nil
|
||||
}
|
||||
|
||||
logger.Tracef(ctx, "wait for finish or error")
|
||||
select {
|
||||
case <-nctx.Done():
|
||||
err = nctx.Err()
|
||||
case cerr := <-cherr:
|
||||
err = cerr
|
||||
case <-done:
|
||||
close(cherr)
|
||||
case <-chstatus:
|
||||
close(chstatus)
|
||||
return uid.String(), nil
|
||||
}
|
||||
|
||||
switch {
|
||||
case nctx.Err() != nil:
|
||||
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusAborted.String())}); werr != nil {
|
||||
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
||||
}
|
||||
break
|
||||
case err == nil:
|
||||
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
|
||||
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
||||
}
|
||||
break
|
||||
case err != nil:
|
||||
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil {
|
||||
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
return uid.String(), err
|
||||
}
|
||||
@@ -207,6 +433,17 @@ type microCallStep struct {
|
||||
opts StepOptions
|
||||
service string
|
||||
method string
|
||||
rsp *Message
|
||||
req *Message
|
||||
status Status
|
||||
}
|
||||
|
||||
func (s *microCallStep) Request() *Message {
|
||||
return s.req
|
||||
}
|
||||
|
||||
func (s *microCallStep) Response() *Message {
|
||||
return s.rsp
|
||||
}
|
||||
|
||||
func (s *microCallStep) ID() string {
|
||||
@@ -247,23 +484,47 @@ func (s *microCallStep) Hashcode() interface{} {
|
||||
return s.String()
|
||||
}
|
||||
|
||||
func (s *microCallStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error {
|
||||
func (s *microCallStep) GetStatus() Status {
|
||||
return s.status
|
||||
}
|
||||
|
||||
func (s *microCallStep) SetStatus(status Status) {
|
||||
s.status = status
|
||||
}
|
||||
|
||||
func (s *microCallStep) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error) {
|
||||
options := NewExecuteOptions(opts...)
|
||||
if options.Client == nil {
|
||||
return fmt.Errorf("client not set")
|
||||
return nil, ErrMissingClient
|
||||
}
|
||||
rsp := &codec.Frame{}
|
||||
copts := []client.CallOption{client.WithRetries(0)}
|
||||
if options.Timeout > 0 {
|
||||
copts = append(copts, client.WithRequestTimeout(options.Timeout), client.WithDialTimeout(options.Timeout))
|
||||
}
|
||||
err := options.Client.Call(ctx, options.Client.NewRequest(s.service, s.method, req), rsp)
|
||||
return err
|
||||
nctx := metadata.NewOutgoingContext(ctx, req.Header)
|
||||
err := options.Client.Call(nctx, options.Client.NewRequest(s.service, s.method, &codec.Frame{Data: req.Body}), rsp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
md, _ := metadata.FromOutgoingContext(nctx)
|
||||
return &Message{Header: md, Body: rsp.Data}, err
|
||||
}
|
||||
|
||||
type microPublishStep struct {
|
||||
opts StepOptions
|
||||
topic string
|
||||
opts StepOptions
|
||||
topic string
|
||||
req *Message
|
||||
rsp *Message
|
||||
status Status
|
||||
}
|
||||
|
||||
func (s *microPublishStep) Request() *Message {
|
||||
return s.req
|
||||
}
|
||||
|
||||
func (s *microPublishStep) Response() *Message {
|
||||
return s.rsp
|
||||
}
|
||||
|
||||
func (s *microPublishStep) ID() string {
|
||||
@@ -304,13 +565,21 @@ func (s *microPublishStep) Hashcode() interface{} {
|
||||
return s.String()
|
||||
}
|
||||
|
||||
func (s *microPublishStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error {
|
||||
return nil
|
||||
func (s *microPublishStep) GetStatus() Status {
|
||||
return s.status
|
||||
}
|
||||
|
||||
func NewCallStep(service string, method string, opts ...StepOption) Step {
|
||||
func (s *microPublishStep) SetStatus(status Status) {
|
||||
s.status = status
|
||||
}
|
||||
|
||||
func (s *microPublishStep) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func NewCallStep(service string, name string, method string, opts ...StepOption) Step {
|
||||
options := NewStepOptions(opts...)
|
||||
return µCallStep{service: service, method: method, opts: options}
|
||||
return µCallStep{service: service, method: name + "." + method, opts: options}
|
||||
}
|
||||
|
||||
func NewPublishStep(topic string, opts ...StepOption) Step {
|
||||
|
105
flow/flow.go
105
flow/flow.go
@@ -4,12 +4,43 @@ package flow
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/unistack-org/micro/v3/metadata"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrStepNotExists = errors.New("step not exists")
|
||||
ErrMissingClient = errors.New("client not set")
|
||||
)
|
||||
|
||||
// RawMessage is a raw encoded JSON value.
|
||||
// It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a encoding.
|
||||
type RawMessage []byte
|
||||
|
||||
// MarshalJSON returns m as the JSON encoding of m.
|
||||
func (m *RawMessage) MarshalJSON() ([]byte, error) {
|
||||
if m == nil {
|
||||
return []byte("null"), nil
|
||||
}
|
||||
return *m, nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON sets *m to a copy of data.
|
||||
func (m *RawMessage) UnmarshalJSON(data []byte) error {
|
||||
if m == nil {
|
||||
return errors.New("RawMessage UnmarshalJSON on nil pointer")
|
||||
}
|
||||
*m = append((*m)[0:0], data...)
|
||||
return nil
|
||||
}
|
||||
|
||||
type Message struct {
|
||||
Header metadata.Metadata
|
||||
Body RawMessage
|
||||
}
|
||||
|
||||
// Step represents dedicated workflow step
|
||||
type Step interface {
|
||||
// ID returns step id
|
||||
@@ -17,7 +48,7 @@ type Step interface {
|
||||
// Endpoint returns rpc endpoint service_name.service_method or broker topic
|
||||
Endpoint() string
|
||||
// Execute step run
|
||||
Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error
|
||||
Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error)
|
||||
// Requires returns dependent steps
|
||||
Requires() []string
|
||||
// Options returns step options
|
||||
@@ -26,20 +57,70 @@ type Step interface {
|
||||
Require(steps ...Step) error
|
||||
// String
|
||||
String() string
|
||||
// GetStatus returns step status
|
||||
GetStatus() Status
|
||||
// SetStatus sets the step status
|
||||
SetStatus(Status)
|
||||
// Request returns step request message
|
||||
Request() *Message
|
||||
// Response returns step response message
|
||||
Response() *Message
|
||||
}
|
||||
|
||||
type Status int
|
||||
|
||||
func (status Status) String() string {
|
||||
return StatusString[status]
|
||||
}
|
||||
|
||||
const (
|
||||
StatusPending Status = iota
|
||||
StatusRunning
|
||||
StatusFailure
|
||||
StatusSuccess
|
||||
StatusAborted
|
||||
StatusSuspend
|
||||
)
|
||||
|
||||
var (
|
||||
StatusString = map[Status]string{
|
||||
StatusPending: "StatusPending",
|
||||
StatusRunning: "StatusRunning",
|
||||
StatusFailure: "StatusFailure",
|
||||
StatusSuccess: "StatusSuccess",
|
||||
StatusAborted: "StatusAborted",
|
||||
StatusSuspend: "StatusSuspend",
|
||||
}
|
||||
StringStatus = map[string]Status{
|
||||
"StatusPending": StatusPending,
|
||||
"StatusRunning": StatusRunning,
|
||||
"StatusFailure": StatusFailure,
|
||||
"StatusSuccess": StatusSuccess,
|
||||
"StatusAborted": StatusAborted,
|
||||
"StatusSuspend": StatusSuspend,
|
||||
}
|
||||
)
|
||||
|
||||
// Workflow contains all steps to execute
|
||||
type Workflow interface {
|
||||
// ID returns id of the workflow
|
||||
ID() string
|
||||
// Steps returns steps slice where parallel steps returned on the same level
|
||||
Steps() [][]Step
|
||||
// Execute workflow with args, return execution id and error
|
||||
Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error)
|
||||
Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error)
|
||||
// RemoveSteps remove steps from workflow
|
||||
RemoveSteps(ctx context.Context, steps ...Step) error
|
||||
RemoveSteps(steps ...Step) error
|
||||
// AppendSteps append steps to workflow
|
||||
AppendSteps(ctx context.Context, steps ...Step) error
|
||||
AppendSteps(steps ...Step) error
|
||||
// Status returns workflow status
|
||||
Status() Status
|
||||
// Steps returns steps slice where parallel steps returned on the same level
|
||||
Steps() ([][]Step, error)
|
||||
// Suspend suspends execution
|
||||
Suspend(ctx context.Context, eid string) error
|
||||
// Resume resumes execution
|
||||
Resume(ctx context.Context, eid string) error
|
||||
// Abort abort execution
|
||||
Abort(ctx context.Context, eid string) error
|
||||
}
|
||||
|
||||
// Flow the base interface to interact with workflows
|
||||
@@ -57,3 +138,15 @@ type Flow interface {
|
||||
// WorkflowList lists all workflows
|
||||
WorkflowList(ctx context.Context) ([]Workflow, error)
|
||||
}
|
||||
|
||||
var (
|
||||
flowMu sync.Mutex
|
||||
atomicSteps atomic.Value
|
||||
)
|
||||
|
||||
func RegisterStep(step Step) {
|
||||
flowMu.Lock()
|
||||
steps, _ := atomicSteps.Load().([]Step)
|
||||
atomicSteps.Store(append(steps, step))
|
||||
flowMu.Unlock()
|
||||
}
|
||||
|
@@ -116,8 +116,6 @@ type ExecuteOptions struct {
|
||||
Logger logger.Logger
|
||||
// Meter holds the meter
|
||||
Meter meter.Meter
|
||||
// Store used for intermediate results
|
||||
Store store.Store
|
||||
// Context can be used to abort execution or pass additional opts
|
||||
Context context.Context
|
||||
// Start step
|
||||
@@ -126,6 +124,8 @@ type ExecuteOptions struct {
|
||||
Reverse bool
|
||||
// Timeout for execution
|
||||
Timeout time.Duration
|
||||
// Async enables async execution
|
||||
Async bool
|
||||
}
|
||||
|
||||
type ExecuteOption func(*ExecuteOptions)
|
||||
@@ -154,12 +154,6 @@ func ExecuteMeter(m meter.Meter) ExecuteOption {
|
||||
}
|
||||
}
|
||||
|
||||
func ExecuteStore(s store.Store) ExecuteOption {
|
||||
return func(o *ExecuteOptions) {
|
||||
o.Store = s
|
||||
}
|
||||
}
|
||||
|
||||
func ExecuteContext(ctx context.Context) ExecuteOption {
|
||||
return func(o *ExecuteOptions) {
|
||||
o.Context = ctx
|
||||
@@ -178,8 +172,20 @@ func ExecuteTimeout(td time.Duration) ExecuteOption {
|
||||
}
|
||||
}
|
||||
|
||||
func ExecuteAsync(b bool) ExecuteOption {
|
||||
return func(o *ExecuteOptions) {
|
||||
o.Async = b
|
||||
}
|
||||
}
|
||||
|
||||
func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions {
|
||||
options := ExecuteOptions{}
|
||||
options := ExecuteOptions{
|
||||
Client: client.DefaultClient,
|
||||
Logger: logger.DefaultLogger,
|
||||
Tracer: tracer.DefaultTracer,
|
||||
Meter: meter.DefaultMeter,
|
||||
Context: context.Background(),
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
@@ -196,7 +202,9 @@ type StepOptions struct {
|
||||
type StepOption func(*StepOptions)
|
||||
|
||||
func NewStepOptions(opts ...StepOption) StepOptions {
|
||||
options := StepOptions{Context: context.Background()}
|
||||
options := StepOptions{
|
||||
Context: context.Background(),
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
2
go.mod
2
go.mod
@@ -5,7 +5,7 @@ go 1.16
|
||||
require (
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
||||
github.com/ef-ds/deque v1.0.4
|
||||
github.com/google/uuid v1.2.0
|
||||
github.com/google/uuid v1.3.0
|
||||
github.com/imdario/mergo v0.3.12
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34
|
||||
|
4
go.sum
4
go.sum
@@ -2,8 +2,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
|
||||
github.com/ef-ds/deque v1.0.4 h1:iFAZNmveMT9WERAkqLJ+oaABF9AcVQ5AjXem/hroniI=
|
||||
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
|
||||
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
|
||||
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
|
||||
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
|
||||
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
|
||||
|
@@ -20,6 +20,10 @@ type Wrapper interface {
|
||||
Logf(LogfFunc) LogfFunc
|
||||
}
|
||||
|
||||
var (
|
||||
_ Logger = &OmitLogger{}
|
||||
)
|
||||
|
||||
type OmitLogger struct {
|
||||
l Logger
|
||||
}
|
||||
|
@@ -3,8 +3,9 @@ package meter
|
||||
|
||||
import (
|
||||
"io"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -77,36 +78,62 @@ type Summary interface {
|
||||
UpdateDuration(time.Time)
|
||||
}
|
||||
|
||||
// sort labels alphabeticaly by label name
|
||||
type byKey []string
|
||||
|
||||
func (k byKey) Len() int { return len(k) / 2 }
|
||||
func (k byKey) Less(i, j int) bool { return k[i*2] < k[j*2] }
|
||||
func (k byKey) Swap(i, j int) {
|
||||
k[i*2], k[i*2+1], k[j*2], k[j*2+1] = k[j*2], k[j*2+1], k[i*2], k[i*2+1]
|
||||
k[i*2], k[j*2] = k[j*2], k[i*2]
|
||||
k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1]
|
||||
}
|
||||
|
||||
func Sort(slice *[]string) {
|
||||
bk := byKey(*slice)
|
||||
if bk.Len() <= 1 {
|
||||
return
|
||||
// BuildLables used to sort labels and delete duplicates.
|
||||
// Last value wins in case of duplicate label keys.
|
||||
func BuildLabels(labels ...string) []string {
|
||||
if len(labels)%2 == 1 {
|
||||
labels = labels[:len(labels)-1]
|
||||
}
|
||||
sort.Sort(bk)
|
||||
v := reflect.ValueOf(slice).Elem()
|
||||
cnt := 0
|
||||
key := 0
|
||||
val := 1
|
||||
for key < v.Len() {
|
||||
if len(bk) > key+2 && bk[key] == bk[key+2] {
|
||||
key += 2
|
||||
val += 2
|
||||
continue
|
||||
}
|
||||
v.Index(cnt).Set(v.Index(key))
|
||||
cnt++
|
||||
v.Index(cnt).Set(v.Index(val))
|
||||
cnt++
|
||||
key += 2
|
||||
val += 2
|
||||
}
|
||||
v.SetLen(cnt)
|
||||
sort.Sort(byKey(labels))
|
||||
return labels
|
||||
}
|
||||
|
||||
// BuildName used to combine metric with labels.
|
||||
// If labels count is odd, drop last element
|
||||
func BuildName(name string, labels ...string) string {
|
||||
if len(labels)%2 == 1 {
|
||||
labels = labels[:len(labels)-1]
|
||||
}
|
||||
|
||||
if len(labels) > 2 {
|
||||
sort.Sort(byKey(labels))
|
||||
|
||||
idx := 0
|
||||
for {
|
||||
if labels[idx] == labels[idx+2] {
|
||||
copy(labels[idx:], labels[idx+2:])
|
||||
labels = labels[:len(labels)-2]
|
||||
} else {
|
||||
idx += 2
|
||||
}
|
||||
if idx+2 >= len(labels) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var b strings.Builder
|
||||
_, _ = b.WriteString(name)
|
||||
_, _ = b.WriteRune('{')
|
||||
for idx := 0; idx < len(labels); idx += 2 {
|
||||
if idx > 0 {
|
||||
_, _ = b.WriteRune(',')
|
||||
}
|
||||
_, _ = b.WriteString(labels[idx])
|
||||
_, _ = b.WriteString(`=`)
|
||||
_, _ = b.WriteString(strconv.Quote(labels[idx+1]))
|
||||
}
|
||||
_, _ = b.WriteRune('}')
|
||||
|
||||
return b.String()
|
||||
}
|
||||
|
@@ -14,11 +14,57 @@ func TestNoopMeter(t *testing.T) {
|
||||
cnt.Inc()
|
||||
}
|
||||
|
||||
func TestLabelsSort(t *testing.T) {
|
||||
ls := []string{"server", "http", "register", "mdns", "broker", "broker1", "broker", "broker2", "server", "tcp"}
|
||||
Sort(&ls)
|
||||
func testEq(a, b []string) bool {
|
||||
if len(a) != len(b) {
|
||||
return false
|
||||
}
|
||||
for i := range a {
|
||||
if a[i] != b[i] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
if ls[0] != "broker" || ls[1] != "broker2" {
|
||||
t.Fatalf("sort error: %v", ls)
|
||||
func TestBuildLabels(t *testing.T) {
|
||||
type testData struct {
|
||||
src []string
|
||||
dst []string
|
||||
}
|
||||
|
||||
data := []testData{
|
||||
testData{
|
||||
src: []string{"zerolabel", "value3", "firstlabel", "value2"},
|
||||
dst: []string{"firstlabel", "value2", "zerolabel", "value3"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, d := range data {
|
||||
if !testEq(d.dst, BuildLabels(d.src...)) {
|
||||
t.Fatalf("slices not properly sorted: %v %v", d.dst, d.src)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildName(t *testing.T) {
|
||||
data := map[string][]string{
|
||||
`my_metric{firstlabel="value2",zerolabel="value3"}`: []string{
|
||||
"my_metric",
|
||||
"zerolabel", "value3", "firstlabel", "value2",
|
||||
},
|
||||
`my_metric{broker="broker2",register="mdns",server="tcp"}`: []string{
|
||||
"my_metric",
|
||||
"broker", "broker1", "broker", "broker2", "server", "http", "server", "tcp", "register", "mdns",
|
||||
},
|
||||
`my_metric{aaa="aaa"}`: []string{
|
||||
"my_metric",
|
||||
"aaa", "aaa",
|
||||
},
|
||||
}
|
||||
|
||||
for e, d := range data {
|
||||
if x := BuildName(d[0], d[1:]...); x != e {
|
||||
t.Fatalf("expect: %s, result: %s", e, x)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -24,9 +24,18 @@ type tunSubscriber struct {
|
||||
opts broker.SubscribeOptions
|
||||
}
|
||||
|
||||
type tunBatchSubscriber struct {
|
||||
listener tunnel.Listener
|
||||
handler broker.BatchHandler
|
||||
closed chan bool
|
||||
topic string
|
||||
opts broker.SubscribeOptions
|
||||
}
|
||||
|
||||
type tunEvent struct {
|
||||
message *broker.Message
|
||||
topic string
|
||||
err error
|
||||
}
|
||||
|
||||
// used to access tunnel from options context
|
||||
@@ -62,6 +71,36 @@ func (t *tunBroker) Disconnect(ctx context.Context) error {
|
||||
return t.tunnel.Close(ctx)
|
||||
}
|
||||
|
||||
func (t *tunBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
||||
// TODO: this is probably inefficient, we might want to just maintain an open connection
|
||||
// it may be easier to add broadcast to the tunnel
|
||||
topicMap := make(map[string]tunnel.Session)
|
||||
|
||||
var err error
|
||||
for _, msg := range msgs {
|
||||
topic, _ := msg.Header.Get("Micro-Topic")
|
||||
c, ok := topicMap[topic]
|
||||
if !ok {
|
||||
c, err := t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer c.Close()
|
||||
topicMap[topic] = c
|
||||
}
|
||||
|
||||
if err = c.Send(&transport.Message{
|
||||
Header: msg.Header,
|
||||
Body: msg.Body,
|
||||
}); err != nil {
|
||||
// msg.SetError(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message, opts ...broker.PublishOption) error {
|
||||
// TODO: this is probably inefficient, we might want to just maintain an open connection
|
||||
// it may be easier to add broadcast to the tunnel
|
||||
@@ -77,6 +116,26 @@ func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message
|
||||
})
|
||||
}
|
||||
|
||||
func (t *tunBroker) BatchSubscribe(ctx context.Context, topic string, h broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||
l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tunSub := &tunBatchSubscriber{
|
||||
topic: topic,
|
||||
handler: h,
|
||||
opts: broker.NewSubscribeOptions(opts...),
|
||||
closed: make(chan bool),
|
||||
listener: l,
|
||||
}
|
||||
|
||||
// start processing
|
||||
go tunSub.run()
|
||||
|
||||
return tunSub, nil
|
||||
}
|
||||
|
||||
func (t *tunBroker) Subscribe(ctx context.Context, topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||
l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast))
|
||||
if err != nil {
|
||||
@@ -101,6 +160,49 @@ func (t *tunBroker) String() string {
|
||||
return "tunnel"
|
||||
}
|
||||
|
||||
func (t *tunBatchSubscriber) run() {
|
||||
for {
|
||||
// accept a new connection
|
||||
c, err := t.listener.Accept()
|
||||
if err != nil {
|
||||
select {
|
||||
case <-t.closed:
|
||||
return
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// receive message
|
||||
m := new(transport.Message)
|
||||
if err := c.Recv(m); err != nil {
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Error(t.opts.Context, err.Error())
|
||||
}
|
||||
if err = c.Close(); err != nil {
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Error(t.opts.Context, err.Error())
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// close the connection
|
||||
c.Close()
|
||||
|
||||
evts := broker.Events{&tunEvent{
|
||||
topic: t.topic,
|
||||
message: &broker.Message{
|
||||
Header: m.Header,
|
||||
Body: m.Body,
|
||||
},
|
||||
}}
|
||||
// handle the message
|
||||
go t.handler(evts)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func (t *tunSubscriber) run() {
|
||||
for {
|
||||
// accept a new connection
|
||||
@@ -142,6 +244,24 @@ func (t *tunSubscriber) run() {
|
||||
}
|
||||
}
|
||||
|
||||
func (t *tunBatchSubscriber) Options() broker.SubscribeOptions {
|
||||
return t.opts
|
||||
}
|
||||
|
||||
func (t *tunBatchSubscriber) Topic() string {
|
||||
return t.topic
|
||||
}
|
||||
|
||||
func (t *tunBatchSubscriber) Unsubscribe(ctx context.Context) error {
|
||||
select {
|
||||
case <-t.closed:
|
||||
return nil
|
||||
default:
|
||||
close(t.closed)
|
||||
return t.listener.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (t *tunSubscriber) Options() broker.SubscribeOptions {
|
||||
return t.opts
|
||||
}
|
||||
@@ -173,7 +293,11 @@ func (t *tunEvent) Ack() error {
|
||||
}
|
||||
|
||||
func (t *tunEvent) Error() error {
|
||||
return nil
|
||||
return t.err
|
||||
}
|
||||
|
||||
func (t *tunEvent) SetError(err error) {
|
||||
t.err = err
|
||||
}
|
||||
|
||||
// NewBroker returns new tunnel broker
|
||||
|
@@ -15,8 +15,8 @@ import (
|
||||
var DefaultServer Server = NewServer()
|
||||
|
||||
var (
|
||||
// DefaultAddress will be used if no address passed
|
||||
DefaultAddress = ":0"
|
||||
// DefaultAddress will be used if no address passed, use secure localhost
|
||||
DefaultAddress = "127.0.0.1:0"
|
||||
// DefaultName will be used if no name passed
|
||||
DefaultName = "server"
|
||||
// DefaultVersion will be used if no version passed
|
||||
|
@@ -36,16 +36,6 @@ func (m *memoryStore) key(prefix, key string) string {
|
||||
return filepath.Join(prefix, key)
|
||||
}
|
||||
|
||||
func (m *memoryStore) prefix(database, table string) string {
|
||||
if len(database) == 0 {
|
||||
database = m.opts.Database
|
||||
}
|
||||
if len(table) == 0 {
|
||||
table = m.opts.Table
|
||||
}
|
||||
return filepath.Join(database, table)
|
||||
}
|
||||
|
||||
func (m *memoryStore) exists(prefix, key string) error {
|
||||
key = m.key(prefix, key)
|
||||
|
||||
@@ -80,15 +70,17 @@ func (m *memoryStore) delete(prefix, key string) {
|
||||
|
||||
func (m *memoryStore) list(prefix string, limit, offset uint) []string {
|
||||
allItems := m.store.Items()
|
||||
allKeys := make([]string, len(allItems))
|
||||
i := 0
|
||||
allKeys := make([]string, 0, len(allItems))
|
||||
|
||||
for k := range allItems {
|
||||
if !strings.HasPrefix(k, prefix+"/") {
|
||||
if !strings.HasPrefix(k, prefix) {
|
||||
continue
|
||||
}
|
||||
allKeys[i] = strings.TrimPrefix(k, prefix+"/")
|
||||
i++
|
||||
k = strings.TrimPrefix(k, prefix)
|
||||
if k[0] == '/' {
|
||||
k = k[1:]
|
||||
}
|
||||
allKeys = append(allKeys, k)
|
||||
}
|
||||
|
||||
if limit != 0 || offset != 0 {
|
||||
@@ -107,7 +99,6 @@ func (m *memoryStore) list(prefix string, limit, offset uint) []string {
|
||||
}
|
||||
return allKeys[offset:end]
|
||||
}
|
||||
|
||||
return allKeys
|
||||
}
|
||||
|
||||
@@ -127,37 +118,48 @@ func (m *memoryStore) Name() string {
|
||||
}
|
||||
|
||||
func (m *memoryStore) Exists(ctx context.Context, key string, opts ...ExistsOption) error {
|
||||
prefix := m.prefix(m.opts.Database, m.opts.Table)
|
||||
return m.exists(prefix, key)
|
||||
options := NewExistsOptions(opts...)
|
||||
if options.Namespace == "" {
|
||||
options.Namespace = m.opts.Namespace
|
||||
}
|
||||
return m.exists(options.Namespace, key)
|
||||
}
|
||||
|
||||
func (m *memoryStore) Read(ctx context.Context, key string, val interface{}, opts ...ReadOption) error {
|
||||
readOpts := NewReadOptions(opts...)
|
||||
prefix := m.prefix(readOpts.Database, readOpts.Table)
|
||||
return m.get(prefix, key, val)
|
||||
options := NewReadOptions(opts...)
|
||||
if options.Namespace == "" {
|
||||
options.Namespace = m.opts.Namespace
|
||||
}
|
||||
return m.get(options.Namespace, key, val)
|
||||
}
|
||||
|
||||
func (m *memoryStore) Write(ctx context.Context, key string, val interface{}, opts ...WriteOption) error {
|
||||
writeOpts := NewWriteOptions(opts...)
|
||||
options := NewWriteOptions(opts...)
|
||||
if options.Namespace == "" {
|
||||
options.Namespace = m.opts.Namespace
|
||||
}
|
||||
if options.TTL == 0 {
|
||||
options.TTL = cache.NoExpiration
|
||||
}
|
||||
|
||||
prefix := m.prefix(writeOpts.Database, writeOpts.Table)
|
||||
|
||||
key = m.key(prefix, key)
|
||||
key = m.key(options.Namespace, key)
|
||||
|
||||
buf, err := m.opts.Codec.Marshal(val)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.store.Set(key, buf, writeOpts.TTL)
|
||||
m.store.Set(key, buf, options.TTL)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryStore) Delete(ctx context.Context, key string, opts ...DeleteOption) error {
|
||||
deleteOptions := NewDeleteOptions(opts...)
|
||||
options := NewDeleteOptions(opts...)
|
||||
if options.Namespace == "" {
|
||||
options.Namespace = m.opts.Namespace
|
||||
}
|
||||
|
||||
prefix := m.prefix(deleteOptions.Database, deleteOptions.Table)
|
||||
m.delete(prefix, key)
|
||||
m.delete(options.Namespace, key)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -166,25 +168,27 @@ func (m *memoryStore) Options() Options {
|
||||
}
|
||||
|
||||
func (m *memoryStore) List(ctx context.Context, opts ...ListOption) ([]string, error) {
|
||||
listOptions := NewListOptions(opts...)
|
||||
options := NewListOptions(opts...)
|
||||
if options.Namespace == "" {
|
||||
options.Namespace = m.opts.Namespace
|
||||
}
|
||||
|
||||
prefix := m.prefix(listOptions.Database, listOptions.Table)
|
||||
keys := m.list(prefix, listOptions.Limit, listOptions.Offset)
|
||||
keys := m.list(options.Namespace, options.Limit, options.Offset)
|
||||
|
||||
if len(listOptions.Prefix) > 0 {
|
||||
if len(options.Prefix) > 0 {
|
||||
var prefixKeys []string
|
||||
for _, k := range keys {
|
||||
if strings.HasPrefix(k, listOptions.Prefix) {
|
||||
if strings.HasPrefix(k, options.Prefix) {
|
||||
prefixKeys = append(prefixKeys, k)
|
||||
}
|
||||
}
|
||||
keys = prefixKeys
|
||||
}
|
||||
|
||||
if len(listOptions.Suffix) > 0 {
|
||||
if len(options.Suffix) > 0 {
|
||||
var suffixKeys []string
|
||||
for _, k := range keys {
|
||||
if strings.HasSuffix(k, listOptions.Suffix) {
|
||||
if strings.HasSuffix(k, options.Suffix) {
|
||||
suffixKeys = append(suffixKeys, k)
|
||||
}
|
||||
}
|
||||
|
@@ -9,11 +9,11 @@ import (
|
||||
)
|
||||
|
||||
func TestMemoryReInit(t *testing.T) {
|
||||
s := store.NewStore(store.Table("aaa"))
|
||||
if err := s.Init(store.Table("")); err != nil {
|
||||
s := store.NewStore(store.Namespace("aaa"))
|
||||
if err := s.Init(store.Namespace("")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(s.Options().Table) > 0 {
|
||||
if len(s.Options().Namespace) > 0 {
|
||||
t.Error("Init didn't reinitialise the store")
|
||||
}
|
||||
}
|
||||
@@ -28,7 +28,7 @@ func TestMemoryBasic(t *testing.T) {
|
||||
|
||||
func TestMemoryPrefix(t *testing.T) {
|
||||
s := store.NewStore()
|
||||
if err := s.Init(store.Table("some-prefix")); err != nil {
|
||||
if err := s.Init(store.Namespace("some-prefix")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
basictest(s, t)
|
||||
@@ -36,7 +36,7 @@ func TestMemoryPrefix(t *testing.T) {
|
||||
|
||||
func TestMemoryNamespace(t *testing.T) {
|
||||
s := store.NewStore()
|
||||
if err := s.Init(store.Database("some-namespace")); err != nil {
|
||||
if err := s.Init(store.Namespace("some-namespace")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
basictest(s, t)
|
||||
@@ -44,7 +44,7 @@ func TestMemoryNamespace(t *testing.T) {
|
||||
|
||||
func TestMemoryNamespacePrefix(t *testing.T) {
|
||||
s := store.NewStore()
|
||||
if err := s.Init(store.Table("some-prefix"), store.Database("some-namespace")); err != nil {
|
||||
if err := s.Init(store.Namespace("some-namespace")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
basictest(s, t)
|
||||
|
261
store/options.go
261
store/options.go
@@ -28,13 +28,12 @@ type Options struct {
|
||||
TLSConfig *tls.Config
|
||||
// Name specifies store name
|
||||
Name string
|
||||
// Database specifies store database
|
||||
Database string
|
||||
// Table specifies store table
|
||||
Table string
|
||||
// Nodes contains store address
|
||||
// TODO: replace with Addrs
|
||||
Nodes []string
|
||||
// Namespace of the records
|
||||
Namespace string
|
||||
// Addrs contains store address
|
||||
Addrs []string
|
||||
//Wrappers store wrapper that called before actual functions
|
||||
//Wrappers []Wrapper
|
||||
}
|
||||
|
||||
// NewOptions creates options struct
|
||||
@@ -90,13 +89,20 @@ func Meter(m meter.Meter) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// Name the name
|
||||
// Name the name of the store
|
||||
func Name(n string) Option {
|
||||
return func(o *Options) {
|
||||
o.Name = n
|
||||
}
|
||||
}
|
||||
|
||||
// Namespace sets namespace of the store
|
||||
func Namespace(ns string) Option {
|
||||
return func(o *Options) {
|
||||
o.Namespace = ns
|
||||
}
|
||||
}
|
||||
|
||||
// Tracer sets the tracer
|
||||
func Tracer(t tracer.Tracer) Option {
|
||||
return func(o *Options) {
|
||||
@@ -104,27 +110,21 @@ func Tracer(t tracer.Tracer) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// Nodes contains the addresses or other connection information of the backing storage.
|
||||
// Addrs contains the addresses or other connection information of the backing storage.
|
||||
// For example, an etcd implementation would contain the nodes of the cluster.
|
||||
// A SQL implementation could contain one or more connection strings.
|
||||
func Nodes(a ...string) Option {
|
||||
func Addrs(addrs ...string) Option {
|
||||
return func(o *Options) {
|
||||
o.Nodes = a
|
||||
o.Addrs = addrs
|
||||
}
|
||||
}
|
||||
|
||||
// Database allows multiple isolated stores to be kept in one backend, if supported.
|
||||
func Database(db string) Option {
|
||||
return func(o *Options) {
|
||||
o.Database = db
|
||||
}
|
||||
}
|
||||
|
||||
// Table is analag for a table in database backends or a key prefix in KV backends
|
||||
func Table(t string) Option {
|
||||
return func(o *Options) {
|
||||
o.Table = t
|
||||
}
|
||||
// ReadOptions configures an individual Read operation
|
||||
type ReadOptions struct {
|
||||
// Context holds external options
|
||||
Context context.Context
|
||||
// Namespace holds namespace
|
||||
Namespace string
|
||||
}
|
||||
|
||||
// NewReadOptions fills ReadOptions struct with opts slice
|
||||
@@ -136,29 +136,35 @@ func NewReadOptions(opts ...ReadOption) ReadOptions {
|
||||
return options
|
||||
}
|
||||
|
||||
// ReadOptions configures an individual Read operation
|
||||
type ReadOptions struct {
|
||||
// Context holds external options
|
||||
Context context.Context
|
||||
// Database holds the database name
|
||||
Database string
|
||||
// Table holds table name
|
||||
Table string
|
||||
// Namespace holds namespace
|
||||
Namespace string
|
||||
}
|
||||
|
||||
// ReadOption sets values in ReadOptions
|
||||
type ReadOption func(r *ReadOptions)
|
||||
|
||||
// ReadFrom the database and table
|
||||
func ReadFrom(database, table string) ReadOption {
|
||||
return func(r *ReadOptions) {
|
||||
r.Database = database
|
||||
r.Table = table
|
||||
// ReadContext pass context.Context to ReadOptions
|
||||
func ReadContext(ctx context.Context) ReadOption {
|
||||
return func(o *ReadOptions) {
|
||||
o.Context = ctx
|
||||
}
|
||||
}
|
||||
|
||||
// ReadNamespace pass namespace to ReadOptions
|
||||
func ReadNamespace(ns string) ReadOption {
|
||||
return func(o *ReadOptions) {
|
||||
o.Namespace = ns
|
||||
}
|
||||
}
|
||||
|
||||
// WriteOptions configures an individual Write operation
|
||||
type WriteOptions struct {
|
||||
// Context holds external options
|
||||
Context context.Context
|
||||
// Metadata contains additional metadata
|
||||
Metadata metadata.Metadata
|
||||
// Namespace holds namespace
|
||||
Namespace string
|
||||
// TTL specifies key TTL
|
||||
TTL time.Duration
|
||||
}
|
||||
|
||||
// NewWriteOptions fills WriteOptions struct with opts slice
|
||||
func NewWriteOptions(opts ...WriteOption) WriteOptions {
|
||||
options := WriteOptions{}
|
||||
@@ -168,47 +174,45 @@ func NewWriteOptions(opts ...WriteOption) WriteOptions {
|
||||
return options
|
||||
}
|
||||
|
||||
// WriteOptions configures an individual Write operation
|
||||
type WriteOptions struct {
|
||||
// Context holds external options
|
||||
Context context.Context
|
||||
// Metadata contains additional metadata
|
||||
Metadata metadata.Metadata
|
||||
// Database holds database name
|
||||
Database string
|
||||
// Table holds table name
|
||||
Table string
|
||||
// Namespace holds namespace
|
||||
Namespace string
|
||||
// TTL specifies key TTL
|
||||
TTL time.Duration
|
||||
}
|
||||
|
||||
// WriteOption sets values in WriteOptions
|
||||
type WriteOption func(w *WriteOptions)
|
||||
|
||||
// WriteTo the database and table
|
||||
func WriteTo(database, table string) WriteOption {
|
||||
return func(w *WriteOptions) {
|
||||
w.Database = database
|
||||
w.Table = table
|
||||
}
|
||||
}
|
||||
|
||||
// WriteTTL is the time the record expires
|
||||
func WriteTTL(d time.Duration) WriteOption {
|
||||
return func(w *WriteOptions) {
|
||||
w.TTL = d
|
||||
// WriteContext pass context.Context to wirte options
|
||||
func WriteContext(ctx context.Context) WriteOption {
|
||||
return func(o *WriteOptions) {
|
||||
o.Context = ctx
|
||||
}
|
||||
}
|
||||
|
||||
// WriteMetadata add metadata.Metadata
|
||||
func WriteMetadata(md metadata.Metadata) WriteOption {
|
||||
return func(w *WriteOptions) {
|
||||
w.Metadata = metadata.Copy(md)
|
||||
return func(o *WriteOptions) {
|
||||
o.Metadata = metadata.Copy(md)
|
||||
}
|
||||
}
|
||||
|
||||
// WriteTTL is the time the record expires
|
||||
func WriteTTL(d time.Duration) WriteOption {
|
||||
return func(o *WriteOptions) {
|
||||
o.TTL = d
|
||||
}
|
||||
}
|
||||
|
||||
// WriteNamespace pass namespace to write options
|
||||
func WriteNamespace(ns string) WriteOption {
|
||||
return func(o *WriteOptions) {
|
||||
o.Namespace = ns
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteOptions configures an individual Delete operation
|
||||
type DeleteOptions struct {
|
||||
// Context holds external options
|
||||
Context context.Context
|
||||
// Namespace holds namespace
|
||||
Namespace string
|
||||
}
|
||||
|
||||
// NewDeleteOptions fills DeleteOptions struct with opts slice
|
||||
func NewDeleteOptions(opts ...DeleteOption) DeleteOptions {
|
||||
options := DeleteOptions{}
|
||||
@@ -218,29 +222,33 @@ func NewDeleteOptions(opts ...DeleteOption) DeleteOptions {
|
||||
return options
|
||||
}
|
||||
|
||||
// DeleteOptions configures an individual Delete operation
|
||||
type DeleteOptions struct {
|
||||
// Context holds external options
|
||||
Context context.Context
|
||||
// Database holds database name
|
||||
Database string
|
||||
// Table holds table name
|
||||
Table string
|
||||
// Namespace holds namespace
|
||||
Namespace string
|
||||
}
|
||||
|
||||
// DeleteOption sets values in DeleteOptions
|
||||
type DeleteOption func(d *DeleteOptions)
|
||||
|
||||
// DeleteFrom the database and table
|
||||
func DeleteFrom(database, table string) DeleteOption {
|
||||
return func(d *DeleteOptions) {
|
||||
d.Database = database
|
||||
d.Table = table
|
||||
// DeleteContext pass context.Context to delete options
|
||||
func DeleteContext(ctx context.Context) DeleteOption {
|
||||
return func(o *DeleteOptions) {
|
||||
o.Context = ctx
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteNamespace pass namespace to delete options
|
||||
func DeleteNamespace(ns string) DeleteOption {
|
||||
return func(o *DeleteOptions) {
|
||||
o.Namespace = ns
|
||||
}
|
||||
}
|
||||
|
||||
// ListOptions configures an individual List operation
|
||||
type ListOptions struct {
|
||||
Context context.Context
|
||||
Prefix string
|
||||
Suffix string
|
||||
Namespace string
|
||||
Limit uint
|
||||
Offset uint
|
||||
}
|
||||
|
||||
// NewListOptions fills ListOptions struct with opts slice
|
||||
func NewListOptions(opts ...ListOption) ListOptions {
|
||||
options := ListOptions{}
|
||||
@@ -250,59 +258,50 @@ func NewListOptions(opts ...ListOption) ListOptions {
|
||||
return options
|
||||
}
|
||||
|
||||
// ListOptions configures an individual List operation
|
||||
type ListOptions struct {
|
||||
Context context.Context
|
||||
Database string
|
||||
Prefix string
|
||||
Suffix string
|
||||
Namespace string
|
||||
Table string
|
||||
Limit uint
|
||||
Offset uint
|
||||
}
|
||||
|
||||
// ListOption sets values in ListOptions
|
||||
type ListOption func(l *ListOptions)
|
||||
|
||||
// ListFrom the database and table
|
||||
func ListFrom(database, table string) ListOption {
|
||||
return func(l *ListOptions) {
|
||||
l.Database = database
|
||||
l.Table = table
|
||||
// ListContext pass context.Context to list options
|
||||
func ListContext(ctx context.Context) ListOption {
|
||||
return func(o *ListOptions) {
|
||||
o.Context = ctx
|
||||
}
|
||||
}
|
||||
|
||||
// ListPrefix returns all keys that are prefixed with key
|
||||
func ListPrefix(p string) ListOption {
|
||||
return func(l *ListOptions) {
|
||||
l.Prefix = p
|
||||
func ListPrefix(s string) ListOption {
|
||||
return func(o *ListOptions) {
|
||||
o.Prefix = s
|
||||
}
|
||||
}
|
||||
|
||||
// ListSuffix returns all keys that end with key
|
||||
func ListSuffix(s string) ListOption {
|
||||
return func(l *ListOptions) {
|
||||
l.Suffix = s
|
||||
return func(o *ListOptions) {
|
||||
o.Suffix = s
|
||||
}
|
||||
}
|
||||
|
||||
// ListLimit limits the number of returned keys to l
|
||||
func ListLimit(l uint) ListOption {
|
||||
return func(lo *ListOptions) {
|
||||
lo.Limit = l
|
||||
// ListLimit limits the number of returned keys
|
||||
func ListLimit(n uint) ListOption {
|
||||
return func(o *ListOptions) {
|
||||
o.Limit = n
|
||||
}
|
||||
}
|
||||
|
||||
// ListOffset starts returning responses from o. Use in conjunction with Limit for pagination.
|
||||
func ListOffset(o uint) ListOption {
|
||||
return func(l *ListOptions) {
|
||||
l.Offset = o
|
||||
// ListOffset use with Limit for pagination
|
||||
func ListOffset(n uint) ListOption {
|
||||
return func(o *ListOptions) {
|
||||
o.Offset = n
|
||||
}
|
||||
}
|
||||
|
||||
// ExistsOption specifies Exists call options
|
||||
type ExistsOption func(*ExistsOptions)
|
||||
// ListNamespace pass namespace to list options
|
||||
func ListNamespace(ns string) ListOption {
|
||||
return func(o *ListOptions) {
|
||||
o.Namespace = ns
|
||||
}
|
||||
}
|
||||
|
||||
// ExistsOptions holds options for Exists method
|
||||
type ExistsOptions struct {
|
||||
@@ -312,6 +311,9 @@ type ExistsOptions struct {
|
||||
Namespace string
|
||||
}
|
||||
|
||||
// ExistsOption specifies Exists call options
|
||||
type ExistsOption func(*ExistsOptions)
|
||||
|
||||
// NewExistsOptions helper for Exists method
|
||||
func NewExistsOptions(opts ...ExistsOption) ExistsOptions {
|
||||
options := ExistsOptions{
|
||||
@@ -322,3 +324,24 @@ func NewExistsOptions(opts ...ExistsOption) ExistsOptions {
|
||||
}
|
||||
return options
|
||||
}
|
||||
|
||||
// ExistsContext pass context.Context to exist options
|
||||
func ExistsContext(ctx context.Context) ExistsOption {
|
||||
return func(o *ExistsOptions) {
|
||||
o.Context = ctx
|
||||
}
|
||||
}
|
||||
|
||||
// ExistsNamespace pass namespace to exist options
|
||||
func ExistsNamespace(ns string) ExistsOption {
|
||||
return func(o *ExistsOptions) {
|
||||
o.Namespace = ns
|
||||
}
|
||||
}
|
||||
|
||||
// WrapStore adds a store Wrapper to a list of options passed into the store
|
||||
//func WrapStore(w Wrapper) Option {
|
||||
// return func(o *Options) {
|
||||
// o.Wrappers = append(o.Wrappers, w)
|
||||
// }
|
||||
//}
|
||||
|
84
store/wrapper.go
Normal file
84
store/wrapper.go
Normal file
@@ -0,0 +1,84 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
// LogfFunc function used for Logf method
|
||||
//type LogfFunc func(ctx context.Context, level Level, msg string, args ...interface{})
|
||||
|
||||
//type Wrapper interface {
|
||||
// Logf logs message with needed level
|
||||
//Logf(LogfFunc) LogfFunc
|
||||
//}
|
||||
|
||||
type NamespaceStore struct {
|
||||
s Store
|
||||
ns string
|
||||
}
|
||||
|
||||
var (
|
||||
_ Store = &NamespaceStore{}
|
||||
)
|
||||
|
||||
func NewNamespaceStore(s Store, ns string) Store {
|
||||
return &NamespaceStore{s: s, ns: ns}
|
||||
}
|
||||
|
||||
func (w *NamespaceStore) Init(opts ...Option) error {
|
||||
return w.s.Init(opts...)
|
||||
}
|
||||
|
||||
func (w *NamespaceStore) Connect(ctx context.Context) error {
|
||||
return w.s.Connect(ctx)
|
||||
}
|
||||
|
||||
func (w *NamespaceStore) Disconnect(ctx context.Context) error {
|
||||
return w.s.Disconnect(ctx)
|
||||
}
|
||||
|
||||
func (w *NamespaceStore) Read(ctx context.Context, key string, val interface{}, opts ...ReadOption) error {
|
||||
return w.s.Read(ctx, key, val, append(opts, ReadNamespace(w.ns))...)
|
||||
}
|
||||
|
||||
func (w *NamespaceStore) Write(ctx context.Context, key string, val interface{}, opts ...WriteOption) error {
|
||||
return w.s.Write(ctx, key, val, append(opts, WriteNamespace(w.ns))...)
|
||||
}
|
||||
|
||||
func (w *NamespaceStore) Delete(ctx context.Context, key string, opts ...DeleteOption) error {
|
||||
return w.s.Delete(ctx, key, append(opts, DeleteNamespace(w.ns))...)
|
||||
}
|
||||
|
||||
func (w *NamespaceStore) Exists(ctx context.Context, key string, opts ...ExistsOption) error {
|
||||
return w.s.Exists(ctx, key, append(opts, ExistsNamespace(w.ns))...)
|
||||
}
|
||||
|
||||
func (w *NamespaceStore) List(ctx context.Context, opts ...ListOption) ([]string, error) {
|
||||
return w.s.List(ctx, append(opts, ListNamespace(w.ns))...)
|
||||
}
|
||||
|
||||
func (w *NamespaceStore) Options() Options {
|
||||
return w.s.Options()
|
||||
}
|
||||
|
||||
func (w *NamespaceStore) Name() string {
|
||||
return w.s.Name()
|
||||
}
|
||||
|
||||
func (w *NamespaceStore) String() string {
|
||||
return w.s.String()
|
||||
}
|
||||
|
||||
//type NamespaceWrapper struct{}
|
||||
|
||||
//func NewNamespaceWrapper() Wrapper {
|
||||
// return &NamespaceWrapper{}
|
||||
//}
|
||||
|
||||
/*
|
||||
func (w *OmitWrapper) Logf(fn LogfFunc) LogfFunc {
|
||||
return func(ctx context.Context, level Level, msg string, args ...interface{}) {
|
||||
fn(ctx, level, msg, getArgs(args)...)
|
||||
}
|
||||
}
|
||||
*/
|
@@ -1,15 +0,0 @@
|
||||
package fn
|
||||
|
||||
type Initer interface {
|
||||
Init(opts ...interface{}) error
|
||||
}
|
||||
|
||||
func Init(ifaces ...Initer) error {
|
||||
var err error
|
||||
for _, iface := range ifaces {
|
||||
if err = iface.Init(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
@@ -7,7 +7,6 @@ import (
|
||||
"reflect"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ErrInvalidParam specifies invalid url query params
|
||||
@@ -15,11 +14,12 @@ var ErrInvalidParam = errors.New("invalid url query param provided")
|
||||
|
||||
var bracketSplitter = regexp.MustCompile(`\[|\]`)
|
||||
|
||||
var timeKind = reflect.TypeOf(time.Time{}).Kind()
|
||||
//var timeKind = reflect.ValueOf(time.Time{}).Kind()
|
||||
|
||||
type StructField struct {
|
||||
Field reflect.StructField
|
||||
Value reflect.Value
|
||||
Path string
|
||||
}
|
||||
|
||||
func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, error) {
|
||||
@@ -35,7 +35,7 @@ func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, e
|
||||
for idx := 0; idx < typ.NumField(); idx++ {
|
||||
fld := typ.Field(idx)
|
||||
val := sv.Field(idx)
|
||||
if !val.CanSet() || len(fld.PkgPath) != 0 {
|
||||
if len(fld.PkgPath) != 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -66,6 +66,17 @@ func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, e
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
func StructFieldByPath(src interface{}, path string) (interface{}, error) {
|
||||
var err error
|
||||
for _, p := range strings.Split(path, ".") {
|
||||
src, err = StructFieldByName(src, p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return src, err
|
||||
}
|
||||
|
||||
func StructFieldByName(src interface{}, tkey string) (interface{}, error) {
|
||||
sv := reflect.ValueOf(src)
|
||||
if sv.Kind() == reflect.Ptr {
|
||||
@@ -79,7 +90,7 @@ func StructFieldByName(src interface{}, tkey string) (interface{}, error) {
|
||||
for idx := 0; idx < typ.NumField(); idx++ {
|
||||
fld := typ.Field(idx)
|
||||
val := sv.Field(idx)
|
||||
if !val.CanSet() || len(fld.PkgPath) != 0 {
|
||||
if len(fld.PkgPath) != 0 {
|
||||
continue
|
||||
}
|
||||
if fld.Name == tkey {
|
||||
@@ -105,6 +116,19 @@ func StructFieldByName(src interface{}, tkey string) (interface{}, error) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
// StructFieldsMap returns map[string]interface{} or error
|
||||
func StructFieldsMap(src interface{}) (map[string]interface{}, error) {
|
||||
fields, err := StructFields(src)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mp := make(map[string]interface{}, len(fields))
|
||||
for _, field := range fields {
|
||||
mp[field.Path] = field.Value.Interface()
|
||||
}
|
||||
return mp, nil
|
||||
}
|
||||
|
||||
// StructFields returns slice of struct fields
|
||||
func StructFields(src interface{}) ([]StructField, error) {
|
||||
var fields []StructField
|
||||
@@ -116,25 +140,29 @@ func StructFields(src interface{}) ([]StructField, error) {
|
||||
if sv.Kind() != reflect.Struct {
|
||||
return nil, ErrInvalidStruct
|
||||
}
|
||||
|
||||
typ := sv.Type()
|
||||
for idx := 0; idx < typ.NumField(); idx++ {
|
||||
fld := typ.Field(idx)
|
||||
val := sv.Field(idx)
|
||||
if !val.CanSet() || len(fld.PkgPath) != 0 {
|
||||
if !val.IsValid() || len(fld.PkgPath) != 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
switch val.Kind() {
|
||||
case timeKind:
|
||||
fields = append(fields, StructField{Field: fld, Value: val})
|
||||
//case timeKind:
|
||||
//fmt.Printf("GGG\n")
|
||||
//fields = append(fields, StructField{Field: fld, Value: val, Path: fld.Name})
|
||||
case reflect.Struct:
|
||||
infields, err := StructFields(val.Interface())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fields = append(fields, infields...)
|
||||
for _, infield := range infields {
|
||||
infield.Path = fmt.Sprintf("%s.%s", fld.Name, infield.Path)
|
||||
fields = append(fields, infield)
|
||||
}
|
||||
default:
|
||||
fields = append(fields, StructField{Field: fld, Value: val})
|
||||
fields = append(fields, StructField{Field: fld, Value: val, Path: fld.Name})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -194,7 +222,7 @@ func StructURLValues(src interface{}, pref string, tags []string) (url.Values, e
|
||||
for idx := 0; idx < typ.NumField(); idx++ {
|
||||
fld := typ.Field(idx)
|
||||
val := sv.Field(idx)
|
||||
if !val.CanSet() || len(fld.PkgPath) != 0 || !val.IsValid() {
|
||||
if len(fld.PkgPath) != 0 || !val.IsValid() {
|
||||
continue
|
||||
}
|
||||
|
||||
|
@@ -2,9 +2,80 @@ package reflect
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"reflect"
|
||||
rfl "reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestStructFieldsMap(t *testing.T) {
|
||||
type NestedStr struct {
|
||||
BBB string
|
||||
CCC int
|
||||
}
|
||||
type Str struct {
|
||||
Name []string `json:"name" codec:"flatten"`
|
||||
XXX string `json:"xxx"`
|
||||
Nested NestedStr
|
||||
}
|
||||
|
||||
val := &Str{Name: []string{"first", "second"}, XXX: "ttt", Nested: NestedStr{BBB: "ddd", CCC: 9}}
|
||||
fields, err := StructFieldsMap(val)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if v, ok := fields["Nested.BBB"]; !ok || v != "ddd" {
|
||||
t.Fatalf("invalid field from %v", fields)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStructFields(t *testing.T) {
|
||||
type NestedStr struct {
|
||||
BBB string
|
||||
CCC int
|
||||
}
|
||||
type Str struct {
|
||||
Name []string `json:"name" codec:"flatten"`
|
||||
XXX string `json:"xxx"`
|
||||
Nested NestedStr
|
||||
}
|
||||
|
||||
val := &Str{Name: []string{"first", "second"}, XXX: "ttt", Nested: NestedStr{BBB: "ddd", CCC: 9}}
|
||||
fields, err := StructFields(val)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var ok bool
|
||||
for _, field := range fields {
|
||||
if field.Path == "Nested.CCC" {
|
||||
ok = true
|
||||
}
|
||||
}
|
||||
if !ok {
|
||||
t.Fatalf("struct fields returns invalid path: %v", fields)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStructByPath(t *testing.T) {
|
||||
type NestedStr struct {
|
||||
BBB string
|
||||
CCC int
|
||||
}
|
||||
type Str struct {
|
||||
Name []string `json:"name" codec:"flatten"`
|
||||
XXX string `json:"xxx"`
|
||||
Nested NestedStr
|
||||
}
|
||||
|
||||
val := &Str{Name: []string{"first", "second"}, XXX: "ttt", Nested: NestedStr{BBB: "ddd", CCC: 9}}
|
||||
field, err := StructFieldByPath(val, "Nested.CCC")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if rfl.Indirect(reflect.ValueOf(field)).Int() != 9 {
|
||||
t.Fatalf("invalid elem returned: %v", field)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStructByTag(t *testing.T) {
|
||||
type Str struct {
|
||||
Name []string `json:"name" codec:"flatten"`
|
||||
|
@@ -102,7 +102,7 @@ type parser struct {
|
||||
// topLevelSegments is the target of this parser.
|
||||
func (p *parser) topLevelSegments() ([]segment, error) {
|
||||
if logger.V(logger.TraceLevel) {
|
||||
logger.Debug(context.TODO(), "Parsing %q", p.tokens)
|
||||
logger.Trace(context.TODO(), "Parsing %q", p.tokens)
|
||||
}
|
||||
segs, err := p.segments()
|
||||
if err != nil {
|
||||
|
@@ -63,16 +63,16 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt
|
||||
}
|
||||
|
||||
if version != 1 {
|
||||
if logger.V(logger.DebugLevel) {
|
||||
logger.Debug(context.TODO(), "unsupported version: %d", version)
|
||||
if logger.V(logger.TraceLevel) {
|
||||
logger.Trace(context.TODO(), "unsupported version: %d", version)
|
||||
}
|
||||
return Pattern{}, ErrInvalidPattern
|
||||
}
|
||||
|
||||
l := len(ops)
|
||||
if l%2 != 0 {
|
||||
if logger.V(logger.DebugLevel) {
|
||||
logger.Debug(context.TODO(), "odd number of ops codes: %d", l)
|
||||
if logger.V(logger.TraceLevel) {
|
||||
logger.Trace(context.TODO(), "odd number of ops codes: %d", l)
|
||||
}
|
||||
return Pattern{}, ErrInvalidPattern
|
||||
}
|
||||
@@ -141,13 +141,13 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt
|
||||
vars = append(vars, v)
|
||||
stack--
|
||||
if stack < 0 {
|
||||
if logger.V(logger.DebugLevel) {
|
||||
if logger.V(logger.TraceLevel) {
|
||||
logger.Trace(context.TODO(), "stack underflow")
|
||||
}
|
||||
return Pattern{}, ErrInvalidPattern
|
||||
}
|
||||
default:
|
||||
if logger.V(logger.DebugLevel) {
|
||||
if logger.V(logger.TraceLevel) {
|
||||
logger.Trace(context.TODO(), "invalid opcode: %d", op.code)
|
||||
}
|
||||
return Pattern{}, ErrInvalidPattern
|
||||
|
Reference in New Issue
Block a user