Compare commits
24 Commits
Author | SHA1 | Date | |
---|---|---|---|
e13c2c48fd | |||
8db55d2e55 | |||
ed61cad961 | |||
040fc4548f | |||
6189a1b980 | |||
eb2a450a7b | |||
|
d2a30a5da1 | ||
65889c66f6 | |||
dcdf133d5b | |||
8742b55305 | |||
4a64ee72f7 | |||
881d7afeea | |||
8c95448535 | |||
c1dc041d8c | |||
|
25be0ac0f0 | ||
|
86f73cac4e | ||
485eda6ce9 | |||
dbbdb24631 | |||
723ceb4f32 | |||
bac9869bb3 | |||
610427445f | |||
|
c84a66c713 | ||
00eaae717b | |||
a102e95433 |
@@ -30,7 +30,7 @@ var (
|
||||
// Auth provides authentication and authorization
|
||||
type Auth interface {
|
||||
// Init the auth
|
||||
Init(opts ...Option)
|
||||
Init(opts ...Option) error
|
||||
// Options set for auth
|
||||
Options() Options
|
||||
// Generate a new account
|
||||
|
@@ -14,10 +14,11 @@ func (n *noopAuth) String() string {
|
||||
}
|
||||
|
||||
// Init the auth
|
||||
func (n *noopAuth) Init(opts ...Option) {
|
||||
func (n *noopAuth) Init(opts ...Option) error {
|
||||
for _, o := range opts {
|
||||
o(&n.opts)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Options set for auth
|
||||
|
@@ -2,6 +2,7 @@ package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"time"
|
||||
|
||||
"github.com/unistack-org/micro/v3/broker"
|
||||
@@ -50,8 +51,10 @@ type Options struct {
|
||||
Wrappers []Wrapper
|
||||
// PoolSize connection pool size
|
||||
PoolSize int
|
||||
// PoolTTL conection pool ttl
|
||||
// PoolTTL connection pool ttl
|
||||
PoolTTL time.Duration
|
||||
// TLSConfig specifies tls.Config for secure connection
|
||||
TLSConfig *tls.Config
|
||||
}
|
||||
|
||||
// NewCallOptions creates new call options struct
|
||||
@@ -160,20 +163,23 @@ func NewOptions(opts ...Option) Options {
|
||||
ContentType: DefaultContentType,
|
||||
Codecs: make(map[string]codec.Codec),
|
||||
CallOptions: CallOptions{
|
||||
Context: context.Background(),
|
||||
Backoff: DefaultBackoff,
|
||||
Retry: DefaultRetry,
|
||||
Retries: DefaultRetries,
|
||||
RequestTimeout: DefaultRequestTimeout,
|
||||
DialTimeout: transport.DefaultDialTimeout,
|
||||
},
|
||||
Lookup: LookupRoute,
|
||||
PoolSize: DefaultPoolSize,
|
||||
PoolTTL: DefaultPoolTTL,
|
||||
Selector: random.NewSelector(),
|
||||
Logger: logger.DefaultLogger,
|
||||
Broker: broker.DefaultBroker,
|
||||
Meter: meter.DefaultMeter,
|
||||
Tracer: tracer.DefaultTracer,
|
||||
Lookup: LookupRoute,
|
||||
PoolSize: DefaultPoolSize,
|
||||
PoolTTL: DefaultPoolTTL,
|
||||
Selector: random.NewSelector(),
|
||||
Logger: logger.DefaultLogger,
|
||||
Broker: broker.DefaultBroker,
|
||||
Meter: meter.DefaultMeter,
|
||||
Tracer: tracer.DefaultTracer,
|
||||
Router: router.DefaultRouter,
|
||||
Transport: transport.DefaultTransport,
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
@@ -311,6 +317,22 @@ func Lookup(l LookupFunc) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// TLSConfig specifies a *tls.Config
|
||||
func TLSConfig(t *tls.Config) Option {
|
||||
return func(o *Options) {
|
||||
// set the internal tls
|
||||
o.TLSConfig = t
|
||||
|
||||
// set the default transport if one is not
|
||||
// already set. Required for Init call below.
|
||||
|
||||
// set the transport tls
|
||||
o.Transport.Init(
|
||||
transport.TLSConfig(t),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Retries sets the retry count when making the request.
|
||||
func Retries(i int) Option {
|
||||
return func(o *Options) {
|
||||
|
6
codec/frame.go
Normal file
6
codec/frame.go
Normal file
@@ -0,0 +1,6 @@
|
||||
package codec
|
||||
|
||||
// Frame gives us the ability to define raw data to send over the pipes
|
||||
type Frame struct {
|
||||
Data []byte
|
||||
}
|
28
codec/frame.proto
Normal file
28
codec/frame.proto
Normal file
@@ -0,0 +1,28 @@
|
||||
// Copyright 2021 Unistack LLC
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
syntax = "proto3";
|
||||
|
||||
package micro.codec;
|
||||
|
||||
option cc_enable_arenas = true;
|
||||
option go_package = "github.com/unistack-org/micro/v3/codec;codec";
|
||||
option java_multiple_files = true;
|
||||
option java_outer_classname = "MicroCodec";
|
||||
option java_package = "micro.codec";
|
||||
option objc_class_prefix = "MCODEC";
|
||||
|
||||
message Frame {
|
||||
bytes data = 1;
|
||||
}
|
@@ -8,11 +8,6 @@ import (
|
||||
type noopCodec struct {
|
||||
}
|
||||
|
||||
// Frame gives us the ability to define raw data to send over the pipes
|
||||
type Frame struct {
|
||||
Data []byte
|
||||
}
|
||||
|
||||
func (c *noopCodec) ReadHeader(conn io.Reader, m *Message, t MessageType) error {
|
||||
return nil
|
||||
}
|
||||
|
4
go.mod
4
go.mod
@@ -6,8 +6,8 @@ require (
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
||||
github.com/ef-ds/deque v1.0.4
|
||||
github.com/google/uuid v1.2.0
|
||||
github.com/imdario/mergo v0.3.11
|
||||
github.com/imdario/mergo v0.3.12
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34
|
||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110
|
||||
golang.org/x/net v0.0.0-20210324205630-d1beb07c2056
|
||||
)
|
||||
|
9
go.sum
9
go.sum
@@ -4,15 +4,16 @@ github.com/ef-ds/deque v1.0.4 h1:iFAZNmveMT9WERAkqLJ+oaABF9AcVQ5AjXem/hroniI=
|
||||
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
|
||||
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
|
||||
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/imdario/mergo v0.3.11 h1:3tnifQM4i+fbajXKBHXWEH+KvNHqojZ778UH75j3bGA=
|
||||
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
|
||||
github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
|
||||
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
||||
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34 h1:vBfVmA5mZhsQa2jr1FOL9nfA37N/jnbBmi5XUfviVTI=
|
||||
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
|
||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 h1:qWPm9rbaAMKs8Bq/9LRpbMqxWRVUAQwMI9fVrssnTfw=
|
||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/net v0.0.0-20210324205630-d1beb07c2056 h1:sANdAef76Ioam9aQUUdcAqricwY/WUaMc4+7LY4eGg8=
|
||||
golang.org/x/net v0.0.0-20210324205630-d1beb07c2056/go.mod h1:uSPa2vr4CLtc/ILN5odXGNXS6mhrKVzTaCXzk9m6W3k=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
|
@@ -1,14 +1,67 @@
|
||||
// Package wrapper provides wrapper for Tracer
|
||||
// Package wrapper provides wrapper for Logger
|
||||
package wrapper
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/unistack-org/micro/v3/client"
|
||||
"github.com/unistack-org/micro/v3/logger"
|
||||
"github.com/unistack-org/micro/v3/server"
|
||||
)
|
||||
|
||||
var (
|
||||
DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, err error) []string {
|
||||
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
|
||||
if err != nil {
|
||||
labels = append(labels, "error", err.Error())
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
||||
DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, err error) []string {
|
||||
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
|
||||
if err != nil {
|
||||
labels = append(labels, "error", err.Error())
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
||||
DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, err error) []string {
|
||||
labels := []string{"endpoint", msg.Topic()}
|
||||
if err != nil {
|
||||
labels = append(labels, "error", err.Error())
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
||||
DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, err error) []string {
|
||||
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
|
||||
if err != nil {
|
||||
labels = append(labels, "error", err.Error())
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
||||
DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, err error) []string {
|
||||
labels := []string{"endpoint", msg.Topic()}
|
||||
if err != nil {
|
||||
labels = append(labels, "error", err.Error())
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
||||
DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, err error) []string {
|
||||
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
|
||||
if err != nil {
|
||||
labels = append(labels, "error", err.Error())
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
||||
DefaultSkipEndpoints = []string{"Meter.Metrics"}
|
||||
)
|
||||
|
||||
type lWrapper struct {
|
||||
client.Client
|
||||
serverHandler server.HandlerFunc
|
||||
@@ -24,20 +77,34 @@ type ClientCallFuncObserver func(context.Context, string, client.Request, interf
|
||||
type ServerHandlerObserver func(context.Context, server.Request, interface{}, error) []string
|
||||
type ServerSubscriberObserver func(context.Context, server.Message, error) []string
|
||||
|
||||
// Options struct for wrapper
|
||||
type Options struct {
|
||||
Logger logger.Logger
|
||||
Level logger.Level
|
||||
Enabled bool
|
||||
ClientCallObservers []ClientCallObserver
|
||||
ClientStreamObservers []ClientStreamObserver
|
||||
ClientPublishObservers []ClientPublishObserver
|
||||
ClientCallFuncObservers []ClientCallFuncObserver
|
||||
ServerHandlerObservers []ServerHandlerObserver
|
||||
// Logger that used for log
|
||||
Logger logger.Logger
|
||||
// Level for logger
|
||||
Level logger.Level
|
||||
// Enabled flag
|
||||
Enabled bool
|
||||
// ClientCallObservers funcs
|
||||
ClientCallObservers []ClientCallObserver
|
||||
// ClientStreamObservers funcs
|
||||
ClientStreamObservers []ClientStreamObserver
|
||||
// ClientPublishObservers funcs
|
||||
ClientPublishObservers []ClientPublishObserver
|
||||
// ClientCallFuncObservers funcs
|
||||
ClientCallFuncObservers []ClientCallFuncObserver
|
||||
// ServerHandlerObservers funcs
|
||||
ServerHandlerObservers []ServerHandlerObserver
|
||||
// ServerSubscriberObservers funcs
|
||||
ServerSubscriberObservers []ServerSubscriberObserver
|
||||
// SkipEndpoints
|
||||
SkipEndpoints []string
|
||||
}
|
||||
|
||||
// Option func signature
|
||||
type Option func(*Options)
|
||||
|
||||
// NewOptions creates Options from Option slice
|
||||
func NewOptions(opts ...Option) Options {
|
||||
options := Options{
|
||||
Logger: logger.DefaultLogger,
|
||||
@@ -48,6 +115,7 @@ func NewOptions(opts ...Option) Options {
|
||||
ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver},
|
||||
ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver},
|
||||
ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver},
|
||||
SkipEndpoints: DefaultSkipEndpoints,
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
@@ -57,111 +125,86 @@ func NewOptions(opts ...Option) Options {
|
||||
return options
|
||||
}
|
||||
|
||||
// WithEnabled enable/diable flag
|
||||
func WithEnabled(b bool) Option {
|
||||
return func(o *Options) {
|
||||
o.Enabled = b
|
||||
}
|
||||
}
|
||||
|
||||
// WithLevel log level
|
||||
func WithLevel(l logger.Level) Option {
|
||||
return func(o *Options) {
|
||||
o.Level = l
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogger logger
|
||||
func WithLogger(l logger.Logger) Option {
|
||||
return func(o *Options) {
|
||||
o.Logger = l
|
||||
}
|
||||
}
|
||||
|
||||
// WithClientCallObservers funcs
|
||||
func WithClientCallObservers(ob ...ClientCallObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientCallObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
// WithClientStreamObservers funcs
|
||||
func WithClientStreamObservers(ob ...ClientStreamObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientStreamObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
// WithClientPublishObservers funcs
|
||||
func WithClientPublishObservers(ob ...ClientPublishObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientPublishObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
// WithClientCallFuncObservers funcs
|
||||
func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientCallFuncObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
// WithServerHandlerObservers funcs
|
||||
func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ServerHandlerObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
// WithServerSubscriberObservers funcs
|
||||
func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ServerSubscriberObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
func DefaultClientCallObserver(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, err error) []string {
|
||||
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
|
||||
if err != nil {
|
||||
labels = append(labels, "error", err.Error())
|
||||
// SkipEndpoins
|
||||
func SkipEndpoints(eps ...string) Option {
|
||||
return func(o *Options) {
|
||||
o.SkipEndpoints = append(o.SkipEndpoints, eps...)
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
||||
func DefaultClientStreamObserver(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, err error) []string {
|
||||
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
|
||||
if err != nil {
|
||||
labels = append(labels, "error", err.Error())
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
||||
func DefaultClientPublishObserver(ctx context.Context, msg client.Message, opts []client.PublishOption, err error) []string {
|
||||
labels := []string{"endpoint", msg.Topic()}
|
||||
if err != nil {
|
||||
labels = append(labels, "error", err.Error())
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
||||
func DefaultServerHandlerObserver(ctx context.Context, req server.Request, rsp interface{}, err error) []string {
|
||||
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
|
||||
if err != nil {
|
||||
labels = append(labels, "error", err.Error())
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
||||
func DefaultServerSubscriberObserver(ctx context.Context, msg server.Message, err error) []string {
|
||||
labels := []string{"endpoint", msg.Topic()}
|
||||
if err != nil {
|
||||
labels = append(labels, "error", err.Error())
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
||||
func DefaultClientCallFuncObserver(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, err error) []string {
|
||||
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
|
||||
if err != nil {
|
||||
labels = append(labels, "error", err.Error())
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
||||
func (l *lWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||
err := l.Client.Call(ctx, req, rsp, opts...)
|
||||
|
||||
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
|
||||
for _, ep := range l.opts.SkipEndpoints {
|
||||
if ep == endpoint {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if !l.opts.Enabled {
|
||||
return err
|
||||
}
|
||||
@@ -182,6 +225,13 @@ func (l *lWrapper) Call(ctx context.Context, req client.Request, rsp interface{}
|
||||
func (l *lWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||
stream, err := l.Client.Stream(ctx, req, opts...)
|
||||
|
||||
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
|
||||
for _, ep := range l.opts.SkipEndpoints {
|
||||
if ep == endpoint {
|
||||
return stream, err
|
||||
}
|
||||
}
|
||||
|
||||
if !l.opts.Enabled {
|
||||
return stream, err
|
||||
}
|
||||
@@ -202,6 +252,13 @@ func (l *lWrapper) Stream(ctx context.Context, req client.Request, opts ...clien
|
||||
func (l *lWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
|
||||
err := l.Client.Publish(ctx, msg, opts...)
|
||||
|
||||
endpoint := msg.Topic()
|
||||
for _, ep := range l.opts.SkipEndpoints {
|
||||
if ep == endpoint {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if !l.opts.Enabled {
|
||||
return err
|
||||
}
|
||||
@@ -222,6 +279,13 @@ func (l *lWrapper) Publish(ctx context.Context, msg client.Message, opts ...clie
|
||||
func (l *lWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||
err := l.serverHandler(ctx, req, rsp)
|
||||
|
||||
endpoint := req.Endpoint()
|
||||
for _, ep := range l.opts.SkipEndpoints {
|
||||
if ep == endpoint {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if !l.opts.Enabled {
|
||||
return err
|
||||
}
|
||||
@@ -242,6 +306,13 @@ func (l *lWrapper) ServerHandler(ctx context.Context, req server.Request, rsp in
|
||||
func (l *lWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error {
|
||||
err := l.serverSubscriber(ctx, msg)
|
||||
|
||||
endpoint := msg.Topic()
|
||||
for _, ep := range l.opts.SkipEndpoints {
|
||||
if ep == endpoint {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if !l.opts.Enabled {
|
||||
return err
|
||||
}
|
||||
@@ -259,7 +330,7 @@ func (l *lWrapper) ServerSubscriber(ctx context.Context, msg server.Message) err
|
||||
return err
|
||||
}
|
||||
|
||||
// NewClientWrapper accepts an open tracing Trace and returns a Client Wrapper
|
||||
// NewClientWrapper accepts an open options and returns a Client Wrapper
|
||||
func NewClientWrapper(opts ...Option) client.Wrapper {
|
||||
return func(c client.Client) client.Client {
|
||||
options := NewOptions()
|
||||
@@ -270,7 +341,7 @@ func NewClientWrapper(opts ...Option) client.Wrapper {
|
||||
}
|
||||
}
|
||||
|
||||
// NewClientCallWrapper accepts an opentracing Tracer and returns a Call Wrapper
|
||||
// NewClientCallWrapper accepts an options and returns a Call Wrapper
|
||||
func NewClientCallWrapper(opts ...Option) client.CallWrapper {
|
||||
return func(h client.CallFunc) client.CallFunc {
|
||||
options := NewOptions()
|
||||
@@ -286,6 +357,13 @@ func NewClientCallWrapper(opts ...Option) client.CallWrapper {
|
||||
func (l *lWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
||||
err := l.clientCallFunc(ctx, addr, req, rsp, opts)
|
||||
|
||||
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
|
||||
for _, ep := range l.opts.SkipEndpoints {
|
||||
if ep == endpoint {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if !l.opts.Enabled {
|
||||
return err
|
||||
}
|
||||
@@ -316,7 +394,7 @@ func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper {
|
||||
}
|
||||
}
|
||||
|
||||
// NewServerSubscriberWrapper accepts an opentracing Tracer and returns a Subscriber Wrapper
|
||||
// NewServerSubscriberWrapper accepts an options and returns a Subscriber Wrapper
|
||||
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
|
||||
return func(h server.SubscriberFunc) server.SubscriberFunc {
|
||||
options := NewOptions()
|
||||
|
@@ -112,6 +112,7 @@ func Merge(omd Metadata, mmd Metadata, overwrite bool) Metadata {
|
||||
return nmd
|
||||
}
|
||||
|
||||
// Pairs from which metadata created
|
||||
func Pairs(kv ...string) (Metadata, bool) {
|
||||
if len(kv)%2 == 1 {
|
||||
return nil, false
|
||||
|
3
meter/generate.go
Normal file
3
meter/generate.go
Normal file
@@ -0,0 +1,3 @@
|
||||
package meter
|
||||
|
||||
//go:generate protoc -I./handler -I../ -I/home/vtolstov/.cache/go-path/pkg/mod/github.com/unistack-org/micro-proto@v0.0.1 --micro_out=components=micro|http|server,standalone=false,debug=true,paths=source_relative:./handler handler/handler.proto
|
36
meter/handler/handler.go
Normal file
36
meter/handler/handler.go
Normal file
@@ -0,0 +1,36 @@
|
||||
package pb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
|
||||
"github.com/unistack-org/micro/v3/codec"
|
||||
"github.com/unistack-org/micro/v3/meter"
|
||||
)
|
||||
|
||||
var (
|
||||
// guard to fail early
|
||||
_ MeterServer = &handler{}
|
||||
)
|
||||
|
||||
type Empty struct{}
|
||||
|
||||
type handler struct {
|
||||
meter meter.Meter
|
||||
opts []meter.Option
|
||||
}
|
||||
|
||||
func NewHandler(meter meter.Meter, opts ...meter.Option) *handler {
|
||||
return &handler{meter: meter, opts: opts}
|
||||
}
|
||||
|
||||
func (h *handler) Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
|
||||
buf := bytes.NewBuffer(nil)
|
||||
if err := h.meter.Write(buf, h.opts...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rsp.Data = buf.Bytes()
|
||||
|
||||
return nil
|
||||
}
|
28
meter/handler/handler.proto
Normal file
28
meter/handler/handler.proto
Normal file
@@ -0,0 +1,28 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package meter;
|
||||
option go_package = "github.com/unistack-org/micro/v3/meter/handler;pb";
|
||||
|
||||
import "api/annotations.proto";
|
||||
import "openapiv2/annotations.proto";
|
||||
import "codec/frame.proto";
|
||||
|
||||
service Meter {
|
||||
rpc Metrics(micro.codec.Frame) returns (micro.codec.Frame) {
|
||||
option (micro.openapiv2.openapiv2_operation) = {
|
||||
operation_id: "Metrics";
|
||||
responses: {
|
||||
key: "default";
|
||||
value: {
|
||||
description: "Error response";
|
||||
schema: {
|
||||
json_schema: {
|
||||
ref: "Empty";
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
option (micro.api.http) = { get: "/metrics"; };
|
||||
};
|
||||
};
|
24
meter/handler/handler_micro.pb.go
Normal file
24
meter/handler/handler_micro.pb.go
Normal file
@@ -0,0 +1,24 @@
|
||||
// Code generated by protoc-gen-micro
|
||||
// source: handler.proto
|
||||
package pb
|
||||
|
||||
import (
|
||||
context "context"
|
||||
api "github.com/unistack-org/micro/v3/api"
|
||||
codec "github.com/unistack-org/micro/v3/codec"
|
||||
)
|
||||
|
||||
func NewMeterEndpoints() []*api.Endpoint {
|
||||
return []*api.Endpoint{
|
||||
&api.Endpoint{
|
||||
Name: "Meter.Metrics",
|
||||
Path: []string{"/metrics"},
|
||||
Method: []string{"GET"},
|
||||
Handler: "rpc",
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
type MeterServer interface {
|
||||
Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
|
||||
}
|
33
meter/handler/handler_micro_http.pb.go
Normal file
33
meter/handler/handler_micro_http.pb.go
Normal file
@@ -0,0 +1,33 @@
|
||||
// Code generated by protoc-gen-micro
|
||||
// source: handler.proto
|
||||
package pb
|
||||
|
||||
import (
|
||||
context "context"
|
||||
api "github.com/unistack-org/micro/v3/api"
|
||||
codec "github.com/unistack-org/micro/v3/codec"
|
||||
server "github.com/unistack-org/micro/v3/server"
|
||||
)
|
||||
|
||||
type meterServer struct {
|
||||
MeterServer
|
||||
}
|
||||
|
||||
func (h *meterServer) Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error {
|
||||
return h.MeterServer.Metrics(ctx, req, rsp)
|
||||
}
|
||||
|
||||
func RegisterMeterServer(s server.Server, sh MeterServer, opts ...server.HandlerOption) error {
|
||||
type meter interface {
|
||||
Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
|
||||
}
|
||||
type Meter struct {
|
||||
meter
|
||||
}
|
||||
h := &meterServer{sh}
|
||||
var nopts []server.HandlerOption
|
||||
for _, endpoint := range NewMeterEndpoints() {
|
||||
nopts = append(nopts, api.WithEndpoint(endpoint))
|
||||
}
|
||||
return s.Handle(s.NewHandler(&Meter{h}, append(nopts, opts...)...))
|
||||
}
|
@@ -3,6 +3,7 @@ package meter
|
||||
|
||||
import (
|
||||
"io"
|
||||
"reflect"
|
||||
"sort"
|
||||
"time"
|
||||
)
|
||||
@@ -76,66 +77,36 @@ type Summary interface {
|
||||
UpdateDuration(time.Time)
|
||||
}
|
||||
|
||||
// Labels holds the metrics labels with k, v
|
||||
type Labels struct {
|
||||
keys []string
|
||||
vals []string
|
||||
type byKey []string
|
||||
|
||||
func (k byKey) Len() int { return len(k) / 2 }
|
||||
func (k byKey) Less(i, j int) bool { return k[i*2] < k[j*2] }
|
||||
func (k byKey) Swap(i, j int) {
|
||||
k[i*2], k[i*2+1], k[j*2], k[j*2+1] = k[j*2], k[j*2+1], k[i*2], k[i*2+1]
|
||||
}
|
||||
|
||||
// Append adds labels to label set
|
||||
func (ls Labels) Append(nls Labels) Labels {
|
||||
for n := range nls.keys {
|
||||
ls.keys = append(ls.keys, nls.keys[n])
|
||||
ls.vals = append(ls.vals, nls.vals[n])
|
||||
func Sort(slice *[]string) {
|
||||
bk := byKey(*slice)
|
||||
if bk.Len() <= 1 {
|
||||
return
|
||||
}
|
||||
return ls
|
||||
}
|
||||
|
||||
// Len returns number of labels
|
||||
func (ls Labels) Len() int {
|
||||
return len(ls.keys)
|
||||
}
|
||||
|
||||
type labels Labels
|
||||
|
||||
func (ls labels) Len() int {
|
||||
return len(ls.keys)
|
||||
}
|
||||
|
||||
func (ls labels) Sort() {
|
||||
sort.Sort(ls)
|
||||
}
|
||||
|
||||
func (ls labels) Swap(i, j int) {
|
||||
ls.keys[i], ls.keys[j] = ls.keys[j], ls.keys[i]
|
||||
ls.vals[i], ls.vals[j] = ls.vals[j], ls.vals[i]
|
||||
}
|
||||
|
||||
func (ls labels) Less(i, j int) bool {
|
||||
return ls.keys[i] < ls.keys[j]
|
||||
}
|
||||
|
||||
// LabelIter holds the
|
||||
type LabelIter struct {
|
||||
labels Labels
|
||||
cnt int
|
||||
cur int
|
||||
}
|
||||
|
||||
// Iter returns labels iterator
|
||||
func (ls Labels) Iter() *LabelIter {
|
||||
labels(ls).Sort()
|
||||
return &LabelIter{labels: ls, cnt: len(ls.keys)}
|
||||
}
|
||||
|
||||
// Next advance itarator to new pos
|
||||
func (iter *LabelIter) Next(k, v *string) bool {
|
||||
if iter.cur+1 > iter.cnt {
|
||||
return false
|
||||
sort.Sort(bk)
|
||||
v := reflect.ValueOf(slice).Elem()
|
||||
cnt := 0
|
||||
key := 0
|
||||
val := 1
|
||||
for key < v.Len() {
|
||||
if len(bk) > key+2 && bk[key] == bk[key+2] {
|
||||
key += 2
|
||||
val += 2
|
||||
continue
|
||||
}
|
||||
v.Index(cnt).Set(v.Index(key))
|
||||
cnt++
|
||||
v.Index(cnt).Set(v.Index(val))
|
||||
cnt++
|
||||
key += 2
|
||||
val += 2
|
||||
}
|
||||
|
||||
*k = iter.labels.keys[iter.cur]
|
||||
*v = iter.labels.vals[iter.cur]
|
||||
iter.cur++
|
||||
return true
|
||||
v.SetLen(cnt)
|
||||
}
|
||||
|
@@ -10,46 +10,15 @@ func TestNoopMeter(t *testing.T) {
|
||||
t.Fatalf("invalid options parsing: %v", m.Options())
|
||||
}
|
||||
|
||||
cnt := m.Counter("counter", Label("server", "noop"))
|
||||
cnt := m.Counter("counter", Labels("server", "noop"))
|
||||
cnt.Inc()
|
||||
}
|
||||
|
||||
func TestLabelsAppend(t *testing.T) {
|
||||
var ls Labels
|
||||
ls.keys = []string{"type", "server"}
|
||||
ls.vals = []string{"noop", "http"}
|
||||
func TestLabelsSort(t *testing.T) {
|
||||
ls := []string{"server", "http", "register", "mdns", "broker", "broker1", "broker", "broker2", "server", "tcp"}
|
||||
Sort(&ls)
|
||||
|
||||
var nls Labels
|
||||
nls.keys = []string{"register"}
|
||||
nls.vals = []string{"gossip"}
|
||||
ls = ls.Append(nls)
|
||||
|
||||
//ls.Sort()
|
||||
|
||||
if ls.keys[0] != "type" || ls.vals[0] != "noop" {
|
||||
t.Fatalf("append error: %v", ls)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIterator(t *testing.T) {
|
||||
options := NewOptions(
|
||||
Label("name", "svc1"),
|
||||
Label("version", "0.0.1"),
|
||||
Label("id", "12345"),
|
||||
Label("type", "noop"),
|
||||
Label("server", "http"),
|
||||
Label("register", "gossip"),
|
||||
Label("aa", "kk"),
|
||||
Label("zz", "kk"),
|
||||
)
|
||||
|
||||
iter := options.Labels.Iter()
|
||||
var k, v string
|
||||
cnt := 0
|
||||
for iter.Next(&k, &v) {
|
||||
if cnt == 4 && (k != "server" || v != "http") {
|
||||
t.Fatalf("iter error: %s != %s || %s != %s", k, "server", v, "http")
|
||||
}
|
||||
cnt++
|
||||
if ls[0] != "broker" || ls[1] != "broker2" {
|
||||
t.Fatalf("sort error: %v", ls)
|
||||
}
|
||||
}
|
||||
|
@@ -107,7 +107,7 @@ func (r *noopMeter) String() string {
|
||||
}
|
||||
|
||||
type noopCounter struct {
|
||||
labels Labels
|
||||
labels []string
|
||||
}
|
||||
|
||||
func (r *noopCounter) Add(int) {
|
||||
@@ -131,7 +131,7 @@ func (r *noopCounter) Set(uint64) {
|
||||
}
|
||||
|
||||
type noopFloatCounter struct {
|
||||
labels Labels
|
||||
labels []string
|
||||
}
|
||||
|
||||
func (r *noopFloatCounter) Add(float64) {
|
||||
@@ -151,7 +151,7 @@ func (r *noopFloatCounter) Sub(float64) {
|
||||
}
|
||||
|
||||
type noopGauge struct {
|
||||
labels Labels
|
||||
labels []string
|
||||
}
|
||||
|
||||
func (r *noopGauge) Get() float64 {
|
||||
@@ -159,7 +159,7 @@ func (r *noopGauge) Get() float64 {
|
||||
}
|
||||
|
||||
type noopSummary struct {
|
||||
labels Labels
|
||||
labels []string
|
||||
}
|
||||
|
||||
func (r *noopSummary) Update(float64) {
|
||||
@@ -171,7 +171,7 @@ func (r *noopSummary) UpdateDuration(time.Time) {
|
||||
}
|
||||
|
||||
type noopHistogram struct {
|
||||
labels Labels
|
||||
labels []string
|
||||
}
|
||||
|
||||
func (r *noopHistogram) Reset() {
|
||||
|
@@ -26,7 +26,7 @@ type Options struct {
|
||||
// LabelPrefix holds the prefix for all labels
|
||||
LabelPrefix string
|
||||
// Labels holds the default labels
|
||||
Labels Labels
|
||||
Labels []string
|
||||
// WriteProcessMetrics flag to write process metrics
|
||||
WriteProcessMetrics bool
|
||||
// WriteFDMetrics flag to write fd metrics
|
||||
@@ -88,11 +88,9 @@ func Logger(l logger.Logger) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// Label sets the label
|
||||
func Label(key, val string) Option {
|
||||
func Labels(ls ...string) Option {
|
||||
return func(o *Options) {
|
||||
o.Labels.keys = append(o.Labels.keys, key)
|
||||
o.Labels.vals = append(o.Labels.vals, val)
|
||||
o.Labels = ls
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -28,19 +28,24 @@ var (
|
||||
labelFailure = "failure"
|
||||
labelStatus = "status"
|
||||
labelEndpoint = "endpoint"
|
||||
|
||||
// DefaultSkipEndpoints contains list of endpoints that not evaluted by wrapper
|
||||
DefaultSkipEndpoints = []string{"Meter.Metrics"}
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
Meter meter.Meter
|
||||
lopts []meter.Option
|
||||
Meter meter.Meter
|
||||
lopts []meter.Option
|
||||
SkipEndpoints []string
|
||||
}
|
||||
|
||||
type Option func(*Options)
|
||||
|
||||
func NewOptions(opts ...Option) Options {
|
||||
options := Options{
|
||||
Meter: meter.DefaultMeter,
|
||||
lopts: make([]meter.Option, 0, 5),
|
||||
Meter: meter.DefaultMeter,
|
||||
lopts: make([]meter.Option, 0, 5),
|
||||
SkipEndpoints: DefaultSkipEndpoints,
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
@@ -50,19 +55,19 @@ func NewOptions(opts ...Option) Options {
|
||||
|
||||
func ServiceName(name string) Option {
|
||||
return func(o *Options) {
|
||||
o.lopts = append(o.lopts, meter.Label("name", name))
|
||||
o.lopts = append(o.lopts, meter.Labels("name", name))
|
||||
}
|
||||
}
|
||||
|
||||
func ServiceVersion(version string) Option {
|
||||
return func(o *Options) {
|
||||
o.lopts = append(o.lopts, meter.Label("version", version))
|
||||
o.lopts = append(o.lopts, meter.Labels("version", version))
|
||||
}
|
||||
}
|
||||
|
||||
func ServiceID(id string) Option {
|
||||
return func(o *Options) {
|
||||
o.lopts = append(o.lopts, meter.Label("id", id))
|
||||
o.lopts = append(o.lopts, meter.Labels("id", id))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,6 +77,12 @@ func Meter(m meter.Meter) Option {
|
||||
}
|
||||
}
|
||||
|
||||
func SkipEndoints(eps ...string) Option {
|
||||
return func(o *Options) {
|
||||
o.SkipEndpoints = append(o.SkipEndpoints, eps...)
|
||||
}
|
||||
}
|
||||
|
||||
type wrapper struct {
|
||||
client.Client
|
||||
callFunc client.CallFunc
|
||||
@@ -100,21 +111,25 @@ func NewCallWrapper(opts ...Option) client.CallWrapper {
|
||||
|
||||
func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
||||
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
|
||||
|
||||
for _, ep := range w.opts.SkipEndpoints {
|
||||
if ep == endpoint {
|
||||
return w.callFunc(ctx, addr, req, rsp, opts)
|
||||
}
|
||||
}
|
||||
ts := time.Now()
|
||||
err := w.callFunc(ctx, addr, req, rsp, opts)
|
||||
te := time.Since(ts)
|
||||
|
||||
lopts := w.opts.lopts
|
||||
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
|
||||
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
|
||||
|
||||
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
|
||||
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
|
||||
|
||||
if err == nil {
|
||||
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
|
||||
} else {
|
||||
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
|
||||
}
|
||||
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc()
|
||||
|
||||
@@ -123,21 +138,26 @@ func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request,
|
||||
|
||||
func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
|
||||
for _, ep := range w.opts.SkipEndpoints {
|
||||
if ep == endpoint {
|
||||
return w.Client.Call(ctx, req, rsp, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
ts := time.Now()
|
||||
err := w.Client.Call(ctx, req, rsp, opts...)
|
||||
te := time.Since(ts)
|
||||
|
||||
lopts := w.opts.lopts
|
||||
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
|
||||
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
|
||||
|
||||
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
|
||||
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
|
||||
|
||||
if err == nil {
|
||||
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
|
||||
} else {
|
||||
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
|
||||
}
|
||||
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc()
|
||||
|
||||
@@ -146,21 +166,26 @@ func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{},
|
||||
|
||||
func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
|
||||
for _, ep := range w.opts.SkipEndpoints {
|
||||
if ep == endpoint {
|
||||
return w.Client.Stream(ctx, req, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
ts := time.Now()
|
||||
stream, err := w.Client.Stream(ctx, req, opts...)
|
||||
te := time.Since(ts)
|
||||
|
||||
lopts := w.opts.lopts
|
||||
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
|
||||
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
|
||||
|
||||
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
|
||||
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
|
||||
|
||||
if err == nil {
|
||||
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
|
||||
} else {
|
||||
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
|
||||
}
|
||||
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc()
|
||||
|
||||
@@ -175,15 +200,15 @@ func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.
|
||||
te := time.Since(ts)
|
||||
|
||||
lopts := w.opts.lopts
|
||||
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
|
||||
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
|
||||
|
||||
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
|
||||
w.opts.Meter.Histogram(PublishMessageDurationSeconds, lopts...).Update(float64(te.Seconds()))
|
||||
|
||||
if err == nil {
|
||||
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
|
||||
} else {
|
||||
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
|
||||
}
|
||||
w.opts.Meter.Counter(PublishMessageTotal, lopts...).Inc()
|
||||
|
||||
@@ -200,21 +225,26 @@ func NewHandlerWrapper(opts ...Option) server.HandlerWrapper {
|
||||
func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
|
||||
return func(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||
endpoint := req.Endpoint()
|
||||
for _, ep := range w.opts.SkipEndpoints {
|
||||
if ep == endpoint {
|
||||
return fn(ctx, req, rsp)
|
||||
}
|
||||
}
|
||||
|
||||
ts := time.Now()
|
||||
err := fn(ctx, req, rsp)
|
||||
te := time.Since(ts)
|
||||
|
||||
lopts := w.opts.lopts
|
||||
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
|
||||
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
|
||||
|
||||
w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
|
||||
w.opts.Meter.Histogram(ServerRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
|
||||
|
||||
if err == nil {
|
||||
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
|
||||
} else {
|
||||
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
|
||||
}
|
||||
w.opts.Meter.Counter(ServerRequestTotal, lopts...).Inc()
|
||||
|
||||
@@ -238,15 +268,15 @@ func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc
|
||||
te := time.Since(ts)
|
||||
|
||||
lopts := w.opts.lopts
|
||||
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
|
||||
lopts = append(lopts, meter.Labels(labelEndpoint, endpoint))
|
||||
|
||||
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
|
||||
w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, lopts...).Update(float64(te.Seconds()))
|
||||
|
||||
if err == nil {
|
||||
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelSuccess))
|
||||
} else {
|
||||
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
|
||||
lopts = append(lopts, meter.Labels(labelStatus, labelFailure))
|
||||
}
|
||||
w.opts.Meter.Counter(SubscribeMessageTotal, lopts...).Inc()
|
||||
|
||||
|
@@ -23,7 +23,8 @@ func ExtractValue(v reflect.Type, d int) *Value {
|
||||
v = v.Elem()
|
||||
}
|
||||
|
||||
if len(v.Name()) == 0 {
|
||||
// slices and maps don't have a defined name
|
||||
if (v.Kind() == reflect.Slice || v.Kind() == reflect.Map) || len(v.Name()) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -71,6 +72,16 @@ func ExtractValue(v reflect.Type, d int) *Value {
|
||||
p = p.Elem()
|
||||
}
|
||||
arg.Type = "[]" + p.Name()
|
||||
case reflect.Map:
|
||||
p := v.Elem()
|
||||
if p.Kind() == reflect.Ptr {
|
||||
p = p.Elem()
|
||||
}
|
||||
key := v.Key()
|
||||
if key.Kind() == reflect.Ptr {
|
||||
key = key.Elem()
|
||||
}
|
||||
arg.Type = fmt.Sprintf("map[%s]%s", key.Name(), p.Name())
|
||||
}
|
||||
|
||||
return arg
|
||||
|
91
router/dns.go
Normal file
91
router/dns.go
Normal file
@@ -0,0 +1,91 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// NewRouter returns an initialized dns router
|
||||
func NewRouter(opts ...Option) Router {
|
||||
options := NewOptions(opts...)
|
||||
return &dns{options}
|
||||
}
|
||||
|
||||
type dns struct {
|
||||
options Options
|
||||
}
|
||||
|
||||
func (d *dns) Init(opts ...Option) error {
|
||||
for _, o := range opts {
|
||||
o(&d.options)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *dns) Options() Options {
|
||||
return d.options
|
||||
}
|
||||
|
||||
func (d *dns) Table() Table {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *dns) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *dns) Lookup(opts ...QueryOption) ([]Route, error) {
|
||||
options := NewQuery(opts...)
|
||||
// check to see if we have the port provided in the service, e.g. go-micro-srv-foo:8000
|
||||
host, port, err := net.SplitHostPort(options.Service)
|
||||
if err == nil {
|
||||
// lookup the service using A records
|
||||
ips, err := net.LookupHost(host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p, _ := strconv.Atoi(port)
|
||||
|
||||
// convert the ip addresses to routes
|
||||
result := make([]Route, len(ips))
|
||||
for i, ip := range ips {
|
||||
result[i] = Route{
|
||||
Service: options.Service,
|
||||
Address: fmt.Sprintf("%s:%d", ip, uint16(p)),
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// we didn't get the port so we'll lookup the service using SRV records. If we can't lookup the
|
||||
// service using the SRV record, we return the error.
|
||||
_, nodes, err := net.LookupSRV(options.Service, "tcp", d.options.Network)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// convert the nodes (net services) to routes
|
||||
result := make([]Route, len(nodes))
|
||||
for i, n := range nodes {
|
||||
result[i] = Route{
|
||||
Service: options.Service,
|
||||
Address: fmt.Sprintf("%s:%d", n.Target, n.Port),
|
||||
Network: d.options.Network,
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (d *dns) Watch(opts ...WatchOption) (Watcher, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (d *dns) Name() string {
|
||||
return d.options.Name
|
||||
}
|
||||
|
||||
func (d *dns) String() string {
|
||||
return "dns"
|
||||
}
|
@@ -7,7 +7,7 @@ import (
|
||||
|
||||
var (
|
||||
// DefaultRouter is the global default router
|
||||
DefaultRouter Router
|
||||
DefaultRouter Router = NewRouter()
|
||||
// DefaultNetwork is default micro network
|
||||
DefaultNetwork = "micro"
|
||||
// ErrRouteNotFound is returned when no route was found in the routing table
|
||||
|
@@ -37,7 +37,7 @@ type Runtime interface {
|
||||
|
||||
// Logs returns a log stream
|
||||
type Logs interface {
|
||||
// Error retuns error
|
||||
// Error returns error
|
||||
Error() error
|
||||
// Chan return chan log
|
||||
Chan() chan Log
|
||||
|
@@ -39,8 +39,12 @@ type Options struct {
|
||||
Meter meter.Meter
|
||||
// Transport holds the transport
|
||||
Transport transport.Transport
|
||||
// Router for requests
|
||||
Router Router
|
||||
|
||||
/*
|
||||
// Router for requests
|
||||
Router Router
|
||||
*/
|
||||
|
||||
// Listener may be passed if already created
|
||||
Listener net.Listener
|
||||
// Wait group
|
||||
@@ -61,7 +65,7 @@ type Options struct {
|
||||
Name string
|
||||
// Address holds the server address
|
||||
Address string
|
||||
// Advertise holds the advertie addres
|
||||
// Advertise holds the advertise address
|
||||
Advertise string
|
||||
// Version holds the server version
|
||||
Version string
|
||||
@@ -262,12 +266,14 @@ func TLSConfig(t *tls.Config) Option {
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
// WithRouter sets the request router
|
||||
func WithRouter(r Router) Option {
|
||||
return func(o *Options) {
|
||||
o.Router = r
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
// Wait tells the server to wait for requests to finish before exiting
|
||||
// If `wg` is nil, server only wait for completion of rpc handler.
|
||||
@@ -344,7 +350,7 @@ type SubscriberOption func(*SubscriberOptions)
|
||||
type SubscriberOptions struct {
|
||||
// Context holds the external options
|
||||
Context context.Context
|
||||
// Queue holds the subscribtion queue
|
||||
// Queue holds the subscription queue
|
||||
Queue string
|
||||
// AutoAck flag for auto ack messages after processing
|
||||
AutoAck bool
|
||||
|
@@ -65,6 +65,7 @@ type Server interface {
|
||||
String() string
|
||||
}
|
||||
|
||||
/*
|
||||
// Router handle serving messages
|
||||
type Router interface {
|
||||
// ProcessMessage processes a message
|
||||
@@ -72,6 +73,7 @@ type Router interface {
|
||||
// ServeRequest processes a request to completion
|
||||
ServeRequest(ctx context.Context, req Request, rsp Response) error
|
||||
}
|
||||
*/
|
||||
|
||||
// Message is an async message interface
|
||||
type Message interface {
|
||||
|
@@ -11,6 +11,100 @@ import (
|
||||
"github.com/unistack-org/micro/v3/tracer"
|
||||
)
|
||||
|
||||
var (
|
||||
DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) {
|
||||
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
|
||||
var labels []tracer.Label
|
||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||
labels = make([]tracer.Label, 0, len(md))
|
||||
for k, v := range md {
|
||||
labels = append(labels, tracer.String(k, v))
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
labels = append(labels, tracer.Bool("error", true))
|
||||
}
|
||||
sp.SetLabels(labels...)
|
||||
}
|
||||
|
||||
DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) {
|
||||
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
|
||||
var labels []tracer.Label
|
||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||
labels = make([]tracer.Label, 0, len(md))
|
||||
for k, v := range md {
|
||||
labels = append(labels, tracer.String(k, v))
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
labels = append(labels, tracer.Bool("error", true))
|
||||
}
|
||||
sp.SetLabels(labels...)
|
||||
}
|
||||
|
||||
DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) {
|
||||
sp.SetName(fmt.Sprintf("Pub to %s", msg.Topic()))
|
||||
var labels []tracer.Label
|
||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||
labels = make([]tracer.Label, 0, len(md))
|
||||
for k, v := range md {
|
||||
labels = append(labels, tracer.String(k, v))
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
labels = append(labels, tracer.Bool("error", true))
|
||||
}
|
||||
sp.SetLabels(labels...)
|
||||
}
|
||||
|
||||
DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) {
|
||||
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
|
||||
var labels []tracer.Label
|
||||
if md, ok := metadata.FromIncomingContext(ctx); ok {
|
||||
labels = make([]tracer.Label, 0, len(md))
|
||||
for k, v := range md {
|
||||
labels = append(labels, tracer.String(k, v))
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
labels = append(labels, tracer.Bool("error", true))
|
||||
}
|
||||
sp.SetLabels(labels...)
|
||||
}
|
||||
|
||||
DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, sp tracer.Span, err error) {
|
||||
sp.SetName(fmt.Sprintf("Sub from %s", msg.Topic()))
|
||||
var labels []tracer.Label
|
||||
if md, ok := metadata.FromIncomingContext(ctx); ok {
|
||||
labels = make([]tracer.Label, 0, len(md))
|
||||
for k, v := range md {
|
||||
labels = append(labels, tracer.String(k, v))
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
labels = append(labels, tracer.Bool("error", true))
|
||||
}
|
||||
sp.SetLabels(labels...)
|
||||
}
|
||||
|
||||
DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) {
|
||||
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
|
||||
var labels []tracer.Label
|
||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||
labels = make([]tracer.Label, 0, len(md))
|
||||
for k, v := range md {
|
||||
labels = append(labels, tracer.String(k, v))
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
labels = append(labels, tracer.Bool("error", true))
|
||||
}
|
||||
sp.SetLabels(labels...)
|
||||
}
|
||||
|
||||
DefaultSkipEndpoints = []string{"Meter.Metrics"}
|
||||
)
|
||||
|
||||
type tWrapper struct {
|
||||
client.Client
|
||||
serverHandler server.HandlerFunc
|
||||
@@ -26,18 +120,30 @@ type ClientCallFuncObserver func(context.Context, string, client.Request, interf
|
||||
type ServerHandlerObserver func(context.Context, server.Request, interface{}, tracer.Span, error)
|
||||
type ServerSubscriberObserver func(context.Context, server.Message, tracer.Span, error)
|
||||
|
||||
// Options struct
|
||||
type Options struct {
|
||||
Tracer tracer.Tracer
|
||||
ClientCallObservers []ClientCallObserver
|
||||
ClientStreamObservers []ClientStreamObserver
|
||||
ClientPublishObservers []ClientPublishObserver
|
||||
ClientCallFuncObservers []ClientCallFuncObserver
|
||||
ServerHandlerObservers []ServerHandlerObserver
|
||||
// Tracer that used for tracing
|
||||
Tracer tracer.Tracer
|
||||
// ClientCallObservers funcs
|
||||
ClientCallObservers []ClientCallObserver
|
||||
// ClientStreamObservers funcs
|
||||
ClientStreamObservers []ClientStreamObserver
|
||||
// ClientPublishObservers funcs
|
||||
ClientPublishObservers []ClientPublishObserver
|
||||
// ClientCallFuncObservers funcs
|
||||
ClientCallFuncObservers []ClientCallFuncObserver
|
||||
// ServerHandlerObservers funcs
|
||||
ServerHandlerObservers []ServerHandlerObserver
|
||||
// ServerSubscriberObservers funcs
|
||||
ServerSubscriberObservers []ServerSubscriberObserver
|
||||
// SkipEndpoints
|
||||
SkipEndpoints []string
|
||||
}
|
||||
|
||||
// Option func signature
|
||||
type Option func(*Options)
|
||||
|
||||
// NewOptions create Options from Option slice
|
||||
func NewOptions(opts ...Option) Options {
|
||||
options := Options{
|
||||
Tracer: tracer.DefaultTracer,
|
||||
@@ -47,6 +153,7 @@ func NewOptions(opts ...Option) Options {
|
||||
ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver},
|
||||
ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver},
|
||||
ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver},
|
||||
SkipEndpoints: DefaultSkipEndpoints,
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
@@ -56,139 +163,70 @@ func NewOptions(opts ...Option) Options {
|
||||
return options
|
||||
}
|
||||
|
||||
// WithTracer pass tracer
|
||||
func WithTracer(t tracer.Tracer) Option {
|
||||
return func(o *Options) {
|
||||
o.Tracer = t
|
||||
}
|
||||
}
|
||||
|
||||
// SkipEndponts
|
||||
func SkipEndpoins(eps ...string) Option {
|
||||
return func(o *Options) {
|
||||
o.SkipEndpoints = append(o.SkipEndpoints, eps...)
|
||||
}
|
||||
}
|
||||
|
||||
// WithClientCallObservers funcs
|
||||
func WithClientCallObservers(ob ...ClientCallObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientCallObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
// WithClientStreamObservers funcs
|
||||
func WithClientStreamObservers(ob ...ClientStreamObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientStreamObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
// WithClientPublishObservers funcs
|
||||
func WithClientPublishObservers(ob ...ClientPublishObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientPublishObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
// WithClientCallFuncObservers funcs
|
||||
func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientCallFuncObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
// WithServerHandlerObservers funcs
|
||||
func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ServerHandlerObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
// WithServerSubscriberObservers funcs
|
||||
func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ServerSubscriberObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
func DefaultClientCallObserver(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) {
|
||||
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
|
||||
var labels []tracer.Label
|
||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||
labels = make([]tracer.Label, 0, len(md))
|
||||
for k, v := range md {
|
||||
labels = append(labels, tracer.String(k, v))
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
labels = append(labels, tracer.Bool("error", true))
|
||||
}
|
||||
sp.SetLabels(labels...)
|
||||
}
|
||||
|
||||
func DefaultClientStreamObserver(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) {
|
||||
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
|
||||
var labels []tracer.Label
|
||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||
labels = make([]tracer.Label, 0, len(md))
|
||||
for k, v := range md {
|
||||
labels = append(labels, tracer.String(k, v))
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
labels = append(labels, tracer.Bool("error", true))
|
||||
}
|
||||
sp.SetLabels(labels...)
|
||||
}
|
||||
|
||||
func DefaultClientPublishObserver(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) {
|
||||
sp.SetName(fmt.Sprintf("Pub to %s", msg.Topic()))
|
||||
var labels []tracer.Label
|
||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||
labels = make([]tracer.Label, 0, len(md))
|
||||
for k, v := range md {
|
||||
labels = append(labels, tracer.String(k, v))
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
labels = append(labels, tracer.Bool("error", true))
|
||||
}
|
||||
sp.SetLabels(labels...)
|
||||
}
|
||||
|
||||
func DefaultServerHandlerObserver(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) {
|
||||
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
|
||||
var labels []tracer.Label
|
||||
if md, ok := metadata.FromIncomingContext(ctx); ok {
|
||||
labels = make([]tracer.Label, 0, len(md))
|
||||
for k, v := range md {
|
||||
labels = append(labels, tracer.String(k, v))
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
labels = append(labels, tracer.Bool("error", true))
|
||||
}
|
||||
sp.SetLabels(labels...)
|
||||
}
|
||||
|
||||
func DefaultServerSubscriberObserver(ctx context.Context, msg server.Message, sp tracer.Span, err error) {
|
||||
sp.SetName(fmt.Sprintf("Sub from %s", msg.Topic()))
|
||||
var labels []tracer.Label
|
||||
if md, ok := metadata.FromIncomingContext(ctx); ok {
|
||||
labels = make([]tracer.Label, 0, len(md))
|
||||
for k, v := range md {
|
||||
labels = append(labels, tracer.String(k, v))
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
labels = append(labels, tracer.Bool("error", true))
|
||||
}
|
||||
sp.SetLabels(labels...)
|
||||
}
|
||||
|
||||
func DefaultClientCallFuncObserver(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) {
|
||||
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
|
||||
var labels []tracer.Label
|
||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||
labels = make([]tracer.Label, 0, len(md))
|
||||
for k, v := range md {
|
||||
labels = append(labels, tracer.String(k, v))
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
labels = append(labels, tracer.Bool("error", true))
|
||||
}
|
||||
sp.SetLabels(labels...)
|
||||
}
|
||||
|
||||
func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
|
||||
for _, ep := range ot.opts.SkipEndpoints {
|
||||
if ep == endpoint {
|
||||
return ot.Client.Call(ctx, req, rsp, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
sp := tracer.SpanFromContext(ctx)
|
||||
defer sp.Finish()
|
||||
|
||||
@@ -202,6 +240,13 @@ func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{
|
||||
}
|
||||
|
||||
func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
|
||||
for _, ep := range ot.opts.SkipEndpoints {
|
||||
if ep == endpoint {
|
||||
return ot.Client.Stream(ctx, req, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
sp := tracer.SpanFromContext(ctx)
|
||||
defer sp.Finish()
|
||||
|
||||
@@ -228,6 +273,13 @@ func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...cli
|
||||
}
|
||||
|
||||
func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||
endpoint := req.Endpoint()
|
||||
for _, ep := range ot.opts.SkipEndpoints {
|
||||
if ep == endpoint {
|
||||
return ot.serverHandler(ctx, req, rsp)
|
||||
}
|
||||
}
|
||||
|
||||
sp := tracer.SpanFromContext(ctx)
|
||||
defer sp.Finish()
|
||||
|
||||
@@ -278,6 +330,13 @@ func NewClientCallWrapper(opts ...Option) client.CallWrapper {
|
||||
}
|
||||
|
||||
func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
||||
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
|
||||
for _, ep := range ot.opts.SkipEndpoints {
|
||||
if ep == endpoint {
|
||||
return ot.ClientCallFunc(ctx, addr, req, rsp, opts)
|
||||
}
|
||||
}
|
||||
|
||||
sp := tracer.SpanFromContext(ctx)
|
||||
defer sp.Finish()
|
||||
|
||||
|
@@ -65,8 +65,24 @@ func IsEmpty(v reflect.Value) bool {
|
||||
return v.IsNil()
|
||||
case reflect.Invalid:
|
||||
return true
|
||||
case reflect.Struct:
|
||||
var ok bool
|
||||
for i := 0; i < v.NumField(); i++ {
|
||||
ok = IsEmpty(v.FieldByIndex([]int{i}))
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
}
|
||||
default:
|
||||
return false
|
||||
}
|
||||
return false
|
||||
return true
|
||||
}
|
||||
|
||||
// IsZero returns true if struct is zero (not have any defined values)
|
||||
func IsZero(src interface{}) bool {
|
||||
v := reflect.ValueOf(src)
|
||||
return IsEmpty(v)
|
||||
}
|
||||
|
||||
// Zero creates new zero interface
|
||||
|
@@ -43,3 +43,50 @@ func TestURLVars(t *testing.T) {
|
||||
}
|
||||
_ = mp
|
||||
}
|
||||
|
||||
func TestIsZero(t *testing.T) {
|
||||
testStr1 := struct {
|
||||
Name string
|
||||
Value string
|
||||
Nested struct {
|
||||
NestedName string
|
||||
}
|
||||
}{
|
||||
Name: "test_name",
|
||||
Value: "test_value",
|
||||
}
|
||||
testStr1.Nested.NestedName = "nested_name"
|
||||
|
||||
if ok := IsZero(testStr1); ok {
|
||||
t.Fatalf("zero ret on non zero struct: %#+v", testStr1)
|
||||
}
|
||||
|
||||
testStr1.Name = ""
|
||||
testStr1.Value = ""
|
||||
testStr1.Nested.NestedName = ""
|
||||
if ok := IsZero(testStr1); !ok {
|
||||
t.Fatalf("non zero ret on zero struct: %#+v", testStr1)
|
||||
}
|
||||
|
||||
type testStr3 struct {
|
||||
Nested string
|
||||
}
|
||||
type testStr2 struct {
|
||||
Name string
|
||||
Nested *testStr3
|
||||
}
|
||||
vtest := &testStr2{
|
||||
Name: "test_name",
|
||||
Nested: &testStr3{Nested: "nested_name"},
|
||||
}
|
||||
if ok := IsZero(vtest); ok {
|
||||
t.Fatalf("zero ret on non zero struct: %#+v", vtest)
|
||||
}
|
||||
vtest.Nested = nil
|
||||
vtest.Name = ""
|
||||
if ok := IsZero(vtest); !ok {
|
||||
t.Fatalf("non zero ret on zero struct: %#+v", vtest)
|
||||
}
|
||||
|
||||
//t.Logf("XX %#+v\n", ok)
|
||||
}
|
||||
|
Reference in New Issue
Block a user