From c9066e04553cd3acd0ce60dfc64657d1591c3be8 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 29 Jan 2025 01:47:58 +0300 Subject: [PATCH] intermediate Signed-off-by: Vasiliy Tolstov --- broker/broker.go | 95 ++----- broker/context.go | 10 - broker/context_test.go | 11 - broker/memory/memory.go | 284 ++++++-------------- broker/memory/memory_test.go | 60 +---- broker/noop.go | 108 +++++--- broker/noop_test.go | 4 +- broker/options.go | 96 ++----- {server => broker}/subscriber.go | 2 + changes | 154 +++++++++++ client/client.go | 30 +-- client/context.go | 10 - client/context_test.go | 11 - client/noop.go | 142 +--------- client/noop_test.go | 35 --- client/options.go | 98 ------- codec/frame.proto | 2 +- codec/noop.go | 2 +- event.go | 27 -- go.mod | 14 +- go.sum | 67 ++--- metadata/metadata.go | 11 +- metadata/metadata_grpc.go | 282 ------------------- network/network.go | 55 ---- network/options.go | 135 ---------- network/transport/context.go | 34 --- network/transport/memory.go | 258 ------------------ network/transport/memory_test.go | 100 ------- network/transport/options.go | 175 ------------ network/transport/transport.go | 63 ----- network/tunnel/broker/broker.go | 372 -------------------------- network/tunnel/options.go | 192 ------------- network/tunnel/transport/listener.go | 30 --- network/tunnel/transport/transport.go | 113 -------- network/tunnel/tunnel.go | 106 -------- proxy/options.go | 98 ------- proxy/proxy.go | 21 -- server/noop.go | 369 +------------------------ server/noop_test.go | 124 --------- server/server.go | 58 ++-- server/wrapper.go | 8 - service.go | 5 - service_test.go | 36 --- util/socket/pool.go | 65 ----- util/socket/socket.go | 118 -------- 45 files changed, 421 insertions(+), 3669 deletions(-) rename {server => broker}/subscriber.go (99%) create mode 100644 changes delete mode 100644 client/noop_test.go delete mode 100644 event.go delete mode 100644 metadata/metadata_grpc.go delete mode 100644 network/network.go delete mode 100644 network/options.go delete mode 100644 network/transport/context.go delete mode 100644 network/transport/memory.go delete mode 100644 network/transport/memory_test.go delete mode 100644 network/transport/options.go delete mode 100644 network/transport/transport.go delete mode 100644 network/tunnel/broker/broker.go delete mode 100644 network/tunnel/options.go delete mode 100644 network/tunnel/transport/listener.go delete mode 100644 network/tunnel/transport/transport.go delete mode 100644 network/tunnel/tunnel.go delete mode 100644 proxy/options.go delete mode 100644 proxy/proxy.go delete mode 100644 server/noop_test.go delete mode 100644 util/socket/pool.go delete mode 100644 util/socket/socket.go diff --git a/broker/broker.go b/broker/broker.go index 0dfea1fb..369a5352 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -18,6 +18,8 @@ var ( ErrNotConnected = errors.New("broker not connected") // ErrDisconnected returns when broker disconnected ErrDisconnected = errors.New("broker disconnected") + // ErrInvalidMessage returns when invalid Message passed + ErrInvalidMessage = errors.New("invalid message") // DefaultGracefulTimeout DefaultGracefulTimeout = 5 * time.Second ) @@ -36,14 +38,12 @@ type Broker interface { Connect(ctx context.Context) error // Disconnect disconnect from broker Disconnect(ctx context.Context) error + // NewMessage create new broker message to publish. + NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...PublishOption) (Message, error) // Publish message to broker topic - Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error + Publish(ctx context.Context, topic string, messages ...Message) error // Subscribe subscribes to topic message via handler - Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error) - // BatchPublish messages to broker with multiple topics - BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error - // BatchSubscribe subscribes to topic messages via handler - BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error) + Subscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error) // String type of broker String() string // Live returns broker liveness @@ -55,72 +55,27 @@ type Broker interface { } type ( - FuncPublish func(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error - HookPublish func(next FuncPublish) FuncPublish - FuncBatchPublish func(ctx context.Context, msgs []*Message, opts ...PublishOption) error - HookBatchPublish func(next FuncBatchPublish) FuncBatchPublish - FuncSubscribe func(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error) - HookSubscribe func(next FuncSubscribe) FuncSubscribe - FuncBatchSubscribe func(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error) - HookBatchSubscribe func(next FuncBatchSubscribe) FuncBatchSubscribe + FuncPublish func(ctx context.Context, topic string, messages ...Message) error + HookPublish func(next FuncPublish) FuncPublish + FuncSubscribe func(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error) + HookSubscribe func(next FuncSubscribe) FuncSubscribe ) -// Handler is used to process messages via a subscription of a topic. -type Handler func(Event) error - -// Events contains multiple events -type Events []Event - -// Ack try to ack all events and return -func (evs Events) Ack() error { - var err error - for _, ev := range evs { - if err = ev.Ack(); err != nil { - return err - } - } - return nil -} - -// SetError sets error on event -func (evs Events) SetError(err error) { - for _, ev := range evs { - ev.SetError(err) - } -} - -// BatchHandler is used to process messages in batches via a subscription of a topic. -type BatchHandler func(Events) error - -// Event is given to a subscription handler for processing -type Event interface { - // Context return context.Context for event +// Message is given to a subscription handler for processing +type Message interface { + // Context for the message. Context() context.Context - // Topic returns event topic + // Topic returns message destination topic. Topic() string - // Message returns broker message - Message() *Message - // Ack acknowledge message + // Header returns message headers. + Header() metadata.Metadata + // Body returns broker message []byte slice + Body() []byte + // Unmarshal try to decode message body to dst. + // This is helper method that uses codec.Unmarshal. + Unmarshal(dst interface{}, opts ...codec.Option) error + // Ack acknowledge message if supported. Ack() error - // Error returns message error (like decoding errors or some other) - Error() error - // SetError set event processing error - SetError(err error) -} - -// Message is used to transfer data -type Message struct { - // Header contains message metadata - Header metadata.Metadata - // Body contains message body - Body codec.RawMessage -} - -// NewMessage create broker message with topic filled -func NewMessage(topic string) *Message { - m := &Message{Header: metadata.New(2)} - m.Header.Set(metadata.HeaderTopic, topic) - return m } // Subscriber is a convenience return type for the Subscribe method @@ -132,3 +87,9 @@ type Subscriber interface { // Unsubscribe from topic Unsubscribe(ctx context.Context) error } + +// MessageHandler func signature for single message processing +type MessageHandler func(Message) error + +// MessagesHandler func signature for batch message processing +type MessagesHandler func([]Message) error diff --git a/broker/context.go b/broker/context.go index 71d3e90a..6115511c 100644 --- a/broker/context.go +++ b/broker/context.go @@ -51,13 +51,3 @@ func SetOption(k, v interface{}) Option { o.Context = context.WithValue(o.Context, k, v) } } - -// SetPublishOption returns a function to setup a context with given value -func SetPublishOption(k, v interface{}) PublishOption { - return func(o *PublishOptions) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, k, v) - } -} diff --git a/broker/context_test.go b/broker/context_test.go index cd4feb5e..3860e93b 100644 --- a/broker/context_test.go +++ b/broker/context_test.go @@ -49,17 +49,6 @@ func TestSetSubscribeOption(t *testing.T) { } } -func TestSetPublishOption(t *testing.T) { - type key struct{} - o := SetPublishOption(key{}, "test") - opts := &PublishOptions{} - o(opts) - - if v, ok := opts.Context.Value(key{}).(string); !ok || v == "" { - t.Fatal("SetPublishOption not works") - } -} - func TestSetOption(t *testing.T) { type key struct{} o := SetOption(key{}, "test") diff --git a/broker/memory/memory.go b/broker/memory/memory.go index 3030729d..1ca50e87 100644 --- a/broker/memory/memory.go +++ b/broker/memory/memory.go @@ -5,6 +5,7 @@ import ( "sync" "go.unistack.org/micro/v4/broker" + "go.unistack.org/micro/v4/codec" "go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/metadata" "go.unistack.org/micro/v4/options" @@ -14,44 +15,43 @@ import ( "go.unistack.org/micro/v4/util/rand" ) -type memoryBroker struct { - funcPublish broker.FuncPublish - funcBatchPublish broker.FuncBatchPublish - funcSubscribe broker.FuncSubscribe - funcBatchSubscribe broker.FuncBatchSubscribe - subscribers map[string][]*memorySubscriber - addr string - opts broker.Options +type Broker struct { + funcPublish broker.FuncPublish + funcSubscribe broker.FuncSubscribe + subscribers map[string][]*Subscriber + addr string + opts broker.Options sync.RWMutex connected bool } -type memoryEvent struct { - err error - message interface{} +type memoryMessage struct { + c codec.Codec + topic string + ctx context.Context + body []byte + hdr metadata.Metadata + opts broker.PublishOptions +} + +type Subscriber struct { + ctx context.Context + exit chan bool + handler interface{} + id string topic string - opts broker.Options + opts broker.SubscribeOptions } -type memorySubscriber struct { - ctx context.Context - exit chan bool - handler broker.Handler - batchhandler broker.BatchHandler - id string - topic string - opts broker.SubscribeOptions -} - -func (m *memoryBroker) Options() broker.Options { +func (m *Broker) Options() broker.Options { return m.opts } -func (m *memoryBroker) Address() string { +func (m *Broker) Address() string { return m.addr } -func (m *memoryBroker) Connect(ctx context.Context) error { +func (m *Broker) Connect(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() @@ -81,7 +81,7 @@ func (m *memoryBroker) Connect(ctx context.Context) error { return nil } -func (m *memoryBroker) Disconnect(ctx context.Context) error { +func (m *Broker) Disconnect(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() @@ -99,50 +99,35 @@ func (m *memoryBroker) Disconnect(ctx context.Context) error { return nil } -func (m *memoryBroker) Init(opts ...broker.Option) error { +func (m *Broker) Init(opts ...broker.Option) error { for _, o := range opts { o(&m.opts) } m.funcPublish = m.fnPublish - m.funcBatchPublish = m.fnBatchPublish m.funcSubscribe = m.fnSubscribe - m.funcBatchSubscribe = m.fnBatchSubscribe m.opts.Hooks.EachPrev(func(hook options.Hook) { switch h := hook.(type) { case broker.HookPublish: m.funcPublish = h(m.funcPublish) - case broker.HookBatchPublish: - m.funcBatchPublish = h(m.funcBatchPublish) case broker.HookSubscribe: m.funcSubscribe = h(m.funcSubscribe) - case broker.HookBatchSubscribe: - m.funcBatchSubscribe = h(m.funcBatchSubscribe) } }) return nil } -func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error { - return m.funcPublish(ctx, topic, msg, opts...) +func (m *Broker) Publish(ctx context.Context, topic string, messages ...broker.Message) error { + return m.funcPublish(ctx, topic, messages...) } -func (m *memoryBroker) fnPublish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error { - msg.Header.Set(metadata.HeaderTopic, topic) - return m.publish(ctx, []*broker.Message{msg}, opts...) +func (m *Broker) fnPublish(ctx context.Context, topic string, messages ...broker.Message) error { + return m.publish(ctx, topic, messages...) } -func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { - return m.funcBatchPublish(ctx, msgs, opts...) -} - -func (m *memoryBroker) fnBatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { - return m.publish(ctx, msgs, opts...) -} - -func (m *memoryBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { +func (m *Broker) publish(ctx context.Context, topic string, messages ...broker.Message) error { m.RLock() if !m.connected { m.RUnlock() @@ -150,79 +135,41 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*broker.Message, opts } m.RUnlock() - var err error - select { case <-ctx.Done(): return ctx.Err() default: - options := broker.NewPublishOptions(opts...) + } - msgTopicMap := make(map[string]broker.Events) - for _, v := range msgs { - p := &memoryEvent{opts: m.opts} + m.RLock() + subs, ok := m.subscribers[topic] + m.RUnlock() + if !ok { + return nil + } - if m.opts.Codec == nil || options.BodyOnly { - p.topic, _ = v.Header.Get(metadata.HeaderTopic) - p.message = v.Body - } else { - p.topic, _ = v.Header.Get(metadata.HeaderTopic) - p.message, err = m.opts.Codec.Marshal(v) + var err error + + for _, sub := range subs { + switch s := sub.handler.(type) { + case broker.MessageHandler: + for _, message := range messages { + if err = s(message); err == nil && sub.opts.AutoAck { + err = message.Ack() + } if err != nil { - return err - } - } - msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p) - } - - beh := m.opts.BatchErrorHandler - eh := m.opts.ErrorHandler - - for t, ms := range msgTopicMap { - m.RLock() - subs, ok := m.subscribers[t] - m.RUnlock() - if !ok { - continue - } - - for _, sub := range subs { - if sub.opts.BatchErrorHandler != nil { - beh = sub.opts.BatchErrorHandler - } - if sub.opts.ErrorHandler != nil { - eh = sub.opts.ErrorHandler - } - - switch { - // batch processing - case sub.batchhandler != nil: - if err = sub.batchhandler(ms); err != nil { - ms.SetError(err) - if beh != nil { - _ = beh(ms) - } else if m.opts.Logger.V(logger.ErrorLevel) { - m.opts.Logger.Error(m.opts.Context, err.Error()) - } - } else if sub.opts.AutoAck { - if err = ms.Ack(); err != nil { - m.opts.Logger.Error(m.opts.Context, "broker ack error", err) - } + if m.opts.Logger.V(logger.ErrorLevel) { + m.opts.Logger.Error(m.opts.Context, "broker handler error", err) } - // single processing - case sub.handler != nil: - for _, p := range ms { - if err = sub.handler(p); err != nil { - p.SetError(err) - if eh != nil { - _ = eh(p) - } else if m.opts.Logger.V(logger.ErrorLevel) { - m.opts.Logger.Error(m.opts.Context, "broker handler error", err) - } - } else if sub.opts.AutoAck { - if err = p.Ack(); err != nil { - m.opts.Logger.Error(m.opts.Context, "broker ack error", err) - } + } + } + case broker.MessagesHandler: + if err = s(messages); err == nil && sub.opts.AutoAck { + for _, message := range messages { + err = message.Ack() + if err != nil { + if m.opts.Logger.V(logger.ErrorLevel) { + m.opts.Logger.Error(m.opts.Context, "broker handler error", err) } } } @@ -233,60 +180,11 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*broker.Message, opts return nil } -func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { - return m.funcBatchSubscribe(ctx, topic, handler, opts...) -} - -func (m *memoryBroker) fnBatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { - m.RLock() - if !m.connected { - m.RUnlock() - return nil, broker.ErrNotConnected - } - m.RUnlock() - - sid, err := id.New() - if err != nil { - return nil, err - } - - options := broker.NewSubscribeOptions(opts...) - - sub := &memorySubscriber{ - exit: make(chan bool, 1), - id: sid, - topic: topic, - batchhandler: handler, - opts: options, - ctx: ctx, - } - - m.Lock() - m.subscribers[topic] = append(m.subscribers[topic], sub) - m.Unlock() - - go func() { - <-sub.exit - m.Lock() - newSubscribers := make([]*memorySubscriber, 0, len(m.subscribers)-1) - for _, sb := range m.subscribers[topic] { - if sb.id == sub.id { - continue - } - newSubscribers = append(newSubscribers, sb) - } - m.subscribers[topic] = newSubscribers - m.Unlock() - }() - - return sub, nil -} - -func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { +func (m *Broker) Subscribe(ctx context.Context, topic string, handler interface{}, opts ...broker.SubscribeOption) (broker.Subscriber, error) { return m.funcSubscribe(ctx, topic, handler, opts...) } -func (m *memoryBroker) fnSubscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { +func (m *Broker) fnSubscribe(ctx context.Context, topic string, handler interface{}, opts ...broker.SubscribeOption) (broker.Subscriber, error) { m.RLock() if !m.connected { m.RUnlock() @@ -301,7 +199,7 @@ func (m *memoryBroker) fnSubscribe(ctx context.Context, topic string, handler br options := broker.NewSubscribeOptions(opts...) - sub := &memorySubscriber{ + sub := &Subscriber{ exit: make(chan bool, 1), id: sid, topic: topic, @@ -317,7 +215,7 @@ func (m *memoryBroker) fnSubscribe(ctx context.Context, topic string, handler br go func() { <-sub.exit m.Lock() - newSubscribers := make([]*memorySubscriber, 0, len(m.subscribers)-1) + newSubscribers := make([]*Subscriber, 0, len(m.subscribers)-1) for _, sb := range m.subscribers[topic] { if sb.id == sub.id { continue @@ -331,81 +229,59 @@ func (m *memoryBroker) fnSubscribe(ctx context.Context, topic string, handler br return sub, nil } -func (m *memoryBroker) String() string { +func (m *Broker) String() string { return "memory" } -func (m *memoryBroker) Name() string { +func (m *Broker) Name() string { return m.opts.Name } -func (m *memoryBroker) Live() bool { +func (m *Broker) Live() bool { return true } -func (m *memoryBroker) Ready() bool { +func (m *Broker) Ready() bool { return true } -func (m *memoryBroker) Health() bool { +func (m *Broker) Health() bool { return true } -func (m *memoryEvent) Topic() string { +func (m *memoryMessage) Topic() string { return m.topic } -func (m *memoryEvent) Message() *broker.Message { - switch v := m.message.(type) { - case *broker.Message: - return v - case []byte: - msg := &broker.Message{} - if err := m.opts.Codec.Unmarshal(v, msg); err != nil { - if m.opts.Logger.V(logger.ErrorLevel) { - m.opts.Logger.Error(m.opts.Context, "[memory]: failed to unmarshal: %v", err) - } - return nil - } - return msg - } +func (m *memoryMessage) Body() []byte { + return m.body +} +func (m *memoryMessage) Ack() error { return nil } -func (m *memoryEvent) Ack() error { - return nil +func (m *memoryMessage) Context() context.Context { + return m.ctx } -func (m *memoryEvent) Error() error { - return m.err -} - -func (m *memoryEvent) SetError(err error) { - m.err = err -} - -func (m *memoryEvent) Context() context.Context { - return m.opts.Context -} - -func (m *memorySubscriber) Options() broker.SubscribeOptions { +func (m *Subscriber) Options() broker.SubscribeOptions { return m.opts } -func (m *memorySubscriber) Topic() string { +func (m *Subscriber) Topic() string { return m.topic } -func (m *memorySubscriber) Unsubscribe(ctx context.Context) error { +func (m *Subscriber) Unsubscribe(ctx context.Context) error { m.exit <- true return nil } // NewBroker return new memory broker func NewBroker(opts ...broker.Option) broker.Broker { - return &memoryBroker{ + return &Broker{ opts: broker.NewOptions(opts...), - subscribers: make(map[string][]*memorySubscriber), + subscribers: make(map[string][]*Subscriber), } } diff --git a/broker/memory/memory_test.go b/broker/memory/memory_test.go index 737d1b98..936c3d7d 100644 --- a/broker/memory/memory_test.go +++ b/broker/memory/memory_test.go @@ -9,56 +9,6 @@ import ( "go.unistack.org/micro/v4/metadata" ) -func TestMemoryBatchBroker(t *testing.T) { - b := NewBroker() - ctx := context.Background() - - if err := b.Init(); err != nil { - t.Fatalf("Unexpected init error %v", err) - } - - if err := b.Connect(ctx); err != nil { - t.Fatalf("Unexpected connect error %v", err) - } - - topic := "test" - count := 10 - - fn := func(evts broker.Events) error { - return evts.Ack() - } - - sub, err := b.BatchSubscribe(ctx, topic, fn) - if err != nil { - t.Fatalf("Unexpected error subscribing %v", err) - } - - msgs := make([]*broker.Message, 0, count) - for i := 0; i < count; i++ { - message := &broker.Message{ - Header: map[string]string{ - metadata.HeaderTopic: topic, - "foo": "bar", - "id": fmt.Sprintf("%d", i), - }, - Body: []byte(`"hello world"`), - } - msgs = append(msgs, message) - } - - if err := b.BatchPublish(ctx, msgs); err != nil { - t.Fatalf("Unexpected error publishing %v", err) - } - - if err := sub.Unsubscribe(ctx); err != nil { - t.Fatalf("Unexpected error unsubscribing from %s: %v", topic, err) - } - - if err := b.Disconnect(ctx); err != nil { - t.Fatalf("Unexpected connect error %v", err) - } -} - func TestMemoryBroker(t *testing.T) { b := NewBroker() ctx := context.Background() @@ -74,7 +24,7 @@ func TestMemoryBroker(t *testing.T) { topic := "test" count := 10 - fn := func(_ broker.Event) error { + fn := func(_ broker.Message) error { return nil } @@ -85,13 +35,13 @@ func TestMemoryBroker(t *testing.T) { msgs := make([]*broker.Message, 0, count) for i := 0; i < count; i++ { - message := &broker.Message{ + message, err := b.NewMessage(ctx, metadata.Pairs() Header: map[string]string{ metadata.HeaderTopic: topic, "foo": "bar", "id": fmt.Sprintf("%d", i), }, - Body: []byte(`"hello world"`), + []byte(`"hello world"`), } msgs = append(msgs, message) @@ -100,10 +50,6 @@ func TestMemoryBroker(t *testing.T) { } } - if err := b.BatchPublish(ctx, msgs); err != nil { - t.Fatalf("Unexpected error publishing %v", err) - } - if err := sub.Unsubscribe(ctx); err != nil { t.Fatalf("Unexpected error unsubscribing from %s: %v", topic, err) } diff --git a/broker/noop.go b/broker/noop.go index 625ababc..c3e84e18 100644 --- a/broker/noop.go +++ b/broker/noop.go @@ -3,24 +3,37 @@ package broker import ( "context" "strings" + "sync" + "go.unistack.org/micro/v4/codec" + "go.unistack.org/micro/v4/metadata" "go.unistack.org/micro/v4/options" ) type NoopBroker struct { - funcPublish FuncPublish - funcBatchPublish FuncBatchPublish - funcSubscribe FuncSubscribe - funcBatchSubscribe FuncBatchSubscribe - opts Options + funcPublish FuncPublish + funcSubscribe FuncSubscribe + opts Options + sync.RWMutex +} + +func (b *NoopBroker) newCodec(ct string) (codec.Codec, error) { + if idx := strings.IndexRune(ct, ';'); idx >= 0 { + ct = ct[:idx] + } + b.RLock() + c, ok := b.opts.Codecs[ct] + b.RUnlock() + if ok { + return c, nil + } + return nil, codec.ErrUnknownContentType } func NewBroker(opts ...Option) *NoopBroker { b := &NoopBroker{opts: NewOptions(opts...)} b.funcPublish = b.fnPublish - b.funcBatchPublish = b.fnBatchPublish b.funcSubscribe = b.fnSubscribe - b.funcBatchSubscribe = b.fnBatchSubscribe return b } @@ -55,20 +68,14 @@ func (b *NoopBroker) Init(opts ...Option) error { } b.funcPublish = b.fnPublish - b.funcBatchPublish = b.fnBatchPublish b.funcSubscribe = b.fnSubscribe - b.funcBatchSubscribe = b.fnBatchSubscribe b.opts.Hooks.EachPrev(func(hook options.Hook) { switch h := hook.(type) { case HookPublish: b.funcPublish = h(b.funcPublish) - case HookBatchPublish: - b.funcBatchPublish = h(b.funcBatchPublish) case HookSubscribe: b.funcSubscribe = h(b.funcSubscribe) - case HookBatchSubscribe: - b.funcBatchSubscribe = h(b.funcBatchSubscribe) } }) @@ -87,43 +94,76 @@ func (b *NoopBroker) Address() string { return strings.Join(b.opts.Addrs, ",") } -func (b *NoopBroker) fnBatchPublish(_ context.Context, _ []*Message, _ ...PublishOption) error { +type noopMessage struct { + c codec.Codec + ctx context.Context + body []byte + hdr metadata.Metadata + opts PublishOptions +} + +func (m *noopMessage) Ack() error { return nil } -func (b *NoopBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error { - return b.funcBatchPublish(ctx, msgs, opts...) +func (m *noopMessage) Body() []byte { + return m.body } -func (b *NoopBroker) fnPublish(_ context.Context, _ string, _ *Message, _ ...PublishOption) error { +func (m *noopMessage) Header() metadata.Metadata { + return m.hdr +} + +func (m *noopMessage) Context() context.Context { + return m.ctx +} + +func (m *noopMessage) Error() error { return nil } -func (b *NoopBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error { - return b.funcPublish(ctx, topic, msg, opts...) +func (m *noopMessage) Topic() string { + return "" +} + +func (m *noopMessage) Unmarshal(dst interface{}, opts ...codec.Option) error { + return m.c.Unmarshal(m.body, dst) +} + +func (b *NoopBroker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...PublishOption) (Message, error) { + options := NewPublishOptions(opts...) + m := &noopMessage{ctx: ctx, hdr: hdr, opts: options} + c, err := b.newCodec(m.opts.ContentType) + if err == nil { + m.body, err = c.Marshal(body) + } + if err != nil { + return nil, err + } + + return m, nil +} + +func (b *NoopBroker) fnPublish(_ context.Context, _ string, _ ...Message) error { + return nil +} + +func (b *NoopBroker) Publish(ctx context.Context, topic string, msg ...Message) error { + return b.funcPublish(ctx, topic, msg...) } type NoopSubscriber struct { - ctx context.Context - topic string - handler Handler - batchHandler BatchHandler - opts SubscribeOptions + ctx context.Context + topic string + handler interface{} + opts SubscribeOptions } -func (b *NoopBroker) fnBatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) { - return &NoopSubscriber{ctx: ctx, topic: topic, opts: NewSubscribeOptions(opts...), batchHandler: handler}, nil -} - -func (b *NoopBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) { - return b.funcBatchSubscribe(ctx, topic, handler, opts...) -} - -func (b *NoopBroker) fnSubscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { +func (b *NoopBroker) fnSubscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error) { return &NoopSubscriber{ctx: ctx, topic: topic, opts: NewSubscribeOptions(opts...), handler: handler}, nil } -func (b *NoopBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { +func (b *NoopBroker) Subscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error) { return b.funcSubscribe(ctx, topic, handler, opts...) } diff --git a/broker/noop_test.go b/broker/noop_test.go index 824b4b00..8350b118 100644 --- a/broker/noop_test.go +++ b/broker/noop_test.go @@ -10,9 +10,9 @@ type testHook struct { } func (t *testHook) Publish1(fn FuncPublish) FuncPublish { - return func(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error { + return func(ctx context.Context, topic string, messages ...Message) error { t.f = true - return fn(ctx, topic, msg, opts...) + return fn(ctx, topic, messages...) } } diff --git a/broker/options.go b/broker/options.go index 717fb10a..a5894794 100644 --- a/broker/options.go +++ b/broker/options.go @@ -23,8 +23,8 @@ type Options struct { Tracer tracer.Tracer // Register can be used for clustering Register register.Register - // Codec holds the codec for marshal/unmarshal - Codec codec.Codec + // Codecs holds the codecs for marshal/unmarshal based on content-type + Codecs map[string]codec.Codec // Logger used for logging Logger logger.Logger // Meter used for metrics @@ -37,11 +37,6 @@ type Options struct { // TLSConfig holds tls.TLSConfig options TLSConfig *tls.Config - // ErrorHandler used when broker can't unmarshal incoming message - ErrorHandler Handler - // BatchErrorHandler used when broker can't unmashal incoming messages - BatchErrorHandler BatchHandler - // Addrs holds the broker address Addrs []string // Hooks can be run before broker Publish/BatchPublish and @@ -59,10 +54,11 @@ func NewOptions(opts ...Option) Options { Logger: logger.DefaultLogger, Context: context.Background(), Meter: meter.DefaultMeter, - Codec: codec.DefaultCodec, + Codecs: make(map[string]codec.Codec), Tracer: tracer.DefaultTracer, GracefulTimeout: DefaultGracefulTimeout, } + for _, o := range opts { o(&options) } @@ -78,17 +74,16 @@ func Context(ctx context.Context) Option { // PublishOptions struct type PublishOptions struct { - // Context holds external options - Context context.Context - // BodyOnly flag says the message contains raw body bytes + // ContentType for message body + ContentType string + // BodyOnly flag says the message contains raw body bytes and don't need + // codec Marshal method BodyOnly bool } // NewPublishOptions creates PublishOptions struct func NewPublishOptions(opts ...PublishOption) PublishOptions { - options := PublishOptions{ - Context: context.Background(), - } + options := PublishOptions{} for _, o := range opts { o(&options) } @@ -99,10 +94,6 @@ func NewPublishOptions(opts ...PublishOption) PublishOptions { type SubscribeOptions struct { // Context holds external options Context context.Context - // ErrorHandler used when broker can't unmarshal incoming message - ErrorHandler Handler - // BatchErrorHandler used when broker can't unmashal incoming messages - BatchErrorHandler BatchHandler // Group holds consumer group Group string // AutoAck flag specifies auto ack of incoming message when no error happens @@ -121,6 +112,13 @@ type Option func(*Options) // PublishOption func type PublishOption func(*PublishOptions) +// PublishContentType sets message content-type that used to Marshal +func PublishContentType(ct string) PublishOption { + return func(o *PublishOptions) { + o.ContentType = ct + } +} + // PublishBodyOnly publish only body of the message func PublishBodyOnly(b bool) PublishOption { return func(o *PublishOptions) { @@ -128,13 +126,6 @@ func PublishBodyOnly(b bool) PublishOption { } } -// PublishContext sets the context -func PublishContext(ctx context.Context) PublishOption { - return func(o *PublishOptions) { - o.Context = ctx - } -} - // Addrs sets the host addresses to be used by the broker func Addrs(addrs ...string) Option { return func(o *Options) { @@ -142,51 +133,10 @@ func Addrs(addrs ...string) Option { } } -// Codec sets the codec used for encoding/decoding used where -// a broker does not support headers -func Codec(c codec.Codec) Option { +// Codecs sets the codec used for encoding/decoding messages +func Codecs(ct string, c codec.Codec) Option { return func(o *Options) { - o.Codec = c - } -} - -// ErrorHandler will catch all broker errors that cant be handled -// in normal way, for example Codec errors -func ErrorHandler(h Handler) Option { - return func(o *Options) { - o.ErrorHandler = h - } -} - -// BatchErrorHandler will catch all broker errors that cant be handled -// in normal way, for example Codec errors -func BatchErrorHandler(h BatchHandler) Option { - return func(o *Options) { - o.BatchErrorHandler = h - } -} - -// SubscribeErrorHandler will catch all broker errors that cant be handled -// in normal way, for example Codec errors -func SubscribeErrorHandler(h Handler) SubscribeOption { - return func(o *SubscribeOptions) { - o.ErrorHandler = h - } -} - -// SubscribeBatchErrorHandler will catch all broker errors that cant be handled -// in normal way, for example Codec errors -func SubscribeBatchErrorHandler(h BatchHandler) SubscribeOption { - return func(o *SubscribeOptions) { - o.BatchErrorHandler = h - } -} - -// Queue sets the subscribers queue -// Deprecated -func Queue(name string) SubscribeOption { - return func(o *SubscribeOptions) { - o.Group = name + o.Codecs[ct] = c } } @@ -253,14 +203,6 @@ func SubscribeContext(ctx context.Context) SubscribeOption { } } -// DisableAutoAck disables auto ack -// Deprecated -func DisableAutoAck() SubscribeOption { - return func(o *SubscribeOptions) { - o.AutoAck = false - } -} - // SubscribeAutoAck contol auto acking of messages // after they have been handled. func SubscribeAutoAck(b bool) SubscribeOption { diff --git a/server/subscriber.go b/broker/subscriber.go similarity index 99% rename from server/subscriber.go rename to broker/subscriber.go index 81ee6030..3470a5cd 100644 --- a/server/subscriber.go +++ b/broker/subscriber.go @@ -1,3 +1,5 @@ +//go:build ignore + package server import ( diff --git a/changes b/changes new file mode 100644 index 00000000..4d952603 --- /dev/null +++ b/changes @@ -0,0 +1,154 @@ +broker/broker.go +broker/context.go +broker/context_test.go +broker/memory.go +broker/memory/memory.go +broker/memory_test.go +broker/noop.go +broker/noop_test.go +broker/options.go +broker/subscriber.go +client/backoff.go +client/backoff_test.go +client/client.go +client/client_call_options.go +client/client_call_options_test.go +client/context.go +client/context_test.go +client/noop.go +client/noop_test.go +client/options.go +codec/codec.go +codec/context.go +codec/frame.go +codec/frame.proto +codec/options.go +config/config.go +config/context.go +config/context_test.go +config/default.go +config/default_test.go +config/options.go +database/dsn.go +errors/errors.go +errors/errors.proto +errors/errors_test.go +flow/context.go +flow/context_test.go +flow/default.go +flow/flow.go +flow/flow_test.go +flow/options.go +meter/context.go +meter/context_test.go +meter/meter.go +meter/noop.go +meter/options.go +meter/wrapper/wrapper.go +micro_test.go +mtls/mtls.go +mtls/options.go +options.go +options/hooks.go +options/options.go +options/options_test.go +profiler/http/http.go +profiler/noop.go +profiler/pprof/pprof.go +profiler/profile.go +proxy/options.go +proxy/proxy.go +register/context.go +register/extractor.go +register/extractor_test.go +register/memory/memory.go +register/memory/memory_test.go +register/options.go +register/register.go +register/watcher.go +resolver/dns/dns.go +resolver/dnssrv/dnssrv.go +resolver/http/http.go +resolver/noop/noop.go +resolver/registry/registry.go +resolver/resolver.go +resolver/static/static.go +router/context.go +router/options.go +router/router.go +selector/random/random.go +selector/roundrobin/roundrobin.go +selector/selector.go +semconv/broker.go +semconv/cache.go +semconv/client.go +semconv/logger.go +semconv/metadata.go +semconv/pool.go +semconv/server.go +semconv/store.go +server/context.go +server/context_test.go +server/noop.go +server/noop_test.go +server/options.go +server/registry.go +server/server.go +server/wrapper.go +service.go +service_test.go +store/context.go +store/context_test.go +store/memory.go +store/memory/memory.go +store/memory_test.go +store/options.go +store/store.go +store/wrapper.go +sync/memory.go +sync/sync.go +tools.go +tracer/context.go +tracer/memory/memory.go +tracer/memory/memory_test.go +tracer/noop.go +tracer/options.go +tracer/tracer.go +tracer/tracer_test.go +tracer/wrapper/wrapper.go +util/addr/addr.go +util/backoff/backoff.go +util/buf/buf.go +util/buffer/buffer.go +util/buffer/buffer_test.go +util/ctx/ctx.go +util/dns/cache.go +util/dns/cache_test.go +util/dns/conn.go +util/grpc/tracer.go +util/http/clienttracer.go +util/http/http.go +util/http/trie.go +util/http/trie_test.go +util/io/redirect.go +util/io/redirect_test.go +util/io/redirect_unix.go +util/io/redirect_windows.go +util/net/net.go +util/pki/pki.go +util/rand/rand.go +util/reflect/path.go +util/reflect/reflect.go +util/reflect/reflect_test.go +util/reflect/struct.go +util/reflect/struct_test.go +util/register/util.go +util/ring/buffer.go +util/sort/sort.go +util/sort/sort_test.go +util/stream/stream.go +util/structfs/metadata_digitalocean.go +util/structfs/metadata_ec2.go +util/structfs/structfs.go +util/structfs/structfs_test.go +util/test/test.go diff --git a/client/client.go b/client/client.go index 4925fe93..30cb04b3 100644 --- a/client/client.go +++ b/client/client.go @@ -29,40 +29,24 @@ var ( ) // Client is the interface used to make requests to services. -// It supports Request/Response via Transport and Publishing via the Broker. // It also supports bidirectional streaming of requests. type Client interface { Name() string Init(opts ...Option) error Options() Options - NewMessage(topic string, msg interface{}, opts ...MessageOption) Message NewRequest(service string, endpoint string, req interface{}, opts ...RequestOption) Request Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) - Publish(ctx context.Context, msg Message, opts ...PublishOption) error - BatchPublish(ctx context.Context, msg []Message, opts ...PublishOption) error String() string } type ( - FuncCall func(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error - HookCall func(next FuncCall) FuncCall - FuncStream func(ctx context.Context, req Request, opts ...CallOption) (Stream, error) - HookStream func(next FuncStream) FuncStream - FuncPublish func(ctx context.Context, msg Message, opts ...PublishOption) error - HookPublish func(next FuncPublish) FuncPublish - FuncBatchPublish func(ctx context.Context, msg []Message, opts ...PublishOption) error - HookBatchPublish func(next FuncBatchPublish) FuncBatchPublish + FuncCall func(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error + HookCall func(next FuncCall) FuncCall + FuncStream func(ctx context.Context, req Request, opts ...CallOption) (Stream, error) + HookStream func(next FuncStream) FuncStream ) -// Message is the interface for publishing asynchronously -type Message interface { - Topic() string - Payload() interface{} - ContentType() string - Metadata() metadata.Metadata -} - // Request is the interface for a synchronous request used by Call or Stream type Request interface { // The service to call @@ -121,11 +105,5 @@ type Option func(*Options) // CallOption used by Call or Stream type CallOption func(*CallOptions) -// PublishOption used by Publish -type PublishOption func(*PublishOptions) - -// MessageOption used by NewMessage -type MessageOption func(*MessageOptions) - // RequestOption used by NewRequest type RequestOption func(*RequestOptions) diff --git a/client/context.go b/client/context.go index 539e61a1..b5e6d95d 100644 --- a/client/context.go +++ b/client/context.go @@ -32,16 +32,6 @@ func NewContext(ctx context.Context, c Client) context.Context { return context.WithValue(ctx, clientKey{}, c) } -// SetPublishOption returns a function to setup a context with given value -func SetPublishOption(k, v interface{}) PublishOption { - return func(o *PublishOptions) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, k, v) - } -} - // SetCallOption returns a function to setup a context with given value func SetCallOption(k, v interface{}) CallOption { return func(o *CallOptions) { diff --git a/client/context_test.go b/client/context_test.go index e562529e..ced09a78 100644 --- a/client/context_test.go +++ b/client/context_test.go @@ -39,17 +39,6 @@ func TestNewNilContext(t *testing.T) { } } -func TestSetPublishOption(t *testing.T) { - type key struct{} - o := SetPublishOption(key{}, "test") - opts := &PublishOptions{} - o(opts) - - if v, ok := opts.Context.Value(key{}).(string); !ok || v == "" { - t.Fatal("SetPublishOption not works") - } -} - func TestSetCallOption(t *testing.T) { type key struct{} o := SetCallOption(key{}, "test") diff --git a/client/noop.go b/client/noop.go index 4d1d446a..6623c70f 100644 --- a/client/noop.go +++ b/client/noop.go @@ -3,11 +3,9 @@ package client import ( "context" "fmt" - "os" "strconv" "time" - "go.unistack.org/micro/v4/broker" "go.unistack.org/micro/v4/codec" "go.unistack.org/micro/v4/errors" "go.unistack.org/micro/v4/metadata" @@ -23,17 +21,9 @@ var DefaultCodecs = map[string]codec.Codec{ } type noopClient struct { - funcPublish FuncPublish - funcBatchPublish FuncBatchPublish - funcCall FuncCall - funcStream FuncStream - opts Options -} - -type noopMessage struct { - topic string - payload interface{} - opts MessageOptions + funcCall FuncCall + funcStream FuncStream + opts Options } type noopRequest struct { @@ -52,8 +42,6 @@ func NewClient(opts ...Option) Client { n.funcCall = n.fnCall n.funcStream = n.fnStream - n.funcPublish = n.fnPublish - n.funcBatchPublish = n.fnBatchPublish return n } @@ -158,32 +146,6 @@ func (n *noopStream) CloseSend() error { return n.err } -func (n *noopMessage) Topic() string { - return n.topic -} - -func (n *noopMessage) Payload() interface{} { - return n.payload -} - -func (n *noopMessage) ContentType() string { - return n.opts.ContentType -} - -func (n *noopMessage) Metadata() metadata.Metadata { - return n.opts.Metadata -} - -func (n *noopClient) newCodec(contentType string) (codec.Codec, error) { - if cf, ok := n.opts.Codecs[contentType]; ok { - return cf, nil - } - if cf, ok := DefaultCodecs[contentType]; ok { - return cf, nil - } - return nil, codec.ErrUnknownContentType -} - func (n *noopClient) Init(opts ...Option) error { for _, o := range opts { o(&n.opts) @@ -191,8 +153,6 @@ func (n *noopClient) Init(opts ...Option) error { n.funcCall = n.fnCall n.funcStream = n.fnStream - n.funcPublish = n.fnPublish - n.funcBatchPublish = n.fnBatchPublish n.opts.Hooks.EachPrev(func(hook options.Hook) { switch h := hook.(type) { @@ -200,10 +160,6 @@ func (n *noopClient) Init(opts ...Option) error { n.funcCall = h(n.funcCall) case HookStream: n.funcStream = h(n.funcStream) - case HookPublish: - n.funcPublish = h(n.funcPublish) - case HookBatchPublish: - n.funcBatchPublish = h(n.funcBatchPublish) } }) @@ -376,11 +332,6 @@ func (n *noopClient) NewRequest(service, endpoint string, _ interface{}, _ ...Re return &noopRequest{service: service, endpoint: endpoint} } -func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOption) Message { - options := NewMessageOptions(append([]MessageOption{MessageContentType(n.opts.ContentType)}, opts...)...) - return &noopMessage{topic: topic, payload: msg, opts: options} -} - func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) { ts := time.Now() n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc() @@ -549,90 +500,3 @@ func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOpti func (n *noopClient) stream(ctx context.Context, _ string, _ Request, _ CallOptions) (Stream, error) { return &noopStream{ctx: ctx}, nil } - -func (n *noopClient) BatchPublish(ctx context.Context, ps []Message, opts ...PublishOption) error { - return n.funcBatchPublish(ctx, ps, opts...) -} - -func (n *noopClient) fnBatchPublish(ctx context.Context, ps []Message, opts ...PublishOption) error { - return n.publish(ctx, ps, opts...) -} - -func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOption) error { - return n.funcPublish(ctx, p, opts...) -} - -func (n *noopClient) fnPublish(ctx context.Context, p Message, opts ...PublishOption) error { - return n.publish(ctx, []Message{p}, opts...) -} - -func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishOption) error { - options := NewPublishOptions(opts...) - - msgs := make([]*broker.Message, 0, len(ps)) - - // get proxy - exchange := "" - if v, ok := os.LookupEnv("MICRO_PROXY"); ok { - exchange = v - } - // get the exchange - if len(options.Exchange) > 0 { - exchange = options.Exchange - } - - omd, ok := metadata.FromOutgoingContext(ctx) - if !ok { - omd = metadata.New(0) - } - - for _, p := range ps { - md := metadata.Copy(omd) - topic := p.Topic() - if len(exchange) > 0 { - topic = exchange - } - md[metadata.HeaderTopic] = topic - iter := p.Metadata().Iterator() - var k, v string - for iter.Next(&k, &v) { - md.Set(k, v) - } - - md[metadata.HeaderContentType] = p.ContentType() - - var body []byte - - // passed in raw data - if d, ok := p.Payload().(*codec.Frame); ok { - body = d.Data - } else { - // use codec for payload - cf, err := n.newCodec(p.ContentType()) - if err != nil { - return errors.InternalServerError("go.micro.client", "%s", err) - } - - // set the body - b, err := cf.Marshal(p.Payload()) - if err != nil { - return errors.InternalServerError("go.micro.client", "%s", err) - } - body = b - } - - msgs = append(msgs, &broker.Message{Header: md, Body: body}) - } - - if len(msgs) == 1 { - return n.opts.Broker.Publish(ctx, msgs[0].Header[metadata.HeaderTopic], msgs[0], - broker.PublishContext(options.Context), - broker.PublishBodyOnly(options.BodyOnly), - ) - } - - return n.opts.Broker.BatchPublish(ctx, msgs, - broker.PublishContext(options.Context), - broker.PublishBodyOnly(options.BodyOnly), - ) -} diff --git a/client/noop_test.go b/client/noop_test.go deleted file mode 100644 index cc05204c..00000000 --- a/client/noop_test.go +++ /dev/null @@ -1,35 +0,0 @@ -package client - -import ( - "context" - "testing" -) - -type testHook struct { - f bool -} - -func (t *testHook) Publish(fn FuncPublish) FuncPublish { - return func(ctx context.Context, msg Message, opts ...PublishOption) error { - t.f = true - return fn(ctx, msg, opts...) - } -} - -func TestNoopHook(t *testing.T) { - h := &testHook{} - - c := NewClient(Hooks(HookPublish(h.Publish))) - - if err := c.Init(); err != nil { - t.Fatal(err) - } - - if err := c.Publish(context.TODO(), c.NewMessage("", nil, MessageContentType("application/octet-stream"))); err != nil { - t.Fatal(err) - } - - if !h.f { - t.Fatal("hook not works") - } -} diff --git a/client/options.go b/client/options.go index fba8bd83..da6f1fc3 100644 --- a/client/options.go +++ b/client/options.go @@ -137,43 +137,6 @@ func Context(ctx context.Context) Option { } } -// NewPublishOptions create new PublishOptions struct from option -func NewPublishOptions(opts ...PublishOption) PublishOptions { - options := PublishOptions{} - for _, o := range opts { - o(&options) - } - return options -} - -// PublishOptions holds publish options -type PublishOptions struct { - // Context used for external options - Context context.Context - // Exchange topic exchange name - Exchange string - // BodyOnly will publish only message body - BodyOnly bool -} - -// NewMessageOptions creates message options struct -func NewMessageOptions(opts ...MessageOption) MessageOptions { - options := MessageOptions{Metadata: metadata.New(1)} - for _, o := range opts { - o(&options) - } - return options -} - -// MessageOptions holds client message options -type MessageOptions struct { - // Metadata additional metadata - Metadata metadata.Metadata - // ContentType specify content-type of message - // deprecated - ContentType string -} - // NewRequestOptions creates new RequestOptions struct func NewRequestOptions(opts ...RequestOption) RequestOptions { options := RequestOptions{} @@ -374,43 +337,6 @@ func DialTimeout(d time.Duration) Option { } } -// WithExchange sets the exchange to route a message through -// Deprecated -func WithExchange(e string) PublishOption { - return func(o *PublishOptions) { - o.Exchange = e - } -} - -// PublishExchange sets the exchange to route a message through -func PublishExchange(e string) PublishOption { - return func(o *PublishOptions) { - o.Exchange = e - } -} - -// WithBodyOnly publish only message body -// DERECATED -func WithBodyOnly(b bool) PublishOption { - return func(o *PublishOptions) { - o.BodyOnly = b - } -} - -// PublishBodyOnly publish only message body -func PublishBodyOnly(b bool) PublishOption { - return func(o *PublishOptions) { - o.BodyOnly = b - } -} - -// PublishContext sets the context in publish options -func PublishContext(ctx context.Context) PublishOption { - return func(o *PublishOptions) { - o.Context = ctx - } -} - // WithContextDialer pass ContextDialer to client call func WithContextDialer(fn func(context.Context, string) (net.Conn, error)) CallOption { return func(o *CallOptions) { @@ -522,30 +448,6 @@ func WithSelectOptions(sops ...selector.SelectOption) CallOption { } } -// WithMessageContentType sets the message content type -// Deprecated -func WithMessageContentType(ct string) MessageOption { - return func(o *MessageOptions) { - o.Metadata.Set(metadata.HeaderContentType, ct) - o.ContentType = ct - } -} - -// MessageContentType sets the message content type -func MessageContentType(ct string) MessageOption { - return func(o *MessageOptions) { - o.Metadata.Set(metadata.HeaderContentType, ct) - o.ContentType = ct - } -} - -// MessageMetadata sets the message metadata -func MessageMetadata(k, v string) MessageOption { - return func(o *MessageOptions) { - o.Metadata.Set(k, v) - } -} - // StreamingRequest specifies that request is streaming func StreamingRequest(b bool) RequestOption { return func(o *RequestOptions) { diff --git a/codec/frame.proto b/codec/frame.proto index 69dc556a..8a2c2349 100644 --- a/codec/frame.proto +++ b/codec/frame.proto @@ -17,7 +17,7 @@ syntax = "proto3"; package micro.codec; option cc_enable_arenas = true; -option go_package = "go.unistack.org/micro/v3/codec;codec"; +option go_package = "go.unistack.org/micro/v4/codec;codec"; option java_multiple_files = true; option java_outer_classname = "MicroCodec"; option java_package = "micro.codec"; diff --git a/codec/noop.go b/codec/noop.go index 76d21d93..3565bc8f 100644 --- a/codec/noop.go +++ b/codec/noop.go @@ -3,7 +3,7 @@ package codec import ( "encoding/json" - codecpb "go.unistack.org/micro-proto/v3/codec" + codecpb "go.unistack.org/micro-proto/v4/codec" ) type noopCodec struct { diff --git a/event.go b/event.go deleted file mode 100644 index 1e4116d6..00000000 --- a/event.go +++ /dev/null @@ -1,27 +0,0 @@ -package micro - -import ( - "context" - - "go.unistack.org/micro/v4/client" -) - -// Event is used to publish messages to a topic -type Event interface { - // Publish publishes a message to the event topic - Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error -} - -type event struct { - c client.Client - topic string -} - -// NewEvent creates a new event publisher -func NewEvent(topic string, c client.Client) Event { - return &event{c, topic} -} - -func (e *event) Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error { - return e.c.Publish(ctx, e.c.NewMessage(e.topic, msg), opts...) -} diff --git a/go.mod b/go.mod index 358b50f5..6349d0ee 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/patrickmn/go-cache v2.1.0+incompatible github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 go.uber.org/automaxprocs v1.6.0 - go.unistack.org/micro-proto/v3 v3.4.1 + go.unistack.org/micro-proto/v4 v4.1.0 golang.org/x/sync v0.10.0 google.golang.org/grpc v1.69.4 google.golang.org/protobuf v1.36.3 @@ -20,22 +20,12 @@ require ( ) require ( - github.com/cilium/ebpf v0.16.0 // indirect - github.com/containerd/cgroups/v3 v3.0.4 // indirect - github.com/containerd/log v0.1.0 // indirect - github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/docker/go-units v0.5.0 // indirect - github.com/godbus/dbus/v5 v5.1.0 // indirect - github.com/moby/sys/userns v0.1.0 // indirect - github.com/opencontainers/runtime-spec v1.2.0 // indirect + github.com/kr/pretty v0.3.1 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/rogpeppe/go-internal v1.13.1 // indirect - github.com/sirupsen/logrus v1.9.3 // indirect github.com/stretchr/testify v1.10.0 // indirect - go.uber.org/goleak v1.3.0 // indirect - golang.org/x/exp v0.0.0-20241210194714-1829a127f884 // indirect golang.org/x/net v0.34.0 // indirect golang.org/x/sys v0.29.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect diff --git a/go.sum b/go.sum index 72be286c..70758602 100644 --- a/go.sum +++ b/go.sum @@ -2,39 +2,19 @@ dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s= dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= -github.com/KimMachineGun/automemlimit v0.6.1 h1:ILa9j1onAAMadBsyyUJv5cack8Y1WT26yLj/V+ulKp8= -github.com/KimMachineGun/automemlimit v0.6.1/go.mod h1:T7xYht7B8r6AG/AqFcUdc7fzd2bIdBKmepfP2S1svPY= +github.com/KimMachineGun/automemlimit v0.7.0 h1:7G06p/dMSf7G8E6oq+f2uOPuVncFyIlDI/pBWK49u88= +github.com/KimMachineGun/automemlimit v0.7.0/go.mod h1:QZxpHaGOQoYvFhv/r4u3U0JTC2ZcOwbSr11UZF46UBM= github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI= github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4= -github.com/cilium/ebpf v0.16.0 h1:+BiEnHL6Z7lXnlGUsXQPPAE7+kenAd4ES8MQ5min0Ok= -github.com/cilium/ebpf v0.16.0/go.mod h1:L7u2Blt2jMM/vLAVgjxluxtBKlz3/GWjB0dMOEngfwE= -github.com/containerd/cgroups/v3 v3.0.4 h1:2fs7l3P0Qxb1nKWuJNFiwhp2CqiKzho71DQkDrHJIo4= -github.com/containerd/cgroups/v3 v3.0.4/go.mod h1:SA5DLYnXO8pTGYiAHXz94qvLQTKfVM5GEVisn4jpins= -github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= -github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= -github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= -github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= -github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= -github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI= -github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow= -github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= -github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= -github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA= -github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= -github.com/jsimonetti/rtnetlink/v2 v2.0.1 h1:xda7qaHDSVOsADNouv7ukSuicKZO7GgVUCXxpaIEIlM= -github.com/jsimonetti/rtnetlink/v2 v2.0.1/go.mod h1:7MoNYNbb3UaDHtF8udiJo/RH6VsTKP1pqKLUTVCvToE= github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -45,59 +25,42 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/matoous/go-nanoid v1.5.1 h1:aCjdvTyO9LLnTIi0fgdXhOPPvOHjpXN6Ik9DaNjIct4= github.com/matoous/go-nanoid v1.5.1/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U= -github.com/mdlayher/netlink v1.7.2 h1:/UtM3ofJap7Vl4QWCPDGXY8d3GIY2UGSDbK+QWmY8/g= -github.com/mdlayher/netlink v1.7.2/go.mod h1:xraEF7uJbxLhc5fpHL4cPe221LI2bdttWlU+ZGLfQSw= -github.com/mdlayher/socket v0.4.1 h1:eM9y2/jlbs1M615oshPQOHZzj6R6wMT7bX5NPiQvn2U= -github.com/mdlayher/socket v0.4.1/go.mod h1:cAqeGjoufqdxWkD7DkpyS+wcefOtmu5OQ8KuoJGIReA= -github.com/moby/sys/userns v0.1.0 h1:tVLXkFOxVu9A64/yh59slHVv9ahO9UIev4JZusOLG/g= -github.com/moby/sys/userns v0.1.0/go.mod h1:IHUYgu/kao6N8YZlp9Cf444ySSvCmDlmzUcYfDHOl28= -github.com/opencontainers/runtime-spec v1.2.0 h1:z97+pHb3uELt/yiAWD691HNHQIF07bE7dzrbT927iTk= -github.com/opencontainers/runtime-spec v1.2.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E= github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= -github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= -github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= -go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= -go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= -go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= -golang.org/x/exp v0.0.0-20241210194714-1829a127f884 h1:Y/Mj/94zIQQGHVSv1tTtQBDaQaJe62U9bkDZKKyhPCU= -golang.org/x/exp v0.0.0-20241210194714-1829a127f884/go.mod h1:qj5a5QZpwLU2NLQudwIN5koi3beDhSAlJwa67PuM98c= -golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= -golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= +go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk= +go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec= +golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= +golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 h1:Z7FRVJPSMaHQxD0uXU8WdgFh8PseLM8Q8NzhnpMrBhQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA= -google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU= -google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= -google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= -google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A= +google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= +google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU= +google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/metadata/metadata.go b/metadata/metadata.go index f2729e73..4f67efbf 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -77,6 +77,11 @@ func (md Metadata) MustGet(key string) string { return val } +// Len returns the number of items. +func (md Metadata) Len() int { + return len(md) +} + // Get returns value from metadata by key func (md Metadata) Get(key string) (string, bool) { // fast path @@ -157,13 +162,13 @@ func Merge(omd Metadata, mmd Metadata, overwrite bool) Metadata { } // Pairs from which metadata created -func Pairs(kv ...string) (Metadata, bool) { +func Pairs(kv ...string) Metadata { if len(kv)%2 == 1 { - return nil, false + return nil } md := New(len(kv) / 2) for idx := 0; idx < len(kv); idx += 2 { md[kv[idx]] = kv[idx+1] } - return md, true + return md } diff --git a/metadata/metadata_grpc.go b/metadata/metadata_grpc.go deleted file mode 100644 index f5f2bbf1..00000000 --- a/metadata/metadata_grpc.go +++ /dev/null @@ -1,282 +0,0 @@ -//go:build exclude - -// Package metadata TODO need compare with micro metadata -package metadata // import "google.golang.org/grpc/metadata" - -import ( - "context" - "fmt" - "strings" -) - -// MD is a mapping from metadata keys to values. Users should use the following -// two convenience functions New and Pairs to generate MD. -type MD map[string][]string - -type Metadata map[string]string - -// New creates an MD from a given key-value map. -// -// Only the following ASCII characters are allowed in keys: -// - digits: 0-9 -// - uppercase letters: A-Z (normalized to lower) -// - lowercase letters: a-z -// - special characters: -_. -// -// Uppercase letters are automatically converted to lowercase. -// -// Keys beginning with "grpc-" are reserved for grpc-internal use only and may -// result in errors if set in metadata. -func New(m map[string]string) MD { - md := make(MD, len(m)) - for k, val := range m { - key := strings.ToLower(k) - md[key] = append(md[key], val) - } - return md -} - -// Pairs returns an MD formed by the mapping of key, value ... -// Pairs panics if len(kv) is odd. -// -// Only the following ASCII characters are allowed in keys: -// - digits: 0-9 -// - uppercase letters: A-Z (normalized to lower) -// - lowercase letters: a-z -// - special characters: -_. -// -// Uppercase letters are automatically converted to lowercase. -// -// Keys beginning with "grpc-" are reserved for grpc-internal use only and may -// result in errors if set in metadata. -func Pairs(kv ...string) MD { - if len(kv)%2 == 1 { - panic(fmt.Sprintf("metadata: Pairs got the odd number of input pairs for metadata: %d", len(kv))) - } - md := make(MD, len(kv)/2) - for i := 0; i < len(kv); i += 2 { - key := strings.ToLower(kv[i]) - md[key] = append(md[key], kv[i+1]) - } - return md -} - -// Len returns the number of items in md. -func (md MD) Len() int { - return len(md) -} - -// Copy returns a copy of md. -func (md MD) Copy() MD { - out := make(MD, len(md)) - for k, v := range md { - out[k] = copyOf(v) - } - return out -} - -// Get obtains the values for a given key. -// -// k is converted to lowercase before searching in md. -func (md MD) Get(k string) []string { - k = strings.ToLower(k) - return md[k] -} - -// Set sets the value of a given key with a slice of values. -// -// k is converted to lowercase before storing in md. -func (md MD) Set(k string, vals ...string) { - if len(vals) == 0 { - return - } - k = strings.ToLower(k) - md[k] = vals -} - -// Append adds the values to key k, not overwriting what was already stored at -// that key. -// -// k is converted to lowercase before storing in md. -func (md MD) Append(k string, vals ...string) { - if len(vals) == 0 { - return - } - k = strings.ToLower(k) - md[k] = append(md[k], vals...) -} - -// Delete removes the values for a given key k which is converted to lowercase -// before removing it from md. -func (md MD) Delete(k string) { - k = strings.ToLower(k) - delete(md, k) -} - -// Join joins any number of mds into a single MD. -// -// The order of values for each key is determined by the order in which the mds -// containing those values are presented to Join. -func Join(mds ...MD) MD { - out := MD{} - for _, md := range mds { - for k, v := range md { - out[k] = append(out[k], v...) - } - } - return out -} - -type mdIncomingKey struct{} -type mdOutgoingKey struct{} - -// NewIncomingContext creates a new context with incoming md attached. md must -// not be modified after calling this function. -func NewIncomingContext(ctx context.Context, md Metadata) context.Context { - in := make(MD, len(md)) - for k, v := range md { - in[k] = []string{v} - } - - return context.WithValue(ctx, mdIncomingKey{}, in) -} - -// NewOutgoingContext creates a new context with outgoing md attached. If used -// in conjunction with AppendToOutgoingContext, NewOutgoingContext will -// overwrite any previously-appended metadata. md must not be modified after -// calling this function. -func NewOutgoingContext(ctx context.Context, md Metadata) context.Context { - out := make(MD, len(md)) - for k, v := range md { - out[k] = []string{v} - } - - return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: out}) -} - -// AppendToOutgoingContext returns a new context with the provided kv merged -// with any existing metadata in the context. Please refer to the documentation -// of Pairs for a description of kv. -func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Context { - if len(kv)%2 == 1 { - panic(fmt.Sprintf("metadata: AppendToOutgoingContext got an odd number of input pairs for metadata: %d", len(kv))) - } - md, _ := ctx.Value(mdOutgoingKey{}).(rawMD) - added := make([][]string, len(md.added)+1) - copy(added, md.added) - kvCopy := make([]string, 0, len(kv)) - for i := 0; i < len(kv); i += 2 { - kvCopy = append(kvCopy, strings.ToLower(kv[i]), kv[i+1]) - } - added[len(added)-1] = kvCopy - return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md.md, added: added}) -} - -// FromIncomingContext returns the incoming metadata in ctx if it exists. -// -// All keys in the returned MD are lowercase. -func FromIncomingContext(ctx context.Context) (Metadata, bool) { - md, ok := ctx.Value(mdIncomingKey{}).(MD) - if !ok { - return nil, false - } - out := make(Metadata, len(md)) - for k, v := range md { - // We need to manually convert all keys to lower case, because MD is a - // map, and there's no guarantee that the MD attached to the context is - // created using our helper functions. - - if len(v) > 0 { - key := strings.ToLower(k) - out[key] = v[0] - } - - } - return out, true -} - -// ValueFromIncomingContext returns the metadata value corresponding to the metadata -// key from the incoming metadata if it exists. Keys are matched in a case insensitive -// manner. -func ValueFromIncomingContext(ctx context.Context, key string) []string { - md, ok := ctx.Value(mdIncomingKey{}).(MD) - if !ok { - return nil - } - - if v, ok := md[key]; ok { - return copyOf(v) - } - for k, v := range md { - // Case insensitive comparison: MD is a map, and there's no guarantee - // that the MD attached to the context is created using our helper - // functions. - if strings.EqualFold(k, key) { - return copyOf(v) - } - } - return nil -} - -func copyOf(v []string) []string { - vals := make([]string, len(v)) - copy(vals, v) - return vals -} - -// fromOutgoingContextRaw returns the un-merged, intermediary contents of rawMD. -// -// Remember to perform strings.ToLower on the keys, for both the returned MD (MD -// is a map, there's no guarantee it's created using our helper functions) and -// the extra kv pairs (AppendToOutgoingContext doesn't turn them into -// lowercase). -func fromOutgoingContextRaw(ctx context.Context) (MD, [][]string, bool) { - raw, ok := ctx.Value(mdOutgoingKey{}).(rawMD) - if !ok { - return nil, nil, false - } - - return raw.md, raw.added, true -} - -// FromOutgoingContext returns the outgoing metadata in ctx if it exists. -// -// All keys in the returned MD are lowercase. -func FromOutgoingContext(ctx context.Context) (Metadata, bool) { - raw, ok := ctx.Value(mdOutgoingKey{}).(rawMD) - if !ok { - return nil, false - } - - mdSize := len(raw.md) - for i := range raw.added { - mdSize += len(raw.added[i]) / 2 - } - - out := make(Metadata, mdSize) - for k, v := range raw.md { - // We need to manually convert all keys to lower case, because MD is a - // map, and there's no guarantee that the MD attached to the context is - // created using our helper functions. - if len(v) > 0 { - key := strings.ToLower(k) - out[key] = v[0] - } - } - for _, added := range raw.added { - if len(added)%2 == 1 { - panic(fmt.Sprintf("metadata: FromOutgoingContext got an odd number of input pairs for metadata: %d", len(added))) - } - - for i := 0; i < len(added); i += 2 { - key := strings.ToLower(added[i]) - out[key] = added[i+1] - } - } - return out, ok -} - -type rawMD struct { - md MD - added [][]string -} diff --git a/network/network.go b/network/network.go deleted file mode 100644 index 125f3a92..00000000 --- a/network/network.go +++ /dev/null @@ -1,55 +0,0 @@ -// Package network is for creating internetworks -package network - -import ( - "go.unistack.org/micro/v4/client" - "go.unistack.org/micro/v4/server" -) - -// Error is network node errors -type Error interface { - // Count is current count of errors - Count() int - // Msg is last error message - Msg() string -} - -// Status is node status -type Status interface { - // Error reports error status - Error() Error -} - -// Node is network node -type Node interface { - // Id is node id - Id() string - // Address is node bind address - Address() string - // Peers returns node peers - Peers() []Node - // Network is the network node is in - Network() Network - // Status returns node status - Status() Status -} - -// Network is micro network -type Network interface { - // Node is network node - Node - // Initialise options - Init(...Option) error - // Options returns the network options - Options() Options - // Name of the network - Name() string - // Connect starts the resolver and tunnel server - Connect() error - // Close stops the tunnel and resolving - Close() error - // Client is micro client - Client() client.Client - // Server is micro server - Server() server.Server -} diff --git a/network/options.go b/network/options.go deleted file mode 100644 index 1bf2fdaa..00000000 --- a/network/options.go +++ /dev/null @@ -1,135 +0,0 @@ -package network - -import ( - "go.unistack.org/micro/v4/logger" - "go.unistack.org/micro/v4/meter" - "go.unistack.org/micro/v4/network/tunnel" - "go.unistack.org/micro/v4/proxy" - "go.unistack.org/micro/v4/router" - "go.unistack.org/micro/v4/tracer" - "go.unistack.org/micro/v4/util/id" -) - -// Option func -type Option func(*Options) - -// Options configure network -type Options struct { - // Router used for routing - Router router.Router - // Proxy holds the proxy - Proxy proxy.Proxy - // Logger used for logging - Logger logger.Logger - // Meter used for metrics - Meter meter.Meter - // Tracer used for tracing - Tracer tracer.Tracer - // Tunnel used for transfer data - Tunnel tunnel.Tunnel - // ID of the node - ID string - // Name of the network - Name string - // Address to bind to - Address string - // Advertise sets the address to advertise - Advertise string - // Nodes is a list of nodes to connect to - Nodes []string -} - -// ID sets the id of the network node -func ID(id string) Option { - return func(o *Options) { - o.ID = id - } -} - -// Name sets the network name -func Name(n string) Option { - return func(o *Options) { - o.Name = n - } -} - -// Address sets the network address -func Address(a string) Option { - return func(o *Options) { - o.Address = a - } -} - -// Advertise sets the address to advertise -func Advertise(a string) Option { - return func(o *Options) { - o.Advertise = a - } -} - -// Nodes is a list of nodes to connect to -func Nodes(n ...string) Option { - return func(o *Options) { - o.Nodes = n - } -} - -// Tunnel sets the network tunnel -func Tunnel(t tunnel.Tunnel) Option { - return func(o *Options) { - o.Tunnel = t - } -} - -// Router sets the network router -func Router(r router.Router) Option { - return func(o *Options) { - o.Router = r - } -} - -// Proxy sets the network proxy -func Proxy(p proxy.Proxy) Option { - return func(o *Options) { - o.Proxy = p - } -} - -// Logger sets the network logger -func Logger(l logger.Logger) Option { - return func(o *Options) { - o.Logger = l - } -} - -// Meter sets the meter -func Meter(m meter.Meter) Option { - return func(o *Options) { - o.Meter = m - } -} - -// Tracer to be used for tracing -func Tracer(t tracer.Tracer) Option { - return func(o *Options) { - o.Tracer = t - } -} - -// NewOptions returns network default options -func NewOptions(opts ...Option) Options { - options := Options{ - ID: id.MustNew(), - Name: "go.micro", - Address: ":0", - Logger: logger.DefaultLogger, - Meter: meter.DefaultMeter, - Tracer: tracer.DefaultTracer, - } - - for _, o := range opts { - o(&options) - } - - return options -} diff --git a/network/transport/context.go b/network/transport/context.go deleted file mode 100644 index 73e4681d..00000000 --- a/network/transport/context.go +++ /dev/null @@ -1,34 +0,0 @@ -package transport - -import ( - "context" -) - -type transportKey struct{} - -// FromContext get transport from context -func FromContext(ctx context.Context) (Transport, bool) { - if ctx == nil { - return nil, false - } - c, ok := ctx.Value(transportKey{}).(Transport) - return c, ok -} - -// NewContext put transport in context -func NewContext(ctx context.Context, c Transport) context.Context { - if ctx == nil { - ctx = context.Background() - } - return context.WithValue(ctx, transportKey{}, c) -} - -// SetOption returns a function to setup a context with given value -func SetOption(k, v interface{}) Option { - return func(o *Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, k, v) - } -} diff --git a/network/transport/memory.go b/network/transport/memory.go deleted file mode 100644 index 6447978f..00000000 --- a/network/transport/memory.go +++ /dev/null @@ -1,258 +0,0 @@ -package transport - -import ( - "context" - "errors" - "fmt" - "net" - "sync" - "time" - - maddr "go.unistack.org/micro/v4/util/addr" - mnet "go.unistack.org/micro/v4/util/net" - "go.unistack.org/micro/v4/util/rand" -) - -type memorySocket struct { - ctx context.Context - recv chan *Message - exit chan bool - lexit chan bool - send chan *Message - local string - remote string - timeout time.Duration - sync.RWMutex -} - -type memoryClient struct { - *memorySocket - opts DialOptions -} - -type memoryListener struct { - lopts ListenOptions - ctx context.Context - exit chan bool - conn chan *memorySocket - addr string - topts Options - sync.RWMutex -} - -type memoryTransport struct { - listeners map[string]*memoryListener - opts Options - sync.RWMutex -} - -func (ms *memorySocket) Recv(m *Message) error { - ms.RLock() - defer ms.RUnlock() - - ctx := ms.ctx - if ms.timeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ms.ctx, ms.timeout) - defer cancel() - } - - select { - case <-ctx.Done(): - return ctx.Err() - case <-ms.exit: - return errors.New("connection closed") - case <-ms.lexit: - return errors.New("server connection closed") - case cm := <-ms.recv: - *m = *cm - } - return nil -} - -func (ms *memorySocket) Local() string { - return ms.local -} - -func (ms *memorySocket) Remote() string { - return ms.remote -} - -func (ms *memorySocket) Send(m *Message) error { - ms.RLock() - defer ms.RUnlock() - - ctx := ms.ctx - if ms.timeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ms.ctx, ms.timeout) - defer cancel() - } - - select { - case <-ctx.Done(): - return ctx.Err() - case <-ms.exit: - return errors.New("connection closed") - case <-ms.lexit: - return errors.New("server connection closed") - case ms.send <- m: - } - return nil -} - -func (ms *memorySocket) Close() error { - ms.Lock() - defer ms.Unlock() - select { - case <-ms.exit: - return nil - default: - close(ms.exit) - } - return nil -} - -func (m *memoryListener) Addr() string { - return m.addr -} - -func (m *memoryListener) Close() error { - m.Lock() - defer m.Unlock() - select { - case <-m.exit: - return nil - default: - close(m.exit) - } - return nil -} - -func (m *memoryListener) Accept(fn func(Socket)) error { - for { - select { - case <-m.exit: - return nil - case c := <-m.conn: - go fn(&memorySocket{ - lexit: c.lexit, - exit: c.exit, - send: c.recv, - recv: c.send, - local: c.Remote(), - remote: c.Local(), - timeout: m.topts.Timeout, - ctx: m.topts.Context, - }) - } - } -} - -func (m *memoryTransport) Dial(ctx context.Context, addr string, opts ...DialOption) (Client, error) { - m.RLock() - defer m.RUnlock() - - listener, ok := m.listeners[addr] - if !ok { - return nil, errors.New("could not dial " + addr) - } - - options := NewDialOptions(opts...) - - client := &memoryClient{ - &memorySocket{ - send: make(chan *Message), - recv: make(chan *Message), - exit: make(chan bool), - lexit: listener.exit, - local: addr, - remote: addr, - timeout: m.opts.Timeout, - ctx: m.opts.Context, - }, - options, - } - - // pseudo connect - select { - case <-listener.exit: - return nil, errors.New("connection error") - case listener.conn <- client.memorySocket: - } - - return client, nil -} - -func (m *memoryTransport) Listen(ctx context.Context, addr string, opts ...ListenOption) (Listener, error) { - m.Lock() - defer m.Unlock() - - options := NewListenOptions(opts...) - - host, port, err := net.SplitHostPort(addr) - if err != nil { - return nil, err - } - - addr, err = maddr.Extract(host) - if err != nil { - return nil, err - } - - // if zero port then randomly assign one - if len(port) > 0 && port == "0" { - var rng rand.Rand - i := rng.Intn(20000) - port = fmt.Sprintf("%d", 10000+i) - } - - // set addr with port - addr = mnet.HostPort(addr, port) - - if _, ok := m.listeners[addr]; ok { - return nil, errors.New("already listening on " + addr) - } - - listener := &memoryListener{ - lopts: options, - topts: m.opts, - addr: addr, - conn: make(chan *memorySocket), - exit: make(chan bool), - ctx: m.opts.Context, - } - - m.listeners[addr] = listener - - return listener, nil -} - -func (m *memoryTransport) Init(opts ...Option) error { - for _, o := range opts { - o(&m.opts) - } - return nil -} - -func (m *memoryTransport) Options() Options { - return m.opts -} - -func (m *memoryTransport) String() string { - return "memory" -} - -func (m *memoryTransport) Name() string { - return m.opts.Name -} - -// NewTransport returns new memory transport with options -func NewTransport(opts ...Option) Transport { - options := NewOptions(opts...) - - return &memoryTransport{ - opts: options, - listeners: make(map[string]*memoryListener), - } -} diff --git a/network/transport/memory_test.go b/network/transport/memory_test.go deleted file mode 100644 index cf712b86..00000000 --- a/network/transport/memory_test.go +++ /dev/null @@ -1,100 +0,0 @@ -package transport - -import ( - "context" - "os" - "testing" -) - -func TestMemoryTransport(t *testing.T) { - tr := NewTransport() - ctx := context.Background() - // bind / listen - l, err := tr.Listen(ctx, "127.0.0.1:8080") - if err != nil { - t.Fatalf("Unexpected error listening %v", err) - } - defer l.Close() - - cherr := make(chan error, 1) - // accept - go func() { - if nerr := l.Accept(func(sock Socket) { - for { - var m Message - if rerr := sock.Recv(&m); rerr != nil { - cherr <- rerr - return - } - if len(os.Getenv("INTEGRATION_TESTS")) == 0 { - t.Logf("Server Received %s", string(m.Body)) - } - if cerr := sock.Send(&Message{ - Body: []byte(`pong`), - }); cerr != nil { - cherr <- cerr - return - } - } - }); nerr != nil { - cherr <- err - } - }() - - // dial - c, err := tr.Dial(ctx, "127.0.0.1:8080") - if err != nil { - t.Fatalf("Unexpected error dialing %v", err) - } - defer c.Close() - - select { - case err := <-cherr: - t.Fatal(err) - default: - // send <=> receive - for i := 0; i < 3; i++ { - if err := c.Send(&Message{ - Body: []byte(`ping`), - }); err != nil { - return - } - var m Message - if err := c.Recv(&m); err != nil { - return - } - if len(os.Getenv("INTEGRATION_TESTS")) == 0 { - t.Logf("Client Received %s", string(m.Body)) - } - } - } -} - -func TestListener(t *testing.T) { - tr := NewTransport() - ctx := context.Background() - // bind / listen on random port - l, err := tr.Listen(ctx, ":0") - if err != nil { - t.Fatalf("Unexpected error listening %v", err) - } - defer l.Close() - - // try again - l2, err := tr.Listen(ctx, ":0") - if err != nil { - t.Fatalf("Unexpected error listening %v", err) - } - defer l2.Close() - - // now make sure it still fails - l3, err := tr.Listen(ctx, ":8080") - if err != nil { - t.Fatalf("Unexpected error listening %v", err) - } - defer l3.Close() - - if _, err := tr.Listen(ctx, ":8080"); err == nil { - t.Fatal("Expected error binding to :8080 got nil") - } -} diff --git a/network/transport/options.go b/network/transport/options.go deleted file mode 100644 index 6ab32d4b..00000000 --- a/network/transport/options.go +++ /dev/null @@ -1,175 +0,0 @@ -package transport - -import ( - "context" - "crypto/tls" - "time" - - "go.unistack.org/micro/v4/codec" - "go.unistack.org/micro/v4/logger" - "go.unistack.org/micro/v4/meter" - "go.unistack.org/micro/v4/tracer" -) - -// Options struct holds the transport options -type Options struct { - // Meter used for metrics - Meter meter.Meter - // Tracer used for tracing - Tracer tracer.Tracer - // Codec used for marshal/unmarshal messages - Codec codec.Codec - // Logger used for logging - Logger logger.Logger - // Context holds external options - Context context.Context - // TLSConfig holds tls.TLSConfig options - TLSConfig *tls.Config - // Name holds the transport name - Name string - // Addrs holds the transport addrs - Addrs []string - // Timeout holds the timeout - Timeout time.Duration -} - -// NewOptions returns new options -func NewOptions(opts ...Option) Options { - options := Options{ - Logger: logger.DefaultLogger, - Meter: meter.DefaultMeter, - Tracer: tracer.DefaultTracer, - Context: context.Background(), - } - - for _, o := range opts { - o(&options) - } - - return options -} - -// DialOptions struct -type DialOptions struct { - // Context holds the external options - Context context.Context - // Timeout holds the timeout - Timeout time.Duration - // Stream flag - Stream bool -} - -// NewDialOptions returns new DialOptions -func NewDialOptions(opts ...DialOption) DialOptions { - options := DialOptions{ - Timeout: DefaultDialTimeout, - Context: context.Background(), - } - - for _, o := range opts { - o(&options) - } - - return options -} - -// ListenOptions struct -type ListenOptions struct { - // TODO: add tls options when listening - // Currently set in global options - // Context holds the external options - Context context.Context - // TLSConfig holds the *tls.Config options - TLSConfig *tls.Config -} - -// NewListenOptions returns new ListenOptions -func NewListenOptions(opts ...ListenOption) ListenOptions { - options := ListenOptions{ - Context: context.Background(), - } - - for _, o := range opts { - o(&options) - } - - return options -} - -// Addrs to use for transport -func Addrs(addrs ...string) Option { - return func(o *Options) { - o.Addrs = addrs - } -} - -// Logger sets the logger -func Logger(l logger.Logger) Option { - return func(o *Options) { - o.Logger = l - } -} - -// Meter sets the meter -func Meter(m meter.Meter) Option { - return func(o *Options) { - o.Meter = m - } -} - -// Context sets the context -func Context(ctx context.Context) Option { - return func(o *Options) { - o.Context = ctx - } -} - -// Codec sets the codec used for encoding where the transport -// does not support message headers -func Codec(c codec.Codec) Option { - return func(o *Options) { - o.Codec = c - } -} - -// Timeout sets the timeout for Send/Recv execution -func Timeout(t time.Duration) Option { - return func(o *Options) { - o.Timeout = t - } -} - -// TLSConfig to be used for the transport. -func TLSConfig(t *tls.Config) Option { - return func(o *Options) { - o.TLSConfig = t - } -} - -// WithStream indicates whether this is a streaming connection -func WithStream() DialOption { - return func(o *DialOptions) { - o.Stream = true - } -} - -// WithTimeout used when dialling the remote side -func WithTimeout(d time.Duration) DialOption { - return func(o *DialOptions) { - o.Timeout = d - } -} - -// Tracer to be used for tracing -func Tracer(t tracer.Tracer) Option { - return func(o *Options) { - o.Tracer = t - } -} - -// Name sets the name -func Name(n string) Option { - return func(o *Options) { - o.Name = n - } -} diff --git a/network/transport/transport.go b/network/transport/transport.go deleted file mode 100644 index 78333595..00000000 --- a/network/transport/transport.go +++ /dev/null @@ -1,63 +0,0 @@ -// Package transport is an interface for synchronous connection based communication -package transport - -import ( - "context" - "time" - - "go.unistack.org/micro/v4/metadata" -) - -var ( - // DefaultTransport is the global default transport - DefaultTransport = NewTransport() - // DefaultDialTimeout the default dial timeout - DefaultDialTimeout = time.Second * 5 -) - -// Transport is an interface which is used for communication between -// services. It uses connection based socket send/recv semantics and -// has various implementations; http, grpc, quic. -type Transport interface { - Init(...Option) error - Options() Options - Dial(ctx context.Context, addr string, opts ...DialOption) (Client, error) - Listen(ctx context.Context, addr string, opts ...ListenOption) (Listener, error) - String() string -} - -// Message is used to transfer data -type Message struct { - Header metadata.Metadata - Body []byte -} - -// Socket bastraction interface -type Socket interface { - Recv(*Message) error - Send(*Message) error - Close() error - Local() string - Remote() string -} - -// Client is the socket owner -type Client interface { - Socket -} - -// Listener is the interface for stream oriented messaging -type Listener interface { - Addr() string - Close() error - Accept(func(Socket)) error -} - -// Option is the option signature -type Option func(*Options) - -// DialOption is the option signature -type DialOption func(*DialOptions) - -// ListenOption is the option signature -type ListenOption func(*ListenOptions) diff --git a/network/tunnel/broker/broker.go b/network/tunnel/broker/broker.go deleted file mode 100644 index 77ea946d..00000000 --- a/network/tunnel/broker/broker.go +++ /dev/null @@ -1,372 +0,0 @@ -// Package broker is a tunnel broker -package broker - -import ( - "context" - "fmt" - - "go.unistack.org/micro/v4/broker" - "go.unistack.org/micro/v4/logger" - "go.unistack.org/micro/v4/metadata" - "go.unistack.org/micro/v4/network/transport" - "go.unistack.org/micro/v4/network/tunnel" -) - -type tunBroker struct { - tunnel tunnel.Tunnel - opts broker.Options -} - -type tunSubscriber struct { - listener tunnel.Listener - handler broker.Handler - closed chan bool - topic string - opts broker.SubscribeOptions -} - -type tunBatchSubscriber struct { - listener tunnel.Listener - handler broker.BatchHandler - closed chan bool - topic string - opts broker.SubscribeOptions -} - -type tunEvent struct { - err error - message *broker.Message - topic string -} - -// used to access tunnel from options context -type ( - tunnelKey struct{} - tunnelAddr struct{} -) - -func (t *tunBroker) Live() bool { - return true -} - -func (t *tunBroker) Ready() bool { - return true -} - -func (t *tunBroker) Health() bool { - return true -} - -func (t *tunBroker) Init(opts ...broker.Option) error { - for _, o := range opts { - o(&t.opts) - } - return nil -} - -func (t *tunBroker) Name() string { - return t.opts.Name -} - -func (t *tunBroker) Options() broker.Options { - return t.opts -} - -func (t *tunBroker) Address() string { - return t.tunnel.Address() -} - -func (t *tunBroker) Connect(ctx context.Context) error { - return t.tunnel.Connect(ctx) -} - -func (t *tunBroker) Disconnect(ctx context.Context) error { - return t.tunnel.Close(ctx) -} - -func (t *tunBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, _ ...broker.PublishOption) error { - // TODO: this is probably inefficient, we might want to just maintain an open connection - // it may be easier to add broadcast to the tunnel - topicMap := make(map[string]tunnel.Session) - - var err error - for _, msg := range msgs { - topic, _ := msg.Header.Get(metadata.HeaderTopic) - c, ok := topicMap[topic] - if !ok { - c, err = t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast)) - if err != nil { - return err - } - defer c.Close() - topicMap[topic] = c - } - - if err = c.Send(&transport.Message{ - Header: msg.Header, - Body: msg.Body, - }); err != nil { - // msg.SetError(err) - return err - } - } - - return nil -} - -func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message, _ ...broker.PublishOption) error { - // TODO: this is probably inefficient, we might want to just maintain an open connection - // it may be easier to add broadcast to the tunnel - c, err := t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast)) - if err != nil { - return err - } - defer c.Close() - - return c.Send(&transport.Message{ - Header: m.Header, - Body: m.Body, - }) -} - -func (t *tunBroker) BatchSubscribe(ctx context.Context, topic string, h broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { - l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast)) - if err != nil { - return nil, err - } - - tunSub := &tunBatchSubscriber{ - topic: topic, - handler: h, - opts: broker.NewSubscribeOptions(opts...), - closed: make(chan bool), - listener: l, - } - - // start processing - go tunSub.run() - - return tunSub, nil -} - -func (t *tunBroker) Subscribe(ctx context.Context, topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { - l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast)) - if err != nil { - return nil, err - } - - tunSub := &tunSubscriber{ - topic: topic, - handler: h, - opts: broker.NewSubscribeOptions(opts...), - closed: make(chan bool), - listener: l, - } - - // start processing - go tunSub.run() - - return tunSub, nil -} - -func (t *tunBroker) String() string { - return "tunnel" -} - -func (t *tunBatchSubscriber) run() { - for { - // accept a new connection - c, err := t.listener.Accept() - if err != nil { - select { - case <-t.closed: - return - default: - continue - } - } - - // receive message - m := new(transport.Message) - if err := c.Recv(m); err != nil { - if logger.DefaultLogger.V(logger.ErrorLevel) { - logger.DefaultLogger.Error(t.opts.Context, err.Error(), err) - } - if err = c.Close(); err != nil { - if logger.DefaultLogger.V(logger.ErrorLevel) { - logger.DefaultLogger.Error(t.opts.Context, err.Error(), err) - } - } - continue - } - - // close the connection - c.Close() - - evts := broker.Events{&tunEvent{ - topic: t.topic, - message: &broker.Message{ - Header: m.Header, - Body: m.Body, - }, - }} - // handle the message - go func() { - _ = t.handler(evts) - }() - - } -} - -func (t *tunSubscriber) run() { - for { - // accept a new connection - c, err := t.listener.Accept() - if err != nil { - select { - case <-t.closed: - return - default: - continue - } - } - - // receive message - m := new(transport.Message) - if err := c.Recv(m); err != nil { - if logger.DefaultLogger.V(logger.ErrorLevel) { - logger.DefaultLogger.Error(t.opts.Context, err.Error(), err) - } - if err = c.Close(); err != nil { - if logger.DefaultLogger.V(logger.ErrorLevel) { - logger.DefaultLogger.Error(t.opts.Context, err.Error(), err) - } - } - continue - } - - // close the connection - c.Close() - - // handle the message - go func() { - _ = t.handler(&tunEvent{ - topic: t.topic, - message: &broker.Message{ - Header: m.Header, - Body: m.Body, - }, - }) - }() - } -} - -func (t *tunBatchSubscriber) Options() broker.SubscribeOptions { - return t.opts -} - -func (t *tunBatchSubscriber) Topic() string { - return t.topic -} - -func (t *tunBatchSubscriber) Unsubscribe(ctx context.Context) error { - select { - case <-t.closed: - return nil - default: - close(t.closed) - return t.listener.Close() - } -} - -func (t *tunSubscriber) Options() broker.SubscribeOptions { - return t.opts -} - -func (t *tunSubscriber) Topic() string { - return t.topic -} - -func (t *tunSubscriber) Unsubscribe(ctx context.Context) error { - select { - case <-t.closed: - return nil - default: - close(t.closed) - return t.listener.Close() - } -} - -func (t *tunEvent) Topic() string { - return t.topic -} - -func (t *tunEvent) Message() *broker.Message { - return t.message -} - -func (t *tunEvent) Ack() error { - return nil -} - -func (t *tunEvent) Error() error { - return t.err -} - -func (t *tunEvent) SetError(err error) { - t.err = err -} - -func (t *tunEvent) Context() context.Context { - return context.TODO() -} - -// NewBroker returns new tunnel broker -func NewBroker(opts ...broker.Option) (broker.Broker, error) { - options := broker.NewOptions(opts...) - - t, ok := options.Context.Value(tunnelKey{}).(tunnel.Tunnel) - if !ok { - return nil, fmt.Errorf("tunnel not set") - } - - a, ok := options.Context.Value(tunnelAddr{}).(string) - if ok { - // initialise address - if err := t.Init(tunnel.Address(a)); err != nil { - return nil, err - } - } - - if len(options.Addrs) > 0 { - // initialise nodes - if err := t.Init(tunnel.Nodes(options.Addrs...)); err != nil { - return nil, err - } - } - - return &tunBroker{ - opts: options, - tunnel: t, - }, nil -} - -// WithAddress sets the tunnel address -func WithAddress(a string) broker.Option { - return func(o *broker.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, tunnelAddr{}, a) - } -} - -// WithTunnel sets the internal tunnel -func WithTunnel(t tunnel.Tunnel) broker.Option { - return func(o *broker.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, tunnelKey{}, t) - } -} diff --git a/network/tunnel/options.go b/network/tunnel/options.go deleted file mode 100644 index cf8832ab..00000000 --- a/network/tunnel/options.go +++ /dev/null @@ -1,192 +0,0 @@ -package tunnel - -import ( - "time" - - "go.unistack.org/micro/v4/logger" - "go.unistack.org/micro/v4/meter" - "go.unistack.org/micro/v4/network/transport" - "go.unistack.org/micro/v4/tracer" - "go.unistack.org/micro/v4/util/id" -) - -var ( - // DefaultAddress is default tunnel bind address - DefaultAddress = ":0" - // DefaultToken the shared default token - DefaultToken = "go.micro.tunnel" -) - -// Option func signature -type Option func(*Options) - -// Options provides network configuration options -type Options struct { - // Logger used for logging - Logger logger.Logger - // Meter used for metrics - Meter meter.Meter - // Tracer used for tracing - Tracer tracer.Tracer - // Transport used for communication - Transport transport.Transport - // Token the shared auth token - Token string - // Name holds the tunnel name - Name string - // ID holds the tunnel id - ID string - // Address holds the tunnel address - Address string - // Nodes holds the tunnel nodes - Nodes []string -} - -// DialOption func -type DialOption func(*DialOptions) - -// DialOptions provides dial options -type DialOptions struct { - // Link specifies the link to use - Link string - // specify mode of the session - Mode Mode - // Wait for connection to be accepted - Wait bool - // the dial timeout - Timeout time.Duration -} - -// ListenOption func -type ListenOption func(*ListenOptions) - -// ListenOptions provides listen options -type ListenOptions struct { - // Mode specify mode of the session - Mode Mode - // Timeout the read timeout - Timeout time.Duration -} - -// ID sets the tunnel id -func ID(id string) Option { - return func(o *Options) { - o.ID = id - } -} - -// Logger sets the logger -func Logger(l logger.Logger) Option { - return func(o *Options) { - o.Logger = l - } -} - -// Meter sets the meter -func Meter(m meter.Meter) Option { - return func(o *Options) { - o.Meter = m - } -} - -// Address sets the tunnel address -func Address(a string) Option { - return func(o *Options) { - o.Address = a - } -} - -// Nodes specify remote network nodes -func Nodes(n ...string) Option { - return func(o *Options) { - o.Nodes = n - } -} - -// Token sets the shared token for auth -func Token(t string) Option { - return func(o *Options) { - o.Token = t - } -} - -// Transport listens for incoming connections -func Transport(t transport.Transport) Option { - return func(o *Options) { - o.Transport = t - } -} - -// ListenMode option -func ListenMode(m Mode) ListenOption { - return func(o *ListenOptions) { - o.Mode = m - } -} - -// ListenTimeout for reads and writes on the listener session -func ListenTimeout(t time.Duration) ListenOption { - return func(o *ListenOptions) { - o.Timeout = t - } -} - -// DialMode multicast sets the multicast option to send only to those mapped -func DialMode(m Mode) DialOption { - return func(o *DialOptions) { - o.Mode = m - } -} - -// DialTimeout sets the dial timeout of the connection -func DialTimeout(t time.Duration) DialOption { - return func(o *DialOptions) { - o.Timeout = t - } -} - -// DialLink specifies the link to pin this connection to. -// This is not applicable if the multicast option is set. -func DialLink(id string) DialOption { - return func(o *DialOptions) { - o.Link = id - } -} - -// DialWait specifies whether to wait for the connection -// to be accepted before returning the session -func DialWait(b bool) DialOption { - return func(o *DialOptions) { - o.Wait = b - } -} - -// NewOptions returns router default options with filled values -func NewOptions(opts ...Option) Options { - options := Options{ - ID: id.MustNew(), - Address: DefaultAddress, - Token: DefaultToken, - Logger: logger.DefaultLogger, - Meter: meter.DefaultMeter, - Tracer: tracer.DefaultTracer, - } - for _, o := range opts { - o(&options) - } - return options -} - -// Tracer to be used for tracing -func Tracer(t tracer.Tracer) Option { - return func(o *Options) { - o.Tracer = t - } -} - -// Name sets the name -func Name(n string) Option { - return func(o *Options) { - o.Name = n - } -} diff --git a/network/tunnel/transport/listener.go b/network/tunnel/transport/listener.go deleted file mode 100644 index eb506e48..00000000 --- a/network/tunnel/transport/listener.go +++ /dev/null @@ -1,30 +0,0 @@ -package transport - -import ( - "go.unistack.org/micro/v4/network/transport" - "go.unistack.org/micro/v4/network/tunnel" -) - -type tunListener struct { - l tunnel.Listener -} - -func (t *tunListener) Addr() string { - return t.l.Channel() -} - -func (t *tunListener) Close() error { - return t.l.Close() -} - -func (t *tunListener) Accept(fn func(socket transport.Socket)) error { - for { - // accept connection - c, err := t.l.Accept() - if err != nil { - return err - } - // execute the function - go fn(c) - } -} diff --git a/network/tunnel/transport/transport.go b/network/tunnel/transport/transport.go deleted file mode 100644 index 4b4c0270..00000000 --- a/network/tunnel/transport/transport.go +++ /dev/null @@ -1,113 +0,0 @@ -// Package transport provides a tunnel transport -package transport - -import ( - "context" - "fmt" - - "go.unistack.org/micro/v4/network/transport" - "go.unistack.org/micro/v4/network/tunnel" -) - -type tunTransport struct { - tunnel tunnel.Tunnel - options transport.Options -} - -type tunnelKey struct{} - -type transportKey struct{} - -func (t *tunTransport) Init(opts ...transport.Option) error { - for _, o := range opts { - o(&t.options) - } - - // close the existing tunnel - if t.tunnel != nil { - t.tunnel.Close(context.TODO()) - } - - // get the tunnel - tun, ok := t.options.Context.Value(tunnelKey{}).(tunnel.Tunnel) - if !ok { - return fmt.Errorf("tunnel not set") - } - - // get the transport - tr, ok := t.options.Context.Value(transportKey{}).(transport.Transport) - if ok { - _ = tun.Init(tunnel.Transport(tr)) - } - - // set the tunnel - t.tunnel = tun - - return nil -} - -func (t *tunTransport) Dial(ctx context.Context, addr string, opts ...transport.DialOption) (transport.Client, error) { - if err := t.tunnel.Connect(ctx); err != nil { - return nil, err - } - - c, err := t.tunnel.Dial(ctx, addr) - if err != nil { - return nil, err - } - - return c, nil -} - -func (t *tunTransport) Listen(ctx context.Context, addr string, opts ...transport.ListenOption) (transport.Listener, error) { - if err := t.tunnel.Connect(ctx); err != nil { - return nil, err - } - - l, err := t.tunnel.Listen(ctx, addr) - if err != nil { - return nil, err - } - - return &tunListener{l}, nil -} - -func (t *tunTransport) Options() transport.Options { - return t.options -} - -func (t *tunTransport) String() string { - return "tunnel" -} - -// NewTransport honours the initialiser used in -func NewTransport(opts ...transport.Option) transport.Transport { - t := &tunTransport{ - options: transport.Options{}, - } - - // initialise - // t.Init(opts...) - - return t -} - -// WithTunnel sets the internal tunnel -func WithTunnel(t tunnel.Tunnel) transport.Option { - return func(o *transport.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, tunnelKey{}, t) - } -} - -// WithTransport sets the internal transport -func WithTransport(t transport.Transport) transport.Option { - return func(o *transport.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, transportKey{}, t) - } -} diff --git a/network/tunnel/tunnel.go b/network/tunnel/tunnel.go deleted file mode 100644 index f3a9ae96..00000000 --- a/network/tunnel/tunnel.go +++ /dev/null @@ -1,106 +0,0 @@ -// Package tunnel provides gre network tunnelling -package tunnel - -import ( - "context" - "errors" - "time" - - "go.unistack.org/micro/v4/network/transport" -) - -// DefaultTunnel contains default tunnel implementation -var DefaultTunnel Tunnel - -const ( - // Unicast send over one link - Unicast Mode = iota - // Multicast send to all channel listeners - Multicast - // Broadcast send to all links - Broadcast -) - -var ( - // DefaultDialTimeout is the dial timeout if none is specified - DefaultDialTimeout = time.Second * 5 - // ErrDialTimeout is returned by a call to Dial where the timeout occurs - ErrDialTimeout = errors.New("dial timeout") - // ErrDiscoverChan is returned when we failed to receive the "announce" back from a discovery - ErrDiscoverChan = errors.New("failed to discover channel") - // ErrLinkNotFound is returned when a link is specified at dial time and does not exist - ErrLinkNotFound = errors.New("link not found") - // ErrLinkDisconnected is returned when a link we attempt to send to is disconnected - ErrLinkDisconnected = errors.New("link not connected") - // ErrLinkLoopback is returned when attempting to send an outbound message over loopback link - ErrLinkLoopback = errors.New("link is loopback") - // ErrLinkRemote is returned when attempting to send a loopback message over remote link - ErrLinkRemote = errors.New("link is remote") - // ErrReadTimeout is a timeout on session.Recv - ErrReadTimeout = errors.New("read timeout") - // ErrDecryptingData is for when theres a nonce error - ErrDecryptingData = errors.New("error decrypting data") -) - -// Mode of the session -type Mode uint8 - -// Tunnel creates a gre tunnel on top of the micro/transport. -// It establishes multiple streams using the Micro-Tunnel-Channel header -// and Micro-Tunnel-Session header. The tunnel id is a hash of -// the address being requested. -type Tunnel interface { - // Init initializes tunnel with options - Init(opts ...Option) error - // Address returns the address the tunnel is listening on - Address() string - // Connect connects the tunnel - Connect(ctx context.Context) error - // Close closes the tunnel - Close(ctx context.Context) error - // Links returns all the links the tunnel is connected to - Links() []Link - // Dial allows a client to connect to a channel - Dial(ctx context.Context, channel string, opts ...DialOption) (Session, error) - // Listen allows to accept connections on a channel - Listen(ctx context.Context, channel string, opts ...ListenOption) (Listener, error) - // String returns the name of the tunnel implementation - String() string -} - -// Link represents internal links to the tunnel -type Link interface { - // Id returns the link unique Id - Id() string - // Delay is the current load on the link (lower is better) - Delay() int64 - // Length returns the roundtrip time as nanoseconds (lower is better) - Length() int64 - // Current transfer rate as bits per second (lower is better) - Rate() float64 - // Is this a loopback link - Loopback() bool - // State of the link: connected/closed/error - State() string - // honours transport socket - transport.Socket -} - -// Listener provides similar constructs to the transport.Listener -type Listener interface { - Accept() (Session, error) - Channel() string - Close() error -} - -// Session is a unique session created when dialling or accepting connections on the tunnel -type Session interface { - // The unique session id - Id() string - // The channel name - Channel() string - // The link the session is on - Link() string - // a transport socket - transport.Socket -} diff --git a/proxy/options.go b/proxy/options.go deleted file mode 100644 index f1d01271..00000000 --- a/proxy/options.go +++ /dev/null @@ -1,98 +0,0 @@ -// Package proxy is a transparent proxy built on the micro/server -package proxy - -import ( - "go.unistack.org/micro/v4/client" - "go.unistack.org/micro/v4/logger" - "go.unistack.org/micro/v4/meter" - "go.unistack.org/micro/v4/router" - "go.unistack.org/micro/v4/tracer" -) - -// Options for proxy -type Options struct { - // Tracer used for tracing - Tracer tracer.Tracer - // Client for communication - Client client.Client - // Router for routing - Router router.Router - // Logger used for logging - Logger logger.Logger - // Meter used for metrics - Meter meter.Meter - // Links holds the communication links - Links map[string]client.Client - // Endpoint holds the destination address - Endpoint string -} - -// Option func signature -type Option func(o *Options) - -// NewOptions returns new options struct that filled by opts -func NewOptions(opts ...Option) Options { - options := Options{ - Logger: logger.DefaultLogger, - Meter: meter.DefaultMeter, - Tracer: tracer.DefaultTracer, - } - - for _, o := range opts { - o(&options) - } - - return options -} - -// WithEndpoint sets a proxy endpoint -func WithEndpoint(e string) Option { - return func(o *Options) { - o.Endpoint = e - } -} - -// WithClient sets the client -func WithClient(c client.Client) Option { - return func(o *Options) { - o.Client = c - } -} - -// WithRouter specifies the router to use -func WithRouter(r router.Router) Option { - return func(o *Options) { - o.Router = r - } -} - -// WithLogger specifies the logger to use -func WithLogger(l logger.Logger) Option { - return func(o *Options) { - o.Logger = l - } -} - -// WithMeter specifies the meter to use -func WithMeter(m meter.Meter) Option { - return func(o *Options) { - o.Meter = m - } -} - -// WithLink sets a link for outbound requests -func WithLink(name string, c client.Client) Option { - return func(o *Options) { - if o.Links == nil { - o.Links = make(map[string]client.Client) - } - o.Links[name] = c - } -} - -// Tracer to be used for tracing -func Tracer(t tracer.Tracer) Option { - return func(o *Options) { - o.Tracer = t - } -} diff --git a/proxy/proxy.go b/proxy/proxy.go deleted file mode 100644 index bca6c47b..00000000 --- a/proxy/proxy.go +++ /dev/null @@ -1,21 +0,0 @@ -// Package proxy is a transparent proxy built on the micro/server -package proxy - -import ( - "context" - - "go.unistack.org/micro/v4/server" -) - -// DefaultEndpoint holds default proxy address -var DefaultEndpoint = "localhost:9090" - -// Proxy can be used as a proxy server for micro services -type Proxy interface { - // ProcessMessage handles inbound messages - ProcessMessage(context.Context, server.Message) error - // ServeRequest handles inbound requests - ServeRequest(context.Context, server.Request, server.Response) error - // Name of the proxy protocol - String() string -} diff --git a/server/noop.go b/server/noop.go index 74d57409..fab5fdc5 100644 --- a/server/noop.go +++ b/server/noop.go @@ -1,20 +1,13 @@ package server import ( - "context" "fmt" "reflect" - "runtime/debug" - "strings" "sync" "time" - "go.unistack.org/micro/v4/broker" "go.unistack.org/micro/v4/codec" - "go.unistack.org/micro/v4/errors" "go.unistack.org/micro/v4/logger" - "go.unistack.org/micro/v4/metadata" - "go.unistack.org/micro/v4/options" "go.unistack.org/micro/v4/register" maddr "go.unistack.org/micro/v4/util/addr" mnet "go.unistack.org/micro/v4/util/net" @@ -26,10 +19,6 @@ var DefaultCodecs = map[string]codec.Codec{ "application/octet-stream": codec.NewCodec(), } -const ( - defaultContentType = "application/json" -) - type rpcHandler struct { opts HandlerOptions handler interface{} @@ -62,13 +51,12 @@ func (r *rpcHandler) Options() HandlerOptions { } type noopServer struct { - h Handler - wg *sync.WaitGroup - rsvc *register.Service - handlers map[string]Handler - subscribers map[*subscriber][]broker.Subscriber - exit chan chan error - opts Options + h Handler + wg *sync.WaitGroup + rsvc *register.Service + handlers map[string]Handler + exit chan chan error + opts Options sync.RWMutex registered bool started bool @@ -80,25 +68,12 @@ func NewServer(opts ...Option) Server { if n.handlers == nil { n.handlers = make(map[string]Handler) } - if n.subscribers == nil { - n.subscribers = make(map[*subscriber][]broker.Subscriber) - } if n.exit == nil { n.exit = make(chan chan error) } return n } -func (n *noopServer) newCodec(contentType string) (codec.Codec, error) { - if cf, ok := n.opts.Codecs[contentType]; ok { - return cf, nil - } - if cf, ok := DefaultCodecs[contentType]; ok { - return cf, nil - } - return nil, codec.ErrUnknownContentType -} - func (n *noopServer) Live() bool { return true } @@ -120,65 +95,10 @@ func (n *noopServer) Name() string { return n.opts.Name } -func (n *noopServer) Subscribe(sb Subscriber) error { - sub, ok := sb.(*subscriber) - if !ok { - return fmt.Errorf("invalid subscriber: expected *subscriber") - } else if len(sub.handlers) == 0 { - return fmt.Errorf("invalid subscriber: no handler functions") - } - - if err := ValidateSubscriber(sb); err != nil { - return err - } - - n.Lock() - if _, ok = n.subscribers[sub]; ok { - n.Unlock() - return fmt.Errorf("subscriber %v already exists", sub) - } - - n.subscribers[sub] = nil - n.Unlock() - return nil -} - -type rpcMessage struct { - payload interface{} - codec codec.Codec - header metadata.Metadata - topic string - contentType string -} - -func (r *rpcMessage) ContentType() string { - return r.contentType -} - -func (r *rpcMessage) Topic() string { - return r.topic -} - -func (r *rpcMessage) Body() interface{} { - return r.payload -} - -func (r *rpcMessage) Header() metadata.Metadata { - return r.header -} - -func (r *rpcMessage) Codec() codec.Codec { - return r.codec -} - func (n *noopServer) NewHandler(h interface{}, opts ...HandlerOption) Handler { return newRPCHandler(h, opts...) } -func (n *noopServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber { - return newSubscriber(topic, sb, opts...) -} - func (n *noopServer) Init(opts ...Option) error { for _, o := range opts { o(&n.opts) @@ -187,9 +107,6 @@ func (n *noopServer) Init(opts ...Option) error { if n.handlers == nil { n.handlers = make(map[string]Handler, 1) } - if n.subscribers == nil { - n.subscribers = make(map[*subscriber][]broker.Subscriber, 1) - } if n.exit == nil { n.exit = make(chan chan error) @@ -288,33 +205,6 @@ func (n *noopServer) Deregister() error { n.registered = false - cx := config.Context - - wg := sync.WaitGroup{} - for sb, subs := range n.subscribers { - for idx := range subs { - if sb.Options().Context != nil { - cx = sb.Options().Context - } - - ncx := cx - wg.Add(1) - go func(s broker.Subscriber) { - defer wg.Done() - if config.Logger.V(logger.InfoLevel) { - config.Logger.Info(n.opts.Context, "unsubscribing from topic: "+s.Topic()) - } - if err := s.Unsubscribe(ncx); err != nil { - if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error(n.opts.Context, "unsubscribing from topic: "+s.Topic(), err) - } - } - }(subs[idx]) - } - n.subscribers[sb] = nil - } - wg.Wait() - n.Unlock() return nil } @@ -351,21 +241,6 @@ func (n *noopServer) Start() error { } n.Unlock() - // only connect if we're subscribed - if len(n.subscribers) > 0 { - // connect to the broker - if err := config.Broker.Connect(config.Context); err != nil { - if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error(n.opts.Context, fmt.Sprintf("broker [%s] connect error", config.Broker.String()), err) - } - return err - } - - if config.Logger.V(logger.InfoLevel) { - config.Logger.Info(n.opts.Context, fmt.Sprintf("broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())) - } - } - // use RegisterCheck func before register // nolint: nestif if err := config.RegisterCheck(config.Context); err != nil { @@ -381,10 +256,6 @@ func (n *noopServer) Start() error { } } - if err := n.subscribe(); err != nil { - return err - } - go func() { t := new(time.Ticker) @@ -468,42 +339,6 @@ func (n *noopServer) Start() error { return nil } -func (n *noopServer) subscribe() error { - config := n.Options() - - subCtx := config.Context - - for sb := range n.subscribers { - - if cx := sb.Options().Context; cx != nil { - subCtx = cx - } - - opts := []broker.SubscribeOption{ - broker.SubscribeContext(subCtx), - broker.SubscribeAutoAck(sb.Options().AutoAck), - broker.SubscribeBodyOnly(sb.Options().BodyOnly), - } - - if queue := sb.Options().Queue; len(queue) > 0 { - opts = append(opts, broker.SubscribeGroup(queue)) - } - - if config.Logger.V(logger.InfoLevel) { - config.Logger.Info(n.opts.Context, "subscribing to topic: "+sb.Topic()) - } - - sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), n.createSubHandler(sb, config), opts...) - if err != nil { - return err - } - - n.subscribers[sb] = []broker.Subscriber{sub} - } - - return nil -} - func (n *noopServer) Stop() error { n.RLock() if !n.started { @@ -523,195 +358,3 @@ func (n *noopServer) Stop() error { return err } - -func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber { - var handlers []*handler - - options := NewSubscriberOptions(opts...) - - if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func { - h := &handler{ - method: reflect.ValueOf(sub), - } - - switch typ.NumIn() { - case 1: - h.reqType = typ.In(0) - case 2: - h.ctxType = typ.In(0) - h.reqType = typ.In(1) - } - - handlers = append(handlers, h) - } else { - for m := 0; m < typ.NumMethod(); m++ { - method := typ.Method(m) - h := &handler{ - method: method.Func, - } - - switch method.Type.NumIn() { - case 2: - h.reqType = method.Type.In(1) - case 3: - h.ctxType = method.Type.In(1) - h.reqType = method.Type.In(2) - } - - handlers = append(handlers, h) - } - } - - return &subscriber{ - rcvr: reflect.ValueOf(sub), - typ: reflect.TypeOf(sub), - topic: topic, - subscriber: sub, - handlers: handlers, - opts: options, - } -} - -//nolint:gocyclo -func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler { - return func(p broker.Event) (err error) { - defer func() { - if r := recover(); r != nil { - n.RLock() - config := n.opts - n.RUnlock() - if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error(n.opts.Context, "panic recovered: ", r) - config.Logger.Error(n.opts.Context, string(debug.Stack())) - } - err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r) - } - }() - - msg := p.Message() - // if we don't have headers, create empty map - if msg.Header == nil { - msg.Header = metadata.New(2) - } - - ct := msg.Header["Content-Type"] - if len(ct) == 0 { - msg.Header.Set(metadata.HeaderContentType, defaultContentType) - ct = defaultContentType - } - cf, err := n.newCodec(ct) - if err != nil { - return err - } - - hdr := metadata.New(len(msg.Header)) - for k, v := range msg.Header { - hdr.Set(k, v) - } - - ctx := metadata.NewIncomingContext(sb.opts.Context, hdr) - - results := make(chan error, len(sb.handlers)) - - for i := 0; i < len(sb.handlers); i++ { - handler := sb.handlers[i] - - var isVal bool - var req reflect.Value - - if handler.reqType.Kind() == reflect.Ptr { - req = reflect.New(handler.reqType.Elem()) - } else { - req = reflect.New(handler.reqType) - isVal = true - } - if isVal { - req = req.Elem() - } - - if err = cf.Unmarshal(msg.Body, req.Interface()); err != nil { - return err - } - - fn := func(ctx context.Context, msg Message) error { - var vals []reflect.Value - if sb.typ.Kind() != reflect.Func { - vals = append(vals, sb.rcvr) - } - if handler.ctxType != nil { - vals = append(vals, reflect.ValueOf(ctx)) - } - - vals = append(vals, reflect.ValueOf(msg.Body())) - - returnValues := handler.method.Call(vals) - if rerr := returnValues[0].Interface(); rerr != nil { - return rerr.(error) - } - return nil - } - - opts.Hooks.EachPrev(func(hook options.Hook) { - if h, ok := hook.(HookSubHandler); ok { - fn = h(fn) - } - }) - - if n.wg != nil { - n.wg.Add(1) - } - go func() { - if n.wg != nil { - defer n.wg.Done() - } - cerr := fn(ctx, &rpcMessage{ - topic: sb.topic, - contentType: ct, - payload: req.Interface(), - header: msg.Header, - }) - results <- cerr - }() - } - var errors []string - for i := 0; i < len(sb.handlers); i++ { - if rerr := <-results; rerr != nil { - errors = append(errors, rerr.Error()) - } - } - if len(errors) > 0 { - err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) - } - return err - } -} - -func (s *subscriber) Topic() string { - return s.topic -} - -func (s *subscriber) Subscriber() interface{} { - return s.subscriber -} - -func (s *subscriber) Options() SubscriberOptions { - return s.opts -} - -type subscriber struct { - topic string - - typ reflect.Type - subscriber interface{} - - handlers []*handler - - rcvr reflect.Value - opts SubscriberOptions -} - -type handler struct { - reqType reflect.Type - ctxType reflect.Type - method reflect.Value -} diff --git a/server/noop_test.go b/server/noop_test.go deleted file mode 100644 index 2752e6ff..00000000 --- a/server/noop_test.go +++ /dev/null @@ -1,124 +0,0 @@ -package server_test - -import ( - "context" - "fmt" - "testing" - - "go.unistack.org/micro/v4/broker" - "go.unistack.org/micro/v4/client" - "go.unistack.org/micro/v4/codec" - "go.unistack.org/micro/v4/logger" - "go.unistack.org/micro/v4/options" - "go.unistack.org/micro/v4/server" -) - -type TestHandler struct { - t *testing.T -} - -type TestMessage struct { - Name string -} - -func (h *TestHandler) SingleSubHandler(ctx context.Context, msg *codec.Frame) error { - // fmt.Printf("msg %s\n", msg.Data) - return nil -} - -func TestNoopSub(t *testing.T) { - ctx := context.Background() - - b := broker.NewBroker() - - if err := b.Init(); err != nil { - t.Fatal(err) - } - - if err := b.Connect(ctx); err != nil { - t.Fatal(err) - } - - if err := logger.DefaultLogger.Init(logger.WithLevel(logger.ErrorLevel)); err != nil { - t.Fatal(err) - } - s := server.NewServer( - server.Broker(b), - server.Codec("application/octet-stream", codec.NewCodec()), - ) - if err := s.Init(); err != nil { - t.Fatal(err) - } - - c := client.NewClient( - client.Broker(b), - client.Codec("application/octet-stream", codec.NewCodec()), - client.ContentType("application/octet-stream"), - ) - if err := c.Init(); err != nil { - t.Fatal(err) - } - h := &TestHandler{t: t} - - if err := s.Subscribe(s.NewSubscriber("single_topic", h.SingleSubHandler, - server.SubscriberQueue("queue"), - )); err != nil { - t.Fatal(err) - } - - if err := s.Start(); err != nil { - t.Fatal(err) - } - - msgs := make([]client.Message, 0, 8) - for i := 0; i < 8; i++ { - msgs = append(msgs, c.NewMessage("batch_topic", &codec.Frame{Data: []byte(fmt.Sprintf(`{"name": "test_name %d"}`, i))})) - } - - if err := c.BatchPublish(ctx, msgs); err != nil { - t.Fatal(err) - } - - defer func() { - if err := s.Stop(); err != nil { - t.Fatal(err) - } - }() -} - -func TestHooks_Wrap(t *testing.T) { - n := 5 - fn1 := func(next server.FuncSubHandler) server.FuncSubHandler { - return func(ctx context.Context, msg server.Message) (err error) { - n *= 2 - return next(ctx, msg) - } - } - fn2 := func(next server.FuncSubHandler) server.FuncSubHandler { - return func(ctx context.Context, msg server.Message) (err error) { - n -= 10 - return next(ctx, msg) - } - } - - hs := &options.Hooks{} - hs.Append(server.HookSubHandler(fn1), server.HookSubHandler(fn2)) - - var fn = func(ctx context.Context, msg server.Message) error { - return nil - } - - hs.EachPrev(func(hook options.Hook) { - if h, ok := hook.(server.HookSubHandler); ok { - fn = h(fn) - } - }) - - if err := fn(nil, nil); err != nil { - t.Fatal(err) - } - - if n != 0 { - t.Fatalf("uncorrected hooks call") - } -} diff --git a/server/server.go b/server/server.go index 4648a0f8..14f8acb4 100644 --- a/server/server.go +++ b/server/server.go @@ -3,6 +3,7 @@ package server import ( "context" + "errors" "time" "go.unistack.org/micro/v4/codec" @@ -51,10 +52,6 @@ type Server interface { Handle(h Handler) error // Create a new handler NewHandler(h interface{}, opts ...HandlerOption) Handler - // Create a new subscriber - NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber - // Register a subscriber - Subscribe(s Subscriber) error // Start the server Start() error // Stop the server @@ -70,36 +67,10 @@ type Server interface { } type ( - FuncSubHandler func(ctx context.Context, ms Message) error - HookSubHandler func(next FuncSubHandler) FuncSubHandler - FuncHandler func(ctx context.Context, req Request, rsp interface{}) error - HookHandler func(next FuncHandler) FuncHandler + FuncHandler func(ctx context.Context, req Request, rsp interface{}) error + HookHandler func(next FuncHandler) FuncHandler ) -/* -// Router handle serving messages -type Router interface { - // ProcessMessage processes a message - ProcessMessage(ctx context.Context, msg Message) error - // ServeRequest processes a request to completion - ServeRequest(ctx context.Context, req Request, rsp Response) error -} -*/ - -// Message is an async message interface -type Message interface { - // Topic of the message - Topic() string - // The decoded payload value - Body() interface{} - // The content type of the payload - ContentType() string - // The raw headers of the message - Header() metadata.Metadata - // Codec used to decode the message - Codec() codec.Codec -} - // Request is a synchronous request interface type Request interface { // Service name requested @@ -172,11 +143,20 @@ type Handler interface { Options() HandlerOptions } -// Subscriber interface represents a subscription to a given topic using -// a specific subscriber function or object with endpoints. It mirrors -// the handler in its behaviour. -type Subscriber interface { - Topic() string - Subscriber() interface{} - Options() SubscriberOptions +type serverHeaderKey struct{} + +func ResponseMetadata(ctx context.Context, md *metadata.Metadata) context.Context { + return context.WithValue(ctx, serverHeaderKey{}, md) +} + +func SetResponseMetadata(ctx context.Context, md metadata.Metadata) error { + if md.Len() == 0 { + return nil + } + h, ok := ctx.Value(serverHeaderKey{}).(*metadata.Metadata) + if !ok || h == nil { + return errors.New("missing metadata") + } + md.CopyTo(*h) + return nil } diff --git a/server/wrapper.go b/server/wrapper.go index 3e4d3ecd..3500dc78 100644 --- a/server/wrapper.go +++ b/server/wrapper.go @@ -9,17 +9,9 @@ import ( // request and response types. type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error -// SubscriberFunc represents a single method of a subscriber. It's used primarily -// for the wrappers. What's handed to the actual method is the concrete -// publication message. -type SubscriberFunc func(ctx context.Context, msg Message) error - // HandlerWrapper wraps the HandlerFunc and returns the equivalent type HandlerWrapper func(HandlerFunc) HandlerFunc -// SubscriberWrapper wraps the SubscriberFunc and returns the equivalent -type SubscriberWrapper func(SubscriberFunc) SubscriberFunc - // StreamWrapper wraps a Stream interface and returns the equivalent. // Because streams exist for the lifetime of a method invocation this // is a convenient way to wrap a Stream as its in use for trace, monitoring, diff --git a/service.go b/service.go index 7ba61843..1ea4c29d 100644 --- a/service.go +++ b/service.go @@ -95,11 +95,6 @@ func RegisterHandler(s server.Server, h interface{}, opts ...server.HandlerOptio return s.Handle(s.NewHandler(h, opts...)) } -// RegisterSubscriber is syntactic sugar for registering a subscriber -func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error { - return s.Subscribe(s.NewSubscriber(topic, h, opts...)) -} - type service struct { done chan struct{} opts Options diff --git a/service_test.go b/service_test.go index 32370619..9dff6262 100644 --- a/service_test.go +++ b/service_test.go @@ -1,7 +1,6 @@ package micro import ( - "context" "reflect" "testing" @@ -80,41 +79,6 @@ func TestRegisterHandler(t *testing.T) { } } -func TestRegisterSubscriber(t *testing.T) { - type args struct { - topic string - s server.Server - h interface{} - opts []server.SubscriberOption - } - h := func(_ context.Context, _ interface{}) error { - return nil - } - tests := []struct { - name string - args args - wantErr bool - }{ - { - name: "RegisterSubscriber", - args: args{ - topic: "test", - s: server.DefaultServer, - h: h, - opts: nil, - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if err := RegisterSubscriber(tt.args.topic, tt.args.s, tt.args.h, tt.args.opts...); (err != nil) != tt.wantErr { - t.Errorf("RegisterSubscriber() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} - func TestNewService(t *testing.T) { type args struct { opts []Option diff --git a/util/socket/pool.go b/util/socket/pool.go deleted file mode 100644 index 462ce883..00000000 --- a/util/socket/pool.go +++ /dev/null @@ -1,65 +0,0 @@ -package socket - -import ( - "sync" -) - -// Pool holds the socket pool -type Pool struct { - pool map[string]*Socket - sync.RWMutex -} - -// Get socket from pool -func (p *Pool) Get(id string) (*Socket, bool) { - // attempt to get existing socket - p.RLock() - socket, ok := p.pool[id] - if ok { - p.RUnlock() - return socket, ok - } - p.RUnlock() - - // save socket - p.Lock() - defer p.Unlock() - // double checked locking - socket, ok = p.pool[id] - if ok { - return socket, ok - } - // create new socket - socket = New(id) - p.pool[id] = socket - - // return socket - return socket, false -} - -// Release close the socket and delete from pool -func (p *Pool) Release(s *Socket) { - p.Lock() - defer p.Unlock() - - // close the socket - s.Close() - delete(p.pool, s.id) -} - -// Close the pool and delete all the sockets -func (p *Pool) Close() { - p.Lock() - defer p.Unlock() - for id, sock := range p.pool { - sock.Close() - delete(p.pool, id) - } -} - -// NewPool returns a new socket pool -func NewPool() *Pool { - return &Pool{ - pool: make(map[string]*Socket), - } -} diff --git a/util/socket/socket.go b/util/socket/socket.go deleted file mode 100644 index b36036e6..00000000 --- a/util/socket/socket.go +++ /dev/null @@ -1,118 +0,0 @@ -// Package socket provides a pseudo socket -package socket - -import ( - "io" - - "go.unistack.org/micro/v4/network/transport" -) - -// Socket is our pseudo socket for transport.Socket -type Socket struct { - closed chan bool - send chan *transport.Message - recv chan *transport.Message - id string - remote string - local string -} - -// SetLocal sets the local addr -func (s *Socket) SetLocal(l string) { - s.local = l -} - -// SetRemote sets the remote addr -func (s *Socket) SetRemote(r string) { - s.remote = r -} - -// Accept passes a message to the socket which will be processed by the call to Recv -func (s *Socket) Accept(m *transport.Message) error { - select { - case s.recv <- m: - return nil - case <-s.closed: - return io.EOF - } -} - -// Process takes the next message off the send queue created by a call to Send -func (s *Socket) Process(m *transport.Message) error { - select { - case msg := <-s.send: - *m = *msg - case <-s.closed: - // see if we need to drain - select { - case msg := <-s.send: - *m = *msg - return nil - default: - return io.EOF - } - } - return nil -} - -// Remote returns remote addr -func (s *Socket) Remote() string { - return s.remote -} - -// Local returns local addr -func (s *Socket) Local() string { - return s.local -} - -// Send message by via transport -func (s *Socket) Send(m *transport.Message) error { - // send a message - select { - case s.send <- m: - case <-s.closed: - return io.EOF - } - - return nil -} - -// Recv message from transport -func (s *Socket) Recv(m *transport.Message) error { - // receive a message - select { - case msg := <-s.recv: - // set message - *m = *msg - case <-s.closed: - return io.EOF - } - - // return nil - return nil -} - -// Close closes the socket -func (s *Socket) Close() error { - select { - case <-s.closed: - // no op - default: - close(s.closed) - } - return nil -} - -// New returns a new pseudo socket which can be used in the place of a transport socket. -// Messages are sent to the socket via Accept and receives from the socket via Process. -// SetLocal/SetRemote should be called before using the socket. -func New(id string) *Socket { - return &Socket{ - id: id, - closed: make(chan bool), - local: "local", - remote: "remote", - send: make(chan *transport.Message, 128), - recv: make(chan *transport.Message, 128), - } -}