Compare commits

...

6 Commits

Author SHA1 Message Date
485eda6ce9 meter/wrapper: fix wrapper build
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-15 00:49:26 +03:00
dbbdb24631 meter: rework labels
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-15 00:44:13 +03:00
723ceb4f32 regsiter: fix extractor
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-12 16:09:20 +03:00
bac9869bb3 register: support map in ExtractValue
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-12 15:48:05 +03:00
610427445f codec: provide proto for codec.Frame
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-11 07:42:42 +03:00
Renovate Bot
c84a66c713 fix(deps): update module github.com/imdario/mergo to v0.3.12 2021-03-10 00:35:57 +00:00
11 changed files with 113 additions and 135 deletions

6
codec/frame.go Normal file
View File

@@ -0,0 +1,6 @@
package codec
// Frame gives us the ability to define raw data to send over the pipes
type Frame struct {
Data []byte
}

28
codec/frame.proto Normal file
View File

@@ -0,0 +1,28 @@
// Copyright 2021 Unistack LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package micro.codec;
option cc_enable_arenas = true;
option go_package = "github.com/unistack-org/micro/v3/codec;codec";
option java_multiple_files = true;
option java_outer_classname = "MicroCodec";
option java_package = "micro.codec";
option objc_class_prefix = "MCODEC";
message Frame {
bytes data = 1;
}

View File

