Compare commits
10 Commits
v4.1.1
...
b6d2d459c5
| Author | SHA1 | Date | |
|---|---|---|---|
| b6d2d459c5 | |||
| c2d5dd4ffd | |||
|
|
fa53fac085 | ||
| 8c060df5e3 | |||
| e1f8c62685 | |||
| 562b1ab9b7 | |||
|
|
f3c877a37b | ||
| 0999b2ad78 | |||
| a365513177 | |||
|
|
d1e3f3cab2 |
@@ -1,5 +1,5 @@
|
|||||||
# Micro
|
# Micro
|
||||||

|

|
||||||
[](https://opensource.org/licenses/Apache-2.0)
|
[](https://opensource.org/licenses/Apache-2.0)
|
||||||
[](https://pkg.go.dev/go.unistack.org/micro/v4?tab=overview)
|
[](https://pkg.go.dev/go.unistack.org/micro/v4?tab=overview)
|
||||||
[](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av4+event%3Apush)
|
[](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av4+event%3Apush)
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ var (
|
|||||||
// ErrInvalidMessage returns when invalid Message passed
|
// ErrInvalidMessage returns when invalid Message passed
|
||||||
ErrInvalidMessage = errors.New("invalid message")
|
ErrInvalidMessage = errors.New("invalid message")
|
||||||
// ErrInvalidHandler returns when subscriber passed to Subscribe
|
// ErrInvalidHandler returns when subscriber passed to Subscribe
|
||||||
ErrInvalidHandler = errors.New("invalid handler")
|
ErrInvalidHandler = errors.New("invalid handler, ony func(Message) error and func([]Message) error supported")
|
||||||
// DefaultGracefulTimeout
|
// DefaultGracefulTimeout
|
||||||
DefaultGracefulTimeout = 5 * time.Second
|
DefaultGracefulTimeout = 5 * time.Second
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -42,6 +42,16 @@ func SetSubscribeOption(k, v interface{}) SubscribeOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetPublishOption returns a function to setup a context with given value
|
||||||
|
func SetPublishOption(k, v interface{}) PublishOption {
|
||||||
|
return func(o *PublishOptions) {
|
||||||
|
if o.Context == nil {
|
||||||
|
o.Context = context.Background()
|
||||||
|
}
|
||||||
|
o.Context = context.WithValue(o.Context, k, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// SetOption returns a function to setup a context with given value
|
// SetOption returns a function to setup a context with given value
|
||||||
func SetOption(k, v interface{}) Option {
|
func SetOption(k, v interface{}) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
|
|||||||
@@ -79,11 +79,15 @@ type PublishOptions struct {
|
|||||||
// BodyOnly flag says the message contains raw body bytes and don't need
|
// BodyOnly flag says the message contains raw body bytes and don't need
|
||||||
// codec Marshal method
|
// codec Marshal method
|
||||||
BodyOnly bool
|
BodyOnly bool
|
||||||
|
// Context holds custom options
|
||||||
|
Context context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPublishOptions creates PublishOptions struct
|
// NewPublishOptions creates PublishOptions struct
|
||||||
func NewPublishOptions(opts ...PublishOption) PublishOptions {
|
func NewPublishOptions(opts ...PublishOption) PublishOptions {
|
||||||
options := PublishOptions{}
|
options := PublishOptions{
|
||||||
|
Context: context.Background(),
|
||||||
|
}
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&options)
|
o(&options)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,87 +1,14 @@
|
|||||||
package broker
|
package broker
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"reflect"
|
|
||||||
"unicode"
|
|
||||||
"unicode/utf8"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
messageSig = "func(broker.Message) error"
|
|
||||||
messagesSig = "func([]broker.Message) error"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Precompute the reflect type for error. Can't use error directly
|
|
||||||
// because Typeof takes an empty interface value. This is annoying.
|
|
||||||
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
|
|
||||||
|
|
||||||
// Is this an exported - upper case - name?
|
|
||||||
func isExported(name string) bool {
|
|
||||||
r, _ := utf8.DecodeRuneInString(name)
|
|
||||||
return unicode.IsUpper(r)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Is this type exported or a builtin?
|
|
||||||
func isExportedOrBuiltinType(t reflect.Type) bool {
|
|
||||||
for t.Kind() == reflect.Ptr {
|
|
||||||
t = t.Elem()
|
|
||||||
}
|
|
||||||
// PkgPath will be non-empty even for an exported type,
|
|
||||||
// so we need to check the type name as well.
|
|
||||||
return isExported(t.Name()) || t.PkgPath() == ""
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsValidHandler func signature
|
// IsValidHandler func signature
|
||||||
func IsValidHandler(sub interface{}) error {
|
func IsValidHandler(sub interface{}) error {
|
||||||
typ := reflect.TypeOf(sub)
|
switch sub.(type) {
|
||||||
var argType reflect.Type
|
|
||||||
switch typ.Kind() {
|
|
||||||
case reflect.Func:
|
|
||||||
name := "Func"
|
|
||||||
switch typ.NumIn() {
|
|
||||||
case 1:
|
|
||||||
argType = typ.In(0)
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), messageSig)
|
|
||||||
}
|
|
||||||
if !isExportedOrBuiltinType(argType) {
|
|
||||||
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
|
|
||||||
}
|
|
||||||
if typ.NumOut() != 1 {
|
|
||||||
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s",
|
|
||||||
name, typ.NumOut(), messageSig)
|
|
||||||
}
|
|
||||||
if returnType := typ.Out(0); returnType != typeOfError {
|
|
||||||
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
|
|
||||||
}
|
|
||||||
default:
|
default:
|
||||||
hdlr := reflect.ValueOf(sub)
|
return ErrInvalidHandler
|
||||||
name := reflect.Indirect(hdlr).Type().Name()
|
case func(Message) error:
|
||||||
|
break
|
||||||
for m := 0; m < typ.NumMethod(); m++ {
|
case func([]Message) error:
|
||||||
method := typ.Method(m)
|
break
|
||||||
switch method.Type.NumIn() {
|
|
||||||
case 3:
|
|
||||||
argType = method.Type.In(2)
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s",
|
|
||||||
name, method.Name, method.Type.NumIn(), messageSig)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !isExportedOrBuiltinType(argType) {
|
|
||||||
return fmt.Errorf("%v argument type not exported: %v", name, argType)
|
|
||||||
}
|
|
||||||
if method.Type.NumOut() != 1 {
|
|
||||||
return fmt.Errorf(
|
|
||||||
"subscriber %v.%v has wrong number of return values: %v require signature %s",
|
|
||||||
name, method.Name, method.Type.NumOut(), messageSig)
|
|
||||||
}
|
|
||||||
if returnType := method.Type.Out(0); returnType != typeOfError {
|
|
||||||
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
117
hooks/metadata/metadata.go
Normal file
117
hooks/metadata/metadata.go
Normal file
@@ -0,0 +1,117 @@
|
|||||||
|
package metadata
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"go.unistack.org/micro/v4/client"
|
||||||
|
"go.unistack.org/micro/v4/metadata"
|
||||||
|
"go.unistack.org/micro/v4/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
type wrapper struct {
|
||||||
|
keys []string
|
||||||
|
|
||||||
|
client.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewClientWrapper(keys ...string) client.Wrapper {
|
||||||
|
return func(c client.Client) client.Client {
|
||||||
|
handler := &wrapper{
|
||||||
|
Client: c,
|
||||||
|
keys: keys,
|
||||||
|
}
|
||||||
|
return handler
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewClientCallWrapper(keys ...string) client.CallWrapper {
|
||||||
|
return func(fn client.CallFunc) client.CallFunc {
|
||||||
|
return func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
||||||
|
if keys == nil {
|
||||||
|
return fn(ctx, addr, req, rsp, opts)
|
||||||
|
}
|
||||||
|
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
|
||||||
|
omd, ook := metadata.FromOutgoingContext(ctx)
|
||||||
|
if !ook || omd == nil {
|
||||||
|
omd = metadata.New(len(imd))
|
||||||
|
}
|
||||||
|
for _, k := range keys {
|
||||||
|
if v, ok := imd.Get(k); ok {
|
||||||
|
omd.Add(k, v...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !ook {
|
||||||
|
ctx = metadata.NewOutgoingContext(ctx, omd)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fn(ctx, addr, req, rsp, opts)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||||
|
if w.keys == nil {
|
||||||
|
return w.Client.Call(ctx, req, rsp, opts...)
|
||||||
|
}
|
||||||
|
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
|
||||||
|
omd, ook := metadata.FromOutgoingContext(ctx)
|
||||||
|
if !ook || omd == nil {
|
||||||
|
omd = metadata.New(len(imd))
|
||||||
|
}
|
||||||
|
for _, k := range w.keys {
|
||||||
|
if v, ok := imd.Get(k); ok {
|
||||||
|
omd.Add(k, v...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !ook {
|
||||||
|
ctx = metadata.NewOutgoingContext(ctx, omd)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return w.Client.Call(ctx, req, rsp, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||||
|
if w.keys == nil {
|
||||||
|
return w.Client.Stream(ctx, req, opts...)
|
||||||
|
}
|
||||||
|
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
|
||||||
|
omd, ook := metadata.FromOutgoingContext(ctx)
|
||||||
|
if !ook || omd == nil {
|
||||||
|
omd = metadata.New(len(imd))
|
||||||
|
}
|
||||||
|
for _, k := range w.keys {
|
||||||
|
if v, ok := imd.Get(k); ok {
|
||||||
|
omd.Add(k, v...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !ook {
|
||||||
|
ctx = metadata.NewOutgoingContext(ctx, omd)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return w.Client.Stream(ctx, req, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewServerHandlerWrapper(keys ...string) server.HandlerWrapper {
|
||||||
|
return func(fn server.HandlerFunc) server.HandlerFunc {
|
||||||
|
return func(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||||
|
if keys == nil {
|
||||||
|
return fn(ctx, req, rsp)
|
||||||
|
}
|
||||||
|
if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
|
||||||
|
omd, ook := metadata.FromOutgoingContext(ctx)
|
||||||
|
if !ook || omd == nil {
|
||||||
|
omd = metadata.New(len(imd))
|
||||||
|
}
|
||||||
|
for _, k := range keys {
|
||||||
|
if v, ok := imd.Get(k); ok {
|
||||||
|
omd.Add(k, v...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !ook {
|
||||||
|
ctx = metadata.NewOutgoingContext(ctx, omd)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fn(ctx, req, rsp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
63
hooks/recovery/recovery.go
Normal file
63
hooks/recovery/recovery.go
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
package recovery
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"go.unistack.org/micro/v4/errors"
|
||||||
|
"go.unistack.org/micro/v4/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewOptions(opts ...Option) Options {
|
||||||
|
options := Options{
|
||||||
|
ServerHandlerFn: DefaultServerHandlerFn,
|
||||||
|
}
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&options)
|
||||||
|
}
|
||||||
|
return options
|
||||||
|
}
|
||||||
|
|
||||||
|
type Options struct {
|
||||||
|
ServerHandlerFn func(context.Context, server.Request, interface{}, error) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type Option func(*Options)
|
||||||
|
|
||||||
|
func ServerHandlerFunc(fn func(context.Context, server.Request, interface{}, error) error) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.ServerHandlerFn = fn
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var DefaultServerHandlerFn = func(ctx context.Context, req server.Request, rsp interface{}, err error) error {
|
||||||
|
return errors.BadRequest("", "%v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var Hook = NewHook()
|
||||||
|
|
||||||
|
type hook struct {
|
||||||
|
opts Options
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewHook(opts ...Option) *hook {
|
||||||
|
return &hook{opts: NewOptions(opts...)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
|
||||||
|
return func(ctx context.Context, req server.Request, rsp interface{}) (err error) {
|
||||||
|
defer func() {
|
||||||
|
r := recover()
|
||||||
|
switch verr := r.(type) {
|
||||||
|
case nil:
|
||||||
|
return
|
||||||
|
case error:
|
||||||
|
err = w.opts.ServerHandlerFn(ctx, req, rsp, verr)
|
||||||
|
default:
|
||||||
|
err = w.opts.ServerHandlerFn(ctx, req, rsp, fmt.Errorf("%v", r))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
err = next(ctx, req, rsp)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
114
hooks/requestid/requestid.go
Normal file
114
hooks/requestid/requestid.go
Normal file
@@ -0,0 +1,114 @@
|
|||||||
|
package requestid
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net/textproto"
|
||||||
|
|
||||||
|
"go.unistack.org/micro/v4/client"
|
||||||
|
"go.unistack.org/micro/v4/metadata"
|
||||||
|
"go.unistack.org/micro/v4/server"
|
||||||
|
"go.unistack.org/micro/v4/util/id"
|
||||||
|
)
|
||||||
|
|
||||||
|
type XRequestIDKey struct{}
|
||||||
|
|
||||||
|
// DefaultMetadataKey contains metadata key
|
||||||
|
var DefaultMetadataKey = textproto.CanonicalMIMEHeaderKey("x-request-id")
|
||||||
|
|
||||||
|
// DefaultMetadataFunc wil be used if user not provide own func to fill metadata
|
||||||
|
var DefaultMetadataFunc = func(ctx context.Context) (context.Context, error) {
|
||||||
|
var xid string
|
||||||
|
|
||||||
|
cid, cok := ctx.Value(XRequestIDKey{}).(string)
|
||||||
|
if cok && cid != "" {
|
||||||
|
xid = cid
|
||||||
|
}
|
||||||
|
|
||||||
|
imd, iok := metadata.FromIncomingContext(ctx)
|
||||||
|
if !iok || imd == nil {
|
||||||
|
imd = metadata.New(1)
|
||||||
|
ctx = metadata.NewIncomingContext(ctx, imd)
|
||||||
|
}
|
||||||
|
|
||||||
|
omd, ook := metadata.FromOutgoingContext(ctx)
|
||||||
|
if !ook || omd == nil {
|
||||||
|
omd = metadata.New(1)
|
||||||
|
ctx = metadata.NewOutgoingContext(ctx, omd)
|
||||||
|
}
|
||||||
|
|
||||||
|
if xid == "" {
|
||||||
|
var ids []string
|
||||||
|
if ids, iok = imd.Get(DefaultMetadataKey); iok {
|
||||||
|
for i := range ids {
|
||||||
|
if ids[i] != "" {
|
||||||
|
xid = ids[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ids, ook = omd.Get(DefaultMetadataKey); ook {
|
||||||
|
for i := range ids {
|
||||||
|
if ids[i] != "" {
|
||||||
|
xid = ids[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if xid == "" {
|
||||||
|
var err error
|
||||||
|
xid, err = id.New()
|
||||||
|
if err != nil {
|
||||||
|
return ctx, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !cok {
|
||||||
|
ctx = context.WithValue(ctx, XRequestIDKey{}, xid)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !iok {
|
||||||
|
imd.Set(DefaultMetadataKey, xid)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ook {
|
||||||
|
omd.Set(DefaultMetadataKey, xid)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type hook struct{}
|
||||||
|
|
||||||
|
func NewHook() *hook {
|
||||||
|
return &hook{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
|
||||||
|
return func(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||||
|
var err error
|
||||||
|
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return next(ctx, req, rsp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *hook) ClientCall(next client.FuncCall) client.FuncCall {
|
||||||
|
return func(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||||
|
var err error
|
||||||
|
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return next(ctx, req, rsp, opts...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *hook) ClientStream(next client.FuncStream) client.FuncStream {
|
||||||
|
return func(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||||
|
var err error
|
||||||
|
if ctx, err = DefaultMetadataFunc(ctx); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return next(ctx, req, opts...)
|
||||||
|
}
|
||||||
|
}
|
||||||
33
hooks/requestid/requestid_test.go
Normal file
33
hooks/requestid/requestid_test.go
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
package requestid
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"go.unistack.org/micro/v4/metadata"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestDefaultMetadataFunc(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
|
|
||||||
|
nctx, err := DefaultMetadataFunc(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
imd, ok := metadata.FromIncomingContext(nctx)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("md missing in incoming context")
|
||||||
|
}
|
||||||
|
omd, ok := metadata.FromOutgoingContext(nctx)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("md missing in outgoing context")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, iok := imd.Get(DefaultMetadataKey)
|
||||||
|
_, ook := omd.Get(DefaultMetadataKey)
|
||||||
|
|
||||||
|
if !iok || !ook {
|
||||||
|
t.Fatalf("missing metadata key value")
|
||||||
|
}
|
||||||
|
}
|
||||||
133
hooks/validator/validator.go
Normal file
133
hooks/validator/validator.go
Normal file
@@ -0,0 +1,133 @@
|
|||||||
|
package validator
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"go.unistack.org/micro/v4/client"
|
||||||
|
"go.unistack.org/micro/v4/errors"
|
||||||
|
"go.unistack.org/micro/v4/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
DefaultClientErrorFunc = func(req client.Request, rsp interface{}, err error) error {
|
||||||
|
if rsp != nil {
|
||||||
|
return errors.BadGateway(req.Service(), "%v", err)
|
||||||
|
}
|
||||||
|
return errors.BadRequest(req.Service(), "%v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
DefaultServerErrorFunc = func(req server.Request, rsp interface{}, err error) error {
|
||||||
|
if rsp != nil {
|
||||||
|
return errors.BadGateway(req.Service(), "%v", err)
|
||||||
|
}
|
||||||
|
return errors.BadRequest(req.Service(), "%v", err)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
ClientErrorFunc func(client.Request, interface{}, error) error
|
||||||
|
ServerErrorFunc func(server.Request, interface{}, error) error
|
||||||
|
)
|
||||||
|
|
||||||
|
// Options struct holds wrapper options
|
||||||
|
type Options struct {
|
||||||
|
ClientErrorFn ClientErrorFunc
|
||||||
|
ServerErrorFn ServerErrorFunc
|
||||||
|
ClientValidateResponse bool
|
||||||
|
ServerValidateResponse bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Option func signature
|
||||||
|
type Option func(*Options)
|
||||||
|
|
||||||
|
func ClientValidateResponse(b bool) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.ClientValidateResponse = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ServerValidateResponse(b bool) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.ClientValidateResponse = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ClientReqErrorFn(fn ClientErrorFunc) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.ClientErrorFn = fn
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ServerErrorFn(fn ServerErrorFunc) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.ServerErrorFn = fn
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewOptions(opts ...Option) Options {
|
||||||
|
options := Options{
|
||||||
|
ClientErrorFn: DefaultClientErrorFunc,
|
||||||
|
ServerErrorFn: DefaultServerErrorFunc,
|
||||||
|
}
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&options)
|
||||||
|
}
|
||||||
|
return options
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewHook(opts ...Option) *hook {
|
||||||
|
return &hook{opts: NewOptions(opts...)}
|
||||||
|
}
|
||||||
|
|
||||||
|
type validator interface {
|
||||||
|
Validate() error
|
||||||
|
}
|
||||||
|
|
||||||
|
type hook struct {
|
||||||
|
opts Options
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *hook) ClientCall(next client.FuncCall) client.FuncCall {
|
||||||
|
return func(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||||
|
if v, ok := req.Body().(validator); ok {
|
||||||
|
if err := v.Validate(); err != nil {
|
||||||
|
return w.opts.ClientErrorFn(req, nil, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err := next(ctx, req, rsp, opts...)
|
||||||
|
if v, ok := rsp.(validator); ok && w.opts.ClientValidateResponse {
|
||||||
|
if verr := v.Validate(); verr != nil {
|
||||||
|
return w.opts.ClientErrorFn(req, rsp, verr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *hook) ClientStream(next client.FuncStream) client.FuncStream {
|
||||||
|
return func(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||||
|
if v, ok := req.Body().(validator); ok {
|
||||||
|
if err := v.Validate(); err != nil {
|
||||||
|
return nil, w.opts.ClientErrorFn(req, nil, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return next(ctx, req, opts...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
|
||||||
|
return func(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||||
|
if v, ok := req.Body().(validator); ok {
|
||||||
|
if err := v.Validate(); err != nil {
|
||||||
|
return w.opts.ServerErrorFn(req, nil, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err := next(ctx, req, rsp)
|
||||||
|
if v, ok := rsp.(validator); ok && w.opts.ServerValidateResponse {
|
||||||
|
if verr := v.Validate(); verr != nil {
|
||||||
|
return w.opts.ServerErrorFn(req, rsp, verr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -99,6 +99,7 @@ func WithAddFields(fields ...interface{}) Option {
|
|||||||
iv, iok := o.Fields[i].(string)
|
iv, iok := o.Fields[i].(string)
|
||||||
jv, jok := fields[j].(string)
|
jv, jok := fields[j].(string)
|
||||||
if iok && jok && iv == jv {
|
if iok && jok && iv == jv {
|
||||||
|
o.Fields[i+1] = fields[j+1]
|
||||||
fields = slices.Delete(fields, j, j+2)
|
fields = slices.Delete(fields, j, j+2)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -124,7 +124,7 @@ func TestWithDedupKeysWithAddFields(t *testing.T) {
|
|||||||
|
|
||||||
l.Info(ctx, "msg3")
|
l.Info(ctx, "msg3")
|
||||||
|
|
||||||
if !bytes.Contains(buf.Bytes(), []byte(`msg=msg3 key1=val1 key2=val2`)) {
|
if !bytes.Contains(buf.Bytes(), []byte(`msg=msg3 key1=val4 key2=val3`)) {
|
||||||
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
|
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -89,6 +89,10 @@ func (s *Span) Tracer() tracer.Tracer {
|
|||||||
return s.tracer
|
return s.tracer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Span) IsRecording() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
type Event struct {
|
type Event struct {
|
||||||
name string
|
name string
|
||||||
labels []interface{}
|
labels []interface{}
|
||||||
|
|||||||
@@ -120,6 +120,10 @@ func (s *noopSpan) SetStatus(st SpanStatus, msg string) {
|
|||||||
s.statusMsg = msg
|
s.statusMsg = msg
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *noopSpan) IsRecording() bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// NewTracer returns new memory tracer
|
// NewTracer returns new memory tracer
|
||||||
func NewTracer(opts ...Option) Tracer {
|
func NewTracer(opts ...Option) Tracer {
|
||||||
return &noopTracer{
|
return &noopTracer{
|
||||||
|
|||||||
@@ -78,4 +78,6 @@ type Span interface {
|
|||||||
TraceID() string
|
TraceID() string
|
||||||
// SpanID returns span id
|
// SpanID returns span id
|
||||||
SpanID() string
|
SpanID() string
|
||||||
|
// IsRecording returns the recording state of the Span.
|
||||||
|
IsRecording() bool
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user