cleanup message stuf from server
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
		| @@ -60,7 +60,7 @@ func (h *httpHandler) Options() server.HandlerOptions { | |||||||
| 	return h.opts | 	return h.opts | ||||||
| } | } | ||||||
|  |  | ||||||
| func (h *httpServer) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, error) { | func (h *Server) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, error) { | ||||||
| 	if handler == nil { | 	if handler == nil { | ||||||
| 		return nil, fmt.Errorf("invalid handler specified: %v", handler) | 		return nil, fmt.Errorf("invalid handler specified: %v", handler) | ||||||
| 	} | 	} | ||||||
| @@ -333,7 +333,7 @@ func (h *httpServer) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, err | |||||||
| 	}, nil | 	}, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { | func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||||||
| 	// check for http.HandlerFunc handlers | 	// check for http.HandlerFunc handlers | ||||||
| 	if ph, _, err := h.pathHandlers.Search(r.Method, r.URL.Path); err == nil { | 	if ph, _, err := h.pathHandlers.Search(r.Method, r.URL.Path); err == nil { | ||||||
| 		ph.(http.HandlerFunc)(w, r) | 		ph.(http.HandlerFunc)(w, r) | ||||||
|   | |||||||
							
								
								
									
										33
									
								
								request.go
									
									
									
									
									
								
							
							
						
						
									
										33
									
								
								request.go
									
									
									
									
									
								
							| @@ -8,10 +8,7 @@ import ( | |||||||
| 	"go.unistack.org/micro/v4/server" | 	"go.unistack.org/micro/v4/server" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| var ( | var _ server.Request = &rpcRequest{} | ||||||
| 	_ server.Request = &rpcRequest{} |  | ||||||
| 	_ server.Message = &rpcMessage{} |  | ||||||
| ) |  | ||||||
|  |  | ||||||
| type rpcRequest struct { | type rpcRequest struct { | ||||||
| 	rw          io.ReadWriter | 	rw          io.ReadWriter | ||||||
| @@ -25,14 +22,6 @@ type rpcRequest struct { | |||||||
| 	stream      bool | 	stream      bool | ||||||
| } | } | ||||||
|  |  | ||||||
| type rpcMessage struct { |  | ||||||
| 	payload     interface{} |  | ||||||
| 	codec       codec.Codec |  | ||||||
| 	header      metadata.Metadata |  | ||||||
| 	topic       string |  | ||||||
| 	contentType string |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (r *rpcRequest) ContentType() string { | func (r *rpcRequest) ContentType() string { | ||||||
| 	return r.contentType | 	return r.contentType | ||||||
| } | } | ||||||
| @@ -72,23 +61,3 @@ func (r *rpcRequest) Stream() bool { | |||||||
| func (r *rpcRequest) Body() interface{} { | func (r *rpcRequest) Body() interface{} { | ||||||
| 	return r.payload | 	return r.payload | ||||||
| } | } | ||||||
|  |  | ||||||
| func (r *rpcMessage) ContentType() string { |  | ||||||
| 	return r.contentType |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (r *rpcMessage) Topic() string { |  | ||||||
| 	return r.topic |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (r *rpcMessage) Body() interface{} { |  | ||||||
| 	return r.payload |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (r *rpcMessage) Header() metadata.Metadata { |  | ||||||
| 	return r.header |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (r *rpcMessage) Codec() codec.Codec { |  | ||||||
| 	return r.codec |  | ||||||
| } |  | ||||||
|   | |||||||
							
								
								
									
										208
									
								
								subscriber.go
									
									
									
									
									
								
							
							
						
						
									
										208
									
								
								subscriber.go
									
									
									
									
									
								
							| @@ -1,208 +0,0 @@ | |||||||
| package http |  | ||||||
|  |  | ||||||
| import ( |  | ||||||
| 	"bytes" |  | ||||||
| 	"context" |  | ||||||
| 	"fmt" |  | ||||||
| 	"reflect" |  | ||||||
| 	"strings" |  | ||||||
|  |  | ||||||
| 	"go.unistack.org/micro/v4/broker" |  | ||||||
| 	"go.unistack.org/micro/v4/codec" |  | ||||||
| 	"go.unistack.org/micro/v4/metadata" |  | ||||||
| 	"go.unistack.org/micro/v4/register" |  | ||||||
| 	"go.unistack.org/micro/v4/server" |  | ||||||
| ) |  | ||||||
|  |  | ||||||
| var typeOfError = reflect.TypeOf((*error)(nil)).Elem() |  | ||||||
|  |  | ||||||
| type handler struct { |  | ||||||
| 	reqType reflect.Type |  | ||||||
| 	ctxType reflect.Type |  | ||||||
| 	method  reflect.Value |  | ||||||
| } |  | ||||||
|  |  | ||||||
| type httpSubscriber struct { |  | ||||||
| 	topic      string |  | ||||||
| 	rcvr       reflect.Value |  | ||||||
| 	typ        reflect.Type |  | ||||||
| 	subscriber interface{} |  | ||||||
| 	handlers   []*handler |  | ||||||
| 	endpoints  []*register.Endpoint |  | ||||||
| 	opts       server.SubscriberOptions |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber { |  | ||||||
| 	options := server.NewSubscriberOptions(opts...) |  | ||||||
|  |  | ||||||
| 	var endpoints []*register.Endpoint |  | ||||||
| 	var handlers []*handler |  | ||||||
|  |  | ||||||
| 	if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func { |  | ||||||
| 		h := &handler{ |  | ||||||
| 			method: reflect.ValueOf(sub), |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		switch typ.NumIn() { |  | ||||||
| 		case 1: |  | ||||||
| 			h.reqType = typ.In(0) |  | ||||||
| 		case 2: |  | ||||||
| 			h.ctxType = typ.In(0) |  | ||||||
| 			h.reqType = typ.In(1) |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		handlers = append(handlers, h) |  | ||||||
| 		ep := ®ister.Endpoint{ |  | ||||||
| 			Name:     "Func", |  | ||||||
| 			Request:  register.ExtractSubValue(typ), |  | ||||||
| 			Metadata: metadata.New(2), |  | ||||||
| 		} |  | ||||||
| 		ep.Metadata.Set("topic", topic) |  | ||||||
| 		ep.Metadata.Set("subscriber", "true") |  | ||||||
| 		endpoints = append(endpoints, ep) |  | ||||||
| 	} else { |  | ||||||
| 		hdlr := reflect.ValueOf(sub) |  | ||||||
| 		name := reflect.Indirect(hdlr).Type().Name() |  | ||||||
|  |  | ||||||
| 		for m := 0; m < typ.NumMethod(); m++ { |  | ||||||
| 			method := typ.Method(m) |  | ||||||
| 			h := &handler{ |  | ||||||
| 				method: method.Func, |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			switch method.Type.NumIn() { |  | ||||||
| 			case 2: |  | ||||||
| 				h.reqType = method.Type.In(1) |  | ||||||
| 			case 3: |  | ||||||
| 				h.ctxType = method.Type.In(1) |  | ||||||
| 				h.reqType = method.Type.In(2) |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			handlers = append(handlers, h) |  | ||||||
| 			ep := ®ister.Endpoint{ |  | ||||||
| 				Name:     name + "." + method.Name, |  | ||||||
| 				Request:  register.ExtractSubValue(method.Type), |  | ||||||
| 				Metadata: metadata.New(2), |  | ||||||
| 			} |  | ||||||
| 			ep.Metadata.Set("topic", topic) |  | ||||||
| 			ep.Metadata.Set("subscriber", "true") |  | ||||||
| 			endpoints = append(endpoints, ep) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return &httpSubscriber{ |  | ||||||
| 		rcvr:       reflect.ValueOf(sub), |  | ||||||
| 		typ:        reflect.TypeOf(sub), |  | ||||||
| 		topic:      topic, |  | ||||||
| 		subscriber: sub, |  | ||||||
| 		handlers:   handlers, |  | ||||||
| 		endpoints:  endpoints, |  | ||||||
| 		opts:       options, |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (s *httpServer) createSubHandler(sb *httpSubscriber, opts server.Options) broker.Handler { |  | ||||||
| 	return func(p broker.Event) error { |  | ||||||
| 		msg := p.Message() |  | ||||||
| 		ct := msg.Header["Content-Type"] |  | ||||||
| 		cf, err := s.newCodec(ct) |  | ||||||
| 		if err != nil { |  | ||||||
| 			return err |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		hdr := metadata.Copy(msg.Header) |  | ||||||
| 		delete(hdr, "Content-Type") |  | ||||||
| 		ctx := metadata.NewIncomingContext(context.Background(), hdr) |  | ||||||
|  |  | ||||||
| 		results := make(chan error, len(sb.handlers)) |  | ||||||
|  |  | ||||||
| 		for i := 0; i < len(sb.handlers); i++ { |  | ||||||
| 			handler := sb.handlers[i] |  | ||||||
|  |  | ||||||
| 			var isVal bool |  | ||||||
| 			var req reflect.Value |  | ||||||
|  |  | ||||||
| 			if handler.reqType.Kind() == reflect.Ptr { |  | ||||||
| 				req = reflect.New(handler.reqType.Elem()) |  | ||||||
| 			} else { |  | ||||||
| 				req = reflect.New(handler.reqType) |  | ||||||
| 				isVal = true |  | ||||||
| 			} |  | ||||||
| 			if isVal { |  | ||||||
| 				req = req.Elem() |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			buf := bytes.NewBuffer(msg.Body) |  | ||||||
|  |  | ||||||
| 			if err := cf.ReadHeader(buf, &codec.Message{}, codec.Event); err != nil { |  | ||||||
| 				return err |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			if err := cf.ReadBody(buf, req.Interface()); err != nil { |  | ||||||
| 				return err |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			fn := func(ctx context.Context, msg server.Message) error { |  | ||||||
| 				var vals []reflect.Value |  | ||||||
| 				if sb.typ.Kind() != reflect.Func { |  | ||||||
| 					vals = append(vals, sb.rcvr) |  | ||||||
| 				} |  | ||||||
| 				if handler.ctxType != nil { |  | ||||||
| 					vals = append(vals, reflect.ValueOf(ctx)) |  | ||||||
| 				} |  | ||||||
|  |  | ||||||
| 				vals = append(vals, reflect.ValueOf(msg.Body())) |  | ||||||
|  |  | ||||||
| 				returnValues := handler.method.Call(vals) |  | ||||||
| 				if err := returnValues[0].Interface(); err != nil { |  | ||||||
| 					return err.(error) |  | ||||||
| 				} |  | ||||||
| 				return nil |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			for i := len(opts.SubWrappers); i > 0; i-- { |  | ||||||
| 				fn = opts.SubWrappers[i-1](fn) |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			go func() { |  | ||||||
| 				results <- fn(ctx, &httpMessage{ |  | ||||||
| 					topic:       sb.topic, |  | ||||||
| 					contentType: ct, |  | ||||||
| 					payload:     req.Interface(), |  | ||||||
| 					header:      msg.Header, |  | ||||||
| 					codec:       cf, |  | ||||||
| 				}) |  | ||||||
| 			}() |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		var errors []string |  | ||||||
|  |  | ||||||
| 		for i := 0; i < len(sb.handlers); i++ { |  | ||||||
| 			if err := <-results; err != nil { |  | ||||||
| 				errors = append(errors, err.Error()) |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		if len(errors) > 0 { |  | ||||||
| 			return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		return nil |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (s *httpSubscriber) Topic() string { |  | ||||||
| 	return s.topic |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (s *httpSubscriber) Subscriber() interface{} { |  | ||||||
| 	return s.subscriber |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (s *httpSubscriber) Endpoints() []*register.Endpoint { |  | ||||||
| 	return s.endpoints |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (s *httpSubscriber) Options() server.SubscriberOptions { |  | ||||||
| 	return s.opts |  | ||||||
| } |  | ||||||
		Reference in New Issue
	
	Block a user