package grpc import ( "context" "fmt" "reflect" "strings" "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/options" "go.unistack.org/micro/v3/register" "go.unistack.org/micro/v3/server" ) var _ server.Message = &rpcMessage{} type rpcMessage struct { payload interface{} codec codec.Codec header metadata.Metadata topic string contentType string } func (r *rpcMessage) ContentType() string { return r.contentType } func (r *rpcMessage) Topic() string { return r.topic } func (r *rpcMessage) Body() interface{} { return r.payload } func (r *rpcMessage) Header() metadata.Metadata { return r.header } func (r *rpcMessage) Codec() codec.Codec { return r.codec } type handler struct { reqType reflect.Type ctxType reflect.Type method reflect.Value } type subscriber struct { topic string rcvr reflect.Value typ reflect.Type subscriber interface{} handlers []*handler endpoints []*register.Endpoint opts server.SubscriberOptions } func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber { options := server.NewSubscriberOptions(opts...) var endpoints []*register.Endpoint var handlers []*handler if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func { h := &handler{ method: reflect.ValueOf(sub), } switch typ.NumIn() { case 1: h.reqType = typ.In(0) case 2: h.ctxType = typ.In(0) h.reqType = typ.In(1) } handlers = append(handlers, h) endpoints = append(endpoints, ®ister.Endpoint{ Name: "Func", Request: register.ExtractSubValue(typ), Metadata: map[string]string{ "topic": topic, "subscriber": "true", }, }) } else { hdlr := reflect.ValueOf(sub) name := reflect.Indirect(hdlr).Type().Name() for m := 0; m < typ.NumMethod(); m++ { method := typ.Method(m) h := &handler{ method: method.Func, } switch method.Type.NumIn() { case 2: h.reqType = method.Type.In(1) case 3: h.ctxType = method.Type.In(1) h.reqType = method.Type.In(2) } handlers = append(handlers, h) endpoints = append(endpoints, ®ister.Endpoint{ Name: name + "." + method.Name, Request: register.ExtractSubValue(method.Type), Metadata: map[string]string{ "topic": topic, "subscriber": "true", }, }) } } return &subscriber{ rcvr: reflect.ValueOf(sub), typ: reflect.TypeOf(sub), topic: topic, subscriber: sub, handlers: handlers, endpoints: endpoints, opts: options, } } func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Handler { return func(p broker.Event) (err error) { msg := p.Message() // if we don't have headers, create empty map if msg.Header == nil { msg.Header = make(map[string]string) } ct := msg.Header["Content-Type"] if len(ct) == 0 { msg.Header["Content-Type"] = DefaultContentType ct = DefaultContentType } cf, err := g.newCodec(ct) if err != nil { return err } hdr := make(map[string]string, len(msg.Header)) for k, v := range msg.Header { hdr[k] = v } ctx := metadata.NewIncomingContext(sb.opts.Context, hdr) results := make(chan error, len(sb.handlers)) for i := 0; i < len(sb.handlers); i++ { handler := sb.handlers[i] var isVal bool var req reflect.Value if handler.reqType.Kind() == reflect.Ptr { req = reflect.New(handler.reqType.Elem()) } else { req = reflect.New(handler.reqType) isVal = true } if isVal { req = req.Elem() } if err = cf.Unmarshal(msg.Body, req.Interface()); err != nil { return err } fn := func(ctx context.Context, msg server.Message) error { var vals []reflect.Value if sb.typ.Kind() != reflect.Func { vals = append(vals, sb.rcvr) } if handler.ctxType != nil { vals = append(vals, reflect.ValueOf(ctx)) } vals = append(vals, reflect.ValueOf(msg.Body())) returnValues := handler.method.Call(vals) if rerr := returnValues[0].Interface(); rerr != nil { return rerr.(error) } return nil } opts.Hooks.EachNext(func(hook options.Hook) { if h, ok := hook.(server.HookSubHandler); ok { fn = h(fn) } }) if g.wg != nil { g.wg.Add(1) } go func() { if g.wg != nil { defer g.wg.Done() } cerr := fn(ctx, &rpcMessage{ topic: sb.topic, contentType: ct, payload: req.Interface(), header: msg.Header, }) results <- cerr }() } var errors []string for i := 0; i < len(sb.handlers); i++ { if rerr := <-results; rerr != nil { errors = append(errors, rerr.Error()) } } if len(errors) > 0 { err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) } return err } } func (s *subscriber) Topic() string { return s.topic } func (s *subscriber) Subscriber() interface{} { return s.subscriber } func (s *subscriber) Endpoints() []*register.Endpoint { return s.endpoints } func (s *subscriber) Options() server.SubscriberOptions { return s.opts } func (g *Server) subscribe() error { config := g.opts subCtx := config.Context for sb := range g.subscribers { if cx := sb.Options().Context; cx != nil { subCtx = cx } opts := []broker.SubscribeOption{ broker.SubscribeContext(subCtx), broker.SubscribeAutoAck(sb.Options().AutoAck), broker.SubscribeBodyOnly(sb.Options().BodyOnly), } if queue := sb.Options().Queue; len(queue) > 0 { opts = append(opts, broker.SubscribeGroup(queue)) } if config.Logger.V(logger.InfoLevel) { config.Logger.Info(config.Context, "subscribing to topic: "+sb.Topic()) } sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), g.createSubHandler(sb, config), opts...) if err != nil { return err } g.subscribers[sb] = []broker.Subscriber{sub} } return nil }