lint fixes

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-09-30 20:32:59 +03:00
parent 3e40bac5f4
commit 8688179acd
8 changed files with 100 additions and 126 deletions

View File

@ -89,36 +89,15 @@ func (m *memoryBroker) Init(opts ...Option) error {
} }
func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error { func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error {
m.RLock() msg.Header.Set(metadata.HeaderTopic, topic)
if !m.connected { return m.publish(ctx, []*Message{msg}, opts...)
m.RUnlock()
return ErrNotConnected
}
m.RUnlock()
options := NewPublishOptions(opts...)
vs := make([]msgWrapper, 0, 1)
if m.opts.Codec == nil || options.BodyOnly {
topic, _ := msg.Header.Get(metadata.HeaderTopic)
vs = append(vs, msgWrapper{topic: topic, body: msg})
} else {
topic, _ := msg.Header.Get(metadata.HeaderTopic)
buf, err := m.opts.Codec.Marshal(msg)
if err != nil {
return err
}
vs = append(vs, msgWrapper{topic: topic, body: buf})
}
return m.publish(ctx, vs, opts...)
}
type msgWrapper struct {
body interface{}
topic string
} }
func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error { func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
return m.publish(ctx, msgs, opts...)
}
func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
m.RLock() m.RLock()
if !m.connected { if !m.connected {
m.RUnlock() m.RUnlock()
@ -126,89 +105,82 @@ func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts .
} }
m.RUnlock() m.RUnlock()
options := NewPublishOptions(opts...)
vs := make([]msgWrapper, 0, len(msgs))
if m.opts.Codec == nil || options.BodyOnly {
for _, msg := range msgs {
topic, _ := msg.Header.Get(metadata.HeaderTopic)
vs = append(vs, msgWrapper{topic: topic, body: msg})
}
} else {
for _, msg := range msgs {
topic, _ := msg.Header.Get(metadata.HeaderTopic)
buf, err := m.opts.Codec.Marshal(msg)
if err != nil {
return err
}
vs = append(vs, msgWrapper{topic: topic, body: buf})
}
}
return m.publish(ctx, vs, opts...)
}
func (m *memoryBroker) publish(ctx context.Context, vs []msgWrapper, opts ...PublishOption) error {
var err error var err error
msgTopicMap := make(map[string]Events) select {
for _, v := range vs { case <-ctx.Done():
p := &memoryEvent{ return ctx.Err()
topic: v.topic, default:
message: v.body, options := NewPublishOptions(opts...)
opts: m.opts,
}
msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p)
}
beh := m.opts.BatchErrorHandler msgTopicMap := make(map[string]Events)
eh := m.opts.ErrorHandler for _, v := range msgs {
p := &memoryEvent{opts: m.opts}
for t, ms := range msgTopicMap { if m.opts.Codec == nil || options.BodyOnly {
m.RLock() p.topic, _ = v.Header.Get(metadata.HeaderTopic)
subs, ok := m.subscribers[t] p.message = v.Body
m.RUnlock() } else {
if !ok { p.topic, _ = v.Header.Get(metadata.HeaderTopic)
continue buf, err := m.opts.Codec.Marshal(v)
} if err != nil {
return err
for _, sub := range subs {
if sub.opts.BatchErrorHandler != nil {
beh = sub.opts.BatchErrorHandler
}
if sub.opts.ErrorHandler != nil {
eh = sub.opts.ErrorHandler
}
switch {
// batch processing
case sub.batchhandler != nil:
if err = sub.batchhandler(ms); err != nil {
ms.SetError(err)
if beh != nil {
_ = beh(ms)
} else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error())
}
} else if sub.opts.AutoAck {
if err = ms.Ack(); err != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
}
} }
// single processing p.message = buf
case sub.handler != nil: }
for _, p := range ms { msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p)
if err = sub.handler(p); err != nil { }
p.SetError(err)
if eh != nil { beh := m.opts.BatchErrorHandler
_ = eh(p) eh := m.opts.ErrorHandler
for t, ms := range msgTopicMap {
m.RLock()
subs, ok := m.subscribers[t]
m.RUnlock()
if !ok {
continue
}
for _, sub := range subs {
if sub.opts.BatchErrorHandler != nil {
beh = sub.opts.BatchErrorHandler
}
if sub.opts.ErrorHandler != nil {
eh = sub.opts.ErrorHandler
}
switch {
// batch processing
case sub.batchhandler != nil:
if err = sub.batchhandler(ms); err != nil {
ms.SetError(err)
if beh != nil {
_ = beh(ms)
} else if m.opts.Logger.V(logger.ErrorLevel) { } else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error()) m.opts.Logger.Error(m.opts.Context, err.Error())
} }
} else if sub.opts.AutoAck { } else if sub.opts.AutoAck {
if err = p.Ack(); err != nil { if err = ms.Ack(); err != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err) m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
} }
} }
// single processing
case sub.handler != nil:
for _, p := range ms {
if err = sub.handler(p); err != nil {
p.SetError(err)
if eh != nil {
_ = eh(p)
} else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error())
}
} else if sub.opts.AutoAck {
if err = p.Ack(); err != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
}
}
}
} }
} }
} }

