Merge pull request 'server/noop: cleanup' (#342) from server-noop into v3

Reviewed-on: #342
This commit is contained in:
Василий Толстов 2024-04-23 07:30:20 +03:00
commit 92a3a547b8
6 changed files with 28 additions and 205 deletions

View File

@ -274,7 +274,7 @@ func (n *noopServer) Register() error {
if !registered { if !registered {
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID) config.Logger.Info(n.opts.Context, fmt.Sprintf("register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID))
} }
} }
@ -312,7 +312,7 @@ func (n *noopServer) Deregister() error {
} }
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "deregistering node: %s", service.Nodes[0].ID) config.Logger.Info(n.opts.Context, fmt.Sprintf("deregistering node: %s", service.Nodes[0].ID))
} }
if err := DefaultDeregisterFunc(service, config); err != nil { if err := DefaultDeregisterFunc(service, config); err != nil {
@ -343,11 +343,11 @@ func (n *noopServer) Deregister() error {
go func(s broker.Subscriber) { go func(s broker.Subscriber) {
defer wg.Done() defer wg.Done()
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "unsubscribing from topic: %s", s.Topic()) config.Logger.Info(n.opts.Context, "unsubscribing from topic: "+s.Topic())
} }
if err := s.Unsubscribe(ncx); err != nil { if err := s.Unsubscribe(ncx); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "unsubscribing from topic: %s err: %v", s.Topic(), err) config.Logger.Error(n.opts.Context, "unsubscribing from topic: "+s.Topic(), err)
} }
} }
}(subs[idx]) }(subs[idx])
@ -383,7 +383,7 @@ func (n *noopServer) Start() error {
config.Address = addr config.Address = addr
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "server [noop] Listening on %s", config.Address) config.Logger.Info(n.opts.Context, "server [noop] Listening on "+config.Address)
} }
n.Lock() n.Lock()
@ -397,13 +397,13 @@ func (n *noopServer) Start() error {
// connect to the broker // connect to the broker
if err := config.Broker.Connect(config.Context); err != nil { if err := config.Broker.Connect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "broker [%s] connect error: %v", config.Broker.String(), err) config.Logger.Error(n.opts.Context, fmt.Sprintf("broker [%s] connect error", config.Broker.String()), err)
} }
return err return err
} }
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) config.Logger.Info(n.opts.Context, fmt.Sprintf("broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()))
} }
} }
@ -411,13 +411,13 @@ func (n *noopServer) Start() error {
// nolint: nestif // nolint: nestif
if err := config.RegisterCheck(config.Context); err != nil { if err := config.RegisterCheck(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.ID, err) config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register check error", config.Name, config.ID), err)
} }
} else { } else {
// announce self to the world // announce self to the world
if err := n.Register(); err != nil { if err := n.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server register error: %v", err) config.Logger.Error(n.opts.Context, "server register error", err)
} }
} }
} }
@ -450,23 +450,23 @@ func (n *noopServer) Start() error {
// nolint: nestif // nolint: nestif
if rerr != nil && registered { if rerr != nil && registered {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr) config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register check error, deregister it", config.Name, config.ID), rerr)
} }
// deregister self in case of error // deregister self in case of error
if err := n.Deregister(); err != nil { if err := n.Deregister(); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server %s-%s deregister error: %s", config.Name, config.ID, err) config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s deregister error", config.Name, config.ID), err)
} }
} }
} else if rerr != nil && !registered { } else if rerr != nil && !registered {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.ID, rerr) config.Logger.Errorf(n.opts.Context, fmt.Sprintf("server %s-%s register check error", config.Name, config.ID), rerr)
} }
continue continue
} }
if err := n.Register(); err != nil { if err := n.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server %s-%s register error: %s", config.Name, config.ID, err) config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register error", config.Name, config.ID), err)
} }
} }
// wait for exit // wait for exit
@ -478,7 +478,7 @@ func (n *noopServer) Start() error {
// deregister self // deregister self
if err := n.Deregister(); err != nil { if err := n.Deregister(); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "server deregister error: ", err) config.Logger.Error(n.opts.Context, "server deregister error", err)
} }
} }
@ -491,12 +491,12 @@ func (n *noopServer) Start() error {
ch <- nil ch <- nil
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) config.Logger.Info(n.opts.Context, fmt.Sprintf("broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()))
} }
// disconnect broker // disconnect broker
if err := config.Broker.Disconnect(config.Context); err != nil { if err := config.Broker.Disconnect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(n.opts.Context, "broker [%s] disconnect error: %v", config.Broker.String(), err) config.Logger.Error(n.opts.Context, fmt.Sprintf("broker [%s] disconnect error", config.Broker.String()), err)
} }
} }
}() }()
@ -526,20 +526,13 @@ func (n *noopServer) subscribe() error {
opts = append(opts, broker.SubscribeGroup(queue)) opts = append(opts, broker.SubscribeGroup(queue))
} }
if sb.Options().Batch { sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...)
// batch processing handler
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.createBatchSubHandler(sb, config), opts...)
} else {
// single processing handler
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...)
}
if err != nil { if err != nil {
return err return err
} }
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic()) config.Logger.Info(n.opts.Context, "subscribing to topic: "+sb.Topic())
} }
n.subscribers[sb] = []broker.Subscriber{sub} n.subscribers[sb] = []broker.Subscriber{sub}
@ -637,127 +630,6 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
} }
} }
//nolint:gocyclo
func (n *noopServer) createBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
return func(ps broker.Events) (err error) {
defer func() {
if r := recover(); r != nil {
n.RLock()
config := n.opts
n.RUnlock()
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(n.opts.Context, "panic recovered: ", r)
config.Logger.Error(n.opts.Context, string(debug.Stack()))
}
err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r)
}
}()
msgs := make([]Message, 0, len(ps))
ctxs := make([]context.Context, 0, len(ps))
for _, p := range ps {
msg := p.Message()
// if we don't have headers, create empty map
if msg.Header == nil {
msg.Header = metadata.New(2)
}
ct, _ := msg.Header.Get(metadata.HeaderContentType)
if len(ct) == 0 {
msg.Header.Set(metadata.HeaderContentType, defaultContentType)
ct = defaultContentType
}
hdr := metadata.Copy(msg.Header)
topic, _ := msg.Header.Get(metadata.HeaderTopic)
ctxs = append(ctxs, metadata.NewIncomingContext(sb.opts.Context, hdr))
msgs = append(msgs, &rpcMessage{
topic: topic,
contentType: ct,
header: msg.Header,
body: msg.Body,
})
}
results := make(chan error, len(sb.handlers))
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
var req reflect.Value
switch handler.reqType.Kind() {
case reflect.Ptr:
req = reflect.New(handler.reqType.Elem())
default:
req = reflect.New(handler.reqType.Elem()).Elem()
}
reqType := handler.reqType
var cf codec.Codec
for _, msg := range msgs {
cf, err = n.newCodec(msg.ContentType())
if err != nil {
return err
}
rb := reflect.New(req.Type().Elem())
if err = cf.ReadBody(bytes.NewReader(msg.(*rpcMessage).body), rb.Interface()); err != nil {
return err
}
msg.(*rpcMessage).codec = cf
msg.(*rpcMessage).payload = rb.Interface()
}
fn := func(ctxs []context.Context, ms []Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
}
if handler.ctxType != nil {
vals = append(vals, reflect.ValueOf(ctxs))
}
payloads := reflect.MakeSlice(reqType, 0, len(ms))
for _, m := range ms {
payloads = reflect.Append(payloads, reflect.ValueOf(m.Body()))
}
vals = append(vals, payloads)
returnValues := handler.method.Call(vals)
if rerr := returnValues[0].Interface(); rerr != nil {
return rerr.(error)
}
return nil
}
opts.Hooks.EachNext(func(hook options.Hook) {
if h, ok := hook.(HookBatchSubHandler); ok {
fn = h(fn)
}
})
if n.wg != nil {
n.wg.Add(1)
}
go func() {
if n.wg != nil {
defer n.wg.Done()
}
results <- fn(ctxs, msgs)
}()
}
var errors []string
for i := 0; i < len(sb.handlers); i++ {
if rerr := <-results; rerr != nil {
errors = append(errors, rerr.Error())
}
}
if len(errors) > 0 {
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
}
return err
}
}
//nolint:gocyclo //nolint:gocyclo
func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler { func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
return func(p broker.Event) (err error) { return func(p broker.Event) (err error) {

View File

@ -9,7 +9,6 @@ import (
"go.unistack.org/micro/v3/client" "go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/server" "go.unistack.org/micro/v3/server"
) )
@ -26,18 +25,6 @@ func (h *TestHandler) SingleSubHandler(ctx context.Context, msg *codec.Frame) er
return nil return nil
} }
func (h *TestHandler) BatchSubHandler(ctxs []context.Context, msgs []*codec.Frame) error {
if len(msgs) != 8 {
h.t.Fatal("invalid number of messages received")
}
for idx := 0; idx < len(msgs); idx++ {
md, _ := metadata.FromIncomingContext(ctxs[idx])
_ = md
// fmt.Printf("msg md %v\n", md)
}
return nil
}
func TestNoopSub(t *testing.T) { func TestNoopSub(t *testing.T) {
ctx := context.Background() ctx := context.Background()
@ -76,13 +63,6 @@ func TestNoopSub(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if err := s.Subscribe(s.NewSubscriber("batch_topic", h.BatchSubHandler,
server.SubscriberQueue("queue"),
server.SubscriberBatch(true),
)); err != nil {
t.Fatal(err)
}
if err := s.Start(); err != nil { if err := s.Start(); err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -341,8 +341,6 @@ type SubscriberOptions struct {
AutoAck bool AutoAck bool
// BodyOnly flag specifies that message without headers // BodyOnly flag specifies that message without headers
BodyOnly bool BodyOnly bool
// Batch flag specifies that message processed in batches
Batch bool
// BatchSize flag specifies max size of batch // BatchSize flag specifies max size of batch
BatchSize int BatchSize int
// BatchWait flag specifies max wait time for batch filling // BatchWait flag specifies max wait time for batch filling
@ -414,13 +412,6 @@ func SubscriberAck(b bool) SubscriberOption {
} }
} }
// SubscriberBatch control batch processing for handler
func SubscriberBatch(b bool) SubscriberOption {
return func(o *SubscriberOptions) {
o.Batch = b
}
}
// SubscriberBatchSize control batch filling size for handler // SubscriberBatchSize control batch filling size for handler
// Batch filling max waiting time controlled by SubscriberBatchWait // Batch filling max waiting time controlled by SubscriberBatchWait
func SubscriberBatchSize(n int) SubscriberOption { func SubscriberBatchSize(n int) SubscriberOption {

View File

@ -63,10 +63,8 @@ type Server interface {
} }
type ( type (
FuncBatchSubHandler func(ctxs []context.Context, ms []Message) error FuncSubHandler func(ctx context.Context, ms Message) error
HookBatchSubHandler func(next FuncBatchSubHandler) FuncBatchSubHandler HookSubHandler func(next FuncSubHandler) FuncSubHandler
FuncSubHandler func(ctx context.Context, ms Message) error
HookSubHandler func(next FuncSubHandler) FuncSubHandler
) )
/* /*

View File

@ -3,14 +3,12 @@ package server
import ( import (
"fmt" "fmt"
"reflect" "reflect"
"strings"
"unicode" "unicode"
"unicode/utf8" "unicode/utf8"
) )
const ( const (
subSig = "func(context.Context, interface{}) error" subSig = "func(context.Context, interface{}) error"
batchSubSig = "func([]context.Context, []interface{}) error"
) )
// Precompute the reflect type for error. Can't use error directly // Precompute the reflect type for error. Can't use error directly
@ -43,23 +41,15 @@ func ValidateSubscriber(sub Subscriber) error {
switch typ.NumIn() { switch typ.NumIn() {
case 2: case 2:
argType = typ.In(1) argType = typ.In(1)
if sub.Options().Batch {
if argType.Kind() != reflect.Slice {
return fmt.Errorf("subscriber %v dont have required signature %s", name, batchSubSig)
}
if strings.Compare(fmt.Sprintf("%v", argType), "[]interface{}") == 0 {
return fmt.Errorf("subscriber %v dont have required signaure %s", name, batchSubSig)
}
}
default: default:
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s or %s", name, typ.NumIn(), subSig, batchSubSig) return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), subSig)
} }
if !isExportedOrBuiltinType(argType) { if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType) return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
} }
if typ.NumOut() != 1 { if typ.NumOut() != 1 {
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s or %s", return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s",
name, typ.NumOut(), subSig, batchSubSig) name, typ.NumOut(), subSig)
} }
if returnType := typ.Out(0); returnType != typeOfError { if returnType := typ.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String()) return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
@ -74,8 +64,8 @@ func ValidateSubscriber(sub Subscriber) error {
case 3: case 3:
argType = method.Type.In(2) argType = method.Type.In(2)
default: default:
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s or %s", return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s",
name, method.Name, method.Type.NumIn(), subSig, batchSubSig) name, method.Name, method.Type.NumIn(), subSig)
} }
if !isExportedOrBuiltinType(argType) { if !isExportedOrBuiltinType(argType) {
@ -83,8 +73,8 @@ func ValidateSubscriber(sub Subscriber) error {
} }
if method.Type.NumOut() != 1 { if method.Type.NumOut() != 1 {
return fmt.Errorf( return fmt.Errorf(
"subscriber %v.%v has wrong number of return values: %v require signature %s or %s", "subscriber %v.%v has wrong number of return values: %v require signature %s",
name, method.Name, method.Type.NumOut(), subSig, batchSubSig) name, method.Name, method.Type.NumOut(), subSig)
} }
if returnType := method.Type.Out(0); returnType != typeOfError { if returnType := method.Type.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String()) return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())

View File

@ -14,20 +14,12 @@ type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error
// publication message. // publication message.
type SubscriberFunc func(ctx context.Context, msg Message) error type SubscriberFunc func(ctx context.Context, msg Message) error
// BatchSubscriberFunc represents a single method of a subscriber. It's used primarily
// for the wrappers. What's handed to the actual method is the concrete
// publication message. This func used by batch subscribers
type BatchSubscriberFunc func(ctxs []context.Context, msgs []Message) error
// HandlerWrapper wraps the HandlerFunc and returns the equivalent // HandlerWrapper wraps the HandlerFunc and returns the equivalent
type HandlerWrapper func(HandlerFunc) HandlerFunc type HandlerWrapper func(HandlerFunc) HandlerFunc
// SubscriberWrapper wraps the SubscriberFunc and returns the equivalent // SubscriberWrapper wraps the SubscriberFunc and returns the equivalent
type SubscriberWrapper func(SubscriberFunc) SubscriberFunc type SubscriberWrapper func(SubscriberFunc) SubscriberFunc
// BatchSubscriberWrapper wraps the SubscriberFunc and returns the equivalent
type BatchSubscriberWrapper func(BatchSubscriberFunc) BatchSubscriberFunc
// StreamWrapper wraps a Stream interface and returns the equivalent. // StreamWrapper wraps a Stream interface and returns the equivalent.
// Because streams exist for the lifetime of a method invocation this // Because streams exist for the lifetime of a method invocation this
// is a convenient way to wrap a Stream as its in use for trace, monitoring, // is a convenient way to wrap a Stream as its in use for trace, monitoring,