cleanup message stuf from server #160
| @@ -60,7 +60,7 @@ func (h *httpHandler) Options() server.HandlerOptions { | ||||
| 	return h.opts | ||||
| } | ||||
|  | ||||
| func (h *httpServer) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, error) { | ||||
| func (h *Server) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, error) { | ||||
| 	if handler == nil { | ||||
| 		return nil, fmt.Errorf("invalid handler specified: %v", handler) | ||||
| 	} | ||||
| @@ -333,7 +333,7 @@ func (h *httpServer) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, err | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||||
| func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||||
| 	// check for http.HandlerFunc handlers | ||||
| 	if ph, _, err := h.pathHandlers.Search(r.Method, r.URL.Path); err == nil { | ||||
| 		ph.(http.HandlerFunc)(w, r) | ||||
|   | ||||
							
								
								
									
										33
									
								
								request.go
									
									
									
									
									
								
							
							
						
						
									
										33
									
								
								request.go
									
									
									
									
									
								
							| @@ -8,10 +8,7 @@ import ( | ||||
| 	"go.unistack.org/micro/v4/server" | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| 	_ server.Request = &rpcRequest{} | ||||
| 	_ server.Message = &rpcMessage{} | ||||
| ) | ||||
| var _ server.Request = &rpcRequest{} | ||||
|  | ||||
| type rpcRequest struct { | ||||
| 	rw          io.ReadWriter | ||||
| @@ -25,14 +22,6 @@ type rpcRequest struct { | ||||
| 	stream      bool | ||||
| } | ||||
|  | ||||
| type rpcMessage struct { | ||||
| 	payload     interface{} | ||||
| 	codec       codec.Codec | ||||
| 	header      metadata.Metadata | ||||
| 	topic       string | ||||
| 	contentType string | ||||
| } | ||||
|  | ||||
| func (r *rpcRequest) ContentType() string { | ||||
| 	return r.contentType | ||||
| } | ||||
| @@ -72,23 +61,3 @@ func (r *rpcRequest) Stream() bool { | ||||
| func (r *rpcRequest) Body() interface{} { | ||||
| 	return r.payload | ||||
| } | ||||
|  | ||||
| func (r *rpcMessage) ContentType() string { | ||||
| 	return r.contentType | ||||
| } | ||||
|  | ||||
| func (r *rpcMessage) Topic() string { | ||||
| 	return r.topic | ||||
| } | ||||
|  | ||||
| func (r *rpcMessage) Body() interface{} { | ||||
| 	return r.payload | ||||
| } | ||||
|  | ||||
| func (r *rpcMessage) Header() metadata.Metadata { | ||||
| 	return r.header | ||||
| } | ||||
|  | ||||
| func (r *rpcMessage) Codec() codec.Codec { | ||||
| 	return r.codec | ||||
| } | ||||
|   | ||||
							
								
								
									
										208
									
								
								subscriber.go
									
									
									
									
									
								
							
							
						
						
									
										208
									
								
								subscriber.go
									
									
									
									
									
								
							| @@ -1,208 +0,0 @@ | ||||
| package http | ||||
|  | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"reflect" | ||||
| 	"strings" | ||||
|  | ||||
| 	"go.unistack.org/micro/v4/broker" | ||||
| 	"go.unistack.org/micro/v4/codec" | ||||
| 	"go.unistack.org/micro/v4/metadata" | ||||
| 	"go.unistack.org/micro/v4/register" | ||||
| 	"go.unistack.org/micro/v4/server" | ||||
| ) | ||||
|  | ||||
| var typeOfError = reflect.TypeOf((*error)(nil)).Elem() | ||||
|  | ||||
| type handler struct { | ||||
| 	reqType reflect.Type | ||||
| 	ctxType reflect.Type | ||||
| 	method  reflect.Value | ||||
| } | ||||
|  | ||||
| type httpSubscriber struct { | ||||
| 	topic      string | ||||
| 	rcvr       reflect.Value | ||||
| 	typ        reflect.Type | ||||
| 	subscriber interface{} | ||||
| 	handlers   []*handler | ||||
| 	endpoints  []*register.Endpoint | ||||
| 	opts       server.SubscriberOptions | ||||
| } | ||||
|  | ||||
| func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber { | ||||
| 	options := server.NewSubscriberOptions(opts...) | ||||
|  | ||||
| 	var endpoints []*register.Endpoint | ||||
| 	var handlers []*handler | ||||
|  | ||||
| 	if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func { | ||||
| 		h := &handler{ | ||||
| 			method: reflect.ValueOf(sub), | ||||
| 		} | ||||
|  | ||||
| 		switch typ.NumIn() { | ||||
| 		case 1: | ||||
| 			h.reqType = typ.In(0) | ||||
| 		case 2: | ||||
| 			h.ctxType = typ.In(0) | ||||
| 			h.reqType = typ.In(1) | ||||
| 		} | ||||
|  | ||||
| 		handlers = append(handlers, h) | ||||
| 		ep := ®ister.Endpoint{ | ||||
| 			Name:     "Func", | ||||
| 			Request:  register.ExtractSubValue(typ), | ||||
| 			Metadata: metadata.New(2), | ||||
| 		} | ||||
| 		ep.Metadata.Set("topic", topic) | ||||
| 		ep.Metadata.Set("subscriber", "true") | ||||
| 		endpoints = append(endpoints, ep) | ||||
| 	} else { | ||||
| 		hdlr := reflect.ValueOf(sub) | ||||
| 		name := reflect.Indirect(hdlr).Type().Name() | ||||
|  | ||||
| 		for m := 0; m < typ.NumMethod(); m++ { | ||||
| 			method := typ.Method(m) | ||||
| 			h := &handler{ | ||||
| 				method: method.Func, | ||||
| 			} | ||||
|  | ||||
| 			switch method.Type.NumIn() { | ||||
| 			case 2: | ||||
| 				h.reqType = method.Type.In(1) | ||||
| 			case 3: | ||||
| 				h.ctxType = method.Type.In(1) | ||||
| 				h.reqType = method.Type.In(2) | ||||
| 			} | ||||
|  | ||||
| 			handlers = append(handlers, h) | ||||
| 			ep := ®ister.Endpoint{ | ||||
| 				Name:     name + "." + method.Name, | ||||
| 				Request:  register.ExtractSubValue(method.Type), | ||||
| 				Metadata: metadata.New(2), | ||||
| 			} | ||||
| 			ep.Metadata.Set("topic", topic) | ||||
| 			ep.Metadata.Set("subscriber", "true") | ||||
| 			endpoints = append(endpoints, ep) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return &httpSubscriber{ | ||||
| 		rcvr:       reflect.ValueOf(sub), | ||||
| 		typ:        reflect.TypeOf(sub), | ||||
| 		topic:      topic, | ||||
| 		subscriber: sub, | ||||
| 		handlers:   handlers, | ||||
| 		endpoints:  endpoints, | ||||
| 		opts:       options, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (s *httpServer) createSubHandler(sb *httpSubscriber, opts server.Options) broker.Handler { | ||||
| 	return func(p broker.Event) error { | ||||
| 		msg := p.Message() | ||||
| 		ct := msg.Header["Content-Type"] | ||||
| 		cf, err := s.newCodec(ct) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		hdr := metadata.Copy(msg.Header) | ||||
| 		delete(hdr, "Content-Type") | ||||
| 		ctx := metadata.NewIncomingContext(context.Background(), hdr) | ||||
|  | ||||
| 		results := make(chan error, len(sb.handlers)) | ||||
|  | ||||
| 		for i := 0; i < len(sb.handlers); i++ { | ||||
| 			handler := sb.handlers[i] | ||||
|  | ||||
| 			var isVal bool | ||||
| 			var req reflect.Value | ||||
|  | ||||
| 			if handler.reqType.Kind() == reflect.Ptr { | ||||
| 				req = reflect.New(handler.reqType.Elem()) | ||||
| 			} else { | ||||
| 				req = reflect.New(handler.reqType) | ||||
| 				isVal = true | ||||
| 			} | ||||
| 			if isVal { | ||||
| 				req = req.Elem() | ||||
| 			} | ||||
|  | ||||
| 			buf := bytes.NewBuffer(msg.Body) | ||||
|  | ||||
| 			if err := cf.ReadHeader(buf, &codec.Message{}, codec.Event); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
|  | ||||
| 			if err := cf.ReadBody(buf, req.Interface()); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
|  | ||||
| 			fn := func(ctx context.Context, msg server.Message) error { | ||||
| 				var vals []reflect.Value | ||||
| 				if sb.typ.Kind() != reflect.Func { | ||||
| 					vals = append(vals, sb.rcvr) | ||||
| 				} | ||||
| 				if handler.ctxType != nil { | ||||
| 					vals = append(vals, reflect.ValueOf(ctx)) | ||||
| 				} | ||||
|  | ||||
| 				vals = append(vals, reflect.ValueOf(msg.Body())) | ||||
|  | ||||
| 				returnValues := handler.method.Call(vals) | ||||
| 				if err := returnValues[0].Interface(); err != nil { | ||||
| 					return err.(error) | ||||
| 				} | ||||
| 				return nil | ||||
| 			} | ||||
|  | ||||
| 			for i := len(opts.SubWrappers); i > 0; i-- { | ||||
| 				fn = opts.SubWrappers[i-1](fn) | ||||
| 			} | ||||
|  | ||||
| 			go func() { | ||||
| 				results <- fn(ctx, &httpMessage{ | ||||
| 					topic:       sb.topic, | ||||
| 					contentType: ct, | ||||
| 					payload:     req.Interface(), | ||||
| 					header:      msg.Header, | ||||
| 					codec:       cf, | ||||
| 				}) | ||||
| 			}() | ||||
| 		} | ||||
|  | ||||
| 		var errors []string | ||||
|  | ||||
| 		for i := 0; i < len(sb.handlers); i++ { | ||||
| 			if err := <-results; err != nil { | ||||
| 				errors = append(errors, err.Error()) | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		if len(errors) > 0 { | ||||
| 			return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) | ||||
| 		} | ||||
|  | ||||
| 		return nil | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (s *httpSubscriber) Topic() string { | ||||
| 	return s.topic | ||||
| } | ||||
|  | ||||
| func (s *httpSubscriber) Subscriber() interface{} { | ||||
| 	return s.subscriber | ||||
| } | ||||
|  | ||||
| func (s *httpSubscriber) Endpoints() []*register.Endpoint { | ||||
| 	return s.endpoints | ||||
| } | ||||
|  | ||||
| func (s *httpSubscriber) Options() server.SubscriberOptions { | ||||
| 	return s.opts | ||||
| } | ||||
		Reference in New Issue
	
	Block a user