server: add BatchSubscriber

Signed-off-by: Vasiliy Tolstov <>
This commit is contained in:
Василий Толстов 2021-07-27 23:58:06 +03:00
parent 9a5b158b4d
commit e5b0a7e20d
8 changed files with 342 additions and 37 deletions

@ -232,7 +232,7 @@ func SubscribeContext(ctx context.Context) SubscribeOption {
// DisableAutoAck disables auto ack
// Deprecated
func DisableAutoAck() SubscribeOption {
return func(o *SubscribeOptions) {
o.AutoAck = false

@ -173,7 +173,7 @@ func (n *noopClient) NewRequest(service, endpoint string, req interface{}, opts
func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOption) Message {
options := NewMessageOptions(opts...)
options := NewMessageOptions(append([]MessageOption{MessageContentType(n.opts.ContentType)}, opts...)...)
return &noopMessage{topic: topic, payload: msg, opts: options}

@ -373,7 +373,7 @@ func DialTimeout(d time.Duration) Option {
// WithExchange sets the exchange to route a message through
// Deprecated
func WithExchange(e string) PublishOption {
return func(o *PublishOptions) {
o.Exchange = e
@ -514,7 +514,7 @@ func WithSelectOptions(sops ...selector.SelectOption) CallOption {
// WithMessageContentType sets the message content type
// Deprecated
func WithMessageContentType(ct string) MessageOption {
return func(o *MessageOptions) {
o.ContentType = ct

@ -6,11 +6,13 @@ import (
// cprotorpc ""
maddr ""
mnet ""
// DefaultCodecs will be used to encode/decode
@ -73,8 +75,7 @@ func (n *noopServer) Subscribe(sb Subscriber) error {
sub, ok := sb.(*subscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *subscriber")
if len(sub.handlers) == 0 {
} else if len(sub.handlers) == 0 {
return fmt.Errorf("invalid subscriber: no handler functions")
@ -107,11 +108,12 @@ func (n *noopServer) Init(opts ...Option) error {
if n.handlers == nil {
n.handlers = make(map[string]Handler)
n.handlers = make(map[string]Handler, 1)
if n.subscribers == nil {
n.subscribers = make(map[*subscriber][]broker.Subscriber)
n.subscribers = make(map[*subscriber][]broker.Subscriber, 1)
if n.exit == nil {
n.exit = make(chan chan error)
@ -202,26 +204,34 @@ func (n *noopServer) Register() error {
cx := config.Context
for sb := range n.subscribers {
handler := n.createSubHandler(sb, config)
var opts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
var sub broker.Subscriber
for sb := range n.subscribers {
if sb.Options().Context != nil {
cx = sb.Options().Context
opts = append(opts, broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck))
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
if sb.Options().Batch {
// batch processing handler
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.newBatchSubHandler(sb, config), opts...)
} else {
// single processing handler
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.newSubHandler(sb, config), opts...)
if err != nil {
return err
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
sub, err := config.Broker.Subscribe(cx, sb.Topic(), handler, opts...)
if err != nil {
return err
n.subscribers[sb] = []broker.Subscriber{sub}
@ -303,9 +313,22 @@ func (n *noopServer) Start() error {
config := n.Options()
// use to avoid scan of all network interfaces
addr, err := maddr.Extract("")
if err != nil {
return err
var rng rand.Rand
i := rng.Intn(20000)
// set addr with port
addr = mnet.HostPort(addr, 10000+i)
config.Address = addr
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "server [noop] Listening on %s", config.Address)
if len(config.Advertise) == 0 {
config.Advertise = config.Address

server/noop_test.go Normal file

@ -0,0 +1,106 @@
package server_test
import (
type TestHandler struct {
t *testing.T
type TestMessage struct {
Name string
var (
numMsg int = 8
func (h *TestHandler) SingleSubHandler(ctx context.Context, msg *codec.Frame) error {
//fmt.Printf("msg %s\n", msg.Data)
return nil
func (h *TestHandler) BatchSubHandler(ctxs []context.Context, msgs []*codec.Frame) error {
if len(msgs) != 8 {
h.t.Fatal("invalid number of messages received")
for idx := 0; idx < len(msgs); idx++ {
md, _ := metadata.FromIncomingContext(ctxs[idx])
_ = md
// fmt.Printf("msg md %v\n", md)
return nil
func TestNoopSub(t *testing.T) {
ctx := context.Background()
b := broker.NewBroker()
if err := b.Init(); err != nil {
if err := b.Connect(ctx); err != nil {
s := server.NewServer(
server.Codec("application/octet-stream", codec.NewCodec()),
if err := s.Init(); err != nil {
c := client.NewClient(
client.Codec("application/octet-stream", codec.NewCodec()),
if err := c.Init(); err != nil {
h := &TestHandler{t: t}
if err := s.Subscribe(s.NewSubscriber("single_topic", h.SingleSubHandler,
)); err != nil {
if err := s.Subscribe(s.NewSubscriber("batch_topic", h.BatchSubHandler,
)); err != nil {
if err := s.Start(); err != nil {
msgs := make([]client.Message, 0, 8)
for i := 0; i < 8; i++ {
msgs = append(msgs, c.NewMessage("batch_topic", &codec.Frame{Data: []byte(fmt.Sprintf(`{"name": "test_name %d"}`, i))}))
if err := c.BatchPublish(ctx, msgs); err != nil {
defer func() {
if err := s.Stop(); err != nil {

@ -71,6 +71,8 @@ type Options struct {
Version string
// SubWrappers holds the server subscribe wrappers
SubWrappers []SubscriberWrapper
// BatchSubWrappers holds the server batch subscribe wrappers
BatchSubWrappers []BatchSubscriberWrapper
// HdlrWrappers holds the handler wrappers
HdlrWrappers []HandlerWrapper
// RegisterAttempts holds the number of register attempts before error
@ -302,6 +304,13 @@ func WrapSubscriber(w SubscriberWrapper) Option {
// WrapBatchSubscriber adds a batch subscriber Wrapper to a list of options passed into the server
func WrapBatchSubscriber(w BatchSubscriberWrapper) Option {
return func(o *Options) {
o.BatchSubWrappers = append(o.BatchSubWrappers, w)
// MaxConn specifies maximum number of max simultaneous connections to server
func MaxConn(n int) Option {
return func(o *Options) {
@ -354,6 +363,12 @@ type SubscriberOptions struct {
AutoAck bool
// BodyOnly flag specifies that message without headers
BodyOnly bool
// Batch flag specifies that message processed in batches
Batch bool
// BatchSize flag specifies max size of batch
BatchSize int
// BatchWait flag specifies max wait time for batch filling
BatchWait time.Duration
// NewSubscriberOptions create new SubscriberOptions
@ -413,3 +428,32 @@ func SubscriberContext(ctx context.Context) SubscriberOption {
o.Context = ctx
// SubscriberAck control auto ack processing for handler
func SubscriberAck(b bool) SubscriberOption {
return func(o *SubscriberOptions) {
o.AutoAck = b
// SubscriberBatch control batch processing for handler
func SubscriberBatch(b bool) SubscriberOption {
return func(o *SubscriberOptions) {
o.Batch = b
// SubscriberBatchSize control batch filling size for handler
// Batch filling max waiting time controlled by SubscriberBatchWait
func SubscriberBatchSize(n int) SubscriberOption {
return func(o *SubscriberOptions) {
o.BatchSize = n
// SubscriberBatchWait control batch filling wait time for handler
func SubscriberBatchWait(td time.Duration) SubscriberOption {
return func(o *SubscriberOptions) {
o.BatchWait = td

@ -18,7 +18,8 @@ import (
const (
subSig = "func(context.Context, interface{}) error"
subSig = "func(context.Context, interface{}) error"
batchSubSig = "func([]context.Context, []interface{}) error"
// Precompute the reflect type for error. Can't use error directly
@ -57,26 +58,33 @@ func isExportedOrBuiltinType(t reflect.Type) bool {
return isExported(t.Name()) || t.PkgPath() == ""
// ValidateSubscriber func
// ValidateSubscriber func signature
func ValidateSubscriber(sub Subscriber) error {
typ := reflect.TypeOf(sub.Subscriber())
var argType reflect.Type
switch typ.Kind() {
case reflect.Func:
name := "Func"
switch typ.NumIn() {
case 2:
argType = typ.In(1)
if sub.Options().Batch {
if argType.Kind() != reflect.Slice {
return fmt.Errorf("subscriber %v dont have required signature %s", name, batchSubSig)
if strings.Compare(fmt.Sprintf("%s", argType), "[]interface{}") == 0 {
return fmt.Errorf("subscriber %v dont have required signaure %s", name, batchSubSig)
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), subSig)
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s or %s", name, typ.NumIn(), subSig, batchSubSig)
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
if typ.NumOut() != 1 {
return fmt.Errorf("subscriber %v has wrong number of outs: %v require signature %s",
name, typ.NumOut(), subSig)
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s or %s",
name, typ.NumOut(), subSig, batchSubSig)
if returnType := typ.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
@ -87,13 +95,12 @@ func ValidateSubscriber(sub Subscriber) error {
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
switch method.Type.NumIn() {
case 3:
argType = method.Type.In(2)
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s",
name, method.Name, method.Type.NumIn(), subSig)
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s or %s",
name, method.Name, method.Type.NumIn(), subSig, batchSubSig)
if !isExportedOrBuiltinType(argType) {
@ -101,8 +108,8 @@ func ValidateSubscriber(sub Subscriber) error {
if method.Type.NumOut() != 1 {
return fmt.Errorf(
"subscriber %v.%v has wrong number of outs: %v require signature %s",
name, method.Name, method.Type.NumOut(), subSig)
"subscriber %v.%v has wrong number of return values: %v require signature %s or %s",
name, method.Name, method.Type.NumOut(), subSig, batchSubSig)
if returnType := method.Type.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
@ -183,7 +190,125 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
return func(ps broker.Events) (err error) {
defer func() {
if r := recover(); r != nil {
config := n.opts
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(n.opts.Context, "panic recovered: ", r)
config.Logger.Error(n.opts.Context, string(debug.Stack()))
err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r)
msgs := make([]Message, 0, len(ps))
ctxs := make([]context.Context, 0, len(ps))
for _, p := range ps {
msg := p.Message()
// if we don't have headers, create empty map
if msg.Header == nil {
msg.Header = metadata.New(2)
ct, _ := msg.Header.Get(metadata.HeaderContentType)
if len(ct) == 0 {
msg.Header.Set(metadata.HeaderContentType, defaultContentType)
ct = defaultContentType
hdr := metadata.Copy(msg.Header)
topic, _ := msg.Header.Get(metadata.HeaderTopic)
ctxs = append(ctxs, metadata.NewIncomingContext(sb.opts.Context, hdr))
msgs = append(msgs, &rpcMessage{
topic: topic,
contentType: ct,
header: msg.Header,
body: msg.Body,
results := make(chan error, len(sb.handlers))
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
var req reflect.Value
switch handler.reqType.Kind() {
case reflect.Ptr:
req = reflect.New(handler.reqType.Elem())
req = reflect.New(handler.reqType.Elem()).Elem()
reqType := handler.reqType
for _, msg := range msgs {
cf, err := n.newCodec(msg.ContentType())
if err != nil {
return err
rb := reflect.New(req.Type().Elem())
if err = cf.ReadBody(bytes.NewReader(msg.Body()), rb.Interface()); err != nil {
return err
msg.(*rpcMessage).codec = cf
msg.(*rpcMessage).payload = rb.Interface()
fn := func(ctxs []context.Context, ms []Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
if handler.ctxType != nil {
vals = append(vals, reflect.ValueOf(ctxs))
payloads := reflect.MakeSlice(reqType, 0, len(ms))
for _, m := range ms {
payloads = reflect.Append(payloads, reflect.ValueOf(m.Payload()))
vals = append(vals, payloads)
returnValues := handler.method.Call(vals)
if rerr := returnValues[0].Interface(); rerr != nil {
return rerr.(error)
return nil
for i := len(opts.BatchSubWrappers); i > 0; i-- {
fn = opts.BatchSubWrappers[i-1](fn)
if n.wg != nil {
go func() {
if n.wg != nil {
defer n.wg.Done()
results <- fn(ctxs, msgs)
var errors []string
for i := 0; i < len(sb.handlers); i++ {
if rerr := <-results; rerr != nil {
errors = append(errors, rerr.Error())
if len(errors) > 0 {
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
return err
func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler {
return func(p broker.Event) (err error) {
defer func() {
if r := recover(); r != nil {
@ -201,12 +326,12 @@ func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handl
msg := p.Message()
// if we don't have headers, create empty map
if msg.Header == nil {
msg.Header = make(map[string]string)
msg.Header = metadata.New(2)
ct := msg.Header["Content-Type"]
if len(ct) == 0 {
msg.Header["Content-Type"] = defaultContentType
msg.Header.Set(metadata.HeaderContentType, defaultContentType)
ct = defaultContentType
cf, err := n.newCodec(ct)
@ -214,12 +339,12 @@ func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handl
return err
hdr := make(map[string]string, len(msg.Header))
hdr := metadata.New(len(msg.Header))
for k, v := range msg.Header {
if k == "Content-Type" {
hdr[k] = v
hdr.Set(k, v)
ctx := metadata.NewIncomingContext(sb.opts.Context, hdr)
@ -294,7 +419,6 @@ func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handl
if len(errors) > 0 {
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
return err

@ -14,12 +14,20 @@ type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error
// publication message.
type SubscriberFunc func(ctx context.Context, msg Message) error
// BatchSubscriberFunc represents a single method of a subscriber. It's used primarily
// for the wrappers. What's handed to the actual method is the concrete
// publication message. This func used by batch subscribers
type BatchSubscriberFunc func(ctxs []context.Context, msgs []Message) error
// HandlerWrapper wraps the HandlerFunc and returns the equivalent
type HandlerWrapper func(HandlerFunc) HandlerFunc
// SubscriberWrapper wraps the SubscriberFunc and returns the equivalent
type SubscriberWrapper func(SubscriberFunc) SubscriberFunc
// BatchSubscriberWrapper wraps the SubscriberFunc and returns the equivalent
type BatchSubscriberWrapper func(BatchSubscriberFunc) BatchSubscriberFunc
// StreamWrapper wraps a Stream interface and returns the equivalent.
// Because streams exist for the lifetime of a method invocation this
// is a convenient way to wrap a Stream as its in use for trace, monitoring,