Compare commits

..

48 Commits

Author SHA1 Message Date
778dd449e2 logger: add NewStdLogger and RedirectStdLogger
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-06 13:45:11 +03:00
1d16983b67 logger: add NewStdLogger that can be used as std *log.Logger
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-06 11:52:04 +03:00
f386bffd37 logger: change logger interface
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-06 02:15:57 +03:00
772bde7938 network/tunnel/broker: fix metadata compile issue
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-06 02:14:56 +03:00
ea16f5f825 config/default: not implement watcher as it cant change
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-04 16:04:58 +03:00
c2f34df493 config: minor changes to split config and watcher files
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-04 13:51:43 +03:00
efe215cd60 config/default: watcher send changes only on non nil
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-04 12:25:29 +03:00
b4f332bf0d config/default: return error on Next() call
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-04 01:15:50 +03:00
f47fbb1030 config: add jitter interval for watcher to avoid dos
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-04 00:37:56 +03:00
1e8e57a708 config/default: minor changes
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-03 00:49:21 +03:00
dependabot[bot]
5d0959b0a1 build(deps): bump github.com/golang-jwt/jwt (#54)
Bumps [github.com/golang-jwt/jwt](https://github.com/golang-jwt/jwt) from 3.2.1+incompatible to 3.2.2+incompatible.
- [Release notes](https://github.com/golang-jwt/jwt/releases)
- [Changelog](https://github.com/golang-jwt/jwt/blob/main/VERSION_HISTORY.md)
- [Commits](https://github.com/golang-jwt/jwt/compare/v3.2.1...v3.2.2)

---
updated-dependencies:
- dependency-name: github.com/golang-jwt/jwt
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2021-08-03 00:27:35 +03:00
fa8fb3aed7 fixes and improvements (#55)
* util/router: sync from github
* config: add watcher interface

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-08-03 00:24:40 +03:00
cfd2d53a79 config: cleanup tests
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-27 23:58:45 +03:00
d306f77ffc util/token/jwt: change library
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-27 23:58:29 +03:00
e5b0a7e20d server: add BatchSubscriber
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-27 23:58:06 +03:00
9a5b158b4d change jwt lib
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-27 12:43:56 +03:00
af8d81f3c6 logger: add DefaultCallerSkipCount
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-26 09:48:15 +03:00
5c9b3dae33 broker: improve option naming, move BatchBroker to Broker interface
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-24 16:16:18 +03:00
9f3957d101 client: improve option naming, add BatchPublish to noop client
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-24 16:14:42 +03:00
8fd8bdcb39 logger: fix default logger funcs
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-24 15:22:01 +03:00
80e3d239ab broker/memory: optimize
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-23 15:12:20 +03:00
419cd486cf broker/memory: cleanup
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-23 15:06:10 +03:00
e64269b2a8 broker: add BatchBroker interface to avoid breaking older brokers
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-23 12:55:36 +03:00
d18429e024 metadata: add HeaderAuthorization
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-23 12:17:00 +03:00
675e121049 metadata: add default headers
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-23 12:03:18 +03:00
d357fb1e0d WIP: broker batch processing
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-22 22:53:44 +03:00
e4673bcc50 remove old cruft
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-22 15:45:44 +03:00
a839f75a2f util/reflect: add new funcs
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-22 15:45:31 +03:00
a7e6d61b95 meter: fast path for only one label
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-21 14:29:13 +03:00
650d167313 meter: add BuildLabels func that sorts and deletes duplicates
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-21 14:10:20 +03:00
c6ba2a91e6 meter: BuildName func to combine metric name with labels into string
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-21 12:39:59 +03:00
7ece08896f server: use 127.0.0.1:0 if no address provided
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-17 01:57:39 +06:00
dependabot[bot]
57f6f23294 build(deps): bump github.com/google/uuid from 1.2.0 to 1.3.0 (#53)
Bumps [github.com/google/uuid](https://github.com/google/uuid) from 1.2.0 to 1.3.0.
- [Release notes](https://github.com/google/uuid/releases)
- [Commits](https://github.com/google/uuid/compare/v1.2.0...v1.3.0)

---
updated-dependencies:
- dependency-name: github.com/google/uuid
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2021-07-16 00:27:56 +03:00
09e6fa2fed flow: implement new methods, add Async ExecutionOption
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-16 00:17:16 +03:00
10a09a5c6f flow: improve store
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-15 22:56:34 +03:00
b4e5d9462a util/router: move some messages to Trace level
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-15 22:56:34 +03:00
96aa0b6906 store/memory: fix List
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-15 22:53:12 +03:00
f54658830d store/memory: fixup
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-15 12:11:55 +03:00
1e43122660 store/memory: small fixups for flow usage
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-15 11:59:35 +03:00
42800fa247 flow: improve steps handling
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-14 17:12:54 +03:00
5b9c810653 logger: add compile time test for interface compat
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-14 17:12:09 +03:00
c3def24bf4 store: add Wrappers support, create Namespace wrapper
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-14 17:11:37 +03:00
0d1ef31764 client: change AuthToken option signature
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-09 10:47:40 +03:00
d49afa230f logger: add omit logger
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-05 23:04:20 +03:00
e545eb4e13 logger: add wrapper support
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-05 22:32:47 +03:00
f28b107372 broker: fix RawMessage marshal
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-01 23:23:01 +03:00
c592fabe2a minor fixes
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-01 15:56:22 +03:00
eb107020c7 broker: replace Message.Body []byte slice to RawMessage
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-01 15:11:17 +03:00
48 changed files with 2540 additions and 651 deletions

View File

@@ -3,46 +3,128 @@ package broker
import (
"context"
"errors"
"github.com/unistack-org/micro/v3/metadata"
)
// DefaultBroker default broker
// DefaultBroker default memory broker
var DefaultBroker Broker = NewBroker()
var (
// ErrNotConnected returns when broker used but not connected yet
ErrNotConnected = errors.New("broker not connected")
// ErrDisconnected returns when broker disconnected
ErrDisconnected = errors.New("broker disconnected")
)
// Broker is an interface used for asynchronous messaging.
type Broker interface {
// Name returns broker instance name
Name() string
Init(...Option) error
// Init initilize broker
Init(opts ...Option) error
// Options returns broker options
Options() Options
// Address return configured address
Address() string
Connect(context.Context) error
Disconnect(context.Context) error
Publish(context.Context, string, *Message, ...PublishOption) error
Subscribe(context.Context, string, Handler, ...SubscribeOption) (Subscriber, error)
// Connect connects to broker
Connect(ctx context.Context) error
// Disconnect disconnect from broker
Disconnect(ctx context.Context) error
// Publish message to broker topic
Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error
// Subscribe subscribes to topic message via handler
Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
// BatchPublish messages to broker with multiple topics
BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error
// BatchSubscribe subscribes to topic messages via handler
BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error)
// String type of broker
String() string
}
// Handler is used to process messages via a subscription of a topic.
type Handler func(Event) error
// Events contains multiple events
type Events []Event
func (evs Events) Ack() error {
var err error
for _, ev := range evs {
if err = ev.Ack(); err != nil {
return err
}
}
return nil
}
func (evs Events) SetError(err error) {
for _, ev := range evs {
ev.SetError(err)
}
}
// BatchHandler is used to process messages in batches via a subscription of a topic.
type BatchHandler func(Events) error
// Event is given to a subscription handler for processing
type Event interface {
// Topic returns event topic
Topic() string
// Message returns broker message
Message() *Message
// Ack acknowledge message
Ack() error
// Error returns message error (like decoding errors or some other)
Error() error
// SetError set event processing error
SetError(err error)
}
// RawMessage is a raw encoded JSON value.
// It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a encoding.
type RawMessage []byte
// MarshalJSON returns m as the JSON encoding of m.
func (m *RawMessage) MarshalJSON() ([]byte, error) {
if m == nil {
return []byte("null"), nil
}
return *m, nil
}
// UnmarshalJSON sets *m to a copy of data.
func (m *RawMessage) UnmarshalJSON(data []byte) error {
if m == nil {
return errors.New("RawMessage UnmarshalJSON on nil pointer")
}
*m = append((*m)[0:0], data...)
return nil
}
// Message is used to transfer data
type Message struct {
Header metadata.Metadata // contains message metadata
Body []byte // contains message body
// Header contains message metadata
Header metadata.Metadata
// Body contains message body
Body RawMessage
}
// NewMessage create broker message with topic filled
func NewMessage(topic string) *Message {
m := &Message{Header: metadata.New(2)}
m.Header.Set(metadata.HeaderTopic, topic)
return m
}
// Subscriber is a convenience return type for the Subscribe method
type Subscriber interface {
// Options returns subscriber options
Options() SubscribeOptions
// Topic returns topic for subscription
Topic() string
Unsubscribe(context.Context) error
// Unsubscribe from topic
Unsubscribe(ctx context.Context) error
}

View File

@@ -2,18 +2,18 @@ package broker
import (
"context"
"errors"
"sync"
"github.com/google/uuid"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
maddr "github.com/unistack-org/micro/v3/util/addr"
mnet "github.com/unistack-org/micro/v3/util/net"
"github.com/unistack-org/micro/v3/util/rand"
)
type memoryBroker struct {
Subscribers map[string][]*memorySubscriber
subscribers map[string][]*memorySubscriber
addr string
opts Options
sync.RWMutex
@@ -28,12 +28,13 @@ type memoryEvent struct {
}
type memorySubscriber struct {
ctx context.Context
exit chan bool
handler Handler
id string
topic string
opts SubscribeOptions
ctx context.Context
exit chan bool
handler Handler
batchhandler BatchHandler
id string
topic string
opts SubscribeOptions
}
func (m *memoryBroker) Options() Options {
@@ -77,7 +78,6 @@ func (m *memoryBroker) Disconnect(ctx context.Context) error {
}
m.connected = false
return nil
}
@@ -92,67 +92,190 @@ func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message,
m.RLock()
if !m.connected {
m.RUnlock()
return errors.New("not connected")
return ErrNotConnected
}
subs, ok := m.Subscribers[topic]
m.RUnlock()
if !ok {
return nil
}
var v interface{}
if m.opts.Codec != nil {
options := NewPublishOptions(opts...)
vs := make([]msgWrapper, 0, 1)
if m.opts.Codec == nil || options.BodyOnly {
topic, _ := msg.Header.Get(metadata.HeaderTopic)
vs = append(vs, msgWrapper{topic: topic, body: msg})
} else {
topic, _ := msg.Header.Get(metadata.HeaderTopic)
buf, err := m.opts.Codec.Marshal(msg)
if err != nil {
return err
}
v = buf
vs = append(vs, msgWrapper{topic: topic, body: buf})
}
return m.publish(ctx, vs, opts...)
}
type msgWrapper struct {
topic string
body interface{}
}
func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
m.RLock()
if !m.connected {
m.RUnlock()
return ErrNotConnected
}
m.RUnlock()
options := NewPublishOptions(opts...)
vs := make([]msgWrapper, 0, len(msgs))
if m.opts.Codec == nil || options.BodyOnly {
for _, msg := range msgs {
topic, _ := msg.Header.Get(metadata.HeaderTopic)
vs = append(vs, msgWrapper{topic: topic, body: msg})
}
} else {
v = msg
for _, msg := range msgs {
topic, _ := msg.Header.Get(metadata.HeaderTopic)
buf, err := m.opts.Codec.Marshal(msg)
if err != nil {
return err
}
vs = append(vs, msgWrapper{topic: topic, body: buf})
}
}
p := &memoryEvent{
topic: topic,
message: v,
opts: m.opts,
return m.publish(ctx, vs, opts...)
}
func (m *memoryBroker) publish(ctx context.Context, vs []msgWrapper, opts ...PublishOption) error {
var err error
msgTopicMap := make(map[string]Events)
for _, v := range vs {
p := &memoryEvent{
topic: v.topic,
message: v.body,
opts: m.opts,
}
msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p)
}
beh := m.opts.BatchErrorHandler
eh := m.opts.ErrorHandler
for _, sub := range subs {
if err := sub.handler(p); err != nil {
p.err = err
if sub.opts.ErrorHandler != nil {
eh = sub.opts.ErrorHandler
}
if eh != nil {
eh(p)
} else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error())
}
for t, ms := range msgTopicMap {
m.RLock()
subs, ok := m.subscribers[t]
m.RUnlock()
if !ok {
continue
}
for _, sub := range subs {
// batch processing
if sub.batchhandler != nil {
if err = sub.batchhandler(ms); err != nil {
ms.SetError(err)
if sub.opts.BatchErrorHandler != nil {
beh = sub.opts.BatchErrorHandler
}
if beh != nil {
beh(ms)
} else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error())
}
} else if sub.opts.AutoAck {
if err = ms.Ack(); err != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
}
}
}
// single processing
if sub.handler != nil {
for _, p := range ms {
if err = sub.handler(p); err != nil {
p.SetError(err)
if sub.opts.ErrorHandler != nil {
eh = sub.opts.ErrorHandler
}
if eh != nil {
eh(p)
} else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error())
}
} else if sub.opts.AutoAck {
if err = p.Ack(); err != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
}
}
}
}
}
}
return nil
}
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) {
m.RLock()
if !m.connected {
m.RUnlock()
return nil, errors.New("not connected")
return nil, ErrNotConnected
}
m.RUnlock()
options := NewSubscribeOptions(opts...)
id, err := uuid.NewRandom()
if err != nil {
return nil, err
}
options := NewSubscribeOptions(opts...)
sub := &memorySubscriber{
exit: make(chan bool, 1),
id: id.String(),
topic: topic,
batchhandler: handler,
opts: options,
ctx: ctx,
}
m.Lock()
m.subscribers[topic] = append(m.subscribers[topic], sub)
m.Unlock()
go func() {
<-sub.exit
m.Lock()
newSubscribers := make([]*memorySubscriber, 0, len(m.subscribers)-1)
for _, sb := range m.subscribers[topic] {
if sb.id == sub.id {
continue
}
newSubscribers = append(newSubscribers, sb)
}
m.subscribers[topic] = newSubscribers
m.Unlock()
}()
return sub, nil
}
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
m.RLock()
if !m.connected {
m.RUnlock()
return nil, ErrNotConnected
}
m.RUnlock()
id, err := uuid.NewRandom()
if err != nil {
return nil, err
}
options := NewSubscribeOptions(opts...)
sub := &memorySubscriber{
exit: make(chan bool, 1),
id: id.String(),
@@ -163,20 +286,20 @@ func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Hand
}
m.Lock()
m.Subscribers[topic] = append(m.Subscribers[topic], sub)
m.subscribers[topic] = append(m.subscribers[topic], sub)
m.Unlock()
go func() {
<-sub.exit
m.Lock()
var newSubscribers []*memorySubscriber
for _, sb := range m.Subscribers[topic] {
newSubscribers := make([]*memorySubscriber, 0, len(m.subscribers)-1)
for _, sb := range m.subscribers[topic] {
if sb.id == sub.id {
continue
}
newSubscribers = append(newSubscribers, sb)
}
m.Subscribers[topic] = newSubscribers
m.subscribers[topic] = newSubscribers
m.Unlock()
}()
@@ -221,6 +344,10 @@ func (m *memoryEvent) Error() error {
return m.err
}
func (m *memoryEvent) SetError(err error) {
m.err = err
}
func (m *memorySubscriber) Options() SubscribeOptions {
return m.opts
}
@@ -238,6 +365,6 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error {
func NewBroker(opts ...Option) Broker {
return &memoryBroker{
opts: NewOptions(opts...),
Subscribers: make(map[string][]*memorySubscriber),
subscribers: make(map[string][]*memorySubscriber),
}
}

View File

@@ -4,8 +4,55 @@ import (
"context"
"fmt"
"testing"
"github.com/unistack-org/micro/v3/metadata"
)
func TestMemoryBatchBroker(t *testing.T) {
b := NewBroker()
ctx := context.Background()
if err := b.Connect(ctx); err != nil {
t.Fatalf("Unexpected connect error %v", err)
}
topic := "test"
count := 10
fn := func(evts Events) error {
return evts.Ack()
}
sub, err := b.BatchSubscribe(ctx, topic, fn)
if err != nil {
t.Fatalf("Unexpected error subscribing %v", err)
}
msgs := make([]*Message, 0, 0)
for i := 0; i < count; i++ {
message := &Message{
Header: map[string]string{
metadata.HeaderTopic: topic,
"foo": "bar",
"id": fmt.Sprintf("%d", i),
},
Body: []byte(`"hello world"`),
}
msgs = append(msgs, message)
}
if err := b.BatchPublish(ctx, msgs); err != nil {
t.Fatalf("Unexpected error publishing %v", err)
}
if err := sub.Unsubscribe(ctx); err != nil {
t.Fatalf("Unexpected error unsubscribing from %s: %v", topic, err)
}
if err := b.Disconnect(ctx); err != nil {
t.Fatalf("Unexpected connect error %v", err)
}
}
func TestMemoryBroker(t *testing.T) {
b := NewBroker()
ctx := context.Background()
@@ -26,20 +73,27 @@ func TestMemoryBroker(t *testing.T) {
t.Fatalf("Unexpected error subscribing %v", err)
}
msgs := make([]*Message, 0, 0)
for i := 0; i < count; i++ {
message := &Message{
Header: map[string]string{
"foo": "bar",
"id": fmt.Sprintf("%d", i),
metadata.HeaderTopic: topic,
"foo": "bar",
"id": fmt.Sprintf("%d", i),
},
Body: []byte(`hello world`),
Body: []byte(`"hello world"`),
}
msgs = append(msgs, message)
if err := b.Publish(ctx, topic, message); err != nil {
t.Fatalf("Unexpected error publishing %d", i)
t.Fatalf("Unexpected error publishing %d err: %v", i, err)
}
}
if err := b.BatchPublish(ctx, msgs); err != nil {
t.Fatalf("Unexpected error publishing %v", err)
}
if err := sub.Unsubscribe(ctx); err != nil {
t.Fatalf("Unexpected error unsubscribing from %s: %v", topic, err)
}

View File

@@ -3,6 +3,7 @@ package broker
import (
"context"
"crypto/tls"
"time"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/logger"
@@ -29,6 +30,8 @@ type Options struct {
TLSConfig *tls.Config
// ErrorHandler used when broker can't unmarshal incoming message
ErrorHandler Handler
// BatchErrorHandler used when broker can't unmashal incoming messages
BatchErrorHandler BatchHandler
// Name holds the broker name
Name string
// Addrs holds the broker address
@@ -71,11 +74,9 @@ func NewPublishOptions(opts ...PublishOption) PublishOptions {
options := PublishOptions{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
return options
}
@@ -85,12 +86,18 @@ type SubscribeOptions struct {
Context context.Context
// ErrorHandler used when broker can't unmarshal incoming message
ErrorHandler Handler
// BatchErrorHandler used when broker can't unmashal incoming messages
BatchErrorHandler BatchHandler
// Group holds consumer group
Group string
// AutoAck flag specifies auto ack of incoming message when no error happens
AutoAck bool
// BodyOnly flag specifies that message contains only body bytes without header
BodyOnly bool
// BatchSize flag specifies max batch size
BatchSize int
// BatchWait flag specifies max wait time for batch filling
BatchWait time.Duration
}
// Option func
@@ -113,23 +120,6 @@ func PublishContext(ctx context.Context) PublishOption {
}
}
// SubscribeOption func
type SubscribeOption func(*SubscribeOptions)
// NewSubscribeOptions creates new SubscribeOptions
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
options := SubscribeOptions{
AutoAck: true,
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
return options
}
// Addrs sets the host addresses to be used by the broker
func Addrs(addrs ...string) Option {
return func(o *Options) {
@@ -145,28 +135,6 @@ func Codec(c codec.Codec) Option {
}
}
// DisableAutoAck disables auto ack
func DisableAutoAck() SubscribeOption {
return func(o *SubscribeOptions) {
o.AutoAck = false
}
}
// SubscribeAutoAck will disable auto acking of messages
// after they have been handled.
func SubscribeAutoAck(b bool) SubscribeOption {
return func(o *SubscribeOptions) {
o.AutoAck = b
}
}
// SubscribeBodyOnly consumes only body of the message
func SubscribeBodyOnly(b bool) SubscribeOption {
return func(o *SubscribeOptions) {
o.BodyOnly = b
}
}
// ErrorHandler will catch all broker errors that cant be handled
// in normal way, for example Codec errors
func ErrorHandler(h Handler) Option {
@@ -175,6 +143,14 @@ func ErrorHandler(h Handler) Option {
}
}
// BatchErrorHandler will catch all broker errors that cant be handled
// in normal way, for example Codec errors
func BatchErrorHandler(h BatchHandler) Option {
return func(o *Options) {
o.BatchErrorHandler = h
}
}
// SubscribeErrorHandler will catch all broker errors that cant be handled
// in normal way, for example Codec errors
func SubscribeErrorHandler(h Handler) SubscribeOption {
@@ -183,6 +159,14 @@ func SubscribeErrorHandler(h Handler) SubscribeOption {
}
}
// SubscribeBatchErrorHandler will catch all broker errors that cant be handled
// in normal way, for example Codec errors
func SubscribeBatchErrorHandler(h BatchHandler) SubscribeOption {
return func(o *SubscribeOptions) {
o.BatchErrorHandler = h
}
}
// Queue sets the subscribers queue
// Deprecated
func Queue(name string) SubscribeOption {
@@ -246,3 +230,55 @@ func SubscribeContext(ctx context.Context) SubscribeOption {
o.Context = ctx
}
}
// DisableAutoAck disables auto ack
// Deprecated
func DisableAutoAck() SubscribeOption {
return func(o *SubscribeOptions) {
o.AutoAck = false
}
}
// SubscribeAutoAck contol auto acking of messages
// after they have been handled.
func SubscribeAutoAck(b bool) SubscribeOption {
return func(o *SubscribeOptions) {
o.AutoAck = b
}
}
// SubscribeBodyOnly consumes only body of the message
func SubscribeBodyOnly(b bool) SubscribeOption {
return func(o *SubscribeOptions) {
o.BodyOnly = b
}
}
// SubscribeBatchSize specifies max batch size
func SubscribeBatchSize(n int) SubscribeOption {
return func(o *SubscribeOptions) {
o.BatchSize = n
}
}
// SubscribeBatchWait specifies max batch wait time
func SubscribeBatchWait(td time.Duration) SubscribeOption {
return func(o *SubscribeOptions) {
o.BatchWait = td
}
}
// SubscribeOption func
type SubscribeOption func(*SubscribeOptions)
// NewSubscribeOptions creates new SubscribeOptions
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
options := SubscribeOptions{
AutoAck: true,
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
return options
}

View File

@@ -40,6 +40,7 @@ type Client interface {
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
Publish(ctx context.Context, msg Message, opts ...PublishOption) error
BatchPublish(ctx context.Context, msg []Message, opts ...PublishOption) error
String() string
}

View File

@@ -173,7 +173,7 @@ func (n *noopClient) NewRequest(service, endpoint string, req interface{}, opts
}
func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOption) Message {
options := NewMessageOptions(opts...)
options := NewMessageOptions(append([]MessageOption{MessageContentType(n.opts.ContentType)}, opts...)...)
return &noopMessage{topic: topic, payload: msg, opts: options}
}
@@ -181,47 +181,59 @@ func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption
return &noopStream{}, nil
}
func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOption) error {
var body []byte
func (n *noopClient) BatchPublish(ctx context.Context, ps []Message, opts ...PublishOption) error {
return n.publish(ctx, ps, opts...)
}
func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOption) error {
return n.publish(ctx, []Message{p}, opts...)
}
func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishOption) error {
options := NewPublishOptions(opts...)
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(0)
}
md["Content-Type"] = p.ContentType()
md["Micro-Topic"] = p.Topic()
msgs := make([]*broker.Message, 0, len(ps))
// passed in raw data
if d, ok := p.Payload().(*codec.Frame); ok {
body = d.Data
} else {
// use codec for payload
cf, err := n.newCodec(p.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
for _, p := range ps {
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(0)
}
md[metadata.HeaderContentType] = p.ContentType()
topic := p.Topic()
// get the exchange
if len(options.Exchange) > 0 {
topic = options.Exchange
}
// set the body
b, err := cf.Marshal(p.Payload())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
md[metadata.HeaderTopic] = topic
var body []byte
// passed in raw data
if d, ok := p.Payload().(*codec.Frame); ok {
body = d.Data
} else {
// use codec for payload
cf, err := n.newCodec(p.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// set the body
b, err := cf.Marshal(p.Payload())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
body = b
}
body = b
msgs = append(msgs, &broker.Message{Header: md, Body: body})
}
topic := p.Topic()
// get the exchange
if len(options.Exchange) > 0 {
topic = options.Exchange
}
return n.opts.Broker.Publish(ctx, topic, &broker.Message{
Header: md,
Body: body,
},
return n.opts.Broker.BatchPublish(ctx, msgs,
broker.PublishContext(options.Context),
broker.PublishBodyOnly(options.BodyOnly),
)

View File

@@ -96,8 +96,8 @@ type CallOptions struct {
RequestTimeout time.Duration
// DialTimeout dial timeout
DialTimeout time.Duration
// AuthToken flag
AuthToken bool
// AuthToken string
AuthToken string
}
// Context pass context to client
@@ -373,19 +373,35 @@ func DialTimeout(d time.Duration) Option {
}
// WithExchange sets the exchange to route a message through
// Deprecated
func WithExchange(e string) PublishOption {
return func(o *PublishOptions) {
o.Exchange = e
}
}
// PublishExchange sets the exchange to route a message through
func PublishExchange(e string) PublishOption {
return func(o *PublishOptions) {
o.Exchange = e
}
}
// WithBodyOnly publish only message body
// DERECATED
func WithBodyOnly(b bool) PublishOption {
return func(o *PublishOptions) {
o.BodyOnly = b
}
}
// PublishBodyOnly publish only message body
func PublishBodyOnly(b bool) PublishOption {
return func(o *PublishOptions) {
o.BodyOnly = b
}
}
// PublishContext sets the context in publish options
func PublishContext(ctx context.Context) PublishOption {
return func(o *PublishOptions) {
@@ -463,9 +479,9 @@ func WithDialTimeout(d time.Duration) CallOption {
// WithAuthToken is a CallOption which overrides the
// authorization header with the services own auth token
func WithAuthToken() CallOption {
func WithAuthToken(t string) CallOption {
return func(o *CallOptions) {
o.AuthToken = true
o.AuthToken = t
}
}
@@ -498,12 +514,20 @@ func WithSelectOptions(sops ...selector.SelectOption) CallOption {
}
// WithMessageContentType sets the message content type
// Deprecated
func WithMessageContentType(ct string) MessageOption {
return func(o *MessageOptions) {
o.ContentType = ct
}
}
// MessageContentType sets the message content type
func MessageContentType(ct string) MessageOption {
return func(o *MessageOptions) {
o.ContentType = ct
}
}
// StreamingRequest specifies that request is streaming
func StreamingRequest(b bool) RequestOption {
return func(o *RequestOptions) {

View File

@@ -4,11 +4,18 @@ package config
import (
"context"
"errors"
"time"
)
// DefaultConfig default config
var DefaultConfig Config = NewConfig()
// DefaultWatcherMinInterval default min interval for poll changes
var DefaultWatcherMinInterval = 5 * time.Second
// DefaultWatcherMinInterval default max interval for poll changes
var DefaultWatcherMaxInterval = 9 * time.Second
var (
// ErrCodecMissing is returned when codec needed and not specified
ErrCodecMissing = errors.New("codec missing")
@@ -30,26 +37,28 @@ type Config interface {
Load(context.Context, ...LoadOption) error
// Save config to sources
Save(context.Context, ...SaveOption) error
// Watch a value for changes
//Watch(context.Context) (Watcher, error)
// Watch a config for changes
Watch(context.Context, ...WatchOption) (Watcher, error)
// String returns config type name
String() string
}
// Watcher is the config watcher
type Watcher interface {
// Next() (, error)
// Next blocks until update happens or error returned
Next() (map[string]interface{}, error)
// Stop stops watcher
Stop() error
}
// Load loads config from config sources
func Load(ctx context.Context, cs ...Config) error {
func Load(ctx context.Context, cs []Config, opts ...LoadOption) error {
var err error
for _, c := range cs {
if err = c.Init(); err != nil {
return err
}
if err = c.Load(ctx); err != nil {
if err = c.Load(ctx, opts...); err != nil {
return err
}
}

View File

@@ -2,6 +2,7 @@ package config
import (
"context"
"fmt"
"reflect"
"strconv"
"strings"
@@ -41,11 +42,15 @@ func (c *defaultConfig) Load(ctx context.Context, opts ...LoadOption) error {
mopts = append(mopts, mergo.WithAppendSlice)
}
src, err := rutil.Zero(c.opts.Struct)
dst := c.opts.Struct
if options.Struct != nil {
dst = options.Struct
}
src, err := rutil.Zero(dst)
if err == nil {
valueOf := reflect.ValueOf(src)
if err = c.fillValues(valueOf); err == nil {
err = mergo.Merge(c.opts.Struct, src, mopts...)
if err = fillValues(reflect.ValueOf(src), c.opts.StructTag); err == nil {
err = mergo.Merge(dst, src, mopts...)
}
}
@@ -63,7 +68,7 @@ func (c *defaultConfig) Load(ctx context.Context, opts ...LoadOption) error {
}
//nolint:gocyclo
func (c *defaultConfig) fillValue(value reflect.Value, val string) error {
func fillValue(value reflect.Value, val string) error {
if !rutil.IsEmpty(value) {
return nil
}
@@ -80,10 +85,10 @@ func (c *defaultConfig) fillValue(value reflect.Value, val string) error {
kv := strings.FieldsFunc(nval, func(c rune) bool { return c == '=' })
mkey := reflect.Indirect(reflect.New(kt))
mval := reflect.Indirect(reflect.New(et))
if err := c.fillValue(mkey, kv[0]); err != nil {
if err := fillValue(mkey, kv[0]); err != nil {
return err
}
if err := c.fillValue(mval, kv[1]); err != nil {
if err := fillValue(mval, kv[1]); err != nil {
return err
}
value.SetMapIndex(mkey, mval)
@@ -93,7 +98,7 @@ func (c *defaultConfig) fillValue(value reflect.Value, val string) error {
value.Set(reflect.MakeSlice(reflect.SliceOf(value.Type().Elem()), len(nvals), len(nvals)))
for idx, nval := range nvals {
nvalue := reflect.Indirect(reflect.New(value.Type().Elem()))
if err := c.fillValue(nvalue, nval); err != nil {
if err := fillValue(nvalue, nval); err != nil {
return err
}
value.Index(idx).Set(nvalue)
@@ -182,7 +187,7 @@ func (c *defaultConfig) fillValue(value reflect.Value, val string) error {
return nil
}
func (c *defaultConfig) fillValues(valueOf reflect.Value) error {
func fillValues(valueOf reflect.Value, tname string) error {
var values reflect.Value
if valueOf.Kind() == reflect.Ptr {
@@ -209,7 +214,7 @@ func (c *defaultConfig) fillValues(valueOf reflect.Value) error {
switch value.Kind() {
case reflect.Struct:
value.Set(reflect.Indirect(reflect.New(value.Type())))
if err := c.fillValues(value); err != nil {
if err := fillValues(value, tname); err != nil {
return err
}
continue
@@ -223,17 +228,17 @@ func (c *defaultConfig) fillValues(valueOf reflect.Value) error {
value.Set(reflect.New(value.Type().Elem()))
}
value = value.Elem()
if err := c.fillValues(value); err != nil {
if err := fillValues(value, tname); err != nil {
return err
}
continue
}
tag, ok := field.Tag.Lookup(c.opts.StructTag)
tag, ok := field.Tag.Lookup(tname)
if !ok {
continue
}
if err := c.fillValue(value, tag); err != nil {
if err := fillValue(value, tag); err != nil {
return err
}
}
@@ -265,6 +270,10 @@ func (c *defaultConfig) Name() string {
return c.opts.Name
}
func (c *defaultConfig) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) {
return nil, fmt.Errorf("not implemented")
}
// NewConfig returns new default config source
func NewConfig(opts ...Option) Config {
options := NewOptions(opts...)

View File

@@ -47,6 +47,6 @@ func TestDefault(t *testing.T) {
if conf.StringValue != "after_load" {
t.Fatal("AfterLoad option not working")
}
t.Logf("%#+v\n", conf)
_ = conf
//t.Logf("%#+v\n", conf)
}

View File

@@ -2,6 +2,7 @@ package config
import (
"context"
"time"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/logger"
@@ -62,6 +63,7 @@ type LoadOption func(o *LoadOptions)
// LoadOptions struct
type LoadOptions struct {
Struct interface{}
Override bool
Append bool
}
@@ -88,13 +90,29 @@ func LoadAppend(b bool) LoadOption {
}
}
// LoadStruct override struct for loading
func LoadStruct(src interface{}) LoadOption {
return func(o *LoadOptions) {
o.Struct = src
}
}
// SaveOption function signature
type SaveOption func(o *SaveOptions)
// SaveOptions struct
type SaveOptions struct {
Struct interface{}
}
// SaveStruct override struct for save to config
func SaveStruct(src interface{}) SaveOption {
return func(o *SaveOptions) {
o.Struct = src
}
}
// NewSaveOptions fill SaveOptions struct
func NewSaveOptions(opts ...SaveOption) SaveOptions {
options := SaveOptions{}
for _, o := range opts {
@@ -186,3 +204,60 @@ func Name(n string) Option {
o.Name = n
}
}
// WatchOptions struuct
type WatchOptions struct {
// Context used by non default options
Context context.Context
// Coalesce multiple events to one
Coalesce bool
// MinInterval specifies the min time.Duration interval for poll changes
MinInterval time.Duration
// MaxInterval specifies the max time.Duration interval for poll changes
MaxInterval time.Duration
// Struct for filling
Struct interface{}
}
type WatchOption func(*WatchOptions)
func NewWatchOptions(opts ...WatchOption) WatchOptions {
options := WatchOptions{
Context: context.Background(),
MinInterval: DefaultWatcherMinInterval,
MaxInterval: DefaultWatcherMaxInterval,
}
for _, o := range opts {
o(&options)
}
return options
}
// WatchContext pass context
func WatchContext(ctx context.Context) WatchOption {
return func(o *WatchOptions) {
o.Context = ctx
}
}
// WatchCoalesce controls watch event combining
func WatchCoalesce(b bool) WatchOption {
return func(o *WatchOptions) {
o.Coalesce = b
}
}
// WatchInterval specifies min and max time.Duration for pulling changes
func WatchInterval(min, max time.Duration) WatchOption {
return func(o *WatchOptions) {
o.MinInterval = min
o.MaxInterval = max
}
}
// WatchStruct overrides struct for fill
func WatchStruct(src interface{}) WatchOption {
return func(o *WatchOptions) {
o.Struct = src
}
}

View File

@@ -1,26 +0,0 @@
package config_test
import (
"testing"
rutil "github.com/unistack-org/micro/v3/util/reflect"
)
type Config struct {
SubConfig *SubConfig
Config *Config
Value string
}
type SubConfig struct {
Value string
}
func TestReflect(t *testing.T) {
cfg1 := &Config{Value: "cfg1", Config: &Config{Value: "cfg1_1"}, SubConfig: &SubConfig{Value: "cfg1"}}
cfg2, err := rutil.Zero(cfg1)
if err != nil {
t.Fatal(err)
}
t.Logf("dst: %#+v\n", cfg2)
}

View File

@@ -3,12 +3,16 @@ package flow
import (
"context"
"fmt"
"path/filepath"
"sync"
"github.com/google/uuid"
"github.com/silas/dag"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/store"
)
type microFlow struct {
@@ -20,27 +24,147 @@ type microWorkflow struct {
g *dag.AcyclicGraph
init bool
sync.RWMutex
opts Options
steps map[string]Step
opts Options
steps map[string]Step
status Status
}
func (w *microWorkflow) ID() string {
return w.id
}
func (w *microWorkflow) Steps() [][]Step {
func (w *microWorkflow) Steps() ([][]Step, error) {
return w.getSteps("", false)
}
func (w *microWorkflow) Status() Status {
return w.status
}
func (w *microWorkflow) AppendSteps(steps ...Step) error {
w.Lock()
for _, s := range steps {
w.steps[s.String()] = s
w.g.Add(s)
}
for _, dst := range steps {
for _, req := range dst.Requires() {
src, ok := w.steps[req]
if !ok {
return ErrStepNotExists
}
w.g.Connect(dag.BasicEdge(src, dst))
}
}
if err := w.g.Validate(); err != nil {
w.Unlock()
return err
}
w.g.TransitiveReduction()
w.Unlock()
return nil
}
func (w *microWorkflow) AppendSteps(ctx context.Context, steps ...Step) error {
func (w *microWorkflow) RemoveSteps(steps ...Step) error {
// TODO: handle case when some step requires or required by removed step
w.Lock()
for _, s := range steps {
delete(w.steps, s.String())
w.g.Remove(s)
}
for _, dst := range steps {
for _, req := range dst.Requires() {
src, ok := w.steps[req]
if !ok {
return ErrStepNotExists
}
w.g.Connect(dag.BasicEdge(src, dst))
}
}
if err := w.g.Validate(); err != nil {
w.Unlock()
return err
}
w.g.TransitiveReduction()
w.Unlock()
return nil
}
func (w *microWorkflow) RemoveSteps(ctx context.Context, steps ...Step) error {
return nil
func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) {
var steps [][]Step
var root dag.Vertex
var err error
fn := func(n dag.Vertex, idx int) error {
if idx == 0 {
steps = make([][]Step, 1)
steps[0] = make([]Step, 0, 1)
} else if idx >= len(steps) {
tsteps := make([][]Step, idx+1)
copy(tsteps, steps)
steps = tsteps
steps[idx] = make([]Step, 0, 1)
}
steps[idx] = append(steps[idx], n.(Step))
return nil
}
if start != "" {
var ok bool
w.RLock()
root, ok = w.steps[start]
w.RUnlock()
if !ok {
return nil, ErrStepNotExists
}
} else {
root, err = w.g.Root()
if err != nil {
return nil, err
}
}
if reverse {
err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn)
} else {
err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn)
}
if err != nil {
return nil, err
}
return steps, nil
}
func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error) {
func (w *microWorkflow) Abort(ctx context.Context, eid string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())})
}
func (w *microWorkflow) Suspend(ctx context.Context, eid string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusSuspend.String())})
}
func (w *microWorkflow) Resume(ctx context.Context, eid string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusRunning.String())})
}
func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error) {
w.Lock()
if !w.init {
if err := w.g.Validate(); err != nil {
@@ -56,74 +180,176 @@ func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...Ex
if err != nil {
return "", err
}
eid := uid.String()
stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid))
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
options := NewExecuteOptions(opts...)
var steps [][]Step
fn := func(n dag.Vertex, idx int) error {
if idx == 0 {
steps = make([][]Step, 1)
steps[0] = make([]Step, 0, 1)
} else if idx >= len(steps) {
tsteps := make([][]Step, idx+1)
copy(tsteps, steps)
steps = tsteps
steps[idx] = make([]Step, 0, 1)
}
steps[idx] = append(steps[idx], n.(Step))
return nil
}
var root dag.Vertex
if options.Start != "" {
var ok bool
w.RLock()
root, ok = w.steps[options.Start]
w.RUnlock()
if !ok {
return "", ErrStepNotExists
}
} else {
root, err = w.g.Root()
if err != nil {
return "", err
}
}
if options.Reverse {
err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn)
} else {
err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn)
}
steps, err := w.getSteps(options.Start, options.Reverse)
if err != nil {
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
}
return "", err
}
var wg sync.WaitGroup
cherr := make(chan error, 1)
defer close(cherr)
chstatus := make(chan Status, 1)
nctx, cancel := context.WithCancel(ctx)
defer cancel()
nopts := make([]ExecuteOption, 0, len(opts)+5)
nopts = append(nopts, ExecuteClient(w.opts.Client), ExecuteTracer(w.opts.Tracer), ExecuteLogger(w.opts.Logger), ExecuteMeter(w.opts.Meter), ExecuteStore(w.opts.Store))
nopts = append(nopts,
ExecuteClient(w.opts.Client),
ExecuteTracer(w.opts.Tracer),
ExecuteLogger(w.opts.Logger),
ExecuteMeter(w.opts.Meter),
)
nopts = append(nopts, opts...)
done := make(chan struct{})
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
return eid, werr
}
for idx := range steps {
for nidx := range steps[idx] {
cstep := steps[idx][nidx]
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
return eid, werr
}
}
}
go func() {
for idx := range steps {
wg.Add(len(steps[idx]))
for nidx := range steps[idx] {
go func(step Step) {
defer wg.Done()
if err = step.Execute(nctx, req, nopts...); err != nil {
cherr <- err
cancel()
wStatus := &codec.Frame{}
if werr := workflowStore.Read(w.opts.Context, "status", wStatus); werr != nil {
cherr <- werr
return
}
if status := StringStatus[string(wStatus.Data)]; status != StatusRunning {
chstatus <- status
return
}
if w.opts.Logger.V(logger.TraceLevel) {
w.opts.Logger.Tracef(nctx, "will be executed %v", steps[idx][nidx])
}
cstep := steps[idx][nidx]
if len(cstep.Requires()) == 0 {
wg.Add(1)
go func(step Step) {
defer wg.Done()
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "req"), req); werr != nil {
cherr <- werr
return
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
cherr <- werr
return
}
rsp, serr := step.Execute(nctx, req, nopts...)
if serr != nil {
step.SetStatus(StatusFailure)
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
cherr <- serr
return
} else {
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), rsp); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
return
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
return
}
}
}(cstep)
wg.Wait()
} else {
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "req"), req); werr != nil {
cherr <- werr
return
}
}(steps[idx][nidx])
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
cherr <- werr
return
}
rsp, serr := cstep.Execute(nctx, req, nopts...)
if serr != nil {
cstep.SetStatus(StatusFailure)
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
cherr <- serr
return
} else {
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), rsp); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
return
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
cherr <- werr
return
}
}
}
}
wg.Wait()
}
cherr <- nil
close(done)
}()
err = <-cherr
if options.Async {
return eid, nil
}
logger.Tracef(ctx, "wait for finish or error")
select {
case <-nctx.Done():
err = nctx.Err()
case cerr := <-cherr:
err = cerr
case <-done:
close(cherr)
case <-chstatus:
close(chstatus)
return uid.String(), nil
}
switch {
case nctx.Err() != nil:
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusAborted.String())}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
}
break
case err == nil:
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
}
break
case err != nil:
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
}
break
}
return uid.String(), err
}
@@ -207,6 +433,17 @@ type microCallStep struct {
opts StepOptions
service string
method string
rsp *Message
req *Message
status Status
}
func (s *microCallStep) Request() *Message {
return s.req
}
func (s *microCallStep) Response() *Message {
return s.rsp
}
func (s *microCallStep) ID() string {
@@ -247,23 +484,47 @@ func (s *microCallStep) Hashcode() interface{} {
return s.String()
}
func (s *microCallStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error {
func (s *microCallStep) GetStatus() Status {
return s.status
}
func (s *microCallStep) SetStatus(status Status) {
s.status = status
}
func (s *microCallStep) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error) {
options := NewExecuteOptions(opts...)
if options.Client == nil {
return fmt.Errorf("client not set")
return nil, ErrMissingClient
}
rsp := &codec.Frame{}
copts := []client.CallOption{client.WithRetries(0)}
if options.Timeout > 0 {
copts = append(copts, client.WithRequestTimeout(options.Timeout), client.WithDialTimeout(options.Timeout))
}
err := options.Client.Call(ctx, options.Client.NewRequest(s.service, s.method, req), rsp)
return err
nctx := metadata.NewOutgoingContext(ctx, req.Header)
err := options.Client.Call(nctx, options.Client.NewRequest(s.service, s.method, &codec.Frame{Data: req.Body}), rsp)
if err != nil {
return nil, err
}
md, _ := metadata.FromOutgoingContext(nctx)
return &Message{Header: md, Body: rsp.Data}, err
}
type microPublishStep struct {
opts StepOptions
topic string
opts StepOptions
topic string
req *Message
rsp *Message
status Status
}
func (s *microPublishStep) Request() *Message {
return s.req
}
func (s *microPublishStep) Response() *Message {
return s.rsp
}
func (s *microPublishStep) ID() string {
@@ -304,13 +565,21 @@ func (s *microPublishStep) Hashcode() interface{} {
return s.String()
}
func (s *microPublishStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error {
return nil
func (s *microPublishStep) GetStatus() Status {
return s.status
}
func NewCallStep(service string, method string, opts ...StepOption) Step {
func (s *microPublishStep) SetStatus(status Status) {
s.status = status
}
func (s *microPublishStep) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error) {
return nil, nil
}
func NewCallStep(service string, name string, method string, opts ...StepOption) Step {
options := NewStepOptions(opts...)
return &microCallStep{service: service, method: method, opts: options}
return &microCallStep{service: service, method: name + "." + method, opts: options}
}
func NewPublishStep(topic string, opts ...StepOption) Step {

View File

@@ -4,12 +4,43 @@ package flow
import (
"context"
"errors"
"sync"
"sync/atomic"
"github.com/unistack-org/micro/v3/metadata"
)
var (
ErrStepNotExists = errors.New("step not exists")
ErrMissingClient = errors.New("client not set")
)
// RawMessage is a raw encoded JSON value.
// It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a encoding.
type RawMessage []byte
// MarshalJSON returns m as the JSON encoding of m.
func (m *RawMessage) MarshalJSON() ([]byte, error) {
if m == nil {
return []byte("null"), nil
}
return *m, nil
}
// UnmarshalJSON sets *m to a copy of data.
func (m *RawMessage) UnmarshalJSON(data []byte) error {
if m == nil {
return errors.New("RawMessage UnmarshalJSON on nil pointer")
}
*m = append((*m)[0:0], data...)
return nil
}
type Message struct {
Header metadata.Metadata
Body RawMessage
}
// Step represents dedicated workflow step
type Step interface {
// ID returns step id
@@ -17,7 +48,7 @@ type Step interface {
// Endpoint returns rpc endpoint service_name.service_method or broker topic
Endpoint() string
// Execute step run
Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error
Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error)
// Requires returns dependent steps
Requires() []string
// Options returns step options
@@ -26,20 +57,70 @@ type Step interface {
Require(steps ...Step) error
// String
String() string
// GetStatus returns step status
GetStatus() Status
// SetStatus sets the step status
SetStatus(Status)
// Request returns step request message
Request() *Message
// Response returns step response message
Response() *Message
}
type Status int
func (status Status) String() string {
return StatusString[status]
}
const (
StatusPending Status = iota
StatusRunning
StatusFailure
StatusSuccess
StatusAborted
StatusSuspend
)
var (
StatusString = map[Status]string{
StatusPending: "StatusPending",
StatusRunning: "StatusRunning",
StatusFailure: "StatusFailure",
StatusSuccess: "StatusSuccess",
StatusAborted: "StatusAborted",
StatusSuspend: "StatusSuspend",
}
StringStatus = map[string]Status{
"StatusPending": StatusPending,
"StatusRunning": StatusRunning,
"StatusFailure": StatusFailure,
"StatusSuccess": StatusSuccess,
"StatusAborted": StatusAborted,
"StatusSuspend": StatusSuspend,
}
)
// Workflow contains all steps to execute
type Workflow interface {
// ID returns id of the workflow
ID() string
// Steps returns steps slice where parallel steps returned on the same level
Steps() [][]Step
// Execute workflow with args, return execution id and error
Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error)
Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error)
// RemoveSteps remove steps from workflow
RemoveSteps(ctx context.Context, steps ...Step) error
RemoveSteps(steps ...Step) error
// AppendSteps append steps to workflow
AppendSteps(ctx context.Context, steps ...Step) error
AppendSteps(steps ...Step) error
// Status returns workflow status
Status() Status
// Steps returns steps slice where parallel steps returned on the same level
Steps() ([][]Step, error)
// Suspend suspends execution
Suspend(ctx context.Context, eid string) error
// Resume resumes execution
Resume(ctx context.Context, eid string) error
// Abort abort execution
Abort(ctx context.Context, eid string) error
}
// Flow the base interface to interact with workflows
@@ -57,3 +138,15 @@ type Flow interface {
// WorkflowList lists all workflows
WorkflowList(ctx context.Context) ([]Workflow, error)
}
var (
flowMu sync.Mutex
atomicSteps atomic.Value
)
func RegisterStep(step Step) {
flowMu.Lock()
steps, _ := atomicSteps.Load().([]Step)
atomicSteps.Store(append(steps, step))
flowMu.Unlock()
}

View File

@@ -116,8 +116,6 @@ type ExecuteOptions struct {
Logger logger.Logger
// Meter holds the meter
Meter meter.Meter
// Store used for intermediate results
Store store.Store
// Context can be used to abort execution or pass additional opts
Context context.Context
// Start step
@@ -126,6 +124,8 @@ type ExecuteOptions struct {
Reverse bool
// Timeout for execution
Timeout time.Duration
// Async enables async execution
Async bool
}
type ExecuteOption func(*ExecuteOptions)
@@ -154,12 +154,6 @@ func ExecuteMeter(m meter.Meter) ExecuteOption {
}
}
func ExecuteStore(s store.Store) ExecuteOption {
return func(o *ExecuteOptions) {
o.Store = s
}
}
func ExecuteContext(ctx context.Context) ExecuteOption {
return func(o *ExecuteOptions) {
o.Context = ctx
@@ -178,8 +172,20 @@ func ExecuteTimeout(td time.Duration) ExecuteOption {
}
}
func ExecuteAsync(b bool) ExecuteOption {
return func(o *ExecuteOptions) {
o.Async = b
}
}
func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions {
options := ExecuteOptions{}
options := ExecuteOptions{
Client: client.DefaultClient,
Logger: logger.DefaultLogger,
Tracer: tracer.DefaultTracer,
Meter: meter.DefaultMeter,
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
@@ -196,7 +202,9 @@ type StepOptions struct {
type StepOption func(*StepOptions)
func NewStepOptions(opts ...StepOption) StepOptions {
options := StepOptions{Context: context.Background()}
options := StepOptions{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}

4
go.mod
View File

@@ -3,9 +3,9 @@ module github.com/unistack-org/micro/v3
go 1.16
require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/ef-ds/deque v1.0.4
github.com/google/uuid v1.2.0
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/google/uuid v1.3.0
github.com/imdario/mergo v0.3.12
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34

8
go.sum
View File

@@ -1,9 +1,9 @@
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/ef-ds/deque v1.0.4 h1:iFAZNmveMT9WERAkqLJ+oaABF9AcVQ5AjXem/hroniI=
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=

View File

@@ -24,6 +24,8 @@ type defaultLogger struct {
enc *json.Encoder
opts Options
sync.RWMutex
logFunc LogFunc
logfFunc LogfFunc
}
// Init(opts...) should only overwrite provided options
@@ -33,6 +35,11 @@ func (l *defaultLogger) Init(opts ...Option) error {
o(&l.opts)
}
l.enc = json.NewEncoder(l.opts.Out)
// wrap the Log func
for i := len(l.opts.Wrappers); i > 0; i-- {
l.logFunc = l.opts.Wrappers[i-1].Log(l.logFunc)
l.logfFunc = l.opts.Wrappers[i-1].Logf(l.logfFunc)
}
l.Unlock()
return nil
}
@@ -48,26 +55,20 @@ func (l *defaultLogger) V(level Level) bool {
return ok
}
func (l *defaultLogger) Fields(fields map[string]interface{}) Logger {
func (l *defaultLogger) Fields(fields ...interface{}) Logger {
nl := &defaultLogger{opts: l.opts, enc: l.enc}
nl.opts.Fields = make(map[string]interface{}, len(l.opts.Fields)+len(fields))
l.RLock()
for k, v := range l.opts.Fields {
nl.opts.Fields[k] = v
}
l.RUnlock()
for k, v := range fields {
nl.opts.Fields[k] = v
if len(fields) == 0 {
return nl
} else if len(fields)%2 != 0 {
fields = fields[:len(fields)-1]
}
nl.opts.Fields = append(l.opts.Fields, fields...)
return nl
}
func copyFields(src map[string]interface{}) map[string]interface{} {
dst := make(map[string]interface{}, len(src))
for k, v := range src {
dst[k] = v
}
func copyFields(src []interface{}) []interface{} {
dst := make([]interface{}, len(src))
copy(dst, src)
return dst
}
@@ -121,27 +122,27 @@ func (l *defaultLogger) Fatal(ctx context.Context, args ...interface{}) {
}
func (l *defaultLogger) Infof(ctx context.Context, msg string, args ...interface{}) {
l.Logf(ctx, InfoLevel, msg, args...)
l.logfFunc(ctx, InfoLevel, msg, args...)
}
func (l *defaultLogger) Errorf(ctx context.Context, msg string, args ...interface{}) {
l.Logf(ctx, ErrorLevel, msg, args...)
l.logfFunc(ctx, ErrorLevel, msg, args...)
}
func (l *defaultLogger) Debugf(ctx context.Context, msg string, args ...interface{}) {
l.Logf(ctx, DebugLevel, msg, args...)
l.logfFunc(ctx, DebugLevel, msg, args...)
}
func (l *defaultLogger) Warnf(ctx context.Context, msg string, args ...interface{}) {
l.Logf(ctx, WarnLevel, msg, args...)
l.logfFunc(ctx, WarnLevel, msg, args...)
}
func (l *defaultLogger) Tracef(ctx context.Context, msg string, args ...interface{}) {
l.Logf(ctx, TraceLevel, msg, args...)
l.logfFunc(ctx, TraceLevel, msg, args...)
}
func (l *defaultLogger) Fatalf(ctx context.Context, msg string, args ...interface{}) {
l.Logf(ctx, FatalLevel, msg, args...)
l.logfFunc(ctx, FatalLevel, msg, args...)
os.Exit(1)
}
@@ -154,19 +155,23 @@ func (l *defaultLogger) Log(ctx context.Context, level Level, args ...interface{
fields := copyFields(l.opts.Fields)
l.RUnlock()
fields["level"] = level.String()
fields = append(fields, "level", level.String())
if _, file, line, ok := runtime.Caller(l.opts.CallerSkipCount); ok {
fields["caller"] = fmt.Sprintf("%s:%d", logCallerfilePath(file), line)
fields = append(fields, "caller", fmt.Sprintf("%s:%d", logCallerfilePath(file), line))
}
fields = append(fields, "timestamp", time.Now().Format("2006-01-02 15:04:05"))
fields["timestamp"] = time.Now().Format("2006-01-02 15:04:05")
if len(args) > 0 {
fields["msg"] = fmt.Sprint(args...)
fields = append(fields, "msg", fmt.Sprint(args...))
}
out := make(map[string]interface{}, len(fields)/2)
for i := 0; i < len(fields); i += 2 {
out[fields[i].(string)] = fields[i+1]
}
l.RLock()
_ = l.enc.Encode(fields)
_ = l.enc.Encode(out)
l.RUnlock()
}
@@ -179,35 +184,39 @@ func (l *defaultLogger) Logf(ctx context.Context, level Level, msg string, args
fields := copyFields(l.opts.Fields)
l.RUnlock()
fields["level"] = level.String()
fields = append(fields, "level", level.String())
if _, file, line, ok := runtime.Caller(l.opts.CallerSkipCount); ok {
fields["caller"] = fmt.Sprintf("%s:%d", logCallerfilePath(file), line)
fields = append(fields, "caller", fmt.Sprintf("%s:%d", logCallerfilePath(file), line))
}
fields["timestamp"] = time.Now().Format("2006-01-02 15:04:05")
fields = append(fields, "timestamp", time.Now().Format("2006-01-02 15:04:05"))
if len(args) > 0 {
fields["msg"] = fmt.Sprintf(msg, args...)
fields = append(fields, "msg", fmt.Sprintf(msg, args...))
} else if msg != "" {
fields["msg"] = msg
fields = append(fields, "msg", msg)
}
out := make(map[string]interface{}, len(fields)/2)
for i := 0; i < len(fields); i += 2 {
out[fields[i].(string)] = fields[i+1]
}
l.RLock()
_ = l.enc.Encode(fields)
_ = l.enc.Encode(out)
l.RUnlock()
}
func (l *defaultLogger) Options() Options {
// not guard against options Context values
l.RLock()
opts := l.opts
opts.Fields = copyFields(l.opts.Fields)
l.RUnlock()
return opts
return l.opts
}
// NewLogger builds a new logger based on options
func NewLogger(opts ...Option) Logger {
l := &defaultLogger{opts: NewOptions(opts...)}
l := &defaultLogger{
opts: NewOptions(opts...),
}
l.logFunc = l.Log
l.logfFunc = l.Logf
l.enc = json.NewEncoder(l.opts.Out)
return l
}

View File

@@ -8,6 +8,8 @@ var (
DefaultLogger Logger = NewLogger()
// DefaultLevel used by logger
DefaultLevel Level = InfoLevel
// DefaultCallerSkipCount used by logger
DefaultCallerSkipCount = 2
)
// Logger is a generic logging interface
@@ -18,8 +20,8 @@ type Logger interface {
V(level Level) bool
// The Logger options
Options() Options
// Fields set fields to always be logged
Fields(fields map[string]interface{}) Logger
// Fields set fields to always be logged with keyval pairs
Fields(fields ...interface{}) Logger
// Info level message
Info(ctx context.Context, args ...interface{})
// Trace level message
@@ -52,6 +54,9 @@ type Logger interface {
String() string
}
// Field contains keyval pair
type Field interface{}
// Info writes msg to default logger on info level
func Info(ctx context.Context, args ...interface{}) {
DefaultLogger.Info(ctx, args...)
@@ -123,6 +128,6 @@ func Init(opts ...Option) error {
}
// Fields create logger with specific fields
func Fields(fields map[string]interface{}) Logger {
return DefaultLogger.Fields(fields)
func Fields(fields ...interface{}) Logger {
return DefaultLogger.Fields(fields...)
}

View File

@@ -1,18 +1,96 @@
package logger
import (
"bytes"
"context"
"log"
"testing"
)
func TestRedirectStdLogger(t *testing.T) {
buf := bytes.NewBuffer(nil)
l := NewLogger(WithLevel(TraceLevel), WithOutput(buf))
if err := l.Init(); err != nil {
t.Fatal(err)
}
fn := RedirectStdLogger(l, ErrorLevel)
defer fn()
log.Print("test")
if !bytes.Contains(buf.Bytes(), []byte(`"level":"error","msg":"test","timestamp"`)) {
t.Fatalf("logger error, buf %s", buf.Bytes())
}
}
func TestStdLogger(t *testing.T) {
buf := bytes.NewBuffer(nil)
l := NewLogger(WithLevel(TraceLevel), WithOutput(buf))
if err := l.Init(); err != nil {
t.Fatal(err)
}
lg := NewStdLogger(l, ErrorLevel)
lg.Print("test")
if !bytes.Contains(buf.Bytes(), []byte(`"level":"error","msg":"test","timestamp"`)) {
t.Fatalf("logger error, buf %s", buf.Bytes())
}
}
func TestLogger(t *testing.T) {
ctx := context.TODO()
l := NewLogger(WithLevel(TraceLevel))
buf := bytes.NewBuffer(nil)
l := NewLogger(WithLevel(TraceLevel), WithOutput(buf))
if err := l.Init(); err != nil {
t.Fatal(err)
}
l.Trace(ctx, "trace_msg1")
l.Warn(ctx, "warn_msg1")
l.Fields(map[string]interface{}{"error": "test"}).Info(ctx, "error message")
l.Fields("error", "test").Info(ctx, "error message")
l.Warn(ctx, "first", " ", "second")
if !bytes.Contains(buf.Bytes(), []byte(`"level":"trace","msg":"trace_msg1"`)) {
t.Fatalf("logger error, buf %s", buf.Bytes())
}
if !bytes.Contains(buf.Bytes(), []byte(`"warn","msg":"warn_msg1"`)) {
t.Fatalf("logger error, buf %s", buf.Bytes())
}
if !bytes.Contains(buf.Bytes(), []byte(`"error":"test","level":"info","msg":"error message"`)) {
t.Fatalf("logger error, buf %s", buf.Bytes())
}
if !bytes.Contains(buf.Bytes(), []byte(`"level":"warn","msg":"first second"`)) {
t.Fatalf("logger error, buf %s", buf.Bytes())
}
}
func TestLoggerWrapper(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)
l := NewLogger(WithLevel(TraceLevel), WithOutput(buf))
if err := l.Init(WrapLogger(NewOmitWrapper())); err != nil {
t.Fatal(err)
}
type secret struct {
Name string
Passw string `logger:"omit"`
}
s := &secret{Name: "name", Passw: "secret"}
l.Errorf(ctx, "test %#+v", s)
if !bytes.Contains(buf.Bytes(), []byte(`logger.secret{Name:\"name\", Passw:\"\"}"`)) {
t.Fatalf("omit not works, struct: %v, output: %s", s, buf.Bytes())
}
}
func TestOmitLoggerWrapper(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)
l := NewOmitLogger(NewLogger(WithLevel(TraceLevel), WithOutput(buf)))
if err := l.Init(); err != nil {
t.Fatal(err)
}
type secret struct {
Name string
Passw string `logger:"omit"`
}
s := &secret{Name: "name", Passw: "secret"}
l.Errorf(ctx, "test %#+v", s)
if !bytes.Contains(buf.Bytes(), []byte(`logger.secret{Name:\"name\", Passw:\"\"}"`)) {
t.Fatalf("omit not works, struct: %v, output: %s", s, buf.Bytes())
}
}

View File

@@ -16,22 +16,24 @@ type Options struct {
// Context holds exernal options
Context context.Context
// Fields holds additional metadata
Fields map[string]interface{}
Fields []interface{}
// Name holds the logger name
Name string
// CallerSkipCount number of frmaes to skip
CallerSkipCount int
// The logging level the logger should log
Level Level
// Wrappers logger wrapper that called before actual Log/Logf function
Wrappers []Wrapper
}
// NewOptions creates new options struct
func NewOptions(opts ...Option) Options {
options := Options{
Level: DefaultLevel,
Fields: make(map[string]interface{}),
Fields: make([]interface{}, 0, 6),
Out: os.Stderr,
CallerSkipCount: 0,
CallerSkipCount: DefaultCallerSkipCount,
Context: context.Background(),
}
for _, o := range opts {
@@ -41,7 +43,7 @@ func NewOptions(opts ...Option) Options {
}
// WithFields set default fields for the logger
func WithFields(fields map[string]interface{}) Option {
func WithFields(fields ...interface{}) Option {
return func(o *Options) {
o.Fields = fields
}
@@ -81,3 +83,10 @@ func WithName(n string) Option {
o.Name = n
}
}
// WrapLogger adds a logger Wrapper to a list of options passed into the logger
func WrapLogger(w Wrapper) Option {
return func(o *Options) {
o.Wrappers = append(o.Wrappers, w)
}
}

35
logger/stdlogger.go Normal file
View File

@@ -0,0 +1,35 @@
package logger
import (
"bytes"
"log"
)
type stdLogger struct {
l Logger
level Level
}
func NewStdLogger(l Logger, level Level) *log.Logger {
return log.New(&stdLogger{l: l, level: level}, "" /* prefix */, 0 /* flags */)
}
func (sl *stdLogger) Write(p []byte) (int, error) {
p = bytes.TrimSpace(p)
sl.l.Log(sl.l.Options().Context, sl.level, string(p))
return len(p), nil
}
func RedirectStdLogger(l Logger, level Level) func() {
flags := log.Flags()
prefix := log.Prefix()
writer := log.Writer()
log.SetFlags(0)
log.SetPrefix("")
log.SetOutput(&stdLogger{l: l, level: level})
return func() {
log.SetFlags(flags)
log.SetPrefix(prefix)
log.SetOutput(writer)
}
}

154
logger/wrapper.go Normal file
View File

@@ -0,0 +1,154 @@
package logger
import (
"context"
"reflect"
rutil "github.com/unistack-org/micro/v3/util/reflect"
)
// LogFunc function used for Log method
type LogFunc func(ctx context.Context, level Level, args ...interface{})
// LogfFunc function used for Logf method
type LogfFunc func(ctx context.Context, level Level, msg string, args ...interface{})
type Wrapper interface {
// Log logs message with needed level
Log(LogFunc) LogFunc
// Logf logs message with needed level
Logf(LogfFunc) LogfFunc
}
var (
_ Logger = &OmitLogger{}
)
type OmitLogger struct {
l Logger
}
func NewOmitLogger(l Logger) Logger {
return &OmitLogger{l: l}
}
func (w *OmitLogger) Init(opts ...Option) error {
return w.l.Init(append(opts, WrapLogger(NewOmitWrapper()))...)
}
func (w *OmitLogger) V(level Level) bool {
return w.l.V(level)
}
func (w *OmitLogger) Options() Options {
return w.l.Options()
}
func (w *OmitLogger) Fields(fields ...interface{}) Logger {
return w.l.Fields(fields...)
}
func (w *OmitLogger) Info(ctx context.Context, args ...interface{}) {
w.l.Info(ctx, args...)
}
func (w *OmitLogger) Trace(ctx context.Context, args ...interface{}) {
w.l.Trace(ctx, args...)
}
func (w *OmitLogger) Debug(ctx context.Context, args ...interface{}) {
w.l.Debug(ctx, args...)
}
func (w *OmitLogger) Warn(ctx context.Context, args ...interface{}) {
w.l.Warn(ctx, args...)
}
func (w *OmitLogger) Error(ctx context.Context, args ...interface{}) {
w.l.Error(ctx, args...)
}
func (w *OmitLogger) Fatal(ctx context.Context, args ...interface{}) {
w.l.Fatal(ctx, args...)
}
func (w *OmitLogger) Infof(ctx context.Context, msg string, args ...interface{}) {
w.l.Infof(ctx, msg, args...)
}
func (w *OmitLogger) Tracef(ctx context.Context, msg string, args ...interface{}) {
w.l.Tracef(ctx, msg, args...)
}
func (w *OmitLogger) Debugf(ctx context.Context, msg string, args ...interface{}) {
w.l.Debugf(ctx, msg, args...)
}
func (w *OmitLogger) Warnf(ctx context.Context, msg string, args ...interface{}) {
w.l.Warnf(ctx, msg, args...)
}
func (w *OmitLogger) Errorf(ctx context.Context, msg string, args ...interface{}) {
w.l.Errorf(ctx, msg, args...)
}
func (w *OmitLogger) Fatalf(ctx context.Context, msg string, args ...interface{}) {
w.l.Fatalf(ctx, msg, args...)
}
func (w *OmitLogger) Log(ctx context.Context, level Level, args ...interface{}) {
w.l.Log(ctx, level, args...)
}
func (w *OmitLogger) Logf(ctx context.Context, level Level, msg string, args ...interface{}) {
w.l.Logf(ctx, level, msg, args...)
}
func (w *OmitLogger) String() string {
return w.l.String()
}
type OmitWrapper struct{}
func NewOmitWrapper() Wrapper {
return &OmitWrapper{}
}
func getArgs(args []interface{}) []interface{} {
nargs := make([]interface{}, 0, len(args))
var err error
for _, arg := range args {
val := reflect.ValueOf(arg)
switch val.Kind() {
case reflect.Ptr:
val = val.Elem()
}
narg := arg
if val.Kind() == reflect.Struct {
if narg, err = rutil.Zero(arg); err == nil {
rutil.CopyDefaults(narg, arg)
if flds, ferr := rutil.StructFields(narg); ferr == nil {
for _, fld := range flds {
if tv, ok := fld.Field.Tag.Lookup("logger"); ok && tv == "omit" {
fld.Value.Set(reflect.Zero(fld.Value.Type()))
}
}
}
}
}
nargs = append(nargs, narg)
}
return nargs
}
func (w *OmitWrapper) Log(fn LogFunc) LogFunc {
return func(ctx context.Context, level Level, args ...interface{}) {
fn(ctx, level, getArgs(args)...)
}
}
func (w *OmitWrapper) Logf(fn LogfFunc) LogfFunc {
return func(ctx context.Context, level Level, msg string, args ...interface{}) {
fn(ctx, level, msg, getArgs(args)...)
}
}

View File

@@ -6,8 +6,20 @@ import (
"sort"
)
// HeaderPrefix for all headers passed
var HeaderPrefix = "Micro-"
var (
// HeaderTopic is the header name that contains topic name
HeaderTopic = "Micro-Topic"
// HeaderContentType specifies content type of message
HeaderContentType = "Content-Type"
// HeaderEndpoint specifies endpoint in service
HeaderEndpoint = "Micro-Endpoint"
// HeaderService specifies service
HeaderService = "Micro-Service"
// HeaderTimeout specifies timeout of operation
HeaderTimeout = "Micro-Timeout"
// HeaderAuthorization specifies Authorization header
HeaderAuthorization = "Authorization"
)
// Metadata is our way of representing request headers internally.
// They're used at the RPC level and translate back and forth

View File

@@ -3,8 +3,9 @@ package meter
import (
"io"
"reflect"
"sort"
"strconv"
"strings"
"time"
)
@@ -77,36 +78,62 @@ type Summary interface {
UpdateDuration(time.Time)
}
// sort labels alphabeticaly by label name
type byKey []string
func (k byKey) Len() int { return len(k) / 2 }
func (k byKey) Less(i, j int) bool { return k[i*2] < k[j*2] }
func (k byKey) Swap(i, j int) {
k[i*2], k[i*2+1], k[j*2], k[j*2+1] = k[j*2], k[j*2+1], k[i*2], k[i*2+1]
k[i*2], k[j*2] = k[j*2], k[i*2]
k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1]
}
func Sort(slice *[]string) {
bk := byKey(*slice)
if bk.Len() <= 1 {
return
// BuildLables used to sort labels and delete duplicates.
// Last value wins in case of duplicate label keys.
func BuildLabels(labels ...string) []string {
if len(labels)%2 == 1 {
labels = labels[:len(labels)-1]
}
sort.Sort(bk)
v := reflect.ValueOf(slice).Elem()
cnt := 0
key := 0
val := 1
for key < v.Len() {
if len(bk) > key+2 && bk[key] == bk[key+2] {
key += 2
val += 2
continue
}
v.Index(cnt).Set(v.Index(key))
cnt++
v.Index(cnt).Set(v.Index(val))
cnt++
key += 2
val += 2
}
v.SetLen(cnt)
sort.Sort(byKey(labels))
return labels
}
// BuildName used to combine metric with labels.
// If labels count is odd, drop last element
func BuildName(name string, labels ...string) string {
if len(labels)%2 == 1 {
labels = labels[:len(labels)-1]
}
if len(labels) > 2 {
sort.Sort(byKey(labels))
idx := 0
for {
if labels[idx] == labels[idx+2] {
copy(labels[idx:], labels[idx+2:])
labels = labels[:len(labels)-2]
} else {
idx += 2
}
if idx+2 >= len(labels) {
break
}
}
}
var b strings.Builder
_, _ = b.WriteString(name)
_, _ = b.WriteRune('{')
for idx := 0; idx < len(labels); idx += 2 {
if idx > 0 {
_, _ = b.WriteRune(',')
}
_, _ = b.WriteString(labels[idx])
_, _ = b.WriteString(`=`)
_, _ = b.WriteString(strconv.Quote(labels[idx+1]))
}
_, _ = b.WriteRune('}')
return b.String()
}

View File

@@ -14,11 +14,57 @@ func TestNoopMeter(t *testing.T) {
cnt.Inc()
}
func TestLabelsSort(t *testing.T) {
ls := []string{"server", "http", "register", "mdns", "broker", "broker1", "broker", "broker2", "server", "tcp"}
Sort(&ls)
func testEq(a, b []string) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}
if ls[0] != "broker" || ls[1] != "broker2" {
t.Fatalf("sort error: %v", ls)
func TestBuildLabels(t *testing.T) {
type testData struct {
src []string
dst []string
}
data := []testData{
testData{
src: []string{"zerolabel", "value3", "firstlabel", "value2"},
dst: []string{"firstlabel", "value2", "zerolabel", "value3"},
},
}
for _, d := range data {
if !testEq(d.dst, BuildLabels(d.src...)) {
t.Fatalf("slices not properly sorted: %v %v", d.dst, d.src)
}
}
}
func TestBuildName(t *testing.T) {
data := map[string][]string{
`my_metric{firstlabel="value2",zerolabel="value3"}`: []string{
"my_metric",
"zerolabel", "value3", "firstlabel", "value2",
},
`my_metric{broker="broker2",register="mdns",server="tcp"}`: []string{
"my_metric",
"broker", "broker1", "broker", "broker2", "server", "http", "server", "tcp", "register", "mdns",
},
`my_metric{aaa="aaa"}`: []string{
"my_metric",
"aaa", "aaa",
},
}
for e, d := range data {
if x := BuildName(d[0], d[1:]...); x != e {
t.Fatalf("expect: %s, result: %s", e, x)
}
}
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/network/transport"
"github.com/unistack-org/micro/v3/network/tunnel"
)
@@ -24,9 +25,18 @@ type tunSubscriber struct {
opts broker.SubscribeOptions
}
type tunBatchSubscriber struct {
listener tunnel.Listener
handler broker.BatchHandler
closed chan bool
topic string
opts broker.SubscribeOptions
}
type tunEvent struct {
message *broker.Message
topic string
err error
}
// used to access tunnel from options context
@@ -62,6 +72,36 @@ func (t *tunBroker) Disconnect(ctx context.Context) error {
return t.tunnel.Close(ctx)
}
func (t *tunBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
// TODO: this is probably inefficient, we might want to just maintain an open connection
// it may be easier to add broadcast to the tunnel
topicMap := make(map[string]tunnel.Session)
var err error
for _, msg := range msgs {
topic, _ := msg.Header.Get(metadata.HeaderTopic)
c, ok := topicMap[topic]
if !ok {
c, err := t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast))
if err != nil {
return err
}
defer c.Close()
topicMap[topic] = c
}
if err = c.Send(&transport.Message{
Header: msg.Header,
Body: msg.Body,
}); err != nil {
// msg.SetError(err)
return err
}
}
return nil
}
func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message, opts ...broker.PublishOption) error {
// TODO: this is probably inefficient, we might want to just maintain an open connection
// it may be easier to add broadcast to the tunnel
@@ -77,6 +117,26 @@ func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message
})
}
func (t *tunBroker) BatchSubscribe(ctx context.Context, topic string, h broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast))
if err != nil {
return nil, err
}
tunSub := &tunBatchSubscriber{
topic: topic,
handler: h,
opts: broker.NewSubscribeOptions(opts...),
closed: make(chan bool),
listener: l,
}
// start processing
go tunSub.run()
return tunSub, nil
}
func (t *tunBroker) Subscribe(ctx context.Context, topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast))
if err != nil {
@@ -101,6 +161,49 @@ func (t *tunBroker) String() string {
return "tunnel"
}
func (t *tunBatchSubscriber) run() {
for {
// accept a new connection
c, err := t.listener.Accept()
if err != nil {
select {
case <-t.closed:
return
default:
continue
}
}
// receive message
m := new(transport.Message)
if err := c.Recv(m); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Error(t.opts.Context, err.Error())
}
if err = c.Close(); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Error(t.opts.Context, err.Error())
}
}
continue
}
// close the connection
c.Close()
evts := broker.Events{&tunEvent{
topic: t.topic,
message: &broker.Message{
Header: m.Header,
Body: m.Body,
},
}}
// handle the message
go t.handler(evts)
}
}
func (t *tunSubscriber) run() {
for {
// accept a new connection
@@ -142,6 +245,24 @@ func (t *tunSubscriber) run() {
}
}
func (t *tunBatchSubscriber) Options() broker.SubscribeOptions {
return t.opts
}
func (t *tunBatchSubscriber) Topic() string {
return t.topic
}
func (t *tunBatchSubscriber) Unsubscribe(ctx context.Context) error {
select {
case <-t.closed:
return nil
default:
close(t.closed)
return t.listener.Close()
}
}
func (t *tunSubscriber) Options() broker.SubscribeOptions {
return t.opts
}
@@ -173,7 +294,11 @@ func (t *tunEvent) Ack() error {
}
func (t *tunEvent) Error() error {
return nil
return t.err
}
func (t *tunEvent) SetError(err error) {
t.err = err
}
// NewBroker returns new tunnel broker

View File

@@ -6,11 +6,13 @@ import (
"sync"
"time"
// cprotorpc "github.com/unistack-org/micro-codec-protorpc"
"github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/register"
maddr "github.com/unistack-org/micro/v3/util/addr"
mnet "github.com/unistack-org/micro/v3/util/net"
"github.com/unistack-org/micro/v3/util/rand"
)
// DefaultCodecs will be used to encode/decode
@@ -73,8 +75,7 @@ func (n *noopServer) Subscribe(sb Subscriber) error {
sub, ok := sb.(*subscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *subscriber")
}
if len(sub.handlers) == 0 {
} else if len(sub.handlers) == 0 {
return fmt.Errorf("invalid subscriber: no handler functions")
}
@@ -107,11 +108,12 @@ func (n *noopServer) Init(opts ...Option) error {
}
if n.handlers == nil {
n.handlers = make(map[string]Handler)
n.handlers = make(map[string]Handler, 1)
}
if n.subscribers == nil {
n.subscribers = make(map[*subscriber][]broker.Subscriber)
n.subscribers = make(map[*subscriber][]broker.Subscriber, 1)
}
if n.exit == nil {
n.exit = make(chan chan error)
}
@@ -202,26 +204,34 @@ func (n *noopServer) Register() error {
cx := config.Context
for sb := range n.subscribers {
handler := n.createSubHandler(sb, config)
var opts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
var sub broker.Subscriber
for sb := range n.subscribers {
if sb.Options().Context != nil {
cx = sb.Options().Context
}
opts = append(opts, broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck))
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
if sb.Options().Batch {
// batch processing handler
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.newBatchSubHandler(sb, config), opts...)
} else {
// single processing handler
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.newSubHandler(sb, config), opts...)
}
if err != nil {
return err
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
}
sub, err := config.Broker.Subscribe(cx, sb.Topic(), handler, opts...)
if err != nil {
return err
}
n.subscribers[sb] = []broker.Subscriber{sub}
}
@@ -303,9 +313,22 @@ func (n *noopServer) Start() error {
config := n.Options()
n.RUnlock()
// use 127.0.0.1 to avoid scan of all network interfaces
addr, err := maddr.Extract("127.0.0.1")
if err != nil {
return err
}
var rng rand.Rand
i := rng.Intn(20000)
// set addr with port
addr = mnet.HostPort(addr, 10000+i)
config.Address = addr
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "server [noop] Listening on %s", config.Address)
}
n.Lock()
if len(config.Advertise) == 0 {
config.Advertise = config.Address

106
server/noop_test.go Normal file
View File

@@ -0,0 +1,106 @@
package server_test
import (
"context"
"fmt"
"testing"
"github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/server"
)
type TestHandler struct {
t *testing.T
}
type TestMessage struct {
Name string
}
var (
numMsg int = 8
)
func (h *TestHandler) SingleSubHandler(ctx context.Context, msg *codec.Frame) error {
//fmt.Printf("msg %s\n", msg.Data)
return nil
}
func (h *TestHandler) BatchSubHandler(ctxs []context.Context, msgs []*codec.Frame) error {
if len(msgs) != 8 {
h.t.Fatal("invalid number of messages received")
}
for idx := 0; idx < len(msgs); idx++ {
md, _ := metadata.FromIncomingContext(ctxs[idx])
_ = md
// fmt.Printf("msg md %v\n", md)
}
return nil
}
func TestNoopSub(t *testing.T) {
ctx := context.Background()
b := broker.NewBroker()
if err := b.Init(); err != nil {
t.Fatal(err)
}
if err := b.Connect(ctx); err != nil {
t.Fatal(err)
}
s := server.NewServer(
server.Broker(b),
server.Codec("application/octet-stream", codec.NewCodec()),
)
if err := s.Init(); err != nil {
t.Fatal(err)
}
c := client.NewClient(
client.Broker(b),
client.Codec("application/octet-stream", codec.NewCodec()),
client.ContentType("application/octet-stream"),
)
if err := c.Init(); err != nil {
t.Fatal(err)
}
h := &TestHandler{t: t}
if err := s.Subscribe(s.NewSubscriber("single_topic", h.SingleSubHandler,
server.SubscriberQueue("queue"),
)); err != nil {
t.Fatal(err)
}
if err := s.Subscribe(s.NewSubscriber("batch_topic", h.BatchSubHandler,
server.SubscriberQueue("queue"),
server.SubscriberBatch(true),
)); err != nil {
t.Fatal(err)
}
if err := s.Start(); err != nil {
t.Fatal(err)
}
msgs := make([]client.Message, 0, 8)
for i := 0; i < 8; i++ {
msgs = append(msgs, c.NewMessage("batch_topic", &codec.Frame{Data: []byte(fmt.Sprintf(`{"name": "test_name %d"}`, i))}))
}
if err := c.BatchPublish(ctx, msgs); err != nil {
t.Fatal(err)
}
defer func() {
if err := s.Stop(); err != nil {
t.Fatal(err)
}
}()
}

View File

@@ -71,6 +71,8 @@ type Options struct {
Version string
// SubWrappers holds the server subscribe wrappers
SubWrappers []SubscriberWrapper
// BatchSubWrappers holds the server batch subscribe wrappers
BatchSubWrappers []BatchSubscriberWrapper
// HdlrWrappers holds the handler wrappers
HdlrWrappers []HandlerWrapper
// RegisterAttempts holds the number of register attempts before error
@@ -302,6 +304,13 @@ func WrapSubscriber(w SubscriberWrapper) Option {
}
}
// WrapBatchSubscriber adds a batch subscriber Wrapper to a list of options passed into the server
func WrapBatchSubscriber(w BatchSubscriberWrapper) Option {
return func(o *Options) {
o.BatchSubWrappers = append(o.BatchSubWrappers, w)
}
}
// MaxConn specifies maximum number of max simultaneous connections to server
func MaxConn(n int) Option {
return func(o *Options) {
@@ -354,6 +363,12 @@ type SubscriberOptions struct {
AutoAck bool
// BodyOnly flag specifies that message without headers
BodyOnly bool
// Batch flag specifies that message processed in batches
Batch bool
// BatchSize flag specifies max size of batch
BatchSize int
// BatchWait flag specifies max wait time for batch filling
BatchWait time.Duration
}
// NewSubscriberOptions create new SubscriberOptions
@@ -413,3 +428,32 @@ func SubscriberContext(ctx context.Context) SubscriberOption {
o.Context = ctx
}
}
// SubscriberAck control auto ack processing for handler
func SubscriberAck(b bool) SubscriberOption {
return func(o *SubscriberOptions) {
o.AutoAck = b
}
}
// SubscriberBatch control batch processing for handler
func SubscriberBatch(b bool) SubscriberOption {
return func(o *SubscriberOptions) {
o.Batch = b
}
}
// SubscriberBatchSize control batch filling size for handler
// Batch filling max waiting time controlled by SubscriberBatchWait
func SubscriberBatchSize(n int) SubscriberOption {
return func(o *SubscriberOptions) {
o.BatchSize = n
}
}
// SubscriberBatchWait control batch filling wait time for handler
func SubscriberBatchWait(td time.Duration) SubscriberOption {
return func(o *SubscriberOptions) {
o.BatchWait = td
}
}

View File

@@ -15,8 +15,8 @@ import (
var DefaultServer Server = NewServer()
var (
// DefaultAddress will be used if no address passed
DefaultAddress = ":0"
// DefaultAddress will be used if no address passed, use secure localhost
DefaultAddress = "127.0.0.1:0"
// DefaultName will be used if no name passed
DefaultName = "server"
// DefaultVersion will be used if no version passed

View File

@@ -18,7 +18,8 @@ import (
)
const (
subSig = "func(context.Context, interface{}) error"
subSig = "func(context.Context, interface{}) error"
batchSubSig = "func([]context.Context, []interface{}) error"
)
// Precompute the reflect type for error. Can't use error directly
@@ -57,26 +58,33 @@ func isExportedOrBuiltinType(t reflect.Type) bool {
return isExported(t.Name()) || t.PkgPath() == ""
}
// ValidateSubscriber func
// ValidateSubscriber func signature
func ValidateSubscriber(sub Subscriber) error {
typ := reflect.TypeOf(sub.Subscriber())
var argType reflect.Type
switch typ.Kind() {
case reflect.Func:
name := "Func"
switch typ.NumIn() {
case 2:
argType = typ.In(1)
if sub.Options().Batch {
if argType.Kind() != reflect.Slice {
return fmt.Errorf("subscriber %v dont have required signature %s", name, batchSubSig)
}
if strings.Compare(fmt.Sprintf("%s", argType), "[]interface{}") == 0 {
return fmt.Errorf("subscriber %v dont have required signaure %s", name, batchSubSig)
}
}
default:
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), subSig)
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s or %s", name, typ.NumIn(), subSig, batchSubSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
}
if typ.NumOut() != 1 {
return fmt.Errorf("subscriber %v has wrong number of outs: %v require signature %s",
name, typ.NumOut(), subSig)
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s or %s",
name, typ.NumOut(), subSig, batchSubSig)
}
if returnType := typ.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
@@ -87,13 +95,12 @@ func ValidateSubscriber(sub Subscriber) error {
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
switch method.Type.NumIn() {
case 3:
argType = method.Type.In(2)
default:
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s",
name, method.Name, method.Type.NumIn(), subSig)
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s or %s",
name, method.Name, method.Type.NumIn(), subSig, batchSubSig)
}
if !isExportedOrBuiltinType(argType) {
@@ -101,8 +108,8 @@ func ValidateSubscriber(sub Subscriber) error {
}
if method.Type.NumOut() != 1 {
return fmt.Errorf(
"subscriber %v.%v has wrong number of outs: %v require signature %s",
name, method.Name, method.Type.NumOut(), subSig)
"subscriber %v.%v has wrong number of return values: %v require signature %s or %s",
name, method.Name, method.Type.NumOut(), subSig, batchSubSig)
}
if returnType := method.Type.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
@@ -183,7 +190,125 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
}
//nolint:gocyclo
func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
return func(ps broker.Events) (err error) {
defer func() {
if r := recover(); r != nil {
n.RLock()
config := n.opts
n.RUnlock()
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(n.opts.Context, "panic recovered: ", r)
config.Logger.Error(n.opts.Context, string(debug.Stack()))
}
err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r)
}
}()
msgs := make([]Message, 0, len(ps))
ctxs := make([]context.Context, 0, len(ps))
for _, p := range ps {
msg := p.Message()
// if we don't have headers, create empty map
if msg.Header == nil {
msg.Header = metadata.New(2)
}
ct, _ := msg.Header.Get(metadata.HeaderContentType)
if len(ct) == 0 {
msg.Header.Set(metadata.HeaderContentType, defaultContentType)
ct = defaultContentType
}
hdr := metadata.Copy(msg.Header)
topic, _ := msg.Header.Get(metadata.HeaderTopic)
ctxs = append(ctxs, metadata.NewIncomingContext(sb.opts.Context, hdr))
msgs = append(msgs, &rpcMessage{
topic: topic,
contentType: ct,
header: msg.Header,
body: msg.Body,
})
}
results := make(chan error, len(sb.handlers))
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
var req reflect.Value
switch handler.reqType.Kind() {
case reflect.Ptr:
req = reflect.New(handler.reqType.Elem())
default:
req = reflect.New(handler.reqType.Elem()).Elem()
}
reqType := handler.reqType
for _, msg := range msgs {
cf, err := n.newCodec(msg.ContentType())
if err != nil {
return err
}
rb := reflect.New(req.Type().Elem())
if err = cf.ReadBody(bytes.NewReader(msg.Body()), rb.Interface()); err != nil {
return err
}
msg.(*rpcMessage).codec = cf
msg.(*rpcMessage).payload = rb.Interface()
}
fn := func(ctxs []context.Context, ms []Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
}
if handler.ctxType != nil {
vals = append(vals, reflect.ValueOf(ctxs))
}
payloads := reflect.MakeSlice(reqType, 0, len(ms))
for _, m := range ms {
payloads = reflect.Append(payloads, reflect.ValueOf(m.Payload()))
}
vals = append(vals, payloads)
returnValues := handler.method.Call(vals)
if rerr := returnValues[0].Interface(); rerr != nil {
return rerr.(error)
}
return nil
}
for i := len(opts.BatchSubWrappers); i > 0; i-- {
fn = opts.BatchSubWrappers[i-1](fn)
}
if n.wg != nil {
n.wg.Add(1)
}
go func() {
if n.wg != nil {
defer n.wg.Done()
}
results <- fn(ctxs, msgs)
}()
}
var errors []string
for i := 0; i < len(sb.handlers); i++ {
if rerr := <-results; rerr != nil {
errors = append(errors, rerr.Error())
}
}
if len(errors) > 0 {
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
}
return err
}
}
//nolint:gocyclo
func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler {
return func(p broker.Event) (err error) {
defer func() {
if r := recover(); r != nil {
@@ -201,12 +326,12 @@ func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handl
msg := p.Message()
// if we don't have headers, create empty map
if msg.Header == nil {
msg.Header = make(map[string]string)
msg.Header = metadata.New(2)
}
ct := msg.Header["Content-Type"]
if len(ct) == 0 {
msg.Header["Content-Type"] = defaultContentType
msg.Header.Set(metadata.HeaderContentType, defaultContentType)
ct = defaultContentType
}
cf, err := n.newCodec(ct)
@@ -214,12 +339,12 @@ func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handl
return err
}
hdr := make(map[string]string, len(msg.Header))
hdr := metadata.New(len(msg.Header))
for k, v := range msg.Header {
if k == "Content-Type" {
continue
}
hdr[k] = v
hdr.Set(k, v)
}
ctx := metadata.NewIncomingContext(sb.opts.Context, hdr)
@@ -294,7 +419,6 @@ func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handl
if len(errors) > 0 {
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
}
return err
}
}

View File

@@ -14,12 +14,20 @@ type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error
// publication message.
type SubscriberFunc func(ctx context.Context, msg Message) error
// BatchSubscriberFunc represents a single method of a subscriber. It's used primarily
// for the wrappers. What's handed to the actual method is the concrete
// publication message. This func used by batch subscribers
type BatchSubscriberFunc func(ctxs []context.Context, msgs []Message) error
// HandlerWrapper wraps the HandlerFunc and returns the equivalent
type HandlerWrapper func(HandlerFunc) HandlerFunc
// SubscriberWrapper wraps the SubscriberFunc and returns the equivalent
type SubscriberWrapper func(SubscriberFunc) SubscriberFunc
// BatchSubscriberWrapper wraps the SubscriberFunc and returns the equivalent
type BatchSubscriberWrapper func(BatchSubscriberFunc) BatchSubscriberFunc
// StreamWrapper wraps a Stream interface and returns the equivalent.
// Because streams exist for the lifetime of a method invocation this
// is a convenient way to wrap a Stream as its in use for trace, monitoring,

View File

@@ -36,16 +36,6 @@ func (m *memoryStore) key(prefix, key string) string {
return filepath.Join(prefix, key)
}
func (m *memoryStore) prefix(database, table string) string {
if len(database) == 0 {
database = m.opts.Database
}
if len(table) == 0 {
table = m.opts.Table
}
return filepath.Join(database, table)
}
func (m *memoryStore) exists(prefix, key string) error {
key = m.key(prefix, key)
@@ -80,15 +70,17 @@ func (m *memoryStore) delete(prefix, key string) {
func (m *memoryStore) list(prefix string, limit, offset uint) []string {
allItems := m.store.Items()
allKeys := make([]string, len(allItems))
i := 0
allKeys := make([]string, 0, len(allItems))
for k := range allItems {
if !strings.HasPrefix(k, prefix+"/") {
if !strings.HasPrefix(k, prefix) {
continue
}
allKeys[i] = strings.TrimPrefix(k, prefix+"/")
i++
k = strings.TrimPrefix(k, prefix)
if k[0] == '/' {
k = k[1:]
}
allKeys = append(allKeys, k)
}
if limit != 0 || offset != 0 {
@@ -107,7 +99,6 @@ func (m *memoryStore) list(prefix string, limit, offset uint) []string {
}
return allKeys[offset:end]
}
return allKeys
}
@@ -127,37 +118,48 @@ func (m *memoryStore) Name() string {
}
func (m *memoryStore) Exists(ctx context.Context, key string, opts ...ExistsOption) error {
prefix := m.prefix(m.opts.Database, m.opts.Table)
return m.exists(prefix, key)
options := NewExistsOptions(opts...)
if options.Namespace == "" {
options.Namespace = m.opts.Namespace
}
return m.exists(options.Namespace, key)
}
func (m *memoryStore) Read(ctx context.Context, key string, val interface{}, opts ...ReadOption) error {
readOpts := NewReadOptions(opts...)
prefix := m.prefix(readOpts.Database, readOpts.Table)
return m.get(prefix, key, val)
options := NewReadOptions(opts...)
if options.Namespace == "" {
options.Namespace = m.opts.Namespace
}
return m.get(options.Namespace, key, val)
}
func (m *memoryStore) Write(ctx context.Context, key string, val interface{}, opts ...WriteOption) error {
writeOpts := NewWriteOptions(opts...)
options := NewWriteOptions(opts...)
if options.Namespace == "" {
options.Namespace = m.opts.Namespace
}
if options.TTL == 0 {
options.TTL = cache.NoExpiration
}
prefix := m.prefix(writeOpts.Database, writeOpts.Table)
key = m.key(prefix, key)
key = m.key(options.Namespace, key)
buf, err := m.opts.Codec.Marshal(val)
if err != nil {
return err
}
m.store.Set(key, buf, writeOpts.TTL)
m.store.Set(key, buf, options.TTL)
return nil
}
func (m *memoryStore) Delete(ctx context.Context, key string, opts ...DeleteOption) error {
deleteOptions := NewDeleteOptions(opts...)
options := NewDeleteOptions(opts...)
if options.Namespace == "" {
options.Namespace = m.opts.Namespace
}
prefix := m.prefix(deleteOptions.Database, deleteOptions.Table)
m.delete(prefix, key)
m.delete(options.Namespace, key)
return nil
}
@@ -166,25 +168,27 @@ func (m *memoryStore) Options() Options {
}
func (m *memoryStore) List(ctx context.Context, opts ...ListOption) ([]string, error) {
listOptions := NewListOptions(opts...)
options := NewListOptions(opts...)
if options.Namespace == "" {
options.Namespace = m.opts.Namespace
}
prefix := m.prefix(listOptions.Database, listOptions.Table)
keys := m.list(prefix, listOptions.Limit, listOptions.Offset)
keys := m.list(options.Namespace, options.Limit, options.Offset)
if len(listOptions.Prefix) > 0 {
if len(options.Prefix) > 0 {
var prefixKeys []string
for _, k := range keys {
if strings.HasPrefix(k, listOptions.Prefix) {
if strings.HasPrefix(k, options.Prefix) {
prefixKeys = append(prefixKeys, k)
}
}
keys = prefixKeys
}
if len(listOptions.Suffix) > 0 {
if len(options.Suffix) > 0 {
var suffixKeys []string
for _, k := range keys {
if strings.HasSuffix(k, listOptions.Suffix) {
if strings.HasSuffix(k, options.Suffix) {
suffixKeys = append(suffixKeys, k)
}
}

View File

@@ -9,11 +9,11 @@ import (
)
func TestMemoryReInit(t *testing.T) {
s := store.NewStore(store.Table("aaa"))
if err := s.Init(store.Table("")); err != nil {
s := store.NewStore(store.Namespace("aaa"))
if err := s.Init(store.Namespace("")); err != nil {
t.Fatal(err)
}
if len(s.Options().Table) > 0 {
if len(s.Options().Namespace) > 0 {
t.Error("Init didn't reinitialise the store")
}
}
@@ -28,7 +28,7 @@ func TestMemoryBasic(t *testing.T) {
func TestMemoryPrefix(t *testing.T) {
s := store.NewStore()
if err := s.Init(store.Table("some-prefix")); err != nil {
if err := s.Init(store.Namespace("some-prefix")); err != nil {
t.Fatal(err)
}
basictest(s, t)
@@ -36,7 +36,7 @@ func TestMemoryPrefix(t *testing.T) {
func TestMemoryNamespace(t *testing.T) {
s := store.NewStore()
if err := s.Init(store.Database("some-namespace")); err != nil {
if err := s.Init(store.Namespace("some-namespace")); err != nil {
t.Fatal(err)
}
basictest(s, t)
@@ -44,7 +44,7 @@ func TestMemoryNamespace(t *testing.T) {
func TestMemoryNamespacePrefix(t *testing.T) {
s := store.NewStore()
if err := s.Init(store.Table("some-prefix"), store.Database("some-namespace")); err != nil {
if err := s.Init(store.Namespace("some-namespace")); err != nil {
t.Fatal(err)
}
basictest(s, t)

View File

@@ -28,13 +28,12 @@ type Options struct {
TLSConfig *tls.Config
// Name specifies store name
Name string
// Database specifies store database
Database string
// Table specifies store table
Table string
// Nodes contains store address
// TODO: replace with Addrs
Nodes []string
// Namespace of the records
Namespace string
// Addrs contains store address
Addrs []string
//Wrappers store wrapper that called before actual functions
//Wrappers []Wrapper
}
// NewOptions creates options struct
@@ -90,13 +89,20 @@ func Meter(m meter.Meter) Option {
}
}
// Name the name
// Name the name of the store
func Name(n string) Option {
return func(o *Options) {
o.Name = n
}
}
// Namespace sets namespace of the store
func Namespace(ns string) Option {
return func(o *Options) {
o.Namespace = ns
}
}
// Tracer sets the tracer
func Tracer(t tracer.Tracer) Option {
return func(o *Options) {
@@ -104,27 +110,21 @@ func Tracer(t tracer.Tracer) Option {
}
}
// Nodes contains the addresses or other connection information of the backing storage.
// Addrs contains the addresses or other connection information of the backing storage.
// For example, an etcd implementation would contain the nodes of the cluster.
// A SQL implementation could contain one or more connection strings.
func Nodes(a ...string) Option {
func Addrs(addrs ...string) Option {
return func(o *Options) {
o.Nodes = a
o.Addrs = addrs
}
}
// Database allows multiple isolated stores to be kept in one backend, if supported.
func Database(db string) Option {
return func(o *Options) {
o.Database = db
}
}
// Table is analag for a table in database backends or a key prefix in KV backends
func Table(t string) Option {
return func(o *Options) {
o.Table = t
}
// ReadOptions configures an individual Read operation
type ReadOptions struct {
// Context holds external options
Context context.Context
// Namespace holds namespace
Namespace string
}
// NewReadOptions fills ReadOptions struct with opts slice
@@ -136,29 +136,35 @@ func NewReadOptions(opts ...ReadOption) ReadOptions {
return options
}
// ReadOptions configures an individual Read operation
type ReadOptions struct {
// Context holds external options
Context context.Context
// Database holds the database name
Database string
// Table holds table name
Table string
// Namespace holds namespace
Namespace string
}
// ReadOption sets values in ReadOptions
type ReadOption func(r *ReadOptions)
// ReadFrom the database and table
func ReadFrom(database, table string) ReadOption {
return func(r *ReadOptions) {
r.Database = database
r.Table = table
// ReadContext pass context.Context to ReadOptions
func ReadContext(ctx context.Context) ReadOption {
return func(o *ReadOptions) {
o.Context = ctx
}
}
// ReadNamespace pass namespace to ReadOptions
func ReadNamespace(ns string) ReadOption {
return func(o *ReadOptions) {
o.Namespace = ns
}
}
// WriteOptions configures an individual Write operation
type WriteOptions struct {
// Context holds external options
Context context.Context
// Metadata contains additional metadata
Metadata metadata.Metadata
// Namespace holds namespace
Namespace string
// TTL specifies key TTL
TTL time.Duration
}
// NewWriteOptions fills WriteOptions struct with opts slice
func NewWriteOptions(opts ...WriteOption) WriteOptions {
options := WriteOptions{}
@@ -168,47 +174,45 @@ func NewWriteOptions(opts ...WriteOption) WriteOptions {
return options
}
// WriteOptions configures an individual Write operation
type WriteOptions struct {
// Context holds external options
Context context.Context
// Metadata contains additional metadata
Metadata metadata.Metadata
// Database holds database name
Database string
// Table holds table name
Table string
// Namespace holds namespace
Namespace string
// TTL specifies key TTL
TTL time.Duration
}
// WriteOption sets values in WriteOptions
type WriteOption func(w *WriteOptions)
// WriteTo the database and table
func WriteTo(database, table string) WriteOption {
return func(w *WriteOptions) {
w.Database = database
w.Table = table
}
}
// WriteTTL is the time the record expires
func WriteTTL(d time.Duration) WriteOption {
return func(w *WriteOptions) {
w.TTL = d
// WriteContext pass context.Context to wirte options
func WriteContext(ctx context.Context) WriteOption {
return func(o *WriteOptions) {
o.Context = ctx
}
}
// WriteMetadata add metadata.Metadata
func WriteMetadata(md metadata.Metadata) WriteOption {
return func(w *WriteOptions) {
w.Metadata = metadata.Copy(md)
return func(o *WriteOptions) {
o.Metadata = metadata.Copy(md)
}
}
// WriteTTL is the time the record expires
func WriteTTL(d time.Duration) WriteOption {
return func(o *WriteOptions) {
o.TTL = d
}
}
// WriteNamespace pass namespace to write options
func WriteNamespace(ns string) WriteOption {
return func(o *WriteOptions) {
o.Namespace = ns
}
}
// DeleteOptions configures an individual Delete operation
type DeleteOptions struct {
// Context holds external options
Context context.Context
// Namespace holds namespace
Namespace string
}
// NewDeleteOptions fills DeleteOptions struct with opts slice
func NewDeleteOptions(opts ...DeleteOption) DeleteOptions {
options := DeleteOptions{}
@@ -218,29 +222,33 @@ func NewDeleteOptions(opts ...DeleteOption) DeleteOptions {
return options
}
// DeleteOptions configures an individual Delete operation
type DeleteOptions struct {
// Context holds external options
Context context.Context
// Database holds database name
Database string
// Table holds table name
Table string
// Namespace holds namespace
Namespace string
}
// DeleteOption sets values in DeleteOptions
type DeleteOption func(d *DeleteOptions)
// DeleteFrom the database and table
func DeleteFrom(database, table string) DeleteOption {
return func(d *DeleteOptions) {
d.Database = database
d.Table = table
// DeleteContext pass context.Context to delete options
func DeleteContext(ctx context.Context) DeleteOption {
return func(o *DeleteOptions) {
o.Context = ctx
}
}
// DeleteNamespace pass namespace to delete options
func DeleteNamespace(ns string) DeleteOption {
return func(o *DeleteOptions) {
o.Namespace = ns
}
}
// ListOptions configures an individual List operation
type ListOptions struct {
Context context.Context
Prefix string
Suffix string
Namespace string
Limit uint
Offset uint
}
// NewListOptions fills ListOptions struct with opts slice
func NewListOptions(opts ...ListOption) ListOptions {
options := ListOptions{}
@@ -250,59 +258,50 @@ func NewListOptions(opts ...ListOption) ListOptions {
return options
}
// ListOptions configures an individual List operation
type ListOptions struct {
Context context.Context
Database string
Prefix string
Suffix string
Namespace string
Table string
Limit uint
Offset uint
}
// ListOption sets values in ListOptions
type ListOption func(l *ListOptions)
// ListFrom the database and table
func ListFrom(database, table string) ListOption {
return func(l *ListOptions) {
l.Database = database
l.Table = table
// ListContext pass context.Context to list options
func ListContext(ctx context.Context) ListOption {
return func(o *ListOptions) {
o.Context = ctx
}
}
// ListPrefix returns all keys that are prefixed with key
func ListPrefix(p string) ListOption {
return func(l *ListOptions) {
l.Prefix = p
func ListPrefix(s string) ListOption {
return func(o *ListOptions) {
o.Prefix = s
}
}
// ListSuffix returns all keys that end with key
func ListSuffix(s string) ListOption {
return func(l *ListOptions) {
l.Suffix = s
return func(o *ListOptions) {
o.Suffix = s
}
}
// ListLimit limits the number of returned keys to l
func ListLimit(l uint) ListOption {
return func(lo *ListOptions) {
lo.Limit = l
// ListLimit limits the number of returned keys
func ListLimit(n uint) ListOption {
return func(o *ListOptions) {
o.Limit = n
}
}
// ListOffset starts returning responses from o. Use in conjunction with Limit for pagination.
func ListOffset(o uint) ListOption {
return func(l *ListOptions) {
l.Offset = o
// ListOffset use with Limit for pagination
func ListOffset(n uint) ListOption {
return func(o *ListOptions) {
o.Offset = n
}
}
// ExistsOption specifies Exists call options
type ExistsOption func(*ExistsOptions)
// ListNamespace pass namespace to list options
func ListNamespace(ns string) ListOption {
return func(o *ListOptions) {
o.Namespace = ns
}
}
// ExistsOptions holds options for Exists method
type ExistsOptions struct {
@@ -312,6 +311,9 @@ type ExistsOptions struct {
Namespace string
}
// ExistsOption specifies Exists call options
type ExistsOption func(*ExistsOptions)
// NewExistsOptions helper for Exists method
func NewExistsOptions(opts ...ExistsOption) ExistsOptions {
options := ExistsOptions{
@@ -322,3 +324,24 @@ func NewExistsOptions(opts ...ExistsOption) ExistsOptions {
}
return options
}
// ExistsContext pass context.Context to exist options
func ExistsContext(ctx context.Context) ExistsOption {
return func(o *ExistsOptions) {
o.Context = ctx
}
}
// ExistsNamespace pass namespace to exist options
func ExistsNamespace(ns string) ExistsOption {
return func(o *ExistsOptions) {
o.Namespace = ns
}
}
// WrapStore adds a store Wrapper to a list of options passed into the store
//func WrapStore(w Wrapper) Option {
// return func(o *Options) {
// o.Wrappers = append(o.Wrappers, w)
// }
//}

84
store/wrapper.go Normal file
View File

@@ -0,0 +1,84 @@
package store
import (
"context"
)
// LogfFunc function used for Logf method
//type LogfFunc func(ctx context.Context, level Level, msg string, args ...interface{})
//type Wrapper interface {
// Logf logs message with needed level
//Logf(LogfFunc) LogfFunc
//}
type NamespaceStore struct {
s Store
ns string
}
var (
_ Store = &NamespaceStore{}
)
func NewNamespaceStore(s Store, ns string) Store {
return &NamespaceStore{s: s, ns: ns}
}
func (w *NamespaceStore) Init(opts ...Option) error {
return w.s.Init(opts...)
}
func (w *NamespaceStore) Connect(ctx context.Context) error {
return w.s.Connect(ctx)
}
func (w *NamespaceStore) Disconnect(ctx context.Context) error {
return w.s.Disconnect(ctx)
}
func (w *NamespaceStore) Read(ctx context.Context, key string, val interface{}, opts ...ReadOption) error {
return w.s.Read(ctx, key, val, append(opts, ReadNamespace(w.ns))...)
}
func (w *NamespaceStore) Write(ctx context.Context, key string, val interface{}, opts ...WriteOption) error {
return w.s.Write(ctx, key, val, append(opts, WriteNamespace(w.ns))...)
}
func (w *NamespaceStore) Delete(ctx context.Context, key string, opts ...DeleteOption) error {
return w.s.Delete(ctx, key, append(opts, DeleteNamespace(w.ns))...)
}
func (w *NamespaceStore) Exists(ctx context.Context, key string, opts ...ExistsOption) error {
return w.s.Exists(ctx, key, append(opts, ExistsNamespace(w.ns))...)
}
func (w *NamespaceStore) List(ctx context.Context, opts ...ListOption) ([]string, error) {
return w.s.List(ctx, append(opts, ListNamespace(w.ns))...)
}
func (w *NamespaceStore) Options() Options {
return w.s.Options()
}
func (w *NamespaceStore) Name() string {
return w.s.Name()
}
func (w *NamespaceStore) String() string {
return w.s.String()
}
//type NamespaceWrapper struct{}
//func NewNamespaceWrapper() Wrapper {
// return &NamespaceWrapper{}
//}
/*
func (w *OmitWrapper) Logf(fn LogfFunc) LogfFunc {
return func(ctx context.Context, level Level, msg string, args ...interface{}) {
fn(ctx, level, msg, getArgs(args)...)
}
}
*/

View File

@@ -7,8 +7,8 @@ import (
"github.com/unistack-org/micro/v3/util/rand"
)
// Do returns a random time to jitter with max cap specified
func Do(d time.Duration) time.Duration {
// Random returns a random time to jitter with max cap specified
func Random(d time.Duration) time.Duration {
var rng rand.Rand
v := rng.Float64() * float64(d.Nanoseconds())
return time.Duration(v)

65
util/jitter/ticker.go Normal file
View File

@@ -0,0 +1,65 @@
package jitter
import (
"time"
"github.com/unistack-org/micro/v3/util/rand"
)
// Ticker is similar to time.Ticker but ticks at random intervals between
// the min and max duration values (stored internally as int64 nanosecond
// counts).
type Ticker struct {
C chan time.Time
done chan chan struct{}
min int64
max int64
rng rand.Rand
}
// NewTicker returns a pointer to an initialized instance of the Ticker.
// Min and max are durations of the shortest and longest allowed
// ticks. Ticker will run in a goroutine until explicitly stopped.
func NewTicker(min, max time.Duration) *Ticker {
ticker := &Ticker{
C: make(chan time.Time),
done: make(chan chan struct{}),
min: min.Nanoseconds(),
max: max.Nanoseconds(),
}
go ticker.run()
return ticker
}
// Stop terminates the ticker goroutine and closes the C channel.
func (ticker *Ticker) Stop() {
c := make(chan struct{})
ticker.done <- c
<-c
}
func (ticker *Ticker) run() {
defer close(ticker.C)
t := time.NewTimer(ticker.nextInterval())
for {
// either a stop signal or a timeout
select {
case c := <-ticker.done:
t.Stop()
close(c)
return
case <-t.C:
select {
case ticker.C <- time.Now():
t.Stop()
t = time.NewTimer(ticker.nextInterval())
default:
// there could be noone receiving...
}
}
}
}
func (ticker *Ticker) nextInterval() time.Duration {
return time.Duration(ticker.rng.Int63n(ticker.max-ticker.min)+ticker.min) * time.Nanosecond
}

View File

@@ -7,7 +7,6 @@ import (
"reflect"
"regexp"
"strings"
"time"
)
// ErrInvalidParam specifies invalid url query params
@@ -15,11 +14,12 @@ var ErrInvalidParam = errors.New("invalid url query param provided")
var bracketSplitter = regexp.MustCompile(`\[|\]`)
var timeKind = reflect.TypeOf(time.Time{}).Kind()
//var timeKind = reflect.ValueOf(time.Time{}).Kind()
type StructField struct {
Field reflect.StructField
Value reflect.Value
Path string
}
func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, error) {
@@ -35,7 +35,7 @@ func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, e
for idx := 0; idx < typ.NumField(); idx++ {
fld := typ.Field(idx)
val := sv.Field(idx)
if !val.CanSet() || len(fld.PkgPath) != 0 {
if len(fld.PkgPath) != 0 {
continue
}
@@ -66,6 +66,17 @@ func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, e
return nil, ErrNotFound
}
func StructFieldByPath(src interface{}, path string) (interface{}, error) {
var err error
for _, p := range strings.Split(path, ".") {
src, err = StructFieldByName(src, p)
if err != nil {
return nil, err
}
}
return src, err
}
func StructFieldByName(src interface{}, tkey string) (interface{}, error) {
sv := reflect.ValueOf(src)
if sv.Kind() == reflect.Ptr {
@@ -79,7 +90,7 @@ func StructFieldByName(src interface{}, tkey string) (interface{}, error) {
for idx := 0; idx < typ.NumField(); idx++ {
fld := typ.Field(idx)
val := sv.Field(idx)
if !val.CanSet() || len(fld.PkgPath) != 0 {
if len(fld.PkgPath) != 0 {
continue
}
if fld.Name == tkey {
@@ -105,6 +116,19 @@ func StructFieldByName(src interface{}, tkey string) (interface{}, error) {
return nil, ErrNotFound
}
// StructFieldsMap returns map[string]interface{} or error
func StructFieldsMap(src interface{}) (map[string]interface{}, error) {
fields, err := StructFields(src)
if err != nil {
return nil, err
}
mp := make(map[string]interface{}, len(fields))
for _, field := range fields {
mp[field.Path] = field.Value.Interface()
}
return mp, nil
}
// StructFields returns slice of struct fields
func StructFields(src interface{}) ([]StructField, error) {
var fields []StructField
@@ -116,25 +140,29 @@ func StructFields(src interface{}) ([]StructField, error) {
if sv.Kind() != reflect.Struct {
return nil, ErrInvalidStruct
}
typ := sv.Type()
for idx := 0; idx < typ.NumField(); idx++ {
fld := typ.Field(idx)
val := sv.Field(idx)
if !val.CanSet() || len(fld.PkgPath) != 0 {
if !val.IsValid() || len(fld.PkgPath) != 0 {
continue
}
switch val.Kind() {
case timeKind:
fields = append(fields, StructField{Field: fld, Value: val})
//case timeKind:
//fmt.Printf("GGG\n")
//fields = append(fields, StructField{Field: fld, Value: val, Path: fld.Name})
case reflect.Struct:
infields, err := StructFields(val.Interface())
if err != nil {
return nil, err
}
fields = append(fields, infields...)
for _, infield := range infields {
infield.Path = fmt.Sprintf("%s.%s", fld.Name, infield.Path)
fields = append(fields, infield)
}
default:
fields = append(fields, StructField{Field: fld, Value: val})
fields = append(fields, StructField{Field: fld, Value: val, Path: fld.Name})
}
}
@@ -194,7 +222,7 @@ func StructURLValues(src interface{}, pref string, tags []string) (url.Values, e
for idx := 0; idx < typ.NumField(); idx++ {
fld := typ.Field(idx)
val := sv.Field(idx)
if !val.CanSet() || len(fld.PkgPath) != 0 || !val.IsValid() {
if len(fld.PkgPath) != 0 || !val.IsValid() {
continue
}

View File

@@ -2,9 +2,80 @@ package reflect
import (
"net/url"
"reflect"
rfl "reflect"
"testing"
)
func TestStructFieldsMap(t *testing.T) {
type NestedStr struct {
BBB string
CCC int
}
type Str struct {
Name []string `json:"name" codec:"flatten"`
XXX string `json:"xxx"`
Nested NestedStr
}
val := &Str{Name: []string{"first", "second"}, XXX: "ttt", Nested: NestedStr{BBB: "ddd", CCC: 9}}
fields, err := StructFieldsMap(val)
if err != nil {
t.Fatal(err)
}
if v, ok := fields["Nested.BBB"]; !ok || v != "ddd" {
t.Fatalf("invalid field from %v", fields)
}
}
func TestStructFields(t *testing.T) {
type NestedStr struct {
BBB string
CCC int
}
type Str struct {
Name []string `json:"name" codec:"flatten"`
XXX string `json:"xxx"`
Nested NestedStr
}
val := &Str{Name: []string{"first", "second"}, XXX: "ttt", Nested: NestedStr{BBB: "ddd", CCC: 9}}
fields, err := StructFields(val)
if err != nil {
t.Fatal(err)
}
var ok bool
for _, field := range fields {
if field.Path == "Nested.CCC" {
ok = true
}
}
if !ok {
t.Fatalf("struct fields returns invalid path: %v", fields)
}
}
func TestStructByPath(t *testing.T) {
type NestedStr struct {
BBB string
CCC int
}
type Str struct {
Name []string `json:"name" codec:"flatten"`
XXX string `json:"xxx"`
Nested NestedStr
}
val := &Str{Name: []string{"first", "second"}, XXX: "ttt", Nested: NestedStr{BBB: "ddd", CCC: 9}}
field, err := StructFieldByPath(val, "Nested.CCC")
if err != nil {
t.Fatal(err)
}
if rfl.Indirect(reflect.ValueOf(field)).Int() != 9 {
t.Fatalf("invalid elem returned: %v", field)
}
}
func TestStructByTag(t *testing.T) {
type Str struct {
Name []string `json:"name" codec:"flatten"`

View File

@@ -8,18 +8,18 @@ const (
// Template is a compiled representation of path templates.
type Template struct {
// Verb is a VERB part in the template
Verb string
// Original template (example: /v1/a_bit_of_everything)
Template string
// OpCodes is a sequence of operations
// Version is the version number of the format.
Version int
// OpCodes is a sequence of operations.
OpCodes []int
// Pool is a constant pool
Pool []string
// Fields is a list of field paths bound in this template
// Verb is a VERB part in the template.
Verb string
// Fields is a list of field paths bound in this template.
Fields []string
// Version is the version number of the format
Version int
// Original template (example: /v1/a_bit_of_everything)
Template string
}
// Compiler compiles utilities representation of path templates into marshallable operations.
@@ -29,9 +29,15 @@ type Compiler interface {
}
type op struct {
str string
code OpCode
operand int
// code is the opcode of the operation
code OpCode
// str is a string operand of the code.
// num is ignored if str is not empty.
str string
// num is a numeric operand of the code.
num int
}
func (w wildcard) compile() []op {
@@ -61,8 +67,8 @@ func (v variable) compile() []op {
ops = append(ops, s.compile()...)
}
ops = append(ops, op{
code: OpConcatN,
operand: len(v.segments),
code: OpConcatN,
num: len(v.segments),
}, op{
code: OpCapture,
str: v.path,
@@ -77,7 +83,6 @@ func (t template) Compile() Template {
rawOps = append(rawOps, s.compile()...)
}
// ops := make([]int, 0, len(rawOps))
var (
ops []int
pool []string
@@ -87,8 +92,12 @@ func (t template) Compile() Template {
for _, op := range rawOps {
ops = append(ops, int(op.code))
if op.str == "" {
ops = append(ops, op.operand)
ops = append(ops, op.num)
} else {
// eof segment literal represents the "/" path pattern
if op.str == eof {
op.str = ""
}
if _, ok := consts[op.str]; !ok {
consts[op.str] = len(pool)
pool = append(pool, op.str)

View File

@@ -21,6 +21,13 @@ func TestCompile(t *testing.T) {
fields []string
}{
{},
{
segs: []segment{
literal(eof),
},
ops: []int{int(OpLitPush), 0},
pool: []string{""},
},
{
segs: []segment{
wildcard{},

View File

@@ -3,11 +3,8 @@ package router
// download from https://raw.githubusercontent.com/grpc-ecosystem/grpc-gateway/master/protoc-gen-grpc-gateway/httprule/parse.go
import (
"context"
"fmt"
"strings"
"github.com/unistack-org/micro/v3/logger"
)
// InvalidTemplateError indicates that the path template is not valid.
@@ -83,8 +80,30 @@ func tokenize(path string) (tokens []string, verb string) {
}
l := len(tokens)
// See
// https://github.com/grpc-ecosystem/grpc-gateway/pull/1947#issuecomment-774523693 ;
// although normal and backwards-compat logic here is to use the last index
// of a colon, if the final segment is a variable followed by a colon, the
// part following the colon must be a verb. Hence if the previous token is
// an end var marker, we switch the index we're looking for to Index instead
// of LastIndex, so that we correctly grab the remaining part of the path as
// the verb.
var penultimateTokenIsEndVar bool
switch l {
case 0, 1:
// Not enough to be variable so skip this logic and don't result in an
// invalid index
default:
penultimateTokenIsEndVar = tokens[l-2] == "}"
}
t := tokens[l-1]
if idx := strings.LastIndex(t, ":"); idx == 0 {
var idx int
if penultimateTokenIsEndVar {
idx = strings.Index(t, ":")
} else {
idx = strings.LastIndex(t, ":")
}
if idx == 0 {
tokens, verb = tokens[:l-1], t[1:]
} else if idx > 0 {
tokens[l-1], verb = t[:idx], t[idx+1:]
@@ -101,22 +120,17 @@ type parser struct {
// topLevelSegments is the target of this parser.
func (p *parser) topLevelSegments() ([]segment, error) {
if logger.V(logger.TraceLevel) {
logger.Debug(context.TODO(), "Parsing %q", p.tokens)
if _, err := p.accept(typeEOF); err == nil {
p.tokens = p.tokens[:0]
return []segment{literal(eof)}, nil
}
segs, err := p.segments()
if err != nil {
return nil, err
}
if logger.V(logger.TraceLevel) {
logger.Trace(context.TODO(), "accept segments: %q; %q", p.accepted, p.tokens)
}
if _, err := p.accept(typeEOF); err != nil {
return nil, fmt.Errorf("unexpected token %q after segments %q", p.tokens[0], strings.Join(p.accepted, ""))
}
if logger.V(logger.TraceLevel) {
logger.Trace(context.TODO(), "accept eof: %q; %q", p.accepted, p.tokens)
}
return segs, nil
}
@@ -126,9 +140,6 @@ func (p *parser) segments() ([]segment, error) {
return nil, err
}
if logger.V(logger.TraceLevel) {
logger.Trace(context.TODO(), "accept segment: %q; %q", p.accepted, p.tokens)
}
segs := []segment{s}
for {
if _, err := p.accept("/"); err != nil {
@@ -139,9 +150,6 @@ func (p *parser) segments() ([]segment, error) {
return segs, err
}
segs = append(segs, s)
if logger.V(logger.TraceLevel) {
logger.Trace(context.TODO(), "accept segment: %q; %q", p.accepted, p.tokens)
}
}
}

View File

@@ -4,7 +4,6 @@ package router
import (
"context"
"flag"
"fmt"
"reflect"
"testing"
@@ -16,6 +15,7 @@ func TestTokenize(t *testing.T) {
for _, spec := range []struct {
src string
tokens []string
verb string
}{
{
src: "",
@@ -84,32 +84,74 @@ func TestTokenize(t *testing.T) {
eof,
},
},
{
src: "v1/a/{endpoint}:a",
tokens: []string{
"v1", "/",
"a", "/",
"{", "endpoint", "}",
eof,
},
verb: "a",
},
{
src: "v1/a/{endpoint}:b:c",
tokens: []string{
"v1", "/",
"a", "/",
"{", "endpoint", "}",
eof,
},
verb: "b:c",
},
} {
tokens, verb := tokenize(spec.src)
if got, want := tokens, spec.tokens; !reflect.DeepEqual(got, want) {
t.Errorf("tokenize(%q) = %q, _; want %q, _", spec.src, got, want)
}
if got, want := verb, ""; got != want {
t.Errorf("tokenize(%q) = _, %q; want _, %q", spec.src, got, want)
}
src := fmt.Sprintf("%s:%s", spec.src, "LOCK")
tokens, verb = tokenize(src)
if got, want := tokens, spec.tokens; !reflect.DeepEqual(got, want) {
t.Errorf("tokenize(%q) = %q, _; want %q, _", src, got, want)
}
if got, want := verb, "LOCK"; got != want {
t.Errorf("tokenize(%q) = _, %q; want _, %q", src, got, want)
switch {
case spec.verb != "":
if got, want := verb, spec.verb; !reflect.DeepEqual(got, want) {
t.Errorf("tokenize(%q) = %q, _; want %q, _", spec.src, got, want)
}
default:
if got, want := verb, ""; got != want {
t.Errorf("tokenize(%q) = _, %q; want _, %q", spec.src, got, want)
}
src := fmt.Sprintf("%s:%s", spec.src, "LOCK")
tokens, verb = tokenize(src)
if got, want := tokens, spec.tokens; !reflect.DeepEqual(got, want) {
t.Errorf("tokenize(%q) = %q, _; want %q, _", src, got, want)
}
if got, want := verb, "LOCK"; got != want {
t.Errorf("tokenize(%q) = _, %q; want _, %q", src, got, want)
}
}
}
}
func TestParseSegments(t *testing.T) {
flag.Set("v", "3")
for _, spec := range []struct {
tokens []string
want []segment
}{
{
tokens: []string{eof},
want: []segment{
literal(eof),
},
},
{
// Note: this case will never arise as tokenize() will never return such a sequence of tokens
// and even if it does it will be treated as [eof]
tokens: []string{eof, "v1", eof},
want: []segment{
literal(eof),
},
},
{
tokens: []string{"v1", eof},
want: []segment{
@@ -251,7 +293,6 @@ func TestParseSegments(t *testing.T) {
}
func TestParseSegmentsWithErrors(t *testing.T) {
flag.Set("v", "3")
for _, spec := range []struct {
tokens []string
}{
@@ -275,10 +316,6 @@ func TestParseSegmentsWithErrors(t *testing.T) {
// invalid percent-encoding
tokens: []string{"a%2z", eof},
},
{
// empty segments
tokens: []string{eof},
},
{
// unterminated variable
tokens: []string{"{", "name", eof},

View File

@@ -23,9 +23,9 @@ type rop struct {
operand int
}
// Pattern is a template pattern of http request paths defined in github.com/googleapis/googleapis/google/api/http.proto.
// Pattern is a template pattern of http request paths defined in
// https://github.com/googleapis/googleapis/blob/master/google/api/http.proto
type Pattern struct {
verb string
// ops is a list of operations
ops []rop
// pool is a constant pool indexed by the operands or vars
@@ -36,43 +36,27 @@ type Pattern struct {
stacksize int
// tailLen is the length of the fixed-size segments after a deep wildcard
tailLen int
// assumeColonVerb indicates whether a path suffix after a final
// colon may only be interpreted as a verb.
assumeColonVerb bool
// verb is the VERB part of the path pattern. It is empty if the pattern does not have VERB part.
verb string
}
type patternOptions struct {
assumeColonVerb bool
}
// PatternOpt is an option for creating Patterns.
type PatternOpt func(*patternOptions)
// NewPattern returns a new Pattern from the given definition values.
// "ops" is a sequence of op codes. "pool" is a constant pool.
// "verb" is the verb part of the pattern. It is empty if the pattern does not have the part.
// "version" must be 1 for now.
// It returns an error if the given definition is invalid.
//nolint:gocyclo
func NewPattern(version int, ops []int, pool []string, verb string, opts ...PatternOpt) (Pattern, error) {
options := patternOptions{
assumeColonVerb: true,
}
for _, o := range opts {
o(&options)
}
func NewPattern(version int, ops []int, pool []string, verb string) (Pattern, error) {
if version != 1 {
if logger.V(logger.DebugLevel) {
logger.Debug(context.TODO(), "unsupported version: %d", version)
if logger.V(logger.TraceLevel) {
logger.Trace(context.TODO(), "unsupported version: %d", version)
}
return Pattern{}, ErrInvalidPattern
}
l := len(ops)
if l%2 != 0 {
if logger.V(logger.DebugLevel) {
logger.Debug(context.TODO(), "odd number of ops codes: %d", l)
if logger.V(logger.TraceLevel) {
logger.Trace(context.TODO(), "odd number of ops codes: %d", l)
}
return Pattern{}, ErrInvalidPattern
}
@@ -141,13 +125,13 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt
vars = append(vars, v)
stack--
if stack < 0 {
if logger.V(logger.DebugLevel) {
if logger.V(logger.TraceLevel) {
logger.Trace(context.TODO(), "stack underflow")
}
return Pattern{}, ErrInvalidPattern
}
default:
if logger.V(logger.DebugLevel) {
if logger.V(logger.TraceLevel) {
logger.Trace(context.TODO(), "invalid opcode: %d", op.code)
}
return Pattern{}, ErrInvalidPattern
@@ -159,13 +143,12 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt
typedOps = append(typedOps, op)
}
return Pattern{
ops: typedOps,
pool: pool,
vars: vars,
stacksize: maxstack,
tailLen: tailLen,
verb: verb,
assumeColonVerb: options.assumeColonVerb,
ops: typedOps,
pool: pool,
vars: vars,
stacksize: maxstack,
tailLen: tailLen,
verb: verb,
}, nil
}
@@ -185,7 +168,7 @@ func MustPattern(p Pattern, err error) Pattern {
//nolint:gocyclo
func (p Pattern) Match(components []string, verb string) (map[string]string, error) {
if p.verb != verb {
if p.assumeColonVerb || p.verb != "" {
if p.verb != "" {
return nil, ErrNotMatch
}
if len(components) == 0 {
@@ -274,11 +257,3 @@ func (p Pattern) String() string {
}
return "/" + segs
}
// AssumeColonVerbOpt indicates whether a path suffix after a final
// colon may only be interpreted as a verb.
func AssumeColonVerbOpt(val bool) PatternOpt {
return PatternOpt(func(o *patternOptions) {
o.assumeColonVerb = val
})
}

View File

@@ -8,9 +8,9 @@ import (
)
type template struct {
segments []segment
verb string
template string
segments []segment
}
type segment interface {

View File

@@ -4,7 +4,7 @@ import (
"encoding/base64"
"time"
"github.com/dgrijalva/jwt-go"
"github.com/golang-jwt/jwt"
"github.com/unistack-org/micro/v3/auth"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/util/token"