Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
d9b822deff
commit
b6a0e4d983
@ -222,7 +222,7 @@ func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opt
|
|||||||
ts := time.Now()
|
ts := time.Now()
|
||||||
n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
|
n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
|
||||||
var sp tracer.Span
|
var sp tracer.Span
|
||||||
ctx, sp = n.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
|
ctx, sp = n.opts.Tracer.Start(ctx, "rpc-client",
|
||||||
tracer.WithSpanKind(tracer.SpanKindClient),
|
tracer.WithSpanKind(tracer.SpanKindClient),
|
||||||
tracer.WithSpanLabels("endpoint", req.Endpoint()),
|
tracer.WithSpanLabels("endpoint", req.Endpoint()),
|
||||||
)
|
)
|
||||||
@ -385,7 +385,7 @@ func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption
|
|||||||
ts := time.Now()
|
ts := time.Now()
|
||||||
n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
|
n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
|
||||||
var sp tracer.Span
|
var sp tracer.Span
|
||||||
ctx, sp = n.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
|
ctx, sp = n.opts.Tracer.Start(ctx, "rpc-client",
|
||||||
tracer.WithSpanKind(tracer.SpanKindClient),
|
tracer.WithSpanKind(tracer.SpanKindClient),
|
||||||
tracer.WithSpanLabels("endpoint", req.Endpoint()),
|
tracer.WithSpanLabels("endpoint", req.Endpoint()),
|
||||||
)
|
)
|
||||||
|
14
semconv/cache.go
Normal file
14
semconv/cache.go
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
package semconv
|
||||||
|
|
||||||
|
var (
|
||||||
|
// CacheRequestDurationSeconds specifies meter metric name
|
||||||
|
CacheRequestDurationSeconds = "micro_cache_request_duration_seconds"
|
||||||
|
// CacheRequestLatencyMicroseconds specifies meter metric name
|
||||||
|
CacheRequestLatencyMicroseconds = "micro_cache_request_latency_microseconds"
|
||||||
|
// CacheRequestTotal specifies meter metric name
|
||||||
|
CacheRequestTotal = "micro_cache_request_total"
|
||||||
|
// CacheRequestInflight specifies meter metric name
|
||||||
|
CacheRequestInflight = "micro_cache_request_inflight"
|
||||||
|
// CacheItemsTotal specifies total cache items
|
||||||
|
CacheItemsTotal = "micro_cache_items_total"
|
||||||
|
)
|
@ -3,7 +3,7 @@ package semconv
|
|||||||
var (
|
var (
|
||||||
// StoreRequestDurationSeconds specifies meter metric name
|
// StoreRequestDurationSeconds specifies meter metric name
|
||||||
StoreRequestDurationSeconds = "micro_store_request_duration_seconds"
|
StoreRequestDurationSeconds = "micro_store_request_duration_seconds"
|
||||||
// ClientRequestLatencyMicroseconds specifies meter metric name
|
// StoreRequestLatencyMicroseconds specifies meter metric name
|
||||||
StoreRequestLatencyMicroseconds = "micro_store_request_latency_microseconds"
|
StoreRequestLatencyMicroseconds = "micro_store_request_latency_microseconds"
|
||||||
// StoreRequestTotal specifies meter metric name
|
// StoreRequestTotal specifies meter metric name
|
||||||
StoreRequestTotal = "micro_store_request_total"
|
StoreRequestTotal = "micro_store_request_total"
|
||||||
|
@ -34,7 +34,10 @@ func init() {
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
net.DefaultResolver = utildns.NewNetResolver(utildns.Timeout(1 * time.Second))
|
net.DefaultResolver = utildns.NewNetResolver(
|
||||||
|
utildns.Timeout(1*time.Second),
|
||||||
|
utildns.MinCacheTTL(5*time.Second),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Service is an interface that wraps the lower level components.
|
// Service is an interface that wraps the lower level components.
|
||||||
|
@ -6,6 +6,9 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"go.unistack.org/micro/v3/meter"
|
||||||
|
"go.unistack.org/micro/v3/semconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DialFunc is a [net.Resolver.Dial] function.
|
// DialFunc is a [net.Resolver.Dial] function.
|
||||||
@ -19,6 +22,11 @@ func NewNetResolver(opts ...Option) *net.Resolver {
|
|||||||
o(&options)
|
o(&options)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if options.Meter == nil {
|
||||||
|
options.Meter = meter.DefaultMeter
|
||||||
|
opts = append(opts, Meter(options.Meter))
|
||||||
|
}
|
||||||
|
|
||||||
return &net.Resolver{
|
return &net.Resolver{
|
||||||
PreferGo: true,
|
PreferGo: true,
|
||||||
StrictErrors: options.Resolver.StrictErrors,
|
StrictErrors: options.Resolver.StrictErrors,
|
||||||
@ -56,6 +64,7 @@ type Options struct {
|
|||||||
PreferIPV4 bool
|
PreferIPV4 bool
|
||||||
PreferIPV6 bool
|
PreferIPV6 bool
|
||||||
Timeout time.Duration
|
Timeout time.Duration
|
||||||
|
Meter meter.Meter
|
||||||
}
|
}
|
||||||
|
|
||||||
// MaxCacheEntries sets the maximum number of entries to cache.
|
// MaxCacheEntries sets the maximum number of entries to cache.
|
||||||
@ -87,6 +96,13 @@ func NegativeCache(b bool) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Meter sets meter.Meter
|
||||||
|
func Meter(m meter.Meter) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.Meter = m
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Timeout sets upstream *net.Resolver timeout
|
// Timeout sets upstream *net.Resolver timeout
|
||||||
func Timeout(td time.Duration) Option {
|
func Timeout(td time.Duration) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
@ -156,7 +172,6 @@ func (c *cache) put(req string, res string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
|
||||||
if c.entries == nil {
|
if c.entries == nil {
|
||||||
c.entries = make(map[string]cacheEntry)
|
c.entries = make(map[string]cacheEntry)
|
||||||
}
|
}
|
||||||
@ -165,6 +180,8 @@ func (c *cache) put(req string, res string) {
|
|||||||
var tested, evicted int
|
var tested, evicted int
|
||||||
for k, e := range c.entries {
|
for k, e := range c.entries {
|
||||||
if time.Until(e.deadline) <= 0 {
|
if time.Until(e.deadline) <= 0 {
|
||||||
|
c.opts.Meter.Counter(semconv.CacheItemsTotal, "type", "dns").Dec()
|
||||||
|
c.opts.Meter.Counter(semconv.CacheRequestTotal, "type", "dns", "method", "evict").Inc()
|
||||||
// delete expired entry
|
// delete expired entry
|
||||||
delete(c.entries, k)
|
delete(c.entries, k)
|
||||||
evicted++
|
evicted++
|
||||||
@ -175,6 +192,8 @@ func (c *cache) put(req string, res string) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if evicted == 0 && c.opts.MaxCacheEntries > 0 && len(c.entries) >= c.opts.MaxCacheEntries {
|
if evicted == 0 && c.opts.MaxCacheEntries > 0 && len(c.entries) >= c.opts.MaxCacheEntries {
|
||||||
|
c.opts.Meter.Counter(semconv.CacheItemsTotal, "type", "dns").Dec()
|
||||||
|
c.opts.Meter.Counter(semconv.CacheRequestTotal, "type", "dns", "method", "evict").Inc()
|
||||||
// delete at least one entry
|
// delete at least one entry
|
||||||
delete(c.entries, k)
|
delete(c.entries, k)
|
||||||
}
|
}
|
||||||
@ -186,6 +205,9 @@ func (c *cache) put(req string, res string) {
|
|||||||
deadline: time.Now().Add(ttl),
|
deadline: time.Now().Add(ttl),
|
||||||
value: res[2:],
|
value: res[2:],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.opts.Meter.Counter(semconv.CacheItemsTotal, "type", "dns").Inc()
|
||||||
|
c.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) get(req string) (res string) {
|
func (c *cache) get(req string) (res string) {
|
||||||
@ -210,6 +232,7 @@ func (c *cache) get(req string) (res string) {
|
|||||||
// prepend correct ID
|
// prepend correct ID
|
||||||
return req[:2] + entry.value
|
return req[:2] + entry.value
|
||||||
}
|
}
|
||||||
|
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -310,10 +333,19 @@ func getUint32(s string) int {
|
|||||||
|
|
||||||
func cachingRoundTrip(cache *cache, network, address string) roundTripper {
|
func cachingRoundTrip(cache *cache, network, address string) roundTripper {
|
||||||
return func(ctx context.Context, req string) (res string, err error) {
|
return func(ctx context.Context, req string) (res string, err error) {
|
||||||
|
cache.opts.Meter.Counter(semconv.CacheRequestInflight, "type", "dns").Inc()
|
||||||
|
defer cache.opts.Meter.Counter(semconv.CacheRequestInflight, "type", "dns").Dec()
|
||||||
// check cache
|
// check cache
|
||||||
if res := cache.get(req); res != "" {
|
if res := cache.get(req); res != "" {
|
||||||
|
cache.opts.Meter.Counter(semconv.CacheRequestTotal, "type", "dns", "method", "get", "status", "hit").Inc()
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
cache.opts.Meter.Counter(semconv.CacheRequestTotal, "type", "dns", "method", "get", "status", "miss").Inc()
|
||||||
|
ts := time.Now()
|
||||||
|
defer func() {
|
||||||
|
cache.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "type", "dns", "method", "get").UpdateDuration(ts)
|
||||||
|
cache.opts.Meter.Histogram(semconv.CacheRequestDurationSeconds, "type", "dns", "method", "get").UpdateDuration(ts)
|
||||||
|
}()
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case cache.opts.PreferIPV4 && cache.opts.PreferIPV6:
|
case cache.opts.PreferIPV4 && cache.opts.PreferIPV6:
|
||||||
@ -340,6 +372,7 @@ func cachingRoundTrip(cache *cache, network, address string) roundTripper {
|
|||||||
var d net.Dialer
|
var d net.Dialer
|
||||||
conn, err = d.DialContext(ctx, network, address)
|
conn, err = d.DialContext(ctx, network, address)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
@ -12,5 +12,11 @@ func TestCache(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
t.Logf("addrs %v", addrs)
|
|
||||||
|
addrs, err = net.LookupHost("unistack.org")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = addrs
|
||||||
}
|
}
|
||||||
|
@ -11,15 +11,13 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type dnsConn struct {
|
type dnsConn struct {
|
||||||
sync.Mutex
|
deadline time.Time
|
||||||
|
|
||||||
ibuf bytes.Buffer
|
|
||||||
obuf bytes.Buffer
|
|
||||||
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
deadline time.Time
|
|
||||||
roundTrip roundTripper
|
roundTrip roundTripper
|
||||||
|
ibuf bytes.Buffer
|
||||||
|
obuf bytes.Buffer
|
||||||
|
sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
type roundTripper func(ctx context.Context, req string) (res string, err error)
|
type roundTripper func(ctx context.Context, req string) (res string, err error)
|
||||||
@ -78,8 +76,8 @@ func (c *dnsConn) SetDeadline(t time.Time) error {
|
|||||||
|
|
||||||
func (c *dnsConn) SetReadDeadline(t time.Time) error {
|
func (c *dnsConn) SetReadDeadline(t time.Time) error {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
|
||||||
c.deadline = t
|
c.deadline = t
|
||||||
|
c.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user