Compare commits
13 Commits
Author | SHA1 | Date | |
---|---|---|---|
1f0482fbd5 | |||
a862562284 | |||
c320c23913 | |||
|
ae848ba8bb | ||
|
8e264cbb3e | ||
|
54e523ab3f | ||
09973af099 | |||
3247da3dd0 | |||
b505455f7c | |||
293949f081 | |||
8d7e442b3a | |||
|
f7b5211af3 | ||
7eb6d030dc |
1
.github/renovate.json
vendored
1
.github/renovate.json
vendored
@@ -2,6 +2,7 @@
|
||||
"extends": [
|
||||
"config:base"
|
||||
],
|
||||
"postUpdateOptions": ["gomodTidy"],
|
||||
"packageRules": [
|
||||
{
|
||||
"matchUpdateTypes": ["minor", "patch", "pin", "digest"],
|
||||
|
@@ -175,7 +175,7 @@ func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opt
|
||||
}
|
||||
|
||||
func (n *noopClient) NewRequest(service, endpoint string, req interface{}, opts ...RequestOption) Request {
|
||||
return &noopRequest{}
|
||||
return &noopRequest{service: service, endpoint: endpoint}
|
||||
}
|
||||
|
||||
func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOption) Message {
|
||||
|
8
go.mod
8
go.mod
@@ -6,12 +6,8 @@ require (
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
||||
github.com/ef-ds/deque v1.0.4
|
||||
github.com/google/uuid v1.2.0
|
||||
github.com/heimdalr/dag v1.0.1 // indirect
|
||||
github.com/imdario/mergo v0.3.11
|
||||
github.com/kr/text v0.2.0 // indirect
|
||||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34 // indirect
|
||||
golang.org/x/net v0.0.0-20210119194325-5f4716e94777
|
||||
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
|
||||
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34
|
||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110
|
||||
)
|
||||
|
18
go.sum
18
go.sum
@@ -1,35 +1,21 @@
|
||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
|
||||
github.com/ef-ds/deque v1.0.4 h1:iFAZNmveMT9WERAkqLJ+oaABF9AcVQ5AjXem/hroniI=
|
||||
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
|
||||
github.com/go-test/deep v1.0.7 h1:/VSMRlnY/JSyqxQUzQLKVMAskpY/NZKFA5j2P+0pP2M=
|
||||
github.com/go-test/deep v1.0.7/go.mod h1:QV8Hv/iy04NyLBxAdO9njL0iVPN1S4d/A3NVv1V36o8=
|
||||
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
|
||||
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/heimdalr/dag v1.0.1 h1:iR2K3DSUFDYx0GeV7iXBnZkedWS1xePSGrylQ197uxg=
|
||||
github.com/heimdalr/dag v1.0.1/go.mod h1:t+ZkR+sjKL4xhlE1B9rwpvwfo+x+2R0363efS+Oghns=
|
||||
github.com/imdario/mergo v0.3.11 h1:3tnifQM4i+fbajXKBHXWEH+KvNHqojZ778UH75j3bGA=
|
||||
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
|
||||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
||||
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34 h1:vBfVmA5mZhsQa2jr1FOL9nfA37N/jnbBmi5XUfviVTI=
|
||||
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
|
||||
golang.org/x/net v0.0.0-20210119194325-5f4716e94777 h1:003p0dJM77cxMSyCPFphvZf/Y5/NXf5fzg6ufd1/Oew=
|
||||
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 h1:qWPm9rbaAMKs8Bq/9LRpbMqxWRVUAQwMI9fVrssnTfw=
|
||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
|
||||
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
|
||||
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
|
@@ -93,7 +93,9 @@ func NewIncomingContext(ctx context.Context, md Metadata) context.Context {
|
||||
ctx = context.Background()
|
||||
}
|
||||
ctx = context.WithValue(ctx, mdIncomingKey{}, &rawMetadata{md})
|
||||
ctx = context.WithValue(ctx, mdOutgoingKey{}, &rawMetadata{})
|
||||
if v, ok := ctx.Value(mdOutgoingKey{}).(*rawMetadata); !ok || v == nil {
|
||||
ctx = context.WithValue(ctx, mdOutgoingKey{}, &rawMetadata{})
|
||||
}
|
||||
return ctx
|
||||
}
|
||||
|
||||
@@ -103,6 +105,40 @@ func NewOutgoingContext(ctx context.Context, md Metadata) context.Context {
|
||||
ctx = context.Background()
|
||||
}
|
||||
ctx = context.WithValue(ctx, mdOutgoingKey{}, &rawMetadata{md})
|
||||
ctx = context.WithValue(ctx, mdIncomingKey{}, &rawMetadata{})
|
||||
if v, ok := ctx.Value(mdIncomingKey{}).(*rawMetadata); !ok || v == nil {
|
||||
ctx = context.WithValue(ctx, mdIncomingKey{}, &rawMetadata{})
|
||||
}
|
||||
return ctx
|
||||
}
|
||||
|
||||
// AppendOutgoingContext apends new md to context
|
||||
func AppendOutgoingContext(ctx context.Context, kv ...string) context.Context {
|
||||
md, ok := Pairs(kv...)
|
||||
if !ok {
|
||||
return ctx
|
||||
}
|
||||
omd, ok := FromOutgoingContext(ctx)
|
||||
if !ok {
|
||||
return NewOutgoingContext(ctx, md)
|
||||
}
|
||||
for k, v := range md {
|
||||
omd.Set(k, v)
|
||||
}
|
||||
return NewOutgoingContext(ctx, omd)
|
||||
}
|
||||
|
||||
// AppendIncomingContext apends new md to context
|
||||
func AppendIncomingContext(ctx context.Context, kv ...string) context.Context {
|
||||
md, ok := Pairs(kv...)
|
||||
if !ok {
|
||||
return ctx
|
||||
}
|
||||
omd, ok := FromIncomingContext(ctx)
|
||||
if !ok {
|
||||
return NewIncomingContext(ctx, md)
|
||||
}
|
||||
for k, v := range md {
|
||||
omd.Set(k, v)
|
||||
}
|
||||
return NewIncomingContext(ctx, omd)
|
||||
}
|
||||
|
@@ -111,3 +111,19 @@ func Merge(omd Metadata, mmd Metadata, overwrite bool) Metadata {
|
||||
}
|
||||
return nmd
|
||||
}
|
||||
|
||||
func Pairs(kv ...string) (Metadata, bool) {
|
||||
if len(kv)%2 == 1 {
|
||||
return nil, false
|
||||
}
|
||||
md := New(len(kv) / 2)
|
||||
var k string
|
||||
for i, v := range kv {
|
||||
if i%2 == 0 {
|
||||
k = v
|
||||
continue
|
||||
}
|
||||
md.Set(k, v)
|
||||
}
|
||||
return md, true
|
||||
}
|
||||
|
@@ -5,6 +5,28 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestAppend(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ctx = AppendIncomingContext(ctx, "key1", "val1", "key2", "val2")
|
||||
md, ok := FromIncomingContext(ctx)
|
||||
if !ok {
|
||||
t.Fatal("metadata empty")
|
||||
}
|
||||
if _, ok := md.Get("key1"); !ok {
|
||||
t.Fatal("key1 not found")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPairs(t *testing.T) {
|
||||
md, ok := Pairs("key1", "val1", "key2", "val2")
|
||||
if !ok {
|
||||
t.Fatal("odd number of kv")
|
||||
}
|
||||
if _, ok = md.Get("key1"); !ok {
|
||||
t.Fatal("key1 not found")
|
||||
}
|
||||
}
|
||||
|
||||
func testCtx(ctx context.Context) {
|
||||
md := New(2)
|
||||
md.Set("Key1", "Val1_new")
|
||||
|
@@ -82,25 +82,6 @@ type Labels struct {
|
||||
vals []string
|
||||
}
|
||||
|
||||
type labels Labels
|
||||
|
||||
func (ls labels) sort() {
|
||||
sort.Sort(ls)
|
||||
}
|
||||
|
||||
func (ls labels) Len() int {
|
||||
return len(ls.keys)
|
||||
}
|
||||
|
||||
func (ls labels) Swap(i, j int) {
|
||||
ls.keys[i], ls.keys[j] = ls.keys[j], ls.keys[i]
|
||||
ls.vals[i], ls.vals[j] = ls.vals[j], ls.vals[i]
|
||||
}
|
||||
|
||||
func (ls labels) Less(i, j int) bool {
|
||||
return ls.vals[i] < ls.vals[j]
|
||||
}
|
||||
|
||||
// Append adds labels to label set
|
||||
func (ls Labels) Append(nls Labels) Labels {
|
||||
for n := range nls.keys {
|
||||
@@ -110,10 +91,30 @@ func (ls Labels) Append(nls Labels) Labels {
|
||||
return ls
|
||||
}
|
||||
|
||||
// Len returns number of labels
|
||||
func (ls Labels) Len() int {
|
||||
return len(ls.keys)
|
||||
}
|
||||
|
||||
type labels Labels
|
||||
|
||||
func (ls labels) Len() int {
|
||||
return len(ls.keys)
|
||||
}
|
||||
|
||||
func (ls labels) Sort() {
|
||||
sort.Sort(ls)
|
||||
}
|
||||
|
||||
func (ls labels) Swap(i, j int) {
|
||||
ls.keys[i], ls.keys[j] = ls.keys[j], ls.keys[i]
|
||||
ls.vals[i], ls.vals[j] = ls.vals[j], ls.vals[i]
|
||||
}
|
||||
|
||||
func (ls labels) Less(i, j int) bool {
|
||||
return ls.keys[i] < ls.keys[j]
|
||||
}
|
||||
|
||||
// LabelIter holds the
|
||||
type LabelIter struct {
|
||||
labels Labels
|
||||
@@ -123,7 +124,7 @@ type LabelIter struct {
|
||||
|
||||
// Iter returns labels iterator
|
||||
func (ls Labels) Iter() *LabelIter {
|
||||
labels(ls).sort()
|
||||
labels(ls).Sort()
|
||||
return &LabelIter{labels: ls, cnt: len(ls.keys)}
|
||||
}
|
||||
|
||||
|
@@ -5,12 +5,12 @@ import (
|
||||
)
|
||||
|
||||
func TestNoopMeter(t *testing.T) {
|
||||
meter := NewMeter(Path("/noop"))
|
||||
if "/noop" != meter.Options().Path {
|
||||
t.Fatalf("invalid options parsing: %v", meter.Options())
|
||||
m := NewMeter(Path("/noop"))
|
||||
if "/noop" != m.Options().Path {
|
||||
t.Fatalf("invalid options parsing: %v", m.Options())
|
||||
}
|
||||
|
||||
cnt := meter.Counter("counter", Label("server", "noop"))
|
||||
cnt := m.Counter("counter", Label("server", "noop"))
|
||||
cnt.Inc()
|
||||
}
|
||||
|
||||
@@ -32,15 +32,22 @@ func TestLabelsAppend(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestIterator(t *testing.T) {
|
||||
var ls Labels
|
||||
ls.keys = []string{"type", "server", "register"}
|
||||
ls.vals = []string{"noop", "http", "gossip"}
|
||||
options := NewOptions(
|
||||
Label("name", "svc1"),
|
||||
Label("version", "0.0.1"),
|
||||
Label("id", "12345"),
|
||||
Label("type", "noop"),
|
||||
Label("server", "http"),
|
||||
Label("register", "gossip"),
|
||||
Label("aa", "kk"),
|
||||
Label("zz", "kk"),
|
||||
)
|
||||
|
||||
iter := ls.Iter()
|
||||
iter := options.Labels.Iter()
|
||||
var k, v string
|
||||
cnt := 0
|
||||
for iter.Next(&k, &v) {
|
||||
if cnt == 1 && (k != "server" || v != "http") {
|
||||
if cnt == 4 && (k != "server" || v != "http") {
|
||||
t.Fatalf("iter error: %s != %s || %s != %s", k, "server", v, "http")
|
||||
}
|
||||
cnt++
|
||||
|
@@ -31,16 +31,14 @@ var (
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
Meter meter.Meter
|
||||
Name string
|
||||
Version string
|
||||
ID string
|
||||
Meter meter.Meter
|
||||
lopts []meter.Option
|
||||
}
|
||||
|
||||
type Option func(*Options)
|
||||
|
||||
func NewOptions(opts ...Option) Options {
|
||||
options := Options{}
|
||||
options := Options{lopts: make([]meter.Option, 0, 5)}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
@@ -49,19 +47,19 @@ func NewOptions(opts ...Option) Options {
|
||||
|
||||
func ServiceName(name string) Option {
|
||||
return func(o *Options) {
|
||||
o.Name = name
|
||||
o.lopts = append(o.lopts, meter.Label("name", name))
|
||||
}
|
||||
}
|
||||
|
||||
func ServiceVersion(version string) Option {
|
||||
return func(o *Options) {
|
||||
o.Version = version
|
||||
o.lopts = append(o.lopts, meter.Label("version", version))
|
||||
}
|
||||
}
|
||||
|
||||
func ServiceID(id string) Option {
|
||||
return func(o *Options) {
|
||||
o.ID = id
|
||||
o.lopts = append(o.lopts, meter.Label("id", id))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -104,7 +102,7 @@ func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request,
|
||||
err := w.callFunc(ctx, addr, req, rsp, opts)
|
||||
te := time.Since(ts)
|
||||
|
||||
lopts := make([]meter.Option, 0, 2)
|
||||
lopts := w.opts.lopts
|
||||
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
|
||||
|
||||
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
|
||||
@@ -127,7 +125,7 @@ func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{},
|
||||
err := w.Client.Call(ctx, req, rsp, opts...)
|
||||
te := time.Since(ts)
|
||||
|
||||
lopts := make([]meter.Option, 0, 2)
|
||||
lopts := w.opts.lopts
|
||||
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
|
||||
|
||||
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
|
||||
@@ -150,7 +148,7 @@ func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client
|
||||
stream, err := w.Client.Stream(ctx, req, opts...)
|
||||
te := time.Since(ts)
|
||||
|
||||
lopts := make([]meter.Option, 0, 2)
|
||||
lopts := w.opts.lopts
|
||||
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
|
||||
|
||||
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
|
||||
@@ -173,7 +171,7 @@ func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.
|
||||
err := w.Client.Publish(ctx, p, opts...)
|
||||
te := time.Since(ts)
|
||||
|
||||
lopts := make([]meter.Option, 0, 2)
|
||||
lopts := w.opts.lopts
|
||||
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
|
||||
|
||||
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
|
||||
@@ -204,7 +202,7 @@ func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
|
||||
err := fn(ctx, req, rsp)
|
||||
te := time.Since(ts)
|
||||
|
||||
lopts := make([]meter.Option, 0, 2)
|
||||
lopts := w.opts.lopts
|
||||
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
|
||||
|
||||
w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
|
||||
@@ -236,7 +234,7 @@ func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc
|
||||
err := fn(ctx, msg)
|
||||
te := time.Since(ts)
|
||||
|
||||
lopts := make([]meter.Option, 0, 2)
|
||||
lopts := w.opts.lopts
|
||||
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
|
||||
|
||||
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
|
||||
|
@@ -369,9 +369,9 @@ func (m *memory) ListServices(ctx context.Context, opts ...ListOption) ([]*Servi
|
||||
// serialize the result, each version counts as an individual service
|
||||
var result []*Service
|
||||
|
||||
for domain, service := range services {
|
||||
for _, service := range services {
|
||||
for _, version := range service {
|
||||
result = append(result, recordToService(version, domain))
|
||||
result = append(result, recordToService(version, options.Domain))
|
||||
}
|
||||
}
|
||||
|
||||
|
59
server/errors.go
Normal file
59
server/errors.go
Normal file
@@ -0,0 +1,59 @@
|
||||
package server
|
||||
|
||||
import "github.com/unistack-org/micro/v3/errors"
|
||||
|
||||
type Error struct {
|
||||
id string
|
||||
}
|
||||
|
||||
func NewError(id string) *Error {
|
||||
return &Error{id}
|
||||
}
|
||||
|
||||
func (e *Error) BadRequest(format string, a ...interface{}) error {
|
||||
return errors.BadRequest(e.id, format, a...)
|
||||
}
|
||||
|
||||
func (e *Error) Unauthorized(format string, a ...interface{}) error {
|
||||
return errors.Unauthorized(e.id, format, a...)
|
||||
}
|
||||
|
||||
func (e *Error) Forbidden(format string, a ...interface{}) error {
|
||||
return errors.Forbidden(e.id, format, a...)
|
||||
}
|
||||
|
||||
func (e *Error) NotFound(format string, a ...interface{}) error {
|
||||
return errors.NotFound(e.id, format, a...)
|
||||
}
|
||||
|
||||
func (e *Error) MethodNotAllowed(format string, a ...interface{}) error {
|
||||
return errors.MethodNotAllowed(e.id, format, a...)
|
||||
}
|
||||
|
||||
func (e *Error) Timeout(format string, a ...interface{}) error {
|
||||
return errors.Timeout(e.id, format, a...)
|
||||
}
|
||||
|
||||
func (e *Error) Conflict(format string, a ...interface{}) error {
|
||||
return errors.Conflict(e.id, format, a...)
|
||||
}
|
||||
|
||||
func (e *Error) InternalServerError(format string, a ...interface{}) error {
|
||||
return errors.InternalServerError(e.id, format, a...)
|
||||
}
|
||||
|
||||
func (e *Error) NotImplemented(format string, a ...interface{}) error {
|
||||
return errors.NotImplemented(e.id, format, a...)
|
||||
}
|
||||
|
||||
func (e *Error) BadGateway(format string, a ...interface{}) error {
|
||||
return errors.BadGateway(e.id, format, a...)
|
||||
}
|
||||
|
||||
func (e *Error) ServiceUnavailable(format string, a ...interface{}) error {
|
||||
return errors.ServiceUnavailable(e.id, format, a...)
|
||||
}
|
||||
|
||||
func (e *Error) GatewayTimeout(format string, a ...interface{}) error {
|
||||
return errors.GatewayTimeout(e.id, format, a...)
|
||||
}
|
19
server/errors_test.go
Normal file
19
server/errors_test.go
Normal file
@@ -0,0 +1,19 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/unistack-org/micro/v3/errors"
|
||||
)
|
||||
|
||||
func TestError(t *testing.T) {
|
||||
e := NewError("svc1")
|
||||
err := e.BadRequest("%s", "test")
|
||||
merr, ok := err.(*errors.Error)
|
||||
if !ok {
|
||||
t.Fatal("error not *errors.Error")
|
||||
}
|
||||
if merr.Id != "svc1" {
|
||||
t.Fatal("id != svc1")
|
||||
}
|
||||
}
|
@@ -332,6 +332,7 @@ type SubscriberOptions struct {
|
||||
AutoAck bool
|
||||
Queue string
|
||||
Internal bool
|
||||
BodyOnly bool
|
||||
Context context.Context
|
||||
}
|
||||
|
||||
@@ -389,6 +390,20 @@ func SubscriberQueue(n string) SubscriberOption {
|
||||
}
|
||||
}
|
||||
|
||||
// SubscriberGroup sets the shared group name distributed messages across subscribers
|
||||
func SubscriberGroup(n string) SubscriberOption {
|
||||
return func(o *SubscriberOptions) {
|
||||
o.Queue = n
|
||||
}
|
||||
}
|
||||
|
||||
// SubscriberBodyOnly says broker that message contains raw data with absence of micro broker.Message format
|
||||
func SubscriberBodyOnly(b bool) SubscriberOption {
|
||||
return func(o *SubscriberOptions) {
|
||||
o.BodyOnly = b
|
||||
}
|
||||
}
|
||||
|
||||
// SubscriberContext set context options to allow broker SubscriberOption passed
|
||||
func SubscriberContext(ctx context.Context) SubscriberOption {
|
||||
return func(o *SubscriberOptions) {
|
||||
|
@@ -3,41 +3,46 @@ package tracer
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/unistack-org/micro/v3/metadata"
|
||||
)
|
||||
|
||||
const (
|
||||
traceIDKey = "Micro-Trace-Id"
|
||||
spanIDKey = "Micro-Span-Id"
|
||||
)
|
||||
type tracerKey struct{}
|
||||
|
||||
// FromContext returns a span from context
|
||||
func FromContext(ctx context.Context) (traceID string, parentSpanID string, isFound bool) {
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
if !ok {
|
||||
return "", "", false
|
||||
// FromContext returns a tracer from context
|
||||
func FromContext(ctx context.Context) Tracer {
|
||||
if ctx == nil {
|
||||
return DefaultTracer
|
||||
}
|
||||
traceID, traceOk := md.Get(traceIDKey)
|
||||
microID, microOk := md.Get("Micro-Id")
|
||||
if !traceOk && !microOk {
|
||||
isFound = false
|
||||
return
|
||||
if tracer, ok := ctx.Value(tracerKey{}).(Tracer); ok {
|
||||
return tracer
|
||||
}
|
||||
if !traceOk {
|
||||
traceID = microID
|
||||
}
|
||||
parentSpanID, ok = md.Get(spanIDKey)
|
||||
return traceID, parentSpanID, ok
|
||||
return DefaultTracer
|
||||
}
|
||||
|
||||
// NewContext saves the trace and span ids in the context
|
||||
func NewContext(ctx context.Context, traceID, parentSpanID string) context.Context {
|
||||
md, ok := metadata.FromContext(ctx)
|
||||
if !ok {
|
||||
md = metadata.New(2)
|
||||
// NewContext saves the tracer in the context
|
||||
func NewContext(ctx context.Context, tracer Tracer) context.Context {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
md.Set(traceIDKey, traceID)
|
||||
md.Set(spanIDKey, parentSpanID)
|
||||
return metadata.NewContext(ctx, md)
|
||||
return context.WithValue(ctx, tracerKey{}, tracer)
|
||||
}
|
||||
|
||||
type spanKey struct{}
|
||||
|
||||
// SpanFromContext returns a span from context
|
||||
func SpanFromContext(ctx context.Context) Span {
|
||||
if ctx == nil {
|
||||
return &noopSpan{}
|
||||
}
|
||||
if span, ok := ctx.Value(spanKey{}).(Span); ok {
|
||||
return span
|
||||
}
|
||||
return &noopSpan{}
|
||||
}
|
||||
|
||||
// NewSpanContext saves the span in the context
|
||||
func NewSpanContext(ctx context.Context, span Span) context.Context {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
return context.WithValue(ctx, spanKey{}, span)
|
||||
}
|
||||
|
@@ -1,99 +0,0 @@
|
||||
package tracer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/unistack-org/micro/v3/util/ring"
|
||||
)
|
||||
|
||||
type tracer struct {
|
||||
opts Options
|
||||
// ring buffer of traces
|
||||
buffer *ring.Buffer
|
||||
}
|
||||
|
||||
func (t *tracer) Read(opts ...ReadOption) ([]*Span, error) {
|
||||
var options ReadOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
sp := t.buffer.Get(t.buffer.Size())
|
||||
|
||||
spans := make([]*Span, 0, len(sp))
|
||||
|
||||
for _, span := range sp {
|
||||
val := span.Value.(*Span)
|
||||
// skip if trace id is specified and doesn't match
|
||||
if len(options.Trace) > 0 && val.Trace != options.Trace {
|
||||
continue
|
||||
}
|
||||
spans = append(spans, val)
|
||||
}
|
||||
|
||||
return spans, nil
|
||||
}
|
||||
|
||||
func (t *tracer) Start(ctx context.Context, name string) (context.Context, *Span) {
|
||||
span := &Span{
|
||||
Name: name,
|
||||
Trace: uuid.New().String(),
|
||||
Id: uuid.New().String(),
|
||||
Started: time.Now(),
|
||||
Metadata: make(map[string]string),
|
||||
}
|
||||
|
||||
// return span if no context
|
||||
if ctx == nil {
|
||||
return NewContext(context.Background(), span.Trace, span.Id), span
|
||||
}
|
||||
traceID, parentSpanID, ok := FromContext(ctx)
|
||||
// If the trace can not be found in the header,
|
||||
// that means this is where the trace is created.
|
||||
if !ok {
|
||||
return NewContext(ctx, span.Trace, span.Id), span
|
||||
}
|
||||
|
||||
// set trace id
|
||||
span.Trace = traceID
|
||||
// set parent
|
||||
span.Parent = parentSpanID
|
||||
|
||||
// return the span
|
||||
return NewContext(ctx, span.Trace, span.Id), span
|
||||
}
|
||||
|
||||
func (t *tracer) Finish(s *Span) error {
|
||||
// set finished time
|
||||
s.Duration = time.Since(s.Started)
|
||||
// save the span
|
||||
t.buffer.Put(s)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *tracer) Init(opts ...Option) error {
|
||||
for _, o := range opts {
|
||||
o(&t.opts)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *tracer) Lookup(ctx context.Context) (*Span, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (t *tracer) Name() string {
|
||||
return t.opts.Name
|
||||
}
|
||||
|
||||
// NewTracer returns new memory tracer
|
||||
func NewTracer(opts ...Option) Tracer {
|
||||
return &tracer{
|
||||
opts: NewOptions(opts...),
|
||||
// the last 256 requests
|
||||
buffer: ring.New(256),
|
||||
}
|
||||
}
|
69
tracer/noop.go
Normal file
69
tracer/noop.go
Normal file
@@ -0,0 +1,69 @@
|
||||
package tracer
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type noopTracer struct {
|
||||
opts Options
|
||||
}
|
||||
|
||||
func (t *noopTracer) Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span) {
|
||||
span := &noopSpan{
|
||||
name: name,
|
||||
ctx: ctx,
|
||||
tracer: t,
|
||||
}
|
||||
if span.ctx == nil {
|
||||
span.ctx = context.Background()
|
||||
}
|
||||
return NewSpanContext(ctx, span), span
|
||||
}
|
||||
|
||||
func (t *noopTracer) Init(opts ...Option) error {
|
||||
for _, o := range opts {
|
||||
o(&t.opts)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *noopTracer) Name() string {
|
||||
return t.opts.Name
|
||||
}
|
||||
|
||||
type noopSpan struct {
|
||||
name string
|
||||
ctx context.Context
|
||||
tracer Tracer
|
||||
}
|
||||
|
||||
func (s *noopSpan) Finish(opts ...SpanOption) {
|
||||
|
||||
}
|
||||
|
||||
func (s *noopSpan) Context() context.Context {
|
||||
return s.ctx
|
||||
}
|
||||
|
||||
func (s *noopSpan) Tracer() Tracer {
|
||||
return s.tracer
|
||||
}
|
||||
|
||||
func (s *noopSpan) AddEvent(name string, opts ...EventOption) {
|
||||
|
||||
}
|
||||
|
||||
func (s *noopSpan) SetName(name string) {
|
||||
s.name = name
|
||||
}
|
||||
|
||||
func (s *noopSpan) SetLabels(labels ...Label) {
|
||||
|
||||
}
|
||||
|
||||
// NewTracer returns new memory tracer
|
||||
func NewTracer(opts ...Option) Tracer {
|
||||
return &noopTracer{
|
||||
opts: NewOptions(opts...),
|
||||
}
|
||||
}
|
@@ -2,39 +2,27 @@ package tracer
|
||||
|
||||
import "github.com/unistack-org/micro/v3/logger"
|
||||
|
||||
var (
|
||||
// DefaultSize of the buffer
|
||||
DefaultSize = 64
|
||||
)
|
||||
type SpanOptions struct {
|
||||
}
|
||||
|
||||
type SpanOption func(o *SpanOptions)
|
||||
|
||||
type EventOptions struct {
|
||||
}
|
||||
|
||||
type EventOption func(o *EventOptions)
|
||||
|
||||
// Options struct
|
||||
type Options struct {
|
||||
// Name of the tracer
|
||||
Name string
|
||||
// Logger is the logger for messages
|
||||
Logger logger.Logger
|
||||
// Size is the size of ring buffer
|
||||
Size int
|
||||
}
|
||||
|
||||
// Option func
|
||||
type Option func(o *Options)
|
||||
|
||||
// ReadOptions struct
|
||||
type ReadOptions struct {
|
||||
// Trace id
|
||||
Trace string
|
||||
}
|
||||
|
||||
// ReadOption func
|
||||
type ReadOption func(o *ReadOptions)
|
||||
|
||||
// ReadTrace read the given trace
|
||||
func ReadTrace(t string) ReadOption {
|
||||
return func(o *ReadOptions) {
|
||||
o.Trace = t
|
||||
}
|
||||
}
|
||||
|
||||
// Logger sets the logger
|
||||
func Logger(l logger.Logger) Option {
|
||||
return func(o *Options) {
|
||||
@@ -46,7 +34,6 @@ func Logger(l logger.Logger) Option {
|
||||
func NewOptions(opts ...Option) Options {
|
||||
options := Options{
|
||||
Logger: logger.DefaultLogger,
|
||||
Size: DefaultSize,
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
|
@@ -3,9 +3,6 @@ package tracer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/unistack-org/micro/v3/metadata"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -15,44 +12,54 @@ var (
|
||||
|
||||
// Tracer is an interface for distributed tracing
|
||||
type Tracer interface {
|
||||
// Name return tracer name
|
||||
Name() string
|
||||
// Init tracer with options
|
||||
Init(...Option) error
|
||||
// Start a trace
|
||||
Start(ctx context.Context, name string) (context.Context, *Span)
|
||||
// Finish the trace
|
||||
Finish(*Span) error
|
||||
// Lookup get span from context
|
||||
Lookup(ctx context.Context) (*Span, error)
|
||||
// Read the traces
|
||||
Read(...ReadOption) ([]*Span, error)
|
||||
Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span)
|
||||
}
|
||||
|
||||
// SpanType describe the nature of the trace span
|
||||
type SpanType int
|
||||
|
||||
const (
|
||||
// SpanTypeRequestInbound is a span created when serving a request
|
||||
SpanTypeRequestInbound SpanType = iota
|
||||
// SpanTypeRequestOutbound is a span created when making a service call
|
||||
SpanTypeRequestOutbound
|
||||
)
|
||||
|
||||
// Span is used to record an entry
|
||||
type Span struct {
|
||||
// Id of the trace
|
||||
Trace string
|
||||
// name of the span
|
||||
Name string
|
||||
// id of the span
|
||||
Id string
|
||||
// parent span id
|
||||
Parent string
|
||||
// Start time
|
||||
Started time.Time
|
||||
// Duration in nano seconds
|
||||
Duration time.Duration
|
||||
// associated data
|
||||
Metadata metadata.Metadata
|
||||
// Type
|
||||
Type SpanType
|
||||
type Span interface {
|
||||
// Tracer return underlining tracer
|
||||
Tracer() Tracer
|
||||
// Finish complete and send span
|
||||
Finish(opts ...SpanOption)
|
||||
// AddEvent add event to span
|
||||
AddEvent(name string, opts ...EventOption)
|
||||
// Context return context with span
|
||||
Context() context.Context
|
||||
// SetName set the span name
|
||||
SetName(name string)
|
||||
// SetLabels set the span labels
|
||||
SetLabels(labels ...Label)
|
||||
}
|
||||
|
||||
type Label struct {
|
||||
key string
|
||||
val interface{}
|
||||
}
|
||||
|
||||
func Any(k string, v interface{}) Label {
|
||||
return Label{k, v}
|
||||
}
|
||||
|
||||
func String(k string, v string) Label {
|
||||
return Label{k, v}
|
||||
}
|
||||
|
||||
func Int(k string, v int) Label {
|
||||
return Label{k, v}
|
||||
}
|
||||
|
||||
func Int64(k string, v int64) Label {
|
||||
return Label{k, v}
|
||||
}
|
||||
|
||||
func Float64(k string, v float64) Label {
|
||||
return Label{k, v}
|
||||
}
|
||||
|
||||
func Bool(k string, v bool) Label {
|
||||
return Label{k, v}
|
||||
}
|
||||
|
317
tracer/wrapper/wrapper.go
Normal file
317
tracer/wrapper/wrapper.go
Normal file
@@ -0,0 +1,317 @@
|
||||
// Package wrapper provides wrapper for Tracer
|
||||
package wrapper
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/unistack-org/micro/v3/client"
|
||||
"github.com/unistack-org/micro/v3/metadata"
|
||||
"github.com/unistack-org/micro/v3/server"
|
||||
"github.com/unistack-org/micro/v3/tracer"
|
||||
)
|
||||
|
||||
type tWrapper struct {
|
||||
opts Options
|
||||
serverHandler server.HandlerFunc
|
||||
serverSubscriber server.SubscriberFunc
|
||||
clientCallFunc client.CallFunc
|
||||
client.Client
|
||||
}
|
||||
|
||||
type ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, tracer.Span, error)
|
||||
type ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, tracer.Span, error)
|
||||
type ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, tracer.Span, error)
|
||||
type ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, tracer.Span, error)
|
||||
type ServerHandlerObserver func(context.Context, server.Request, interface{}, tracer.Span, error)
|
||||
type ServerSubscriberObserver func(context.Context, server.Message, tracer.Span, error)
|
||||
|
||||
type Options struct {
|
||||
Tracer tracer.Tracer
|
||||
ClientCallObservers []ClientCallObserver
|
||||
ClientStreamObservers []ClientStreamObserver
|
||||
ClientPublishObservers []ClientPublishObserver
|
||||
ClientCallFuncObservers []ClientCallFuncObserver
|
||||
ServerHandlerObservers []ServerHandlerObserver
|
||||
ServerSubscriberObservers []ServerSubscriberObserver
|
||||
}
|
||||
|
||||
type Option func(*Options)
|
||||
|
||||
func NewOptions(opts ...Option) Options {
|
||||
options := Options{
|
||||
Tracer: tracer.DefaultTracer,
|
||||
ClientCallObservers: []ClientCallObserver{DefaultClientCallObserver},
|
||||
ClientStreamObservers: []ClientStreamObserver{DefaultClientStreamObserver},
|
||||
ClientPublishObservers: []ClientPublishObserver{DefaultClientPublishObserver},
|
||||
ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver},
|
||||
ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver},
|
||||
ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver},
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
return options
|
||||
}
|
||||
|
||||
func WithTracer(t tracer.Tracer) Option {
|
||||
return func(o *Options) {
|
||||
o.Tracer = t
|
||||
}
|
||||
}
|
||||
|
||||
func WithClientCallObservers(ob ...ClientCallObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientCallObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
func WithClientStreamObservers(ob ...ClientStreamObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientStreamObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
func WithClientPublishObservers(ob ...ClientPublishObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientPublishObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientCallFuncObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ServerHandlerObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
|
||||
return func(o *Options) {
|
||||
o.ServerSubscriberObservers = ob
|
||||
}
|
||||
}
|
||||
|
||||
func DefaultClientCallObserver(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) {
|
||||
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
|
||||
var labels []tracer.Label
|
||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||
labels = make([]tracer.Label, 0, len(md))
|
||||
for k, v := range md {
|
||||
labels = append(labels, tracer.String(k, v))
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
labels = append(labels, tracer.Bool("error", true))
|
||||
}
|
||||
sp.SetLabels(labels...)
|
||||
}
|
||||
|
||||
func DefaultClientStreamObserver(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) {
|
||||
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
|
||||
var labels []tracer.Label
|
||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||
labels = make([]tracer.Label, 0, len(md))
|
||||
for k, v := range md {
|
||||
labels = append(labels, tracer.String(k, v))
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
labels = append(labels, tracer.Bool("error", true))
|
||||
}
|
||||
sp.SetLabels(labels...)
|
||||
}
|
||||
|
||||
func DefaultClientPublishObserver(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) {
|
||||
sp.SetName(fmt.Sprintf("Pub to %s", msg.Topic()))
|
||||
var labels []tracer.Label
|
||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||
labels = make([]tracer.Label, 0, len(md))
|
||||
for k, v := range md {
|
||||
labels = append(labels, tracer.String(k, v))
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
labels = append(labels, tracer.Bool("error", true))
|
||||
}
|
||||
sp.SetLabels(labels...)
|
||||
}
|
||||
|
||||
func DefaultServerHandlerObserver(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) {
|
||||
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
|
||||
var labels []tracer.Label
|
||||
if md, ok := metadata.FromIncomingContext(ctx); ok {
|
||||
labels = make([]tracer.Label, 0, len(md))
|
||||
for k, v := range md {
|
||||
labels = append(labels, tracer.String(k, v))
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
labels = append(labels, tracer.Bool("error", true))
|
||||
}
|
||||
sp.SetLabels(labels...)
|
||||
}
|
||||
|
||||
func DefaultServerSubscriberObserver(ctx context.Context, msg server.Message, sp tracer.Span, err error) {
|
||||
sp.SetName(fmt.Sprintf("Sub from %s", msg.Topic()))
|
||||
var labels []tracer.Label
|
||||
if md, ok := metadata.FromIncomingContext(ctx); ok {
|
||||
labels = make([]tracer.Label, 0, len(md))
|
||||
for k, v := range md {
|
||||
labels = append(labels, tracer.String(k, v))
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
labels = append(labels, tracer.Bool("error", true))
|
||||
}
|
||||
sp.SetLabels(labels...)
|
||||
}
|
||||
|
||||
func DefaultClientCallFuncObserver(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) {
|
||||
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
|
||||
var labels []tracer.Label
|
||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||
labels = make([]tracer.Label, 0, len(md))
|
||||
for k, v := range md {
|
||||
labels = append(labels, tracer.String(k, v))
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
labels = append(labels, tracer.Bool("error", true))
|
||||
}
|
||||
sp.SetLabels(labels...)
|
||||
}
|
||||
|
||||
func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||
sp := tracer.SpanFromContext(ctx)
|
||||
defer sp.Finish()
|
||||
|
||||
err := ot.Client.Call(ctx, req, rsp, opts...)
|
||||
|
||||
for _, o := range ot.opts.ClientCallObservers {
|
||||
o(ctx, req, rsp, opts, sp, err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||
sp := tracer.SpanFromContext(ctx)
|
||||
defer sp.Finish()
|
||||
|
||||
stream, err := ot.Client.Stream(ctx, req, opts...)
|
||||
|
||||
for _, o := range ot.opts.ClientStreamObservers {
|
||||
o(ctx, req, opts, stream, sp, err)
|
||||
}
|
||||
|
||||
return stream, err
|
||||
}
|
||||
|
||||
func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
|
||||
sp := tracer.SpanFromContext(ctx)
|
||||
defer sp.Finish()
|
||||
|
||||
err := ot.Client.Publish(ctx, msg, opts...)
|
||||
|
||||
for _, o := range ot.opts.ClientPublishObservers {
|
||||
o(ctx, msg, opts, sp, err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||
sp := tracer.SpanFromContext(ctx)
|
||||
defer sp.Finish()
|
||||
|
||||
err := ot.serverHandler(ctx, req, rsp)
|
||||
|
||||
for _, o := range ot.opts.ServerHandlerObservers {
|
||||
o(ctx, req, rsp, sp, err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (ot *tWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error {
|
||||
sp := tracer.SpanFromContext(ctx)
|
||||
defer sp.Finish()
|
||||
|
||||
err := ot.serverSubscriber(ctx, msg)
|
||||
|
||||
for _, o := range ot.opts.ServerSubscriberObservers {
|
||||
o(ctx, msg, sp, err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// NewClientWrapper accepts an open tracing Trace and returns a Client Wrapper
|
||||
func NewClientWrapper(opts ...Option) client.Wrapper {
|
||||
return func(c client.Client) client.Client {
|
||||
options := NewOptions()
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return &tWrapper{opts: options, Client: c}
|
||||
}
|
||||
}
|
||||
|
||||
// NewClientCallWrapper accepts an opentracing Tracer and returns a Call Wrapper
|
||||
func NewClientCallWrapper(opts ...Option) client.CallWrapper {
|
||||
return func(h client.CallFunc) client.CallFunc {
|
||||
options := NewOptions()
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
ot := &tWrapper{opts: options, clientCallFunc: h}
|
||||
return ot.ClientCallFunc
|
||||
}
|
||||
}
|
||||
|
||||
func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
||||
sp := tracer.SpanFromContext(ctx)
|
||||
defer sp.Finish()
|
||||
|
||||
err := ot.clientCallFunc(ctx, addr, req, rsp, opts)
|
||||
|
||||
for _, o := range ot.opts.ClientCallFuncObservers {
|
||||
o(ctx, addr, req, rsp, opts, sp, err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// NewServerHandlerWrapper accepts an options and returns a Handler Wrapper
|
||||
func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper {
|
||||
return func(h server.HandlerFunc) server.HandlerFunc {
|
||||
options := NewOptions()
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
ot := &tWrapper{opts: options, serverHandler: h}
|
||||
return ot.ServerHandler
|
||||
}
|
||||
}
|
||||
|
||||
// NewServerSubscriberWrapper accepts an opentracing Tracer and returns a Subscriber Wrapper
|
||||
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
|
||||
return func(h server.SubscriberFunc) server.SubscriberFunc {
|
||||
options := NewOptions()
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
ot := &tWrapper{opts: options, serverSubscriber: h}
|
||||
return ot.ServerSubscriber
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user