flush_v3 #188

Merged
vtolstov merged 6 commits from :flush_v3 into v3 2024-03-26 14:52:34 +03:00
7 changed files with 1549 additions and 37 deletions
Showing only changes of commit 7c669c636d - Show all commits

24
.gitignore vendored Normal file
View File

@ -0,0 +1,24 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
bin
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work
# General
.DS_Store
.idea
.vscode

16
go.mod
View File

@ -5,15 +5,19 @@ go 1.18
require ( require (
go.unistack.org/micro-codec-yaml/v3 v3.10.0 go.unistack.org/micro-codec-yaml/v3 v3.10.0
go.unistack.org/micro-proto/v3 v3.3.1 go.unistack.org/micro-proto/v3 v3.3.1
go.unistack.org/micro/v3 v3.10.14 go.unistack.org/micro/v3 v3.10.52
golang.org/x/net v0.7.0 golang.org/x/net v0.22.0
) )
require ( require (
github.com/golang/protobuf v1.5.2 // indirect github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gnostic v0.6.9 // indirect github.com/google/gnostic v0.7.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect
golang.org/x/sys v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240314234333-6e1732d8331c // indirect
google.golang.org/grpc v1.62.1 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect sigs.k8s.io/yaml v1.4.0 // indirect
) )

1410
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -115,14 +115,14 @@ func (h *Server) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, error)
md["Method"] = r.Method md["Method"] = r.Method
md["URL"] = r.URL.String() md["URL"] = r.URL.String()
md["Proto"] = r.Proto md["Proto"] = r.Proto
md["ContentLength"] = fmt.Sprintf("%d", r.ContentLength) md["Content-Length"] = fmt.Sprintf("%d", r.ContentLength)
md["TransferEncoding"] = strings.Join(r.TransferEncoding, ",") md["Transfer-Encoding"] = strings.Join(r.TransferEncoding, ",")
md["Host"] = r.Host md["Host"] = r.Host
md["RequestURI"] = r.RequestURI md["RequestURI"] = r.RequestURI
if r.TLS != nil { if r.TLS != nil {
md["TLS"] = "true" md["TLS"] = "true"
md["TLS_ALPN"] = r.TLS.NegotiatedProtocol md["TLS-ALPN"] = r.TLS.NegotiatedProtocol
md["TLS_ServerName"] = r.TLS.ServerName md["TLS-ServerName"] = r.TLS.ServerName
} }
ctx = metadata.NewIncomingContext(ctx, md) ctx = metadata.NewIncomingContext(ctx, md)

View File

@ -2,13 +2,35 @@ package meter // import "go.unistack.org/micro-server-http/v3/handler/meter"
import ( import (
"bytes" "bytes"
"compress/gzip"
"context" "context"
"io"
"strings"
"sync"
codecpb "go.unistack.org/micro-proto/v3/codec" codecpb "go.unistack.org/micro-proto/v3/codec"
"go.unistack.org/micro/v3/errors" "go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v3/meter"
) )
const (
contentEncodingHeader = "Content-Encoding"
acceptEncodingHeader = "Accept-Encoding"
)
var gzipPool = sync.Pool{
New: func() interface{} {
return gzip.NewWriter(nil)
},
}
var bufPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(nil)
},
}
// guard to fail early // guard to fail early
var _ MeterServiceServer = &Handler{} var _ MeterServiceServer = &Handler{}
@ -56,12 +78,46 @@ func NewHandler(opts ...Option) *Handler {
} }
func (h *Handler) Metrics(ctx context.Context, req *codecpb.Frame, rsp *codecpb.Frame) error { func (h *Handler) Metrics(ctx context.Context, req *codecpb.Frame, rsp *codecpb.Frame) error {
buf := bytes.NewBuffer(nil) log, ok := logger.FromContext(ctx)
if err := h.opts.Meter.Write(buf, h.opts.MeterOptions...); err != nil { if !ok {
return errors.InternalServerError(h.opts.Name, "%v", err) log = logger.DefaultLogger
}
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()
w := io.Writer(buf)
if md, ok := metadata.FromIncomingContext(ctx); gzipAccepted(md) && ok {
md.Set(contentEncodingHeader, "gzip")
gz := gzipPool.Get().(*gzip.Writer)
defer gzipPool.Put(gz)
gz.Reset(w)
defer gz.Close()
w = gz
}
if err := h.opts.Meter.Write(w, h.opts.MeterOptions...); err != nil {
log.Error(ctx, "http/meter write failed", err)
return nil
} }
rsp.Data = buf.Bytes() rsp.Data = buf.Bytes()
return nil return nil
} }
// gzipAccepted returns whether the client will accept gzip-encoded content.
func gzipAccepted(md metadata.Metadata) bool {
a, ok := md.Get(acceptEncodingHeader)
if !ok {
return false
}
if strings.Contains(a, "gzip") {
return true
}
return false
}

49
http.go
View File

