Compare commits

..

24 Commits

Author SHA1 Message Date
b5f8316b57 semconv: fix broker group lag metric name
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-04-13 02:38:58 +03:00
d7ddd912a8 Merge pull request 'semconv: add broker group lag' (#336) from brokerlag into v3
Reviewed-on: #336
2024-04-13 02:07:53 +03:00
c020d90cb4 semconv: add broker group lag
Some checks failed
pr / test (pull_request) Failing after 1m39s
lint / lint (pull_request) Successful in 10m49s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-04-13 02:06:51 +03:00
db47b62159 Merge pull request 'add options in broker' (#334) from devstigneev/micro:v3 into v3
Reviewed-on: #334
2024-04-08 23:12:59 +03:00
8254456c8b rename path to sync
Some checks failed
lint / lint (pull_request) Has been cancelled
pr / test (pull_request) Has been cancelled
2024-04-07 21:16:50 +03:00
c2808679c3 add options in broker
Some checks failed
lint / lint (pull_request) Has been cancelled
pr / test (pull_request) Has been cancelled
2024-04-07 20:53:01 +03:00
f418235c16 Merge pull request 'cluster: initial import' (#332) from cluster into v3
Reviewed-on: #332
2024-04-06 23:29:04 +03:00
67ba7b3753 cluster: initial import
Some checks failed
pr / test (pull_request) Failing after 1m36s
lint / lint (pull_request) Successful in 10m48s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-04-06 23:28:01 +03:00
e48d7cadf9 Merge pull request 'add semconv package' (#331) from semconv into v3
Reviewed-on: #331
2024-04-06 22:04:47 +03:00
c906186011 add semconv package
Some checks failed
pr / test (pull_request) Failing after 1m39s
lint / lint (pull_request) Successful in 10m24s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-04-06 22:03:56 +03:00
dc0ff91b83 Merge pull request 'util/reflect: detect json.Unmarshaler' (#328) from utilsort into v3
Reviewed-on: #328
2024-04-02 08:52:11 +03:00
e739c2d438 util/reflect: detect json.Unmarshaler
Some checks failed
lint / lint (pull_request) Has been cancelled
pr / test (pull_request) Failing after 2m3s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-04-02 08:51:06 +03:00
bf4a036652 Merge pull request 'move sort.Uniq to dedicated package' (#327) from utilsort into v3
Reviewed-on: #327
2024-03-27 11:25:50 +03:00
f83a29eb67 move sort.Uniq to dedicated package
Some checks failed
lint / lint (pull_request) Has been cancelled
pr / test (pull_request) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-03-27 11:24:28 +03:00
aef7f53d88 Merge pull request 'tracer: append labels' (#326) from tracerfix into v3
Reviewed-on: #326
2024-03-17 00:18:23 +03:00
02c8e4fb7f tracer: append labels
All checks were successful
pr / test (pull_request) Successful in 1m35s
lint / lint (pull_request) Successful in 10m38s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-03-17 00:17:10 +03:00
f5693bd940 Merge pull request 'v3 update WaitGroup Options' (#325) from devstigneev/micro:v3 into v3
Reviewed-on: #325
2024-03-13 11:03:29 +03:00
701afb7bea sort imports
Some checks failed
lint / lint (pull_request) Has been cancelled
pr / test (pull_request) Has been cancelled
2024-03-13 10:51:03 +03:00
019b407e74 update WaitOptions 2024-03-13 10:49:58 +03:00
f29a346434 Merge pull request 'tracer: add Context init to NewOptions' (#323) from tracerctx into v3
Reviewed-on: #323
2024-03-11 01:13:01 +03:00
27db1876c0 tracer: add Context init to NewOptions
All checks were successful
pr / test (pull_request) Successful in 1m30s
lint / lint (pull_request) Successful in 10m33s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-03-11 01:12:20 +03:00
f66ac9736b metadata: allow to exclude some keys in Copy func (#321)
Reviewed-on: #321
Co-authored-by: Vasiliy Tolstov <v.tolstov@unistack.org>
Co-committed-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-03-09 23:50:40 +03:00
ed7972a1fa Merge pull request 'sync/waitgroup: backport from master' (#320) from waitgroup into v3
Reviewed-on: #320
2024-03-09 23:37:39 +03:00
2cc004b01c sync/waitgroup: backport from master
All checks were successful
pr / test (pull_request) Successful in 1m40s
lint / lint (pull_request) Successful in 10m42s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-03-09 23:36:39 +03:00
16 changed files with 285 additions and 54 deletions

3
.gitignore vendored
View File

@@ -1,6 +1,8 @@
# Develop tools
/.vscode/
/.idea/
.idea
.vscode
# Binaries for programs and plugins
*.exe
@@ -13,6 +15,7 @@
_obj
_test
_build
.DS_Store
# Architecture specific extensions/prefixes
*.[568vq]

View File

@@ -4,6 +4,7 @@ package broker // import "go.unistack.org/micro/v3/broker"
import (
"context"
"errors"
"time"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/metadata"
@@ -17,6 +18,8 @@ var (
ErrNotConnected = errors.New("broker not connected")
// ErrDisconnected returns when broker disconnected
ErrDisconnected = errors.New("broker disconnected")
// DefaultGracefulTimeout
DefaultGracefulTimeout = 5 * time.Second
)
// Broker is an interface used for asynchronous messaging.

View File

@@ -9,6 +9,7 @@ import (
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/sync"
"go.unistack.org/micro/v3/tracer"
)
@@ -36,17 +37,22 @@ type Options struct {
Name string
// Addrs holds the broker address
Addrs []string
Wait *sync.WaitGroup
GracefulTimeout time.Duration
}
// NewOptions create new Options
func NewOptions(opts ...Option) Options {
options := Options{
Register: register.DefaultRegister,
Logger: logger.DefaultLogger,
Context: context.Background(),
Meter: meter.DefaultMeter,
Codec: codec.DefaultCodec,
Tracer: tracer.DefaultTracer,
Register: register.DefaultRegister,
Logger: logger.DefaultLogger,
Context: context.Background(),
Meter: meter.DefaultMeter,
Codec: codec.DefaultCodec,
Tracer: tracer.DefaultTracer,
GracefulTimeout: DefaultGracefulTimeout,
}
for _, o := range opts {
o(&options)

41
cluster/cluster.go Normal file
View File

@@ -0,0 +1,41 @@
package cluster
import (
"context"
"go.unistack.org/micro/v3/metadata"
)
// Message sent to member in cluster
type Message interface {
// Header returns message headers
Header() metadata.Metadata
// Body returns broker message may be []byte slice or some go struct or interface
Body() interface{}
}
type Node interface {
// Name returns node name
Name() string
// Address returns node address
Address() string
// Metadata returns node metadata
Metadata() metadata.Metadata
}
// Cluster interface used for cluster communication across nodes
type Cluster interface {
// Join is used to take an existing members and performing state sync
Join(ctx context.Context, addr ...string) error
// Leave broadcast a leave message and stop listeners
Leave(ctx context.Context) error
// Ping is used to probe live status of the node
Ping(ctx context.Context, node Node, payload []byte) error
// Members returns the cluster members
Members() ([]Node, error)
// Broadcast send message for all members in cluster, if filter is not nil, nodes may be filtered
// by key/value pairs
Broadcast(ctx context.Context, msg Message, filter ...string) error
// Unicast send message to single member in cluster
Unicast(ctx context.Context, node Node, msg Message) error
}

View File

@@ -98,11 +98,12 @@ func (md Metadata) Del(keys ...string) {
}
// Copy makes a copy of the metadata
func Copy(md Metadata) Metadata {
func Copy(md Metadata, exclude ...string) Metadata {
nmd := New(len(md))
for key, val := range md {
nmd.Set(key, val)
}
nmd.Del(exclude...)
return nmd
}

View File

@@ -190,3 +190,14 @@ func TestMetadataContext(t *testing.T) {
t.Errorf("Expected metadata length 1 got %d", i)
}
}
func TestCopy(t *testing.T) {
md := New(2)
md.Set("key1", "val1", "key2", "val2")
nmd := Copy(md, "key2")
if len(nmd) != 1 {
t.Fatal("Copy exclude not works")
} else if nmd["Key1"] != "val1" {
t.Fatal("Copy exclude not works")
}
}

22
semconv/broker.go Normal file
View File

@@ -0,0 +1,22 @@
package semconv
var (
// PublishMessageDurationSeconds specifies meter metric name
PublishMessageDurationSeconds = "publish_message_duration_seconds"
// PublishMessageLatencyMicroseconds specifies meter metric name
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
// PublishMessageTotal specifies meter metric name
PublishMessageTotal = "publish_message_total"
// PublishMessageInflight specifies meter metric name
PublishMessageInflight = "publish_message_inflight"
// SubscribeMessageDurationSeconds specifies meter metric name
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
// SubscribeMessageLatencyMicroseconds specifies meter metric name
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
// SubscribeMessageTotal specifies meter metric name
SubscribeMessageTotal = "subscribe_message_total"
// SubscribeMessageInflight specifies meter metric name
SubscribeMessageInflight = "subscribe_message_inflight"
// BrokerGroupLag specifies broker lag
BrokerGroupLag = "broker_group_lag"
)

12
semconv/client.go Normal file
View File

@@ -0,0 +1,12 @@
package semconv
var (
// ClientRequestDurationSeconds specifies meter metric name
ClientRequestDurationSeconds = "client_request_duration_seconds"
// ClientRequestLatencyMicroseconds specifies meter metric name
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
// ClientRequestTotal specifies meter metric name
ClientRequestTotal = "client_request_total"
// ClientRequestInflight specifies meter metric name
ClientRequestInflight = "client_request_inflight"
)

12
semconv/server.go Normal file
View File

@@ -0,0 +1,12 @@
package semconv
var (
// ServerRequestDurationSeconds specifies meter metric name
ServerRequestDurationSeconds = "server_request_duration_seconds"
// ServerRequestLatencyMicroseconds specifies meter metric name
ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
// ServerRequestTotal specifies meter metric name
ServerRequestTotal = "server_request_total"
// ServerRequestInflight specifies meter metric name
ServerRequestInflight = "server_request_inflight"
)

View File

@@ -15,6 +15,7 @@ import (
"go.unistack.org/micro/v3/network/transport"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/register"
msync "go.unistack.org/micro/v3/sync"
"go.unistack.org/micro/v3/tracer"
"go.unistack.org/micro/v3/util/id"
)
@@ -47,7 +48,7 @@ type Options struct {
// Listener may be passed if already created
Listener net.Listener
// Wait group
Wait *sync.WaitGroup
Wait *msync.WaitGroup
// TLSConfig specifies tls.Config for secure serving
TLSConfig *tls.Config
// Metadata holds the server metadata
@@ -282,7 +283,7 @@ func Wait(wg *sync.WaitGroup) Option {
if wg == nil {
wg = new(sync.WaitGroup)
}
o.Wait = wg
o.Wait = msync.WrapWaitGroup(wg)
}
}
@@ -331,7 +332,6 @@ func GracefulTimeout(td time.Duration) Option {
}
}
// HandlerOptions struct
type HandlerOptions struct {
// Context holds external options

69
sync/waitgroup.go Normal file
View File

@@ -0,0 +1,69 @@
package sync
import (
"context"
"sync"
)
type WaitGroup struct {
wg *sync.WaitGroup
c int
mu sync.Mutex
}
func WrapWaitGroup(wg *sync.WaitGroup) *WaitGroup {
g := &WaitGroup{
wg: wg,
}
return g
}
func NewWaitGroup() *WaitGroup {
var wg sync.WaitGroup
return WrapWaitGroup(&wg)
}
func (g *WaitGroup) Add(n int) {
g.mu.Lock()
g.c += n
g.wg.Add(n)
g.mu.Unlock()
}
func (g *WaitGroup) Done() {
g.mu.Lock()
g.c += -1
g.wg.Add(-1)
g.mu.Unlock()
}
func (g *WaitGroup) Wait() {
g.wg.Wait()
}
func (g *WaitGroup) WaitContext(ctx context.Context) {
done := make(chan struct{})
go func() {
g.wg.Wait()
close(done)
}()
select {
case <-ctx.Done():
g.mu.Lock()
g.wg.Add(-g.c)
<-done
g.wg.Add(g.c)
g.mu.Unlock()
return
case <-done:
return
}
}
func (g *WaitGroup) Waiters() int {
g.mu.Lock()
c := g.c
g.mu.Unlock()
return c
}

37
sync/waitgroup_test.go Normal file
View File

@@ -0,0 +1,37 @@
package sync
import (
"context"
"testing"
"time"
)
func TestWaitGroupContext(t *testing.T) {
wg := NewWaitGroup()
_ = t
wg.Add(1)
ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second)
defer cancel()
wg.WaitContext(ctx)
}
func TestWaitGroupReuse(t *testing.T) {
wg := NewWaitGroup()
defer func() {
if wg.Waiters() != 0 {
t.Fatal("lost goroutines")
}
}()
wg.Add(1)
defer wg.Done()
ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second)
defer cancel()
wg.WaitContext(ctx)
wg.Add(1)
defer wg.Done()
ctx, cancel = context.WithTimeout(context.TODO(), 1*time.Second)
defer cancel()
wg.WaitContext(ctx)
}

View File

@@ -100,13 +100,13 @@ type EventOption func(o *EventOptions)
func WithEventLabels(kv ...interface{}) EventOption {
return func(o *EventOptions) {
o.Labels = kv
o.Labels = append(o.Labels, kv...)
}
}
func WithSpanLabels(kv ...interface{}) SpanOption {
return func(o *SpanOptions) {
o.Labels = kv
o.Labels = append(o.Labels, kv...)
}
}
@@ -159,7 +159,8 @@ func NewSpanOptions(opts ...SpanOption) SpanOptions {
// NewOptions returns default options
func NewOptions(opts ...Option) Options {
options := Options{
Logger: logger.DefaultLogger,
Logger: logger.DefaultLogger,
Context: context.Background(),
}
for _, o := range opts {
o(&options)

View File

@@ -3,8 +3,6 @@ package tracer // import "go.unistack.org/micro/v3/tracer"
import (
"context"
"fmt"
"sort"
"go.unistack.org/micro/v3/logger"
)
@@ -70,37 +68,3 @@ type Span interface {
// SpanID returns span id
SpanID() string
}
// sort labels alphabeticaly by label name
type byKey []interface{}
func (k byKey) Len() int { return len(k) / 2 }
func (k byKey) Less(i, j int) bool { return fmt.Sprintf("%s", k[i*2]) < fmt.Sprintf("%s", k[j*2]) }
func (k byKey) Swap(i, j int) {
k[i*2], k[j*2] = k[j*2], k[i*2]
k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1]
}
func UniqLabels(labels []interface{}) []interface{} {
if len(labels)%2 == 1 {
labels = labels[:len(labels)-1]
}
if len(labels) > 2 {
sort.Sort(byKey(labels))
idx := 0
for {
if labels[idx] == labels[idx+2] {
copy(labels[idx:], labels[idx+2:])
labels = labels[:len(labels)-2]
} else {
idx += 2
}
if idx+2 >= len(labels) {
break
}
}
}
return labels
}

View File

@@ -1,6 +1,7 @@
package reflect // import "go.unistack.org/micro/v3/util/reflect"
package reflect
import (
"encoding/json"
"errors"
"fmt"
"reflect"
@@ -45,15 +46,23 @@ func SliceAppend(b bool) Option {
// Merge merges map[string]interface{} to destination struct
func Merge(dst interface{}, mp map[string]interface{}, opts ...Option) error {
var err error
var sval reflect.Value
var fname string
options := Options{}
for _, o := range opts {
o(&options)
}
if unmarshaler, ok := dst.(json.Unmarshaler); ok {
buf, err := json.Marshal(mp)
if err == nil {
err = unmarshaler.UnmarshalJSON(buf)
}
return err
}
var err error
var sval reflect.Value
var fname string
dviface := reflect.ValueOf(dst)
if dviface.Kind() == reflect.Ptr {
dviface = dviface.Elem()

40
util/sort/sort.go Normal file
View File

@@ -0,0 +1,40 @@
package sort
import (
"fmt"
"sort"
)
// sort labels alphabeticaly by label name
type byKey []interface{}
func (k byKey) Len() int { return len(k) / 2 }
func (k byKey) Less(i, j int) bool { return fmt.Sprintf("%s", k[i*2]) < fmt.Sprintf("%s", k[j*2]) }
func (k byKey) Swap(i, j int) {
k[i*2], k[j*2] = k[j*2], k[i*2]
k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1]
}
func Uniq(labels []interface{}) []interface{} {
if len(labels)%2 == 1 {
labels = labels[:len(labels)-1]
}
if len(labels) > 2 {
sort.Sort(byKey(labels))
idx := 0
for {
if labels[idx] == labels[idx+2] {
copy(labels[idx:], labels[idx+2:])
labels = labels[:len(labels)-2]
} else {
idx += 2
}
if idx+2 >= len(labels) {
break
}
}
}
return labels
}