From 3ec25548a88821babd52aaca06f84541c177ae39 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Fri, 5 Feb 2021 18:31:51 +0300 Subject: [PATCH] fully working micro http server implementation Signed-off-by: Vasiliy Tolstov --- go.mod | 2 +- go.sum | 4 +- handler.go | 199 +++++++++++++++++++++++++++++++++++++++++++++++++++-- http.go | 88 ++++++++++++++++++++--- request.go | 95 +++++++++++++++++++++++++ server.go | 106 ++++++++++++++++++++++++++++ 6 files changed, 476 insertions(+), 18 deletions(-) create mode 100644 request.go create mode 100644 server.go diff --git a/go.mod b/go.mod index 0c3ed80..8743151 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/unistack-org/micro-server-http/v3 go 1.13 require ( - github.com/unistack-org/micro/v3 v3.2.0 + github.com/unistack-org/micro/v3 v3.2.7 golang.org/x/net v0.0.0-20201224014010-6772e930b67b ) diff --git a/go.sum b/go.sum index baa2a3d..58e535c 100644 --- a/go.sum +++ b/go.sum @@ -57,8 +57,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/unistack-org/micro/v3 v3.2.0 h1:1e6cFMHzHV+RjwwajwYJARpxQiEpnTCvV/2L9xeW4zc= -github.com/unistack-org/micro/v3 v3.2.0/go.mod h1:J8XxJj4Pqa3Ee0a4biRRtut7UwTlfBq8QRe+s4PKGS0= +github.com/unistack-org/micro/v3 v3.2.7 h1:+vLVmoQeE0z0cmIAKXSQXbxC4pxXpmkzckh9B9shogo= +github.com/unistack-org/micro/v3 v3.2.7/go.mod h1:J8XxJj4Pqa3Ee0a4biRRtut7UwTlfBq8QRe+s4PKGS0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= diff --git a/handler.go b/handler.go index 65f8481..007dd0a 100644 --- a/handler.go +++ b/handler.go @@ -1,18 +1,56 @@ package http import ( + "context" + "fmt" + "net/http" + "reflect" + "strings" + + "github.com/unistack-org/micro/v3/codec" + "github.com/unistack-org/micro/v3/logger" + "github.com/unistack-org/micro/v3/metadata" "github.com/unistack-org/micro/v3/register" "github.com/unistack-org/micro/v3/server" + "github.com/unistack-org/micro/v3/util/qson" + rflutil "github.com/unistack-org/micro/v3/util/reflect" + rutil "github.com/unistack-org/micro/v3/util/router" ) +var ( + DefaultErrorHandler = func(ctx context.Context, s server.Handler, w http.ResponseWriter, r *http.Request, err error, status int) { + w.WriteHeader(status) + w.Write([]byte(err.Error())) + } + DefaultContentType = "application/json" +) + +type patHandler struct { + pat rutil.Pattern + mtype *methodType + name string + rcvr reflect.Value +} + type httpHandler struct { - opts server.HandlerOptions - eps []*register.Endpoint - hd interface{} + name string + opts server.HandlerOptions + sopts server.Options + eps []*register.Endpoint + hd interface{} + handlers map[string][]patHandler + errorHandler func(context.Context, server.Handler, http.ResponseWriter, *http.Request, error, int) +} + +func (h *httpHandler) newCodec(ct string) (codec.Codec, error) { + if cf, ok := h.sopts.Codecs[ct]; ok { + return cf, nil + } + return nil, codec.ErrUnknownContentType } func (h *httpHandler) Name() string { - return "handler" + return h.name } func (h *httpHandler) Handler() interface{} { @@ -26,3 +64,156 @@ func (h *httpHandler) Endpoints() []*register.Endpoint { func (h *httpHandler) Options() server.HandlerOptions { return h.opts } + +func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + path := r.URL.Path + if !strings.HasPrefix(path, "/") { + h.errorHandler(ctx, h, w, r, fmt.Errorf("path must contains /"), http.StatusBadRequest) + } + + ct := DefaultContentType + if htype := r.Header.Get("Content-Type"); htype != "" { + ct = htype + } + + cf, err := h.newCodec(ct) + if err != nil { + h.errorHandler(ctx, h, w, r, err, http.StatusBadRequest) + } + + components := strings.Split(path[1:], "/") + l := len(components) + var verb string + idx := strings.LastIndex(components[l-1], ":") + if idx == 0 { + h.errorHandler(ctx, h, w, r, fmt.Errorf("not found"), http.StatusNotFound) + return + } + if idx > 0 { + c := components[l-1] + components[l-1], verb = c[:idx], c[idx+1:] + } + + matches := make(map[string]interface{}) + var match bool + var hldr patHandler + for _, hldr = range h.handlers[r.Method] { + mp, err := hldr.pat.Match(components, verb) + if err == nil { + match = true + for k, v := range mp { + matches[k] = v + } + break + } + } + + if !match { + h.errorHandler(ctx, h, w, r, fmt.Errorf("not matching route found"), http.StatusNotFound) + return + } + + md, ok := metadata.FromContext(ctx) + if !ok { + md = metadata.New(0) + } + + for k, v := range r.Header { + md.Set(k, strings.Join(v, ", ")) + } + + // get fields from url values + if len(r.URL.RawQuery) > 0 { + umd := make(map[string]interface{}) + err = qson.Unmarshal(&umd, r.URL.RawQuery) + if err != nil { + h.errorHandler(ctx, h, w, r, err, http.StatusBadRequest) + } + for k, v := range umd { + matches[k] = v + } + } + + var argv, replyv reflect.Value + + // Decode the argument value. + argIsValue := false // if true, need to indirect before calling. + if hldr.mtype.ArgType.Kind() == reflect.Ptr { + argv = reflect.New(hldr.mtype.ArgType.Elem()) + } else { + argv = reflect.New(hldr.mtype.ArgType) + argIsValue = true + } + + if argIsValue { + argv = argv.Elem() + } + + // reply value + replyv = reflect.New(hldr.mtype.ReplyType.Elem()) + + function := hldr.mtype.method.Func + //function := hldr.rcvr + var returnValues []reflect.Value + + if err = rflutil.MergeMap(argv.Interface(), matches); err != nil { + h.errorHandler(ctx, h, w, r, err, http.StatusBadRequest) + return + } + + b, err := cf.Marshal(argv.Interface()) + if err != nil { + h.errorHandler(ctx, h, w, r, err, http.StatusBadRequest) + return + } + + hr := &rpcRequest{ + codec: cf, + service: h.sopts.Name, + contentType: ct, + method: fmt.Sprintf("%s.%s", hldr.name, hldr.mtype.method.Name), + body: b, + payload: argv.Interface(), + } + + var scode int + // define the handler func + fn := func(ctx context.Context, req server.Request, rsp interface{}) (err error) { + ctx = context.WithValue(ctx, rspCodeKey{}, &rspCodeVal{}) + ctx = metadata.NewContext(ctx, md) + returnValues = function.Call([]reflect.Value{hldr.rcvr, hldr.mtype.prepareContext(ctx), reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)}) + + scode = GetRspCode(ctx) + // The return value for the method is an error. + if rerr := returnValues[0].Interface(); rerr != nil { + err = rerr.(error) + } + + return err + } + + // wrap the handler func + for i := len(h.sopts.HdlrWrappers); i > 0; i-- { + fn = h.sopts.HdlrWrappers[i-1](fn) + } + + if appErr := fn(ctx, hr, replyv.Interface()); appErr != nil { + b, err = cf.Marshal(appErr) + } else { + b, err = cf.Marshal(replyv.Interface()) + } + if err != nil && h.sopts.Logger.V(logger.ErrorLevel) { + h.sopts.Logger.Errorf(h.sopts.Context, "XXXXX: %v", err) + return + } + + w.Header().Set("content-Type", ct) + if scode != 0 { + w.WriteHeader(scode) + } else { + h.sopts.Logger.Warn(h.sopts.Context, "response code not set in handler via SetRspCode(ctx, http.StatusXXX)") + } + w.Write(b) +} diff --git a/http.go b/http.go index 7dc4f18..2579d89 100644 --- a/http.go +++ b/http.go @@ -7,7 +7,9 @@ import ( "fmt" "net" "net/http" + "reflect" "sort" + "strings" "sync" "time" @@ -16,6 +18,7 @@ import ( "github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/register" "github.com/unistack-org/micro/v3/server" + rutil "github.com/unistack-org/micro/v3/util/router" "golang.org/x/net/netutil" ) @@ -55,9 +58,6 @@ func (h *httpServer) Init(opts ...server.Option) error { } func (h *httpServer) Handle(handler server.Handler) error { - if _, ok := handler.Handler().(http.Handler); !ok { - return errors.New("Handle requires http.Handler") - } h.Lock() h.hd = handler h.Unlock() @@ -78,11 +78,77 @@ func (h *httpServer) NewHandler(handler interface{}, opts ...server.HandlerOptio } } - return &httpHandler{ - eps: eps, - hd: handler, - opts: options, + hdlr := &httpHandler{ + eps: eps, + hd: handler, + opts: options, + sopts: h.opts, } + + tp := reflect.TypeOf(handler) + + /* + for m := 0; m < tp.NumMethod(); m++ { + if e := register.ExtractEndpoint(tp.Method(m)); e != nil { + e.Name = name + "." + e.Name + + for k, v := range options.Metadata[e.Name] { + e.Metadata[k] = v + } + + eps = append(eps, e) + } + } + + */ + + hdlr.handlers = make(map[string][]patHandler) + for hn, md := range options.Metadata { + cmp, err := rutil.Parse(md["Path"]) + if err != nil && h.opts.Logger.V(logger.ErrorLevel) { + h.opts.Logger.Errorf(h.opts.Context, "parsing path pattern err: %v", err) + continue + } + tpl := cmp.Compile() + pat, err := rutil.NewPattern(tpl.Version, tpl.OpCodes, tpl.Pool, tpl.Verb) + if err != nil && h.opts.Logger.V(logger.ErrorLevel) { + h.opts.Logger.Errorf(h.opts.Context, "creating new pattern err: %v", err) + continue + } + + var method reflect.Method + mname := hn[strings.Index(hn, ".")+1:] + for m := 0; m < tp.NumMethod(); m++ { + mn := tp.Method(m) + if mn.Name != mname { + continue + } + method = mn + break + } + + if method.Name == "" && h.opts.Logger.V(logger.ErrorLevel) { + h.opts.Logger.Errorf(h.opts.Context, "nil method for %s", mname) + continue + } + + mtype, err := prepareEndpoint(method) + if err != nil && h.opts.Logger.V(logger.ErrorLevel) { + h.opts.Logger.Errorf(h.opts.Context, "%v", err) + continue + } else if mtype == nil { + continue + } + + rcvr := reflect.ValueOf(handler) + name := reflect.Indirect(rcvr).Type().Name() + + pth := patHandler{pat: pat, mtype: mtype, name: name, rcvr: rcvr} + hdlr.name = name + hdlr.handlers[md["Method"]] = append(hdlr.handlers[md["Method"]], pth) + } + + return hdlr } func (h *httpServer) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber { @@ -254,10 +320,6 @@ func (h *httpServer) Start() error { hd := h.hd h.RUnlock() - if hd == nil { - return errors.New("Server required http.Handler") - } - // micro: config.Transport.Listen(config.Address) var ts net.Listener @@ -291,6 +353,10 @@ func (h *httpServer) Start() error { h.Unlock() handler, ok := hd.Handler().(http.Handler) + if !ok { + handler, ok = hd.(http.Handler) + } + if !ok { return errors.New("Server required http.Handler") } diff --git a/request.go b/request.go new file mode 100644 index 0000000..e70463f --- /dev/null +++ b/request.go @@ -0,0 +1,95 @@ +package http + +import ( + "io" + + "github.com/unistack-org/micro/v3/codec" + "github.com/unistack-org/micro/v3/metadata" +) + +type rpcRequest struct { + rw io.ReadWriter + service string + method string + endpoint string + target string + contentType string + codec codec.Codec + header metadata.Metadata + body []byte + stream bool + payload interface{} +} + +type rpcMessage struct { + topic string + contentType string + payload interface{} + header metadata.Metadata + body []byte + codec codec.Codec +} + +func (r *rpcRequest) ContentType() string { + return r.contentType +} + +func (r *rpcRequest) Service() string { + return r.service +} + +func (r *rpcRequest) Method() string { + return r.method +} + +func (r *rpcRequest) Endpoint() string { + return r.method +} + +func (r *rpcRequest) Codec() codec.Codec { + return r.codec +} + +func (r *rpcRequest) Header() metadata.Metadata { + return r.header +} + +func (r *rpcRequest) Read() ([]byte, error) { + f := &codec.Frame{} + if err := r.codec.ReadBody(r.rw, f); err != nil { + return nil, err + } + return f.Data, nil +} + +func (r *rpcRequest) Stream() bool { + return r.stream +} + +func (r *rpcRequest) Body() interface{} { + return r.payload +} + +func (r *rpcMessage) ContentType() string { + return r.contentType +} + +func (r *rpcMessage) Topic() string { + return r.topic +} + +func (r *rpcMessage) Payload() interface{} { + return r.payload +} + +func (r *rpcMessage) Header() metadata.Metadata { + return r.header +} + +func (r *rpcMessage) Body() []byte { + return r.body +} + +func (r *rpcMessage) Codec() codec.Codec { + return r.codec +} diff --git a/server.go b/server.go new file mode 100644 index 0000000..37b67e2 --- /dev/null +++ b/server.go @@ -0,0 +1,106 @@ +package http + +import ( + "context" + "fmt" + "reflect" + "unicode" + "unicode/utf8" + + "github.com/unistack-org/micro/v3/server" +) + +type methodType struct { + method reflect.Method + ArgType reflect.Type + ReplyType reflect.Type + ContextType reflect.Type + stream bool +} + +// Is this an exported - upper case - name? +func isExported(name string) bool { + r, _ := utf8.DecodeRuneInString(name) + return unicode.IsUpper(r) +} + +// Is this type exported or a builtin? +func isExportedOrBuiltinType(t reflect.Type) bool { + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + // PkgPath will be non-empty even for an exported type, + // so we need to check the type name as well. + return isExported(t.Name()) || t.PkgPath() == "" +} + +// prepareEndpoint() returns a methodType for the provided method or nil +// in case if the method was unsuitable. +func prepareEndpoint(method reflect.Method) (*methodType, error) { + mtype := method.Type + mname := method.Name + var replyType, argType, contextType reflect.Type + var stream bool + + // Endpoint() must be exported. + if method.PkgPath != "" { + return nil, fmt.Errorf("Endpoint must be exported") + } + + switch mtype.NumIn() { + case 3: + // assuming streaming + argType = mtype.In(2) + contextType = mtype.In(1) + stream = true + case 4: + // method that takes a context + argType = mtype.In(2) + replyType = mtype.In(3) + contextType = mtype.In(1) + default: + return nil, fmt.Errorf("method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn()) + } + + if stream { + // check stream type + streamType := reflect.TypeOf((*server.Stream)(nil)).Elem() + if !argType.Implements(streamType) { + return nil, fmt.Errorf("%v argument does not implement Streamer interface: %v", mname, argType) + } + } else { + // if not stream check the replyType + + // First arg need not be a pointer. + if !isExportedOrBuiltinType(argType) { + return nil, fmt.Errorf("%v argument type not exported: %v", mname, argType) + } + + if replyType.Kind() != reflect.Ptr { + return nil, fmt.Errorf("method %v reply type not a pointer: %v", mname, replyType) + } + + // Reply type must be exported. + if !isExportedOrBuiltinType(replyType) { + return nil, fmt.Errorf("method %v reply type not exported: %v", mname, replyType) + } + } + + // Endpoint() needs one out. + if mtype.NumOut() != 1 { + return nil, fmt.Errorf("method %v has wrong number of outs: %v", mname, mtype.NumOut()) + } + // The return type of the method must be error. + if returnType := mtype.Out(0); returnType != typeOfError { + return nil, fmt.Errorf("method %v returns %v not error", mname, returnType.String()) + } + + return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}, nil +} + +func (m *methodType) prepareContext(ctx context.Context) reflect.Value { + if contextv := reflect.ValueOf(ctx); contextv.IsValid() { + return contextv + } + return reflect.Zero(m.ContextType) +}