fully working micro http server implementation
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
74d8cb8538
commit
3ec25548a8
2
go.mod
2
go.mod
@ -3,7 +3,7 @@ module github.com/unistack-org/micro-server-http/v3
|
||||
go 1.13
|
||||
|
||||
require (
|
||||
github.com/unistack-org/micro/v3 v3.2.0
|
||||
github.com/unistack-org/micro/v3 v3.2.7
|
||||
golang.org/x/net v0.0.0-20201224014010-6772e930b67b
|
||||
)
|
||||
|
||||
|
4
go.sum
4
go.sum
@ -57,8 +57,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/unistack-org/micro/v3 v3.2.0 h1:1e6cFMHzHV+RjwwajwYJARpxQiEpnTCvV/2L9xeW4zc=
|
||||
github.com/unistack-org/micro/v3 v3.2.0/go.mod h1:J8XxJj4Pqa3Ee0a4biRRtut7UwTlfBq8QRe+s4PKGS0=
|
||||
github.com/unistack-org/micro/v3 v3.2.7 h1:+vLVmoQeE0z0cmIAKXSQXbxC4pxXpmkzckh9B9shogo=
|
||||
github.com/unistack-org/micro/v3 v3.2.7/go.mod h1:J8XxJj4Pqa3Ee0a4biRRtut7UwTlfBq8QRe+s4PKGS0=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
|
||||
|
193
handler.go
193
handler.go
@ -1,18 +1,56 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"github.com/unistack-org/micro/v3/codec"
|
||||
"github.com/unistack-org/micro/v3/logger"
|
||||
"github.com/unistack-org/micro/v3/metadata"
|
||||
"github.com/unistack-org/micro/v3/register"
|
||||
"github.com/unistack-org/micro/v3/server"
|
||||
"github.com/unistack-org/micro/v3/util/qson"
|
||||
rflutil "github.com/unistack-org/micro/v3/util/reflect"
|
||||
rutil "github.com/unistack-org/micro/v3/util/router"
|
||||
)
|
||||
|
||||
var (
|
||||
DefaultErrorHandler = func(ctx context.Context, s server.Handler, w http.ResponseWriter, r *http.Request, err error, status int) {
|
||||
w.WriteHeader(status)
|
||||
w.Write([]byte(err.Error()))
|
||||
}
|
||||
DefaultContentType = "application/json"
|
||||
)
|
||||
|
||||
type patHandler struct {
|
||||
pat rutil.Pattern
|
||||
mtype *methodType
|
||||
name string
|
||||
rcvr reflect.Value
|
||||
}
|
||||
|
||||
type httpHandler struct {
|
||||
name string
|
||||
opts server.HandlerOptions
|
||||
sopts server.Options
|
||||
eps []*register.Endpoint
|
||||
hd interface{}
|
||||
handlers map[string][]patHandler
|
||||
errorHandler func(context.Context, server.Handler, http.ResponseWriter, *http.Request, error, int)
|
||||
}
|
||||
|
||||
func (h *httpHandler) newCodec(ct string) (codec.Codec, error) {
|
||||
if cf, ok := h.sopts.Codecs[ct]; ok {
|
||||
return cf, nil
|
||||
}
|
||||
return nil, codec.ErrUnknownContentType
|
||||
}
|
||||
|
||||
func (h *httpHandler) Name() string {
|
||||
return "handler"
|
||||
return h.name
|
||||
}
|
||||
|
||||
func (h *httpHandler) Handler() interface{} {
|
||||
@ -26,3 +64,156 @@ func (h *httpHandler) Endpoints() []*register.Endpoint {
|
||||
func (h *httpHandler) Options() server.HandlerOptions {
|
||||
return h.opts
|
||||
}
|
||||
|
||||
func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
path := r.URL.Path
|
||||
if !strings.HasPrefix(path, "/") {
|
||||
h.errorHandler(ctx, h, w, r, fmt.Errorf("path must contains /"), http.StatusBadRequest)
|
||||
}
|
||||
|
||||
ct := DefaultContentType
|
||||
if htype := r.Header.Get("Content-Type"); htype != "" {
|
||||
ct = htype
|
||||
}
|
||||
|
||||
cf, err := h.newCodec(ct)
|
||||
if err != nil {
|
||||
h.errorHandler(ctx, h, w, r, err, http.StatusBadRequest)
|
||||
}
|
||||
|
||||
components := strings.Split(path[1:], "/")
|
||||
l := len(components)
|
||||
var verb string
|
||||
idx := strings.LastIndex(components[l-1], ":")
|
||||
if idx == 0 {
|
||||
h.errorHandler(ctx, h, w, r, fmt.Errorf("not found"), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
if idx > 0 {
|
||||
c := components[l-1]
|
||||
components[l-1], verb = c[:idx], c[idx+1:]
|
||||
}
|
||||
|
||||
matches := make(map[string]interface{})
|
||||
var match bool
|
||||
var hldr patHandler
|
||||
for _, hldr = range h.handlers[r.Method] {
|
||||
mp, err := hldr.pat.Match(components, verb)
|
||||
if err == nil {
|
||||
match = true
|
||||
for k, v := range mp {
|
||||
matches[k] = v
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !match {
|
||||
h.errorHandler(ctx, h, w, r, fmt.Errorf("not matching route found"), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
md, ok := metadata.FromContext(ctx)
|
||||
if !ok {
|
||||
md = metadata.New(0)
|
||||
}
|
||||
|
||||
for k, v := range r.Header {
|
||||
md.Set(k, strings.Join(v, ", "))
|
||||
}
|
||||
|
||||
// get fields from url values
|
||||
if len(r.URL.RawQuery) > 0 {
|
||||
umd := make(map[string]interface{})
|
||||
err = qson.Unmarshal(&umd, r.URL.RawQuery)
|
||||
if err != nil {
|
||||
h.errorHandler(ctx, h, w, r, err, http.StatusBadRequest)
|
||||
}
|
||||
for k, v := range umd {
|
||||
matches[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
var argv, replyv reflect.Value
|
||||
|
||||
// Decode the argument value.
|
||||
argIsValue := false // if true, need to indirect before calling.
|
||||
if hldr.mtype.ArgType.Kind() == reflect.Ptr {
|
||||
argv = reflect.New(hldr.mtype.ArgType.Elem())
|
||||
} else {
|
||||
argv = reflect.New(hldr.mtype.ArgType)
|
||||
argIsValue = true
|
||||
}
|
||||
|
||||
if argIsValue {
|
||||
argv = argv.Elem()
|
||||
}
|
||||
|
||||
// reply value
|
||||
replyv = reflect.New(hldr.mtype.ReplyType.Elem())
|
||||
|
||||
function := hldr.mtype.method.Func
|
||||
//function := hldr.rcvr
|
||||
var returnValues []reflect.Value
|
||||
|
||||
if err = rflutil.MergeMap(argv.Interface(), matches); err != nil {
|
||||
h.errorHandler(ctx, h, w, r, err, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
b, err := cf.Marshal(argv.Interface())
|
||||
if err != nil {
|
||||
h.errorHandler(ctx, h, w, r, err, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
hr := &rpcRequest{
|
||||
codec: cf,
|
||||
service: h.sopts.Name,
|
||||
contentType: ct,
|
||||
method: fmt.Sprintf("%s.%s", hldr.name, hldr.mtype.method.Name),
|
||||
body: b,
|
||||
payload: argv.Interface(),
|
||||
}
|
||||
|
||||
var scode int
|
||||
// define the handler func
|
||||
fn := func(ctx context.Context, req server.Request, rsp interface{}) (err error) {
|
||||
ctx = context.WithValue(ctx, rspCodeKey{}, &rspCodeVal{})
|
||||
ctx = metadata.NewContext(ctx, md)
|
||||
returnValues = function.Call([]reflect.Value{hldr.rcvr, hldr.mtype.prepareContext(ctx), reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)})
|
||||
|
||||
scode = GetRspCode(ctx)
|
||||
// The return value for the method is an error.
|
||||
if rerr := returnValues[0].Interface(); rerr != nil {
|
||||
err = rerr.(error)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// wrap the handler func
|
||||
for i := len(h.sopts.HdlrWrappers); i > 0; i-- {
|
||||
fn = h.sopts.HdlrWrappers[i-1](fn)
|
||||
}
|
||||
|
||||
if appErr := fn(ctx, hr, replyv.Interface()); appErr != nil {
|
||||
b, err = cf.Marshal(appErr)
|
||||
} else {
|
||||
b, err = cf.Marshal(replyv.Interface())
|
||||
}
|
||||
if err != nil && h.sopts.Logger.V(logger.ErrorLevel) {
|
||||
h.sopts.Logger.Errorf(h.sopts.Context, "XXXXX: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("content-Type", ct)
|
||||
if scode != 0 {
|
||||
w.WriteHeader(scode)
|
||||
} else {
|
||||
h.sopts.Logger.Warn(h.sopts.Context, "response code not set in handler via SetRspCode(ctx, http.StatusXXX)")
|
||||
}
|
||||
w.Write(b)
|
||||
}
|
||||
|
82
http.go
82
http.go
@ -7,7 +7,9 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -16,6 +18,7 @@ import (
|
||||
"github.com/unistack-org/micro/v3/logger"
|
||||
"github.com/unistack-org/micro/v3/register"
|
||||
"github.com/unistack-org/micro/v3/server"
|
||||
rutil "github.com/unistack-org/micro/v3/util/router"
|
||||
"golang.org/x/net/netutil"
|
||||
)
|
||||
|
||||
@ -55,9 +58,6 @@ func (h *httpServer) Init(opts ...server.Option) error {
|
||||
}
|
||||
|
||||
func (h *httpServer) Handle(handler server.Handler) error {
|
||||
if _, ok := handler.Handler().(http.Handler); !ok {
|
||||
return errors.New("Handle requires http.Handler")
|
||||
}
|
||||
h.Lock()
|
||||
h.hd = handler
|
||||
h.Unlock()
|
||||
@ -78,11 +78,77 @@ func (h *httpServer) NewHandler(handler interface{}, opts ...server.HandlerOptio
|
||||
}
|
||||
}
|
||||
|
||||
return &httpHandler{
|
||||
hdlr := &httpHandler{
|
||||
eps: eps,
|
||||
hd: handler,
|
||||
opts: options,
|
||||
sopts: h.opts,
|
||||
}
|
||||
|
||||
tp := reflect.TypeOf(handler)
|
||||
|
||||
/*
|
||||
for m := 0; m < tp.NumMethod(); m++ {
|
||||
if e := register.ExtractEndpoint(tp.Method(m)); e != nil {
|
||||
e.Name = name + "." + e.Name
|
||||
|
||||
for k, v := range options.Metadata[e.Name] {
|
||||
e.Metadata[k] = v
|
||||
}
|
||||
|
||||
eps = append(eps, e)
|
||||
}
|
||||
}
|
||||
|
||||
*/
|
||||
|
||||
hdlr.handlers = make(map[string][]patHandler)
|
||||
for hn, md := range options.Metadata {
|
||||
cmp, err := rutil.Parse(md["Path"])
|
||||
if err != nil && h.opts.Logger.V(logger.ErrorLevel) {
|
||||
h.opts.Logger.Errorf(h.opts.Context, "parsing path pattern err: %v", err)
|
||||
continue
|
||||
}
|
||||
tpl := cmp.Compile()
|
||||
pat, err := rutil.NewPattern(tpl.Version, tpl.OpCodes, tpl.Pool, tpl.Verb)
|
||||
if err != nil && h.opts.Logger.V(logger.ErrorLevel) {
|
||||
h.opts.Logger.Errorf(h.opts.Context, "creating new pattern err: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
var method reflect.Method
|
||||
mname := hn[strings.Index(hn, ".")+1:]
|
||||
for m := 0; m < tp.NumMethod(); m++ {
|
||||
mn := tp.Method(m)
|
||||
if mn.Name != mname {
|
||||
continue
|
||||
}
|
||||
method = mn
|
||||
break
|
||||
}
|
||||
|
||||
if method.Name == "" && h.opts.Logger.V(logger.ErrorLevel) {
|
||||
h.opts.Logger.Errorf(h.opts.Context, "nil method for %s", mname)
|
||||
continue
|
||||
}
|
||||
|
||||
mtype, err := prepareEndpoint(method)
|
||||
if err != nil && h.opts.Logger.V(logger.ErrorLevel) {
|
||||
h.opts.Logger.Errorf(h.opts.Context, "%v", err)
|
||||
continue
|
||||
} else if mtype == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
rcvr := reflect.ValueOf(handler)
|
||||
name := reflect.Indirect(rcvr).Type().Name()
|
||||
|
||||
pth := patHandler{pat: pat, mtype: mtype, name: name, rcvr: rcvr}
|
||||
hdlr.name = name
|
||||
hdlr.handlers[md["Method"]] = append(hdlr.handlers[md["Method"]], pth)
|
||||
}
|
||||
|
||||
return hdlr
|
||||
}
|
||||
|
||||
func (h *httpServer) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber {
|
||||
@ -254,10 +320,6 @@ func (h *httpServer) Start() error {
|
||||
hd := h.hd
|
||||
h.RUnlock()
|
||||
|
||||
if hd == nil {
|
||||
return errors.New("Server required http.Handler")
|
||||
}
|
||||
|
||||
// micro: config.Transport.Listen(config.Address)
|
||||
var ts net.Listener
|
||||
|
||||
@ -291,6 +353,10 @@ func (h *httpServer) Start() error {
|
||||
h.Unlock()
|
||||
|
||||
handler, ok := hd.Handler().(http.Handler)
|
||||
if !ok {
|
||||
handler, ok = hd.(http.Handler)
|
||||
}
|
||||
|
||||
if !ok {
|
||||
return errors.New("Server required http.Handler")
|
||||
}
|
||||
|
95
request.go
Normal file
95
request.go
Normal file
@ -0,0 +1,95 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/unistack-org/micro/v3/codec"
|
||||
"github.com/unistack-org/micro/v3/metadata"
|
||||
)
|
||||
|
||||
type rpcRequest struct {
|
||||
rw io.ReadWriter
|
||||
service string
|
||||
method string
|
||||
endpoint string
|
||||
target string
|
||||
contentType string
|
||||
codec codec.Codec
|
||||
header metadata.Metadata
|
||||
body []byte
|
||||
stream bool
|
||||
payload interface{}
|
||||
}
|
||||
|
||||
type rpcMessage struct {
|
||||
topic string
|
||||
contentType string
|
||||
payload interface{}
|
||||
header metadata.Metadata
|
||||
body []byte
|
||||
codec codec.Codec
|
||||
}
|
||||
|
||||
func (r *rpcRequest) ContentType() string {
|
||||
return r.contentType
|
||||
}
|
||||
|
||||
func (r *rpcRequest) Service() string {
|
||||
return r.service
|
||||
}
|
||||
|
||||
func (r *rpcRequest) Method() string {
|
||||
return r.method
|
||||
}
|
||||
|
||||
func (r *rpcRequest) Endpoint() string {
|
||||
return r.method
|
||||
}
|
||||
|
||||
func (r *rpcRequest) Codec() codec.Codec {
|
||||
return r.codec
|
||||
}
|
||||
|
||||
func (r *rpcRequest) Header() metadata.Metadata {
|
||||
return r.header
|
||||
}
|
||||
|
||||
func (r *rpcRequest) Read() ([]byte, error) {
|
||||
f := &codec.Frame{}
|
||||
if err := r.codec.ReadBody(r.rw, f); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return f.Data, nil
|
||||
}
|
||||
|
||||
func (r *rpcRequest) Stream() bool {
|
||||
return r.stream
|
||||
}
|
||||
|
||||
func (r *rpcRequest) Body() interface{} {
|
||||
return r.payload
|
||||
}
|
||||
|
||||
func (r *rpcMessage) ContentType() string {
|
||||
return r.contentType
|
||||
}
|
||||
|
||||
func (r *rpcMessage) Topic() string {
|
||||
return r.topic
|
||||
}
|
||||
|
||||
func (r *rpcMessage) Payload() interface{} {
|
||||
return r.payload
|
||||
}
|
||||
|
||||
func (r *rpcMessage) Header() metadata.Metadata {
|
||||
return r.header
|
||||
}
|
||||
|
||||
func (r *rpcMessage) Body() []byte {
|
||||
return r.body
|
||||
}
|
||||
|
||||
func (r *rpcMessage) Codec() codec.Codec {
|
||||
return r.codec
|
||||
}
|
106
server.go
Normal file
106
server.go
Normal file
@ -0,0 +1,106 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"unicode"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/unistack-org/micro/v3/server"
|
||||
)
|
||||
|
||||
type methodType struct {
|
||||
method reflect.Method
|
||||
ArgType reflect.Type
|
||||
ReplyType reflect.Type
|
||||
ContextType reflect.Type
|
||||
stream bool
|
||||
}
|
||||
|
||||
// Is this an exported - upper case - name?
|
||||
func isExported(name string) bool {
|
||||
r, _ := utf8.DecodeRuneInString(name)
|
||||
return unicode.IsUpper(r)
|
||||
}
|
||||
|
||||
// Is this type exported or a builtin?
|
||||
func isExportedOrBuiltinType(t reflect.Type) bool {
|
||||
for t.Kind() == reflect.Ptr {
|
||||
t = t.Elem()
|
||||
}
|
||||
// PkgPath will be non-empty even for an exported type,
|
||||
// so we need to check the type name as well.
|
||||
return isExported(t.Name()) || t.PkgPath() == ""
|
||||
}
|
||||
|
||||
// prepareEndpoint() returns a methodType for the provided method or nil
|
||||
// in case if the method was unsuitable.
|
||||
func prepareEndpoint(method reflect.Method) (*methodType, error) {
|
||||
mtype := method.Type
|
||||
mname := method.Name
|
||||
var replyType, argType, contextType reflect.Type
|
||||
var stream bool
|
||||
|
||||
// Endpoint() must be exported.
|
||||
if method.PkgPath != "" {
|
||||
return nil, fmt.Errorf("Endpoint must be exported")
|
||||
}
|
||||
|
||||
switch mtype.NumIn() {
|
||||
case 3:
|
||||
// assuming streaming
|
||||
argType = mtype.In(2)
|
||||
contextType = mtype.In(1)
|
||||
stream = true
|
||||
case 4:
|
||||
// method that takes a context
|
||||
argType = mtype.In(2)
|
||||
replyType = mtype.In(3)
|
||||
contextType = mtype.In(1)
|
||||
default:
|
||||
return nil, fmt.Errorf("method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn())
|
||||
}
|
||||
|
||||
if stream {
|
||||
// check stream type
|
||||
streamType := reflect.TypeOf((*server.Stream)(nil)).Elem()
|
||||
if !argType.Implements(streamType) {
|
||||
return nil, fmt.Errorf("%v argument does not implement Streamer interface: %v", mname, argType)
|
||||
}
|
||||
} else {
|
||||
// if not stream check the replyType
|
||||
|
||||
// First arg need not be a pointer.
|
||||
if !isExportedOrBuiltinType(argType) {
|
||||
return nil, fmt.Errorf("%v argument type not exported: %v", mname, argType)
|
||||
}
|
||||
|
||||
if replyType.Kind() != reflect.Ptr {
|
||||
return nil, fmt.Errorf("method %v reply type not a pointer: %v", mname, replyType)
|
||||
}
|
||||
|
||||
// Reply type must be exported.
|
||||
if !isExportedOrBuiltinType(replyType) {
|
||||
return nil, fmt.Errorf("method %v reply type not exported: %v", mname, replyType)
|
||||
}
|
||||
}
|
||||
|
||||
// Endpoint() needs one out.
|
||||
if mtype.NumOut() != 1 {
|
||||
return nil, fmt.Errorf("method %v has wrong number of outs: %v", mname, mtype.NumOut())
|
||||
}
|
||||
// The return type of the method must be error.
|
||||
if returnType := mtype.Out(0); returnType != typeOfError {
|
||||
return nil, fmt.Errorf("method %v returns %v not error", mname, returnType.String())
|
||||
}
|
||||
|
||||
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}, nil
|
||||
}
|
||||
|
||||
func (m *methodType) prepareContext(ctx context.Context) reflect.Value {
|
||||
if contextv := reflect.ValueOf(ctx); contextv.IsValid() {
|
||||
return contextv
|
||||
}
|
||||
return reflect.Zero(m.ContextType)
|
||||
}
|
Loading…
Reference in New Issue
Block a user