Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2024-09-20 17:54:17 +03:00
parent de9e4d73f5
commit faf2454f0a
51 changed files with 52 additions and 815 deletions
broker
client
config
errors
flow
fsm
metadata
meter
mtls
network
profiler
proxy
register
resolver
router
selector
server
store
sync
tracer
util

@ -1,5 +1,5 @@
// Package broker is an interface used for asynchronous messaging
package broker // import "go.unistack.org/micro/v3/broker"
package broker
import (
"context"

@ -1,5 +1,5 @@
// Package client is an interface for an RPC client
package client // import "go.unistack.org/micro/v3/client"
package client
import (
"context"

@ -1,5 +1,5 @@
// Package config is an interface for dynamic configuration.
package config // import "go.unistack.org/micro/v3/config"
package config
import (
"context"

@ -1,6 +1,6 @@
// Package errors provides a way to return detailed information
// for an RPC request error. The error is normally JSON encoded.
package errors // import "go.unistack.org/micro/v3/errors"
package errors
import (
"bytes"

@ -1,5 +1,5 @@
// Package flow is an interface used for saga pattern microservice workflow
package flow // import "go.unistack.org/micro/v3/flow"
package flow
import (
"context"

@ -1,4 +1,4 @@
package fsm // import "go.unistack.org/micro/v3/fsm"
package fsm
import (
"context"

@ -1,5 +1,5 @@
// Package metadata is a way of defining message headers
package metadata // import "go.unistack.org/micro/v3/metadata"
package metadata
import (
"net/textproto"

@ -16,10 +16,6 @@ var (
DefaultAddress = ":9090"
// DefaultPath the meter endpoint where the Meter data will be made available
DefaultPath = "/metrics"
// DefaultMetricPrefix holds the string that prepends to all metrics
DefaultMetricPrefix = "micro_"
// DefaultLabelPrefix holds the string that prepends to all labels
DefaultLabelPrefix = "micro_"
// DefaultSummaryQuantiles is the default spread of stats for summary
DefaultSummaryQuantiles = []float64{0.5, 0.9, 0.97, 0.99, 1}
// DefaultSummaryWindow is the default window for summary

@ -1,347 +0,0 @@
package wrapper // import "go.unistack.org/micro/v3/meter/wrapper"
import (
"context"
"fmt"
"time"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/server"
)
var (
// ClientRequestDurationSeconds specifies meter metric name
ClientRequestDurationSeconds = "client_request_duration_seconds"
// ClientRequestLatencyMicroseconds specifies meter metric name
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
// ClientRequestTotal specifies meter metric name
ClientRequestTotal = "client_request_total"
// ClientRequestInflight specifies meter metric name
ClientRequestInflight = "client_request_inflight"
// ServerRequestDurationSeconds specifies meter metric name
ServerRequestDurationSeconds = "server_request_duration_seconds"
// ServerRequestLatencyMicroseconds specifies meter metric name
ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
// ServerRequestTotal specifies meter metric name
ServerRequestTotal = "server_request_total"
// ServerRequestInflight specifies meter metric name
ServerRequestInflight = "server_request_inflight"
// PublishMessageDurationSeconds specifies meter metric name
PublishMessageDurationSeconds = "publish_message_duration_seconds"
// PublishMessageLatencyMicroseconds specifies meter metric name
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
// PublishMessageTotal specifies meter metric name
PublishMessageTotal = "publish_message_total"
// PublishMessageInflight specifies meter metric name
PublishMessageInflight = "publish_message_inflight"
// SubscribeMessageDurationSeconds specifies meter metric name
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
// SubscribeMessageLatencyMicroseconds specifies meter metric name
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
// SubscribeMessageTotal specifies meter metric name
SubscribeMessageTotal = "subscribe_message_total"
// SubscribeMessageInflight specifies meter metric name
SubscribeMessageInflight = "subscribe_message_inflight"
labelSuccess = "success"
labelFailure = "failure"
labelStatus = "status"
labelEndpoint = "endpoint"
// DefaultSkipEndpoints contains list of endpoints that not evaluted by wrapper
DefaultSkipEndpoints = []string{"Meter.Metrics", "Health.Live", "Health.Ready", "Health.Version"}
)
// Options struct
type Options struct {
Meter meter.Meter
lopts []meter.Option
SkipEndpoints []string
}
// Option func signature
type Option func(*Options)
// NewOptions creates new Options struct
func NewOptions(opts ...Option) Options {
options := Options{
Meter: meter.DefaultMeter,
lopts: make([]meter.Option, 0, 5),
SkipEndpoints: DefaultSkipEndpoints,
}
for _, o := range opts {
o(&options)
}
return options
}
// ServiceName passes service name to meter label
func ServiceName(name string) Option {
return func(o *Options) {
o.lopts = append(o.lopts, meter.Labels("name", name))
}
}
// ServiceVersion passes service version to meter label
func ServiceVersion(version string) Option {
return func(o *Options) {
o.lopts = append(o.lopts, meter.Labels("version", version))
}
}
// ServiceID passes service id to meter label
func ServiceID(id string) Option {
return func(o *Options) {
o.lopts = append(o.lopts, meter.Labels("id", id))
}
}
// Meter passes meter
func Meter(m meter.Meter) Option {
return func(o *Options) {
o.Meter = m
}
}
// SkipEndoints add endpoint to skip
func SkipEndoints(eps ...string) Option {
return func(o *Options) {
o.SkipEndpoints = append(o.SkipEndpoints, eps...)
}
}
type wrapper struct {
client.Client
callFunc client.CallFunc
opts Options
}
// NewClientWrapper create new client wrapper
func NewClientWrapper(opts ...Option) client.Wrapper {
return func(c client.Client) client.Client {
handler := &wrapper{
opts: NewOptions(opts...),
Client: c,
}
return handler
}
}
// NewCallWrapper create new call wrapper
func NewCallWrapper(opts ...Option) client.CallWrapper {
return func(fn client.CallFunc) client.CallFunc {
handler := &wrapper{
opts: NewOptions(opts...),
callFunc: fn,
}
return handler.CallFunc
}
}
func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint {
return w.callFunc(ctx, addr, req, rsp, opts)
}
}
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
ts := time.Now()
err := w.callFunc(ctx, addr, req, rsp, opts)
te := time.Since(ts)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(ClientRequestTotal, labels...).Inc()
return err
}
func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint {
return w.Client.Call(ctx, req, rsp, opts...)
}
}
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
ts := time.Now()
err := w.Client.Call(ctx, req, rsp, opts...)
te := time.Since(ts)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(ClientRequestTotal, labels...).Inc()
return err
}
func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint {
return w.Client.Stream(ctx, req, opts...)
}
}
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
ts := time.Now()
stream, err := w.Client.Stream(ctx, req, opts...)
te := time.Since(ts)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(ClientRequestTotal, labels...).Inc()
return stream, err
}
func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
endpoint := p.Topic()
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(PublishMessageInflight, labels...).Inc()
ts := time.Now()
err := w.Client.Publish(ctx, p, opts...)
te := time.Since(ts)
w.opts.Meter.Counter(PublishMessageInflight, labels...).Dec()
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(PublishMessageDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(PublishMessageTotal, labels...).Inc()
return err
}
// NewHandlerWrapper create new server handler wrapper
// deprecated
func NewHandlerWrapper(opts ...Option) server.HandlerWrapper {
handler := &wrapper{
opts: NewOptions(opts...),
}
return handler.HandlerFunc
}
// NewServerHandlerWrapper create new server handler wrapper
func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper {
handler := &wrapper{
opts: NewOptions(opts...),
}
return handler.HandlerFunc
}
func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
endpoint := req.Service() + "." + req.Endpoint()
for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint {
return fn(ctx, req, rsp)
}
}
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(ServerRequestInflight, labels...).Inc()
ts := time.Now()
err := fn(ctx, req, rsp)
te := time.Since(ts)
w.opts.Meter.Counter(ServerRequestInflight, labels...).Dec()
w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(ServerRequestDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(ServerRequestTotal, labels...).Inc()
return err
}
}
// NewSubscriberWrapper create server subscribe wrapper
// deprecated
func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
handler := &wrapper{
opts: NewOptions(opts...),
}
return handler.SubscriberFunc
}
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
handler := &wrapper{
opts: NewOptions(opts...),
}
return handler.SubscriberFunc
}
func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc {
return func(ctx context.Context, msg server.Message) error {
endpoint := msg.Topic()
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Inc()
ts := time.Now()
err := fn(ctx, msg)
te := time.Since(ts)
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Dec()
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(SubscribeMessageTotal, labels...).Inc()
return err
}
}

