Compare commits

..

No commits in common. "master" and "v4.0.0" have entirely different histories.

33 changed files with 573 additions and 2465 deletions

24
.gitignore vendored
View File

@ -1,24 +0,0 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
bin
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work
# General
.DS_Store
.idea
.vscode

21
go.mod
View File

@ -3,23 +3,14 @@ module go.unistack.org/micro-server-http/v4
go 1.19 go 1.19
require ( require (
go.unistack.org/micro-codec-yaml/v4 v4.0.0 go.unistack.org/micro-proto/v4 v4.0.0
go.unistack.org/micro-proto/v4 v4.0.1 go.unistack.org/micro/v4 v4.0.1
go.unistack.org/micro/v4 v4.0.17 golang.org/x/net v0.7.0
go.unistack.org/protoc-gen-go-micro/v4 v4.0.13
golang.org/x/net v0.22.0
) )
require ( require (
github.com/fatih/structtag v1.2.0 // indirect github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic v0.6.9 // indirect
github.com/google/gnostic v0.7.0 // indirect google.golang.org/protobuf v1.28.1 // indirect
github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect
golang.org/x/mod v0.16.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/tools v0.19.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
) )

1432
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -19,10 +19,10 @@ import (
) )
var ( var (
DefaultErrorHandler = func(ctx context.Context, s interface{}, w http.ResponseWriter, r *http.Request, err error, status int) { DefaultErrorHandler = func(ctx context.Context, s server.Handler, w http.ResponseWriter, r *http.Request, err error, status int) {
w.WriteHeader(status) w.WriteHeader(status)
if _, cerr := w.Write([]byte(err.Error())); cerr != nil { if _, cerr := w.Write([]byte(err.Error())); cerr != nil {
logger.DefaultLogger.Error(ctx, fmt.Sprintf("write failed: %v", cerr)) logger.DefaultLogger.Errorf(ctx, "write failed: %v", cerr)
} }
} }
DefaultContentType = "application/json" DefaultContentType = "application/json"
@ -35,7 +35,7 @@ type patHandler struct {
} }
type httpHandler struct { type httpHandler struct {
opts server.HandleOptions opts server.HandlerOptions
hd interface{} hd interface{}
handlers *rhttp.Trie handlers *rhttp.Trie
name string name string
@ -56,287 +56,17 @@ func (h *httpHandler) Endpoints() []*register.Endpoint {
return h.eps return h.eps
} }
func (h *httpHandler) Options() server.HandleOptions { func (h *httpHandler) Options() server.HandlerOptions {
return h.opts return h.opts
} }
func (h *Server) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, error) { func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if handler == nil { // check for http.HandlerFunc handlers
return nil, fmt.Errorf("invalid handler specified: %v", handler) if ph, _, err := h.pathHandlers.Search(r.Method, r.URL.Path); err == nil {
ph.(http.HandlerFunc)(w, r)
return
} }
rtype := reflect.TypeOf(handler)
if rtype.NumIn() != 3 {
return nil, fmt.Errorf("invalid handler, NumIn != 3: %v", rtype.NumIn())
}
argType := rtype.In(1)
replyType := rtype.In(2)
// First arg need not be a pointer.
if !isExportedOrBuiltinType(argType) {
return nil, fmt.Errorf("invalid handler, argument type not exported: %v", argType)
}
if replyType.Kind() != reflect.Ptr {
return nil, fmt.Errorf("invalid handler, reply type not a pointer: %v", replyType)
}
// Reply type must be exported.
if !isExportedOrBuiltinType(replyType) {
return nil, fmt.Errorf("invalid handler, reply type not exported: %v", replyType)
}
if rtype.NumOut() != 1 {
return nil, fmt.Errorf("invalid handler, has wrong number of outs: %v", rtype.NumOut())
}
// The return type of the method must be error.
if returnType := rtype.Out(0); returnType != typeOfError {
return nil, fmt.Errorf("invalid handler, returns %v not error", returnType.String())
}
return func(w http.ResponseWriter, r *http.Request) {
ct := DefaultContentType
if htype := r.Header.Get(metadata.HeaderContentType); htype != "" {
ct = htype
}
ctx := context.WithValue(r.Context(), rspCodeKey{}, &rspCodeVal{})
ctx = context.WithValue(ctx, rspHeaderKey{}, &rspHeaderVal{})
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.New(len(r.Header) + 8)
}
for k, v := range r.Header {
md[k] = v[0]
}
md["RemoteAddr"] = r.RemoteAddr
md["Method"] = r.Method
md["URL"] = r.URL.String()
md["Proto"] = r.Proto
md["Content-Length"] = fmt.Sprintf("%d", r.ContentLength)
md["Transfer-Encoding"] = r.TransferEncoding[0]
md["Host"] = r.Host
md["RequestURI"] = r.RequestURI
if r.TLS != nil {
md["TLS"] = "true"
md["TLS-ALPN"] = r.TLS.NegotiatedProtocol
md["TLS-ServerName"] = r.TLS.ServerName
}
ctx = metadata.NewIncomingContext(ctx, md)
path := r.URL.Path
if r.Body != nil {
defer r.Body.Close()
}
matches := make(map[string]interface{})
var match bool
var hldr *patHandler
var handler *httpHandler
for _, shdlr := range h.handlers {
hdlr := shdlr.(*httpHandler)
fh, mp, err := hdlr.handlers.Search(r.Method, path)
if err == nil {
match = true
for k, v := range mp {
matches[k] = v
}
hldr = fh.(*patHandler)
handler = hdlr
break
} else if err == rhttp.ErrMethodNotAllowed && !h.registerRPC {
w.WriteHeader(http.StatusMethodNotAllowed)
_, _ = w.Write([]byte("not matching route found"))
return
}
}
if !match && h.registerRPC {
microMethod, mok := md.Get(metadata.HeaderEndpoint)
if mok {
serviceMethod := strings.Split(microMethod, ".")
if len(serviceMethod) == 2 {
if shdlr, ok := h.handlers[serviceMethod[0]]; ok {
hdlr := shdlr.(*httpHandler)
fh, mp, err := hdlr.handlers.Search(http.MethodPost, "/"+microMethod)
if err == nil {
match = true
for k, v := range mp {
matches[k] = v
}
hldr = fh.(*patHandler)
handler = hdlr
}
}
}
}
}
// get fields from url values
if len(r.URL.RawQuery) > 0 {
umd, cerr := rflutil.URLMap(r.URL.RawQuery)
if cerr != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(cerr.Error()))
return
}
for k, v := range umd {
matches[k] = v
}
}
cf, err := h.newCodec(ct)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
var argv, replyv reflect.Value
// Decode the argument value.
argIsValue := false // if true, need to indirect before calling.
if hldr.mtype.ArgType.Kind() == reflect.Ptr {
argv = reflect.New(hldr.mtype.ArgType.Elem())
} else {
argv = reflect.New(hldr.mtype.ArgType)
argIsValue = true
}
if argIsValue {
argv = argv.Elem()
}
// reply value
replyv = reflect.New(hldr.mtype.ReplyType.Elem())
function := hldr.mtype.method.Func
var returnValues []reflect.Value
if r.Body != nil {
var buf []byte
buf, err = io.ReadAll(r.Body)
if err != nil && err != io.EOF {
h.errorHandler(ctx, handler, w, r, err, http.StatusInternalServerError)
return
}
if err = cf.Unmarshal(buf, argv.Interface()); err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest)
return
}
}
matches = rflutil.FlattenMap(matches)
if err = rflutil.Merge(argv.Interface(), matches, rflutil.SliceAppend(true), rflutil.Tags([]string{"protobuf", "json"})); err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest)
return
}
hr := &rpcRequest{
codec: cf,
service: handler.sopts.Name,
contentType: ct,
method: fmt.Sprintf("%s.%s", hldr.name, hldr.mtype.method.Name),
endpoint: fmt.Sprintf("%s.%s", hldr.name, hldr.mtype.method.Name),
payload: argv.Interface(),
header: md,
}
// define the handler func
fn := func(fctx context.Context, req server.Request, rsp interface{}) (err error) {
returnValues = function.Call([]reflect.Value{hldr.rcvr, hldr.mtype.prepareContext(fctx), argv, reflect.ValueOf(rsp)})
// The return value for the method is an error.
if rerr := returnValues[0].Interface(); rerr != nil {
err = rerr.(error)
}
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(0)
}
if nmd, ok := metadata.FromOutgoingContext(fctx); ok {
for k, v := range nmd {
md[k] = v
}
}
return err
}
// wrap the handler func
// for i := len(handler.sopts.Hooks); i > 0; i-- {
// fn = handler.sopts.Hooks[i-1](fn)
// }
if ct == "application/x-www-form-urlencoded" {
cf, err = h.newCodec(DefaultContentType)
if err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusInternalServerError)
return
}
ct = DefaultContentType
}
scode := int(200)
appErr := fn(ctx, hr, replyv.Interface())
w.Header().Set(metadata.HeaderContentType, ct)
if md, ok := metadata.FromOutgoingContext(ctx); ok {
for k, v := range md {
w.Header()[k] = []string{v}
}
}
if md := getRspHeader(ctx); md != nil {
for k, v := range md {
w.Header()[k] = v
}
}
if nct := w.Header().Get(metadata.HeaderContentType); nct != ct {
if cf, err = h.newCodec(nct); err != nil {
h.errorHandler(ctx, nil, w, r, err, http.StatusBadRequest)
return
}
}
var buf []byte
if appErr != nil {
switch verr := appErr.(type) {
case *errors.Error:
scode = int(verr.Code)
buf, err = cf.Marshal(verr)
case *Error:
buf, err = cf.Marshal(verr.err)
default:
buf, err = cf.Marshal(appErr)
}
} else {
buf, err = cf.Marshal(replyv.Interface())
}
if err != nil && handler.sopts.Logger.V(logger.ErrorLevel) {
handler.sopts.Logger.Error(handler.sopts.Context, fmt.Sprintf("handler err: %v", err))
return
}
if nscode := GetRspCode(ctx); nscode != 0 {
scode = nscode
}
w.WriteHeader(scode)
if _, cerr := w.Write(buf); cerr != nil {
handler.sopts.Logger.Error(ctx, fmt.Sprintf("write failed: %v", cerr))
}
}, nil
}
func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ct := DefaultContentType ct := DefaultContentType
if htype := r.Header.Get(metadata.HeaderContentType); htype != "" { if htype := r.Header.Get(metadata.HeaderContentType); htype != "" {
ct = htype ct = htype
@ -349,25 +79,20 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
md = metadata.New(len(r.Header) + 8) md = metadata.New(len(r.Header) + 8)
} }
for k, v := range r.Header { for k, v := range r.Header {
md[k] = v[0] md[k] = strings.Join(v, ", ")
} }
md["RemoteAddr"] = r.RemoteAddr md["RemoteAddr"] = r.RemoteAddr
if r.TLS != nil {
md["Scheme"] = "https"
} else {
md["Scheme"] = "http"
}
md["Method"] = r.Method md["Method"] = r.Method
md["URL"] = r.URL.String() md["URL"] = r.URL.String()
md["Proto"] = r.Proto md["Proto"] = r.Proto
md["Content-Length"] = fmt.Sprintf("%d", r.ContentLength) md["ContentLength"] = fmt.Sprintf("%d", r.ContentLength)
if len(r.TransferEncoding) > 0 { md["TransferEncoding"] = strings.Join(r.TransferEncoding, ",")
md["Transfer-Encoding"] = r.TransferEncoding[0]
}
md["Host"] = r.Host md["Host"] = r.Host
md["RequestURI"] = r.RequestURI md["RequestURI"] = r.RequestURI
ctx = metadata.NewIncomingContext(ctx, md) ctx = metadata.NewIncomingContext(ctx, md)
defer r.Body.Close()
path := r.URL.Path path := r.URL.Path
if !strings.HasPrefix(path, "/") { if !strings.HasPrefix(path, "/") {
h.errorHandler(ctx, nil, w, r, fmt.Errorf("path must starts with /"), http.StatusBadRequest) h.errorHandler(ctx, nil, w, r, fmt.Errorf("path must starts with /"), http.StatusBadRequest)
@ -419,16 +144,11 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
if !match && h.hd != nil { if !match && h.hd != nil {
if hdlr, ok := h.hd.(http.Handler); ok { if hdlr, ok := h.hd.Handler().(http.Handler); ok {
hdlr.ServeHTTP(w, r) hdlr.ServeHTTP(w, r)
return return
} }
} else if !match { } else if !match {
// check for http.HandlerFunc handlers
if ph, _, err := h.pathHandlers.Search(r.Method, r.URL.Path); err == nil {
ph.(http.HandlerFunc)(w, r)
return
}
h.errorHandler(ctx, nil, w, r, fmt.Errorf("not matching route found"), http.StatusNotFound) h.errorHandler(ctx, nil, w, r, fmt.Errorf("not matching route found"), http.StatusNotFound)
return return
} }
@ -445,10 +165,6 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
} }
if r.Body != nil {
defer r.Body.Close()
}
cf, err := h.newCodec(ct) cf, err := h.newCodec(ct)
if err != nil { if err != nil {
h.errorHandler(ctx, nil, w, r, err, http.StatusBadRequest) h.errorHandler(ctx, nil, w, r, err, http.StatusBadRequest)
@ -476,26 +192,21 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
function := hldr.mtype.method.Func function := hldr.mtype.method.Func
var returnValues []reflect.Value var returnValues []reflect.Value
if r.Body != nil { buf, err := io.ReadAll(r.Body)
var buf []byte if err != nil && err != io.EOF {
buf, err = io.ReadAll(r.Body) h.errorHandler(ctx, handler, w, r, err, http.StatusInternalServerError)
if err != nil && err != io.EOF { return
h.errorHandler(ctx, handler, w, r, err, http.StatusInternalServerError)
return
}
if err = cf.Unmarshal(buf, argv.Interface()); err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest)
return
}
} }
if len(matches) > 0 { if err = cf.Unmarshal(buf, argv.Interface()); err != nil {
matches = rflutil.FlattenMap(matches) h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest)
if err = rflutil.Merge(argv.Interface(), matches, rflutil.SliceAppend(true), rflutil.Tags([]string{"protobuf", "json"})); err != nil { return
h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest) }
return
} matches = rflutil.FlattenMap(matches)
if err = rflutil.Merge(argv.Interface(), matches, rflutil.SliceAppend(true), rflutil.Tags([]string{"protobuf", "json"})); err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest)
return
} }
hr := &rpcRequest{ hr := &rpcRequest{
@ -523,17 +234,18 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
if nmd, ok := metadata.FromOutgoingContext(fctx); ok { if nmd, ok := metadata.FromOutgoingContext(fctx); ok {
for k, v := range nmd { for k, v := range nmd {
md[k] = v md.Set(k, v)
} }
} }
metadata.SetOutgoingContext(ctx, md)
return err return err
} }
// wrap the handler func // wrap the handler func
// for i := len(handler.sopts.HdlrWrappers); i > 0; i-- { for i := len(handler.sopts.HdlrWrappers); i > 0; i-- {
// fn = handler.sopts.HdlrWrappers[i-1](fn) fn = handler.sopts.HdlrWrappers[i-1](fn)
// } }
if ct == "application/x-www-form-urlencoded" { if ct == "application/x-www-form-urlencoded" {
cf, err = h.newCodec(DefaultContentType) cf, err = h.newCodec(DefaultContentType)
@ -550,7 +262,7 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set(metadata.HeaderContentType, ct) w.Header().Set(metadata.HeaderContentType, ct)
if md, ok := metadata.FromOutgoingContext(ctx); ok { if md, ok := metadata.FromOutgoingContext(ctx); ok {
for k, v := range md { for k, v := range md {
w.Header()[k] = []string{v} w.Header().Set(k, v)
} }
} }
if md := getRspHeader(ctx); md != nil { if md := getRspHeader(ctx); md != nil {
@ -567,7 +279,6 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
} }
var buf []byte
if appErr != nil { if appErr != nil {
switch verr := appErr.(type) { switch verr := appErr.(type) {
case *errors.Error: case *errors.Error:
@ -583,7 +294,7 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
if err != nil && handler.sopts.Logger.V(logger.ErrorLevel) { if err != nil && handler.sopts.Logger.V(logger.ErrorLevel) {
handler.sopts.Logger.Error(handler.sopts.Context, fmt.Sprintf("handler err: %v", err)) handler.sopts.Logger.Errorf(handler.sopts.Context, "handler err: %v", err)
return return
} }
@ -593,6 +304,6 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(scode) w.WriteHeader(scode)
if _, cerr := w.Write(buf); cerr != nil { if _, cerr := w.Write(buf); cerr != nil {
handler.sopts.Logger.Error(ctx, fmt.Sprintf("write failed: %v", cerr)) handler.sopts.Logger.Errorf(ctx, "write failed: %v", cerr)
} }
} }

View File

@ -1,12 +0,0 @@
package handler
import (
// import required packages
_ "go.unistack.org/micro-proto/v4/openapiv3"
)
//go:generate sh -c "curl -L https://github.com/swagger-api/swagger-ui/archive/refs/tags/v4.18.3.zip -o - | bsdtar -C swagger-ui --strip-components=2 -xv swagger-ui-4.18.3/dist && rm swagger-ui/*.map swagger-ui/*-es-*.js swagger-ui/swagger-ui.js swagger-ui/swagger-initializer.js"
//go:generate sh -c "protoc -I./ -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v4) --go-micro_out='components=micro|http|server',standalone=false,debug=true,paths=source_relative:./ ./meter/meter.proto"
//go:generate sh -c "protoc -I./ -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v4) --go-micro_out='components=micro|http|server',standalone=false,debug=true,paths=source_relative:./ ./health/health.proto"

View File

@ -0,0 +1,8 @@
package health
//go:generate sh -c "protoc -I./ -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v4) --go-micro_out='components=micro|http|server',standalone=false,debug=true,paths=source_relative:./ health.proto"
import (
// import required packages
_ "go.unistack.org/micro-proto/v4/openapiv3"
)

View File

@ -7,7 +7,7 @@ import (
"go.unistack.org/micro/v4/errors" "go.unistack.org/micro/v4/errors"
) )
var _ HealthServiceServer = (*Handler)(nil) var _ HealthServiceServer = &Handler{}
type Handler struct { type Handler struct {
opts Options opts Options

View File

@ -1,19 +1,45 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT. // Code generated by protoc-gen-go-micro. DO NOT EDIT.
// versions: // versions:
// - protoc-gen-go-micro v4.0.2 // - protoc-gen-go-micro v4.10.2
// - protoc v4.23.4 // - protoc v4.21.12
// source: health/health.proto // source: health.proto
package health package health
import ( import (
context "context" context "context"
codec "go.unistack.org/micro-proto/v4/codec" codec "go.unistack.org/micro-proto/v4/codec"
v4 "go.unistack.org/micro-server-http/v4"
) )
var ( var (
HealthServiceName = "HealthService" HealthServiceName = "HealthService"
) )
var (
HealthServiceServerEndpoints = []v4.EndpointMetadata{
{
Name: "HealthService.Live",
Path: "/live",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Ready",
Path: "/ready",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Version",
Path: "/version",
Method: "GET",
Body: "",
Stream: false,
},
}
)
type HealthServiceServer interface { type HealthServiceServer interface {
Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error

View File

@ -1,6 +1,6 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT. // Code generated by protoc-gen-go-micro. DO NOT EDIT.
// protoc-gen-go-micro version: v4.0.2 // protoc-gen-go-micro version: v4.10.2
// source: health/health.proto // source: health.proto
package health package health
@ -8,36 +8,9 @@ import (
context "context" context "context"
codec "go.unistack.org/micro-proto/v4/codec" codec "go.unistack.org/micro-proto/v4/codec"
v4 "go.unistack.org/micro-server-http/v4" v4 "go.unistack.org/micro-server-http/v4"
options "go.unistack.org/micro/v4/options"
server "go.unistack.org/micro/v4/server" server "go.unistack.org/micro/v4/server"
) )
var (
HealthServiceServerEndpoints = []v4.EndpointMetadata{
{
Name: "HealthService.Live",
Path: "/live",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Ready",
Path: "/ready",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Version",
Path: "/version",
Method: "GET",
Body: "",
Stream: false,
},
}
)
type healthServiceServer struct { type healthServiceServer struct {
HealthServiceServer HealthServiceServer
} }
@ -54,7 +27,7 @@ func (h *healthServiceServer) Version(ctx context.Context, req *codec.Frame, rsp
return h.HealthServiceServer.Version(ctx, req, rsp) return h.HealthServiceServer.Version(ctx, req, rsp)
} }
func RegisterHealthServiceServer(s server.Server, sh HealthServiceServer, opts ...options.Option) error { func RegisterHealthServiceServer(s server.Server, sh HealthServiceServer, opts ...server.HandlerOption) error {
type healthService interface { type healthService interface {
Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
@ -64,7 +37,7 @@ func RegisterHealthServiceServer(s server.Server, sh HealthServiceServer, opts .
healthService healthService
} }
h := &healthServiceServer{sh} h := &healthServiceServer{sh}
var nopts []options.Option var nopts []server.HandlerOption
nopts = append(nopts, v4.HandlerEndpoints(HealthServiceServerEndpoints)) nopts = append(nopts, v4.HandlerEndpoints(HealthServiceServerEndpoints))
return s.Handle(&HealthService{h}, append(nopts, opts...)...) return s.Handle(s.NewHandler(&HealthService{h}, append(nopts, opts...)...))
} }

View File

@ -0,0 +1,8 @@
package meter
//go:generate sh -c "protoc -I./ -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v4) --go-micro_out='components=micro|http|server',standalone=false,debug=true,paths=source_relative:./ meter.proto"
import (
// import required packages
_ "go.unistack.org/micro-proto/v4/openapiv3"
)

View File

@ -2,38 +2,15 @@ package meter // import "go.unistack.org/micro-server-http/v4/handler/meter"
import ( import (
"bytes" "bytes"
"compress/gzip"
"context" "context"
"io"
"strings"
"sync"
codecpb "go.unistack.org/micro-proto/v4/codec" codecpb "go.unistack.org/micro-proto/v4/codec"
"go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/errors"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/meter" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v4/options"
) )
const (
contentEncodingHeader = "Content-Encoding"
acceptEncodingHeader = "Accept-Encoding"
)
var gzipPool = sync.Pool{
New: func() interface{} {
return gzip.NewWriter(nil)
},
}
var bufPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(nil)
},
}
// guard to fail early // guard to fail early
var _ MeterServiceServer = (*Handler)(nil) var _ MeterServiceServer = &Handler{}
type Handler struct { type Handler struct {
opts Options opts Options
@ -42,10 +19,9 @@ type Handler struct {
type Option func(*Options) type Option func(*Options)
type Options struct { type Options struct {
Meter meter.Meter Meter meter.Meter
Name string Name string
MeterOptions []options.Option MeterOptions []meter.Option
DisableCompress bool
} }
func Meter(m meter.Meter) Option { func Meter(m meter.Meter) Option {
@ -60,20 +36,14 @@ func Name(name string) Option {
} }
} }
func DisableCompress(g bool) Option { func MeterOptions(opts ...meter.Option) Option {
return func(o *Options) {
o.DisableCompress = g
}
}
func MeterOptions(opts ...options.Option) Option {
return func(o *Options) { return func(o *Options) {
o.MeterOptions = append(o.MeterOptions, opts...) o.MeterOptions = append(o.MeterOptions, opts...)
} }
} }
func NewOptions(opts ...Option) Options { func NewOptions(opts ...Option) Options {
options := Options{Meter: meter.DefaultMeter, DisableCompress: false} options := Options{Meter: meter.DefaultMeter}
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
@ -86,48 +56,12 @@ func NewHandler(opts ...Option) *Handler {
} }
func (h *Handler) Metrics(ctx context.Context, req *codecpb.Frame, rsp *codecpb.Frame) error { func (h *Handler) Metrics(ctx context.Context, req *codecpb.Frame, rsp *codecpb.Frame) error {
log, ok := logger.FromContext(ctx) buf := bytes.NewBuffer(nil)
if !ok { if err := h.opts.Meter.Write(buf, h.opts.MeterOptions...); err != nil {
log = logger.DefaultLogger return errors.InternalServerError(h.opts.Name, "%v", err)
}
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()
w := io.Writer(buf)
if md, ok := metadata.FromIncomingContext(ctx); gzipAccepted(md) && ok && !h.opts.DisableCompress {
omd, _ := metadata.FromOutgoingContext(ctx)
omd.Set(contentEncodingHeader, "gzip")
gz := gzipPool.Get().(*gzip.Writer)
defer gzipPool.Put(gz)
gz.Reset(w)
defer gz.Close()
w = gz
gz.Flush()
}
if err := h.opts.Meter.Write(w, h.opts.MeterOptions...); err != nil {
log.Error(ctx, "http/meter: write failed", err)
return nil
} }
rsp.Data = buf.Bytes() rsp.Data = buf.Bytes()
return nil return nil
} }
// gzipAccepted returns whether the client will accept gzip-encoded content.
func gzipAccepted(md metadata.Metadata) bool {
a, ok := md.Get(acceptEncodingHeader)
if !ok {
return false
}
if strings.Contains(a, "gzip") {
return true
}
return false
}

View File

@ -1,19 +1,31 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT. // Code generated by protoc-gen-go-micro. DO NOT EDIT.
// versions: // versions:
// - protoc-gen-go-micro v4.0.2 // - protoc-gen-go-micro v4.10.2
// - protoc v4.23.4 // - protoc v4.21.12
// source: meter/meter.proto // source: meter.proto
package meter package meter
import ( import (
context "context" context "context"
codec "go.unistack.org/micro-proto/v4/codec" codec "go.unistack.org/micro-proto/v4/codec"
v4 "go.unistack.org/micro-server-http/v4"
) )
var ( var (
MeterServiceName = "MeterService" MeterServiceName = "MeterService"
) )
var (
MeterServiceServerEndpoints = []v4.EndpointMetadata{
{
Name: "MeterService.Metrics",
Path: "/metrics",
Method: "GET",
Body: "",
Stream: false,
},
}
)
type MeterServiceServer interface { type MeterServiceServer interface {
Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error

View File

@ -1,6 +1,6 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT. // Code generated by protoc-gen-go-micro. DO NOT EDIT.
// protoc-gen-go-micro version: v4.0.2 // protoc-gen-go-micro version: v4.10.2
// source: meter/meter.proto // source: meter.proto
package meter package meter
@ -8,22 +8,9 @@ import (
context "context" context "context"
codec "go.unistack.org/micro-proto/v4/codec" codec "go.unistack.org/micro-proto/v4/codec"
v4 "go.unistack.org/micro-server-http/v4" v4 "go.unistack.org/micro-server-http/v4"
options "go.unistack.org/micro/v4/options"
server "go.unistack.org/micro/v4/server" server "go.unistack.org/micro/v4/server"
) )
var (
MeterServiceServerEndpoints = []v4.EndpointMetadata{
{
Name: "MeterService.Metrics",
Path: "/metrics",
Method: "GET",
Body: "",
Stream: false,
},
}
)
type meterServiceServer struct { type meterServiceServer struct {
MeterServiceServer MeterServiceServer
} }
@ -32,7 +19,7 @@ func (h *meterServiceServer) Metrics(ctx context.Context, req *codec.Frame, rsp
return h.MeterServiceServer.Metrics(ctx, req, rsp) return h.MeterServiceServer.Metrics(ctx, req, rsp)
} }
func RegisterMeterServiceServer(s server.Server, sh MeterServiceServer, opts ...options.Option) error { func RegisterMeterServiceServer(s server.Server, sh MeterServiceServer, opts ...server.HandlerOption) error {
type meterService interface { type meterService interface {
Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
} }
@ -40,7 +27,7 @@ func RegisterMeterServiceServer(s server.Server, sh MeterServiceServer, opts ...
meterService meterService
} }
h := &meterServiceServer{sh} h := &meterServiceServer{sh}
var nopts []options.Option var nopts []server.HandlerOption
nopts = append(nopts, v4.HandlerEndpoints(MeterServiceServerEndpoints)) nopts = append(nopts, v4.HandlerEndpoints(MeterServiceServerEndpoints))
return s.Handle(&MeterService{h}, append(nopts, opts...)...) return s.Handle(s.NewHandler(&MeterService{h}, append(nopts, opts...)...))
} }

View File

@ -1,49 +0,0 @@
package meter
import (
"context"
"testing"
codecpb "go.unistack.org/micro-proto/v4/codec"
)
func TestHandler_Metrics(t *testing.T) {
type fields struct {
opts Options
}
type args struct {
ctx context.Context
req *codecpb.Frame
rsp *codecpb.Frame
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
{
"Test #1",
fields{
opts: NewOptions(),
},
args{
context.Background(),
&codecpb.Frame{Data: []byte("gzip")},
&codecpb.Frame{},
},
true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
h := &Handler{
opts: tt.fields.opts,
}
if err := h.Metrics(tt.args.ctx, tt.args.req, tt.args.rsp); (err != nil) != tt.wantErr {
t.Errorf("Metrics() error = %v, wantErr %v", err, tt.wantErr)
}
t.Logf("RSP: %v", tt.args.rsp.Data)
})
}
}

View File

@ -1,19 +0,0 @@
package spa
import (
"io/fs"
"net/http"
"strings"
)
// Handler serve files from dir and redirect to index if file not exists
var Handler = func(prefix string, dir fs.FS) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
f := http.StripPrefix(prefix, http.FileServer(http.FS(dir)))
if _, err := fs.Stat(dir, strings.TrimPrefix(r.RequestURI, prefix)); err != nil {
r.RequestURI = prefix
r.URL.Path = prefix
}
f.ServeHTTP(w, r)
}
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 665 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 628 B

View File

@ -1,16 +0,0 @@
html {
box-sizing: border-box;
overflow: -moz-scrollbars-vertical;
overflow-y: scroll;
}
*,
*:before,
*:after {
box-sizing: inherit;
}
body {
margin: 0;
background: #fafafa;
}

View File

@ -1,19 +0,0 @@
<!-- HTML for static distribution bundle build -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Swagger UI</title>
<link rel="stylesheet" type="text/css" href="./swagger-ui.css" />
<link rel="stylesheet" type="text/css" href="index.css" />
<link rel="icon" type="image/png" href="./favicon-32x32.png" sizes="32x32" />
<link rel="icon" type="image/png" href="./favicon-16x16.png" sizes="16x16" />
</head>
<body>
<div id="swagger-ui"></div>
<script src="./swagger-ui-bundle.js" charset="UTF-8"> </script>
<script src="./swagger-ui-standalone-preset.js" charset="UTF-8"> </script>
<script src="./swagger-initializer.js" charset="UTF-8"> </script>
</body>
</html>

View File

@ -1,79 +0,0 @@
<!doctype html>
<html lang="en-US">
<head>
<title>Swagger UI: OAuth2 Redirect</title>
</head>
<body>
<script>
'use strict';
function run () {
var oauth2 = window.opener.swaggerUIRedirectOauth2;
var sentState = oauth2.state;
var redirectUrl = oauth2.redirectUrl;
var isValid, qp, arr;
if (/code|token|error/.test(window.location.hash)) {
qp = window.location.hash.substring(1).replace('?', '&');
} else {
qp = location.search.substring(1);
}
arr = qp.split("&");
arr.forEach(function (v,i,_arr) { _arr[i] = '"' + v.replace('=', '":"') + '"';});
qp = qp ? JSON.parse('{' + arr.join() + '}',
function (key, value) {
return key === "" ? value : decodeURIComponent(value);
}
) : {};
isValid = qp.state === sentState;
if ((
oauth2.auth.schema.get("flow") === "accessCode" ||
oauth2.auth.schema.get("flow") === "authorizationCode" ||
oauth2.auth.schema.get("flow") === "authorization_code"
) && !oauth2.auth.code) {
if (!isValid) {
oauth2.errCb({
authId: oauth2.auth.name,
source: "auth",
level: "warning",
message: "Authorization may be unsafe, passed state was changed in server. The passed state wasn't returned from auth server."
});
}
if (qp.code) {
delete oauth2.state;
oauth2.auth.code = qp.code;
oauth2.callback({auth: oauth2.auth, redirectUrl: redirectUrl});
} else {
let oauthErrorMsg;
if (qp.error) {
oauthErrorMsg = "["+qp.error+"]: " +
(qp.error_description ? qp.error_description+ ". " : "no accessCode received from the server. ") +
(qp.error_uri ? "More info: "+qp.error_uri : "");
}
oauth2.errCb({
authId: oauth2.auth.name,
source: "auth",
level: "error",
message: oauthErrorMsg || "[Authorization failed]: no accessCode received from the server."
});
}
} else {
oauth2.callback({auth: oauth2.auth, token: qp, isValid: isValid, redirectUrl: redirectUrl});
}
window.close();
}
if (document.readyState !== 'loading') {
run();
} else {
document.addEventListener('DOMContentLoaded', function () {
run();
});
}
</script>
</body>
</html>

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1,142 +0,0 @@
package swaggerui // import "go.unistack.org/micro-server-http/v4/handler/swagger-ui"
import (
"embed"
"html/template"
"net/http"
"path"
"reflect"
)
//go:embed *.js *.css *.html *.png
var assets embed.FS
var (
Handler = func(prefix string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet || path.Base(r.URL.Path) != "swagger-initializer.js" {
http.StripPrefix(prefix, http.FileServer(http.FS(assets))).ServeHTTP(w, r)
return
}
tpl := template.New("swagger-initializer.js").Funcs(TemplateFuncs)
ptpl, err := tpl.Parse(Template)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if err := ptpl.Execute(w, Config); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
}
}
TemplateFuncs = template.FuncMap{
"isInt": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.Int, reflect.Int8, reflect.Int32, reflect.Int64, reflect.Uint, reflect.Uint8, reflect.Uint32, reflect.Uint64, reflect.Float32, reflect.Float64:
return true
default:
return false
}
},
"isBool": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.Bool:
return true
default:
return false
}
},
"isString": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.String:
return true
default:
return false
}
},
"isSlice": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.Slice:
return true
default:
return false
}
},
"isMap": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.Map:
return true
default:
return false
}
},
}
Template = `
window.onload = function() {
//<editor-fold desc="Changeable Configuration Block">
window.ui = SwaggerUIBundle({
{{- range $k, $v := . }}
{{- if (eq (printf "%s" $v) "") -}}
{{- continue -}}
{{ end }}
{{ $k }}: {{ if isBool $v -}}
{{- $v -}},
{{- else if isInt $v -}}
{{- $v -}},
{{- else if isString $v -}}
"{{- $v -}}",
{{- else if and (isSlice $v) (or (eq (printf "%s" $k) "presets") (eq (printf "%s" $k) "plugins")) -}}
[
{{- range $v }}
{{ . }},
{{- end }}
],
{{- end -}}
{{ end }}
});
//</editor-fold>
};`
Config = map[string]interface{}{
"configUrl": "",
"dom_id": "#swagger-ui",
/*
"domNode": "",
"spec": "",
"urls": []interface{}{
map[string]interface{}{
"url": "",
"name": "",
},
},
},
*/
"url": "https://petstore.swagger.io/v2/swagger.json",
"deepLinking": true,
"displayOperationId": false,
"defaultModelsExpandDepth": 1,
"defaultModelExpandDepth": 1,
"displayRequestDuration": true,
"filter": true,
"operationsSorter": "alpha",
"showExtensions": true,
"tryItOutEnabled": true,
"presets": []string{
"SwaggerUIBundle.presets.apis",
"SwaggerUIStandalonePreset",
},
"plugins": []string{
"SwaggerUIBundle.plugins.DownloadUrl",
},
"layout": "StandaloneLayout",
}
)

