server/noop: cleanup #342
							
								
								
									
										162
									
								
								server/noop.go
									
									
									
									
									
								
							
							
						
						
									
										162
									
								
								server/noop.go
									
									
									
									
									
								
							@@ -274,7 +274,7 @@ func (n *noopServer) Register() error {
 | 
			
		||||
 | 
			
		||||
	if !registered {
 | 
			
		||||
		if config.Logger.V(logger.InfoLevel) {
 | 
			
		||||
			config.Logger.Infof(n.opts.Context, "register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID)
 | 
			
		||||
			config.Logger.Info(n.opts.Context, fmt.Sprintf("register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -312,7 +312,7 @@ func (n *noopServer) Deregister() error {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if config.Logger.V(logger.InfoLevel) {
 | 
			
		||||
		config.Logger.Infof(n.opts.Context, "deregistering node: %s", service.Nodes[0].ID)
 | 
			
		||||
		config.Logger.Info(n.opts.Context, fmt.Sprintf("deregistering node: %s", service.Nodes[0].ID))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := DefaultDeregisterFunc(service, config); err != nil {
 | 
			
		||||
@@ -343,11 +343,11 @@ func (n *noopServer) Deregister() error {
 | 
			
		||||
			go func(s broker.Subscriber) {
 | 
			
		||||
				defer wg.Done()
 | 
			
		||||
				if config.Logger.V(logger.InfoLevel) {
 | 
			
		||||
					config.Logger.Infof(n.opts.Context, "unsubscribing from topic: %s", s.Topic())
 | 
			
		||||
					config.Logger.Info(n.opts.Context, "unsubscribing from topic: "+s.Topic())
 | 
			
		||||
				}
 | 
			
		||||
				if err := s.Unsubscribe(ncx); err != nil {
 | 
			
		||||
					if config.Logger.V(logger.ErrorLevel) {
 | 
			
		||||
						config.Logger.Errorf(n.opts.Context, "unsubscribing from topic: %s err: %v", s.Topic(), err)
 | 
			
		||||
						config.Logger.Error(n.opts.Context, "unsubscribing from topic: "+s.Topic(), err)
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}(subs[idx])
 | 
			
		||||
@@ -383,7 +383,7 @@ func (n *noopServer) Start() error {
 | 
			
		||||
	config.Address = addr
 | 
			
		||||
 | 
			
		||||
	if config.Logger.V(logger.InfoLevel) {
 | 
			
		||||
		config.Logger.Infof(n.opts.Context, "server [noop] Listening on %s", config.Address)
 | 
			
		||||
		config.Logger.Info(n.opts.Context, "server [noop] Listening on "+config.Address)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	n.Lock()
 | 
			
		||||
@@ -397,13 +397,13 @@ func (n *noopServer) Start() error {
 | 
			
		||||
		// connect to the broker
 | 
			
		||||
		if err := config.Broker.Connect(config.Context); err != nil {
 | 
			
		||||
			if config.Logger.V(logger.ErrorLevel) {
 | 
			
		||||
				config.Logger.Errorf(n.opts.Context, "broker [%s] connect error: %v", config.Broker.String(), err)
 | 
			
		||||
				config.Logger.Error(n.opts.Context, fmt.Sprintf("broker [%s] connect error", config.Broker.String()), err)
 | 
			
		||||
			}
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if config.Logger.V(logger.InfoLevel) {
 | 
			
		||||
			config.Logger.Infof(n.opts.Context, "broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
 | 
			
		||||
			config.Logger.Info(n.opts.Context, fmt.Sprintf("broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -411,13 +411,13 @@ func (n *noopServer) Start() error {
 | 
			
		||||
	// nolint: nestif
 | 
			
		||||
	if err := config.RegisterCheck(config.Context); err != nil {
 | 
			
		||||
		if config.Logger.V(logger.ErrorLevel) {
 | 
			
		||||
			config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.ID, err)
 | 
			
		||||
			config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register check error", config.Name, config.ID), err)
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		// announce self to the world
 | 
			
		||||
		if err := n.Register(); err != nil {
 | 
			
		||||
			if config.Logger.V(logger.ErrorLevel) {
 | 
			
		||||
				config.Logger.Errorf(n.opts.Context, "server register error: %v", err)
 | 
			
		||||
				config.Logger.Error(n.opts.Context, "server register error", err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
@@ -450,23 +450,23 @@ func (n *noopServer) Start() error {
 | 
			
		||||
				// nolint: nestif
 | 
			
		||||
				if rerr != nil && registered {
 | 
			
		||||
					if config.Logger.V(logger.ErrorLevel) {
 | 
			
		||||
						config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr)
 | 
			
		||||
						config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register check error, deregister it", config.Name, config.ID), rerr)
 | 
			
		||||
					}
 | 
			
		||||
					// deregister self in case of error
 | 
			
		||||
					if err := n.Deregister(); err != nil {
 | 
			
		||||
						if config.Logger.V(logger.ErrorLevel) {
 | 
			
		||||
							config.Logger.Errorf(n.opts.Context, "server %s-%s deregister error: %s", config.Name, config.ID, err)
 | 
			
		||||
							config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s deregister error", config.Name, config.ID), err)
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
				} else if rerr != nil && !registered {
 | 
			
		||||
					if config.Logger.V(logger.ErrorLevel) {
 | 
			
		||||
						config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.ID, rerr)
 | 
			
		||||
						config.Logger.Errorf(n.opts.Context, fmt.Sprintf("server %s-%s register check error", config.Name, config.ID), rerr)
 | 
			
		||||
					}
 | 
			
		||||
					continue
 | 
			
		||||
				}
 | 
			
		||||
				if err := n.Register(); err != nil {
 | 
			
		||||
					if config.Logger.V(logger.ErrorLevel) {
 | 
			
		||||
						config.Logger.Errorf(n.opts.Context, "server %s-%s register error: %s", config.Name, config.ID, err)
 | 
			
		||||
						config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register error", config.Name, config.ID), err)
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			// wait for exit
 | 
			
		||||
@@ -478,7 +478,7 @@ func (n *noopServer) Start() error {
 | 
			
		||||
		// deregister self
 | 
			
		||||
		if err := n.Deregister(); err != nil {
 | 
			
		||||
			if config.Logger.V(logger.ErrorLevel) {
 | 
			
		||||
				config.Logger.Errorf(n.opts.Context, "server deregister error: ", err)
 | 
			
		||||
				config.Logger.Error(n.opts.Context, "server deregister error", err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
@@ -491,12 +491,12 @@ func (n *noopServer) Start() error {
 | 
			
		||||
		ch <- nil
 | 
			
		||||
 | 
			
		||||
		if config.Logger.V(logger.InfoLevel) {
 | 
			
		||||
			config.Logger.Infof(n.opts.Context, "broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
 | 
			
		||||
			config.Logger.Info(n.opts.Context, fmt.Sprintf("broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()))
 | 
			
		||||
		}
 | 
			
		||||
		// disconnect broker
 | 
			
		||||
		if err := config.Broker.Disconnect(config.Context); err != nil {
 | 
			
		||||
			if config.Logger.V(logger.ErrorLevel) {
 | 
			
		||||
				config.Logger.Errorf(n.opts.Context, "broker [%s] disconnect error: %v", config.Broker.String(), err)
 | 
			
		||||
				config.Logger.Error(n.opts.Context, fmt.Sprintf("broker [%s] disconnect error", config.Broker.String()), err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
@@ -526,20 +526,13 @@ func (n *noopServer) subscribe() error {
 | 
			
		||||
			opts = append(opts, broker.SubscribeGroup(queue))
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if sb.Options().Batch {
 | 
			
		||||
			// batch processing handler
 | 
			
		||||
			sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.createBatchSubHandler(sb, config), opts...)
 | 
			
		||||
		} else {
 | 
			
		||||
			// single processing handler
 | 
			
		||||
		sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if config.Logger.V(logger.InfoLevel) {
 | 
			
		||||
			config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
 | 
			
		||||
			config.Logger.Info(n.opts.Context, "subscribing to topic: "+sb.Topic())
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		n.subscribers[sb] = []broker.Subscriber{sub}
 | 
			
		||||
@@ -637,127 +630,6 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
//nolint:gocyclo
 | 
			
		||||
func (n *noopServer) createBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
 | 
			
		||||
	return func(ps broker.Events) (err error) {
 | 
			
		||||
		defer func() {
 | 
			
		||||
			if r := recover(); r != nil {
 | 
			
		||||
				n.RLock()
 | 
			
		||||
				config := n.opts
 | 
			
		||||
				n.RUnlock()
 | 
			
		||||
				if config.Logger.V(logger.ErrorLevel) {
 | 
			
		||||
					config.Logger.Error(n.opts.Context, "panic recovered: ", r)
 | 
			
		||||
					config.Logger.Error(n.opts.Context, string(debug.Stack()))
 | 
			
		||||
				}
 | 
			
		||||
				err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r)
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
 | 
			
		||||
		msgs := make([]Message, 0, len(ps))
 | 
			
		||||
		ctxs := make([]context.Context, 0, len(ps))
 | 
			
		||||
		for _, p := range ps {
 | 
			
		||||
			msg := p.Message()
 | 
			
		||||
			// if we don't have headers, create empty map
 | 
			
		||||
			if msg.Header == nil {
 | 
			
		||||
				msg.Header = metadata.New(2)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			ct, _ := msg.Header.Get(metadata.HeaderContentType)
 | 
			
		||||
			if len(ct) == 0 {
 | 
			
		||||
				msg.Header.Set(metadata.HeaderContentType, defaultContentType)
 | 
			
		||||
				ct = defaultContentType
 | 
			
		||||
			}
 | 
			
		||||
			hdr := metadata.Copy(msg.Header)
 | 
			
		||||
			topic, _ := msg.Header.Get(metadata.HeaderTopic)
 | 
			
		||||
			ctxs = append(ctxs, metadata.NewIncomingContext(sb.opts.Context, hdr))
 | 
			
		||||
			msgs = append(msgs, &rpcMessage{
 | 
			
		||||
				topic:       topic,
 | 
			
		||||
				contentType: ct,
 | 
			
		||||
				header:      msg.Header,
 | 
			
		||||
				body:        msg.Body,
 | 
			
		||||
			})
 | 
			
		||||
		}
 | 
			
		||||
		results := make(chan error, len(sb.handlers))
 | 
			
		||||
 | 
			
		||||
		for i := 0; i < len(sb.handlers); i++ {
 | 
			
		||||
			handler := sb.handlers[i]
 | 
			
		||||
 | 
			
		||||
			var req reflect.Value
 | 
			
		||||
 | 
			
		||||
			switch handler.reqType.Kind() {
 | 
			
		||||
			case reflect.Ptr:
 | 
			
		||||
				req = reflect.New(handler.reqType.Elem())
 | 
			
		||||
			default:
 | 
			
		||||
				req = reflect.New(handler.reqType.Elem()).Elem()
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			reqType := handler.reqType
 | 
			
		||||
			var cf codec.Codec
 | 
			
		||||
			for _, msg := range msgs {
 | 
			
		||||
				cf, err = n.newCodec(msg.ContentType())
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return err
 | 
			
		||||
				}
 | 
			
		||||
				rb := reflect.New(req.Type().Elem())
 | 
			
		||||
				if err = cf.ReadBody(bytes.NewReader(msg.(*rpcMessage).body), rb.Interface()); err != nil {
 | 
			
		||||
					return err
 | 
			
		||||
				}
 | 
			
		||||
				msg.(*rpcMessage).codec = cf
 | 
			
		||||
				msg.(*rpcMessage).payload = rb.Interface()
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			fn := func(ctxs []context.Context, ms []Message) error {
 | 
			
		||||
				var vals []reflect.Value
 | 
			
		||||
				if sb.typ.Kind() != reflect.Func {
 | 
			
		||||
					vals = append(vals, sb.rcvr)
 | 
			
		||||
				}
 | 
			
		||||
				if handler.ctxType != nil {
 | 
			
		||||
					vals = append(vals, reflect.ValueOf(ctxs))
 | 
			
		||||
				}
 | 
			
		||||
				payloads := reflect.MakeSlice(reqType, 0, len(ms))
 | 
			
		||||
				for _, m := range ms {
 | 
			
		||||
					payloads = reflect.Append(payloads, reflect.ValueOf(m.Body()))
 | 
			
		||||
				}
 | 
			
		||||
				vals = append(vals, payloads)
 | 
			
		||||
 | 
			
		||||
				returnValues := handler.method.Call(vals)
 | 
			
		||||
				if rerr := returnValues[0].Interface(); rerr != nil {
 | 
			
		||||
					return rerr.(error)
 | 
			
		||||
				}
 | 
			
		||||
				return nil
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			opts.Hooks.EachNext(func(hook options.Hook) {
 | 
			
		||||
				if h, ok := hook.(HookBatchSubHandler); ok {
 | 
			
		||||
					fn = h(fn)
 | 
			
		||||
				}
 | 
			
		||||
			})
 | 
			
		||||
 | 
			
		||||
			if n.wg != nil {
 | 
			
		||||
				n.wg.Add(1)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			go func() {
 | 
			
		||||
				if n.wg != nil {
 | 
			
		||||
					defer n.wg.Done()
 | 
			
		||||
				}
 | 
			
		||||
				results <- fn(ctxs, msgs)
 | 
			
		||||
			}()
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		var errors []string
 | 
			
		||||
		for i := 0; i < len(sb.handlers); i++ {
 | 
			
		||||
			if rerr := <-results; rerr != nil {
 | 
			
		||||
				errors = append(errors, rerr.Error())
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if len(errors) > 0 {
 | 
			
		||||
			err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
 | 
			
		||||
		}
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
//nolint:gocyclo
 | 
			
		||||
func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
 | 
			
		||||
	return func(p broker.Event) (err error) {
 | 
			
		||||
 
 | 
			
		||||
@@ -9,7 +9,6 @@ import (
 | 
			
		||||
	"go.unistack.org/micro/v3/client"
 | 
			
		||||
	"go.unistack.org/micro/v3/codec"
 | 
			
		||||
	"go.unistack.org/micro/v3/logger"
 | 
			
		||||
	"go.unistack.org/micro/v3/metadata"
 | 
			
		||||
	"go.unistack.org/micro/v3/server"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@@ -26,18 +25,6 @@ func (h *TestHandler) SingleSubHandler(ctx context.Context, msg *codec.Frame) er
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *TestHandler) BatchSubHandler(ctxs []context.Context, msgs []*codec.Frame) error {
 | 
			
		||||
	if len(msgs) != 8 {
 | 
			
		||||
		h.t.Fatal("invalid number of messages received")
 | 
			
		||||
	}
 | 
			
		||||
	for idx := 0; idx < len(msgs); idx++ {
 | 
			
		||||
		md, _ := metadata.FromIncomingContext(ctxs[idx])
 | 
			
		||||
		_ = md
 | 
			
		||||
		//	fmt.Printf("msg md %v\n", md)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestNoopSub(t *testing.T) {
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
 | 
			
		||||
@@ -76,13 +63,6 @@ func TestNoopSub(t *testing.T) {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := s.Subscribe(s.NewSubscriber("batch_topic", h.BatchSubHandler,
 | 
			
		||||
		server.SubscriberQueue("queue"),
 | 
			
		||||
		server.SubscriberBatch(true),
 | 
			
		||||
	)); err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := s.Start(); err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -341,8 +341,6 @@ type SubscriberOptions struct {
 | 
			
		||||
	AutoAck bool
 | 
			
		||||
	// BodyOnly flag specifies that message without headers
 | 
			
		||||
	BodyOnly bool
 | 
			
		||||
	// Batch flag specifies that message processed in batches
 | 
			
		||||
	Batch bool
 | 
			
		||||
	// BatchSize flag specifies max size of batch
 | 
			
		||||
	BatchSize int
 | 
			
		||||
	// BatchWait flag specifies max wait time for batch filling
 | 
			
		||||
@@ -414,13 +412,6 @@ func SubscriberAck(b bool) SubscriberOption {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SubscriberBatch control batch processing for handler
 | 
			
		||||
func SubscriberBatch(b bool) SubscriberOption {
 | 
			
		||||
	return func(o *SubscriberOptions) {
 | 
			
		||||
		o.Batch = b
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SubscriberBatchSize control batch filling size for handler
 | 
			
		||||
// Batch filling max waiting time controlled by SubscriberBatchWait
 | 
			
		||||
func SubscriberBatchSize(n int) SubscriberOption {
 | 
			
		||||
 
 | 
			
		||||
@@ -63,8 +63,6 @@ type Server interface {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type (
 | 
			
		||||
	FuncBatchSubHandler func(ctxs []context.Context, ms []Message) error
 | 
			
		||||
	HookBatchSubHandler func(next FuncBatchSubHandler) FuncBatchSubHandler
 | 
			
		||||
	FuncSubHandler func(ctx context.Context, ms Message) error
 | 
			
		||||
	HookSubHandler func(next FuncSubHandler) FuncSubHandler
 | 
			
		||||
)
 | 
			
		||||
 
 | 
			
		||||
@@ -3,14 +3,12 @@ package server
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"reflect"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"unicode"
 | 
			
		||||
	"unicode/utf8"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	subSig = "func(context.Context, interface{}) error"
 | 
			
		||||
	batchSubSig = "func([]context.Context, []interface{}) error"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Precompute the reflect type for error. Can't use error directly
 | 
			
		||||
@@ -43,23 +41,15 @@ func ValidateSubscriber(sub Subscriber) error {
 | 
			
		||||
		switch typ.NumIn() {
 | 
			
		||||
		case 2:
 | 
			
		||||
			argType = typ.In(1)
 | 
			
		||||
			if sub.Options().Batch {
 | 
			
		||||
				if argType.Kind() != reflect.Slice {
 | 
			
		||||
					return fmt.Errorf("subscriber %v dont have required signature %s", name, batchSubSig)
 | 
			
		||||
				}
 | 
			
		||||
				if strings.Compare(fmt.Sprintf("%v", argType), "[]interface{}") == 0 {
 | 
			
		||||
					return fmt.Errorf("subscriber %v dont have required signaure %s", name, batchSubSig)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		default:
 | 
			
		||||
			return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s or %s", name, typ.NumIn(), subSig, batchSubSig)
 | 
			
		||||
			return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), subSig)
 | 
			
		||||
		}
 | 
			
		||||
		if !isExportedOrBuiltinType(argType) {
 | 
			
		||||
			return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
 | 
			
		||||
		}
 | 
			
		||||
		if typ.NumOut() != 1 {
 | 
			
		||||
			return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s or %s",
 | 
			
		||||
				name, typ.NumOut(), subSig, batchSubSig)
 | 
			
		||||
			return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s",
 | 
			
		||||
				name, typ.NumOut(), subSig)
 | 
			
		||||
		}
 | 
			
		||||
		if returnType := typ.Out(0); returnType != typeOfError {
 | 
			
		||||
			return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
 | 
			
		||||
@@ -74,8 +64,8 @@ func ValidateSubscriber(sub Subscriber) error {
 | 
			
		||||
			case 3:
 | 
			
		||||
				argType = method.Type.In(2)
 | 
			
		||||
			default:
 | 
			
		||||
				return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s or %s",
 | 
			
		||||
					name, method.Name, method.Type.NumIn(), subSig, batchSubSig)
 | 
			
		||||
				return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s",
 | 
			
		||||
					name, method.Name, method.Type.NumIn(), subSig)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			if !isExportedOrBuiltinType(argType) {
 | 
			
		||||
@@ -83,8 +73,8 @@ func ValidateSubscriber(sub Subscriber) error {
 | 
			
		||||
			}
 | 
			
		||||
			if method.Type.NumOut() != 1 {
 | 
			
		||||
				return fmt.Errorf(
 | 
			
		||||
					"subscriber %v.%v has wrong number of return values: %v require signature %s or %s",
 | 
			
		||||
					name, method.Name, method.Type.NumOut(), subSig, batchSubSig)
 | 
			
		||||
					"subscriber %v.%v has wrong number of return values: %v require signature %s",
 | 
			
		||||
					name, method.Name, method.Type.NumOut(), subSig)
 | 
			
		||||
			}
 | 
			
		||||
			if returnType := method.Type.Out(0); returnType != typeOfError {
 | 
			
		||||
				return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
 | 
			
		||||
 
 | 
			
		||||
@@ -14,20 +14,12 @@ type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error
 | 
			
		||||
// publication message.
 | 
			
		||||
type SubscriberFunc func(ctx context.Context, msg Message) error
 | 
			
		||||
 | 
			
		||||
// BatchSubscriberFunc represents a single method of a subscriber. It's used primarily
 | 
			
		||||
// for the wrappers. What's handed to the actual method is the concrete
 | 
			
		||||
// publication message. This func used by batch subscribers
 | 
			
		||||
type BatchSubscriberFunc func(ctxs []context.Context, msgs []Message) error
 | 
			
		||||
 | 
			
		||||
// HandlerWrapper wraps the HandlerFunc and returns the equivalent
 | 
			
		||||
type HandlerWrapper func(HandlerFunc) HandlerFunc
 | 
			
		||||
 | 
			
		||||
// SubscriberWrapper wraps the SubscriberFunc and returns the equivalent
 | 
			
		||||
type SubscriberWrapper func(SubscriberFunc) SubscriberFunc
 | 
			
		||||
 | 
			
		||||
// BatchSubscriberWrapper wraps the SubscriberFunc and returns the equivalent
 | 
			
		||||
type BatchSubscriberWrapper func(BatchSubscriberFunc) BatchSubscriberFunc
 | 
			
		||||
 | 
			
		||||
// StreamWrapper wraps a Stream interface and returns the equivalent.
 | 
			
		||||
// Because streams exist for the lifetime of a method invocation this
 | 
			
		||||
// is a convenient way to wrap a Stream as its in use for trace, monitoring,
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user