Compare commits

..

5 Commits

Author SHA1 Message Date
391813c260 util/reflect: add StructFieldNameByTag
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-04 14:34:41 +03:00
1a1459dd0e util/reflect: fix StructFieldByTag
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-04 13:16:31 +03:00
4e99680c30 server: add missing hook definitions
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-04-23 07:39:08 +03:00
92a3a547b8 Merge pull request 'server/noop: cleanup' (#342) from server-noop into v3
Reviewed-on: #342
2024-04-23 07:30:20 +03:00
849c462037 server/noop: cleanup
All checks were successful
pr / test (pull_request) Successful in 1m38s
lint / lint (pull_request) Successful in 10m35s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-04-23 07:28:58 +03:00
8 changed files with 93 additions and 224 deletions

View File

@@ -274,7 +274,7 @@ func (n *noopServer) Register() error {
if !registered {
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID)
config.Logger.Info(n.opts.Context, fmt.Sprintf("register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID))
}
}
@@ -312,7 +312,7 @@ func (n *noopServer) Deregister() error {
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "deregistering node: %s", service.Nodes[0].ID)
config.Logger.Info(n.opts.Context, fmt.Sprintf("deregistering node: %s", service.Nodes[0].ID))
}
if err := DefaultDeregisterFunc(service, config); err != nil {
@@ -343,11 +343,11 @@ func (n *noopServer) Deregister() error {
go func(s broker.Subscriber) {
defer wg.Done()
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "unsubscribing from topic: %s", s.Topic())
config.Logger.Info(n.opts.Context, "unsubscribing from topic: "+s.Topic())
}
if err := s.Unsubscribe(ncx); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "unsubscribing from topic: %s err: %v", s.Topic(), err)
config.Logger.Error(n.opts.Context, "unsubscribing from topic: "+s.Topic(), err)
}
}
}(subs[idx])
@@ -383,7 +383,7 @@ func (n *noopServer) Start() error {
config.Address = addr
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "server [noop] Listening on %s", config.Address)
config.Logger.Info(n.opts.Context, "server [noop] Listening on "+config.Address)
}
n.Lock()
@@ -397,13 +397,13 @@ func (n *noopServer) Start() error {
// connect to the broker
if err := config.Broker.Connect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "broker [%s] connect error: %v", config.Broker.String(), err)
config.Logger.Error(n.opts.Context, fmt.Sprintf("broker [%s] connect error", config.Broker.String()), err)
}
return err
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
config.Logger.Info(n.opts.Context, fmt.Sprintf("broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()))
}
}
@@ -411,13 +411,13 @@ func (n *noopServer) Start() error {
// nolint: nestif
if err := config.RegisterCheck(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.ID, err)
config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register check error", config.Name, config.ID), err)
}
} else {
// announce self to the world
if err := n.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server register error: %v", err)
config.Logger.Error(n.opts.Context, "server register error", err)
}
}
}
@@ -450,23 +450,23 @@ func (n *noopServer) Start() error {
// nolint: nestif
if rerr != nil && registered {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr)
config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register check error, deregister it", config.Name, config.ID), rerr)
}
// deregister self in case of error
if err := n.Deregister(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server %s-%s deregister error: %s", config.Name, config.ID, err)
config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s deregister error", config.Name, config.ID), err)
}
}
} else if rerr != nil && !registered {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.ID, rerr)
config.Logger.Errorf(n.opts.Context, fmt.Sprintf("server %s-%s register check error", config.Name, config.ID), rerr)
}
continue
}
if err := n.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server %s-%s register error: %s", config.Name, config.ID, err)
config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register error", config.Name, config.ID), err)
}
}
// wait for exit
@@ -478,7 +478,7 @@ func (n *noopServer) Start() error {
// deregister self
if err := n.Deregister(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server deregister error: ", err)
config.Logger.Error(n.opts.Context, "server deregister error", err)
}
}
@@ -491,12 +491,12 @@ func (n *noopServer) Start() error {
ch <- nil
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
config.Logger.Info(n.opts.Context, fmt.Sprintf("broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()))
}
// disconnect broker
if err := config.Broker.Disconnect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "broker [%s] disconnect error: %v", config.Broker.String(), err)
config.Logger.Error(n.opts.Context, fmt.Sprintf("broker [%s] disconnect error", config.Broker.String()), err)
}
}
}()
@@ -512,36 +512,33 @@ func (n *noopServer) Start() error {
func (n *noopServer) subscribe() error {
config := n.Options()
cx := config.Context
var err error
var sub broker.Subscriber
subCtx := config.Context
for sb := range n.subscribers {
if sb.Options().Context != nil {
cx = sb.Options().Context
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
opts := []broker.SubscribeOption{
broker.SubscribeContext(subCtx),
broker.SubscribeAutoAck(sb.Options().AutoAck),
broker.SubscribeBodyOnly(sb.Options().BodyOnly),
}
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
if sb.Options().Batch {
// batch processing handler
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.createBatchSubHandler(sb, config), opts...)
} else {
// single processing handler
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...)
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(n.opts.Context, "subscribing to topic: "+sb.Topic())
}
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), n.createSubHandler(sb, config), opts...)
if err != nil {
return err
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
}
n.subscribers[sb] = []broker.Subscriber{sub}
}
@@ -637,127 +634,6 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
}
}
//nolint:gocyclo
func (n *noopServer) createBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
return func(ps broker.Events) (err error) {
defer func() {
if r := recover(); r != nil {
n.RLock()
config := n.opts
n.RUnlock()
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(n.opts.Context, "panic recovered: ", r)
config.Logger.Error(n.opts.Context, string(debug.Stack()))
}
err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r)
}
}()
msgs := make([]Message, 0, len(ps))
ctxs := make([]context.Context, 0, len(ps))
for _, p := range ps {
msg := p.Message()
// if we don't have headers, create empty map
if msg.Header == nil {
msg.Header = metadata.New(2)
}
ct, _ := msg.Header.Get(metadata.HeaderContentType)
if len(ct) == 0 {
msg.Header.Set(metadata.HeaderContentType, defaultContentType)
ct = defaultContentType
}
hdr := metadata.Copy(msg.Header)
topic, _ := msg.Header.Get(metadata.HeaderTopic)
ctxs = append(ctxs, metadata.NewIncomingContext(sb.opts.Context, hdr))
msgs = append(msgs, &rpcMessage{
topic: topic,
contentType: ct,
header: msg.Header,
body: msg.Body,
})
}
results := make(chan error, len(sb.handlers))
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
var req reflect.Value
switch handler.reqType.Kind() {
case reflect.Ptr:
req = reflect.New(handler.reqType.Elem())
default:
req = reflect.New(handler.reqType.Elem()).Elem()
}
reqType := handler.reqType
var cf codec.Codec
for _, msg := range msgs {
cf, err = n.newCodec(msg.ContentType())
if err != nil {
return err
}
rb := reflect.New(req.Type().Elem())
if err = cf.ReadBody(bytes.NewReader(msg.(*rpcMessage).body), rb.Interface()); err != nil {
return err
}
msg.(*rpcMessage).codec = cf
msg.(*rpcMessage).payload = rb.Interface()
}
fn := func(ctxs []context.Context, ms []Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
}
if handler.ctxType != nil {
vals = append(vals, reflect.ValueOf(ctxs))
}
payloads := reflect.MakeSlice(reqType, 0, len(ms))
for _, m := range ms {
payloads = reflect.Append(payloads, reflect.ValueOf(m.Body()))
}
vals = append(vals, payloads)
returnValues := handler.method.Call(vals)
if rerr := returnValues[0].Interface(); rerr != nil {
return rerr.(error)
}
return nil
}
opts.Hooks.EachNext(func(hook options.Hook) {
if h, ok := hook.(HookBatchSubHandler); ok {
fn = h(fn)
}
})
if n.wg != nil {
n.wg.Add(1)
}
go func() {
if n.wg != nil {
defer n.wg.Done()
}
results <- fn(ctxs, msgs)
}()
}
var errors []string
for i := 0; i < len(sb.handlers); i++ {
if rerr := <-results; rerr != nil {
errors = append(errors, rerr.Error())
}
}
if len(errors) > 0 {
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
}
return err
}
}
//nolint:gocyclo
func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
return func(p broker.Event) (err error) {

View File

@@ -9,7 +9,6 @@ import (
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/server"
)
@@ -26,18 +25,6 @@ func (h *TestHandler) SingleSubHandler(ctx context.Context, msg *codec.Frame) er
return nil
}
func (h *TestHandler) BatchSubHandler(ctxs []context.Context, msgs []*codec.Frame) error {
if len(msgs) != 8 {
h.t.Fatal("invalid number of messages received")
}
for idx := 0; idx < len(msgs); idx++ {
md, _ := metadata.FromIncomingContext(ctxs[idx])
_ = md
// fmt.Printf("msg md %v\n", md)
}
return nil
}
func TestNoopSub(t *testing.T) {
ctx := context.Background()
@@ -76,13 +63,6 @@ func TestNoopSub(t *testing.T) {
t.Fatal(err)
}
if err := s.Subscribe(s.NewSubscriber("batch_topic", h.BatchSubHandler,
server.SubscriberQueue("queue"),
server.SubscriberBatch(true),
)); err != nil {
t.Fatal(err)
}
if err := s.Start(); err != nil {
t.Fatal(err)
}

View File

@@ -341,8 +341,6 @@ type SubscriberOptions struct {
AutoAck bool
// BodyOnly flag specifies that message without headers
BodyOnly bool
// Batch flag specifies that message processed in batches
Batch bool
// BatchSize flag specifies max size of batch
BatchSize int
// BatchWait flag specifies max wait time for batch filling
@@ -414,13 +412,6 @@ func SubscriberAck(b bool) SubscriberOption {
}
}
// SubscriberBatch control batch processing for handler
func SubscriberBatch(b bool) SubscriberOption {
return func(o *SubscriberOptions) {
o.Batch = b
}
}
// SubscriberBatchSize control batch filling size for handler
// Batch filling max waiting time controlled by SubscriberBatchWait
func SubscriberBatchSize(n int) SubscriberOption {

View File

@@ -63,10 +63,10 @@ type Server interface {
}
type (
FuncBatchSubHandler func(ctxs []context.Context, ms []Message) error
HookBatchSubHandler func(next FuncBatchSubHandler) FuncBatchSubHandler
FuncSubHandler func(ctx context.Context, ms Message) error
HookSubHandler func(next FuncSubHandler) FuncSubHandler
FuncSubHandler func(ctx context.Context, ms Message) error
HookSubHandler func(next FuncSubHandler) FuncSubHandler
FuncHandler func(ctx context.Context, req Request, rsp interface{}) error
HookHandler func(next FuncHandler) FuncHandler
)
/*

View File

@@ -3,14 +3,12 @@ package server
import (
"fmt"
"reflect"
"strings"
"unicode"
"unicode/utf8"
)
const (
subSig = "func(context.Context, interface{}) error"
batchSubSig = "func([]context.Context, []interface{}) error"
subSig = "func(context.Context, interface{}) error"
)
// Precompute the reflect type for error. Can't use error directly
@@ -43,23 +41,15 @@ func ValidateSubscriber(sub Subscriber) error {
switch typ.NumIn() {
case 2:
argType = typ.In(1)
if sub.Options().Batch {
if argType.Kind() != reflect.Slice {
return fmt.Errorf("subscriber %v dont have required signature %s", name, batchSubSig)
}
if strings.Compare(fmt.Sprintf("%v", argType), "[]interface{}") == 0 {
return fmt.Errorf("subscriber %v dont have required signaure %s", name, batchSubSig)
}
}
default:
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s or %s", name, typ.NumIn(), subSig, batchSubSig)
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), subSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
}
if typ.NumOut() != 1 {
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s or %s",
name, typ.NumOut(), subSig, batchSubSig)
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s",
name, typ.NumOut(), subSig)
}
if returnType := typ.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
@@ -74,8 +64,8 @@ func ValidateSubscriber(sub Subscriber) error {
case 3:
argType = method.Type.In(2)
default:
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s or %s",
name, method.Name, method.Type.NumIn(), subSig, batchSubSig)
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s",
name, method.Name, method.Type.NumIn(), subSig)
}
if !isExportedOrBuiltinType(argType) {
@@ -83,8 +73,8 @@ func ValidateSubscriber(sub Subscriber) error {
}
if method.Type.NumOut() != 1 {
return fmt.Errorf(
"subscriber %v.%v has wrong number of return values: %v require signature %s or %s",
name, method.Name, method.Type.NumOut(), subSig, batchSubSig)
"subscriber %v.%v has wrong number of return values: %v require signature %s",
name, method.Name, method.Type.NumOut(), subSig)
}
if returnType := method.Type.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())

View File

@@ -14,20 +14,12 @@ type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error
// publication message.
type SubscriberFunc func(ctx context.Context, msg Message) error
// BatchSubscriberFunc represents a single method of a subscriber. It's used primarily
// for the wrappers. What's handed to the actual method is the concrete
// publication message. This func used by batch subscribers
type BatchSubscriberFunc func(ctxs []context.Context, msgs []Message) error
// HandlerWrapper wraps the HandlerFunc and returns the equivalent
type HandlerWrapper func(HandlerFunc) HandlerFunc
// SubscriberWrapper wraps the SubscriberFunc and returns the equivalent
type SubscriberWrapper func(SubscriberFunc) SubscriberFunc
// BatchSubscriberWrapper wraps the SubscriberFunc and returns the equivalent
type BatchSubscriberWrapper func(BatchSubscriberFunc) BatchSubscriberFunc
// StreamWrapper wraps a Stream interface and returns the equivalent.
// Because streams exist for the lifetime of a method invocation this
// is a convenient way to wrap a Stream as its in use for trace, monitoring,

View File

@@ -25,6 +25,48 @@ type StructField struct {
Field reflect.StructField
}
// StructFieldNameByTag get struct field name by tag key and its value
func StructFieldNameByTag(src interface{}, tkey string, tval string) (string, error) {
sv := reflect.ValueOf(src)
if sv.Kind() == reflect.Ptr {
sv = sv.Elem()
}
if sv.Kind() != reflect.Struct {
return "", ErrInvalidStruct
}
typ := sv.Type()
for idx := 0; idx < typ.NumField(); idx++ {
fld := typ.Field(idx)
val := sv.Field(idx)
if len(fld.PkgPath) != 0 {
continue
}
if ts, ok := fld.Tag.Lookup(tkey); ok {
for _, p := range strings.Split(ts, ",") {
if p == tval {
return fld.Name, nil
}
}
}
switch val.Kind() {
case reflect.Ptr:
if val = val.Elem(); val.Kind() == reflect.Struct {
if name, err := StructFieldNameByTag(val.Interface(), tkey, tval); err == nil {
return name, nil
}
}
case reflect.Struct:
if name, err := StructFieldNameByTag(val.Interface(), tkey, tval); err == nil {
return name, nil
}
}
}
return "", ErrNotFound
}
// StructFieldByTag get struct field by tag key and its value
func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, error) {
sv := reflect.ValueOf(src)
@@ -46,9 +88,6 @@ func StructFieldByTag(src interface{}, tkey string, tval string) (interface{}, e
if ts, ok := fld.Tag.Lookup(tkey); ok {
for _, p := range strings.Split(ts, ",") {
if p == tval {
if val.Kind() != reflect.Ptr && val.CanAddr() {
val = val.Addr()
}
return val.Interface(), nil
}
}
@@ -493,13 +532,14 @@ func btSplitter(str string) []string {
}
// queryToMap turns something like a[b][c]=4 into
// map[string]interface{}{
// "a": map[string]interface{}{
// "b": map[string]interface{}{
// "c": 4,
// },
// },
// }
//
// map[string]interface{}{
// "a": map[string]interface{}{
// "b": map[string]interface{}{
// "c": 4,
// },
// },
// }
func queryToMap(param string) (map[string]interface{}, error) {
rawKey, rawValue, err := splitKeyAndValue(param)
if err != nil {

View File

@@ -190,9 +190,9 @@ func TestStructFieldByTag(t *testing.T) {
t.Fatal(err)
}
if v, ok := iface.(*[]string); !ok {
if v, ok := iface.([]string); !ok {
t.Fatalf("not *[]string %v", iface)
} else if len(*v) != 2 {
} else if len(v) != 2 {
t.Fatalf("invalid number %v", iface)
}
}