#335 caller skip count. #337

Merged
vtolstov merged 3 commits from :#335-v3 into v3 2024-04-15 13:30:49 +03:00
52 changed files with 1683 additions and 457 deletions
Showing only changes of commit e82e793525 - Show all commits

View File

@ -10,15 +10,15 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: setup-go
uses: https://gitea.com/actions/setup-go@v3
uses: actions/setup-go@v3
with:
go-version: 1.18
go-version: 1.21
- name: checkout
uses: https://gitea.com/actions/checkout@v3
uses: actions/checkout@v3
- name: deps
run: go get -v -d ./...
- name: lint
uses: https://github.com/golangci/golangci-lint-action@v3.4.0
continue-on-error: true
with:
version: v1.52
version: v1.52

View File

@ -10,14 +10,14 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: checkout
uses: https://gitea.com/actions/checkout@v3
uses: actions/checkout@v3
- name: setup-go
uses: https://gitea.com/actions/setup-go@v3
uses: actions/setup-go@v3
with:
go-version: 1.18
go-version: 1.21
- name: deps
run: go get -v -t -d ./...
- name: test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...
run: go test -mod readonly -v ./...

3
.gitignore vendored
View File

@ -1,6 +1,8 @@
# Develop tools
/.vscode/
/.idea/
.idea
.vscode
# Binaries for programs and plugins
*.exe
@ -13,6 +15,7 @@
_obj
_test
_build
.DS_Store
# Architecture specific extensions/prefixes
*.[568vq]

View File

@ -4,19 +4,22 @@ package broker // import "go.unistack.org/micro/v3/broker"
import (
"context"
"errors"
"time"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/metadata"
)
// DefaultBroker default memory broker
var DefaultBroker = NewBroker()
var DefaultBroker Broker = NewBroker()
var (
// ErrNotConnected returns when broker used but not connected yet
ErrNotConnected = errors.New("broker not connected")
// ErrDisconnected returns when broker disconnected
ErrDisconnected = errors.New("broker disconnected")
// DefaultGracefulTimeout
DefaultGracefulTimeout = 5 * time.Second
)
// Broker is an interface used for asynchronous messaging.

View File