View File

@ -1,15 +0,0 @@
package swaggerui
import (
"net/http"
"testing"
)
func TestTemplate(t *testing.T) {
t.Skip()
h := http.NewServeMux()
h.HandleFunc("/", Handler(""))
if err := http.ListenAndServe(":8080", h); err != nil {
t.Fatal(err)
}
}

View File

@ -1,61 +0,0 @@
package swagger
import (
"io/fs"
"net/http"
yamlcodec "go.unistack.org/micro-codec-yaml/v4"
rutil "go.unistack.org/micro/v4/util/reflect"
)
// Handler append to generated swagger data from dst map[string]interface{}
var Handler = func(dst map[string]interface{}, fsys fs.FS) http.HandlerFunc {
c := yamlcodec.NewCodec()
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusNotFound)
return
}
path := r.URL.Path
if len(path) > 1 && path[0] == '/' {
path = path[1:]
}
buf, err := fs.ReadFile(fsys, path)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if dst == nil {
w.WriteHeader(http.StatusOK)
_, _ = w.Write(buf)
return
}
var src interface{}
if err = c.Unmarshal(buf, src); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if err = rutil.Merge(src, dst); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if buf, err = c.Marshal(src); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write(buf)
}
}

257
http.go
View File

@ -9,27 +9,29 @@ import (
"net" "net"
"net/http" "net/http"
"reflect" "reflect"
"sort"
"strings" "strings"
"sync" "sync"
"time" "time"
"go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/codec" "go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/options"
"go.unistack.org/micro/v4/register" "go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v4/server" "go.unistack.org/micro/v4/server"
rhttp "go.unistack.org/micro/v4/util/http" rhttp "go.unistack.org/micro/v4/util/http"
"golang.org/x/net/netutil" "golang.org/x/net/netutil"
) )
var _ server.Server = (*Server)(nil) var _ server.Server = &httpServer{}
type Server struct { type httpServer struct {
hd interface{} hd server.Handler
rsvc *register.Service rsvc *register.Service
handlers map[string]interface{} handlers map[string]server.Handler
exit chan chan error exit chan chan error
errorHandler func(context.Context, interface{}, http.ResponseWriter, *http.Request, error, int) subscribers map[*httpSubscriber][]broker.Subscriber
errorHandler func(context.Context, server.Handler, http.ResponseWriter, *http.Request, error, int)
pathHandlers *rhttp.Trie pathHandlers *rhttp.Trie
opts server.Options opts server.Options
registerRPC bool registerRPC bool
@ -38,7 +40,7 @@ type Server struct {
init bool init bool
} }
func (h *Server) newCodec(ct string) (codec.Codec, error) { func (h *httpServer) newCodec(ct string) (codec.Codec, error) {
if idx := strings.IndexRune(ct, ';'); idx >= 0 { if idx := strings.IndexRune(ct, ';'); idx >= 0 {
ct = ct[:idx] ct = ct[:idx]
} }
@ -51,14 +53,14 @@ func (h *Server) newCodec(ct string) (codec.Codec, error) {
return nil, codec.ErrUnknownContentType return nil, codec.ErrUnknownContentType
} }
func (h *Server) Options() server.Options { func (h *httpServer) Options() server.Options {
h.Lock() h.Lock()
opts := h.opts opts := h.opts
h.Unlock() h.Unlock()
return opts return opts
} }
func (h *Server) Init(opts ...options.Option) error { func (h *httpServer) Init(opts ...server.Option) error {
if len(opts) == 0 && h.init { if len(opts) == 0 && h.init {
return nil return nil
} }
@ -68,11 +70,11 @@ func (h *Server) Init(opts ...options.Option) error {
for _, o := range opts { for _, o := range opts {
o(&h.opts) o(&h.opts)
} }
if fn, ok := h.opts.Context.Value(errorHandlerKey{}).(func(ctx context.Context, s interface{}, w http.ResponseWriter, r *http.Request, err error, status int)); ok && fn != nil { if fn, ok := h.opts.Context.Value(errorHandlerKey{}).(func(ctx context.Context, s server.Handler, w http.ResponseWriter, r *http.Request, err error, status int)); ok && fn != nil {
h.errorHandler = fn h.errorHandler = fn
} }
if h.handlers == nil { if h.handlers == nil {
h.handlers = make(map[string]interface{}) h.handlers = make(map[string]server.Handler)
} }
if h.pathHandlers == nil { if h.pathHandlers == nil {
h.pathHandlers = rhttp.NewTrie() h.pathHandlers = rhttp.NewTrie()
@ -99,6 +101,10 @@ func (h *Server) Init(opts ...options.Option) error {
h.RUnlock() h.RUnlock()
return err return err
} }
if err := h.opts.Broker.Init(); err != nil {
h.RUnlock()
return err
}
if err := h.opts.Tracer.Init(); err != nil { if err := h.opts.Tracer.Init(); err != nil {
h.RUnlock() h.RUnlock()
return err return err
@ -111,6 +117,10 @@ func (h *Server) Init(opts ...options.Option) error {
h.RUnlock() h.RUnlock()
return err return err
} }
if err := h.opts.Transport.Init(); err != nil {
h.RUnlock()
return err
}
h.RUnlock() h.RUnlock()
h.Lock() h.Lock()
@ -120,24 +130,12 @@ func (h *Server) Init(opts ...options.Option) error {
return nil return nil
} }
func (h *Server) Handle(handler interface{}, opts ...options.Option) error { func (h *httpServer) Handle(handler server.Handler) error {
options := server.NewHandleOptions(opts...)
var endpointMetadata []EndpointMetadata
if v, ok := options.Context.Value(handlerEndpointsKey{}).([]EndpointMetadata); ok {
endpointMetadata = v
}
// passed unknown handler // passed unknown handler
hdlr, ok := handler.(*httpHandler) hdlr, ok := handler.(*httpHandler)
if !ok { if !ok {
h.Lock() h.Lock()
if h.handlers == nil { h.hd = handler
h.handlers = make(map[string]interface{})
}
for _, v := range endpointMetadata {
h.handlers[v.Name] = h.newHTTPHandler(handler, opts...)
}
h.Unlock() h.Unlock()
return nil return nil
} }
@ -150,11 +148,19 @@ func (h *Server) Handle(handler interface{}, opts ...options.Option) error {
return nil return nil
} }
// passed micro compat handler
h.Lock()
if h.handlers == nil {
h.handlers = make(map[string]server.Handler)
}
h.handlers[handler.Name()] = handler
h.Unlock()
return nil return nil
} }
func (h *Server) newHTTPHandler(handler interface{}, opts ...options.Option) *httpHandler { func (h *httpServer) NewHandler(handler interface{}, opts ...server.HandlerOption) server.Handler {
options := server.NewHandleOptions(opts...) options := server.NewHandlerOptions(opts...)
eps := make([]*register.Endpoint, 0, len(options.Metadata)) eps := make([]*register.Endpoint, 0, len(options.Metadata))
for name, metadata := range options.Metadata { for name, metadata := range options.Metadata {
@ -198,16 +204,16 @@ func (h *Server) newHTTPHandler(handler interface{}, opts ...options.Option) *ht
} }
if method.Name == "" && h.opts.Logger.V(logger.ErrorLevel) { if method.Name == "" && h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil method for %s", mname)) h.opts.Logger.Errorf(h.opts.Context, "nil method for %s", mname)
continue continue
} }
mtype, err := prepareEndpoint(method) mtype, err := prepareEndpoint(method)
if err != nil && h.opts.Logger.V(logger.ErrorLevel) { if err != nil && h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("%v", err)) h.opts.Logger.Errorf(h.opts.Context, "%v", err)
continue continue
} else if mtype == nil { } else if mtype == nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil mtype for %s", mname)) h.opts.Logger.Errorf(h.opts.Context, "nil mtype for %s", mname)
continue continue
} }
@ -218,13 +224,13 @@ func (h *Server) newHTTPHandler(handler interface{}, opts ...options.Option) *ht
hdlr.name = name hdlr.name = name
if err := hdlr.handlers.Insert([]string{md["Method"]}, md["Path"], pth); err != nil { if err := hdlr.handlers.Insert([]string{md["Method"]}, md["Path"], pth); err != nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add handler for %s %s", md["Method"][0], md["Path"][0])) h.opts.Logger.Errorf(h.opts.Context, "cant add handler for %s %s", md["Method"], md["Path"])
} }
if h.registerRPC { if h.registerRPC {
h.opts.Logger.Info(h.opts.Context, fmt.Sprintf("register rpc handler for http.MethodPost %s /%s", hn, hn)) h.opts.Logger.Infof(h.opts.Context, "register rpc handler for http.MethodPost %s /%s", hn, hn)
if err := hdlr.handlers.Insert([]string{http.MethodPost}, "/"+hn, pth); err != nil { if err := hdlr.handlers.Insert([]string{http.MethodPost}, "/"+hn, pth); err != nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add rpc handler for http.MethodPost %s /%s", hn, hn)) h.opts.Logger.Errorf(h.opts.Context, "cant add rpc handler for http.MethodPost %s /%s", hn, hn)
} }
} }
} }
@ -248,16 +254,16 @@ func (h *Server) newHTTPHandler(handler interface{}, opts ...options.Option) *ht
} }
if method.Name == "" && h.opts.Logger.V(logger.ErrorLevel) { if method.Name == "" && h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil method for %s", mname)) h.opts.Logger.Errorf(h.opts.Context, "nil method for %s", mname)
continue continue
} }
mtype, err := prepareEndpoint(method) mtype, err := prepareEndpoint(method)
if err != nil && h.opts.Logger.V(logger.ErrorLevel) { if err != nil && h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("%v", err)) h.opts.Logger.Errorf(h.opts.Context, "%v", err)
continue continue
} else if mtype == nil { } else if mtype == nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil mtype for %s", mname)) h.opts.Logger.Errorf(h.opts.Context, "nil mtype for %s", mname)
continue continue
} }
@ -268,13 +274,13 @@ func (h *Server) newHTTPHandler(handler interface{}, opts ...options.Option) *ht
hdlr.name = name hdlr.name = name
if err := hdlr.handlers.Insert([]string{md.Method}, md.Path, pth); err != nil { if err := hdlr.handlers.Insert([]string{md.Method}, md.Path, pth); err != nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add handler for %s %s", md.Method, md.Path)) h.opts.Logger.Errorf(h.opts.Context, "cant add handler for %s %s", md.Method, md.Path)
} }
if h.registerRPC { if h.registerRPC {
h.opts.Logger.Info(h.opts.Context, fmt.Sprintf("register rpc handler for http.MethodPost %s /%s", hn, hn)) h.opts.Logger.Infof(h.opts.Context, "register rpc handler for http.MethodPost %s /%s", hn, hn)
if err := hdlr.handlers.Insert([]string{http.MethodPost}, "/"+hn, pth); err != nil { if err := hdlr.handlers.Insert([]string{http.MethodPost}, "/"+hn, pth); err != nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add rpc handler for http.MethodPost %s /%s", hn, hn)) h.opts.Logger.Errorf(h.opts.Context, "cant add rpc handler for http.MethodPost %s /%s", hn, hn)
} }
} }
} }
@ -282,15 +288,40 @@ func (h *Server) newHTTPHandler(handler interface{}, opts ...options.Option) *ht
return hdlr return hdlr
} }
func (h *Server) Register() error { func (h *httpServer) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber {
return newSubscriber(topic, handler, opts...)
}
func (h *httpServer) Subscribe(sb server.Subscriber) error {
sub, ok := sb.(*httpSubscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *httpSubscriber")
}
if len(sub.handlers) == 0 {
return fmt.Errorf("invalid subscriber: no handler functions")
}
if err := server.ValidateSubscriber(sb); err != nil {
return err
}
h.RLock()
_, ok = h.subscribers[sub]
h.RUnlock()
if ok {
return fmt.Errorf("subscriber %v already exists", h)
}
h.Lock()
h.subscribers[sub] = nil
h.Unlock()
return nil
}
func (h *httpServer) Register() error {
var eps []*register.Endpoint var eps []*register.Endpoint
h.RLock() h.RLock()
for _, hdlr := range h.handlers { for _, hdlr := range h.handlers {
hd, ok := hdlr.(*httpHandler) eps = append(eps, hdlr.Endpoints()...)
if !ok {
continue
}
eps = append(eps, hd.Endpoints()...)
} }
rsvc := h.rsvc rsvc := h.rsvc
config := h.opts config := h.opts
@ -308,16 +339,31 @@ func (h *Server) Register() error {
if err != nil { if err != nil {
return err return err
} }
service.Nodes[0].Metadata.Set("protocol", "http") service.Nodes[0].Metadata["protocol"] = "http"
service.Endpoints = eps service.Endpoints = eps
h.Lock()
subscriberList := make([]*httpSubscriber, 0, len(h.subscribers))
for e := range h.subscribers {
// Only advertise non internal subscribers
subscriberList = append(subscriberList, e)
}
sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic
})
for _, e := range subscriberList {
service.Endpoints = append(service.Endpoints, e.Endpoints()...)
}
h.Unlock()
h.RLock() h.RLock()
registered := h.registered registered := h.registered
h.RUnlock() h.RUnlock()
if !registered { if !registered {
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, fmt.Sprintf("Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID)) config.Logger.Infof(config.Context, "Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID)
} }
} }
@ -332,6 +378,29 @@ func (h *Server) Register() error {
} }
h.Lock() h.Lock()
for sb := range h.subscribers {
handler := h.createSubHandler(sb, config)
var opts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
subCtx := config.Context
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
opts = append(opts, broker.SubscribeContext(subCtx))
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
if err != nil {
h.Unlock()
return err
}
h.subscribers[sb] = []broker.Subscriber{sub}
}
h.registered = true h.registered = true
h.rsvc = service h.rsvc = service
h.Unlock() h.Unlock()
@ -339,7 +408,7 @@ func (h *Server) Register() error {
return nil return nil
} }
func (h *Server) Deregister() error { func (h *httpServer) Deregister() error {
h.RLock() h.RLock()
config := h.opts config := h.opts
h.RUnlock() h.RUnlock()
@ -350,7 +419,7 @@ func (h *Server) Deregister() error {
} }
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, fmt.Sprintf("Deregistering node: %s", service.Nodes[0].ID)) config.Logger.Infof(config.Context, "Deregistering node: %s", service.Nodes[0].ID)
} }
if err := server.DefaultDeregisterFunc(service, config); err != nil { if err := server.DefaultDeregisterFunc(service, config); err != nil {
@ -366,11 +435,28 @@ func (h *Server) Deregister() error {
} }
h.registered = false h.registered = false
subCtx := h.opts.Context
for sb, subs := range h.subscribers {
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
for _, sub := range subs {
config.Logger.Infof(config.Context, "Unsubscribing from topic: %s", sub.Topic())
if err := sub.Unsubscribe(subCtx); err != nil {
h.Unlock()
config.Logger.Errorf(config.Context, "failed to unsubscribe topic: %s, error: %v", sb.Topic(), err)
return err
}
}
h.subscribers[sb] = nil
}
h.Unlock() h.Unlock()
return nil return nil
} }
func (h *Server) Start() error { func (h *httpServer) Start() error {
h.RLock() h.RLock()
config := h.opts config := h.opts
h.RUnlock() h.RUnlock()
@ -400,7 +486,7 @@ func (h *Server) Start() error {
} }
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, fmt.Sprintf("Listening on %s", ts.Addr().String())) config.Logger.Infof(config.Context, "Listening on %s", ts.Addr().String())
} }
h.Lock() h.Lock()
@ -408,12 +494,13 @@ func (h *Server) Start() error {
h.Unlock() h.Unlock()
var handler http.Handler var handler http.Handler
var srvFunc func(net.Listener) error
// nolint: nestif // nolint: nestif
if h.opts.Context != nil { if h.opts.Context != nil {
if hs, ok := h.opts.Context.Value(serverKey{}).(*http.Server); ok && hs != nil { if hs, ok := h.opts.Context.Value(serverKey{}).(*http.Server); ok && hs != nil {
if hs.Handler == nil && h.hd != nil { if hs.Handler == nil && h.hd != nil {
if hdlr, ok := h.hd.(http.Handler); ok { if hdlr, ok := h.hd.Handler().(http.Handler); ok {
hs.Handler = hdlr hs.Handler = hdlr
handler = hs.Handler handler = hs.Handler
} }
@ -429,7 +516,7 @@ func (h *Server) Start() error {
case len(h.handlers) > 0 && h.hd != nil: case len(h.handlers) > 0 && h.hd != nil:
handler = h handler = h
case handler == nil && h.hd != nil: case handler == nil && h.hd != nil:
if hdlr, ok := h.hd.(http.Handler); ok { if hdlr, ok := h.hd.Handler().(http.Handler); ok {
handler = hdlr handler = hdlr
} }
} }
@ -438,9 +525,13 @@ func (h *Server) Start() error {
return fmt.Errorf("cant process with nil handler") return fmt.Errorf("cant process with nil handler")
} }
if err := config.Broker.Connect(h.opts.Context); err != nil {
return err
}
if err := config.RegisterCheck(h.opts.Context); err != nil { if err := config.RegisterCheck(h.opts.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error: %s", config.Name, config.ID, err)) config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, err)
} }
} else { } else {
if err = h.Register(); err != nil { if err = h.Register(); err != nil {
@ -450,7 +541,6 @@ func (h *Server) Start() error {
fn := handler fn := handler
var hs *http.Server
if h.opts.Context != nil { if h.opts.Context != nil {
if mwf, ok := h.opts.Context.Value(middlewareKey{}).([]func(http.Handler) http.Handler); ok && len(mwf) > 0 { if mwf, ok := h.opts.Context.Value(middlewareKey{}).([]func(http.Handler) http.Handler); ok && len(mwf) > 0 {
// wrap the handler func // wrap the handler func
@ -458,19 +548,25 @@ func (h *Server) Start() error {
fn = mwf[i-1](fn) fn = mwf[i-1](fn)
} }
} }
var ok bool if hs, ok := h.opts.Context.Value(serverKey{}).(*http.Server); ok && hs != nil {
if hs, ok = h.opts.Context.Value(serverKey{}).(*http.Server); ok && hs != nil {
hs.Handler = fn hs.Handler = fn
} else { srvFunc = hs.Serve
hs = &http.Server{Handler: fn}
} }
} }
go func() { if srvFunc != nil {
if cerr := hs.Serve(ts); cerr != nil && !errors.Is(cerr, net.ErrClosed) { go func() {
h.opts.Logger.Error(h.opts.Context, cerr.Error()) if cerr := srvFunc(ts); cerr != nil && !errors.Is(cerr, net.ErrClosed) {
} h.opts.Logger.Error(h.opts.Context, cerr)
}() }
}()
} else {
go func() {
if cerr := http.Serve(ts, fn); cerr != nil && !errors.Is(cerr, net.ErrClosed) {
h.opts.Logger.Error(h.opts.Context, cerr)
}
}()
}
go func() { go func() {
t := new(time.Ticker) t := new(time.Ticker)
@ -496,28 +592,28 @@ func (h *Server) Start() error {
// nolint: nestif // nolint: nestif
if rerr != nil && registered { if rerr != nil && registered {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr)) config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr)
} }
// deregister self in case of error // deregister self in case of error
if err := h.Deregister(); err != nil { if err := h.Deregister(); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s deregister error: %s", config.Name, config.ID, err)) config.Logger.Errorf(config.Context, "Server %s-%s deregister error: %s", config.Name, config.ID, err)
} }
} }
} else if rerr != nil && !registered { } else if rerr != nil && !registered {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error: %s", config.Name, config.ID, rerr)) config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, rerr)
} }
continue continue
} }
if err := h.Register(); err != nil { if err := h.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register error: %s", config.Name, config.ID, err)) config.Logger.Errorf(config.Context, "Server %s-%s register error: %s", config.Name, config.ID, err)
} }
} }
if err := h.Register(); err != nil { if err := h.Register(); err != nil {
config.Logger.Error(config.Context, fmt.Sprintf("Server register error: %s", err)) config.Logger.Errorf(config.Context, "Server register error: %s", err)
} }
// wait for exit // wait for exit
case ch = <-h.exit: case ch = <-h.exit:
@ -527,46 +623,43 @@ func (h *Server) Start() error {
// deregister // deregister
if err := h.Deregister(); err != nil { if err := h.Deregister(); err != nil {
config.Logger.Error(config.Context, fmt.Sprintf("Server deregister error: %s", err)) config.Logger.Errorf(config.Context, "Server deregister error: %s", err)
} }
ctx, cancel := context.WithTimeout(context.Background(), h.opts.GracefulTimeout) if err := config.Broker.Disconnect(config.Context); err != nil {
defer cancel() config.Logger.Errorf(config.Context, "Broker disconnect error: %s", err)
err := hs.Shutdown(ctx)
if err != nil {
err = hs.Close()
} }
ch <- err ch <- ts.Close()
}() }()
return nil return nil
} }
func (h *Server) Stop() error { func (h *httpServer) Stop() error {
ch := make(chan error) ch := make(chan error)
h.exit <- ch h.exit <- ch
return <-ch return <-ch
} }
func (h *Server) String() string { func (h *httpServer) String() string {
return "http" return "http"
} }
func (h *Server) Name() string { func (h *httpServer) Name() string {
return h.opts.Name return h.opts.Name
} }
func NewServer(opts ...options.Option) *Server { func NewServer(opts ...server.Option) *httpServer {
options := server.NewOptions(opts...) options := server.NewOptions(opts...)
eh := DefaultErrorHandler eh := DefaultErrorHandler
if v, ok := options.Context.Value(errorHandlerKey{}).(errorHandler); ok && v != nil { if v, ok := options.Context.Value(errorHandlerKey{}).(errorHandler); ok && v != nil {
eh = v eh = v
} }
return &Server{ return &httpServer{
opts: options, opts: options,
exit: make(chan chan error), exit: make(chan chan error),
subscribers: make(map[*httpSubscriber][]broker.Subscriber),
errorHandler: eh, errorHandler: eh,
pathHandlers: rhttp.NewTrie(), pathHandlers: rhttp.NewTrie(),
} }

View File

@ -5,7 +5,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"go.unistack.org/micro/v4/options" "go.unistack.org/micro/v4/server"
) )
// SetError pass error to caller // SetError pass error to caller
@ -79,24 +79,24 @@ func GetRspCode(ctx context.Context) int {
type middlewareKey struct{} type middlewareKey struct{}
// Middleware passes http middlewares // Middleware passes http middlewares
func Middleware(mw ...func(http.Handler) http.Handler) options.Option { func Middleware(mw ...func(http.Handler) http.Handler) server.Option {
return options.ContextOption(middlewareKey{}, mw) return server.SetOption(middlewareKey{}, mw)
} }
type serverKey struct{} type serverKey struct{}
// HTTPServer provide ability to pass *http.Server // Server provide ability to pass *http.Server
func HTTPServer(hs *http.Server) options.Option { func Server(hs *http.Server) server.Option {
return options.ContextOption(serverKey{}, hs) return server.SetOption(serverKey{}, hs)
} }
type errorHandler func(ctx context.Context, s interface{}, w http.ResponseWriter, r *http.Request, err error, status int) type errorHandler func(ctx context.Context, s server.Handler, w http.ResponseWriter, r *http.Request, err error, status int)
type errorHandlerKey struct{} type errorHandlerKey struct{}
// ErrorHandler specifies handler for errors // ErrorHandler specifies handler for errors
func ErrorHandler(fn errorHandler) options.Option { func ErrorHandler(fn errorHandler) server.Option {
return options.ContextOption(errorHandlerKey{}, fn) return server.SetOption(errorHandlerKey{}, fn)
} }
type ( type (
@ -107,18 +107,12 @@ type (
) )
// PathHandler specifies http handler for path regexp // PathHandler specifies http handler for path regexp
func PathHandler(method, path string, handler http.HandlerFunc) options.Option { func PathHandler(method, path string, handler http.HandlerFunc) server.Option {
return func(src interface{}) error { return func(o *server.Options) {
vctx, err := options.Get(src, ".Context") if o.Context == nil {
if err != nil { o.Context = context.Background()
return err
} }
ctx, ok := vctx.(context.Context) v, ok := o.Context.Value(pathHandlerKey{}).(*pathHandlerVal)
if !ok {
return fmt.Errorf("invalid option")
}
v, ok := ctx.Value(pathHandlerKey{}).(*pathHandlerVal)
if !ok { if !ok {
v = &pathHandlerVal{h: make(map[string]map[string]http.HandlerFunc)} v = &pathHandlerVal{h: make(map[string]map[string]http.HandlerFunc)}
} }
@ -127,17 +121,16 @@ func PathHandler(method, path string, handler http.HandlerFunc) options.Option {
m = make(map[string]http.HandlerFunc) m = make(map[string]http.HandlerFunc)
v.h[method] = m v.h[method] = m
} }
ctx = context.WithValue(ctx, pathHandlerKey{}, v)
m[path] = handler m[path] = handler
return options.Set(src, ctx, ".Context") o.Context = context.WithValue(o.Context, pathHandlerKey{}, v)
} }
} }
type registerRPCHandlerKey struct{} type registerRPCHandlerKey struct{}
// RegisterRPCHandler registers compatibility endpoints with /ServiceName.ServiceEndpoint method POST // RegisterRPCHandler registers compatibility endpoints with /ServiceName.ServiceEndpoint method POST
func RegisterRPCHandler(b bool) options.Option { func RegisterRPCHandler(b bool) server.Option {
return options.ContextOption(registerRPCHandlerKey{}, b) return server.SetOption(registerRPCHandlerKey{}, b)
} }
type handlerEndpointsKey struct{} type handlerEndpointsKey struct{}
@ -150,8 +143,8 @@ type EndpointMetadata struct {
Stream bool Stream bool
} }
func HandlerEndpoints(md []EndpointMetadata) options.Option { func HandlerEndpoints(md []EndpointMetadata) server.HandlerOption {
return options.ContextOption(handlerEndpointsKey{}, md) return server.SetHandlerOption(handlerEndpointsKey{}, md)
} }
type handlerOptions struct { type handlerOptions struct {

View File

@ -8,7 +8,10 @@ import (
"go.unistack.org/micro/v4/server" "go.unistack.org/micro/v4/server"
) )
var _ server.Request = &rpcRequest{} var (
_ server.Request = &rpcRequest{}
_ server.Message = &rpcMessage{}
)
type rpcRequest struct { type rpcRequest struct {
rw io.ReadWriter rw io.ReadWriter
@ -22,6 +25,14 @@ type rpcRequest struct {
stream bool stream bool
} }
type rpcMessage struct {
payload interface{}
codec codec.Codec
header metadata.Metadata
topic string
contentType string
}
func (r *rpcRequest) ContentType() string { func (r *rpcRequest) ContentType() string {
return r.contentType return r.contentType
} }
@ -61,3 +72,23 @@ func (r *rpcRequest) Stream() bool {
func (r *rpcRequest) Body() interface{} { func (r *rpcRequest) Body() interface{} {
return r.payload return r.payload
} }
func (r *rpcMessage) ContentType() string {
return r.contentType
}
func (r *rpcMessage) Topic() string {
return r.topic
}
func (r *rpcMessage) Body() interface{} {
return r.payload
}
func (r *rpcMessage) Header() metadata.Metadata {
return r.header
}
func (r *rpcMessage) Codec() codec.Codec {
return r.codec
}

View File

@ -10,8 +10,6 @@ import (
"go.unistack.org/micro/v4/server" "go.unistack.org/micro/v4/server"
) )
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
type methodType struct { type methodType struct {
ArgType reflect.Type ArgType reflect.Type
ReplyType reflect.Type ReplyType reflect.Type

208
subscriber.go Normal file
View File

@ -0,0 +1,208 @@
package http
import (
"bytes"
"context"
"fmt"
"reflect"
"strings"
"go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v4/server"
)
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
type handler struct {
reqType reflect.Type
ctxType reflect.Type
method reflect.Value
}
type httpSubscriber struct {
topic string
rcvr reflect.Value
typ reflect.Type
subscriber interface{}
handlers []*handler
endpoints []*register.Endpoint
opts server.SubscriberOptions
}
func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber {
options := server.NewSubscriberOptions(opts...)
var endpoints []*register.Endpoint
var handlers []*handler
if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func {
h := &handler{
method: reflect.ValueOf(sub),
}
switch typ.NumIn() {
case 1:
h.reqType = typ.In(0)
case 2:
h.ctxType = typ.In(0)
h.reqType = typ.In(1)
}
handlers = append(handlers, h)
ep := &register.Endpoint{
Name: "Func",
Request: register.ExtractSubValue(typ),
Metadata: metadata.New(2),
}
ep.Metadata.Set("topic", topic)
ep.Metadata.Set("subscriber", "true")
endpoints = append(endpoints, ep)
} else {
hdlr := reflect.ValueOf(sub)
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
h := &handler{
method: method.Func,
}
switch method.Type.NumIn() {
case 2:
h.reqType = method.Type.In(1)
case 3:
h.ctxType = method.Type.In(1)
h.reqType = method.Type.In(2)
}
handlers = append(handlers, h)
ep := &register.Endpoint{
Name: name + "." + method.Name,
Request: register.ExtractSubValue(method.Type),
Metadata: metadata.New(2),
}
ep.Metadata.Set("topic", topic)
ep.Metadata.Set("subscriber", "true")
endpoints = append(endpoints, ep)
}
}
return &httpSubscriber{
rcvr: reflect.ValueOf(sub),
typ: reflect.TypeOf(sub),
topic: topic,
subscriber: sub,
handlers: handlers,
endpoints: endpoints,
opts: options,
}
}
func (s *httpServer) createSubHandler(sb *httpSubscriber, opts server.Options) broker.Handler {
return func(p broker.Event) error {
msg := p.Message()
ct := msg.Header["Content-Type"]
cf, err := s.newCodec(ct)
if err != nil {
return err
}
hdr := metadata.Copy(msg.Header)
delete(hdr, "Content-Type")
ctx := metadata.NewIncomingContext(context.Background(), hdr)
results := make(chan error, len(sb.handlers))
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
var isVal bool
var req reflect.Value
if handler.reqType.Kind() == reflect.Ptr {
req = reflect.New(handler.reqType.Elem())
} else {
req = reflect.New(handler.reqType)
isVal = true
}
if isVal {
req = req.Elem()
}
buf := bytes.NewBuffer(msg.Body)
if err := cf.ReadHeader(buf, &codec.Message{}, codec.Event); err != nil {
return err
}
if err := cf.ReadBody(buf, req.Interface()); err != nil {
return err
}
fn := func(ctx context.Context, msg server.Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
}
if handler.ctxType != nil {
vals = append(vals, reflect.ValueOf(ctx))
}
vals = append(vals, reflect.ValueOf(msg.Body()))
returnValues := handler.method.Call(vals)
if err := returnValues[0].Interface(); err != nil {
return err.(error)
}
return nil
}
for i := len(opts.SubWrappers); i > 0; i-- {
fn = opts.SubWrappers[i-1](fn)
}
go func() {
results <- fn(ctx, &httpMessage{
topic: sb.topic,
contentType: ct,
payload: req.Interface(),
header: msg.Header,
codec: cf,
})
}()
}
var errors []string
for i := 0; i < len(sb.handlers); i++ {
if err := <-results; err != nil {
errors = append(errors, err.Error())
}
}
if len(errors) > 0 {
return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
}
return nil
}
}
func (s *httpSubscriber) Topic() string {
return s.topic
}
func (s *httpSubscriber) Subscriber() interface{} {
return s.subscriber
}
func (s *httpSubscriber) Endpoints() []*register.Endpoint {
return s.endpoints
}
func (s *httpSubscriber) Options() server.SubscriberOptions {
return s.opts
}

View File

@ -1,8 +0,0 @@
//go:build tools
package http
import (
_ "go.unistack.org/micro-proto/v4"
_ "go.unistack.org/protoc-gen-go-micro/v4"
)

12
util.go
View File

@ -31,13 +31,15 @@ func FillRequest(ctx context.Context, req interface{}, opts ...FillRequestOption
} }
} }
cookies := md["Cookie"] cookies := strings.Split(md["Cookie"], ";")
cmd := make(map[string]string, len(cookies)) cmd := make(map[string]string, len(cookies))
kv := strings.Split(cookies, "=") for _, cookie := range cookies {
if len(kv) != 2 { kv := strings.Split(cookie, "=")
return nil if len(kv) != 2 {
continue
}
cmd[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1])
} }
cmd[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1])
for idx := 0; idx < len(options.cookies)/2; idx += 2 { for idx := 0; idx < len(options.cookies)/2; idx += 2 {
k := http.CanonicalHeaderKey(options.cookies[idx]) k := http.CanonicalHeaderKey(options.cookies[idx])
v, ok := cmd[k] v, ok := cmd[k]