Compare commits
49 Commits
v4.1.2
...
6c9dbc77dd
Author | SHA1 | Date | |
---|---|---|---|
6c9dbc77dd | |||
70f0ace92e | |||
3f21bafc2f | |||
a9ed8b16c1 | |||
|
740cd5931d | ||
85a78063d0 | |||
604ad9cd9d | |||
91137537a2 | |||
950e2352fd | |||
0bb29b29cf | |||
17bcd0b0ab | |||
20f9f4da3b | |||
66fa04b8dc | |||
1ef3ad6531 | |||
c95a91349d | |||
fdcf8e6ca4 | |||
8cb2d9db4a | |||
04da4388ac | |||
79fb23e644 | |||
f8fe923ab1 | |||
105f56dbfe | |||
9fed5a368b | |||
7374d41cf8 | |||
a4a8935c1f | |||
5f498c8232 | |||
a00fdf679b | |||
dc9ebe4155 | |||
87ced484b7 | |||
af99b11a59 | |||
2724b51f7c | |||
5b5d0e02b9 | |||
afc2de6819 | |||
32a8ab9c05 | |||
|
7e5401bded | ||
64b91cea06 | |||
|
0f59fdcbde | ||
50979e6708 | |||
46f3108870 | |||
|
5fed91a65f | ||
1c5bba908d | |||
|
bc8ebdcad5 | ||
fc24f3af92 | |||
1058177d1c | |||
|
fa53fac085 | ||
8c060df5e3 | |||
e1f8c62685 | |||
562b1ab9b7 | |||
|
f3c877a37b | ||
0999b2ad78 |
@@ -3,6 +3,9 @@ name: coverage
|
|||||||
on:
|
on:
|
||||||
push:
|
push:
|
||||||
branches: [ main, v3, v4 ]
|
branches: [ main, v3, v4 ]
|
||||||
|
paths-ignore:
|
||||||
|
- '.github/**'
|
||||||
|
- '.gitea/**'
|
||||||
pull_request:
|
pull_request:
|
||||||
branches: [ main, v3, v4 ]
|
branches: [ main, v3, v4 ]
|
||||||
# Allows you to run this workflow manually from the Actions tab
|
# Allows you to run this workflow manually from the Actions tab
|
||||||
|
@@ -3,10 +3,10 @@ name: lint
|
|||||||
on:
|
on:
|
||||||
pull_request:
|
pull_request:
|
||||||
types: [opened, reopened, synchronize]
|
types: [opened, reopened, synchronize]
|
||||||
branches:
|
branches: [ master, v3, v4 ]
|
||||||
- master
|
paths-ignore:
|
||||||
- v3
|
- '.github/**'
|
||||||
- v4
|
- '.gitea/**'
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
lint:
|
lint:
|
||||||
|
54
.gitea/workflows/job_syncpull.yml
Normal file
54
.gitea/workflows/job_syncpull.yml
Normal file
@@ -0,0 +1,54 @@
|
|||||||
|
name: syncpull
|
||||||
|
|
||||||
|
on:
|
||||||
|
schedule:
|
||||||
|
- cron: '* * * * *'
|
||||||
|
# Allows you to run this workflow manually from the Actions tab
|
||||||
|
workflow_dispatch:
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
pull:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: init
|
||||||
|
run: |
|
||||||
|
git config --global user.email "vtolstov <vtolstov@users.noreply.github.com>"
|
||||||
|
git config --global user.name "github-actions[bot]"
|
||||||
|
echo "machine git.unistack.org login vtolstov password ${{ secrets.TOKEN_GITEA }}" | tee -a /root/.netrc
|
||||||
|
echo "machine github.com login vtolstov password ${{ secrets.TOKEN_GITHUB }}" | tee -a /root/.netrc
|
||||||
|
|
||||||
|
- name: track master
|
||||||
|
run: |
|
||||||
|
git clone --depth=10 --branch master --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
||||||
|
cd repo
|
||||||
|
git remote add --no-tags --fetch --track master upstream https://github.com/${GITHUB_REPOSITORY}
|
||||||
|
git pull --rebase upstream master
|
||||||
|
git push upstream master --progress
|
||||||
|
git merge --allow-unrelated-histories "upstream/master"
|
||||||
|
git push origin master --progress
|
||||||
|
cd ../
|
||||||
|
rm -rf repo
|
||||||
|
|
||||||
|
- name: track v3
|
||||||
|
run: |
|
||||||
|
git clone --depth=10 --branch v3 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
||||||
|
cd repo
|
||||||
|
git remote add --no-tags --fetch --track v3 upstream https://github.com/${GITHUB_REPOSITORY}
|
||||||
|
git pull --rebase upstream v3
|
||||||
|
git push upstream v3
|
||||||
|
git merge --allow-unrelated-histories "upstream/v3"
|
||||||
|
git push origin v3 --progress
|
||||||
|
cd ../
|
||||||
|
rm -rf repo
|
||||||
|
|
||||||
|
- name: track v4
|
||||||
|
run: |
|
||||||
|
git clone --depth=10 --branch v4 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
||||||
|
cd repo
|
||||||
|
git remote add --no-tags --fetch --track v4 upstream https://github.com/${GITHUB_REPOSITORY}
|
||||||
|
git pull --rebase upstream v4
|
||||||
|
git push upstream v4
|
||||||
|
git merge --allow-unrelated-histories "upstream/v4"
|
||||||
|
git push origin v4 --progress
|
||||||
|
cd ../
|
||||||
|
rm -rf repo
|
@@ -3,15 +3,12 @@ name: test
|
|||||||
on:
|
on:
|
||||||
pull_request:
|
pull_request:
|
||||||
types: [opened, reopened, synchronize]
|
types: [opened, reopened, synchronize]
|
||||||
branches:
|
branches: [ master, v3, v4 ]
|
||||||
- master
|
|
||||||
- v3
|
|
||||||
- v4
|
|
||||||
push:
|
push:
|
||||||
branches:
|
branches: [ master, v3, v4 ]
|
||||||
- master
|
paths-ignore:
|
||||||
- v3
|
- '.github/**'
|
||||||
- v4
|
- '.gitea/**'
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
test:
|
test:
|
||||||
|
@@ -3,15 +3,12 @@ name: test
|
|||||||
on:
|
on:
|
||||||
pull_request:
|
pull_request:
|
||||||
types: [opened, reopened, synchronize]
|
types: [opened, reopened, synchronize]
|
||||||
branches:
|
branches: [ master, v3, v4 ]
|
||||||
- master
|
|
||||||
- v3
|
|
||||||
- v4
|
|
||||||
push:
|
push:
|
||||||
branches:
|
branches: [ master, v3, v4 ]
|
||||||
- master
|
paths-ignore:
|
||||||
- v3
|
- '.github/**'
|
||||||
- v4
|
- '.gitea/**'
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
test:
|
test:
|
||||||
|
@@ -1,5 +1,5 @@
|
|||||||
run:
|
run:
|
||||||
concurrency: 8
|
concurrency: 8
|
||||||
deadline: 5m
|
timeout: 5m
|
||||||
issues-exit-code: 1
|
issues-exit-code: 1
|
||||||
tests: true
|
tests: true
|
||||||
|
@@ -1,5 +1,5 @@
|
|||||||
# Micro
|
# Micro
|
||||||

