From 99af727138b23e39d3c47fbc2ab7165eddb56706 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 9 May 2023 18:38:49 +0300 Subject: [PATCH] cleanup message stuf from server Signed-off-by: Vasiliy Tolstov --- handler.go | 4 +- request.go | 33 +------- subscriber.go | 208 -------------------------------------------------- 3 files changed, 3 insertions(+), 242 deletions(-) delete mode 100644 subscriber.go diff --git a/handler.go b/handler.go index a00b637..07698a3 100644 --- a/handler.go +++ b/handler.go @@ -60,7 +60,7 @@ func (h *httpHandler) Options() server.HandlerOptions { return h.opts } -func (h *httpServer) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, error) { +func (h *Server) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, error) { if handler == nil { return nil, fmt.Errorf("invalid handler specified: %v", handler) } @@ -333,7 +333,7 @@ func (h *httpServer) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, err }, nil } -func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { // check for http.HandlerFunc handlers if ph, _, err := h.pathHandlers.Search(r.Method, r.URL.Path); err == nil { ph.(http.HandlerFunc)(w, r) diff --git a/request.go b/request.go index 16ac60b..bc73260 100644 --- a/request.go +++ b/request.go @@ -8,10 +8,7 @@ import ( "go.unistack.org/micro/v4/server" ) -var ( - _ server.Request = &rpcRequest{} - _ server.Message = &rpcMessage{} -) +var _ server.Request = &rpcRequest{} type rpcRequest struct { rw io.ReadWriter @@ -25,14 +22,6 @@ type rpcRequest struct { stream bool } -type rpcMessage struct { - payload interface{} - codec codec.Codec - header metadata.Metadata - topic string - contentType string -} - func (r *rpcRequest) ContentType() string { return r.contentType } @@ -72,23 +61,3 @@ func (r *rpcRequest) Stream() bool { func (r *rpcRequest) Body() interface{} { return r.payload } - -func (r *rpcMessage) ContentType() string { - return r.contentType -} - -func (r *rpcMessage) Topic() string { - return r.topic -} - -func (r *rpcMessage) Body() interface{} { - return r.payload -} - -func (r *rpcMessage) Header() metadata.Metadata { - return r.header -} - -func (r *rpcMessage) Codec() codec.Codec { - return r.codec -} diff --git a/subscriber.go b/subscriber.go deleted file mode 100644 index 9889c9d..0000000 --- a/subscriber.go +++ /dev/null @@ -1,208 +0,0 @@ -package http - -import ( - "bytes" - "context" - "fmt" - "reflect" - "strings" - - "go.unistack.org/micro/v4/broker" - "go.unistack.org/micro/v4/codec" - "go.unistack.org/micro/v4/metadata" - "go.unistack.org/micro/v4/register" - "go.unistack.org/micro/v4/server" -) - -var typeOfError = reflect.TypeOf((*error)(nil)).Elem() - -type handler struct { - reqType reflect.Type - ctxType reflect.Type - method reflect.Value -} - -type httpSubscriber struct { - topic string - rcvr reflect.Value - typ reflect.Type - subscriber interface{} - handlers []*handler - endpoints []*register.Endpoint - opts server.SubscriberOptions -} - -func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber { - options := server.NewSubscriberOptions(opts...) - - var endpoints []*register.Endpoint - var handlers []*handler - - if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func { - h := &handler{ - method: reflect.ValueOf(sub), - } - - switch typ.NumIn() { - case 1: - h.reqType = typ.In(0) - case 2: - h.ctxType = typ.In(0) - h.reqType = typ.In(1) - } - - handlers = append(handlers, h) - ep := ®ister.Endpoint{ - Name: "Func", - Request: register.ExtractSubValue(typ), - Metadata: metadata.New(2), - } - ep.Metadata.Set("topic", topic) - ep.Metadata.Set("subscriber", "true") - endpoints = append(endpoints, ep) - } else { - hdlr := reflect.ValueOf(sub) - name := reflect.Indirect(hdlr).Type().Name() - - for m := 0; m < typ.NumMethod(); m++ { - method := typ.Method(m) - h := &handler{ - method: method.Func, - } - - switch method.Type.NumIn() { - case 2: - h.reqType = method.Type.In(1) - case 3: - h.ctxType = method.Type.In(1) - h.reqType = method.Type.In(2) - } - - handlers = append(handlers, h) - ep := ®ister.Endpoint{ - Name: name + "." + method.Name, - Request: register.ExtractSubValue(method.Type), - Metadata: metadata.New(2), - } - ep.Metadata.Set("topic", topic) - ep.Metadata.Set("subscriber", "true") - endpoints = append(endpoints, ep) - } - } - - return &httpSubscriber{ - rcvr: reflect.ValueOf(sub), - typ: reflect.TypeOf(sub), - topic: topic, - subscriber: sub, - handlers: handlers, - endpoints: endpoints, - opts: options, - } -} - -func (s *httpServer) createSubHandler(sb *httpSubscriber, opts server.Options) broker.Handler { - return func(p broker.Event) error { - msg := p.Message() - ct := msg.Header["Content-Type"] - cf, err := s.newCodec(ct) - if err != nil { - return err - } - - hdr := metadata.Copy(msg.Header) - delete(hdr, "Content-Type") - ctx := metadata.NewIncomingContext(context.Background(), hdr) - - results := make(chan error, len(sb.handlers)) - - for i := 0; i < len(sb.handlers); i++ { - handler := sb.handlers[i] - - var isVal bool - var req reflect.Value - - if handler.reqType.Kind() == reflect.Ptr { - req = reflect.New(handler.reqType.Elem()) - } else { - req = reflect.New(handler.reqType) - isVal = true - } - if isVal { - req = req.Elem() - } - - buf := bytes.NewBuffer(msg.Body) - - if err := cf.ReadHeader(buf, &codec.Message{}, codec.Event); err != nil { - return err - } - - if err := cf.ReadBody(buf, req.Interface()); err != nil { - return err - } - - fn := func(ctx context.Context, msg server.Message) error { - var vals []reflect.Value - if sb.typ.Kind() != reflect.Func { - vals = append(vals, sb.rcvr) - } - if handler.ctxType != nil { - vals = append(vals, reflect.ValueOf(ctx)) - } - - vals = append(vals, reflect.ValueOf(msg.Body())) - - returnValues := handler.method.Call(vals) - if err := returnValues[0].Interface(); err != nil { - return err.(error) - } - return nil - } - - for i := len(opts.SubWrappers); i > 0; i-- { - fn = opts.SubWrappers[i-1](fn) - } - - go func() { - results <- fn(ctx, &httpMessage{ - topic: sb.topic, - contentType: ct, - payload: req.Interface(), - header: msg.Header, - codec: cf, - }) - }() - } - - var errors []string - - for i := 0; i < len(sb.handlers); i++ { - if err := <-results; err != nil { - errors = append(errors, err.Error()) - } - } - - if len(errors) > 0 { - return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) - } - - return nil - } -} - -func (s *httpSubscriber) Topic() string { - return s.topic -} - -func (s *httpSubscriber) Subscriber() interface{} { - return s.subscriber -} - -func (s *httpSubscriber) Endpoints() []*register.Endpoint { - return s.endpoints -} - -func (s *httpSubscriber) Options() server.SubscriberOptions { - return s.opts -} -- 2.40.1