Compare commits

..

46 Commits

Author SHA1 Message Date
dee7bc9c38 fixed metadata hooks
Some checks failed
lint / lint (pull_request) Failing after 1m43s
test / test (pull_request) Successful in 2m37s
coverage / build (pull_request) Failing after 5m52s
2025-04-27 13:41:05 +03:00
053fe2a69d move hooks 2025-04-27 13:41:05 +03:00
a9ed8b16c1 skip on needed changes
All checks were successful
syncpull / pull (push) Successful in 10s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 13:40:33 +03:00
vtolstov
740cd5931d Apply Code Coverage Badge 2025-04-27 10:39:03 +00:00
85a78063d0 fix panic on shutdown caused by double channel close (#209)
All checks were successful
coverage / build (push) Successful in 2m35s
test / test (push) Successful in 3m43s
2025-04-27 13:37:18 +03:00
604ad9cd9d check sync action
All checks were successful
syncpull / pull (push) Successful in 18s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 13:36:11 +03:00
91137537a2 check sync action
All checks were successful
syncpull / pull (push) Successful in 11s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 13:23:33 +03:00
950e2352fd check sync action
Some checks failed
syncpull / pull (push) Failing after 10s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 13:17:27 +03:00
0bb29b29cf check sync action
Some checks failed
syncpull / pull (push) Failing after 5s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 12:38:02 +03:00
17bcd0b0ab check sync action
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 12:37:17 +03:00
20f9f4da3b check sync action
Some checks failed
syncpull / pull (push) Failing after 8s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 12:34:28 +03:00
66fa04b8dc check sync action
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 12:33:09 +03:00
1ef3ad6531 check sync action
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 12:32:22 +03:00
c95a91349d check sync action
Some checks failed
syncpull / pull (push) Failing after 5s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 12:30:52 +03:00
fdcf8e6ca4 check sync action
Some checks failed
syncpull / pull (push) Failing after 5s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 12:25:27 +03:00
8cb2d9db4a check sync action
Some checks failed
syncpull / pull (push) Failing after 5s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 11:28:50 +03:00
04da4388ac check sync action
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 11:27:26 +03:00
79fb23e644 check sync action
Some checks failed
syncpull / pull (push) Failing after 5s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 11:24:40 +03:00
f8fe923ab1 check sync action
Some checks failed
syncpull / pull (push) Failing after 5s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 11:21:25 +03:00
105f56dbfe check sync action
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 11:20:21 +03:00
9fed5a368b check sync action
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 11:19:58 +03:00
7374d41cf8 check sync action
Some checks failed
syncpull / pull (push) Failing after 6s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 11:15:32 +03:00
a4a8935c1f check sync action
Some checks failed
syncpull / pull (push) Failing after 6s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 11:11:39 +03:00
5f498c8232 check sync action
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 11:10:06 +03:00
a00fdf679b check sync action
Some checks failed
syncpull / pull (push) Failing after 6s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 11:05:22 +03:00
dc9ebe4155 check sync action
Some checks failed
syncpull / pull (push) Failing after 7s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 10:57:51 +03:00
87ced484b7 check sync action
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 10:56:34 +03:00
af99b11a59 check sync action
Some checks failed
syncpull / pull (push) Failing after 7s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 10:52:20 +03:00
2724b51f7c check sync action
Some checks failed
syncpull / pull (push) Failing after 4s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 10:41:22 +03:00
5b5d0e02b9 check sync action
Some checks failed
syncpull / pull (push) Failing after 7s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 10:38:36 +03:00
afc2de6819 check sync action
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 10:37:01 +03:00
32a8ab9c05 check sync action
Some checks failed
syncpull / pull (push) Failing after 5s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 10:34:25 +03:00
vtolstov
7e5401bded Apply Code Coverage Badge 2025-04-27 06:30:32 +00:00
64b91cea06 check sync action
All checks were successful
coverage / build (push) Successful in 1m18s
test / test (push) Successful in 2m5s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 09:29:33 +03:00
vtolstov
0f59fdcbde Apply Code Coverage Badge 2025-04-27 06:29:19 +00:00
50979e6708 check sync action
Some checks failed
coverage / build (push) Has started running
test / test (push) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 09:28:17 +03:00
46f3108870 check sync action
Some checks failed
coverage / build (push) Has been cancelled
test / test (push) Has started running
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 09:27:35 +03:00
vtolstov
5fed91a65f Apply Code Coverage Badge 2025-04-27 06:25:16 +00:00
1c5bba908d check sync action
All checks were successful
coverage / build (push) Successful in 1m14s
test / test (push) Successful in 2m3s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 09:24:41 +03:00
vtolstov
bc8ebdcad5 Apply Code Coverage Badge 2025-04-24 11:55:11 +00:00
fc24f3af92 metadata: add AsMap func
All checks were successful
coverage / build (push) Successful in 1m18s
test / test (push) Successful in 2m3s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-24 14:54:37 +03:00
1058177d1c Delete SECURITY.md 2025-04-22 15:54:19 +03:00
vtolstov
fa53fac085 Apply Code Coverage Badge 2025-04-13 21:03:53 +00:00
8c060df5e3 tracer: add IsRecording to span interface
All checks were successful
coverage / build (push) Successful in 2m0s
test / test (push) Successful in 4m29s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-14 00:02:45 +03:00
e1f8c62685 broker: add SetPublishOption
All checks were successful
coverage / build (push) Successful in 1m33s
test / test (push) Successful in 2m15s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-03-07 18:21:57 +03:00
562b1ab9b7 broker: simplify handler check
All checks were successful
coverage / build (push) Successful in 1m19s
test / test (push) Successful in 2m5s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-03-07 15:26:20 +03:00
22 changed files with 630 additions and 120 deletions

View File

@@ -3,6 +3,9 @@ name: coverage
on:
push:
branches: [ main, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
pull_request:
branches: [ main, v3, v4 ]
# Allows you to run this workflow manually from the Actions tab

View File

@@ -3,10 +3,10 @@ name: lint
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
branches: [ master, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
jobs:
lint:

View File

@@ -0,0 +1,54 @@
name: syncpull
on:
schedule:
- cron: '* * * * *'
# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:
jobs:
pull:
runs-on: ubuntu-latest
steps:
- name: init
run: |
git config --global user.email "vtolstov <vtolstov@users.noreply.github.com>"
git config --global user.name "github-actions[bot]"
echo "machine git.unistack.org login vtolstov password ${{ secrets.TOKEN_GITEA }}" | tee -a /root/.netrc
echo "machine github.com login vtolstov password ${{ secrets.TOKEN_GITHUB }}" | tee -a /root/.netrc
- name: track master
run: |
git clone --depth=10 --branch master --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo
git remote add --no-tags --fetch --track master upstream https://github.com/${GITHUB_REPOSITORY}
git pull --rebase upstream master
git push upstream master --progress
git merge --allow-unrelated-histories "upstream/master"
git push origin master --progress
cd ../
rm -rf repo
- name: track v3
run: |
git clone --depth=10 --branch v3 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo
git remote add --no-tags --fetch --track v3 upstream https://github.com/${GITHUB_REPOSITORY}
git pull --rebase upstream v3
git push upstream v3
git merge --allow-unrelated-histories "upstream/v3"
git push origin v3 --progress
cd ../
rm -rf repo
- name: track v4
run: |
git clone --depth=10 --branch v4 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo
git remote add --no-tags --fetch --track v4 upstream https://github.com/${GITHUB_REPOSITORY}
git pull --rebase upstream v4
git push upstream v4
git merge --allow-unrelated-histories "upstream/v4"
git push origin v4 --progress
cd ../
rm -rf repo

View File

@@ -3,15 +3,12 @@ name: test
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
branches: [ master, v3, v4 ]
push:
branches:
- master
- v3
- v4
branches: [ master, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
jobs:
test:

View File

@@ -3,15 +3,12 @@ name: test
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
branches: [ master, v3, v4 ]
push:
branches:
- master
- v3
- v4
branches: [ master, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
jobs:
test:

View File

@@ -1,5 +1,5 @@
# Micro
![Coverage](https://img.shields.io/badge/Coverage-44.2%25-yellow)
![Coverage](https://img.shields.io/badge/Coverage-46.4%25-yellow)
[![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![Doc](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/go.unistack.org/micro/v4?tab=overview)
[![Status](https://git.unistack.org/unistack-org/micro/actions/workflows/job_tests.yml/badge.svg?branch=v4)](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av4+event%3Apush)

View File

@@ -1,15 +0,0 @@
# Security Policy
## Supported Versions
Use this section to tell people about which versions of your project are
currently being supported with security updates.
| Version | Supported |
| ------- | ------------------ |
| 3.7.x | :white_check_mark: |
| < 3.7.0 | :x: |
## Reporting a Vulnerability
If you find any issue, please create github issue in this repo

View File

@@ -21,7 +21,7 @@ var (
// ErrInvalidMessage returns when invalid Message passed
ErrInvalidMessage = errors.New("invalid message")
// ErrInvalidHandler returns when subscriber passed to Subscribe
ErrInvalidHandler = errors.New("invalid handler")
ErrInvalidHandler = errors.New("invalid handler, ony func(Message) error and func([]Message) error supported")
// DefaultGracefulTimeout
DefaultGracefulTimeout = 5 * time.Second
)

View File

@@ -42,6 +42,16 @@ func SetSubscribeOption(k, v interface{}) SubscribeOption {
}
}
// SetPublishOption returns a function to setup a context with given value
func SetPublishOption(k, v interface{}) PublishOption {
return func(o *PublishOptions) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, k, v)
}
}
// SetOption returns a function to setup a context with given value
func SetOption(k, v interface{}) Option {
return func(o *Options) {

View File

@@ -79,11 +79,15 @@ type PublishOptions struct {
// BodyOnly flag says the message contains raw body bytes and don't need
// codec Marshal method
BodyOnly bool
// Context holds custom options
Context context.Context
}
// NewPublishOptions creates PublishOptions struct
func NewPublishOptions(opts ...PublishOption) PublishOptions {
options := PublishOptions{}
options := PublishOptions{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}

View File

@@ -1,87 +1,14 @@
package broker
import (
"fmt"
"reflect"
"unicode"
"unicode/utf8"
)
const (
messageSig = "func(broker.Message) error"
messagesSig = "func([]broker.Message) error"
)
// Precompute the reflect type for error. Can't use error directly
// because Typeof takes an empty interface value. This is annoying.
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
// Is this an exported - upper case - name?
func isExported(name string) bool {
r, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(r)
}
// Is this type exported or a builtin?
func isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
return isExported(t.Name()) || t.PkgPath() == ""
}
// IsValidHandler func signature
func IsValidHandler(sub interface{}) error {
typ := reflect.TypeOf(sub)
var argType reflect.Type
switch typ.Kind() {
case reflect.Func:
name := "Func"
switch typ.NumIn() {
case 1:
argType = typ.In(0)
default:
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), messageSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
}
if typ.NumOut() != 1 {
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s",
name, typ.NumOut(), messageSig)
}
if returnType := typ.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
}
switch sub.(type) {
default:
hdlr := reflect.ValueOf(sub)
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
switch method.Type.NumIn() {
case 3:
argType = method.Type.In(2)
default:
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s",
name, method.Name, method.Type.NumIn(), messageSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("%v argument type not exported: %v", name, argType)
}
if method.Type.NumOut() != 1 {
return fmt.Errorf(
"subscriber %v.%v has wrong number of return values: %v require signature %s",
name, method.Name, method.Type.NumOut(), messageSig)
}
if returnType := method.Type.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
}
}
return ErrInvalidHandler
case func(Message) error:
break
case func([]Message) error:
break
}
return nil
}

117
hooks/metadata/metadata.go Normal file
View File

@@ -0,0 +1,117 @@
package metadata
import (
"context"
"go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/server"
)
type wrapper struct {
keys []string
client.Client
}
func NewClientWrapper(keys ...string) client.Wrapper {
return func(c client.Client) client.Client {
handler := &wrapper{
Client: c,
keys: keys,
}
return handler
}
}
func NewClientCallWrapper(keys ...string) client.CallWrapper {
return func(fn client.CallFunc) client.CallFunc {
return func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
if keys == nil {
return fn(ctx, addr, req, rsp, opts)
}
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
omd, ook := metadata.FromOutgoingContext(ctx)
if !ook || omd == nil {
omd = metadata.New(len(imd))
}
for _, k := range keys {
if v, ok := imd.Get(k); ok {
omd.Add(k, v...)
}
}
if !ook {
ctx = metadata.NewOutgoingContext(ctx, omd)
}
}
return fn(ctx, addr, req, rsp, opts)
}
}
}
func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
if w.keys == nil {
return w.Client.Call(ctx, req, rsp, opts...)
}
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
omd, ook := metadata.FromOutgoingContext(ctx)
if !ook || omd == nil {
omd = metadata.New(len(imd))
}
for _, k := range w.keys {
if v, ok := imd.Get(k); ok {
omd.Add(k, v...)
}
}
if !ook {
ctx = metadata.NewOutgoingContext(ctx, omd)
}
}
return w.Client.Call(ctx, req, rsp, opts...)
}
func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
if w.keys == nil {
return w.Client.Stream(ctx, req, opts...)
}
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
omd, ook := metadata.FromOutgoingContext(ctx)
if !ook || omd == nil {
omd = metadata.New(len(imd))
}
for _, k := range w.keys {
if v, ok := imd.Get(k); ok {
omd.Add(k, v...)
}
}
if !ook {
ctx = metadata.NewOutgoingContext(ctx, omd)
}
}
return w.Client.Stream(ctx, req, opts...)
}
func NewServerHandlerWrapper(keys ...string) server.HandlerWrapper {
return func(fn server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
if keys == nil {
return fn(ctx, req, rsp)
}
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
omd, ook := metadata.FromOutgoingContext(ctx)
if !ook || omd == nil {
omd = metadata.New(len(imd))
}
for _, k := range keys {
if v, ok := imd.Get(k); ok {
omd.Add(k, v...)
}
}
if !ook {
ctx = metadata.NewOutgoingContext(ctx, omd)
}
}
return fn(ctx, req, rsp)
}
}
}