@ -4,6 +4,7 @@ import (
"context"
"sync"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
maddr "go.unistack.org/micro/v3/util/addr"
@ -15,7 +16,7 @@ import (
type memoryBroker struct {
subscribers map[string][]*memorySubscriber
addr string
opts Options
opts broker.Options
sync.RWMutex
connected bool
}
@ -24,20 +25,20 @@ type memoryEvent struct {
err error
message interface{}
topic string
opts Options
opts broker.Options
}
type memorySubscriber struct {
ctx context.Context
exit chan bool
handler Handler
batchhandler BatchHandler
handler broker.Handler
batchhandler broker.BatchHandler
id string
topic string
opts SubscribeOptions
opts broker.SubscribeOptions
}
func (m *memoryBroker) Options() Options {
func (m *memoryBroker) Options() broker.Options {
return m.opts
}
@ -46,6 +47,12 @@ func (m *memoryBroker) Address() string {
}
func (m *memoryBroker) Connect(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
m.Lock()
defer m.Unlock()
@ -70,6 +77,12 @@ func (m *memoryBroker) Connect(ctx context.Context) error {
}
func (m *memoryBroker) Disconnect(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
m.Lock()
defer m.Unlock()
@ -81,27 +94,27 @@ func (m *memoryBroker) Disconnect(ctx context.Context) error {
return nil
}
func (m *memoryBroker) Init(opts ...Option) error {
func (m *memoryBroker) Init(opts ...broker.Option) error {
for _, o := range opts {
o(&m.opts)
}
return nil
}
func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error {
func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
msg.Header.Set(metadata.HeaderTopic, topic)
return m.publish(ctx, []*Message{msg}, opts...)
return m.publish(ctx, []*broker.Message{msg}, opts...)
}
func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
return m.publish(ctx, msgs, opts...)
}
func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
func (m *memoryBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
m.RLock()
if !m.connected {
m.RUnlock()
return ErrNotConnected
return broker.ErrNotConnected
}
m.RUnlock()
@ -111,9 +124,9 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub
case <-ctx.Done():
return ctx.Err()
default:
options := NewPublishOptions(opts...)
options := broker.NewPublishOptions(opts...)
msgTopicMap := make(map[string]Events)
msgTopicMap := make(map[string]broker.Events)
for _, v := range msgs {
p := &memoryEvent{opts: m.opts}
@ -188,11 +201,11 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub
return nil
}
func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) {
func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
m.RLock()
if !m.connected {
m.RUnlock()
return nil, ErrNotConnected
return nil, broker.ErrNotConnected
}
m.RUnlock()
@ -201,7 +214,7 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler
return nil, err
}
options := NewSubscribeOptions(opts...)
options := broker.NewSubscribeOptions(opts...)
sub := &memorySubscriber{
exit: make(chan bool, 1),
@ -233,11 +246,11 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler
return sub, nil
}
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
m.RLock()
if !m.connected {
m.RUnlock()
return nil, ErrNotConnected
return nil, broker.ErrNotConnected
}
m.RUnlock()
@ -246,7 +259,7 @@ func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Hand
return nil, err
}
options := NewSubscribeOptions(opts...)
options := broker.NewSubscribeOptions(opts...)
sub := &memorySubscriber{
exit: make(chan bool, 1),
@ -290,12 +303,12 @@ func (m *memoryEvent) Topic() string {
return m.topic
}
func (m *memoryEvent) Message() *Message {
func (m *memoryEvent) Message() *broker.Message {
switch v := m.message.(type) {
case *Message:
case *broker.Message:
return v
case []byte:
msg := &Message{}
msg := &broker.Message{}
if err := m.opts.Codec.Unmarshal(v, msg); err != nil {
if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, "[memory]: failed to unmarshal: %v", err)
@ -320,7 +333,7 @@ func (m *memoryEvent) SetError(err error) {
m.err = err
}
func (m *memorySubscriber) Options() SubscribeOptions {
func (m *memorySubscriber) Options() broker.SubscribeOptions {
return m.opts
}
@ -334,9 +347,9 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error {
}
// NewBroker return new memory broker
func NewBroker(opts ...Option) Broker {
func NewBroker(opts ...broker.Option) broker.Broker {
return &memoryBroker{
opts: NewOptions(opts...),
opts: broker.NewOptions(opts...),
subscribers: make(map[string][]*memorySubscriber),
}
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"testing"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/metadata"
)
@ -19,7 +20,7 @@ func TestMemoryBatchBroker(t *testing.T) {
topic := "test"
count := 10
fn := func(evts Events) error {
fn := func(evts broker.Events) error {
return evts.Ack()
}
@ -28,9 +29,9 @@ func TestMemoryBatchBroker(t *testing.T) {
t.Fatalf("Unexpected error subscribing %v", err)
}
msgs := make([]*Message, 0, count)
msgs := make([]*broker.Message, 0, count)
for i := 0; i < count; i++ {
message := &Message{
message := &broker.Message{
Header: map[string]string{
metadata.HeaderTopic: topic,
"foo": "bar",
@ -65,7 +66,7 @@ func TestMemoryBroker(t *testing.T) {
topic := "test"
count := 10
fn := func(p Event) error {
fn := func(p broker.Event) error {
return nil
}
@ -74,9 +75,9 @@ func TestMemoryBroker(t *testing.T) {
t.Fatalf("Unexpected error subscribing %v", err)
}
msgs := make([]*Message, 0, count)
msgs := make([]*broker.Message, 0, count)
for i := 0; i < count; i++ {
message := &Message{
message := &broker.Message{
Header: map[string]string{
metadata.HeaderTopic: topic,
"foo": "bar",

82
broker/noop.go Normal file
View File

@ -0,0 +1,82 @@
package broker
import (
"context"
"strings"
)
type NoopBroker struct {
opts Options
}
func NewBroker(opts ...Option) *NoopBroker {
b := &NoopBroker{opts: NewOptions(opts...)}
return b
}
func (b *NoopBroker) Name() string {
return b.opts.Name
}
func (b *NoopBroker) String() string {
return "noop"
}
func (b *NoopBroker) Options() Options {
return b.opts
}
func (b *NoopBroker) Init(opts ...Option) error {
for _, opt := range opts {
opt(&b.opts)
}
return nil
}
func (b *NoopBroker) Connect(_ context.Context) error {
return nil
}
func (b *NoopBroker) Disconnect(_ context.Context) error {
return nil
}
func (b *NoopBroker) Address() string {
return strings.Join(b.opts.Addrs, ",")
}
func (b *NoopBroker) BatchPublish(_ context.Context, _ []*Message, _ ...PublishOption) error {
return nil
}
func (b *NoopBroker) Publish(_ context.Context, _ string, _ *Message, _ ...PublishOption) error {
return nil
}
type NoopSubscriber struct {
ctx context.Context
topic string
handler Handler
batchHandler BatchHandler
opts SubscribeOptions
}
func (b *NoopBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) {
return &NoopSubscriber{ctx: ctx, topic: topic, opts: NewSubscribeOptions(opts...), batchHandler: handler}, nil
}
func (b *NoopBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
return &NoopSubscriber{ctx: ctx, topic: topic, opts: NewSubscribeOptions(opts...), handler: handler}, nil
}
func (s *NoopSubscriber) Options() SubscribeOptions {
return s.opts
}
func (s *NoopSubscriber) Topic() string {
return s.topic
}
func (s *NoopSubscriber) Unsubscribe(ctx context.Context) error {
return nil
}

View File

@ -9,6 +9,7 @@ import (
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/sync"
"go.unistack.org/micro/v3/tracer"
)
@ -36,17 +37,22 @@ type Options struct {
Name string
// Addrs holds the broker address
Addrs []string
Wait *sync.WaitGroup
GracefulTimeout time.Duration
}
// NewOptions create new Options
func NewOptions(opts ...Option) Options {
options := Options{
Register: register.DefaultRegister,
Logger: logger.DefaultLogger,
Context: context.Background(),
Meter: meter.DefaultMeter,
Codec: codec.DefaultCodec,
Tracer: tracer.DefaultTracer,
Register: register.DefaultRegister,
Logger: logger.DefaultLogger,
Context: context.Background(),
Meter: meter.DefaultMeter,
Codec: codec.DefaultCodec,
Tracer: tracer.DefaultTracer,
GracefulTimeout: DefaultGracefulTimeout,
}
for _, o := range opts {
o(&options)

41
cluster/cluster.go Normal file
View File

@ -0,0 +1,41 @@
package cluster
import (
"context"
"go.unistack.org/micro/v3/metadata"
)
// Message sent to member in cluster
type Message interface {
// Header returns message headers
Header() metadata.Metadata
// Body returns broker message may be []byte slice or some go struct or interface
Body() interface{}
}
type Node interface {
// Name returns node name
Name() string
// Address returns node address
Address() string
// Metadata returns node metadata
Metadata() metadata.Metadata
}
// Cluster interface used for cluster communication across nodes
type Cluster interface {
// Join is used to take an existing members and performing state sync
Join(ctx context.Context, addr ...string) error
// Leave broadcast a leave message and stop listeners
Leave(ctx context.Context) error
// Ping is used to probe live status of the node
Ping(ctx context.Context, node Node, payload []byte) error
// Members returns the cluster members
Members() ([]Node, error)
// Broadcast send message for all members in cluster, if filter is not nil, nodes may be filtered
// by key/value pairs
Broadcast(ctx context.Context, msg Message, filter ...string) error
// Unicast send message to single member in cluster
Unicast(ctx context.Context, node Node, msg Message) error
}

View File

@ -13,7 +13,7 @@ type Validator interface {
}
// DefaultConfig default config
var DefaultConfig = NewConfig()
var DefaultConfig Config = NewConfig()
// DefaultWatcherMinInterval default min interval for poll changes
var DefaultWatcherMinInterval = 5 * time.Second

View File

@ -7,8 +7,8 @@ import (
"strings"
"time"
"dario.cat/mergo"
"github.com/google/uuid"
"github.com/imdario/mergo"
mid "go.unistack.org/micro/v3/util/id"
rutil "go.unistack.org/micro/v3/util/reflect"
mtime "go.unistack.org/micro/v3/util/time"

4
go.mod
View File

@ -1,11 +1,11 @@
module go.unistack.org/micro/v3
go 1.19
go 1.20
require (
dario.cat/mergo v1.0.0
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/google/uuid v1.3.0
github.com/imdario/mergo v0.3.15
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5
golang.org/x/sync v0.3.0

4
go.sum
View File

@ -1,3 +1,5 @@
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
@ -7,8 +9,6 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM=
github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E=

View File

@ -3,12 +3,15 @@ package logger // import "go.unistack.org/micro/v3/logger"
import (
"context"
"os"
)
type ContextAttrFunc func(ctx context.Context) []interface{}
var DefaultContextAttrFuncs []ContextAttrFunc
var (
// DefaultLogger variable
DefaultLogger = NewLogger(WithLevel(ParseLevel(os.Getenv("MICRO_LOG_LEVEL"))))
DefaultLogger Logger = NewLogger()
// DefaultLevel used by logger
DefaultLevel = InfoLevel
// DefaultCallerSkipCount used by logger
@ -57,7 +60,9 @@ type Logger interface {
Log(ctx context.Context, level Level, args ...interface{})
// Logf logs message with needed level
Logf(ctx context.Context, level Level, msg string, args ...interface{})
// String returns the name of logger
// Name returns broker instance name
Name() string
// String returns the type of logger
String() string
}
@ -65,76 +70,106 @@ type Logger interface {
type Field interface{}
// Info writes msg to default logger on info level
//
// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations
func Info(ctx context.Context, args ...interface{}) {
DefaultLogger.Info(ctx, args...)
DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Info(ctx, args...)
}
// Error writes msg to default logger on error level
//
// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations
func Error(ctx context.Context, args ...interface{}) {
DefaultLogger.Error(ctx, args...)
DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Error(ctx, args...)
}
// Debug writes msg to default logger on debug level
//
// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations
func Debug(ctx context.Context, args ...interface{}) {
DefaultLogger.Debug(ctx, args...)
DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Debug(ctx, args...)
}
// Warn writes msg to default logger on warn level
//
// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations
func Warn(ctx context.Context, args ...interface{}) {
DefaultLogger.Warn(ctx, args...)
DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Warn(ctx, args...)
}
// Trace writes msg to default logger on trace level
//
// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations
func Trace(ctx context.Context, args ...interface{}) {
DefaultLogger.Trace(ctx, args...)
DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Trace(ctx, args...)
}
// Fatal writes msg to default logger on fatal level
//
// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations
func Fatal(ctx context.Context, args ...interface{}) {
DefaultLogger.Fatal(ctx, args...)
DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Fatal(ctx, args...)
}
// Infof writes formatted msg to default logger on info level
//
// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations
func Infof(ctx context.Context, msg string, args ...interface{}) {
DefaultLogger.Infof(ctx, msg, args...)
DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Infof(ctx, msg, args...)
}
// Errorf writes formatted msg to default logger on error level
//
// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations
func Errorf(ctx context.Context, msg string, args ...interface{}) {
DefaultLogger.Errorf(ctx, msg, args...)
DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Errorf(ctx, msg, args...)
}
// Debugf writes formatted msg to default logger on debug level
//
// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations
func Debugf(ctx context.Context, msg string, args ...interface{}) {
DefaultLogger.Debugf(ctx, msg, args...)
DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Debugf(ctx, msg, args...)
}
// Warnf writes formatted msg to default logger on warn level
//
// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations
func Warnf(ctx context.Context, msg string, args ...interface{}) {
DefaultLogger.Warnf(ctx, msg, args...)
DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Warnf(ctx, msg, args...)
}
// Tracef writes formatted msg to default logger on trace level
//
// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations
func Tracef(ctx context.Context, msg string, args ...interface{}) {
DefaultLogger.Tracef(ctx, msg, args...)
DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Tracef(ctx, msg, args...)
}
// Fatalf writes formatted msg to default logger on fatal level
//
// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations
func Fatalf(ctx context.Context, msg string, args ...interface{}) {
DefaultLogger.Fatalf(ctx, msg, args...)
DefaultLogger.Clone(WithCallerSkipCount(DefaultCallerSkipCount+1)).Fatalf(ctx, msg, args...)
}
// V returns true if passed level enabled in default logger
//
// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations
func V(level Level) bool {
return DefaultLogger.V(level)
}
// Init initialize logger
//
// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations
func Init(opts ...Option) error {
return DefaultLogger.Init(opts...)
}
// Fields create logger with specific fields
//
// Deprecated: Dont use logger methods directly, use instance of logger to avoid additional allocations
func Fields(fields ...interface{}) Logger {
return DefaultLogger.Fields(fields...)
}

View File

@ -13,11 +13,15 @@ func NewLogger(opts ...Option) Logger {
return &noopLogger{opts: options}
}
func (l *noopLogger) V(lvl Level) bool {
func (l *noopLogger) V(_ Level) bool {
return false
}
func (l *noopLogger) Level(lvl Level) {
func (l *noopLogger) Level(_ Level) {
}
func (l *noopLogger) Name() string {
return l.opts.Name
}
func (l *noopLogger) Init(opts ...Option) error {
@ -35,7 +39,7 @@ func (l *noopLogger) Clone(opts ...Option) Logger {
return nl
}
func (l *noopLogger) Fields(attrs ...interface{}) Logger {
func (l *noopLogger) Fields(_ ...interface{}) Logger {
return l
}

View File

@ -3,10 +3,12 @@ package logger
import (
"context"
"io"
"log/slog"
"os"
"time"
)
// Option func
// Option func signature
type Option func(*Options)
// Options holds logger options
@ -15,31 +17,65 @@ type Options struct {
Out io.Writer
// Context holds exernal options
Context context.Context
// Fields holds additional metadata
Fields []interface{}
// Name holds the logger name
Name string
// The logging level the logger should log
Level Level
// Fields holds additional metadata
Fields []interface{}
// CallerSkipCount number of frmaes to skip
CallerSkipCount int
// ContextAttrFuncs contains funcs that executed before log func on context
ContextAttrFuncs []ContextAttrFunc
// TimeKey is the key used for the time of the log call
TimeKey string
// LevelKey is the key used for the level of the log call
LevelKey string
// ErroreKey is the key used for the error of the log call
ErrorKey string
// MessageKey is the key used for the message of the log call
MessageKey string
// SourceKey is the key used for the source file and line of the log call
SourceKey string
// StacktraceKey is the key used for the stacktrace
StacktraceKey string
// AddStacktrace controls writing of stacktaces on error
AddStacktrace bool
// AddSource enabled writing source file and position in log
AddSource bool
// The logging level the logger should log
Level Level
// TimeFunc used to obtain current time
TimeFunc func() time.Time
}
// NewOptions creates new options struct
func NewOptions(opts ...Option) Options {
options := Options{
Level: DefaultLevel,
Fields: make([]interface{}, 0, 6),
Out: os.Stderr,
CallerSkipCount: DefaultCallerSkipCount,
Context: context.Background(),
Level: DefaultLevel,
Fields: make([]interface{}, 0, 6),
Out: os.Stderr,
CallerSkipCount: DefaultCallerSkipCount,
Context: context.Background(),
ContextAttrFuncs: DefaultContextAttrFuncs,
AddSource: true,
TimeFunc: time.Now,
}
WithMicroKeys()(&options)
for _, o := range opts {
o(&options)
}
return options
}
// WithContextAttrFuncs appends default funcs for the context arrts filler
func WithContextAttrFuncs(fncs ...ContextAttrFunc) Option {
return func(o *Options) {
o.ContextAttrFuncs = append(o.ContextAttrFuncs, fncs...)
}
}
// WithFields set default fields for the logger
func WithFields(fields ...interface{}) Option {
return func(o *Options) {
@ -61,6 +97,20 @@ func WithOutput(out io.Writer) Option {
}
}
// WitAddStacktrace controls writing stacktrace on error
func WithAddStacktrace(v bool) Option {
return func(o *Options) {
o.AddStacktrace = v
}
}
// WitAddSource controls writing source file and pos in log
func WithAddSource(v bool) Option {
return func(o *Options) {
o.AddSource = v
}
}
// WithCallerSkipCount set frame count to skip
func WithCallerSkipCount(c int) Option {
return func(o *Options) {
@ -82,6 +132,57 @@ func WithName(n string) Option {
}
}
// WithTimeFunc sets the func to obtain current time
func WithTimeFunc(fn func() time.Time) Option {
return func(o *Options) {
o.TimeFunc = fn
}
}
func WithZapKeys() Option {
return func(o *Options) {
o.TimeKey = "@timestamp"
o.LevelKey = "level"
o.MessageKey = "msg"
o.SourceKey = "caller"
o.StacktraceKey = "stacktrace"
o.ErrorKey = "error"
}
}
func WithZerologKeys() Option {
return func(o *Options) {
o.TimeKey = "time"
o.LevelKey = "level"
o.MessageKey = "message"
o.SourceKey = "caller"
o.StacktraceKey = "stacktrace"
o.ErrorKey = "error"
}
}
func WithSlogKeys() Option {
return func(o *Options) {
o.TimeKey = slog.TimeKey
o.LevelKey = slog.LevelKey
o.MessageKey = slog.MessageKey
o.SourceKey = slog.SourceKey
o.StacktraceKey = "stacktrace"
o.ErrorKey = "error"
}
}
func WithMicroKeys() Option {
return func(o *Options) {
o.TimeKey = "timestamp"
o.LevelKey = "level"
o.MessageKey = "msg"
o.SourceKey = "caller"
o.StacktraceKey = "stacktrace"
o.ErrorKey = "error"
}
}
func WithIncCallerSkipCount(n int) Option {
return func(o *Options) {
o.CallerSkipCount += n

View File

@ -1,27 +0,0 @@
package slog
import "go.unistack.org/micro/v3/logger"
type sourceKey struct{}
func WithSourceKey(v string) logger.Option {
return logger.SetOption(sourceKey{}, v)
}
type timeKey struct{}
func WithTimeKey(v string) logger.Option {
return logger.SetOption(timeKey{}, v)
}
type messageKey struct{}
func WithMessageKey(v string) logger.Option {
return logger.SetOption(messageKey{}, v)
}
type levelKey struct{}
func WithLevelKey(v string) logger.Option {
return logger.SetOption(levelKey{}, v)
}

View File

@ -5,21 +5,16 @@ import (
"fmt"
"log/slog"
"os"
"regexp"
"runtime"
"strconv"
"sync"
"time"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/tracer"
)
var (
DefaultSourceKey string = slog.SourceKey
DefaultTimeKey string = slog.TimeKey
DefaultMessageKey string = slog.MessageKey
DefaultLevelKey string = slog.LevelKey
)
var reTrace = regexp.MustCompile(`.*/slog/logger\.go.*\n`)
var (
traceValue = slog.StringValue("trace")
@ -35,15 +30,15 @@ func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
case slog.SourceKey:
source := a.Value.Any().(*slog.Source)
a.Value = slog.StringValue(source.File + ":" + strconv.Itoa(source.Line))
a.Key = s.sourceKey
a.Key = s.opts.SourceKey
case slog.TimeKey:
a.Key = s.timeKey
a.Key = s.opts.TimeKey
case slog.MessageKey:
a.Key = s.messageKey
a.Key = s.opts.MessageKey
case slog.LevelKey:
level := a.Value.Any().(slog.Level)
lvl := slogToLoggerLevel(level)
a.Key = s.levelKey
a.Key = s.opts.LevelKey
switch {
case lvl < logger.DebugLevel:
a.Value = traceValue
@ -66,56 +61,33 @@ func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
}
type slogLogger struct {
slog *slog.Logger
leveler *slog.LevelVar
levelKey string
messageKey string
sourceKey string
timeKey string
opts logger.Options
mu sync.RWMutex
leveler *slog.LevelVar
handler slog.Handler
opts logger.Options
mu sync.RWMutex
}
func (s *slogLogger) Clone(opts ...logger.Option) logger.Logger {
s.mu.RLock()
options := s.opts
s.mu.RUnlock()
for _, o := range opts {
o(&options)
}
l := &slogLogger{
opts: options,
levelKey: s.levelKey,
messageKey: s.messageKey,
sourceKey: s.sourceKey,
timeKey: s.timeKey,
}
if v, ok := l.opts.Context.Value(levelKey{}).(string); ok && v != "" {
l.levelKey = v
}
if v, ok := l.opts.Context.Value(messageKey{}).(string); ok && v != "" {
l.messageKey = v
}
if v, ok := l.opts.Context.Value(sourceKey{}).(string); ok && v != "" {
l.sourceKey = v
}
if v, ok := l.opts.Context.Value(timeKey{}).(string); ok && v != "" {
l.timeKey = v
opts: options,
}
l.leveler = new(slog.LevelVar)
handleOpt := &slog.HandlerOptions{
ReplaceAttr: s.renameAttr,
ReplaceAttr: l.renameAttr,
Level: l.leveler,
AddSource: true,
AddSource: l.opts.AddSource,
}
l.leveler.Set(loggerToSlogLevel(l.opts.Level))
handler := slog.NewJSONHandler(options.Out, handleOpt)
l.slog = slog.New(handler).With(options.Fields...)
s.mu.RUnlock()
l.handler = slog.New(slog.NewJSONHandler(options.Out, handleOpt)).With(options.Fields...).Handler()
return l
}
@ -134,61 +106,44 @@ func (s *slogLogger) Options() logger.Options {
func (s *slogLogger) Fields(attrs ...interface{}) logger.Logger {
s.mu.RLock()
nl := &slogLogger{
opts: s.opts,
levelKey: s.levelKey,
messageKey: s.messageKey,
sourceKey: s.sourceKey,
timeKey: s.timeKey,
}
nl.leveler = new(slog.LevelVar)
nl.leveler.Set(s.leveler.Level())
handleOpt := &slog.HandlerOptions{
ReplaceAttr: nl.renameAttr,
Level: nl.leveler,
AddSource: true,
}
handler := slog.NewJSONHandler(s.opts.Out, handleOpt)
nl.slog = slog.New(handler).With(attrs...)
level := s.leveler.Level()
options := s.opts
s.mu.RUnlock()
return nl
l := &slogLogger{opts: options}
l.leveler = new(slog.LevelVar)
l.leveler.Set(level)
handleOpt := &slog.HandlerOptions{
ReplaceAttr: l.renameAttr,
Level: l.leveler,
AddSource: l.opts.AddSource,
}
l.handler = slog.New(slog.NewJSONHandler(l.opts.Out, handleOpt)).With(attrs...).Handler()
return l
}
func (s *slogLogger) Init(opts ...logger.Option) error {
s.mu.Lock()
for _, o := range opts {
o(&s.opts)
if len(s.opts.ContextAttrFuncs) == 0 {
s.opts.ContextAttrFuncs = logger.DefaultContextAttrFuncs
}
if v, ok := s.opts.Context.Value(levelKey{}).(string); ok && v != "" {
s.levelKey = v
}
if v, ok := s.opts.Context.Value(messageKey{}).(string); ok && v != "" {
s.messageKey = v
}
if v, ok := s.opts.Context.Value(sourceKey{}).(string); ok && v != "" {
s.sourceKey = v
}
if v, ok := s.opts.Context.Value(timeKey{}).(string); ok && v != "" {
s.timeKey = v
for _, o := range opts {
o(&s.opts)
}
s.leveler = new(slog.LevelVar)
handleOpt := &slog.HandlerOptions{
ReplaceAttr: s.renameAttr,
Level: s.leveler,
AddSource: true,
AddSource: s.opts.AddSource,
}
s.leveler.Set(loggerToSlogLevel(s.opts.Level))
handler := slog.NewJSONHandler(s.opts.Out, handleOpt)
s.slog = slog.New(handler).With(s.opts.Fields...)
slog.SetDefault(s.slog)
s.handler = slog.New(slog.NewJSONHandler(s.opts.Out, handleOpt)).With(s.opts.Fields...).Handler()
s.mu.Unlock()
return nil
@ -200,9 +155,37 @@ func (s *slogLogger) Log(ctx context.Context, lvl logger.Level, attrs ...interfa
}
var pcs [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof]
r := slog.NewRecord(time.Now(), loggerToSlogLevel(lvl), fmt.Sprintf("%s", attrs[0]), pcs[0])
// r.Add(attrs[1:]...)
_ = s.slog.Handler().Handle(ctx, r)
r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), fmt.Sprintf("%s", attrs[0]), pcs[0])
for _, fn := range s.opts.ContextAttrFuncs {
attrs = append(attrs, fn(ctx)...)
}
for idx, attr := range attrs {
if ve, ok := attr.(error); ok && ve != nil {
attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error())
break
}
}
if s.opts.AddStacktrace && lvl == logger.ErrorLevel {
stackInfo := make([]byte, 1024*1024)
if stackSize := runtime.Stack(stackInfo, false); stackSize > 0 {
traceLines := reTrace.Split(string(stackInfo[:stackSize]), -1)
if len(traceLines) != 0 {
attrs = append(attrs, slog.String(s.opts.StacktraceKey, traceLines[len(traceLines)-1]))
}
}
}
r.Add(attrs[1:]...)
r.Attrs(func(a slog.Attr) bool {
if a.Key == s.opts.ErrorKey {
if span, ok := tracer.SpanFromContext(ctx); ok {
span.SetStatus(tracer.SpanStatusError, a.Value.String())
return false
}
}
return true
})
_ = s.handler.Handle(ctx, r)
}
func (s *slogLogger) Logf(ctx context.Context, lvl logger.Level, msg string, attrs ...interface{}) {
@ -211,9 +194,37 @@ func (s *slogLogger) Logf(ctx context.Context, lvl logger.Level, msg string, att
}
var pcs [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof]
r := slog.NewRecord(time.Now(), loggerToSlogLevel(lvl), fmt.Sprintf(msg, attrs...), pcs[0])
// r.Add(attrs...)
_ = s.slog.Handler().Handle(ctx, r)
r := slog.NewRecord(s.opts.TimeFunc(), loggerToSlogLevel(lvl), msg, pcs[0])
for _, fn := range s.opts.ContextAttrFuncs {
attrs = append(attrs, fn(ctx)...)
}
for idx, attr := range attrs {
if ve, ok := attr.(error); ok && ve != nil {
attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error())
break
}
}
if s.opts.AddStacktrace && lvl == logger.ErrorLevel {
stackInfo := make([]byte, 1024*1024)
if stackSize := runtime.Stack(stackInfo, false); stackSize > 0 {
traceLines := reTrace.Split(string(stackInfo[:stackSize]), -1)
if len(traceLines) != 0 {
attrs = append(attrs, (slog.String(s.opts.StacktraceKey, traceLines[len(traceLines)-1])))
}
}
}
r.Add(attrs[1:]...)
r.Attrs(func(a slog.Attr) bool {
if a.Key == s.opts.ErrorKey {
if span, ok := tracer.SpanFromContext(ctx); ok {
span.SetStatus(tracer.SpanStatusError, a.Value.String())
return false
}
}
return true
})
_ = s.handler.Handle(ctx, r)
}
func (s *slogLogger) Info(ctx context.Context, attrs ...interface{}) {
@ -222,9 +233,19 @@ func (s *slogLogger) Info(ctx context.Context, attrs ...interface{}) {
}
var pcs [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof]
r := slog.NewRecord(time.Now(), slog.LevelInfo, fmt.Sprintf("%s", attrs[0]), pcs[0])
// r.Add(attrs[1:]...)
_ = s.slog.Handler().Handle(ctx, r)
r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelInfo, fmt.Sprintf("%s", attrs[0]), pcs[0])
for _, fn := range s.opts.ContextAttrFuncs {
attrs = append(attrs, fn(ctx)...)
}
for idx, attr := range attrs {
if ve, ok := attr.(error); ok && ve != nil {
attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error())
break
}
}
r.Add(attrs[1:]...)
_ = s.handler.Handle(ctx, r)
}
func (s *slogLogger) Infof(ctx context.Context, msg string, attrs ...interface{}) {
@ -233,9 +254,19 @@ func (s *slogLogger) Infof(ctx context.Context, msg string, attrs ...interface{}
}
var pcs [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof]
r := slog.NewRecord(time.Now(), slog.LevelInfo, fmt.Sprintf(msg, attrs...), pcs[0])
// r.Add(attrs...)
_ = s.slog.Handler().Handle(ctx, r)
r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelInfo, msg, pcs[0])
for _, fn := range s.opts.ContextAttrFuncs {
attrs = append(attrs, fn(ctx)...)
}
for idx, attr := range attrs {
if ve, ok := attr.(error); ok && ve != nil {
attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error())
break
}
}
r.Add(attrs...)
_ = s.handler.Handle(ctx, r)
}
func (s *slogLogger) Debug(ctx context.Context, attrs ...interface{}) {
@ -244,9 +275,19 @@ func (s *slogLogger) Debug(ctx context.Context, attrs ...interface{}) {
}
var pcs [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof]
r := slog.NewRecord(time.Now(), slog.LevelDebug, fmt.Sprintf("%s", attrs[0]), pcs[0])
// r.Add(attrs[1:]...)
_ = s.slog.Handler().Handle(ctx, r)
r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelDebug, fmt.Sprintf("%s", attrs[0]), pcs[0])
for _, fn := range s.opts.ContextAttrFuncs {
attrs = append(attrs, fn(ctx)...)
}
for idx, attr := range attrs {
if ve, ok := attr.(error); ok && ve != nil {
attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error())
break
}
}
r.Add(attrs[1:]...)
_ = s.handler.Handle(ctx, r)
}
func (s *slogLogger) Debugf(ctx context.Context, msg string, attrs ...interface{}) {
@ -255,9 +296,19 @@ func (s *slogLogger) Debugf(ctx context.Context, msg string, attrs ...interface{
}
var pcs [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof]
r := slog.NewRecord(time.Now(), slog.LevelDebug, fmt.Sprintf(msg, attrs...), pcs[0])
// r.Add(attrs...)
_ = s.slog.Handler().Handle(ctx, r)
r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelDebug, msg, pcs[0])
for _, fn := range s.opts.ContextAttrFuncs {
attrs = append(attrs, fn(ctx)...)
}
for idx, attr := range attrs {
if ve, ok := attr.(error); ok && ve != nil {
attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error())
break
}
}
r.Add(attrs...)
_ = s.handler.Handle(ctx, r)
}
func (s *slogLogger) Trace(ctx context.Context, attrs ...interface{}) {
@ -266,9 +317,19 @@ func (s *slogLogger) Trace(ctx context.Context, attrs ...interface{}) {
}
var pcs [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof]
r := slog.NewRecord(time.Now(), slog.LevelDebug-1, fmt.Sprintf("%s", attrs[0]), pcs[0])
// r.Add(attrs[1:]...)
_ = s.slog.Handler().Handle(ctx, r)
r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelDebug-1, fmt.Sprintf("%s", attrs[0]), pcs[0])
for _, fn := range s.opts.ContextAttrFuncs {
attrs = append(attrs, fn(ctx)...)
}
for idx, attr := range attrs {
if ve, ok := attr.(error); ok && ve != nil {
attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error())
break
}
}
r.Add(attrs[1:]...)
_ = s.handler.Handle(ctx, r)
}
func (s *slogLogger) Tracef(ctx context.Context, msg string, attrs ...interface{}) {
@ -277,9 +338,19 @@ func (s *slogLogger) Tracef(ctx context.Context, msg string, attrs ...interface{
}
var pcs [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof]
r := slog.NewRecord(time.Now(), slog.LevelDebug-1, fmt.Sprintf(msg, attrs...), pcs[0])
// r.Add(attrs...)
_ = s.slog.Handler().Handle(ctx, r)
r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelDebug-1, msg, pcs[0])
for _, fn := range s.opts.ContextAttrFuncs {
attrs = append(attrs, fn(ctx)...)
}
for idx, attr := range attrs {
if ve, ok := attr.(error); ok && ve != nil {
attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error())
break
}
}
r.Add(attrs[1:]...)
_ = s.handler.Handle(ctx, r)
}
func (s *slogLogger) Error(ctx context.Context, attrs ...interface{}) {
@ -288,10 +359,29 @@ func (s *slogLogger) Error(ctx context.Context, attrs ...interface{}) {
}
var pcs [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof]
r := slog.NewRecord(time.Now(), slog.LevelError, fmt.Sprintf("%s", attrs[0]), pcs[0])
// r.Add(attrs[1:]...)
r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelError, fmt.Sprintf("%s", attrs[0]), pcs[0])
for _, fn := range s.opts.ContextAttrFuncs {
attrs = append(attrs, fn(ctx)...)
}
for idx, attr := range attrs {
if ve, ok := attr.(error); ok && ve != nil {
attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error())
break
}
}
if s.opts.AddStacktrace {
stackInfo := make([]byte, 1024*1024)
if stackSize := runtime.Stack(stackInfo, false); stackSize > 0 {
traceLines := reTrace.Split(string(stackInfo[:stackSize]), -1)
if len(traceLines) != 0 {
attrs = append(attrs, slog.String("stacktrace", traceLines[len(traceLines)-1]))
}
}
}
r.Add(attrs[1:]...)
r.Attrs(func(a slog.Attr) bool {
if a.Key == "error" {
if a.Key == s.opts.ErrorKey {
if span, ok := tracer.SpanFromContext(ctx); ok {
span.SetStatus(tracer.SpanStatusError, a.Value.String())
return false
@ -299,7 +389,7 @@ func (s *slogLogger) Error(ctx context.Context, attrs ...interface{}) {
}
return true
})
_ = s.slog.Handler().Handle(ctx, r)
_ = s.handler.Handle(ctx, r)
}
func (s *slogLogger) Errorf(ctx context.Context, msg string, attrs ...interface{}) {
@ -308,10 +398,29 @@ func (s *slogLogger) Errorf(ctx context.Context, msg string, attrs ...interface{
}
var pcs [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof]
r := slog.NewRecord(time.Now(), slog.LevelError, fmt.Sprintf(msg, attrs...), pcs[0])
// r.Add(attrs...)
r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelError, msg, pcs[0])
for _, fn := range s.opts.ContextAttrFuncs {
attrs = append(attrs, fn(ctx)...)
}
for idx, attr := range attrs {
if ve, ok := attr.(error); ok && ve != nil {
attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error())
break
}
}
if s.opts.AddStacktrace {
stackInfo := make([]byte, 1024*1024)
if stackSize := runtime.Stack(stackInfo, false); stackSize > 0 {
traceLines := reTrace.Split(string(stackInfo[:stackSize]), -1)
if len(traceLines) != 0 {
attrs = append(attrs, slog.String("stacktrace", traceLines[len(traceLines)-1]))
}
}
}
r.Add(attrs...)
r.Attrs(func(a slog.Attr) bool {
if a.Key == "error" {
if a.Key == s.opts.ErrorKey {
if span, ok := tracer.SpanFromContext(ctx); ok {
span.SetStatus(tracer.SpanStatusError, a.Value.String())
return false
@ -319,7 +428,7 @@ func (s *slogLogger) Errorf(ctx context.Context, msg string, attrs ...interface{
}
return true
})
_ = s.slog.Handler().Handle(ctx, r)
_ = s.handler.Handle(ctx, r)
}
func (s *slogLogger) Fatal(ctx context.Context, attrs ...interface{}) {
@ -328,9 +437,19 @@ func (s *slogLogger) Fatal(ctx context.Context, attrs ...interface{}) {
}
var pcs [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof]
r := slog.NewRecord(time.Now(), slog.LevelError+1, fmt.Sprintf("%s", attrs[0]), pcs[0])
// r.Add(attrs[1:]...)
_ = s.slog.Handler().Handle(ctx, r)
r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelError+1, fmt.Sprintf("%s", attrs[0]), pcs[0])
for _, fn := range s.opts.ContextAttrFuncs {
attrs = append(attrs, fn(ctx)...)
}
for idx, attr := range attrs {
if ve, ok := attr.(error); ok && ve != nil {
attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error())
break
}
}
r.Add(attrs[1:]...)
_ = s.handler.Handle(ctx, r)
os.Exit(1)
}
@ -340,9 +459,19 @@ func (s *slogLogger) Fatalf(ctx context.Context, msg string, attrs ...interface{
}
var pcs [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof]
r := slog.NewRecord(time.Now(), slog.LevelError+1, fmt.Sprintf(msg, attrs...), pcs[0])
// r.Add(attrs...)
_ = s.slog.Handler().Handle(ctx, r)
r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelError+1, msg, pcs[0])
for _, fn := range s.opts.ContextAttrFuncs {
attrs = append(attrs, fn(ctx)...)
}
for idx, attr := range attrs {
if ve, ok := attr.(error); ok && ve != nil {
attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error())
break
}
}
r.Add(attrs...)
_ = s.handler.Handle(ctx, r)
os.Exit(1)
}
@ -352,9 +481,19 @@ func (s *slogLogger) Warn(ctx context.Context, attrs ...interface{}) {
}
var pcs [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof]
r := slog.NewRecord(time.Now(), slog.LevelWarn, fmt.Sprintf("%s", attrs[0]), pcs[0])
// r.Add(attrs[1:]...)
_ = s.slog.Handler().Handle(ctx, r)
r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelWarn, fmt.Sprintf("%s", attrs[0]), pcs[0])
for _, fn := range s.opts.ContextAttrFuncs {
attrs = append(attrs, fn(ctx)...)
}
for idx, attr := range attrs {
if ve, ok := attr.(error); ok && ve != nil {
attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error())
break
}
}
r.Add(attrs[1:]...)
_ = s.handler.Handle(ctx, r)
}
func (s *slogLogger) Warnf(ctx context.Context, msg string, attrs ...interface{}) {
@ -363,9 +502,23 @@ func (s *slogLogger) Warnf(ctx context.Context, msg string, attrs ...interface{}
}
var pcs [1]uintptr
runtime.Callers(s.opts.CallerSkipCount, pcs[:]) // skip [Callers, Infof]
r := slog.NewRecord(time.Now(), slog.LevelWarn, fmt.Sprintf(msg, attrs...), pcs[0])
// r.Add(attrs...)
_ = s.slog.Handler().Handle(ctx, r)
r := slog.NewRecord(s.opts.TimeFunc(), slog.LevelWarn, msg, pcs[0])
for _, fn := range s.opts.ContextAttrFuncs {
attrs = append(attrs, fn(ctx)...)
}
for idx, attr := range attrs {
if ve, ok := attr.(error); ok && ve != nil {
attrs[idx] = slog.String(s.opts.ErrorKey, ve.Error())
break
}
}
r.Add(attrs[1:]...)
_ = s.handler.Handle(ctx, r)
}
func (s *slogLogger) Name() string {
return s.opts.Name
}
func (s *slogLogger) String() string {
@ -374,24 +527,9 @@ func (s *slogLogger) String() string {
func NewLogger(opts ...logger.Option) logger.Logger {
s := &slogLogger{
opts: logger.NewOptions(opts...),
sourceKey: DefaultSourceKey,
timeKey: DefaultTimeKey,
messageKey: DefaultMessageKey,
levelKey: DefaultLevelKey,
}
if v, ok := s.opts.Context.Value(levelKey{}).(string); ok && v != "" {
s.levelKey = v
}
if v, ok := s.opts.Context.Value(messageKey{}).(string); ok && v != "" {
s.messageKey = v
}
if v, ok := s.opts.Context.Value(sourceKey{}).(string); ok && v != "" {
s.sourceKey = v
}
if v, ok := s.opts.Context.Value(timeKey{}).(string); ok && v != "" {
s.timeKey = v
opts: logger.NewOptions(opts...),
}
return s
}

View File

@ -3,12 +3,47 @@ package slog
import (
"bytes"
"context"
"fmt"
"log"
"testing"
"go.unistack.org/micro/v3/logger"
)
func TestError(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)
l := NewLogger(logger.WithLevel(logger.ErrorLevel), logger.WithOutput(buf), logger.WithAddStacktrace(true))
if err := l.Init(); err != nil {
t.Fatal(err)
}
l.Error(ctx, "message", fmt.Errorf("error message"))
if !bytes.Contains(buf.Bytes(), []byte(`"stacktrace":"`)) {
t.Fatalf("logger stacktrace not works, buf contains: %s", buf.Bytes())
}
if !bytes.Contains(buf.Bytes(), []byte(`"error":"`)) {
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
}
}
func TestErrorf(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)
l := NewLogger(logger.WithLevel(logger.ErrorLevel), logger.WithOutput(buf), logger.WithAddStacktrace(true))
if err := l.Init(); err != nil {
t.Fatal(err)
}
l.Errorf(ctx, "message", fmt.Errorf("error message"))
if !bytes.Contains(buf.Bytes(), []byte(`"stacktrace":"`)) {
t.Fatalf("logger stacktrace not works, buf contains: %s", buf.Bytes())
}
if !bytes.Contains(buf.Bytes(), []byte(`"error":"`)) {
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
}
}
func TestContext(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)

View File

@ -98,11 +98,12 @@ func (md Metadata) Del(keys ...string) {
}
// Copy makes a copy of the metadata
func Copy(md Metadata) Metadata {
func Copy(md Metadata, exclude ...string) Metadata {
nmd := New(len(md))
for key, val := range md {
nmd.Set(key, val)
}
nmd.Del(exclude...)
return nmd
}

View File

@ -190,3 +190,14 @@ func TestMetadataContext(t *testing.T) {
t.Errorf("Expected metadata length 1 got %d", i)
}
}
func TestCopy(t *testing.T) {
md := New(2)
md.Set("key1", "val1", "key2", "val2")
nmd := Copy(md, "key2")
if len(nmd) != 1 {
t.Fatal("Copy exclude not works")
} else if nmd["Key1"] != "val1" {
t.Fatal("Copy exclude not works")
}
}

View File

@ -1,5 +1,5 @@
// Package meter is for instrumentation
package meter // import "go.unistack.org/micro/v3/meter"
package meter
import (
"io"
@ -11,7 +11,7 @@ import (
var (
// DefaultMeter is the default meter
DefaultMeter = NewMeter()
DefaultMeter Meter = NewMeter()
// DefaultAddress data will be made available on this host:port
DefaultAddress = ":9090"
// DefaultPath the meter endpoint where the Meter data will be made available

94
micro.go Normal file
View File

@ -0,0 +1,94 @@
package micro
import (
"reflect"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/flow"
"go.unistack.org/micro/v3/fsm"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/resolver"
"go.unistack.org/micro/v3/router"
"go.unistack.org/micro/v3/selector"
"go.unistack.org/micro/v3/server"
"go.unistack.org/micro/v3/store"
"go.unistack.org/micro/v3/sync"
"go.unistack.org/micro/v3/tracer"
)
func As(b any, target any) bool {
if b == nil {
return false
}
if target == nil {
return false
}
val := reflect.ValueOf(target)
typ := val.Type()
if typ.Kind() != reflect.Ptr || val.IsNil() {
return false
}
targetType := typ.Elem()
if targetType.Kind() != reflect.Interface {
switch {
case targetType.Implements(brokerType):
break
case targetType.Implements(loggerType):
break
case targetType.Implements(clientType):
break
case targetType.Implements(serverType):
break
case targetType.Implements(codecType):
break
case targetType.Implements(flowType):
break
case targetType.Implements(fsmType):
break
case targetType.Implements(meterType):
break
case targetType.Implements(registerType):
break
case targetType.Implements(resolverType):
break
case targetType.Implements(selectorType):
break
case targetType.Implements(storeType):
break
case targetType.Implements(syncType):
break
case targetType.Implements(serviceType):
break
case targetType.Implements(routerType):
break
default:
return false
}
}
if reflect.TypeOf(b).AssignableTo(targetType) {
val.Elem().Set(reflect.ValueOf(b))
return true
}
return false
}
var brokerType = reflect.TypeOf((*broker.Broker)(nil)).Elem()
var loggerType = reflect.TypeOf((*logger.Logger)(nil)).Elem()
var clientType = reflect.TypeOf((*client.Client)(nil)).Elem()
var serverType = reflect.TypeOf((*server.Server)(nil)).Elem()
var codecType = reflect.TypeOf((*codec.Codec)(nil)).Elem()
var flowType = reflect.TypeOf((*flow.Flow)(nil)).Elem()
var fsmType = reflect.TypeOf((*fsm.FSM)(nil)).Elem()
var meterType = reflect.TypeOf((*meter.Meter)(nil)).Elem()
var registerType = reflect.TypeOf((*register.Register)(nil)).Elem()
var resolverType = reflect.TypeOf((*resolver.Resolver)(nil)).Elem()
var routerType = reflect.TypeOf((*router.Router)(nil)).Elem()
var selectorType = reflect.TypeOf((*selector.Selector)(nil)).Elem()
var storeType = reflect.TypeOf((*store.Store)(nil)).Elem()
var syncType = reflect.TypeOf((*sync.Sync)(nil)).Elem()
var tracerType = reflect.TypeOf((*tracer.Tracer)(nil)).Elem()
var serviceType = reflect.TypeOf((*Service)(nil)).Elem()

115
micro_test.go Normal file
View File

@ -0,0 +1,115 @@
package micro
import (
"context"
"fmt"
"reflect"
"testing"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/fsm"
)
func TestAs(t *testing.T) {
var b *bro
broTarget := &bro{name: "kafka"}
fsmTarget := &fsmT{name: "fsm"}
testCases := []struct {
b any
target any
match bool
want any
}{
{
broTarget,
&b,
true,
broTarget,
},
{
nil,
&b,
false,
nil,
},
{
fsmTarget,
&b,
false,
nil,
},
}
for i, tc := range testCases {
name := fmt.Sprintf("%d:As(Errorf(..., %v), %v)", i, tc.b, tc.target)
// Clear the target pointer, in case it was set in a previous test.
rtarget := reflect.ValueOf(tc.target)
rtarget.Elem().Set(reflect.Zero(reflect.TypeOf(tc.target).Elem()))
t.Run(name, func(t *testing.T) {
match := As(tc.b, tc.target)
if match != tc.match {
t.Fatalf("match: got %v; want %v", match, tc.match)
}
if !match {
return
}
if got := rtarget.Elem().Interface(); got != tc.want {
t.Fatalf("got %#v, want %#v", got, tc.want)
}
})
}
}
type bro struct {
name string
}
func (p *bro) Name() string { return p.name }
func (p *bro) Init(opts ...broker.Option) error { return nil }
// Options returns broker options
func (p *bro) Options() broker.Options { return broker.Options{} }
// Address return configured address
func (p *bro) Address() string { return "" }
// Connect connects to broker
func (p *bro) Connect(ctx context.Context) error { return nil }
// Disconnect disconnect from broker
func (p *bro) Disconnect(ctx context.Context) error { return nil }
// Publish message, msg can be single broker.Message or []broker.Message
func (p *bro) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
return nil
}
// BatchPublish messages to broker with multiple topics
func (p *bro) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
return nil
}
// BatchSubscribe subscribes to topic messages via handler
func (p *bro) BatchSubscribe(ctx context.Context, topic string, h broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
return nil, nil
}
// Subscribe subscribes to topic message via handler
func (p *bro) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
return nil, nil
}
// String type of broker
func (p *bro) String() string { return p.name }
type fsmT struct {
name string
}
func (f *fsmT) Start(ctx context.Context, a interface{}, o ...Option) (interface{}, error) {
return nil, nil
}
func (f *fsmT) Current() string { return f.name }
func (f *fsmT) Reset() {}
func (f *fsmT) State(s string, sf fsm.StateFunc) {}

View File

@ -6,6 +6,7 @@ import (
"time"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/util/id"
)
@ -16,7 +17,7 @@ var (
type node struct {
LastSeen time.Time
*Node
*register.Node
TTL time.Duration
}
@ -25,23 +26,23 @@ type record struct {
Version string
Metadata map[string]string
Nodes map[string]*node
Endpoints []*Endpoint
Endpoints []*register.Endpoint
}
type memory struct {
sync.RWMutex
records map[string]services
watchers map[string]*watcher
opts Options
opts register.Options
}
// services is a KV map with service name as the key and a map of records as the value
type services map[string]map[string]*record
// NewRegister returns an initialized in-memory register
func NewRegister(opts ...Option) Register {
func NewRegister(opts ...register.Option) register.Register {
r := &memory{
opts: NewOptions(opts...),
opts: register.NewOptions(opts...),
records: make(map[string]services),
watchers: make(map[string]*watcher),
}
@ -75,7 +76,7 @@ func (m *memory) ttlPrune() {
}
}
func (m *memory) sendEvent(r *Result) {
func (m *memory) sendEvent(r *register.Result) {
m.RLock()
watchers := make([]*watcher, 0, len(m.watchers))
for _, w := range m.watchers {
@ -106,7 +107,7 @@ func (m *memory) Disconnect(ctx context.Context) error {
return nil
}
func (m *memory) Init(opts ...Option) error {
func (m *memory) Init(opts ...register.Option) error {
for _, o := range opts {
o(&m.opts)
}
@ -118,15 +119,15 @@ func (m *memory) Init(opts ...Option) error {
return nil
}
func (m *memory) Options() Options {
func (m *memory) Options() register.Options {
return m.opts
}
func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOption) error {
func (m *memory) Register(ctx context.Context, s *register.Service, opts ...register.RegisterOption) error {
m.Lock()
defer m.Unlock()
options := NewRegisterOptions(opts...)
options := register.NewRegisterOptions(opts...)
// get the services for this domain from the register
srvs, ok := m.records[options.Domain]
@ -153,7 +154,7 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio
m.opts.Logger.Debugf(m.opts.Context, "Register added new service: %s, version: %s", s.Name, s.Version)
}
m.records[options.Domain] = srvs
go m.sendEvent(&Result{Action: "create", Service: s})
go m.sendEvent(&register.Result{Action: "create", Service: s})
}
var addedNodes bool
@ -176,7 +177,7 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio
// add the node
srvs[s.Name][s.Version].Nodes[n.ID] = &node{
Node: &Node{
Node: &register.Node{
ID: n.ID,
Address: n.Address,
Metadata: metadata,
@ -192,7 +193,7 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio
if m.opts.Logger.V(logger.DebugLevel) {
m.opts.Logger.Debugf(m.opts.Context, "Register added new node to service: %s, version: %s", s.Name, s.Version)
}
go m.sendEvent(&Result{Action: "update", Service: s})
go m.sendEvent(&register.Result{Action: "update", Service: s})
} else {
// refresh TTL and timestamp
for _, n := range s.Nodes {
@ -208,11 +209,11 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio
return nil
}
func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterOption) error {
func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...register.DeregisterOption) error {
m.Lock()
defer m.Unlock()
options := NewDeregisterOptions(opts...)
options := register.NewDeregisterOptions(opts...)
// domain is set in metadata so it can be passed to watchers
if s.Metadata == nil {
@ -252,7 +253,7 @@ func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterO
// is cleanup
if len(version.Nodes) > 0 {
m.records[options.Domain][s.Name][s.Version] = version
go m.sendEvent(&Result{Action: "update", Service: s})
go m.sendEvent(&register.Result{Action: "update", Service: s})
return nil
}
@ -260,7 +261,7 @@ func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterO
// register and exit
if len(versions) == 1 {
delete(m.records[options.Domain], s.Name)
go m.sendEvent(&Result{Action: "delete", Service: s})
go m.sendEvent(&register.Result{Action: "delete", Service: s})
if m.opts.Logger.V(logger.DebugLevel) {
m.opts.Logger.Debugf(m.opts.Context, "Register removed service: %s", s.Name)
@ -270,7 +271,7 @@ func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterO
// there are other versions of the service running, so only remove this version of it
delete(m.records[options.Domain][s.Name], s.Version)
go m.sendEvent(&Result{Action: "delete", Service: s})
go m.sendEvent(&register.Result{Action: "delete", Service: s})
if m.opts.Logger.V(logger.DebugLevel) {
m.opts.Logger.Debugf(m.opts.Context, "Register removed service: %s, version: %s", s.Name, s.Version)
}
@ -278,20 +279,20 @@ func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterO
return nil
}
func (m *memory) LookupService(ctx context.Context, name string, opts ...LookupOption) ([]*Service, error) {
options := NewLookupOptions(opts...)
func (m *memory) LookupService(ctx context.Context, name string, opts ...register.LookupOption) ([]*register.Service, error) {
options := register.NewLookupOptions(opts...)
// if it's a wildcard domain, return from all domains
if options.Domain == WildcardDomain {
if options.Domain == register.WildcardDomain {
m.RLock()
recs := m.records
m.RUnlock()
var services []*Service
var services []*register.Service
for domain := range recs {
srvs, err := m.LookupService(ctx, name, append(opts, LookupDomain(domain))...)
if err == ErrNotFound {
srvs, err := m.LookupService(ctx, name, append(opts, register.LookupDomain(domain))...)
if err == register.ErrNotFound {
continue
} else if err != nil {
return nil, err
@ -300,7 +301,7 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...LookupO
}
if len(services) == 0 {
return nil, ErrNotFound
return nil, register.ErrNotFound
}
return services, nil
}
@ -311,17 +312,17 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...LookupO
// check the domain exists
services, ok := m.records[options.Domain]
if !ok {
return nil, ErrNotFound
return nil, register.ErrNotFound
}
// check the service exists
versions, ok := services[name]
if !ok || len(versions) == 0 {
return nil, ErrNotFound
return nil, register.ErrNotFound
}
// serialize the response
result := make([]*Service, len(versions))
result := make([]*register.Service, len(versions))
var i int
@ -333,19 +334,19 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...LookupO
return result, nil
}
func (m *memory) ListServices(ctx context.Context, opts ...ListOption) ([]*Service, error) {
options := NewListOptions(opts...)
func (m *memory) ListServices(ctx context.Context, opts ...register.ListOption) ([]*register.Service, error) {
options := register.NewListOptions(opts...)
// if it's a wildcard domain, list from all domains
if options.Domain == WildcardDomain {
if options.Domain == register.WildcardDomain {
m.RLock()
recs := m.records
m.RUnlock()
var services []*Service
var services []*register.Service
for domain := range recs {
srvs, err := m.ListServices(ctx, append(opts, ListDomain(domain))...)
srvs, err := m.ListServices(ctx, append(opts, register.ListDomain(domain))...)
if err != nil {
return nil, err
}
@ -361,11 +362,11 @@ func (m *memory) ListServices(ctx context.Context, opts ...ListOption) ([]*Servi
// ensure the domain exists
services, ok := m.records[options.Domain]
if !ok {
return make([]*Service, 0), nil
return make([]*register.Service, 0), nil
}
// serialize the result, each version counts as an individual service
var result []*Service
var result []*register.Service
for _, service := range services {
for _, version := range service {
@ -376,16 +377,16 @@ func (m *memory) ListServices(ctx context.Context, opts ...ListOption) ([]*Servi
return result, nil
}
func (m *memory) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) {
func (m *memory) Watch(ctx context.Context, opts ...register.WatchOption) (register.Watcher, error) {
id, err := id.New()
if err != nil {
return nil, err
}
wo := NewWatchOptions(opts...)
wo := register.NewWatchOptions(opts...)
// construct the watcher
w := &watcher{
exit: make(chan bool),
res: make(chan *Result),
res: make(chan *register.Result),
id: id,
wo: wo,
}
@ -406,13 +407,13 @@ func (m *memory) String() string {
}
type watcher struct {
res chan *Result
res chan *register.Result
exit chan bool
wo WatchOptions
wo register.WatchOptions
id string
}
func (m *watcher) Next() (*Result, error) {
func (m *watcher) Next() (*register.Result, error) {
for {
select {
case r := <-m.res:
@ -429,15 +430,15 @@ func (m *watcher) Next() (*Result, error) {
if r.Service.Metadata != nil && len(r.Service.Metadata["domain"]) > 0 {
domain = r.Service.Metadata["domain"]
} else {
domain = DefaultDomain
domain = register.DefaultDomain
}
// only send the event if watching the wildcard or this specific domain
if m.wo.Domain == WildcardDomain || m.wo.Domain == domain {
if m.wo.Domain == register.WildcardDomain || m.wo.Domain == domain {
return r, nil
}
case <-m.exit:
return nil, ErrWatcherStopped
return nil, register.ErrWatcherStopped
}
}
}
@ -451,7 +452,7 @@ func (m *watcher) Stop() {
}
}
func serviceToRecord(s *Service, ttl time.Duration) *record {
func serviceToRecord(s *register.Service, ttl time.Duration) *record {
metadata := make(map[string]string, len(s.Metadata))
for k, v := range s.Metadata {
metadata[k] = v
@ -466,7 +467,7 @@ func serviceToRecord(s *Service, ttl time.Duration) *record {
}
}
endpoints := make([]*Endpoint, len(s.Endpoints))
endpoints := make([]*register.Endpoint, len(s.Endpoints))
for i, e := range s.Endpoints {
endpoints[i] = e
}
@ -480,7 +481,7 @@ func serviceToRecord(s *Service, ttl time.Duration) *record {
}
}
func recordToService(r *record, domain string) *Service {
func recordToService(r *record, domain string) *register.Service {
metadata := make(map[string]string, len(r.Metadata))
for k, v := range r.Metadata {
metadata[k] = v
@ -489,14 +490,14 @@ func recordToService(r *record, domain string) *Service {
// set the domain in metadata so it can be determined when a wildcard query is performed
metadata["domain"] = domain
endpoints := make([]*Endpoint, len(r.Endpoints))
endpoints := make([]*register.Endpoint, len(r.Endpoints))
for i, e := range r.Endpoints {
md := make(map[string]string, len(e.Metadata))
for k, v := range e.Metadata {
md[k] = v
}
endpoints[i] = &Endpoint{
endpoints[i] = &register.Endpoint{
Name: e.Name,
Request: e.Request,
Response: e.Response,
@ -504,7 +505,7 @@ func recordToService(r *record, domain string) *Service {
}
}
nodes := make([]*Node, len(r.Nodes))
nodes := make([]*register.Node, len(r.Nodes))
i := 0
for _, n := range r.Nodes {
md := make(map[string]string, len(n.Metadata))
@ -512,7 +513,7 @@ func recordToService(r *record, domain string) *Service {
md[k] = v
}
nodes[i] = &Node{
nodes[i] = &register.Node{
ID: n.ID,
Address: n.Address,
Metadata: md,
@ -520,7 +521,7 @@ func recordToService(r *record, domain string) *Service {
i++
}
return &Service{
return &register.Service{
Name: r.Name,
Version: r.Version,
Metadata: metadata,

View File

@ -6,14 +6,16 @@ import (
"sync"
"testing"
"time"
"go.unistack.org/micro/v3/register"
)
var testData = map[string][]*Service{
var testData = map[string][]*register.Service{
"foo": {
{
Name: "foo",
Version: "1.0.0",
Nodes: []*Node{
Nodes: []*register.Node{
{
ID: "foo-1.0.0-123",
Address: "localhost:9999",
@ -27,7 +29,7 @@ var testData = map[string][]*Service{
{
Name: "foo",
Version: "1.0.1",
Nodes: []*Node{
Nodes: []*register.Node{
{
ID: "foo-1.0.1-321",
Address: "localhost:6666",
@ -37,7 +39,7 @@ var testData = map[string][]*Service{
{
Name: "foo",
Version: "1.0.3",
Nodes: []*Node{
Nodes: []*register.Node{
{
ID: "foo-1.0.3-345",
Address: "localhost:8888",
@ -49,7 +51,7 @@ var testData = map[string][]*Service{
{
Name: "bar",
Version: "default",
Nodes: []*Node{
Nodes: []*register.Node{
{
ID: "bar-1.0.0-123",
Address: "localhost:9999",
@ -63,7 +65,7 @@ var testData = map[string][]*Service{
{
Name: "bar",
Version: "latest",
Nodes: []*Node{
Nodes: []*register.Node{
{
ID: "bar-1.0.1-321",
Address: "localhost:6666",
@ -78,7 +80,7 @@ func TestMemoryRegistry(t *testing.T) {
ctx := context.TODO()
m := NewRegister()
fn := func(k string, v []*Service) {
fn := func(k string, v []*register.Service) {
services, err := m.LookupService(ctx, k)
if err != nil {
t.Errorf("Unexpected error getting service %s: %v", k, err)
@ -155,8 +157,8 @@ func TestMemoryRegistry(t *testing.T) {
for _, v := range testData {
for _, service := range v {
services, err := m.LookupService(ctx, service.Name)
if err != ErrNotFound {
t.Errorf("Expected error: %v, got: %v", ErrNotFound, err)
if err != register.ErrNotFound {
t.Errorf("Expected error: %v, got: %v", register.ErrNotFound, err)
}
if len(services) != 0 {
t.Errorf("Expected %d services for %s, got %d", 0, service.Name, len(services))
@ -171,7 +173,7 @@ func TestMemoryRegistryTTL(t *testing.T) {
for _, v := range testData {
for _, service := range v {
if err := m.Register(ctx, service, RegisterTTL(time.Millisecond)); err != nil {
if err := m.Register(ctx, service, register.RegisterTTL(time.Millisecond)); err != nil {
t.Fatal(err)
}
}
@ -200,7 +202,7 @@ func TestMemoryRegistryTTLConcurrent(t *testing.T) {
ctx := context.TODO()
for _, v := range testData {
for _, service := range v {
if err := m.Register(ctx, service, RegisterTTL(waitTime/2)); err != nil {
if err := m.Register(ctx, service, register.RegisterTTL(waitTime/2)); err != nil {
t.Fatal(err)
}
}
@ -249,34 +251,34 @@ func TestMemoryWildcard(t *testing.T) {
m := NewRegister()
ctx := context.TODO()
testSrv := &Service{Name: "foo", Version: "1.0.0"}
testSrv := &register.Service{Name: "foo", Version: "1.0.0"}
if err := m.Register(ctx, testSrv, RegisterDomain("one")); err != nil {
if err := m.Register(ctx, testSrv, register.RegisterDomain("one")); err != nil {
t.Fatalf("Register err: %v", err)
}
if err := m.Register(ctx, testSrv, RegisterDomain("two")); err != nil {
if err := m.Register(ctx, testSrv, register.RegisterDomain("two")); err != nil {
t.Fatalf("Register err: %v", err)
}
if recs, err := m.ListServices(ctx, ListDomain("one")); err != nil {
if recs, err := m.ListServices(ctx, register.ListDomain("one")); err != nil {
t.Errorf("List err: %v", err)
} else if len(recs) != 1 {
t.Errorf("Expected 1 record, got %v", len(recs))
}
if recs, err := m.ListServices(ctx, ListDomain("*")); err != nil {
if recs, err := m.ListServices(ctx, register.ListDomain("*")); err != nil {
t.Errorf("List err: %v", err)
} else if len(recs) != 2 {
t.Errorf("Expected 2 records, got %v", len(recs))
}
if recs, err := m.LookupService(ctx, testSrv.Name, LookupDomain("one")); err != nil {
if recs, err := m.LookupService(ctx, testSrv.Name, register.LookupDomain("one")); err != nil {
t.Errorf("Lookup err: %v", err)
} else if len(recs) != 1 {
t.Errorf("Expected 1 record, got %v", len(recs))
}
if recs, err := m.LookupService(ctx, testSrv.Name, LookupDomain("*")); err != nil {
if recs, err := m.LookupService(ctx, testSrv.Name, register.LookupDomain("*")); err != nil {
t.Errorf("Lookup err: %v", err)
} else if len(recs) != 2 {
t.Errorf("Expected 2 records, got %v", len(recs))
@ -284,7 +286,7 @@ func TestMemoryWildcard(t *testing.T) {
}
func TestWatcher(t *testing.T) {
testSrv := &Service{Name: "foo", Version: "1.0.0"}
testSrv := &register.Service{Name: "foo", Version: "1.0.0"}
ctx := context.TODO()
m := NewRegister()

72
register/noop.go Normal file
View File

@ -0,0 +1,72 @@
package register
import "context"
type noop struct {
opts Options
}
func NewRegister(opts ...Option) Register {
return &noop{
opts: NewOptions(opts...),
}
}
func (n *noop) Name() string {
return n.opts.Name
}
func (n *noop) Init(opts ...Option) error {
for _, o := range opts {
o(&n.opts)
}
return nil
}
func (n *noop) Options() Options {
return n.opts
}
func (n *noop) Connect(ctx context.Context) error {
return nil
}
func (n *noop) Disconnect(ctx context.Context) error {
return nil
}
func (n *noop) Register(ctx context.Context, service *Service, option ...RegisterOption) error {
return nil
}
func (n *noop) Deregister(ctx context.Context, service *Service, option ...DeregisterOption) error {
return nil
}
func (n *noop) LookupService(ctx context.Context, s string, option ...LookupOption) ([]*Service, error) {
return nil, nil
}
func (n *noop) ListServices(ctx context.Context, option ...ListOption) ([]*Service, error) {
return nil, nil
}
func (n *noop) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) {
wOpts := NewWatchOptions(opts...)
return &watcher{wo: wOpts}, nil
}
func (n *noop) String() string {
return "noop"
}
type watcher struct {
wo WatchOptions
}
func (m *watcher) Next() (*Result, error) {
return nil, nil
}
func (m *watcher) Stop() {}

View File

@ -18,7 +18,7 @@ var DefaultDomain = "micro"
var (
// DefaultRegister is the global default register
DefaultRegister = NewRegister()
DefaultRegister Register = NewRegister()
// ErrNotFound returned when LookupService is called and no services found
ErrNotFound = errors.New("service not found")
// ErrWatcherStopped returned when when watcher is stopped

View File

@ -1,5 +1,5 @@
// Package resolver resolves network names to addresses
package resolver // import "go.unistack.org/micro/v3/resolver"
package resolver
// Resolver is network resolver. It's used to find network nodes
// via the name to connect to. This is done based on Network.Name().

View File

@ -7,7 +7,7 @@ import (
var (
// DefaultRouter is the global default router
DefaultRouter = NewRouter()
DefaultRouter Router = NewRouter()
// DefaultNetwork is default micro network
DefaultNetwork = "micro"
// ErrRouteNotFound is returned when no route was found in the routing table

22
semconv/broker.go Normal file
View File

@ -0,0 +1,22 @@
package semconv
var (
// PublishMessageDurationSeconds specifies meter metric name
PublishMessageDurationSeconds = "publish_message_duration_seconds"
// PublishMessageLatencyMicroseconds specifies meter metric name
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
// PublishMessageTotal specifies meter metric name
PublishMessageTotal = "publish_message_total"
// PublishMessageInflight specifies meter metric name
PublishMessageInflight = "publish_message_inflight"
// SubscribeMessageDurationSeconds specifies meter metric name
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
// SubscribeMessageLatencyMicroseconds specifies meter metric name
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
// SubscribeMessageTotal specifies meter metric name
SubscribeMessageTotal = "subscribe_message_total"
// SubscribeMessageInflight specifies meter metric name
SubscribeMessageInflight = "subscribe_message_inflight"
// BrokerGroupLag specifies broker lag
BrokerGroupLag = "broker_group_lag"
)

12
semconv/cache.go Normal file
View File

@ -0,0 +1,12 @@
package semconv
var (
// CacheRequestDurationSeconds specifies meter metric name
CacheRequestDurationSeconds = "cache_request_duration_seconds"
// ClientRequestLatencyMicroseconds specifies meter metric name
CacheRequestLatencyMicroseconds = "cache_request_latency_microseconds"
// CacheRequestTotal specifies meter metric name
CacheRequestTotal = "cache_request_total"
// CacheRequestInflight specifies meter metric name
CacheRequestInflight = "cache_request_inflight"
)

12
semconv/client.go Normal file
View File

@ -0,0 +1,12 @@
package semconv
var (
// ClientRequestDurationSeconds specifies meter metric name
ClientRequestDurationSeconds = "client_request_duration_seconds"
// ClientRequestLatencyMicroseconds specifies meter metric name
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
// ClientRequestTotal specifies meter metric name
ClientRequestTotal = "client_request_total"
// ClientRequestInflight specifies meter metric name
ClientRequestInflight = "client_request_inflight"
)

12
semconv/server.go Normal file
View File

@ -0,0 +1,12 @@
package semconv
var (
// ServerRequestDurationSeconds specifies meter metric name
ServerRequestDurationSeconds = "server_request_duration_seconds"
// ServerRequestLatencyMicroseconds specifies meter metric name
ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
// ServerRequestTotal specifies meter metric name
ServerRequestTotal = "server_request_total"
// ServerRequestInflight specifies meter metric name
ServerRequestInflight = "server_request_inflight"
)

View File

@ -15,6 +15,7 @@ import (
"go.unistack.org/micro/v3/network/transport"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/register"
msync "go.unistack.org/micro/v3/sync"
"go.unistack.org/micro/v3/tracer"
"go.unistack.org/micro/v3/util/id"
)
@ -47,7 +48,7 @@ type Options struct {
// Listener may be passed if already created
Listener net.Listener
// Wait group
Wait *sync.WaitGroup
Wait *msync.WaitGroup
// TLSConfig specifies tls.Config for secure serving
TLSConfig *tls.Config
// Metadata holds the server metadata
@ -86,6 +87,8 @@ type Options struct {
DeregisterAttempts int
// Hooks may contains SubscriberWrapper, HandlerWrapper or Server func wrapper
Hooks options.Hooks
// GracefulTimeout timeout for graceful stop server
GracefulTimeout time.Duration
}
// NewOptions returns new options struct with default or passed values
@ -108,6 +111,7 @@ func NewOptions(opts ...Option) Options {
Version: DefaultVersion,
ID: id.Must(),
Namespace: DefaultNamespace,
GracefulTimeout: DefaultGracefulTimeout,
}
for _, o := range opts {
@ -279,7 +283,7 @@ func Wait(wg *sync.WaitGroup) Option {
if wg == nil {
wg = new(sync.WaitGroup)
}
o.Wait = wg
o.Wait = msync.WrapWaitGroup(wg)
}
}
@ -321,6 +325,13 @@ func Listener(l net.Listener) Option {
// HandlerOption func
type HandlerOption func(*HandlerOptions)
// GracefulTimeout duration
func GracefulTimeout(td time.Duration) Option {
return func(o *Options) {
o.GracefulTimeout = td
}
}
// HandlerOptions struct
type HandlerOptions struct {
// Context holds external options

View File

@ -11,7 +11,7 @@ import (
)
// DefaultServer default server
var DefaultServer = NewServer()
var DefaultServer Server = NewServer()
var (
// DefaultAddress will be used if no address passed, use secure localhost
@ -34,6 +34,8 @@ var (
DefaultMaxMsgRecvSize = 1024 * 1024 * 4 // 4Mb
// DefaultMaxMsgSendSize holds default max send size
DefaultMaxMsgSendSize = 1024 * 1024 * 4 // 4Mb
// DefaultGracefulTimeout default time for graceful stop
DefaultGracefulTimeout = 5 * time.Second
)
// Server is a simple micro server abstraction

View File

@ -72,8 +72,8 @@ func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...se
}
type service struct {
sync.RWMutex
opts Options
sync.RWMutex
}
// NewService creates and returns a new Service based on the packages within.
@ -376,71 +376,15 @@ func (s *service) Run() error {
return s.Stop()
}
func getNameIndex(n string, ifaces interface{}) int {
switch values := ifaces.(type) {
case []router.Router:
for idx, iface := range values {
if iface.Name() == n {
return idx
}
}
case []register.Register:
for idx, iface := range values {
if iface.Name() == n {
return idx
}
}
case []store.Store:
for idx, iface := range values {
if iface.Name() == n {
return idx
}
}
case []tracer.Tracer:
for idx, iface := range values {
if iface.Name() == n {
return idx
}
}
case []server.Server:
for idx, iface := range values {
if iface.Name() == n {
return idx
}
}
case []config.Config:
for idx, iface := range values {
if iface.Name() == n {
return idx
}
}
case []meter.Meter:
for idx, iface := range values {
if iface.Name() == n {
return idx
}
}
case []broker.Broker:
for idx, iface := range values {
if iface.Name() == n {
return idx
}
}
case []client.Client:
for idx, iface := range values {
if iface.Name() == n {
return idx
}
}
/*
case []logger.Logger:
for idx, iface := range values {
if iface.Name() == n {
return idx
}
}
*/
}
type Namer interface {
Name() string
}
func getNameIndex[T Namer](n string, ifaces []T) int {
for idx, iface := range ifaces {
if iface.Name() == n {
return idx
}
}
return 0
}

View File

@ -22,13 +22,14 @@ func TestClient(t *testing.T) {
c2 := client.NewClient(client.Name("test2"))
svc := NewService(Client(c1, c2))
if err := svc.Init(); err != nil {
t.Fatal(err)
}
x1 := svc.Client("test2")
if x1.Name() != "test2" {
t.Fatal("invalid client")
t.Fatalf("invalid client %#+v", svc.Options().Clients)
}
}
@ -40,12 +41,11 @@ func (ti *testItem) Name() string {
return ti.name
}
func TestGetNameIndex(t *testing.T) {
item1 := &testItem{name: "first"}
item2 := &testItem{name: "second"}
items := []interface{}{item1, item2}
if idx := getNameIndex("second", items); idx != 1 {
t.Fatalf("getNameIndex func error, item not found")
func Test_getNameIndex(t *testing.T) {
items := []*testItem{{name: "test1"}, {name: "test2"}}
idx := getNameIndex("test2", items)
if items[idx].Name() != "test2" {
t.Fatal("getNameIndex wrong")
}
}

View File

@ -144,6 +144,10 @@ type ReadOptions struct {
Context context.Context
// Namespace holds namespace
Namespace string
// Name holds mnemonic name
Name string
// Timeout specifies max timeout for operation
Timeout time.Duration
}
// NewReadOptions fills ReadOptions struct with opts slice
@ -158,6 +162,20 @@ func NewReadOptions(opts ...ReadOption) ReadOptions {
// ReadOption sets values in ReadOptions
type ReadOption func(r *ReadOptions)
// ReadTimeout pass timeout to ReadOptions
func ReadTimeout(td time.Duration) ReadOption {
return func(o *ReadOptions) {
o.Timeout = td
}
}
// ReadName pass name to ReadOptions
func ReadName(name string) ReadOption {
return func(o *ReadOptions) {
o.Name = name
}
}
// ReadContext pass context.Context to ReadOptions
func ReadContext(ctx context.Context) ReadOption {
return func(o *ReadOptions) {
@ -180,6 +198,10 @@ type WriteOptions struct {
Metadata metadata.Metadata
// Namespace holds namespace
Namespace string
// Name holds mnemonic name
Name string
// Timeout specifies max timeout for operation
Timeout time.Duration
// TTL specifies key TTL
TTL time.Duration
}
@ -224,12 +246,30 @@ func WriteNamespace(ns string) WriteOption {
}
}
// WriteName pass name to WriteOptions
func WriteName(name string) WriteOption {
return func(o *WriteOptions) {
o.Name = name
}
}
// WriteTimeout pass timeout to WriteOptions
func WriteTimeout(td time.Duration) WriteOption {
return func(o *WriteOptions) {
o.Timeout = td
}
}
// DeleteOptions configures an individual Delete operation
type DeleteOptions struct {
// Context holds external options
Context context.Context
// Namespace holds namespace
Namespace string
// Name holds mnemonic name
Name string
// Timeout specifies max timeout for operation
Timeout time.Duration
}
// NewDeleteOptions fills DeleteOptions struct with opts slice
@ -258,14 +298,32 @@ func DeleteNamespace(ns string) DeleteOption {
}
}
// DeleteName pass name to DeleteOptions
func DeleteName(name string) DeleteOption {
return func(o *DeleteOptions) {
o.Name = name
}
}
// DeleteTimeout pass timeout to DeleteOptions
func DeleteTimeout(td time.Duration) DeleteOption {
return func(o *DeleteOptions) {
o.Timeout = td
}
}
// ListOptions configures an individual List operation
type ListOptions struct {
Context context.Context
Prefix string
Suffix string
Namespace string
Limit uint
Offset uint
// Name holds mnemonic name
Name string
Limit uint
Offset uint
// Timeout specifies max timeout for operation
Timeout time.Duration
}
// NewListOptions fills ListOptions struct with opts slice
@ -322,12 +380,23 @@ func ListNamespace(ns string) ListOption {
}
}
// ListTimeout pass timeout to ListOptions
func ListTimeout(td time.Duration) ListOption {
return func(o *ListOptions) {
o.Timeout = td
}
}
// ExistsOptions holds options for Exists method
type ExistsOptions struct {
// Context holds external options
Context context.Context
// Namespace contains namespace
Namespace string
// Name holds mnemonic name
Name string
// Timeout specifies max timeout for operation
Timeout time.Duration
}
// ExistsOption specifies Exists call options
@ -358,6 +427,20 @@ func ExistsNamespace(ns string) ExistsOption {
}
}
// ExistsName pass name to exist options
func ExistsName(name string) ExistsOption {
return func(o *ExistsOptions) {
o.Name = name
}
}
// ExistsTimeout timeout to ListOptions
func ExistsTimeout(td time.Duration) ExistsOption {
return func(o *ExistsOptions) {
o.Timeout = td
}
}
/*
// WrapStore adds a store Wrapper to a list of options passed into the store
func WrapStore(w Wrapper) Option {

View File

@ -12,7 +12,7 @@ var (
// ErrInvalidKey is returned when a key has empty or have invalid format
ErrInvalidKey = errors.New("invalid key")
// DefaultStore is the global default store
DefaultStore = NewStore()
DefaultStore Store = NewStore()
// DefaultSeparator is the gloabal default key parts separator
DefaultSeparator = "/"
)

69
sync/waitgroup.go Normal file
View File

@ -0,0 +1,69 @@
package sync
import (
"context"
"sync"
)
type WaitGroup struct {
wg *sync.WaitGroup
c int
mu sync.Mutex
}
func WrapWaitGroup(wg *sync.WaitGroup) *WaitGroup {
g := &WaitGroup{
wg: wg,
}
return g
}
func NewWaitGroup() *WaitGroup {
var wg sync.WaitGroup
return WrapWaitGroup(&wg)
}
func (g *WaitGroup) Add(n int) {
g.mu.Lock()
g.c += n
g.wg.Add(n)
g.mu.Unlock()
}
func (g *WaitGroup) Done() {
g.mu.Lock()
g.c += -1
g.wg.Add(-1)
g.mu.Unlock()
}
func (g *WaitGroup) Wait() {
g.wg.Wait()
}
func (g *WaitGroup) WaitContext(ctx context.Context) {
done := make(chan struct{})
go func() {
g.wg.Wait()
close(done)
}()
select {
case <-ctx.Done():
g.mu.Lock()
g.wg.Add(-g.c)
<-done
g.wg.Add(g.c)
g.mu.Unlock()
return
case <-done:
return
}
}
func (g *WaitGroup) Waiters() int {
g.mu.Lock()
c := g.c
g.mu.Unlock()
return c
}

37
sync/waitgroup_test.go Normal file
View File

@ -0,0 +1,37 @@
package sync
import (
"context"
"testing"
"time"
)
func TestWaitGroupContext(t *testing.T) {
wg := NewWaitGroup()
_ = t
wg.Add(1)
ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second)
defer cancel()
wg.WaitContext(ctx)
}
func TestWaitGroupReuse(t *testing.T) {
wg := NewWaitGroup()
defer func() {
if wg.Waiters() != 0 {
t.Fatal("lost goroutines")
}
}()
wg.Add(1)
defer wg.Done()
ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second)
defer cancel()
wg.WaitContext(ctx)
wg.Add(1)
defer wg.Done()
ctx, cancel = context.WithTimeout(context.TODO(), 1*time.Second)
defer cancel()
wg.WaitContext(ctx)
}

139
tracer/memory/memory.go Normal file
View File

@ -0,0 +1,139 @@
package memory
import (
"context"
"time"
"go.unistack.org/micro/v3/tracer"
"go.unistack.org/micro/v3/util/id"
)
var _ tracer.Tracer = (*Tracer)(nil)
type Tracer struct {
opts tracer.Options
spans []tracer.Span
}
func (t *Tracer) Spans() []tracer.Span {
return t.spans
}
func (t *Tracer) Start(ctx context.Context, name string, opts ...tracer.SpanOption) (context.Context, tracer.Span) {
options := tracer.NewSpanOptions(opts...)
span := &Span{
name: name,
ctx: ctx,
tracer: t,
kind: options.Kind,
startTime: time.Now(),
}
span.spanID.s, _ = id.New()
span.traceID.s, _ = id.New()
if span.ctx == nil {
span.ctx = context.Background()
}
t.spans = append(t.spans, span)
return tracer.NewSpanContext(ctx, span), span
}
func (t *Tracer) Flush(_ context.Context) error {
return nil
}
func (t *Tracer) Init(opts ...tracer.Option) error {
for _, o := range opts {
o(&t.opts)
}
return nil
}
func (t *Tracer) Name() string {
return t.opts.Name
}
type noopStringer struct {
s string
}
func (s noopStringer) String() string {
return s.s
}
type Span struct {
ctx context.Context
tracer tracer.Tracer
name string
statusMsg string
startTime time.Time
finishTime time.Time
traceID noopStringer
spanID noopStringer
events []*Event
labels []interface{}
logs []interface{}
kind tracer.SpanKind
status tracer.SpanStatus
}
func (s *Span) Finish(_ ...tracer.SpanOption) {
s.finishTime = time.Now()
}
func (s *Span) Context() context.Context {
return s.ctx
}
func (s *Span) Tracer() tracer.Tracer {
return s.tracer
}
type Event struct {
name string
labels []interface{}
}
func (s *Span) AddEvent(name string, opts ...tracer.EventOption) {
options := tracer.NewEventOptions(opts...)
s.events = append(s.events, &Event{name: name, labels: options.Labels})
}
func (s *Span) SetName(name string) {
s.name = name
}
func (s *Span) AddLogs(kv ...interface{}) {
s.logs = append(s.logs, kv...)
}
func (s *Span) AddLabels(kv ...interface{}) {
s.labels = append(s.labels, kv...)
}
func (s *Span) Kind() tracer.SpanKind {
return s.kind
}
func (s *Span) TraceID() string {
return s.traceID.String()
}
func (s *Span) SpanID() string {
return s.spanID.String()
}
func (s *Span) Status() (tracer.SpanStatus, string) {
return s.status, s.statusMsg
}
func (s *Span) SetStatus(st tracer.SpanStatus, msg string) {
s.status = st
s.statusMsg = msg
}
// NewTracer returns new memory tracer
func NewTracer(opts ...tracer.Option) *Tracer {
return &Tracer{
opts: tracer.NewOptions(opts...),
}
}

View File

@ -0,0 +1,38 @@
package memory
import (
"bytes"
"context"
"fmt"
"strings"
"testing"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/logger/slog"
"go.unistack.org/micro/v3/tracer"
)
func TestLoggerWithTracer(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)
logger.DefaultLogger = slog.NewLogger(logger.WithOutput(buf))
if err := logger.Init(); err != nil {
t.Fatal(err)
}
var span tracer.Span
tr := NewTracer()
ctx, span = tr.Start(ctx, "test1")
logger.Error(ctx, "my test error", fmt.Errorf("error"))
if !strings.Contains(buf.String(), span.TraceID()) {
t.Fatalf("log does not contains trace id: %s", buf.Bytes())
}
_, _ = tr.Start(ctx, "test2")
for _, s := range tr.Spans() {
_ = s
}
}

View File

@ -2,6 +2,8 @@ package tracer
import (
"context"
"go.unistack.org/micro/v3/util/id"
)
var _ Tracer = (*noopTracer)(nil)
@ -24,6 +26,8 @@ func (t *noopTracer) Start(ctx context.Context, name string, opts ...SpanOption)
labels: options.Labels,
kind: options.Kind,
}
span.spanID.s, _ = id.New()
span.traceID.s, _ = id.New()
if span.ctx == nil {
span.ctx = context.Background()
}
@ -31,6 +35,14 @@ func (t *noopTracer) Start(ctx context.Context, name string, opts ...SpanOption)
return NewSpanContext(ctx, span), span
}
type noopStringer struct {
s string
}
func (s noopStringer) String() string {
return s.s
}
func (t *noopTracer) Flush(ctx context.Context) error {
return nil
}
@ -56,6 +68,8 @@ type noopSpan struct {
tracer Tracer
name string
statusMsg string
traceID noopStringer
spanID noopStringer
events []*noopEvent
labels []interface{}
logs []interface{}
@ -63,7 +77,15 @@ type noopSpan struct {
status SpanStatus
}
func (s *noopSpan) Finish(opts ...SpanOption) {
func (s *noopSpan) TraceID() string {
return s.traceID.String()
}
func (s *noopSpan) SpanID() string {
return s.spanID.String()
}
func (s *noopSpan) Finish(_ ...SpanOption) {
}
func (s *noopSpan) Context() context.Context {

View File

@ -100,13 +100,13 @@ type EventOption func(o *EventOptions)
func WithEventLabels(kv ...interface{}) EventOption {
return func(o *EventOptions) {
o.Labels = kv
o.Labels = append(o.Labels, kv...)
}
}
func WithSpanLabels(kv ...interface{}) SpanOption {
return func(o *SpanOptions) {
o.Labels = kv
o.Labels = append(o.Labels, kv...)
}
}
@ -159,7 +159,8 @@ func NewSpanOptions(opts ...SpanOption) SpanOptions {
// NewOptions returns default options
func NewOptions(opts ...Option) Options {
options := Options{
Logger: logger.DefaultLogger,
Logger: logger.DefaultLogger,
Context: context.Background(),
}
for _, o := range opts {
o(&options)

View File

@ -3,12 +3,32 @@ package tracer // import "go.unistack.org/micro/v3/tracer"
import (
"context"
"fmt"
"sort"
"go.unistack.org/micro/v3/logger"
)
// DefaultTracer is the global default tracer
var DefaultTracer = NewTracer()
var DefaultTracer Tracer = NewTracer()
var (
// TraceIDKey is the key used for the trace id in the log call
TraceIDKey = "trace-id"
// SpanIDKey is the key used for the span id in the log call
SpanIDKey = "span-id"
)
func init() {
logger.DefaultContextAttrFuncs = append(logger.DefaultContextAttrFuncs,
func(ctx context.Context) []interface{} {
if span, ok := SpanFromContext(ctx); ok {
return []interface{}{
TraceIDKey, span.TraceID(),
SpanIDKey, span.SpanID(),
}
}
return nil
})
}
// Tracer is an interface for distributed tracing
type Tracer interface {
@ -43,38 +63,8 @@ type Span interface {
AddLogs(kv ...interface{})
// Kind returns span kind
Kind() SpanKind
}
// sort labels alphabeticaly by label name
type byKey []interface{}
func (k byKey) Len() int { return len(k) / 2 }
func (k byKey) Less(i, j int) bool { return fmt.Sprintf("%s", k[i*2]) < fmt.Sprintf("%s", k[j*2]) }
func (k byKey) Swap(i, j int) {
k[i*2], k[j*2] = k[j*2], k[i*2]
k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1]
}
func UniqLabels(labels []interface{}) []interface{} {
if len(labels)%2 == 1 {
labels = labels[:len(labels)-1]
}
if len(labels) > 2 {
sort.Sort(byKey(labels))
idx := 0
for {
if labels[idx] == labels[idx+2] {
copy(labels[idx:], labels[idx+2:])
labels = labels[:len(labels)-2]
} else {
idx += 2
}
if idx+2 >= len(labels) {
break
}
}
}
return labels
// TraceID returns trace id
TraceID() string
// SpanID returns span id
SpanID() string
}

View File

@ -1,6 +1,7 @@
package reflect // import "go.unistack.org/micro/v3/util/reflect"
package reflect
import (
"encoding/json"
"errors"
"fmt"
"reflect"
@ -45,15 +46,23 @@ func SliceAppend(b bool) Option {
// Merge merges map[string]interface{} to destination struct
func Merge(dst interface{}, mp map[string]interface{}, opts ...Option) error {
var err error
var sval reflect.Value
var fname string
options := Options{}
for _, o := range opts {
o(&options)
}
if unmarshaler, ok := dst.(json.Unmarshaler); ok {
buf, err := json.Marshal(mp)
if err == nil {
err = unmarshaler.UnmarshalJSON(buf)
}
return err
}
var err error
var sval reflect.Value
var fname string
dviface := reflect.ValueOf(dst)
if dviface.Kind() == reflect.Ptr {
dviface = dviface.Elem()

40
util/sort/sort.go Normal file
View File

@ -0,0 +1,40 @@
package sort
import (
"fmt"
"sort"
)
// sort labels alphabeticaly by label name
type byKey []interface{}
func (k byKey) Len() int { return len(k) / 2 }
func (k byKey) Less(i, j int) bool { return fmt.Sprintf("%s", k[i*2]) < fmt.Sprintf("%s", k[j*2]) }
func (k byKey) Swap(i, j int) {
k[i*2], k[j*2] = k[j*2], k[i*2]
k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1]
}
func Uniq(labels []interface{}) []interface{} {
if len(labels)%2 == 1 {
labels = labels[:len(labels)-1]
}
if len(labels) > 2 {
sort.Sort(byKey(labels))
idx := 0
for {
if labels[idx] == labels[idx+2] {
copy(labels[idx:], labels[idx+2:])
labels = labels[:len(labels)-2]
} else {
idx += 2
}
if idx+2 >= len(labels) {
break
}
}
}
return labels
}

View File

@ -35,8 +35,8 @@ func TestUnmarshalJSON(t *testing.T) {
err = json.Unmarshal([]byte(`{"ttl":"1y"}`), v)
if err != nil {
t.Fatal(err)
} else if v.TTL != 31536000000000000 {
t.Fatalf("invalid duration %v != 31536000000000000", v.TTL)
} else if v.TTL != 31622400000000000 {
t.Fatalf("invalid duration %v != 31622400000000000", v.TTL)
}
}
@ -55,7 +55,7 @@ func TestParseDuration(t *testing.T) {
if err != nil {
t.Fatalf("ParseDuration error: %v", err)
}
if td.String() != "8760h0m0s" {
t.Fatalf("ParseDuration 1y != 8760h0m0s : %s", td.String())
if td.String() != "8784h0m0s" {
t.Fatalf("ParseDuration 1y != 8784h0m0s : %s", td.String())
}
}

25
util/xpool/pool.go Normal file
View File

@ -0,0 +1,25 @@
package pool
import "sync"
type Pool[T any] struct {
p *sync.Pool
}
func NewPool[T any](fn func() T) Pool[T] {
return Pool[T]{
p: &sync.Pool{
New: func() interface{} {
return fn()
},
},
}
}
func (p Pool[T]) Get() T {
return p.p.Get().(T)
}
func (p Pool[T]) Put(t T) {
p.p.Put(t)
}

27
util/xpool/pool_test.go Normal file
View File

@ -0,0 +1,27 @@
package pool
import (
"bytes"
"strings"
"testing"
)
func TestBytes(t *testing.T) {
p := NewPool(func() *bytes.Buffer { return bytes.NewBuffer(nil) })
b := p.Get()
b.Write([]byte(`test`))
if b.String() != "test" {
t.Fatal("pool not works")
}
p.Put(b)
}
func TestStrings(t *testing.T) {
p := NewPool(func() *strings.Builder { return &strings.Builder{} })
b := p.Get()
b.Write([]byte(`test`))
if b.String() != "test" {
t.Fatal("pool not works")
}
p.Put(b)
}