Compare commits
4 Commits
bcc06054f1
...
v3.10.20
Author | SHA1 | Date | |
---|---|---|---|
f58781d076 | |||
e1af4aa3a4 | |||
1d5e795443 | |||
a3a434d923 |
@@ -5,6 +5,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
|
"go.unistack.org/micro/v3/codec"
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v3/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -85,33 +86,12 @@ type Event interface {
|
|||||||
SetError(err error)
|
SetError(err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RawMessage is a raw encoded JSON value.
|
|
||||||
// It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a encoding.
|
|
||||||
type RawMessage []byte
|
|
||||||
|
|
||||||
// MarshalJSON returns m as the JSON encoding of m.
|
|
||||||
func (m *RawMessage) MarshalJSON() ([]byte, error) {
|
|
||||||
if m == nil {
|
|
||||||
return []byte("null"), nil
|
|
||||||
}
|
|
||||||
return *m, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// UnmarshalJSON sets *m to a copy of data.
|
|
||||||
func (m *RawMessage) UnmarshalJSON(data []byte) error {
|
|
||||||
if m == nil {
|
|
||||||
return errors.New("RawMessage UnmarshalJSON on nil pointer")
|
|
||||||
}
|
|
||||||
*m = append((*m)[0:0], data...)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Message is used to transfer data
|
// Message is used to transfer data
|
||||||
type Message struct {
|
type Message struct {
|
||||||
// Header contains message metadata
|
// Header contains message metadata
|
||||||
Header metadata.Metadata
|
Header metadata.Metadata
|
||||||
// Body contains message body
|
// Body contains message body
|
||||||
Body RawMessage
|
Body codec.RawMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewMessage create broker message with topic filled
|
// NewMessage create broker message with topic filled
|
||||||
|
@@ -84,3 +84,24 @@ func MarshalAppend(buf []byte, c Codec, v interface{}, opts ...Option) ([]byte,
|
|||||||
|
|
||||||
return append(buf, mbuf...), nil
|
return append(buf, mbuf...), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RawMessage is a raw encoded JSON value.
|
||||||
|
// It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a encoding.
|
||||||
|
type RawMessage []byte
|
||||||
|
|
||||||
|
// MarshalJSON returns m as the JSON encoding of m.
|
||||||
|
func (m *RawMessage) MarshalJSON() ([]byte, error) {
|
||||||
|
if m == nil {
|
||||||
|
return []byte("null"), nil
|
||||||
|
}
|
||||||
|
return *m, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalJSON sets *m to a copy of data.
|
||||||
|
func (m *RawMessage) UnmarshalJSON(data []byte) error {
|
||||||
|
if m == nil {
|
||||||
|
return errors.New("RawMessage UnmarshalJSON on nil pointer")
|
||||||
|
}
|
||||||
|
*m = append((*m)[0:0], data...)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@@ -202,39 +202,6 @@ func (n *noopServer) Register() error {
|
|||||||
n.Lock()
|
n.Lock()
|
||||||
defer n.Unlock()
|
defer n.Unlock()
|
||||||
|
|
||||||
cx := config.Context
|
|
||||||
|
|
||||||
var sub broker.Subscriber
|
|
||||||
|
|
||||||
for sb := range n.subscribers {
|
|
||||||
if sb.Options().Context != nil {
|
|
||||||
cx = sb.Options().Context
|
|
||||||
}
|
|
||||||
|
|
||||||
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
|
|
||||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
|
||||||
opts = append(opts, broker.SubscribeGroup(queue))
|
|
||||||
}
|
|
||||||
|
|
||||||
if sb.Options().Batch {
|
|
||||||
// batch processing handler
|
|
||||||
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.newBatchSubHandler(sb, config), opts...)
|
|
||||||
} else {
|
|
||||||
// single processing handler
|
|
||||||
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.newSubHandler(sb, config), opts...)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.Logger.V(logger.InfoLevel) {
|
|
||||||
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
|
|
||||||
}
|
|
||||||
|
|
||||||
n.subscribers[sb] = []broker.Subscriber{sub}
|
|
||||||
}
|
|
||||||
|
|
||||||
n.registered = true
|
n.registered = true
|
||||||
if cacheService {
|
if cacheService {
|
||||||
n.rsvc = service
|
n.rsvc = service
|
||||||
@@ -366,6 +333,10 @@ func (n *noopServer) Start() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := n.subscribe(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
t := new(time.Ticker)
|
t := new(time.Ticker)
|
||||||
|
|
||||||
@@ -449,6 +420,45 @@ func (n *noopServer) Start() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *noopServer) subscribe() error {
|
||||||
|
config := n.Options()
|
||||||
|
|
||||||
|
cx := config.Context
|
||||||
|
var err error
|
||||||
|
var sub broker.Subscriber
|
||||||
|
|
||||||
|
for sb := range n.subscribers {
|
||||||
|
if sb.Options().Context != nil {
|
||||||
|
cx = sb.Options().Context
|
||||||
|
}
|
||||||
|
|
||||||
|
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
|
||||||
|
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||||
|
opts = append(opts, broker.SubscribeGroup(queue))
|
||||||
|
}
|
||||||
|
|
||||||
|
if sb.Options().Batch {
|
||||||
|
// batch processing handler
|
||||||
|
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.createBatchSubHandler(sb, config), opts...)
|
||||||
|
} else {
|
||||||
|
// single processing handler
|
||||||
|
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
|
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
|
||||||
|
}
|
||||||
|
|
||||||
|
n.subscribers[sb] = []broker.Subscriber{sub}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (n *noopServer) Stop() error {
|
func (n *noopServer) Stop() error {
|
||||||
n.RLock()
|
n.RLock()
|
||||||
if !n.started {
|
if !n.started {
|
||||||
|
@@ -191,7 +191,7 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
|
|||||||
}
|
}
|
||||||
|
|
||||||
//nolint:gocyclo
|
//nolint:gocyclo
|
||||||
func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
|
func (n *noopServer) createBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
|
||||||
return func(ps broker.Events) (err error) {
|
return func(ps broker.Events) (err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
@@ -309,7 +309,7 @@ func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.Bat
|
|||||||
}
|
}
|
||||||
|
|
||||||
//nolint:gocyclo
|
//nolint:gocyclo
|
||||||
func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler {
|
func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
|
||||||
return func(p broker.Event) (err error) {
|
return func(p broker.Event) (err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
|
Reference in New Issue
Block a user