Merge branch 'master' into test_coverage
This commit is contained in:
		
							
								
								
									
										3
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										3
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							| @@ -1,6 +1,8 @@ | |||||||
| # Develop tools | # Develop tools | ||||||
| /.vscode/ | /.vscode/ | ||||||
| /.idea/ | /.idea/ | ||||||
|  | .idea | ||||||
|  | .vscode | ||||||
|  |  | ||||||
| # Binaries for programs and plugins | # Binaries for programs and plugins | ||||||
| *.exe | *.exe | ||||||
| @@ -13,6 +15,7 @@ | |||||||
| _obj | _obj | ||||||
| _test | _test | ||||||
| _build | _build | ||||||
|  | .DS_Store | ||||||
|  |  | ||||||
| # Architecture specific extensions/prefixes | # Architecture specific extensions/prefixes | ||||||
| *.[568vq] | *.[568vq] | ||||||
|   | |||||||
| @@ -4,6 +4,7 @@ package broker // import "go.unistack.org/micro/v4/broker" | |||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"errors" | 	"errors" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
| 	"go.unistack.org/micro/v4/metadata" | 	"go.unistack.org/micro/v4/metadata" | ||||||
| 	"go.unistack.org/micro/v4/options" | 	"go.unistack.org/micro/v4/options" | ||||||
| @@ -19,6 +20,8 @@ var ( | |||||||
| 	ErrDisconnected = errors.New("broker disconnected") | 	ErrDisconnected = errors.New("broker disconnected") | ||||||
| 	// ErrInvalidMessage returns when message has nvalid format | 	// ErrInvalidMessage returns when message has nvalid format | ||||||
| 	ErrInvalidMessage = errors.New("broker message has invalid format") | 	ErrInvalidMessage = errors.New("broker message has invalid format") | ||||||
|  | 	// DefaultGracefulTimeout | ||||||
|  | 	DefaultGracefulTimeout = 5 * time.Second | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // Broker is an interface used for asynchronous messaging. | // Broker is an interface used for asynchronous messaging. | ||||||
|   | |||||||
| @@ -11,6 +11,7 @@ import ( | |||||||
| 	"go.unistack.org/micro/v4/meter" | 	"go.unistack.org/micro/v4/meter" | ||||||
| 	"go.unistack.org/micro/v4/options" | 	"go.unistack.org/micro/v4/options" | ||||||
| 	"go.unistack.org/micro/v4/register" | 	"go.unistack.org/micro/v4/register" | ||||||
|  | 	"go.unistack.org/micro/v4/sync" | ||||||
| 	"go.unistack.org/micro/v4/tracer" | 	"go.unistack.org/micro/v4/tracer" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -36,22 +37,27 @@ type Options struct { | |||||||
| 	Name string | 	Name string | ||||||
| 	// Address holds the broker address | 	// Address holds the broker address | ||||||
| 	Address []string | 	Address []string | ||||||
|  |  | ||||||
|  | 	Wait *sync.WaitGroup | ||||||
|  |  | ||||||
|  | 	GracefulTimeout time.Duration | ||||||
| } | } | ||||||
|  |  | ||||||
| // NewOptions create new Options | // NewOptions create new Options | ||||||
| func NewOptions(opts ...options.Option) Options { | func NewOptions(opts ...options.Option) Options { | ||||||
| 	options := Options{ | 	newOpts := Options{ | ||||||
| 		Register: register.DefaultRegister, | 		Register:        register.DefaultRegister, | ||||||
| 		Logger:   logger.DefaultLogger, | 		Logger:          logger.DefaultLogger, | ||||||
| 		Context:  context.Background(), | 		Context:         context.Background(), | ||||||
| 		Meter:    meter.DefaultMeter, | 		Meter:           meter.DefaultMeter, | ||||||
| 		Codecs:   make(map[string]codec.Codec), | 		Codecs:          make(map[string]codec.Codec), | ||||||
| 		Tracer:   tracer.DefaultTracer, | 		Tracer:          tracer.DefaultTracer, | ||||||
|  | 		GracefulTimeout: DefaultGracefulTimeout, | ||||||
| 	} | 	} | ||||||
| 	for _, o := range opts { | 	for _, o := range opts { | ||||||
| 		o(&options) | 		o(&newOpts) | ||||||
| 	} | 	} | ||||||
| 	return options | 	return newOpts | ||||||
| } | } | ||||||
|  |  | ||||||
| // PublishOptions struct | // PublishOptions struct | ||||||
|   | |||||||
| @@ -214,6 +214,20 @@ func WithMicroKeys() options.Option { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // WithAddCallerSkipCount add skip count for copy logger | ||||||
|  | func WithAddCallerSkipCount(n int) options.Option { | ||||||
|  | 	return func(src interface{}) error { | ||||||
|  | 		c, err := options.Get(src, ".CallerSkipCount") | ||||||
|  | 		if err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 		if err = options.Set(src, c.(int)+n, ".CallerSkipCount"); err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
| // WithAddStacktrace controls writing stacktrace on error | // WithAddStacktrace controls writing stacktrace on error | ||||||
| func WithAddStacktrace(v bool) options.Option { | func WithAddStacktrace(v bool) options.Option { | ||||||
| 	return func(src interface{}) error { | 	return func(src interface{}) error { | ||||||
|   | |||||||
| @@ -17,4 +17,6 @@ var ( | |||||||
| 	SubscribeMessageTotal = "subscribe_message_total" | 	SubscribeMessageTotal = "subscribe_message_total" | ||||||
| 	// SubscribeMessageInflight specifies meter metric name | 	// SubscribeMessageInflight specifies meter metric name | ||||||
| 	SubscribeMessageInflight = "subscribe_message_inflight" | 	SubscribeMessageInflight = "subscribe_message_inflight" | ||||||
|  | 	// BrokerGroupLag specifies broker lag | ||||||
|  | 	BrokerGroupLag = "broker_lag" | ||||||
| ) | ) | ||||||
|   | |||||||
							
								
								
									
										12
									
								
								semconv/cache.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										12
									
								
								semconv/cache.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,12 @@ | |||||||
|  | package semconv | ||||||
|  |  | ||||||
|  | var ( | ||||||
|  | 	// CacheRequestDurationSeconds specifies meter metric name | ||||||
|  | 	CacheRequestDurationSeconds = "cache_request_duration_seconds" | ||||||
|  | 	// ClientRequestLatencyMicroseconds specifies meter metric name | ||||||
|  | 	CacheRequestLatencyMicroseconds = "cache_request_latency_microseconds" | ||||||
|  | 	// CacheRequestTotal specifies meter metric name | ||||||
|  | 	CacheRequestTotal = "cache_request_total" | ||||||
|  | 	// CacheRequestInflight specifies meter metric name | ||||||
|  | 	CacheRequestInflight = "cache_request_inflight" | ||||||
|  | ) | ||||||
							
								
								
									
										25
									
								
								util/xpool/pool.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										25
									
								
								util/xpool/pool.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,25 @@ | |||||||
|  | package pool | ||||||
|  |  | ||||||
|  | import "sync" | ||||||
|  |  | ||||||
|  | type Pool[T any] struct { | ||||||
|  | 	p *sync.Pool | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func NewPool[T any](fn func() T) Pool[T] { | ||||||
|  | 	return Pool[T]{ | ||||||
|  | 		p: &sync.Pool{ | ||||||
|  | 			New: func() interface{} { | ||||||
|  | 				return fn() | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (p Pool[T]) Get() T { | ||||||
|  | 	return p.p.Get().(T) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (p Pool[T]) Put(t T) { | ||||||
|  | 	p.p.Put(t) | ||||||
|  | } | ||||||
							
								
								
									
										27
									
								
								util/xpool/pool_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										27
									
								
								util/xpool/pool_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,27 @@ | |||||||
|  | package pool | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"bytes" | ||||||
|  | 	"strings" | ||||||
|  | 	"testing" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func TestBytes(t *testing.T) { | ||||||
|  | 	p := NewPool(func() *bytes.Buffer { return bytes.NewBuffer(nil) }) | ||||||
|  | 	b := p.Get() | ||||||
|  | 	b.Write([]byte(`test`)) | ||||||
|  | 	if b.String() != "test" { | ||||||
|  | 		t.Fatal("pool not works") | ||||||
|  | 	} | ||||||
|  | 	p.Put(b) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestStrings(t *testing.T) { | ||||||
|  | 	p := NewPool(func() *strings.Builder { return &strings.Builder{} }) | ||||||
|  | 	b := p.Get() | ||||||
|  | 	b.Write([]byte(`test`)) | ||||||
|  | 	if b.String() != "test" { | ||||||
|  | 		t.Fatal("pool not works") | ||||||
|  | 	} | ||||||
|  | 	p.Put(b) | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user