@ -1,4 +1,4 @@
package mtls // import "go.unistack.org/micro/v3/mtls"
package mtls
import (
"bytes"

@ -1,5 +1,5 @@
// Package network is for creating internetworks
package network // import "go.unistack.org/micro/v3/network"
package network
import (
"go.unistack.org/micro/v3/client"

@ -1,5 +1,5 @@
// Package transport is an interface for synchronous connection based communication
package transport // import "go.unistack.org/micro/v3/network/transport"
package transport
import (
"context"

@ -1,5 +1,5 @@
// Package broker is a tunnel broker
package broker // import "go.unistack.org/micro/v3/network/tunnel/broker"
package broker
import (
"context"

@ -1,5 +1,5 @@
// Package transport provides a tunnel transport
package transport // import "go.unistack.org/micro/v3/network/tunnel/transport"
package transport
import (
"context"

@ -1,5 +1,5 @@
// Package tunnel provides gre network tunnelling
package tunnel // import "go.unistack.org/micro/v3/network/transport/tunnel"
package tunnel
import (
"context"

@ -1,5 +1,5 @@
// Package http enables the http profiler
package http // import "go.unistack.org/micro/v3/profiler/http"
package http
import (
"context"

@ -1,5 +1,5 @@
// Package pprof provides a pprof profiler which writes output to /tmp/[name].{cpu,mem}.pprof
package pprof // import "go.unistack.org/micro/v3/profiler/pprof"
package pprof
import (
"os"

@ -1,5 +1,5 @@
// Package profiler is for profilers
package profiler // import "go.unistack.org/micro/v3/profiler"
package profiler
// Profiler interface
type Profiler interface {

@ -1,5 +1,5 @@
// Package proxy is a transparent proxy built on the micro/server
package proxy // import "go.unistack.org/micro/v3/proxy"
package proxy
import (
"context"

@ -1,5 +1,5 @@
// Package register is an interface for service discovery
package register // import "go.unistack.org/micro/v3/register"
package register
import (
"context"

@ -1,5 +1,5 @@
// Package dns resolves names to dns records
package dns // import "go.unistack.org/micro/v3/resolver/dns"
package dns
import (
"context"

@ -1,5 +1,5 @@
// Package dnssrv resolves names to dns srv records
package dnssrv // import "go.unistack.org/micro/v3/resolver/dnssrv"
package dnssrv
import (
"fmt"

@ -1,5 +1,5 @@
// Package http resolves names to network addresses using a http request
package http // import "go.unistack.org/micro/v3/resolver/http"
package http
import (
"encoding/json"

@ -1,5 +1,5 @@
// Package noop is a noop resolver
package noop // import "go.unistack.org/micro/v3/resolver/noop"
package noop
import (
"go.unistack.org/micro/v3/resolver"

@ -1,5 +1,5 @@
// Package register resolves names using the micro register
package register // import "go.unistack.org/micro/v3/resolver/registry"
package register
import (
"context"

@ -1,5 +1,5 @@
// Package static is a static resolver
package static // import "go.unistack.org/micro/v3/resolver/static"
package static
import (
"go.unistack.org/micro/v3/resolver"

@ -1,5 +1,5 @@
// Package router provides a network routing control plane
package router // import "go.unistack.org/micro/v3/router"
package router
import (
"errors"

@ -1,4 +1,4 @@
package random // import "go.unistack.org/micro/v3/selector/random"
package random
import (
"go.unistack.org/micro/v3/selector"

@ -1,4 +1,4 @@
package roundrobin // import "go.unistack.org/micro/v3/selector/roundrobin"
package roundrobin
import (
"go.unistack.org/micro/v3/selector"

@ -1,5 +1,5 @@
// Package selector is for node selection and load balancing
package selector // import "go.unistack.org/micro/v3/selector"
package selector
import (
"errors"

@ -1,5 +1,5 @@
// Package server is an interface for a micro server
package server // import "go.unistack.org/micro/v3/server"
package server
import (
"context"
@ -11,7 +11,9 @@ import (
)
// DefaultServer default server
var DefaultServer Server = NewServer()
var (
DefaultServer Server = NewServer()
)
var (
// DefaultAddress will be used if no address passed, use secure localhost

@ -1,5 +1,5 @@
// Package store is an interface for distributed data storage.
package store // import "go.unistack.org/micro/v3/store"
package store
import (
"context"

@ -1,5 +1,5 @@
// Package sync is an interface for distributed synchronization
package sync // import "go.unistack.org/micro/v3/sync"
package sync
import (
"errors"

@ -1,5 +1,5 @@
// Package tracer provides an interface for distributed tracing
package tracer // import "go.unistack.org/micro/v3/tracer"
package tracer
import (
"context"

@ -1,415 +0,0 @@
// Package wrapper provides wrapper for Tracer
package wrapper // import "go.unistack.org/micro/v3/tracer/wrapper"
import (
"context"
"fmt"
"strings"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/server"
"go.unistack.org/micro/v3/tracer"
)
var DefaultHeadersExctract = []string{metadata.HeaderXRequestID}
func ExtractDefaultLabels(md metadata.Metadata) []interface{} {
labels := make([]interface{}, 0, len(DefaultHeadersExctract))
for _, k := range DefaultHeadersExctract {
if v, ok := md.Get(k); ok {
labels = append(labels, strings.ToLower(k), v)
}
}
return labels
}
var (
DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) {
var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) {
var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) {
var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
labels = append(labels, ExtractDefaultLabels(msg.Metadata())...)
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) {
var labels []interface{}
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, sp tracer.Span, err error) {
var labels []interface{}
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
labels = append(labels, ExtractDefaultLabels(msg.Header())...)
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s call", req.Service(), req.Method()))
var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultSkipEndpoints = []string{"Meter.Metrics", "Health.Live", "Health.Ready", "Health.Version"}
)
type tWrapper struct {
client.Client
serverHandler server.HandlerFunc
serverSubscriber server.SubscriberFunc
clientCallFunc client.CallFunc
opts Options
}
type (
ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, tracer.Span, error)
ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, tracer.Span, error)
ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, tracer.Span, error)
ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, tracer.Span, error)
ServerHandlerObserver func(context.Context, server.Request, interface{}, tracer.Span, error)
ServerSubscriberObserver func(context.Context, server.Message, tracer.Span, error)
)
// Options struct
type Options struct {
// Tracer that used for tracing
Tracer tracer.Tracer
// ClientCallObservers funcs
ClientCallObservers []ClientCallObserver
// ClientStreamObservers funcs
ClientStreamObservers []ClientStreamObserver
// ClientPublishObservers funcs
ClientPublishObservers []ClientPublishObserver
// ClientCallFuncObservers funcs
ClientCallFuncObservers []ClientCallFuncObserver
// ServerHandlerObservers funcs
ServerHandlerObservers []ServerHandlerObserver
// ServerSubscriberObservers funcs
ServerSubscriberObservers []ServerSubscriberObserver
// SkipEndpoints
SkipEndpoints []string
}
// Option func signature
type Option func(*Options)
// NewOptions create Options from Option slice
func NewOptions(opts ...Option) Options {
options := Options{
Tracer: tracer.DefaultTracer,
ClientCallObservers: []ClientCallObserver{DefaultClientCallObserver},
ClientStreamObservers: []ClientStreamObserver{DefaultClientStreamObserver},
ClientPublishObservers: []ClientPublishObserver{DefaultClientPublishObserver},
ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver},
ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver},
ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver},
SkipEndpoints: DefaultSkipEndpoints,
}
for _, o := range opts {
o(&options)
}
return options
}
// WithTracer pass tracer
func WithTracer(t tracer.Tracer) Option {
return func(o *Options) {
o.Tracer = t
}
}
// SkipEndponts
func SkipEndpoins(eps ...string) Option {
return func(o *Options) {
o.SkipEndpoints = append(o.SkipEndpoints, eps...)
}
}
// WithClientCallObservers funcs
func WithClientCallObservers(ob ...ClientCallObserver) Option {
return func(o *Options) {
o.ClientCallObservers = ob
}
}
// WithClientStreamObservers funcs
func WithClientStreamObservers(ob ...ClientStreamObserver) Option {
return func(o *Options) {
o.ClientStreamObservers = ob
}
}
// WithClientPublishObservers funcs
func WithClientPublishObservers(ob ...ClientPublishObserver) Option {
return func(o *Options) {
o.ClientPublishObservers = ob
}
}
// WithClientCallFuncObservers funcs
func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option {
return func(o *Options) {
o.ClientCallFuncObservers = ob
}
}
// WithServerHandlerObservers funcs
func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option {
return func(o *Options) {
o.ServerHandlerObservers = ob
}
}
// WithServerSubscriberObservers funcs
func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
return func(o *Options) {
o.ServerSubscriberObservers = ob
}
}
func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.Client.Call(ctx, req, rsp, opts...)
}
}
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", "unary",
),
)
defer sp.Finish()
err := ot.Client.Call(nctx, req, rsp, opts...)
for _, o := range ot.opts.ClientCallObservers {
o(nctx, req, rsp, opts, sp, err)
}
return err
}
func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.Client.Stream(ctx, req, opts...)
}
}
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", "stream",
),
)
defer sp.Finish()
stream, err := ot.Client.Stream(nctx, req, opts...)
for _, o := range ot.opts.ClientStreamObservers {
o(nctx, req, opts, stream, sp, err)
}
return stream, err
}
func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
nctx, sp := ot.opts.Tracer.Start(ctx, msg.Topic()+" publish", tracer.WithSpanKind(tracer.SpanKindProducer))
defer sp.Finish()
sp.AddLabels("messaging.destination.name", msg.Topic())
sp.AddLabels("messaging.operation", "publish")
err := ot.Client.Publish(nctx, msg, opts...)
for _, o := range ot.opts.ClientPublishObservers {
o(nctx, msg, opts, sp, err)
}
return err
}
func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Method())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.serverHandler(ctx, req, rsp)
}
}
callType := "unary"
if req.Stream() {
callType = "stream"
}
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-server", req.Service(), req.Method()),
tracer.WithSpanKind(tracer.SpanKindServer),
tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", callType,
),
)
defer sp.Finish()
err := ot.serverHandler(nctx, req, rsp)
for _, o := range ot.opts.ServerHandlerObservers {
o(nctx, req, rsp, sp, err)
}
return err
}
func (ot *tWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error {
nctx, sp := ot.opts.Tracer.Start(ctx, msg.Topic()+" process", tracer.WithSpanKind(tracer.SpanKindConsumer))
defer sp.Finish()
sp.AddLabels("messaging.operation", "process")
sp.AddLabels("messaging.source.name", msg.Topic())
err := ot.serverSubscriber(nctx, msg)
for _, o := range ot.opts.ServerSubscriberObservers {
o(nctx, msg, sp, err)
}
return err
}
// NewClientWrapper accepts an open tracing Trace and returns a Client Wrapper
func NewClientWrapper(opts ...Option) client.Wrapper {
return func(c client.Client) client.Client {
options := NewOptions()
for _, o := range opts {
o(&options)
}
return &tWrapper{opts: options, Client: c}
}
}
// NewClientCallWrapper accepts an opentracing Tracer and returns a Call Wrapper
func NewClientCallWrapper(opts ...Option) client.CallWrapper {
return func(h client.CallFunc) client.CallFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
ot := &tWrapper{opts: options, clientCallFunc: h}
return ot.ClientCallFunc
}
}
func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Method())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.ClientCallFunc(ctx, addr, req, rsp, opts)
}
}
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", "unary",
),
)
defer sp.Finish()
err := ot.clientCallFunc(nctx, addr, req, rsp, opts)
for _, o := range ot.opts.ClientCallFuncObservers {
o(nctx, addr, req, rsp, opts, sp, err)
}
return err
}
// NewServerHandlerWrapper accepts an options and returns a Handler Wrapper
func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper {
return func(h server.HandlerFunc) server.HandlerFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
ot := &tWrapper{opts: options, serverHandler: h}
return ot.ServerHandler
}
}
// NewServerSubscriberWrapper accepts an opentracing Tracer and returns a Subscriber Wrapper
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
return func(h server.SubscriberFunc) server.SubscriberFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
ot := &tWrapper{opts: options, serverSubscriber: h}
return ot.ServerSubscriber
}
}