|

|
||||||
[](https://opensource.org/licenses/Apache-2.0)
|
[](https://opensource.org/licenses/Apache-2.0)
|
||||||
[](https://pkg.go.dev/go.unistack.org/micro/v4?tab=overview)
|
[](https://pkg.go.dev/go.unistack.org/micro/v4?tab=overview)
|
||||||
[](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av4+event%3Apush)
|
[](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av4+event%3Apush)
|
||||||
|
15
SECURITY.md
15
SECURITY.md
@@ -1,15 +0,0 @@
|
|||||||
# Security Policy
|
|
||||||
|
|
||||||
## Supported Versions
|
|
||||||
|
|
||||||
Use this section to tell people about which versions of your project are
|
|
||||||
currently being supported with security updates.
|
|
||||||
|
|
||||||
| Version | Supported |
|
|
||||||
| ------- | ------------------ |
|
|
||||||
| 3.7.x | :white_check_mark: |
|
|
||||||
| < 3.7.0 | :x: |
|
|
||||||
|
|
||||||
## Reporting a Vulnerability
|
|
||||||
|
|
||||||
If you find any issue, please create github issue in this repo
|
|
@@ -21,7 +21,7 @@ var (
|
|||||||
// ErrInvalidMessage returns when invalid Message passed
|
// ErrInvalidMessage returns when invalid Message passed
|
||||||
ErrInvalidMessage = errors.New("invalid message")
|
ErrInvalidMessage = errors.New("invalid message")
|
||||||
// ErrInvalidHandler returns when subscriber passed to Subscribe
|
// ErrInvalidHandler returns when subscriber passed to Subscribe
|
||||||
ErrInvalidHandler = errors.New("invalid handler")
|
ErrInvalidHandler = errors.New("invalid handler, ony func(Message) error and func([]Message) error supported")
|
||||||
// DefaultGracefulTimeout
|
// DefaultGracefulTimeout
|
||||||
DefaultGracefulTimeout = 5 * time.Second
|
DefaultGracefulTimeout = 5 * time.Second
|
||||||
)
|
)
|
||||||
|
@@ -42,6 +42,16 @@ func SetSubscribeOption(k, v interface{}) SubscribeOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetPublishOption returns a function to setup a context with given value
|
||||||
|
func SetPublishOption(k, v interface{}) PublishOption {
|
||||||
|
return func(o *PublishOptions) {
|
||||||
|
if o.Context == nil {
|
||||||
|
o.Context = context.Background()
|
||||||
|
}
|
||||||
|
o.Context = context.WithValue(o.Context, k, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// SetOption returns a function to setup a context with given value
|
// SetOption returns a function to setup a context with given value
|
||||||
func SetOption(k, v interface{}) Option {
|
func SetOption(k, v interface{}) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
|
@@ -79,11 +79,15 @@ type PublishOptions struct {
|
|||||||
// BodyOnly flag says the message contains raw body bytes and don't need
|
// BodyOnly flag says the message contains raw body bytes and don't need
|
||||||
// codec Marshal method
|
// codec Marshal method
|
||||||
BodyOnly bool
|
BodyOnly bool
|
||||||
|
// Context holds custom options
|
||||||
|
Context context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPublishOptions creates PublishOptions struct
|
// NewPublishOptions creates PublishOptions struct
|
||||||
func NewPublishOptions(opts ...PublishOption) PublishOptions {
|
func NewPublishOptions(opts ...PublishOption) PublishOptions {
|
||||||
options := PublishOptions{}
|
options := PublishOptions{
|
||||||
|
Context: context.Background(),
|
||||||
|
}
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&options)
|
o(&options)
|
||||||
}
|
}
|
||||||
|
@@ -1,87 +1,14 @@
|
|||||||
package broker
|
package broker
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"reflect"
|
|
||||||
"unicode"
|
|
||||||
"unicode/utf8"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
messageSig = "func(broker.Message) error"
|
|
||||||
messagesSig = "func([]broker.Message) error"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Precompute the reflect type for error. Can't use error directly
|
|
||||||
// because Typeof takes an empty interface value. This is annoying.
|
|
||||||
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
|
|
||||||
|
|
||||||
// Is this an exported - upper case - name?
|
|
||||||
func isExported(name string) bool {
|
|
||||||
r, _ := utf8.DecodeRuneInString(name)
|
|
||||||
return unicode.IsUpper(r)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Is this type exported or a builtin?
|
|
||||||
func isExportedOrBuiltinType(t reflect.Type) bool {
|
|
||||||
for t.Kind() == reflect.Ptr {
|
|
||||||
t = t.Elem()
|
|
||||||
}
|
|
||||||
// PkgPath will be non-empty even for an exported type,
|
|
||||||
// so we need to check the type name as well.
|
|
||||||
return isExported(t.Name()) || t.PkgPath() == ""
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsValidHandler func signature
|
// IsValidHandler func signature
|
||||||
func IsValidHandler(sub interface{}) error {
|
func IsValidHandler(sub interface{}) error {
|
||||||
typ := reflect.TypeOf(sub)
|
switch sub.(type) {
|
||||||
var argType reflect.Type
|
|
||||||
switch typ.Kind() {
|
|
||||||
case reflect.Func:
|
|
||||||
name := "Func"
|
|
||||||
switch typ.NumIn() {
|
|
||||||
case 1:
|
|
||||||
argType = typ.In(0)
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), messageSig)
|
|
||||||
}
|
|
||||||
if !isExportedOrBuiltinType(argType) {
|
|
||||||
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
|
|
||||||
}
|
|
||||||
if typ.NumOut() != 1 {
|
|
||||||
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s",
|
|
||||||
name, typ.NumOut(), messageSig)
|
|
||||||
}
|
|
||||||
if returnType := typ.Out(0); returnType != typeOfError {
|
|
||||||
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
|
|
||||||
}
|
|
||||||
default:
|
default:
|
||||||
hdlr := reflect.ValueOf(sub)
|
return ErrInvalidHandler
|
||||||
name := reflect.Indirect(hdlr).Type().Name()
|
case func(Message) error:
|
||||||
|
break
|
||||||
for m := 0; m < typ.NumMethod(); m++ {
|
case func([]Message) error:
|
||||||
method := typ.Method(m)
|
break
|
||||||
switch method.Type.NumIn() {
|
|
||||||
case 3:
|
|
||||||
argType = method.Type.In(2)
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s",
|
|
||||||
name, method.Name, method.Type.NumIn(), messageSig)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !isExportedOrBuiltinType(argType) {
|
|
||||||
return fmt.Errorf("%v argument type not exported: %v", name, argType)
|
|
||||||
}
|
|
||||||
if method.Type.NumOut() != 1 {
|
|
||||||
return fmt.Errorf(
|
|
||||||
"subscriber %v.%v has wrong number of return values: %v require signature %s",
|
|
||||||
name, method.Name, method.Type.NumOut(), messageSig)
|
|
||||||
}
|
|
||||||
if returnType := method.Type.Out(0); returnType != typeOfError {
|
|
||||||
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
117
hooks/metadata/metadata.go
Normal file
117
hooks/metadata/metadata.go
Normal file
@@ -0,0 +1,117 @@
|
|||||||
|
package metadata
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"go.unistack.org/micro/v4/client"
|
||||||
|
"go.unistack.org/micro/v4/metadata"
|
||||||
|
"go.unistack.org/micro/v4/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
type wrapper struct {
|
||||||
|
keys []string
|
||||||
|
|
||||||
|
client.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewClientWrapper(keys ...string) client.Wrapper {
|
||||||
|
return func(c client.Client) client.Client {
|
||||||
|
handler := &wrapper{
|
||||||
|
Client: c,
|
||||||
|
keys: keys,
|
||||||
|
}
|
||||||
|
return handler
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewClientCallWrapper(keys ...string) client.CallWrapper {
|
||||||
|
return func(fn client.CallFunc) client.CallFunc {
|
||||||
|
return func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
||||||
|
if keys == nil {
|
||||||
|
return fn(ctx, addr, req, rsp, opts)
|
||||||
|
}
|
||||||
|
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
|
||||||
|
omd, ook := metadata.FromOutgoingContext(ctx)
|
||||||
|
if !ook || omd == nil {
|
||||||
|
omd = metadata.New(len(imd))
|
||||||
|
}
|
||||||
|
for _, k := range keys {
|
||||||
|
if v, ok := imd.Get(k); ok {
|
||||||
|
omd.Add(k, v...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !ook {
|
||||||
|
ctx = metadata.NewOutgoingContext(ctx, omd)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fn(ctx, addr, req, rsp, opts)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||||
|
if w.keys == nil {
|
||||||
|
return w.Client.Call(ctx, req, rsp, opts...)
|
||||||
|
}
|
||||||
|
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
|
||||||
|
omd, ook := metadata.FromOutgoingContext(ctx)
|
||||||
|
if !ook || omd == nil {
|
||||||
|
omd = metadata.New(len(imd))
|
||||||
|
}
|
||||||
|
for _, k := range w.keys {
|
||||||
|
if v, ok := imd.Get(k); ok {
|
||||||
|
omd.Add(k, v...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !ook {
|
||||||
|
ctx = metadata.NewOutgoingContext(ctx, omd)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return w.Client.Call(ctx, req, rsp, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||||
|
if w.keys == nil {
|
||||||
|
return w.Client.Stream(ctx, req, opts...)
|
||||||
|
}
|
||||||
|
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
|
||||||
|
omd, ook := metadata.FromOutgoingContext(ctx)
|
||||||
|
if !ook || omd == nil {
|
||||||
|
omd = metadata.New(len(imd))
|
||||||
|
}
|
||||||
|
for _, k := range w.keys {
|
||||||
|
if v, ok := imd.Get(k); ok {
|
||||||
|
omd.Add(k, v...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !ook {
|
||||||
|
ctx = metadata.NewOutgoingContext(ctx, omd)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return w.Client.Stream(ctx, req, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewServerHandlerWrapper(keys ...string) server.HandlerWrapper {
|
||||||
|
return func(fn server.HandlerFunc) server.HandlerFunc {
|
||||||
|
return func(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||||
|
if keys == nil {
|
||||||
|
return fn(ctx, req, rsp)
|
||||||
|
}
|
||||||
|
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
|
||||||
|
omd, ook := metadata.FromOutgoingContext(ctx)
|
||||||
|
if !ook || omd == nil {
|
||||||
|
omd = metadata.New(len(imd))
|
||||||
|
}
|
||||||
|
for _, k := range keys {
|
||||||
|
if v, ok := imd.Get(k); ok {
|
||||||
|
omd.Add(k, v...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !ook {
|
||||||
|
ctx = metadata.NewOutgoingContext(ctx, omd)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fn(ctx, req, rsp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
63
hooks/recovery/recovery.go
Normal file
63
hooks/recovery/recovery.go
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
package recovery
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"go.unistack.org/micro/v4/errors"
|
||||||
|
"go.unistack.org/micro/v4/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewOptions(opts ...Option) Options {
|
||||||
|
options := Options{
|
||||||
|
ServerHandlerFn: DefaultServerHandlerFn,
|
||||||
|
}
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&options)
|
||||||
|
}
|
||||||
|
return options
|
||||||
|
}
|
||||||
|
|
||||||
|
type Options struct {
|
||||||
|
ServerHandlerFn func(context.Context, server.Request, interface{}, error) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type Option func(*Options)
|
||||||
|
|
||||||
|
func ServerHandlerFunc(fn func(context.Context, server.Request, interface{}, error) error) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.ServerHandlerFn = fn
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var DefaultServerHandlerFn = func(ctx context.Context, req server.Request, rsp interface{}, err error) error {
|
||||||
|
return errors.BadRequest("", "%v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var Hook = NewHook()
|
||||||
|
|
||||||
|
type hook struct {
|
||||||
|
opts Options
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewHook(opts ...Option) *hook {
|
||||||
|
return &hook{opts: NewOptions(opts...)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
|
||||||
|
return func(ctx context.Context, req server.Request, rsp interface{}) (err error) {
|
||||||
|
defer func() {
|
||||||
|
r := recover()
|
||||||
|
switch verr := r.(type) {
|
||||||
|
case nil:
|
||||||
|
return
|
||||||
|
case error:
|
||||||
|
err = w.opts.ServerHandlerFn(ctx, req, rsp, verr)
|
||||||
|
default:
|
||||||
|
err = w.opts.ServerHandlerFn(ctx, req, rsp, fmt.Errorf("%v", r))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
err = next(ctx, req, rsp)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
114
hooks/requestid/requestid.go
Normal file
114
hooks/requestid/requestid.go
Normal file
@@ -0,0 +1,114 @@
|
|||||||
|
package requestid
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net/textproto"
|
||||||
|
|
||||||
|
"go.unistack.org/micro/v4/client"
|
||||||
|
"go.unistack.org/micro/v4/metadata"
|
||||||
|
"go.unistack.org/micro/v4/server"
|
||||||
|
"go.unistack.org/micro/v4/util/id"
|
||||||
|
)
|
||||||
|
|
||||||
|
type XRequestIDKey struct{}
|
||||||
|
|
||||||
|
// DefaultMetadataKey contains metadata key
|
||||||
|
var DefaultMetadataKey = textproto.CanonicalMIMEHeaderKey("x-request-id")
|
||||||
|
|
||||||
|
// DefaultMetadataFunc wil be used if user not provide own func to fill metadata
|
||||||
|
var DefaultMetadataFunc = func(ctx context.Context) (context.Context, error) {
|
||||||
|
var xid string
|
||||||
|
|
||||||
|
cid, cok := ctx.Value(XRequestIDKey{}).(string)
|
||||||
|
if cok && cid != "" {
|
||||||
|
xid = cid
|
||||||
|
}
|
||||||
|
|
||||||
|
imd, iok := metadata.FromIncomingContext(ctx)
|
||||||
|
if !iok || imd == nil {
|
||||||
|
imd = metadata.New(1)
|
||||||
|
ctx = metadata.NewIncomingContext(ctx, imd)
|
||||||
|
}
|
||||||
|
|
||||||
|
omd, ook := metadata.FromOutgoingContext(ctx)
|
||||||
|
if !ook || omd == nil {
|
||||||
|
omd = metadata.New(1)
|
||||||
|
ctx = metadata.NewOutgoingContext(ctx, omd)
|
||||||
|
}
|
||||||
|
|
||||||
|
if xid == "" {
|
||||||
|
var ids []string
|
||||||
|
if ids, iok = imd.Get(DefaultMetadataKey); iok {
|
||||||
|
for i := range ids {
|
||||||
|
if ids[i] != "" {
|
||||||
|
xid = ids[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ids, ook = omd.Get(DefaultMetadataKey); ook {
|
||||||
|
for i := range ids {
|
||||||
|
if ids[i] != "" {
|
||||||
|
xid = ids[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if xid == "" {
|
||||||
|
var err error
|
||||||
|
xid, err = id.New()
|
||||||
|
if err != nil {
|
||||||
|
return ctx, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !cok {
|
||||||
|
ctx = context.WithValue(ctx, XRequestIDKey{}, xid)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !iok {
|
||||||
|
imd.Set(DefaultMetadataKey, xid)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ook {
|
||||||
|
omd.Set(DefaultMetadataKey, xid)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type hook struct{}
|
||||||
|
|
||||||
|
func NewHook() *hook {
|
||||||
|
return &hook{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
|
||||||
|
return func(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||||
|
var err error
|
||||||
|
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return next(ctx, req, rsp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *hook) ClientCall(next client.FuncCall) client.FuncCall {
|
||||||
|
return func(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||||
|
var err error
|
||||||
|
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return next(ctx, req, rsp, opts...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *hook) ClientStream(next client.FuncStream) client.FuncStream {
|
||||||
|
return func(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||||
|
var err error
|
||||||
|
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return next(ctx, req, opts...)
|
||||||
|
}
|
||||||
|
}
|
33
hooks/requestid/requestid_test.go
Normal file
33
hooks/requestid/requestid_test.go
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
package requestid
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"go.unistack.org/micro/v4/metadata"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestDefaultMetadataFunc(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
|
|
||||||
|
nctx, err := DefaultMetadataFunc(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
imd, ok := metadata.FromIncomingContext(nctx)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("md missing in incoming context")
|
||||||
|
}
|
||||||
|
omd, ok := metadata.FromOutgoingContext(nctx)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("md missing in outgoing context")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, iok := imd.Get(DefaultMetadataKey)
|
||||||
|
_, ook := omd.Get(DefaultMetadataKey)
|
||||||
|
|
||||||
|
if !iok || !ook {
|
||||||
|
t.Fatalf("missing metadata key value")
|
||||||
|
}
|
||||||
|
}
|
133
hooks/validator/validator.go
Normal file
133
hooks/validator/validator.go
Normal file
@@ -0,0 +1,133 @@
|
|||||||
|
package validator
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"go.unistack.org/micro/v4/client"
|
||||||
|
"go.unistack.org/micro/v4/errors"
|
||||||
|
"go.unistack.org/micro/v4/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
DefaultClientErrorFunc = func(req client.Request, rsp interface{}, err error) error {
|
||||||
|
if rsp != nil {
|
||||||
|
return errors.BadGateway(req.Service(), "%v", err)
|
||||||
|
}
|
||||||
|
return errors.BadRequest(req.Service(), "%v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
DefaultServerErrorFunc = func(req server.Request, rsp interface{}, err error) error {
|
||||||
|
if rsp != nil {
|
||||||
|
return errors.BadGateway(req.Service(), "%v", err)
|
||||||
|
}
|
||||||
|
return errors.BadRequest(req.Service(), "%v", err)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
ClientErrorFunc func(client.Request, interface{}, error) error
|
||||||
|
ServerErrorFunc func(server.Request, interface{}, error) error
|
||||||
|
)
|
||||||
|
|
||||||
|
// Options struct holds wrapper options
|
||||||
|
type Options struct {
|
||||||
|
ClientErrorFn ClientErrorFunc
|
||||||
|
ServerErrorFn ServerErrorFunc
|
||||||
|
ClientValidateResponse bool
|
||||||
|
ServerValidateResponse bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Option func signature
|
||||||
|
type Option func(*Options)
|
||||||
|
|
||||||
|
func ClientValidateResponse(b bool) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.ClientValidateResponse = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ServerValidateResponse(b bool) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.ClientValidateResponse = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ClientReqErrorFn(fn ClientErrorFunc) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.ClientErrorFn = fn
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ServerErrorFn(fn ServerErrorFunc) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.ServerErrorFn = fn
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewOptions(opts ...Option) Options {
|
||||||
|
options := Options{
|
||||||
|
ClientErrorFn: DefaultClientErrorFunc,
|
||||||
|
ServerErrorFn: DefaultServerErrorFunc,
|
||||||
|
}
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&options)
|
||||||
|
}
|
||||||
|
return options
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewHook(opts ...Option) *hook {
|
||||||
|
return &hook{opts: NewOptions(opts...)}
|
||||||
|
}
|
||||||
|
|
||||||
|
type validator interface {
|
||||||
|
Validate() error
|
||||||
|
}
|
||||||
|
|
||||||
|
type hook struct {
|
||||||
|
opts Options
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *hook) ClientCall(next client.FuncCall) client.FuncCall {
|
||||||
|
return func(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||||
|
if v, ok := req.Body().(validator); ok {
|
||||||
|
if err := v.Validate(); err != nil {
|
||||||
|
return w.opts.ClientErrorFn(req, nil, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err := next(ctx, req, rsp, opts...)
|
||||||
|
if v, ok := rsp.(validator); ok && w.opts.ClientValidateResponse {
|
||||||
|
if verr := v.Validate(); verr != nil {
|
||||||
|
return w.opts.ClientErrorFn(req, rsp, verr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *hook) ClientStream(next client.FuncStream) client.FuncStream {
|
||||||
|
return func(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||||
|
if v, ok := req.Body().(validator); ok {
|
||||||
|
if err := v.Validate(); err != nil {
|
||||||
|
return nil, w.opts.ClientErrorFn(req, nil, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return next(ctx, req, opts...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
|
||||||
|
return func(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||||
|
if v, ok := req.Body().(validator); ok {
|
||||||
|
if err := v.Validate(); err != nil {
|
||||||
|
return w.opts.ServerErrorFn(req, nil, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err := next(ctx, req, rsp)
|
||||||
|
if v, ok := rsp.(validator); ok && w.opts.ServerValidateResponse {
|
||||||
|
if verr := v.Validate(); verr != nil {
|
||||||
|
return w.opts.ServerErrorFn(req, rsp, verr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
@@ -2,7 +2,6 @@ package logger
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
@@ -100,11 +99,8 @@ func WithAddFields(fields ...interface{}) Option {
|
|||||||
iv, iok := o.Fields[i].(string)
|
iv, iok := o.Fields[i].(string)
|
||||||
jv, jok := fields[j].(string)
|
jv, jok := fields[j].(string)
|
||||||
if iok && jok && iv == jv {
|
if iok && jok && iv == jv {
|
||||||
fmt.Printf("AAAA o.Fields:%v old:%v new:%v\n", o.Fields[i], o.Fields[i+1], fields[j+1])
|
|
||||||
o.Fields[i+1] = fields[j+1]
|
o.Fields[i+1] = fields[j+1]
|
||||||
fmt.Printf("BBBB o.Fields:%v old:%v new:%v\n", o.Fields[i], o.Fields[i+1], fields[j+1])
|
|
||||||
fields = slices.Delete(fields, j, j+2)
|
fields = slices.Delete(fields, j, j+2)
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -69,6 +69,15 @@ func (md Metadata) Copy() Metadata {
|
|||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AsMap returns a copy of Metadata with map[string]string.
|
||||||
|
func (md Metadata) AsMap() map[string]string {
|
||||||
|
out := make(map[string]string, len(md))
|
||||||
|
for k, v := range md {
|
||||||
|
out[k] = strings.Join(v, ",")
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
// AsHTTP1 returns a copy of Metadata
|
// AsHTTP1 returns a copy of Metadata
|
||||||
// with CanonicalMIMEHeaderKey.
|
// with CanonicalMIMEHeaderKey.
|
||||||
func (md Metadata) AsHTTP1() map[string][]string {
|
func (md Metadata) AsHTTP1() map[string][]string {
|
||||||
|
20
service.go
20
service.go
@@ -99,6 +99,7 @@ type service struct {
|
|||||||
done chan struct{}
|
done chan struct{}
|
||||||
opts Options
|
opts Options
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
|
stopped bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewService creates and returns a new Service based on the packages within.
|
// NewService creates and returns a new Service based on the packages within.
|
||||||
@@ -424,7 +425,7 @@ func (s *service) Stop() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
close(s.done)
|
s.notifyShutdown()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -448,10 +449,23 @@ func (s *service) Run() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait on context cancel
|
|
||||||
<-s.done
|
<-s.done
|
||||||
|
|
||||||
return s.Stop()
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// notifyShutdown marks the service as stopped and closes the done channel.
|
||||||
|
// It ensures the channel is closed only once, preventing multiple closures.
|
||||||
|
func (s *service) notifyShutdown() {
|
||||||
|
s.Lock()
|
||||||
|
if s.stopped {
|
||||||
|
s.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.stopped = true
|
||||||
|
s.Unlock()
|
||||||
|
|
||||||
|
close(s.done)
|
||||||
}
|
}
|
||||||
|
|
||||||
type Namer interface {
|
type Namer interface {
|
||||||
|
@@ -3,7 +3,9 @@ package micro
|
|||||||
import (
|
import (
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
"go.unistack.org/micro/v4/broker"
|
"go.unistack.org/micro/v4/broker"
|
||||||
"go.unistack.org/micro/v4/client"
|
"go.unistack.org/micro/v4/client"
|
||||||
"go.unistack.org/micro/v4/config"
|
"go.unistack.org/micro/v4/config"
|
||||||
@@ -737,3 +739,41 @@ func Test_getNameIndex(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
func TestServiceShutdown(t *testing.T) {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
t.Fatalf("service shutdown failed: %v", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
s, ok := NewService().(*service)
|
||||||
|
require.NotNil(t, s)
|
||||||
|
require.True(t, ok)
|
||||||
|
|
||||||
|
require.NoError(t, s.Start())
|
||||||
|
require.False(t, s.stopped)
|
||||||
|
|
||||||
|
require.NoError(t, s.Stop())
|
||||||
|
require.True(t, s.stopped)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServiceMultipleShutdowns(t *testing.T) {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
t.Fatalf("service shutdown failed: %v", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
s := NewService()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
// first call
|
||||||
|
require.NoError(t, s.Stop())
|
||||||
|
// duplicate call
|
||||||
|
require.NoError(t, s.Stop())
|
||||||
|
}()
|
||||||
|
|
||||||
|
require.NoError(t, s.Run())
|
||||||
|
}
|
||||||
|
@@ -89,6 +89,10 @@ func (s *Span) Tracer() tracer.Tracer {
|
|||||||
return s.tracer
|
return s.tracer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Span) IsRecording() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
type Event struct {
|
type Event struct {
|
||||||
name string
|
name string
|
||||||
labels []interface{}
|
labels []interface{}
|
||||||
|
@@ -120,6 +120,10 @@ func (s *noopSpan) SetStatus(st SpanStatus, msg string) {
|
|||||||
s.statusMsg = msg
|
s.statusMsg = msg
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *noopSpan) IsRecording() bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// NewTracer returns new memory tracer
|
// NewTracer returns new memory tracer
|
||||||
func NewTracer(opts ...Option) Tracer {
|
func NewTracer(opts ...Option) Tracer {
|
||||||
return &noopTracer{
|
return &noopTracer{
|
||||||
|
@@ -78,4 +78,6 @@ type Span interface {
|
|||||||
TraceID() string
|
TraceID() string
|
||||||
// SpanID returns span id
|
// SpanID returns span id
|
||||||
SpanID() string
|
SpanID() string
|
||||||
|
// IsRecording returns the recording state of the Span.
|
||||||
|
IsRecording() bool
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user