use metadata.Metadata

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-11-18 00:02:02 +03:00
parent e0ef8b2953
commit 01e64cb0c0
28 changed files with 119 additions and 84 deletions

View File

@ -5,6 +5,8 @@ import (
"context"
"errors"
"time"
"github.com/unistack-org/micro/v3/metadata"
)
const (
@ -57,7 +59,7 @@ type Account struct {
// Issuer of the account
Issuer string `json:"issuer"`
// Any other associated metadata
Metadata map[string]string `json:"metadata"`
Metadata metadata.Metadata `json:"metadata"`
// Scopes the account has access to
Scopes []string `json:"scopes"`
// Secret for the account, e.g. the password

View File

@ -5,6 +5,7 @@ import (
"time"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/store"
)
@ -102,7 +103,7 @@ func LoginURL(url string) Option {
type GenerateOptions struct {
// Metadata associated with the account
Metadata map[string]string
Metadata metadata.Metadata
// Scopes the account has access too
Scopes []string
// Provider of the account, e.g. oauth
@ -132,9 +133,9 @@ func WithType(t string) GenerateOption {
}
// WithMetadata for the generated account
func WithMetadata(md map[string]string) GenerateOption {
func WithMetadata(md metadata.Metadata) GenerateOption {
return func(o *GenerateOptions) {
o.Metadata = md
o.Metadata = metadata.Copy(md)
}
}

View File

@ -1,7 +1,11 @@
// Package broker is an interface used for asynchronous messaging
package broker
import "context"
import (
"context"
"github.com/unistack-org/micro/v3/metadata"
)
var (
DefaultBroker Broker = NewBroker()
@ -32,7 +36,7 @@ type Event interface {
// Message is used to transfer data
type Message struct {
Header map[string]string // contains message metadata
Header metadata.Metadata // contains message metadata
Body []byte // contains message body
}

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/metadata"
)
var (
@ -57,7 +58,7 @@ type Response interface {
// Read the response
Codec() codec.Reader
// read the header
Header() map[string]string
Header() metadata.Metadata
// Read the undecoded response
Read() ([]byte, error)
}

View File

@ -66,14 +66,14 @@ func (n *noopRequest) Stream() bool {
type noopResponse struct {
codec codec.Reader
header map[string]string
header metadata.Metadata
}
func (n *noopResponse) Codec() codec.Reader {
return n.codec
}
func (n *noopResponse) Header() map[string]string {
func (n *noopResponse) Header() metadata.Metadata {
return n.header
}

View File

@ -4,6 +4,8 @@ package codec
import (
"errors"
"io"
"github.com/unistack-org/micro/v3/metadata"
)
const (
@ -67,6 +69,6 @@ type Message struct {
Error string
// The values read from the socket
Header map[string]string
Header metadata.Metadata
Body []byte
}

View File

@ -11,6 +11,7 @@ import (
"time"
"github.com/unistack-org/micro/v3/debug/log"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/util/kubernetes/client"
)
@ -89,7 +90,7 @@ func (k *klog) parse(line string) log.Record {
if err := json.Unmarshal([]byte(line), &record); err != nil {
record.Timestamp = time.Now().UTC()
record.Message = line
record.Metadata = make(map[string]string)
record.Metadata = metadata.New(1)
}
record.Metadata["service"] = k.Options.Name

View File

@ -5,6 +5,8 @@ import (
"encoding/json"
"fmt"
"time"
"github.com/unistack-org/micro/v3/metadata"
)
var (
@ -29,7 +31,7 @@ type Record struct {
// Timestamp of logged event
Timestamp time.Time `json:"timestamp"`
// Metadata to enrich log record
Metadata map[string]string `json:"metadata"`
Metadata metadata.Metadata `json:"metadata"`
// Value contains log entry
Message interface{} `json:"message"`
}

View File

@ -6,6 +6,8 @@ import (
"encoding/json"
"errors"
"time"
"github.com/unistack-org/micro/v3/metadata"
)
var (
@ -36,7 +38,7 @@ type Event struct {
// Timestamp of the event
Timestamp time.Time
// Metadata contains the encoded event was indexed by
Metadata map[string]string
Metadata metadata.Metadata
// Payload contains the encoded message
Payload []byte
}

View File

@ -1,11 +1,15 @@
package events
import "time"
import (
"time"
"github.com/unistack-org/micro/v3/metadata"
)
// PublishOptions contains all the options which can be provided when publishing an event
type PublishOptions struct {
// Metadata contains any keys which can be used to query the data, for example a customer id
Metadata map[string]string
Metadata metadata.Metadata
// Timestamp to set for the event, if the timestamp is a zero value, the current time will be used
Timestamp time.Time
}
@ -14,9 +18,9 @@ type PublishOptions struct {
type PublishOption func(o *PublishOptions)
// WithMetadata sets the Metadata field on PublishOptions
func WithMetadata(md map[string]string) PublishOption {
func WithMetadata(md metadata.Metadata) PublishOption {
return func(o *PublishOptions) {
o.Metadata = md
o.Metadata = metadata.Copy(md)
}
}

View File

@ -2,6 +2,8 @@ package metrics
import (
"time"
"github.com/unistack-org/micro/v3/metadata"
)
// NoopReporter is an noop implementation of Reporter:
@ -25,17 +27,17 @@ func (r *noopReporter) Init(opts ...Option) error {
}
// Count implements the Reporter interface Count method:
func (r *noopReporter) Count(metricName string, value int64, tags Tags) error {
func (r *noopReporter) Count(metricName string, value int64, md metadata.Metadata) error {
return nil
}
// Gauge implements the Reporter interface Gauge method:
func (r *noopReporter) Gauge(metricName string, value float64, tags Tags) error {
func (r *noopReporter) Gauge(metricName string, value float64, md metadata.Metadata) error {
return nil
}
// Timing implements the Reporter interface Timing method:
func (r *noopReporter) Timing(metricName string, value time.Duration, tags Tags) error {
func (r *noopReporter) Timing(metricName string, value time.Duration, md metadata.Metadata) error {
return nil
}

View File

@ -1,6 +1,9 @@
package metrics
import "github.com/unistack-org/micro/v3/logger"
import (
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
)
var (
// The Prometheus metrics will be made available on this port:
@ -18,7 +21,7 @@ type Option func(*Options)
type Options struct {
Address string
Path string
DefaultTags Tags
DefaultTags metadata.Metadata
TimingObjectives map[float64]float64
Logger logger.Logger
}
@ -27,7 +30,7 @@ type Options struct {
func NewOptions(opt ...Option) Options {
opts := Options{
Address: defaultPrometheusListenAddress,
DefaultTags: make(Tags),
DefaultTags: metadata.New(2),
Path: defaultPath,
TimingObjectives: defaultTimingObjectives,
}
@ -54,9 +57,9 @@ func Address(value string) Option {
}
// DefaultTags will be added to every metric:
func DefaultTags(value Tags) Option {
func DefaultTags(md metadata.Metadata) Option {
return func(o *Options) {
o.DefaultTags = value
o.DefaultTags = metadata.Copy(md)
}
}

View File

@ -1,10 +1,11 @@
// Package metrics is for instrumentation and debugging
package metrics
import "time"
import (
"time"
// Tags is a map of fields to add to a metric:
type Tags map[string]string
"github.com/unistack-org/micro/v3/metadata"
)
var (
DefaultReporter Reporter = NewReporter()
@ -13,8 +14,8 @@ var (
// Reporter is an interface for collecting and instrumenting metrics
type Reporter interface {
Init(...Option) error
Count(id string, value int64, tags Tags) error
Gauge(id string, value float64, tags Tags) error
Timing(id string, value time.Duration, tags Tags) error
Count(id string, value int64, md metadata.Metadata) error
Gauge(id string, value float64, md metadata.Metadata) error
Timing(id string, value time.Duration, md metadata.Metadata) error
Options() Options
}

View File

@ -1,10 +1,10 @@
package wrapper
import (
"context"
"time"
"context"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/metrics"
"github.com/unistack-org/micro/v3/server"
)
@ -26,9 +26,8 @@ func (w *Wrapper) HandlerFunc(handlerFunction server.HandlerFunc) server.Handler
return func(ctx context.Context, req server.Request, rsp interface{}) error {
// Build some tags to describe the call:
tags := metrics.Tags{
"method": req.Method(),
}
tags := metadata.New(2)
tags.Set("method", req.Method())
// Start the clock:
callTime := time.Now()

View File

@ -4,6 +4,8 @@ package transport
import (
"context"
"time"
"github.com/unistack-org/micro/v3/metadata"
)
var (
@ -26,7 +28,7 @@ type Transport interface {
// Message is used to transfer data
type Message struct {
Header map[string]string
Header metadata.Metadata
Body []byte
}

View File

@ -12,6 +12,7 @@ import (
"github.com/unistack-org/micro/v3/config"
"github.com/unistack-org/micro/v3/debug/profile"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/network/transport"
"github.com/unistack-org/micro/v3/registry"
"github.com/unistack-org/micro/v3/router"
@ -260,7 +261,7 @@ func Version(v string) Option {
}
// Metadata associated with the service
func Metadata(md map[string]string) Option {
func Metadata(md metadata.Metadata) Option {
return func(o *Options) {
if o.Server != nil {
o.Server.Init(server.Metadata(md))

View File

@ -100,9 +100,7 @@ func ExtractEndpoint(method reflect.Method) *Endpoint {
}
if stream {
ep.Metadata = map[string]string{
"stream": fmt.Sprintf("%v", stream),
}
ep.Metadata.Set("stream", fmt.Sprintf("%v", stream))
}
return ep

View File

@ -4,6 +4,8 @@ package registry
import (
"context"
"errors"
"github.com/unistack-org/micro/v3/metadata"
)
const (
@ -42,7 +44,7 @@ type Registry interface {
type Service struct {
Name string `json:"name"`
Version string `json:"version"`
Metadata map[string]string `json:"metadata"`
Metadata metadata.Metadata `json:"metadata"`
Endpoints []*Endpoint `json:"endpoints"`
Nodes []*Node `json:"nodes"`
}
@ -51,7 +53,7 @@ type Service struct {
type Node struct {
Id string `json:"id"`
Address string `json:"address"`
Metadata map[string]string `json:"metadata"`
Metadata metadata.Metadata `json:"metadata"`
}
// Endpoint holds endpoint registry info
@ -59,7 +61,7 @@ type Endpoint struct {
Name string `json:"name"`
Request *Value `json:"request"`
Response *Value `json:"response"`
Metadata map[string]string `json:"metadata"`
Metadata metadata.Metadata `json:"metadata"`
}
// Valud holds additional kv stuff

View File

@ -2,6 +2,8 @@ package router
import (
"hash/fnv"
"github.com/unistack-org/micro/v3/metadata"
)
var (
@ -28,7 +30,7 @@ type Route struct {
// Metric is the route cost metric
Metric int64
// Metadata for the route
Metadata map[string]string
Metadata metadata.Metadata
}
// Hash returns route hash sum.

View File

@ -4,6 +4,8 @@ package runtime
import (
"errors"
"time"
"github.com/unistack-org/micro/v3/metadata"
)
var (
@ -43,7 +45,7 @@ type Logs interface {
// Log is a log message
type Log struct {
Message string
Metadata map[string]string
Metadata metadata.Metadata
}
// Scheduler is a runtime service scheduler
@ -103,7 +105,7 @@ type Service struct {
// url location of source
Source string
// Metadata stores metadata
Metadata map[string]string
Metadata metadata.Metadata
}
// Resources which are allocated to a serivce

View File

@ -10,6 +10,7 @@ import (
"github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/network/transport"
"github.com/unistack-org/micro/v3/registry"
"github.com/unistack-org/micro/v3/tracer"
@ -24,7 +25,7 @@ type Options struct {
Auth auth.Auth
Logger logger.Logger
Transport transport.Transport
Metadata map[string]string
Metadata metadata.Metadata
Name string
Address string
Advertise string
@ -63,7 +64,7 @@ func NewOptions(opts ...Option) Options {
Auth: auth.DefaultAuth,
Codecs: make(map[string]codec.NewCodec),
Context: context.Background(),
Metadata: map[string]string{},
Metadata: metadata.New(0),
RegisterInterval: DefaultRegisterInterval,
RegisterTTL: DefaultRegisterTTL,
RegisterCheck: DefaultRegisterCheck,
@ -187,9 +188,9 @@ func Transport(t transport.Transport) Option {
}
// Metadata associated with the server
func Metadata(md map[string]string) Option {
func Metadata(md metadata.Metadata) Option {
return func(o *Options) {
o.Metadata = md
o.Metadata = metadata.Copy(md)
}
}
@ -271,7 +272,7 @@ type HandlerOption func(*HandlerOptions)
// HandlerOptions struct
type HandlerOptions struct {
Internal bool
Metadata map[string]map[string]string
Metadata map[string]metadata.Metadata
Context context.Context
}
@ -317,9 +318,9 @@ func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions {
// EndpointMetadata is a Handler option that allows metadata to be added to
// individual endpoints.
func EndpointMetadata(name string, md map[string]string) HandlerOption {
func EndpointMetadata(name string, md metadata.Metadata) HandlerOption {
return func(o *HandlerOptions) {
o.Metadata[name] = md
o.Metadata[name] = metadata.Copy(md)
}
}

View File

@ -2,13 +2,14 @@ package server
import (
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/metadata"
)
type rpcMessage struct {
topic string
contentType string
payload interface{}
header map[string]string
header metadata.Metadata
body []byte
codec codec.Codec
}
@ -25,7 +26,7 @@ func (r *rpcMessage) Payload() interface{} {
return r.payload
}
func (r *rpcMessage) Header() map[string]string {
func (r *rpcMessage) Header() metadata.Metadata {
return r.header
}

View File

@ -7,6 +7,7 @@ import (
"github.com/google/uuid"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/registry"
)
@ -53,7 +54,7 @@ type Message interface {
// The content type of the payload
ContentType() string
// The raw headers of the message
Header() map[string]string
Header() metadata.Metadata
// The raw body of the message
Body() []byte
// Codec used to decode the message
@ -71,7 +72,7 @@ type Request interface {
// Content type provided
ContentType() string
// Header of the request
Header() map[string]string
Header() metadata.Metadata
// Body is the initial decoded value
Body() interface{}
// Read the undecoded request body
@ -87,7 +88,7 @@ type Response interface {
// Encoded writer
Codec() codec.Writer
// Write the header
WriteHeader(map[string]string)
WriteHeader(metadata.Metadata)
// write a response directly to the client
Write([]byte) error
}

View File

@ -134,15 +134,14 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
}
handlers = append(handlers, h)
endpoints = append(endpoints, &registry.Endpoint{
Name: "Func",
Request: registry.ExtractSubValue(typ),
Metadata: map[string]string{
"topic": topic,
"subscriber": "true",
},
})
ep := &registry.Endpoint{
Name: "Func",
Request: registry.ExtractSubValue(typ),
Metadata: metadata.New(2),
}
ep.Metadata.Set("topic", topic)
ep.Metadata.Set("subscriber", "true")
endpoints = append(endpoints, ep)
} else {
hdlr := reflect.ValueOf(sub)
name := reflect.Indirect(hdlr).Type().Name()
@ -162,15 +161,14 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
}
handlers = append(handlers, h)
endpoints = append(endpoints, &registry.Endpoint{
Name: name + "." + method.Name,
Request: registry.ExtractSubValue(method.Type),
Metadata: map[string]string{
"topic": topic,
"subscriber": "true",
},
})
ep := &registry.Endpoint{
Name: name + "." + method.Name,
Request: registry.ExtractSubValue(method.Type),
Metadata: metadata.New(2),
}
ep.Metadata.Set("topic", topic)
ep.Metadata.Set("subscriber", "true")
endpoints = append(endpoints, ep)
}
}

View File

@ -29,8 +29,8 @@ func FromContext(ctx context.Context) (traceID string, parentSpanID string, isFo
// NewContext saves the trace and span ids in the context
func NewContext(ctx context.Context, traceID, parentSpanID string) context.Context {
return metadata.MergeContext(ctx, map[string]string{
traceIDKey: traceID,
spanIDKey: parentSpanID,
}, true)
md := metadata.New(2)
md.Set(traceIDKey, traceID)
md.Set(spanIDKey, parentSpanID)
return metadata.MergeContext(ctx, md, true)
}

View File

@ -4,6 +4,8 @@ package tracer
import (
"context"
"time"
"github.com/unistack-org/micro/v3/metadata"
)
var (
@ -46,7 +48,7 @@ type Span struct {
// Duration in nano seconds
Duration time.Duration
// associated data
Metadata map[string]string
Metadata metadata.Metadata
// Type
Type SpanType
}

View File

@ -35,7 +35,7 @@ func (r *request) Codec() codec.Reader {
return r.Request.Codec().(codec.Reader)
}
func (r *request) Header() map[string]string {
func (r *request) Header() metadata.Metadata {
md, _ := metadata.FromContext(r.context)
return md
}

View File

@ -6,6 +6,7 @@ import (
"github.com/dgrijalva/jwt-go"
"github.com/unistack-org/micro/v3/auth"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/util/token"
)
@ -13,7 +14,7 @@ import (
type authClaims struct {
Type string `json:"type"`
Scopes []string `json:"scopes"`
Metadata map[string]string `json:"metadata"`
Metadata metadata.Metadata `json:"metadata"`
jwt.StandardClaims
}