Compare commits

...

13 Commits

Author SHA1 Message Date
8c95448535 util/reflect: add IsZero helper
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-21 16:17:50 +03:00
c1dc041d8c client: fix NewOptions with CallOptions filling
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-16 19:09:49 +03:00
Renovate Bot
25be0ac0f0 fix(deps): update golang.org/x/net commit hash to d523dce 2021-03-16 13:12:17 +00:00
Renovate Bot
86f73cac4e fix(deps): update golang.org/x/net commit hash to 34ac3e1 2021-03-15 22:09:13 +00:00
485eda6ce9 meter/wrapper: fix wrapper build
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-15 00:49:26 +03:00
dbbdb24631 meter: rework labels
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-15 00:44:13 +03:00
723ceb4f32 regsiter: fix extractor
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-12 16:09:20 +03:00
bac9869bb3 register: support map in ExtractValue
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-12 15:48:05 +03:00
610427445f codec: provide proto for codec.Frame
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-11 07:42:42 +03:00
Renovate Bot
c84a66c713 fix(deps): update module github.com/imdario/mergo to v0.3.12 2021-03-10 00:35:57 +00:00
00eaae717b lint fixes
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-06 23:44:54 +03:00
a102e95433 spell fixes
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-06 23:33:37 +03:00
39f66cc86c add logger wrapper, fix default logger fields method
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-06 23:26:47 +03:00
20 changed files with 672 additions and 245 deletions

View File