@@ -8,11 +8,6 @@ import (
type noopCodec struct {
}
// Frame gives us the ability to define raw data to send over the pipes
type Frame struct {
Data []byte
}
func (c *noopCodec) ReadHeader(conn io.Reader, m *Message, t MessageType) error {
return nil
}

2
go.mod
View File

@@ -6,7 +6,7 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/ef-ds/deque v1.0.4
github.com/google/uuid v1.2.0
github.com/imdario/mergo v0.3.11
github.com/imdario/mergo v0.3.12
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110

4
go.sum
View File

@@ -4,8 +4,8 @@ github.com/ef-ds/deque v1.0.4 h1:iFAZNmveMT9WERAkqLJ+oaABF9AcVQ5AjXem/hroniI=
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/imdario/mergo v0.3.11 h1:3tnifQM4i+fbajXKBHXWEH+KvNHqojZ778UH75j3bGA=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34 h1:vBfVmA5mZhsQa2jr1FOL9nfA37N/jnbBmi5XUfviVTI=

View File

@@ -3,6 +3,7 @@ package meter
import (
"io"
"reflect"
"sort"
"time"
)
@@ -76,66 +77,36 @@ type Summary interface {
UpdateDuration(time.Time)
}
// Labels holds the metrics labels with k, v
type Labels struct {
keys []string
vals []string
type byKey []string
func (k byKey) Len() int { return len(k) / 2 }
func (k byKey) Less(i, j int) bool { return k[i*2] < k[j*2] }
func (k byKey) Swap(i, j int) {
k[i*2], k[i*2+1], k[j*2], k[j*2+1] = k[j*2], k[j*2+1], k[i*2], k[i*2+1]
}
// Append adds labels to label set
func (ls Labels) Append(nls Labels) Labels {
for n := range nls.keys {
ls.keys = append(ls.keys, nls.keys[n])
ls.vals = append(ls.vals, nls.vals[n])
func Sort(slice *[]string) {
bk := byKey(*slice)
if bk.Len() <= 1 {
return
}
return ls
}
// Len returns number of labels
func (ls Labels) Len() int {
return len(ls.keys)
}
type labels Labels
func (ls labels) Len() int {
return len(ls.keys)
}
func (ls labels) Sort() {
sort.Sort(ls)
}
func (ls labels) Swap(i, j int) {
ls.keys[i], ls.keys[j] = ls.keys[j], ls.keys[i]
ls.vals[i], ls.vals[j] = ls.vals[j], ls.vals[i]
}
func (ls labels) Less(i, j int) bool {
return ls.keys[i] < ls.keys[j]
}
// LabelIter holds the
type LabelIter struct {
labels Labels
cnt int
cur int
}
// Iter returns labels iterator
func (ls Labels) Iter() *LabelIter {
labels(ls).Sort()
return &LabelIter{labels: ls, cnt: len(ls.keys)}
}
// Next advance itarator to new pos
func (iter *LabelIter) Next(k, v *string) bool {
if iter.cur+1 > iter.cnt {
return false
sort.Sort(bk)
v := reflect.ValueOf(slice).Elem()
cnt := 0
key := 0
val := 1
for key < v.Len() {
if len(bk) > key+2 && bk[key] == bk[key+2] {
key += 2
val += 2
continue
}
v.Index(cnt).Set(v.Index(key))
cnt++
v.Index(cnt).Set(v.Index(val))
cnt++
key += 2
val += 2
}
*k = iter.labels.keys[iter.cur]
*v = iter.labels.vals[iter.cur]
iter.cur++
return true
v.SetLen(cnt)
}

View File

@@ -10,46 +10,15 @@ func TestNoopMeter(t *testing.T) {
t.Fatalf("invalid options parsing: %v", m.Options())
}
cnt := m.Counter("counter", Label("server", "noop"))
cnt := m.Counter("counter", Labels("server", "noop"))
cnt.Inc()
}
func TestLabelsAppend(t *testing.T) {
var ls Labels
ls.keys = []string{"type", "server"}
ls.vals = []string{"noop", "http"}
func TestLabelsSort(t *testing.T) {
ls := []string{"server", "http", "register", "mdns", "broker", "broker1", "broker", "broker2", "server", "tcp"}
Sort(&ls)
var nls Labels
nls.keys = []string{"register"}
nls.vals = []string{"gossip"}
ls = ls.Append(nls)
//ls.Sort()
if ls.keys[0] != "type" || ls.vals[0] != "noop" {
t.Fatalf("append error: %v", ls)
}
}
func TestIterator(t *testing.T) {
options := NewOptions(
Label("name", "svc1"),
Label("version", "0.0.1"),
Label("id", "12345"),
Label("type", "noop"),
Label("server", "http"),
Label("register", "gossip"),
Label("aa", "kk"),
Label("zz", "kk"),
)
iter := options.Labels.Iter()
var k, v string
cnt := 0
for iter.Next(&k, &v) {
if cnt == 4 && (k != "server" || v != "http") {
t.Fatalf("iter error: %s != %s || %s != %s", k, "server", v, "http")
}
cnt++
if ls[0] != "broker" || ls[1] != "broker2" {
t.Fatalf("sort error: %v", ls)
}
}

View File

@@ -107,7 +107,7 @@ func (r *noopMeter) String() string {
}
type noopCounter struct {
labels Labels
labels []string
}
func (r *noopCounter) Add(int) {
@@ -131,7 +131,7 @@ func (r *noopCounter) Set(uint64) {
}
type noopFloatCounter struct {
labels Labels
labels []string
}
func (r *noopFloatCounter) Add(float64) {
@@ -151,7 +151,7 @@ func (r *noopFloatCounter) Sub(float64) {
}
type noopGauge struct {
labels Labels
labels []string
}
func (r *noopGauge) Get() float64 {
@@ -159,7 +159,7 @@ func (r *noopGauge) Get() float64 {
}
type noopSummary struct {
labels Labels
labels []string
}
func (r *noopSummary) Update(float64) {
@@ -171,7 +171,7 @@ func (r *noopSummary) UpdateDuration(time.Time) {
}
type noopHistogram struct {
labels Labels
labels []string
}
func (r *noopHistogram) Reset() {

View File

@@ -26,7 +26,7 @@ type Options struct {
// LabelPrefix holds the prefix for all labels
LabelPrefix string
// Labels holds the default labels
Labels Labels
Labels []string
// WriteProcessMetrics flag to write process metrics
WriteProcessMetrics bool
// WriteFDMetrics flag to write fd metrics
@@ -88,11 +88,9 @@ func Logger(l logger.Logger) Option {
}
}
// Label sets the label
func Label(key, val string) Option {
func Labels(ls ...string) Option {
return func(o *Options) {
o.Labels.keys = append(o.Labels.keys, key)
o.Labels.vals = append(o.Labels.vals, val)
o.Labels = ls
}
}

View File

@@ -50,19 +50,19 @@ func NewOptions(opts ...Option) Options {
func ServiceName(name string) Option {
return func(o *Options) {
o.lopts = append(o.lopts, meter.Label("name", name))
o.lopts = append(o.lopts, meter.Labels("name", name))
}
}
func ServiceVersion(version string) Option {
return func(o *Options) {
o.lopts = append(o.lopts, meter.Label("version", version))
o.lopts = append(o.lopts, meter.Labels("version", version))
}
}
func ServiceID(id string) Option {
return func(o *Options) {
o.lopts = append(o.lopts, meter.Label("id", id))
o.lopts = append(o.lopts, meter.Labels("id", id))
}
}
@@ -106,15 +106,15 @@ func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request,
te := time.Since(ts)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
} else {
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
}
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc()
@@ -129,15 +129,15 @@ func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{},
te := time.Since(ts)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
} else {
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
}
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc()
@@ -152,15 +152,15 @@ func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client
te := time.Since(ts)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
} else {
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
}
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc()
@@ -175,15 +175,15 @@ func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.
te := time.Since(ts)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(PublishMessageDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
} else {
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
}
w.opts.Meter.Counter(PublishMessageTotal, lopts...).Inc()
@@ -206,15 +206,15 @@ func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
te := time.Since(ts)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ServerRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
} else {
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
}
w.opts.Meter.Counter(ServerRequestTotal, lopts...).Inc()
@@ -238,15 +238,15 @@ func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc
te := time.Since(ts)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
} else {
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
}
w.opts.Meter.Counter(SubscribeMessageTotal, lopts...).Inc()

View File

@@ -23,7 +23,8 @@ func ExtractValue(v reflect.Type, d int) *Value {
v = v.Elem()
}
if len(v.Name()) == 0 {
// slices and maps don't have a defined name
if (v.Kind() == reflect.Slice || v.Kind() == reflect.Map) || len(v.Name()) == 0 {
return nil
}
@@ -71,6 +72,16 @@ func ExtractValue(v reflect.Type, d int) *Value {
p = p.Elem()
}
arg.Type = "[]" + p.Name()
case reflect.Map:
p := v.Elem()
if p.Kind() == reflect.Ptr {
p = p.Elem()
}
key := v.Key()
if key.Kind() == reflect.Ptr {
key = key.Elem()
}
arg.Type = fmt.Sprintf("map[%s]%s", key.Name(), p.Name())
}
return arg