Compare commits

...

10 Commits

Author SHA1 Message Date
e64269b2a8 broker: add BatchBroker interface to avoid breaking older brokers
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-23 12:55:36 +03:00
d18429e024 metadata: add HeaderAuthorization
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-23 12:17:00 +03:00
675e121049 metadata: add default headers
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-23 12:03:18 +03:00
d357fb1e0d WIP: broker batch processing
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-22 22:53:44 +03:00
e4673bcc50 remove old cruft
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-22 15:45:44 +03:00
a839f75a2f util/reflect: add new funcs
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-22 15:45:31 +03:00
a7e6d61b95 meter: fast path for only one label
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-21 14:29:13 +03:00
650d167313 meter: add BuildLabels func that sorts and deletes duplicates
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-21 14:10:20 +03:00
c6ba2a91e6 meter: BuildName func to combine metric name with labels into string
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-21 12:39:59 +03:00
7ece08896f server: use 127.0.0.1:0 if no address provided
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-17 01:57:39 +06:00
13 changed files with 705 additions and 89 deletions

View File

@@ -8,31 +8,83 @@ import (
"github.com/unistack-org/micro/v3/metadata"
)
// DefaultBroker default broker
// DefaultBroker default memory broker
var DefaultBroker Broker = NewBroker()
var (
// ErrNotConnected returns when broker used but not connected yet
ErrNotConnected = errors.New("broker not connected")
// ErrDisconnected returns when broker disconnected
ErrDisconnected = errors.New("broker disconnected")
)
type BatchBroker interface {
Broker
// BatchPublish messages to broker with multiple topics
BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error
// BatchSubscribe subscribes to topic messages via handler
BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error)
}
// Broker is an interface used for asynchronous messaging.
type Broker interface {
// Name returns broker instance name
Name() string
Init(...Option) error
// Init initilize broker
Init(opts ...Option) error
// Options returns broker options
Options() Options
// Address return configured address
Address() string
Connect(context.Context) error
Disconnect(context.Context) error
Publish(context.Context, string, *Message, ...PublishOption) error
Subscribe(context.Context, string, Handler, ...SubscribeOption) (Subscriber, error)
// Connect connects to broker
Connect(ctx context.Context) error
// Disconnect disconnect from broker
Disconnect(ctx context.Context) error
// Publish message to broker topic
Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error
// Subscribe subscribes to topic message via handler
Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
// String type of broker
String() string
}
// Handler is used to process messages via a subscription of a topic.
type Handler func(Event) error
// Events contains multiple events
type Events []Event
func (evs Events) Ack() error {
var err error
for _, ev := range evs {
if err = ev.Ack(); err != nil {
return err
}
}
return nil
}
func (evs Events) SetError(err error) {
for _, ev := range evs {
ev.SetError(err)
}
}
// BatchHandler is used to process messages in batches via a subscription of a topic.
type BatchHandler func(Events) error
// Event is given to a subscription handler for processing
type Event interface {
// Topic returns event topic
Topic() string
// Message returns broker message
Message() *Message
// Ack acknowledge message
Ack() error
// Error returns message error (like decoding errors or some other)
Error() error
// SetError set event processing error
SetError(err error)
}
// RawMessage is a raw encoded JSON value.
@@ -58,13 +110,25 @@ func (m *RawMessage) UnmarshalJSON(data []byte) error {
// Message is used to transfer data
type Message struct {
Header metadata.Metadata // contains message metadata
Body RawMessage // contains message body
// Header contains message metadata
Header metadata.Metadata
// Body contains message body
Body RawMessage
}
// NewMessage create broker message with topic filled
func NewMessage(topic string) *Message {
m := &Message{Header: metadata.New(2)}
m.Header.Set(metadata.HeaderTopic, topic)
return m
}
// Subscriber is a convenience return type for the Subscribe method
type Subscriber interface {
// Options returns subscriber options
Options() SubscribeOptions
// Topic returns topic for subscription
Topic() string
Unsubscribe(context.Context) error
// Unsubscribe from topic
Unsubscribe(ctx context.Context) error
}

View File

@@ -2,20 +2,21 @@ package broker
import (
"context"
"errors"
"sync"
"github.com/google/uuid"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
maddr "github.com/unistack-org/micro/v3/util/addr"
mnet "github.com/unistack-org/micro/v3/util/net"
"github.com/unistack-org/micro/v3/util/rand"
)
type memoryBroker struct {
Subscribers map[string][]*memorySubscriber
addr string
opts Options
subscribers map[string][]*memorySubscriber
batchsubscribers map[string][]*memoryBatchSubscriber
addr string
opts Options
sync.RWMutex
connected bool
}
@@ -36,6 +37,15 @@ type memorySubscriber struct {
opts SubscribeOptions
}
type memoryBatchSubscriber struct {
ctx context.Context
exit chan bool
handler BatchHandler
id string
topic string
opts SubscribeOptions
}
func (m *memoryBroker) Options() Options {
return m.opts
}
@@ -77,7 +87,6 @@ func (m *memoryBroker) Disconnect(ctx context.Context) error {
}
m.connected = false
return nil
}
@@ -88,14 +97,127 @@ func (m *memoryBroker) Init(opts ...Option) error {
return nil
}
func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
m.RLock()
if !m.connected {
m.RUnlock()
return ErrNotConnected
}
m.RUnlock()
type msgWrapper struct {
topic string
body interface{}
}
vs := make([]msgWrapper, 0, len(msgs))
if m.opts.Codec == nil {
m.RLock()
for _, msg := range msgs {
topic, _ := msg.Header.Get(metadata.HeaderTopic)
vs = append(vs, msgWrapper{topic: topic, body: m})
}
m.RUnlock()
} else {
m.RLock()
for _, msg := range msgs {
topic, _ := msg.Header.Get(metadata.HeaderTopic)
buf, err := m.opts.Codec.Marshal(msg)
if err != nil {
m.RUnlock()
return err
}
vs = append(vs, msgWrapper{topic: topic, body: buf})
}
m.RUnlock()
}
if len(m.batchsubscribers) > 0 {
eh := m.opts.BatchErrorHandler
msgTopicMap := make(map[string]Events)
for _, v := range vs {
p := &memoryEvent{
topic: v.topic,
message: v.body,
opts: m.opts,
}
msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p)
}
for t, ms := range msgTopicMap {
m.RLock()
subs, ok := m.batchsubscribers[t]
m.RUnlock()
if !ok {
continue
}
for _, sub := range subs {
if err := sub.handler(ms); err != nil {
ms.SetError(err)
if sub.opts.BatchErrorHandler != nil {
eh = sub.opts.BatchErrorHandler
}
if eh != nil {
eh(ms)
} else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error())
}
} else if sub.opts.AutoAck {
if err := ms.Ack(); err != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
}
}
}
}
}
eh := m.opts.ErrorHandler
for _, v := range vs {
p := &memoryEvent{
topic: v.topic,
message: v.body,
opts: m.opts,
}
m.RLock()
subs, ok := m.subscribers[p.topic]
m.RUnlock()
if !ok {
continue
}
for _, sub := range subs {
if err := sub.handler(p); err != nil {
p.SetError(err)
if sub.opts.ErrorHandler != nil {
eh = sub.opts.ErrorHandler
}
if eh != nil {
eh(p)
} else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error())
}
} else if sub.opts.AutoAck {
if err := p.Ack(); err != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
}
}
}
}
return nil
}
func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error {
m.RLock()
if !m.connected {
m.RUnlock()
return errors.New("not connected")
return ErrNotConnected
}
subs, ok := m.Subscribers[topic]
subs, ok := m.subscribers[topic]
m.RUnlock()
if !ok {
return nil
@@ -138,11 +260,56 @@ func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message,
return nil
}
func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) {
m.RLock()
if !m.connected {
m.RUnlock()
return nil, ErrNotConnected
}
m.RUnlock()
options := NewSubscribeOptions(opts...)
id, err := uuid.NewRandom()
if err != nil {
return nil, err
}
sub := &memoryBatchSubscriber{
exit: make(chan bool, 1),
id: id.String(),
topic: topic,
handler: handler,
opts: options,
ctx: ctx,
}
m.Lock()
m.batchsubscribers[topic] = append(m.batchsubscribers[topic], sub)
m.Unlock()
go func() {
<-sub.exit
m.Lock()
var newSubscribers []*memoryBatchSubscriber
for _, sb := range m.batchsubscribers[topic] {
if sb.id == sub.id {
continue
}
newSubscribers = append(newSubscribers, sb)
}
m.batchsubscribers[topic] = newSubscribers
m.Unlock()
}()
return sub, nil
}
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
m.RLock()
if !m.connected {
m.RUnlock()
return nil, errors.New("not connected")
return nil, ErrNotConnected
}
m.RUnlock()
@@ -163,20 +330,20 @@ func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Hand
}
m.Lock()
m.Subscribers[topic] = append(m.Subscribers[topic], sub)
m.subscribers[topic] = append(m.subscribers[topic], sub)
m.Unlock()
go func() {
<-sub.exit
m.Lock()
var newSubscribers []*memorySubscriber
for _, sb := range m.Subscribers[topic] {
for _, sb := range m.subscribers[topic] {
if sb.id == sub.id {
continue
}
newSubscribers = append(newSubscribers, sb)
}
m.Subscribers[topic] = newSubscribers
m.subscribers[topic] = newSubscribers
m.Unlock()
}()
@@ -221,6 +388,23 @@ func (m *memoryEvent) Error() error {
return m.err
}
func (m *memoryEvent) SetError(err error) {
m.err = err
}
func (m *memoryBatchSubscriber) Options() SubscribeOptions {
return m.opts
}
func (m *memoryBatchSubscriber) Topic() string {
return m.topic
}
func (m *memoryBatchSubscriber) Unsubscribe(ctx context.Context) error {
m.exit <- true
return nil
}
func (m *memorySubscriber) Options() SubscribeOptions {
return m.opts
}
@@ -235,9 +419,10 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error {
}
// NewBroker return new memory broker
func NewBroker(opts ...Option) Broker {
func NewBroker(opts ...Option) BatchBroker {
return &memoryBroker{
opts: NewOptions(opts...),
Subscribers: make(map[string][]*memorySubscriber),
opts: NewOptions(opts...),
subscribers: make(map[string][]*memorySubscriber),
batchsubscribers: make(map[string][]*memoryBatchSubscriber),
}
}

View File

@@ -4,8 +4,55 @@ import (
"context"
"fmt"
"testing"
"github.com/unistack-org/micro/v3/metadata"
)
func TestMemoryBatchBroker(t *testing.T) {
b := NewBroker()
ctx := context.Background()
if err := b.Connect(ctx); err != nil {
t.Fatalf("Unexpected connect error %v", err)
}
topic := "test"
count := 10
fn := func(evts Events) error {
return evts.Ack()
}
sub, err := b.BatchSubscribe(ctx, topic, fn)
if err != nil {
t.Fatalf("Unexpected error subscribing %v", err)
}
msgs := make([]*Message, 0, 0)
for i := 0; i < count; i++ {
message := &Message{
Header: map[string]string{
metadata.HeaderTopic: topic,
"foo": "bar",
"id": fmt.Sprintf("%d", i),
},
Body: []byte(`"hello world"`),
}
msgs = append(msgs, message)
}
if err := b.BatchPublish(ctx, msgs); err != nil {
t.Fatalf("Unexpected error publishing %v", err)
}
if err := sub.Unsubscribe(ctx); err != nil {
t.Fatalf("Unexpected error unsubscribing from %s: %v", topic, err)
}
if err := b.Disconnect(ctx); err != nil {
t.Fatalf("Unexpected connect error %v", err)
}
}
func TestMemoryBroker(t *testing.T) {
b := NewBroker()
ctx := context.Background()
@@ -26,20 +73,27 @@ func TestMemoryBroker(t *testing.T) {
t.Fatalf("Unexpected error subscribing %v", err)
}
msgs := make([]*Message, 0, 0)
for i := 0; i < count; i++ {
message := &Message{
Header: map[string]string{
"foo": "bar",
"id": fmt.Sprintf("%d", i),
metadata.HeaderTopic: topic,
"foo": "bar",
"id": fmt.Sprintf("%d", i),
},
Body: []byte(`"hello world"`),
}
msgs = append(msgs, message)
if err := b.Publish(ctx, topic, message); err != nil {
t.Fatalf("Unexpected error publishing %d", i)
t.Fatalf("Unexpected error publishing %d err: %v", i, err)
}
}
if err := b.BatchPublish(ctx, msgs); err != nil {
t.Fatalf("Unexpected error publishing %v", err)
}
if err := sub.Unsubscribe(ctx); err != nil {
t.Fatalf("Unexpected error unsubscribing from %s: %v", topic, err)
}

View File

@@ -29,6 +29,8 @@ type Options struct {
TLSConfig *tls.Config
// ErrorHandler used when broker can't unmarshal incoming message
ErrorHandler Handler
// BatchErrorHandler used when broker can't unmashal incoming messages
BatchErrorHandler BatchHandler
// Name holds the broker name
Name string
// Addrs holds the broker address
@@ -85,6 +87,8 @@ type SubscribeOptions struct {
Context context.Context
// ErrorHandler used when broker can't unmarshal incoming message
ErrorHandler Handler
// BatchErrorHandler used when broker can't unmashal incoming messages
BatchErrorHandler BatchHandler
// Group holds consumer group
Group string
// AutoAck flag specifies auto ack of incoming message when no error happens
@@ -175,6 +179,14 @@ func ErrorHandler(h Handler) Option {
}
}
// BatchErrorHandler will catch all broker errors that cant be handled
// in normal way, for example Codec errors
func BatchErrorHandler(h BatchHandler) Option {
return func(o *Options) {
o.BatchErrorHandler = h
}
}
// SubscribeErrorHandler will catch all broker errors that cant be handled
// in normal way, for example Codec errors
func SubscribeErrorHandler(h Handler) SubscribeOption {
@@ -183,6 +195,14 @@ func SubscribeErrorHandler(h Handler) SubscribeOption {
}
}
// SubscribeBatchErrorHandler will catch all broker errors that cant be handled
// in normal way, for example Codec errors
func SubscribeBatchErrorHandler(h BatchHandler) SubscribeOption {
return func(o *SubscribeOptions) {
o.BatchErrorHandler = h
}
}
// Queue sets the subscribers queue
// Deprecated
func Queue(name string) SubscribeOption {

View File

@@ -190,8 +190,8 @@ func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOpti
if !ok {
md = metadata.New(0)
}
md["Content-Type"] = p.ContentType()
md["Micro-Topic"] = p.Topic()
md[metadata.HeaderContentType] = p.ContentType()
md[metadata.HeaderTopic] = p.Topic()
// passed in raw data
if d, ok := p.Payload().(*codec.Frame); ok {

View File

@@ -6,8 +6,20 @@ import (
"sort"
)
// HeaderPrefix for all headers passed
var HeaderPrefix = "Micro-"
var (
// HeaderTopic is the header name that contains topic name
HeaderTopic = "Micro-Topic"
// HeaderContentType specifies content type of message
HeaderContentType = "Content-Type"
// HeaderEndpoint specifies endpoint in service
HeaderEndpoint = "Micro-Endpoint"
// HeaderService specifies service
HeaderService = "Micro-Service"
// HeaderTimeout specifies timeout of operation
HeaderTimeout = "Micro-Timeout"
// HeaderAuthorization specifies Authorization header
HeaderAuthorization = "Authorization"
)
// Metadata is our way of representing request headers internally.
// They're used at the RPC level and translate back and forth

View File

@@ -3,8 +3,9 @@ package meter
import (
"io"
"reflect"
"sort"
"strconv"
"strings"
"time"
)
@@ -77,36 +78,62 @@ type Summary interface {
UpdateDuration(time.Time)
}
// sort labels alphabeticaly by label name
type byKey []string
func (k byKey) Len() int { return len(k) / 2 }
func (k byKey) Less(i, j int) bool { return k[i*2] < k[j*2] }
func (k byKey) Swap(i, j int) {
k[i*2], k[i*2+1], k[j*2], k[j*2+1] = k[j*2], k[j*2+1], k[i*2], k[i*2+1]
k[i*2], k[j*2] = k[j*2], k[i*2]
k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1]
}
func Sort(slice *[]string) {
bk := byKey(*slice)
if bk.Len() <= 1 {
return
// BuildLables used to sort labels and delete duplicates.
// Last value wins in case of duplicate label keys.
func BuildLabels(labels ...string) []string {
if len(labels)%2 == 1 {
labels = labels[:len(labels)-1]
}
sort.Sort(bk)
v := reflect.ValueOf(slice).Elem()
cnt := 0
key := 0
val := 1
for key < v.Len() {
if len(bk) > key+2 && bk[key] == bk[key+2] {
key += 2
val += 2
continue
}
v.Index(cnt).Set(v.Index(key))
cnt++
v.Index(cnt).Set(v.Index(val))
cnt++
key += 2
val += 2
}
v.SetLen(cnt)
sort.Sort(byKey(labels))
return labels
}
// BuildName used to combine metric with labels.
// If labels count is odd, drop last element
func BuildName(name string, labels ...string) string {
if len(labels)%2 == 1 {
labels = labels[:len(labels)-1]
}
if len(labels) > 2 {
sort.Sort(byKey(labels))
idx := 0
for {
if labels[idx] == labels[idx+2] {
copy(labels[idx:], labels[idx+2:])
labels = labels[:len(labels)-2]
} else {
idx += 2
}
if idx+2 >= len(labels) {
break
}
}
}
var b strings.Builder
_, _ = b.WriteString(name)
_, _ = b.WriteRune('{')
for idx := 0; idx < len(labels); idx += 2 {
if idx > 0 {
_, _ = b.WriteRune(',')
}
_, _ = b.WriteString(labels[idx])
_, _ = b.WriteString(`=`)
_, _ = b.WriteString(strconv.Quote(labels[idx+1]))
}
_, _ = b.WriteRune('}')
return b.String()
}

View File

@@ -14,11 +14,57 @@ func TestNoopMeter(t *testing.T) {
cnt.Inc()
}
func TestLabelsSort(t *testing.T) {
ls := []string{"server", "http", "register", "mdns", "broker", "broker1", "broker", "broker2", "server", "tcp"}
Sort(&ls)
func testEq(a, b []string) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}
if ls[0] != "broker" || ls[1] != "broker2" {
t.Fatalf("sort error: %v", ls)
func TestBuildLabels(t *testing.T) {
type testData struct {
src []string
dst []string
}
data := []testData{
testData{
src: []string{"zerolabel", "value3", "firstlabel", "value2"},
dst: []string{"firstlabel", "value2", "zerolabel", "value3"},
},
}
for _, d := range data {
if !testEq(d.dst, BuildLabels(d.src...)) {
t.Fatalf("slices not properly sorted: %v %v", d.dst, d.src)
}
}
}
func TestBuildName(t *testing.T) {
data := map[string][]string{
`my_metric{firstlabel="value2",zerolabel="value3"}`: []string{
"my_metric",
"zerolabel", "value3", "firstlabel", "value2",
},
`my_metric{broker="broker2",register="mdns",server="tcp"}`: []string{
"my_metric",
"broker", "broker1", "broker", "broker2", "server", "http", "server", "tcp", "register", "mdns",
},
`my_metric{aaa="aaa"}`: []string{
"my_metric",
"aaa", "aaa",
},
}
for e, d := range data {
if x := BuildName(d[0], d[1:]...); x != e {
t.Fatalf("expect: %s, result: %s", e, x)
}
}
}

View File

@@ -24,9 +24,18 @@ type tunSubscriber struct {
opts broker.SubscribeOptions
}
type tunBatchSubscriber struct {
listener tunnel.Listener
handler broker.BatchHandler
closed chan bool
topic string
opts broker.SubscribeOptions
}
type tunEvent struct {
message *broker.Message
topic string
err error
}
// used to access tunnel from options context
@@ -62,6 +71,36 @@ func (t *tunBroker) Disconnect(ctx context.Context) error {
return t.tunnel.Close(ctx)
}
func (t *tunBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
// TODO: this is probably inefficient, we might want to just maintain an open connection
// it may be easier to add broadcast to the tunnel
topicMap := make(map[string]tunnel.Session)
var err error
for _, msg := range msgs {
topic, _ := msg.Header.Get(metadata.HeaderTopic)
c, ok := topicMap[topic]
if !ok {
c, err := t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast))
if err != nil {
return err
}
defer c.Close()
topicMap[topic] = c
}
if err = c.Send(&transport.Message{
Header: msg.Header,
Body: msg.Body,
}); err != nil {
// msg.SetError(err)
return err
}
}
return nil
}
func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message, opts ...broker.PublishOption) error {
// TODO: this is probably inefficient, we might want to just maintain an open connection
// it may be easier to add broadcast to the tunnel
@@ -77,6 +116,26 @@ func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message
})
}
func (t *tunBroker) BatchSubscribe(ctx context.Context, topic string, h broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast))
if err != nil {
return nil, err
}
tunSub := &tunBatchSubscriber{
topic: topic,
handler: h,
opts: broker.NewSubscribeOptions(opts...),
closed: make(chan bool),
listener: l,
}
// start processing
go tunSub.run()
return tunSub, nil
}
func (t *tunBroker) Subscribe(ctx context.Context, topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast))
if err != nil {
@@ -101,6 +160,49 @@ func (t *tunBroker) String() string {
return "tunnel"
}
func (t *tunBatchSubscriber) run() {
for {
// accept a new connection
c, err := t.listener.Accept()
if err != nil {
select {
case <-t.closed:
return
default:
continue
}
}
// receive message
m := new(transport.Message)
if err := c.Recv(m); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Error(t.opts.Context, err.Error())
}
if err = c.Close(); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Error(t.opts.Context, err.Error())
}
}
continue
}
// close the connection
c.Close()
evts := broker.Events{&tunEvent{
topic: t.topic,
message: &broker.Message{
Header: m.Header,
Body: m.Body,
},
}}
// handle the message
go t.handler(evts)
}
}
func (t *tunSubscriber) run() {
for {
// accept a new connection
@@ -142,6 +244,24 @@ func (t *tunSubscriber) run() {
}
}
func (t *tunBatchSubscriber) Options() broker.SubscribeOptions {
return t.opts
}
func (t *tunBatchSubscriber) Topic() string {
return t.topic
}
func (t *tunBatchSubscriber) Unsubscribe(ctx context.Context) error {
select {
case <-t.closed:
return nil
default:
close(t.closed)
return t.listener.Close()
}
}
func (t *tunSubscriber) Options() broker.SubscribeOptions {
return t.opts
}
@@ -173,7 +293,11 @@ func (t *tunEvent) Ack() error {
}
func (t *tunEvent) Error() error {
return nil
return t.err
}
func (t *tunEvent) SetError(err error) {
t.err = err
}
// NewBroker returns new tunnel broker

View File

@@ -15,8 +15,8 @@ import (
var DefaultServer Server = NewServer()
var (
// DefaultAddress will be used if no address passed
DefaultAddress = ":0"
// DefaultAddress will be used if no address passed, use secure localhost
DefaultAddress = "127.0.0.1:0"
// DefaultName will be used if no name passed
DefaultName = "server"
// DefaultVersion will be used if no version passed

View File

@@ -1,15 +0,0 @@
package fn
type Initer interface {
Init(opts ...interface{}) error
}
func Init(ifaces ...Initer) error {
var err error
for _, iface := range ifaces {
if err = iface.Init(); err != nil {
return err
}
}
return nil
}

View File

@@ -7,7 +7,6 @@ import (
"reflect"
"regexp"
"strings"
"time"
)
// ErrInvalidParam specifies invalid url query params
@@ -15,11 +14,12 @@ var ErrInvalidParam = errors.New("invalid url query param provided")
var bracketSplitter = regexp.MustCompile(`\[|\]`)
var timeKind = reflect.TypeOf(time.Time{}).Kind()
//var timeKind = reflect.ValueOf(time.Time{}).Kind()
type StructField struct {
Field reflect.StructField
Value reflect.Value
Path string
}
func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, error) {
@@ -35,7 +35,7 @@ func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, e
for idx := 0; idx < typ.NumField(); idx++ {
fld := typ.Field(idx)
val := sv.Field(idx)
if !val.CanSet() || len(fld.PkgPath) != 0 {
if len(fld.PkgPath) != 0 {
continue
}
@@ -66,6 +66,17 @@ func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, e
return nil, ErrNotFound
}
func StructFieldByPath(src interface{}, path string) (interface{}, error) {
var err error
for _, p := range strings.Split(path, ".") {
src, err = StructFieldByName(src, p)
if err != nil {
return nil, err
}
}
return src, err
}
func StructFieldByName(src interface{}, tkey string) (interface{}, error) {
sv := reflect.ValueOf(src)
if sv.Kind() == reflect.Ptr {
@@ -79,7 +90,7 @@ func StructFieldByName(src interface{}, tkey string) (interface{}, error) {
for idx := 0; idx < typ.NumField(); idx++ {
fld := typ.Field(idx)
val := sv.Field(idx)
if !val.CanSet() || len(fld.PkgPath) != 0 {
if len(fld.PkgPath) != 0 {
continue
}
if fld.Name == tkey {
@@ -105,6 +116,19 @@ func StructFieldByName(src interface{}, tkey string) (interface{}, error) {
return nil, ErrNotFound
}
// StructFieldsMap returns map[string]interface{} or error
func StructFieldsMap(src interface{}) (map[string]interface{}, error) {
fields, err := StructFields(src)
if err != nil {
return nil, err
}
mp := make(map[string]interface{}, len(fields))
for _, field := range fields {
mp[field.Path] = field.Value.Interface()
}
return mp, nil
}
// StructFields returns slice of struct fields
func StructFields(src interface{}) ([]StructField, error) {
var fields []StructField
@@ -116,25 +140,29 @@ func StructFields(src interface{}) ([]StructField, error) {
if sv.Kind() != reflect.Struct {
return nil, ErrInvalidStruct
}
typ := sv.Type()
for idx := 0; idx < typ.NumField(); idx++ {
fld := typ.Field(idx)
val := sv.Field(idx)
if !val.CanSet() || len(fld.PkgPath) != 0 {
if !val.IsValid() || len(fld.PkgPath) != 0 {
continue
}
switch val.Kind() {
case timeKind:
fields = append(fields, StructField{Field: fld, Value: val})
//case timeKind:
//fmt.Printf("GGG\n")
//fields = append(fields, StructField{Field: fld, Value: val, Path: fld.Name})
case reflect.Struct:
infields, err := StructFields(val.Interface())
if err != nil {
return nil, err
}
fields = append(fields, infields...)
for _, infield := range infields {
infield.Path = fmt.Sprintf("%s.%s", fld.Name, infield.Path)
fields = append(fields, infield)
}
default:
fields = append(fields, StructField{Field: fld, Value: val})
fields = append(fields, StructField{Field: fld, Value: val, Path: fld.Name})
}
}
@@ -194,7 +222,7 @@ func StructURLValues(src interface{}, pref string, tags []string) (url.Values, e
for idx := 0; idx < typ.NumField(); idx++ {
fld := typ.Field(idx)
val := sv.Field(idx)
if !val.CanSet() || len(fld.PkgPath) != 0 || !val.IsValid() {
if len(fld.PkgPath) != 0 || !val.IsValid() {
continue
}

View File

@@ -2,9 +2,80 @@ package reflect
import (
"net/url"
"reflect"
rfl "reflect"
"testing"
)
func TestStructFieldsMap(t *testing.T) {
type NestedStr struct {
BBB string
CCC int
}
type Str struct {
Name []string `json:"name" codec:"flatten"`
XXX string `json:"xxx"`
Nested NestedStr
}
val := &Str{Name: []string{"first", "second"}, XXX: "ttt", Nested: NestedStr{BBB: "ddd", CCC: 9}}
fields, err := StructFieldsMap(val)
if err != nil {
t.Fatal(err)
}
if v, ok := fields["Nested.BBB"]; !ok || v != "ddd" {
t.Fatalf("invalid field from %v", fields)
}
}
func TestStructFields(t *testing.T) {
type NestedStr struct {
BBB string
CCC int
}
type Str struct {
Name []string `json:"name" codec:"flatten"`
XXX string `json:"xxx"`
Nested NestedStr
}
val := &Str{Name: []string{"first", "second"}, XXX: "ttt", Nested: NestedStr{BBB: "ddd", CCC: 9}}
fields, err := StructFields(val)
if err != nil {
t.Fatal(err)
}
var ok bool
for _, field := range fields {
if field.Path == "Nested.CCC" {
ok = true
}
}
if !ok {
t.Fatalf("struct fields returns invalid path: %v", fields)
}
}
func TestStructByPath(t *testing.T) {
type NestedStr struct {
BBB string
CCC int
}
type Str struct {
Name []string `json:"name" codec:"flatten"`
XXX string `json:"xxx"`
Nested NestedStr
}
val := &Str{Name: []string{"first", "second"}, XXX: "ttt", Nested: NestedStr{BBB: "ddd", CCC: 9}}
field, err := StructFieldByPath(val, "Nested.CCC")
if err != nil {
t.Fatal(err)
}
if rfl.Indirect(reflect.ValueOf(field)).Int() != 9 {
t.Fatalf("invalid elem returned: %v", field)
}
}
func TestStructByTag(t *testing.T) {
type Str struct {
Name []string `json:"name" codec:"flatten"`