View File

@@ -0,0 +1,63 @@
package recovery
import (
"context"
"fmt"
"go.unistack.org/micro/v4/errors"
"go.unistack.org/micro/v4/server"
)
func NewOptions(opts ...Option) Options {
options := Options{
ServerHandlerFn: DefaultServerHandlerFn,
}
for _, o := range opts {
o(&options)
}
return options
}
type Options struct {
ServerHandlerFn func(context.Context, server.Request, interface{}, error) error
}
type Option func(*Options)
func ServerHandlerFunc(fn func(context.Context, server.Request, interface{}, error) error) Option {
return func(o *Options) {
o.ServerHandlerFn = fn
}
}
var DefaultServerHandlerFn = func(ctx context.Context, req server.Request, rsp interface{}, err error) error {
return errors.BadRequest("", "%v", err)
}
var Hook = NewHook()
type hook struct {
opts Options
}
func NewHook(opts ...Option) *hook {
return &hook{opts: NewOptions(opts...)}
}
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
return func(ctx context.Context, req server.Request, rsp interface{}) (err error) {
defer func() {
r := recover()
switch verr := r.(type) {
case nil:
return
case error:
err = w.opts.ServerHandlerFn(ctx, req, rsp, verr)
default:
err = w.opts.ServerHandlerFn(ctx, req, rsp, fmt.Errorf("%v", r))
}
}()
err = next(ctx, req, rsp)
return err
}
}

