server/noop: cleanup #342
							
								
								
									
										162
									
								
								server/noop.go
									
									
									
									
									
								
							
							
						
						
									
										162
									
								
								server/noop.go
									
									
									
									
									
								
							@@ -274,7 +274,7 @@ func (n *noopServer) Register() error {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	if !registered {
 | 
						if !registered {
 | 
				
			||||||
		if config.Logger.V(logger.InfoLevel) {
 | 
							if config.Logger.V(logger.InfoLevel) {
 | 
				
			||||||
			config.Logger.Infof(n.opts.Context, "register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID)
 | 
								config.Logger.Info(n.opts.Context, fmt.Sprintf("register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID))
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -312,7 +312,7 @@ func (n *noopServer) Deregister() error {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if config.Logger.V(logger.InfoLevel) {
 | 
						if config.Logger.V(logger.InfoLevel) {
 | 
				
			||||||
		config.Logger.Infof(n.opts.Context, "deregistering node: %s", service.Nodes[0].ID)
 | 
							config.Logger.Info(n.opts.Context, fmt.Sprintf("deregistering node: %s", service.Nodes[0].ID))
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if err := DefaultDeregisterFunc(service, config); err != nil {
 | 
						if err := DefaultDeregisterFunc(service, config); err != nil {
 | 
				
			||||||
@@ -343,11 +343,11 @@ func (n *noopServer) Deregister() error {
 | 
				
			|||||||
			go func(s broker.Subscriber) {
 | 
								go func(s broker.Subscriber) {
 | 
				
			||||||
				defer wg.Done()
 | 
									defer wg.Done()
 | 
				
			||||||
				if config.Logger.V(logger.InfoLevel) {
 | 
									if config.Logger.V(logger.InfoLevel) {
 | 
				
			||||||
					config.Logger.Infof(n.opts.Context, "unsubscribing from topic: %s", s.Topic())
 | 
										config.Logger.Info(n.opts.Context, "unsubscribing from topic: "+s.Topic())
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
				if err := s.Unsubscribe(ncx); err != nil {
 | 
									if err := s.Unsubscribe(ncx); err != nil {
 | 
				
			||||||
					if config.Logger.V(logger.ErrorLevel) {
 | 
										if config.Logger.V(logger.ErrorLevel) {
 | 
				
			||||||
						config.Logger.Errorf(n.opts.Context, "unsubscribing from topic: %s err: %v", s.Topic(), err)
 | 
											config.Logger.Error(n.opts.Context, "unsubscribing from topic: "+s.Topic(), err)
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
			}(subs[idx])
 | 
								}(subs[idx])
 | 
				
			||||||
@@ -383,7 +383,7 @@ func (n *noopServer) Start() error {
 | 
				
			|||||||
	config.Address = addr
 | 
						config.Address = addr
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if config.Logger.V(logger.InfoLevel) {
 | 
						if config.Logger.V(logger.InfoLevel) {
 | 
				
			||||||
		config.Logger.Infof(n.opts.Context, "server [noop] Listening on %s", config.Address)
 | 
							config.Logger.Info(n.opts.Context, "server [noop] Listening on "+config.Address)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	n.Lock()
 | 
						n.Lock()
 | 
				
			||||||
@@ -397,13 +397,13 @@ func (n *noopServer) Start() error {
 | 
				
			|||||||
		// connect to the broker
 | 
							// connect to the broker
 | 
				
			||||||
		if err := config.Broker.Connect(config.Context); err != nil {
 | 
							if err := config.Broker.Connect(config.Context); err != nil {
 | 
				
			||||||
			if config.Logger.V(logger.ErrorLevel) {
 | 
								if config.Logger.V(logger.ErrorLevel) {
 | 
				
			||||||
				config.Logger.Errorf(n.opts.Context, "broker [%s] connect error: %v", config.Broker.String(), err)
 | 
									config.Logger.Error(n.opts.Context, fmt.Sprintf("broker [%s] connect error", config.Broker.String()), err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			return err
 | 
								return err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if config.Logger.V(logger.InfoLevel) {
 | 
							if config.Logger.V(logger.InfoLevel) {
 | 
				
			||||||
			config.Logger.Infof(n.opts.Context, "broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
 | 
								config.Logger.Info(n.opts.Context, fmt.Sprintf("broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()))
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -411,13 +411,13 @@ func (n *noopServer) Start() error {
 | 
				
			|||||||
	// nolint: nestif
 | 
						// nolint: nestif
 | 
				
			||||||
	if err := config.RegisterCheck(config.Context); err != nil {
 | 
						if err := config.RegisterCheck(config.Context); err != nil {
 | 
				
			||||||
		if config.Logger.V(logger.ErrorLevel) {
 | 
							if config.Logger.V(logger.ErrorLevel) {
 | 
				
			||||||
			config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.ID, err)
 | 
								config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register check error", config.Name, config.ID), err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		// announce self to the world
 | 
							// announce self to the world
 | 
				
			||||||
		if err := n.Register(); err != nil {
 | 
							if err := n.Register(); err != nil {
 | 
				
			||||||
			if config.Logger.V(logger.ErrorLevel) {
 | 
								if config.Logger.V(logger.ErrorLevel) {
 | 
				
			||||||
				config.Logger.Errorf(n.opts.Context, "server register error: %v", err)
 | 
									config.Logger.Error(n.opts.Context, "server register error", err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -450,23 +450,23 @@ func (n *noopServer) Start() error {
 | 
				
			|||||||
				// nolint: nestif
 | 
									// nolint: nestif
 | 
				
			||||||
				if rerr != nil && registered {
 | 
									if rerr != nil && registered {
 | 
				
			||||||
					if config.Logger.V(logger.ErrorLevel) {
 | 
										if config.Logger.V(logger.ErrorLevel) {
 | 
				
			||||||
						config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr)
 | 
											config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register check error, deregister it", config.Name, config.ID), rerr)
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
					// deregister self in case of error
 | 
										// deregister self in case of error
 | 
				
			||||||
					if err := n.Deregister(); err != nil {
 | 
										if err := n.Deregister(); err != nil {
 | 
				
			||||||
						if config.Logger.V(logger.ErrorLevel) {
 | 
											if config.Logger.V(logger.ErrorLevel) {
 | 
				
			||||||
							config.Logger.Errorf(n.opts.Context, "server %s-%s deregister error: %s", config.Name, config.ID, err)
 | 
												config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s deregister error", config.Name, config.ID), err)
 | 
				
			||||||
						}
 | 
											}
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
				} else if rerr != nil && !registered {
 | 
									} else if rerr != nil && !registered {
 | 
				
			||||||
					if config.Logger.V(logger.ErrorLevel) {
 | 
										if config.Logger.V(logger.ErrorLevel) {
 | 
				
			||||||
						config.Logger.Errorf(n.opts.Context, "server %s-%s register check error: %s", config.Name, config.ID, rerr)
 | 
											config.Logger.Errorf(n.opts.Context, fmt.Sprintf("server %s-%s register check error", config.Name, config.ID), rerr)
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
					continue
 | 
										continue
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
				if err := n.Register(); err != nil {
 | 
									if err := n.Register(); err != nil {
 | 
				
			||||||
					if config.Logger.V(logger.ErrorLevel) {
 | 
										if config.Logger.V(logger.ErrorLevel) {
 | 
				
			||||||
						config.Logger.Errorf(n.opts.Context, "server %s-%s register error: %s", config.Name, config.ID, err)
 | 
											config.Logger.Error(n.opts.Context, fmt.Sprintf("server %s-%s register error", config.Name, config.ID), err)
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
			// wait for exit
 | 
								// wait for exit
 | 
				
			||||||
@@ -478,7 +478,7 @@ func (n *noopServer) Start() error {
 | 
				
			|||||||
		// deregister self
 | 
							// deregister self
 | 
				
			||||||
		if err := n.Deregister(); err != nil {
 | 
							if err := n.Deregister(); err != nil {
 | 
				
			||||||
			if config.Logger.V(logger.ErrorLevel) {
 | 
								if config.Logger.V(logger.ErrorLevel) {
 | 
				
			||||||
				config.Logger.Errorf(n.opts.Context, "server deregister error: ", err)
 | 
									config.Logger.Error(n.opts.Context, "server deregister error", err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -491,12 +491,12 @@ func (n *noopServer) Start() error {
 | 
				
			|||||||
		ch <- nil
 | 
							ch <- nil
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if config.Logger.V(logger.InfoLevel) {
 | 
							if config.Logger.V(logger.InfoLevel) {
 | 
				
			||||||
			config.Logger.Infof(n.opts.Context, "broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
 | 
								config.Logger.Info(n.opts.Context, fmt.Sprintf("broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()))
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		// disconnect broker
 | 
							// disconnect broker
 | 
				
			||||||
		if err := config.Broker.Disconnect(config.Context); err != nil {
 | 
							if err := config.Broker.Disconnect(config.Context); err != nil {
 | 
				
			||||||
			if config.Logger.V(logger.ErrorLevel) {
 | 
								if config.Logger.V(logger.ErrorLevel) {
 | 
				
			||||||
				config.Logger.Errorf(n.opts.Context, "broker [%s] disconnect error: %v", config.Broker.String(), err)
 | 
									config.Logger.Error(n.opts.Context, fmt.Sprintf("broker [%s] disconnect error", config.Broker.String()), err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
@@ -526,20 +526,13 @@ func (n *noopServer) subscribe() error {
 | 
				
			|||||||
			opts = append(opts, broker.SubscribeGroup(queue))
 | 
								opts = append(opts, broker.SubscribeGroup(queue))
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if sb.Options().Batch {
 | 
					 | 
				
			||||||
			// batch processing handler
 | 
					 | 
				
			||||||
			sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.createBatchSubHandler(sb, config), opts...)
 | 
					 | 
				
			||||||
		} else {
 | 
					 | 
				
			||||||
			// single processing handler
 | 
					 | 
				
			||||||
		sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...)
 | 
							sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...)
 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			return err
 | 
								return err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if config.Logger.V(logger.InfoLevel) {
 | 
							if config.Logger.V(logger.InfoLevel) {
 | 
				
			||||||
			config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
 | 
								config.Logger.Info(n.opts.Context, "subscribing to topic: "+sb.Topic())
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		n.subscribers[sb] = []broker.Subscriber{sub}
 | 
							n.subscribers[sb] = []broker.Subscriber{sub}
 | 
				
			||||||
@@ -637,127 +630,6 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
//nolint:gocyclo
 | 
					 | 
				
			||||||
func (n *noopServer) createBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
 | 
					 | 
				
			||||||
	return func(ps broker.Events) (err error) {
 | 
					 | 
				
			||||||
		defer func() {
 | 
					 | 
				
			||||||
			if r := recover(); r != nil {
 | 
					 | 
				
			||||||
				n.RLock()
 | 
					 | 
				
			||||||
				config := n.opts
 | 
					 | 
				
			||||||
				n.RUnlock()
 | 
					 | 
				
			||||||
				if config.Logger.V(logger.ErrorLevel) {
 | 
					 | 
				
			||||||
					config.Logger.Error(n.opts.Context, "panic recovered: ", r)
 | 
					 | 
				
			||||||
					config.Logger.Error(n.opts.Context, string(debug.Stack()))
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
				err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r)
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		msgs := make([]Message, 0, len(ps))
 | 
					 | 
				
			||||||
		ctxs := make([]context.Context, 0, len(ps))
 | 
					 | 
				
			||||||
		for _, p := range ps {
 | 
					 | 
				
			||||||
			msg := p.Message()
 | 
					 | 
				
			||||||
			// if we don't have headers, create empty map
 | 
					 | 
				
			||||||
			if msg.Header == nil {
 | 
					 | 
				
			||||||
				msg.Header = metadata.New(2)
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			ct, _ := msg.Header.Get(metadata.HeaderContentType)
 | 
					 | 
				
			||||||
			if len(ct) == 0 {
 | 
					 | 
				
			||||||
				msg.Header.Set(metadata.HeaderContentType, defaultContentType)
 | 
					 | 
				
			||||||
				ct = defaultContentType
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			hdr := metadata.Copy(msg.Header)
 | 
					 | 
				
			||||||
			topic, _ := msg.Header.Get(metadata.HeaderTopic)
 | 
					 | 
				
			||||||
			ctxs = append(ctxs, metadata.NewIncomingContext(sb.opts.Context, hdr))
 | 
					 | 
				
			||||||
			msgs = append(msgs, &rpcMessage{
 | 
					 | 
				
			||||||
				topic:       topic,
 | 
					 | 
				
			||||||
				contentType: ct,
 | 
					 | 
				
			||||||
				header:      msg.Header,
 | 
					 | 
				
			||||||
				body:        msg.Body,
 | 
					 | 
				
			||||||
			})
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		results := make(chan error, len(sb.handlers))
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		for i := 0; i < len(sb.handlers); i++ {
 | 
					 | 
				
			||||||
			handler := sb.handlers[i]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			var req reflect.Value
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			switch handler.reqType.Kind() {
 | 
					 | 
				
			||||||
			case reflect.Ptr:
 | 
					 | 
				
			||||||
				req = reflect.New(handler.reqType.Elem())
 | 
					 | 
				
			||||||
			default:
 | 
					 | 
				
			||||||
				req = reflect.New(handler.reqType.Elem()).Elem()
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			reqType := handler.reqType
 | 
					 | 
				
			||||||
			var cf codec.Codec
 | 
					 | 
				
			||||||
			for _, msg := range msgs {
 | 
					 | 
				
			||||||
				cf, err = n.newCodec(msg.ContentType())
 | 
					 | 
				
			||||||
				if err != nil {
 | 
					 | 
				
			||||||
					return err
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
				rb := reflect.New(req.Type().Elem())
 | 
					 | 
				
			||||||
				if err = cf.ReadBody(bytes.NewReader(msg.(*rpcMessage).body), rb.Interface()); err != nil {
 | 
					 | 
				
			||||||
					return err
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
				msg.(*rpcMessage).codec = cf
 | 
					 | 
				
			||||||
				msg.(*rpcMessage).payload = rb.Interface()
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			fn := func(ctxs []context.Context, ms []Message) error {
 | 
					 | 
				
			||||||
				var vals []reflect.Value
 | 
					 | 
				
			||||||
				if sb.typ.Kind() != reflect.Func {
 | 
					 | 
				
			||||||
					vals = append(vals, sb.rcvr)
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
				if handler.ctxType != nil {
 | 
					 | 
				
			||||||
					vals = append(vals, reflect.ValueOf(ctxs))
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
				payloads := reflect.MakeSlice(reqType, 0, len(ms))
 | 
					 | 
				
			||||||
				for _, m := range ms {
 | 
					 | 
				
			||||||
					payloads = reflect.Append(payloads, reflect.ValueOf(m.Body()))
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
				vals = append(vals, payloads)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				returnValues := handler.method.Call(vals)
 | 
					 | 
				
			||||||
				if rerr := returnValues[0].Interface(); rerr != nil {
 | 
					 | 
				
			||||||
					return rerr.(error)
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
				return nil
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			opts.Hooks.EachNext(func(hook options.Hook) {
 | 
					 | 
				
			||||||
				if h, ok := hook.(HookBatchSubHandler); ok {
 | 
					 | 
				
			||||||
					fn = h(fn)
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
			})
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			if n.wg != nil {
 | 
					 | 
				
			||||||
				n.wg.Add(1)
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			go func() {
 | 
					 | 
				
			||||||
				if n.wg != nil {
 | 
					 | 
				
			||||||
					defer n.wg.Done()
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
				results <- fn(ctxs, msgs)
 | 
					 | 
				
			||||||
			}()
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		var errors []string
 | 
					 | 
				
			||||||
		for i := 0; i < len(sb.handlers); i++ {
 | 
					 | 
				
			||||||
			if rerr := <-results; rerr != nil {
 | 
					 | 
				
			||||||
				errors = append(errors, rerr.Error())
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		if len(errors) > 0 {
 | 
					 | 
				
			||||||
			err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		return err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
//nolint:gocyclo
 | 
					//nolint:gocyclo
 | 
				
			||||||
func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
 | 
					func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
 | 
				
			||||||
	return func(p broker.Event) (err error) {
 | 
						return func(p broker.Event) (err error) {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -9,7 +9,6 @@ import (
 | 
				
			|||||||
	"go.unistack.org/micro/v3/client"
 | 
						"go.unistack.org/micro/v3/client"
 | 
				
			||||||
	"go.unistack.org/micro/v3/codec"
 | 
						"go.unistack.org/micro/v3/codec"
 | 
				
			||||||
	"go.unistack.org/micro/v3/logger"
 | 
						"go.unistack.org/micro/v3/logger"
 | 
				
			||||||
	"go.unistack.org/micro/v3/metadata"
 | 
					 | 
				
			||||||
	"go.unistack.org/micro/v3/server"
 | 
						"go.unistack.org/micro/v3/server"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -26,18 +25,6 @@ func (h *TestHandler) SingleSubHandler(ctx context.Context, msg *codec.Frame) er
 | 
				
			|||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (h *TestHandler) BatchSubHandler(ctxs []context.Context, msgs []*codec.Frame) error {
 | 
					 | 
				
			||||||
	if len(msgs) != 8 {
 | 
					 | 
				
			||||||
		h.t.Fatal("invalid number of messages received")
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	for idx := 0; idx < len(msgs); idx++ {
 | 
					 | 
				
			||||||
		md, _ := metadata.FromIncomingContext(ctxs[idx])
 | 
					 | 
				
			||||||
		_ = md
 | 
					 | 
				
			||||||
		//	fmt.Printf("msg md %v\n", md)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func TestNoopSub(t *testing.T) {
 | 
					func TestNoopSub(t *testing.T) {
 | 
				
			||||||
	ctx := context.Background()
 | 
						ctx := context.Background()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -76,13 +63,6 @@ func TestNoopSub(t *testing.T) {
 | 
				
			|||||||
		t.Fatal(err)
 | 
							t.Fatal(err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if err := s.Subscribe(s.NewSubscriber("batch_topic", h.BatchSubHandler,
 | 
					 | 
				
			||||||
		server.SubscriberQueue("queue"),
 | 
					 | 
				
			||||||
		server.SubscriberBatch(true),
 | 
					 | 
				
			||||||
	)); err != nil {
 | 
					 | 
				
			||||||
		t.Fatal(err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if err := s.Start(); err != nil {
 | 
						if err := s.Start(); err != nil {
 | 
				
			||||||
		t.Fatal(err)
 | 
							t.Fatal(err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -341,8 +341,6 @@ type SubscriberOptions struct {
 | 
				
			|||||||
	AutoAck bool
 | 
						AutoAck bool
 | 
				
			||||||
	// BodyOnly flag specifies that message without headers
 | 
						// BodyOnly flag specifies that message without headers
 | 
				
			||||||
	BodyOnly bool
 | 
						BodyOnly bool
 | 
				
			||||||
	// Batch flag specifies that message processed in batches
 | 
					 | 
				
			||||||
	Batch bool
 | 
					 | 
				
			||||||
	// BatchSize flag specifies max size of batch
 | 
						// BatchSize flag specifies max size of batch
 | 
				
			||||||
	BatchSize int
 | 
						BatchSize int
 | 
				
			||||||
	// BatchWait flag specifies max wait time for batch filling
 | 
						// BatchWait flag specifies max wait time for batch filling
 | 
				
			||||||
@@ -414,13 +412,6 @@ func SubscriberAck(b bool) SubscriberOption {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// SubscriberBatch control batch processing for handler
 | 
					 | 
				
			||||||
func SubscriberBatch(b bool) SubscriberOption {
 | 
					 | 
				
			||||||
	return func(o *SubscriberOptions) {
 | 
					 | 
				
			||||||
		o.Batch = b
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// SubscriberBatchSize control batch filling size for handler
 | 
					// SubscriberBatchSize control batch filling size for handler
 | 
				
			||||||
// Batch filling max waiting time controlled by SubscriberBatchWait
 | 
					// Batch filling max waiting time controlled by SubscriberBatchWait
 | 
				
			||||||
func SubscriberBatchSize(n int) SubscriberOption {
 | 
					func SubscriberBatchSize(n int) SubscriberOption {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -63,8 +63,6 @@ type Server interface {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type (
 | 
					type (
 | 
				
			||||||
	FuncBatchSubHandler func(ctxs []context.Context, ms []Message) error
 | 
					 | 
				
			||||||
	HookBatchSubHandler func(next FuncBatchSubHandler) FuncBatchSubHandler
 | 
					 | 
				
			||||||
	FuncSubHandler func(ctx context.Context, ms Message) error
 | 
						FuncSubHandler func(ctx context.Context, ms Message) error
 | 
				
			||||||
	HookSubHandler func(next FuncSubHandler) FuncSubHandler
 | 
						HookSubHandler func(next FuncSubHandler) FuncSubHandler
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -3,14 +3,12 @@ package server
 | 
				
			|||||||
import (
 | 
					import (
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"reflect"
 | 
						"reflect"
 | 
				
			||||||
	"strings"
 | 
					 | 
				
			||||||
	"unicode"
 | 
						"unicode"
 | 
				
			||||||
	"unicode/utf8"
 | 
						"unicode/utf8"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const (
 | 
					const (
 | 
				
			||||||
	subSig = "func(context.Context, interface{}) error"
 | 
						subSig = "func(context.Context, interface{}) error"
 | 
				
			||||||
	batchSubSig = "func([]context.Context, []interface{}) error"
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Precompute the reflect type for error. Can't use error directly
 | 
					// Precompute the reflect type for error. Can't use error directly
 | 
				
			||||||
@@ -43,23 +41,15 @@ func ValidateSubscriber(sub Subscriber) error {
 | 
				
			|||||||
		switch typ.NumIn() {
 | 
							switch typ.NumIn() {
 | 
				
			||||||
		case 2:
 | 
							case 2:
 | 
				
			||||||
			argType = typ.In(1)
 | 
								argType = typ.In(1)
 | 
				
			||||||
			if sub.Options().Batch {
 | 
					 | 
				
			||||||
				if argType.Kind() != reflect.Slice {
 | 
					 | 
				
			||||||
					return fmt.Errorf("subscriber %v dont have required signature %s", name, batchSubSig)
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
				if strings.Compare(fmt.Sprintf("%v", argType), "[]interface{}") == 0 {
 | 
					 | 
				
			||||||
					return fmt.Errorf("subscriber %v dont have required signaure %s", name, batchSubSig)
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		default:
 | 
							default:
 | 
				
			||||||
			return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s or %s", name, typ.NumIn(), subSig, batchSubSig)
 | 
								return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), subSig)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		if !isExportedOrBuiltinType(argType) {
 | 
							if !isExportedOrBuiltinType(argType) {
 | 
				
			||||||
			return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
 | 
								return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		if typ.NumOut() != 1 {
 | 
							if typ.NumOut() != 1 {
 | 
				
			||||||
			return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s or %s",
 | 
								return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s",
 | 
				
			||||||
				name, typ.NumOut(), subSig, batchSubSig)
 | 
									name, typ.NumOut(), subSig)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		if returnType := typ.Out(0); returnType != typeOfError {
 | 
							if returnType := typ.Out(0); returnType != typeOfError {
 | 
				
			||||||
			return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
 | 
								return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
 | 
				
			||||||
@@ -74,8 +64,8 @@ func ValidateSubscriber(sub Subscriber) error {
 | 
				
			|||||||
			case 3:
 | 
								case 3:
 | 
				
			||||||
				argType = method.Type.In(2)
 | 
									argType = method.Type.In(2)
 | 
				
			||||||
			default:
 | 
								default:
 | 
				
			||||||
				return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s or %s",
 | 
									return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s",
 | 
				
			||||||
					name, method.Name, method.Type.NumIn(), subSig, batchSubSig)
 | 
										name, method.Name, method.Type.NumIn(), subSig)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			if !isExportedOrBuiltinType(argType) {
 | 
								if !isExportedOrBuiltinType(argType) {
 | 
				
			||||||
@@ -83,8 +73,8 @@ func ValidateSubscriber(sub Subscriber) error {
 | 
				
			|||||||
			}
 | 
								}
 | 
				
			||||||
			if method.Type.NumOut() != 1 {
 | 
								if method.Type.NumOut() != 1 {
 | 
				
			||||||
				return fmt.Errorf(
 | 
									return fmt.Errorf(
 | 
				
			||||||
					"subscriber %v.%v has wrong number of return values: %v require signature %s or %s",
 | 
										"subscriber %v.%v has wrong number of return values: %v require signature %s",
 | 
				
			||||||
					name, method.Name, method.Type.NumOut(), subSig, batchSubSig)
 | 
										name, method.Name, method.Type.NumOut(), subSig)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			if returnType := method.Type.Out(0); returnType != typeOfError {
 | 
								if returnType := method.Type.Out(0); returnType != typeOfError {
 | 
				
			||||||
				return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
 | 
									return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -14,20 +14,12 @@ type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error
 | 
				
			|||||||
// publication message.
 | 
					// publication message.
 | 
				
			||||||
type SubscriberFunc func(ctx context.Context, msg Message) error
 | 
					type SubscriberFunc func(ctx context.Context, msg Message) error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// BatchSubscriberFunc represents a single method of a subscriber. It's used primarily
 | 
					 | 
				
			||||||
// for the wrappers. What's handed to the actual method is the concrete
 | 
					 | 
				
			||||||
// publication message. This func used by batch subscribers
 | 
					 | 
				
			||||||
type BatchSubscriberFunc func(ctxs []context.Context, msgs []Message) error
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// HandlerWrapper wraps the HandlerFunc and returns the equivalent
 | 
					// HandlerWrapper wraps the HandlerFunc and returns the equivalent
 | 
				
			||||||
type HandlerWrapper func(HandlerFunc) HandlerFunc
 | 
					type HandlerWrapper func(HandlerFunc) HandlerFunc
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// SubscriberWrapper wraps the SubscriberFunc and returns the equivalent
 | 
					// SubscriberWrapper wraps the SubscriberFunc and returns the equivalent
 | 
				
			||||||
type SubscriberWrapper func(SubscriberFunc) SubscriberFunc
 | 
					type SubscriberWrapper func(SubscriberFunc) SubscriberFunc
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// BatchSubscriberWrapper wraps the SubscriberFunc and returns the equivalent
 | 
					 | 
				
			||||||
type BatchSubscriberWrapper func(BatchSubscriberFunc) BatchSubscriberFunc
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// StreamWrapper wraps a Stream interface and returns the equivalent.
 | 
					// StreamWrapper wraps a Stream interface and returns the equivalent.
 | 
				
			||||||
// Because streams exist for the lifetime of a method invocation this
 | 
					// Because streams exist for the lifetime of a method invocation this
 | 
				
			||||||
// is a convenient way to wrap a Stream as its in use for trace, monitoring,
 | 
					// is a convenient way to wrap a Stream as its in use for trace, monitoring,
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user