Compare commits

...

17 Commits
v3 ... tmp

Author SHA1 Message Date
01e7d81883 tmp
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-25 15:14:03 +03:00
03de1ec38f Merge pull request 'cleanup interfaces for v4' (#217) from v4 into master
Reviewed-on: #217
2023-05-09 20:04:40 +03:00
819ad1117a cleanup interfaces for v4
Some checks failed
lint / lint (pull_request) Successful in 1m9s
pr / test (pull_request) Failing after 1m4s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-09 20:04:15 +03:00
a2a383606d Merge pull request 'util/test: update test cases code' (#216) from testcase into master
Reviewed-on: #216
2023-04-28 07:10:40 +03:00
55ce58617b util/test: update test cases code
Some checks failed
lint / lint (pull_request) Successful in 51s
pr / test (pull_request) Failing after 52s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-04-28 07:10:19 +03:00
0e587d923e Merge pull request 'meter: move metrics handling in broker implementations' (#215) from metrics into master
Reviewed-on: #215
2023-04-27 15:32:56 +03:00
fa0248c80c cleanup
All checks were successful
pr / test (pull_request) Successful in 50s
lint / lint (pull_request) Successful in 49s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-04-27 15:31:59 +03:00
054bd02b59 meter: move metrics handling in broker implementations
All checks were successful
lint / lint (pull_request) Successful in 1m4s
pr / test (pull_request) Successful in 50s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-04-27 15:30:55 +03:00
0cf246d2d6 Merge pull request 'util/io: add RedirectStderr' (#214) from io-redirect into master
Reviewed-on: #214
2023-04-24 12:59:31 +03:00
af278bd7d3 util/io: add RedirectStderr
All checks were successful
lint / lint (pull_request) Successful in 46s
pr / test (pull_request) Successful in 50s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-04-24 12:58:05 +03:00
814b90efe5 Merge pull request 'util/test: export GetCases func' (#213) from GetCases into master
Reviewed-on: #213
2023-04-19 01:23:53 +03:00
e403ae3d8e util/test: export GetCases func
All checks were successful
lint
test
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-04-19 01:23:34 +03:00
c9816a3957 Merge pull request 'util/test: add helper funcs' (#212) from test into master
Reviewed-on: #212
2023-04-19 00:33:28 +03:00
5691238a6a util/test: add helper funcs
All checks were successful
lint
test
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-04-18 23:47:12 +03:00
963a0fa7b7 Merge pull request 'gofmt -s code' (#209) from gofmt into master
Reviewed-on: #209
2023-04-11 23:34:41 +03:00
485257035c gofmt -s code
Some checks failed
lint
test
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-04-11 23:32:58 +03:00
ebd8ddf05b move to v4
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-04-11 22:21:25 +03:00
136 changed files with 1504 additions and 1868 deletions

View File

@ -1,4 +1,4 @@
# Micro [![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![Doc](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/github.com/unistack-org/micro/v3?tab=overview) [![Status](https://github.com/unistack-org/micro/workflows/build/badge.svg?branch=master)](https://github.com/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Amaster+event%3Apush) [![Lint](https://goreportcard.com/badge/go.unistack.org/micro/v3)](https://goreportcard.com/report/go.unistack.org/micro/v3) [![Coverage](https://codecov.io/gh/unistack-org/micro/branch/v3/graph/badge.svg?token=OZPO2LP7VS)](https://codecov.io/gh/unistack-org/micro) # Micro [![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![Doc](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/go.unistack.org/micro/v4?tab=overview) [![Status](https://github.com/unistack-org/micro/workflows/build/badge.svg?branch=master)](https://github.com/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Amaster+event%3Apush) [![Lint](https://goreportcard.com/badge/go.unistack.org/micro/v4)](https://goreportcard.com/report/go.unistack.org/micro/v4) [![Coverage](https://codecov.io/gh/unistack-org/micro/branch/v4/graph/badge.svg?token=OZPO2LP7VS)](https://codecov.io/gh/unistack-org/micro)
Micro is a standard library for microservices. Micro is a standard library for microservices.

View File

@ -1,22 +1,24 @@
// Package broker is an interface used for asynchronous messaging // Package broker is an interface used for asynchronous messaging
package broker // import "go.unistack.org/micro/v3/broker" package broker // import "go.unistack.org/micro/v4/broker"
import ( import (
"context" "context"
"errors" "errors"
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v4/metadata"
) )
// DefaultBroker default memory broker // DefaultBroker default memory broker
var DefaultBroker = NewBroker() var DefaultBroker Broker // = NewBroker()
var ( var (
// ErrNotConnected returns when broker used but not connected yet // ErrNotConnected returns when broker used but not connected yet
ErrNotConnected = errors.New("broker not connected") ErrNotConnected = errors.New("broker not connected")
// ErrDisconnected returns when broker disconnected // ErrDisconnected returns when broker disconnected
ErrDisconnected = errors.New("broker disconnected") ErrDisconnected = errors.New("broker disconnected")
// ErrInvalidMessage returns when message has nvalid format
ErrInvalidMessage = errors.New("broker message has invalid format")
) )
// Broker is an interface used for asynchronous messaging. // Broker is an interface used for asynchronous messaging.
@ -33,61 +35,33 @@ type Broker interface {
Connect(ctx context.Context) error Connect(ctx context.Context) error
// Disconnect disconnect from broker // Disconnect disconnect from broker
Disconnect(ctx context.Context) error Disconnect(ctx context.Context) error
// NewMessage creates new broker message
NewMessage(endpoint string, req interface{}, opts ...MessageOption) Message
// Publish message to broker topic // Publish message to broker topic
Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error Publish(ctx context.Context, msg interface{}, opts ...PublishOption) error
// Subscribe subscribes to topic message via handler // Subscribe subscribes to topic message via handler
Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error) Subscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error)
// BatchPublish messages to broker with multiple topics
BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error
// BatchSubscribe subscribes to topic messages via handler
BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error)
// String type of broker // String type of broker
String() string String() string
} }
// Handler is used to process messages via a subscription of a topic. // Message is given to a subscription handler for processing
type Handler func(Event) error type Message interface {
// Context for the message
// Events contains multiple events Context() context.Context
type Events []Event
// Ack try to ack all events and return
func (evs Events) Ack() error {
var err error
for _, ev := range evs {
if err = ev.Ack(); err != nil {
return err
}
}
return nil
}
// SetError sets error on event
func (evs Events) SetError(err error) {
for _, ev := range evs {
ev.SetError(err)
}
}
// BatchHandler is used to process messages in batches via a subscription of a topic.
type BatchHandler func(Events) error
// Event is given to a subscription handler for processing
type Event interface {
// Topic returns event topic // Topic returns event topic
Topic() string Topic() string
// Message returns broker message // Body returns broker message
Message() *Message Body() interface{}
// Ack acknowledge message // Ack acknowledge message
Ack() error Ack() error
// Error returns message error (like decoding errors or some other) // Error returns message error (like decoding errors or some other)
// In this case Body contains raw []byte from broker
Error() error Error() error
// SetError set event processing error
SetError(err error)
} }
// Message is used to transfer data // RawMessage is used to transfer data
type Message struct { type RawMessage struct {
// Header contains message metadata // Header contains message metadata
Header metadata.Metadata Header metadata.Metadata
// Body contains message body // Body contains message body
@ -95,8 +69,8 @@ type Message struct {
} }
// NewMessage create broker message with topic filled // NewMessage create broker message with topic filled
func NewMessage(topic string) *Message { func NewRawMessage(topic string) *RawMessage {
m := &Message{Header: metadata.New(2)} m := &RawMessage{Header: metadata.New(2)}
m.Header.Set(metadata.HeaderTopic, topic) m.Header.Set(metadata.HeaderTopic, topic)
return m return m
} }

View File

@ -1,15 +1,17 @@
//go:build ignore
package broker package broker
import ( import (
"context" "context"
"sync" "sync"
"time"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/metadata" maddr "go.unistack.org/micro/v4/util/addr"
maddr "go.unistack.org/micro/v3/util/addr" "go.unistack.org/micro/v4/util/id"
"go.unistack.org/micro/v3/util/id" mnet "go.unistack.org/micro/v4/util/net"
mnet "go.unistack.org/micro/v3/util/net" "go.unistack.org/micro/v4/util/rand"
"go.unistack.org/micro/v3/util/rand"
) )
type memoryBroker struct { type memoryBroker struct {
@ -20,23 +22,6 @@ type memoryBroker struct {
connected bool connected bool
} }
type memoryEvent struct {
err error
message interface{}
topic string
opts Options
}
type memorySubscriber struct {
ctx context.Context
exit chan bool
handler Handler
batchhandler BatchHandler
id string
topic string
opts SubscribeOptions
}
func (m *memoryBroker) Options() Options { func (m *memoryBroker) Options() Options {
return m.opts return m.opts
} }
@ -88,16 +73,11 @@ func (m *memoryBroker) Init(opts ...Option) error {
return nil return nil
} }
func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error { func (m *memoryBroker) NewMessage(endpoint string, req interface{}, opts ...MessageOption) Message {
msg.Header.Set(metadata.HeaderTopic, topic) return &memoryMessage{}
return m.publish(ctx, []*Message{msg}, opts...)
} }
func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error { func (m *memoryBroker) Publish(ctx context.Context, message interface{}, opts ...PublishOption) error {
return m.publish(ctx, msgs, opts...)
}
func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
m.RLock() m.RLock()
if !m.connected { if !m.connected {
m.RUnlock() m.RUnlock()
@ -112,128 +92,94 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub
return ctx.Err() return ctx.Err()
default: default:
options := NewPublishOptions(opts...) options := NewPublishOptions(opts...)
var msgs []*memoryMessage
msgTopicMap := make(map[string]Events) switch v := message.(type) {
for _, v := range msgs { case *memoryMessage:
p := &memoryEvent{opts: m.opts} msgs = []*memoryMessage{v}
case []*memoryMessage:
if m.opts.Codec == nil || options.BodyOnly { msgs = v
p.topic, _ = v.Header.Get(metadata.HeaderTopic) default:
p.message = v.Body return ErrInvalidMessage
} else { }
p.topic, _ = v.Header.Get(metadata.HeaderTopic) msgTopicMap := make(map[string][]*memoryMessage)
p.message, err = m.opts.Codec.Marshal(v) for _, msg := range msgs {
if err != nil { p := &memoryMessage{opts: options}
return err /*
if mb, ok := msg.Body().(*codec.Frame); ok {
p.message = v.Body
} else {
p.topic, _ = v.Header.Get(metadata.HeaderTopic)
p.message, err = m.opts.Codec.Marshal(v)
if err != nil {
return err
}
} }
} */
msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p) msgTopicMap[msg.Topic()] = append(msgTopicMap[p.topic], p)
} }
beh := m.opts.BatchErrorHandler
eh := m.opts.ErrorHandler eh := m.opts.ErrorHandler
for t, ms := range msgTopicMap { for t, ms := range msgTopicMap {
ts := time.Now()
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(len(ms))
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(len(ms))
m.RLock() m.RLock()
subs, ok := m.subscribers[t] subs, ok := m.subscribers[t]
m.RUnlock() m.RUnlock()
if !ok { if !ok {
m.opts.Meter.Counter(PublishMessageTotal, "endpoint", t, "status", "failure").Add(len(ms))
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-len(ms))
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-len(ms))
continue continue
} }
m.opts.Meter.Counter(PublishMessageTotal, "endpoint", t, "status", "success").Add(len(ms))
for _, sub := range subs { for _, sub := range subs {
if sub.opts.BatchErrorHandler != nil {
beh = sub.opts.BatchErrorHandler
}
if sub.opts.ErrorHandler != nil { if sub.opts.ErrorHandler != nil {
eh = sub.opts.ErrorHandler eh = sub.opts.ErrorHandler
} }
switch { for _, p := range ms {
// batch processing if err = sub.handler(p); err != nil {
case sub.batchhandler != nil: m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
if err = sub.batchhandler(ms); err != nil { if eh != nil {
ms.SetError(err) _ = eh(p)
if beh != nil {
_ = beh(ms)
} else if m.opts.Logger.V(logger.ErrorLevel) { } else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error()) m.opts.Logger.Error(m.opts.Context, err.Error())
} }
} else if sub.opts.AutoAck { } else {
if err = ms.Ack(); err != nil { if sub.opts.AutoAck {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
}
}
// single processing
case sub.handler != nil:
for _, p := range ms {
if err = sub.handler(p); err != nil {
p.SetError(err)
if eh != nil {
_ = eh(p)
} else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error())
}
} else if sub.opts.AutoAck {
if err = p.Ack(); err != nil { if err = p.Ack(); err != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err) m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
} else {
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
} }
} else {
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
} }
} }
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-1)
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-1)
} }
} }
te := time.Since(ts)
m.opts.Meter.Summary(PublishMessageLatencyMicroseconds, "endpoint", t).Update(te.Seconds())
m.opts.Meter.Histogram(PublishMessageDurationSeconds, "endpoint", t).Update(te.Seconds())
m.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, "endpoint", t).Update(te.Seconds())
m.opts.Meter.Histogram(SubscribeMessageDurationSeconds, "endpoint", t).Update(te.Seconds())
} }
} }
return nil return nil
} }
func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) { func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error) {
m.RLock()
if !m.connected {
m.RUnlock()
return nil, ErrNotConnected
}
m.RUnlock()
sid, err := id.New()
if err != nil {
return nil, err
}
options := NewSubscribeOptions(opts...)
sub := &memorySubscriber{
exit: make(chan bool, 1),
id: sid,
topic: topic,
batchhandler: handler,
opts: options,
ctx: ctx,
}
m.Lock()
m.subscribers[topic] = append(m.subscribers[topic], sub)
m.Unlock()
go func() {
<-sub.exit
m.Lock()
newSubscribers := make([]*memorySubscriber, 0, len(m.subscribers)-1)
for _, sb := range m.subscribers[topic] {
if sb.id == sub.id {
continue
}
newSubscribers = append(newSubscribers, sb)
}
m.subscribers[topic] = newSubscribers
m.Unlock()
}()
return sub, nil
}
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
m.RLock() m.RLock()
if !m.connected { if !m.connected {
m.RUnlock() m.RUnlock()
@ -286,38 +232,41 @@ func (m *memoryBroker) Name() string {
return m.opts.Name return m.opts.Name
} }
func (m *memoryEvent) Topic() string { type memoryMessage struct {
err error
body interface{}
topic string
opts PublishOptions
ctx context.Context
}
func (m *memoryMessage) Topic() string {
return m.topic return m.topic
} }
func (m *memoryEvent) Message() *Message { func (m *memoryMessage) Body() interface{} {
switch v := m.message.(type) { return m.body
case *Message: }
return v
case []byte:
msg := &Message{}
if err := m.opts.Codec.Unmarshal(v, msg); err != nil {
if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, "[memory]: failed to unmarshal: %v", err)
}
return nil
}
return msg
}
func (m *memoryMessage) Ack() error {
return nil return nil
} }
func (m *memoryEvent) Ack() error { func (m *memoryMessage) Error() error {
return nil
}
func (m *memoryEvent) Error() error {
return m.err return m.err
} }
func (m *memoryEvent) SetError(err error) { func (m *memoryMessage) Context() context.Context {
m.err = err return m.ctx
}
type memorySubscriber struct {
ctx context.Context
exit chan bool
handler interface{}
id string
topic string
opts SubscribeOptions
} }
func (m *memorySubscriber) Options() SubscribeOptions { func (m *memorySubscriber) Options() SubscribeOptions {
@ -334,7 +283,7 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error {
} }
// NewBroker return new memory broker // NewBroker return new memory broker
func NewBroker(opts ...Option) Broker { func NewBroker(opts ...Option) *memoryBroker {
return &memoryBroker{ return &memoryBroker{
opts: NewOptions(opts...), opts: NewOptions(opts...),
subscribers: make(map[string][]*memorySubscriber), subscribers: make(map[string][]*memorySubscriber),

View File

@ -5,7 +5,7 @@ import (
"fmt" "fmt"
"testing" "testing"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v4/metadata"
) )
func TestMemoryBatchBroker(t *testing.T) { func TestMemoryBatchBroker(t *testing.T) {

View File

@ -5,11 +5,31 @@ import (
"crypto/tls" "crypto/tls"
"time" "time"
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v3/register" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v4/tracer"
)
var (
// PublishMessageDurationSeconds specifies meter metric name
PublishMessageDurationSeconds = "publish_message_duration_seconds"
// PublishMessageLatencyMicroseconds specifies meter metric name
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
// PublishMessageTotal specifies meter metric name
PublishMessageTotal = "publish_message_total"
// PublishMessageInflight specifies meter metric name
PublishMessageInflight = "publish_message_inflight"
// SubscribeMessageDurationSeconds specifies meter metric name
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
// SubscribeMessageLatencyMicroseconds specifies meter metric name
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
// SubscribeMessageTotal specifies meter metric name
SubscribeMessageTotal = "subscribe_message_total"
// SubscribeMessageInflight specifies meter metric name
SubscribeMessageInflight = "subscribe_message_inflight"
) )
// Options struct // Options struct
@ -18,8 +38,8 @@ type Options struct {
Tracer tracer.Tracer Tracer tracer.Tracer
// Register can be used for clustering // Register can be used for clustering
Register register.Register Register register.Register
// Codec holds the codec for marshal/unmarshal // Codecs holds the codec for marshal/unmarshal
Codec codec.Codec Codecs map[string]codec.Codec
// Logger used for logging // Logger used for logging
Logger logger.Logger Logger logger.Logger
// Meter used for metrics // Meter used for metrics
@ -29,15 +49,16 @@ type Options struct {
// TLSConfig holds tls.TLSConfig options // TLSConfig holds tls.TLSConfig options
TLSConfig *tls.Config TLSConfig *tls.Config
// ErrorHandler used when broker can't unmarshal incoming message // ErrorHandler used when broker can't unmarshal incoming message
ErrorHandler Handler ErrorHandler func(Message)
// BatchErrorHandler used when broker can't unmashal incoming messages
BatchErrorHandler BatchHandler
// Name holds the broker name // Name holds the broker name
Name string Name string
// Addrs holds the broker address // Addrs holds the broker address
Addrs []string Addrs []string
} }
// Option func
type Option func(*Options)
// NewOptions create new Options // NewOptions create new Options
func NewOptions(opts ...Option) Options { func NewOptions(opts ...Option) Options {
options := Options{ options := Options{
@ -45,7 +66,7 @@ func NewOptions(opts ...Option) Options {
Logger: logger.DefaultLogger, Logger: logger.DefaultLogger,
Context: context.Background(), Context: context.Background(),
Meter: meter.DefaultMeter, Meter: meter.DefaultMeter,
Codec: codec.DefaultCodec, Codecs: make(map[string]codec.Codec),
Tracer: tracer.DefaultTracer, Tracer: tracer.DefaultTracer,
} }
for _, o := range opts { for _, o := range opts {
@ -61,6 +82,32 @@ func Context(ctx context.Context) Option {
} }
} }
// MessageOption func
type MessageOption func(*MessageOptions)
// MessageOptions struct
type MessageOptions struct {
Metadata metadata.Metadata
ContentType string
}
// MessageMetadata pass additional message metadata
func MessageMetadata(md metadata.Metadata) MessageOption {
return func(o *MessageOptions) {
o.Metadata = md
}
}
// MessageContentType pass ContentType for message data
func MessageContentType(ct string) MessageOption {
return func(o *MessageOptions) {
o.ContentType = ct
}
}
// PublishOption func
type PublishOption func(*PublishOptions)
// PublishOptions struct // PublishOptions struct
type PublishOptions struct { type PublishOptions struct {
// Context holds external options // Context holds external options
@ -85,11 +132,9 @@ type SubscribeOptions struct {
// Context holds external options // Context holds external options
Context context.Context Context context.Context
// ErrorHandler used when broker can't unmarshal incoming message // ErrorHandler used when broker can't unmarshal incoming message
ErrorHandler Handler ErrorHandler func(Message)
// BatchErrorHandler used when broker can't unmashal incoming messages // QueueGroup holds consumer group
BatchErrorHandler BatchHandler QueueGroup string
// Group holds consumer group
Group string
// AutoAck flag specifies auto ack of incoming message when no error happens // AutoAck flag specifies auto ack of incoming message when no error happens
AutoAck bool AutoAck bool
// BodyOnly flag specifies that message contains only body bytes without header // BodyOnly flag specifies that message contains only body bytes without header
@ -100,12 +145,6 @@ type SubscribeOptions struct {
BatchWait time.Duration BatchWait time.Duration
} }
// Option func
type Option func(*Options)
// PublishOption func
type PublishOption func(*PublishOptions)
// PublishBodyOnly publish only body of the message // PublishBodyOnly publish only body of the message
func PublishBodyOnly(b bool) PublishOption { func PublishBodyOnly(b bool) PublishOption {
return func(o *PublishOptions) { return func(o *PublishOptions) {
@ -129,59 +168,29 @@ func Addrs(addrs ...string) Option {
// Codec sets the codec used for encoding/decoding used where // Codec sets the codec used for encoding/decoding used where
// a broker does not support headers // a broker does not support headers
func Codec(c codec.Codec) Option { // Codec to be used to encode/decode requests for a given content type
func Codec(contentType string, c codec.Codec) Option {
return func(o *Options) { return func(o *Options) {
o.Codec = c o.Codecs[contentType] = c
} }
} }
// ErrorHandler will catch all broker errors that cant be handled // ErrorHandler will catch all broker errors that cant be handled
// in normal way, for example Codec errors // in normal way, for example Codec errors
func ErrorHandler(h Handler) Option { func ErrorHandler(h func(Message)) Option {
return func(o *Options) { return func(o *Options) {
o.ErrorHandler = h o.ErrorHandler = h
} }
} }
// BatchErrorHandler will catch all broker errors that cant be handled
// in normal way, for example Codec errors
func BatchErrorHandler(h BatchHandler) Option {
return func(o *Options) {
o.BatchErrorHandler = h
}
}
// SubscribeErrorHandler will catch all broker errors that cant be handled // SubscribeErrorHandler will catch all broker errors that cant be handled
// in normal way, for example Codec errors // in normal way, for example Codec errors
func SubscribeErrorHandler(h Handler) SubscribeOption { func SubscribeErrorHandler(h func(Message)) SubscribeOption {
return func(o *SubscribeOptions) { return func(o *SubscribeOptions) {
o.ErrorHandler = h o.ErrorHandler = h
} }
} }
// SubscribeBatchErrorHandler will catch all broker errors that cant be handled
// in normal way, for example Codec errors
func SubscribeBatchErrorHandler(h BatchHandler) SubscribeOption {
return func(o *SubscribeOptions) {
o.BatchErrorHandler = h
}
}
// Queue sets the subscribers queue
// Deprecated
func Queue(name string) SubscribeOption {
return func(o *SubscribeOptions) {
o.Group = name
}
}
// SubscribeGroup sets the name of the queue to share messages on
func SubscribeGroup(name string) SubscribeOption {
return func(o *SubscribeOptions) {
o.Group = name
}
}
// Register sets register option // Register sets register option
func Register(r register.Register) Option { func Register(r register.Register) Option {
return func(o *Options) { return func(o *Options) {
@ -224,6 +233,21 @@ func Name(n string) Option {
} }
} }
// SubscribeOption func signature
type SubscribeOption func(*SubscribeOptions)
// NewSubscribeOptions creates new SubscribeOptions
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
options := SubscribeOptions{
AutoAck: true,
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
return options
}
// SubscribeContext set context // SubscribeContext set context
func SubscribeContext(ctx context.Context) SubscribeOption { func SubscribeContext(ctx context.Context) SubscribeOption {
return func(o *SubscribeOptions) { return func(o *SubscribeOptions) {
@ -231,14 +255,6 @@ func SubscribeContext(ctx context.Context) SubscribeOption {
} }
} }
// DisableAutoAck disables auto ack
// Deprecated
func DisableAutoAck() SubscribeOption {
return func(o *SubscribeOptions) {
o.AutoAck = false
}
}
// SubscribeAutoAck contol auto acking of messages // SubscribeAutoAck contol auto acking of messages
// after they have been handled. // after they have been handled.
func SubscribeAutoAck(b bool) SubscribeOption { func SubscribeAutoAck(b bool) SubscribeOption {
@ -268,17 +284,16 @@ func SubscribeBatchWait(td time.Duration) SubscribeOption {
} }
} }
// SubscribeOption func // SubscribeQueueGroup sets the shared queue name distributed messages across subscribers
type SubscribeOption func(*SubscribeOptions) func SubscribeQueueGroup(n string) SubscribeOption {
return func(o *SubscribeOptions) {
// NewSubscribeOptions creates new SubscribeOptions o.QueueGroup = n
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions { }
options := SubscribeOptions{ }
AutoAck: true,
Context: context.Background(), // SubscribeAutoAck control auto ack processing for handler
func SubscribeAuthAck(b bool) SubscribeOption {
return func(o *SubscribeOptions) {
o.AutoAck = b
} }
for _, o := range opts {
o(&options)
}
return options
} }

98
broker/subscriber.go Normal file
View File

@ -0,0 +1,98 @@
package broker
import (
"fmt"
"reflect"
"strings"
"unicode"
"unicode/utf8"
)
const (
subSig = "func(context.Context, interface{}) error"
batchSubSig = "func([]context.Context, []interface{}) error"
)
// Precompute the reflect type for error. Can't use error directly
// because Typeof takes an empty interface value. This is annoying.
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
// Is this an exported - upper case - name?
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune)
}
// Is this type exported or a builtin?
func isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
return isExported(t.Name()) || t.PkgPath() == ""
}
// ValidateSubscriber func signature
func ValidateSubscriber(sub interface{}) error {
typ := reflect.TypeOf(sub)
var argType reflect.Type
switch typ.Kind() {
case reflect.Func:
name := "Func"
switch typ.NumIn() {
case 1: // func(Message) error
case 2: // func(context.Context, Message) error or func(context.Context, []Message) error
argType = typ.In(2)
// if sub.Options().Batch {
if argType.Kind() != reflect.Slice {
return fmt.Errorf("subscriber %v dont have required signature %s", name, batchSubSig)
}
if strings.Compare(fmt.Sprintf("%v", argType), "[]interface{}") == 0 {
return fmt.Errorf("subscriber %v dont have required signaure %s", name, batchSubSig)
}
// }
default:
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s or %s", name, typ.NumIn(), subSig, batchSubSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
}
if typ.NumOut() != 1 {
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s or %s",
name, typ.NumOut(), subSig, batchSubSig)
}
if returnType := typ.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
}
default:
hdlr := reflect.ValueOf(sub)
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
switch method.Type.NumIn() {
case 3:
argType = method.Type.In(2)
default:
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s or %s",
name, method.Name, method.Type.NumIn(), subSig, batchSubSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("%v argument type not exported: %v", name, argType)
}
if method.Type.NumOut() != 1 {
return fmt.Errorf(
"subscriber %v.%v has wrong number of return values: %v require signature %s or %s",
name, method.Name, method.Type.NumOut(), subSig, batchSubSig)
}
if returnType := method.Type.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
}
}
}
return nil
}

View File

@ -5,7 +5,7 @@ import (
"math" "math"
"time" "time"
"go.unistack.org/micro/v3/util/backoff" "go.unistack.org/micro/v4/util/backoff"
) )
// BackoffFunc is the backoff call func // BackoffFunc is the backoff call func

View File

@ -1,12 +1,11 @@
// Package client is an interface for an RPC client // Package client is an interface for an RPC client
package client // import "go.unistack.org/micro/v3/client" package client // import "go.unistack.org/micro/v4/client"
import ( import (
"context" "context"
"time" "time"
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v3/metadata"
) )
var ( var (
@ -35,23 +34,12 @@ type Client interface {
Name() string Name() string
Init(opts ...Option) error Init(opts ...Option) error
Options() Options Options() Options
NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
NewRequest(service string, endpoint string, req interface{}, opts ...RequestOption) Request NewRequest(service string, endpoint string, req interface{}, opts ...RequestOption) Request
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
Publish(ctx context.Context, msg Message, opts ...PublishOption) error
BatchPublish(ctx context.Context, msg []Message, opts ...PublishOption) error
String() string String() string
} }
// Message is the interface for publishing asynchronously
type Message interface {
Topic() string
Payload() interface{}
ContentType() string
Metadata() metadata.Metadata
}
// Request is the interface for a synchronous request used by Call or Stream // Request is the interface for a synchronous request used by Call or Stream
type Request interface { type Request interface {
// The service to call // The service to call
@ -68,16 +56,22 @@ type Request interface {
Codec() codec.Codec Codec() codec.Codec
// indicates whether the request will be a streaming one rather than unary // indicates whether the request will be a streaming one rather than unary
Stream() bool Stream() bool
// Header data
// Header() metadata.Metadata
} }
// Response is the response received from a service // Response is the response received from a service
type Response interface { type Response interface {
// Read the response // Read the response
Codec() codec.Codec Codec() codec.Codec
// The content type
// ContentType() string
// Header data // Header data
Header() metadata.Metadata // Header() metadata.Metadata
// Read the undecoded response // Read the undecoded response
Read() ([]byte, error) Read() ([]byte, error)
// The unencoded request body
// Body() interface{}
} }
// Stream is the interface for a bidirectional synchronous stream // Stream is the interface for a bidirectional synchronous stream

View File

@ -4,8 +4,8 @@ import (
"context" "context"
"sort" "sort"
"go.unistack.org/micro/v3/errors" "go.unistack.org/micro/v4/errors"
"go.unistack.org/micro/v3/router" "go.unistack.org/micro/v4/router"
) )
// LookupFunc is used to lookup routes for a service // LookupFunc is used to lookup routes for a service

View File

@ -5,11 +5,10 @@ import (
"fmt" "fmt"
"time" "time"
"go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v4/errors"
"go.unistack.org/micro/v3/errors" "go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v4/selector"
"go.unistack.org/micro/v3/selector"
) )
// DefaultCodecs will be used to encode/decode data // DefaultCodecs will be used to encode/decode data
@ -285,6 +284,9 @@ func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opt
ch := make(chan error, callOpts.Retries) ch := make(chan error, callOpts.Retries)
var gerr error var gerr error
ts := time.Now()
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
n.opts.Meter.Counter(ClientRequestInflight, "endpoint", endpoint).Inc()
for i := 0; i <= callOpts.Retries; i++ { for i := 0; i <= callOpts.Retries; i++ {
go func() { go func() {
ch <- call(i) ch <- call(i)
@ -312,6 +314,16 @@ func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opt
} }
} }
if gerr != nil {
n.opts.Meter.Counter(ClientRequestTotal, "endpoint", endpoint, "status", "failure").Inc()
} else {
n.opts.Meter.Counter(ClientRequestTotal, "endpoint", endpoint, "status", "success").Inc()
}
n.opts.Meter.Counter(ClientRequestInflight, "endpoint", endpoint).Dec()
te := time.Since(ts)
n.opts.Meter.Summary(ClientRequestLatencyMicroseconds, "endpoint", endpoint).Update(te.Seconds())
n.opts.Meter.Histogram(ClientRequestDurationSeconds, "endpoint", endpoint).Update(te.Seconds())
return gerr return gerr
} }
@ -323,11 +335,6 @@ func (n *noopClient) NewRequest(service, endpoint string, req interface{}, opts
return &noopRequest{service: service, endpoint: endpoint} return &noopRequest{service: service, endpoint: endpoint}
} }
func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOption) Message {
options := NewMessageOptions(append([]MessageOption{MessageContentType(n.opts.ContentType)}, opts...)...)
return &noopMessage{topic: topic, payload: msg, opts: options}
}
func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) { func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) {
var err error var err error
@ -414,7 +421,15 @@ func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption
node := next() node := next()
// ts := time.Now()
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
n.opts.Meter.Counter(ClientRequestInflight, "endpoint", endpoint).Inc()
stream, cerr := n.stream(ctx, node, req, callOpts) stream, cerr := n.stream(ctx, node, req, callOpts)
if cerr != nil {
n.opts.Meter.Counter(ClientRequestTotal, "endpoint", endpoint, "status", "failure").Inc()
} else {
n.opts.Meter.Counter(ClientRequestTotal, "endpoint", endpoint, "status", "success").Inc()
}
// record the result of the call to inform future routing decisions // record the result of the call to inform future routing decisions
if verr := n.opts.Selector.Record(node, cerr); verr != nil { if verr := n.opts.Selector.Record(node, cerr); verr != nil {
@ -468,64 +483,6 @@ func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption
return nil, grr return nil, grr
} }
func (n *noopClient) stream(ctx context.Context, addr string, req Request, opts CallOptions) (Stream, error) { func (n *noopClient) stream(ctx context.Context, addr string, req Request, opts CallOptions) (*noopStream, error) {
return &noopStream{}, nil return &noopStream{}, nil
} }
func (n *noopClient) BatchPublish(ctx context.Context, ps []Message, opts ...PublishOption) error {
return n.publish(ctx, ps, opts...)
}
func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOption) error {
return n.publish(ctx, []Message{p}, opts...)
}
func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishOption) error {
options := NewPublishOptions(opts...)
msgs := make([]*broker.Message, 0, len(ps))
for _, p := range ps {
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(0)
}
md[metadata.HeaderContentType] = p.ContentType()
topic := p.Topic()
// get the exchange
if len(options.Exchange) > 0 {
topic = options.Exchange
}
md[metadata.HeaderTopic] = topic
var body []byte
// passed in raw data
if d, ok := p.Payload().(*codec.Frame); ok {
body = d.Data
} else {
// use codec for payload
cf, err := n.newCodec(p.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// set the body
b, err := cf.Marshal(p.Payload())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
body = b
}
msgs = append(msgs, &broker.Message{Header: md, Body: body})
}
return n.opts.Broker.BatchPublish(ctx, msgs,
broker.PublishContext(options.Context),
broker.PublishBodyOnly(options.BodyOnly),
)
}

View File

@ -6,17 +6,27 @@ import (
"net" "net"
"time" "time"
"go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v4/network/transport"
"go.unistack.org/micro/v3/network/transport" "go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v3/register" "go.unistack.org/micro/v4/router"
"go.unistack.org/micro/v3/router" "go.unistack.org/micro/v4/selector"
"go.unistack.org/micro/v3/selector" "go.unistack.org/micro/v4/selector/random"
"go.unistack.org/micro/v3/selector/random" "go.unistack.org/micro/v4/tracer"
"go.unistack.org/micro/v3/tracer" )
var (
// ClientRequestDurationSeconds specifies meter metric name
ClientRequestDurationSeconds = "client_request_duration_seconds"
// ClientRequestLatencyMicroseconds specifies meter metric name
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
// ClientRequestTotal specifies meter metric name
ClientRequestTotal = "client_request_total"
// ClientRequestInflight specifies meter metric name
ClientRequestInflight = "client_request_inflight"
) )
// Options holds client options // Options holds client options
@ -29,8 +39,6 @@ type Options struct {
Logger logger.Logger Logger logger.Logger
// Tracer used for tracing // Tracer used for tracing
Tracer tracer.Tracer Tracer tracer.Tracer
// Broker used to publish messages
Broker broker.Broker
// Meter used for metrics // Meter used for metrics
Meter meter.Meter Meter meter.Meter
// Context is used for external options // Context is used for external options
@ -199,7 +207,6 @@ func NewOptions(opts ...Option) Options {
PoolTTL: DefaultPoolTTL, PoolTTL: DefaultPoolTTL,
Selector: random.NewSelector(), Selector: random.NewSelector(),
Logger: logger.DefaultLogger, Logger: logger.DefaultLogger,
Broker: broker.DefaultBroker,
Meter: meter.DefaultMeter, Meter: meter.DefaultMeter,
Tracer: tracer.DefaultTracer, Tracer: tracer.DefaultTracer,
Router: router.DefaultRouter, Router: router.DefaultRouter,
@ -213,13 +220,6 @@ func NewOptions(opts ...Option) Options {
return options return options
} }
// Broker to be used for pub/sub
func Broker(b broker.Broker) Option {
return func(o *Options) {
o.Broker = b
}
}
// Tracer to be used for tracing // Tracer to be used for tracing
func Tracer(t tracer.Tracer) Option { func Tracer(t tracer.Tracer) Option {
return func(o *Options) { return func(o *Options) {

View File

@ -3,7 +3,7 @@ package client
import ( import (
"context" "context"
"go.unistack.org/micro/v3/errors" "go.unistack.org/micro/v4/errors"
) )
// RetryFunc that returning either false or a non-nil error will result in the call not being retried // RetryFunc that returning either false or a non-nil error will result in the call not being retried

View File

@ -5,7 +5,7 @@ import (
"fmt" "fmt"
"testing" "testing"
"go.unistack.org/micro/v3/errors" "go.unistack.org/micro/v4/errors"
) )
func TestRetryAlways(t *testing.T) { func TestRetryAlways(t *testing.T) {

View File

@ -1,7 +1,7 @@
package client package client
import ( import (
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v4/codec"
) )
type testRequest struct { type testRequest struct {

View File

@ -1,11 +1,11 @@
// Package codec is an interface for encoding messages // Package codec is an interface for encoding messages
package codec // import "go.unistack.org/micro/v3/codec" package codec // import "go.unistack.org/micro/v4/codec"
import ( import (
"errors" "errors"
"io" "io"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v4/metadata"
) )
// Message types // Message types

View File

@ -1,4 +1,4 @@
// Copyright 2021 Unistack LLC // Copyright 2021-2023 Unistack LLC
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -17,7 +17,7 @@ syntax = "proto3";
package micro.codec; package micro.codec;
option cc_enable_arenas = true; option cc_enable_arenas = true;
option go_package = "go.unistack.org/micro/v3/codec;codec"; option go_package = "go.unistack.org/micro/v4/codec;codec";
option java_multiple_files = true; option java_multiple_files = true;
option java_outer_classname = "MicroCodec"; option java_outer_classname = "MicroCodec";
option java_package = "micro.codec"; option java_package = "micro.codec";

View File

@ -3,9 +3,9 @@ package codec
import ( import (
"context" "context"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v4/tracer"
) )
// Option func // Option func

View File

@ -1,5 +1,5 @@
// Package config is an interface for dynamic configuration. // Package config is an interface for dynamic configuration.
package config // import "go.unistack.org/micro/v3/config" package config // import "go.unistack.org/micro/v4/config"
import ( import (
"context" "context"

View File

@ -8,8 +8,8 @@ import (
"time" "time"
"github.com/imdario/mergo" "github.com/imdario/mergo"
rutil "go.unistack.org/micro/v3/util/reflect" rutil "go.unistack.org/micro/v4/util/reflect"
mtime "go.unistack.org/micro/v3/util/time" mtime "go.unistack.org/micro/v4/util/time"
) )
type defaultConfig struct { type defaultConfig struct {
@ -169,7 +169,7 @@ func fillValue(value reflect.Value, val string) error {
return err return err
} }
value.Set(reflect.ValueOf(v)) value.Set(reflect.ValueOf(v))
case value.Type().String() == "time.Duration" && value.Type().PkgPath() == "go.unistack.org/micro/v3/util/time": case value.Type().String() == "time.Duration" && value.Type().PkgPath() == "go.unistack.org/micro/v4/util/time":
v, err := mtime.ParseDuration(val) v, err := mtime.ParseDuration(val)
if err != nil { if err != nil {
return err return err

View File

@ -6,17 +6,18 @@ import (
"testing" "testing"
"time" "time"
"go.unistack.org/micro/v3/config" "go.unistack.org/micro/v4/config"
mtime "go.unistack.org/micro/v3/util/time" mtime "go.unistack.org/micro/v4/util/time"
) )
type cfg struct { type cfg struct {
StringValue string `default:"string_value"` StringValue string `default:"string_value"`
IgnoreValue string `json:"-"` IgnoreValue string `json:"-"`
StructValue *cfgStructValue StructValue *cfgStructValue
IntValue int `default:"99"` IntValue int `default:"99"`
DurationValue time.Duration `default:"10s"` DurationValue time.Duration `default:"10s"`
MDurationValue mtime.Duration `default:"10s"` MDurationValue mtime.Duration `default:"10s"`
MapValue map[string]bool `default:"key1=true,key2=false"`
} }
type cfgStructValue struct { type cfgStructValue struct {
@ -67,6 +68,9 @@ func TestDefault(t *testing.T) {
if conf.StringValue != "after_load" { if conf.StringValue != "after_load" {
t.Fatal("AfterLoad option not working") t.Fatal("AfterLoad option not working")
} }
if len(conf.MapValue) != 2 {
t.Fatalf("map value invalid: %#+v\n", conf.MapValue)
}
_ = conf _ = conf
// t.Logf("%#+v\n", conf) // t.Logf("%#+v\n", conf)
} }

View File

@ -4,10 +4,10 @@ import (
"context" "context"
"time" "time"
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v4/tracer"
) )
// Options hold the config options // Options hold the config options

View File

@ -1,6 +1,6 @@
// Package errors provides a way to return detailed information // Package errors provides a way to return detailed information
// for an RPC request error. The error is normally JSON encoded. // for an RPC request error. The error is normally JSON encoded.
package errors // import "go.unistack.org/micro/v3/errors" package errors // import "go.unistack.org/micro/v4/errors"
import ( import (
"bytes" "bytes"

View File

@ -1,4 +1,4 @@
// Copyright 2021 Unistack LLC // Copyright 2021-2023 Unistack LLC
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -17,7 +17,7 @@ syntax = "proto3";
package micro.errors; package micro.errors;
option cc_enable_arenas = true; option cc_enable_arenas = true;
option go_package = "go.unistack.org/micro/v3/errors;errors"; option go_package = "go.unistack.org/micro/v4/errors;errors";
option java_multiple_files = true; option java_multiple_files = true;
option java_outer_classname = "MicroErrors"; option java_outer_classname = "MicroErrors";
option java_package = "micro.errors"; option java_package = "micro.errors";

View File

@ -1,27 +0,0 @@
package micro
import (
"context"
"go.unistack.org/micro/v3/client"
)
// Event is used to publish messages to a topic
type Event interface {
// Publish publishes a message to the event topic
Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error
}
type event struct {
c client.Client
topic string
}
// NewEvent creates a new event publisher
func NewEvent(topic string, c client.Client) Event {
return &event{c, topic}
}
func (e *event) Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error {
return e.c.Publish(ctx, e.c.NewMessage(e.topic, msg), opts...)
}

View File

@ -6,12 +6,12 @@ import (
"sync" "sync"
"github.com/silas/dag" "github.com/silas/dag"
"go.unistack.org/micro/v3/client" "go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v3/store" "go.unistack.org/micro/v4/store"
"go.unistack.org/micro/v3/util/id" "go.unistack.org/micro/v4/util/id"
) )
type microFlow struct { type microFlow struct {

View File

@ -1,5 +1,5 @@
// Package flow is an interface used for saga pattern microservice workflow // Package flow is an interface used for saga pattern microservice workflow
package flow // import "go.unistack.org/micro/v3/flow" package flow // import "go.unistack.org/micro/v4/flow"
import ( import (
"context" "context"
@ -7,7 +7,7 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v4/metadata"
) )
var ( var (

View File

@ -4,11 +4,11 @@ import (
"context" "context"
"time" "time"
"go.unistack.org/micro/v3/client" "go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v3/store" "go.unistack.org/micro/v4/store"
"go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v4/tracer"
) )
// Option func // Option func

View File

@ -1,4 +1,4 @@
package fsm // import "go.unistack.org/micro/v3/fsm" package fsm // import "go.unistack.org/micro/v4/fsm"
import ( import (
"context" "context"

View File

@ -5,7 +5,7 @@ import (
"fmt" "fmt"
"testing" "testing"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
) )
func TestFSMStart(t *testing.T) { func TestFSMStart(t *testing.T) {

18
go.mod
View File

@ -1,9 +1,19 @@
module go.unistack.org/micro/v3 module go.unistack.org/micro/v4
go 1.19 go 1.20
require ( require (
github.com/imdario/mergo v0.3.14 github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/imdario/mergo v0.3.15
github.com/patrickmn/go-cache v2.1.0+incompatible github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35 github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5
golang.org/x/sync v0.1.0
golang.org/x/sys v0.7.0
google.golang.org/grpc v1.54.0
google.golang.org/protobuf v1.30.0
)
require (
github.com/golang/protobuf v1.5.3 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
) )

30
go.sum
View File

@ -1,9 +1,31 @@
github.com/imdario/mergo v0.3.14 h1:fOqeC1+nCuuk6PKQdg9YmosXX7Y7mHX6R/0ZldI9iHo= github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/imdario/mergo v0.3.14/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM=
github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35 h1:4mohWoM/UGg1BvFFiqSPRl5uwJY3rVV0HQX0ETqauqQ= github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E=
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU=
google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag=
google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -1,5 +1,5 @@
// Package logger provides a log interface // Package logger provides a log interface
package logger // import "go.unistack.org/micro/v3/logger" package logger // import "go.unistack.org/micro/v4/logger"
import ( import (
"context" "context"

View File

@ -8,7 +8,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v4/codec"
) )
const sf = "0-+# " const sf = "0-+# "

View File

@ -5,7 +5,7 @@ import (
"strings" "strings"
"testing" "testing"
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v4/codec"
) )
func TestUnwrap(t *testing.T) { func TestUnwrap(t *testing.T) {

View File

@ -5,9 +5,9 @@ import (
"context" "context"
"fmt" "fmt"
"go.unistack.org/micro/v3/client" "go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/server" "go.unistack.org/micro/v4/server"
) )
var ( var (

View File

@ -1,5 +1,5 @@
// Package metadata is a way of defining message headers // Package metadata is a way of defining message headers
package metadata // import "go.unistack.org/micro/v3/metadata" package metadata // import "go.unistack.org/micro/v4/metadata"
import ( import (
"net/textproto" "net/textproto"

View File

@ -1,5 +1,5 @@
// Package meter is for instrumentation // Package meter is for instrumentation
package meter // import "go.unistack.org/micro/v3/meter" package meter // import "go.unistack.org/micro/v4/meter"
import ( import (
"io" "io"

View File

@ -3,7 +3,7 @@ package meter
import ( import (
"context" "context"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
) )
// Option powers the configuration for metrics implementations: // Option powers the configuration for metrics implementations:

View File

@ -1,49 +1,16 @@
package wrapper // import "go.unistack.org/micro/v3/meter/wrapper" package wrapper // import "go.unistack.org/micro/v4/meter/wrapper"
import ( import (
"context" "context"
"fmt" "fmt"
"time" "time"
"go.unistack.org/micro/v3/client" "go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v3/server" "go.unistack.org/micro/v4/server"
) )
var ( var (
// ClientRequestDurationSeconds specifies meter metric name
ClientRequestDurationSeconds = "client_request_duration_seconds"
// ClientRequestLatencyMicroseconds specifies meter metric name
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
// ClientRequestTotal specifies meter metric name
ClientRequestTotal = "client_request_total"
// ClientRequestInflight specifies meter metric name
ClientRequestInflight = "client_request_inflight"
// ServerRequestDurationSeconds specifies meter metric name
ServerRequestDurationSeconds = "server_request_duration_seconds"
// ServerRequestLatencyMicroseconds specifies meter metric name
ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
// ServerRequestTotal specifies meter metric name
ServerRequestTotal = "server_request_total"
// ServerRequestInflight specifies meter metric name
ServerRequestInflight = "server_request_inflight"
// PublishMessageDurationSeconds specifies meter metric name
PublishMessageDurationSeconds = "publish_message_duration_seconds"
// PublishMessageLatencyMicroseconds specifies meter metric name
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
// PublishMessageTotal specifies meter metric name
PublishMessageTotal = "publish_message_total"
// PublishMessageInflight specifies meter metric name
PublishMessageInflight = "publish_message_inflight"
// SubscribeMessageDurationSeconds specifies meter metric name
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
// SubscribeMessageLatencyMicroseconds specifies meter metric name
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
// SubscribeMessageTotal specifies meter metric name
SubscribeMessageTotal = "subscribe_message_total"
// SubscribeMessageInflight specifies meter metric name
SubscribeMessageInflight = "subscribe_message_inflight"
labelSuccess = "success" labelSuccess = "success"
labelFailure = "failure" labelFailure = "failure"
labelStatus = "status" labelStatus = "status"
@ -229,40 +196,6 @@ func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client
return stream, err return stream, err
} }
func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
endpoint := p.Topic()
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(PublishMessageInflight, labels...).Inc()
ts := time.Now()
err := w.Client.Publish(ctx, p, opts...)
te := time.Since(ts)
w.opts.Meter.Counter(PublishMessageInflight, labels...).Dec()
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(PublishMessageDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(PublishMessageTotal, labels...).Inc()
return err
}
// NewHandlerWrapper create new server handler wrapper
// deprecated
func NewHandlerWrapper(opts ...Option) server.HandlerWrapper {
handler := &wrapper{
opts: NewOptions(opts...),
}
return handler.HandlerFunc
}
// NewServerHandlerWrapper create new server handler wrapper // NewServerHandlerWrapper create new server handler wrapper
func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper { func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper {
handler := &wrapper{ handler := &wrapper{
@ -302,46 +235,3 @@ func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
return err return err
} }
} }
// NewSubscriberWrapper create server subscribe wrapper
// deprecated
func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
handler := &wrapper{
opts: NewOptions(opts...),
}
return handler.SubscriberFunc
}
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
handler := &wrapper{
opts: NewOptions(opts...),
}
return handler.SubscriberFunc
}
func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc {
return func(ctx context.Context, msg server.Message) error {
endpoint := msg.Topic()
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Inc()
ts := time.Now()
err := fn(ctx, msg)
te := time.Since(ts)
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Dec()
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(SubscribeMessageTotal, labels...).Inc()
return err
}
}

View File

@ -1,4 +1,4 @@
package mtls // import "go.unistack.org/micro/v3/mtls" package mtls // import "go.unistack.org/micro/v4/mtls"
import ( import (
"bytes" "bytes"

View File

@ -1,9 +1,9 @@
// Package network is for creating internetworks // Package network is for creating internetworks
package network // import "go.unistack.org/micro/v3/network" package network // import "go.unistack.org/micro/v4/network"
import ( import (
"go.unistack.org/micro/v3/client" "go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v3/server" "go.unistack.org/micro/v4/server"
) )
// Error is network node errors // Error is network node errors

View File

@ -1,13 +1,13 @@
package network package network
import ( import (
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v3/network/tunnel" "go.unistack.org/micro/v4/network/tunnel"
"go.unistack.org/micro/v3/proxy" "go.unistack.org/micro/v4/proxy"
"go.unistack.org/micro/v3/router" "go.unistack.org/micro/v4/router"
"go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v4/tracer"
"go.unistack.org/micro/v3/util/id" "go.unistack.org/micro/v4/util/id"
) )
// Option func // Option func

View File

@ -8,9 +8,9 @@ import (
"sync" "sync"
"time" "time"
maddr "go.unistack.org/micro/v3/util/addr" maddr "go.unistack.org/micro/v4/util/addr"
mnet "go.unistack.org/micro/v3/util/net" mnet "go.unistack.org/micro/v4/util/net"
"go.unistack.org/micro/v3/util/rand" "go.unistack.org/micro/v4/util/rand"
) )
type memorySocket struct { type memorySocket struct {

View File

@ -5,10 +5,10 @@ import (
"crypto/tls" "crypto/tls"
"time" "time"
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v4/tracer"
) )
// Options struct holds the transport options // Options struct holds the transport options

View File

@ -1,11 +1,11 @@
// Package transport is an interface for synchronous connection based communication // Package transport is an interface for synchronous connection based communication
package transport // import "go.unistack.org/micro/v3/network/transport" package transport // import "go.unistack.org/micro/v4/network/transport"
import ( import (
"context" "context"
"time" "time"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v4/metadata"
) )
var ( var (

View File

@ -1,15 +1,15 @@
// Package broker is a tunnel broker // Package broker is a tunnel broker
package broker // import "go.unistack.org/micro/v3/network/tunnel/broker" package broker // import "go.unistack.org/micro/v4/network/tunnel/broker"
import ( import (
"context" "context"
"fmt" "fmt"
"go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v3/network/transport" "go.unistack.org/micro/v4/network/transport"
"go.unistack.org/micro/v3/network/tunnel" "go.unistack.org/micro/v4/network/tunnel"
) )
type tunBroker struct { type tunBroker struct {

View File

@ -3,11 +3,11 @@ package tunnel
import ( import (
"time" "time"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v3/network/transport" "go.unistack.org/micro/v4/network/transport"
"go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v4/tracer"
"go.unistack.org/micro/v3/util/id" "go.unistack.org/micro/v4/util/id"
) )
var ( var (

View File

@ -1,8 +1,8 @@
package transport package transport
import ( import (
"go.unistack.org/micro/v3/network/transport" "go.unistack.org/micro/v4/network/transport"
"go.unistack.org/micro/v3/network/tunnel" "go.unistack.org/micro/v4/network/tunnel"
) )
type tunListener struct { type tunListener struct {

View File

@ -1,12 +1,12 @@
// Package transport provides a tunnel transport // Package transport provides a tunnel transport
package transport // import "go.unistack.org/micro/v3/network/tunnel/transport" package transport // import "go.unistack.org/micro/v4/network/tunnel/transport"
import ( import (
"context" "context"
"fmt" "fmt"
"go.unistack.org/micro/v3/network/transport" "go.unistack.org/micro/v4/network/transport"
"go.unistack.org/micro/v3/network/tunnel" "go.unistack.org/micro/v4/network/tunnel"
) )
type tunTransport struct { type tunTransport struct {

View File

@ -1,12 +1,12 @@
// Package tunnel provides gre network tunnelling // Package tunnel provides gre network tunnelling
package tunnel // import "go.unistack.org/micro/v3/network/transport/tunnel" package tunnel // import "go.unistack.org/micro/v4/network/transport/tunnel"
import ( import (
"context" "context"
"errors" "errors"
"time" "time"
"go.unistack.org/micro/v3/network/transport" "go.unistack.org/micro/v4/network/transport"
) )
// DefaultTunnel contains default tunnel implementation // DefaultTunnel contains default tunnel implementation

View File

@ -5,17 +5,17 @@ import (
"fmt" "fmt"
"time" "time"
"go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v3/client" "go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v3/config" "go.unistack.org/micro/v4/config"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v3/register" "go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v3/router" "go.unistack.org/micro/v4/router"
"go.unistack.org/micro/v3/server" "go.unistack.org/micro/v4/server"
"go.unistack.org/micro/v3/store" "go.unistack.org/micro/v4/store"
"go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v4/tracer"
) )
// Options for micro service // Options for micro service
@ -90,33 +90,7 @@ type Option func(*Options) error
// Broker to be used for client and server // Broker to be used for client and server
func Broker(b broker.Broker, opts ...BrokerOption) Option { func Broker(b broker.Broker, opts ...BrokerOption) Option {
return func(o *Options) error { return func(o *Options) error {
var err error o.Brokers = []broker.Broker{b}
bopts := brokerOptions{}
for _, opt := range opts {
opt(&bopts)
}
all := false
if len(opts) == 0 {
all = true
}
for _, srv := range o.Servers {
for _, os := range bopts.servers {
if srv.Name() == os || all {
if err = srv.Init(server.Broker(b)); err != nil {
return err
}
}
}
}
for _, cli := range o.Clients {
for _, oc := range bopts.clients {
if cli.Name() == oc || all {
if err = cli.Init(client.Broker(b)); err != nil {
return err
}
}
}
}
return nil return nil
} }
} }

View File

@ -1,4 +1,4 @@
package options // import "go.unistack.org/micro/v3/options" package options // import "go.unistack.org/micro/v4/options"
// Hook func interface // Hook func interface
type Hook interface{} type Hook interface{}

View File

@ -1,5 +1,5 @@
// Package http enables the http profiler // Package http enables the http profiler
package http // import "go.unistack.org/micro/v3/profiler/http" package http // import "go.unistack.org/micro/v4/profiler/http"
import ( import (
"context" "context"
@ -7,7 +7,7 @@ import (
"net/http/pprof" "net/http/pprof"
"sync" "sync"
profile "go.unistack.org/micro/v3/profiler" profile "go.unistack.org/micro/v4/profiler"
) )
type httpProfile struct { type httpProfile struct {

View File

@ -1,5 +1,5 @@
// Package pprof provides a pprof profiler which writes output to /tmp/[name].{cpu,mem}.pprof // Package pprof provides a pprof profiler which writes output to /tmp/[name].{cpu,mem}.pprof
package pprof // import "go.unistack.org/micro/v3/profiler/pprof" package pprof // import "go.unistack.org/micro/v4/profiler/pprof"
import ( import (
"os" "os"
@ -9,7 +9,7 @@ import (
"sync" "sync"
"time" "time"
profile "go.unistack.org/micro/v3/profiler" profile "go.unistack.org/micro/v4/profiler"
) )
type profiler struct { type profiler struct {

View File

@ -1,5 +1,5 @@
// Package profiler is for profilers // Package profiler is for profilers
package profiler // import "go.unistack.org/micro/v3/profiler" package profiler // import "go.unistack.org/micro/v4/profiler"
// Profiler interface // Profiler interface
type Profiler interface { type Profiler interface {

View File

@ -2,11 +2,11 @@
package proxy package proxy
import ( import (
"go.unistack.org/micro/v3/client" "go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v3/router" "go.unistack.org/micro/v4/router"
"go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v4/tracer"
) )
// Options for proxy // Options for proxy

View File

@ -1,10 +1,10 @@
// Package proxy is a transparent proxy built on the micro/server // Package proxy is a transparent proxy built on the micro/server
package proxy // import "go.unistack.org/micro/v3/proxy" package proxy // import "go.unistack.org/micro/v4/proxy"
import ( import (
"context" "context"
"go.unistack.org/micro/v3/server" "go.unistack.org/micro/v4/server"
) )
// DefaultEndpoint holds default proxy address // DefaultEndpoint holds default proxy address

View File

@ -6,7 +6,7 @@ import (
"unicode" "unicode"
"unicode/utf8" "unicode/utf8"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v4/metadata"
) )
// ExtractValue from reflect.Type from specified depth // ExtractValue from reflect.Type from specified depth

View File

@ -5,8 +5,8 @@ import (
"sync" "sync"
"time" "time"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/util/id" "go.unistack.org/micro/v4/util/id"
) )
var ( var (

View File

@ -5,9 +5,9 @@ import (
"crypto/tls" "crypto/tls"
"time" "time"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v4/tracer"
) )
// Options holds options for register // Options holds options for register

View File

@ -1,11 +1,11 @@
// Package register is an interface for service discovery // Package register is an interface for service discovery
package register // import "go.unistack.org/micro/v3/register" package register // import "go.unistack.org/micro/v4/register"
import ( import (
"context" "context"
"errors" "errors"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v4/metadata"
) )
const ( const (

View File

@ -1,5 +1,5 @@
// Package dns resolves names to dns records // Package dns resolves names to dns records
package dns // import "go.unistack.org/micro/v3/resolver/dns" package dns // import "go.unistack.org/micro/v4/resolver/dns"
import ( import (
"context" "context"
@ -7,7 +7,7 @@ import (
"sync" "sync"
"time" "time"
"go.unistack.org/micro/v3/resolver" "go.unistack.org/micro/v4/resolver"
) )
// Resolver is a DNS network resolve // Resolver is a DNS network resolve

View File

@ -1,11 +1,11 @@
// Package dnssrv resolves names to dns srv records // Package dnssrv resolves names to dns srv records
package dnssrv // import "go.unistack.org/micro/v3/resolver/dnssrv" package dnssrv // import "go.unistack.org/micro/v4/resolver/dnssrv"
import ( import (
"fmt" "fmt"
"net" "net"
"go.unistack.org/micro/v3/resolver" "go.unistack.org/micro/v4/resolver"
) )
// Resolver is a DNS network resolve // Resolver is a DNS network resolve

View File

@ -1,5 +1,5 @@
// Package http resolves names to network addresses using a http request // Package http resolves names to network addresses using a http request
package http // import "go.unistack.org/micro/v3/resolver/http" package http // import "go.unistack.org/micro/v4/resolver/http"
import ( import (
"encoding/json" "encoding/json"
@ -8,7 +8,7 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"go.unistack.org/micro/v3/resolver" "go.unistack.org/micro/v4/resolver"
) )
// nolint: golint,revive // nolint: golint,revive

View File

@ -1,8 +1,8 @@
// Package noop is a noop resolver // Package noop is a noop resolver
package noop // import "go.unistack.org/micro/v3/resolver/noop" package noop // import "go.unistack.org/micro/v4/resolver/noop"
import ( import (
"go.unistack.org/micro/v3/resolver" "go.unistack.org/micro/v4/resolver"
) )
// Resolver contains noop resolver // Resolver contains noop resolver

View File

@ -1,11 +1,11 @@
// Package register resolves names using the micro register // Package register resolves names using the micro register
package register // import "go.unistack.org/micro/v3/resolver/registry" package register // import "go.unistack.org/micro/v4/resolver/registry"
import ( import (
"context" "context"
"go.unistack.org/micro/v3/register" "go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v3/resolver" "go.unistack.org/micro/v4/resolver"
) )
// Resolver is a register network resolver // Resolver is a register network resolver

View File

@ -1,5 +1,5 @@
// Package resolver resolves network names to addresses // Package resolver resolves network names to addresses
package resolver // import "go.unistack.org/micro/v3/resolver" package resolver // import "go.unistack.org/micro/v4/resolver"
// Resolver is network resolver. It's used to find network nodes // Resolver is network resolver. It's used to find network nodes
// via the name to connect to. This is done based on Network.Name(). // via the name to connect to. This is done based on Network.Name().

View File

@ -1,8 +1,8 @@
// Package static is a static resolver // Package static is a static resolver
package static // import "go.unistack.org/micro/v3/resolver/static" package static // import "go.unistack.org/micro/v4/resolver/static"
import ( import (
"go.unistack.org/micro/v3/resolver" "go.unistack.org/micro/v4/resolver"
) )
// Resolver returns a static list of nodes. In the event the node list // Resolver returns a static list of nodes. In the event the node list

View File

@ -3,9 +3,9 @@ package router
import ( import (
"context" "context"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/register" "go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v3/util/id" "go.unistack.org/micro/v4/util/id"
) )
// Options are router options // Options are router options

View File

@ -3,7 +3,7 @@ package router
import ( import (
"hash/fnv" "hash/fnv"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v4/metadata"
) )
var ( var (

View File

@ -1,5 +1,5 @@
// Package router provides a network routing control plane // Package router provides a network routing control plane
package router // import "go.unistack.org/micro/v3/router" package router // import "go.unistack.org/micro/v4/router"
import ( import (
"errors" "errors"

View File

@ -1,8 +1,8 @@
package random // import "go.unistack.org/micro/v3/selector/random" package random // import "go.unistack.org/micro/v4/selector/random"
import ( import (
"go.unistack.org/micro/v3/selector" "go.unistack.org/micro/v4/selector"
"go.unistack.org/micro/v3/util/rand" "go.unistack.org/micro/v4/util/rand"
) )
type random struct{} type random struct{}

View File

@ -3,7 +3,7 @@ package random
import ( import (
"testing" "testing"
"go.unistack.org/micro/v3/selector" "go.unistack.org/micro/v4/selector"
) )
func TestRandom(t *testing.T) { func TestRandom(t *testing.T) {

View File

@ -1,8 +1,8 @@
package roundrobin // import "go.unistack.org/micro/v3/selector/roundrobin" package roundrobin // import "go.unistack.org/micro/v4/selector/roundrobin"
import ( import (
"go.unistack.org/micro/v3/selector" "go.unistack.org/micro/v4/selector"
"go.unistack.org/micro/v3/util/rand" "go.unistack.org/micro/v4/util/rand"
) )
// NewSelector returns an initialised round robin selector // NewSelector returns an initialised round robin selector

View File

@ -3,7 +3,7 @@ package roundrobin
import ( import (
"testing" "testing"
"go.unistack.org/micro/v3/selector" "go.unistack.org/micro/v4/selector"
) )
func TestRoundRobin(t *testing.T) { func TestRoundRobin(t *testing.T) {

View File

@ -1,5 +1,5 @@
// Package selector is for node selection and load balancing // Package selector is for node selection and load balancing
package selector // import "go.unistack.org/micro/v3/selector" package selector // import "go.unistack.org/micro/v4/selector"
import ( import (
"errors" "errors"

View File

@ -33,16 +33,6 @@ func SetOption(k, v interface{}) Option {
} }
} }
// SetSubscriberOption returns a function to setup a context with given value
func SetSubscriberOption(k, v interface{}) SubscriberOption {
return func(o *SubscriberOptions) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, k, v)
}
}
// SetHandlerOption returns a function to setup a context with given value // SetHandlerOption returns a function to setup a context with given value
func SetHandlerOption(k, v interface{}) HandlerOption { func SetHandlerOption(k, v interface{}) HandlerOption {
return func(o *HandlerOptions) { return func(o *HandlerOptions) {

View File

@ -51,14 +51,3 @@ func TestSetOption(t *testing.T) {
t.Fatal("SetOption not works") t.Fatal("SetOption not works")
} }
} }
func TestSetSubscriberOption(t *testing.T) {
type key struct{}
o := SetSubscriberOption(key{}, "test")
opts := &SubscriberOptions{}
o(opts)
if v, ok := opts.Context.Value(key{}).(string); !ok || v == "" {
t.Fatal("SetSubscriberOption not works")
}
}

View File

@ -1,6 +1,6 @@
package server package server
import "go.unistack.org/micro/v3/errors" import "go.unistack.org/micro/v4/errors"
type Error struct { type Error struct {
id string id string

View File

@ -3,7 +3,7 @@ package server
import ( import (
"testing" "testing"
"go.unistack.org/micro/v3/errors" "go.unistack.org/micro/v4/errors"
) )
func TestError(t *testing.T) { func TestError(t *testing.T) {

View File

@ -1,59 +0,0 @@
package server
import (
"reflect"
"go.unistack.org/micro/v3/register"
)
type rpcHandler struct {
opts HandlerOptions
handler interface{}
name string
endpoints []*register.Endpoint
}
func newRPCHandler(handler interface{}, opts ...HandlerOption) Handler {
options := NewHandlerOptions(opts...)
typ := reflect.TypeOf(handler)
hdlr := reflect.ValueOf(handler)
name := reflect.Indirect(hdlr).Type().Name()
var endpoints []*register.Endpoint
for m := 0; m < typ.NumMethod(); m++ {
if e := register.ExtractEndpoint(typ.Method(m)); e != nil {
e.Name = name + "." + e.Name
for k, v := range options.Metadata[e.Name] {
e.Metadata[k] = v
}
endpoints = append(endpoints, e)
}
}
return &rpcHandler{
name: name,
handler: handler,
endpoints: endpoints,
opts: options,
}
}
func (r *rpcHandler) Name() string {
return r.name
}
func (r *rpcHandler) Handler() interface{} {
return r.handler
}
func (r *rpcHandler) Endpoints() []*register.Endpoint {
return r.endpoints
}
func (r *rpcHandler) Options() HandlerOptions {
return r.opts
}

View File

@ -1,18 +1,17 @@
package server package server
import ( import (
"fmt" "reflect"
"sort" "sort"
"sync" "sync"
"time" "time"
"go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v3/register" maddr "go.unistack.org/micro/v4/util/addr"
maddr "go.unistack.org/micro/v3/util/addr" mnet "go.unistack.org/micro/v4/util/net"
mnet "go.unistack.org/micro/v3/util/net" "go.unistack.org/micro/v4/util/rand"
"go.unistack.org/micro/v3/util/rand"
) )
// DefaultCodecs will be used to encode/decode // DefaultCodecs will be used to encode/decode
@ -25,13 +24,12 @@ const (
) )
type noopServer struct { type noopServer struct {
h Handler h Handler
wg *sync.WaitGroup wg *sync.WaitGroup
rsvc *register.Service rsvc *register.Service
handlers map[string]Handler handlers map[string]Handler
subscribers map[*subscriber][]broker.Subscriber exit chan chan error
exit chan chan error opts Options
opts Options
sync.RWMutex sync.RWMutex
registered bool registered bool
started bool started bool
@ -43,9 +41,6 @@ func NewServer(opts ...Option) Server {
if n.handlers == nil { if n.handlers == nil {
n.handlers = make(map[string]Handler) n.handlers = make(map[string]Handler)
} }
if n.subscribers == nil {
n.subscribers = make(map[*subscriber][]broker.Subscriber)
}
if n.exit == nil { if n.exit == nil {
n.exit = make(chan chan error) n.exit = make(chan chan error)
} }
@ -71,37 +66,10 @@ func (n *noopServer) Name() string {
return n.opts.Name return n.opts.Name
} }
func (n *noopServer) Subscribe(sb Subscriber) error {
sub, ok := sb.(*subscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *subscriber")
} else if len(sub.handlers) == 0 {
return fmt.Errorf("invalid subscriber: no handler functions")
}
if err := ValidateSubscriber(sb); err != nil {
return err
}
n.Lock()
if _, ok = n.subscribers[sub]; ok {
n.Unlock()
return fmt.Errorf("subscriber %v already exists", sub)
}
n.subscribers[sub] = nil
n.Unlock()
return nil
}
func (n *noopServer) NewHandler(h interface{}, opts ...HandlerOption) Handler { func (n *noopServer) NewHandler(h interface{}, opts ...HandlerOption) Handler {
return newRPCHandler(h, opts...) return newRPCHandler(h, opts...)
} }
func (n *noopServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber {
return newSubscriber(topic, sb, opts...)
}
func (n *noopServer) Init(opts ...Option) error { func (n *noopServer) Init(opts ...Option) error {
for _, o := range opts { for _, o := range opts {
o(&n.opts) o(&n.opts)
@ -110,9 +78,6 @@ func (n *noopServer) Init(opts ...Option) error {
if n.handlers == nil { if n.handlers == nil {
n.handlers = make(map[string]Handler, 1) n.handlers = make(map[string]Handler, 1)
} }
if n.subscribers == nil {
n.subscribers = make(map[*subscriber][]broker.Subscriber, 1)
}
if n.exit == nil { if n.exit == nil {
n.exit = make(chan chan error) n.exit = make(chan chan error)
@ -158,21 +123,10 @@ func (n *noopServer) Register() error {
sort.Strings(handlerList) sort.Strings(handlerList)
subscriberList := make([]*subscriber, 0, len(n.subscribers)) endpoints := make([]*register.Endpoint, 0, len(handlerList))
for e := range n.subscribers {
subscriberList = append(subscriberList, e)
}
sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic
})
endpoints := make([]*register.Endpoint, 0, len(handlerList)+len(subscriberList))
for _, h := range handlerList { for _, h := range handlerList {
endpoints = append(endpoints, n.handlers[h].Endpoints()...) endpoints = append(endpoints, n.handlers[h].Endpoints()...)
} }
for _, e := range subscriberList {
endpoints = append(endpoints, e.Endpoints()...)
}
n.RUnlock() n.RUnlock()
service.Nodes[0].Metadata["protocol"] = "noop" service.Nodes[0].Metadata["protocol"] = "noop"
@ -202,39 +156,6 @@ func (n *noopServer) Register() error {
n.Lock() n.Lock()
defer n.Unlock() defer n.Unlock()
cx := config.Context
var sub broker.Subscriber
for sb := range n.subscribers {
if sb.Options().Context != nil {
cx = sb.Options().Context
}
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
if sb.Options().Batch {
// batch processing handler
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.newBatchSubHandler(sb, config), opts...)
} else {
// single processing handler
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.newSubHandler(sb, config), opts...)
}
if err != nil {
return err
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
}
n.subscribers[sb] = []broker.Subscriber{sub}
}
n.registered = true n.registered = true
if cacheService { if cacheService {
n.rsvc = service n.rsvc = service
@ -273,33 +194,6 @@ func (n *noopServer) Deregister() error {
n.registered = false n.registered = false
cx := config.Context
wg := sync.WaitGroup{}
for sb, subs := range n.subscribers {
for idx := range subs {
if sb.Options().Context != nil {
cx = sb.Options().Context
}
ncx := cx
wg.Add(1)
go func(s broker.Subscriber) {
defer wg.Done()
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "unsubscribing from topic: %s", s.Topic())
}
if err := s.Unsubscribe(ncx); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "unsubscribing from topic: %s err: %v", s.Topic(), err)
}
}
}(subs[idx])
}
n.subscribers[sb] = nil
}
wg.Wait()
n.Unlock() n.Unlock()
return nil return nil
} }
@ -336,21 +230,6 @@ func (n *noopServer) Start() error {
} }
n.Unlock() n.Unlock()
// only connect if we're subscribed
if len(n.subscribers) > 0 {
// connect to the broker
if err := config.Broker.Connect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "broker [%s] connect error: %v", config.Broker.String(), err)
}
return err
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
}
}
// use RegisterCheck func before register // use RegisterCheck func before register
// nolint: nestif // nolint: nestif
if err := config.RegisterCheck(config.Context); err != nil { if err := config.RegisterCheck(config.Context); err != nil {
@ -429,16 +308,6 @@ func (n *noopServer) Start() error {
// close transport // close transport
ch <- nil ch <- nil
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
}
// disconnect broker
if err := config.Broker.Disconnect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "broker [%s] disconnect error: %v", config.Broker.String(), err)
}
}
}() }()
// mark the server as started // mark the server as started
@ -468,3 +337,55 @@ func (n *noopServer) Stop() error {
return err return err
} }
type rpcHandler struct {
opts HandlerOptions
handler interface{}
name string
endpoints []*register.Endpoint
}
func newRPCHandler(handler interface{}, opts ...HandlerOption) Handler {
options := NewHandlerOptions(opts...)
typ := reflect.TypeOf(handler)
hdlr := reflect.ValueOf(handler)
name := reflect.Indirect(hdlr).Type().Name()
var endpoints []*register.Endpoint
for m := 0; m < typ.NumMethod(); m++ {
if e := register.ExtractEndpoint(typ.Method(m)); e != nil {
e.Name = name + "." + e.Name
for k, v := range options.Metadata[e.Name] {
e.Metadata[k] = v
}
endpoints = append(endpoints, e)
}
}
return &rpcHandler{
name: name,
handler: handler,
endpoints: endpoints,
opts: options,
}
}
func (r *rpcHandler) Name() string {
return r.name
}
func (r *rpcHandler) Handler() interface{} {
return r.handler
}
func (r *rpcHandler) Endpoints() []*register.Endpoint {
return r.endpoints
}
func (r *rpcHandler) Options() HandlerOptions {
return r.opts
}

View File

@ -1,104 +0,0 @@
package server_test
import (
"context"
"fmt"
"testing"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/server"
)
type TestHandler struct {
t *testing.T
}
type TestMessage struct {
Name string
}
func (h *TestHandler) SingleSubHandler(ctx context.Context, msg *codec.Frame) error {
// fmt.Printf("msg %s\n", msg.Data)
return nil
}
func (h *TestHandler) BatchSubHandler(ctxs []context.Context, msgs []*codec.Frame) error {
if len(msgs) != 8 {
h.t.Fatal("invalid number of messages received")
}
for idx := 0; idx < len(msgs); idx++ {
md, _ := metadata.FromIncomingContext(ctxs[idx])
_ = md
// fmt.Printf("msg md %v\n", md)
}
return nil
}
func TestNoopSub(t *testing.T) {
ctx := context.Background()
b := broker.NewBroker()
if err := b.Init(); err != nil {
t.Fatal(err)
}
if err := b.Connect(ctx); err != nil {
t.Fatal(err)
}
logger.DefaultLogger.Init(logger.WithLevel(logger.ErrorLevel))
s := server.NewServer(
server.Broker(b),
server.Codec("application/octet-stream", codec.NewCodec()),
)
if err := s.Init(); err != nil {
t.Fatal(err)
}
c := client.NewClient(
client.Broker(b),
client.Codec("application/octet-stream", codec.NewCodec()),
client.ContentType("application/octet-stream"),
)
if err := c.Init(); err != nil {
t.Fatal(err)
}
h := &TestHandler{t: t}
if err := s.Subscribe(s.NewSubscriber("single_topic", h.SingleSubHandler,
server.SubscriberQueue("queue"),
)); err != nil {
t.Fatal(err)
}
if err := s.Subscribe(s.NewSubscriber("batch_topic", h.BatchSubHandler,
server.SubscriberQueue("queue"),
server.SubscriberBatch(true),
)); err != nil {
t.Fatal(err)
}
if err := s.Start(); err != nil {
t.Fatal(err)
}
msgs := make([]client.Message, 0, 8)
for i := 0; i < 8; i++ {
msgs = append(msgs, c.NewMessage("batch_topic", &codec.Frame{Data: []byte(fmt.Sprintf(`{"name": "test_name %d"}`, i))}))
}
if err := c.BatchPublish(ctx, msgs); err != nil {
t.Fatal(err)
}
defer func() {
if err := s.Stop(); err != nil {
t.Fatal(err)
}
}()
}

View File

@ -7,16 +7,26 @@ import (
"sync" "sync"
"time" "time"
"go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v4/network/transport"
"go.unistack.org/micro/v3/network/transport" "go.unistack.org/micro/v4/options"
"go.unistack.org/micro/v3/options" "go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v3/register" "go.unistack.org/micro/v4/tracer"
"go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v4/util/id"
"go.unistack.org/micro/v3/util/id" )
var (
// ServerRequestDurationSeconds specifies meter metric name
ServerRequestDurationSeconds = "server_request_duration_seconds"
// ServerRequestLatencyMicroseconds specifies meter metric name
ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
// ServerRequestTotal specifies meter metric name
ServerRequestTotal = "server_request_total"
// ServerRequestInflight specifies meter metric name
ServerRequestInflight = "server_request_inflight"
) )
// Option func // Option func
@ -26,8 +36,6 @@ type Option func(*Options)
type Options struct { type Options struct {
// Context holds the external options and can be used for server shutdown // Context holds the external options and can be used for server shutdown
Context context.Context Context context.Context
// Broker holds the server broker
Broker broker.Broker
// Register holds the register // Register holds the register
Register register.Register Register register.Register
// Tracer holds the tracer // Tracer holds the tracer
@ -38,12 +46,6 @@ type Options struct {
Meter meter.Meter Meter meter.Meter
// Transport holds the transport // Transport holds the transport
Transport transport.Transport Transport transport.Transport
/*
// Router for requests
Router Router
*/
// Listener may be passed if already created // Listener may be passed if already created
Listener net.Listener Listener net.Listener
// Wait group // Wait group
@ -68,10 +70,6 @@ type Options struct {
Advertise string Advertise string
// Version holds the server version // Version holds the server version
Version string Version string
// SubWrappers holds the server subscribe wrappers
SubWrappers []SubscriberWrapper
// BatchSubWrappers holds the server batch subscribe wrappers
BatchSubWrappers []BatchSubscriberWrapper
// HdlrWrappers holds the handler wrappers // HdlrWrappers holds the handler wrappers
HdlrWrappers []HandlerWrapper HdlrWrappers []HandlerWrapper
// RegisterAttempts holds the number of register attempts before error // RegisterAttempts holds the number of register attempts before error
@ -84,7 +82,7 @@ type Options struct {
MaxConn int MaxConn int
// DeregisterAttempts holds the number of deregister attempts before error // DeregisterAttempts holds the number of deregister attempts before error
DeregisterAttempts int DeregisterAttempts int
// Hooks may contains SubscriberWrapper, HandlerWrapper or Server func wrapper // Hooks may contains HandlerWrapper or Server func wrapper
Hooks options.Hooks Hooks options.Hooks
} }
@ -100,7 +98,6 @@ func NewOptions(opts ...Option) Options {
Logger: logger.DefaultLogger, Logger: logger.DefaultLogger,
Meter: meter.DefaultMeter, Meter: meter.DefaultMeter,
Tracer: tracer.DefaultTracer, Tracer: tracer.DefaultTracer,
Broker: broker.DefaultBroker,
Register: register.DefaultRegister, Register: register.DefaultRegister,
Transport: transport.DefaultTransport, Transport: transport.DefaultTransport,
Address: DefaultAddress, Address: DefaultAddress,
@ -173,13 +170,6 @@ func Advertise(a string) Option {
} }
} }
// Broker to use for pub/sub
func Broker(b broker.Broker) Option {
return func(o *Options) {
o.Broker = b
}
}
// Codec to use to encode/decode requests for a given content type // Codec to use to encode/decode requests for a given content type
func Codec(contentType string, c codec.Codec) Option { func Codec(contentType string, c codec.Codec) Option {
return func(o *Options) { return func(o *Options) {
@ -261,15 +251,6 @@ func TLSConfig(t *tls.Config) Option {
} }
} }
/*
// WithRouter sets the request router
func WithRouter(r Router) Option {
return func(o *Options) {
o.Router = r
}
}
*/
// Wait tells the server to wait for requests to finish before exiting // Wait tells the server to wait for requests to finish before exiting
// If `wg` is nil, server only wait for completion of rpc handler. // If `wg` is nil, server only wait for completion of rpc handler.
// For user need finer grained control, pass a concrete `wg` here, server will // For user need finer grained control, pass a concrete `wg` here, server will
@ -290,20 +271,6 @@ func WrapHandler(w HandlerWrapper) Option {
} }
} }
// WrapSubscriber adds a subscriber Wrapper to a list of options passed into the server
func WrapSubscriber(w SubscriberWrapper) Option {
return func(o *Options) {
o.SubWrappers = append(o.SubWrappers, w)
}
}
// WrapBatchSubscriber adds a batch subscriber Wrapper to a list of options passed into the server
func WrapBatchSubscriber(w BatchSubscriberWrapper) Option {
return func(o *Options) {
o.BatchSubWrappers = append(o.BatchSubWrappers, w)
}
}
// MaxConn specifies maximum number of max simultaneous connections to server // MaxConn specifies maximum number of max simultaneous connections to server
func MaxConn(n int) Option { func MaxConn(n int) Option {
return func(o *Options) { return func(o *Options) {
@ -343,41 +310,6 @@ func NewHandlerOptions(opts ...HandlerOption) HandlerOptions {
return options return options
} }
// SubscriberOption func
type SubscriberOption func(*SubscriberOptions)
// SubscriberOptions struct
type SubscriberOptions struct {
// Context holds the external options
Context context.Context
// Queue holds the subscription queue
Queue string
// AutoAck flag for auto ack messages after processing
AutoAck bool
// BodyOnly flag specifies that message without headers
BodyOnly bool
// Batch flag specifies that message processed in batches
Batch bool
// BatchSize flag specifies max size of batch
BatchSize int
// BatchWait flag specifies max wait time for batch filling
BatchWait time.Duration
}
// NewSubscriberOptions create new SubscriberOptions
func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions {
options := SubscriberOptions{
AutoAck: true,
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
return options
}
// EndpointMetadata is a Handler option that allows metadata to be added to // EndpointMetadata is a Handler option that allows metadata to be added to
// individual endpoints. // individual endpoints.
func EndpointMetadata(name string, md metadata.Metadata) HandlerOption { func EndpointMetadata(name string, md metadata.Metadata) HandlerOption {
@ -385,68 +317,3 @@ func EndpointMetadata(name string, md metadata.Metadata) HandlerOption {
o.Metadata[name] = metadata.Copy(md) o.Metadata[name] = metadata.Copy(md)
} }
} }
// DisableAutoAck will disable auto acking of messages
// after they have been handled.
func DisableAutoAck() SubscriberOption {
return func(o *SubscriberOptions) {
o.AutoAck = false
}
}
// SubscriberQueue sets the shared queue name distributed messages across subscribers
func SubscriberQueue(n string) SubscriberOption {
return func(o *SubscriberOptions) {
o.Queue = n
}
}
// SubscriberGroup sets the shared group name distributed messages across subscribers
func SubscriberGroup(n string) SubscriberOption {
return func(o *SubscriberOptions) {
o.Queue = n
}
}
// SubscriberBodyOnly says broker that message contains raw data with absence of micro broker.Message format
func SubscriberBodyOnly(b bool) SubscriberOption {
return func(o *SubscriberOptions) {
o.BodyOnly = b
}
}
// SubscriberContext set context options to allow broker SubscriberOption passed
func SubscriberContext(ctx context.Context) SubscriberOption {
return func(o *SubscriberOptions) {
o.Context = ctx
}
}
// SubscriberAck control auto ack processing for handler
func SubscriberAck(b bool) SubscriberOption {
return func(o *SubscriberOptions) {
o.AutoAck = b
}
}
// SubscriberBatch control batch processing for handler
func SubscriberBatch(b bool) SubscriberOption {
return func(o *SubscriberOptions) {
o.Batch = b
}
}
// SubscriberBatchSize control batch filling size for handler
// Batch filling max waiting time controlled by SubscriberBatchWait
func SubscriberBatchSize(n int) SubscriberOption {
return func(o *SubscriberOptions) {
o.BatchSize = n
}
}
// SubscriberBatchWait control batch filling wait time for handler
func SubscriberBatchWait(td time.Duration) SubscriberOption {
return func(o *SubscriberOptions) {
o.BatchWait = td
}
}

View File

@ -4,10 +4,10 @@ import (
"net" "net"
"time" "time"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v3/register" "go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v3/util/addr" "go.unistack.org/micro/v4/util/addr"
"go.unistack.org/micro/v3/util/backoff" "go.unistack.org/micro/v4/util/backoff"
) )
var ( var (
@ -78,7 +78,6 @@ func NewRegisterService(s Server) (*register.Service, error) {
node.Metadata = metadata.Copy(opts.Metadata) node.Metadata = metadata.Copy(opts.Metadata)
node.Metadata["server"] = s.String() node.Metadata["server"] = s.String()
node.Metadata["broker"] = opts.Broker.String()
node.Metadata["register"] = opts.Register.String() node.Metadata["register"] = opts.Register.String()
return &register.Service{ return &register.Service{

View File

@ -1,35 +0,0 @@
package server
import (
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/metadata"
)
type rpcMessage struct {
payload interface{}
codec codec.Codec
header metadata.Metadata
topic string
contentType string
body []byte
}
func (r *rpcMessage) ContentType() string {
return r.contentType
}
func (r *rpcMessage) Topic() string {
return r.topic
}
func (r *rpcMessage) Body() interface{} {
return r.payload
}
func (r *rpcMessage) Header() metadata.Metadata {
return r.header
}
func (r *rpcMessage) Codec() codec.Codec {
return r.codec
}

View File

@ -1,13 +1,13 @@
// Package server is an interface for a micro server // Package server is an interface for a micro server
package server // import "go.unistack.org/micro/v3/server" package server // import "go.unistack.org/micro/v4/server"
import ( import (
"context" "context"
"time" "time"
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v3/register" "go.unistack.org/micro/v4/register"
) )
// DefaultServer default server // DefaultServer default server
@ -48,10 +48,6 @@ type Server interface {
Handle(h Handler) error Handle(h Handler) error
// Create a new handler // Create a new handler
NewHandler(h interface{}, opts ...HandlerOption) Handler NewHandler(h interface{}, opts ...HandlerOption) Handler
// Create a new subscriber
NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber
// Register a subscriber
Subscribe(s Subscriber) error
// Start the server // Start the server
Start() error Start() error
// Stop the server // Stop the server
@ -60,30 +56,6 @@ type Server interface {
String() string String() string
} }
/*
// Router handle serving messages
type Router interface {
// ProcessMessage processes a message
ProcessMessage(ctx context.Context, msg Message) error
// ServeRequest processes a request to completion
ServeRequest(ctx context.Context, req Request, rsp Response) error
}
*/
// Message is an async message interface
type Message interface {
// Topic of the message
Topic() string
// The decoded payload value
Body() interface{}
// The content type of the payload
ContentType() string
// The raw headers of the message
Header() metadata.Metadata
// Codec used to decode the message
Codec() codec.Codec
}
// Request is a synchronous request interface // Request is a synchronous request interface
type Request interface { type Request interface {
// Service name requested // Service name requested
@ -145,25 +117,14 @@ type Stream interface {
// //
// Example: // Example:
// //
// type Greeter struct {} // type Greeter struct {}
//
// func (g *Greeter) Hello(context, request, response) error {
// return nil
// }
// //
// func (g *Greeter) Hello(context, request, response) error {
// return nil
// }
type Handler interface { type Handler interface {
Name() string Name() string
Handler() interface{} Handler() interface{}
Endpoints() []*register.Endpoint Endpoints() []*register.Endpoint
Options() HandlerOptions Options() HandlerOptions
} }
// Subscriber interface represents a subscription to a given topic using
// a specific subscriber function or object with endpoints. It mirrors
// the handler in its behaviour.
type Subscriber interface {
Topic() string
Subscriber() interface{}
Endpoints() []*register.Endpoint
Options() SubscriberOptions
}

View File

@ -1,440 +0,0 @@
package server
import (
"bytes"
"context"
"fmt"
"reflect"
"runtime/debug"
"strings"
"unicode"
"unicode/utf8"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/errors"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/register"
)
const (
subSig = "func(context.Context, interface{}) error"
batchSubSig = "func([]context.Context, []interface{}) error"
)
// Precompute the reflect type for error. Can't use error directly
// because Typeof takes an empty interface value. This is annoying.
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
type handler struct {
reqType reflect.Type
ctxType reflect.Type
method reflect.Value
}
type subscriber struct {
typ reflect.Type
subscriber interface{}
topic string
endpoints []*register.Endpoint
handlers []*handler
opts SubscriberOptions
rcvr reflect.Value
}
// Is this an exported - upper case - name?
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune)
}
// Is this type exported or a builtin?
func isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
return isExported(t.Name()) || t.PkgPath() == ""
}
// ValidateSubscriber func signature
func ValidateSubscriber(sub Subscriber) error {
typ := reflect.TypeOf(sub.Subscriber())
var argType reflect.Type
switch typ.Kind() {
case reflect.Func:
name := "Func"
switch typ.NumIn() {
case 2:
argType = typ.In(1)
if sub.Options().Batch {
if argType.Kind() != reflect.Slice {
return fmt.Errorf("subscriber %v dont have required signature %s", name, batchSubSig)
}
if strings.Compare(fmt.Sprintf("%v", argType), "[]interface{}") == 0 {
return fmt.Errorf("subscriber %v dont have required signaure %s", name, batchSubSig)
}
}
default:
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s or %s", name, typ.NumIn(), subSig, batchSubSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
}
if typ.NumOut() != 1 {
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s or %s",
name, typ.NumOut(), subSig, batchSubSig)
}
if returnType := typ.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
}
default:
hdlr := reflect.ValueOf(sub.Subscriber())
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
switch method.Type.NumIn() {
case 3:
argType = method.Type.In(2)
default:
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s or %s",
name, method.Name, method.Type.NumIn(), subSig, batchSubSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("%v argument type not exported: %v", name, argType)
}
if method.Type.NumOut() != 1 {
return fmt.Errorf(
"subscriber %v.%v has wrong number of return values: %v require signature %s or %s",
name, method.Name, method.Type.NumOut(), subSig, batchSubSig)
}
if returnType := method.Type.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
}
}
}
return nil
}
func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber {
var endpoints []*register.Endpoint
var handlers []*handler
options := NewSubscriberOptions(opts...)
if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func {
h := &handler{
method: reflect.ValueOf(sub),
}
switch typ.NumIn() {
case 1:
h.reqType = typ.In(0)
case 2:
h.ctxType = typ.In(0)
h.reqType = typ.In(1)
}
handlers = append(handlers, h)
ep := &register.Endpoint{
Name: "Func",
Request: register.ExtractSubValue(typ),
Metadata: metadata.New(2),
}
ep.Metadata.Set("topic", topic)
ep.Metadata.Set("subscriber", "true")
endpoints = append(endpoints, ep)
} else {
hdlr := reflect.ValueOf(sub)
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
h := &handler{
method: method.Func,
}
switch method.Type.NumIn() {
case 2:
h.reqType = method.Type.In(1)
case 3:
h.ctxType = method.Type.In(1)
h.reqType = method.Type.In(2)
}
handlers = append(handlers, h)
ep := &register.Endpoint{
Name: name + "." + method.Name,
Request: register.ExtractSubValue(method.Type),
Metadata: metadata.New(2),
}
ep.Metadata.Set("topic", topic)
ep.Metadata.Set("subscriber", "true")
endpoints = append(endpoints, ep)
}
}
return &subscriber{
rcvr: reflect.ValueOf(sub),
typ: reflect.TypeOf(sub),
topic: topic,
subscriber: sub,
handlers: handlers,
endpoints: endpoints,
opts: options,
}
}
//nolint:gocyclo
func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
return func(ps broker.Events) (err error) {
defer func() {
if r := recover(); r != nil {
n.RLock()
config := n.opts
n.RUnlock()
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(n.opts.Context, "panic recovered: ", r)
config.Logger.Error(n.opts.Context, string(debug.Stack()))
}
err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r)
}
}()
msgs := make([]Message, 0, len(ps))
ctxs := make([]context.Context, 0, len(ps))
for _, p := range ps {
msg := p.Message()
// if we don't have headers, create empty map
if msg.Header == nil {
msg.Header = metadata.New(2)
}
ct, _ := msg.Header.Get(metadata.HeaderContentType)
if len(ct) == 0 {
msg.Header.Set(metadata.HeaderContentType, defaultContentType)
ct = defaultContentType
}
hdr := metadata.Copy(msg.Header)
topic, _ := msg.Header.Get(metadata.HeaderTopic)
ctxs = append(ctxs, metadata.NewIncomingContext(sb.opts.Context, hdr))
msgs = append(msgs, &rpcMessage{
topic: topic,
contentType: ct,
header: msg.Header,
body: msg.Body,
})
}
results := make(chan error, len(sb.handlers))
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
var req reflect.Value
switch handler.reqType.Kind() {
case reflect.Ptr:
req = reflect.New(handler.reqType.Elem())
default:
req = reflect.New(handler.reqType.Elem()).Elem()
}
reqType := handler.reqType
var cf codec.Codec
for _, msg := range msgs {
cf, err = n.newCodec(msg.ContentType())
if err != nil {
return err
}
rb := reflect.New(req.Type().Elem())
if err = cf.ReadBody(bytes.NewReader(msg.(*rpcMessage).body), rb.Interface()); err != nil {
return err
}
msg.(*rpcMessage).codec = cf
msg.(*rpcMessage).payload = rb.Interface()
}
fn := func(ctxs []context.Context, ms []Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
}
if handler.ctxType != nil {
vals = append(vals, reflect.ValueOf(ctxs))
}
payloads := reflect.MakeSlice(reqType, 0, len(ms))
for _, m := range ms {
payloads = reflect.Append(payloads, reflect.ValueOf(m.Body()))
}
vals = append(vals, payloads)
returnValues := handler.method.Call(vals)
if rerr := returnValues[0].Interface(); rerr != nil {
return rerr.(error)
}
return nil
}
for i := len(opts.BatchSubWrappers); i > 0; i-- {
fn = opts.BatchSubWrappers[i-1](fn)
}
if n.wg != nil {
n.wg.Add(1)
}
go func() {
if n.wg != nil {
defer n.wg.Done()
}
results <- fn(ctxs, msgs)
}()
}
var errors []string
for i := 0; i < len(sb.handlers); i++ {
if rerr := <-results; rerr != nil {
errors = append(errors, rerr.Error())
}
}
if len(errors) > 0 {
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
}
return err
}
}
//nolint:gocyclo
func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler {
return func(p broker.Event) (err error) {
defer func() {
if r := recover(); r != nil {
n.RLock()
config := n.opts
n.RUnlock()
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(n.opts.Context, "panic recovered: ", r)
config.Logger.Error(n.opts.Context, string(debug.Stack()))
}
err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r)
}
}()
msg := p.Message()
// if we don't have headers, create empty map
if msg.Header == nil {
msg.Header = metadata.New(2)
}
ct := msg.Header["Content-Type"]
if len(ct) == 0 {
msg.Header.Set(metadata.HeaderContentType, defaultContentType)
ct = defaultContentType
}
cf, err := n.newCodec(ct)
if err != nil {
return err
}
hdr := metadata.New(len(msg.Header))
for k, v := range msg.Header {
if k == "Content-Type" {
continue
}
hdr.Set(k, v)
}
ctx := metadata.NewIncomingContext(sb.opts.Context, hdr)
results := make(chan error, len(sb.handlers))
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
var isVal bool
var req reflect.Value
if handler.reqType.Kind() == reflect.Ptr {
req = reflect.New(handler.reqType.Elem())
} else {
req = reflect.New(handler.reqType)
isVal = true
}
if isVal {
req = req.Elem()
}
if err = cf.ReadBody(bytes.NewBuffer(msg.Body), req.Interface()); err != nil {
return err
}
fn := func(ctx context.Context, msg Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
}
if handler.ctxType != nil {
vals = append(vals, reflect.ValueOf(ctx))
}
vals = append(vals, reflect.ValueOf(msg.Body()))
returnValues := handler.method.Call(vals)
if rerr := returnValues[0].Interface(); rerr != nil {
return rerr.(error)
}
return nil
}
for i := len(opts.SubWrappers); i > 0; i-- {
fn = opts.SubWrappers[i-1](fn)
}
if n.wg != nil {
n.wg.Add(1)
}
go func() {
if n.wg != nil {
defer n.wg.Done()
}
cerr := fn(ctx, &rpcMessage{
topic: sb.topic,
contentType: ct,
payload: req.Interface(),
header: msg.Header,
})
results <- cerr
}()
}
var errors []string
for i := 0; i < len(sb.handlers); i++ {
if rerr := <-results; rerr != nil {
errors = append(errors, rerr.Error())
}
}
if len(errors) > 0 {
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
}
return err
}
}
func (s *subscriber) Topic() string {
return s.topic
}
func (s *subscriber) Subscriber() interface{} {
return s.subscriber
}
func (s *subscriber) Endpoints() []*register.Endpoint {
return s.endpoints
}
func (s *subscriber) Options() SubscriberOptions {
return s.opts
}

View File

@ -9,25 +9,9 @@ import (
// request and response types. // request and response types.
type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error
// SubscriberFunc represents a single method of a subscriber. It's used primarily
// for the wrappers. What's handed to the actual method is the concrete
// publication message.
type SubscriberFunc func(ctx context.Context, msg Message) error
// BatchSubscriberFunc represents a single method of a subscriber. It's used primarily
// for the wrappers. What's handed to the actual method is the concrete
// publication message. This func used by batch subscribers
type BatchSubscriberFunc func(ctxs []context.Context, msgs []Message) error
// HandlerWrapper wraps the HandlerFunc and returns the equivalent // HandlerWrapper wraps the HandlerFunc and returns the equivalent
type HandlerWrapper func(HandlerFunc) HandlerFunc type HandlerWrapper func(HandlerFunc) HandlerFunc
// SubscriberWrapper wraps the SubscriberFunc and returns the equivalent
type SubscriberWrapper func(SubscriberFunc) SubscriberFunc
// BatchSubscriberWrapper wraps the SubscriberFunc and returns the equivalent
type BatchSubscriberWrapper func(BatchSubscriberFunc) BatchSubscriberFunc
// StreamWrapper wraps a Stream interface and returns the equivalent. // StreamWrapper wraps a Stream interface and returns the equivalent.
// Because streams exist for the lifetime of a method invocation this // Because streams exist for the lifetime of a method invocation this
// is a convenient way to wrap a Stream as its in use for trace, monitoring, // is a convenient way to wrap a Stream as its in use for trace, monitoring,

View File

@ -1,20 +1,20 @@
// Package micro is a pluggable framework for microservices // Package micro is a pluggable framework for microservices
package micro // import "go.unistack.org/micro/v3" package micro // import "go.unistack.org/micro/v4"
import ( import (
"fmt" "fmt"
"sync" "sync"
"go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v3/client" "go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v3/config" "go.unistack.org/micro/v4/config"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v3/register" "go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v3/router" "go.unistack.org/micro/v4/router"
"go.unistack.org/micro/v3/server" "go.unistack.org/micro/v4/server"
"go.unistack.org/micro/v3/store" "go.unistack.org/micro/v4/store"
"go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v4/tracer"
) )
// Service is an interface that wraps the lower level components. // Service is an interface that wraps the lower level components.
@ -66,11 +66,6 @@ func RegisterHandler(s server.Server, h interface{}, opts ...server.HandlerOptio
return s.Handle(s.NewHandler(h, opts...)) return s.Handle(s.NewHandler(h, opts...))
} }
// RegisterSubscriber is syntactic sugar for registering a subscriber
func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error {
return s.Subscribe(s.NewSubscriber(topic, h, opts...))
}
type service struct { type service struct {
sync.RWMutex sync.RWMutex
opts Options opts Options
@ -88,6 +83,7 @@ func (s *service) Name() string {
// Init initialises options. Additionally it calls cmd.Init // Init initialises options. Additionally it calls cmd.Init
// which parses command line flags. cmd.Init is only called // which parses command line flags. cmd.Init is only called
// on first Init. // on first Init.
//
//nolint:gocyclo //nolint:gocyclo
func (s *service) Init(opts ...Option) error { func (s *service) Init(opts ...Option) error {
var err error var err error

View File

@ -1,20 +1,19 @@
package micro package micro
import ( import (
"context"
"reflect" "reflect"
"testing" "testing"
"go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v3/client" "go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v3/config" "go.unistack.org/micro/v4/config"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v3/register" "go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v3/router" "go.unistack.org/micro/v4/router"
"go.unistack.org/micro/v3/server" "go.unistack.org/micro/v4/server"
"go.unistack.org/micro/v3/store" "go.unistack.org/micro/v4/store"
"go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v4/tracer"
) )
type testItem struct { type testItem struct {
@ -65,41 +64,6 @@ func TestRegisterHandler(t *testing.T) {
} }
} }
func TestRegisterSubscriber(t *testing.T) {
type args struct {
topic string
s server.Server
h interface{}
opts []server.SubscriberOption
}
h := func(_ context.Context, _ interface{}) error {
return nil
}
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "RegisterSubscriber",
args: args{
topic: "test",
s: server.DefaultServer,
h: h,
opts: nil,
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := RegisterSubscriber(tt.args.topic, tt.args.s, tt.args.h, tt.args.opts...); (err != nil) != tt.wantErr {
t.Errorf("RegisterSubscriber() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestNewService(t *testing.T) { func TestNewService(t *testing.T) {
type args struct { type args struct {
opts []Option opts []Option
@ -222,7 +186,6 @@ func Test_service_Options(t *testing.T) {
} }
func Test_service_Broker(t *testing.T) { func Test_service_Broker(t *testing.T) {
b := broker.NewBroker()
type fields struct { type fields struct {
opts Options opts Options
} }
@ -238,12 +201,12 @@ func Test_service_Broker(t *testing.T) {
{ {
name: "service.Broker", name: "service.Broker",
fields: fields{ fields: fields{
opts: Options{Brokers: []broker.Broker{b}}, opts: Options{Brokers: []broker.Broker{broker.DefaultBroker}},
}, },
args: args{ args: args{
names: []string{"noop"}, names: []string{"noop"},
}, },
want: b, want: broker.DefaultBroker,
}, },
} }
for _, tt := range tests { for _, tt := range tests {

View File

@ -5,7 +5,7 @@ import (
"testing" "testing"
"time" "time"
"go.unistack.org/micro/v3/store" "go.unistack.org/micro/v4/store"
) )
func TestMemoryReInit(t *testing.T) { func TestMemoryReInit(t *testing.T) {

View File

@ -5,11 +5,11 @@ import (
"crypto/tls" "crypto/tls"
"time" "time"
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v4/tracer"
) )
// Options contains configuration for the Store // Options contains configuration for the Store

View File

@ -1,5 +1,5 @@
// Package store is an interface for distributed data storage. // Package store is an interface for distributed data storage.
package store // import "go.unistack.org/micro/v3/store" package store // import "go.unistack.org/micro/v4/store"
import ( import (
"context" "context"

View File

@ -3,9 +3,9 @@ package sync
import ( import (
"time" "time"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v4/tracer"
) )
// Options holds the sync options // Options holds the sync options

View File

@ -1,5 +1,5 @@
// Package sync is an interface for distributed synchronization // Package sync is an interface for distributed synchronization
package sync // import "go.unistack.org/micro/v3/sync" package sync // import "go.unistack.org/micro/v4/sync"
import ( import (
"errors" "errors"

View File

@ -3,7 +3,7 @@ package tracer
import ( import (
"context" "context"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/logger"
) )
type SpanStatus int type SpanStatus int

View File

@ -1,5 +1,5 @@
// Package tracer provides an interface for distributed tracing // Package tracer provides an interface for distributed tracing
package tracer // import "go.unistack.org/micro/v3/tracer" package tracer // import "go.unistack.org/micro/v4/tracer"
import ( import (
"context" "context"

View File

@ -1,14 +1,14 @@
// Package wrapper provides wrapper for Tracer // Package wrapper provides wrapper for Tracer
package wrapper // import "go.unistack.org/micro/v3/tracer/wrapper" package wrapper // import "go.unistack.org/micro/v4/tracer/wrapper"
import ( import (
"context" "context"
"fmt" "fmt"
"go.unistack.org/micro/v3/client" "go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v3/server" "go.unistack.org/micro/v4/server"
"go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v4/tracer"
) )
var ( var (

View File

@ -1,4 +1,4 @@
package addr // import "go.unistack.org/micro/v3/util/addr" package addr // import "go.unistack.org/micro/v4/util/addr"
import ( import (
"fmt" "fmt"
@ -58,6 +58,7 @@ func IsLocal(addr string) bool {
} }
// Extract returns a real ip // Extract returns a real ip
//
//nolint:gocyclo //nolint:gocyclo
func Extract(addr string) (string, error) { func Extract(addr string) (string, error) {
// if addr specified then its returned // if addr specified then its returned

Some files were not shown because too many files have changed in this diff Show More