From 326ee533339f60243ae67c1652f455b378f48707 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 30 Nov 2021 07:34:49 +0300 Subject: [PATCH 1/2] config: add new error type Signed-off-by: Vasiliy Tolstov --- config/config.go | 2 ++ config/default.go | 3 +-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/config/config.go b/config/config.go index f9dbd19f..9b82a477 100644 --- a/config/config.go +++ b/config/config.go @@ -23,6 +23,8 @@ var ( ErrInvalidStruct = errors.New("invalid struct specified") // ErrWatcherStopped is returned when source watcher has been stopped ErrWatcherStopped = errors.New("watcher stopped") + // ErrWatcherNotImplemented returned when config does not implement watch + ErrWatcherNotImplemented = errors.New("watcher not implemented") ) // Config is an interface abstraction for dynamic configuration diff --git a/config/default.go b/config/default.go index d2797ffb..1acc0bdf 100644 --- a/config/default.go +++ b/config/default.go @@ -2,7 +2,6 @@ package config import ( "context" - "fmt" "reflect" "strconv" "strings" @@ -271,7 +270,7 @@ func (c *defaultConfig) Name() string { } func (c *defaultConfig) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) { - return nil, fmt.Errorf("not implemented") + return nil, ErrWatcherNotImplemented } // NewConfig returns new default config source From e143e2b547ce7c306e05ce22077ac1ff41a97102 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Thu, 16 Dec 2021 15:03:42 +0300 Subject: [PATCH 2/2] client: allow to set metadata for message Signed-off-by: Vasiliy Tolstov --- client/client.go | 1 + client/lookup.go | 2 +- client/noop.go | 4 ++++ client/options.go | 15 ++++++++++++++- client/retry.go | 2 +- 5 files changed, 21 insertions(+), 3 deletions(-) diff --git a/client/client.go b/client/client.go index 087f263f..36bbb6db 100644 --- a/client/client.go +++ b/client/client.go @@ -49,6 +49,7 @@ type Message interface { Topic() string Payload() interface{} ContentType() string + Metadata() metadata.Metadata } // Request is the interface for a synchronous request used by Call or Stream diff --git a/client/lookup.go b/client/lookup.go index e2677f90..5c409ba1 100644 --- a/client/lookup.go +++ b/client/lookup.go @@ -12,7 +12,7 @@ import ( type LookupFunc func(context.Context, Request, CallOptions) ([]string, error) // LookupRoute for a request using the router and then choose one using the selector -func LookupRoute(ctx context.Context, req Request, opts CallOptions) ([]string, error) { +func LookupRoute(_ context.Context, req Request, opts CallOptions) ([]string, error) { // check to see if an address was provided as a call option if len(opts.Address) > 0 { return opts.Address, nil diff --git a/client/noop.go b/client/noop.go index 809fa3fa..e654388e 100644 --- a/client/noop.go +++ b/client/noop.go @@ -139,6 +139,10 @@ func (n *noopMessage) ContentType() string { return n.opts.ContentType } +func (n *noopMessage) Metadata() metadata.Metadata { + return n.opts.Metadata +} + func (n *noopClient) newCodec(contentType string) (codec.Codec, error) { if cf, ok := n.opts.Codecs[contentType]; ok { return cf, nil diff --git a/client/options.go b/client/options.go index 9a2eb73d..c7abd912 100644 --- a/client/options.go +++ b/client/options.go @@ -8,6 +8,7 @@ import ( "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/logger" + "go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v3/network/transport" "go.unistack.org/micro/v3/register" @@ -128,7 +129,7 @@ type PublishOptions struct { // NewMessageOptions creates message options struct func NewMessageOptions(opts ...MessageOption) MessageOptions { - options := MessageOptions{} + options := MessageOptions{Metadata: metadata.New(1)} for _, o := range opts { o(&options) } @@ -137,7 +138,10 @@ func NewMessageOptions(opts ...MessageOption) MessageOptions { // MessageOptions holds client message options type MessageOptions struct { + // Metadata additional metadata + Metadata metadata.Metadata // ContentType specify content-type of message + // deprecated ContentType string } @@ -517,6 +521,7 @@ func WithSelectOptions(sops ...selector.SelectOption) CallOption { // Deprecated func WithMessageContentType(ct string) MessageOption { return func(o *MessageOptions) { + o.Metadata.Set(metadata.HeaderContentType, ct) o.ContentType = ct } } @@ -524,10 +529,18 @@ func WithMessageContentType(ct string) MessageOption { // MessageContentType sets the message content type func MessageContentType(ct string) MessageOption { return func(o *MessageOptions) { + o.Metadata.Set(metadata.HeaderContentType, ct) o.ContentType = ct } } +// MessageMetadata sets the message metadata +func MessageMetadata(k, v string) MessageOption { + return func(o *MessageOptions) { + o.Metadata.Set(k, v) + } +} + // StreamingRequest specifies that request is streaming func StreamingRequest(b bool) RequestOption { return func(o *RequestOptions) { diff --git a/client/retry.go b/client/retry.go index 89d22aef..40272fd2 100644 --- a/client/retry.go +++ b/client/retry.go @@ -20,7 +20,7 @@ func RetryNever(ctx context.Context, req Request, retryCount int, err error) (bo } // RetryOnError retries a request on a 500 or timeout error -func RetryOnError(ctx context.Context, req Request, retryCount int, err error) (bool, error) { +func RetryOnError(_ context.Context, _ Request, _ int, err error) (bool, error) { if err == nil { return false, nil }