Compare commits
51 Commits
v4.1.1
...
6c9dbc77dd
Author | SHA1 | Date | |
---|---|---|---|
6c9dbc77dd | |||
70f0ace92e | |||
3f21bafc2f | |||
a9ed8b16c1 | |||
|
740cd5931d | ||
85a78063d0 | |||
604ad9cd9d | |||
91137537a2 | |||
950e2352fd | |||
0bb29b29cf | |||
17bcd0b0ab | |||
20f9f4da3b | |||
66fa04b8dc | |||
1ef3ad6531 | |||
c95a91349d | |||
fdcf8e6ca4 | |||
8cb2d9db4a | |||
04da4388ac | |||
79fb23e644 | |||
f8fe923ab1 | |||
105f56dbfe | |||
9fed5a368b | |||
7374d41cf8 | |||
a4a8935c1f | |||
5f498c8232 | |||
a00fdf679b | |||
dc9ebe4155 | |||
87ced484b7 | |||
af99b11a59 | |||
2724b51f7c | |||
5b5d0e02b9 | |||
afc2de6819 | |||
32a8ab9c05 | |||
|
7e5401bded | ||
64b91cea06 | |||
|
0f59fdcbde | ||
50979e6708 | |||
46f3108870 | |||
|
5fed91a65f | ||
1c5bba908d | |||
|
bc8ebdcad5 | ||
fc24f3af92 | |||
1058177d1c | |||
|
fa53fac085 | ||
8c060df5e3 | |||
e1f8c62685 | |||
562b1ab9b7 | |||
|
f3c877a37b | ||
0999b2ad78 | |||
a365513177 | |||
|
d1e3f3cab2 |
@@ -3,6 +3,9 @@ name: coverage
|
||||
on:
|
||||
push:
|
||||
branches: [ main, v3, v4 ]
|
||||
paths-ignore:
|
||||
- '.github/**'
|
||||
- '.gitea/**'
|
||||
pull_request:
|
||||
branches: [ main, v3, v4 ]
|
||||
# Allows you to run this workflow manually from the Actions tab
|
||||
|
@@ -3,10 +3,10 @@ name: lint
|
||||
on:
|
||||
pull_request:
|
||||
types: [opened, reopened, synchronize]
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
- v4
|
||||
branches: [ master, v3, v4 ]
|
||||
paths-ignore:
|
||||
- '.github/**'
|
||||
- '.gitea/**'
|
||||
|
||||
jobs:
|
||||
lint:
|
||||
|
54
.gitea/workflows/job_syncpull.yml
Normal file
54
.gitea/workflows/job_syncpull.yml
Normal file
@@ -0,0 +1,54 @@
|
||||
name: syncpull
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: '* * * * *'
|
||||
# Allows you to run this workflow manually from the Actions tab
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
pull:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: init
|
||||
run: |
|
||||
git config --global user.email "vtolstov <vtolstov@users.noreply.github.com>"
|
||||
git config --global user.name "github-actions[bot]"
|
||||
echo "machine git.unistack.org login vtolstov password ${{ secrets.TOKEN_GITEA }}" | tee -a /root/.netrc
|
||||
echo "machine github.com login vtolstov password ${{ secrets.TOKEN_GITHUB }}" | tee -a /root/.netrc
|
||||
|
||||
- name: track master
|
||||
run: |
|
||||
git clone --depth=10 --branch master --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
||||
cd repo
|
||||
git remote add --no-tags --fetch --track master upstream https://github.com/${GITHUB_REPOSITORY}
|
||||
git pull --rebase upstream master
|
||||
git push upstream master --progress
|
||||
git merge --allow-unrelated-histories "upstream/master"
|
||||
git push origin master --progress
|
||||
cd ../
|
||||
rm -rf repo
|
||||
|
||||
- name: track v3
|
||||
run: |
|
||||
git clone --depth=10 --branch v3 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
||||
cd repo
|
||||
git remote add --no-tags --fetch --track v3 upstream https://github.com/${GITHUB_REPOSITORY}
|
||||
git pull --rebase upstream v3
|
||||
git push upstream v3
|
||||
git merge --allow-unrelated-histories "upstream/v3"
|
||||
git push origin v3 --progress
|
||||
cd ../
|
||||
rm -rf repo
|
||||
|
||||
- name: track v4
|
||||
run: |
|
||||
git clone --depth=10 --branch v4 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
||||
cd repo
|
||||
git remote add --no-tags --fetch --track v4 upstream https://github.com/${GITHUB_REPOSITORY}
|
||||
git pull --rebase upstream v4
|
||||
git push upstream v4
|
||||
git merge --allow-unrelated-histories "upstream/v4"
|
||||
git push origin v4 --progress
|
||||
cd ../
|
||||
rm -rf repo
|
@@ -3,15 +3,12 @@ name: test
|
||||
on:
|
||||
pull_request:
|
||||
types: [opened, reopened, synchronize]
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
- v4
|
||||
branches: [ master, v3, v4 ]
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
- v4
|
||||
branches: [ master, v3, v4 ]
|
||||
paths-ignore:
|
||||
- '.github/**'
|
||||
- '.gitea/**'
|
||||
|
||||
jobs:
|
||||
test:
|
||||
|
@@ -3,15 +3,12 @@ name: test
|
||||
on:
|
||||
pull_request:
|
||||
types: [opened, reopened, synchronize]
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
- v4
|
||||
branches: [ master, v3, v4 ]
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
- v4
|
||||
branches: [ master, v3, v4 ]
|
||||
paths-ignore:
|
||||
- '.github/**'
|
||||
- '.gitea/**'
|
||||
|
||||
jobs:
|
||||
test:
|
||||
|
@@ -1,5 +1,5 @@
|
||||
run:
|
||||
concurrency: 8
|
||||
deadline: 5m
|
||||
timeout: 5m
|
||||
issues-exit-code: 1
|
||||
tests: true
|
||||
|
@@ -1,5 +1,5 @@
|
||||
# Micro
|
||||

