improvements #68

Merged
vtolstov merged 5 commits from improvements into master 2021-12-28 09:23:46 +03:00
7 changed files with 24 additions and 5 deletions
Showing only changes of commit 88788776d2 - Show all commits

View File

@ -49,6 +49,7 @@ type Message interface {
Topic() string Topic() string
Payload() interface{} Payload() interface{}
ContentType() string ContentType() string
Metadata() metadata.Metadata
} }
// Request is the interface for a synchronous request used by Call or Stream // Request is the interface for a synchronous request used by Call or Stream

View File

@ -12,7 +12,7 @@ import (
type LookupFunc func(context.Context, Request, CallOptions) ([]string, error) type LookupFunc func(context.Context, Request, CallOptions) ([]string, error)
// LookupRoute for a request using the router and then choose one using the selector // LookupRoute for a request using the router and then choose one using the selector
func LookupRoute(ctx context.Context, req Request, opts CallOptions) ([]string, error) { func LookupRoute(_ context.Context, req Request, opts CallOptions) ([]string, error) {
// check to see if an address was provided as a call option // check to see if an address was provided as a call option
if len(opts.Address) > 0 { if len(opts.Address) > 0 {
return opts.Address, nil return opts.Address, nil

View File

@ -139,6 +139,10 @@ func (n *noopMessage) ContentType() string {
return n.opts.ContentType return n.opts.ContentType
} }
func (n *noopMessage) Metadata() metadata.Metadata {
return n.opts.Metadata
}
func (n *noopClient) newCodec(contentType string) (codec.Codec, error) { func (n *noopClient) newCodec(contentType string) (codec.Codec, error) {
if cf, ok := n.opts.Codecs[contentType]; ok { if cf, ok := n.opts.Codecs[contentType]; ok {
return cf, nil return cf, nil

View File

@ -8,6 +8,7 @@ import (
"go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/network/transport" "go.unistack.org/micro/v3/network/transport"
"go.unistack.org/micro/v3/register" "go.unistack.org/micro/v3/register"
@ -128,7 +129,7 @@ type PublishOptions struct {
// NewMessageOptions creates message options struct // NewMessageOptions creates message options struct
func NewMessageOptions(opts ...MessageOption) MessageOptions { func NewMessageOptions(opts ...MessageOption) MessageOptions {
options := MessageOptions{} options := MessageOptions{Metadata: metadata.New(1)}
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
@ -137,7 +138,10 @@ func NewMessageOptions(opts ...MessageOption) MessageOptions {
// MessageOptions holds client message options // MessageOptions holds client message options
type MessageOptions struct { type MessageOptions struct {
// Metadata additional metadata
Metadata metadata.Metadata
// ContentType specify content-type of message // ContentType specify content-type of message
// deprecated
ContentType string ContentType string
} }
@ -517,6 +521,7 @@ func WithSelectOptions(sops ...selector.SelectOption) CallOption {
// Deprecated // Deprecated
func WithMessageContentType(ct string) MessageOption { func WithMessageContentType(ct string) MessageOption {
return func(o *MessageOptions) { return func(o *MessageOptions) {
o.Metadata.Set(metadata.HeaderContentType, ct)
o.ContentType = ct o.ContentType = ct
} }
} }
@ -524,10 +529,18 @@ func WithMessageContentType(ct string) MessageOption {
// MessageContentType sets the message content type // MessageContentType sets the message content type
func MessageContentType(ct string) MessageOption { func MessageContentType(ct string) MessageOption {
return func(o *MessageOptions) { return func(o *MessageOptions) {
o.Metadata.Set(metadata.HeaderContentType, ct)
o.ContentType = ct o.ContentType = ct
} }
} }
// MessageMetadata sets the message metadata
func MessageMetadata(k, v string) MessageOption {
return func(o *MessageOptions) {
o.Metadata.Set(k, v)
}
}
// StreamingRequest specifies that request is streaming // StreamingRequest specifies that request is streaming
func StreamingRequest(b bool) RequestOption { func StreamingRequest(b bool) RequestOption {
return func(o *RequestOptions) { return func(o *RequestOptions) {

View File

@ -20,7 +20,7 @@ func RetryNever(ctx context.Context, req Request, retryCount int, err error) (bo
} }
// RetryOnError retries a request on a 500 or timeout error // RetryOnError retries a request on a 500 or timeout error
func RetryOnError(ctx context.Context, req Request, retryCount int, err error) (bool, error) { func RetryOnError(_ context.Context, _ Request, _ int, err error) (bool, error) {
if err == nil { if err == nil {
return false, nil return false, nil
} }

View File

@ -23,6 +23,8 @@ var (
ErrInvalidStruct = errors.New("invalid struct specified") ErrInvalidStruct = errors.New("invalid struct specified")
// ErrWatcherStopped is returned when source watcher has been stopped // ErrWatcherStopped is returned when source watcher has been stopped
ErrWatcherStopped = errors.New("watcher stopped") ErrWatcherStopped = errors.New("watcher stopped")
// ErrWatcherNotImplemented returned when config does not implement watch
ErrWatcherNotImplemented = errors.New("watcher not implemented")
) )
// Config is an interface abstraction for dynamic configuration // Config is an interface abstraction for dynamic configuration

View File

@ -2,7 +2,6 @@ package config
import ( import (
"context" "context"
"fmt"
"reflect" "reflect"
"strconv" "strconv"
"strings" "strings"
@ -271,7 +270,7 @@ func (c *defaultConfig) Name() string {
} }
func (c *defaultConfig) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) { func (c *defaultConfig) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) {
return nil, fmt.Errorf("not implemented") return nil, ErrWatcherNotImplemented
} }
// NewConfig returns new default config source // NewConfig returns new default config source