View File

@@ -0,0 +1,114 @@
package requestid
import (
"context"
"net/textproto"
"go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/server"
"go.unistack.org/micro/v4/util/id"
)
type XRequestIDKey struct{}
// DefaultMetadataKey contains metadata key
var DefaultMetadataKey = textproto.CanonicalMIMEHeaderKey("x-request-id")
// DefaultMetadataFunc wil be used if user not provide own func to fill metadata
var DefaultMetadataFunc = func(ctx context.Context) (context.Context, error) {
var xid string
cid, cok := ctx.Value(XRequestIDKey{}).(string)
if cok && cid != "" {
xid = cid
}
imd, iok := metadata.FromIncomingContext(ctx)
if !iok || imd == nil {
imd = metadata.New(1)
ctx = metadata.NewIncomingContext(ctx, imd)
}
omd, ook := metadata.FromOutgoingContext(ctx)
if !ook || omd == nil {
omd = metadata.New(1)
ctx = metadata.NewOutgoingContext(ctx, omd)
}
if xid == "" {
var ids []string
if ids, iok = imd.Get(DefaultMetadataKey); iok {
for i := range ids {
if ids[i] != "" {
xid = ids[i]
}
}
}
if ids, ook = omd.Get(DefaultMetadataKey); ook {
for i := range ids {
if ids[i] != "" {
xid = ids[i]
}
}
}
}
if xid == "" {
var err error
xid, err = id.New()
if err != nil {
return ctx, err
}
}
if !cok {
ctx = context.WithValue(ctx, XRequestIDKey{}, xid)
}
if !iok {
imd.Set(DefaultMetadataKey, xid)
}
if !ook {
omd.Set(DefaultMetadataKey, xid)
}
return ctx, nil
}
type hook struct{}
func NewHook() *hook {
return &hook{}
}
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
var err error
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
return err
}
return next(ctx, req, rsp)
}
}
func (w *hook) ClientCall(next client.FuncCall) client.FuncCall {
return func(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
var err error
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
return err
}
return next(ctx, req, rsp, opts...)
}
}
func (w *hook) ClientStream(next client.FuncStream) client.FuncStream {
return func(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
var err error
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
return nil, err
}
return next(ctx, req, opts...)
}
}

