diff --git a/broker/options.go b/broker/options.go index 94db357f..5f0f132a 100644 --- a/broker/options.go +++ b/broker/options.go @@ -232,7 +232,7 @@ func SubscribeContext(ctx context.Context) SubscribeOption { } // DisableAutoAck disables auto ack -// DEPRECATED +// Deprecated func DisableAutoAck() SubscribeOption { return func(o *SubscribeOptions) { o.AutoAck = false diff --git a/client/noop.go b/client/noop.go index ec81a860..92031673 100644 --- a/client/noop.go +++ b/client/noop.go @@ -173,7 +173,7 @@ func (n *noopClient) NewRequest(service, endpoint string, req interface{}, opts } func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOption) Message { - options := NewMessageOptions(opts...) + options := NewMessageOptions(append([]MessageOption{MessageContentType(n.opts.ContentType)}, opts...)...) return &noopMessage{topic: topic, payload: msg, opts: options} } diff --git a/client/options.go b/client/options.go index 47c2a463..e7d50f2d 100644 --- a/client/options.go +++ b/client/options.go @@ -373,7 +373,7 @@ func DialTimeout(d time.Duration) Option { } // WithExchange sets the exchange to route a message through -// DEPRECATED +// Deprecated func WithExchange(e string) PublishOption { return func(o *PublishOptions) { o.Exchange = e @@ -514,7 +514,7 @@ func WithSelectOptions(sops ...selector.SelectOption) CallOption { } // WithMessageContentType sets the message content type -// DEPRECATED +// Deprecated func WithMessageContentType(ct string) MessageOption { return func(o *MessageOptions) { o.ContentType = ct diff --git a/server/noop.go b/server/noop.go index 5bf7990e..425b6239 100644 --- a/server/noop.go +++ b/server/noop.go @@ -6,11 +6,13 @@ import ( "sync" "time" - // cprotorpc "github.com/unistack-org/micro-codec-protorpc" "github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/register" + maddr "github.com/unistack-org/micro/v3/util/addr" + mnet "github.com/unistack-org/micro/v3/util/net" + "github.com/unistack-org/micro/v3/util/rand" ) // DefaultCodecs will be used to encode/decode @@ -73,8 +75,7 @@ func (n *noopServer) Subscribe(sb Subscriber) error { sub, ok := sb.(*subscriber) if !ok { return fmt.Errorf("invalid subscriber: expected *subscriber") - } - if len(sub.handlers) == 0 { + } else if len(sub.handlers) == 0 { return fmt.Errorf("invalid subscriber: no handler functions") } @@ -107,11 +108,12 @@ func (n *noopServer) Init(opts ...Option) error { } if n.handlers == nil { - n.handlers = make(map[string]Handler) + n.handlers = make(map[string]Handler, 1) } if n.subscribers == nil { - n.subscribers = make(map[*subscriber][]broker.Subscriber) + n.subscribers = make(map[*subscriber][]broker.Subscriber, 1) } + if n.exit == nil { n.exit = make(chan chan error) } @@ -202,26 +204,34 @@ func (n *noopServer) Register() error { cx := config.Context - for sb := range n.subscribers { - handler := n.createSubHandler(sb, config) - var opts []broker.SubscribeOption - if queue := sb.Options().Queue; len(queue) > 0 { - opts = append(opts, broker.SubscribeGroup(queue)) - } + var sub broker.Subscriber + for sb := range n.subscribers { if sb.Options().Context != nil { cx = sb.Options().Context } - opts = append(opts, broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)) + opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)} + if queue := sb.Options().Queue; len(queue) > 0 { + opts = append(opts, broker.SubscribeGroup(queue)) + } + + if sb.Options().Batch { + // batch processing handler + sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.newBatchSubHandler(sb, config), opts...) + } else { + // single processing handler + sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.newSubHandler(sb, config), opts...) + } + + if err != nil { + return err + } if config.Logger.V(logger.InfoLevel) { config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic()) } - sub, err := config.Broker.Subscribe(cx, sb.Topic(), handler, opts...) - if err != nil { - return err - } + n.subscribers[sb] = []broker.Subscriber{sub} } @@ -303,9 +313,22 @@ func (n *noopServer) Start() error { config := n.Options() n.RUnlock() + // use 127.0.0.1 to avoid scan of all network interfaces + addr, err := maddr.Extract("127.0.0.1") + if err != nil { + return err + } + var rng rand.Rand + i := rng.Intn(20000) + // set addr with port + addr = mnet.HostPort(addr, 10000+i) + + config.Address = addr + if config.Logger.V(logger.InfoLevel) { config.Logger.Infof(n.opts.Context, "server [noop] Listening on %s", config.Address) } + n.Lock() if len(config.Advertise) == 0 { config.Advertise = config.Address diff --git a/server/noop_test.go b/server/noop_test.go new file mode 100644 index 00000000..56b4877c --- /dev/null +++ b/server/noop_test.go @@ -0,0 +1,106 @@ +package server_test + +import ( + "context" + "fmt" + "testing" + + "github.com/unistack-org/micro/v3/broker" + "github.com/unistack-org/micro/v3/client" + "github.com/unistack-org/micro/v3/codec" + "github.com/unistack-org/micro/v3/metadata" + "github.com/unistack-org/micro/v3/server" +) + +type TestHandler struct { + t *testing.T +} + +type TestMessage struct { + Name string +} + +var ( + numMsg int = 8 +) + +func (h *TestHandler) SingleSubHandler(ctx context.Context, msg *codec.Frame) error { + //fmt.Printf("msg %s\n", msg.Data) + return nil +} + +func (h *TestHandler) BatchSubHandler(ctxs []context.Context, msgs []*codec.Frame) error { + if len(msgs) != 8 { + h.t.Fatal("invalid number of messages received") + } + for idx := 0; idx < len(msgs); idx++ { + md, _ := metadata.FromIncomingContext(ctxs[idx]) + _ = md + // fmt.Printf("msg md %v\n", md) + } + return nil +} + +func TestNoopSub(t *testing.T) { + ctx := context.Background() + + b := broker.NewBroker() + + if err := b.Init(); err != nil { + t.Fatal(err) + } + + if err := b.Connect(ctx); err != nil { + t.Fatal(err) + } + + s := server.NewServer( + server.Broker(b), + server.Codec("application/octet-stream", codec.NewCodec()), + ) + if err := s.Init(); err != nil { + t.Fatal(err) + } + + c := client.NewClient( + client.Broker(b), + client.Codec("application/octet-stream", codec.NewCodec()), + client.ContentType("application/octet-stream"), + ) + if err := c.Init(); err != nil { + t.Fatal(err) + } + h := &TestHandler{t: t} + + if err := s.Subscribe(s.NewSubscriber("single_topic", h.SingleSubHandler, + server.SubscriberQueue("queue"), + )); err != nil { + t.Fatal(err) + } + + if err := s.Subscribe(s.NewSubscriber("batch_topic", h.BatchSubHandler, + server.SubscriberQueue("queue"), + server.SubscriberBatch(true), + )); err != nil { + t.Fatal(err) + } + + if err := s.Start(); err != nil { + t.Fatal(err) + } + + msgs := make([]client.Message, 0, 8) + for i := 0; i < 8; i++ { + msgs = append(msgs, c.NewMessage("batch_topic", &codec.Frame{Data: []byte(fmt.Sprintf(`{"name": "test_name %d"}`, i))})) + } + + if err := c.BatchPublish(ctx, msgs); err != nil { + t.Fatal(err) + } + + defer func() { + if err := s.Stop(); err != nil { + t.Fatal(err) + } + }() +} diff --git a/server/options.go b/server/options.go index 74abb199..0224a9e8 100644 --- a/server/options.go +++ b/server/options.go @@ -71,6 +71,8 @@ type Options struct { Version string // SubWrappers holds the server subscribe wrappers SubWrappers []SubscriberWrapper + // BatchSubWrappers holds the server batch subscribe wrappers + BatchSubWrappers []BatchSubscriberWrapper // HdlrWrappers holds the handler wrappers HdlrWrappers []HandlerWrapper // RegisterAttempts holds the number of register attempts before error @@ -302,6 +304,13 @@ func WrapSubscriber(w SubscriberWrapper) Option { } } +// WrapBatchSubscriber adds a batch subscriber Wrapper to a list of options passed into the server +func WrapBatchSubscriber(w BatchSubscriberWrapper) Option { + return func(o *Options) { + o.BatchSubWrappers = append(o.BatchSubWrappers, w) + } +} + // MaxConn specifies maximum number of max simultaneous connections to server func MaxConn(n int) Option { return func(o *Options) { @@ -354,6 +363,12 @@ type SubscriberOptions struct { AutoAck bool // BodyOnly flag specifies that message without headers BodyOnly bool + // Batch flag specifies that message processed in batches + Batch bool + // BatchSize flag specifies max size of batch + BatchSize int + // BatchWait flag specifies max wait time for batch filling + BatchWait time.Duration } // NewSubscriberOptions create new SubscriberOptions @@ -413,3 +428,32 @@ func SubscriberContext(ctx context.Context) SubscriberOption { o.Context = ctx } } + +// SubscriberAck control auto ack processing for handler +func SubscriberAck(b bool) SubscriberOption { + return func(o *SubscriberOptions) { + o.AutoAck = b + } +} + +// SubscriberBatch control batch processing for handler +func SubscriberBatch(b bool) SubscriberOption { + return func(o *SubscriberOptions) { + o.Batch = b + } +} + +// SubscriberBatchSize control batch filling size for handler +// Batch filling max waiting time controlled by SubscriberBatchWait +func SubscriberBatchSize(n int) SubscriberOption { + return func(o *SubscriberOptions) { + o.BatchSize = n + } +} + +// SubscriberBatchWait control batch filling wait time for handler +func SubscriberBatchWait(td time.Duration) SubscriberOption { + return func(o *SubscriberOptions) { + o.BatchWait = td + } +} diff --git a/server/subscriber.go b/server/subscriber.go index bb047818..2db65d8c 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -18,7 +18,8 @@ import ( ) const ( - subSig = "func(context.Context, interface{}) error" + subSig = "func(context.Context, interface{}) error" + batchSubSig = "func([]context.Context, []interface{}) error" ) // Precompute the reflect type for error. Can't use error directly @@ -57,26 +58,33 @@ func isExportedOrBuiltinType(t reflect.Type) bool { return isExported(t.Name()) || t.PkgPath() == "" } -// ValidateSubscriber func +// ValidateSubscriber func signature func ValidateSubscriber(sub Subscriber) error { typ := reflect.TypeOf(sub.Subscriber()) var argType reflect.Type - switch typ.Kind() { case reflect.Func: name := "Func" switch typ.NumIn() { case 2: argType = typ.In(1) + if sub.Options().Batch { + if argType.Kind() != reflect.Slice { + return fmt.Errorf("subscriber %v dont have required signature %s", name, batchSubSig) + } + if strings.Compare(fmt.Sprintf("%s", argType), "[]interface{}") == 0 { + return fmt.Errorf("subscriber %v dont have required signaure %s", name, batchSubSig) + } + } default: - return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), subSig) + return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s or %s", name, typ.NumIn(), subSig, batchSubSig) } if !isExportedOrBuiltinType(argType) { return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType) } if typ.NumOut() != 1 { - return fmt.Errorf("subscriber %v has wrong number of outs: %v require signature %s", - name, typ.NumOut(), subSig) + return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s or %s", + name, typ.NumOut(), subSig, batchSubSig) } if returnType := typ.Out(0); returnType != typeOfError { return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String()) @@ -87,13 +95,12 @@ func ValidateSubscriber(sub Subscriber) error { for m := 0; m < typ.NumMethod(); m++ { method := typ.Method(m) - switch method.Type.NumIn() { case 3: argType = method.Type.In(2) default: - return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s", - name, method.Name, method.Type.NumIn(), subSig) + return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s or %s", + name, method.Name, method.Type.NumIn(), subSig, batchSubSig) } if !isExportedOrBuiltinType(argType) { @@ -101,8 +108,8 @@ func ValidateSubscriber(sub Subscriber) error { } if method.Type.NumOut() != 1 { return fmt.Errorf( - "subscriber %v.%v has wrong number of outs: %v require signature %s", - name, method.Name, method.Type.NumOut(), subSig) + "subscriber %v.%v has wrong number of return values: %v require signature %s or %s", + name, method.Name, method.Type.NumOut(), subSig, batchSubSig) } if returnType := method.Type.Out(0); returnType != typeOfError { return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String()) @@ -183,7 +190,125 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs } //nolint:gocyclo -func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler { +func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler { + return func(ps broker.Events) (err error) { + defer func() { + if r := recover(); r != nil { + n.RLock() + config := n.opts + n.RUnlock() + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error(n.opts.Context, "panic recovered: ", r) + config.Logger.Error(n.opts.Context, string(debug.Stack())) + } + err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r) + } + }() + + msgs := make([]Message, 0, len(ps)) + ctxs := make([]context.Context, 0, len(ps)) + for _, p := range ps { + msg := p.Message() + // if we don't have headers, create empty map + if msg.Header == nil { + msg.Header = metadata.New(2) + } + + ct, _ := msg.Header.Get(metadata.HeaderContentType) + if len(ct) == 0 { + msg.Header.Set(metadata.HeaderContentType, defaultContentType) + ct = defaultContentType + } + hdr := metadata.Copy(msg.Header) + topic, _ := msg.Header.Get(metadata.HeaderTopic) + ctxs = append(ctxs, metadata.NewIncomingContext(sb.opts.Context, hdr)) + msgs = append(msgs, &rpcMessage{ + topic: topic, + contentType: ct, + header: msg.Header, + body: msg.Body, + }) + } + results := make(chan error, len(sb.handlers)) + + for i := 0; i < len(sb.handlers); i++ { + handler := sb.handlers[i] + + var req reflect.Value + + switch handler.reqType.Kind() { + case reflect.Ptr: + req = reflect.New(handler.reqType.Elem()) + default: + req = reflect.New(handler.reqType.Elem()).Elem() + } + + reqType := handler.reqType + + for _, msg := range msgs { + cf, err := n.newCodec(msg.ContentType()) + if err != nil { + return err + } + rb := reflect.New(req.Type().Elem()) + if err = cf.ReadBody(bytes.NewReader(msg.Body()), rb.Interface()); err != nil { + return err + } + msg.(*rpcMessage).codec = cf + msg.(*rpcMessage).payload = rb.Interface() + } + + fn := func(ctxs []context.Context, ms []Message) error { + var vals []reflect.Value + if sb.typ.Kind() != reflect.Func { + vals = append(vals, sb.rcvr) + } + if handler.ctxType != nil { + vals = append(vals, reflect.ValueOf(ctxs)) + } + payloads := reflect.MakeSlice(reqType, 0, len(ms)) + for _, m := range ms { + payloads = reflect.Append(payloads, reflect.ValueOf(m.Payload())) + } + vals = append(vals, payloads) + + returnValues := handler.method.Call(vals) + if rerr := returnValues[0].Interface(); rerr != nil { + return rerr.(error) + } + return nil + } + + for i := len(opts.BatchSubWrappers); i > 0; i-- { + fn = opts.BatchSubWrappers[i-1](fn) + } + + if n.wg != nil { + n.wg.Add(1) + } + go func() { + if n.wg != nil { + defer n.wg.Done() + } + results <- fn(ctxs, msgs) + }() + } + + var errors []string + for i := 0; i < len(sb.handlers); i++ { + if rerr := <-results; rerr != nil { + errors = append(errors, rerr.Error()) + } + } + if len(errors) > 0 { + err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) + } + return err + } +} + +//nolint:gocyclo +func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler { return func(p broker.Event) (err error) { defer func() { if r := recover(); r != nil { @@ -201,12 +326,12 @@ func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handl msg := p.Message() // if we don't have headers, create empty map if msg.Header == nil { - msg.Header = make(map[string]string) + msg.Header = metadata.New(2) } ct := msg.Header["Content-Type"] if len(ct) == 0 { - msg.Header["Content-Type"] = defaultContentType + msg.Header.Set(metadata.HeaderContentType, defaultContentType) ct = defaultContentType } cf, err := n.newCodec(ct) @@ -214,12 +339,12 @@ func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handl return err } - hdr := make(map[string]string, len(msg.Header)) + hdr := metadata.New(len(msg.Header)) for k, v := range msg.Header { if k == "Content-Type" { continue } - hdr[k] = v + hdr.Set(k, v) } ctx := metadata.NewIncomingContext(sb.opts.Context, hdr) @@ -294,7 +419,6 @@ func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handl if len(errors) > 0 { err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) } - return err } } diff --git a/server/wrapper.go b/server/wrapper.go index 3e4d3ecd..b4596e63 100644 --- a/server/wrapper.go +++ b/server/wrapper.go @@ -14,12 +14,20 @@ type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error // publication message. type SubscriberFunc func(ctx context.Context, msg Message) error +// BatchSubscriberFunc represents a single method of a subscriber. It's used primarily +// for the wrappers. What's handed to the actual method is the concrete +// publication message. This func used by batch subscribers +type BatchSubscriberFunc func(ctxs []context.Context, msgs []Message) error + // HandlerWrapper wraps the HandlerFunc and returns the equivalent type HandlerWrapper func(HandlerFunc) HandlerFunc // SubscriberWrapper wraps the SubscriberFunc and returns the equivalent type SubscriberWrapper func(SubscriberFunc) SubscriberFunc +// BatchSubscriberWrapper wraps the SubscriberFunc and returns the equivalent +type BatchSubscriberWrapper func(BatchSubscriberFunc) BatchSubscriberFunc + // StreamWrapper wraps a Stream interface and returns the equivalent. // Because streams exist for the lifetime of a method invocation this // is a convenient way to wrap a Stream as its in use for trace, monitoring,