|
||||

|
||||
[](https://opensource.org/licenses/Apache-2.0)
|
||||
[](https://pkg.go.dev/go.unistack.org/micro/v4?tab=overview)
|
||||
[](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av4+event%3Apush)
|
||||
|
15
SECURITY.md
15
SECURITY.md
@@ -1,15 +0,0 @@
|
||||
# Security Policy
|
||||
|
||||
## Supported Versions
|
||||
|
||||
Use this section to tell people about which versions of your project are
|
||||
currently being supported with security updates.
|
||||
|
||||
| Version | Supported |
|
||||
| ------- | ------------------ |
|
||||
| 3.7.x | :white_check_mark: |
|
||||
| < 3.7.0 | :x: |
|
||||
|
||||
## Reporting a Vulnerability
|
||||
|
||||
If you find any issue, please create github issue in this repo
|
@@ -21,7 +21,7 @@ var (
|
||||
// ErrInvalidMessage returns when invalid Message passed
|
||||
ErrInvalidMessage = errors.New("invalid message")
|
||||
// ErrInvalidHandler returns when subscriber passed to Subscribe
|
||||
ErrInvalidHandler = errors.New("invalid handler")
|
||||
ErrInvalidHandler = errors.New("invalid handler, ony func(Message) error and func([]Message) error supported")
|
||||
// DefaultGracefulTimeout
|
||||
DefaultGracefulTimeout = 5 * time.Second
|
||||
)
|
||||
|
@@ -42,6 +42,16 @@ func SetSubscribeOption(k, v interface{}) SubscribeOption {
|
||||
}
|
||||
}
|
||||
|
||||
// SetPublishOption returns a function to setup a context with given value
|
||||
func SetPublishOption(k, v interface{}) PublishOption {
|
||||
return func(o *PublishOptions) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, k, v)
|
||||
}
|
||||
}
|
||||
|
||||
// SetOption returns a function to setup a context with given value
|
||||
func SetOption(k, v interface{}) Option {
|
||||
return func(o *Options) {
|
||||
|
@@ -79,11 +79,15 @@ type PublishOptions struct {
|
||||
// BodyOnly flag says the message contains raw body bytes and don't need
|
||||
// codec Marshal method
|
||||
BodyOnly bool
|
||||
// Context holds custom options
|
||||
Context context.Context
|
||||
}
|
||||
|
||||
// NewPublishOptions creates PublishOptions struct
|
||||
func NewPublishOptions(opts ...PublishOption) PublishOptions {
|
||||
options := PublishOptions{}
|
||||
options := PublishOptions{
|
||||
Context: context.Background(),
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
@@ -1,87 +1,14 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"unicode"
|
||||
"unicode/utf8"
|
||||
)
|
||||
|
||||
const (
|
||||
messageSig = "func(broker.Message) error"
|
||||
messagesSig = "func([]broker.Message) error"
|
||||
)
|
||||
|
||||
// Precompute the reflect type for error. Can't use error directly
|
||||
// because Typeof takes an empty interface value. This is annoying.
|
||||
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
|
||||
|
||||
// Is this an exported - upper case - name?
|
||||
func isExported(name string) bool {
|
||||
r, _ := utf8.DecodeRuneInString(name)
|
||||
return unicode.IsUpper(r)
|
||||
}
|
||||
|
||||
// Is this type exported or a builtin?
|
||||
func isExportedOrBuiltinType(t reflect.Type) bool {
|
||||
for t.Kind() == reflect.Ptr {
|
||||
t = t.Elem()
|
||||
}
|
||||
// PkgPath will be non-empty even for an exported type,
|
||||
// so we need to check the type name as well.
|
||||
return isExported(t.Name()) || t.PkgPath() == ""
|
||||
}
|
||||
|
||||
// IsValidHandler func signature
|
||||
func IsValidHandler(sub interface{}) error {
|
||||
typ := reflect.TypeOf(sub)
|
||||
var argType reflect.Type
|
||||
switch typ.Kind() {
|
||||
case reflect.Func:
|
||||
name := "Func"
|
||||
switch typ.NumIn() {
|
||||
case 1:
|
||||
argType = typ.In(0)
|
||||
default:
|
||||
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), messageSig)
|
||||
}
|
||||
if !isExportedOrBuiltinType(argType) {
|
||||
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
|
||||
}
|
||||
if typ.NumOut() != 1 {
|
||||
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s",
|
||||
name, typ.NumOut(), messageSig)
|
||||
}
|
||||
if returnType := typ.Out(0); returnType != typeOfError {
|
||||
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
|
||||
}
|
||||
switch sub.(type) {
|
||||
default:
|
||||
hdlr := reflect.ValueOf(sub)
|
||||
name := reflect.Indirect(hdlr).Type().Name()
|
||||
|
||||
for m := 0; m < typ.NumMethod(); m++ {
|
||||
method := typ.Method(m)
|
||||
switch method.Type.NumIn() {
|
||||
case 3:
|
||||
argType = method.Type.In(2)
|
||||
default:
|
||||
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s",
|
||||
name, method.Name, method.Type.NumIn(), messageSig)
|
||||
}
|
||||
|
||||
if !isExportedOrBuiltinType(argType) {
|
||||
return fmt.Errorf("%v argument type not exported: %v", name, argType)
|
||||
}
|
||||
if method.Type.NumOut() != 1 {
|
||||
return fmt.Errorf(
|
||||
"subscriber %v.%v has wrong number of return values: %v require signature %s",
|
||||
name, method.Name, method.Type.NumOut(), messageSig)
|
||||
}
|
||||
if returnType := method.Type.Out(0); returnType != typeOfError {
|
||||
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
|
||||
}
|
||||
}
|
||||
return ErrInvalidHandler
|
||||
case func(Message) error:
|
||||
break
|
||||
case func([]Message) error:
|
||||
break
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
117
hooks/metadata/metadata.go
Normal file
117
hooks/metadata/metadata.go
Normal file
@@ -0,0 +1,117 @@
|
||||
package metadata
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.unistack.org/micro/v4/client"
|
||||
"go.unistack.org/micro/v4/metadata"
|
||||
"go.unistack.org/micro/v4/server"
|
||||
)
|
||||
|
||||
type wrapper struct {
|
||||
keys []string
|
||||
|
||||
client.Client
|
||||
}
|
||||
|
||||
func NewClientWrapper(keys ...string) client.Wrapper {
|
||||
return func(c client.Client) client.Client {
|
||||
handler := &wrapper{
|
||||
Client: c,
|
||||
keys: keys,
|
||||
}
|
||||
return handler
|
||||
}
|
||||
}
|
||||
|
||||
func NewClientCallWrapper(keys ...string) client.CallWrapper {
|
||||
return func(fn client.CallFunc) client.CallFunc {
|
||||
return func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
||||
if keys == nil {
|
||||
return fn(ctx, addr, req, rsp, opts)
|
||||
}
|
||||
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
|
||||
omd, ook := metadata.FromOutgoingContext(ctx)
|
||||
if !ook || omd == nil {
|
||||
omd = metadata.New(len(imd))
|
||||
}
|
||||
for _, k := range keys {
|
||||
if v, ok := imd.Get(k); ok {
|
||||
omd.Add(k, v...)
|
||||
}
|
||||
}
|
||||
if !ook {
|
||||
ctx = metadata.NewOutgoingContext(ctx, omd)
|
||||
}
|
||||
}
|
||||
return fn(ctx, addr, req, rsp, opts)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||
if w.keys == nil {
|
||||
return w.Client.Call(ctx, req, rsp, opts...)
|
||||
}
|
||||
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
|
||||
omd, ook := metadata.FromOutgoingContext(ctx)
|
||||
if !ook || omd == nil {
|
||||
omd = metadata.New(len(imd))
|
||||
}
|
||||
for _, k := range w.keys {
|
||||
if v, ok := imd.Get(k); ok {
|
||||
omd.Add(k, v...)
|
||||
}
|
||||
}
|
||||
if !ook {
|
||||
ctx = metadata.NewOutgoingContext(ctx, omd)
|
||||
}
|
||||
}
|
||||
return w.Client.Call(ctx, req, rsp, opts...)
|
||||
}
|
||||
|
||||
func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||
if w.keys == nil {
|
||||
return w.Client.Stream(ctx, req, opts...)
|
||||
}
|
||||
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
|
||||
omd, ook := metadata.FromOutgoingContext(ctx)
|
||||
if !ook || omd == nil {
|
||||
omd = metadata.New(len(imd))
|
||||
}
|
||||
for _, k := range w.keys {
|
||||
if v, ok := imd.Get(k); ok {
|
||||
omd.Add(k, v...)
|
||||
}
|
||||
}
|
||||
if !ook {
|
||||
ctx = metadata.NewOutgoingContext(ctx, omd)
|
||||
}
|
||||
}
|
||||
return w.Client.Stream(ctx, req, opts...)
|
||||
}
|
||||
|
||||
func NewServerHandlerWrapper(keys ...string) server.HandlerWrapper {
|
||||
return func(fn server.HandlerFunc) server.HandlerFunc {
|
||||
return func(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||
if keys == nil {
|
||||
return fn(ctx, req, rsp)
|
||||
}
|
||||
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
|
||||
omd, ook := metadata.FromOutgoingContext(ctx)
|
||||
if !ook || omd == nil {
|
||||
omd = metadata.New(len(imd))
|
||||
}
|
||||
for _, k := range keys {
|
||||
if v, ok := imd.Get(k); ok {
|
||||
omd.Add(k, v...)
|
||||
}
|
||||
}
|
||||
if !ook {
|
||||
ctx = metadata.NewOutgoingContext(ctx, omd)
|
||||
}
|
||||
}
|
||||
return fn(ctx, req, rsp)
|
||||
}
|
||||
}
|
||||
}
|
63
hooks/recovery/recovery.go
Normal file
63
hooks/recovery/recovery.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package recovery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"go.unistack.org/micro/v4/errors"
|
||||
"go.unistack.org/micro/v4/server"
|
||||
)
|
||||
|
||||
func NewOptions(opts ...Option) Options {
|
||||
options := Options{
|
||||
ServerHandlerFn: DefaultServerHandlerFn,
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return options
|
||||
}
|
||||
|
||||
type Options struct {
|
||||
ServerHandlerFn func(context.Context, server.Request, interface{}, error) error
|
||||
}
|
||||
|
||||
type Option func(*Options)
|
||||
|
||||
func ServerHandlerFunc(fn func(context.Context, server.Request, interface{}, error) error) Option {
|
||||
return func(o *Options) {
|
||||
o.ServerHandlerFn = fn
|
||||
}
|
||||
}
|
||||
|
||||
var DefaultServerHandlerFn = func(ctx context.Context, req server.Request, rsp interface{}, err error) error {
|
||||
return errors.BadRequest("", "%v", err)
|
||||
}
|
||||
|
||||
var Hook = NewHook()
|
||||
|
||||
type hook struct {
|
||||
opts Options
|
||||
}
|
||||
|
||||
func NewHook(opts ...Option) *hook {
|
||||
return &hook{opts: NewOptions(opts...)}
|
||||
}
|
||||
|
||||
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
|
||||
return func(ctx context.Context, req server.Request, rsp interface{}) (err error) {
|
||||
defer func() {
|
||||
r := recover()
|
||||
switch verr := r.(type) {
|
||||
case nil:
|
||||
return
|
||||
case error:
|
||||
err = w.opts.ServerHandlerFn(ctx, req, rsp, verr)
|
||||
default:
|
||||
err = w.opts.ServerHandlerFn(ctx, req, rsp, fmt.Errorf("%v", r))
|
||||
}
|
||||
}()
|
||||
err = next(ctx, req, rsp)
|
||||
return err
|
||||
}
|
||||
}
|
114
hooks/requestid/requestid.go
Normal file
114
hooks/requestid/requestid.go
Normal file
@@ -0,0 +1,114 @@
|
||||
package requestid
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/textproto"
|
||||
|
||||
"go.unistack.org/micro/v4/client"
|
||||
"go.unistack.org/micro/v4/metadata"
|
||||
"go.unistack.org/micro/v4/server"
|
||||
"go.unistack.org/micro/v4/util/id"
|
||||
)
|
||||
|
||||
type XRequestIDKey struct{}
|
||||
|
||||
// DefaultMetadataKey contains metadata key
|
||||
var DefaultMetadataKey = textproto.CanonicalMIMEHeaderKey("x-request-id")
|
||||
|
||||
// DefaultMetadataFunc wil be used if user not provide own func to fill metadata
|
||||
var DefaultMetadataFunc = func(ctx context.Context) (context.Context, error) {
|
||||
var xid string
|
||||
|
||||
cid, cok := ctx.Value(XRequestIDKey{}).(string)
|
||||
if cok && cid != "" {
|
||||
xid = cid
|
||||
}
|
||||
|
||||
imd, iok := metadata.FromIncomingContext(ctx)
|
||||
if !iok || imd == nil {
|
||||
imd = metadata.New(1)
|
||||
ctx = metadata.NewIncomingContext(ctx, imd)
|
||||
}
|
||||
|
||||
omd, ook := metadata.FromOutgoingContext(ctx)
|
||||
if !ook || omd == nil {
|
||||
omd = metadata.New(1)
|
||||
ctx = metadata.NewOutgoingContext(ctx, omd)
|
||||
}
|
||||
|
||||
if xid == "" {
|
||||
var ids []string
|
||||
if ids, iok = imd.Get(DefaultMetadataKey); iok {
|
||||
for i := range ids {
|
||||
if ids[i] != "" {
|
||||
xid = ids[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
if ids, ook = omd.Get(DefaultMetadataKey); ook {
|
||||
for i := range ids {
|
||||
if ids[i] != "" {
|
||||
xid = ids[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if xid == "" {
|
||||
var err error
|
||||
xid, err = id.New()
|
||||
if err != nil {
|
||||
return ctx, err
|
||||
}
|
||||
}
|
||||
|
||||
if !cok {
|
||||
ctx = context.WithValue(ctx, XRequestIDKey{}, xid)
|
||||
}
|
||||
|
||||
if !iok {
|
||||
imd.Set(DefaultMetadataKey, xid)
|
||||
}
|
||||
|
||||
if !ook {
|
||||
omd.Set(DefaultMetadataKey, xid)
|
||||
}
|
||||
|
||||
return ctx, nil
|
||||
}
|
||||
|
||||
type hook struct{}
|
||||
|
||||
func NewHook() *hook {
|
||||
return &hook{}
|
||||
}
|
||||
|
||||
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
|
||||
return func(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||
var err error
|
||||
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
return next(ctx, req, rsp)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *hook) ClientCall(next client.FuncCall) client.FuncCall {
|
||||
return func(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||
var err error
|
||||
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
return next(ctx, req, rsp, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *hook) ClientStream(next client.FuncStream) client.FuncStream {
|
||||
return func(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||
var err error
|
||||
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return next(ctx, req, opts...)
|
||||
}
|
||||
}
|
33
hooks/requestid/requestid_test.go
Normal file
33
hooks/requestid/requestid_test.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package requestid
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"go.unistack.org/micro/v4/metadata"
|
||||
)
|
||||
|
||||
func TestDefaultMetadataFunc(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
|
||||
nctx, err := DefaultMetadataFunc(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
imd, ok := metadata.FromIncomingContext(nctx)
|
||||
if !ok {
|
||||
t.Fatalf("md missing in incoming context")
|
||||
}
|
||||
omd, ok := metadata.FromOutgoingContext(nctx)
|
||||
if !ok {
|
||||
t.Fatalf("md missing in outgoing context")
|
||||
}
|
||||
|
||||
_, iok := imd.Get(DefaultMetadataKey)
|
||||
_, ook := omd.Get(DefaultMetadataKey)
|
||||
|
||||
if !iok || !ook {
|
||||
t.Fatalf("missing metadata key value")
|
||||
}
|
||||
}
|
133
hooks/validator/validator.go
Normal file
133
hooks/validator/validator.go
Normal file
@@ -0,0 +1,133 @@
|
||||
package validator
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.unistack.org/micro/v4/client"
|
||||
"go.unistack.org/micro/v4/errors"
|
||||
"go.unistack.org/micro/v4/server"
|
||||
)
|
||||
|
||||
var (
|
||||
DefaultClientErrorFunc = func(req client.Request, rsp interface{}, err error) error {
|
||||
if rsp != nil {
|
||||
return errors.BadGateway(req.Service(), "%v", err)
|
||||
}
|
||||
return errors.BadRequest(req.Service(), "%v", err)
|
||||
}
|
||||
|
||||
DefaultServerErrorFunc = func(req server.Request, rsp interface{}, err error) error {
|
||||
if rsp != nil {
|
||||
return errors.BadGateway(req.Service(), "%v", err)
|
||||
}
|
||||
return errors.BadRequest(req.Service(), "%v", err)
|
||||
}
|
||||
)
|
||||
|
||||
type (
|
||||
ClientErrorFunc func(client.Request, interface{}, error) error
|
||||
ServerErrorFunc func(server.Request, interface{}, error) error
|
||||
)
|
||||
|
||||
// Options struct holds wrapper options
|
||||
type Options struct {
|
||||
ClientErrorFn ClientErrorFunc
|
||||
ServerErrorFn ServerErrorFunc
|
||||
ClientValidateResponse bool
|
||||
ServerValidateResponse bool
|
||||
}
|
||||
|
||||
// Option func signature
|
||||
type Option func(*Options)
|
||||
|
||||
func ClientValidateResponse(b bool) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientValidateResponse = b
|
||||
}
|
||||
}
|
||||
|
||||
func ServerValidateResponse(b bool) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientValidateResponse = b
|
||||
}
|
||||
}
|
||||
|
||||
func ClientReqErrorFn(fn ClientErrorFunc) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientErrorFn = fn
|
||||
}
|
||||
}
|
||||
|
||||
func ServerErrorFn(fn ServerErrorFunc) Option {
|
||||
return func(o *Options) {
|
||||
o.ServerErrorFn = fn
|
||||
}
|
||||
}
|
||||
|
||||
func NewOptions(opts ...Option) Options {
|
||||
options := Options{
|
||||
ClientErrorFn: DefaultClientErrorFunc,
|
||||
ServerErrorFn: DefaultServerErrorFunc,
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return options
|
||||
}
|
||||
|
||||
func NewHook(opts ...Option) *hook {
|
||||
return &hook{opts: NewOptions(opts...)}
|
||||
}
|
||||
|
||||
type validator interface {
|
||||
Validate() error
|
||||
}
|
||||
|
||||
type hook struct {
|
||||
opts Options
|
||||
}
|
||||
|
||||
func (w *hook) ClientCall(next client.FuncCall) client.FuncCall {
|
||||
return func(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||
if v, ok := req.Body().(validator); ok {
|
||||
if err := v.Validate(); err != nil {
|
||||
return w.opts.ClientErrorFn(req, nil, err)
|
||||
}
|
||||
}
|
||||
err := next(ctx, req, rsp, opts...)
|
||||
if v, ok := rsp.(validator); ok && w.opts.ClientValidateResponse {
|
||||
if verr := v.Validate(); verr != nil {
|
||||
return w.opts.ClientErrorFn(req, rsp, verr)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func (w *hook) ClientStream(next client.FuncStream) client.FuncStream {
|
||||
return func(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||
if v, ok := req.Body().(validator); ok {
|
||||
if err := v.Validate(); err != nil {
|
||||
return nil, w.opts.ClientErrorFn(req, nil, err)
|
||||
}
|
||||
}
|
||||
return next(ctx, req, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
|
||||
return func(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||
if v, ok := req.Body().(validator); ok {
|
||||
if err := v.Validate(); err != nil {
|
||||
return w.opts.ServerErrorFn(req, nil, err)
|
||||
}
|
||||
}
|
||||
err := next(ctx, req, rsp)
|
||||
if v, ok := rsp.(validator); ok && w.opts.ServerValidateResponse {
|
||||
if verr := v.Validate(); verr != nil {
|
||||
return w.opts.ServerErrorFn(req, rsp, verr)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
@@ -99,6 +99,7 @@ func WithAddFields(fields ...interface{}) Option {
|
||||
iv, iok := o.Fields[i].(string)
|
||||
jv, jok := fields[j].(string)
|
||||
if iok && jok && iv == jv {
|
||||
o.Fields[i+1] = fields[j+1]
|
||||
fields = slices.Delete(fields, j, j+2)
|
||||
}
|
||||
}
|
||||
|
@@ -124,7 +124,7 @@ func TestWithDedupKeysWithAddFields(t *testing.T) {
|
||||
|
||||
l.Info(ctx, "msg3")
|
||||
|
||||
if !bytes.Contains(buf.Bytes(), []byte(`msg=msg3 key1=val1 key2=val2`)) {
|
||||
if !bytes.Contains(buf.Bytes(), []byte(`msg=msg3 key1=val4 key2=val3`)) {
|
||||
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
|
||||
}
|
||||
}
|
||||
|
@@ -69,6 +69,15 @@ func (md Metadata) Copy() Metadata {
|
||||
return out
|
||||
}
|
||||
|
||||
// AsMap returns a copy of Metadata with map[string]string.
|
||||
func (md Metadata) AsMap() map[string]string {
|
||||
out := make(map[string]string, len(md))
|
||||
for k, v := range md {
|
||||
out[k] = strings.Join(v, ",")
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// AsHTTP1 returns a copy of Metadata
|
||||
// with CanonicalMIMEHeaderKey.
|
||||
func (md Metadata) AsHTTP1() map[string][]string {
|
||||
|
20
service.go
20
service.go
@@ -99,6 +99,7 @@ type service struct {
|
||||
done chan struct{}
|
||||
opts Options
|
||||
sync.RWMutex
|
||||
stopped bool
|
||||
}
|
||||
|
||||
// NewService creates and returns a new Service based on the packages within.
|
||||
@@ -424,7 +425,7 @@ func (s *service) Stop() error {
|
||||
}
|
||||
}
|
||||
|
||||
close(s.done)
|
||||
s.notifyShutdown()
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -448,10 +449,23 @@ func (s *service) Run() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// wait on context cancel
|
||||
<-s.done
|
||||
|
||||
return s.Stop()
|
||||
return nil
|
||||
}
|
||||
|
||||
// notifyShutdown marks the service as stopped and closes the done channel.
|
||||
// It ensures the channel is closed only once, preventing multiple closures.
|
||||
func (s *service) notifyShutdown() {
|
||||
s.Lock()
|
||||
if s.stopped {
|
||||
s.Unlock()
|
||||
return
|
||||
}
|
||||
s.stopped = true
|
||||
s.Unlock()
|
||||
|
||||
close(s.done)
|
||||
}
|
||||
|
||||
type Namer interface {
|
||||
|
@@ -3,7 +3,9 @@ package micro
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.unistack.org/micro/v4/broker"
|
||||
"go.unistack.org/micro/v4/client"
|
||||
"go.unistack.org/micro/v4/config"
|
||||
@@ -737,3 +739,41 @@ func Test_getNameIndex(t *testing.T) {
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
func TestServiceShutdown(t *testing.T) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
t.Fatalf("service shutdown failed: %v", r)
|
||||
}
|
||||
}()
|
||||
|
||||
s, ok := NewService().(*service)
|
||||
require.NotNil(t, s)
|
||||
require.True(t, ok)
|
||||
|
||||
require.NoError(t, s.Start())
|
||||
require.False(t, s.stopped)
|
||||
|
||||
require.NoError(t, s.Stop())
|
||||
require.True(t, s.stopped)
|
||||
}
|
||||
|
||||
func TestServiceMultipleShutdowns(t *testing.T) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
t.Fatalf("service shutdown failed: %v", r)
|
||||
}
|
||||
}()
|
||||
|
||||
s := NewService()
|
||||
|
||||
go func() {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
// first call
|
||||
require.NoError(t, s.Stop())
|
||||
// duplicate call
|
||||
require.NoError(t, s.Stop())
|
||||
}()
|
||||
|
||||
require.NoError(t, s.Run())
|
||||
}
|
||||
|
@@ -89,6 +89,10 @@ func (s *Span) Tracer() tracer.Tracer {
|
||||
return s.tracer
|
||||
}
|
||||
|
||||
func (s *Span) IsRecording() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
type Event struct {
|
||||
name string
|
||||
labels []interface{}
|
||||
|
@@ -120,6 +120,10 @@ func (s *noopSpan) SetStatus(st SpanStatus, msg string) {
|
||||
s.statusMsg = msg
|
||||
}
|
||||
|
||||
func (s *noopSpan) IsRecording() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// NewTracer returns new memory tracer
|
||||
func NewTracer(opts ...Option) Tracer {
|
||||
return &noopTracer{
|
||||
|
@@ -78,4 +78,6 @@ type Span interface {
|
||||
TraceID() string
|
||||
// SpanID returns span id
|
||||
SpanID() string
|
||||
// IsRecording returns the recording state of the Span.
|
||||
IsRecording() bool
|
||||
}
|
||||
|
Reference in New Issue
Block a user