View File

@@ -0,0 +1,33 @@
package requestid
import (
"context"
"testing"
"go.unistack.org/micro/v4/metadata"
)
func TestDefaultMetadataFunc(t *testing.T) {
ctx := context.TODO()
nctx, err := DefaultMetadataFunc(ctx)
if err != nil {
t.Fatalf("%v", err)
}
imd, ok := metadata.FromIncomingContext(nctx)
if !ok {
t.Fatalf("md missing in incoming context")
}
omd, ok := metadata.FromOutgoingContext(nctx)
if !ok {
t.Fatalf("md missing in outgoing context")
}
_, iok := imd.Get(DefaultMetadataKey)
_, ook := omd.Get(DefaultMetadataKey)
if !iok || !ook {
t.Fatalf("missing metadata key value")
}
}

View File

@@ -0,0 +1,133 @@
package validator
import (
"context"
"go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/errors"
"go.unistack.org/micro/v4/server"
)
var (
DefaultClientErrorFunc = func(req client.Request, rsp interface{}, err error) error {
if rsp != nil {
return errors.BadGateway(req.Service(), "%v", err)
}
return errors.BadRequest(req.Service(), "%v", err)
}
DefaultServerErrorFunc = func(req server.Request, rsp interface{}, err error) error {
if rsp != nil {
return errors.BadGateway(req.Service(), "%v", err)
}
return errors.BadRequest(req.Service(), "%v", err)
}
)
type (
ClientErrorFunc func(client.Request, interface{}, error) error
ServerErrorFunc func(server.Request, interface{}, error) error
)
// Options struct holds wrapper options
type Options struct {
ClientErrorFn ClientErrorFunc
ServerErrorFn ServerErrorFunc
ClientValidateResponse bool
ServerValidateResponse bool
}
// Option func signature
type Option func(*Options)
func ClientValidateResponse(b bool) Option {
return func(o *Options) {
o.ClientValidateResponse = b
}
}
func ServerValidateResponse(b bool) Option {
return func(o *Options) {
o.ClientValidateResponse = b
}
}
func ClientReqErrorFn(fn ClientErrorFunc) Option {
return func(o *Options) {
o.ClientErrorFn = fn
}
}
func ServerErrorFn(fn ServerErrorFunc) Option {
return func(o *Options) {
o.ServerErrorFn = fn
}
}
func NewOptions(opts ...Option) Options {
options := Options{
ClientErrorFn: DefaultClientErrorFunc,
ServerErrorFn: DefaultServerErrorFunc,
}
for _, o := range opts {
o(&options)
}
return options
}
func NewHook(opts ...Option) *hook {
return &hook{opts: NewOptions(opts...)}
}
type validator interface {
Validate() error
}
type hook struct {
opts Options
}
func (w *hook) ClientCall(next client.FuncCall) client.FuncCall {
return func(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
if v, ok := req.Body().(validator); ok {
if err := v.Validate(); err != nil {
return w.opts.ClientErrorFn(req, nil, err)
}
}
err := next(ctx, req, rsp, opts...)
if v, ok := rsp.(validator); ok && w.opts.ClientValidateResponse {
if verr := v.Validate(); verr != nil {
return w.opts.ClientErrorFn(req, rsp, verr)
}
}
return err
}
}
func (w *hook) ClientStream(next client.FuncStream) client.FuncStream {
return func(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
if v, ok := req.Body().(validator); ok {
if err := v.Validate(); err != nil {
return nil, w.opts.ClientErrorFn(req, nil, err)
}
}
return next(ctx, req, opts...)
}
}
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
if v, ok := req.Body().(validator); ok {
if err := v.Validate(); err != nil {
return w.opts.ServerErrorFn(req, nil, err)
}
}
err := next(ctx, req, rsp)
if v, ok := rsp.(validator); ok && w.opts.ServerValidateResponse {
if verr := v.Validate(); verr != nil {
return w.opts.ServerErrorFn(req, rsp, verr)
}
}
return err
}
}

