From faf2454f0a0659dc69eee2de0dc5044cc04c6166 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Fri, 20 Sep 2024 17:54:17 +0300 Subject: [PATCH] cleanup Signed-off-by: Vasiliy Tolstov --- broker/broker.go | 2 +- client/client.go | 2 +- config/config.go | 2 +- errors/errors.go | 2 +- flow/flow.go | 2 +- fsm/fsm.go | 2 +- metadata/metadata.go | 2 +- meter/meter.go | 4 - meter/wrapper/wrapper.go | 347 --------------------- mtls/mtls.go | 2 +- network/network.go | 2 +- network/transport/transport.go | 2 +- network/tunnel/broker/broker.go | 2 +- network/tunnel/transport/transport.go | 2 +- network/tunnel/tunnel.go | 2 +- profiler/http/http.go | 2 +- profiler/pprof/pprof.go | 2 +- profiler/profile.go | 2 +- proxy/proxy.go | 2 +- register/register.go | 2 +- resolver/dns/dns.go | 2 +- resolver/dnssrv/dnssrv.go | 2 +- resolver/http/http.go | 2 +- resolver/noop/noop.go | 2 +- resolver/registry/registry.go | 2 +- resolver/static/static.go | 2 +- router/router.go | 2 +- selector/random/random.go | 2 +- selector/roundrobin/roundrobin.go | 2 +- selector/selector.go | 2 +- server/server.go | 6 +- store/store.go | 2 +- sync/sync.go | 2 +- tracer/tracer.go | 2 +- tracer/wrapper/wrapper.go | 415 -------------------------- util/addr/addr.go | 3 +- util/backoff/backoff.go | 2 +- util/buf/buf.go | 2 +- util/http/http.go | 2 +- util/id/id.go | 2 +- util/io/io.go | 2 +- util/jitter/random.go | 2 +- util/jitter/ticker.go | 2 +- util/net/net.go | 2 +- util/pki/pki.go | 2 +- util/pool/pool.go | 2 +- util/rand/rand.go | 2 +- util/register/util.go | 2 +- util/ring/buffer.go | 2 +- util/socket/socket.go | 2 +- util/stream/stream.go | 2 +- 51 files changed, 52 insertions(+), 815 deletions(-) delete mode 100644 meter/wrapper/wrapper.go delete mode 100644 tracer/wrapper/wrapper.go diff --git a/broker/broker.go b/broker/broker.go index 28098188..a67a0be8 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -1,5 +1,5 @@ // Package broker is an interface used for asynchronous messaging -package broker // import "go.unistack.org/micro/v3/broker" +package broker import ( "context" diff --git a/client/client.go b/client/client.go index e7c42692..5a0c4f7d 100644 --- a/client/client.go +++ b/client/client.go @@ -1,5 +1,5 @@ // Package client is an interface for an RPC client -package client // import "go.unistack.org/micro/v3/client" +package client import ( "context" diff --git a/config/config.go b/config/config.go index 955f2957..aa8ff16f 100644 --- a/config/config.go +++ b/config/config.go @@ -1,5 +1,5 @@ // Package config is an interface for dynamic configuration. -package config // import "go.unistack.org/micro/v3/config" +package config import ( "context" diff --git a/errors/errors.go b/errors/errors.go index 3224515d..c7db79e5 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -1,6 +1,6 @@ // Package errors provides a way to return detailed information // for an RPC request error. The error is normally JSON encoded. -package errors // import "go.unistack.org/micro/v3/errors" +package errors import ( "bytes" diff --git a/flow/flow.go b/flow/flow.go index 0124db3b..ed21b078 100644 --- a/flow/flow.go +++ b/flow/flow.go @@ -1,5 +1,5 @@ // Package flow is an interface used for saga pattern microservice workflow -package flow // import "go.unistack.org/micro/v3/flow" +package flow import ( "context" diff --git a/fsm/fsm.go b/fsm/fsm.go index 2cee6b1c..a4d072a6 100644 --- a/fsm/fsm.go +++ b/fsm/fsm.go @@ -1,4 +1,4 @@ -package fsm // import "go.unistack.org/micro/v3/fsm" +package fsm import ( "context" diff --git a/metadata/metadata.go b/metadata/metadata.go index bf7a8d3a..7fd26e8a 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -1,5 +1,5 @@ // Package metadata is a way of defining message headers -package metadata // import "go.unistack.org/micro/v3/metadata" +package metadata import ( "net/textproto" diff --git a/meter/meter.go b/meter/meter.go index 2afe65d5..e5d994fe 100644 --- a/meter/meter.go +++ b/meter/meter.go @@ -16,10 +16,6 @@ var ( DefaultAddress = ":9090" // DefaultPath the meter endpoint where the Meter data will be made available DefaultPath = "/metrics" - // DefaultMetricPrefix holds the string that prepends to all metrics - DefaultMetricPrefix = "micro_" - // DefaultLabelPrefix holds the string that prepends to all labels - DefaultLabelPrefix = "micro_" // DefaultSummaryQuantiles is the default spread of stats for summary DefaultSummaryQuantiles = []float64{0.5, 0.9, 0.97, 0.99, 1} // DefaultSummaryWindow is the default window for summary diff --git a/meter/wrapper/wrapper.go b/meter/wrapper/wrapper.go deleted file mode 100644 index 00ca68df..00000000 --- a/meter/wrapper/wrapper.go +++ /dev/null @@ -1,347 +0,0 @@ -package wrapper // import "go.unistack.org/micro/v3/meter/wrapper" - -import ( - "context" - "fmt" - "time" - - "go.unistack.org/micro/v3/client" - "go.unistack.org/micro/v3/meter" - "go.unistack.org/micro/v3/server" -) - -var ( - // ClientRequestDurationSeconds specifies meter metric name - ClientRequestDurationSeconds = "client_request_duration_seconds" - // ClientRequestLatencyMicroseconds specifies meter metric name - ClientRequestLatencyMicroseconds = "client_request_latency_microseconds" - // ClientRequestTotal specifies meter metric name - ClientRequestTotal = "client_request_total" - // ClientRequestInflight specifies meter metric name - ClientRequestInflight = "client_request_inflight" - // ServerRequestDurationSeconds specifies meter metric name - ServerRequestDurationSeconds = "server_request_duration_seconds" - // ServerRequestLatencyMicroseconds specifies meter metric name - ServerRequestLatencyMicroseconds = "server_request_latency_microseconds" - // ServerRequestTotal specifies meter metric name - ServerRequestTotal = "server_request_total" - // ServerRequestInflight specifies meter metric name - ServerRequestInflight = "server_request_inflight" - // PublishMessageDurationSeconds specifies meter metric name - PublishMessageDurationSeconds = "publish_message_duration_seconds" - // PublishMessageLatencyMicroseconds specifies meter metric name - PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds" - // PublishMessageTotal specifies meter metric name - PublishMessageTotal = "publish_message_total" - // PublishMessageInflight specifies meter metric name - PublishMessageInflight = "publish_message_inflight" - // SubscribeMessageDurationSeconds specifies meter metric name - SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds" - // SubscribeMessageLatencyMicroseconds specifies meter metric name - SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds" - // SubscribeMessageTotal specifies meter metric name - SubscribeMessageTotal = "subscribe_message_total" - // SubscribeMessageInflight specifies meter metric name - SubscribeMessageInflight = "subscribe_message_inflight" - - labelSuccess = "success" - labelFailure = "failure" - labelStatus = "status" - labelEndpoint = "endpoint" - - // DefaultSkipEndpoints contains list of endpoints that not evaluted by wrapper - DefaultSkipEndpoints = []string{"Meter.Metrics", "Health.Live", "Health.Ready", "Health.Version"} -) - -// Options struct -type Options struct { - Meter meter.Meter - lopts []meter.Option - SkipEndpoints []string -} - -// Option func signature -type Option func(*Options) - -// NewOptions creates new Options struct -func NewOptions(opts ...Option) Options { - options := Options{ - Meter: meter.DefaultMeter, - lopts: make([]meter.Option, 0, 5), - SkipEndpoints: DefaultSkipEndpoints, - } - for _, o := range opts { - o(&options) - } - return options -} - -// ServiceName passes service name to meter label -func ServiceName(name string) Option { - return func(o *Options) { - o.lopts = append(o.lopts, meter.Labels("name", name)) - } -} - -// ServiceVersion passes service version to meter label -func ServiceVersion(version string) Option { - return func(o *Options) { - o.lopts = append(o.lopts, meter.Labels("version", version)) - } -} - -// ServiceID passes service id to meter label -func ServiceID(id string) Option { - return func(o *Options) { - o.lopts = append(o.lopts, meter.Labels("id", id)) - } -} - -// Meter passes meter -func Meter(m meter.Meter) Option { - return func(o *Options) { - o.Meter = m - } -} - -// SkipEndoints add endpoint to skip -func SkipEndoints(eps ...string) Option { - return func(o *Options) { - o.SkipEndpoints = append(o.SkipEndpoints, eps...) - } -} - -type wrapper struct { - client.Client - callFunc client.CallFunc - opts Options -} - -// NewClientWrapper create new client wrapper -func NewClientWrapper(opts ...Option) client.Wrapper { - return func(c client.Client) client.Client { - handler := &wrapper{ - opts: NewOptions(opts...), - Client: c, - } - return handler - } -} - -// NewCallWrapper create new call wrapper -func NewCallWrapper(opts ...Option) client.CallWrapper { - return func(fn client.CallFunc) client.CallFunc { - handler := &wrapper{ - opts: NewOptions(opts...), - callFunc: fn, - } - return handler.CallFunc - } -} - -func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { - endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) - for _, ep := range w.opts.SkipEndpoints { - if ep == endpoint { - return w.callFunc(ctx, addr, req, rsp, opts) - } - } - - labels := make([]string, 0, 4) - labels = append(labels, labelEndpoint, endpoint) - - w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc() - ts := time.Now() - err := w.callFunc(ctx, addr, req, rsp, opts) - te := time.Since(ts) - w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec() - - w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds()) - w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds()) - - if err == nil { - labels = append(labels, labelStatus, labelSuccess) - } else { - labels = append(labels, labelStatus, labelFailure) - } - w.opts.Meter.Counter(ClientRequestTotal, labels...).Inc() - - return err -} - -func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { - endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) - for _, ep := range w.opts.SkipEndpoints { - if ep == endpoint { - return w.Client.Call(ctx, req, rsp, opts...) - } - } - - labels := make([]string, 0, 4) - labels = append(labels, labelEndpoint, endpoint) - - w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc() - ts := time.Now() - err := w.Client.Call(ctx, req, rsp, opts...) - te := time.Since(ts) - w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec() - - w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds()) - w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds()) - - if err == nil { - labels = append(labels, labelStatus, labelSuccess) - } else { - labels = append(labels, labelStatus, labelFailure) - } - w.opts.Meter.Counter(ClientRequestTotal, labels...).Inc() - - return err -} - -func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { - endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) - for _, ep := range w.opts.SkipEndpoints { - if ep == endpoint { - return w.Client.Stream(ctx, req, opts...) - } - } - - labels := make([]string, 0, 4) - labels = append(labels, labelEndpoint, endpoint) - - w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc() - ts := time.Now() - stream, err := w.Client.Stream(ctx, req, opts...) - te := time.Since(ts) - w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec() - - w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds()) - w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds()) - - if err == nil { - labels = append(labels, labelStatus, labelSuccess) - } else { - labels = append(labels, labelStatus, labelFailure) - } - w.opts.Meter.Counter(ClientRequestTotal, labels...).Inc() - - return stream, err -} - -func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { - endpoint := p.Topic() - - labels := make([]string, 0, 4) - labels = append(labels, labelEndpoint, endpoint) - - w.opts.Meter.Counter(PublishMessageInflight, labels...).Inc() - ts := time.Now() - err := w.Client.Publish(ctx, p, opts...) - te := time.Since(ts) - w.opts.Meter.Counter(PublishMessageInflight, labels...).Dec() - - w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, labels...).Update(te.Seconds()) - w.opts.Meter.Histogram(PublishMessageDurationSeconds, labels...).Update(te.Seconds()) - - if err == nil { - labels = append(labels, labelStatus, labelSuccess) - } else { - labels = append(labels, labelStatus, labelFailure) - } - w.opts.Meter.Counter(PublishMessageTotal, labels...).Inc() - - return err -} - -// NewHandlerWrapper create new server handler wrapper -// deprecated -func NewHandlerWrapper(opts ...Option) server.HandlerWrapper { - handler := &wrapper{ - opts: NewOptions(opts...), - } - return handler.HandlerFunc -} - -// NewServerHandlerWrapper create new server handler wrapper -func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper { - handler := &wrapper{ - opts: NewOptions(opts...), - } - return handler.HandlerFunc -} - -func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc { - return func(ctx context.Context, req server.Request, rsp interface{}) error { - endpoint := req.Service() + "." + req.Endpoint() - for _, ep := range w.opts.SkipEndpoints { - if ep == endpoint { - return fn(ctx, req, rsp) - } - } - - labels := make([]string, 0, 4) - labels = append(labels, labelEndpoint, endpoint) - - w.opts.Meter.Counter(ServerRequestInflight, labels...).Inc() - ts := time.Now() - err := fn(ctx, req, rsp) - te := time.Since(ts) - w.opts.Meter.Counter(ServerRequestInflight, labels...).Dec() - - w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, labels...).Update(te.Seconds()) - w.opts.Meter.Histogram(ServerRequestDurationSeconds, labels...).Update(te.Seconds()) - - if err == nil { - labels = append(labels, labelStatus, labelSuccess) - } else { - labels = append(labels, labelStatus, labelFailure) - } - w.opts.Meter.Counter(ServerRequestTotal, labels...).Inc() - - return err - } -} - -// NewSubscriberWrapper create server subscribe wrapper -// deprecated -func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper { - handler := &wrapper{ - opts: NewOptions(opts...), - } - return handler.SubscriberFunc -} - -func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper { - handler := &wrapper{ - opts: NewOptions(opts...), - } - return handler.SubscriberFunc -} - -func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc { - return func(ctx context.Context, msg server.Message) error { - endpoint := msg.Topic() - - labels := make([]string, 0, 4) - labels = append(labels, labelEndpoint, endpoint) - - w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Inc() - ts := time.Now() - err := fn(ctx, msg) - te := time.Since(ts) - w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Dec() - - w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, labels...).Update(te.Seconds()) - w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, labels...).Update(te.Seconds()) - - if err == nil { - labels = append(labels, labelStatus, labelSuccess) - } else { - labels = append(labels, labelStatus, labelFailure) - } - w.opts.Meter.Counter(SubscribeMessageTotal, labels...).Inc() - - return err - } -} diff --git a/mtls/mtls.go b/mtls/mtls.go index 238be41b..c0b1bc82 100644 --- a/mtls/mtls.go +++ b/mtls/mtls.go @@ -1,4 +1,4 @@ -package mtls // import "go.unistack.org/micro/v3/mtls" +package mtls import ( "bytes" diff --git a/network/network.go b/network/network.go index 9e00e9de..ec9bf96f 100644 --- a/network/network.go +++ b/network/network.go @@ -1,5 +1,5 @@ // Package network is for creating internetworks -package network // import "go.unistack.org/micro/v3/network" +package network import ( "go.unistack.org/micro/v3/client" diff --git a/network/transport/transport.go b/network/transport/transport.go index f70f34b9..2e33dcc5 100644 --- a/network/transport/transport.go +++ b/network/transport/transport.go @@ -1,5 +1,5 @@ // Package transport is an interface for synchronous connection based communication -package transport // import "go.unistack.org/micro/v3/network/transport" +package transport import ( "context" diff --git a/network/tunnel/broker/broker.go b/network/tunnel/broker/broker.go index 3066bd44..95a2f20f 100644 --- a/network/tunnel/broker/broker.go +++ b/network/tunnel/broker/broker.go @@ -1,5 +1,5 @@ // Package broker is a tunnel broker -package broker // import "go.unistack.org/micro/v3/network/tunnel/broker" +package broker import ( "context" diff --git a/network/tunnel/transport/transport.go b/network/tunnel/transport/transport.go index e5e6f68b..9ac71e2d 100644 --- a/network/tunnel/transport/transport.go +++ b/network/tunnel/transport/transport.go @@ -1,5 +1,5 @@ // Package transport provides a tunnel transport -package transport // import "go.unistack.org/micro/v3/network/tunnel/transport" +package transport import ( "context" diff --git a/network/tunnel/tunnel.go b/network/tunnel/tunnel.go index 3efa9294..e0d8620a 100644 --- a/network/tunnel/tunnel.go +++ b/network/tunnel/tunnel.go @@ -1,5 +1,5 @@ // Package tunnel provides gre network tunnelling -package tunnel // import "go.unistack.org/micro/v3/network/transport/tunnel" +package tunnel import ( "context" diff --git a/profiler/http/http.go b/profiler/http/http.go index d0fa4a80..b3dd7cb2 100644 --- a/profiler/http/http.go +++ b/profiler/http/http.go @@ -1,5 +1,5 @@ // Package http enables the http profiler -package http // import "go.unistack.org/micro/v3/profiler/http" +package http import ( "context" diff --git a/profiler/pprof/pprof.go b/profiler/pprof/pprof.go index 1c8fbdce..d96773e8 100644 --- a/profiler/pprof/pprof.go +++ b/profiler/pprof/pprof.go @@ -1,5 +1,5 @@ // Package pprof provides a pprof profiler which writes output to /tmp/[name].{cpu,mem}.pprof -package pprof // import "go.unistack.org/micro/v3/profiler/pprof" +package pprof import ( "os" diff --git a/profiler/profile.go b/profiler/profile.go index b29eb600..f14d5fb4 100644 --- a/profiler/profile.go +++ b/profiler/profile.go @@ -1,5 +1,5 @@ // Package profiler is for profilers -package profiler // import "go.unistack.org/micro/v3/profiler" +package profiler // Profiler interface type Profiler interface { diff --git a/proxy/proxy.go b/proxy/proxy.go index e80c1e9a..0a209e18 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -1,5 +1,5 @@ // Package proxy is a transparent proxy built on the micro/server -package proxy // import "go.unistack.org/micro/v3/proxy" +package proxy import ( "context" diff --git a/register/register.go b/register/register.go index bfe0078a..5f2bdab3 100644 --- a/register/register.go +++ b/register/register.go @@ -1,5 +1,5 @@ // Package register is an interface for service discovery -package register // import "go.unistack.org/micro/v3/register" +package register import ( "context" diff --git a/resolver/dns/dns.go b/resolver/dns/dns.go index 82de80d5..e80a9c8b 100644 --- a/resolver/dns/dns.go +++ b/resolver/dns/dns.go @@ -1,5 +1,5 @@ // Package dns resolves names to dns records -package dns // import "go.unistack.org/micro/v3/resolver/dns" +package dns import ( "context" diff --git a/resolver/dnssrv/dnssrv.go b/resolver/dnssrv/dnssrv.go index f88f816f..8b786824 100644 --- a/resolver/dnssrv/dnssrv.go +++ b/resolver/dnssrv/dnssrv.go @@ -1,5 +1,5 @@ // Package dnssrv resolves names to dns srv records -package dnssrv // import "go.unistack.org/micro/v3/resolver/dnssrv" +package dnssrv import ( "fmt" diff --git a/resolver/http/http.go b/resolver/http/http.go index 8aa8e43e..f32e791c 100644 --- a/resolver/http/http.go +++ b/resolver/http/http.go @@ -1,5 +1,5 @@ // Package http resolves names to network addresses using a http request -package http // import "go.unistack.org/micro/v3/resolver/http" +package http import ( "encoding/json" diff --git a/resolver/noop/noop.go b/resolver/noop/noop.go index 3fbe202e..71dfcb98 100644 --- a/resolver/noop/noop.go +++ b/resolver/noop/noop.go @@ -1,5 +1,5 @@ // Package noop is a noop resolver -package noop // import "go.unistack.org/micro/v3/resolver/noop" +package noop import ( "go.unistack.org/micro/v3/resolver" diff --git a/resolver/registry/registry.go b/resolver/registry/registry.go index 064f7b59..5f55e7a0 100644 --- a/resolver/registry/registry.go +++ b/resolver/registry/registry.go @@ -1,5 +1,5 @@ // Package register resolves names using the micro register -package register // import "go.unistack.org/micro/v3/resolver/registry" +package register import ( "context" diff --git a/resolver/static/static.go b/resolver/static/static.go index 4985a89f..9da00f5c 100644 --- a/resolver/static/static.go +++ b/resolver/static/static.go @@ -1,5 +1,5 @@ // Package static is a static resolver -package static // import "go.unistack.org/micro/v3/resolver/static" +package static import ( "go.unistack.org/micro/v3/resolver" diff --git a/router/router.go b/router/router.go index 43874196..8575d4ce 100644 --- a/router/router.go +++ b/router/router.go @@ -1,5 +1,5 @@ // Package router provides a network routing control plane -package router // import "go.unistack.org/micro/v3/router" +package router import ( "errors" diff --git a/selector/random/random.go b/selector/random/random.go index 40b25daf..2d739263 100644 --- a/selector/random/random.go +++ b/selector/random/random.go @@ -1,4 +1,4 @@ -package random // import "go.unistack.org/micro/v3/selector/random" +package random import ( "go.unistack.org/micro/v3/selector" diff --git a/selector/roundrobin/roundrobin.go b/selector/roundrobin/roundrobin.go index f39aca5c..f442b5b4 100644 --- a/selector/roundrobin/roundrobin.go +++ b/selector/roundrobin/roundrobin.go @@ -1,4 +1,4 @@ -package roundrobin // import "go.unistack.org/micro/v3/selector/roundrobin" +package roundrobin import ( "go.unistack.org/micro/v3/selector" diff --git a/selector/selector.go b/selector/selector.go index 2c16cca6..808481f9 100644 --- a/selector/selector.go +++ b/selector/selector.go @@ -1,5 +1,5 @@ // Package selector is for node selection and load balancing -package selector // import "go.unistack.org/micro/v3/selector" +package selector import ( "errors" diff --git a/server/server.go b/server/server.go index b6ada16a..49461e3a 100644 --- a/server/server.go +++ b/server/server.go @@ -1,5 +1,5 @@ // Package server is an interface for a micro server -package server // import "go.unistack.org/micro/v3/server" +package server import ( "context" @@ -11,7 +11,9 @@ import ( ) // DefaultServer default server -var DefaultServer Server = NewServer() +var ( + DefaultServer Server = NewServer() +) var ( // DefaultAddress will be used if no address passed, use secure localhost diff --git a/store/store.go b/store/store.go index 57646346..49eeff5c 100644 --- a/store/store.go +++ b/store/store.go @@ -1,5 +1,5 @@ // Package store is an interface for distributed data storage. -package store // import "go.unistack.org/micro/v3/store" +package store import ( "context" diff --git a/sync/sync.go b/sync/sync.go index 42329514..a84dfb64 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -1,5 +1,5 @@ // Package sync is an interface for distributed synchronization -package sync // import "go.unistack.org/micro/v3/sync" +package sync import ( "errors" diff --git a/tracer/tracer.go b/tracer/tracer.go index 8951da8a..e1cd2411 100644 --- a/tracer/tracer.go +++ b/tracer/tracer.go @@ -1,5 +1,5 @@ // Package tracer provides an interface for distributed tracing -package tracer // import "go.unistack.org/micro/v3/tracer" +package tracer import ( "context" diff --git a/tracer/wrapper/wrapper.go b/tracer/wrapper/wrapper.go deleted file mode 100644 index 0190c4e4..00000000 --- a/tracer/wrapper/wrapper.go +++ /dev/null @@ -1,415 +0,0 @@ -// Package wrapper provides wrapper for Tracer -package wrapper // import "go.unistack.org/micro/v3/tracer/wrapper" - -import ( - "context" - "fmt" - "strings" - - "go.unistack.org/micro/v3/client" - "go.unistack.org/micro/v3/metadata" - "go.unistack.org/micro/v3/server" - "go.unistack.org/micro/v3/tracer" -) - -var DefaultHeadersExctract = []string{metadata.HeaderXRequestID} - -func ExtractDefaultLabels(md metadata.Metadata) []interface{} { - labels := make([]interface{}, 0, len(DefaultHeadersExctract)) - for _, k := range DefaultHeadersExctract { - if v, ok := md.Get(k); ok { - labels = append(labels, strings.ToLower(k), v) - } - } - return labels -} - -var ( - DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) { - var labels []interface{} - if md, ok := metadata.FromOutgoingContext(ctx); ok { - labels = append(labels, ExtractDefaultLabels(md)...) - } - if err != nil { - sp.SetStatus(tracer.SpanStatusError, err.Error()) - } - sp.AddLabels(labels...) - } - - DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) { - var labels []interface{} - if md, ok := metadata.FromOutgoingContext(ctx); ok { - labels = append(labels, ExtractDefaultLabels(md)...) - } - if err != nil { - sp.SetStatus(tracer.SpanStatusError, err.Error()) - } - sp.AddLabels(labels...) - } - - DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) { - var labels []interface{} - if md, ok := metadata.FromOutgoingContext(ctx); ok { - labels = append(labels, ExtractDefaultLabels(md)...) - } - labels = append(labels, ExtractDefaultLabels(msg.Metadata())...) - if err != nil { - sp.SetStatus(tracer.SpanStatusError, err.Error()) - } - sp.AddLabels(labels...) - } - - DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) { - var labels []interface{} - if md, ok := metadata.FromIncomingContext(ctx); ok { - labels = append(labels, ExtractDefaultLabels(md)...) - } - if err != nil { - sp.SetStatus(tracer.SpanStatusError, err.Error()) - } - sp.AddLabels(labels...) - } - - DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, sp tracer.Span, err error) { - var labels []interface{} - if md, ok := metadata.FromIncomingContext(ctx); ok { - labels = append(labels, ExtractDefaultLabels(md)...) - } - labels = append(labels, ExtractDefaultLabels(msg.Header())...) - - if err != nil { - sp.SetStatus(tracer.SpanStatusError, err.Error()) - } - sp.AddLabels(labels...) - } - - DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) { - sp.SetName(fmt.Sprintf("%s.%s call", req.Service(), req.Method())) - var labels []interface{} - if md, ok := metadata.FromOutgoingContext(ctx); ok { - labels = append(labels, ExtractDefaultLabels(md)...) - } - if err != nil { - sp.SetStatus(tracer.SpanStatusError, err.Error()) - } - sp.AddLabels(labels...) - } - - DefaultSkipEndpoints = []string{"Meter.Metrics", "Health.Live", "Health.Ready", "Health.Version"} -) - -type tWrapper struct { - client.Client - serverHandler server.HandlerFunc - serverSubscriber server.SubscriberFunc - clientCallFunc client.CallFunc - opts Options -} - -type ( - ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, tracer.Span, error) - ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, tracer.Span, error) - ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, tracer.Span, error) - ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, tracer.Span, error) - ServerHandlerObserver func(context.Context, server.Request, interface{}, tracer.Span, error) - ServerSubscriberObserver func(context.Context, server.Message, tracer.Span, error) -) - -// Options struct -type Options struct { - // Tracer that used for tracing - Tracer tracer.Tracer - // ClientCallObservers funcs - ClientCallObservers []ClientCallObserver - // ClientStreamObservers funcs - ClientStreamObservers []ClientStreamObserver - // ClientPublishObservers funcs - ClientPublishObservers []ClientPublishObserver - // ClientCallFuncObservers funcs - ClientCallFuncObservers []ClientCallFuncObserver - // ServerHandlerObservers funcs - ServerHandlerObservers []ServerHandlerObserver - // ServerSubscriberObservers funcs - ServerSubscriberObservers []ServerSubscriberObserver - // SkipEndpoints - SkipEndpoints []string -} - -// Option func signature -type Option func(*Options) - -// NewOptions create Options from Option slice -func NewOptions(opts ...Option) Options { - options := Options{ - Tracer: tracer.DefaultTracer, - ClientCallObservers: []ClientCallObserver{DefaultClientCallObserver}, - ClientStreamObservers: []ClientStreamObserver{DefaultClientStreamObserver}, - ClientPublishObservers: []ClientPublishObserver{DefaultClientPublishObserver}, - ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver}, - ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver}, - ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver}, - SkipEndpoints: DefaultSkipEndpoints, - } - - for _, o := range opts { - o(&options) - } - - return options -} - -// WithTracer pass tracer -func WithTracer(t tracer.Tracer) Option { - return func(o *Options) { - o.Tracer = t - } -} - -// SkipEndponts -func SkipEndpoins(eps ...string) Option { - return func(o *Options) { - o.SkipEndpoints = append(o.SkipEndpoints, eps...) - } -} - -// WithClientCallObservers funcs -func WithClientCallObservers(ob ...ClientCallObserver) Option { - return func(o *Options) { - o.ClientCallObservers = ob - } -} - -// WithClientStreamObservers funcs -func WithClientStreamObservers(ob ...ClientStreamObserver) Option { - return func(o *Options) { - o.ClientStreamObservers = ob - } -} - -// WithClientPublishObservers funcs -func WithClientPublishObservers(ob ...ClientPublishObserver) Option { - return func(o *Options) { - o.ClientPublishObservers = ob - } -} - -// WithClientCallFuncObservers funcs -func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option { - return func(o *Options) { - o.ClientCallFuncObservers = ob - } -} - -// WithServerHandlerObservers funcs -func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option { - return func(o *Options) { - o.ServerHandlerObservers = ob - } -} - -// WithServerSubscriberObservers funcs -func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option { - return func(o *Options) { - o.ServerSubscriberObservers = ob - } -} - -func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { - endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) - for _, ep := range ot.opts.SkipEndpoints { - if ep == endpoint { - return ot.Client.Call(ctx, req, rsp, opts...) - } - } - - nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()), - tracer.WithSpanKind(tracer.SpanKindClient), - tracer.WithSpanLabels( - "rpc.service", req.Service(), - "rpc.method", req.Method(), - "rpc.flavor", "rpc", - "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), - "rpc.call_type", "unary", - ), - ) - defer sp.Finish() - - err := ot.Client.Call(nctx, req, rsp, opts...) - - for _, o := range ot.opts.ClientCallObservers { - o(nctx, req, rsp, opts, sp, err) - } - - return err -} - -func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { - endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) - for _, ep := range ot.opts.SkipEndpoints { - if ep == endpoint { - return ot.Client.Stream(ctx, req, opts...) - } - } - - nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()), - tracer.WithSpanKind(tracer.SpanKindClient), - tracer.WithSpanLabels( - "rpc.service", req.Service(), - "rpc.method", req.Method(), - "rpc.flavor", "rpc", - "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), - "rpc.call_type", "stream", - ), - ) - defer sp.Finish() - - stream, err := ot.Client.Stream(nctx, req, opts...) - - for _, o := range ot.opts.ClientStreamObservers { - o(nctx, req, opts, stream, sp, err) - } - - return stream, err -} - -func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error { - nctx, sp := ot.opts.Tracer.Start(ctx, msg.Topic()+" publish", tracer.WithSpanKind(tracer.SpanKindProducer)) - defer sp.Finish() - sp.AddLabels("messaging.destination.name", msg.Topic()) - sp.AddLabels("messaging.operation", "publish") - err := ot.Client.Publish(nctx, msg, opts...) - - for _, o := range ot.opts.ClientPublishObservers { - o(nctx, msg, opts, sp, err) - } - - return err -} - -func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error { - endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Method()) - for _, ep := range ot.opts.SkipEndpoints { - if ep == endpoint { - return ot.serverHandler(ctx, req, rsp) - } - } - - callType := "unary" - if req.Stream() { - callType = "stream" - } - - nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-server", req.Service(), req.Method()), - tracer.WithSpanKind(tracer.SpanKindServer), - tracer.WithSpanLabels( - "rpc.service", req.Service(), - "rpc.method", req.Method(), - "rpc.flavor", "rpc", - "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), - "rpc.call_type", callType, - ), - ) - defer sp.Finish() - - err := ot.serverHandler(nctx, req, rsp) - - for _, o := range ot.opts.ServerHandlerObservers { - o(nctx, req, rsp, sp, err) - } - - return err -} - -func (ot *tWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error { - nctx, sp := ot.opts.Tracer.Start(ctx, msg.Topic()+" process", tracer.WithSpanKind(tracer.SpanKindConsumer)) - defer sp.Finish() - sp.AddLabels("messaging.operation", "process") - sp.AddLabels("messaging.source.name", msg.Topic()) - err := ot.serverSubscriber(nctx, msg) - - for _, o := range ot.opts.ServerSubscriberObservers { - o(nctx, msg, sp, err) - } - - return err -} - -// NewClientWrapper accepts an open tracing Trace and returns a Client Wrapper -func NewClientWrapper(opts ...Option) client.Wrapper { - return func(c client.Client) client.Client { - options := NewOptions() - for _, o := range opts { - o(&options) - } - return &tWrapper{opts: options, Client: c} - } -} - -// NewClientCallWrapper accepts an opentracing Tracer and returns a Call Wrapper -func NewClientCallWrapper(opts ...Option) client.CallWrapper { - return func(h client.CallFunc) client.CallFunc { - options := NewOptions() - for _, o := range opts { - o(&options) - } - - ot := &tWrapper{opts: options, clientCallFunc: h} - return ot.ClientCallFunc - } -} - -func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { - endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Method()) - for _, ep := range ot.opts.SkipEndpoints { - if ep == endpoint { - return ot.ClientCallFunc(ctx, addr, req, rsp, opts) - } - } - - nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()), - tracer.WithSpanKind(tracer.SpanKindClient), - tracer.WithSpanLabels( - "rpc.service", req.Service(), - "rpc.method", req.Method(), - "rpc.flavor", "rpc", - "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), - "rpc.call_type", "unary", - ), - ) - - defer sp.Finish() - - err := ot.clientCallFunc(nctx, addr, req, rsp, opts) - - for _, o := range ot.opts.ClientCallFuncObservers { - o(nctx, addr, req, rsp, opts, sp, err) - } - - return err -} - -// NewServerHandlerWrapper accepts an options and returns a Handler Wrapper -func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper { - return func(h server.HandlerFunc) server.HandlerFunc { - options := NewOptions() - for _, o := range opts { - o(&options) - } - - ot := &tWrapper{opts: options, serverHandler: h} - return ot.ServerHandler - } -} - -// NewServerSubscriberWrapper accepts an opentracing Tracer and returns a Subscriber Wrapper -func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper { - return func(h server.SubscriberFunc) server.SubscriberFunc { - options := NewOptions() - for _, o := range opts { - o(&options) - } - - ot := &tWrapper{opts: options, serverSubscriber: h} - return ot.ServerSubscriber - } -} diff --git a/util/addr/addr.go b/util/addr/addr.go index 47e5636f..9415ec94 100644 --- a/util/addr/addr.go +++ b/util/addr/addr.go @@ -1,4 +1,4 @@ -package addr // import "go.unistack.org/micro/v3/util/addr" +package addr import ( "fmt" @@ -58,6 +58,7 @@ func IsLocal(addr string) bool { } // Extract returns a real ip +// //nolint:gocyclo func Extract(addr string) (string, error) { // if addr specified then its returned diff --git a/util/backoff/backoff.go b/util/backoff/backoff.go index a18c6a52..cadc2fc1 100644 --- a/util/backoff/backoff.go +++ b/util/backoff/backoff.go @@ -1,5 +1,5 @@ // Package backoff provides backoff functionality -package backoff // import "go.unistack.org/micro/v3/util/backoff" +package backoff import ( "math" diff --git a/util/buf/buf.go b/util/buf/buf.go index f01a9d6a..e6b86214 100644 --- a/util/buf/buf.go +++ b/util/buf/buf.go @@ -1,4 +1,4 @@ -package buf // import "go.unistack.org/micro/v3/util/buf" +package buf import ( "bytes" diff --git a/util/http/http.go b/util/http/http.go index de83f138..571effbf 100644 --- a/util/http/http.go +++ b/util/http/http.go @@ -1,4 +1,4 @@ -package http // import "go.unistack.org/micro/v3/util/http" +package http import ( "context" diff --git a/util/id/id.go b/util/id/id.go index f4699e6e..2d094f1f 100644 --- a/util/id/id.go +++ b/util/id/id.go @@ -1,4 +1,4 @@ -package id // import "go.unistack.org/micro/v3/util/id" +package id import ( "context" diff --git a/util/io/io.go b/util/io/io.go index 54599523..ed4d8fc0 100644 --- a/util/io/io.go +++ b/util/io/io.go @@ -1,5 +1,5 @@ // Package io is for io management -package io // import "go.unistack.org/micro/v3/util/io" +package io import ( "io" diff --git a/util/jitter/random.go b/util/jitter/random.go index eb03b29f..8e1cae69 100644 --- a/util/jitter/random.go +++ b/util/jitter/random.go @@ -1,5 +1,5 @@ // Package jitter provides a random jitter -package jitter // import "go.unistack.org/micro/v3/util/jitter" +package jitter import ( "time" diff --git a/util/jitter/ticker.go b/util/jitter/ticker.go index 84946556..b8d06ee5 100644 --- a/util/jitter/ticker.go +++ b/util/jitter/ticker.go @@ -1,4 +1,4 @@ -package jitter // import "go.unistack.org/micro/v3/util/jitter" +package jitter import ( "context" diff --git a/util/net/net.go b/util/net/net.go index a5e8e278..96a4f1e7 100644 --- a/util/net/net.go +++ b/util/net/net.go @@ -1,4 +1,4 @@ -package net // import "go.unistack.org/micro/v3/util/net" +package net import ( "errors" diff --git a/util/pki/pki.go b/util/pki/pki.go index 681821bf..e09cab82 100644 --- a/util/pki/pki.go +++ b/util/pki/pki.go @@ -1,6 +1,6 @@ // Package pki provides PKI all the PKI functions necessary to run micro over an untrusted network // including a CA -package pki // import "go.unistack.org/micro/v3/util/pki" +package pki import ( "bytes" diff --git a/util/pool/pool.go b/util/pool/pool.go index 576d1873..2433cd59 100644 --- a/util/pool/pool.go +++ b/util/pool/pool.go @@ -1,5 +1,5 @@ // Package pool is a connection pool -package pool // import "go.unistack.org/micro/v3/util/pool" +package pool import ( "context" diff --git a/util/rand/rand.go b/util/rand/rand.go index 3df9701c..a6243365 100644 --- a/util/rand/rand.go +++ b/util/rand/rand.go @@ -1,4 +1,4 @@ -package rand // import "go.unistack.org/micro/v3/util/rand" +package rand import ( crand "crypto/rand" diff --git a/util/register/util.go b/util/register/util.go index e3867389..9bb8b48a 100644 --- a/util/register/util.go +++ b/util/register/util.go @@ -1,4 +1,4 @@ -package register // import "go.unistack.org/micro/v3/util/register" +package register import ( "context" diff --git a/util/ring/buffer.go b/util/ring/buffer.go index 50fe2499..9cc4b891 100644 --- a/util/ring/buffer.go +++ b/util/ring/buffer.go @@ -1,5 +1,5 @@ // Package ring provides a simple ring buffer for storing local data -package ring // import "go.unistack.org/micro/v3/util/ring" +package ring import ( "sync" diff --git a/util/socket/socket.go b/util/socket/socket.go index 1797a71a..0ae12b4a 100644 --- a/util/socket/socket.go +++ b/util/socket/socket.go @@ -1,5 +1,5 @@ // Package socket provides a pseudo socket -package socket // import "go.unistack.org/micro/v3/util/socket" +package socket import ( "io" diff --git a/util/stream/stream.go b/util/stream/stream.go index 5e51df98..36bf06b1 100644 --- a/util/stream/stream.go +++ b/util/stream/stream.go @@ -1,5 +1,5 @@ // Package stream encapsulates streams within streams -package stream // import "go.unistack.org/micro/v3/util/stream" +package stream import ( "context"