From 8688179acd6167363f8b209374280cd305b988ef Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Thu, 30 Sep 2021 20:32:59 +0300 Subject: [PATCH] lint fixes Signed-off-by: Vasiliy Tolstov --- broker/memory.go | 168 ++++++++++++++++++------------------------- client/options.go | 12 ++-- config/options.go | 8 +-- logger/default.go | 6 +- logger/options.go | 8 +-- register/options.go | 10 +-- register/register.go | 2 +- store/options.go | 12 ++-- 8 files changed, 100 insertions(+), 126 deletions(-) diff --git a/broker/memory.go b/broker/memory.go index 5b9f4c6c..1ae2e8ad 100644 --- a/broker/memory.go +++ b/broker/memory.go @@ -89,36 +89,15 @@ func (m *memoryBroker) Init(opts ...Option) error { } func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error { - m.RLock() - if !m.connected { - m.RUnlock() - return ErrNotConnected - } - m.RUnlock() - - options := NewPublishOptions(opts...) - vs := make([]msgWrapper, 0, 1) - if m.opts.Codec == nil || options.BodyOnly { - topic, _ := msg.Header.Get(metadata.HeaderTopic) - vs = append(vs, msgWrapper{topic: topic, body: msg}) - } else { - topic, _ := msg.Header.Get(metadata.HeaderTopic) - buf, err := m.opts.Codec.Marshal(msg) - if err != nil { - return err - } - vs = append(vs, msgWrapper{topic: topic, body: buf}) - } - - return m.publish(ctx, vs, opts...) -} - -type msgWrapper struct { - body interface{} - topic string + msg.Header.Set(metadata.HeaderTopic, topic) + return m.publish(ctx, []*Message{msg}, opts...) } func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error { + return m.publish(ctx, msgs, opts...) +} + +func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...PublishOption) error { m.RLock() if !m.connected { m.RUnlock() @@ -126,89 +105,82 @@ func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts . } m.RUnlock() - options := NewPublishOptions(opts...) - vs := make([]msgWrapper, 0, len(msgs)) - if m.opts.Codec == nil || options.BodyOnly { - for _, msg := range msgs { - topic, _ := msg.Header.Get(metadata.HeaderTopic) - vs = append(vs, msgWrapper{topic: topic, body: msg}) - } - } else { - for _, msg := range msgs { - topic, _ := msg.Header.Get(metadata.HeaderTopic) - buf, err := m.opts.Codec.Marshal(msg) - if err != nil { - return err - } - vs = append(vs, msgWrapper{topic: topic, body: buf}) - } - } - - return m.publish(ctx, vs, opts...) -} - -func (m *memoryBroker) publish(ctx context.Context, vs []msgWrapper, opts ...PublishOption) error { var err error - msgTopicMap := make(map[string]Events) - for _, v := range vs { - p := &memoryEvent{ - topic: v.topic, - message: v.body, - opts: m.opts, - } - msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p) - } + select { + case <-ctx.Done(): + return ctx.Err() + default: + options := NewPublishOptions(opts...) - beh := m.opts.BatchErrorHandler - eh := m.opts.ErrorHandler + msgTopicMap := make(map[string]Events) + for _, v := range msgs { + p := &memoryEvent{opts: m.opts} - for t, ms := range msgTopicMap { - m.RLock() - subs, ok := m.subscribers[t] - m.RUnlock() - if !ok { - continue - } - - for _, sub := range subs { - if sub.opts.BatchErrorHandler != nil { - beh = sub.opts.BatchErrorHandler - } - if sub.opts.ErrorHandler != nil { - eh = sub.opts.ErrorHandler - } - - switch { - // batch processing - case sub.batchhandler != nil: - if err = sub.batchhandler(ms); err != nil { - ms.SetError(err) - if beh != nil { - _ = beh(ms) - } else if m.opts.Logger.V(logger.ErrorLevel) { - m.opts.Logger.Error(m.opts.Context, err.Error()) - } - } else if sub.opts.AutoAck { - if err = ms.Ack(); err != nil { - m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err) - } + if m.opts.Codec == nil || options.BodyOnly { + p.topic, _ = v.Header.Get(metadata.HeaderTopic) + p.message = v.Body + } else { + p.topic, _ = v.Header.Get(metadata.HeaderTopic) + buf, err := m.opts.Codec.Marshal(v) + if err != nil { + return err } - // single processing - case sub.handler != nil: - for _, p := range ms { - if err = sub.handler(p); err != nil { - p.SetError(err) - if eh != nil { - _ = eh(p) + p.message = buf + } + msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p) + } + + beh := m.opts.BatchErrorHandler + eh := m.opts.ErrorHandler + + for t, ms := range msgTopicMap { + m.RLock() + subs, ok := m.subscribers[t] + m.RUnlock() + if !ok { + continue + } + + for _, sub := range subs { + if sub.opts.BatchErrorHandler != nil { + beh = sub.opts.BatchErrorHandler + } + if sub.opts.ErrorHandler != nil { + eh = sub.opts.ErrorHandler + } + + switch { + // batch processing + case sub.batchhandler != nil: + if err = sub.batchhandler(ms); err != nil { + ms.SetError(err) + if beh != nil { + _ = beh(ms) } else if m.opts.Logger.V(logger.ErrorLevel) { m.opts.Logger.Error(m.opts.Context, err.Error()) } } else if sub.opts.AutoAck { - if err = p.Ack(); err != nil { + if err = ms.Ack(); err != nil { m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err) } } + // single processing + case sub.handler != nil: + for _, p := range ms { + if err = sub.handler(p); err != nil { + p.SetError(err) + if eh != nil { + _ = eh(p) + } else if m.opts.Logger.V(logger.ErrorLevel) { + m.opts.Logger.Error(m.opts.Context, err.Error()) + } + } else if sub.opts.AutoAck { + if err = p.Ack(); err != nil { + m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err) + } + } + } } } } diff --git a/client/options.go b/client/options.go index e7d50f2d..1a6a996f 100644 --- a/client/options.go +++ b/client/options.go @@ -19,6 +19,8 @@ import ( // Options holds client options type Options struct { + // Router used to get route + Router router.Router // Selector used to select needed address Selector selector.Selector // Logger used to log messages @@ -29,18 +31,18 @@ type Options struct { Broker broker.Broker // Meter used for metrics Meter meter.Meter - // Router used to get route - Router router.Router // Transport used for transfer messages Transport transport.Transport // Context is used for external options Context context.Context - // Lookup func used to get destination addr - Lookup LookupFunc // Codecs map Codecs map[string]codec.Codec + // Lookup func used to get destination addr + Lookup LookupFunc // TLSConfig specifies tls.Config for secure connection TLSConfig *tls.Config + // CallOptions contains default CallOptions + CallOptions CallOptions // Proxy is used for proxy requests Proxy string // ContentType is used to select codec @@ -49,8 +51,6 @@ type Options struct { Name string // Wrappers contains wrappers Wrappers []Wrapper - // CallOptions contains default CallOptions - CallOptions CallOptions // PoolSize connection pool size PoolSize int // PoolTTL connection pool ttl diff --git a/config/options.go b/config/options.go index 3f97f20f..21ee0b25 100644 --- a/config/options.go +++ b/config/options.go @@ -209,14 +209,14 @@ func Name(n string) Option { type WatchOptions struct { // Context used by non default options Context context.Context - // Coalesce multiple events to one - Coalesce bool + // Struct for filling + Struct interface{} // MinInterval specifies the min time.Duration interval for poll changes MinInterval time.Duration // MaxInterval specifies the max time.Duration interval for poll changes MaxInterval time.Duration - // Struct for filling - Struct interface{} + // Coalesce multiple events to one + Coalesce bool } type WatchOption func(*WatchOptions) diff --git a/logger/default.go b/logger/default.go index a1ca3e9a..87d02e06 100644 --- a/logger/default.go +++ b/logger/default.go @@ -12,11 +12,11 @@ import ( ) type defaultLogger struct { - enc *json.Encoder - opts Options - sync.RWMutex + enc *json.Encoder logFunc LogFunc logfFunc LogfFunc + opts Options + sync.RWMutex } // Init(opts...) should only overwrite provided options diff --git a/logger/options.go b/logger/options.go index 9dc7e87b..4918eb45 100644 --- a/logger/options.go +++ b/logger/options.go @@ -19,12 +19,12 @@ type Options struct { Fields []interface{} // Name holds the logger name Name string - // CallerSkipCount number of frmaes to skip - CallerSkipCount int - // The logging level the logger should log - Level Level // Wrappers logger wrapper that called before actual Log/Logf function Wrappers []Wrapper + // The logging level the logger should log + Level Level + // CallerSkipCount number of frmaes to skip + CallerSkipCount int } // NewOptions creates new options struct diff --git a/register/options.go b/register/options.go index efd75f85..c2b2071b 100644 --- a/register/options.go +++ b/register/options.go @@ -44,7 +44,7 @@ func NewOptions(opts ...Option) Options { return options } -// nolint: golint +// nolint: golint,revive // RegisterOptions holds options for register method type RegisterOptions struct { Context context.Context @@ -197,7 +197,7 @@ func TLSConfig(t *tls.Config) Option { } } -// nolint: golint +// nolint: golint,revive // RegisterAttempts specifies register atempts count func RegisterAttempts(t int) RegisterOption { return func(o *RegisterOptions) { @@ -205,7 +205,7 @@ func RegisterAttempts(t int) RegisterOption { } } -// nolint: golint +// nolint: golint,revive // RegisterTTL specifies register ttl func RegisterTTL(t time.Duration) RegisterOption { return func(o *RegisterOptions) { @@ -213,7 +213,7 @@ func RegisterTTL(t time.Duration) RegisterOption { } } -// nolint: golint +// nolint: golint,revive // RegisterContext sets the register context func RegisterContext(ctx context.Context) RegisterOption { return func(o *RegisterOptions) { @@ -221,7 +221,7 @@ func RegisterContext(ctx context.Context) RegisterOption { } } -// nolint: golint +// nolint: golint,revive // RegisterDomain secifies register domain func RegisterDomain(d string) RegisterOption { return func(o *RegisterOptions) { diff --git a/register/register.go b/register/register.go index f80e1106..60816781 100644 --- a/register/register.go +++ b/register/register.go @@ -68,8 +68,8 @@ type Endpoint struct { // Option func signature type Option func(*Options) +// nolint: golint,revive // RegisterOption option is used to register service -// nolint: golint type RegisterOption func(*RegisterOptions) // WatchOption option is used to watch service changes diff --git a/store/options.go b/store/options.go index 26ef2fc1..76bcabd8 100644 --- a/store/options.go +++ b/store/options.go @@ -339,9 +339,11 @@ func ExistsNamespace(ns string) ExistsOption { } } +/* // WrapStore adds a store Wrapper to a list of options passed into the store -//func WrapStore(w Wrapper) Option { -// return func(o *Options) { -// o.Wrappers = append(o.Wrappers, w) -// } -//} +func WrapStore(w Wrapper) Option { + return func(o *Options) { + o.Wrappers = append(o.Wrappers, w) + } +} +*/