diff --git a/broker/broker.go b/broker/broker.go index 80585c4c..f716c947 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -48,6 +48,17 @@ type Broker interface { String() string } +type ( + FuncPublish func(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error + HookPublish func(next FuncPublish) FuncPublish + FuncBatchPublish func(ctx context.Context, msgs []*Message, opts ...PublishOption) error + HookBatchPublish func(next FuncBatchPublish) FuncBatchPublish + FuncSubscribe func(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error) + HookSubscribe func(next FuncSubscribe) FuncSubscribe + FuncBatchSubscribe func(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error) + HookBatchSubscribe func(next FuncBatchSubscribe) FuncBatchSubscribe +) + // Handler is used to process messages via a subscription of a topic. type Handler func(Event) error diff --git a/broker/memory/memory.go b/broker/memory/memory.go index d5609e1e..1b301baf 100644 --- a/broker/memory/memory.go +++ b/broker/memory/memory.go @@ -7,6 +7,7 @@ import ( "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/metadata" + "go.unistack.org/micro/v3/options" maddr "go.unistack.org/micro/v3/util/addr" "go.unistack.org/micro/v3/util/id" mnet "go.unistack.org/micro/v3/util/net" @@ -14,9 +15,13 @@ import ( ) type memoryBroker struct { - subscribers map[string][]*memorySubscriber - addr string - opts broker.Options + funcPublish broker.FuncPublish + funcBatchPublish broker.FuncBatchPublish + funcSubscribe broker.FuncSubscribe + funcBatchSubscribe broker.FuncBatchSubscribe + subscribers map[string][]*memorySubscriber + addr string + opts broker.Options sync.RWMutex connected bool } @@ -98,15 +103,42 @@ func (m *memoryBroker) Init(opts ...broker.Option) error { for _, o := range opts { o(&m.opts) } + + m.funcPublish = m.fnPublish + m.funcBatchPublish = m.fnBatchPublish + m.funcSubscribe = m.fnSubscribe + m.funcBatchSubscribe = m.fnBatchSubscribe + + m.opts.Hooks.EachNext(func(hook options.Hook) { + switch h := hook.(type) { + case broker.HookPublish: + m.funcPublish = h(m.funcPublish) + case broker.HookBatchPublish: + m.funcBatchPublish = h(m.funcBatchPublish) + case broker.HookSubscribe: + m.funcSubscribe = h(m.funcSubscribe) + case broker.HookBatchSubscribe: + m.funcBatchSubscribe = h(m.funcBatchSubscribe) + } + }) + return nil } func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error { + return m.funcPublish(ctx, topic, msg, opts...) +} + +func (m *memoryBroker) fnPublish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error { msg.Header.Set(metadata.HeaderTopic, topic) return m.publish(ctx, []*broker.Message{msg}, opts...) } func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { + return m.funcBatchPublish(ctx, msgs, opts...) +} + +func (m *memoryBroker) fnBatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { return m.publish(ctx, msgs, opts...) } @@ -202,6 +234,10 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*broker.Message, opts } func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + return m.funcBatchSubscribe(ctx, topic, handler, opts...) +} + +func (m *memoryBroker) fnBatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { m.RLock() if !m.connected { m.RUnlock() @@ -247,6 +283,10 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler } func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + return m.funcSubscribe(ctx, topic, handler, opts...) +} + +func (m *memoryBroker) fnSubscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { m.RLock() if !m.connected { m.RUnlock() diff --git a/broker/memory/memory_test.go b/broker/memory/memory_test.go index e558ef10..7557e6b0 100644 --- a/broker/memory/memory_test.go +++ b/broker/memory/memory_test.go @@ -13,6 +13,10 @@ func TestMemoryBatchBroker(t *testing.T) { b := NewBroker() ctx := context.Background() + if err := b.Init(); err != nil { + t.Fatalf("Unexpected init error %v", err) + } + if err := b.Connect(ctx); err != nil { t.Fatalf("Unexpected connect error %v", err) } @@ -59,6 +63,10 @@ func TestMemoryBroker(t *testing.T) { b := NewBroker() ctx := context.Background() + if err := b.Init(); err != nil { + t.Fatalf("Unexpected init error %v", err) + } + if err := b.Connect(ctx); err != nil { t.Fatalf("Unexpected connect error %v", err) } diff --git a/broker/noop.go b/broker/noop.go index 29142673..43f6e7dc 100644 --- a/broker/noop.go +++ b/broker/noop.go @@ -3,14 +3,25 @@ package broker import ( "context" "strings" + + "go.unistack.org/micro/v3/options" ) type NoopBroker struct { - opts Options + funcPublish FuncPublish + funcBatchPublish FuncBatchPublish + funcSubscribe FuncSubscribe + funcBatchSubscribe FuncBatchSubscribe + opts Options } func NewBroker(opts ...Option) *NoopBroker { b := &NoopBroker{opts: NewOptions(opts...)} + b.funcPublish = b.fnPublish + b.funcBatchPublish = b.fnBatchPublish + b.funcSubscribe = b.fnSubscribe + b.funcBatchSubscribe = b.fnBatchSubscribe + return b } @@ -30,6 +41,25 @@ func (b *NoopBroker) Init(opts ...Option) error { for _, opt := range opts { opt(&b.opts) } + + b.funcPublish = b.fnPublish + b.funcBatchPublish = b.fnBatchPublish + b.funcSubscribe = b.fnSubscribe + b.funcBatchSubscribe = b.fnBatchSubscribe + + b.opts.Hooks.EachNext(func(hook options.Hook) { + switch h := hook.(type) { + case HookPublish: + b.funcPublish = h(b.funcPublish) + case HookBatchPublish: + b.funcBatchPublish = h(b.funcBatchPublish) + case HookSubscribe: + b.funcSubscribe = h(b.funcSubscribe) + case HookBatchSubscribe: + b.funcBatchSubscribe = h(b.funcBatchSubscribe) + } + }) + return nil } @@ -45,14 +75,22 @@ func (b *NoopBroker) Address() string { return strings.Join(b.opts.Addrs, ",") } -func (b *NoopBroker) BatchPublish(_ context.Context, _ []*Message, _ ...PublishOption) error { +func (b *NoopBroker) fnBatchPublish(_ context.Context, _ []*Message, _ ...PublishOption) error { return nil } -func (b *NoopBroker) Publish(_ context.Context, _ string, _ *Message, _ ...PublishOption) error { +func (b *NoopBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error { + return b.funcBatchPublish(ctx, msgs, opts...) +} + +func (b *NoopBroker) fnPublish(_ context.Context, _ string, _ *Message, _ ...PublishOption) error { return nil } +func (b *NoopBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error { + return b.funcPublish(ctx, topic, msg, opts...) +} + type NoopSubscriber struct { ctx context.Context topic string @@ -61,14 +99,22 @@ type NoopSubscriber struct { opts SubscribeOptions } -func (b *NoopBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) { +func (b *NoopBroker) fnBatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) { return &NoopSubscriber{ctx: ctx, topic: topic, opts: NewSubscribeOptions(opts...), batchHandler: handler}, nil } -func (b *NoopBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { +func (b *NoopBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) { + return b.funcBatchSubscribe(ctx, topic, handler, opts...) +} + +func (b *NoopBroker) fnSubscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { return &NoopSubscriber{ctx: ctx, topic: topic, opts: NewSubscribeOptions(opts...), handler: handler}, nil } +func (b *NoopBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { + return b.funcSubscribe(ctx, topic, handler, opts...) +} + func (s *NoopSubscriber) Options() SubscribeOptions { return s.opts } @@ -77,6 +123,6 @@ func (s *NoopSubscriber) Topic() string { return s.topic } -func (s *NoopSubscriber) Unsubscribe(ctx context.Context) error { +func (s *NoopSubscriber) Unsubscribe(_ context.Context) error { return nil } diff --git a/broker/noop_test.go b/broker/noop_test.go new file mode 100644 index 00000000..824b4b00 --- /dev/null +++ b/broker/noop_test.go @@ -0,0 +1,35 @@ +package broker + +import ( + "context" + "testing" +) + +type testHook struct { + f bool +} + +func (t *testHook) Publish1(fn FuncPublish) FuncPublish { + return func(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error { + t.f = true + return fn(ctx, topic, msg, opts...) + } +} + +func TestNoopHook(t *testing.T) { + h := &testHook{} + + b := NewBroker(Hooks(HookPublish(h.Publish1))) + + if err := b.Init(); err != nil { + t.Fatal(err) + } + + if err := b.Publish(context.TODO(), "", nil); err != nil { + t.Fatal(err) + } + + if !h.f { + t.Fatal("hook not works") + } +} diff --git a/broker/options.go b/broker/options.go index c5e91263..7e26adbc 100644 --- a/broker/options.go +++ b/broker/options.go @@ -8,6 +8,7 @@ import ( "go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/meter" + "go.unistack.org/micro/v3/options" "go.unistack.org/micro/v3/register" "go.unistack.org/micro/v3/sync" "go.unistack.org/micro/v3/tracer" @@ -37,10 +38,13 @@ type Options struct { Name string // Addrs holds the broker address Addrs []string - + // Wait waits for a collection of goroutines to finish Wait *sync.WaitGroup - + // GracefulTimeout contains time to wait to finish in flight requests GracefulTimeout time.Duration + // Hooks can be run before broker Publish/BatchPublish and + // Subscribe/BatchSubscribe methods + Hooks options.Hooks } // NewOptions create new Options @@ -230,6 +234,13 @@ func Name(n string) Option { } } +// Hooks sets hook runs before action +func Hooks(h ...options.Hook) Option { + return func(o *Options) { + o.Hooks = append(o.Hooks, h...) + } +} + // SubscribeContext set context func SubscribeContext(ctx context.Context) SubscribeOption { return func(o *SubscribeOptions) { diff --git a/client/client.go b/client/client.go index 6d60aef0..e7c42692 100644 --- a/client/client.go +++ b/client/client.go @@ -44,6 +44,17 @@ type Client interface { String() string } +type ( + FuncCall func(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error + HookCall func(next FuncCall) FuncCall + FuncStream func(ctx context.Context, req Request, opts ...CallOption) (Stream, error) + HookStream func(next FuncStream) FuncStream + FuncPublish func(ctx context.Context, msg Message, opts ...PublishOption) error + HookPublish func(next FuncPublish) FuncPublish + FuncBatchPublish func(ctx context.Context, msg []Message, opts ...PublishOption) error + HookBatchPublish func(next FuncBatchPublish) FuncBatchPublish +) + // Message is the interface for publishing asynchronously type Message interface { Topic() string diff --git a/client/client_call_options_test.go b/client/client_call_options_test.go deleted file mode 100644 index 2cffdf36..00000000 --- a/client/client_call_options_test.go +++ /dev/null @@ -1,26 +0,0 @@ -package client - -import ( - "context" - "testing" - "time" -) - -func TestNewClientCallOptions(t *testing.T) { - var flag bool - w := func(fn CallFunc) CallFunc { - flag = true - return fn - } - c := NewClientCallOptions(NewClient(), - WithAddress("127.0.0.1"), - WithCallWrapper(w), - WithRequestTimeout(1*time.Millisecond), - WithRetries(0), - WithBackoff(BackoffInterval(10*time.Millisecond, 100*time.Millisecond)), - ) - _ = c.Call(context.TODO(), c.NewRequest("service", "endpoint", nil), nil) - if !flag { - t.Fatalf("NewClientCallOptions not works") - } -} diff --git a/client/noop.go b/client/noop.go index 1c92ed71..b5ae7791 100644 --- a/client/noop.go +++ b/client/noop.go @@ -10,6 +10,7 @@ import ( "go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/errors" "go.unistack.org/micro/v3/metadata" + "go.unistack.org/micro/v3/options" "go.unistack.org/micro/v3/selector" ) @@ -19,7 +20,11 @@ var DefaultCodecs = map[string]codec.Codec{ } type noopClient struct { - opts Options + funcPublish FuncPublish + funcBatchPublish FuncBatchPublish + funcCall FuncCall + funcStream FuncStream + opts Options } type noopMessage struct { @@ -40,16 +45,14 @@ type noopRequest struct { // NewClient returns new noop client func NewClient(opts ...Option) Client { - nc := &noopClient{opts: NewOptions(opts...)} - // wrap in reverse + n := &noopClient{opts: NewOptions(opts...)} - c := Client(nc) + n.funcCall = n.fnCall + n.funcStream = n.fnStream + n.funcPublish = n.fnPublish + n.funcBatchPublish = n.fnBatchPublish - for i := len(nc.opts.Wrappers); i > 0; i-- { - c = nc.opts.Wrappers[i-1](c) - } - - return c + return n } func (n *noopClient) Name() string { @@ -173,6 +176,25 @@ func (n *noopClient) Init(opts ...Option) error { for _, o := range opts { o(&n.opts) } + + n.funcCall = n.fnCall + n.funcStream = n.fnStream + n.funcPublish = n.fnPublish + n.funcBatchPublish = n.fnBatchPublish + + n.opts.Hooks.EachNext(func(hook options.Hook) { + switch h := hook.(type) { + case HookCall: + n.funcCall = h(n.funcCall) + case HookStream: + n.funcStream = h(n.funcStream) + case HookPublish: + n.funcPublish = h(n.funcPublish) + case HookBatchPublish: + n.funcBatchPublish = h(n.funcBatchPublish) + } + }) + return nil } @@ -185,6 +207,10 @@ func (n *noopClient) String() string { } func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error { + return n.funcCall(ctx, req, rsp, opts...) +} + +func (n *noopClient) fnCall(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error { // make a copy of call opts callOpts := n.opts.CallOptions for _, opt := range opts { @@ -213,11 +239,8 @@ func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opt } // make copy of call method - hcall := n.call - - // wrap the call in reverse - for i := len(callOpts.CallWrappers); i > 0; i-- { - hcall = callOpts.CallWrappers[i-1](hcall) + hcall := func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error { + return nil } // use the router passed as a call option, or fallback to the rpc clients router @@ -316,10 +339,6 @@ func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opt return gerr } -func (n *noopClient) call(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error { - return nil -} - func (n *noopClient) NewRequest(service, endpoint string, req interface{}, opts ...RequestOption) Request { return &noopRequest{service: service, endpoint: endpoint} } @@ -330,6 +349,10 @@ func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOp } func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) { + return n.funcStream(ctx, req, opts...) +} + +func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) { var err error // make a copy of call opts @@ -474,10 +497,18 @@ func (n *noopClient) stream(ctx context.Context, addr string, req Request, opts } func (n *noopClient) BatchPublish(ctx context.Context, ps []Message, opts ...PublishOption) error { + return n.funcBatchPublish(ctx, ps, opts...) +} + +func (n *noopClient) fnBatchPublish(ctx context.Context, ps []Message, opts ...PublishOption) error { return n.publish(ctx, ps, opts...) } func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOption) error { + return n.funcPublish(ctx, p, opts...) +} + +func (n *noopClient) fnPublish(ctx context.Context, p Message, opts ...PublishOption) error { return n.publish(ctx, []Message{p}, opts...) } @@ -538,6 +569,13 @@ func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishO msgs = append(msgs, &broker.Message{Header: md, Body: body}) } + if len(msgs) == 1 { + return n.opts.Broker.Publish(ctx, msgs[0].Header[metadata.HeaderTopic], msgs[0], + broker.PublishContext(options.Context), + broker.PublishBodyOnly(options.BodyOnly), + ) + } + return n.opts.Broker.BatchPublish(ctx, msgs, broker.PublishContext(options.Context), broker.PublishBodyOnly(options.BodyOnly), diff --git a/client/noop_test.go b/client/noop_test.go new file mode 100644 index 00000000..cc05204c --- /dev/null +++ b/client/noop_test.go @@ -0,0 +1,35 @@ +package client + +import ( + "context" + "testing" +) + +type testHook struct { + f bool +} + +func (t *testHook) Publish(fn FuncPublish) FuncPublish { + return func(ctx context.Context, msg Message, opts ...PublishOption) error { + t.f = true + return fn(ctx, msg, opts...) + } +} + +func TestNoopHook(t *testing.T) { + h := &testHook{} + + c := NewClient(Hooks(HookPublish(h.Publish))) + + if err := c.Init(); err != nil { + t.Fatal(err) + } + + if err := c.Publish(context.TODO(), c.NewMessage("", nil, MessageContentType("application/octet-stream"))); err != nil { + t.Fatal(err) + } + + if !h.f { + t.Fatal("hook not works") + } +} diff --git a/client/options.go b/client/options.go index d4f976d4..6c8d0637 100644 --- a/client/options.go +++ b/client/options.go @@ -12,6 +12,7 @@ import ( "go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v3/network/transport" + "go.unistack.org/micro/v3/options" "go.unistack.org/micro/v3/register" "go.unistack.org/micro/v3/router" "go.unistack.org/micro/v3/selector" @@ -59,6 +60,9 @@ type Options struct { PoolTTL time.Duration // ContextDialer used to connect ContextDialer func(context.Context, string) (net.Conn, error) + // Hooks can be run before broker Publish/BatchPublish and + // Subscribe/BatchSubscribe methods + Hooks options.Hooks } // NewCallOptions creates new call options struct @@ -92,8 +96,6 @@ type CallOptions struct { Address []string // SelectOptions selector options SelectOptions []selector.SelectOption - // CallWrappers call wrappers - CallWrappers []CallWrapper // StreamTimeout stream timeout StreamTimeout time.Duration // RequestTimeout request timeout @@ -185,7 +187,7 @@ func NewOptions(opts ...Option) Options { options := Options{ Context: context.Background(), ContentType: DefaultContentType, - Codecs: make(map[string]codec.Codec), + Codecs: DefaultCodecs, CallOptions: CallOptions{ Context: context.Background(), Backoff: DefaultBackoff, @@ -306,20 +308,6 @@ func Selector(s selector.Selector) Option { } } -// Wrap adds a wrapper to the list of options passed into the client -func Wrap(w Wrapper) Option { - return func(o *Options) { - o.Wrappers = append(o.Wrappers, w) - } -} - -// WrapCall adds a wrapper to the list of CallFunc wrappers -func WrapCall(cw ...CallWrapper) Option { - return func(o *Options) { - o.CallOptions.CallWrappers = append(o.CallOptions.CallWrappers, cw...) - } -} - // Backoff is used to set the backoff function used when retrying Calls func Backoff(fn BackoffFunc) Option { return func(o *Options) { @@ -450,13 +438,6 @@ func WithAddress(a ...string) CallOption { } } -// WithCallWrapper is a CallOption which adds to the existing CallFunc wrappers -func WithCallWrapper(cw ...CallWrapper) CallOption { - return func(o *CallOptions) { - o.CallWrappers = append(o.CallWrappers, cw...) - } -} - // WithBackoff is a CallOption which overrides that which // set in Options.CallOptions func WithBackoff(fn BackoffFunc) CallOption { @@ -591,3 +572,10 @@ func RequestContentType(ct string) RequestOption { o.ContentType = ct } } + +// Hooks sets hook runs before action +func Hooks(h ...options.Hook) Option { + return func(o *Options) { + o.Hooks = append(o.Hooks, h...) + } +} diff --git a/config/config.go b/config/config.go index 44c18f76..955f2957 100644 --- a/config/config.go +++ b/config/config.go @@ -50,6 +50,13 @@ type Config interface { String() string } +type ( + FuncLoad func(ctx context.Context, opts ...LoadOption) error + HookLoad func(next FuncLoad) FuncLoad + FuncSave func(ctx context.Context, opts ...SaveOption) error + HookSave func(next FuncSave) FuncSave +) + // Watcher is the config watcher type Watcher interface { // Next blocks until update happens or error returned diff --git a/config/default.go b/config/default.go index c03fb812..62cef02c 100644 --- a/config/default.go +++ b/config/default.go @@ -9,13 +9,16 @@ import ( "dario.cat/mergo" "github.com/google/uuid" + "go.unistack.org/micro/v3/options" mid "go.unistack.org/micro/v3/util/id" rutil "go.unistack.org/micro/v3/util/reflect" mtime "go.unistack.org/micro/v3/util/time" ) type defaultConfig struct { - opts Options + funcLoad FuncLoad + funcSave FuncSave + opts Options } func (c *defaultConfig) Options() Options { @@ -31,6 +34,18 @@ func (c *defaultConfig) Init(opts ...Option) error { return err } + c.funcLoad = c.fnLoad + c.funcSave = c.fnSave + + c.opts.Hooks.EachNext(func(hook options.Hook) { + switch h := hook.(type) { + case HookLoad: + c.funcLoad = h(c.funcLoad) + case HookSave: + c.funcSave = h(c.funcSave) + } + }) + if err := DefaultAfterInit(c.opts.Context, c); err != nil && !c.opts.AllowFail { return err } @@ -39,11 +54,17 @@ func (c *defaultConfig) Init(opts ...Option) error { } func (c *defaultConfig) Load(ctx context.Context, opts ...LoadOption) error { + return c.funcLoad(ctx, opts...) +} + +func (c *defaultConfig) fnLoad(ctx context.Context, opts ...LoadOption) error { + var err error + if c.opts.SkipLoad != nil && c.opts.SkipLoad(ctx, c) { return nil } - if err := DefaultBeforeLoad(ctx, c); err != nil && !c.opts.AllowFail { + if err = DefaultBeforeLoad(ctx, c); err != nil && !c.opts.AllowFail { return err } @@ -233,6 +254,7 @@ func fillValue(value reflect.Value, val string) error { } value.Set(reflect.ValueOf(v)) } + return nil } @@ -295,7 +317,11 @@ func fillValues(valueOf reflect.Value, tname string) error { return nil } -func (c *defaultConfig) Save(ctx context.Context, _ ...SaveOption) error { +func (c *defaultConfig) Save(ctx context.Context, opts ...SaveOption) error { + return c.funcSave(ctx, opts...) +} + +func (c *defaultConfig) fnSave(ctx context.Context, opts ...SaveOption) error { if c.opts.SkipSave != nil && c.opts.SkipSave(ctx, c) { return nil } @@ -319,7 +345,7 @@ func (c *defaultConfig) Name() string { return c.opts.Name } -func (c *defaultConfig) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) { +func (c *defaultConfig) Watch(_ context.Context, _ ...WatchOption) (Watcher, error) { return nil, ErrWatcherNotImplemented } @@ -329,5 +355,9 @@ func NewConfig(opts ...Option) Config { if len(options.StructTag) == 0 { options.StructTag = "default" } - return &defaultConfig{opts: options} + c := &defaultConfig{opts: options} + c.funcLoad = c.fnLoad + c.funcSave = c.fnSave + + return c } diff --git a/config/default_test.go b/config/default_test.go index 40ee6924..a4d90650 100644 --- a/config/default_test.go +++ b/config/default_test.go @@ -41,6 +41,35 @@ func (c *cfgStructValue) Validate() error { return nil } +type testHook struct { + f bool +} + +func (t *testHook) Load(fn config.FuncLoad) config.FuncLoad { + return func(ctx context.Context, opts ...config.LoadOption) error { + t.f = true + return fn(ctx, opts...) + } +} + +func TestHook(t *testing.T) { + h := &testHook{} + + c := config.NewConfig(config.Struct(h), config.Hooks(config.HookLoad(h.Load))) + + if err := c.Init(); err != nil { + t.Fatal(err) + } + + if err := c.Load(context.TODO()); err != nil { + t.Fatal(err) + } + + if !h.f { + t.Fatal("hook not works") + } +} + func TestDefault(t *testing.T) { ctx := context.Background() conf := &cfg{IntValue: 10} diff --git a/config/options.go b/config/options.go index 38a3fbec..62e57659 100644 --- a/config/options.go +++ b/config/options.go @@ -7,6 +7,7 @@ import ( "go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/meter" + "go.unistack.org/micro/v3/options" "go.unistack.org/micro/v3/tracer" ) @@ -46,6 +47,8 @@ type Options struct { SkipLoad func(context.Context, Config) bool // SkipSave runs only if condition returns true SkipSave func(context.Context, Config) bool + // Hooks can be run before/after config Save/Load + Hooks options.Hooks } // Option function signature @@ -288,3 +291,10 @@ func WatchStruct(src interface{}) WatchOption { o.Struct = src } } + +// Hooks sets hook runs before action +func Hooks(h ...options.Hook) Option { + return func(o *Options) { + o.Hooks = append(o.Hooks, h...) + } +} diff --git a/options/hooks.go b/options/hooks.go index 48a45ad2..99e66f40 100644 --- a/options/hooks.go +++ b/options/hooks.go @@ -1,4 +1,4 @@ -package options // import "go.unistack.org/micro/v3/options" +package options // Hook func interface type Hook interface{} diff --git a/server/handler.go b/server/handler.go deleted file mode 100644 index 8b99a0ff..00000000 --- a/server/handler.go +++ /dev/null @@ -1,59 +0,0 @@ -package server - -import ( - "reflect" - - "go.unistack.org/micro/v3/register" -) - -type rpcHandler struct { - opts HandlerOptions - handler interface{} - name string - endpoints []*register.Endpoint -} - -func newRPCHandler(handler interface{}, opts ...HandlerOption) Handler { - options := NewHandlerOptions(opts...) - - typ := reflect.TypeOf(handler) - hdlr := reflect.ValueOf(handler) - name := reflect.Indirect(hdlr).Type().Name() - - var endpoints []*register.Endpoint - - for m := 0; m < typ.NumMethod(); m++ { - if e := register.ExtractEndpoint(typ.Method(m)); e != nil { - e.Name = name + "." + e.Name - - for k, v := range options.Metadata[e.Name] { - e.Metadata[k] = v - } - - endpoints = append(endpoints, e) - } - } - - return &rpcHandler{ - name: name, - handler: handler, - endpoints: endpoints, - opts: options, - } -} - -func (r *rpcHandler) Name() string { - return r.name -} - -func (r *rpcHandler) Handler() interface{} { - return r.handler -} - -func (r *rpcHandler) Endpoints() []*register.Endpoint { - return r.endpoints -} - -func (r *rpcHandler) Options() HandlerOptions { - return r.opts -} diff --git a/server/noop.go b/server/noop.go index c397eee8..f2bc031f 100644 --- a/server/noop.go +++ b/server/noop.go @@ -1,14 +1,22 @@ package server import ( + "bytes" + "context" "fmt" + "reflect" + "runtime/debug" "sort" + "strings" "sync" "time" "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/codec" + "go.unistack.org/micro/v3/errors" "go.unistack.org/micro/v3/logger" + "go.unistack.org/micro/v3/metadata" + "go.unistack.org/micro/v3/options" "go.unistack.org/micro/v3/register" maddr "go.unistack.org/micro/v3/util/addr" mnet "go.unistack.org/micro/v3/util/net" @@ -24,6 +32,58 @@ const ( defaultContentType = "application/json" ) +type rpcHandler struct { + opts HandlerOptions + handler interface{} + name string + endpoints []*register.Endpoint +} + +func newRPCHandler(handler interface{}, opts ...HandlerOption) Handler { + options := NewHandlerOptions(opts...) + + typ := reflect.TypeOf(handler) + hdlr := reflect.ValueOf(handler) + name := reflect.Indirect(hdlr).Type().Name() + + var endpoints []*register.Endpoint + + for m := 0; m < typ.NumMethod(); m++ { + if e := register.ExtractEndpoint(typ.Method(m)); e != nil { + e.Name = name + "." + e.Name + + for k, v := range options.Metadata[e.Name] { + e.Metadata[k] = v + } + + endpoints = append(endpoints, e) + } + } + + return &rpcHandler{ + name: name, + handler: handler, + endpoints: endpoints, + opts: options, + } +} + +func (r *rpcHandler) Name() string { + return r.name +} + +func (r *rpcHandler) Handler() interface{} { + return r.handler +} + +func (r *rpcHandler) Endpoints() []*register.Endpoint { + return r.endpoints +} + +func (r *rpcHandler) Options() HandlerOptions { + return r.opts +} + type noopServer struct { h Handler wg *sync.WaitGroup @@ -94,6 +154,35 @@ func (n *noopServer) Subscribe(sb Subscriber) error { return nil } +type rpcMessage struct { + payload interface{} + codec codec.Codec + header metadata.Metadata + topic string + contentType string + body []byte +} + +func (r *rpcMessage) ContentType() string { + return r.contentType +} + +func (r *rpcMessage) Topic() string { + return r.topic +} + +func (r *rpcMessage) Body() interface{} { + return r.payload +} + +func (r *rpcMessage) Header() metadata.Metadata { + return r.header +} + +func (r *rpcMessage) Codec() codec.Codec { + return r.codec +} + func (n *noopServer) NewHandler(h interface{}, opts ...HandlerOption) Handler { return newRPCHandler(h, opts...) } @@ -478,3 +567,339 @@ func (n *noopServer) Stop() error { return err } + +func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber { + var endpoints []*register.Endpoint + var handlers []*handler + + options := NewSubscriberOptions(opts...) + + if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func { + h := &handler{ + method: reflect.ValueOf(sub), + } + + switch typ.NumIn() { + case 1: + h.reqType = typ.In(0) + case 2: + h.ctxType = typ.In(0) + h.reqType = typ.In(1) + } + + handlers = append(handlers, h) + ep := ®ister.Endpoint{ + Name: "Func", + Request: register.ExtractSubValue(typ), + Metadata: metadata.New(2), + } + ep.Metadata.Set("topic", topic) + ep.Metadata.Set("subscriber", "true") + endpoints = append(endpoints, ep) + } else { + hdlr := reflect.ValueOf(sub) + name := reflect.Indirect(hdlr).Type().Name() + + for m := 0; m < typ.NumMethod(); m++ { + method := typ.Method(m) + h := &handler{ + method: method.Func, + } + + switch method.Type.NumIn() { + case 2: + h.reqType = method.Type.In(1) + case 3: + h.ctxType = method.Type.In(1) + h.reqType = method.Type.In(2) + } + + handlers = append(handlers, h) + ep := ®ister.Endpoint{ + Name: name + "." + method.Name, + Request: register.ExtractSubValue(method.Type), + Metadata: metadata.New(2), + } + ep.Metadata.Set("topic", topic) + ep.Metadata.Set("subscriber", "true") + endpoints = append(endpoints, ep) + } + } + + return &subscriber{ + rcvr: reflect.ValueOf(sub), + typ: reflect.TypeOf(sub), + topic: topic, + subscriber: sub, + handlers: handlers, + endpoints: endpoints, + opts: options, + } +} + +//nolint:gocyclo +func (n *noopServer) createBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler { + return func(ps broker.Events) (err error) { + defer func() { + if r := recover(); r != nil { + n.RLock() + config := n.opts + n.RUnlock() + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error(n.opts.Context, "panic recovered: ", r) + config.Logger.Error(n.opts.Context, string(debug.Stack())) + } + err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r) + } + }() + + msgs := make([]Message, 0, len(ps)) + ctxs := make([]context.Context, 0, len(ps)) + for _, p := range ps { + msg := p.Message() + // if we don't have headers, create empty map + if msg.Header == nil { + msg.Header = metadata.New(2) + } + + ct, _ := msg.Header.Get(metadata.HeaderContentType) + if len(ct) == 0 { + msg.Header.Set(metadata.HeaderContentType, defaultContentType) + ct = defaultContentType + } + hdr := metadata.Copy(msg.Header) + topic, _ := msg.Header.Get(metadata.HeaderTopic) + ctxs = append(ctxs, metadata.NewIncomingContext(sb.opts.Context, hdr)) + msgs = append(msgs, &rpcMessage{ + topic: topic, + contentType: ct, + header: msg.Header, + body: msg.Body, + }) + } + results := make(chan error, len(sb.handlers)) + + for i := 0; i < len(sb.handlers); i++ { + handler := sb.handlers[i] + + var req reflect.Value + + switch handler.reqType.Kind() { + case reflect.Ptr: + req = reflect.New(handler.reqType.Elem()) + default: + req = reflect.New(handler.reqType.Elem()).Elem() + } + + reqType := handler.reqType + var cf codec.Codec + for _, msg := range msgs { + cf, err = n.newCodec(msg.ContentType()) + if err != nil { + return err + } + rb := reflect.New(req.Type().Elem()) + if err = cf.ReadBody(bytes.NewReader(msg.(*rpcMessage).body), rb.Interface()); err != nil { + return err + } + msg.(*rpcMessage).codec = cf + msg.(*rpcMessage).payload = rb.Interface() + } + + fn := func(ctxs []context.Context, ms []Message) error { + var vals []reflect.Value + if sb.typ.Kind() != reflect.Func { + vals = append(vals, sb.rcvr) + } + if handler.ctxType != nil { + vals = append(vals, reflect.ValueOf(ctxs)) + } + payloads := reflect.MakeSlice(reqType, 0, len(ms)) + for _, m := range ms { + payloads = reflect.Append(payloads, reflect.ValueOf(m.Body())) + } + vals = append(vals, payloads) + + returnValues := handler.method.Call(vals) + if rerr := returnValues[0].Interface(); rerr != nil { + return rerr.(error) + } + return nil + } + + opts.Hooks.EachNext(func(hook options.Hook) { + if h, ok := hook.(HookBatchSubHandler); ok { + fn = h(fn) + } + }) + + if n.wg != nil { + n.wg.Add(1) + } + + go func() { + if n.wg != nil { + defer n.wg.Done() + } + results <- fn(ctxs, msgs) + }() + } + + var errors []string + for i := 0; i < len(sb.handlers); i++ { + if rerr := <-results; rerr != nil { + errors = append(errors, rerr.Error()) + } + } + if len(errors) > 0 { + err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) + } + return err + } +} + +//nolint:gocyclo +func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler { + return func(p broker.Event) (err error) { + defer func() { + if r := recover(); r != nil { + n.RLock() + config := n.opts + n.RUnlock() + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error(n.opts.Context, "panic recovered: ", r) + config.Logger.Error(n.opts.Context, string(debug.Stack())) + } + err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r) + } + }() + + msg := p.Message() + // if we don't have headers, create empty map + if msg.Header == nil { + msg.Header = metadata.New(2) + } + + ct := msg.Header["Content-Type"] + if len(ct) == 0 { + msg.Header.Set(metadata.HeaderContentType, defaultContentType) + ct = defaultContentType + } + cf, err := n.newCodec(ct) + if err != nil { + return err + } + + hdr := metadata.New(len(msg.Header)) + for k, v := range msg.Header { + hdr.Set(k, v) + } + + ctx := metadata.NewIncomingContext(sb.opts.Context, hdr) + + results := make(chan error, len(sb.handlers)) + + for i := 0; i < len(sb.handlers); i++ { + handler := sb.handlers[i] + + var isVal bool + var req reflect.Value + + if handler.reqType.Kind() == reflect.Ptr { + req = reflect.New(handler.reqType.Elem()) + } else { + req = reflect.New(handler.reqType) + isVal = true + } + if isVal { + req = req.Elem() + } + + if err = cf.ReadBody(bytes.NewBuffer(msg.Body), req.Interface()); err != nil { + return err + } + + fn := func(ctx context.Context, msg Message) error { + var vals []reflect.Value + if sb.typ.Kind() != reflect.Func { + vals = append(vals, sb.rcvr) + } + if handler.ctxType != nil { + vals = append(vals, reflect.ValueOf(ctx)) + } + + vals = append(vals, reflect.ValueOf(msg.Body())) + + returnValues := handler.method.Call(vals) + if rerr := returnValues[0].Interface(); rerr != nil { + return rerr.(error) + } + return nil + } + + opts.Hooks.EachNext(func(hook options.Hook) { + if h, ok := hook.(HookSubHandler); ok { + fn = h(fn) + } + }) + + if n.wg != nil { + n.wg.Add(1) + } + go func() { + if n.wg != nil { + defer n.wg.Done() + } + cerr := fn(ctx, &rpcMessage{ + topic: sb.topic, + contentType: ct, + payload: req.Interface(), + header: msg.Header, + }) + results <- cerr + }() + } + var errors []string + for i := 0; i < len(sb.handlers); i++ { + if rerr := <-results; rerr != nil { + errors = append(errors, rerr.Error()) + } + } + if len(errors) > 0 { + err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) + } + return err + } +} + +func (s *subscriber) Topic() string { + return s.topic +} + +func (s *subscriber) Subscriber() interface{} { + return s.subscriber +} + +func (s *subscriber) Endpoints() []*register.Endpoint { + return s.endpoints +} + +func (s *subscriber) Options() SubscriberOptions { + return s.opts +} + +type subscriber struct { + typ reflect.Type + subscriber interface{} + topic string + endpoints []*register.Endpoint + handlers []*handler + opts SubscriberOptions + rcvr reflect.Value +} + +type handler struct { + reqType reflect.Type + ctxType reflect.Type + method reflect.Value +} diff --git a/server/options.go b/server/options.go index e320ff86..093b5e0e 100644 --- a/server/options.go +++ b/server/options.go @@ -69,12 +69,6 @@ type Options struct { Advertise string // Version holds the server version Version string - // SubWrappers holds the server subscribe wrappers - SubWrappers []SubscriberWrapper - // BatchSubWrappers holds the server batch subscribe wrappers - BatchSubWrappers []BatchSubscriberWrapper - // HdlrWrappers holds the handler wrappers - HdlrWrappers []HandlerWrapper // RegisterAttempts holds the number of register attempts before error RegisterAttempts int // RegisterInterval holds he interval for re-register @@ -85,7 +79,8 @@ type Options struct { MaxConn int // DeregisterAttempts holds the number of deregister attempts before error DeregisterAttempts int - // Hooks may contains SubscriberWrapper, HandlerWrapper or Server func wrapper + // Hooks may contains hook actions that performs before/after server handler + // or server subscriber handler Hooks options.Hooks // GracefulTimeout timeout for graceful stop server GracefulTimeout time.Duration @@ -287,27 +282,6 @@ func Wait(wg *sync.WaitGroup) Option { } } -// WrapHandler adds a handler Wrapper to a list of options passed into the server -func WrapHandler(w HandlerWrapper) Option { - return func(o *Options) { - o.HdlrWrappers = append(o.HdlrWrappers, w) - } -} - -// WrapSubscriber adds a subscriber Wrapper to a list of options passed into the server -func WrapSubscriber(w SubscriberWrapper) Option { - return func(o *Options) { - o.SubWrappers = append(o.SubWrappers, w) - } -} - -// WrapBatchSubscriber adds a batch subscriber Wrapper to a list of options passed into the server -func WrapBatchSubscriber(w BatchSubscriberWrapper) Option { - return func(o *Options) { - o.BatchSubWrappers = append(o.BatchSubWrappers, w) - } -} - // MaxConn specifies maximum number of max simultaneous connections to server func MaxConn(n int) Option { return func(o *Options) { @@ -461,3 +435,10 @@ func SubscriberBatchWait(td time.Duration) SubscriberOption { o.BatchWait = td } } + +// Hooks sets hook runs before action +func Hooks(h ...options.Hook) Option { + return func(o *Options) { + o.Hooks = append(o.Hooks, h...) + } +} diff --git a/server/request.go b/server/request.go deleted file mode 100644 index 6b20e988..00000000 --- a/server/request.go +++ /dev/null @@ -1,35 +0,0 @@ -package server - -import ( - "go.unistack.org/micro/v3/codec" - "go.unistack.org/micro/v3/metadata" -) - -type rpcMessage struct { - payload interface{} - codec codec.Codec - header metadata.Metadata - topic string - contentType string - body []byte -} - -func (r *rpcMessage) ContentType() string { - return r.contentType -} - -func (r *rpcMessage) Topic() string { - return r.topic -} - -func (r *rpcMessage) Body() interface{} { - return r.payload -} - -func (r *rpcMessage) Header() metadata.Metadata { - return r.header -} - -func (r *rpcMessage) Codec() codec.Codec { - return r.codec -} diff --git a/server/server.go b/server/server.go index f6ea3956..eca9e82e 100644 --- a/server/server.go +++ b/server/server.go @@ -62,6 +62,13 @@ type Server interface { String() string } +type ( + FuncBatchSubHandler func(ctxs []context.Context, ms []Message) error + HookBatchSubHandler func(next FuncBatchSubHandler) FuncBatchSubHandler + FuncSubHandler func(ctx context.Context, ms Message) error + HookSubHandler func(next FuncSubHandler) FuncSubHandler +) + /* // Router handle serving messages type Router interface { @@ -147,12 +154,11 @@ type Stream interface { // // Example: // -// type Greeter struct {} -// -// func (g *Greeter) Hello(context, request, response) error { -// return nil -// } +// type Greeter struct {} // +// func (g *Greeter) Hello(context, request, response) error { +// return nil +// } type Handler interface { Name() string Handler() interface{} diff --git a/server/subscriber.go b/server/subscriber.go index 82688e4d..beed7dfe 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -1,21 +1,11 @@ package server import ( - "bytes" - "context" "fmt" "reflect" - "runtime/debug" "strings" "unicode" "unicode/utf8" - - "go.unistack.org/micro/v3/broker" - "go.unistack.org/micro/v3/codec" - "go.unistack.org/micro/v3/errors" - "go.unistack.org/micro/v3/logger" - "go.unistack.org/micro/v3/metadata" - "go.unistack.org/micro/v3/register" ) const ( @@ -27,26 +17,10 @@ const ( // because Typeof takes an empty interface value. This is annoying. var typeOfError = reflect.TypeOf((*error)(nil)).Elem() -type handler struct { - reqType reflect.Type - ctxType reflect.Type - method reflect.Value -} - -type subscriber struct { - typ reflect.Type - subscriber interface{} - topic string - endpoints []*register.Endpoint - handlers []*handler - opts SubscriberOptions - rcvr reflect.Value -} - // Is this an exported - upper case - name? func isExported(name string) bool { - rune, _ := utf8.DecodeRuneInString(name) - return unicode.IsUpper(rune) + r, _ := utf8.DecodeRuneInString(name) + return unicode.IsUpper(r) } // Is this type exported or a builtin? @@ -120,318 +94,3 @@ func ValidateSubscriber(sub Subscriber) error { return nil } - -func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber { - var endpoints []*register.Endpoint - var handlers []*handler - - options := NewSubscriberOptions(opts...) - - if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func { - h := &handler{ - method: reflect.ValueOf(sub), - } - - switch typ.NumIn() { - case 1: - h.reqType = typ.In(0) - case 2: - h.ctxType = typ.In(0) - h.reqType = typ.In(1) - } - - handlers = append(handlers, h) - ep := ®ister.Endpoint{ - Name: "Func", - Request: register.ExtractSubValue(typ), - Metadata: metadata.New(2), - } - ep.Metadata.Set("topic", topic) - ep.Metadata.Set("subscriber", "true") - endpoints = append(endpoints, ep) - } else { - hdlr := reflect.ValueOf(sub) - name := reflect.Indirect(hdlr).Type().Name() - - for m := 0; m < typ.NumMethod(); m++ { - method := typ.Method(m) - h := &handler{ - method: method.Func, - } - - switch method.Type.NumIn() { - case 2: - h.reqType = method.Type.In(1) - case 3: - h.ctxType = method.Type.In(1) - h.reqType = method.Type.In(2) - } - - handlers = append(handlers, h) - ep := ®ister.Endpoint{ - Name: name + "." + method.Name, - Request: register.ExtractSubValue(method.Type), - Metadata: metadata.New(2), - } - ep.Metadata.Set("topic", topic) - ep.Metadata.Set("subscriber", "true") - endpoints = append(endpoints, ep) - } - } - - return &subscriber{ - rcvr: reflect.ValueOf(sub), - typ: reflect.TypeOf(sub), - topic: topic, - subscriber: sub, - handlers: handlers, - endpoints: endpoints, - opts: options, - } -} - -//nolint:gocyclo -func (n *noopServer) createBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler { - return func(ps broker.Events) (err error) { - defer func() { - if r := recover(); r != nil { - n.RLock() - config := n.opts - n.RUnlock() - if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error(n.opts.Context, "panic recovered: ", r) - config.Logger.Error(n.opts.Context, string(debug.Stack())) - } - err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r) - } - }() - - msgs := make([]Message, 0, len(ps)) - ctxs := make([]context.Context, 0, len(ps)) - for _, p := range ps { - msg := p.Message() - // if we don't have headers, create empty map - if msg.Header == nil { - msg.Header = metadata.New(2) - } - - ct, _ := msg.Header.Get(metadata.HeaderContentType) - if len(ct) == 0 { - msg.Header.Set(metadata.HeaderContentType, defaultContentType) - ct = defaultContentType - } - hdr := metadata.Copy(msg.Header) - topic, _ := msg.Header.Get(metadata.HeaderTopic) - ctxs = append(ctxs, metadata.NewIncomingContext(sb.opts.Context, hdr)) - msgs = append(msgs, &rpcMessage{ - topic: topic, - contentType: ct, - header: msg.Header, - body: msg.Body, - }) - } - results := make(chan error, len(sb.handlers)) - - for i := 0; i < len(sb.handlers); i++ { - handler := sb.handlers[i] - - var req reflect.Value - - switch handler.reqType.Kind() { - case reflect.Ptr: - req = reflect.New(handler.reqType.Elem()) - default: - req = reflect.New(handler.reqType.Elem()).Elem() - } - - reqType := handler.reqType - var cf codec.Codec - for _, msg := range msgs { - cf, err = n.newCodec(msg.ContentType()) - if err != nil { - return err - } - rb := reflect.New(req.Type().Elem()) - if err = cf.ReadBody(bytes.NewReader(msg.(*rpcMessage).body), rb.Interface()); err != nil { - return err - } - msg.(*rpcMessage).codec = cf - msg.(*rpcMessage).payload = rb.Interface() - } - - fn := func(ctxs []context.Context, ms []Message) error { - var vals []reflect.Value - if sb.typ.Kind() != reflect.Func { - vals = append(vals, sb.rcvr) - } - if handler.ctxType != nil { - vals = append(vals, reflect.ValueOf(ctxs)) - } - payloads := reflect.MakeSlice(reqType, 0, len(ms)) - for _, m := range ms { - payloads = reflect.Append(payloads, reflect.ValueOf(m.Body())) - } - vals = append(vals, payloads) - - returnValues := handler.method.Call(vals) - if rerr := returnValues[0].Interface(); rerr != nil { - return rerr.(error) - } - return nil - } - - for i := len(opts.BatchSubWrappers); i > 0; i-- { - fn = opts.BatchSubWrappers[i-1](fn) - } - - if n.wg != nil { - n.wg.Add(1) - } - go func() { - if n.wg != nil { - defer n.wg.Done() - } - results <- fn(ctxs, msgs) - }() - } - - var errors []string - for i := 0; i < len(sb.handlers); i++ { - if rerr := <-results; rerr != nil { - errors = append(errors, rerr.Error()) - } - } - if len(errors) > 0 { - err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) - } - return err - } -} - -//nolint:gocyclo -func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler { - return func(p broker.Event) (err error) { - defer func() { - if r := recover(); r != nil { - n.RLock() - config := n.opts - n.RUnlock() - if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error(n.opts.Context, "panic recovered: ", r) - config.Logger.Error(n.opts.Context, string(debug.Stack())) - } - err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r) - } - }() - - msg := p.Message() - // if we don't have headers, create empty map - if msg.Header == nil { - msg.Header = metadata.New(2) - } - - ct := msg.Header["Content-Type"] - if len(ct) == 0 { - msg.Header.Set(metadata.HeaderContentType, defaultContentType) - ct = defaultContentType - } - cf, err := n.newCodec(ct) - if err != nil { - return err - } - - hdr := metadata.New(len(msg.Header)) - for k, v := range msg.Header { - hdr.Set(k, v) - } - - ctx := metadata.NewIncomingContext(sb.opts.Context, hdr) - - results := make(chan error, len(sb.handlers)) - - for i := 0; i < len(sb.handlers); i++ { - handler := sb.handlers[i] - - var isVal bool - var req reflect.Value - - if handler.reqType.Kind() == reflect.Ptr { - req = reflect.New(handler.reqType.Elem()) - } else { - req = reflect.New(handler.reqType) - isVal = true - } - if isVal { - req = req.Elem() - } - - if err = cf.ReadBody(bytes.NewBuffer(msg.Body), req.Interface()); err != nil { - return err - } - - fn := func(ctx context.Context, msg Message) error { - var vals []reflect.Value - if sb.typ.Kind() != reflect.Func { - vals = append(vals, sb.rcvr) - } - if handler.ctxType != nil { - vals = append(vals, reflect.ValueOf(ctx)) - } - - vals = append(vals, reflect.ValueOf(msg.Body())) - - returnValues := handler.method.Call(vals) - if rerr := returnValues[0].Interface(); rerr != nil { - return rerr.(error) - } - return nil - } - - for i := len(opts.SubWrappers); i > 0; i-- { - fn = opts.SubWrappers[i-1](fn) - } - - if n.wg != nil { - n.wg.Add(1) - } - go func() { - if n.wg != nil { - defer n.wg.Done() - } - cerr := fn(ctx, &rpcMessage{ - topic: sb.topic, - contentType: ct, - payload: req.Interface(), - header: msg.Header, - }) - results <- cerr - }() - } - var errors []string - for i := 0; i < len(sb.handlers); i++ { - if rerr := <-results; rerr != nil { - errors = append(errors, rerr.Error()) - } - } - if len(errors) > 0 { - err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) - } - return err - } -} - -func (s *subscriber) Topic() string { - return s.topic -} - -func (s *subscriber) Subscriber() interface{} { - return s.subscriber -} - -func (s *subscriber) Endpoints() []*register.Endpoint { - return s.endpoints -} - -func (s *subscriber) Options() SubscriberOptions { - return s.opts -} diff --git a/store/memory.go b/store/memory/memory.go similarity index 59% rename from store/memory.go rename to store/memory/memory.go index 0e27ebec..7f012fab 100644 --- a/store/memory.go +++ b/store/memory/memory.go @@ -1,4 +1,4 @@ -package store +package memory import ( "context" @@ -6,13 +6,15 @@ import ( "strings" "time" - "github.com/patrickmn/go-cache" + cache "github.com/patrickmn/go-cache" + "go.unistack.org/micro/v3/options" + "go.unistack.org/micro/v3/store" ) // NewStore returns a memory store -func NewStore(opts ...Option) Store { +func NewStore(opts ...store.Option) store.Store { return &memoryStore{ - opts: NewOptions(opts...), + opts: store.NewOptions(opts...), store: cache.New(cache.NoExpiration, 5*time.Minute), } } @@ -27,8 +29,13 @@ func (m *memoryStore) Disconnect(ctx context.Context) error { } type memoryStore struct { - store *cache.Cache - opts Options + funcRead store.FuncRead + funcWrite store.FuncWrite + funcExists store.FuncExists + funcList store.FuncList + funcDelete store.FuncDelete + store *cache.Cache + opts store.Options } func (m *memoryStore) key(prefix, key string) string { @@ -39,7 +46,7 @@ func (m *memoryStore) exists(prefix, key string) error { key = m.key(prefix, key) _, found := m.store.Get(key) if !found { - return ErrNotFound + return store.ErrNotFound } return nil @@ -50,12 +57,12 @@ func (m *memoryStore) get(prefix, key string, val interface{}) error { r, found := m.store.Get(key) if !found { - return ErrNotFound + return store.ErrNotFound } buf, ok := r.([]byte) if !ok { - return ErrNotFound + return store.ErrNotFound } return m.opts.Codec.Unmarshal(buf, val) @@ -100,10 +107,32 @@ func (m *memoryStore) list(prefix string, limit, offset uint) []string { return allKeys } -func (m *memoryStore) Init(opts ...Option) error { +func (m *memoryStore) Init(opts ...store.Option) error { for _, o := range opts { o(&m.opts) } + + m.funcRead = m.fnRead + m.funcWrite = m.fnWrite + m.funcExists = m.fnExists + m.funcList = m.fnList + m.funcDelete = m.fnDelete + + m.opts.Hooks.EachNext(func(hook options.Hook) { + switch h := hook.(type) { + case store.HookRead: + m.funcRead = h(m.funcRead) + case store.HookWrite: + m.funcWrite = h(m.funcWrite) + case store.HookExists: + m.funcExists = h(m.funcExists) + case store.HookList: + m.funcList = h(m.funcList) + case store.HookDelete: + m.funcDelete = h(m.funcDelete) + } + }) + return nil } @@ -115,24 +144,36 @@ func (m *memoryStore) Name() string { return m.opts.Name } -func (m *memoryStore) Exists(ctx context.Context, key string, opts ...ExistsOption) error { - options := NewExistsOptions(opts...) +func (m *memoryStore) Exists(ctx context.Context, key string, opts ...store.ExistsOption) error { + return m.funcExists(ctx, key, opts...) +} + +func (m *memoryStore) fnExists(ctx context.Context, key string, opts ...store.ExistsOption) error { + options := store.NewExistsOptions(opts...) if options.Namespace == "" { options.Namespace = m.opts.Namespace } return m.exists(options.Namespace, key) } -func (m *memoryStore) Read(ctx context.Context, key string, val interface{}, opts ...ReadOption) error { - options := NewReadOptions(opts...) +func (m *memoryStore) Read(ctx context.Context, key string, val interface{}, opts ...store.ReadOption) error { + return m.funcRead(ctx, key, val, opts...) +} + +func (m *memoryStore) fnRead(ctx context.Context, key string, val interface{}, opts ...store.ReadOption) error { + options := store.NewReadOptions(opts...) if options.Namespace == "" { options.Namespace = m.opts.Namespace } return m.get(options.Namespace, key, val) } -func (m *memoryStore) Write(ctx context.Context, key string, val interface{}, opts ...WriteOption) error { - options := NewWriteOptions(opts...) +func (m *memoryStore) Write(ctx context.Context, key string, val interface{}, opts ...store.WriteOption) error { + return m.funcWrite(ctx, key, val, opts...) +} + +func (m *memoryStore) fnWrite(ctx context.Context, key string, val interface{}, opts ...store.WriteOption) error { + options := store.NewWriteOptions(opts...) if options.Namespace == "" { options.Namespace = m.opts.Namespace } @@ -151,8 +192,12 @@ func (m *memoryStore) Write(ctx context.Context, key string, val interface{}, op return nil } -func (m *memoryStore) Delete(ctx context.Context, key string, opts ...DeleteOption) error { - options := NewDeleteOptions(opts...) +func (m *memoryStore) Delete(ctx context.Context, key string, opts ...store.DeleteOption) error { + return m.funcDelete(ctx, key, opts...) +} + +func (m *memoryStore) fnDelete(ctx context.Context, key string, opts ...store.DeleteOption) error { + options := store.NewDeleteOptions(opts...) if options.Namespace == "" { options.Namespace = m.opts.Namespace } @@ -161,12 +206,16 @@ func (m *memoryStore) Delete(ctx context.Context, key string, opts ...DeleteOpti return nil } -func (m *memoryStore) Options() Options { +func (m *memoryStore) Options() store.Options { return m.opts } -func (m *memoryStore) List(ctx context.Context, opts ...ListOption) ([]string, error) { - options := NewListOptions(opts...) +func (m *memoryStore) List(ctx context.Context, opts ...store.ListOption) ([]string, error) { + return m.funcList(ctx, opts...) +} + +func (m *memoryStore) fnList(ctx context.Context, opts ...store.ListOption) ([]string, error) { + options := store.NewListOptions(opts...) if options.Namespace == "" { options.Namespace = m.opts.Namespace } diff --git a/store/memory_test.go b/store/memory/memory_test.go similarity index 66% rename from store/memory_test.go rename to store/memory/memory_test.go index da8a9ad0..598f00eb 100644 --- a/store/memory_test.go +++ b/store/memory/memory_test.go @@ -1,4 +1,4 @@ -package store_test +package memory import ( "context" @@ -8,8 +8,41 @@ import ( "go.unistack.org/micro/v3/store" ) +type testHook struct { + f bool +} + +func (t *testHook) Exists(fn store.FuncExists) store.FuncExists { + return func(ctx context.Context, key string, opts ...store.ExistsOption) error { + t.f = true + return fn(ctx, key, opts...) + } +} + +func TestHook(t *testing.T) { + h := &testHook{} + + s := NewStore(store.Hooks(store.HookExists(h.Exists))) + + if err := s.Init(); err != nil { + t.Fatal(err) + } + + if err := s.Write(context.TODO(), "test", nil); err != nil { + t.Error(err) + } + + if err := s.Exists(context.TODO(), "test"); err != nil { + t.Fatal(err) + } + + if !h.f { + t.Fatal("hook not works") + } +} + func TestMemoryReInit(t *testing.T) { - s := store.NewStore(store.Namespace("aaa")) + s := NewStore(store.Namespace("aaa")) if err := s.Init(store.Namespace("")); err != nil { t.Fatal(err) } @@ -19,7 +52,7 @@ func TestMemoryReInit(t *testing.T) { } func TestMemoryBasic(t *testing.T) { - s := store.NewStore() + s := NewStore() if err := s.Init(); err != nil { t.Fatal(err) } @@ -27,7 +60,7 @@ func TestMemoryBasic(t *testing.T) { } func TestMemoryPrefix(t *testing.T) { - s := store.NewStore() + s := NewStore() if err := s.Init(store.Namespace("some-prefix")); err != nil { t.Fatal(err) } @@ -35,7 +68,7 @@ func TestMemoryPrefix(t *testing.T) { } func TestMemoryNamespace(t *testing.T) { - s := store.NewStore() + s := NewStore() if err := s.Init(store.Namespace("some-namespace")); err != nil { t.Fatal(err) } @@ -43,7 +76,7 @@ func TestMemoryNamespace(t *testing.T) { } func TestMemoryNamespacePrefix(t *testing.T) { - s := store.NewStore() + s := NewStore() if err := s.Init(store.Namespace("some-namespace")); err != nil { t.Fatal(err) } diff --git a/store/noop.go b/store/noop.go new file mode 100644 index 00000000..c7e2922d --- /dev/null +++ b/store/noop.go @@ -0,0 +1,147 @@ +package store + +import ( + "context" + + "go.unistack.org/micro/v3/options" +) + +var _ Store = (*noopStore)(nil) + +type noopStore struct { + funcRead FuncRead + funcWrite FuncWrite + funcExists FuncExists + funcList FuncList + funcDelete FuncDelete + opts Options +} + +func NewStore(opts ...Option) *noopStore { + options := NewOptions(opts...) + return &noopStore{opts: options} +} + +func (n *noopStore) Init(opts ...Option) error { + for _, o := range opts { + o(&n.opts) + } + + n.funcRead = n.fnRead + n.funcWrite = n.fnWrite + n.funcExists = n.fnExists + n.funcList = n.fnList + n.funcDelete = n.fnDelete + + n.opts.Hooks.EachNext(func(hook options.Hook) { + switch h := hook.(type) { + case HookRead: + n.funcRead = h(n.funcRead) + case HookWrite: + n.funcWrite = h(n.funcWrite) + case HookExists: + n.funcExists = h(n.funcExists) + case HookList: + n.funcList = h(n.funcList) + case HookDelete: + n.funcDelete = h(n.funcDelete) + } + }) + + return nil +} + +func (n *noopStore) Connect(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + return nil +} + +func (n *noopStore) Disconnect(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + return nil +} + +func (n *noopStore) Read(ctx context.Context, key string, val interface{}, opts ...ReadOption) error { + return n.funcRead(ctx, key, val, opts...) +} + +func (n *noopStore) fnRead(ctx context.Context, key string, val interface{}, opts ...ReadOption) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + return nil +} + +func (n *noopStore) Delete(ctx context.Context, key string, opts ...DeleteOption) error { + return n.funcDelete(ctx, key, opts...) +} + +func (n *noopStore) fnDelete(ctx context.Context, key string, opts ...DeleteOption) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + return nil +} + +func (n *noopStore) Exists(ctx context.Context, key string, opts ...ExistsOption) error { + return n.funcExists(ctx, key, opts...) +} + +func (n *noopStore) fnExists(ctx context.Context, key string, opts ...ExistsOption) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + return nil +} + +func (n *noopStore) Write(ctx context.Context, key string, val interface{}, opts ...WriteOption) error { + return n.funcWrite(ctx, key, val, opts...) +} + +func (n *noopStore) fnWrite(ctx context.Context, key string, val interface{}, opts ...WriteOption) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + return nil +} + +func (n *noopStore) List(ctx context.Context, opts ...ListOption) ([]string, error) { + return n.funcList(ctx, opts...) +} + +func (n *noopStore) fnList(ctx context.Context, opts ...ListOption) ([]string, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + return nil, nil +} + +func (n *noopStore) Name() string { + return n.opts.Name +} + +func (n *noopStore) String() string { + return "noop" +} + +func (n *noopStore) Options() Options { + return n.opts +} diff --git a/store/noop_test.go b/store/noop_test.go new file mode 100644 index 00000000..19fab2d7 --- /dev/null +++ b/store/noop_test.go @@ -0,0 +1,35 @@ +package store + +import ( + "context" + "testing" +) + +type testHook struct { + f bool +} + +func (t *testHook) Exists(fn FuncExists) FuncExists { + return func(ctx context.Context, key string, opts ...ExistsOption) error { + t.f = true + return fn(ctx, key, opts...) + } +} + +func TestHook(t *testing.T) { + h := &testHook{} + + s := NewStore(Hooks(HookExists(h.Exists))) + + if err := s.Init(); err != nil { + t.Fatal(err) + } + + if err := s.Exists(context.TODO(), "test"); err != nil { + t.Fatal(err) + } + + if !h.f { + t.Fatal("hook not works") + } +} diff --git a/store/options.go b/store/options.go index 6bbee1a4..438b745c 100644 --- a/store/options.go +++ b/store/options.go @@ -9,6 +9,7 @@ import ( "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/meter" + "go.unistack.org/micro/v3/options" "go.unistack.org/micro/v3/tracer" ) @@ -38,6 +39,8 @@ type Options struct { // Wrappers []Wrapper // Timeout specifies timeout duration for all operations Timeout time.Duration + // Hooks can be run before/after store Read/List/Write/Exists/Delete + Hooks options.Hooks } // NewOptions creates options struct @@ -441,11 +444,9 @@ func ExistsTimeout(td time.Duration) ExistsOption { } } -/* -// WrapStore adds a store Wrapper to a list of options passed into the store -func WrapStore(w Wrapper) Option { +// Hooks sets hook runs before action +func Hooks(h ...options.Hook) Option { return func(o *Options) { - o.Wrappers = append(o.Wrappers, w) + o.Hooks = append(o.Hooks, h...) } } -*/ diff --git a/store/store.go b/store/store.go index 896c1350..57646346 100644 --- a/store/store.go +++ b/store/store.go @@ -19,6 +19,7 @@ var ( // Store is a data storage interface type Store interface { + // Name returns store name Name() string // Init initialises the store Init(opts ...Option) error @@ -41,3 +42,16 @@ type Store interface { // String returns the name of the implementation. String() string } + +type ( + FuncExists func(ctx context.Context, key string, opts ...ExistsOption) error + HookExists func(next FuncExists) FuncExists + FuncRead func(ctx context.Context, key string, val interface{}, opts ...ReadOption) error + HookRead func(next FuncRead) FuncRead + FuncWrite func(ctx context.Context, key string, val interface{}, opts ...WriteOption) error + HookWrite func(next FuncWrite) FuncWrite + FuncDelete func(ctx context.Context, key string, opts ...DeleteOption) error + HookDelete func(next FuncDelete) FuncDelete + FuncList func(ctx context.Context, opts ...ListOption) ([]string, error) + HookList func(next FuncList) FuncList +) diff --git a/tracer/tracer_test.go b/tracer/tracer_test.go deleted file mode 100644 index 7388f577..00000000 --- a/tracer/tracer_test.go +++ /dev/null @@ -1,13 +0,0 @@ -package tracer - -import ( - "testing" -) - -func TestUniqLabels(t *testing.T) { - labels := []interface{}{"key1", "val1", "key1", "val2"} - labels = UniqLabels(labels) - if labels[1] != "val2" { - t.Fatalf("UniqLabels not works") - } -}