Compare commits

...

13 Commits

Author SHA1 Message Date
1f0482fbd5 tracer: finalize tracer implementation
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-04 01:12:16 +03:00
a862562284 fixup domain in ListServices
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-03 18:16:54 +03:00
c320c23913 metadata: minor fixup for NewXXXContext functions
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-03-01 13:00:53 +03:00
Renovate Bot
ae848ba8bb fix(deps): update golang.org/x/net commit hash to e18ecbb 2021-02-26 19:46:14 +00:00
Renovate Bot
8e264cbb3e fix(deps): update golang.org/x/net commit hash to 39120d0 2021-02-26 12:05:10 +00:00
Renovate Bot
54e523ab3f fix(deps): update golang.org/x/net commit hash to 3d97a24 2021-02-26 08:40:37 +00:00
09973af099 server: add error helper
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-02-22 00:52:18 +03:00
3247da3dd0 metadata: add Pairs helper func
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-02-22 00:08:05 +03:00
b505455f7c run go mod tidy in renovate update
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-02-21 23:55:19 +03:00
293949f081 metadata: add Append func to Incoming/Outgoing context
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-02-21 23:54:59 +03:00
8d7e442b3a server: add SubscriberBodyOnly option
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-02-20 18:12:13 +03:00
renovate[bot]
f7b5211af3 fix(deps): update golang.org/x/net commit hash to 5f55cee (#20)
Co-authored-by: Renovate Bot <bot@renovateapp.com>
2021-02-20 11:40:35 +03:00
7eb6d030dc meter: fix internal labels sorting
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-02-18 15:57:42 +03:00
20 changed files with 701 additions and 259 deletions

View File

@@ -2,6 +2,7 @@
"extends": [
"config:base"
],
"postUpdateOptions": ["gomodTidy"],
"packageRules": [
{
"matchUpdateTypes": ["minor", "patch", "pin", "digest"],

View File

@@ -175,7 +175,7 @@ func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opt
}
func (n *noopClient) NewRequest(service, endpoint string, req interface{}, opts ...RequestOption) Request {
return &noopRequest{}
return &noopRequest{service: service, endpoint: endpoint}
}
func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOption) Message {

8
go.mod
View File

@@ -6,12 +6,8 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/ef-ds/deque v1.0.4
github.com/google/uuid v1.2.0
github.com/heimdalr/dag v1.0.1 // indirect
github.com/imdario/mergo v0.3.11
github.com/kr/text v0.2.0 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34 // indirect
golang.org/x/net v0.0.0-20210119194325-5f4716e94777
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110
)

18
go.sum
View File

@@ -1,35 +1,21 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/ef-ds/deque v1.0.4 h1:iFAZNmveMT9WERAkqLJ+oaABF9AcVQ5AjXem/hroniI=
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
github.com/go-test/deep v1.0.7 h1:/VSMRlnY/JSyqxQUzQLKVMAskpY/NZKFA5j2P+0pP2M=
github.com/go-test/deep v1.0.7/go.mod h1:QV8Hv/iy04NyLBxAdO9njL0iVPN1S4d/A3NVv1V36o8=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/heimdalr/dag v1.0.1 h1:iR2K3DSUFDYx0GeV7iXBnZkedWS1xePSGrylQ197uxg=
github.com/heimdalr/dag v1.0.1/go.mod h1:t+ZkR+sjKL4xhlE1B9rwpvwfo+x+2R0363efS+Oghns=
github.com/imdario/mergo v0.3.11 h1:3tnifQM4i+fbajXKBHXWEH+KvNHqojZ778UH75j3bGA=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34 h1:vBfVmA5mZhsQa2jr1FOL9nfA37N/jnbBmi5XUfviVTI=
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
golang.org/x/net v0.0.0-20210119194325-5f4716e94777 h1:003p0dJM77cxMSyCPFphvZf/Y5/NXf5fzg6ufd1/Oew=
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 h1:qWPm9rbaAMKs8Bq/9LRpbMqxWRVUAQwMI9fVrssnTfw=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View File

@@ -93,7 +93,9 @@ func NewIncomingContext(ctx context.Context, md Metadata) context.Context {
ctx = context.Background()
}
ctx = context.WithValue(ctx, mdIncomingKey{}, &rawMetadata{md})
ctx = context.WithValue(ctx, mdOutgoingKey{}, &rawMetadata{})
if v, ok := ctx.Value(mdOutgoingKey{}).(*rawMetadata); !ok || v == nil {
ctx = context.WithValue(ctx, mdOutgoingKey{}, &rawMetadata{})
}
return ctx
}
@@ -103,6 +105,40 @@ func NewOutgoingContext(ctx context.Context, md Metadata) context.Context {
ctx = context.Background()
}
ctx = context.WithValue(ctx, mdOutgoingKey{}, &rawMetadata{md})
ctx = context.WithValue(ctx, mdIncomingKey{}, &rawMetadata{})
if v, ok := ctx.Value(mdIncomingKey{}).(*rawMetadata); !ok || v == nil {
ctx = context.WithValue(ctx, mdIncomingKey{}, &rawMetadata{})
}
return ctx
}
// AppendOutgoingContext apends new md to context
func AppendOutgoingContext(ctx context.Context, kv ...string) context.Context {
md, ok := Pairs(kv...)
if !ok {
return ctx
}
omd, ok := FromOutgoingContext(ctx)
if !ok {
return NewOutgoingContext(ctx, md)
}
for k, v := range md {
omd.Set(k, v)
}
return NewOutgoingContext(ctx, omd)
}
// AppendIncomingContext apends new md to context
func AppendIncomingContext(ctx context.Context, kv ...string) context.Context {
md, ok := Pairs(kv...)
if !ok {
return ctx
}
omd, ok := FromIncomingContext(ctx)
if !ok {
return NewIncomingContext(ctx, md)
}
for k, v := range md {
omd.Set(k, v)
}
return NewIncomingContext(ctx, omd)
}

