Vasiliy Tolstov
ffa01de78f
* remove subscribe from server * remove publish from client * broker package refactoring Co-authored-by: vtolstov <vtolstov@users.noreply.github.com> Reviewed-on: #396 Co-authored-by: Vasiliy Tolstov <v.tolstov@unistack.org> Co-committed-by: Vasiliy Tolstov <v.tolstov@unistack.org>
177 lines
3.5 KiB
Go
177 lines
3.5 KiB
Go
package broker
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
"sync"
|
|
|
|
"go.unistack.org/micro/v4/codec"
|
|
"go.unistack.org/micro/v4/metadata"
|
|
"go.unistack.org/micro/v4/options"
|
|
)
|
|
|
|
type NoopBroker struct {
|
|
funcPublish FuncPublish
|
|
funcSubscribe FuncSubscribe
|
|
opts Options
|
|
sync.RWMutex
|
|
}
|
|
|
|
func (b *NoopBroker) newCodec(ct string) (codec.Codec, error) {
|
|
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
|
|
ct = ct[:idx]
|
|
}
|
|
b.RLock()
|
|
c, ok := b.opts.Codecs[ct]
|
|
b.RUnlock()
|
|
if ok {
|
|
return c, nil
|
|
}
|
|
return nil, codec.ErrUnknownContentType
|
|
}
|
|
|
|
func NewBroker(opts ...Option) *NoopBroker {
|
|
b := &NoopBroker{opts: NewOptions(opts...)}
|
|
b.funcPublish = b.fnPublish
|
|
b.funcSubscribe = b.fnSubscribe
|
|
|
|
return b
|
|
}
|
|
|
|
func (b *NoopBroker) Health() bool {
|
|
return true
|
|
}
|
|
|
|
func (b *NoopBroker) Live() bool {
|
|
return true
|
|
}
|
|
|
|
func (b *NoopBroker) Ready() bool {
|
|
return true
|
|
}
|
|
|
|
func (b *NoopBroker) Name() string {
|
|
return b.opts.Name
|
|
}
|
|
|
|
func (b *NoopBroker) String() string {
|
|
return "noop"
|
|
}
|
|
|
|
func (b *NoopBroker) Options() Options {
|
|
return b.opts
|
|
}
|
|
|
|
func (b *NoopBroker) Init(opts ...Option) error {
|
|
for _, opt := range opts {
|
|
opt(&b.opts)
|
|
}
|
|
|
|
b.funcPublish = b.fnPublish
|
|
b.funcSubscribe = b.fnSubscribe
|
|
|
|
b.opts.Hooks.EachPrev(func(hook options.Hook) {
|
|
switch h := hook.(type) {
|
|
case HookPublish:
|
|
b.funcPublish = h(b.funcPublish)
|
|
case HookSubscribe:
|
|
b.funcSubscribe = h(b.funcSubscribe)
|
|
}
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
func (b *NoopBroker) Connect(_ context.Context) error {
|
|
return nil
|
|
}
|
|
|
|
func (b *NoopBroker) Disconnect(_ context.Context) error {
|
|
return nil
|
|
}
|
|
|
|
func (b *NoopBroker) Address() string {
|
|
return strings.Join(b.opts.Addrs, ",")
|
|
}
|
|
|
|
type noopMessage struct {
|
|
c codec.Codec
|
|
ctx context.Context
|
|
body []byte
|
|
hdr metadata.Metadata
|
|
opts PublishOptions
|
|
}
|
|
|
|
func (m *noopMessage) Ack() error {
|
|
return nil
|
|
}
|
|
|
|
func (m *noopMessage) Body() []byte {
|
|
return m.body
|
|
}
|
|
|
|
func (m *noopMessage) Header() metadata.Metadata {
|
|
return m.hdr
|
|
}
|
|
|
|
func (m *noopMessage) Context() context.Context {
|
|
return m.ctx
|
|
}
|
|
|
|
func (m *noopMessage) Topic() string {
|
|
return ""
|
|
}
|
|
|
|
func (m *noopMessage) Unmarshal(dst interface{}, opts ...codec.Option) error {
|
|
return m.c.Unmarshal(m.body, dst)
|
|
}
|
|
|
|
func (b *NoopBroker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...PublishOption) (Message, error) {
|
|
options := NewPublishOptions(opts...)
|
|
m := &noopMessage{ctx: ctx, hdr: hdr, opts: options}
|
|
c, err := b.newCodec(m.opts.ContentType)
|
|
if err == nil {
|
|
m.body, err = c.Marshal(body)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return m, nil
|
|
}
|
|
|
|
func (b *NoopBroker) fnPublish(_ context.Context, _ string, _ ...Message) error {
|
|
return nil
|
|
}
|
|
|
|
func (b *NoopBroker) Publish(ctx context.Context, topic string, msg ...Message) error {
|
|
return b.funcPublish(ctx, topic, msg...)
|
|
}
|
|
|
|
type NoopSubscriber struct {
|
|
ctx context.Context
|
|
topic string
|
|
handler interface{}
|
|
opts SubscribeOptions
|
|
}
|
|
|
|
func (b *NoopBroker) fnSubscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error) {
|
|
return &NoopSubscriber{ctx: ctx, topic: topic, opts: NewSubscribeOptions(opts...), handler: handler}, nil
|
|
}
|
|
|
|
func (b *NoopBroker) Subscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error) {
|
|
return b.funcSubscribe(ctx, topic, handler, opts...)
|
|
}
|
|
|
|
func (s *NoopSubscriber) Options() SubscribeOptions {
|
|
return s.opts
|
|
}
|
|
|
|
func (s *NoopSubscriber) Topic() string {
|
|
return s.topic
|
|
}
|
|
|
|
func (s *NoopSubscriber) Unsubscribe(_ context.Context) error {
|
|
return nil
|
|
}
|