move hooks #397
							
								
								
									
										76
									
								
								hooks/metadata/metadata.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										76
									
								
								hooks/metadata/metadata.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,76 @@
 | 
			
		||||
package metadata
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
 | 
			
		||||
	"go.unistack.org/micro/v3/client"
 | 
			
		||||
	"go.unistack.org/micro/v3/metadata"
 | 
			
		||||
	"go.unistack.org/micro/v3/server"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var DefaultMetadataKeys = []string{"x-request-id"}
 | 
			
		||||
 | 
			
		||||
type hook struct {
 | 
			
		||||
	keys []string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewHook(keys ...string) *hook {
 | 
			
		||||
	return &hook{keys: keys}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func metadataCopy(ctx context.Context, keys []string) context.Context {
 | 
			
		||||
	if keys == nil {
 | 
			
		||||
		return ctx
 | 
			
		||||
	}
 | 
			
		||||
	if imd, iok := metadata.FromIncomingContext(ctx); iok && imd != nil {
 | 
			
		||||
		omd, ook := metadata.FromOutgoingContext(ctx)
 | 
			
		||||
		if !ook || omd == nil {
 | 
			
		||||
			omd = metadata.New(len(keys))
 | 
			
		||||
		}
 | 
			
		||||
		for _, k := range keys {
 | 
			
		||||
			if v, ok := imd.Get(k); ok && v != "" {
 | 
			
		||||
				omd.Set(k, v)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if !ook {
 | 
			
		||||
			ctx = metadata.NewOutgoingContext(ctx, omd)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return ctx
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *hook) ClientCall(next client.FuncCall) client.FuncCall {
 | 
			
		||||
	return func(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
 | 
			
		||||
		return next(metadataCopy(ctx, w.keys), req, rsp, opts...)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *hook) ClientStream(next client.FuncStream) client.FuncStream {
 | 
			
		||||
	return func(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
 | 
			
		||||
		return next(metadataCopy(ctx, w.keys), req, opts...)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *hook) ClientPublish(next client.FuncPublish) client.FuncPublish {
 | 
			
		||||
	return func(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
 | 
			
		||||
		return next(metadataCopy(ctx, w.keys), msg, opts...)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *hook) ClientBatchPublish(next client.FuncBatchPublish) client.FuncBatchPublish {
 | 
			
		||||
	return func(ctx context.Context, msgs []client.Message, opts ...client.PublishOption) error {
 | 
			
		||||
		return next(metadataCopy(ctx, w.keys), msgs, opts...)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
 | 
			
		||||
	return func(ctx context.Context, req server.Request, rsp interface{}) error {
 | 
			
		||||
		return next(metadataCopy(ctx, w.keys), req, rsp)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *hook) ServerSubscriber(next server.FuncSubHandler) server.FuncSubHandler {
 | 
			
		||||
	return func(ctx context.Context, msg server.Message) error {
 | 
			
		||||
		return next(metadataCopy(ctx, w.keys), msg)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										94
									
								
								hooks/recovery/recovery.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										94
									
								
								hooks/recovery/recovery.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,94 @@
 | 
			
		||||
package recovery
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
 | 
			
		||||
	"go.unistack.org/micro/v3/errors"
 | 
			
		||||
	"go.unistack.org/micro/v3/server"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func NewOptions(opts ...Option) Options {
 | 
			
		||||
	options := Options{
 | 
			
		||||
		ServerHandlerFn:    DefaultServerHandlerFn,
 | 
			
		||||
		ServerSubscriberFn: DefaultServerSubscriberFn,
 | 
			
		||||
	}
 | 
			
		||||
	for _, o := range opts {
 | 
			
		||||
		o(&options)
 | 
			
		||||
	}
 | 
			
		||||
	return options
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Options struct {
 | 
			
		||||
	ServerHandlerFn    func(context.Context, server.Request, interface{}, error) error
 | 
			
		||||
	ServerSubscriberFn func(context.Context, server.Message, error) error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Option func(*Options)
 | 
			
		||||
 | 
			
		||||
func ServerHandlerFunc(fn func(context.Context, server.Request, interface{}, error) error) Option {
 | 
			
		||||
	return func(o *Options) {
 | 
			
		||||
		o.ServerHandlerFn = fn
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ServerSubscriberFunc(fn func(context.Context, server.Message, error) error) Option {
 | 
			
		||||
	return func(o *Options) {
 | 
			
		||||
		o.ServerSubscriberFn = fn
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	DefaultServerHandlerFn = func(ctx context.Context, req server.Request, rsp interface{}, err error) error {
 | 
			
		||||
		return errors.BadRequest("", "%v", err)
 | 
			
		||||
	}
 | 
			
		||||
	DefaultServerSubscriberFn = func(ctx context.Context, req server.Message, err error) error {
 | 
			
		||||
		return errors.BadRequest("", "%v", err)
 | 
			
		||||
	}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var Hook = NewHook()
 | 
			
		||||
 | 
			
		||||
type hook struct {
 | 
			
		||||
	opts Options
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewHook(opts ...Option) *hook {
 | 
			
		||||
	return &hook{opts: NewOptions(opts...)}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
 | 
			
		||||
	return func(ctx context.Context, req server.Request, rsp interface{}) (err error) {
 | 
			
		||||
		defer func() {
 | 
			
		||||
			r := recover()
 | 
			
		||||
			switch verr := r.(type) {
 | 
			
		||||
			case nil:
 | 
			
		||||
				return
 | 
			
		||||
			case error:
 | 
			
		||||
				err = w.opts.ServerHandlerFn(ctx, req, rsp, verr)
 | 
			
		||||
			default:
 | 
			
		||||
				err = w.opts.ServerHandlerFn(ctx, req, rsp, fmt.Errorf("%v", r))
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
		err = next(ctx, req, rsp)
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *hook) ServerSubscriber(next server.FuncSubHandler) server.FuncSubHandler {
 | 
			
		||||
	return func(ctx context.Context, msg server.Message) (err error) {
 | 
			
		||||
		defer func() {
 | 
			
		||||
			r := recover()
 | 
			
		||||
			switch verr := r.(type) {
 | 
			
		||||
			case nil:
 | 
			
		||||
				return
 | 
			
		||||
			case error:
 | 
			
		||||
				err = w.opts.ServerSubscriberFn(ctx, msg, verr)
 | 
			
		||||
			default:
 | 
			
		||||
				err = w.opts.ServerSubscriberFn(ctx, msg, fmt.Errorf("%v", r))
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
		err = next(ctx, msg)
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										139
									
								
								hooks/requestid/requestid.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										139
									
								
								hooks/requestid/requestid.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,139 @@
 | 
			
		||||
package requestid
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"net/textproto"
 | 
			
		||||
	
 | 
			
		||||
	"go.unistack.org/micro/v3/client"
 | 
			
		||||
	"go.unistack.org/micro/v3/metadata"
 | 
			
		||||
	"go.unistack.org/micro/v3/server"
 | 
			
		||||
	"go.unistack.org/micro/v3/util/id"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type XRequestIDKey struct{}
 | 
			
		||||
 | 
			
		||||
// DefaultMetadataKey contains metadata key
 | 
			
		||||
var DefaultMetadataKey = textproto.CanonicalMIMEHeaderKey("x-request-id")
 | 
			
		||||
 | 
			
		||||
// DefaultMetadataFunc wil be used if user not provide own func to fill metadata
 | 
			
		||||
var DefaultMetadataFunc = func(ctx context.Context) (context.Context, error) {
 | 
			
		||||
	var xid string
 | 
			
		||||
 | 
			
		||||
	cid, cok := ctx.Value(XRequestIDKey{}).(string)
 | 
			
		||||
	if cok && cid != "" {
 | 
			
		||||
		xid = cid
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	imd, iok := metadata.FromIncomingContext(ctx)
 | 
			
		||||
	if !iok || imd == nil {
 | 
			
		||||
		imd = metadata.New(1)
 | 
			
		||||
		ctx = metadata.NewIncomingContext(ctx, imd)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	omd, ook := metadata.FromOutgoingContext(ctx)
 | 
			
		||||
	if !ook || omd == nil {
 | 
			
		||||
		omd = metadata.New(1)
 | 
			
		||||
		ctx = metadata.NewOutgoingContext(ctx, omd)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if xid == "" {
 | 
			
		||||
		var id string
 | 
			
		||||
		if id, iok = imd.Get(DefaultMetadataKey); iok && id != "" {
 | 
			
		||||
			xid = id
 | 
			
		||||
		}
 | 
			
		||||
		if id, ook = omd.Get(DefaultMetadataKey); ook && id != "" {
 | 
			
		||||
			xid = id
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if xid == "" {
 | 
			
		||||
		var err error
 | 
			
		||||
		xid, err = id.New()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return ctx, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if !cok {
 | 
			
		||||
		ctx = context.WithValue(ctx, XRequestIDKey{}, xid)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if !iok {
 | 
			
		||||
		imd.Set(DefaultMetadataKey, xid)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if !ook {
 | 
			
		||||
		omd.Set(DefaultMetadataKey, xid)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return ctx, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type hook struct{}
 | 
			
		||||
 | 
			
		||||
func NewHook() *hook {
 | 
			
		||||
	return &hook{}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *hook) ServerSubscriber(next server.FuncSubHandler) server.FuncSubHandler {
 | 
			
		||||
	return func(ctx context.Context, msg server.Message) error {
 | 
			
		||||
		var err error
 | 
			
		||||
		if xid, ok := msg.Header()[DefaultMetadataKey]; ok {
 | 
			
		||||
			ctx = context.WithValue(ctx, XRequestIDKey{}, xid)
 | 
			
		||||
		}
 | 
			
		||||
		if ctx, err = DefaultMetadataFunc(ctx); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		return next(ctx, msg)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
 | 
			
		||||
	return func(ctx context.Context, req server.Request, rsp interface{}) error {
 | 
			
		||||
		var err error
 | 
			
		||||
		if ctx, err = DefaultMetadataFunc(ctx); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		return next(ctx, req, rsp)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *hook) ClientBatchPublish(next client.FuncBatchPublish) client.FuncBatchPublish {
 | 
			
		||||
	return func(ctx context.Context, msgs []client.Message, opts ...client.PublishOption) error {
 | 
			
		||||
		var err error
 | 
			
		||||
		if ctx, err = DefaultMetadataFunc(ctx); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		return next(ctx, msgs, opts...)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *hook) ClientPublish(next client.FuncPublish) client.FuncPublish {
 | 
			
		||||
	return func(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
 | 
			
		||||
		var err error
 | 
			
		||||
		if ctx, err = DefaultMetadataFunc(ctx); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		return next(ctx, msg, opts...)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *hook) ClientCall(next client.FuncCall) client.FuncCall {
 | 
			
		||||
	return func(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
 | 
			
		||||
		var err error
 | 
			
		||||
		if ctx, err = DefaultMetadataFunc(ctx); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		return next(ctx, req, rsp, opts...)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *hook) ClientStream(next client.FuncStream) client.FuncStream {
 | 
			
		||||
	return func(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
 | 
			
		||||
		var err error
 | 
			
		||||
		if ctx, err = DefaultMetadataFunc(ctx); err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		return next(ctx, req, opts...)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										33
									
								
								hooks/requestid/requestid_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										33
									
								
								hooks/requestid/requestid_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,33 @@
 | 
			
		||||
package requestid
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	"go.unistack.org/micro/v3/metadata"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestDefaultMetadataFunc(t *testing.T) {
 | 
			
		||||
	ctx := context.TODO()
 | 
			
		||||
 | 
			
		||||
	nctx, err := DefaultMetadataFunc(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("%v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	imd, ok := metadata.FromIncomingContext(nctx)
 | 
			
		||||
	if !ok {
 | 
			
		||||
		t.Fatalf("md missing in incoming context")
 | 
			
		||||
	}
 | 
			
		||||
	omd, ok := metadata.FromOutgoingContext(nctx)
 | 
			
		||||
	if !ok {
 | 
			
		||||
		t.Fatalf("md missing in outgoing context")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_, iok := imd.Get(DefaultMetadataKey)
 | 
			
		||||
	_, ook := omd.Get(DefaultMetadataKey)
 | 
			
		||||
 | 
			
		||||
	if !iok || !ook {
 | 
			
		||||
		t.Fatalf("missing metadata key value")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										194
									
								
								hooks/validator/validator.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										194
									
								
								hooks/validator/validator.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,194 @@
 | 
			
		||||
package validator
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
 | 
			
		||||
	"go.unistack.org/micro/v3/client"
 | 
			
		||||
	"go.unistack.org/micro/v3/errors"
 | 
			
		||||
	"go.unistack.org/micro/v3/server"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	DefaultClientErrorFunc = func(req client.Request, rsp interface{}, err error) error {
 | 
			
		||||
		if rsp != nil {
 | 
			
		||||
			return errors.BadGateway(req.Service(), "%v", err)
 | 
			
		||||
		}
 | 
			
		||||
		return errors.BadRequest(req.Service(), "%v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	DefaultServerErrorFunc = func(req server.Request, rsp interface{}, err error) error {
 | 
			
		||||
		if rsp != nil {
 | 
			
		||||
			return errors.BadGateway(req.Service(), "%v", err)
 | 
			
		||||
		}
 | 
			
		||||
		return errors.BadRequest(req.Service(), "%v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	DefaultPublishErrorFunc = func(msg client.Message, err error) error {
 | 
			
		||||
		return errors.BadRequest(msg.Topic(), "%v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	DefaultSubscribeErrorFunc = func(msg server.Message, err error) error {
 | 
			
		||||
		return errors.BadRequest(msg.Topic(), "%v", err)
 | 
			
		||||
	}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type (
 | 
			
		||||
	ClientErrorFunc    func(client.Request, interface{}, error) error
 | 
			
		||||
	ServerErrorFunc    func(server.Request, interface{}, error) error
 | 
			
		||||
	PublishErrorFunc   func(client.Message, error) error
 | 
			
		||||
	SubscribeErrorFunc func(server.Message, error) error
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Options struct holds wrapper options
 | 
			
		||||
type Options struct {
 | 
			
		||||
	ClientErrorFn          ClientErrorFunc
 | 
			
		||||
	ServerErrorFn          ServerErrorFunc
 | 
			
		||||
	PublishErrorFn         PublishErrorFunc
 | 
			
		||||
	SubscribeErrorFn       SubscribeErrorFunc
 | 
			
		||||
	ClientValidateResponse bool
 | 
			
		||||
	ServerValidateResponse bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Option func signature
 | 
			
		||||
type Option func(*Options)
 | 
			
		||||
 | 
			
		||||
func ClientValidateResponse(b bool) Option {
 | 
			
		||||
	return func(o *Options) {
 | 
			
		||||
		o.ClientValidateResponse = b
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ServerValidateResponse(b bool) Option {
 | 
			
		||||
	return func(o *Options) {
 | 
			
		||||
		o.ClientValidateResponse = b
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ClientReqErrorFn(fn ClientErrorFunc) Option {
 | 
			
		||||
	return func(o *Options) {
 | 
			
		||||
		o.ClientErrorFn = fn
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ServerErrorFn(fn ServerErrorFunc) Option {
 | 
			
		||||
	return func(o *Options) {
 | 
			
		||||
		o.ServerErrorFn = fn
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func PublishErrorFn(fn PublishErrorFunc) Option {
 | 
			
		||||
	return func(o *Options) {
 | 
			
		||||
		o.PublishErrorFn = fn
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func SubscribeErrorFn(fn SubscribeErrorFunc) Option {
 | 
			
		||||
	return func(o *Options) {
 | 
			
		||||
		o.SubscribeErrorFn = fn
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewOptions(opts ...Option) Options {
 | 
			
		||||
	options := Options{
 | 
			
		||||
		ClientErrorFn:    DefaultClientErrorFunc,
 | 
			
		||||
		ServerErrorFn:    DefaultServerErrorFunc,
 | 
			
		||||
		PublishErrorFn:   DefaultPublishErrorFunc,
 | 
			
		||||
		SubscribeErrorFn: DefaultSubscribeErrorFunc,
 | 
			
		||||
	}
 | 
			
		||||
	for _, o := range opts {
 | 
			
		||||
		o(&options)
 | 
			
		||||
	}
 | 
			
		||||
	return options
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewHook(opts ...Option) *hook {
 | 
			
		||||
	return &hook{opts: NewOptions(opts...)}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type validator interface {
 | 
			
		||||
	Validate() error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type hook struct {
 | 
			
		||||
	opts Options
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *hook) ClientCall(next client.FuncCall) client.FuncCall {
 | 
			
		||||
	return func(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
 | 
			
		||||
		if v, ok := req.Body().(validator); ok {
 | 
			
		||||
			if err := v.Validate(); err != nil {
 | 
			
		||||
				return w.opts.ClientErrorFn(req, nil, err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		err := next(ctx, req, rsp, opts...)
 | 
			
		||||
		if v, ok := rsp.(validator); ok && w.opts.ClientValidateResponse {
 | 
			
		||||
			if verr := v.Validate(); verr != nil {
 | 
			
		||||
				return w.opts.ClientErrorFn(req, rsp, verr)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *hook) ClientStream(next client.FuncStream) client.FuncStream {
 | 
			
		||||
	return func(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
 | 
			
		||||
		if v, ok := req.Body().(validator); ok {
 | 
			
		||||
			if err := v.Validate(); err != nil {
 | 
			
		||||
				return nil, w.opts.ClientErrorFn(req, nil, err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		return next(ctx, req, opts...)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *hook) ClientPublish(next client.FuncPublish) client.FuncPublish {
 | 
			
		||||
	return func(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
 | 
			
		||||
		if v, ok := msg.Payload().(validator); ok {
 | 
			
		||||
			if err := v.Validate(); err != nil {
 | 
			
		||||
				return w.opts.PublishErrorFn(msg, err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		return next(ctx, msg, opts...)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *hook) ClientBatchPublish(next client.FuncBatchPublish) client.FuncBatchPublish {
 | 
			
		||||
	return func(ctx context.Context, msgs []client.Message, opts ...client.PublishOption) error {
 | 
			
		||||
		for _, msg := range msgs {
 | 
			
		||||
			if v, ok := msg.Payload().(validator); ok {
 | 
			
		||||
				if err := v.Validate(); err != nil {
 | 
			
		||||
					return w.opts.PublishErrorFn(msg, err)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		return next(ctx, msgs, opts...)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *hook) ServerHandler(next server.FuncHandler) server.FuncHandler {
 | 
			
		||||
	return func(ctx context.Context, req server.Request, rsp interface{}) error {
 | 
			
		||||
		if v, ok := req.Body().(validator); ok {
 | 
			
		||||
			if err := v.Validate(); err != nil {
 | 
			
		||||
				return w.opts.ServerErrorFn(req, nil, err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		err := next(ctx, req, rsp)
 | 
			
		||||
		if v, ok := rsp.(validator); ok && w.opts.ServerValidateResponse {
 | 
			
		||||
			if verr := v.Validate(); verr != nil {
 | 
			
		||||
				return w.opts.ServerErrorFn(req, rsp, verr)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *hook) ServerSubscriber(next server.FuncSubHandler) server.FuncSubHandler {
 | 
			
		||||
	return func(ctx context.Context, msg server.Message) error {
 | 
			
		||||
		if v, ok := msg.Body().(validator); ok {
 | 
			
		||||
			if err := v.Validate(); err != nil {
 | 
			
		||||
				return w.opts.SubscribeErrorFn(msg, err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		return next(ctx, msg)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user