@ -35,6 +35,7 @@ type Server struct {
pathHandlers *rhttp.Trie pathHandlers *rhttp.Trie
opts server.Options opts server.Options
registerRPC bool registerRPC bool
registerCORS bool
sync.RWMutex sync.RWMutex
registered bool registered bool
init bool init bool
@ -84,6 +85,10 @@ func (h *Server) Init(opts ...server.Option) error {
h.registerRPC = v h.registerRPC = v
} }
if v, ok := h.opts.Context.Value(registerCORSHandlerKey{}).(bool); ok {
h.registerCORS = v
}
if phs, ok := h.opts.Context.Value(pathHandlerKey{}).(*pathHandlerVal); ok && phs.h != nil { if phs, ok := h.opts.Context.Value(pathHandlerKey{}).(*pathHandlerVal); ok && phs.h != nil {
for pm, ps := range phs.h { for pm, ps := range phs.h {
for pp, ph := range ps { for pp, ph := range ps {
@ -227,11 +232,27 @@ func (h *Server) NewHandler(handler interface{}, opts ...server.HandlerOption) s
h.opts.Logger.Errorf(h.opts.Context, "cant add handler for %s %s", md["Method"], md["Path"]) h.opts.Logger.Errorf(h.opts.Context, "cant add handler for %s %s", md["Method"], md["Path"])
} }
h.opts.Logger.Infof(h.opts.Context, fmt.Sprintf("try to detect cors handlers usage %v", h.registerCORS))
if h.registerCORS {
h.opts.Logger.Infof(h.opts.Context, "register cors handler for http.MethodOptions %s", md["Path"])
if err := hdlr.handlers.Insert([]string{http.MethodOptions}, md["Path"], pth); err != nil {
h.opts.Logger.Errorf(h.opts.Context, "cant add handler for %s %s", md["Method"], md["Path"])
}
}
h.opts.Logger.Infof(h.opts.Context, fmt.Sprintf("try to detect rpc handlers usage %v", h.registerRPC))
if h.registerRPC { if h.registerRPC {
h.opts.Logger.Infof(h.opts.Context, "register rpc handler for http.MethodPost %s /%s", hn, hn) h.opts.Logger.Infof(h.opts.Context, "register rpc handler for http.MethodPost %s /%s", hn, hn)
if err := hdlr.handlers.Insert([]string{http.MethodPost}, "/"+hn, pth); err != nil { if err := hdlr.handlers.Insert([]string{http.MethodPost}, "/"+hn, pth); err != nil {
h.opts.Logger.Errorf(h.opts.Context, "cant add rpc handler for http.MethodPost %s /%s", hn, hn) h.opts.Logger.Errorf(h.opts.Context, "cant add rpc handler for http.MethodPost %s /%s", hn, hn)
} }
if h.registerCORS {
h.opts.Logger.Infof(h.opts.Context, "register cors handler for http.MethodOptions %s /%s", hn, hn)
if err := hdlr.handlers.Insert([]string{http.MethodOptions}, "/"+hn, pth); err != nil {
h.opts.Logger.Errorf(h.opts.Context, "cant add rpc handler for http.MethodOptions %s /%s", hn, hn)
}
}
} }
} }
@ -501,7 +522,6 @@ func (h *Server) Start() error {
h.Unlock() h.Unlock()
var handler http.Handler var handler http.Handler
var srvFunc func(net.Listener) error
// nolint: nestif // nolint: nestif
if h.opts.Context != nil { if h.opts.Context != nil {
@ -552,6 +572,7 @@ func (h *Server) Start() error {
fn := handler fn := handler
var hs *http.Server
if h.opts.Context != nil { if h.opts.Context != nil {
if mwf, ok := h.opts.Context.Value(middlewareKey{}).([]func(http.Handler) http.Handler); ok && len(mwf) > 0 { if mwf, ok := h.opts.Context.Value(middlewareKey{}).([]func(http.Handler) http.Handler); ok && len(mwf) > 0 {
// wrap the handler func // wrap the handler func
@ -559,25 +580,19 @@ func (h *Server) Start() error {
fn = mwf[i-1](fn) fn = mwf[i-1](fn)
} }
} }
if hs, ok := h.opts.Context.Value(serverKey{}).(*http.Server); ok && hs != nil { var ok bool
if hs, ok = h.opts.Context.Value(serverKey{}).(*http.Server); ok && hs != nil {
hs.Handler = fn hs.Handler = fn
srvFunc = hs.Serve } else {
hs = &http.Server{Handler: fn}
} }
} }
if srvFunc != nil {
go func() { go func() {
if cerr := srvFunc(ts); cerr != nil && !errors.Is(cerr, net.ErrClosed) { if cerr := hs.Serve(ts); cerr != nil && !errors.Is(cerr, net.ErrClosed) {
h.opts.Logger.Error(h.opts.Context, cerr) h.opts.Logger.Error(h.opts.Context, cerr)
} }
}() }()
} else {
go func() {
if cerr := http.Serve(ts, fn); cerr != nil && !errors.Is(cerr, net.ErrClosed) {
h.opts.Logger.Error(h.opts.Context, cerr)
}
}()
}
go func() { go func() {
t := new(time.Ticker) t := new(time.Ticker)
@ -641,7 +656,15 @@ func (h *Server) Start() error {
config.Logger.Errorf(config.Context, "Broker disconnect error: %s", err) config.Logger.Errorf(config.Context, "Broker disconnect error: %s", err)
} }
ch <- ts.Close() ctx, cancel := context.WithTimeout(context.Background(), h.opts.GracefulTimeout)
defer cancel()
err := hs.Shutdown(ctx)
if err != nil {
err = hs.Close()
}
ch <- err
}() }()
return nil return nil

View File

@ -133,6 +133,13 @@ func RegisterRPCHandler(b bool) server.Option {
return server.SetOption(registerRPCHandlerKey{}, b) return server.SetOption(registerRPCHandlerKey{}, b)
} }
type registerCORSHandlerKey struct{}
// RegisterCORSHandler registers cors endpoints with /ServiceName.ServiceEndpoint method POPTIONSOST
func RegisterCORSHandler(b bool) server.Option {
return server.SetOption(registerCORSHandlerKey{}, b)
}
type handlerEndpointsKey struct{} type handlerEndpointsKey struct{}
type EndpointMetadata struct { type EndpointMetadata struct {