Compare commits
42 Commits
Author | SHA1 | Date | |
---|---|---|---|
03de1ec38f | |||
819ad1117a | |||
a2a383606d | |||
55ce58617b | |||
0e587d923e | |||
fa0248c80c | |||
054bd02b59 | |||
0cf246d2d6 | |||
af278bd7d3 | |||
814b90efe5 | |||
e403ae3d8e | |||
c9816a3957 | |||
5691238a6a | |||
963a0fa7b7 | |||
485257035c | |||
ebd8ddf05b | |||
1d5e795443 | |||
a3a434d923 | |||
bcc06054f1 | |||
270d26f1ae | |||
646212cc08 | |||
00c2c749db | |||
2dbada0e94 | |||
7b8f4410fb | |||
45ebef5544 | |||
cf4cac0733 | |||
50d60b5825 | |||
46ef491764 | |||
a51b8b8102 | |||
15aac48f1e | |||
078069b2d7 | |||
258812304a | |||
da5d50db5b | |||
384e4d113d | |||
dfd1da7f0d | |||
8e5015e580 | |||
bd0c309b71 | |||
b4f0c3e29a | |||
8fddaa0455 | |||
2710c269a8 | |||
70ea93e466 | |||
a87d0ab1c1 |
18
.gitea/ISSUE_TEMPLATE/bug_report.md
Normal file
18
.gitea/ISSUE_TEMPLATE/bug_report.md
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
---
|
||||||
|
name: Bug report
|
||||||
|
about: For reporting bugs in micro
|
||||||
|
title: "[BUG]"
|
||||||
|
labels: ''
|
||||||
|
assignees: ''
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
**Describe the bug**
|
||||||
|
|
||||||
|
1. What are you trying to do?
|
||||||
|
2. What did you expect to happen?
|
||||||
|
3. What happens instead?
|
||||||
|
|
||||||
|
**How to reproduce the bug:**
|
||||||
|
|
||||||
|
If possible, please include a minimal code snippet here.
|
17
.gitea/ISSUE_TEMPLATE/feature-request---enhancement.md
Normal file
17
.gitea/ISSUE_TEMPLATE/feature-request---enhancement.md
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
---
|
||||||
|
name: Feature request / Enhancement
|
||||||
|
about: If you have a need not served by micro
|
||||||
|
title: "[FEATURE]"
|
||||||
|
labels: ''
|
||||||
|
assignees: ''
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
**Is your feature request related to a problem? Please describe.**
|
||||||
|
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
|
||||||
|
|
||||||
|
**Describe the solution you'd like**
|
||||||
|
A clear and concise description of what you want to happen.
|
||||||
|
|
||||||
|
**Additional context**
|
||||||
|
Add any other context or screenshots about the feature request here.
|
8
.gitea/ISSUE_TEMPLATE/question.md
Normal file
8
.gitea/ISSUE_TEMPLATE/question.md
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
---
|
||||||
|
name: Question
|
||||||
|
about: Ask a question about micro
|
||||||
|
title: ''
|
||||||
|
labels: ''
|
||||||
|
assignees: ''
|
||||||
|
|
||||||
|
---
|
28
.gitea/autoapprove.yml
Normal file
28
.gitea/autoapprove.yml
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
name: "autoapprove"
|
||||||
|
|
||||||
|
on:
|
||||||
|
pull_request_target:
|
||||||
|
types: [assigned, opened, synchronize, reopened]
|
||||||
|
workflow_run:
|
||||||
|
workflows: ["prbuild"]
|
||||||
|
types:
|
||||||
|
- completed
|
||||||
|
|
||||||
|
permissions:
|
||||||
|
pull-requests: write
|
||||||
|
contents: write
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
autoapprove:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: approve
|
||||||
|
run: [ "curl -o tea https://dl.gitea.com/tea/main/tea-main-linux-amd64",
|
||||||
|
"chmod +x ./tea",
|
||||||
|
"./tea login add --name unistack --token ${{ secrets.GITHUB_TOKEN }} --url https://git.unistack.org",
|
||||||
|
"./tea pr --repo ${{ github.event.repository.name }}"
|
||||||
|
]
|
||||||
|
if: github.actor == 'vtolstov'
|
||||||
|
id: approve
|
||||||
|
with:
|
||||||
|
github-token: ${{ secrets.GITHUB_TOKEN }}
|
24
.gitea/workflows/lint.yml
Normal file
24
.gitea/workflows/lint.yml
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
name: lint
|
||||||
|
on:
|
||||||
|
pull_request:
|
||||||
|
branches:
|
||||||
|
- master
|
||||||
|
- v3
|
||||||
|
jobs:
|
||||||
|
lint:
|
||||||
|
name: lint
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: setup-go
|
||||||
|
uses: https://gitea.com/actions/setup-go@v3
|
||||||
|
with:
|
||||||
|
go-version: 1.18
|
||||||
|
- name: checkout
|
||||||
|
uses: https://gitea.com/actions/checkout@v3
|
||||||
|
- name: deps
|
||||||
|
run: go get -v -d ./...
|
||||||
|
- name: lint
|
||||||
|
uses: https://github.com/golangci/golangci-lint-action@v3.4.0
|
||||||
|
continue-on-error: true
|
||||||
|
with:
|
||||||
|
version: v1.52
|
23
.gitea/workflows/pr.yml
Normal file
23
.gitea/workflows/pr.yml
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
name: pr
|
||||||
|
on:
|
||||||
|
pull_request:
|
||||||
|
branches:
|
||||||
|
- master
|
||||||
|
- v3
|
||||||
|
jobs:
|
||||||
|
test:
|
||||||
|
name: test
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: checkout
|
||||||
|
uses: https://gitea.com/actions/checkout@v3
|
||||||
|
- name: setup-go
|
||||||
|
uses: https://gitea.com/actions/setup-go@v3
|
||||||
|
with:
|
||||||
|
go-version: 1.18
|
||||||
|
- name: deps
|
||||||
|
run: go get -v -t -d ./...
|
||||||
|
- name: test
|
||||||
|
env:
|
||||||
|
INTEGRATION_TESTS: yes
|
||||||
|
run: go test -mod readonly -v ./...
|
9
.github.old/PULL_REQUEST_TEMPLATE.md
Normal file
9
.github.old/PULL_REQUEST_TEMPLATE.md
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
## Pull Request template
|
||||||
|
Please, go through these steps before clicking submit on this PR.
|
||||||
|
|
||||||
|
1. Give a descriptive title to your PR.
|
||||||
|
2. Provide a description of your changes.
|
||||||
|
3. Make sure you have some relevant tests.
|
||||||
|
4. Put `closes #XXXX` in your comment to auto-close the issue that your PR fixes (if applicable).
|
||||||
|
|
||||||
|
**PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING**
|
@@ -37,11 +37,4 @@ jobs:
|
|||||||
uses: golangci/golangci-lint-action@v3.4.0
|
uses: golangci/golangci-lint-action@v3.4.0
|
||||||
continue-on-error: true
|
continue-on-error: true
|
||||||
with:
|
with:
|
||||||
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
|
version: v1.30
|
||||||
version: v1.30
|
|
||||||
# Optional: working directory, useful for monorepos
|
|
||||||
# working-directory: somedir
|
|
||||||
# Optional: golangci-lint command line arguments.
|
|
||||||
# args: --issues-exit-code=0
|
|
||||||
# Optional: show only new issues if it's a pull request. The default value is `false`.
|
|
||||||
# only-new-issues: true
|
|
@@ -1,4 +1,4 @@
|
|||||||
# Micro [](https://opensource.org/licenses/Apache-2.0) [](https://pkg.go.dev/github.com/unistack-org/micro/v3?tab=overview) [](https://github.com/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Amaster+event%3Apush) [](https://goreportcard.com/report/go.unistack.org/micro/v3) [](https://codecov.io/gh/unistack-org/micro)
|
# Micro [](https://opensource.org/licenses/Apache-2.0) [](https://pkg.go.dev/go.unistack.org/micro/v4?tab=overview) [](https://github.com/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Amaster+event%3Apush) [](https://goreportcard.com/report/go.unistack.org/micro/v4) [](https://codecov.io/gh/unistack-org/micro)
|
||||||
|
|
||||||
Micro is a standard library for microservices.
|
Micro is a standard library for microservices.
|
||||||
|
|
||||||
|
@@ -1,21 +1,24 @@
|
|||||||
// Package broker is an interface used for asynchronous messaging
|
// Package broker is an interface used for asynchronous messaging
|
||||||
package broker // import "go.unistack.org/micro/v3/broker"
|
package broker // import "go.unistack.org/micro/v4/broker"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v4/codec"
|
||||||
|
"go.unistack.org/micro/v4/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DefaultBroker default memory broker
|
// DefaultBroker default memory broker
|
||||||
var DefaultBroker = NewBroker()
|
var DefaultBroker Broker // = NewBroker()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// ErrNotConnected returns when broker used but not connected yet
|
// ErrNotConnected returns when broker used but not connected yet
|
||||||
ErrNotConnected = errors.New("broker not connected")
|
ErrNotConnected = errors.New("broker not connected")
|
||||||
// ErrDisconnected returns when broker disconnected
|
// ErrDisconnected returns when broker disconnected
|
||||||
ErrDisconnected = errors.New("broker disconnected")
|
ErrDisconnected = errors.New("broker disconnected")
|
||||||
|
// ErrInvalidMessage returns when message has nvalid format
|
||||||
|
ErrInvalidMessage = errors.New("broker message has invalid format")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Broker is an interface used for asynchronous messaging.
|
// Broker is an interface used for asynchronous messaging.
|
||||||
@@ -32,91 +35,42 @@ type Broker interface {
|
|||||||
Connect(ctx context.Context) error
|
Connect(ctx context.Context) error
|
||||||
// Disconnect disconnect from broker
|
// Disconnect disconnect from broker
|
||||||
Disconnect(ctx context.Context) error
|
Disconnect(ctx context.Context) error
|
||||||
|
// NewMessage creates new broker message
|
||||||
|
NewMessage(endpoint string, req interface{}, opts ...MessageOption) Message
|
||||||
// Publish message to broker topic
|
// Publish message to broker topic
|
||||||
Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error
|
Publish(ctx context.Context, msg interface{}, opts ...PublishOption) error
|
||||||
// Subscribe subscribes to topic message via handler
|
// Subscribe subscribes to topic message via handler
|
||||||
Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
|
Subscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error)
|
||||||
// BatchPublish messages to broker with multiple topics
|
|
||||||
BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error
|
|
||||||
// BatchSubscribe subscribes to topic messages via handler
|
|
||||||
BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error)
|
|
||||||
// String type of broker
|
// String type of broker
|
||||||
String() string
|
String() string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handler is used to process messages via a subscription of a topic.
|
// Message is given to a subscription handler for processing
|
||||||
type Handler func(Event) error
|
type Message interface {
|
||||||
|
// Context for the message
|
||||||
// Events contains multiple events
|
Context() context.Context
|
||||||
type Events []Event
|
|
||||||
|
|
||||||
// Ack try to ack all events and return
|
|
||||||
func (evs Events) Ack() error {
|
|
||||||
var err error
|
|
||||||
for _, ev := range evs {
|
|
||||||
if err = ev.Ack(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetError sets error on event
|
|
||||||
func (evs Events) SetError(err error) {
|
|
||||||
for _, ev := range evs {
|
|
||||||
ev.SetError(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// BatchHandler is used to process messages in batches via a subscription of a topic.
|
|
||||||
type BatchHandler func(Events) error
|
|
||||||
|
|
||||||
// Event is given to a subscription handler for processing
|
|
||||||
type Event interface {
|
|
||||||
// Topic returns event topic
|
// Topic returns event topic
|
||||||
Topic() string
|
Topic() string
|
||||||
// Message returns broker message
|
// Body returns broker message
|
||||||
Message() *Message
|
Body() interface{}
|
||||||
// Ack acknowledge message
|
// Ack acknowledge message
|
||||||
Ack() error
|
Ack() error
|
||||||
// Error returns message error (like decoding errors or some other)
|
// Error returns message error (like decoding errors or some other)
|
||||||
|
// In this case Body contains raw []byte from broker
|
||||||
Error() error
|
Error() error
|
||||||
// SetError set event processing error
|
|
||||||
SetError(err error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// RawMessage is a raw encoded JSON value.
|
// RawMessage is used to transfer data
|
||||||
// It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a encoding.
|
type RawMessage struct {
|
||||||
type RawMessage []byte
|
|
||||||
|
|
||||||
// MarshalJSON returns m as the JSON encoding of m.
|
|
||||||
func (m *RawMessage) MarshalJSON() ([]byte, error) {
|
|
||||||
if m == nil {
|
|
||||||
return []byte("null"), nil
|
|
||||||
}
|
|
||||||
return *m, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// UnmarshalJSON sets *m to a copy of data.
|
|
||||||
func (m *RawMessage) UnmarshalJSON(data []byte) error {
|
|
||||||
if m == nil {
|
|
||||||
return errors.New("RawMessage UnmarshalJSON on nil pointer")
|
|
||||||
}
|
|
||||||
*m = append((*m)[0:0], data...)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Message is used to transfer data
|
|
||||||
type Message struct {
|
|
||||||
// Header contains message metadata
|
// Header contains message metadata
|
||||||
Header metadata.Metadata
|
Header metadata.Metadata
|
||||||
// Body contains message body
|
// Body contains message body
|
||||||
Body RawMessage
|
Body codec.RawMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewMessage create broker message with topic filled
|
// NewMessage create broker message with topic filled
|
||||||
func NewMessage(topic string) *Message {
|
func NewRawMessage(topic string) *RawMessage {
|
||||||
m := &Message{Header: metadata.New(2)}
|
m := &RawMessage{Header: metadata.New(2)}
|
||||||
m.Header.Set(metadata.HeaderTopic, topic)
|
m.Header.Set(metadata.HeaderTopic, topic)
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
233
broker/memory.go
233
broker/memory.go
@@ -1,15 +1,17 @@
|
|||||||
|
//go:build ignore
|
||||||
|
|
||||||
package broker
|
package broker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v4/logger"
|
||||||
"go.unistack.org/micro/v3/metadata"
|
maddr "go.unistack.org/micro/v4/util/addr"
|
||||||
maddr "go.unistack.org/micro/v3/util/addr"
|
"go.unistack.org/micro/v4/util/id"
|
||||||
"go.unistack.org/micro/v3/util/id"
|
mnet "go.unistack.org/micro/v4/util/net"
|
||||||
mnet "go.unistack.org/micro/v3/util/net"
|
"go.unistack.org/micro/v4/util/rand"
|
||||||
"go.unistack.org/micro/v3/util/rand"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type memoryBroker struct {
|
type memoryBroker struct {
|
||||||
@@ -20,23 +22,6 @@ type memoryBroker struct {
|
|||||||
connected bool
|
connected bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type memoryEvent struct {
|
|
||||||
err error
|
|
||||||
message interface{}
|
|
||||||
topic string
|
|
||||||
opts Options
|
|
||||||
}
|
|
||||||
|
|
||||||
type memorySubscriber struct {
|
|
||||||
ctx context.Context
|
|
||||||
exit chan bool
|
|
||||||
handler Handler
|
|
||||||
batchhandler BatchHandler
|
|
||||||
id string
|
|
||||||
topic string
|
|
||||||
opts SubscribeOptions
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *memoryBroker) Options() Options {
|
func (m *memoryBroker) Options() Options {
|
||||||
return m.opts
|
return m.opts
|
||||||
}
|
}
|
||||||
@@ -88,16 +73,11 @@ func (m *memoryBroker) Init(opts ...Option) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error {
|
func (m *memoryBroker) NewMessage(endpoint string, req interface{}, opts ...MessageOption) Message {
|
||||||
msg.Header.Set(metadata.HeaderTopic, topic)
|
return &memoryMessage{}
|
||||||
return m.publish(ctx, []*Message{msg}, opts...)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
|
func (m *memoryBroker) Publish(ctx context.Context, message interface{}, opts ...PublishOption) error {
|
||||||
return m.publish(ctx, msgs, opts...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
|
|
||||||
m.RLock()
|
m.RLock()
|
||||||
if !m.connected {
|
if !m.connected {
|
||||||
m.RUnlock()
|
m.RUnlock()
|
||||||
@@ -112,128 +92,94 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub
|
|||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
default:
|
default:
|
||||||
options := NewPublishOptions(opts...)
|
options := NewPublishOptions(opts...)
|
||||||
|
var msgs []*memoryMessage
|
||||||
msgTopicMap := make(map[string]Events)
|
switch v := message.(type) {
|
||||||
for _, v := range msgs {
|
case *memoryMessage:
|
||||||
p := &memoryEvent{opts: m.opts}
|
msgs = []*memoryMessage{v}
|
||||||
|
case []*memoryMessage:
|
||||||
if m.opts.Codec == nil || options.BodyOnly {
|
msgs = v
|
||||||
p.topic, _ = v.Header.Get(metadata.HeaderTopic)
|
default:
|
||||||
p.message = v.Body
|
return ErrInvalidMessage
|
||||||
} else {
|
}
|
||||||
p.topic, _ = v.Header.Get(metadata.HeaderTopic)
|
msgTopicMap := make(map[string][]*memoryMessage)
|
||||||
p.message, err = m.opts.Codec.Marshal(v)
|
for _, msg := range msgs {
|
||||||
if err != nil {
|
p := &memoryMessage{opts: options}
|
||||||
return err
|
/*
|
||||||
|
if mb, ok := msg.Body().(*codec.Frame); ok {
|
||||||
|
p.message = v.Body
|
||||||
|
} else {
|
||||||
|
p.topic, _ = v.Header.Get(metadata.HeaderTopic)
|
||||||
|
p.message, err = m.opts.Codec.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
*/
|
||||||
msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p)
|
msgTopicMap[msg.Topic()] = append(msgTopicMap[p.topic], p)
|
||||||
}
|
}
|
||||||
|
|
||||||
beh := m.opts.BatchErrorHandler
|
|
||||||
eh := m.opts.ErrorHandler
|
eh := m.opts.ErrorHandler
|
||||||
|
|
||||||
for t, ms := range msgTopicMap {
|
for t, ms := range msgTopicMap {
|
||||||
|
ts := time.Now()
|
||||||
|
|
||||||
|
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(len(ms))
|
||||||
|
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(len(ms))
|
||||||
|
|
||||||
m.RLock()
|
m.RLock()
|
||||||
subs, ok := m.subscribers[t]
|
subs, ok := m.subscribers[t]
|
||||||
m.RUnlock()
|
m.RUnlock()
|
||||||
if !ok {
|
if !ok {
|
||||||
|
m.opts.Meter.Counter(PublishMessageTotal, "endpoint", t, "status", "failure").Add(len(ms))
|
||||||
|
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-len(ms))
|
||||||
|
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-len(ms))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m.opts.Meter.Counter(PublishMessageTotal, "endpoint", t, "status", "success").Add(len(ms))
|
||||||
for _, sub := range subs {
|
for _, sub := range subs {
|
||||||
if sub.opts.BatchErrorHandler != nil {
|
|
||||||
beh = sub.opts.BatchErrorHandler
|
|
||||||
}
|
|
||||||
if sub.opts.ErrorHandler != nil {
|
if sub.opts.ErrorHandler != nil {
|
||||||
eh = sub.opts.ErrorHandler
|
eh = sub.opts.ErrorHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
switch {
|
for _, p := range ms {
|
||||||
// batch processing
|
if err = sub.handler(p); err != nil {
|
||||||
case sub.batchhandler != nil:
|
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
|
||||||
if err = sub.batchhandler(ms); err != nil {
|
if eh != nil {
|
||||||
ms.SetError(err)
|
_ = eh(p)
|
||||||
if beh != nil {
|
|
||||||
_ = beh(ms)
|
|
||||||
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
||||||
m.opts.Logger.Error(m.opts.Context, err.Error())
|
m.opts.Logger.Error(m.opts.Context, err.Error())
|
||||||
}
|
}
|
||||||
} else if sub.opts.AutoAck {
|
} else {
|
||||||
if err = ms.Ack(); err != nil {
|
if sub.opts.AutoAck {
|
||||||
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// single processing
|
|
||||||
case sub.handler != nil:
|
|
||||||
for _, p := range ms {
|
|
||||||
if err = sub.handler(p); err != nil {
|
|
||||||
p.SetError(err)
|
|
||||||
if eh != nil {
|
|
||||||
_ = eh(p)
|
|
||||||
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
|
||||||
m.opts.Logger.Error(m.opts.Context, err.Error())
|
|
||||||
}
|
|
||||||
} else if sub.opts.AutoAck {
|
|
||||||
if err = p.Ack(); err != nil {
|
if err = p.Ack(); err != nil {
|
||||||
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
|
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
|
||||||
|
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
|
||||||
|
} else {
|
||||||
|
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-1)
|
||||||
|
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-1)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
te := time.Since(ts)
|
||||||
|
m.opts.Meter.Summary(PublishMessageLatencyMicroseconds, "endpoint", t).Update(te.Seconds())
|
||||||
|
m.opts.Meter.Histogram(PublishMessageDurationSeconds, "endpoint", t).Update(te.Seconds())
|
||||||
|
m.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, "endpoint", t).Update(te.Seconds())
|
||||||
|
m.opts.Meter.Histogram(SubscribeMessageDurationSeconds, "endpoint", t).Update(te.Seconds())
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) {
|
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler interface{}, opts ...SubscribeOption) (Subscriber, error) {
|
||||||
m.RLock()
|
|
||||||
if !m.connected {
|
|
||||||
m.RUnlock()
|
|
||||||
return nil, ErrNotConnected
|
|
||||||
}
|
|
||||||
m.RUnlock()
|
|
||||||
|
|
||||||
sid, err := id.New()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
options := NewSubscribeOptions(opts...)
|
|
||||||
|
|
||||||
sub := &memorySubscriber{
|
|
||||||
exit: make(chan bool, 1),
|
|
||||||
id: sid,
|
|
||||||
topic: topic,
|
|
||||||
batchhandler: handler,
|
|
||||||
opts: options,
|
|
||||||
ctx: ctx,
|
|
||||||
}
|
|
||||||
|
|
||||||
m.Lock()
|
|
||||||
m.subscribers[topic] = append(m.subscribers[topic], sub)
|
|
||||||
m.Unlock()
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
<-sub.exit
|
|
||||||
m.Lock()
|
|
||||||
newSubscribers := make([]*memorySubscriber, 0, len(m.subscribers)-1)
|
|
||||||
for _, sb := range m.subscribers[topic] {
|
|
||||||
if sb.id == sub.id {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
newSubscribers = append(newSubscribers, sb)
|
|
||||||
}
|
|
||||||
m.subscribers[topic] = newSubscribers
|
|
||||||
m.Unlock()
|
|
||||||
}()
|
|
||||||
|
|
||||||
return sub, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
|
|
||||||
m.RLock()
|
m.RLock()
|
||||||
if !m.connected {
|
if !m.connected {
|
||||||
m.RUnlock()
|
m.RUnlock()
|
||||||
@@ -286,38 +232,41 @@ func (m *memoryBroker) Name() string {
|
|||||||
return m.opts.Name
|
return m.opts.Name
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryEvent) Topic() string {
|
type memoryMessage struct {
|
||||||
|
err error
|
||||||
|
body interface{}
|
||||||
|
topic string
|
||||||
|
opts PublishOptions
|
||||||
|
ctx context.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *memoryMessage) Topic() string {
|
||||||
return m.topic
|
return m.topic
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryEvent) Message() *Message {
|
func (m *memoryMessage) Body() interface{} {
|
||||||
switch v := m.message.(type) {
|
return m.body
|
||||||
case *Message:
|
}
|
||||||
return v
|
|
||||||
case []byte:
|
|
||||||
msg := &Message{}
|
|
||||||
if err := m.opts.Codec.Unmarshal(v, msg); err != nil {
|
|
||||||
if m.opts.Logger.V(logger.ErrorLevel) {
|
|
||||||
m.opts.Logger.Error(m.opts.Context, "[memory]: failed to unmarshal: %v", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return msg
|
|
||||||
}
|
|
||||||
|
|
||||||
|
func (m *memoryMessage) Ack() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryEvent) Ack() error {
|
func (m *memoryMessage) Error() error {
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *memoryEvent) Error() error {
|
|
||||||
return m.err
|
return m.err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryEvent) SetError(err error) {
|
func (m *memoryMessage) Context() context.Context {
|
||||||
m.err = err
|
return m.ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
type memorySubscriber struct {
|
||||||
|
ctx context.Context
|
||||||
|
exit chan bool
|
||||||
|
handler interface{}
|
||||||
|
id string
|
||||||
|
topic string
|
||||||
|
opts SubscribeOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memorySubscriber) Options() SubscribeOptions {
|
func (m *memorySubscriber) Options() SubscribeOptions {
|
||||||
@@ -334,7 +283,7 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewBroker return new memory broker
|
// NewBroker return new memory broker
|
||||||
func NewBroker(opts ...Option) Broker {
|
func NewBroker(opts ...Option) *memoryBroker {
|
||||||
return &memoryBroker{
|
return &memoryBroker{
|
||||||
opts: NewOptions(opts...),
|
opts: NewOptions(opts...),
|
||||||
subscribers: make(map[string][]*memorySubscriber),
|
subscribers: make(map[string][]*memorySubscriber),
|
||||||
|
@@ -5,7 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v4/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMemoryBatchBroker(t *testing.T) {
|
func TestMemoryBatchBroker(t *testing.T) {
|
||||||
|
@@ -5,11 +5,31 @@ import (
|
|||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v4/codec"
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v4/logger"
|
||||||
"go.unistack.org/micro/v3/meter"
|
"go.unistack.org/micro/v4/metadata"
|
||||||
"go.unistack.org/micro/v3/register"
|
"go.unistack.org/micro/v4/meter"
|
||||||
"go.unistack.org/micro/v3/tracer"
|
"go.unistack.org/micro/v4/register"
|
||||||
|
"go.unistack.org/micro/v4/tracer"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// PublishMessageDurationSeconds specifies meter metric name
|
||||||
|
PublishMessageDurationSeconds = "publish_message_duration_seconds"
|
||||||
|
// PublishMessageLatencyMicroseconds specifies meter metric name
|
||||||
|
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
|
||||||
|
// PublishMessageTotal specifies meter metric name
|
||||||
|
PublishMessageTotal = "publish_message_total"
|
||||||
|
// PublishMessageInflight specifies meter metric name
|
||||||
|
PublishMessageInflight = "publish_message_inflight"
|
||||||
|
// SubscribeMessageDurationSeconds specifies meter metric name
|
||||||
|
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
|
||||||
|
// SubscribeMessageLatencyMicroseconds specifies meter metric name
|
||||||
|
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
|
||||||
|
// SubscribeMessageTotal specifies meter metric name
|
||||||
|
SubscribeMessageTotal = "subscribe_message_total"
|
||||||
|
// SubscribeMessageInflight specifies meter metric name
|
||||||
|
SubscribeMessageInflight = "subscribe_message_inflight"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Options struct
|
// Options struct
|
||||||
@@ -18,8 +38,8 @@ type Options struct {
|
|||||||
Tracer tracer.Tracer
|
Tracer tracer.Tracer
|
||||||
// Register can be used for clustering
|
// Register can be used for clustering
|
||||||
Register register.Register
|
Register register.Register
|
||||||
// Codec holds the codec for marshal/unmarshal
|
// Codecs holds the codec for marshal/unmarshal
|
||||||
Codec codec.Codec
|
Codecs map[string]codec.Codec
|
||||||
// Logger used for logging
|
// Logger used for logging
|
||||||
Logger logger.Logger
|
Logger logger.Logger
|
||||||
// Meter used for metrics
|
// Meter used for metrics
|
||||||
@@ -29,15 +49,16 @@ type Options struct {
|
|||||||
// TLSConfig holds tls.TLSConfig options
|
// TLSConfig holds tls.TLSConfig options
|
||||||
TLSConfig *tls.Config
|
TLSConfig *tls.Config
|
||||||
// ErrorHandler used when broker can't unmarshal incoming message
|
// ErrorHandler used when broker can't unmarshal incoming message
|
||||||
ErrorHandler Handler
|
ErrorHandler func(Message)
|
||||||
// BatchErrorHandler used when broker can't unmashal incoming messages
|
|
||||||
BatchErrorHandler BatchHandler
|
|
||||||
// Name holds the broker name
|
// Name holds the broker name
|
||||||
Name string
|
Name string
|
||||||
// Addrs holds the broker address
|
// Addrs holds the broker address
|
||||||
Addrs []string
|
Addrs []string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Option func
|
||||||
|
type Option func(*Options)
|
||||||
|
|
||||||
// NewOptions create new Options
|
// NewOptions create new Options
|
||||||
func NewOptions(opts ...Option) Options {
|
func NewOptions(opts ...Option) Options {
|
||||||
options := Options{
|
options := Options{
|
||||||
@@ -45,7 +66,7 @@ func NewOptions(opts ...Option) Options {
|
|||||||
Logger: logger.DefaultLogger,
|
Logger: logger.DefaultLogger,
|
||||||
Context: context.Background(),
|
Context: context.Background(),
|
||||||
Meter: meter.DefaultMeter,
|
Meter: meter.DefaultMeter,
|
||||||
Codec: codec.DefaultCodec,
|
Codecs: make(map[string]codec.Codec),
|
||||||
Tracer: tracer.DefaultTracer,
|
Tracer: tracer.DefaultTracer,
|
||||||
}
|
}
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
@@ -61,6 +82,32 @@ func Context(ctx context.Context) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MessageOption func
|
||||||
|
type MessageOption func(*MessageOptions)
|
||||||
|
|
||||||
|
// MessageOptions struct
|
||||||
|
type MessageOptions struct {
|
||||||
|
Metadata metadata.Metadata
|
||||||
|
ContentType string
|
||||||
|
}
|
||||||
|
|
||||||
|
// MessageMetadata pass additional message metadata
|
||||||
|
func MessageMetadata(md metadata.Metadata) MessageOption {
|
||||||
|
return func(o *MessageOptions) {
|
||||||
|
o.Metadata = md
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MessageContentType pass ContentType for message data
|
||||||
|
func MessageContentType(ct string) MessageOption {
|
||||||
|
return func(o *MessageOptions) {
|
||||||
|
o.ContentType = ct
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// PublishOption func
|
||||||
|
type PublishOption func(*PublishOptions)
|
||||||
|
|
||||||
// PublishOptions struct
|
// PublishOptions struct
|
||||||
type PublishOptions struct {
|
type PublishOptions struct {
|
||||||
// Context holds external options
|
// Context holds external options
|
||||||
@@ -85,11 +132,9 @@ type SubscribeOptions struct {
|
|||||||
// Context holds external options
|
// Context holds external options
|
||||||
Context context.Context
|
Context context.Context
|
||||||
// ErrorHandler used when broker can't unmarshal incoming message
|
// ErrorHandler used when broker can't unmarshal incoming message
|
||||||
ErrorHandler Handler
|
ErrorHandler func(Message)
|
||||||
// BatchErrorHandler used when broker can't unmashal incoming messages
|
// QueueGroup holds consumer group
|
||||||
BatchErrorHandler BatchHandler
|
QueueGroup string
|
||||||
// Group holds consumer group
|
|
||||||
Group string
|
|
||||||
// AutoAck flag specifies auto ack of incoming message when no error happens
|
// AutoAck flag specifies auto ack of incoming message when no error happens
|
||||||
AutoAck bool
|
AutoAck bool
|
||||||
// BodyOnly flag specifies that message contains only body bytes without header
|
// BodyOnly flag specifies that message contains only body bytes without header
|
||||||
@@ -100,12 +145,6 @@ type SubscribeOptions struct {
|
|||||||
BatchWait time.Duration
|
BatchWait time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// Option func
|
|
||||||
type Option func(*Options)
|
|
||||||
|
|
||||||
// PublishOption func
|
|
||||||
type PublishOption func(*PublishOptions)
|
|
||||||
|
|
||||||
// PublishBodyOnly publish only body of the message
|
// PublishBodyOnly publish only body of the message
|
||||||
func PublishBodyOnly(b bool) PublishOption {
|
func PublishBodyOnly(b bool) PublishOption {
|
||||||
return func(o *PublishOptions) {
|
return func(o *PublishOptions) {
|
||||||
@@ -129,59 +168,29 @@ func Addrs(addrs ...string) Option {
|
|||||||
|
|
||||||
// Codec sets the codec used for encoding/decoding used where
|
// Codec sets the codec used for encoding/decoding used where
|
||||||
// a broker does not support headers
|
// a broker does not support headers
|
||||||
func Codec(c codec.Codec) Option {
|
// Codec to be used to encode/decode requests for a given content type
|
||||||
|
func Codec(contentType string, c codec.Codec) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.Codec = c
|
o.Codecs[contentType] = c
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ErrorHandler will catch all broker errors that cant be handled
|
// ErrorHandler will catch all broker errors that cant be handled
|
||||||
// in normal way, for example Codec errors
|
// in normal way, for example Codec errors
|
||||||
func ErrorHandler(h Handler) Option {
|
func ErrorHandler(h func(Message)) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.ErrorHandler = h
|
o.ErrorHandler = h
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// BatchErrorHandler will catch all broker errors that cant be handled
|
|
||||||
// in normal way, for example Codec errors
|
|
||||||
func BatchErrorHandler(h BatchHandler) Option {
|
|
||||||
return func(o *Options) {
|
|
||||||
o.BatchErrorHandler = h
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SubscribeErrorHandler will catch all broker errors that cant be handled
|
// SubscribeErrorHandler will catch all broker errors that cant be handled
|
||||||
// in normal way, for example Codec errors
|
// in normal way, for example Codec errors
|
||||||
func SubscribeErrorHandler(h Handler) SubscribeOption {
|
func SubscribeErrorHandler(h func(Message)) SubscribeOption {
|
||||||
return func(o *SubscribeOptions) {
|
return func(o *SubscribeOptions) {
|
||||||
o.ErrorHandler = h
|
o.ErrorHandler = h
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SubscribeBatchErrorHandler will catch all broker errors that cant be handled
|
|
||||||
// in normal way, for example Codec errors
|
|
||||||
func SubscribeBatchErrorHandler(h BatchHandler) SubscribeOption {
|
|
||||||
return func(o *SubscribeOptions) {
|
|
||||||
o.BatchErrorHandler = h
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Queue sets the subscribers queue
|
|
||||||
// Deprecated
|
|
||||||
func Queue(name string) SubscribeOption {
|
|
||||||
return func(o *SubscribeOptions) {
|
|
||||||
o.Group = name
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SubscribeGroup sets the name of the queue to share messages on
|
|
||||||
func SubscribeGroup(name string) SubscribeOption {
|
|
||||||
return func(o *SubscribeOptions) {
|
|
||||||
o.Group = name
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register sets register option
|
// Register sets register option
|
||||||
func Register(r register.Register) Option {
|
func Register(r register.Register) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
@@ -224,6 +233,21 @@ func Name(n string) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SubscribeOption func signature
|
||||||
|
type SubscribeOption func(*SubscribeOptions)
|
||||||
|
|
||||||
|
// NewSubscribeOptions creates new SubscribeOptions
|
||||||
|
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
|
||||||
|
options := SubscribeOptions{
|
||||||
|
AutoAck: true,
|
||||||
|
Context: context.Background(),
|
||||||
|
}
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&options)
|
||||||
|
}
|
||||||
|
return options
|
||||||
|
}
|
||||||
|
|
||||||
// SubscribeContext set context
|
// SubscribeContext set context
|
||||||
func SubscribeContext(ctx context.Context) SubscribeOption {
|
func SubscribeContext(ctx context.Context) SubscribeOption {
|
||||||
return func(o *SubscribeOptions) {
|
return func(o *SubscribeOptions) {
|
||||||
@@ -231,14 +255,6 @@ func SubscribeContext(ctx context.Context) SubscribeOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// DisableAutoAck disables auto ack
|
|
||||||
// Deprecated
|
|
||||||
func DisableAutoAck() SubscribeOption {
|
|
||||||
return func(o *SubscribeOptions) {
|
|
||||||
o.AutoAck = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SubscribeAutoAck contol auto acking of messages
|
// SubscribeAutoAck contol auto acking of messages
|
||||||
// after they have been handled.
|
// after they have been handled.
|
||||||
func SubscribeAutoAck(b bool) SubscribeOption {
|
func SubscribeAutoAck(b bool) SubscribeOption {
|
||||||
@@ -268,17 +284,16 @@ func SubscribeBatchWait(td time.Duration) SubscribeOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SubscribeOption func
|
// SubscribeQueueGroup sets the shared queue name distributed messages across subscribers
|
||||||
type SubscribeOption func(*SubscribeOptions)
|
func SubscribeQueueGroup(n string) SubscribeOption {
|
||||||
|
return func(o *SubscribeOptions) {
|
||||||
// NewSubscribeOptions creates new SubscribeOptions
|
o.QueueGroup = n
|
||||||
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
|
}
|
||||||
options := SubscribeOptions{
|
}
|
||||||
AutoAck: true,
|
|
||||||
Context: context.Background(),
|
// SubscribeAutoAck control auto ack processing for handler
|
||||||
|
func SubscribeAuthAck(b bool) SubscribeOption {
|
||||||
|
return func(o *SubscribeOptions) {
|
||||||
|
o.AutoAck = b
|
||||||
}
|
}
|
||||||
for _, o := range opts {
|
|
||||||
o(&options)
|
|
||||||
}
|
|
||||||
return options
|
|
||||||
}
|
}
|
||||||
|
98
broker/subscriber.go
Normal file
98
broker/subscriber.go
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
package broker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"strings"
|
||||||
|
"unicode"
|
||||||
|
"unicode/utf8"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
subSig = "func(context.Context, interface{}) error"
|
||||||
|
batchSubSig = "func([]context.Context, []interface{}) error"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Precompute the reflect type for error. Can't use error directly
|
||||||
|
// because Typeof takes an empty interface value. This is annoying.
|
||||||
|
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
|
||||||
|
|
||||||
|
// Is this an exported - upper case - name?
|
||||||
|
func isExported(name string) bool {
|
||||||
|
rune, _ := utf8.DecodeRuneInString(name)
|
||||||
|
return unicode.IsUpper(rune)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Is this type exported or a builtin?
|
||||||
|
func isExportedOrBuiltinType(t reflect.Type) bool {
|
||||||
|
for t.Kind() == reflect.Ptr {
|
||||||
|
t = t.Elem()
|
||||||
|
}
|
||||||
|
// PkgPath will be non-empty even for an exported type,
|
||||||
|
// so we need to check the type name as well.
|
||||||
|
return isExported(t.Name()) || t.PkgPath() == ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// ValidateSubscriber func signature
|
||||||
|
func ValidateSubscriber(sub interface{}) error {
|
||||||
|
typ := reflect.TypeOf(sub)
|
||||||
|
var argType reflect.Type
|
||||||
|
switch typ.Kind() {
|
||||||
|
case reflect.Func:
|
||||||
|
name := "Func"
|
||||||
|
switch typ.NumIn() {
|
||||||
|
case 1: // func(Message) error
|
||||||
|
|
||||||
|
case 2: // func(context.Context, Message) error or func(context.Context, []Message) error
|
||||||
|
argType = typ.In(2)
|
||||||
|
// if sub.Options().Batch {
|
||||||
|
if argType.Kind() != reflect.Slice {
|
||||||
|
return fmt.Errorf("subscriber %v dont have required signature %s", name, batchSubSig)
|
||||||
|
}
|
||||||
|
if strings.Compare(fmt.Sprintf("%v", argType), "[]interface{}") == 0 {
|
||||||
|
return fmt.Errorf("subscriber %v dont have required signaure %s", name, batchSubSig)
|
||||||
|
}
|
||||||
|
// }
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s or %s", name, typ.NumIn(), subSig, batchSubSig)
|
||||||
|
}
|
||||||
|
if !isExportedOrBuiltinType(argType) {
|
||||||
|
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
|
||||||
|
}
|
||||||
|
if typ.NumOut() != 1 {
|
||||||
|
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s or %s",
|
||||||
|
name, typ.NumOut(), subSig, batchSubSig)
|
||||||
|
}
|
||||||
|
if returnType := typ.Out(0); returnType != typeOfError {
|
||||||
|
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
hdlr := reflect.ValueOf(sub)
|
||||||
|
name := reflect.Indirect(hdlr).Type().Name()
|
||||||
|
|
||||||
|
for m := 0; m < typ.NumMethod(); m++ {
|
||||||
|
method := typ.Method(m)
|
||||||
|
switch method.Type.NumIn() {
|
||||||
|
case 3:
|
||||||
|
argType = method.Type.In(2)
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s or %s",
|
||||||
|
name, method.Name, method.Type.NumIn(), subSig, batchSubSig)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !isExportedOrBuiltinType(argType) {
|
||||||
|
return fmt.Errorf("%v argument type not exported: %v", name, argType)
|
||||||
|
}
|
||||||
|
if method.Type.NumOut() != 1 {
|
||||||
|
return fmt.Errorf(
|
||||||
|
"subscriber %v.%v has wrong number of return values: %v require signature %s or %s",
|
||||||
|
name, method.Name, method.Type.NumOut(), subSig, batchSubSig)
|
||||||
|
}
|
||||||
|
if returnType := method.Type.Out(0); returnType != typeOfError {
|
||||||
|
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
@@ -5,7 +5,7 @@ import (
|
|||||||
"math"
|
"math"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/util/backoff"
|
"go.unistack.org/micro/v4/util/backoff"
|
||||||
)
|
)
|
||||||
|
|
||||||
// BackoffFunc is the backoff call func
|
// BackoffFunc is the backoff call func
|
||||||
|
@@ -1,12 +1,11 @@
|
|||||||
// Package client is an interface for an RPC client
|
// Package client is an interface for an RPC client
|
||||||
package client // import "go.unistack.org/micro/v3/client"
|
package client // import "go.unistack.org/micro/v4/client"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v4/codec"
|
||||||
"go.unistack.org/micro/v3/metadata"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -35,23 +34,12 @@ type Client interface {
|
|||||||
Name() string
|
Name() string
|
||||||
Init(opts ...Option) error
|
Init(opts ...Option) error
|
||||||
Options() Options
|
Options() Options
|
||||||
NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
|
|
||||||
NewRequest(service string, endpoint string, req interface{}, opts ...RequestOption) Request
|
NewRequest(service string, endpoint string, req interface{}, opts ...RequestOption) Request
|
||||||
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
|
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
|
||||||
Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
|
Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
|
||||||
Publish(ctx context.Context, msg Message, opts ...PublishOption) error
|
|
||||||
BatchPublish(ctx context.Context, msg []Message, opts ...PublishOption) error
|
|
||||||
String() string
|
String() string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Message is the interface for publishing asynchronously
|
|
||||||
type Message interface {
|
|
||||||
Topic() string
|
|
||||||
Payload() interface{}
|
|
||||||
ContentType() string
|
|
||||||
Metadata() metadata.Metadata
|
|
||||||
}
|
|
||||||
|
|
||||||
// Request is the interface for a synchronous request used by Call or Stream
|
// Request is the interface for a synchronous request used by Call or Stream
|
||||||
type Request interface {
|
type Request interface {
|
||||||
// The service to call
|
// The service to call
|
||||||
@@ -68,16 +56,22 @@ type Request interface {
|
|||||||
Codec() codec.Codec
|
Codec() codec.Codec
|
||||||
// indicates whether the request will be a streaming one rather than unary
|
// indicates whether the request will be a streaming one rather than unary
|
||||||
Stream() bool
|
Stream() bool
|
||||||
|
// Header data
|
||||||
|
// Header() metadata.Metadata
|
||||||
}
|
}
|
||||||
|
|
||||||
// Response is the response received from a service
|
// Response is the response received from a service
|
||||||
type Response interface {
|
type Response interface {
|
||||||
// Read the response
|
// Read the response
|
||||||
Codec() codec.Codec
|
Codec() codec.Codec
|
||||||
|
// The content type
|
||||||
|
// ContentType() string
|
||||||
// Header data
|
// Header data
|
||||||
Header() metadata.Metadata
|
// Header() metadata.Metadata
|
||||||
// Read the undecoded response
|
// Read the undecoded response
|
||||||
Read() ([]byte, error)
|
Read() ([]byte, error)
|
||||||
|
// The unencoded request body
|
||||||
|
// Body() interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stream is the interface for a bidirectional synchronous stream
|
// Stream is the interface for a bidirectional synchronous stream
|
||||||
|
@@ -4,8 +4,8 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/errors"
|
"go.unistack.org/micro/v4/errors"
|
||||||
"go.unistack.org/micro/v3/router"
|
"go.unistack.org/micro/v4/router"
|
||||||
)
|
)
|
||||||
|
|
||||||
// LookupFunc is used to lookup routes for a service
|
// LookupFunc is used to lookup routes for a service
|
||||||
|
@@ -5,11 +5,10 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/broker"
|
"go.unistack.org/micro/v4/codec"
|
||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v4/errors"
|
||||||
"go.unistack.org/micro/v3/errors"
|
"go.unistack.org/micro/v4/metadata"
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v4/selector"
|
||||||
"go.unistack.org/micro/v3/selector"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// DefaultCodecs will be used to encode/decode data
|
// DefaultCodecs will be used to encode/decode data
|
||||||
@@ -285,6 +284,9 @@ func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opt
|
|||||||
ch := make(chan error, callOpts.Retries)
|
ch := make(chan error, callOpts.Retries)
|
||||||
var gerr error
|
var gerr error
|
||||||
|
|
||||||
|
ts := time.Now()
|
||||||
|
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
|
||||||
|
n.opts.Meter.Counter(ClientRequestInflight, "endpoint", endpoint).Inc()
|
||||||
for i := 0; i <= callOpts.Retries; i++ {
|
for i := 0; i <= callOpts.Retries; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
ch <- call(i)
|
ch <- call(i)
|
||||||
@@ -312,6 +314,16 @@ func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opt
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if gerr != nil {
|
||||||
|
n.opts.Meter.Counter(ClientRequestTotal, "endpoint", endpoint, "status", "failure").Inc()
|
||||||
|
} else {
|
||||||
|
n.opts.Meter.Counter(ClientRequestTotal, "endpoint", endpoint, "status", "success").Inc()
|
||||||
|
}
|
||||||
|
n.opts.Meter.Counter(ClientRequestInflight, "endpoint", endpoint).Dec()
|
||||||
|
te := time.Since(ts)
|
||||||
|
n.opts.Meter.Summary(ClientRequestLatencyMicroseconds, "endpoint", endpoint).Update(te.Seconds())
|
||||||
|
n.opts.Meter.Histogram(ClientRequestDurationSeconds, "endpoint", endpoint).Update(te.Seconds())
|
||||||
|
|
||||||
return gerr
|
return gerr
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -323,11 +335,6 @@ func (n *noopClient) NewRequest(service, endpoint string, req interface{}, opts
|
|||||||
return &noopRequest{service: service, endpoint: endpoint}
|
return &noopRequest{service: service, endpoint: endpoint}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOption) Message {
|
|
||||||
options := NewMessageOptions(append([]MessageOption{MessageContentType(n.opts.ContentType)}, opts...)...)
|
|
||||||
return &noopMessage{topic: topic, payload: msg, opts: options}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) {
|
func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
@@ -414,7 +421,15 @@ func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption
|
|||||||
|
|
||||||
node := next()
|
node := next()
|
||||||
|
|
||||||
|
// ts := time.Now()
|
||||||
|
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
|
||||||
|
n.opts.Meter.Counter(ClientRequestInflight, "endpoint", endpoint).Inc()
|
||||||
stream, cerr := n.stream(ctx, node, req, callOpts)
|
stream, cerr := n.stream(ctx, node, req, callOpts)
|
||||||
|
if cerr != nil {
|
||||||
|
n.opts.Meter.Counter(ClientRequestTotal, "endpoint", endpoint, "status", "failure").Inc()
|
||||||
|
} else {
|
||||||
|
n.opts.Meter.Counter(ClientRequestTotal, "endpoint", endpoint, "status", "success").Inc()
|
||||||
|
}
|
||||||
|
|
||||||
// record the result of the call to inform future routing decisions
|
// record the result of the call to inform future routing decisions
|
||||||
if verr := n.opts.Selector.Record(node, cerr); verr != nil {
|
if verr := n.opts.Selector.Record(node, cerr); verr != nil {
|
||||||
@@ -468,64 +483,6 @@ func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption
|
|||||||
return nil, grr
|
return nil, grr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopClient) stream(ctx context.Context, addr string, req Request, opts CallOptions) (Stream, error) {
|
func (n *noopClient) stream(ctx context.Context, addr string, req Request, opts CallOptions) (*noopStream, error) {
|
||||||
return &noopStream{}, nil
|
return &noopStream{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *noopClient) BatchPublish(ctx context.Context, ps []Message, opts ...PublishOption) error {
|
|
||||||
return n.publish(ctx, ps, opts...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOption) error {
|
|
||||||
return n.publish(ctx, []Message{p}, opts...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishOption) error {
|
|
||||||
options := NewPublishOptions(opts...)
|
|
||||||
|
|
||||||
msgs := make([]*broker.Message, 0, len(ps))
|
|
||||||
|
|
||||||
for _, p := range ps {
|
|
||||||
md, ok := metadata.FromOutgoingContext(ctx)
|
|
||||||
if !ok {
|
|
||||||
md = metadata.New(0)
|
|
||||||
}
|
|
||||||
md[metadata.HeaderContentType] = p.ContentType()
|
|
||||||
|
|
||||||
topic := p.Topic()
|
|
||||||
|
|
||||||
// get the exchange
|
|
||||||
if len(options.Exchange) > 0 {
|
|
||||||
topic = options.Exchange
|
|
||||||
}
|
|
||||||
|
|
||||||
md[metadata.HeaderTopic] = topic
|
|
||||||
|
|
||||||
var body []byte
|
|
||||||
|
|
||||||
// passed in raw data
|
|
||||||
if d, ok := p.Payload().(*codec.Frame); ok {
|
|
||||||
body = d.Data
|
|
||||||
} else {
|
|
||||||
// use codec for payload
|
|
||||||
cf, err := n.newCodec(p.ContentType())
|
|
||||||
if err != nil {
|
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
// set the body
|
|
||||||
b, err := cf.Marshal(p.Payload())
|
|
||||||
if err != nil {
|
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
|
||||||
}
|
|
||||||
body = b
|
|
||||||
}
|
|
||||||
|
|
||||||
msgs = append(msgs, &broker.Message{Header: md, Body: body})
|
|
||||||
}
|
|
||||||
|
|
||||||
return n.opts.Broker.BatchPublish(ctx, msgs,
|
|
||||||
broker.PublishContext(options.Context),
|
|
||||||
broker.PublishBodyOnly(options.BodyOnly),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
@@ -6,17 +6,27 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/broker"
|
"go.unistack.org/micro/v4/codec"
|
||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v4/logger"
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v4/metadata"
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v4/meter"
|
||||||
"go.unistack.org/micro/v3/meter"
|
"go.unistack.org/micro/v4/network/transport"
|
||||||
"go.unistack.org/micro/v3/network/transport"
|
"go.unistack.org/micro/v4/register"
|
||||||
"go.unistack.org/micro/v3/register"
|
"go.unistack.org/micro/v4/router"
|
||||||
"go.unistack.org/micro/v3/router"
|
"go.unistack.org/micro/v4/selector"
|
||||||
"go.unistack.org/micro/v3/selector"
|
"go.unistack.org/micro/v4/selector/random"
|
||||||
"go.unistack.org/micro/v3/selector/random"
|
"go.unistack.org/micro/v4/tracer"
|
||||||
"go.unistack.org/micro/v3/tracer"
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// ClientRequestDurationSeconds specifies meter metric name
|
||||||
|
ClientRequestDurationSeconds = "client_request_duration_seconds"
|
||||||
|
// ClientRequestLatencyMicroseconds specifies meter metric name
|
||||||
|
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
|
||||||
|
// ClientRequestTotal specifies meter metric name
|
||||||
|
ClientRequestTotal = "client_request_total"
|
||||||
|
// ClientRequestInflight specifies meter metric name
|
||||||
|
ClientRequestInflight = "client_request_inflight"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Options holds client options
|
// Options holds client options
|
||||||
@@ -29,8 +39,6 @@ type Options struct {
|
|||||||
Logger logger.Logger
|
Logger logger.Logger
|
||||||
// Tracer used for tracing
|
// Tracer used for tracing
|
||||||
Tracer tracer.Tracer
|
Tracer tracer.Tracer
|
||||||
// Broker used to publish messages
|
|
||||||
Broker broker.Broker
|
|
||||||
// Meter used for metrics
|
// Meter used for metrics
|
||||||
Meter meter.Meter
|
Meter meter.Meter
|
||||||
// Context is used for external options
|
// Context is used for external options
|
||||||
@@ -199,7 +207,6 @@ func NewOptions(opts ...Option) Options {
|
|||||||
PoolTTL: DefaultPoolTTL,
|
PoolTTL: DefaultPoolTTL,
|
||||||
Selector: random.NewSelector(),
|
Selector: random.NewSelector(),
|
||||||
Logger: logger.DefaultLogger,
|
Logger: logger.DefaultLogger,
|
||||||
Broker: broker.DefaultBroker,
|
|
||||||
Meter: meter.DefaultMeter,
|
Meter: meter.DefaultMeter,
|
||||||
Tracer: tracer.DefaultTracer,
|
Tracer: tracer.DefaultTracer,
|
||||||
Router: router.DefaultRouter,
|
Router: router.DefaultRouter,
|
||||||
@@ -213,13 +220,6 @@ func NewOptions(opts ...Option) Options {
|
|||||||
return options
|
return options
|
||||||
}
|
}
|
||||||
|
|
||||||
// Broker to be used for pub/sub
|
|
||||||
func Broker(b broker.Broker) Option {
|
|
||||||
return func(o *Options) {
|
|
||||||
o.Broker = b
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Tracer to be used for tracing
|
// Tracer to be used for tracing
|
||||||
func Tracer(t tracer.Tracer) Option {
|
func Tracer(t tracer.Tracer) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
|
@@ -3,7 +3,7 @@ package client
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/errors"
|
"go.unistack.org/micro/v4/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RetryFunc that returning either false or a non-nil error will result in the call not being retried
|
// RetryFunc that returning either false or a non-nil error will result in the call not being retried
|
||||||
|
@@ -5,7 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/errors"
|
"go.unistack.org/micro/v4/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRetryAlways(t *testing.T) {
|
func TestRetryAlways(t *testing.T) {
|
||||||
|
@@ -1,7 +1,7 @@
|
|||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v4/codec"
|
||||||
)
|
)
|
||||||
|
|
||||||
type testRequest struct {
|
type testRequest struct {
|
||||||
|
@@ -1,11 +1,11 @@
|
|||||||
// Package codec is an interface for encoding messages
|
// Package codec is an interface for encoding messages
|
||||||
package codec // import "go.unistack.org/micro/v3/codec"
|
package codec // import "go.unistack.org/micro/v4/codec"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v4/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Message types
|
// Message types
|
||||||
@@ -84,3 +84,24 @@ func MarshalAppend(buf []byte, c Codec, v interface{}, opts ...Option) ([]byte,
|
|||||||
|
|
||||||
return append(buf, mbuf...), nil
|
return append(buf, mbuf...), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RawMessage is a raw encoded JSON value.
|
||||||
|
// It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a encoding.
|
||||||
|
type RawMessage []byte
|
||||||
|
|
||||||
|
// MarshalJSON returns m as the JSON encoding of m.
|
||||||
|
func (m *RawMessage) MarshalJSON() ([]byte, error) {
|
||||||
|
if m == nil {
|
||||||
|
return []byte("null"), nil
|
||||||
|
}
|
||||||
|
return *m, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalJSON sets *m to a copy of data.
|
||||||
|
func (m *RawMessage) UnmarshalJSON(data []byte) error {
|
||||||
|
if m == nil {
|
||||||
|
return errors.New("RawMessage UnmarshalJSON on nil pointer")
|
||||||
|
}
|
||||||
|
*m = append((*m)[0:0], data...)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@@ -1,4 +1,4 @@
|
|||||||
// Copyright 2021 Unistack LLC
|
// Copyright 2021-2023 Unistack LLC
|
||||||
//
|
//
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
// you may not use this file except in compliance with the License.
|
// you may not use this file except in compliance with the License.
|
||||||
@@ -17,7 +17,7 @@ syntax = "proto3";
|
|||||||
package micro.codec;
|
package micro.codec;
|
||||||
|
|
||||||
option cc_enable_arenas = true;
|
option cc_enable_arenas = true;
|
||||||
option go_package = "go.unistack.org/micro/v3/codec;codec";
|
option go_package = "go.unistack.org/micro/v4/codec;codec";
|
||||||
option java_multiple_files = true;
|
option java_multiple_files = true;
|
||||||
option java_outer_classname = "MicroCodec";
|
option java_outer_classname = "MicroCodec";
|
||||||
option java_package = "micro.codec";
|
option java_package = "micro.codec";
|
||||||
|
@@ -3,9 +3,9 @@ package codec
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v4/logger"
|
||||||
"go.unistack.org/micro/v3/meter"
|
"go.unistack.org/micro/v4/meter"
|
||||||
"go.unistack.org/micro/v3/tracer"
|
"go.unistack.org/micro/v4/tracer"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Option func
|
// Option func
|
||||||
|
@@ -1,5 +1,5 @@
|
|||||||
// Package config is an interface for dynamic configuration.
|
// Package config is an interface for dynamic configuration.
|
||||||
package config // import "go.unistack.org/micro/v3/config"
|
package config // import "go.unistack.org/micro/v4/config"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@@ -8,8 +8,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/imdario/mergo"
|
"github.com/imdario/mergo"
|
||||||
rutil "go.unistack.org/micro/v3/util/reflect"
|
rutil "go.unistack.org/micro/v4/util/reflect"
|
||||||
mtime "go.unistack.org/micro/v3/util/time"
|
mtime "go.unistack.org/micro/v4/util/time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type defaultConfig struct {
|
type defaultConfig struct {
|
||||||
@@ -169,7 +169,7 @@ func fillValue(value reflect.Value, val string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
value.Set(reflect.ValueOf(v))
|
value.Set(reflect.ValueOf(v))
|
||||||
case value.Type().String() == "time.Duration" && value.Type().PkgPath() == "go.unistack.org/micro/v3/util/time":
|
case value.Type().String() == "time.Duration" && value.Type().PkgPath() == "go.unistack.org/micro/v4/util/time":
|
||||||
v, err := mtime.ParseDuration(val)
|
v, err := mtime.ParseDuration(val)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@@ -6,17 +6,18 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/config"
|
"go.unistack.org/micro/v4/config"
|
||||||
mtime "go.unistack.org/micro/v3/util/time"
|
mtime "go.unistack.org/micro/v4/util/time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
StringValue string `default:"string_value"`
|
StringValue string `default:"string_value"`
|
||||||
IgnoreValue string `json:"-"`
|
IgnoreValue string `json:"-"`
|
||||||
StructValue *cfgStructValue
|
StructValue *cfgStructValue
|
||||||
IntValue int `default:"99"`
|
IntValue int `default:"99"`
|
||||||
DurationValue time.Duration `default:"10s"`
|
DurationValue time.Duration `default:"10s"`
|
||||||
MDurationValue mtime.Duration `default:"10s"`
|
MDurationValue mtime.Duration `default:"10s"`
|
||||||
|
MapValue map[string]bool `default:"key1=true,key2=false"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfgStructValue struct {
|
type cfgStructValue struct {
|
||||||
@@ -67,6 +68,9 @@ func TestDefault(t *testing.T) {
|
|||||||
if conf.StringValue != "after_load" {
|
if conf.StringValue != "after_load" {
|
||||||
t.Fatal("AfterLoad option not working")
|
t.Fatal("AfterLoad option not working")
|
||||||
}
|
}
|
||||||
|
if len(conf.MapValue) != 2 {
|
||||||
|
t.Fatalf("map value invalid: %#+v\n", conf.MapValue)
|
||||||
|
}
|
||||||
_ = conf
|
_ = conf
|
||||||
// t.Logf("%#+v\n", conf)
|
// t.Logf("%#+v\n", conf)
|
||||||
}
|
}
|
||||||
|
@@ -4,10 +4,10 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v4/codec"
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v4/logger"
|
||||||
"go.unistack.org/micro/v3/meter"
|
"go.unistack.org/micro/v4/meter"
|
||||||
"go.unistack.org/micro/v3/tracer"
|
"go.unistack.org/micro/v4/tracer"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Options hold the config options
|
// Options hold the config options
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
// Package errors provides a way to return detailed information
|
// Package errors provides a way to return detailed information
|
||||||
// for an RPC request error. The error is normally JSON encoded.
|
// for an RPC request error. The error is normally JSON encoded.
|
||||||
package errors // import "go.unistack.org/micro/v3/errors"
|
package errors // import "go.unistack.org/micro/v4/errors"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
@@ -1,4 +1,4 @@
|
|||||||
// Copyright 2021 Unistack LLC
|
// Copyright 2021-2023 Unistack LLC
|
||||||
//
|
//
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
// you may not use this file except in compliance with the License.
|
// you may not use this file except in compliance with the License.
|
||||||
@@ -17,7 +17,7 @@ syntax = "proto3";
|
|||||||
package micro.errors;
|
package micro.errors;
|
||||||
|
|
||||||
option cc_enable_arenas = true;
|
option cc_enable_arenas = true;
|
||||||
option go_package = "go.unistack.org/micro/v3/errors;errors";
|
option go_package = "go.unistack.org/micro/v4/errors;errors";
|
||||||
option java_multiple_files = true;
|
option java_multiple_files = true;
|
||||||
option java_outer_classname = "MicroErrors";
|
option java_outer_classname = "MicroErrors";
|
||||||
option java_package = "micro.errors";
|
option java_package = "micro.errors";
|
||||||
|
27
event.go
27
event.go
@@ -1,27 +0,0 @@
|
|||||||
package micro
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/client"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Event is used to publish messages to a topic
|
|
||||||
type Event interface {
|
|
||||||
// Publish publishes a message to the event topic
|
|
||||||
Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error
|
|
||||||
}
|
|
||||||
|
|
||||||
type event struct {
|
|
||||||
c client.Client
|
|
||||||
topic string
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewEvent creates a new event publisher
|
|
||||||
func NewEvent(topic string, c client.Client) Event {
|
|
||||||
return &event{c, topic}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *event) Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error {
|
|
||||||
return e.c.Publish(ctx, e.c.NewMessage(e.topic, msg), opts...)
|
|
||||||
}
|
|
@@ -6,12 +6,12 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/silas/dag"
|
"github.com/silas/dag"
|
||||||
"go.unistack.org/micro/v3/client"
|
"go.unistack.org/micro/v4/client"
|
||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v4/codec"
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v4/logger"
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v4/metadata"
|
||||||
"go.unistack.org/micro/v3/store"
|
"go.unistack.org/micro/v4/store"
|
||||||
"go.unistack.org/micro/v3/util/id"
|
"go.unistack.org/micro/v4/util/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
type microFlow struct {
|
type microFlow struct {
|
||||||
|
@@ -1,5 +1,5 @@
|
|||||||
// Package flow is an interface used for saga pattern microservice workflow
|
// Package flow is an interface used for saga pattern microservice workflow
|
||||||
package flow // import "go.unistack.org/micro/v3/flow"
|
package flow // import "go.unistack.org/micro/v4/flow"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v4/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@@ -4,11 +4,11 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/client"
|
"go.unistack.org/micro/v4/client"
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v4/logger"
|
||||||
"go.unistack.org/micro/v3/meter"
|
"go.unistack.org/micro/v4/meter"
|
||||||
"go.unistack.org/micro/v3/store"
|
"go.unistack.org/micro/v4/store"
|
||||||
"go.unistack.org/micro/v3/tracer"
|
"go.unistack.org/micro/v4/tracer"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Option func
|
// Option func
|
||||||
|
@@ -1,4 +1,4 @@
|
|||||||
package fsm // import "go.unistack.org/micro/v3/fsm"
|
package fsm // import "go.unistack.org/micro/v4/fsm"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@@ -5,7 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v4/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestFSMStart(t *testing.T) {
|
func TestFSMStart(t *testing.T) {
|
||||||
|
19
go.mod
19
go.mod
@@ -1,10 +1,19 @@
|
|||||||
module go.unistack.org/micro/v3
|
module go.unistack.org/micro/v4
|
||||||
|
|
||||||
go 1.16
|
go 1.20
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/imdario/mergo v0.3.13
|
github.com/DATA-DOG/go-sqlmock v1.5.0
|
||||||
|
github.com/imdario/mergo v0.3.15
|
||||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||||
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35
|
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5
|
||||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
golang.org/x/sync v0.1.0
|
||||||
|
golang.org/x/sys v0.7.0
|
||||||
|
google.golang.org/grpc v1.54.0
|
||||||
|
google.golang.org/protobuf v1.30.0
|
||||||
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/golang/protobuf v1.5.3 // indirect
|
||||||
|
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
|
||||||
)
|
)
|
||||||
|
31
go.sum
31
go.sum
@@ -1,10 +1,31 @@
|
|||||||
github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk=
|
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
|
||||||
github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg=
|
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
|
||||||
|
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||||
|
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
|
||||||
|
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||||
|
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
|
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
||||||
|
github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM=
|
||||||
|
github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
|
||||||
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
|
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
|
||||||
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
||||||
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35 h1:4mohWoM/UGg1BvFFiqSPRl5uwJY3rVV0HQX0ETqauqQ=
|
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E=
|
||||||
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
|
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
|
||||||
|
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
|
||||||
|
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
|
||||||
|
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
|
||||||
|
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
|
||||||
|
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
|
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A=
|
||||||
|
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU=
|
||||||
|
google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag=
|
||||||
|
google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g=
|
||||||
|
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||||
|
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||||
|
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
|
||||||
|
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
@@ -1,5 +1,5 @@
|
|||||||
// Package logger provides a log interface
|
// Package logger provides a log interface
|
||||||
package logger // import "go.unistack.org/micro/v3/logger"
|
package logger // import "go.unistack.org/micro/v4/logger"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@@ -8,7 +8,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v4/codec"
|
||||||
)
|
)
|
||||||
|
|
||||||
const sf = "0-+# "
|
const sf = "0-+# "
|
||||||
|
@@ -5,7 +5,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v4/codec"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestUnwrap(t *testing.T) {
|
func TestUnwrap(t *testing.T) {
|
||||||
|
@@ -5,9 +5,9 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/client"
|
"go.unistack.org/micro/v4/client"
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v4/logger"
|
||||||
"go.unistack.org/micro/v3/server"
|
"go.unistack.org/micro/v4/server"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@@ -1,5 +1,5 @@
|
|||||||
// Package metadata is a way of defining message headers
|
// Package metadata is a way of defining message headers
|
||||||
package metadata // import "go.unistack.org/micro/v3/metadata"
|
package metadata // import "go.unistack.org/micro/v4/metadata"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net/textproto"
|
"net/textproto"
|
||||||
|
@@ -1,5 +1,5 @@
|
|||||||
// Package meter is for instrumentation
|
// Package meter is for instrumentation
|
||||||
package meter // import "go.unistack.org/micro/v3/meter"
|
package meter // import "go.unistack.org/micro/v4/meter"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
@@ -3,7 +3,7 @@ package meter
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v4/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Option powers the configuration for metrics implementations:
|
// Option powers the configuration for metrics implementations:
|
||||||
|
@@ -1,49 +1,16 @@
|
|||||||
package wrapper // import "go.unistack.org/micro/v3/meter/wrapper"
|
package wrapper // import "go.unistack.org/micro/v4/meter/wrapper"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/client"
|
"go.unistack.org/micro/v4/client"
|
||||||
"go.unistack.org/micro/v3/meter"
|
"go.unistack.org/micro/v4/meter"
|
||||||
"go.unistack.org/micro/v3/server"
|
"go.unistack.org/micro/v4/server"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// ClientRequestDurationSeconds specifies meter metric name
|
|
||||||
ClientRequestDurationSeconds = "client_request_duration_seconds"
|
|
||||||
// ClientRequestLatencyMicroseconds specifies meter metric name
|
|
||||||
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
|
|
||||||
// ClientRequestTotal specifies meter metric name
|
|
||||||
ClientRequestTotal = "client_request_total"
|
|
||||||
// ClientRequestInflight specifies meter metric name
|
|
||||||
ClientRequestInflight = "client_request_inflight"
|
|
||||||
// ServerRequestDurationSeconds specifies meter metric name
|
|
||||||
ServerRequestDurationSeconds = "server_request_duration_seconds"
|
|
||||||
// ServerRequestLatencyMicroseconds specifies meter metric name
|
|
||||||
ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
|
|
||||||
// ServerRequestTotal specifies meter metric name
|
|
||||||
ServerRequestTotal = "server_request_total"
|
|
||||||
// ServerRequestInflight specifies meter metric name
|
|
||||||
ServerRequestInflight = "server_request_inflight"
|
|
||||||
// PublishMessageDurationSeconds specifies meter metric name
|
|
||||||
PublishMessageDurationSeconds = "publish_message_duration_seconds"
|
|
||||||
// PublishMessageLatencyMicroseconds specifies meter metric name
|
|
||||||
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
|
|
||||||
// PublishMessageTotal specifies meter metric name
|
|
||||||
PublishMessageTotal = "publish_message_total"
|
|
||||||
// PublishMessageInflight specifies meter metric name
|
|
||||||
PublishMessageInflight = "publish_message_inflight"
|
|
||||||
// SubscribeMessageDurationSeconds specifies meter metric name
|
|
||||||
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
|
|
||||||
// SubscribeMessageLatencyMicroseconds specifies meter metric name
|
|
||||||
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
|
|
||||||
// SubscribeMessageTotal specifies meter metric name
|
|
||||||
SubscribeMessageTotal = "subscribe_message_total"
|
|
||||||
// SubscribeMessageInflight specifies meter metric name
|
|
||||||
SubscribeMessageInflight = "subscribe_message_inflight"
|
|
||||||
|
|
||||||
labelSuccess = "success"
|
labelSuccess = "success"
|
||||||
labelFailure = "failure"
|
labelFailure = "failure"
|
||||||
labelStatus = "status"
|
labelStatus = "status"
|
||||||
@@ -230,37 +197,7 @@ func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
||||||
endpoint := p.Topic()
|
return w.Client.Publish(ctx, p, opts...)
|
||||||
|
|
||||||
labels := make([]string, 0, 4)
|
|
||||||
labels = append(labels, labelEndpoint, endpoint)
|
|
||||||
|
|
||||||
w.opts.Meter.Counter(PublishMessageInflight, labels...).Inc()
|
|
||||||
ts := time.Now()
|
|
||||||
err := w.Client.Publish(ctx, p, opts...)
|
|
||||||
te := time.Since(ts)
|
|
||||||
w.opts.Meter.Counter(PublishMessageInflight, labels...).Dec()
|
|
||||||
|
|
||||||
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, labels...).Update(te.Seconds())
|
|
||||||
w.opts.Meter.Histogram(PublishMessageDurationSeconds, labels...).Update(te.Seconds())
|
|
||||||
|
|
||||||
if err == nil {
|
|
||||||
labels = append(labels, labelStatus, labelSuccess)
|
|
||||||
} else {
|
|
||||||
labels = append(labels, labelStatus, labelFailure)
|
|
||||||
}
|
|
||||||
w.opts.Meter.Counter(PublishMessageTotal, labels...).Inc()
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewHandlerWrapper create new server handler wrapper
|
|
||||||
// deprecated
|
|
||||||
func NewHandlerWrapper(opts ...Option) server.HandlerWrapper {
|
|
||||||
handler := &wrapper{
|
|
||||||
opts: NewOptions(opts...),
|
|
||||||
}
|
|
||||||
return handler.HandlerFunc
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServerHandlerWrapper create new server handler wrapper
|
// NewServerHandlerWrapper create new server handler wrapper
|
||||||
@@ -302,46 +239,3 @@ func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSubscriberWrapper create server subscribe wrapper
|
|
||||||
// deprecated
|
|
||||||
func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
|
|
||||||
handler := &wrapper{
|
|
||||||
opts: NewOptions(opts...),
|
|
||||||
}
|
|
||||||
return handler.SubscriberFunc
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
|
|
||||||
handler := &wrapper{
|
|
||||||
opts: NewOptions(opts...),
|
|
||||||
}
|
|
||||||
return handler.SubscriberFunc
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc {
|
|
||||||
return func(ctx context.Context, msg server.Message) error {
|
|
||||||
endpoint := msg.Topic()
|
|
||||||
|
|
||||||
labels := make([]string, 0, 4)
|
|
||||||
labels = append(labels, labelEndpoint, endpoint)
|
|
||||||
|
|
||||||
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Inc()
|
|
||||||
ts := time.Now()
|
|
||||||
err := fn(ctx, msg)
|
|
||||||
te := time.Since(ts)
|
|
||||||
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Dec()
|
|
||||||
|
|
||||||
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, labels...).Update(te.Seconds())
|
|
||||||
w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, labels...).Update(te.Seconds())
|
|
||||||
|
|
||||||
if err == nil {
|
|
||||||
labels = append(labels, labelStatus, labelSuccess)
|
|
||||||
} else {
|
|
||||||
labels = append(labels, labelStatus, labelFailure)
|
|
||||||
}
|
|
||||||
w.opts.Meter.Counter(SubscribeMessageTotal, labels...).Inc()
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@@ -1,4 +1,4 @@
|
|||||||
package mtls // import "go.unistack.org/micro/v3/mtls"
|
package mtls // import "go.unistack.org/micro/v4/mtls"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
@@ -1,9 +1,9 @@
|
|||||||
// Package network is for creating internetworks
|
// Package network is for creating internetworks
|
||||||
package network // import "go.unistack.org/micro/v3/network"
|
package network // import "go.unistack.org/micro/v4/network"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"go.unistack.org/micro/v3/client"
|
"go.unistack.org/micro/v4/client"
|
||||||
"go.unistack.org/micro/v3/server"
|
"go.unistack.org/micro/v4/server"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Error is network node errors
|
// Error is network node errors
|
||||||
|
@@ -1,13 +1,13 @@
|
|||||||
package network
|
package network
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v4/logger"
|
||||||
"go.unistack.org/micro/v3/meter"
|
"go.unistack.org/micro/v4/meter"
|
||||||
"go.unistack.org/micro/v3/network/tunnel"
|
"go.unistack.org/micro/v4/network/tunnel"
|
||||||
"go.unistack.org/micro/v3/proxy"
|
"go.unistack.org/micro/v4/proxy"
|
||||||
"go.unistack.org/micro/v3/router"
|
"go.unistack.org/micro/v4/router"
|
||||||
"go.unistack.org/micro/v3/tracer"
|
"go.unistack.org/micro/v4/tracer"
|
||||||
"go.unistack.org/micro/v3/util/id"
|
"go.unistack.org/micro/v4/util/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Option func
|
// Option func
|
||||||
|
@@ -8,9 +8,9 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
maddr "go.unistack.org/micro/v3/util/addr"
|
maddr "go.unistack.org/micro/v4/util/addr"
|
||||||
mnet "go.unistack.org/micro/v3/util/net"
|
mnet "go.unistack.org/micro/v4/util/net"
|
||||||
"go.unistack.org/micro/v3/util/rand"
|
"go.unistack.org/micro/v4/util/rand"
|
||||||
)
|
)
|
||||||
|
|
||||||
type memorySocket struct {
|
type memorySocket struct {
|
||||||
|
@@ -5,10 +5,10 @@ import (
|
|||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v4/codec"
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v4/logger"
|
||||||
"go.unistack.org/micro/v3/meter"
|
"go.unistack.org/micro/v4/meter"
|
||||||
"go.unistack.org/micro/v3/tracer"
|
"go.unistack.org/micro/v4/tracer"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Options struct holds the transport options
|
// Options struct holds the transport options
|
||||||
|
@@ -1,11 +1,11 @@
|
|||||||
// Package transport is an interface for synchronous connection based communication
|
// Package transport is an interface for synchronous connection based communication
|
||||||
package transport // import "go.unistack.org/micro/v3/network/transport"
|
package transport // import "go.unistack.org/micro/v4/network/transport"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v4/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@@ -1,15 +1,15 @@
|
|||||||
// Package broker is a tunnel broker
|
// Package broker is a tunnel broker
|
||||||
package broker // import "go.unistack.org/micro/v3/network/tunnel/broker"
|
package broker // import "go.unistack.org/micro/v4/network/tunnel/broker"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/broker"
|
"go.unistack.org/micro/v4/broker"
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v4/logger"
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v4/metadata"
|
||||||
"go.unistack.org/micro/v3/network/transport"
|
"go.unistack.org/micro/v4/network/transport"
|
||||||
"go.unistack.org/micro/v3/network/tunnel"
|
"go.unistack.org/micro/v4/network/tunnel"
|
||||||
)
|
)
|
||||||
|
|
||||||
type tunBroker struct {
|
type tunBroker struct {
|
||||||
|
@@ -3,11 +3,11 @@ package tunnel
|
|||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v4/logger"
|
||||||
"go.unistack.org/micro/v3/meter"
|
"go.unistack.org/micro/v4/meter"
|
||||||
"go.unistack.org/micro/v3/network/transport"
|
"go.unistack.org/micro/v4/network/transport"
|
||||||
"go.unistack.org/micro/v3/tracer"
|
"go.unistack.org/micro/v4/tracer"
|
||||||
"go.unistack.org/micro/v3/util/id"
|
"go.unistack.org/micro/v4/util/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@@ -1,8 +1,8 @@
|
|||||||
package transport
|
package transport
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"go.unistack.org/micro/v3/network/transport"
|
"go.unistack.org/micro/v4/network/transport"
|
||||||
"go.unistack.org/micro/v3/network/tunnel"
|
"go.unistack.org/micro/v4/network/tunnel"
|
||||||
)
|
)
|
||||||
|
|
||||||
type tunListener struct {
|
type tunListener struct {
|
||||||
|
@@ -1,12 +1,12 @@
|
|||||||
// Package transport provides a tunnel transport
|
// Package transport provides a tunnel transport
|
||||||
package transport // import "go.unistack.org/micro/v3/network/tunnel/transport"
|
package transport // import "go.unistack.org/micro/v4/network/tunnel/transport"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/network/transport"
|
"go.unistack.org/micro/v4/network/transport"
|
||||||
"go.unistack.org/micro/v3/network/tunnel"
|
"go.unistack.org/micro/v4/network/tunnel"
|
||||||
)
|
)
|
||||||
|
|
||||||
type tunTransport struct {
|
type tunTransport struct {
|
||||||
|
@@ -1,12 +1,12 @@
|
|||||||
// Package tunnel provides gre network tunnelling
|
// Package tunnel provides gre network tunnelling
|
||||||
package tunnel // import "go.unistack.org/micro/v3/network/transport/tunnel"
|
package tunnel // import "go.unistack.org/micro/v4/network/transport/tunnel"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/network/transport"
|
"go.unistack.org/micro/v4/network/transport"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DefaultTunnel contains default tunnel implementation
|
// DefaultTunnel contains default tunnel implementation
|
||||||
|
53
options.go
53
options.go
@@ -5,17 +5,17 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/broker"
|
"go.unistack.org/micro/v4/broker"
|
||||||
"go.unistack.org/micro/v3/client"
|
"go.unistack.org/micro/v4/client"
|
||||||
"go.unistack.org/micro/v3/config"
|
"go.unistack.org/micro/v4/config"
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v4/logger"
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v4/metadata"
|
||||||
"go.unistack.org/micro/v3/meter"
|
"go.unistack.org/micro/v4/meter"
|
||||||
"go.unistack.org/micro/v3/register"
|
"go.unistack.org/micro/v4/register"
|
||||||
"go.unistack.org/micro/v3/router"
|
"go.unistack.org/micro/v4/router"
|
||||||
"go.unistack.org/micro/v3/server"
|
"go.unistack.org/micro/v4/server"
|
||||||
"go.unistack.org/micro/v3/store"
|
"go.unistack.org/micro/v4/store"
|
||||||
"go.unistack.org/micro/v3/tracer"
|
"go.unistack.org/micro/v4/tracer"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Options for micro service
|
// Options for micro service
|
||||||
@@ -90,33 +90,7 @@ type Option func(*Options) error
|
|||||||
// Broker to be used for client and server
|
// Broker to be used for client and server
|
||||||
func Broker(b broker.Broker, opts ...BrokerOption) Option {
|
func Broker(b broker.Broker, opts ...BrokerOption) Option {
|
||||||
return func(o *Options) error {
|
return func(o *Options) error {
|
||||||
var err error
|
o.Brokers = []broker.Broker{b}
|
||||||
bopts := brokerOptions{}
|
|
||||||
for _, opt := range opts {
|
|
||||||
opt(&bopts)
|
|
||||||
}
|
|
||||||
all := false
|
|
||||||
if len(opts) == 0 {
|
|
||||||
all = true
|
|
||||||
}
|
|
||||||
for _, srv := range o.Servers {
|
|
||||||
for _, os := range bopts.servers {
|
|
||||||
if srv.Name() == os || all {
|
|
||||||
if err = srv.Init(server.Broker(b)); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, cli := range o.Clients {
|
|
||||||
for _, oc := range bopts.clients {
|
|
||||||
if cli.Name() == oc || all {
|
|
||||||
if err = cli.Init(client.Broker(b)); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -211,6 +185,7 @@ func Stores(s ...store.Store) Option {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Logger set the logger to use
|
// Logger set the logger to use
|
||||||
|
//
|
||||||
//nolint:gocyclo
|
//nolint:gocyclo
|
||||||
func Logger(l logger.Logger, opts ...LoggerOption) Option {
|
func Logger(l logger.Logger, opts ...LoggerOption) Option {
|
||||||
return func(o *Options) error {
|
return func(o *Options) error {
|
||||||
@@ -329,6 +304,7 @@ func Meters(m ...meter.Meter) Option {
|
|||||||
|
|
||||||
// Register sets the register for the service
|
// Register sets the register for the service
|
||||||
// and the underlying components
|
// and the underlying components
|
||||||
|
//
|
||||||
//nolint:gocyclo
|
//nolint:gocyclo
|
||||||
func Register(r register.Register, opts ...RegisterOption) Option {
|
func Register(r register.Register, opts ...RegisterOption) Option {
|
||||||
return func(o *Options) error {
|
return func(o *Options) error {
|
||||||
@@ -403,6 +379,7 @@ func RegisterBroker(n string) RegisterOption {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Tracer sets the tracer
|
// Tracer sets the tracer
|
||||||
|
//
|
||||||
//nolint:gocyclo
|
//nolint:gocyclo
|
||||||
func Tracer(t tracer.Tracer, opts ...TracerOption) Option {
|
func Tracer(t tracer.Tracer, opts ...TracerOption) Option {
|
||||||
return func(o *Options) error {
|
return func(o *Options) error {
|
||||||
|
31
options/hooks.go
Normal file
31
options/hooks.go
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
package options // import "go.unistack.org/micro/v4/options"
|
||||||
|
|
||||||
|
// Hook func interface
|
||||||
|
type Hook interface{}
|
||||||
|
|
||||||
|
// Hooks func slice
|
||||||
|
type Hooks []Hook
|
||||||
|
|
||||||
|
// Append is used to add hooks
|
||||||
|
func (hs *Hooks) Append(h ...Hook) {
|
||||||
|
*hs = append(*hs, h...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Replace is used to set hooks
|
||||||
|
func (hs *Hooks) Replace(h ...Hook) {
|
||||||
|
*hs = h
|
||||||
|
}
|
||||||
|
|
||||||
|
// EachNext is used to itearate over hooks forward
|
||||||
|
func (hs *Hooks) EachNext(fn func(Hook)) {
|
||||||
|
for idx := 0; idx < len(*hs); idx++ {
|
||||||
|
fn((*hs)[idx])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// EachPrev is used to iterate over hooks backward
|
||||||
|
func (hs *Hooks) EachPrev(fn func(Hook)) {
|
||||||
|
for idx := len(*hs) - 1; idx >= 0; idx-- {
|
||||||
|
fn((*hs)[idx])
|
||||||
|
}
|
||||||
|
}
|
65
options/hooks_test.go
Normal file
65
options/hooks_test.go
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
package options
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func TestHooks_Append(t *testing.T) {
|
||||||
|
fn1 := func() {}
|
||||||
|
fn2 := func() {}
|
||||||
|
hs := &Hooks{}
|
||||||
|
hs.Append(fn1, fn2)
|
||||||
|
if len(*hs) != 2 {
|
||||||
|
t.Fatalf("unexpected Append error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHooks_Replace(t *testing.T) {
|
||||||
|
fn1 := func() {}
|
||||||
|
fn2 := func() {}
|
||||||
|
hs := &Hooks{}
|
||||||
|
hs.Append(fn1, fn2, fn1)
|
||||||
|
if len(*hs) != 3 {
|
||||||
|
t.Fatalf("unexpected Append error")
|
||||||
|
}
|
||||||
|
hs.Replace(fn1, fn2)
|
||||||
|
if len(*hs) != 2 {
|
||||||
|
t.Fatalf("unexpected Replace error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHooks_EachNext(t *testing.T) {
|
||||||
|
n := 5
|
||||||
|
fn1 := func() {
|
||||||
|
n *= 2
|
||||||
|
}
|
||||||
|
fn2 := func() {
|
||||||
|
n -= 10
|
||||||
|
}
|
||||||
|
hs := &Hooks{}
|
||||||
|
hs.Append(fn1, fn2)
|
||||||
|
|
||||||
|
hs.EachNext(func(h Hook) {
|
||||||
|
h.(func())()
|
||||||
|
})
|
||||||
|
if n != 0 {
|
||||||
|
t.Fatalf("unexpected EachNext")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHooks_EachPrev(t *testing.T) {
|
||||||
|
n := 5
|
||||||
|
fn1 := func() {
|
||||||
|
n *= 2
|
||||||
|
}
|
||||||
|
fn2 := func() {
|
||||||
|
n -= 10
|
||||||
|
}
|
||||||
|
hs := &Hooks{}
|
||||||
|
hs.Append(fn2, fn1)
|
||||||
|
|
||||||
|
hs.EachPrev(func(h Hook) {
|
||||||
|
h.(func())()
|
||||||
|
})
|
||||||
|
if n != 0 {
|
||||||
|
t.Fatalf("unexpected EachPrev")
|
||||||
|
}
|
||||||
|
}
|
@@ -1,5 +1,5 @@
|
|||||||
// Package http enables the http profiler
|
// Package http enables the http profiler
|
||||||
package http // import "go.unistack.org/micro/v3/profiler/http"
|
package http // import "go.unistack.org/micro/v4/profiler/http"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
"net/http/pprof"
|
"net/http/pprof"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
profile "go.unistack.org/micro/v3/profiler"
|
profile "go.unistack.org/micro/v4/profiler"
|
||||||
)
|
)
|
||||||
|
|
||||||
type httpProfile struct {
|
type httpProfile struct {
|
||||||
|
@@ -1,5 +1,5 @@
|
|||||||
// Package pprof provides a pprof profiler which writes output to /tmp/[name].{cpu,mem}.pprof
|
// Package pprof provides a pprof profiler which writes output to /tmp/[name].{cpu,mem}.pprof
|
||||||
package pprof // import "go.unistack.org/micro/v3/profiler/pprof"
|
package pprof // import "go.unistack.org/micro/v4/profiler/pprof"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
@@ -9,7 +9,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
profile "go.unistack.org/micro/v3/profiler"
|
profile "go.unistack.org/micro/v4/profiler"
|
||||||
)
|
)
|
||||||
|
|
||||||
type profiler struct {
|
type profiler struct {
|
||||||
|
@@ -1,5 +1,5 @@
|
|||||||
// Package profiler is for profilers
|
// Package profiler is for profilers
|
||||||
package profiler // import "go.unistack.org/micro/v3/profiler"
|
package profiler // import "go.unistack.org/micro/v4/profiler"
|
||||||
|
|
||||||
// Profiler interface
|
// Profiler interface
|
||||||
type Profiler interface {
|
type Profiler interface {
|
||||||
|
@@ -2,11 +2,11 @@
|
|||||||
package proxy
|
package proxy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"go.unistack.org/micro/v3/client"
|
"go.unistack.org/micro/v4/client"
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v4/logger"
|
||||||
"go.unistack.org/micro/v3/meter"
|
"go.unistack.org/micro/v4/meter"
|
||||||
"go.unistack.org/micro/v3/router"
|
"go.unistack.org/micro/v4/router"
|
||||||
"go.unistack.org/micro/v3/tracer"
|
"go.unistack.org/micro/v4/tracer"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Options for proxy
|
// Options for proxy
|
||||||
|
@@ -1,10 +1,10 @@
|
|||||||
// Package proxy is a transparent proxy built on the micro/server
|
// Package proxy is a transparent proxy built on the micro/server
|
||||||
package proxy // import "go.unistack.org/micro/v3/proxy"
|
package proxy // import "go.unistack.org/micro/v4/proxy"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/server"
|
"go.unistack.org/micro/v4/server"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DefaultEndpoint holds default proxy address
|
// DefaultEndpoint holds default proxy address
|
||||||
|
@@ -6,7 +6,7 @@ import (
|
|||||||
"unicode"
|
"unicode"
|
||||||
"unicode/utf8"
|
"unicode/utf8"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v4/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ExtractValue from reflect.Type from specified depth
|
// ExtractValue from reflect.Type from specified depth
|
||||||
|
@@ -5,8 +5,8 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v4/logger"
|
||||||
"go.unistack.org/micro/v3/util/id"
|
"go.unistack.org/micro/v4/util/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@@ -5,9 +5,9 @@ import (
|
|||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v4/logger"
|
||||||
"go.unistack.org/micro/v3/meter"
|
"go.unistack.org/micro/v4/meter"
|
||||||
"go.unistack.org/micro/v3/tracer"
|
"go.unistack.org/micro/v4/tracer"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Options holds options for register
|
// Options holds options for register
|
||||||
|
@@ -1,11 +1,11 @@
|
|||||||
// Package register is an interface for service discovery
|
// Package register is an interface for service discovery
|
||||||
package register // import "go.unistack.org/micro/v3/register"
|
package register // import "go.unistack.org/micro/v4/register"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v4/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@@ -1,5 +1,5 @@
|
|||||||
// Package dns resolves names to dns records
|
// Package dns resolves names to dns records
|
||||||
package dns // import "go.unistack.org/micro/v3/resolver/dns"
|
package dns // import "go.unistack.org/micro/v4/resolver/dns"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/resolver"
|
"go.unistack.org/micro/v4/resolver"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Resolver is a DNS network resolve
|
// Resolver is a DNS network resolve
|
||||||
|
@@ -1,11 +1,11 @@
|
|||||||
// Package dnssrv resolves names to dns srv records
|
// Package dnssrv resolves names to dns srv records
|
||||||
package dnssrv // import "go.unistack.org/micro/v3/resolver/dnssrv"
|
package dnssrv // import "go.unistack.org/micro/v4/resolver/dnssrv"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/resolver"
|
"go.unistack.org/micro/v4/resolver"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Resolver is a DNS network resolve
|
// Resolver is a DNS network resolve
|
||||||
|
@@ -1,5 +1,5 @@
|
|||||||
// Package http resolves names to network addresses using a http request
|
// Package http resolves names to network addresses using a http request
|
||||||
package http // import "go.unistack.org/micro/v3/resolver/http"
|
package http // import "go.unistack.org/micro/v4/resolver/http"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
@@ -8,7 +8,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/resolver"
|
"go.unistack.org/micro/v4/resolver"
|
||||||
)
|
)
|
||||||
|
|
||||||
// nolint: golint,revive
|
// nolint: golint,revive
|
||||||
|
@@ -1,8 +1,8 @@
|
|||||||
// Package noop is a noop resolver
|
// Package noop is a noop resolver
|
||||||
package noop // import "go.unistack.org/micro/v3/resolver/noop"
|
package noop // import "go.unistack.org/micro/v4/resolver/noop"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"go.unistack.org/micro/v3/resolver"
|
"go.unistack.org/micro/v4/resolver"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Resolver contains noop resolver
|
// Resolver contains noop resolver
|
||||||
|
@@ -1,11 +1,11 @@
|
|||||||
// Package register resolves names using the micro register
|
// Package register resolves names using the micro register
|
||||||
package register // import "go.unistack.org/micro/v3/resolver/registry"
|
package register // import "go.unistack.org/micro/v4/resolver/registry"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/register"
|
"go.unistack.org/micro/v4/register"
|
||||||
"go.unistack.org/micro/v3/resolver"
|
"go.unistack.org/micro/v4/resolver"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Resolver is a register network resolver
|
// Resolver is a register network resolver
|
||||||
|
@@ -1,5 +1,5 @@
|
|||||||
// Package resolver resolves network names to addresses
|
// Package resolver resolves network names to addresses
|
||||||
package resolver // import "go.unistack.org/micro/v3/resolver"
|
package resolver // import "go.unistack.org/micro/v4/resolver"
|
||||||
|
|
||||||
// Resolver is network resolver. It's used to find network nodes
|
// Resolver is network resolver. It's used to find network nodes
|
||||||
// via the name to connect to. This is done based on Network.Name().
|
// via the name to connect to. This is done based on Network.Name().
|
||||||
|
@@ -1,8 +1,8 @@
|
|||||||
// Package static is a static resolver
|
// Package static is a static resolver
|
||||||
package static // import "go.unistack.org/micro/v3/resolver/static"
|
package static // import "go.unistack.org/micro/v4/resolver/static"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"go.unistack.org/micro/v3/resolver"
|
"go.unistack.org/micro/v4/resolver"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Resolver returns a static list of nodes. In the event the node list
|
// Resolver returns a static list of nodes. In the event the node list
|
||||||
|
@@ -3,9 +3,9 @@ package router
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v4/logger"
|
||||||
"go.unistack.org/micro/v3/register"
|
"go.unistack.org/micro/v4/register"
|
||||||
"go.unistack.org/micro/v3/util/id"
|
"go.unistack.org/micro/v4/util/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Options are router options
|
// Options are router options
|
||||||
|
@@ -3,7 +3,7 @@ package router
|
|||||||
import (
|
import (
|
||||||
"hash/fnv"
|
"hash/fnv"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v4/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@@ -1,5 +1,5 @@
|
|||||||
// Package router provides a network routing control plane
|
// Package router provides a network routing control plane
|
||||||
package router // import "go.unistack.org/micro/v3/router"
|
package router // import "go.unistack.org/micro/v4/router"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
@@ -1,8 +1,8 @@
|
|||||||
package random // import "go.unistack.org/micro/v3/selector/random"
|
package random // import "go.unistack.org/micro/v4/selector/random"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"go.unistack.org/micro/v3/selector"
|
"go.unistack.org/micro/v4/selector"
|
||||||
"go.unistack.org/micro/v3/util/rand"
|
"go.unistack.org/micro/v4/util/rand"
|
||||||
)
|
)
|
||||||
|
|
||||||
type random struct{}
|
type random struct{}
|
||||||
|
@@ -3,7 +3,7 @@ package random
|
|||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/selector"
|
"go.unistack.org/micro/v4/selector"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRandom(t *testing.T) {
|
func TestRandom(t *testing.T) {
|
||||||
|
@@ -1,8 +1,8 @@
|
|||||||
package roundrobin // import "go.unistack.org/micro/v3/selector/roundrobin"
|
package roundrobin // import "go.unistack.org/micro/v4/selector/roundrobin"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"go.unistack.org/micro/v3/selector"
|
"go.unistack.org/micro/v4/selector"
|
||||||
"go.unistack.org/micro/v3/util/rand"
|
"go.unistack.org/micro/v4/util/rand"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewSelector returns an initialised round robin selector
|
// NewSelector returns an initialised round robin selector
|
||||||
|
@@ -3,7 +3,7 @@ package roundrobin
|
|||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/selector"
|
"go.unistack.org/micro/v4/selector"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRoundRobin(t *testing.T) {
|
func TestRoundRobin(t *testing.T) {
|
||||||
|
@@ -1,5 +1,5 @@
|
|||||||
// Package selector is for node selection and load balancing
|
// Package selector is for node selection and load balancing
|
||||||
package selector // import "go.unistack.org/micro/v3/selector"
|
package selector // import "go.unistack.org/micro/v4/selector"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
@@ -33,16 +33,6 @@ func SetOption(k, v interface{}) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetSubscriberOption returns a function to setup a context with given value
|
|
||||||
func SetSubscriberOption(k, v interface{}) SubscriberOption {
|
|
||||||
return func(o *SubscriberOptions) {
|
|
||||||
if o.Context == nil {
|
|
||||||
o.Context = context.Background()
|
|
||||||
}
|
|
||||||
o.Context = context.WithValue(o.Context, k, v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetHandlerOption returns a function to setup a context with given value
|
// SetHandlerOption returns a function to setup a context with given value
|
||||||
func SetHandlerOption(k, v interface{}) HandlerOption {
|
func SetHandlerOption(k, v interface{}) HandlerOption {
|
||||||
return func(o *HandlerOptions) {
|
return func(o *HandlerOptions) {
|
||||||
|
@@ -51,14 +51,3 @@ func TestSetOption(t *testing.T) {
|
|||||||
t.Fatal("SetOption not works")
|
t.Fatal("SetOption not works")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSetSubscriberOption(t *testing.T) {
|
|
||||||
type key struct{}
|
|
||||||
o := SetSubscriberOption(key{}, "test")
|
|
||||||
opts := &SubscriberOptions{}
|
|
||||||
o(opts)
|
|
||||||
|
|
||||||
if v, ok := opts.Context.Value(key{}).(string); !ok || v == "" {
|
|
||||||
t.Fatal("SetSubscriberOption not works")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import "go.unistack.org/micro/v3/errors"
|
import "go.unistack.org/micro/v4/errors"
|
||||||
|
|
||||||
type Error struct {
|
type Error struct {
|
||||||
id string
|
id string
|
||||||
|
@@ -3,7 +3,7 @@ package server
|
|||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/errors"
|
"go.unistack.org/micro/v4/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestError(t *testing.T) {
|
func TestError(t *testing.T) {
|
||||||
|
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user