View File

@@ -69,6 +69,15 @@ func (md Metadata) Copy() Metadata {
return out
}
// AsMap returns a copy of Metadata with map[string]string.
func (md Metadata) AsMap() map[string]string {
out := make(map[string]string, len(md))
for k, v := range md {
out[k] = strings.Join(v, ",")
}
return out
}
// AsHTTP1 returns a copy of Metadata
// with CanonicalMIMEHeaderKey.
func (md Metadata) AsHTTP1() map[string][]string {

View File

@@ -99,6 +99,7 @@ type service struct {
done chan struct{}
opts Options
sync.RWMutex
stopped bool
}
// NewService creates and returns a new Service based on the packages within.
@@ -424,7 +425,7 @@ func (s *service) Stop() error {
}
}
close(s.done)
s.notifyShutdown()
return nil
}
@@ -448,10 +449,23 @@ func (s *service) Run() error {
return err
}
// wait on context cancel
<-s.done
return s.Stop()
return nil
}
// notifyShutdown marks the service as stopped and closes the done channel.
// It ensures the channel is closed only once, preventing multiple closures.
func (s *service) notifyShutdown() {
s.Lock()
if s.stopped {
s.Unlock()
return
}
s.stopped = true
s.Unlock()
close(s.done)
}
type Namer interface {

View File

@@ -3,7 +3,9 @@ package micro
import (
"reflect"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/config"
@@ -737,3 +739,41 @@ func Test_getNameIndex(t *testing.T) {
}
}
*/
func TestServiceShutdown(t *testing.T) {
defer func() {
if r := recover(); r != nil {
t.Fatalf("service shutdown failed: %v", r)
}
}()
s, ok := NewService().(*service)
require.NotNil(t, s)
require.True(t, ok)
require.NoError(t, s.Start())
require.False(t, s.stopped)
require.NoError(t, s.Stop())
require.True(t, s.stopped)
}
func TestServiceMultipleShutdowns(t *testing.T) {
defer func() {
if r := recover(); r != nil {
t.Fatalf("service shutdown failed: %v", r)
}
}()
s := NewService()
go func() {
time.Sleep(10 * time.Millisecond)
// first call
require.NoError(t, s.Stop())
// duplicate call
require.NoError(t, s.Stop())
}()
require.NoError(t, s.Run())
}

View File

@@ -89,6 +89,10 @@ func (s *Span) Tracer() tracer.Tracer {
return s.tracer
}
func (s *Span) IsRecording() bool {
return true
}
type Event struct {
name string
labels []interface{}

View File

@@ -120,6 +120,10 @@ func (s *noopSpan) SetStatus(st SpanStatus, msg string) {
s.statusMsg = msg
}
func (s *noopSpan) IsRecording() bool {
return false
}
// NewTracer returns new memory tracer
func NewTracer(opts ...Option) Tracer {
return &noopTracer{

View File

@@ -78,4 +78,6 @@ type Span interface {
TraceID() string
// SpanID returns span id
SpanID() string
// IsRecording returns the recording state of the Span.
IsRecording() bool
}