replace wrappers with hooks #339
@ -48,6 +48,17 @@ type Broker interface {
|
|||||||
String() string
|
String() string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type (
|
||||||
|
FuncPublish func(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error
|
||||||
|
HookPublish func(next FuncPublish) FuncPublish
|
||||||
|
FuncBatchPublish func(ctx context.Context, msgs []*Message, opts ...PublishOption) error
|
||||||
|
HookBatchPublish func(next FuncBatchPublish) FuncBatchPublish
|
||||||
|
FuncSubscribe func(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
|
||||||
|
HookSubscribe func(next FuncSubscribe) FuncSubscribe
|
||||||
|
FuncBatchSubscribe func(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error)
|
||||||
|
HookBatchSubscribe func(next FuncBatchSubscribe) FuncBatchSubscribe
|
||||||
|
)
|
||||||
|
|
||||||
// Handler is used to process messages via a subscription of a topic.
|
// Handler is used to process messages via a subscription of a topic.
|
||||||
type Handler func(Event) error
|
type Handler func(Event) error
|
||||||
|
|
||||||
|
@ -7,6 +7,7 @@ import (
|
|||||||
"go.unistack.org/micro/v3/broker"
|
"go.unistack.org/micro/v3/broker"
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v3/logger"
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v3/metadata"
|
||||||
|
"go.unistack.org/micro/v3/options"
|
||||||
maddr "go.unistack.org/micro/v3/util/addr"
|
maddr "go.unistack.org/micro/v3/util/addr"
|
||||||
"go.unistack.org/micro/v3/util/id"
|
"go.unistack.org/micro/v3/util/id"
|
||||||
mnet "go.unistack.org/micro/v3/util/net"
|
mnet "go.unistack.org/micro/v3/util/net"
|
||||||
@ -14,6 +15,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type memoryBroker struct {
|
type memoryBroker struct {
|
||||||
|
funcPublish broker.FuncPublish
|
||||||
|
funcBatchPublish broker.FuncBatchPublish
|
||||||
|
funcSubscribe broker.FuncSubscribe
|
||||||
|
funcBatchSubscribe broker.FuncBatchSubscribe
|
||||||
subscribers map[string][]*memorySubscriber
|
subscribers map[string][]*memorySubscriber
|
||||||
addr string
|
addr string
|
||||||
opts broker.Options
|
opts broker.Options
|
||||||
@ -98,15 +103,42 @@ func (m *memoryBroker) Init(opts ...broker.Option) error {
|
|||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&m.opts)
|
o(&m.opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m.funcPublish = m.fnPublish
|
||||||
|
m.funcBatchPublish = m.fnBatchPublish
|
||||||
|
m.funcSubscribe = m.fnSubscribe
|
||||||
|
m.funcBatchSubscribe = m.fnBatchSubscribe
|
||||||
|
|
||||||
|
m.opts.Hooks.EachNext(func(hook options.Hook) {
|
||||||
|
switch h := hook.(type) {
|
||||||
|
case broker.HookPublish:
|
||||||
|
m.funcPublish = h(m.funcPublish)
|
||||||
|
case broker.HookBatchPublish:
|
||||||
|
m.funcBatchPublish = h(m.funcBatchPublish)
|
||||||
|
case broker.HookSubscribe:
|
||||||
|
m.funcSubscribe = h(m.funcSubscribe)
|
||||||
|
case broker.HookBatchSubscribe:
|
||||||
|
m.funcBatchSubscribe = h(m.funcBatchSubscribe)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
|
func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
|
||||||
|
return m.funcPublish(ctx, topic, msg, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *memoryBroker) fnPublish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
|
||||||
msg.Header.Set(metadata.HeaderTopic, topic)
|
msg.Header.Set(metadata.HeaderTopic, topic)
|
||||||
return m.publish(ctx, []*broker.Message{msg}, opts...)
|
return m.publish(ctx, []*broker.Message{msg}, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
||||||
|
return m.funcBatchPublish(ctx, msgs, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *memoryBroker) fnBatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
||||||
return m.publish(ctx, msgs, opts...)
|
return m.publish(ctx, msgs, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -202,6 +234,10 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*broker.Message, opts
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||||
|
return m.funcBatchSubscribe(ctx, topic, handler, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *memoryBroker) fnBatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||||
m.RLock()
|
m.RLock()
|
||||||
if !m.connected {
|
if !m.connected {
|
||||||
m.RUnlock()
|
m.RUnlock()
|
||||||
@ -247,6 +283,10 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||||
|
return m.funcSubscribe(ctx, topic, handler, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *memoryBroker) fnSubscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||||
m.RLock()
|
m.RLock()
|
||||||
if !m.connected {
|
if !m.connected {
|
||||||
m.RUnlock()
|
m.RUnlock()
|
||||||
|
@ -13,6 +13,10 @@ func TestMemoryBatchBroker(t *testing.T) {
|
|||||||
b := NewBroker()
|
b := NewBroker()
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
|
if err := b.Init(); err != nil {
|
||||||
|
t.Fatalf("Unexpected init error %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
if err := b.Connect(ctx); err != nil {
|
if err := b.Connect(ctx); err != nil {
|
||||||
t.Fatalf("Unexpected connect error %v", err)
|
t.Fatalf("Unexpected connect error %v", err)
|
||||||
}
|
}
|
||||||
@ -59,6 +63,10 @@ func TestMemoryBroker(t *testing.T) {
|
|||||||
b := NewBroker()
|
b := NewBroker()
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
|
if err := b.Init(); err != nil {
|
||||||
|
t.Fatalf("Unexpected init error %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
if err := b.Connect(ctx); err != nil {
|
if err := b.Connect(ctx); err != nil {
|
||||||
t.Fatalf("Unexpected connect error %v", err)
|
t.Fatalf("Unexpected connect error %v", err)
|
||||||
}
|
}
|
||||||
|
@ -3,14 +3,25 @@ package broker
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"go.unistack.org/micro/v3/options"
|
||||||
)
|
)
|
||||||
|
|
||||||
type NoopBroker struct {
|
type NoopBroker struct {
|
||||||
|
funcPublish FuncPublish
|
||||||
|
funcBatchPublish FuncBatchPublish
|
||||||
|
funcSubscribe FuncSubscribe
|
||||||
|
funcBatchSubscribe FuncBatchSubscribe
|
||||||
opts Options
|
opts Options
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBroker(opts ...Option) *NoopBroker {
|
func NewBroker(opts ...Option) *NoopBroker {
|
||||||
b := &NoopBroker{opts: NewOptions(opts...)}
|
b := &NoopBroker{opts: NewOptions(opts...)}
|
||||||
|
b.funcPublish = b.fnPublish
|
||||||
|
b.funcBatchPublish = b.fnBatchPublish
|
||||||
|
b.funcSubscribe = b.fnSubscribe
|
||||||
|
b.funcBatchSubscribe = b.fnBatchSubscribe
|
||||||
|
|
||||||
return b
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -30,6 +41,25 @@ func (b *NoopBroker) Init(opts ...Option) error {
|
|||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
opt(&b.opts)
|
opt(&b.opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
b.funcPublish = b.fnPublish
|
||||||
|
b.funcBatchPublish = b.fnBatchPublish
|
||||||
|
b.funcSubscribe = b.fnSubscribe
|
||||||
|
b.funcBatchSubscribe = b.fnBatchSubscribe
|
||||||
|
|
||||||
|
b.opts.Hooks.EachNext(func(hook options.Hook) {
|
||||||
|
switch h := hook.(type) {
|
||||||
|
case HookPublish:
|
||||||
|
b.funcPublish = h(b.funcPublish)
|
||||||
|
case HookBatchPublish:
|
||||||
|
b.funcBatchPublish = h(b.funcBatchPublish)
|
||||||
|
case HookSubscribe:
|
||||||
|
b.funcSubscribe = h(b.funcSubscribe)
|
||||||
|
case HookBatchSubscribe:
|
||||||
|
b.funcBatchSubscribe = h(b.funcBatchSubscribe)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -45,14 +75,22 @@ func (b *NoopBroker) Address() string {
|
|||||||
return strings.Join(b.opts.Addrs, ",")
|
return strings.Join(b.opts.Addrs, ",")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *NoopBroker) BatchPublish(_ context.Context, _ []*Message, _ ...PublishOption) error {
|
func (b *NoopBroker) fnBatchPublish(_ context.Context, _ []*Message, _ ...PublishOption) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *NoopBroker) Publish(_ context.Context, _ string, _ *Message, _ ...PublishOption) error {
|
func (b *NoopBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
|
||||||
|
return b.funcBatchPublish(ctx, msgs, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *NoopBroker) fnPublish(_ context.Context, _ string, _ *Message, _ ...PublishOption) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *NoopBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error {
|
||||||
|
return b.funcPublish(ctx, topic, msg, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
type NoopSubscriber struct {
|
type NoopSubscriber struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
topic string
|
topic string
|
||||||
@ -61,14 +99,22 @@ type NoopSubscriber struct {
|
|||||||
opts SubscribeOptions
|
opts SubscribeOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *NoopBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) {
|
func (b *NoopBroker) fnBatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) {
|
||||||
return &NoopSubscriber{ctx: ctx, topic: topic, opts: NewSubscribeOptions(opts...), batchHandler: handler}, nil
|
return &NoopSubscriber{ctx: ctx, topic: topic, opts: NewSubscribeOptions(opts...), batchHandler: handler}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *NoopBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
|
func (b *NoopBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) {
|
||||||
|
return b.funcBatchSubscribe(ctx, topic, handler, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *NoopBroker) fnSubscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
|
||||||
return &NoopSubscriber{ctx: ctx, topic: topic, opts: NewSubscribeOptions(opts...), handler: handler}, nil
|
return &NoopSubscriber{ctx: ctx, topic: topic, opts: NewSubscribeOptions(opts...), handler: handler}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *NoopBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
|
||||||
|
return b.funcSubscribe(ctx, topic, handler, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *NoopSubscriber) Options() SubscribeOptions {
|
func (s *NoopSubscriber) Options() SubscribeOptions {
|
||||||
return s.opts
|
return s.opts
|
||||||
}
|
}
|
||||||
@ -77,6 +123,6 @@ func (s *NoopSubscriber) Topic() string {
|
|||||||
return s.topic
|
return s.topic
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *NoopSubscriber) Unsubscribe(ctx context.Context) error {
|
func (s *NoopSubscriber) Unsubscribe(_ context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
35
broker/noop_test.go
Normal file
35
broker/noop_test.go
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
package broker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
type testHook struct {
|
||||||
|
f bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testHook) Publish1(fn FuncPublish) FuncPublish {
|
||||||
|
return func(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error {
|
||||||
|
t.f = true
|
||||||
|
return fn(ctx, topic, msg, opts...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNoopHook(t *testing.T) {
|
||||||
|
h := &testHook{}
|
||||||
|
|
||||||
|
b := NewBroker(Hooks(HookPublish(h.Publish1)))
|
||||||
|
|
||||||
|
if err := b.Init(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := b.Publish(context.TODO(), "", nil); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !h.f {
|
||||||
|
t.Fatal("hook not works")
|
||||||
|
}
|
||||||
|
}
|
@ -8,6 +8,7 @@ import (
|
|||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v3/codec"
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v3/logger"
|
||||||
"go.unistack.org/micro/v3/meter"
|
"go.unistack.org/micro/v3/meter"
|
||||||
|
"go.unistack.org/micro/v3/options"
|
||||||
"go.unistack.org/micro/v3/register"
|
"go.unistack.org/micro/v3/register"
|
||||||
"go.unistack.org/micro/v3/sync"
|
"go.unistack.org/micro/v3/sync"
|
||||||
"go.unistack.org/micro/v3/tracer"
|
"go.unistack.org/micro/v3/tracer"
|
||||||
@ -37,10 +38,13 @@ type Options struct {
|
|||||||
Name string
|
Name string
|
||||||
// Addrs holds the broker address
|
// Addrs holds the broker address
|
||||||
Addrs []string
|
Addrs []string
|
||||||
|
// Wait waits for a collection of goroutines to finish
|
||||||
Wait *sync.WaitGroup
|
Wait *sync.WaitGroup
|
||||||
|
// GracefulTimeout contains time to wait to finish in flight requests
|
||||||
GracefulTimeout time.Duration
|
GracefulTimeout time.Duration
|
||||||
|
// Hooks can be run before broker Publish/BatchPublish and
|
||||||
|
// Subscribe/BatchSubscribe methods
|
||||||
|
Hooks options.Hooks
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOptions create new Options
|
// NewOptions create new Options
|
||||||
@ -230,6 +234,13 @@ func Name(n string) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Hooks sets hook runs before action
|
||||||
|
func Hooks(h ...options.Hook) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.Hooks = append(o.Hooks, h...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// SubscribeContext set context
|
// SubscribeContext set context
|
||||||
func SubscribeContext(ctx context.Context) SubscribeOption {
|
func SubscribeContext(ctx context.Context) SubscribeOption {
|
||||||
return func(o *SubscribeOptions) {
|
return func(o *SubscribeOptions) {
|
||||||
|
@ -44,6 +44,17 @@ type Client interface {
|
|||||||
String() string
|
String() string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type (
|
||||||
|
FuncCall func(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
|
||||||
|
HookCall func(next FuncCall) FuncCall
|
||||||
|
FuncStream func(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
|
||||||
|
HookStream func(next FuncStream) FuncStream
|
||||||
|
FuncPublish func(ctx context.Context, msg Message, opts ...PublishOption) error
|
||||||
|
HookPublish func(next FuncPublish) FuncPublish
|
||||||
|
FuncBatchPublish func(ctx context.Context, msg []Message, opts ...PublishOption) error
|
||||||
|
HookBatchPublish func(next FuncBatchPublish) FuncBatchPublish
|
||||||
|
)
|
||||||
|
|
||||||
// Message is the interface for publishing asynchronously
|
// Message is the interface for publishing asynchronously
|
||||||
type Message interface {
|
type Message interface {
|
||||||
Topic() string
|
Topic() string
|
||||||
|
@ -1,26 +0,0 @@
|
|||||||
package client
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestNewClientCallOptions(t *testing.T) {
|
|
||||||
var flag bool
|
|
||||||
w := func(fn CallFunc) CallFunc {
|
|
||||||
flag = true
|
|
||||||
return fn
|
|
||||||
}
|
|
||||||
c := NewClientCallOptions(NewClient(),
|
|
||||||
WithAddress("127.0.0.1"),
|
|
||||||
WithCallWrapper(w),
|
|
||||||
WithRequestTimeout(1*time.Millisecond),
|
|
||||||
WithRetries(0),
|
|
||||||
WithBackoff(BackoffInterval(10*time.Millisecond, 100*time.Millisecond)),
|
|
||||||
)
|
|
||||||
_ = c.Call(context.TODO(), c.NewRequest("service", "endpoint", nil), nil)
|
|
||||||
if !flag {
|
|
||||||
t.Fatalf("NewClientCallOptions not works")
|
|
||||||
}
|
|
||||||
}
|
|
@ -10,6 +10,7 @@ import (
|
|||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v3/codec"
|
||||||
"go.unistack.org/micro/v3/errors"
|
"go.unistack.org/micro/v3/errors"
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v3/metadata"
|
||||||
|
"go.unistack.org/micro/v3/options"
|
||||||
"go.unistack.org/micro/v3/selector"
|
"go.unistack.org/micro/v3/selector"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -19,6 +20,10 @@ var DefaultCodecs = map[string]codec.Codec{
|
|||||||
}
|
}
|
||||||
|
|
||||||
type noopClient struct {
|
type noopClient struct {
|
||||||
|
funcPublish FuncPublish
|
||||||
|
funcBatchPublish FuncBatchPublish
|
||||||
|
funcCall FuncCall
|
||||||
|
funcStream FuncStream
|
||||||
opts Options
|
opts Options
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -40,16 +45,14 @@ type noopRequest struct {
|
|||||||
|
|
||||||
// NewClient returns new noop client
|
// NewClient returns new noop client
|
||||||
func NewClient(opts ...Option) Client {
|
func NewClient(opts ...Option) Client {
|
||||||
nc := &noopClient{opts: NewOptions(opts...)}
|
n := &noopClient{opts: NewOptions(opts...)}
|
||||||
// wrap in reverse
|
|
||||||
|
|
||||||
c := Client(nc)
|
n.funcCall = n.fnCall
|
||||||
|
n.funcStream = n.fnStream
|
||||||
|
n.funcPublish = n.fnPublish
|
||||||
|
n.funcBatchPublish = n.fnBatchPublish
|
||||||
|
|
||||||
for i := len(nc.opts.Wrappers); i > 0; i-- {
|
return n
|
||||||
c = nc.opts.Wrappers[i-1](c)
|
|
||||||
}
|
|
||||||
|
|
||||||
return c
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopClient) Name() string {
|
func (n *noopClient) Name() string {
|
||||||
@ -173,6 +176,25 @@ func (n *noopClient) Init(opts ...Option) error {
|
|||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&n.opts)
|
o(&n.opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
n.funcCall = n.fnCall
|
||||||
|
n.funcStream = n.fnStream
|
||||||
|
n.funcPublish = n.fnPublish
|
||||||
|
n.funcBatchPublish = n.fnBatchPublish
|
||||||
|
|
||||||
|
n.opts.Hooks.EachNext(func(hook options.Hook) {
|
||||||
|
switch h := hook.(type) {
|
||||||
|
case HookCall:
|
||||||
|
n.funcCall = h(n.funcCall)
|
||||||
|
case HookStream:
|
||||||
|
n.funcStream = h(n.funcStream)
|
||||||
|
case HookPublish:
|
||||||
|
n.funcPublish = h(n.funcPublish)
|
||||||
|
case HookBatchPublish:
|
||||||
|
n.funcBatchPublish = h(n.funcBatchPublish)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -185,6 +207,10 @@ func (n *noopClient) String() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error {
|
func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error {
|
||||||
|
return n.funcCall(ctx, req, rsp, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopClient) fnCall(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error {
|
||||||
// make a copy of call opts
|
// make a copy of call opts
|
||||||
callOpts := n.opts.CallOptions
|
callOpts := n.opts.CallOptions
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
@ -213,11 +239,8 @@ func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opt
|
|||||||
}
|
}
|
||||||
|
|
||||||
// make copy of call method
|
// make copy of call method
|
||||||
hcall := n.call
|
hcall := func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
|
||||||
|
return nil
|
||||||
// wrap the call in reverse
|
|
||||||
for i := len(callOpts.CallWrappers); i > 0; i-- {
|
|
||||||
hcall = callOpts.CallWrappers[i-1](hcall)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// use the router passed as a call option, or fallback to the rpc clients router
|
// use the router passed as a call option, or fallback to the rpc clients router
|
||||||
@ -316,10 +339,6 @@ func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opt
|
|||||||
return gerr
|
return gerr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopClient) call(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *noopClient) NewRequest(service, endpoint string, req interface{}, opts ...RequestOption) Request {
|
func (n *noopClient) NewRequest(service, endpoint string, req interface{}, opts ...RequestOption) Request {
|
||||||
return &noopRequest{service: service, endpoint: endpoint}
|
return &noopRequest{service: service, endpoint: endpoint}
|
||||||
}
|
}
|
||||||
@ -330,6 +349,10 @@ func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOp
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) {
|
func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) {
|
||||||
|
return n.funcStream(ctx, req, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
// make a copy of call opts
|
// make a copy of call opts
|
||||||
@ -474,10 +497,18 @@ func (n *noopClient) stream(ctx context.Context, addr string, req Request, opts
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopClient) BatchPublish(ctx context.Context, ps []Message, opts ...PublishOption) error {
|
func (n *noopClient) BatchPublish(ctx context.Context, ps []Message, opts ...PublishOption) error {
|
||||||
|
return n.funcBatchPublish(ctx, ps, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopClient) fnBatchPublish(ctx context.Context, ps []Message, opts ...PublishOption) error {
|
||||||
return n.publish(ctx, ps, opts...)
|
return n.publish(ctx, ps, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOption) error {
|
func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOption) error {
|
||||||
|
return n.funcPublish(ctx, p, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopClient) fnPublish(ctx context.Context, p Message, opts ...PublishOption) error {
|
||||||
return n.publish(ctx, []Message{p}, opts...)
|
return n.publish(ctx, []Message{p}, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -538,6 +569,13 @@ func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishO
|
|||||||
msgs = append(msgs, &broker.Message{Header: md, Body: body})
|
msgs = append(msgs, &broker.Message{Header: md, Body: body})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(msgs) == 1 {
|
||||||
|
return n.opts.Broker.Publish(ctx, msgs[0].Header[metadata.HeaderTopic], msgs[0],
|
||||||
|
broker.PublishContext(options.Context),
|
||||||
|
broker.PublishBodyOnly(options.BodyOnly),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
return n.opts.Broker.BatchPublish(ctx, msgs,
|
return n.opts.Broker.BatchPublish(ctx, msgs,
|
||||||
broker.PublishContext(options.Context),
|
broker.PublishContext(options.Context),
|
||||||
broker.PublishBodyOnly(options.BodyOnly),
|
broker.PublishBodyOnly(options.BodyOnly),
|
||||||
|
35
client/noop_test.go
Normal file
35
client/noop_test.go
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
type testHook struct {
|
||||||
|
f bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testHook) Publish(fn FuncPublish) FuncPublish {
|
||||||
|
return func(ctx context.Context, msg Message, opts ...PublishOption) error {
|
||||||
|
t.f = true
|
||||||
|
return fn(ctx, msg, opts...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNoopHook(t *testing.T) {
|
||||||
|
h := &testHook{}
|
||||||
|
|
||||||
|
c := NewClient(Hooks(HookPublish(h.Publish)))
|
||||||
|
|
||||||
|
if err := c.Init(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.Publish(context.TODO(), c.NewMessage("", nil, MessageContentType("application/octet-stream"))); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !h.f {
|
||||||
|
t.Fatal("hook not works")
|
||||||
|
}
|
||||||
|
}
|
@ -12,6 +12,7 @@ import (
|
|||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v3/metadata"
|
||||||
"go.unistack.org/micro/v3/meter"
|
"go.unistack.org/micro/v3/meter"
|
||||||
"go.unistack.org/micro/v3/network/transport"
|
"go.unistack.org/micro/v3/network/transport"
|
||||||
|
"go.unistack.org/micro/v3/options"
|
||||||
"go.unistack.org/micro/v3/register"
|
"go.unistack.org/micro/v3/register"
|
||||||
"go.unistack.org/micro/v3/router"
|
"go.unistack.org/micro/v3/router"
|
||||||
"go.unistack.org/micro/v3/selector"
|
"go.unistack.org/micro/v3/selector"
|
||||||
@ -59,6 +60,9 @@ type Options struct {
|
|||||||
PoolTTL time.Duration
|
PoolTTL time.Duration
|
||||||
// ContextDialer used to connect
|
// ContextDialer used to connect
|
||||||
ContextDialer func(context.Context, string) (net.Conn, error)
|
ContextDialer func(context.Context, string) (net.Conn, error)
|
||||||
|
// Hooks can be run before broker Publish/BatchPublish and
|
||||||
|
// Subscribe/BatchSubscribe methods
|
||||||
|
Hooks options.Hooks
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewCallOptions creates new call options struct
|
// NewCallOptions creates new call options struct
|
||||||
@ -92,8 +96,6 @@ type CallOptions struct {
|
|||||||
Address []string
|
Address []string
|
||||||
// SelectOptions selector options
|
// SelectOptions selector options
|
||||||
SelectOptions []selector.SelectOption
|
SelectOptions []selector.SelectOption
|
||||||
// CallWrappers call wrappers
|
|
||||||
CallWrappers []CallWrapper
|
|
||||||
// StreamTimeout stream timeout
|
// StreamTimeout stream timeout
|
||||||
StreamTimeout time.Duration
|
StreamTimeout time.Duration
|
||||||
// RequestTimeout request timeout
|
// RequestTimeout request timeout
|
||||||
@ -185,7 +187,7 @@ func NewOptions(opts ...Option) Options {
|
|||||||
options := Options{
|
options := Options{
|
||||||
Context: context.Background(),
|
Context: context.Background(),
|
||||||
ContentType: DefaultContentType,
|
ContentType: DefaultContentType,
|
||||||
Codecs: make(map[string]codec.Codec),
|
Codecs: DefaultCodecs,
|
||||||
CallOptions: CallOptions{
|
CallOptions: CallOptions{
|
||||||
Context: context.Background(),
|
Context: context.Background(),
|
||||||
Backoff: DefaultBackoff,
|
Backoff: DefaultBackoff,
|
||||||
@ -306,20 +308,6 @@ func Selector(s selector.Selector) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wrap adds a wrapper to the list of options passed into the client
|
|
||||||
func Wrap(w Wrapper) Option {
|
|
||||||
return func(o *Options) {
|
|
||||||
o.Wrappers = append(o.Wrappers, w)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WrapCall adds a wrapper to the list of CallFunc wrappers
|
|
||||||
func WrapCall(cw ...CallWrapper) Option {
|
|
||||||
return func(o *Options) {
|
|
||||||
o.CallOptions.CallWrappers = append(o.CallOptions.CallWrappers, cw...)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Backoff is used to set the backoff function used when retrying Calls
|
// Backoff is used to set the backoff function used when retrying Calls
|
||||||
func Backoff(fn BackoffFunc) Option {
|
func Backoff(fn BackoffFunc) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
@ -450,13 +438,6 @@ func WithAddress(a ...string) CallOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithCallWrapper is a CallOption which adds to the existing CallFunc wrappers
|
|
||||||
func WithCallWrapper(cw ...CallWrapper) CallOption {
|
|
||||||
return func(o *CallOptions) {
|
|
||||||
o.CallWrappers = append(o.CallWrappers, cw...)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithBackoff is a CallOption which overrides that which
|
// WithBackoff is a CallOption which overrides that which
|
||||||
// set in Options.CallOptions
|
// set in Options.CallOptions
|
||||||
func WithBackoff(fn BackoffFunc) CallOption {
|
func WithBackoff(fn BackoffFunc) CallOption {
|
||||||
@ -591,3 +572,10 @@ func RequestContentType(ct string) RequestOption {
|
|||||||
o.ContentType = ct
|
o.ContentType = ct
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Hooks sets hook runs before action
|
||||||
|
func Hooks(h ...options.Hook) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.Hooks = append(o.Hooks, h...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -50,6 +50,13 @@ type Config interface {
|
|||||||
String() string
|
String() string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type (
|
||||||
|
FuncLoad func(ctx context.Context, opts ...LoadOption) error
|
||||||
|
HookLoad func(next FuncLoad) FuncLoad
|
||||||
|
FuncSave func(ctx context.Context, opts ...SaveOption) error
|
||||||
|
HookSave func(next FuncSave) FuncSave
|
||||||
|
)
|
||||||
|
|
||||||
// Watcher is the config watcher
|
// Watcher is the config watcher
|
||||||
type Watcher interface {
|
type Watcher interface {
|
||||||
// Next blocks until update happens or error returned
|
// Next blocks until update happens or error returned
|
||||||
|
@ -9,12 +9,15 @@ import (
|
|||||||
|
|
||||||
"dario.cat/mergo"
|
"dario.cat/mergo"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
"go.unistack.org/micro/v3/options"
|
||||||
mid "go.unistack.org/micro/v3/util/id"
|
mid "go.unistack.org/micro/v3/util/id"
|
||||||
rutil "go.unistack.org/micro/v3/util/reflect"
|
rutil "go.unistack.org/micro/v3/util/reflect"
|
||||||
mtime "go.unistack.org/micro/v3/util/time"
|
mtime "go.unistack.org/micro/v3/util/time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type defaultConfig struct {
|
type defaultConfig struct {
|
||||||
|
funcLoad FuncLoad
|
||||||
|
funcSave FuncSave
|
||||||
opts Options
|
opts Options
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -31,6 +34,18 @@ func (c *defaultConfig) Init(opts ...Option) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.funcLoad = c.fnLoad
|
||||||
|
c.funcSave = c.fnSave
|
||||||
|
|
||||||
|
c.opts.Hooks.EachNext(func(hook options.Hook) {
|
||||||
|
switch h := hook.(type) {
|
||||||
|
case HookLoad:
|
||||||
|
c.funcLoad = h(c.funcLoad)
|
||||||
|
case HookSave:
|
||||||
|
c.funcSave = h(c.funcSave)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
if err := DefaultAfterInit(c.opts.Context, c); err != nil && !c.opts.AllowFail {
|
if err := DefaultAfterInit(c.opts.Context, c); err != nil && !c.opts.AllowFail {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -39,11 +54,17 @@ func (c *defaultConfig) Init(opts ...Option) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *defaultConfig) Load(ctx context.Context, opts ...LoadOption) error {
|
func (c *defaultConfig) Load(ctx context.Context, opts ...LoadOption) error {
|
||||||
|
return c.funcLoad(ctx, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *defaultConfig) fnLoad(ctx context.Context, opts ...LoadOption) error {
|
||||||
|
var err error
|
||||||
|
|
||||||
if c.opts.SkipLoad != nil && c.opts.SkipLoad(ctx, c) {
|
if c.opts.SkipLoad != nil && c.opts.SkipLoad(ctx, c) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := DefaultBeforeLoad(ctx, c); err != nil && !c.opts.AllowFail {
|
if err = DefaultBeforeLoad(ctx, c); err != nil && !c.opts.AllowFail {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -233,6 +254,7 @@ func fillValue(value reflect.Value, val string) error {
|
|||||||
}
|
}
|
||||||
value.Set(reflect.ValueOf(v))
|
value.Set(reflect.ValueOf(v))
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -295,7 +317,11 @@ func fillValues(valueOf reflect.Value, tname string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *defaultConfig) Save(ctx context.Context, _ ...SaveOption) error {
|
func (c *defaultConfig) Save(ctx context.Context, opts ...SaveOption) error {
|
||||||
|
return c.funcSave(ctx, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *defaultConfig) fnSave(ctx context.Context, opts ...SaveOption) error {
|
||||||
if c.opts.SkipSave != nil && c.opts.SkipSave(ctx, c) {
|
if c.opts.SkipSave != nil && c.opts.SkipSave(ctx, c) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -319,7 +345,7 @@ func (c *defaultConfig) Name() string {
|
|||||||
return c.opts.Name
|
return c.opts.Name
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *defaultConfig) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) {
|
func (c *defaultConfig) Watch(_ context.Context, _ ...WatchOption) (Watcher, error) {
|
||||||
return nil, ErrWatcherNotImplemented
|
return nil, ErrWatcherNotImplemented
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -329,5 +355,9 @@ func NewConfig(opts ...Option) Config {
|
|||||||
if len(options.StructTag) == 0 {
|
if len(options.StructTag) == 0 {
|
||||||
options.StructTag = "default"
|
options.StructTag = "default"
|
||||||
}
|
}
|
||||||
return &defaultConfig{opts: options}
|
c := &defaultConfig{opts: options}
|
||||||
|
c.funcLoad = c.fnLoad
|
||||||
|
c.funcSave = c.fnSave
|
||||||
|
|
||||||
|
return c
|
||||||
}
|
}
|
||||||
|
@ -41,6 +41,35 @@ func (c *cfgStructValue) Validate() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type testHook struct {
|
||||||
|
f bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testHook) Load(fn config.FuncLoad) config.FuncLoad {
|
||||||
|
return func(ctx context.Context, opts ...config.LoadOption) error {
|
||||||
|
t.f = true
|
||||||
|
return fn(ctx, opts...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHook(t *testing.T) {
|
||||||
|
h := &testHook{}
|
||||||
|
|
||||||
|
c := config.NewConfig(config.Struct(h), config.Hooks(config.HookLoad(h.Load)))
|
||||||
|
|
||||||
|
if err := c.Init(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.Load(context.TODO()); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !h.f {
|
||||||
|
t.Fatal("hook not works")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestDefault(t *testing.T) {
|
func TestDefault(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
conf := &cfg{IntValue: 10}
|
conf := &cfg{IntValue: 10}
|
||||||
|
@ -7,6 +7,7 @@ import (
|
|||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v3/codec"
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v3/logger"
|
||||||
"go.unistack.org/micro/v3/meter"
|
"go.unistack.org/micro/v3/meter"
|
||||||
|
"go.unistack.org/micro/v3/options"
|
||||||
"go.unistack.org/micro/v3/tracer"
|
"go.unistack.org/micro/v3/tracer"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -46,6 +47,8 @@ type Options struct {
|
|||||||
SkipLoad func(context.Context, Config) bool
|
SkipLoad func(context.Context, Config) bool
|
||||||
// SkipSave runs only if condition returns true
|
// SkipSave runs only if condition returns true
|
||||||
SkipSave func(context.Context, Config) bool
|
SkipSave func(context.Context, Config) bool
|
||||||
|
// Hooks can be run before/after config Save/Load
|
||||||
|
Hooks options.Hooks
|
||||||
}
|
}
|
||||||
|
|
||||||
// Option function signature
|
// Option function signature
|
||||||
@ -288,3 +291,10 @@ func WatchStruct(src interface{}) WatchOption {
|
|||||||
o.Struct = src
|
o.Struct = src
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Hooks sets hook runs before action
|
||||||
|
func Hooks(h ...options.Hook) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.Hooks = append(o.Hooks, h...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package options // import "go.unistack.org/micro/v3/options"
|
package options
|
||||||
|
|
||||||
// Hook func interface
|
// Hook func interface
|
||||||
type Hook interface{}
|
type Hook interface{}
|
||||||
|
@ -1,59 +0,0 @@
|
|||||||
package server
|
|
||||||
|
|
||||||
import (
|
|
||||||
"reflect"
|
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/register"
|
|
||||||
)
|
|
||||||
|
|
||||||
type rpcHandler struct {
|
|
||||||
opts HandlerOptions
|
|
||||||
handler interface{}
|
|
||||||
name string
|
|
||||||
endpoints []*register.Endpoint
|
|
||||||
}
|
|
||||||
|
|
||||||
func newRPCHandler(handler interface{}, opts ...HandlerOption) Handler {
|
|
||||||
options := NewHandlerOptions(opts...)
|
|
||||||
|
|
||||||
typ := reflect.TypeOf(handler)
|
|
||||||
hdlr := reflect.ValueOf(handler)
|
|
||||||
name := reflect.Indirect(hdlr).Type().Name()
|
|
||||||
|
|
||||||
var endpoints []*register.Endpoint
|
|
||||||
|
|
||||||
for m := 0; m < typ.NumMethod(); m++ {
|
|
||||||
if e := register.ExtractEndpoint(typ.Method(m)); e != nil {
|
|
||||||
e.Name = name + "." + e.Name
|
|
||||||
|
|
||||||
for k, v := range options.Metadata[e.Name] {
|
|
||||||
e.Metadata[k] = v
|
|
||||||
}
|
|
||||||
|
|
||||||
endpoints = append(endpoints, e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return &rpcHandler{
|
|
||||||
name: name,
|
|
||||||
handler: handler,
|
|
||||||
endpoints: endpoints,
|
|
||||||
opts: options,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *rpcHandler) Name() string {
|
|
||||||
return r.name
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *rpcHandler) Handler() interface{} {
|
|
||||||
return r.handler
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *rpcHandler) Endpoints() []*register.Endpoint {
|
|
||||||
return r.endpoints
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *rpcHandler) Options() HandlerOptions {
|
|
||||||
return r.opts
|
|
||||||
}
|
|
425
server/noop.go
425
server/noop.go
@ -1,14 +1,22 @@
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"runtime/debug"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/broker"
|
"go.unistack.org/micro/v3/broker"
|
||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v3/codec"
|
||||||
|
"go.unistack.org/micro/v3/errors"
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v3/logger"
|
||||||
|
"go.unistack.org/micro/v3/metadata"
|
||||||
|
"go.unistack.org/micro/v3/options"
|
||||||
"go.unistack.org/micro/v3/register"
|
"go.unistack.org/micro/v3/register"
|
||||||
maddr "go.unistack.org/micro/v3/util/addr"
|
maddr "go.unistack.org/micro/v3/util/addr"
|
||||||
mnet "go.unistack.org/micro/v3/util/net"
|
mnet "go.unistack.org/micro/v3/util/net"
|
||||||
@ -24,6 +32,58 @@ const (
|
|||||||
defaultContentType = "application/json"
|
defaultContentType = "application/json"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type rpcHandler struct {
|
||||||
|
opts HandlerOptions
|
||||||
|
handler interface{}
|
||||||
|
name string
|
||||||
|
endpoints []*register.Endpoint
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRPCHandler(handler interface{}, opts ...HandlerOption) Handler {
|
||||||
|
options := NewHandlerOptions(opts...)
|
||||||
|
|
||||||
|
typ := reflect.TypeOf(handler)
|
||||||
|
hdlr := reflect.ValueOf(handler)
|
||||||
|
name := reflect.Indirect(hdlr).Type().Name()
|
||||||
|
|
||||||
|
var endpoints []*register.Endpoint
|
||||||
|
|
||||||
|
for m := 0; m < typ.NumMethod(); m++ {
|
||||||
|
if e := register.ExtractEndpoint(typ.Method(m)); e != nil {
|
||||||
|
e.Name = name + "." + e.Name
|
||||||
|
|
||||||
|
for k, v := range options.Metadata[e.Name] {
|
||||||
|
e.Metadata[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
endpoints = append(endpoints, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &rpcHandler{
|
||||||
|
name: name,
|
||||||
|
handler: handler,
|
||||||
|
endpoints: endpoints,
|
||||||
|
opts: options,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcHandler) Name() string {
|
||||||
|
return r.name
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcHandler) Handler() interface{} {
|
||||||
|
return r.handler
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcHandler) Endpoints() []*register.Endpoint {
|
||||||
|
return r.endpoints
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcHandler) Options() HandlerOptions {
|
||||||
|
return r.opts
|
||||||
|
}
|
||||||
|
|
||||||
type noopServer struct {
|
type noopServer struct {
|
||||||
h Handler
|
h Handler
|
||||||
wg *sync.WaitGroup
|
wg *sync.WaitGroup
|
||||||
@ -94,6 +154,35 @@ func (n *noopServer) Subscribe(sb Subscriber) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type rpcMessage struct {
|
||||||
|
payload interface{}
|
||||||
|
codec codec.Codec
|
||||||
|
header metadata.Metadata
|
||||||
|
topic string
|
||||||
|
contentType string
|
||||||
|
body []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcMessage) ContentType() string {
|
||||||
|
return r.contentType
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcMessage) Topic() string {
|
||||||
|
return r.topic
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcMessage) Body() interface{} {
|
||||||
|
return r.payload
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcMessage) Header() metadata.Metadata {
|
||||||
|
return r.header
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rpcMessage) Codec() codec.Codec {
|
||||||
|
return r.codec
|
||||||
|
}
|
||||||
|
|
||||||
func (n *noopServer) NewHandler(h interface{}, opts ...HandlerOption) Handler {
|
func (n *noopServer) NewHandler(h interface{}, opts ...HandlerOption) Handler {
|
||||||
return newRPCHandler(h, opts...)
|
return newRPCHandler(h, opts...)
|
||||||
}
|
}
|
||||||
@ -478,3 +567,339 @@ func (n *noopServer) Stop() error {
|
|||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber {
|
||||||
|
var endpoints []*register.Endpoint
|
||||||
|
var handlers []*handler
|
||||||
|
|
||||||
|
options := NewSubscriberOptions(opts...)
|
||||||
|
|
||||||
|
if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func {
|
||||||
|
h := &handler{
|
||||||
|
method: reflect.ValueOf(sub),
|
||||||
|
}
|
||||||
|
|
||||||
|
switch typ.NumIn() {
|
||||||
|
case 1:
|
||||||
|
h.reqType = typ.In(0)
|
||||||
|
case 2:
|
||||||
|
h.ctxType = typ.In(0)
|
||||||
|
h.reqType = typ.In(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
handlers = append(handlers, h)
|
||||||
|
ep := ®ister.Endpoint{
|
||||||
|
Name: "Func",
|
||||||
|
Request: register.ExtractSubValue(typ),
|
||||||
|
Metadata: metadata.New(2),
|
||||||
|
}
|
||||||
|
ep.Metadata.Set("topic", topic)
|
||||||
|
ep.Metadata.Set("subscriber", "true")
|
||||||
|
endpoints = append(endpoints, ep)
|
||||||
|
} else {
|
||||||
|
hdlr := reflect.ValueOf(sub)
|
||||||
|
name := reflect.Indirect(hdlr).Type().Name()
|
||||||
|
|
||||||
|
for m := 0; m < typ.NumMethod(); m++ {
|
||||||
|
method := typ.Method(m)
|
||||||
|
h := &handler{
|
||||||
|
method: method.Func,
|
||||||
|
}
|
||||||
|
|
||||||
|
switch method.Type.NumIn() {
|
||||||
|
case 2:
|
||||||
|
h.reqType = method.Type.In(1)
|
||||||
|
case 3:
|
||||||
|
h.ctxType = method.Type.In(1)
|
||||||
|
h.reqType = method.Type.In(2)
|
||||||
|
}
|
||||||
|
|
||||||
|
handlers = append(handlers, h)
|
||||||
|
ep := ®ister.Endpoint{
|
||||||
|
Name: name + "." + method.Name,
|
||||||
|
Request: register.ExtractSubValue(method.Type),
|
||||||
|
Metadata: metadata.New(2),
|
||||||
|
}
|
||||||
|
ep.Metadata.Set("topic", topic)
|
||||||
|
ep.Metadata.Set("subscriber", "true")
|
||||||
|
endpoints = append(endpoints, ep)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &subscriber{
|
||||||
|
rcvr: reflect.ValueOf(sub),
|
||||||
|
typ: reflect.TypeOf(sub),
|
||||||
|
topic: topic,
|
||||||
|
subscriber: sub,
|
||||||
|
handlers: handlers,
|
||||||
|
endpoints: endpoints,
|
||||||
|
opts: options,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//nolint:gocyclo
|
||||||
|
func (n *noopServer) createBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
|
||||||
|
return func(ps broker.Events) (err error) {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
n.RLock()
|
||||||
|
config := n.opts
|
||||||
|
n.RUnlock()
|
||||||
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
|
config.Logger.Error(n.opts.Context, "panic recovered: ", r)
|
||||||
|
config.Logger.Error(n.opts.Context, string(debug.Stack()))
|
||||||
|
}
|
||||||
|
err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
msgs := make([]Message, 0, len(ps))
|
||||||
|
ctxs := make([]context.Context, 0, len(ps))
|
||||||
|
for _, p := range ps {
|
||||||
|
msg := p.Message()
|
||||||
|
// if we don't have headers, create empty map
|
||||||
|
if msg.Header == nil {
|
||||||
|
msg.Header = metadata.New(2)
|
||||||
|
}
|
||||||
|
|
||||||
|
ct, _ := msg.Header.Get(metadata.HeaderContentType)
|
||||||
|
if len(ct) == 0 {
|
||||||
|
msg.Header.Set(metadata.HeaderContentType, defaultContentType)
|
||||||
|
ct = defaultContentType
|
||||||
|
}
|
||||||
|
hdr := metadata.Copy(msg.Header)
|
||||||
|
topic, _ := msg.Header.Get(metadata.HeaderTopic)
|
||||||
|
ctxs = append(ctxs, metadata.NewIncomingContext(sb.opts.Context, hdr))
|
||||||
|
msgs = append(msgs, &rpcMessage{
|
||||||
|
topic: topic,
|
||||||
|
contentType: ct,
|
||||||
|
header: msg.Header,
|
||||||
|
body: msg.Body,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
results := make(chan error, len(sb.handlers))
|
||||||
|
|
||||||
|
for i := 0; i < len(sb.handlers); i++ {
|
||||||
|
handler := sb.handlers[i]
|
||||||
|
|
||||||
|
var req reflect.Value
|
||||||
|
|
||||||
|
switch handler.reqType.Kind() {
|
||||||
|
case reflect.Ptr:
|
||||||
|
req = reflect.New(handler.reqType.Elem())
|
||||||
|
default:
|
||||||
|
req = reflect.New(handler.reqType.Elem()).Elem()
|
||||||
|
}
|
||||||
|
|
||||||
|
reqType := handler.reqType
|
||||||
|
var cf codec.Codec
|
||||||
|
for _, msg := range msgs {
|
||||||
|
cf, err = n.newCodec(msg.ContentType())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
rb := reflect.New(req.Type().Elem())
|
||||||
|
if err = cf.ReadBody(bytes.NewReader(msg.(*rpcMessage).body), rb.Interface()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
msg.(*rpcMessage).codec = cf
|
||||||
|
msg.(*rpcMessage).payload = rb.Interface()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn := func(ctxs []context.Context, ms []Message) error {
|
||||||
|
var vals []reflect.Value
|
||||||
|
if sb.typ.Kind() != reflect.Func {
|
||||||
|
vals = append(vals, sb.rcvr)
|
||||||
|
}
|
||||||
|
if handler.ctxType != nil {
|
||||||
|
vals = append(vals, reflect.ValueOf(ctxs))
|
||||||
|
}
|
||||||
|
payloads := reflect.MakeSlice(reqType, 0, len(ms))
|
||||||
|
for _, m := range ms {
|
||||||
|
payloads = reflect.Append(payloads, reflect.ValueOf(m.Body()))
|
||||||
|
}
|
||||||
|
vals = append(vals, payloads)
|
||||||
|
|
||||||
|
returnValues := handler.method.Call(vals)
|
||||||
|
if rerr := returnValues[0].Interface(); rerr != nil {
|
||||||
|
return rerr.(error)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
opts.Hooks.EachNext(func(hook options.Hook) {
|
||||||
|
if h, ok := hook.(HookBatchSubHandler); ok {
|
||||||
|
fn = h(fn)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
if n.wg != nil {
|
||||||
|
n.wg.Add(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
if n.wg != nil {
|
||||||
|
defer n.wg.Done()
|
||||||
|
}
|
||||||
|
results <- fn(ctxs, msgs)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
var errors []string
|
||||||
|
for i := 0; i < len(sb.handlers); i++ {
|
||||||
|
if rerr := <-results; rerr != nil {
|
||||||
|
errors = append(errors, rerr.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(errors) > 0 {
|
||||||
|
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//nolint:gocyclo
|
||||||
|
func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
|
||||||
|
return func(p broker.Event) (err error) {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
n.RLock()
|
||||||
|
config := n.opts
|
||||||
|
n.RUnlock()
|
||||||
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
|
config.Logger.Error(n.opts.Context, "panic recovered: ", r)
|
||||||
|
config.Logger.Error(n.opts.Context, string(debug.Stack()))
|
||||||
|
}
|
||||||
|
err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
msg := p.Message()
|
||||||
|
// if we don't have headers, create empty map
|
||||||
|
if msg.Header == nil {
|
||||||
|
msg.Header = metadata.New(2)
|
||||||
|
}
|
||||||
|
|
||||||
|
ct := msg.Header["Content-Type"]
|
||||||
|
if len(ct) == 0 {
|
||||||
|
msg.Header.Set(metadata.HeaderContentType, defaultContentType)
|
||||||
|
ct = defaultContentType
|
||||||
|
}
|
||||||
|
cf, err := n.newCodec(ct)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
hdr := metadata.New(len(msg.Header))
|
||||||
|
for k, v := range msg.Header {
|
||||||
|
hdr.Set(k, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := metadata.NewIncomingContext(sb.opts.Context, hdr)
|
||||||
|
|
||||||
|
results := make(chan error, len(sb.handlers))
|
||||||
|
|
||||||
|
for i := 0; i < len(sb.handlers); i++ {
|
||||||
|
handler := sb.handlers[i]
|
||||||
|
|
||||||
|
var isVal bool
|
||||||
|
var req reflect.Value
|
||||||
|
|
||||||
|
if handler.reqType.Kind() == reflect.Ptr {
|
||||||
|
req = reflect.New(handler.reqType.Elem())
|
||||||
|
} else {
|
||||||
|
req = reflect.New(handler.reqType)
|
||||||
|
isVal = true
|
||||||
|
}
|
||||||
|
if isVal {
|
||||||
|
req = req.Elem()
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = cf.ReadBody(bytes.NewBuffer(msg.Body), req.Interface()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
fn := func(ctx context.Context, msg Message) error {
|
||||||
|
var vals []reflect.Value
|
||||||
|
if sb.typ.Kind() != reflect.Func {
|
||||||
|
vals = append(vals, sb.rcvr)
|
||||||
|
}
|
||||||
|
if handler.ctxType != nil {
|
||||||
|
vals = append(vals, reflect.ValueOf(ctx))
|
||||||
|
}
|
||||||
|
|
||||||
|
vals = append(vals, reflect.ValueOf(msg.Body()))
|
||||||
|
|
||||||
|
returnValues := handler.method.Call(vals)
|
||||||
|
if rerr := returnValues[0].Interface(); rerr != nil {
|
||||||
|
return rerr.(error)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
opts.Hooks.EachNext(func(hook options.Hook) {
|
||||||
|
if h, ok := hook.(HookSubHandler); ok {
|
||||||
|
fn = h(fn)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
if n.wg != nil {
|
||||||
|
n.wg.Add(1)
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
if n.wg != nil {
|
||||||
|
defer n.wg.Done()
|
||||||
|
}
|
||||||
|
cerr := fn(ctx, &rpcMessage{
|
||||||
|
topic: sb.topic,
|
||||||
|
contentType: ct,
|
||||||
|
payload: req.Interface(),
|
||||||
|
header: msg.Header,
|
||||||
|
})
|
||||||
|
results <- cerr
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
var errors []string
|
||||||
|
for i := 0; i < len(sb.handlers); i++ {
|
||||||
|
if rerr := <-results; rerr != nil {
|
||||||
|
errors = append(errors, rerr.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(errors) > 0 {
|
||||||
|
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *subscriber) Topic() string {
|
||||||
|
return s.topic
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *subscriber) Subscriber() interface{} {
|
||||||
|
return s.subscriber
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *subscriber) Endpoints() []*register.Endpoint {
|
||||||
|
return s.endpoints
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *subscriber) Options() SubscriberOptions {
|
||||||
|
return s.opts
|
||||||
|
}
|
||||||
|
|
||||||
|
type subscriber struct {
|
||||||
|
typ reflect.Type
|
||||||
|
subscriber interface{}
|
||||||
|
topic string
|
||||||
|
endpoints []*register.Endpoint
|
||||||
|
handlers []*handler
|
||||||
|
opts SubscriberOptions
|
||||||
|
rcvr reflect.Value
|
||||||
|
}
|
||||||
|
|
||||||
|
type handler struct {
|
||||||
|
reqType reflect.Type
|
||||||
|
ctxType reflect.Type
|
||||||
|
method reflect.Value
|
||||||
|
}
|
||||||
|
@ -69,12 +69,6 @@ type Options struct {
|
|||||||
Advertise string
|
Advertise string
|
||||||
// Version holds the server version
|
// Version holds the server version
|
||||||
Version string
|
Version string
|
||||||
// SubWrappers holds the server subscribe wrappers
|
|
||||||
SubWrappers []SubscriberWrapper
|
|
||||||
// BatchSubWrappers holds the server batch subscribe wrappers
|
|
||||||
BatchSubWrappers []BatchSubscriberWrapper
|
|
||||||
// HdlrWrappers holds the handler wrappers
|
|
||||||
HdlrWrappers []HandlerWrapper
|
|
||||||
// RegisterAttempts holds the number of register attempts before error
|
// RegisterAttempts holds the number of register attempts before error
|
||||||
RegisterAttempts int
|
RegisterAttempts int
|
||||||
// RegisterInterval holds he interval for re-register
|
// RegisterInterval holds he interval for re-register
|
||||||
@ -85,7 +79,8 @@ type Options struct {
|
|||||||
MaxConn int
|
MaxConn int
|
||||||
// DeregisterAttempts holds the number of deregister attempts before error
|
// DeregisterAttempts holds the number of deregister attempts before error
|
||||||
DeregisterAttempts int
|
DeregisterAttempts int
|
||||||
// Hooks may contains SubscriberWrapper, HandlerWrapper or Server func wrapper
|
// Hooks may contains hook actions that performs before/after server handler
|
||||||
|
// or server subscriber handler
|
||||||
Hooks options.Hooks
|
Hooks options.Hooks
|
||||||
// GracefulTimeout timeout for graceful stop server
|
// GracefulTimeout timeout for graceful stop server
|
||||||
GracefulTimeout time.Duration
|
GracefulTimeout time.Duration
|
||||||
@ -287,27 +282,6 @@ func Wait(wg *sync.WaitGroup) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WrapHandler adds a handler Wrapper to a list of options passed into the server
|
|
||||||
func WrapHandler(w HandlerWrapper) Option {
|
|
||||||
return func(o *Options) {
|
|
||||||
o.HdlrWrappers = append(o.HdlrWrappers, w)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WrapSubscriber adds a subscriber Wrapper to a list of options passed into the server
|
|
||||||
func WrapSubscriber(w SubscriberWrapper) Option {
|
|
||||||
return func(o *Options) {
|
|
||||||
o.SubWrappers = append(o.SubWrappers, w)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WrapBatchSubscriber adds a batch subscriber Wrapper to a list of options passed into the server
|
|
||||||
func WrapBatchSubscriber(w BatchSubscriberWrapper) Option {
|
|
||||||
return func(o *Options) {
|
|
||||||
o.BatchSubWrappers = append(o.BatchSubWrappers, w)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// MaxConn specifies maximum number of max simultaneous connections to server
|
// MaxConn specifies maximum number of max simultaneous connections to server
|
||||||
func MaxConn(n int) Option {
|
func MaxConn(n int) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
@ -461,3 +435,10 @@ func SubscriberBatchWait(td time.Duration) SubscriberOption {
|
|||||||
o.BatchWait = td
|
o.BatchWait = td
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Hooks sets hook runs before action
|
||||||
|
func Hooks(h ...options.Hook) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.Hooks = append(o.Hooks, h...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -1,35 +0,0 @@
|
|||||||
package server
|
|
||||||
|
|
||||||
import (
|
|
||||||
"go.unistack.org/micro/v3/codec"
|
|
||||||
"go.unistack.org/micro/v3/metadata"
|
|
||||||
)
|
|
||||||
|
|
||||||
type rpcMessage struct {
|
|
||||||
payload interface{}
|
|
||||||
codec codec.Codec
|
|
||||||
header metadata.Metadata
|
|
||||||
topic string
|
|
||||||
contentType string
|
|
||||||
body []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *rpcMessage) ContentType() string {
|
|
||||||
return r.contentType
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *rpcMessage) Topic() string {
|
|
||||||
return r.topic
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *rpcMessage) Body() interface{} {
|
|
||||||
return r.payload
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *rpcMessage) Header() metadata.Metadata {
|
|
||||||
return r.header
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *rpcMessage) Codec() codec.Codec {
|
|
||||||
return r.codec
|
|
||||||
}
|
|
@ -62,6 +62,13 @@ type Server interface {
|
|||||||
String() string
|
String() string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type (
|
||||||
|
FuncBatchSubHandler func(ctxs []context.Context, ms []Message) error
|
||||||
|
HookBatchSubHandler func(next FuncBatchSubHandler) FuncBatchSubHandler
|
||||||
|
FuncSubHandler func(ctx context.Context, ms Message) error
|
||||||
|
HookSubHandler func(next FuncSubHandler) FuncSubHandler
|
||||||
|
)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
// Router handle serving messages
|
// Router handle serving messages
|
||||||
type Router interface {
|
type Router interface {
|
||||||
@ -152,7 +159,6 @@ type Stream interface {
|
|||||||
// func (g *Greeter) Hello(context, request, response) error {
|
// func (g *Greeter) Hello(context, request, response) error {
|
||||||
// return nil
|
// return nil
|
||||||
// }
|
// }
|
||||||
//
|
|
||||||
type Handler interface {
|
type Handler interface {
|
||||||
Name() string
|
Name() string
|
||||||
Handler() interface{}
|
Handler() interface{}
|
||||||
|
@ -1,21 +1,11 @@
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"runtime/debug"
|
|
||||||
"strings"
|
"strings"
|
||||||
"unicode"
|
"unicode"
|
||||||
"unicode/utf8"
|
"unicode/utf8"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/broker"
|
|
||||||
"go.unistack.org/micro/v3/codec"
|
|
||||||
"go.unistack.org/micro/v3/errors"
|
|
||||||
"go.unistack.org/micro/v3/logger"
|
|
||||||
"go.unistack.org/micro/v3/metadata"
|
|
||||||
"go.unistack.org/micro/v3/register"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -27,26 +17,10 @@ const (
|
|||||||
// because Typeof takes an empty interface value. This is annoying.
|
// because Typeof takes an empty interface value. This is annoying.
|
||||||
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
|
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
|
||||||
|
|
||||||
type handler struct {
|
|
||||||
reqType reflect.Type
|
|
||||||
ctxType reflect.Type
|
|
||||||
method reflect.Value
|
|
||||||
}
|
|
||||||
|
|
||||||
type subscriber struct {
|
|
||||||
typ reflect.Type
|
|
||||||
subscriber interface{}
|
|
||||||
topic string
|
|
||||||
endpoints []*register.Endpoint
|
|
||||||
handlers []*handler
|
|
||||||
opts SubscriberOptions
|
|
||||||
rcvr reflect.Value
|
|
||||||
}
|
|
||||||
|
|
||||||
// Is this an exported - upper case - name?
|
// Is this an exported - upper case - name?
|
||||||
func isExported(name string) bool {
|
func isExported(name string) bool {
|
||||||
rune, _ := utf8.DecodeRuneInString(name)
|
r, _ := utf8.DecodeRuneInString(name)
|
||||||
return unicode.IsUpper(rune)
|
return unicode.IsUpper(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Is this type exported or a builtin?
|
// Is this type exported or a builtin?
|
||||||
@ -120,318 +94,3 @@ func ValidateSubscriber(sub Subscriber) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber {
|
|
||||||
var endpoints []*register.Endpoint
|
|
||||||
var handlers []*handler
|
|
||||||
|
|
||||||
options := NewSubscriberOptions(opts...)
|
|
||||||
|
|
||||||
if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func {
|
|
||||||
h := &handler{
|
|
||||||
method: reflect.ValueOf(sub),
|
|
||||||
}
|
|
||||||
|
|
||||||
switch typ.NumIn() {
|
|
||||||
case 1:
|
|
||||||
h.reqType = typ.In(0)
|
|
||||||
case 2:
|
|
||||||
h.ctxType = typ.In(0)
|
|
||||||
h.reqType = typ.In(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
handlers = append(handlers, h)
|
|
||||||
ep := ®ister.Endpoint{
|
|
||||||
Name: "Func",
|
|
||||||
Request: register.ExtractSubValue(typ),
|
|
||||||
Metadata: metadata.New(2),
|
|
||||||
}
|
|
||||||
ep.Metadata.Set("topic", topic)
|
|
||||||
ep.Metadata.Set("subscriber", "true")
|
|
||||||
endpoints = append(endpoints, ep)
|
|
||||||
} else {
|
|
||||||
hdlr := reflect.ValueOf(sub)
|
|
||||||
name := reflect.Indirect(hdlr).Type().Name()
|
|
||||||
|
|
||||||
for m := 0; m < typ.NumMethod(); m++ {
|
|
||||||
method := typ.Method(m)
|
|
||||||
h := &handler{
|
|
||||||
method: method.Func,
|
|
||||||
}
|
|
||||||
|
|
||||||
switch method.Type.NumIn() {
|
|
||||||
case 2:
|
|
||||||
h.reqType = method.Type.In(1)
|
|
||||||
case 3:
|
|
||||||
h.ctxType = method.Type.In(1)
|
|
||||||
h.reqType = method.Type.In(2)
|
|
||||||
}
|
|
||||||
|
|
||||||
handlers = append(handlers, h)
|
|
||||||
ep := ®ister.Endpoint{
|
|
||||||
Name: name + "." + method.Name,
|
|
||||||
Request: register.ExtractSubValue(method.Type),
|
|
||||||
Metadata: metadata.New(2),
|
|
||||||
}
|
|
||||||
ep.Metadata.Set("topic", topic)
|
|
||||||
ep.Metadata.Set("subscriber", "true")
|
|
||||||
endpoints = append(endpoints, ep)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return &subscriber{
|
|
||||||
rcvr: reflect.ValueOf(sub),
|
|
||||||
typ: reflect.TypeOf(sub),
|
|
||||||
topic: topic,
|
|
||||||
subscriber: sub,
|
|
||||||
handlers: handlers,
|
|
||||||
endpoints: endpoints,
|
|
||||||
opts: options,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//nolint:gocyclo
|
|
||||||
func (n *noopServer) createBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
|
|
||||||
return func(ps broker.Events) (err error) {
|
|
||||||
defer func() {
|
|
||||||
if r := recover(); r != nil {
|
|
||||||
n.RLock()
|
|
||||||
config := n.opts
|
|
||||||
n.RUnlock()
|
|
||||||
if config.Logger.V(logger.ErrorLevel) {
|
|
||||||
config.Logger.Error(n.opts.Context, "panic recovered: ", r)
|
|
||||||
config.Logger.Error(n.opts.Context, string(debug.Stack()))
|
|
||||||
}
|
|
||||||
err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
msgs := make([]Message, 0, len(ps))
|
|
||||||
ctxs := make([]context.Context, 0, len(ps))
|
|
||||||
for _, p := range ps {
|
|
||||||
msg := p.Message()
|
|
||||||
// if we don't have headers, create empty map
|
|
||||||
if msg.Header == nil {
|
|
||||||
msg.Header = metadata.New(2)
|
|
||||||
}
|
|
||||||
|
|
||||||
ct, _ := msg.Header.Get(metadata.HeaderContentType)
|
|
||||||
if len(ct) == 0 {
|
|
||||||
msg.Header.Set(metadata.HeaderContentType, defaultContentType)
|
|
||||||
ct = defaultContentType
|
|
||||||
}
|
|
||||||
hdr := metadata.Copy(msg.Header)
|
|
||||||
topic, _ := msg.Header.Get(metadata.HeaderTopic)
|
|
||||||
ctxs = append(ctxs, metadata.NewIncomingContext(sb.opts.Context, hdr))
|
|
||||||
msgs = append(msgs, &rpcMessage{
|
|
||||||
topic: topic,
|
|
||||||
contentType: ct,
|
|
||||||
header: msg.Header,
|
|
||||||
body: msg.Body,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
results := make(chan error, len(sb.handlers))
|
|
||||||
|
|
||||||
for i := 0; i < len(sb.handlers); i++ {
|
|
||||||
handler := sb.handlers[i]
|
|
||||||
|
|
||||||
var req reflect.Value
|
|
||||||
|
|
||||||
switch handler.reqType.Kind() {
|
|
||||||
case reflect.Ptr:
|
|
||||||
req = reflect.New(handler.reqType.Elem())
|
|
||||||
default:
|
|
||||||
req = reflect.New(handler.reqType.Elem()).Elem()
|
|
||||||
}
|
|
||||||
|
|
||||||
reqType := handler.reqType
|
|
||||||
var cf codec.Codec
|
|
||||||
for _, msg := range msgs {
|
|
||||||
cf, err = n.newCodec(msg.ContentType())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
rb := reflect.New(req.Type().Elem())
|
|
||||||
if err = cf.ReadBody(bytes.NewReader(msg.(*rpcMessage).body), rb.Interface()); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
msg.(*rpcMessage).codec = cf
|
|
||||||
msg.(*rpcMessage).payload = rb.Interface()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn := func(ctxs []context.Context, ms []Message) error {
|
|
||||||
var vals []reflect.Value
|
|
||||||
if sb.typ.Kind() != reflect.Func {
|
|
||||||
vals = append(vals, sb.rcvr)
|
|
||||||
}
|
|
||||||
if handler.ctxType != nil {
|
|
||||||
vals = append(vals, reflect.ValueOf(ctxs))
|
|
||||||
}
|
|
||||||
payloads := reflect.MakeSlice(reqType, 0, len(ms))
|
|
||||||
for _, m := range ms {
|
|
||||||
payloads = reflect.Append(payloads, reflect.ValueOf(m.Body()))
|
|
||||||
}
|
|
||||||
vals = append(vals, payloads)
|
|
||||||
|
|
||||||
returnValues := handler.method.Call(vals)
|
|
||||||
if rerr := returnValues[0].Interface(); rerr != nil {
|
|
||||||
return rerr.(error)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := len(opts.BatchSubWrappers); i > 0; i-- {
|
|
||||||
fn = opts.BatchSubWrappers[i-1](fn)
|
|
||||||
}
|
|
||||||
|
|
||||||
if n.wg != nil {
|
|
||||||
n.wg.Add(1)
|
|
||||||
}
|
|
||||||
go func() {
|
|
||||||
if n.wg != nil {
|
|
||||||
defer n.wg.Done()
|
|
||||||
}
|
|
||||||
results <- fn(ctxs, msgs)
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
var errors []string
|
|
||||||
for i := 0; i < len(sb.handlers); i++ {
|
|
||||||
if rerr := <-results; rerr != nil {
|
|
||||||
errors = append(errors, rerr.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(errors) > 0 {
|
|
||||||
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//nolint:gocyclo
|
|
||||||
func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
|
|
||||||
return func(p broker.Event) (err error) {
|
|
||||||
defer func() {
|
|
||||||
if r := recover(); r != nil {
|
|
||||||
n.RLock()
|
|
||||||
config := n.opts
|
|
||||||
n.RUnlock()
|
|
||||||
if config.Logger.V(logger.ErrorLevel) {
|
|
||||||
config.Logger.Error(n.opts.Context, "panic recovered: ", r)
|
|
||||||
config.Logger.Error(n.opts.Context, string(debug.Stack()))
|
|
||||||
}
|
|
||||||
err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
msg := p.Message()
|
|
||||||
// if we don't have headers, create empty map
|
|
||||||
if msg.Header == nil {
|
|
||||||
msg.Header = metadata.New(2)
|
|
||||||
}
|
|
||||||
|
|
||||||
ct := msg.Header["Content-Type"]
|
|
||||||
if len(ct) == 0 {
|
|
||||||
msg.Header.Set(metadata.HeaderContentType, defaultContentType)
|
|
||||||
ct = defaultContentType
|
|
||||||
}
|
|
||||||
cf, err := n.newCodec(ct)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
hdr := metadata.New(len(msg.Header))
|
|
||||||
for k, v := range msg.Header {
|
|
||||||
hdr.Set(k, v)
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := metadata.NewIncomingContext(sb.opts.Context, hdr)
|
|
||||||
|
|
||||||
results := make(chan error, len(sb.handlers))
|
|
||||||
|
|
||||||
for i := 0; i < len(sb.handlers); i++ {
|
|
||||||
handler := sb.handlers[i]
|
|
||||||
|
|
||||||
var isVal bool
|
|
||||||
var req reflect.Value
|
|
||||||
|
|
||||||
if handler.reqType.Kind() == reflect.Ptr {
|
|
||||||
req = reflect.New(handler.reqType.Elem())
|
|
||||||
} else {
|
|
||||||
req = reflect.New(handler.reqType)
|
|
||||||
isVal = true
|
|
||||||
}
|
|
||||||
if isVal {
|
|
||||||
req = req.Elem()
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = cf.ReadBody(bytes.NewBuffer(msg.Body), req.Interface()); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
fn := func(ctx context.Context, msg Message) error {
|
|
||||||
var vals []reflect.Value
|
|
||||||
if sb.typ.Kind() != reflect.Func {
|
|
||||||
vals = append(vals, sb.rcvr)
|
|
||||||
}
|
|
||||||
if handler.ctxType != nil {
|
|
||||||
vals = append(vals, reflect.ValueOf(ctx))
|
|
||||||
}
|
|
||||||
|
|
||||||
vals = append(vals, reflect.ValueOf(msg.Body()))
|
|
||||||
|
|
||||||
returnValues := handler.method.Call(vals)
|
|
||||||
if rerr := returnValues[0].Interface(); rerr != nil {
|
|
||||||
return rerr.(error)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := len(opts.SubWrappers); i > 0; i-- {
|
|
||||||
fn = opts.SubWrappers[i-1](fn)
|
|
||||||
}
|
|
||||||
|
|
||||||
if n.wg != nil {
|
|
||||||
n.wg.Add(1)
|
|
||||||
}
|
|
||||||
go func() {
|
|
||||||
if n.wg != nil {
|
|
||||||
defer n.wg.Done()
|
|
||||||
}
|
|
||||||
cerr := fn(ctx, &rpcMessage{
|
|
||||||
topic: sb.topic,
|
|
||||||
contentType: ct,
|
|
||||||
payload: req.Interface(),
|
|
||||||
header: msg.Header,
|
|
||||||
})
|
|
||||||
results <- cerr
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
var errors []string
|
|
||||||
for i := 0; i < len(sb.handlers); i++ {
|
|
||||||
if rerr := <-results; rerr != nil {
|
|
||||||
errors = append(errors, rerr.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(errors) > 0 {
|
|
||||||
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *subscriber) Topic() string {
|
|
||||||
return s.topic
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *subscriber) Subscriber() interface{} {
|
|
||||||
return s.subscriber
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *subscriber) Endpoints() []*register.Endpoint {
|
|
||||||
return s.endpoints
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *subscriber) Options() SubscriberOptions {
|
|
||||||
return s.opts
|
|
||||||
}
|
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package store
|
package memory
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -6,13 +6,15 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/patrickmn/go-cache"
|
cache "github.com/patrickmn/go-cache"
|
||||||
|
"go.unistack.org/micro/v3/options"
|
||||||
|
"go.unistack.org/micro/v3/store"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewStore returns a memory store
|
// NewStore returns a memory store
|
||||||
func NewStore(opts ...Option) Store {
|
func NewStore(opts ...store.Option) store.Store {
|
||||||
return &memoryStore{
|
return &memoryStore{
|
||||||
opts: NewOptions(opts...),
|
opts: store.NewOptions(opts...),
|
||||||
store: cache.New(cache.NoExpiration, 5*time.Minute),
|
store: cache.New(cache.NoExpiration, 5*time.Minute),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -27,8 +29,13 @@ func (m *memoryStore) Disconnect(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type memoryStore struct {
|
type memoryStore struct {
|
||||||
|
funcRead store.FuncRead
|
||||||
|
funcWrite store.FuncWrite
|
||||||
|
funcExists store.FuncExists
|
||||||
|
funcList store.FuncList
|
||||||
|
funcDelete store.FuncDelete
|
||||||
store *cache.Cache
|
store *cache.Cache
|
||||||
opts Options
|
opts store.Options
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) key(prefix, key string) string {
|
func (m *memoryStore) key(prefix, key string) string {
|
||||||
@ -39,7 +46,7 @@ func (m *memoryStore) exists(prefix, key string) error {
|
|||||||
key = m.key(prefix, key)
|
key = m.key(prefix, key)
|
||||||
_, found := m.store.Get(key)
|
_, found := m.store.Get(key)
|
||||||
if !found {
|
if !found {
|
||||||
return ErrNotFound
|
return store.ErrNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -50,12 +57,12 @@ func (m *memoryStore) get(prefix, key string, val interface{}) error {
|
|||||||
|
|
||||||
r, found := m.store.Get(key)
|
r, found := m.store.Get(key)
|
||||||
if !found {
|
if !found {
|
||||||
return ErrNotFound
|
return store.ErrNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
buf, ok := r.([]byte)
|
buf, ok := r.([]byte)
|
||||||
if !ok {
|
if !ok {
|
||||||
return ErrNotFound
|
return store.ErrNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return m.opts.Codec.Unmarshal(buf, val)
|
return m.opts.Codec.Unmarshal(buf, val)
|
||||||
@ -100,10 +107,32 @@ func (m *memoryStore) list(prefix string, limit, offset uint) []string {
|
|||||||
return allKeys
|
return allKeys
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) Init(opts ...Option) error {
|
func (m *memoryStore) Init(opts ...store.Option) error {
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&m.opts)
|
o(&m.opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m.funcRead = m.fnRead
|
||||||
|
m.funcWrite = m.fnWrite
|
||||||
|
m.funcExists = m.fnExists
|
||||||
|
m.funcList = m.fnList
|
||||||
|
m.funcDelete = m.fnDelete
|
||||||
|
|
||||||
|
m.opts.Hooks.EachNext(func(hook options.Hook) {
|
||||||
|
switch h := hook.(type) {
|
||||||
|
case store.HookRead:
|
||||||
|
m.funcRead = h(m.funcRead)
|
||||||
|
case store.HookWrite:
|
||||||
|
m.funcWrite = h(m.funcWrite)
|
||||||
|
case store.HookExists:
|
||||||
|
m.funcExists = h(m.funcExists)
|
||||||
|
case store.HookList:
|
||||||
|
m.funcList = h(m.funcList)
|
||||||
|
case store.HookDelete:
|
||||||
|
m.funcDelete = h(m.funcDelete)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -115,24 +144,36 @@ func (m *memoryStore) Name() string {
|
|||||||
return m.opts.Name
|
return m.opts.Name
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) Exists(ctx context.Context, key string, opts ...ExistsOption) error {
|
func (m *memoryStore) Exists(ctx context.Context, key string, opts ...store.ExistsOption) error {
|
||||||
options := NewExistsOptions(opts...)
|
return m.funcExists(ctx, key, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *memoryStore) fnExists(ctx context.Context, key string, opts ...store.ExistsOption) error {
|
||||||
|
options := store.NewExistsOptions(opts...)
|
||||||
if options.Namespace == "" {
|
if options.Namespace == "" {
|
||||||
options.Namespace = m.opts.Namespace
|
options.Namespace = m.opts.Namespace
|
||||||
}
|
}
|
||||||
return m.exists(options.Namespace, key)
|
return m.exists(options.Namespace, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) Read(ctx context.Context, key string, val interface{}, opts ...ReadOption) error {
|
func (m *memoryStore) Read(ctx context.Context, key string, val interface{}, opts ...store.ReadOption) error {
|
||||||
options := NewReadOptions(opts...)
|
return m.funcRead(ctx, key, val, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *memoryStore) fnRead(ctx context.Context, key string, val interface{}, opts ...store.ReadOption) error {
|
||||||
|
options := store.NewReadOptions(opts...)
|
||||||
if options.Namespace == "" {
|
if options.Namespace == "" {
|
||||||
options.Namespace = m.opts.Namespace
|
options.Namespace = m.opts.Namespace
|
||||||
}
|
}
|
||||||
return m.get(options.Namespace, key, val)
|
return m.get(options.Namespace, key, val)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) Write(ctx context.Context, key string, val interface{}, opts ...WriteOption) error {
|
func (m *memoryStore) Write(ctx context.Context, key string, val interface{}, opts ...store.WriteOption) error {
|
||||||
options := NewWriteOptions(opts...)
|
return m.funcWrite(ctx, key, val, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *memoryStore) fnWrite(ctx context.Context, key string, val interface{}, opts ...store.WriteOption) error {
|
||||||
|
options := store.NewWriteOptions(opts...)
|
||||||
if options.Namespace == "" {
|
if options.Namespace == "" {
|
||||||
options.Namespace = m.opts.Namespace
|
options.Namespace = m.opts.Namespace
|
||||||
}
|
}
|
||||||
@ -151,8 +192,12 @@ func (m *memoryStore) Write(ctx context.Context, key string, val interface{}, op
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) Delete(ctx context.Context, key string, opts ...DeleteOption) error {
|
func (m *memoryStore) Delete(ctx context.Context, key string, opts ...store.DeleteOption) error {
|
||||||
options := NewDeleteOptions(opts...)
|
return m.funcDelete(ctx, key, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *memoryStore) fnDelete(ctx context.Context, key string, opts ...store.DeleteOption) error {
|
||||||
|
options := store.NewDeleteOptions(opts...)
|
||||||
if options.Namespace == "" {
|
if options.Namespace == "" {
|
||||||
options.Namespace = m.opts.Namespace
|
options.Namespace = m.opts.Namespace
|
||||||
}
|
}
|
||||||
@ -161,12 +206,16 @@ func (m *memoryStore) Delete(ctx context.Context, key string, opts ...DeleteOpti
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) Options() Options {
|
func (m *memoryStore) Options() store.Options {
|
||||||
return m.opts
|
return m.opts
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) List(ctx context.Context, opts ...ListOption) ([]string, error) {
|
func (m *memoryStore) List(ctx context.Context, opts ...store.ListOption) ([]string, error) {
|
||||||
options := NewListOptions(opts...)
|
return m.funcList(ctx, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *memoryStore) fnList(ctx context.Context, opts ...store.ListOption) ([]string, error) {
|
||||||
|
options := store.NewListOptions(opts...)
|
||||||
if options.Namespace == "" {
|
if options.Namespace == "" {
|
||||||
options.Namespace = m.opts.Namespace
|
options.Namespace = m.opts.Namespace
|
||||||
}
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package store_test
|
package memory
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -8,8 +8,41 @@ import (
|
|||||||
"go.unistack.org/micro/v3/store"
|
"go.unistack.org/micro/v3/store"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type testHook struct {
|
||||||
|
f bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testHook) Exists(fn store.FuncExists) store.FuncExists {
|
||||||
|
return func(ctx context.Context, key string, opts ...store.ExistsOption) error {
|
||||||
|
t.f = true
|
||||||
|
return fn(ctx, key, opts...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHook(t *testing.T) {
|
||||||
|
h := &testHook{}
|
||||||
|
|
||||||
|
s := NewStore(store.Hooks(store.HookExists(h.Exists)))
|
||||||
|
|
||||||
|
if err := s.Init(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.Write(context.TODO(), "test", nil); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.Exists(context.TODO(), "test"); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !h.f {
|
||||||
|
t.Fatal("hook not works")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestMemoryReInit(t *testing.T) {
|
func TestMemoryReInit(t *testing.T) {
|
||||||
s := store.NewStore(store.Namespace("aaa"))
|
s := NewStore(store.Namespace("aaa"))
|
||||||
if err := s.Init(store.Namespace("")); err != nil {
|
if err := s.Init(store.Namespace("")); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -19,7 +52,7 @@ func TestMemoryReInit(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestMemoryBasic(t *testing.T) {
|
func TestMemoryBasic(t *testing.T) {
|
||||||
s := store.NewStore()
|
s := NewStore()
|
||||||
if err := s.Init(); err != nil {
|
if err := s.Init(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -27,7 +60,7 @@ func TestMemoryBasic(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestMemoryPrefix(t *testing.T) {
|
func TestMemoryPrefix(t *testing.T) {
|
||||||
s := store.NewStore()
|
s := NewStore()
|
||||||
if err := s.Init(store.Namespace("some-prefix")); err != nil {
|
if err := s.Init(store.Namespace("some-prefix")); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -35,7 +68,7 @@ func TestMemoryPrefix(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestMemoryNamespace(t *testing.T) {
|
func TestMemoryNamespace(t *testing.T) {
|
||||||
s := store.NewStore()
|
s := NewStore()
|
||||||
if err := s.Init(store.Namespace("some-namespace")); err != nil {
|
if err := s.Init(store.Namespace("some-namespace")); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -43,7 +76,7 @@ func TestMemoryNamespace(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestMemoryNamespacePrefix(t *testing.T) {
|
func TestMemoryNamespacePrefix(t *testing.T) {
|
||||||
s := store.NewStore()
|
s := NewStore()
|
||||||
if err := s.Init(store.Namespace("some-namespace")); err != nil {
|
if err := s.Init(store.Namespace("some-namespace")); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
147
store/noop.go
Normal file
147
store/noop.go
Normal file
@ -0,0 +1,147 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"go.unistack.org/micro/v3/options"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ Store = (*noopStore)(nil)
|
||||||
|
|
||||||
|
type noopStore struct {
|
||||||
|
funcRead FuncRead
|
||||||
|
funcWrite FuncWrite
|
||||||
|
funcExists FuncExists
|
||||||
|
funcList FuncList
|
||||||
|
funcDelete FuncDelete
|
||||||
|
opts Options
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStore(opts ...Option) *noopStore {
|
||||||
|
options := NewOptions(opts...)
|
||||||
|
return &noopStore{opts: options}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopStore) Init(opts ...Option) error {
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&n.opts)
|
||||||
|
}
|
||||||
|
|
||||||
|
n.funcRead = n.fnRead
|
||||||
|
n.funcWrite = n.fnWrite
|
||||||
|
n.funcExists = n.fnExists
|
||||||
|
n.funcList = n.fnList
|
||||||
|
n.funcDelete = n.fnDelete
|
||||||
|
|
||||||
|
n.opts.Hooks.EachNext(func(hook options.Hook) {
|
||||||
|
switch h := hook.(type) {
|
||||||
|
case HookRead:
|
||||||
|
n.funcRead = h(n.funcRead)
|
||||||
|
case HookWrite:
|
||||||
|
n.funcWrite = h(n.funcWrite)
|
||||||
|
case HookExists:
|
||||||
|
n.funcExists = h(n.funcExists)
|
||||||
|
case HookList:
|
||||||
|
n.funcList = h(n.funcList)
|
||||||
|
case HookDelete:
|
||||||
|
n.funcDelete = h(n.funcDelete)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopStore) Connect(ctx context.Context) error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopStore) Disconnect(ctx context.Context) error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopStore) Read(ctx context.Context, key string, val interface{}, opts ...ReadOption) error {
|
||||||
|
return n.funcRead(ctx, key, val, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopStore) fnRead(ctx context.Context, key string, val interface{}, opts ...ReadOption) error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopStore) Delete(ctx context.Context, key string, opts ...DeleteOption) error {
|
||||||
|
return n.funcDelete(ctx, key, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopStore) fnDelete(ctx context.Context, key string, opts ...DeleteOption) error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopStore) Exists(ctx context.Context, key string, opts ...ExistsOption) error {
|
||||||
|
return n.funcExists(ctx, key, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopStore) fnExists(ctx context.Context, key string, opts ...ExistsOption) error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopStore) Write(ctx context.Context, key string, val interface{}, opts ...WriteOption) error {
|
||||||
|
return n.funcWrite(ctx, key, val, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopStore) fnWrite(ctx context.Context, key string, val interface{}, opts ...WriteOption) error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopStore) List(ctx context.Context, opts ...ListOption) ([]string, error) {
|
||||||
|
return n.funcList(ctx, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopStore) fnList(ctx context.Context, opts ...ListOption) ([]string, error) {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopStore) Name() string {
|
||||||
|
return n.opts.Name
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopStore) String() string {
|
||||||
|
return "noop"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *noopStore) Options() Options {
|
||||||
|
return n.opts
|
||||||
|
}
|
35
store/noop_test.go
Normal file
35
store/noop_test.go
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
type testHook struct {
|
||||||
|
f bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testHook) Exists(fn FuncExists) FuncExists {
|
||||||
|
return func(ctx context.Context, key string, opts ...ExistsOption) error {
|
||||||
|
t.f = true
|
||||||
|
return fn(ctx, key, opts...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHook(t *testing.T) {
|
||||||
|
h := &testHook{}
|
||||||
|
|
||||||
|
s := NewStore(Hooks(HookExists(h.Exists)))
|
||||||
|
|
||||||
|
if err := s.Init(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.Exists(context.TODO(), "test"); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !h.f {
|
||||||
|
t.Fatal("hook not works")
|
||||||
|
}
|
||||||
|
}
|
@ -9,6 +9,7 @@ import (
|
|||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v3/logger"
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v3/metadata"
|
||||||
"go.unistack.org/micro/v3/meter"
|
"go.unistack.org/micro/v3/meter"
|
||||||
|
"go.unistack.org/micro/v3/options"
|
||||||
"go.unistack.org/micro/v3/tracer"
|
"go.unistack.org/micro/v3/tracer"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -38,6 +39,8 @@ type Options struct {
|
|||||||
// Wrappers []Wrapper
|
// Wrappers []Wrapper
|
||||||
// Timeout specifies timeout duration for all operations
|
// Timeout specifies timeout duration for all operations
|
||||||
Timeout time.Duration
|
Timeout time.Duration
|
||||||
|
// Hooks can be run before/after store Read/List/Write/Exists/Delete
|
||||||
|
Hooks options.Hooks
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOptions creates options struct
|
// NewOptions creates options struct
|
||||||
@ -441,11 +444,9 @@ func ExistsTimeout(td time.Duration) ExistsOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
// Hooks sets hook runs before action
|
||||||
// WrapStore adds a store Wrapper to a list of options passed into the store
|
func Hooks(h ...options.Hook) Option {
|
||||||
func WrapStore(w Wrapper) Option {
|
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.Wrappers = append(o.Wrappers, w)
|
o.Hooks = append(o.Hooks, h...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
@ -19,6 +19,7 @@ var (
|
|||||||
|
|
||||||
// Store is a data storage interface
|
// Store is a data storage interface
|
||||||
type Store interface {
|
type Store interface {
|
||||||
|
// Name returns store name
|
||||||
Name() string
|
Name() string
|
||||||
// Init initialises the store
|
// Init initialises the store
|
||||||
Init(opts ...Option) error
|
Init(opts ...Option) error
|
||||||
@ -41,3 +42,16 @@ type Store interface {
|
|||||||
// String returns the name of the implementation.
|
// String returns the name of the implementation.
|
||||||
String() string
|
String() string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type (
|
||||||
|
FuncExists func(ctx context.Context, key string, opts ...ExistsOption) error
|
||||||
|
HookExists func(next FuncExists) FuncExists
|
||||||
|
FuncRead func(ctx context.Context, key string, val interface{}, opts ...ReadOption) error
|
||||||
|
HookRead func(next FuncRead) FuncRead
|
||||||
|
FuncWrite func(ctx context.Context, key string, val interface{}, opts ...WriteOption) error
|
||||||
|
HookWrite func(next FuncWrite) FuncWrite
|
||||||
|
FuncDelete func(ctx context.Context, key string, opts ...DeleteOption) error
|
||||||
|
HookDelete func(next FuncDelete) FuncDelete
|
||||||
|
FuncList func(ctx context.Context, opts ...ListOption) ([]string, error)
|
||||||
|
HookList func(next FuncList) FuncList
|
||||||
|
)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user