View File

@@ -111,3 +111,19 @@ func Merge(omd Metadata, mmd Metadata, overwrite bool) Metadata {
}
return nmd
}
func Pairs(kv ...string) (Metadata, bool) {
if len(kv)%2 == 1 {
return nil, false
}
md := New(len(kv) / 2)
var k string
for i, v := range kv {
if i%2 == 0 {
k = v
continue
}
md.Set(k, v)
}
return md, true
}

View File

@@ -5,6 +5,28 @@ import (
"testing"
)
func TestAppend(t *testing.T) {
ctx := context.Background()
ctx = AppendIncomingContext(ctx, "key1", "val1", "key2", "val2")
md, ok := FromIncomingContext(ctx)
if !ok {
t.Fatal("metadata empty")
}
if _, ok := md.Get("key1"); !ok {
t.Fatal("key1 not found")
}
}
func TestPairs(t *testing.T) {
md, ok := Pairs("key1", "val1", "key2", "val2")
if !ok {
t.Fatal("odd number of kv")
}
if _, ok = md.Get("key1"); !ok {
t.Fatal("key1 not found")
}
}
func testCtx(ctx context.Context) {
md := New(2)
md.Set("Key1", "Val1_new")

View File

@@ -82,25 +82,6 @@ type Labels struct {
vals []string
}
type labels Labels
func (ls labels) sort() {
sort.Sort(ls)
}
func (ls labels) Len() int {
return len(ls.keys)
}
func (ls labels) Swap(i, j int) {
ls.keys[i], ls.keys[j] = ls.keys[j], ls.keys[i]
ls.vals[i], ls.vals[j] = ls.vals[j], ls.vals[i]
}
func (ls labels) Less(i, j int) bool {
return ls.vals[i] < ls.vals[j]
}
// Append adds labels to label set
func (ls Labels) Append(nls Labels) Labels {
for n := range nls.keys {
@@ -110,10 +91,30 @@ func (ls Labels) Append(nls Labels) Labels {
return ls
}
// Len returns number of labels
func (ls Labels) Len() int {
return len(ls.keys)
}
type labels Labels
func (ls labels) Len() int {
return len(ls.keys)
}
func (ls labels) Sort() {
sort.Sort(ls)
}
func (ls labels) Swap(i, j int) {
ls.keys[i], ls.keys[j] = ls.keys[j], ls.keys[i]
ls.vals[i], ls.vals[j] = ls.vals[j], ls.vals[i]
}
func (ls labels) Less(i, j int) bool {
return ls.keys[i] < ls.keys[j]
}
// LabelIter holds the
type LabelIter struct {
labels Labels
@@ -123,7 +124,7 @@ type LabelIter struct {
// Iter returns labels iterator
func (ls Labels) Iter() *LabelIter {
labels(ls).sort()
labels(ls).Sort()
return &LabelIter{labels: ls, cnt: len(ls.keys)}
}

View File

@@ -5,12 +5,12 @@ import (
)
func TestNoopMeter(t *testing.T) {
meter := NewMeter(Path("/noop"))
if "/noop" != meter.Options().Path {
t.Fatalf("invalid options parsing: %v", meter.Options())
m := NewMeter(Path("/noop"))
if "/noop" != m.Options().Path {
t.Fatalf("invalid options parsing: %v", m.Options())
}
cnt := meter.Counter("counter", Label("server", "noop"))
cnt := m.Counter("counter", Label("server", "noop"))
cnt.Inc()
}
@@ -32,15 +32,22 @@ func TestLabelsAppend(t *testing.T) {
}
func TestIterator(t *testing.T) {
var ls Labels
ls.keys = []string{"type", "server", "register"}
ls.vals = []string{"noop", "http", "gossip"}
options := NewOptions(
Label("name", "svc1"),
Label("version", "0.0.1"),
Label("id", "12345"),
Label("type", "noop"),
Label("server", "http"),
Label("register", "gossip"),
Label("aa", "kk"),
Label("zz", "kk"),
)
iter := ls.Iter()
iter := options.Labels.Iter()
var k, v string
cnt := 0
for iter.Next(&k, &v) {
if cnt == 1 && (k != "server" || v != "http") {
if cnt == 4 && (k != "server" || v != "http") {
t.Fatalf("iter error: %s != %s || %s != %s", k, "server", v, "http")
}
cnt++

View File

@@ -31,16 +31,14 @@ var (
)
type Options struct {
Meter meter.Meter
Name string
Version string
ID string
Meter meter.Meter
lopts []meter.Option
}
type Option func(*Options)
func NewOptions(opts ...Option) Options {
options := Options{}
options := Options{lopts: make([]meter.Option, 0, 5)}
for _, o := range opts {
o(&options)
}
@@ -49,19 +47,19 @@ func NewOptions(opts ...Option) Options {
func ServiceName(name string) Option {
return func(o *Options) {
o.Name = name
o.lopts = append(o.lopts, meter.Label("name", name))
}
}
func ServiceVersion(version string) Option {
return func(o *Options) {
o.Version = version
o.lopts = append(o.lopts, meter.Label("version", version))
}
}
func ServiceID(id string) Option {
return func(o *Options) {
o.ID = id
o.lopts = append(o.lopts, meter.Label("id", id))
}
}
@@ -104,7 +102,7 @@ func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request,
err := w.callFunc(ctx, addr, req, rsp, opts)
te := time.Since(ts)
lopts := make([]meter.Option, 0, 2)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
@@ -127,7 +125,7 @@ func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{},
err := w.Client.Call(ctx, req, rsp, opts...)
te := time.Since(ts)
lopts := make([]meter.Option, 0, 2)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
@@ -150,7 +148,7 @@ func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client
stream, err := w.Client.Stream(ctx, req, opts...)
te := time.Since(ts)
lopts := make([]meter.Option, 0, 2)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
@@ -173,7 +171,7 @@ func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.
err := w.Client.Publish(ctx, p, opts...)
te := time.Since(ts)
lopts := make([]meter.Option, 0, 2)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
@@ -204,7 +202,7 @@ func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
err := fn(ctx, req, rsp)
te := time.Since(ts)
lopts := make([]meter.Option, 0, 2)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
@@ -236,7 +234,7 @@ func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc
err := fn(ctx, msg)
te := time.Since(ts)
lopts := make([]meter.Option, 0, 2)
lopts := w.opts.lopts
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))

View File

@@ -369,9 +369,9 @@ func (m *memory) ListServices(ctx context.Context, opts ...ListOption) ([]*Servi
// serialize the result, each version counts as an individual service
var result []*Service
for domain, service := range services {
for _, service := range services {
for _, version := range service {
result = append(result, recordToService(version, domain))
result = append(result, recordToService(version, options.Domain))
}
}

59
server/errors.go Normal file
View File

@@ -0,0 +1,59 @@
package server
import "github.com/unistack-org/micro/v3/errors"
type Error struct {
id string
}
func NewError(id string) *Error {
return &Error{id}
}
func (e *Error) BadRequest(format string, a ...interface{}) error {
return errors.BadRequest(e.id, format, a...)
}
func (e *Error) Unauthorized(format string, a ...interface{}) error {
return errors.Unauthorized(e.id, format, a...)
}
func (e *Error) Forbidden(format string, a ...interface{}) error {
return errors.Forbidden(e.id, format, a...)
}
func (e *Error) NotFound(format string, a ...interface{}) error {
return errors.NotFound(e.id, format, a...)
}
func (e *Error) MethodNotAllowed(format string, a ...interface{}) error {
return errors.MethodNotAllowed(e.id, format, a...)
}
func (e *Error) Timeout(format string, a ...interface{}) error {
return errors.Timeout(e.id, format, a...)
}
func (e *Error) Conflict(format string, a ...interface{}) error {
return errors.Conflict(e.id, format, a...)
}
func (e *Error) InternalServerError(format string, a ...interface{}) error {
return errors.InternalServerError(e.id, format, a...)
}
func (e *Error) NotImplemented(format string, a ...interface{}) error {
return errors.NotImplemented(e.id, format, a...)
}
func (e *Error) BadGateway(format string, a ...interface{}) error {
return errors.BadGateway(e.id, format, a...)
}
func (e *Error) ServiceUnavailable(format string, a ...interface{}) error {
return errors.ServiceUnavailable(e.id, format, a...)
}
func (e *Error) GatewayTimeout(format string, a ...interface{}) error {
return errors.GatewayTimeout(e.id, format, a...)
}

19
server/errors_test.go Normal file
View File

@@ -0,0 +1,19 @@
package server
import (
"testing"
"github.com/unistack-org/micro/v3/errors"
)
func TestError(t *testing.T) {
e := NewError("svc1")
err := e.BadRequest("%s", "test")
merr, ok := err.(*errors.Error)
if !ok {
t.Fatal("error not *errors.Error")
}
if merr.Id != "svc1" {
t.Fatal("id != svc1")
}
}

View File

@@ -332,6 +332,7 @@ type SubscriberOptions struct {
AutoAck bool
Queue string
Internal bool
BodyOnly bool
Context context.Context
}
@@ -389,6 +390,20 @@ func SubscriberQueue(n string) SubscriberOption {
}
}
// SubscriberGroup sets the shared group name distributed messages across subscribers
func SubscriberGroup(n string) SubscriberOption {
return func(o *SubscriberOptions) {
o.Queue = n
}
}
// SubscriberBodyOnly says broker that message contains raw data with absence of micro broker.Message format
func SubscriberBodyOnly(b bool) SubscriberOption {
return func(o *SubscriberOptions) {
o.BodyOnly = b
}
}
// SubscriberContext set context options to allow broker SubscriberOption passed
func SubscriberContext(ctx context.Context) SubscriberOption {
return func(o *SubscriberOptions) {

View File

@@ -3,41 +3,46 @@ package tracer
import (
"context"
"github.com/unistack-org/micro/v3/metadata"
)
const (
traceIDKey = "Micro-Trace-Id"
spanIDKey = "Micro-Span-Id"
)
type tracerKey struct{}
// FromContext returns a span from context
func FromContext(ctx context.Context) (traceID string, parentSpanID string, isFound bool) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", "", false
// FromContext returns a tracer from context
func FromContext(ctx context.Context) Tracer {
if ctx == nil {
return DefaultTracer
}
traceID, traceOk := md.Get(traceIDKey)
microID, microOk := md.Get("Micro-Id")
if !traceOk && !microOk {
isFound = false
return
if tracer, ok := ctx.Value(tracerKey{}).(Tracer); ok {
return tracer
}
if !traceOk {
traceID = microID
}
parentSpanID, ok = md.Get(spanIDKey)
return traceID, parentSpanID, ok
return DefaultTracer
}
// NewContext saves the trace and span ids in the context
func NewContext(ctx context.Context, traceID, parentSpanID string) context.Context {
md, ok := metadata.FromContext(ctx)
if !ok {
md = metadata.New(2)
// NewContext saves the tracer in the context
func NewContext(ctx context.Context, tracer Tracer) context.Context {
if ctx == nil {
ctx = context.Background()
}
md.Set(traceIDKey, traceID)
md.Set(spanIDKey, parentSpanID)
return metadata.NewContext(ctx, md)
return context.WithValue(ctx, tracerKey{}, tracer)
}
type spanKey struct{}
// SpanFromContext returns a span from context
func SpanFromContext(ctx context.Context) Span {
if ctx == nil {
return &noopSpan{}
}
if span, ok := ctx.Value(spanKey{}).(Span); ok {
return span
}
return &noopSpan{}
}
// NewSpanContext saves the span in the context
func NewSpanContext(ctx context.Context, span Span) context.Context {
if ctx == nil {
ctx = context.Background()
}
return context.WithValue(ctx, spanKey{}, span)
}

View File

@@ -1,99 +0,0 @@
package tracer
import (
"context"
"time"
"github.com/google/uuid"
"github.com/unistack-org/micro/v3/util/ring"
)
type tracer struct {
opts Options
// ring buffer of traces
buffer *ring.Buffer
}
func (t *tracer) Read(opts ...ReadOption) ([]*Span, error) {
var options ReadOptions
for _, o := range opts {
o(&options)
}
sp := t.buffer.Get(t.buffer.Size())
spans := make([]*Span, 0, len(sp))
for _, span := range sp {
val := span.Value.(*Span)
// skip if trace id is specified and doesn't match
if len(options.Trace) > 0 && val.Trace != options.Trace {
continue
}
spans = append(spans, val)
}
return spans, nil
}
func (t *tracer) Start(ctx context.Context, name string) (context.Context, *Span) {
span := &Span{
Name: name,
Trace: uuid.New().String(),
Id: uuid.New().String(),
Started: time.Now(),
Metadata: make(map[string]string),
}
// return span if no context
if ctx == nil {
return NewContext(context.Background(), span.Trace, span.Id), span
}
traceID, parentSpanID, ok := FromContext(ctx)
// If the trace can not be found in the header,
// that means this is where the trace is created.
if !ok {
return NewContext(ctx, span.Trace, span.Id), span
}
// set trace id
span.Trace = traceID
// set parent
span.Parent = parentSpanID
// return the span
return NewContext(ctx, span.Trace, span.Id), span
}
func (t *tracer) Finish(s *Span) error {
// set finished time
s.Duration = time.Since(s.Started)
// save the span
t.buffer.Put(s)
return nil
}
func (t *tracer) Init(opts ...Option) error {
for _, o := range opts {
o(&t.opts)
}
return nil
}
func (t *tracer) Lookup(ctx context.Context) (*Span, error) {
return nil, nil
}
func (t *tracer) Name() string {
return t.opts.Name
}
// NewTracer returns new memory tracer
func NewTracer(opts ...Option) Tracer {
return &tracer{
opts: NewOptions(opts...),
// the last 256 requests
buffer: ring.New(256),
}
}

69
tracer/noop.go Normal file
View File

@@ -0,0 +1,69 @@
package tracer
import (
"context"
)
type noopTracer struct {
opts Options
}
func (t *noopTracer) Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span) {
span := &noopSpan{
name: name,
ctx: ctx,
tracer: t,
}
if span.ctx == nil {
span.ctx = context.Background()
}
return NewSpanContext(ctx, span), span
}
func (t *noopTracer) Init(opts ...Option) error {
for _, o := range opts {
o(&t.opts)
}
return nil
}
func (t *noopTracer) Name() string {
return t.opts.Name
}
type noopSpan struct {
name string
ctx context.Context
tracer Tracer
}
func (s *noopSpan) Finish(opts ...SpanOption) {
}
func (s *noopSpan) Context() context.Context {
return s.ctx
}
func (s *noopSpan) Tracer() Tracer {
return s.tracer
}
func (s *noopSpan) AddEvent(name string, opts ...EventOption) {
}
func (s *noopSpan) SetName(name string) {
s.name = name
}
func (s *noopSpan) SetLabels(labels ...Label) {
}
// NewTracer returns new memory tracer
func NewTracer(opts ...Option) Tracer {
return &noopTracer{
opts: NewOptions(opts...),
}
}

View File

@@ -2,39 +2,27 @@ package tracer
import "github.com/unistack-org/micro/v3/logger"
var (
// DefaultSize of the buffer
DefaultSize = 64
)
type SpanOptions struct {
}
type SpanOption func(o *SpanOptions)
type EventOptions struct {
}
type EventOption func(o *EventOptions)
// Options struct
type Options struct {
// Name of the tracer
Name string
// Logger is the logger for messages
Logger logger.Logger
// Size is the size of ring buffer
Size int
}
// Option func
type Option func(o *Options)
// ReadOptions struct
type ReadOptions struct {
// Trace id
Trace string
}
// ReadOption func
type ReadOption func(o *ReadOptions)
// ReadTrace read the given trace
func ReadTrace(t string) ReadOption {
return func(o *ReadOptions) {
o.Trace = t
}
}
// Logger sets the logger
func Logger(l logger.Logger) Option {
return func(o *Options) {
@@ -46,7 +34,6 @@ func Logger(l logger.Logger) Option {
func NewOptions(opts ...Option) Options {
options := Options{
Logger: logger.DefaultLogger,
Size: DefaultSize,
}
for _, o := range opts {
o(&options)

View File

@@ -3,9 +3,6 @@ package tracer
import (
"context"
"time"
"github.com/unistack-org/micro/v3/metadata"
)
var (
@@ -15,44 +12,54 @@ var (
// Tracer is an interface for distributed tracing
type Tracer interface {
// Name return tracer name
Name() string
// Init tracer with options
Init(...Option) error
// Start a trace
Start(ctx context.Context, name string) (context.Context, *Span)
// Finish the trace
Finish(*Span) error
// Lookup get span from context
Lookup(ctx context.Context) (*Span, error)
// Read the traces
Read(...ReadOption) ([]*Span, error)
Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span)
}
// SpanType describe the nature of the trace span
type SpanType int
const (
// SpanTypeRequestInbound is a span created when serving a request
SpanTypeRequestInbound SpanType = iota
// SpanTypeRequestOutbound is a span created when making a service call
SpanTypeRequestOutbound
)
// Span is used to record an entry
type Span struct {
// Id of the trace
Trace string
// name of the span
Name string
// id of the span
Id string
// parent span id
Parent string
// Start time
Started time.Time
// Duration in nano seconds
Duration time.Duration
// associated data
Metadata metadata.Metadata
// Type
Type SpanType
type Span interface {
// Tracer return underlining tracer
Tracer() Tracer
// Finish complete and send span
Finish(opts ...SpanOption)
// AddEvent add event to span
AddEvent(name string, opts ...EventOption)
// Context return context with span
Context() context.Context
// SetName set the span name
SetName(name string)
// SetLabels set the span labels
SetLabels(labels ...Label)
}
type Label struct {
key string
val interface{}
}
func Any(k string, v interface{}) Label {
return Label{k, v}
}
func String(k string, v string) Label {
return Label{k, v}
}
func Int(k string, v int) Label {
return Label{k, v}
}
func Int64(k string, v int64) Label {
return Label{k, v}
}
func Float64(k string, v float64) Label {
return Label{k, v}
}
func Bool(k string, v bool) Label {
return Label{k, v}
}

317
tracer/wrapper/wrapper.go Normal file
View File

@@ -0,0 +1,317 @@
// Package wrapper provides wrapper for Tracer
package wrapper
import (
"context"
"fmt"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/server"
"github.com/unistack-org/micro/v3/tracer"
)
type tWrapper struct {
opts Options
serverHandler server.HandlerFunc
serverSubscriber server.SubscriberFunc
clientCallFunc client.CallFunc
client.Client
}
type ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, tracer.Span, error)
type ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, tracer.Span, error)
type ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, tracer.Span, error)
type ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, tracer.Span, error)
type ServerHandlerObserver func(context.Context, server.Request, interface{}, tracer.Span, error)
type ServerSubscriberObserver func(context.Context, server.Message, tracer.Span, error)
type Options struct {
Tracer tracer.Tracer
ClientCallObservers []ClientCallObserver
ClientStreamObservers []ClientStreamObserver
ClientPublishObservers []ClientPublishObserver
ClientCallFuncObservers []ClientCallFuncObserver
ServerHandlerObservers []ServerHandlerObserver
ServerSubscriberObservers []ServerSubscriberObserver
}
type Option func(*Options)
func NewOptions(opts ...Option) Options {
options := Options{
Tracer: tracer.DefaultTracer,
ClientCallObservers: []ClientCallObserver{DefaultClientCallObserver},
ClientStreamObservers: []ClientStreamObserver{DefaultClientStreamObserver},
ClientPublishObservers: []ClientPublishObserver{DefaultClientPublishObserver},
ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver},
ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver},
ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver},
}
for _, o := range opts {
o(&options)
}
return options
}
func WithTracer(t tracer.Tracer) Option {
return func(o *Options) {
o.Tracer = t
}
}
func WithClientCallObservers(ob ...ClientCallObserver) Option {
return func(o *Options) {
o.ClientCallObservers = ob
}
}
func WithClientStreamObservers(ob ...ClientStreamObserver) Option {
return func(o *Options) {
o.ClientStreamObservers = ob
}
}
func WithClientPublishObservers(ob ...ClientPublishObserver) Option {
return func(o *Options) {
o.ClientPublishObservers = ob
}
}
func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option {
return func(o *Options) {
o.ClientCallFuncObservers = ob
}
}
func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option {
return func(o *Options) {
o.ServerHandlerObservers = ob
}
}
func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
return func(o *Options) {
o.ServerSubscriberObservers = ob
}
}
func DefaultClientCallObserver(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultClientStreamObserver(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultClientPublishObserver(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Pub to %s", msg.Topic()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultServerHandlerObserver(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultServerSubscriberObserver(ctx context.Context, msg server.Message, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Sub from %s", msg.Topic()))
var labels []tracer.Label
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultClientCallFuncObserver(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()
err := ot.Client.Call(ctx, req, rsp, opts...)
for _, o := range ot.opts.ClientCallObservers {
o(ctx, req, rsp, opts, sp, err)
}
return err
}
func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()
stream, err := ot.Client.Stream(ctx, req, opts...)
for _, o := range ot.opts.ClientStreamObservers {
o(ctx, req, opts, stream, sp, err)
}
return stream, err
}
func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()
err := ot.Client.Publish(ctx, msg, opts...)
for _, o := range ot.opts.ClientPublishObservers {
o(ctx, msg, opts, sp, err)
}
return err
}
func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error {
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()
err := ot.serverHandler(ctx, req, rsp)
for _, o := range ot.opts.ServerHandlerObservers {
o(ctx, req, rsp, sp, err)
}
return err
}
func (ot *tWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error {
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()
err := ot.serverSubscriber(ctx, msg)
for _, o := range ot.opts.ServerSubscriberObservers {
o(ctx, msg, sp, err)
}
return err
}
// NewClientWrapper accepts an open tracing Trace and returns a Client Wrapper
func NewClientWrapper(opts ...Option) client.Wrapper {
return func(c client.Client) client.Client {
options := NewOptions()
for _, o := range opts {
o(&options)
}
return &tWrapper{opts: options, Client: c}
}
}
// NewClientCallWrapper accepts an opentracing Tracer and returns a Call Wrapper
func NewClientCallWrapper(opts ...Option) client.CallWrapper {
return func(h client.CallFunc) client.CallFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
ot := &tWrapper{opts: options, clientCallFunc: h}
return ot.ClientCallFunc
}
}
func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()
err := ot.clientCallFunc(ctx, addr, req, rsp, opts)
for _, o := range ot.opts.ClientCallFuncObservers {
o(ctx, addr, req, rsp, opts, sp, err)
}
return err
}
// NewServerHandlerWrapper accepts an options and returns a Handler Wrapper
func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper {
return func(h server.HandlerFunc) server.HandlerFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
ot := &tWrapper{opts: options, serverHandler: h}
return ot.ServerHandler
}
}
// NewServerSubscriberWrapper accepts an opentracing Tracer and returns a Subscriber Wrapper
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
return func(h server.SubscriberFunc) server.SubscriberFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
ot := &tWrapper{opts: options, serverSubscriber: h}
return ot.ServerSubscriber
}
}