Compare commits

...

35 Commits
v3.11.24 ... v3

Author SHA1 Message Date
3f82cb3ba4 Обновить README.md
Some checks failed
coverage / build (push) Successful in 1m31s
test / test (push) Has been cancelled
2025-01-18 15:35:52 +03:00
vtolstov
306b7a3962 Apply Code Coverage Badge 2025-01-17 12:58:03 +00:00
a8eda9d58d Merge pull request 'move set content-type in client publish' (#394) from devstigneev/micro:v3_publish_bug into v3
All checks were successful
coverage / build (push) Successful in 1m19s
test / test (push) Successful in 2m13s
Reviewed-on: #394
2025-01-17 15:57:30 +03:00
7e4477dcb4 move set content-type in client publish
Some checks failed
test / test (pull_request) Successful in 3m40s
lint / lint (pull_request) Successful in 45s
coverage / build (pull_request) Failing after 26s
2025-01-17 15:38:53 +03:00
vtolstov
d846044fc6 Apply Code Coverage Badge 2025-01-04 16:10:26 +00:00
29d956e74e fix readme
All checks were successful
coverage / build (push) Successful in 59s
test / test (push) Successful in 3m27s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-01-04 19:09:50 +03:00
fcc4faff8a fix godoc link
All checks were successful
coverage / build (push) Successful in 56s
test / test (push) Successful in 3m25s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-01-04 18:57:02 +03:00
5df8f83f45 badges (#392)
Some checks failed
coverage / build (push) Successful in 57s
test / test (push) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
Co-authored-by: vtolstov <vtolstov@users.noreply.github.com>
Reviewed-on: #392
Co-authored-by: Vasiliy Tolstov <v.tolstov@unistack.org>
Co-committed-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-01-04 18:53:57 +03:00
vtolstov
27fa6e9173 Apply Code Coverage Badge 2024-12-28 22:58:19 +00:00
bd55a35dc3 logger/slog: add delayed buffer test
All checks were successful
test / test (push) Successful in 3m33s
coverage / build (push) Successful in 8m22s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-29 01:57:41 +03:00
653bd386cc util/buffer: add DelayedBuffer
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-29 01:57:41 +03:00
vtolstov
558c6f4d7c Apply Code Coverage Badge 2024-12-28 11:56:07 +00:00
d7dd6fbeb2 register/memory: fix build
All checks were successful
test / test (push) Successful in 3m35s
coverage / build (push) Successful in 8m22s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-28 14:55:20 +03:00
a00cf2c8d9 register: watcher fixes
Some checks failed
coverage / build (push) Failing after 55s
test / test (push) Successful in 3m39s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-28 14:51:10 +03:00
vtolstov
a3e8ab2492 Apply Code Coverage Badge 2024-12-27 20:57:08 +00:00
06da500ef4 register: cleanup
All checks were successful
test / test (push) Successful in 3m33s
coverage / build (push) Successful in 9m11s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-27 23:56:27 +03:00
277f04ba19 register: add Codec option
All checks were successful
coverage / build (push) Successful in 1m3s
test / test (push) Successful in 3m31s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-27 19:33:50 +03:00
vtolstov
470263ff5f Apply Code Coverage Badge 2024-12-27 16:14:00 +00:00
b8232e02be register: add ListName option
All checks were successful
test / test (push) Successful in 3m50s
coverage / build (push) Successful in 8m15s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-27 19:12:57 +03:00
vtolstov
f8c5e10c1d Apply Code Coverage Badge 2024-12-26 22:21:35 +00:00
397e71f815 Merge pull request 'register: improvements' (#390) from register into v3
All checks were successful
test / test (push) Successful in 3m50s
coverage / build (push) Successful in 13m40s
Reviewed-on: #390
2024-12-27 01:20:49 +03:00
74e31d99f6 fixup
Some checks failed
lint / lint (pull_request) Successful in 1m10s
test / test (pull_request) Successful in 3m45s
coverage / build (pull_request) Failing after 12m5s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-27 01:16:22 +03:00
f39de15d93 fixup
Some checks failed
test / test (pull_request) Failing after 55s
coverage / build (pull_request) Failing after 1m1s
lint / lint (pull_request) Successful in 1m4s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-27 01:12:29 +03:00
d291102877 register: improvements
Some checks failed
coverage / build (pull_request) Failing after 1m33s
lint / lint (pull_request) Successful in 1m52s
test / test (pull_request) Successful in 4m11s
* change domain to namespace

* lower go.mod deps

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-27 01:08:00 +03:00
37ffbb18d8 lower go.deps
Some checks failed
coverage / build (push) Failing after 30s
test / test (push) Successful in 4m5s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-26 08:35:58 +03:00
9a85dead86 lower go.deps
Some checks failed
coverage / build (push) Failing after 23s
test / test (push) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-26 08:32:59 +03:00
a489aab1c3 Merge pull request 'logger/slog: fixed time len field' (#389) from logger-slog into v3
Some checks failed
coverage / build (push) Failing after 47s
test / test (push) Successful in 7m10s
Reviewed-on: #389
2024-12-24 20:51:47 +03:00
d160664ef1 fixup test
Some checks failed
lint / lint (pull_request) Successful in 1m25s
coverage / build (pull_request) Failing after 1m25s
test / test (pull_request) Successful in 5m30s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-24 20:45:53 +03:00
fa868edcaa logger/slog: fixed time len field
Some checks failed
lint / lint (pull_request) Successful in 1m22s
test / test (pull_request) Successful in 4m50s
coverage / build (pull_request) Failing after 29s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-24 20:36:32 +03:00
vtolstov
6ed0b0e090 Apply Code Coverage Badge 2024-12-23 18:18:20 +00:00
533b265d19 add codec.RawMessage support
All checks were successful
test / test (push) Successful in 3m41s
coverage / build (push) Successful in 8m22s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-23 21:17:32 +03:00
1ace2631a4 Merge pull request 'codec: add yaml support' (#388) from codec-yaml into v3
Some checks failed
test / test (push) Failing after 13m17s
coverage / build (push) Failing after 13m26s
Reviewed-on: #388
2024-12-23 19:08:47 +03:00
3dd5ca68d1 codec: add yaml support
Some checks failed
lint / lint (pull_request) Successful in 1m38s
test / test (pull_request) Successful in 4m17s
coverage / build (pull_request) Failing after 8m42s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-23 19:08:21 +03:00
66ccd6021f Merge pull request 'codec: add yaml support' (#387) from codec-yaml into v3
Some checks failed
test / test (push) Successful in 2m28s
coverage / build (push) Failing after 13m22s
Reviewed-on: #387
2024-12-23 18:39:03 +03:00
e5346f7e4f codec: add yaml support
All checks were successful
lint / lint (pull_request) Successful in 1m31s
coverage / build (pull_request) Successful in 3m10s
test / test (pull_request) Successful in 4m1s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-23 18:38:01 +03:00
24 changed files with 362 additions and 434 deletions

View File

@ -26,24 +26,24 @@ jobs:
- name: test coverage
run: |
go test -v -cover ./... -coverprofile coverage.out -coverpkg ./...
go test -v -cover ./... -covermode=count -coverprofile coverage.out -coverpkg ./...
go tool cover -func coverage.out -o coverage.out
- name: coverage badge
uses: tj-actions/coverage-badge-go@v1
uses: tj-actions/coverage-badge-go@v2
with:
green: 80
filename: coverage.out
- uses: stefanzweifel/git-auto-commit-action@v4
id: auto-commit-action
name: autocommit
with:
commit_message: Apply Code Coverage Badge
skip_fetch: true
skip_checkout: true
file_pattern: ./README.md
- name: Push Changes
- name: push
if: steps.auto-commit-action.outputs.changes_detected == 'true'
uses: ad-m/github-push-action@master
with:

View File

@ -1,5 +1,9 @@
# Micro
![Coverage](https://img.shields.io/badge/Coverage-44.9%25-yellow)
![Coverage](https://img.shields.io/badge/Coverage-44.6%25-yellow)
[![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![Doc](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/go.unistack.org/micro/v3?tab=overview)
[![Status](https://git.unistack.org/unistack-org/micro/actions/workflows/job_tests.yml/badge.svg?branch=v3)](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av3+event%3Apush)
[![Lint](https://goreportcard.com/badge/go.unistack.org/micro/v3)](https://goreportcard.com/report/go.unistack.org/micro/v3)
Micro is a standard library for microservices.
@ -11,30 +15,20 @@ Micro provides the core requirements for distributed systems development includi
Micro abstracts away the details of distributed systems. Here are the main features.
- **Authentication** - Auth is built in as a first class citizen. Authentication and authorization enable secure
zero trust networking by providing every service an identity and certificates. This additionally includes rule
based access control.
- **Dynamic Config** - Load and hot reload dynamic config from anywhere. The config interface provides a way to load application
level config from any source such as env vars, file, etcd. You can merge the sources and even define fallbacks.
level config from any source such as env vars, cmdline, file, consul, vault... You can merge the sources and even define fallbacks.
- **Data Storage** - A simple data store interface to read, write and delete records. It includes support for memory, file and
CockroachDB by default. State and persistence becomes a core requirement beyond prototyping and Micro looks to build that into the framework.
s3. State and persistence becomes a core requirement beyond prototyping and Micro looks to build that into the framework.
- **Service Discovery** - Automatic service registration and name resolution. Service discovery is at the core of micro service
development. When service A needs to speak to service B it needs the location of that service.
- **Load Balancing** - Client side load balancing built on service discovery. Once we have the addresses of any number of instances
of a service we now need a way to decide which node to route to. We use random hashed load balancing to provide even distribution
across the services and retry a different node if there's a problem.
- **Message Encoding** - Dynamic message encoding based on content-type. The client and server will use codecs along with content-type
to seamlessly encode and decode Go types for you. Any variety of messages could be encoded and sent from different clients. The client
and server handle this by default.
- **Transport** - gRPC or http based request/response with support for bidirectional streaming. We provide an abstraction for synchronous communication. A request made to a service will be automatically resolved, load balanced, dialled and streamed.
- **Async Messaging** - PubSub is built in as a first class citizen for asynchronous communication and event driven architectures.
- **Async Messaging** - Pub/Sub is built in as a first class citizen for asynchronous communication and event driven architectures.
Event notifications are a core pattern in micro service development.
- **Synchronization** - Distributed systems are often built in an eventually consistent manner. Support for distributed locking and
@ -43,10 +37,6 @@ leadership are built in as a Sync interface. When using an eventually consistent
- **Pluggable Interfaces** - Micro makes use of Go interfaces for each system abstraction. Because of this these interfaces
are pluggable and allows Micro to be runtime agnostic.
## Getting Started
To be created.
## License
Micro is Apache 2.0 licensed.

View File

@ -588,7 +588,6 @@ func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishO
for _, p := range ps {
md := metadata.Copy(omd)
md[metadata.HeaderContentType] = p.ContentType()
topic := p.Topic()
if len(exchange) > 0 {
topic = exchange
@ -600,6 +599,8 @@ func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishO
md.Set(k, v)
}
md[metadata.HeaderContentType] = p.ContentType()
var body []byte
// passed in raw data

View File

@ -3,6 +3,8 @@ package codec
import (
"errors"
"gopkg.in/yaml.v3"
)
var (
@ -54,3 +56,22 @@ func (m *RawMessage) UnmarshalJSON(data []byte) error {
*m = append((*m)[0:0], data...)
return nil
}
// MarshalYAML returns m as the JSON encoding of m.
func (m *RawMessage) MarshalYAML() ([]byte, error) {
if m == nil {
return []byte("null"), nil
} else if len(*m) == 0 {
return []byte("null"), nil
}
return *m, nil
}
// UnmarshalYAML sets *m to a copy of data.
func (m *RawMessage) UnmarshalYAML(n *yaml.Node) error {
if m == nil {
return errors.New("RawMessage UnmarshalYAML on nil pointer")
}
*m = append((*m)[0:0], []byte(n.Value)...)
return nil
}

View File

@ -1,5 +1,7 @@
package codec
import "gopkg.in/yaml.v3"
// Frame gives us the ability to define raw data to send over the pipes
type Frame struct {
Data []byte
@ -20,6 +22,17 @@ func (m *Frame) UnmarshalJSON(data []byte) error {
return m.Unmarshal(data)
}
// MarshalYAML returns frame data
func (m *Frame) MarshalYAML() ([]byte, error) {
return m.Marshal()
}
// UnmarshalYAML set frame data
func (m *Frame) UnmarshalYAML(n *yaml.Node) error {
m.Data = []byte(n.Value)
return nil
}
// ProtoMessage noop func
func (m *Frame) ProtoMessage() {}

14
go.mod
View File

@ -1,12 +1,12 @@
module go.unistack.org/micro/v3
go 1.23.4
go 1.22.0
require (
dario.cat/mergo v1.0.1
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/KimMachineGun/automemlimit v0.6.1
github.com/ash3in/uuidv8 v1.0.1
github.com/ash3in/uuidv8 v1.2.0
github.com/google/uuid v1.6.0
github.com/matoous/go-nanoid v1.5.1
github.com/patrickmn/go-cache v2.1.0+incompatible
@ -14,8 +14,8 @@ require (
go.uber.org/automaxprocs v1.6.0
go.unistack.org/micro-proto/v3 v3.4.1
golang.org/x/sync v0.10.0
google.golang.org/grpc v1.68.1
google.golang.org/protobuf v1.35.2
google.golang.org/grpc v1.69.2
google.golang.org/protobuf v1.36.1
gopkg.in/yaml.v3 v3.0.1
)
@ -36,8 +36,8 @@ require (
github.com/stretchr/testify v1.10.0 // indirect
go.uber.org/goleak v1.3.0 // indirect
golang.org/x/exp v0.0.0-20241210194714-1829a127f884 // indirect
golang.org/x/net v0.32.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.28.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
)

25
go.sum
View File

@ -1,11 +1,11 @@
dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s=
dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/KimMachineGun/automemlimit v0.6.1 h1:ILa9j1onAAMadBsyyUJv5cack8Y1WT26yLj/V+ulKp8=
github.com/KimMachineGun/automemlimit v0.6.1/go.mod h1:T7xYht7B8r6AG/AqFcUdc7fzd2bIdBKmepfP2S1svPY=
github.com/ash3in/uuidv8 v1.0.1 h1:dIq1XRkWT8lGA7N5s7WRTB4V3k49WTBLvILz7aCLp80=
github.com/ash3in/uuidv8 v1.0.1/go.mod h1:EoyUgCtxNBnrnpc9efw5rVN1cQ+LFGCoJiFuD6maOMw=
github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI=
github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4=
github.com/cilium/ebpf v0.16.0 h1:+BiEnHL6Z7lXnlGUsXQPPAE7+kenAd4ES8MQ5min0Ok=
github.com/cilium/ebpf v0.16.0/go.mod h1:L7u2Blt2jMM/vLAVgjxluxtBKlz3/GWjB0dMOEngfwE=
github.com/containerd/cgroups/v3 v3.0.4 h1:2fs7l3P0Qxb1nKWuJNFiwhp2CqiKzho71DQkDrHJIo4=
@ -35,6 +35,7 @@ github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtL
github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
github.com/jsimonetti/rtnetlink/v2 v2.0.1 h1:xda7qaHDSVOsADNouv7ukSuicKZO7GgVUCXxpaIEIlM=
github.com/jsimonetti/rtnetlink/v2 v2.0.1/go.mod h1:7MoNYNbb3UaDHtF8udiJo/RH6VsTKP1pqKLUTVCvToE=
github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
@ -79,8 +80,8 @@ go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqt
go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo=
golang.org/x/exp v0.0.0-20241210194714-1829a127f884 h1:Y/Mj/94zIQQGHVSv1tTtQBDaQaJe62U9bkDZKKyhPCU=
golang.org/x/exp v0.0.0-20241210194714-1829a127f884/go.mod h1:qj5a5QZpwLU2NLQudwIN5koi3beDhSAlJwa67PuM98c=
golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI=
golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@ -88,12 +89,12 @@ golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 h1:8ZmaLZE4XWrtU3MyClkYqqtl6Oegr3235h7jxsDyqCY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU=
google.golang.org/grpc v1.68.1 h1:oI5oTa11+ng8r8XMMN7jAOmWfPZWbYpCFaMUTACxkM0=
google.golang.org/grpc v1.68.1/go.mod h1:+q1XYFJjShcqn0QZHvCyeR4CXPA+llXIeUIfIe00waw=
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 h1:Z7FRVJPSMaHQxD0uXU8WdgFh8PseLM8Q8NzhnpMrBhQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA=
google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU=
google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=

View File

@ -22,6 +22,7 @@ const (
badKey = "!BADKEY"
// defaultCallerSkipCount used by logger
defaultCallerSkipCount = 3
timeFormat = "2006-01-02T15:04:05.000000000Z07:00"
)
var reTrace = regexp.MustCompile(`.*/slog/logger\.go.*\n`)
@ -64,6 +65,7 @@ func (s *slogLogger) renameAttr(_ []string, a slog.Attr) slog.Attr {
a.Key = s.opts.SourceKey
case slog.TimeKey:
a.Key = s.opts.TimeKey
a.Value = slog.StringValue(a.Value.Time().Format(timeFormat))
case slog.MessageKey:
a.Key = s.opts.MessageKey
case slog.LevelKey:

View File

@ -9,12 +9,15 @@ import (
"log/slog"
"strings"
"testing"
"time"
"github.com/google/uuid"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/util/buffer"
)
// always first to have proper check
func TestStacktrace(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)
@ -28,7 +31,48 @@ func TestStacktrace(t *testing.T) {
l.Error(ctx, "msg1", errors.New("err"))
if !bytes.Contains(buf.Bytes(), []byte(`slog_test.go:29`)) {
if !bytes.Contains(buf.Bytes(), []byte(`slog_test.go:32`)) {
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
}
}
func TestDelayedBuffer(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)
dbuf := buffer.NewDelayedBuffer(100, 100*time.Millisecond, buf)
l := NewLogger(logger.WithLevel(logger.ErrorLevel), logger.WithOutput(dbuf),
WithHandlerFunc(slog.NewTextHandler),
logger.WithAddStacktrace(true),
)
if err := l.Init(logger.WithFields("key1", "val1")); err != nil {
t.Fatal(err)
}
l.Error(ctx, "msg1", errors.New("err"))
time.Sleep(120 * time.Millisecond)
if !bytes.Contains(buf.Bytes(), []byte(`key1=val1`)) {
t.Fatalf("logger delayed buffer not works, buf contains: %s", buf.Bytes())
}
}
func TestTime(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)
l := NewLogger(logger.WithLevel(logger.ErrorLevel), logger.WithOutput(buf),
WithHandlerFunc(slog.NewTextHandler),
logger.WithAddStacktrace(true),
logger.WithTimeFunc(func() time.Time {
return time.Unix(0, 0)
}),
)
if err := l.Init(logger.WithFields("key1", "val1")); err != nil {
t.Fatal(err)
}
l.Error(ctx, "msg1", errors.New("err"))
if !bytes.Contains(buf.Bytes(), []byte(`timestamp=1970-01-01T03:00:00.000000000+03:00`)) &&
!bytes.Contains(buf.Bytes(), []byte(`timestamp=1970-01-01T00:00:00.000000000Z`)) {
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
}
}

View File

@ -1,12 +1,9 @@
package register
import (
"fmt"
"reflect"
"unicode"
"unicode/utf8"
"go.unistack.org/micro/v3/metadata"
)
// ExtractValue from reflect.Type from specified depth
@ -38,53 +35,6 @@ func ExtractValue(v reflect.Type, d int) string {
return v.Name()
}
// ExtractEndpoint extract *Endpoint from reflect.Method
func ExtractEndpoint(method reflect.Method) *Endpoint {
if method.PkgPath != "" {
return nil
}
var rspType, reqType reflect.Type
var stream bool
mt := method.Type
switch mt.NumIn() {
case 3:
reqType = mt.In(1)
rspType = mt.In(2)
case 4:
reqType = mt.In(2)
rspType = mt.In(3)
default:
return nil
}
// are we dealing with a stream?
switch rspType.Kind() {
case reflect.Func, reflect.Interface:
stream = true
}
request := ExtractValue(reqType, 0)
response := ExtractValue(rspType, 0)
if request == "" || response == "" {
return nil
}
ep := &Endpoint{
Name: method.Name,
Request: request,
Response: response,
Metadata: metadata.New(0),
}
if stream {
ep.Metadata.Set("stream", fmt.Sprintf("%v", stream))
}
return ep
}
// ExtractSubValue exctact *Value from reflect.Type
func ExtractSubValue(typ reflect.Type) string {
var reqType reflect.Type

View File

@ -2,8 +2,6 @@ package register
import (
"context"
"reflect"
"testing"
)
type TestHandler struct{}
@ -15,40 +13,3 @@ type TestResponse struct{}
func (t *TestHandler) Test(ctx context.Context, req *TestRequest, rsp *TestResponse) error {
return nil
}
func TestExtractEndpoint(t *testing.T) {
handler := &TestHandler{}
typ := reflect.TypeOf(handler)
var endpoints []*Endpoint
for m := 0; m < typ.NumMethod(); m++ {
if e := ExtractEndpoint(typ.Method(m)); e != nil {
endpoints = append(endpoints, e)
}
}
if i := len(endpoints); i != 1 {
t.Fatalf("Expected 1 endpoint, have %d", i)
}
if endpoints[0].Name != "Test" {
t.Fatalf("Expected handler Test, got %s", endpoints[0].Name)
}
if endpoints[0].Request == "" {
t.Fatal("Expected non nil Request")
}
if endpoints[0].Response == "" {
t.Fatal("Expected non nil Request")
}
if endpoints[0].Request != "TestRequest" {
t.Fatalf("Expected TestRequest got %s", endpoints[0].Request)
}
if endpoints[0].Response != "TestResponse" {
t.Fatalf("Expected TestResponse got %s", endpoints[0].Response)
}
}

View File

@ -27,7 +27,6 @@ type record struct {
Version string
Metadata map[string]string
Nodes map[string]*node
Endpoints []*register.Endpoint
}
type memory struct {
@ -59,7 +58,7 @@ func (m *memory) ttlPrune() {
for range prune.C {
m.Lock()
for domain, services := range m.records {
for namespace, services := range m.records {
for service, versions := range services {
for version, record := range versions {
for id, n := range record.Nodes {
@ -67,7 +66,7 @@ func (m *memory) ttlPrune() {
if m.opts.Logger.V(logger.DebugLevel) {
m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register TTL expired for node %s of service %s", n.ID, service))
}
delete(m.records[domain][service][version].Nodes, id)
delete(m.records[namespace][service][version].Nodes, id)
}
}
}
@ -131,17 +130,12 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist
options := register.NewRegisterOptions(opts...)
// get the services for this domain from the register
srvs, ok := m.records[options.Domain]
srvs, ok := m.records[options.Namespace]
if !ok {
srvs = make(services)
}
// domain is set in metadata so it can be passed to watchers
if s.Metadata == nil {
s.Metadata = map[string]string{"domain": options.Domain}
} else {
s.Metadata["domain"] = options.Domain
}
s.Namespace = options.Namespace
// ensure the service name exists
r := serviceToRecord(s, options.TTL)
@ -154,8 +148,8 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist
if m.opts.Logger.V(logger.DebugLevel) {
m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register added new service: %s, version: %s", s.Name, s.Version))
}
m.records[options.Domain] = srvs
go m.sendEvent(&register.Result{Action: "create", Service: s})
m.records[options.Namespace] = srvs
go m.sendEvent(&register.Result{Action: register.EventCreate, Service: s})
}
var addedNodes bool
@ -173,9 +167,6 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist
metadata[k] = v
}
// set the domain
metadata["domain"] = options.Domain
// add the node
srvs[s.Name][s.Version].Nodes[n.ID] = &node{
Node: &register.Node{
@ -194,7 +185,7 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist
if m.opts.Logger.V(logger.DebugLevel) {
m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register added new node to service: %s, version: %s", s.Name, s.Version))
}
go m.sendEvent(&register.Result{Action: "update", Service: s})
go m.sendEvent(&register.Result{Action: register.EventUpdate, Service: s})
} else {
// refresh TTL and timestamp
for _, n := range s.Nodes {
@ -206,7 +197,7 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist
}
}
m.records[options.Domain] = srvs
m.records[options.Namespace] = srvs
return nil
}
@ -216,15 +207,8 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re
options := register.NewDeregisterOptions(opts...)
// domain is set in metadata so it can be passed to watchers
if s.Metadata == nil {
s.Metadata = map[string]string{"domain": options.Domain}
} else {
s.Metadata["domain"] = options.Domain
}
// if the domain doesn't exist, there is nothing to deregister
services, ok := m.records[options.Domain]
services, ok := m.records[options.Namespace]
if !ok {
return nil
}
@ -253,16 +237,16 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re
// if the nodes not empty, we replace the version in the store and exist, the rest of the logic
// is cleanup
if len(version.Nodes) > 0 {
m.records[options.Domain][s.Name][s.Version] = version
go m.sendEvent(&register.Result{Action: "update", Service: s})
m.records[options.Namespace][s.Name][s.Version] = version
go m.sendEvent(&register.Result{Action: register.EventUpdate, Service: s})
return nil
}
// if this version was the only version of the service, we can remove the whole service from the
// register and exit
if len(versions) == 1 {
delete(m.records[options.Domain], s.Name)
go m.sendEvent(&register.Result{Action: "delete", Service: s})
delete(m.records[options.Namespace], s.Name)
go m.sendEvent(&register.Result{Action: register.EventDelete, Service: s})
if m.opts.Logger.V(logger.DebugLevel) {
m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register removed service: %s", s.Name))
@ -271,8 +255,8 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re
}
// there are other versions of the service running, so only remove this version of it
delete(m.records[options.Domain][s.Name], s.Version)
go m.sendEvent(&register.Result{Action: "delete", Service: s})
delete(m.records[options.Namespace][s.Name], s.Version)
go m.sendEvent(&register.Result{Action: register.EventDelete, Service: s})
if m.opts.Logger.V(logger.DebugLevel) {
m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register removed service: %s, version: %s", s.Name, s.Version))
}
@ -284,15 +268,15 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...registe
options := register.NewLookupOptions(opts...)
// if it's a wildcard domain, return from all domains
if options.Domain == register.WildcardDomain {
if options.Namespace == register.WildcardNamespace {
m.RLock()
recs := m.records
m.RUnlock()
var services []*register.Service
for domain := range recs {
srvs, err := m.LookupService(ctx, name, append(opts, register.LookupDomain(domain))...)
for namespace := range recs {
srvs, err := m.LookupService(ctx, name, append(opts, register.LookupNamespace(namespace))...)
if err == register.ErrNotFound {
continue
} else if err != nil {
@ -311,7 +295,7 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...registe
defer m.RUnlock()
// check the domain exists
services, ok := m.records[options.Domain]
services, ok := m.records[options.Namespace]
if !ok {
return nil, register.ErrNotFound
}
@ -328,7 +312,7 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...registe
var i int
for _, r := range versions {
result[i] = recordToService(r, options.Domain)
result[i] = recordToService(r, options.Namespace)
i++
}
@ -339,15 +323,15 @@ func (m *memory) ListServices(ctx context.Context, opts ...register.ListOption)
options := register.NewListOptions(opts...)
// if it's a wildcard domain, list from all domains
if options.Domain == register.WildcardDomain {
if options.Namespace == register.WildcardNamespace {
m.RLock()
recs := m.records
m.RUnlock()
var services []*register.Service
for domain := range recs {
srvs, err := m.ListServices(ctx, append(opts, register.ListDomain(domain))...)
for namespace := range recs {
srvs, err := m.ListServices(ctx, append(opts, register.ListNamespace(namespace))...)
if err != nil {
return nil, err
}
@ -361,7 +345,7 @@ func (m *memory) ListServices(ctx context.Context, opts ...register.ListOption)
defer m.RUnlock()
// ensure the domain exists
services, ok := m.records[options.Domain]
services, ok := m.records[options.Namespace]
if !ok {
return make([]*register.Service, 0), nil
}
@ -371,7 +355,7 @@ func (m *memory) ListServices(ctx context.Context, opts ...register.ListOption)
for _, service := range services {
for _, version := range service {
result = append(result, recordToService(version, options.Domain))
result = append(result, recordToService(version, options.Namespace))
}
}
@ -426,16 +410,13 @@ func (m *watcher) Next() (*register.Result, error) {
continue
}
// extract domain from service metadata
var domain string
if r.Service.Metadata != nil && len(r.Service.Metadata["domain"]) > 0 {
domain = r.Service.Metadata["domain"]
} else {
domain = register.DefaultDomain
namespace := register.DefaultNamespace
if r.Service.Namespace != "" {
namespace = r.Service.Namespace
}
// only send the event if watching the wildcard or this specific domain
if m.wo.Domain == register.WildcardDomain || m.wo.Domain == domain {
if m.wo.Namespace == register.WildcardNamespace || m.wo.Namespace == namespace {
return r, nil
}
case <-m.exit:
@ -454,11 +435,6 @@ func (m *watcher) Stop() {
}
func serviceToRecord(s *register.Service, ttl time.Duration) *record {
metadata := make(map[string]string, len(s.Metadata))
for k, v := range s.Metadata {
metadata[k] = v
}
nodes := make(map[string]*node, len(s.Nodes))
for _, n := range s.Nodes {
nodes[n.ID] = &node{
@ -468,42 +444,19 @@ func serviceToRecord(s *register.Service, ttl time.Duration) *record {
}
}
endpoints := make([]*register.Endpoint, len(s.Endpoints))
copy(endpoints, s.Endpoints)
return &record{
Name: s.Name,
Version: s.Version,
Metadata: metadata,
Nodes: nodes,
Endpoints: endpoints,
}
}
func recordToService(r *record, domain string) *register.Service {
func recordToService(r *record, namespace string) *register.Service {
metadata := make(map[string]string, len(r.Metadata))
for k, v := range r.Metadata {
metadata[k] = v
}
// set the domain in metadata so it can be determined when a wildcard query is performed
metadata["domain"] = domain
endpoints := make([]*register.Endpoint, len(r.Endpoints))
for i, e := range r.Endpoints {
md := make(map[string]string, len(e.Metadata))
for k, v := range e.Metadata {
md[k] = v
}
endpoints[i] = &register.Endpoint{
Name: e.Name,
Request: e.Request,
Response: e.Response,
Metadata: md,
}
}
nodes := make([]*register.Node, len(r.Nodes))
i := 0
for _, n := range r.Nodes {
@ -523,8 +476,7 @@ func recordToService(r *record, domain string) *register.Service {
return &register.Service{
Name: r.Name,
Version: r.Version,
Metadata: metadata,
Endpoints: endpoints,
Nodes: nodes,
Namespace: namespace,
}
}

View File

@ -253,32 +253,32 @@ func TestMemoryWildcard(t *testing.T) {
testSrv := &register.Service{Name: "foo", Version: "1.0.0"}
if err := m.Register(ctx, testSrv, register.RegisterDomain("one")); err != nil {
if err := m.Register(ctx, testSrv, register.RegisterNamespace("one")); err != nil {
t.Fatalf("Register err: %v", err)
}
if err := m.Register(ctx, testSrv, register.RegisterDomain("two")); err != nil {
if err := m.Register(ctx, testSrv, register.RegisterNamespace("two")); err != nil {
t.Fatalf("Register err: %v", err)
}
if recs, err := m.ListServices(ctx, register.ListDomain("one")); err != nil {
if recs, err := m.ListServices(ctx, register.ListNamespace("one")); err != nil {
t.Errorf("List err: %v", err)
} else if len(recs) != 1 {
t.Errorf("Expected 1 record, got %v", len(recs))
}
if recs, err := m.ListServices(ctx, register.ListDomain("*")); err != nil {
if recs, err := m.ListServices(ctx, register.ListNamespace("*")); err != nil {
t.Errorf("List err: %v", err)
} else if len(recs) != 2 {
t.Errorf("Expected 2 records, got %v", len(recs))
}
if recs, err := m.LookupService(ctx, testSrv.Name, register.LookupDomain("one")); err != nil {
if recs, err := m.LookupService(ctx, testSrv.Name, register.LookupNamespace("one")); err != nil {
t.Errorf("Lookup err: %v", err)
} else if len(recs) != 1 {
t.Errorf("Expected 1 record, got %v", len(recs))
}
if recs, err := m.LookupService(ctx, testSrv.Name, register.LookupDomain("*")); err != nil {
if recs, err := m.LookupService(ctx, testSrv.Name, register.LookupNamespace("*")); err != nil {
t.Errorf("Lookup err: %v", err)
} else if len(recs) != 2 {
t.Errorf("Expected 2 records, got %v", len(recs))

View File

@ -5,6 +5,7 @@ import (
"crypto/tls"
"time"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/tracer"
@ -26,6 +27,8 @@ type Options struct {
Name string
// Addrs specifies register addrs
Addrs []string
// Codec used to marshal/unmarshal data in register
Codec codec.Codec
// Timeout specifies timeout
Timeout time.Duration
}
@ -37,6 +40,7 @@ func NewOptions(opts ...Option) Options {
Meter: meter.DefaultMeter,
Tracer: tracer.DefaultTracer,
Context: context.Background(),
Codec: codec.NewCodec(),
}
for _, o := range opts {
o(&options)
@ -47,7 +51,7 @@ func NewOptions(opts ...Option) Options {
// RegisterOptions holds options for register method
type RegisterOptions struct { // nolint: golint,revive
Context context.Context
Domain string
Namespace string
TTL time.Duration
Attempts int
}
@ -55,7 +59,7 @@ type RegisterOptions struct { // nolint: golint,revive
// NewRegisterOptions returns register options struct filled by opts
func NewRegisterOptions(opts ...RegisterOption) RegisterOptions {
options := RegisterOptions{
Domain: DefaultDomain,
Namespace: DefaultNamespace,
Context: context.Background(),
}
for _, o := range opts {
@ -72,14 +76,14 @@ type WatchOptions struct {
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
// Domain to watch
Domain string
// Namespace to watch
Namespace string
}
// NewWatchOptions returns watch options filled by opts
func NewWatchOptions(opts ...WatchOption) WatchOptions {
options := WatchOptions{
Domain: DefaultDomain,
Namespace: DefaultNamespace,
Context: context.Background(),
}
for _, o := range opts {
@ -91,8 +95,8 @@ func NewWatchOptions(opts ...WatchOption) WatchOptions {
// DeregisterOptions holds options for deregister method
type DeregisterOptions struct {
Context context.Context
// Domain the service was registered in
Domain string
// Namespace the service was registered in
Namespace string
// Atempts specify max attempts for deregister
Attempts int
}
@ -100,7 +104,7 @@ type DeregisterOptions struct {
// NewDeregisterOptions returns options for deregister filled by opts
func NewDeregisterOptions(opts ...DeregisterOption) DeregisterOptions {
options := DeregisterOptions{
Domain: DefaultDomain,
Namespace: DefaultNamespace,
Context: context.Background(),
}
for _, o := range opts {
@ -112,14 +116,14 @@ func NewDeregisterOptions(opts ...DeregisterOption) DeregisterOptions {
// LookupOptions holds lookup options
type LookupOptions struct {
Context context.Context
// Domain to scope the request to
Domain string
// Namespace to scope the request to
Namespace string
}
// NewLookupOptions returns lookup options filled by opts
func NewLookupOptions(opts ...LookupOption) LookupOptions {
options := LookupOptions{
Domain: DefaultDomain,
Namespace: DefaultNamespace,
Context: context.Background(),
}
for _, o := range opts {
@ -130,15 +134,16 @@ func NewLookupOptions(opts ...LookupOption) LookupOptions {
// ListOptions holds the list options for list method
type ListOptions struct {
// Context used to store additional options
Context context.Context
// Domain to scope the request to
Domain string
// Namespace to scope the request to
Namespace string
}
// NewListOptions returns list options filled by opts
func NewListOptions(opts ...ListOption) ListOptions {
options := ListOptions{
Domain: DefaultDomain,
Namespace: DefaultNamespace,
Context: context.Background(),
}
for _, o := range opts {
@ -217,10 +222,10 @@ func RegisterContext(ctx context.Context) RegisterOption { // nolint: golint,rev
}
}
// RegisterDomain secifies register domain
func RegisterDomain(d string) RegisterOption { // nolint: golint,revive
// RegisterNamespace secifies register Namespace
func RegisterNamespace(d string) RegisterOption { // nolint: golint,revive
return func(o *RegisterOptions) {
o.Domain = d
o.Namespace = d
}
}
@ -238,10 +243,10 @@ func WatchContext(ctx context.Context) WatchOption {
}
}
// WatchDomain sets the domain for watch
func WatchDomain(d string) WatchOption {
// WatchNamespace sets the Namespace for watch
func WatchNamespace(d string) WatchOption {
return func(o *WatchOptions) {
o.Domain = d
o.Namespace = d
}
}
@ -259,10 +264,10 @@ func DeregisterContext(ctx context.Context) DeregisterOption {
}
}
// DeregisterDomain specifies deregister domain
func DeregisterDomain(d string) DeregisterOption {
// DeregisterNamespace specifies deregister Namespace
func DeregisterNamespace(d string) DeregisterOption {
return func(o *DeregisterOptions) {
o.Domain = d
o.Namespace = d
}
}
@ -273,10 +278,10 @@ func LookupContext(ctx context.Context) LookupOption {
}
}
// LookupDomain sets the domain for lookup
func LookupDomain(d string) LookupOption {
// LookupNamespace sets the Namespace for lookup
func LookupNamespace(d string) LookupOption {
return func(o *LookupOptions) {
o.Domain = d
o.Namespace = d
}
}
@ -287,10 +292,10 @@ func ListContext(ctx context.Context) ListOption {
}
}
// ListDomain sets the domain for list method
func ListDomain(d string) ListOption {
// ListNamespace sets the Namespace for list method
func ListNamespace(d string) ListOption {
return func(o *ListOptions) {
o.Domain = d
o.Namespace = d
}
}
@ -300,3 +305,9 @@ func Name(n string) Option {
o.Name = n
}
}
func Codec(c codec.Codec) Option {
return func(o *Options) {
o.Codec = c
}
}

View File

@ -9,12 +9,12 @@ import (
)
const (
// WildcardDomain indicates any domain
WildcardDomain = "*"
// WildcardNamespace indicates any Namespace
WildcardNamespace = "*"
)
// DefaultDomain to use if none was provided in options
var DefaultDomain = "micro"
// DefaultNamespace to use if none was provided in options
var DefaultNamespace = "micro"
var (
// DefaultRegister is the global default register
@ -59,26 +59,17 @@ type Register interface {
// Service holds service register info
type Service struct {
Name string `json:"name"`
Version string `json:"version"`
Metadata metadata.Metadata `json:"metadata"`
Endpoints []*Endpoint `json:"endpoints"`
Nodes []*Node `json:"nodes"`
Name string `json:"name,omitempty"`
Version string `json:"version,omitempty"`
Nodes []*Node `json:"nodes,omitempty"`
Namespace string `json:"namespace,omitempty"`
}
// Node holds node register info
type Node struct {
Metadata metadata.Metadata `json:"metadata"`
ID string `json:"id"`
Address string `json:"address"`
}
// Endpoint holds endpoint register info
type Endpoint struct {
Request string `json:"request"`
Response string `json:"response"`
Metadata metadata.Metadata `json:"metadata"`
Name string `json:"name"`
Metadata metadata.Metadata `json:"metadata,omitempty"`
ID string `json:"id,omitempty"`
Address string `json:"address,omitempty"`
}
// Option func signature

View File

@ -15,31 +15,31 @@ type Watcher interface {
// the watcher. Actions can be create, update, delete
type Result struct {
// Service holds register service
Service *Service
Service *Service `json:"service,omitempty"`
// Action holds the action
Action string
Action EventType `json:"action,omitempty"`
}
// EventType defines register event type
type EventType int
const (
// Create is emitted when a new service is registered
Create EventType = iota
// Delete is emitted when an existing service is deregistered
Delete
// Update is emitted when an existing service is updated
Update
// EventCreate is emitted when a new service is registered
EventCreate EventType = iota
// EventDelete is emitted when an existing service is deregistered
EventDelete
// EventUpdate is emitted when an existing service is updated
EventUpdate
)
// String returns human readable event type
func (t EventType) String() string {
switch t {
case Create:
case EventCreate:
return "create"
case Delete:
case EventDelete:
return "delete"
case Update:
case EventUpdate:
return "update"
default:
return "unknown"
@ -49,11 +49,11 @@ func (t EventType) String() string {
// Event is register event
type Event struct {
// Timestamp is event timestamp
Timestamp time.Time
Timestamp time.Time `json:"timestamp,omitempty"`
// Service is register service
Service *Service
Service *Service `json:"service,omitempty"`
// ID is register id
ID string
ID string `json:"id,omitempty"`
// Type defines type of event
Type EventType
Type EventType `json:"type,omitempty"`
}

View File

@ -5,7 +5,6 @@ import (
"fmt"
"reflect"
"runtime/debug"
"sort"
"strings"
"sync"
"time"
@ -35,34 +34,17 @@ type rpcHandler struct {
opts HandlerOptions
handler interface{}
name string
endpoints []*register.Endpoint
}
func newRPCHandler(handler interface{}, opts ...HandlerOption) Handler {
options := NewHandlerOptions(opts...)
typ := reflect.TypeOf(handler)
hdlr := reflect.ValueOf(handler)
name := reflect.Indirect(hdlr).Type().Name()
var endpoints []*register.Endpoint
for m := 0; m < typ.NumMethod(); m++ {
if e := register.ExtractEndpoint(typ.Method(m)); e != nil {
e.Name = name + "." + e.Name
for k, v := range options.Metadata[e.Name] {
e.Metadata[k] = v
}
endpoints = append(endpoints, e)
}
}
return &rpcHandler{
name: name,
handler: handler,
endpoints: endpoints,
opts: options,
}
}
@ -75,10 +57,6 @@ func (r *rpcHandler) Handler() interface{} {
return r.handler
}
func (r *rpcHandler) Endpoints() []*register.Endpoint {
return r.endpoints
}
func (r *rpcHandler) Options() HandlerOptions {
return r.opts
}
@ -249,35 +227,6 @@ func (n *noopServer) Register() error {
return err
}
n.RLock()
handlerList := make([]string, 0, len(n.handlers))
for n := range n.handlers {
handlerList = append(handlerList, n)
}
sort.Strings(handlerList)
subscriberList := make([]*subscriber, 0, len(n.subscribers))
for e := range n.subscribers {
subscriberList = append(subscriberList, e)
}
sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic
})
endpoints := make([]*register.Endpoint, 0, len(handlerList)+len(subscriberList))
for _, h := range handlerList {
endpoints = append(endpoints, n.handlers[h].Endpoints()...)
}
for _, e := range subscriberList {
endpoints = append(endpoints, e.Endpoints()...)
}
n.RUnlock()
service.Nodes[0].Metadata["protocol"] = "noop"
service.Nodes[0].Metadata["transport"] = service.Nodes[0].Metadata["protocol"]
service.Endpoints = endpoints
n.RLock()
registered := n.registered
n.RUnlock()
@ -576,7 +525,6 @@ func (n *noopServer) Stop() error {
}
func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber {
var endpoints []*register.Endpoint
var handlers []*handler
options := NewSubscriberOptions(opts...)
@ -595,18 +543,7 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
}
handlers = append(handlers, h)
ep := &register.Endpoint{
Name: "Func",
Request: register.ExtractSubValue(typ),
Metadata: metadata.New(2),
}
ep.Metadata.Set("topic", topic)
ep.Metadata.Set("subscriber", "true")
endpoints = append(endpoints, ep)
} else {
hdlr := reflect.ValueOf(sub)
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
h := &handler{
@ -622,14 +559,6 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
}
handlers = append(handlers, h)
ep := &register.Endpoint{
Name: name + "." + method.Name,
Request: register.ExtractSubValue(method.Type),
Metadata: metadata.New(2),
}
ep.Metadata.Set("topic", topic)
ep.Metadata.Set("subscriber", "true")
endpoints = append(endpoints, ep)
}
}
@ -639,7 +568,6 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
topic: topic,
subscriber: sub,
handlers: handlers,
endpoints: endpoints,
opts: options,
}
}
@ -766,10 +694,6 @@ func (s *subscriber) Subscriber() interface{} {
return s.subscriber
}
func (s *subscriber) Endpoints() []*register.Endpoint {
return s.endpoints
}
func (s *subscriber) Options() SubscriberOptions {
return s.opts
}
@ -780,7 +704,6 @@ type subscriber struct {
typ reflect.Type
subscriber interface{}
endpoints []*register.Endpoint
handlers []*handler
rcvr reflect.Value

View File

@ -17,7 +17,7 @@ var (
opts := []register.RegisterOption{
register.RegisterTTL(config.RegisterTTL),
register.RegisterDomain(config.Namespace),
register.RegisterNamespace(config.Namespace),
}
for i := 0; i <= config.RegisterAttempts; i++ {
@ -36,7 +36,7 @@ var (
var err error
opts := []register.DeregisterOption{
register.DeregisterDomain(config.Namespace),
register.DeregisterNamespace(config.Namespace),
}
for i := 0; i <= config.DeregisterAttempts; i++ {
@ -85,6 +85,5 @@ func NewRegisterService(s Server) (*register.Service, error) {
Name: opts.Name,
Version: opts.Version,
Nodes: []*register.Node{node},
Metadata: metadata.New(0),
}, nil
}

View File

@ -7,7 +7,6 @@ import (
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/register"
)
// DefaultServer default server
@ -170,7 +169,6 @@ type Stream interface {
type Handler interface {
Name() string
Handler() interface{}
Endpoints() []*register.Endpoint
Options() HandlerOptions
}
@ -180,6 +178,5 @@ type Handler interface {
type Subscriber interface {
Topic() string
Subscriber() interface{}
Endpoints() []*register.Endpoint
Options() SubscriberOptions
}

View File

@ -1,27 +0,0 @@
package buf
import (
"bytes"
"io"
)
var _ io.Closer = &Buffer{}
// Buffer bytes.Buffer wrapper to satisfie io.Closer interface
type Buffer struct {
*bytes.Buffer
}
// Close reset buffer contents
func (b *Buffer) Close() error {
b.Buffer.Reset()
return nil
}
// New creates new buffer that satisfies Closer interface
func New(b *bytes.Buffer) *Buffer {
if b == nil {
b = bytes.NewBuffer(nil)
}
return &Buffer{b}
}

85
util/buffer/buffer.go Normal file
View File

@ -0,0 +1,85 @@
package buffer
import (
"io"
"sync"
"time"
)
var _ io.WriteCloser = (*DelayedBuffer)(nil)
// DelayedBuffer is the buffer that holds items until either the buffer filled or a specified time limit is reached
type DelayedBuffer struct {
mu sync.Mutex
maxWait time.Duration
flushTime time.Time
buffer chan []byte
ticker *time.Ticker
w io.Writer
err error
}
func NewDelayedBuffer(size int, maxWait time.Duration, w io.Writer) *DelayedBuffer {
b := &DelayedBuffer{
buffer: make(chan []byte, size),
ticker: time.NewTicker(maxWait),
w: w,
flushTime: time.Now(),
maxWait: maxWait,
}
b.loop()
return b
}
func (b *DelayedBuffer) loop() {
go func() {
for range b.ticker.C {
b.mu.Lock()
if time.Since(b.flushTime) > b.maxWait {
b.flush()
}
b.mu.Unlock()
}
}()
}
func (b *DelayedBuffer) flush() {
bufLen := len(b.buffer)
if bufLen > 0 {
tmp := make([][]byte, bufLen)
for i := 0; i < bufLen; i++ {
tmp[i] = <-b.buffer
}
for _, t := range tmp {
_, b.err = b.w.Write(t)
}
b.flushTime = time.Now()
}
}
func (b *DelayedBuffer) Put(items ...[]byte) {
b.mu.Lock()
for _, item := range items {
select {
case b.buffer <- item:
default:
b.flush()
b.buffer <- item
}
}
b.mu.Unlock()
}
func (b *DelayedBuffer) Close() error {
b.mu.Lock()
b.flush()
close(b.buffer)
b.ticker.Stop()
b.mu.Unlock()
return b.err
}
func (b *DelayedBuffer) Write(data []byte) (int, error) {
b.Put(data)
return len(data), b.err
}

View File

@ -0,0 +1,22 @@
package buffer
import (
"bytes"
"testing"
"time"
)
func TestTimedBuffer(t *testing.T) {
buf := bytes.NewBuffer(nil)
b := NewDelayedBuffer(100, 300*time.Millisecond, buf)
for i := 0; i < 100; i++ {
_, _ = b.Write([]byte(`test`))
}
if buf.Len() != 0 {
t.Fatal("delayed write not worked")
}
time.Sleep(400 * time.Millisecond)
if buf.Len() == 0 {
t.Fatal("delayed write not worked")
}
}

View File

@ -71,14 +71,6 @@ func CopyService(service *register.Service) *register.Service {
}
s.Nodes = nodes
// copy endpoints
eps := make([]*register.Endpoint, len(service.Endpoints))
for j, ep := range service.Endpoints {
e := &register.Endpoint{}
*e = *ep
eps[j] = e
}
s.Endpoints = eps
return s
}

View File

@ -35,11 +35,11 @@ func TestUnmarshalYAML(t *testing.T) {
t.Fatalf("invalid duration %v != 10000000", v.TTL)
}
err = yaml.Unmarshal([]byte(`{"ttl":"1y"}`), v)
err = yaml.Unmarshal([]byte(`{"ttl":"1d"}`), v)
if err != nil {
t.Fatal(err)
} else if *(v.TTL) != 31622400000000000 {
t.Fatalf("invalid duration %v != 31622400000000000", v.TTL)
} else if *(v.TTL) != 86400000000000 {
t.Fatalf("invalid duration %v != 86400000000000", *v.TTL)
}
}
@ -68,11 +68,11 @@ func TestUnmarshalJSON(t *testing.T) {
t.Fatalf("invalid duration %v != 10000000", v.TTL)
}
err = json.Unmarshal([]byte(`{"ttl":"1y"}`), v)
err = json.Unmarshal([]byte(`{"ttl":"1d"}`), v)
if err != nil {
t.Fatal(err)
} else if v.TTL != 31622400000000000 {
t.Fatalf("invalid duration %v != 31622400000000000", v.TTL)
} else if v.TTL != 86400000000000 {
t.Fatalf("invalid duration %v != 86400000000000", v.TTL)
}
}
@ -87,11 +87,11 @@ func TestParseDuration(t *testing.T) {
if td.String() != "340h0m0s" {
t.Fatalf("ParseDuration 14d != 340h0m0s : %s", td.String())
}
td, err = ParseDuration("1y")
td, err = ParseDuration("1d")
if err != nil {
t.Fatalf("ParseDuration error: %v", err)
}
if td.String() != "8784h0m0s" {
t.Fatalf("ParseDuration 1y != 8784h0m0s : %s", td.String())
if td.String() != "24h0m0s" {
t.Fatalf("ParseDuration 1d != 24h0m0s : %s", td.String())
}
}