@@ -50,7 +50,7 @@ type Options struct {
Wrappers []Wrapper
// PoolSize connection pool size
PoolSize int
// PoolTTL conection pool ttl
// PoolTTL connection pool ttl
PoolTTL time.Duration
}
@@ -160,6 +160,7 @@ func NewOptions(opts ...Option) Options {
ContentType: DefaultContentType,
Codecs: make(map[string]codec.Codec),
CallOptions: CallOptions{
Context: context.Background(),
Backoff: DefaultBackoff,
Retry: DefaultRetry,
Retries: DefaultRetries,

6
codec/frame.go Normal file
View File

@@ -0,0 +1,6 @@
package codec
// Frame gives us the ability to define raw data to send over the pipes
type Frame struct {
Data []byte
}

28
codec/frame.proto Normal file
View File

@@ -0,0 +1,28 @@
// Copyright 2021 Unistack LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package micro.codec;
option cc_enable_arenas = true;
option go_package = "github.com/unistack-org/micro/v3/codec;codec";
option java_multiple_files = true;
option java_outer_classname = "MicroCodec";
option java_package = "micro.codec";
option objc_class_prefix = "MCODEC";
message Frame {
bytes data = 1;
}

View File

@@ -8,11 +8,6 @@ import (
type noopCodec struct {
}
// Frame gives us the ability to define raw data to send over the pipes
type Frame struct {
Data []byte
}
func (c *noopCodec) ReadHeader(conn io.Reader, m *Message, t MessageType) error {
return nil
}

4
go.mod
View File

@@ -6,8 +6,8 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/ef-ds/deque v1.0.4
github.com/google/uuid v1.2.0
github.com/imdario/mergo v0.3.11
github.com/imdario/mergo v0.3.12
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4
)

9
go.sum
View File

@@ -4,15 +4,16 @@ github.com/ef-ds/deque v1.0.4 h1:iFAZNmveMT9WERAkqLJ+oaABF9AcVQ5AjXem/hroniI=
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/imdario/mergo v0.3.11 h1:3tnifQM4i+fbajXKBHXWEH+KvNHqojZ778UH75j3bGA=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34 h1:vBfVmA5mZhsQa2jr1FOL9nfA37N/jnbBmi5XUfviVTI=
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 h1:qWPm9rbaAMKs8Bq/9LRpbMqxWRVUAQwMI9fVrssnTfw=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4 h1:b0LrWgu8+q7z4J+0Y3Umo5q1dL7NXBkKBWkaVkAq17E=
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View File

@@ -49,10 +49,18 @@ func (l *defaultLogger) V(level Level) bool {
}
func (l *defaultLogger) Fields(fields map[string]interface{}) Logger {
l.Lock()
l.opts.Fields = copyFields(fields)
l.Unlock()
return l
nl := &defaultLogger{opts: l.opts, enc: l.enc}
nl.opts.Fields = make(map[string]interface{}, len(l.opts.Fields)+len(fields))
l.RLock()
for k, v := range l.opts.Fields {
nl.opts.Fields[k] = v
}
l.RUnlock()
for k, v := range fields {
nl.opts.Fields[k] = v
}
return nl
}
func copyFields(src map[string]interface{}) map[string]interface{} {
@@ -153,7 +161,9 @@ func (l *defaultLogger) Log(ctx context.Context, level Level, args ...interface{
}
fields["timestamp"] = time.Now().Format("2006-01-02 15:04:05")
fields["msg"] = fmt.Sprint(args...)
if len(args) > 0 {
fields["msg"] = fmt.Sprint(args...)
}
l.RLock()
_ = l.enc.Encode(fields)
@@ -178,7 +188,7 @@ func (l *defaultLogger) Logf(ctx context.Context, level Level, msg string, args
fields["timestamp"] = time.Now().Format("2006-01-02 15:04:05")
if len(args) > 0 {
fields["msg"] = fmt.Sprintf(msg, args...)
} else {
} else if msg != "" {
fields["msg"] = msg
}
l.RLock()

353
logger/wrapper/wrapper.go Normal file
View File

@@ -0,0 +1,353 @@
// Package wrapper provides wrapper for Logger
package wrapper
import (
"context"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/server"
)
var (
DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, err error) []string {
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
if err != nil {
labels = append(labels, "error", err.Error())
}
return labels
}
DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, err error) []string {
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
if err != nil {
labels = append(labels, "error", err.Error())
}
return labels
}
DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, err error) []string {
labels := []string{"endpoint", msg.Topic()}
if err != nil {
labels = append(labels, "error", err.Error())
}
return labels
}
DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, err error) []string {
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
if err != nil {
labels = append(labels, "error", err.Error())
}
return labels
}
DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, err error) []string {
labels := []string{"endpoint", msg.Topic()}
if err != nil {
labels = append(labels, "error", err.Error())
}
return labels
}
DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, err error) []string {
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
if err != nil {
labels = append(labels, "error", err.Error())
}
return labels
}
)
type lWrapper struct {
client.Client
serverHandler server.HandlerFunc
serverSubscriber server.SubscriberFunc
clientCallFunc client.CallFunc
opts Options
}
type ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, error) []string
type ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, error) []string
type ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, error) []string
type ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, error) []string
type ServerHandlerObserver func(context.Context, server.Request, interface{}, error) []string
type ServerSubscriberObserver func(context.Context, server.Message, error) []string
// Options struct for wrapper
type Options struct {
// Logger that used for log
Logger logger.Logger
// Level for logger
Level logger.Level
// Enabled flag
Enabled bool
// ClientCallObservers funcs
ClientCallObservers []ClientCallObserver
// ClientStreamObservers funcs
ClientStreamObservers []ClientStreamObserver
// ClientPublishObservers funcs
ClientPublishObservers []ClientPublishObserver
// ClientCallFuncObservers funcs
ClientCallFuncObservers []ClientCallFuncObserver
// ServerHandlerObservers funcs
ServerHandlerObservers []ServerHandlerObserver
// ServerSubscriberObservers funcs
ServerSubscriberObservers []ServerSubscriberObserver
}
// Option func signature
type Option func(*Options)
// NewOptions creates Options from Option slice
func NewOptions(opts ...Option) Options {
options := Options{
Logger: logger.DefaultLogger,
Level: logger.TraceLevel,
ClientCallObservers: []ClientCallObserver{DefaultClientCallObserver},
ClientStreamObservers: []ClientStreamObserver{DefaultClientStreamObserver},
ClientPublishObservers: []ClientPublishObserver{DefaultClientPublishObserver},
ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver},
ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver},
ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver},
}
for _, o := range opts {
o(&options)
}
return options
}
// WithEnabled enable/diable flag
func WithEnabled(b bool) Option {
return func(o *Options) {
o.Enabled = b
}
}
// WithLevel log level
func WithLevel(l logger.Level) Option {
return func(o *Options) {
o.Level = l
}
}
// WithLogger logger
func WithLogger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}
// WithClientCallObservers funcs
func WithClientCallObservers(ob ...ClientCallObserver) Option {
return func(o *Options) {
o.ClientCallObservers = ob
}
}
// WithClientStreamObservers funcs
func WithClientStreamObservers(ob ...ClientStreamObserver) Option {
return func(o *Options) {
o.ClientStreamObservers = ob
}
}
// WithClientPublishObservers funcs
func WithClientPublishObservers(ob ...ClientPublishObserver) Option {
return func(o *Options) {
o.ClientPublishObservers = ob
}
}
// WithClientCallFuncObservers funcs
func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option {
return func(o *Options) {
o.ClientCallFuncObservers = ob
}
}
// WithServerHandlerObservers funcs
func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option {
return func(o *Options) {
o.ServerHandlerObservers = ob
}
}
// WithServerSubscriberObservers funcs
func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
return func(o *Options) {
o.ServerSubscriberObservers = ob
}
}
func (l *lWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
err := l.Client.Call(ctx, req, rsp, opts...)
if !l.opts.Enabled {
return err
}
var labels []string
for _, o := range l.opts.ClientCallObservers {
labels = append(labels, o(ctx, req, rsp, opts, err)...)
}
fields := make(map[string]interface{}, int(len(labels)/2))
for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1]
}
l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level)
return err
}
func (l *lWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
stream, err := l.Client.Stream(ctx, req, opts...)
if !l.opts.Enabled {
return stream, err
}
var labels []string
for _, o := range l.opts.ClientStreamObservers {
labels = append(labels, o(ctx, req, opts, stream, err)...)
}
fields := make(map[string]interface{}, int(len(labels)/2))
for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1]
}
l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level)
return stream, err
}
func (l *lWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
err := l.Client.Publish(ctx, msg, opts...)
if !l.opts.Enabled {
return err
}
var labels []string
for _, o := range l.opts.ClientPublishObservers {
labels = append(labels, o(ctx, msg, opts, err)...)
}
fields := make(map[string]interface{}, int(len(labels)/2))
for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1]
}
l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level)
return err
}
func (l *lWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error {
err := l.serverHandler(ctx, req, rsp)
if !l.opts.Enabled {
return err
}
var labels []string
for _, o := range l.opts.ServerHandlerObservers {
labels = append(labels, o(ctx, req, rsp, err)...)
}
fields := make(map[string]interface{}, int(len(labels)/2))
for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1]
}
l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level)
return err
}
func (l *lWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error {
err := l.serverSubscriber(ctx, msg)
if !l.opts.Enabled {
return err
}
var labels []string
for _, o := range l.opts.ServerSubscriberObservers {
labels = append(labels, o(ctx, msg, err)...)
}
fields := make(map[string]interface{}, int(len(labels)/2))
for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1]
}
l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level)
return err
}
// NewClientWrapper accepts an open options and returns a Client Wrapper
func NewClientWrapper(opts ...Option) client.Wrapper {
return func(c client.Client) client.Client {
options := NewOptions()
for _, o := range opts {
o(&options)
}
return &lWrapper{opts: options, Client: c}
}
}
// NewClientCallWrapper accepts an options and returns a Call Wrapper
func NewClientCallWrapper(opts ...Option) client.CallWrapper {
return func(h client.CallFunc) client.CallFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
l := &lWrapper{opts: options, clientCallFunc: h}
return l.ClientCallFunc
}
}
func (l *lWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
err := l.clientCallFunc(ctx, addr, req, rsp, opts)
if !l.opts.Enabled {
return err
}
var labels []string
for _, o := range l.opts.ClientCallFuncObservers {
labels = append(labels, o(ctx, addr, req, rsp, opts, err)...)
}
fields := make(map[string]interface{}, int(len(labels)/2))
for i := 0; i < len(labels); i += 2 {
fields[labels[i]] = labels[i+1]
}
l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level)
return err
}
// NewServerHandlerWrapper accepts an options and returns a Handler Wrapper
func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper {
return func(h server.HandlerFunc) server.HandlerFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
l := &lWrapper{opts: options, serverHandler: h}
return l.ServerHandler
}
}
// NewServerSubscriberWrapper accepts an options and returns a Subscriber Wrapper
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
return func(h server.SubscriberFunc) server.SubscriberFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
l := &lWrapper{opts: options, serverSubscriber: h}
return l.ServerSubscriber
}
}

