diff --git a/server/noop.go b/server/noop.go index f2bc031f..85333a50 100644 --- a/server/noop.go +++ b/server/noop.go @@ -274,7 +274,7 @@ func (n *noopServer) Register() error { if !registered { if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(n.opts.Context, "register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID) + config.Logger.Info(n.opts.Context, fmt.Sprintf("register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID)) } } @@ -312,7 +312,7 @@ func (n *noopServer) Deregister() error { } if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(n.opts.Context, "deregistering node: %s", service.Nodes[0].ID) + config.Logger.Info(n.opts.Context, fmt.Sprintf("deregistering node: %s", service.Nodes[0].ID)) } if err := DefaultDeregisterFunc(service, config); err != nil { @@ -343,11 +343,11 @@ func (n *noopServer) Deregister() error { go func(s broker.Subscriber) { defer wg.Done() if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(n.opts.Context, "unsubscribing from topic: %s", s.Topic()) + config.Logger.Info(n.opts.Context, "unsubscribing from topic: "+s.Topic()) } if err := s.Unsubscribe(ncx); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(n.opts.Context, "unsubscribing from topic: %s err: %v", s.Topic(), err) + config.Logger.Error(n.opts.Context, "unsubscribing from topic: "+s.Topic(), err) } } }(subs[idx]) @@ -383,7 +383,7 @@ func (n *noopServer) Start() error { config.Address = addr if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(n.opts.Context, "server [noop] Listening on %s", config.Address) + config.Logger.Info(n.opts.Context, "server [noop] Listening on "+config.Address) } n.Lock() @@ -397,13 +397,13 @@ func (n *noopServer) Start() error { // connect to the broker if err := config.Broker.Connect(config.Context); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(n.opts.Context, "broker [%s] connect error: %v", config.Broker.String(), err) + config.Logger.Error(n.opts.Context, fmt.Sprintf("broker [%s] connect error", config.Broker.String()), err) } return err } if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(n.opts.Context, "broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) + config.Logger.Info(n.opts.Context, fmt.Sprintf("broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())) } } @@ -411,13 +411,13 @@ func (n *noopServer) Start() error { // nolint: nestif if err := config.RegisterCheck(config.Context); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.ID, err) + config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register check error", config.Name, config.ID), err) } } else { // announce self to the world if err := n.Register(); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(n.opts.Context, "server register error: %v", err) + config.Logger.Error(n.opts.Context, "server register error", err) } } } @@ -450,23 +450,23 @@ func (n *noopServer) Start() error { // nolint: nestif if rerr != nil && registered { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr) + config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register check error, deregister it", config.Name, config.ID), rerr) } // deregister self in case of error if err := n.Deregister(); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(n.opts.Context, "server %s-%s deregister error: %s", config.Name, config.ID, err) + config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s deregister error", config.Name, config.ID), err) } } } else if rerr != nil && !registered { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.ID, rerr) + config.Logger.Errorf(n.opts.Context, fmt.Sprintf("server %s-%s register check error", config.Name, config.ID), rerr) } continue } if err := n.Register(); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(n.opts.Context, "server %s-%s register error: %s", config.Name, config.ID, err) + config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register error", config.Name, config.ID), err) } } // wait for exit @@ -478,7 +478,7 @@ func (n *noopServer) Start() error { // deregister self if err := n.Deregister(); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(n.opts.Context, "server deregister error: ", err) + config.Logger.Error(n.opts.Context, "server deregister error", err) } } @@ -491,12 +491,12 @@ func (n *noopServer) Start() error { ch <- nil if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(n.opts.Context, "broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) + config.Logger.Info(n.opts.Context, fmt.Sprintf("broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())) } // disconnect broker if err := config.Broker.Disconnect(config.Context); err != nil { if config.Logger.V(logger.ErrorLevel) { - config.Logger.Errorf(n.opts.Context, "broker [%s] disconnect error: %v", config.Broker.String(), err) + config.Logger.Error(n.opts.Context, fmt.Sprintf("broker [%s] disconnect error", config.Broker.String()), err) } } }() @@ -526,20 +526,13 @@ func (n *noopServer) subscribe() error { opts = append(opts, broker.SubscribeGroup(queue)) } - if sb.Options().Batch { - // batch processing handler - sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.createBatchSubHandler(sb, config), opts...) - } else { - // single processing handler - sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...) - } - + sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...) if err != nil { return err } if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic()) + config.Logger.Info(n.opts.Context, "subscribing to topic: "+sb.Topic()) } n.subscribers[sb] = []broker.Subscriber{sub} @@ -637,127 +630,6 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs } } -//nolint:gocyclo -func (n *noopServer) createBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler { - return func(ps broker.Events) (err error) { - defer func() { - if r := recover(); r != nil { - n.RLock() - config := n.opts - n.RUnlock() - if config.Logger.V(logger.ErrorLevel) { - config.Logger.Error(n.opts.Context, "panic recovered: ", r) - config.Logger.Error(n.opts.Context, string(debug.Stack())) - } - err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r) - } - }() - - msgs := make([]Message, 0, len(ps)) - ctxs := make([]context.Context, 0, len(ps)) - for _, p := range ps { - msg := p.Message() - // if we don't have headers, create empty map - if msg.Header == nil { - msg.Header = metadata.New(2) - } - - ct, _ := msg.Header.Get(metadata.HeaderContentType) - if len(ct) == 0 { - msg.Header.Set(metadata.HeaderContentType, defaultContentType) - ct = defaultContentType - } - hdr := metadata.Copy(msg.Header) - topic, _ := msg.Header.Get(metadata.HeaderTopic) - ctxs = append(ctxs, metadata.NewIncomingContext(sb.opts.Context, hdr)) - msgs = append(msgs, &rpcMessage{ - topic: topic, - contentType: ct, - header: msg.Header, - body: msg.Body, - }) - } - results := make(chan error, len(sb.handlers)) - - for i := 0; i < len(sb.handlers); i++ { - handler := sb.handlers[i] - - var req reflect.Value - - switch handler.reqType.Kind() { - case reflect.Ptr: - req = reflect.New(handler.reqType.Elem()) - default: - req = reflect.New(handler.reqType.Elem()).Elem() - } - - reqType := handler.reqType - var cf codec.Codec - for _, msg := range msgs { - cf, err = n.newCodec(msg.ContentType()) - if err != nil { - return err - } - rb := reflect.New(req.Type().Elem()) - if err = cf.ReadBody(bytes.NewReader(msg.(*rpcMessage).body), rb.Interface()); err != nil { - return err - } - msg.(*rpcMessage).codec = cf - msg.(*rpcMessage).payload = rb.Interface() - } - - fn := func(ctxs []context.Context, ms []Message) error { - var vals []reflect.Value - if sb.typ.Kind() != reflect.Func { - vals = append(vals, sb.rcvr) - } - if handler.ctxType != nil { - vals = append(vals, reflect.ValueOf(ctxs)) - } - payloads := reflect.MakeSlice(reqType, 0, len(ms)) - for _, m := range ms { - payloads = reflect.Append(payloads, reflect.ValueOf(m.Body())) - } - vals = append(vals, payloads) - - returnValues := handler.method.Call(vals) - if rerr := returnValues[0].Interface(); rerr != nil { - return rerr.(error) - } - return nil - } - - opts.Hooks.EachNext(func(hook options.Hook) { - if h, ok := hook.(HookBatchSubHandler); ok { - fn = h(fn) - } - }) - - if n.wg != nil { - n.wg.Add(1) - } - - go func() { - if n.wg != nil { - defer n.wg.Done() - } - results <- fn(ctxs, msgs) - }() - } - - var errors []string - for i := 0; i < len(sb.handlers); i++ { - if rerr := <-results; rerr != nil { - errors = append(errors, rerr.Error()) - } - } - if len(errors) > 0 { - err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) - } - return err - } -} - //nolint:gocyclo func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler { return func(p broker.Event) (err error) { diff --git a/server/noop_test.go b/server/noop_test.go index 5ecc762d..2bad82e6 100644 --- a/server/noop_test.go +++ b/server/noop_test.go @@ -9,7 +9,6 @@ import ( "go.unistack.org/micro/v3/client" "go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/logger" - "go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/server" ) @@ -26,18 +25,6 @@ func (h *TestHandler) SingleSubHandler(ctx context.Context, msg *codec.Frame) er return nil } -func (h *TestHandler) BatchSubHandler(ctxs []context.Context, msgs []*codec.Frame) error { - if len(msgs) != 8 { - h.t.Fatal("invalid number of messages received") - } - for idx := 0; idx < len(msgs); idx++ { - md, _ := metadata.FromIncomingContext(ctxs[idx]) - _ = md - // fmt.Printf("msg md %v\n", md) - } - return nil -} - func TestNoopSub(t *testing.T) { ctx := context.Background() @@ -76,13 +63,6 @@ func TestNoopSub(t *testing.T) { t.Fatal(err) } - if err := s.Subscribe(s.NewSubscriber("batch_topic", h.BatchSubHandler, - server.SubscriberQueue("queue"), - server.SubscriberBatch(true), - )); err != nil { - t.Fatal(err) - } - if err := s.Start(); err != nil { t.Fatal(err) } diff --git a/server/options.go b/server/options.go index 093b5e0e..ffced1a4 100644 --- a/server/options.go +++ b/server/options.go @@ -341,8 +341,6 @@ type SubscriberOptions struct { AutoAck bool // BodyOnly flag specifies that message without headers BodyOnly bool - // Batch flag specifies that message processed in batches - Batch bool // BatchSize flag specifies max size of batch BatchSize int // BatchWait flag specifies max wait time for batch filling @@ -414,13 +412,6 @@ func SubscriberAck(b bool) SubscriberOption { } } -// SubscriberBatch control batch processing for handler -func SubscriberBatch(b bool) SubscriberOption { - return func(o *SubscriberOptions) { - o.Batch = b - } -} - // SubscriberBatchSize control batch filling size for handler // Batch filling max waiting time controlled by SubscriberBatchWait func SubscriberBatchSize(n int) SubscriberOption { diff --git a/server/server.go b/server/server.go index eca9e82e..3879812e 100644 --- a/server/server.go +++ b/server/server.go @@ -63,10 +63,8 @@ type Server interface { } type ( - FuncBatchSubHandler func(ctxs []context.Context, ms []Message) error - HookBatchSubHandler func(next FuncBatchSubHandler) FuncBatchSubHandler - FuncSubHandler func(ctx context.Context, ms Message) error - HookSubHandler func(next FuncSubHandler) FuncSubHandler + FuncSubHandler func(ctx context.Context, ms Message) error + HookSubHandler func(next FuncSubHandler) FuncSubHandler ) /* diff --git a/server/subscriber.go b/server/subscriber.go index beed7dfe..81ee6030 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -3,14 +3,12 @@ package server import ( "fmt" "reflect" - "strings" "unicode" "unicode/utf8" ) const ( - subSig = "func(context.Context, interface{}) error" - batchSubSig = "func([]context.Context, []interface{}) error" + subSig = "func(context.Context, interface{}) error" ) // Precompute the reflect type for error. Can't use error directly @@ -43,23 +41,15 @@ func ValidateSubscriber(sub Subscriber) error { switch typ.NumIn() { case 2: argType = typ.In(1) - if sub.Options().Batch { - if argType.Kind() != reflect.Slice { - return fmt.Errorf("subscriber %v dont have required signature %s", name, batchSubSig) - } - if strings.Compare(fmt.Sprintf("%v", argType), "[]interface{}") == 0 { - return fmt.Errorf("subscriber %v dont have required signaure %s", name, batchSubSig) - } - } default: - return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s or %s", name, typ.NumIn(), subSig, batchSubSig) + return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), subSig) } if !isExportedOrBuiltinType(argType) { return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType) } if typ.NumOut() != 1 { - return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s or %s", - name, typ.NumOut(), subSig, batchSubSig) + return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s", + name, typ.NumOut(), subSig) } if returnType := typ.Out(0); returnType != typeOfError { return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String()) @@ -74,8 +64,8 @@ func ValidateSubscriber(sub Subscriber) error { case 3: argType = method.Type.In(2) default: - return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s or %s", - name, method.Name, method.Type.NumIn(), subSig, batchSubSig) + return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s", + name, method.Name, method.Type.NumIn(), subSig) } if !isExportedOrBuiltinType(argType) { @@ -83,8 +73,8 @@ func ValidateSubscriber(sub Subscriber) error { } if method.Type.NumOut() != 1 { return fmt.Errorf( - "subscriber %v.%v has wrong number of return values: %v require signature %s or %s", - name, method.Name, method.Type.NumOut(), subSig, batchSubSig) + "subscriber %v.%v has wrong number of return values: %v require signature %s", + name, method.Name, method.Type.NumOut(), subSig) } if returnType := method.Type.Out(0); returnType != typeOfError { return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String()) diff --git a/server/wrapper.go b/server/wrapper.go index b4596e63..3e4d3ecd 100644 --- a/server/wrapper.go +++ b/server/wrapper.go @@ -14,20 +14,12 @@ type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error // publication message. type SubscriberFunc func(ctx context.Context, msg Message) error -// BatchSubscriberFunc represents a single method of a subscriber. It's used primarily -// for the wrappers. What's handed to the actual method is the concrete -// publication message. This func used by batch subscribers -type BatchSubscriberFunc func(ctxs []context.Context, msgs []Message) error - // HandlerWrapper wraps the HandlerFunc and returns the equivalent type HandlerWrapper func(HandlerFunc) HandlerFunc // SubscriberWrapper wraps the SubscriberFunc and returns the equivalent type SubscriberWrapper func(SubscriberFunc) SubscriberFunc -// BatchSubscriberWrapper wraps the SubscriberFunc and returns the equivalent -type BatchSubscriberWrapper func(BatchSubscriberFunc) BatchSubscriberFunc - // StreamWrapper wraps a Stream interface and returns the equivalent. // Because streams exist for the lifetime of a method invocation this // is a convenient way to wrap a Stream as its in use for trace, monitoring,