@ -1,4 +1,4 @@
package addr // import "go.unistack.org/micro/v3/util/addr"
package addr
import (
"fmt"
@ -58,6 +58,7 @@ func IsLocal(addr string) bool {
}
// Extract returns a real ip
//
//nolint:gocyclo
func Extract(addr string) (string, error) {
// if addr specified then its returned

@ -1,5 +1,5 @@
// Package backoff provides backoff functionality
package backoff // import "go.unistack.org/micro/v3/util/backoff"
package backoff
import (
"math"

@ -1,4 +1,4 @@
package buf // import "go.unistack.org/micro/v3/util/buf"
package buf
import (
"bytes"

@ -1,4 +1,4 @@
package http // import "go.unistack.org/micro/v3/util/http"
package http
import (
"context"

@ -1,4 +1,4 @@
package id // import "go.unistack.org/micro/v3/util/id"
package id
import (
"context"

@ -1,5 +1,5 @@
// Package io is for io management
package io // import "go.unistack.org/micro/v3/util/io"
package io
import (
"io"

@ -1,5 +1,5 @@
// Package jitter provides a random jitter
package jitter // import "go.unistack.org/micro/v3/util/jitter"
package jitter
import (
"time"

@ -1,4 +1,4 @@
package jitter // import "go.unistack.org/micro/v3/util/jitter"
package jitter
import (
"context"

@ -1,4 +1,4 @@
package net // import "go.unistack.org/micro/v3/util/net"
package net
import (
"errors"

@ -1,6 +1,6 @@
// Package pki provides PKI all the PKI functions necessary to run micro over an untrusted network
// including a CA
package pki // import "go.unistack.org/micro/v3/util/pki"
package pki
import (
"bytes"

@ -1,5 +1,5 @@
// Package pool is a connection pool
package pool // import "go.unistack.org/micro/v3/util/pool"
package pool
import (
"context"

@ -1,4 +1,4 @@
package rand // import "go.unistack.org/micro/v3/util/rand"
package rand
import (
crand "crypto/rand"

@ -1,4 +1,4 @@
package register // import "go.unistack.org/micro/v3/util/register"
package register
import (
"context"

@ -1,5 +1,5 @@
// Package ring provides a simple ring buffer for storing local data
package ring // import "go.unistack.org/micro/v3/util/ring"
package ring
import (
"sync"

@ -1,5 +1,5 @@
// Package socket provides a pseudo socket
package socket // import "go.unistack.org/micro/v3/util/socket"
package socket
import (
"io"

@ -1,5 +1,5 @@
// Package stream encapsulates streams within streams
package stream // import "go.unistack.org/micro/v3/util/stream"
package stream
import (
"context"