Compare commits
12 Commits
Author | SHA1 | Date | |
---|---|---|---|
bf4a036652 | |||
f83a29eb67 | |||
aef7f53d88 | |||
02c8e4fb7f | |||
f5693bd940 | |||
701afb7bea | |||
019b407e74 | |||
f29a346434 | |||
27db1876c0 | |||
f66ac9736b | |||
ed7972a1fa | |||
2cc004b01c |
@@ -98,11 +98,12 @@ func (md Metadata) Del(keys ...string) {
|
||||
}
|
||||
|
||||
// Copy makes a copy of the metadata
|
||||
func Copy(md Metadata) Metadata {
|
||||
func Copy(md Metadata, exclude ...string) Metadata {
|
||||
nmd := New(len(md))
|
||||
for key, val := range md {
|
||||
nmd.Set(key, val)
|
||||
}
|
||||
nmd.Del(exclude...)
|
||||
return nmd
|
||||
}
|
||||
|
||||
|
@@ -190,3 +190,14 @@ func TestMetadataContext(t *testing.T) {
|
||||
t.Errorf("Expected metadata length 1 got %d", i)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCopy(t *testing.T) {
|
||||
md := New(2)
|
||||
md.Set("key1", "val1", "key2", "val2")
|
||||
nmd := Copy(md, "key2")
|
||||
if len(nmd) != 1 {
|
||||
t.Fatal("Copy exclude not works")
|
||||
} else if nmd["Key1"] != "val1" {
|
||||
t.Fatal("Copy exclude not works")
|
||||
}
|
||||
}
|
||||
|
@@ -15,6 +15,7 @@ import (
|
||||
"go.unistack.org/micro/v3/network/transport"
|
||||
"go.unistack.org/micro/v3/options"
|
||||
"go.unistack.org/micro/v3/register"
|
||||
msync "go.unistack.org/micro/v3/sync"
|
||||
"go.unistack.org/micro/v3/tracer"
|
||||
"go.unistack.org/micro/v3/util/id"
|
||||
)
|
||||
@@ -47,7 +48,7 @@ type Options struct {
|
||||
// Listener may be passed if already created
|
||||
Listener net.Listener
|
||||
// Wait group
|
||||
Wait *sync.WaitGroup
|
||||
Wait *msync.WaitGroup
|
||||
// TLSConfig specifies tls.Config for secure serving
|
||||
TLSConfig *tls.Config
|
||||
// Metadata holds the server metadata
|
||||
@@ -282,7 +283,7 @@ func Wait(wg *sync.WaitGroup) Option {
|
||||
if wg == nil {
|
||||
wg = new(sync.WaitGroup)
|
||||
}
|
||||
o.Wait = wg
|
||||
o.Wait = msync.WrapWaitGroup(wg)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -331,7 +332,6 @@ func GracefulTimeout(td time.Duration) Option {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// HandlerOptions struct
|
||||
type HandlerOptions struct {
|
||||
// Context holds external options
|
||||
|
69
sync/waitgroup.go
Normal file
69
sync/waitgroup.go
Normal file
@@ -0,0 +1,69 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type WaitGroup struct {
|
||||
wg *sync.WaitGroup
|
||||
c int
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func WrapWaitGroup(wg *sync.WaitGroup) *WaitGroup {
|
||||
g := &WaitGroup{
|
||||
wg: wg,
|
||||
}
|
||||
return g
|
||||
}
|
||||
|
||||
func NewWaitGroup() *WaitGroup {
|
||||
var wg sync.WaitGroup
|
||||
return WrapWaitGroup(&wg)
|
||||
}
|
||||
|
||||
func (g *WaitGroup) Add(n int) {
|
||||
g.mu.Lock()
|
||||
g.c += n
|
||||
g.wg.Add(n)
|
||||
g.mu.Unlock()
|
||||
}
|
||||
|
||||
func (g *WaitGroup) Done() {
|
||||
g.mu.Lock()
|
||||
g.c += -1
|
||||
g.wg.Add(-1)
|
||||
g.mu.Unlock()
|
||||
}
|
||||
|
||||
func (g *WaitGroup) Wait() {
|
||||
g.wg.Wait()
|
||||
}
|
||||
|
||||
func (g *WaitGroup) WaitContext(ctx context.Context) {
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
g.wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
g.mu.Lock()
|
||||
g.wg.Add(-g.c)
|
||||
<-done
|
||||
g.wg.Add(g.c)
|
||||
g.mu.Unlock()
|
||||
return
|
||||
case <-done:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (g *WaitGroup) Waiters() int {
|
||||
g.mu.Lock()
|
||||
c := g.c
|
||||
g.mu.Unlock()
|
||||
return c
|
||||
}
|
37
sync/waitgroup_test.go
Normal file
37
sync/waitgroup_test.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestWaitGroupContext(t *testing.T) {
|
||||
wg := NewWaitGroup()
|
||||
_ = t
|
||||
wg.Add(1)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second)
|
||||
defer cancel()
|
||||
wg.WaitContext(ctx)
|
||||
}
|
||||
|
||||
func TestWaitGroupReuse(t *testing.T) {
|
||||
wg := NewWaitGroup()
|
||||
defer func() {
|
||||
if wg.Waiters() != 0 {
|
||||
t.Fatal("lost goroutines")
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
defer wg.Done()
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second)
|
||||
defer cancel()
|
||||
wg.WaitContext(ctx)
|
||||
|
||||
wg.Add(1)
|
||||
defer wg.Done()
|
||||
ctx, cancel = context.WithTimeout(context.TODO(), 1*time.Second)
|
||||
defer cancel()
|
||||
wg.WaitContext(ctx)
|
||||
}
|
@@ -100,13 +100,13 @@ type EventOption func(o *EventOptions)
|
||||
|
||||
func WithEventLabels(kv ...interface{}) EventOption {
|
||||
return func(o *EventOptions) {
|
||||
o.Labels = kv
|
||||
o.Labels = append(o.Labels, kv...)
|
||||
}
|
||||
}
|
||||
|
||||
func WithSpanLabels(kv ...interface{}) SpanOption {
|
||||
return func(o *SpanOptions) {
|
||||
o.Labels = kv
|
||||
o.Labels = append(o.Labels, kv...)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -159,7 +159,8 @@ func NewSpanOptions(opts ...SpanOption) SpanOptions {
|
||||
// NewOptions returns default options
|
||||
func NewOptions(opts ...Option) Options {
|
||||
options := Options{
|
||||
Logger: logger.DefaultLogger,
|
||||
Logger: logger.DefaultLogger,
|
||||
Context: context.Background(),
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
|
@@ -3,8 +3,6 @@ package tracer // import "go.unistack.org/micro/v3/tracer"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
)
|
||||
@@ -70,37 +68,3 @@ type Span interface {
|
||||
// SpanID returns span id
|
||||
SpanID() string
|
||||
}
|
||||
|
||||
// sort labels alphabeticaly by label name
|
||||
type byKey []interface{}
|
||||
|
||||
func (k byKey) Len() int { return len(k) / 2 }
|
||||
func (k byKey) Less(i, j int) bool { return fmt.Sprintf("%s", k[i*2]) < fmt.Sprintf("%s", k[j*2]) }
|
||||
func (k byKey) Swap(i, j int) {
|
||||
k[i*2], k[j*2] = k[j*2], k[i*2]
|
||||
k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1]
|
||||
}
|
||||
|
||||
func UniqLabels(labels []interface{}) []interface{} {
|
||||
if len(labels)%2 == 1 {
|
||||
labels = labels[:len(labels)-1]
|
||||
}
|
||||
|
||||
if len(labels) > 2 {
|
||||
sort.Sort(byKey(labels))
|
||||
|
||||
idx := 0
|
||||
for {
|
||||
if labels[idx] == labels[idx+2] {
|
||||
copy(labels[idx:], labels[idx+2:])
|
||||
labels = labels[:len(labels)-2]
|
||||
} else {
|
||||
idx += 2
|
||||
}
|
||||
if idx+2 >= len(labels) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
40
util/sort/sort.go
Normal file
40
util/sort/sort.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package sort
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
)
|
||||
|
||||
// sort labels alphabeticaly by label name
|
||||
type byKey []interface{}
|
||||
|
||||
func (k byKey) Len() int { return len(k) / 2 }
|
||||
func (k byKey) Less(i, j int) bool { return fmt.Sprintf("%s", k[i*2]) < fmt.Sprintf("%s", k[j*2]) }
|
||||
func (k byKey) Swap(i, j int) {
|
||||
k[i*2], k[j*2] = k[j*2], k[i*2]
|
||||
k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1]
|
||||
}
|
||||
|
||||
func Uniq(labels []interface{}) []interface{} {
|
||||
if len(labels)%2 == 1 {
|
||||
labels = labels[:len(labels)-1]
|
||||
}
|
||||
|
||||
if len(labels) > 2 {
|
||||
sort.Sort(byKey(labels))
|
||||
|
||||
idx := 0
|
||||
for {
|
||||
if labels[idx] == labels[idx+2] {
|
||||
copy(labels[idx:], labels[idx+2:])
|
||||
labels = labels[:len(labels)-2]
|
||||
} else {
|
||||
idx += 2
|
||||
}
|
||||
if idx+2 >= len(labels) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return labels
|
||||
}
|
Reference in New Issue
Block a user