View File

@ -19,6 +19,8 @@ import (
// Options holds client options // Options holds client options
type Options struct { type Options struct {
// Router used to get route
Router router.Router
// Selector used to select needed address // Selector used to select needed address
Selector selector.Selector Selector selector.Selector
// Logger used to log messages // Logger used to log messages
@ -29,18 +31,18 @@ type Options struct {
Broker broker.Broker Broker broker.Broker
// Meter used for metrics // Meter used for metrics
Meter meter.Meter Meter meter.Meter
// Router used to get route
Router router.Router
// Transport used for transfer messages // Transport used for transfer messages
Transport transport.Transport Transport transport.Transport
// Context is used for external options // Context is used for external options
Context context.Context Context context.Context
// Lookup func used to get destination addr
Lookup LookupFunc
// Codecs map // Codecs map
Codecs map[string]codec.Codec Codecs map[string]codec.Codec
// Lookup func used to get destination addr
Lookup LookupFunc
// TLSConfig specifies tls.Config for secure connection // TLSConfig specifies tls.Config for secure connection
TLSConfig *tls.Config TLSConfig *tls.Config
// CallOptions contains default CallOptions
CallOptions CallOptions
// Proxy is used for proxy requests // Proxy is used for proxy requests
Proxy string Proxy string
// ContentType is used to select codec // ContentType is used to select codec
@ -49,8 +51,6 @@ type Options struct {
Name string Name string
// Wrappers contains wrappers // Wrappers contains wrappers
Wrappers []Wrapper Wrappers []Wrapper
// CallOptions contains default CallOptions
CallOptions CallOptions
// PoolSize connection pool size // PoolSize connection pool size
PoolSize int PoolSize int
// PoolTTL connection pool ttl // PoolTTL connection pool ttl

View File

@ -209,14 +209,14 @@ func Name(n string) Option {
type WatchOptions struct { type WatchOptions struct {
// Context used by non default options // Context used by non default options
Context context.Context Context context.Context
// Coalesce multiple events to one // Struct for filling
Coalesce bool Struct interface{}
// MinInterval specifies the min time.Duration interval for poll changes // MinInterval specifies the min time.Duration interval for poll changes
MinInterval time.Duration MinInterval time.Duration
// MaxInterval specifies the max time.Duration interval for poll changes // MaxInterval specifies the max time.Duration interval for poll changes
MaxInterval time.Duration MaxInterval time.Duration
// Struct for filling // Coalesce multiple events to one
Struct interface{} Coalesce bool
} }
type WatchOption func(*WatchOptions) type WatchOption func(*WatchOptions)

View File

@ -12,11 +12,11 @@ import (
) )
type defaultLogger struct { type defaultLogger struct {
enc *json.Encoder enc *json.Encoder
opts Options
sync.RWMutex
logFunc LogFunc logFunc LogFunc
logfFunc LogfFunc logfFunc LogfFunc
opts Options
sync.RWMutex
} }
// Init(opts...) should only overwrite provided options // Init(opts...) should only overwrite provided options

View File

@ -19,12 +19,12 @@ type Options struct {
Fields []interface{} Fields []interface{}
// Name holds the logger name // Name holds the logger name
Name string Name string
// CallerSkipCount number of frmaes to skip
CallerSkipCount int
// The logging level the logger should log
Level Level
// Wrappers logger wrapper that called before actual Log/Logf function // Wrappers logger wrapper that called before actual Log/Logf function
Wrappers []Wrapper Wrappers []Wrapper
// The logging level the logger should log
Level Level
// CallerSkipCount number of frmaes to skip
CallerSkipCount int
} }
// NewOptions creates new options struct // NewOptions creates new options struct

View File

@ -44,7 +44,7 @@ func NewOptions(opts ...Option) Options {
return options return options
} }
// nolint: golint // nolint: golint,revive
// RegisterOptions holds options for register method // RegisterOptions holds options for register method
type RegisterOptions struct { type RegisterOptions struct {
Context context.Context Context context.Context
@ -197,7 +197,7 @@ func TLSConfig(t *tls.Config) Option {
} }
} }
// nolint: golint // nolint: golint,revive
// RegisterAttempts specifies register atempts count // RegisterAttempts specifies register atempts count
func RegisterAttempts(t int) RegisterOption { func RegisterAttempts(t int) RegisterOption {
return func(o *RegisterOptions) { return func(o *RegisterOptions) {
@ -205,7 +205,7 @@ func RegisterAttempts(t int) RegisterOption {
} }
} }
// nolint: golint // nolint: golint,revive
// RegisterTTL specifies register ttl // RegisterTTL specifies register ttl
func RegisterTTL(t time.Duration) RegisterOption { func RegisterTTL(t time.Duration) RegisterOption {
return func(o *RegisterOptions) { return func(o *RegisterOptions) {
@ -213,7 +213,7 @@ func RegisterTTL(t time.Duration) RegisterOption {
} }
} }
// nolint: golint // nolint: golint,revive
// RegisterContext sets the register context // RegisterContext sets the register context
func RegisterContext(ctx context.Context) RegisterOption { func RegisterContext(ctx context.Context) RegisterOption {
return func(o *RegisterOptions) { return func(o *RegisterOptions) {
@ -221,7 +221,7 @@ func RegisterContext(ctx context.Context) RegisterOption {
} }
} }
// nolint: golint // nolint: golint,revive
// RegisterDomain secifies register domain // RegisterDomain secifies register domain
func RegisterDomain(d string) RegisterOption { func RegisterDomain(d string) RegisterOption {
return func(o *RegisterOptions) { return func(o *RegisterOptions) {

View File

@ -68,8 +68,8 @@ type Endpoint struct {
// Option func signature // Option func signature
type Option func(*Options) type Option func(*Options)
// nolint: golint,revive
// RegisterOption option is used to register service // RegisterOption option is used to register service
// nolint: golint
type RegisterOption func(*RegisterOptions) type RegisterOption func(*RegisterOptions)
// WatchOption option is used to watch service changes // WatchOption option is used to watch service changes

View File

@ -339,9 +339,11 @@ func ExistsNamespace(ns string) ExistsOption {
} }
} }
/*
// WrapStore adds a store Wrapper to a list of options passed into the store // WrapStore adds a store Wrapper to a list of options passed into the store
//func WrapStore(w Wrapper) Option { func WrapStore(w Wrapper) Option {
// return func(o *Options) { return func(o *Options) {
// o.Wrappers = append(o.Wrappers, w) o.Wrappers = append(o.Wrappers, w)
// } }
//} }
*/