View File

@@ -112,6 +112,7 @@ func Merge(omd Metadata, mmd Metadata, overwrite bool) Metadata {
return nmd
}
// Pairs from which metadata created
func Pairs(kv ...string) (Metadata, bool) {
if len(kv)%2 == 1 {
return nil, false

View File

@@ -3,6 +3,7 @@ package meter
import (
"io"
"reflect"
"sort"
"time"
)
@@ -76,66 +77,36 @@ type Summary interface {
UpdateDuration(time.Time)
}
// Labels holds the metrics labels with k, v
type Labels struct {
keys []string
vals []string
type byKey []string
func (k byKey) Len() int { return len(k) / 2 }
func (k byKey) Less(i, j int) bool { return k[i*2] < k[j*2] }
func (k byKey) Swap(i, j int) {
k[i*2], k[i*2+1], k[j*2], k[j*2+1] = k[j*2], k[j*2+1], k[i*2], k[i*2+1]
}
// Append adds labels to label set
func (ls Labels) Append(nls Labels) Labels {
for n := range nls.keys {
ls.keys = append(ls.keys, nls.keys[n])
ls.vals = append(ls.vals, nls.vals[n])
func Sort(slice *[]string) {
bk := byKey(*slice)
if bk.Len() <= 1 {
return
}
return ls
}
// Len returns number of labels
func (ls Labels) Len() int {
return len(ls.keys)
}
type labels Labels
func (ls labels) Len() int {
return len(ls.keys)
}
func (ls labels) Sort() {
sort.Sort(ls)
}
func (ls labels) Swap(i, j int) {
ls.keys[i], ls.keys[j] = ls.keys[j], ls.keys[i]
ls.vals[i], ls.vals[j] = ls.vals[j], ls.vals[i]
}
func (ls labels) Less(i, j int) bool {
return ls.keys[i] < ls.keys[j]
}
// LabelIter holds the
type LabelIter struct {
labels Labels
cnt int
cur int
}
// Iter returns labels iterator
func (ls Labels) Iter() *LabelIter {
labels(ls).Sort()
return &LabelIter{labels: ls, cnt: len(ls.keys)}
}
// Next advance itarator to new pos
func (iter *LabelIter) Next(k, v *string) bool {
if iter.cur+1 > iter.cnt {
return false
sort.Sort(bk)
v := reflect.ValueOf(slice).Elem()
cnt := 0
key := 0
val := 1
for key < v.Len() {
if len(bk) > key+2 && bk[key] == bk[key+2] {
key += 2
val += 2
continue
}
v.Index(cnt).Set(v.Index(key))
cnt++
v.Index(cnt).Set(v.Index(val))
cnt++
key += 2
val += 2
}
*k = iter.labels.keys[iter.cur]
*v = iter.labels.vals[iter.cur]
iter.cur++
return true
v.SetLen(cnt)
}

View File

@@ -10,46 +10,15 @@ func TestNoopMeter(t *testing.T) {
t.Fatalf("invalid options parsing: %v", m.Options())
}
cnt := m.Counter("counter", Label("server", "noop"))
cnt := m.Counter("counter", Labels("server", "noop"))
cnt.Inc()
}
func TestLabelsAppend(t *testing.T) {
var ls Labels
ls.keys = []string{"type", "server"}
ls.vals = []string{"noop", "http"}
func TestLabelsSort(t *testing.T) {
ls := []string{"server", "http", "register", "mdns", "broker", "broker1", "broker", "broker2", "server", "tcp"}
Sort(&ls)
var nls Labels
nls.keys = []string{"register"}
nls.vals = []string{"gossip"}
ls = ls.Append(nls)
//ls.Sort()
if ls.keys[0] != "type" || ls.vals[0] != "noop" {
t.Fatalf("append error: %v", ls)
}
}
func TestIterator(t *testing.T) {
options := NewOptions(
Label("name", "svc1"),
Label("version", "0.0.1"),
Label("id", "12345"),
Label("type", "noop"),
Label("server", "http"),
Label("register", "gossip"),
Label("aa", "kk"),
Label("zz", "kk"),
)
iter := options.Labels.Iter()
var k, v string
cnt := 0
for iter.Next(&k, &v) {
if cnt == 4 && (k != "server" || v != "http") {
t.Fatalf("iter error: %s != %s || %s != %s", k, "server", v, "http")
}
cnt++
if ls[0] != "broker" || ls[1] != "broker2" {
t.Fatalf("sort error: %v", ls)
}
}

View File

@@ -107,7 +107,7 @@ func (r *noopMeter) String() string {
}
type noopCounter struct {
labels Labels
labels []string
}
func (r *noopCounter) Add(int) {
@@ -131,7 +131,7 @@ func (r *noopCounter) Set(uint64) {
}
type noopFloatCounter struct {
labels Labels
labels []string
}
func (r *noopFloatCounter) Add(float64) {
@@ -151,7 +151,7 @@ func (r *noopFloatCounter) Sub(float64) {
}
type noopGauge struct {
labels Labels
labels []string
}
func (r *noopGauge) Get() float64 {
@@ -159,7 +159,7 @@ func (r *noopGauge) Get() float64 {
}
type noopSummary struct {
labels Labels
labels []string
}
func (r *noopSummary) Update(float64) {
@@ -171,7 +171,7 @@ func (r *noopSummary) UpdateDuration(time.Time) {
}
type noopHistogram struct {
labels Labels
labels []string
}
func (r *noopHistogram) Reset() {

View File

@@ -26,7 +26,7 @@ type Options struct {
// LabelPrefix holds the prefix for all labels
LabelPrefix string
// Labels holds the default labels
Labels Labels
Labels []string
// WriteProcessMetrics flag to write process metrics
WriteProcessMetrics bool
// WriteFDMetrics flag to write fd metrics
@@ -88,11 +88,9 @@ func Logger(l logger.Logger) Option {
}
}
// Label sets the label
func Label(key, val string) Option {
func Labels(ls ...string) Option {
return func(o *Options) {
o.Labels.keys = append(o.Labels.keys, key)
o.Labels.vals = append(o.Labels.vals, val)
o.Labels = ls
}
}

View File

@@ -50,19 +50,19 @@ func NewOptions(opts ...Option) Options {
func ServiceName(name string) Option {
return func(o *Options) {
o.lopts = append(o.lopts, meter.Label("name", name))
o.lopts = append(o.lopts, meter.Labels("name", name))
}
}
func ServiceVersion(version string) Option {
return func(o *Options) {
o.lopts = append(o.lopts, meter.Label("version", version))
o.lopts = append(o.lopts, meter.Labels("version", version))
}
}
func ServiceID(id string) Option {
return func(o *Options) {
o.lopts = append(o.lopts, meter.Label("id", id))
o.lopts = append(o.lopts, meter.Labels("id", id))
}
}
@@ -106,15 +106,15 @@ func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request,
te := time.Since(ts)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
} else {
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
}
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc()
@@ -129,15 +129,15 @@ func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{},
te := time.Since(ts)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
} else {
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
}
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc()
@@ -152,15 +152,15 @@ func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client
te := time.Since(ts)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
} else {
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
}
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc()
@@ -175,15 +175,15 @@ func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.
te := time.Since(ts)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(PublishMessageDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
} else {
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
}
w.opts.Meter.Counter(PublishMessageTotal, lopts...).Inc()
@@ -206,15 +206,15 @@ func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
te := time.Since(ts)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ServerRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
} else {
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
}
w.opts.Meter.Counter(ServerRequestTotal, lopts...).Inc()
@@ -238,15 +238,15 @@ func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc
te := time.Since(ts)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
} else {
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
}
w.opts.Meter.Counter(SubscribeMessageTotal, lopts...).Inc()

View File

@@ -23,7 +23,8 @@ func ExtractValue(v reflect.Type, d int) *Value {
v = v.Elem()
}
if len(v.Name()) == 0 {
// slices and maps don't have a defined name
if (v.Kind() == reflect.Slice || v.Kind() == reflect.Map) || len(v.Name()) == 0 {
return nil
}
@@ -71,6 +72,16 @@ func ExtractValue(v reflect.Type, d int) *Value {
p = p.Elem()
}
arg.Type = "[]" + p.Name()
case reflect.Map:
p := v.Elem()
if p.Kind() == reflect.Ptr {
p = p.Elem()
}
key := v.Key()
if key.Kind() == reflect.Ptr {
key = key.Elem()
}
arg.Type = fmt.Sprintf("map[%s]%s", key.Name(), p.Name())
}
return arg

View File

@@ -37,7 +37,7 @@ type Runtime interface {
// Logs returns a log stream
type Logs interface {
// Error retuns error
// Error returns error
Error() error
// Chan return chan log
Chan() chan Log

View File

@@ -61,7 +61,7 @@ type Options struct {
Name string
// Address holds the server address
Address string
// Advertise holds the advertie addres
// Advertise holds the advertise address
Advertise string
// Version holds the server version
Version string
@@ -344,7 +344,7 @@ type SubscriberOption func(*SubscriberOptions)
type SubscriberOptions struct {
// Context holds the external options
Context context.Context
// Queue holds the subscribtion queue
// Queue holds the subscription queue
Queue string
// AutoAck flag for auto ack messages after processing
AutoAck bool

View File

@@ -11,6 +11,98 @@ import (
"github.com/unistack-org/micro/v3/tracer"
)
var (
DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Pub to %s", msg.Topic()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Sub from %s", msg.Topic()))
var labels []tracer.Label
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
)
type tWrapper struct {
client.Client
serverHandler server.HandlerFunc
@@ -26,18 +118,28 @@ type ClientCallFuncObserver func(context.Context, string, client.Request, interf
type ServerHandlerObserver func(context.Context, server.Request, interface{}, tracer.Span, error)
type ServerSubscriberObserver func(context.Context, server.Message, tracer.Span, error)
// Options struct
type Options struct {
Tracer tracer.Tracer
ClientCallObservers []ClientCallObserver
ClientStreamObservers []ClientStreamObserver
ClientPublishObservers []ClientPublishObserver
ClientCallFuncObservers []ClientCallFuncObserver
ServerHandlerObservers []ServerHandlerObserver
// Tracer that used for tracing
Tracer tracer.Tracer
// ClientCallObservers funcs
ClientCallObservers []ClientCallObserver
// ClientStreamObservers funcs
ClientStreamObservers []ClientStreamObserver
// ClientPublishObservers funcs
ClientPublishObservers []ClientPublishObserver
// ClientCallFuncObservers funcs
ClientCallFuncObservers []ClientCallFuncObserver
// ServerHandlerObservers funcs
ServerHandlerObservers []ServerHandlerObserver
// ServerSubscriberObservers funcs
ServerSubscriberObservers []ServerSubscriberObserver
}
// Option func signature
type Option func(*Options)
// NewOptions create Options from Option slice
func NewOptions(opts ...Option) Options {
options := Options{
Tracer: tracer.DefaultTracer,
@@ -56,138 +158,55 @@ func NewOptions(opts ...Option) Options {
return options
}
// WithTracer pass tracer
func WithTracer(t tracer.Tracer) Option {
return func(o *Options) {
o.Tracer = t
}
}
// WithClientCallObservers funcs
func WithClientCallObservers(ob ...ClientCallObserver) Option {
return func(o *Options) {
o.ClientCallObservers = ob
}
}
// WithClientStreamObservers funcs
func WithClientStreamObservers(ob ...ClientStreamObserver) Option {
return func(o *Options) {
o.ClientStreamObservers = ob
}
}
// WithClientPublishObservers funcs
func WithClientPublishObservers(ob ...ClientPublishObserver) Option {
return func(o *Options) {
o.ClientPublishObservers = ob
}
}
// WithClientCallFuncObservers funcs
func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option {
return func(o *Options) {
o.ClientCallFuncObservers = ob
}
}
// WithServerHandlerObservers funcs
func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option {
return func(o *Options) {
o.ServerHandlerObservers = ob
}
}
// WithServerSubscriberObservers funcs
func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
return func(o *Options) {
o.ServerSubscriberObservers = ob
}
}
func DefaultClientCallObserver(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultClientStreamObserver(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultClientPublishObserver(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Pub to %s", msg.Topic()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultServerHandlerObserver(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultServerSubscriberObserver(ctx context.Context, msg server.Message, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Sub from %s", msg.Topic()))
var labels []tracer.Label
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultClientCallFuncObserver(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()

View File

@@ -65,8 +65,25 @@ func IsEmpty(v reflect.Value) bool {
return v.IsNil()
case reflect.Invalid:
return true
case reflect.Struct:
var ok bool
for i := 0; i < v.NumField(); i++ {
ok = IsEmpty(v.FieldByIndex([]int{i}))
if !ok {
return false
}
}
default:
fmt.Printf("%#+v\n", v)
return false
}
return false
return true
}
// IsZero returns true if struct is zero (not have any defined values)
func IsZero(src interface{}) bool {
v := reflect.ValueOf(src)
return IsEmpty(v)
}
// Zero creates new zero interface

View File

@@ -43,3 +43,50 @@ func TestURLVars(t *testing.T) {
}
_ = mp
}
func TestIsZero(t *testing.T) {
testStr1 := struct {
Name string
Value string
Nested struct {
NestedName string
}
}{
Name: "test_name",
Value: "test_value",
}
testStr1.Nested.NestedName = "nested_name"
if ok := IsZero(testStr1); ok {
t.Fatalf("zero ret on non zero struct: %#+v", testStr1)
}
testStr1.Name = ""
testStr1.Value = ""
testStr1.Nested.NestedName = ""
if ok := IsZero(testStr1); !ok {
t.Fatalf("non zero ret on zero struct: %#+v", testStr1)
}
type testStr3 struct {
Nested string
}
type testStr2 struct {
Name string
Nested *testStr3
}
vtest := &testStr2{
Name: "test_name",
Nested: &testStr3{Nested: "nested_name"},
}
if ok := IsZero(vtest); ok {
t.Fatalf("zero ret on non zero struct: %#+v", vtest)
}
vtest.Nested = nil
vtest.Name = ""
if ok := IsZero(vtest); !ok {
t.Fatalf("non zero ret on zero struct: %#+v", vtest)
}
//t.Logf("XX %#+v\n", ok)
}