move extractor to micro

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2020-10-09 16:26:35 +03:00
parent 0d3d13b7ab
commit 4209736cb8
7 changed files with 11 additions and 248 deletions

View File

@@ -61,7 +61,7 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio
endpoints = append(endpoints, &registry.Endpoint{
Name: "Func",
Request: extractSubValue(typ),
Request: registry.ExtractSubValue(typ),
Metadata: map[string]string{
"topic": topic,
"subscriber": "true",
@@ -89,7 +89,7 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio
endpoints = append(endpoints, &registry.Endpoint{
Name: name + "." + method.Name,
Request: extractSubValue(method.Type),
Request: registry.ExtractSubValue(method.Type),
Metadata: map[string]string{
"topic": topic,
"subscriber": "true",
@@ -109,60 +109,6 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio
}
}
func validateSubscriber(sub server.Subscriber) error {
typ := reflect.TypeOf(sub.Subscriber())
var argType reflect.Type
if typ.Kind() == reflect.Func {
name := "Func"
switch typ.NumIn() {
case 2:
argType = typ.In(1)
default:
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), subSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
}
if typ.NumOut() != 1 {
return fmt.Errorf("subscriber %v has wrong number of outs: %v require signature %s",
name, typ.NumOut(), subSig)
}
if returnType := typ.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
}
} else {
hdlr := reflect.ValueOf(sub.Subscriber())
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
switch method.Type.NumIn() {
case 3:
argType = method.Type.In(2)
default:
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s",
name, method.Name, method.Type.NumIn(), subSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("%v argument type not exported: %v", name, argType)
}
if method.Type.NumOut() != 1 {
return fmt.Errorf(
"subscriber %v.%v has wrong number of outs: %v require signature %s",
name, method.Name, method.Type.NumOut(), subSig)
}
if returnType := method.Type.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
}
}
}
return nil
}
func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
return func(p broker.Event) (err error) {
defer func() {