Compare commits

...

37 Commits

Author SHA1 Message Date
12e26d0018 hotfix (#186)
## Pull Request template
Please, go through these steps before clicking submit on this PR.

1. Give a descriptive title to your PR.
2. Provide a description of your changes.
3. Make sure you have some relevant tests.
4. Put `closes #XXXX` in your comment to auto-close the issue that your PR fixes (if applicable).

**PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING**

Co-authored-by: Gorbunov Kirill Andreevich <kgorbunov@mtsbank.ru>
Reviewed-on: #186
Co-authored-by: Кирилл Горбунов <kirya_gorbunov_2015@mail.ru>
Co-committed-by: Кирилл Горбунов <kirya_gorbunov_2015@mail.ru>
2024-03-26 14:53:14 +03:00
aab779b406 Merge pull request 'update http.Server run for v4' (#185) from devstigneev/micro-server-http:master into master
Reviewed-on: #185
2024-03-13 11:06:10 +03:00
3514bb9626 update http.Server run 2024-03-13 10:05:29 +03:00
c7dc998670 fixup panic
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-03-12 00:52:14 +03:00
954101f887 fixup handlers
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-03-12 00:02:48 +03:00
4c3d26c39b fixup headers
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-03-11 23:37:54 +03:00
a483e08ffa add gzip rsp #153 (#182)
## Pull Request template
Please, go through these steps before clicking submit on this PR.

1. Give a descriptive title to your PR.
2. Provide a description of your changes.
3. Make sure you have some relevant tests.
4. Put `closes #XXXX` in your comment to auto-close the issue that your PR fixes (if applicable).

**PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING**

Co-authored-by: Gorbunov Kirill Andreevich <kgorbunov@mtsbank.ru>
Reviewed-on: #182
Co-authored-by: Кирилл Горбунов <kirya_gorbunov_2015@mail.ru>
Co-committed-by: Кирилл Горбунов <kirya_gorbunov_2015@mail.ru>
2024-03-11 12:31:19 +03:00
28eeca30c0 Merge pull request 'issue_179' (#180) from devstigneev/micro-server-http:issue_179 into master
Reviewed-on: #180
Reviewed-by: Василий Толстов <v.tolstov@unistack.org>
2024-03-08 23:05:31 +03:00
2e8462a0f1 fix logmessages 2024-03-08 18:03:50 +03:00
1432587369 sent err to channel 2024-03-08 18:00:02 +03:00
ea8c9dd22e update micro version and add gracefulTimeout && update log msg 2024-03-08 17:58:18 +03:00
49608197d0 removed comments code 2024-03-07 18:17:07 +03:00
41d7d145ce removed Listener.Stop 2024-03-07 18:16:02 +03:00
1d5142d619 removed using http.Serve and add using *http.Server + Shutdown 2024-03-07 18:13:46 +03:00
41f7bdf182 Merge pull request 'handler: add single page application handler' (#176) from spa-handler into master
Reviewed-on: #176
2023-08-19 23:22:00 +03:00
ed907c7076 handler: add single page application handler
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-08-19 23:19:54 +03:00
5c2cba3ecc Merge pull request 'micro v4 fix' (#175) from micro4 into master
Reviewed-on: #175
2023-08-16 15:27:32 +03:00
966bc31eb2 micro v4 fix
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-08-16 15:27:07 +03:00
259e1d275e Merge pull request 'handler: fix for latest micro' (#174) from microv4 into master
Reviewed-on: #174
2023-08-16 10:19:02 +03:00
fdbcff2ba0 handler: fix for latest micro
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-08-16 10:18:38 +03:00
3e814bf229 Merge pull request 'fix swagger handler' (#173) from handler-swagger into master
Reviewed-on: #173
2023-08-14 13:45:13 +03:00
13cdeb9397 fix swagger handler
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-08-14 13:44:57 +03:00
0e19afef86 Merge pull request 'options' (#171) from options into master
Reviewed-on: #171
2023-08-12 13:53:42 +03:00
7e850f75e0 update for latest micro changes
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-08-12 13:53:04 +03:00
4c3d3058f6 handler/swagger: initial import
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-06-18 20:49:29 +03:00
bcd27b833a Merge pull request 'fix query param struct filling' (#168) from matchesfix into master
Reviewed-on: #168
2023-05-29 12:29:39 +03:00
090b5e3c07 fix query param struct filling
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-29 12:29:04 +03:00
95109c9dc2 Merge pull request 'add scheme to metadata' (#166) from scheme into master
Reviewed-on: #166
2023-05-19 23:28:22 +03:00
69dcf71d3f add scheme to metadata
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-19 23:24:53 +03:00
0a0a986a70 Merge pull request 'move down path handler after specific handler' (#164) from path-handler into master
Reviewed-on: #164
2023-05-19 23:04:01 +03:00
76fe748e4a move down path handler after specific handler
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-19 23:02:42 +03:00
9926d52f78 Merge pull request 'fix build' (#161) from fixup into master
Reviewed-on: #161
2023-05-09 18:48:11 +03:00
2ed04e3e24 fix build
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-09 18:47:56 +03:00
4a9cc0f03f Merge pull request 'cleanup message stuf from server' (#160) from cleanup into master
Reviewed-on: #160
2023-05-09 18:39:08 +03:00
99af727138 cleanup message stuf from server
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-09 18:38:49 +03:00
89fe1dd6bc Merge pull request 'export Server to allow to cast' (#159) from exportServer into master
Reviewed-on: #159
2023-05-09 18:34:39 +03:00
8b591f7fd4 export Server to allow to cast
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-09 18:34:24 +03:00
20 changed files with 1898 additions and 581 deletions

24
.gitignore vendored Normal file
View File

@ -0,0 +1,24 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
bin
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work
# General
.DS_Store
.idea
.vscode

21
go.mod
View File

@ -3,14 +3,23 @@ module go.unistack.org/micro-server-http/v4
go 1.19 go 1.19
require ( require (
go.unistack.org/micro-proto/v4 v4.0.0 go.unistack.org/micro-codec-yaml/v4 v4.0.0
go.unistack.org/micro/v4 v4.0.1 go.unistack.org/micro-proto/v4 v4.0.1
golang.org/x/net v0.7.0 go.unistack.org/micro/v4 v4.0.17
go.unistack.org/protoc-gen-go-micro/v4 v4.0.13
golang.org/x/net v0.22.0
) )
require ( require (
github.com/golang/protobuf v1.5.2 // indirect github.com/fatih/structtag v1.2.0 // indirect
github.com/google/gnostic v0.6.9 // indirect github.com/golang/protobuf v1.5.4 // indirect
google.golang.org/protobuf v1.28.1 // indirect github.com/google/gnostic v0.7.0 // indirect
github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect
golang.org/x/mod v0.16.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/tools v0.19.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
) )

1432
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -19,10 +19,10 @@ import (
) )
var ( var (
DefaultErrorHandler = func(ctx context.Context, s server.Handler, w http.ResponseWriter, r *http.Request, err error, status int) { DefaultErrorHandler = func(ctx context.Context, s interface{}, w http.ResponseWriter, r *http.Request, err error, status int) {
w.WriteHeader(status) w.WriteHeader(status)
if _, cerr := w.Write([]byte(err.Error())); cerr != nil { if _, cerr := w.Write([]byte(err.Error())); cerr != nil {
logger.DefaultLogger.Errorf(ctx, "write failed: %v", cerr) logger.DefaultLogger.Error(ctx, fmt.Sprintf("write failed: %v", cerr))
} }
} }
DefaultContentType = "application/json" DefaultContentType = "application/json"
@ -35,7 +35,7 @@ type patHandler struct {
} }
type httpHandler struct { type httpHandler struct {
opts server.HandlerOptions opts server.HandleOptions
hd interface{} hd interface{}
handlers *rhttp.Trie handlers *rhttp.Trie
name string name string
@ -56,11 +56,11 @@ func (h *httpHandler) Endpoints() []*register.Endpoint {
return h.eps return h.eps
} }
func (h *httpHandler) Options() server.HandlerOptions { func (h *httpHandler) Options() server.HandleOptions {
return h.opts return h.opts
} }
func (h *httpServer) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, error) { func (h *Server) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, error) {
if handler == nil { if handler == nil {
return nil, fmt.Errorf("invalid handler specified: %v", handler) return nil, fmt.Errorf("invalid handler specified: %v", handler)
} }
@ -109,16 +109,22 @@ func (h *httpServer) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, err
md = metadata.New(len(r.Header) + 8) md = metadata.New(len(r.Header) + 8)
} }
for k, v := range r.Header { for k, v := range r.Header {
md[k] = strings.Join(v, ", ") md[k] = v[0]
} }
md["RemoteAddr"] = r.RemoteAddr md["RemoteAddr"] = r.RemoteAddr
md["Method"] = r.Method md["Method"] = r.Method
md["URL"] = r.URL.String() md["URL"] = r.URL.String()
md["Proto"] = r.Proto md["Proto"] = r.Proto
md["ContentLength"] = fmt.Sprintf("%d", r.ContentLength) md["Content-Length"] = fmt.Sprintf("%d", r.ContentLength)
md["TransferEncoding"] = strings.Join(r.TransferEncoding, ",") md["Transfer-Encoding"] = r.TransferEncoding[0]
md["Host"] = r.Host md["Host"] = r.Host
md["RequestURI"] = r.RequestURI md["RequestURI"] = r.RequestURI
if r.TLS != nil {
md["TLS"] = "true"
md["TLS-ALPN"] = r.TLS.NegotiatedProtocol
md["TLS-ServerName"] = r.TLS.ServerName
}
ctx = metadata.NewIncomingContext(ctx, md) ctx = metadata.NewIncomingContext(ctx, md)
path := r.URL.Path path := r.URL.Path
@ -257,18 +263,17 @@ func (h *httpServer) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, err
} }
if nmd, ok := metadata.FromOutgoingContext(fctx); ok { if nmd, ok := metadata.FromOutgoingContext(fctx); ok {
for k, v := range nmd { for k, v := range nmd {
md.Set(k, v) md[k] = v
} }
} }
metadata.SetOutgoingContext(ctx, md)
return err return err
} }
// wrap the handler func // wrap the handler func
for i := len(handler.sopts.HdlrWrappers); i > 0; i-- { // for i := len(handler.sopts.Hooks); i > 0; i-- {
fn = handler.sopts.HdlrWrappers[i-1](fn) // fn = handler.sopts.Hooks[i-1](fn)
} // }
if ct == "application/x-www-form-urlencoded" { if ct == "application/x-www-form-urlencoded" {
cf, err = h.newCodec(DefaultContentType) cf, err = h.newCodec(DefaultContentType)
@ -285,14 +290,12 @@ func (h *httpServer) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, err
w.Header().Set(metadata.HeaderContentType, ct) w.Header().Set(metadata.HeaderContentType, ct)
if md, ok := metadata.FromOutgoingContext(ctx); ok { if md, ok := metadata.FromOutgoingContext(ctx); ok {
for k, v := range md { for k, v := range md {
w.Header().Set(k, v) w.Header()[k] = []string{v}
} }
} }
if md := getRspHeader(ctx); md != nil { if md := getRspHeader(ctx); md != nil {
for k, v := range md { for k, v := range md {
for _, vv := range v { w.Header()[k] = v
w.Header().Add(k, vv)
}
} }
} }
if nct := w.Header().Get(metadata.HeaderContentType); nct != ct { if nct := w.Header().Get(metadata.HeaderContentType); nct != ct {
@ -318,7 +321,7 @@ func (h *httpServer) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, err
} }
if err != nil && handler.sopts.Logger.V(logger.ErrorLevel) { if err != nil && handler.sopts.Logger.V(logger.ErrorLevel) {
handler.sopts.Logger.Errorf(handler.sopts.Context, "handler err: %v", err) handler.sopts.Logger.Error(handler.sopts.Context, fmt.Sprintf("handler err: %v", err))
return return
} }
@ -328,18 +331,12 @@ func (h *httpServer) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, err
w.WriteHeader(scode) w.WriteHeader(scode)
if _, cerr := w.Write(buf); cerr != nil { if _, cerr := w.Write(buf); cerr != nil {
handler.sopts.Logger.Errorf(ctx, "write failed: %v", cerr) handler.sopts.Logger.Error(ctx, fmt.Sprintf("write failed: %v", cerr))
} }
}, nil }, nil
} }
func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// check for http.HandlerFunc handlers
if ph, _, err := h.pathHandlers.Search(r.Method, r.URL.Path); err == nil {
ph.(http.HandlerFunc)(w, r)
return
}
ct := DefaultContentType ct := DefaultContentType
if htype := r.Header.Get(metadata.HeaderContentType); htype != "" { if htype := r.Header.Get(metadata.HeaderContentType); htype != "" {
ct = htype ct = htype
@ -352,22 +349,25 @@ func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
md = metadata.New(len(r.Header) + 8) md = metadata.New(len(r.Header) + 8)
} }
for k, v := range r.Header { for k, v := range r.Header {
md[k] = strings.Join(v, ", ") md[k] = v[0]
} }
md["RemoteAddr"] = r.RemoteAddr md["RemoteAddr"] = r.RemoteAddr
if r.TLS != nil {
md["Scheme"] = "https"
} else {
md["Scheme"] = "http"
}
md["Method"] = r.Method md["Method"] = r.Method
md["URL"] = r.URL.String() md["URL"] = r.URL.String()
md["Proto"] = r.Proto md["Proto"] = r.Proto
md["ContentLength"] = fmt.Sprintf("%d", r.ContentLength) md["Content-Length"] = fmt.Sprintf("%d", r.ContentLength)
md["TransferEncoding"] = strings.Join(r.TransferEncoding, ",") if len(r.TransferEncoding) > 0 {
md["Transfer-Encoding"] = r.TransferEncoding[0]
}
md["Host"] = r.Host md["Host"] = r.Host
md["RequestURI"] = r.RequestURI md["RequestURI"] = r.RequestURI
ctx = metadata.NewIncomingContext(ctx, md) ctx = metadata.NewIncomingContext(ctx, md)
if r.Body != nil {
defer r.Body.Close()
}
path := r.URL.Path path := r.URL.Path
if !strings.HasPrefix(path, "/") { if !strings.HasPrefix(path, "/") {
h.errorHandler(ctx, nil, w, r, fmt.Errorf("path must starts with /"), http.StatusBadRequest) h.errorHandler(ctx, nil, w, r, fmt.Errorf("path must starts with /"), http.StatusBadRequest)
@ -419,11 +419,16 @@ func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
if !match && h.hd != nil { if !match && h.hd != nil {
if hdlr, ok := h.hd.Handler().(http.Handler); ok { if hdlr, ok := h.hd.(http.Handler); ok {
hdlr.ServeHTTP(w, r) hdlr.ServeHTTP(w, r)
return return
} }
} else if !match { } else if !match {
// check for http.HandlerFunc handlers
if ph, _, err := h.pathHandlers.Search(r.Method, r.URL.Path); err == nil {
ph.(http.HandlerFunc)(w, r)
return
}
h.errorHandler(ctx, nil, w, r, fmt.Errorf("not matching route found"), http.StatusNotFound) h.errorHandler(ctx, nil, w, r, fmt.Errorf("not matching route found"), http.StatusNotFound)
return return
} }
@ -440,6 +445,10 @@ func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
} }
if r.Body != nil {
defer r.Body.Close()
}
cf, err := h.newCodec(ct) cf, err := h.newCodec(ct)
if err != nil { if err != nil {
h.errorHandler(ctx, nil, w, r, err, http.StatusBadRequest) h.errorHandler(ctx, nil, w, r, err, http.StatusBadRequest)
@ -481,10 +490,12 @@ func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
} }
matches = rflutil.FlattenMap(matches) if len(matches) > 0 {
if err = rflutil.Merge(argv.Interface(), matches, rflutil.SliceAppend(true), rflutil.Tags([]string{"protobuf", "json"})); err != nil { matches = rflutil.FlattenMap(matches)
h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest) if err = rflutil.Merge(argv.Interface(), matches, rflutil.SliceAppend(true), rflutil.Tags([]string{"protobuf", "json"})); err != nil {
return h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest)
return
}
} }
hr := &rpcRequest{ hr := &rpcRequest{
@ -512,18 +523,17 @@ func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
if nmd, ok := metadata.FromOutgoingContext(fctx); ok { if nmd, ok := metadata.FromOutgoingContext(fctx); ok {
for k, v := range nmd { for k, v := range nmd {
md.Set(k, v) md[k] = v
} }
} }
metadata.SetOutgoingContext(ctx, md)
return err return err
} }
// wrap the handler func // wrap the handler func
for i := len(handler.sopts.HdlrWrappers); i > 0; i-- { // for i := len(handler.sopts.HdlrWrappers); i > 0; i-- {
fn = handler.sopts.HdlrWrappers[i-1](fn) // fn = handler.sopts.HdlrWrappers[i-1](fn)
} // }
if ct == "application/x-www-form-urlencoded" { if ct == "application/x-www-form-urlencoded" {
cf, err = h.newCodec(DefaultContentType) cf, err = h.newCodec(DefaultContentType)
@ -540,7 +550,7 @@ func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set(metadata.HeaderContentType, ct) w.Header().Set(metadata.HeaderContentType, ct)
if md, ok := metadata.FromOutgoingContext(ctx); ok { if md, ok := metadata.FromOutgoingContext(ctx); ok {
for k, v := range md { for k, v := range md {
w.Header().Set(k, v) w.Header()[k] = []string{v}
} }
} }
if md := getRspHeader(ctx); md != nil { if md := getRspHeader(ctx); md != nil {
@ -573,7 +583,7 @@ func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
if err != nil && handler.sopts.Logger.V(logger.ErrorLevel) { if err != nil && handler.sopts.Logger.V(logger.ErrorLevel) {
handler.sopts.Logger.Errorf(handler.sopts.Context, "handler err: %v", err) handler.sopts.Logger.Error(handler.sopts.Context, fmt.Sprintf("handler err: %v", err))
return return
} }
@ -583,6 +593,6 @@ func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(scode) w.WriteHeader(scode)
if _, cerr := w.Write(buf); cerr != nil { if _, cerr := w.Write(buf); cerr != nil {
handler.sopts.Logger.Errorf(ctx, "write failed: %v", cerr) handler.sopts.Logger.Error(ctx, fmt.Sprintf("write failed: %v", cerr))
} }
} }

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT. // Code generated by protoc-gen-go-micro. DO NOT EDIT.
// versions: // versions:
// - protoc-gen-go-micro v4.0.0 // - protoc-gen-go-micro v4.0.2
// - protoc v3.21.12 // - protoc v4.23.4
// source: health/health.proto // source: health/health.proto
package health package health
@ -9,37 +9,11 @@ package health
import ( import (
context "context" context "context"
codec "go.unistack.org/micro-proto/v4/codec" codec "go.unistack.org/micro-proto/v4/codec"
v4 "go.unistack.org/micro-server-http/v4"
) )
var ( var (
HealthServiceName = "HealthService" HealthServiceName = "HealthService"
) )
var (
HealthServiceServerEndpoints = []v4.EndpointMetadata{
{
Name: "HealthService.Live",
Path: "/live",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Ready",
Path: "/ready",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Version",
Path: "/version",
Method: "GET",
Body: "",
Stream: false,
},
}
)
type HealthServiceServer interface { type HealthServiceServer interface {
Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error

View File

@ -1,5 +1,5 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT. // Code generated by protoc-gen-go-micro. DO NOT EDIT.
// protoc-gen-go-micro version: v4.0.0 // protoc-gen-go-micro version: v4.0.2
// source: health/health.proto // source: health/health.proto
package health package health
@ -8,9 +8,36 @@ import (
context "context" context "context"
codec "go.unistack.org/micro-proto/v4/codec" codec "go.unistack.org/micro-proto/v4/codec"
v4 "go.unistack.org/micro-server-http/v4" v4 "go.unistack.org/micro-server-http/v4"
options "go.unistack.org/micro/v4/options"
server "go.unistack.org/micro/v4/server" server "go.unistack.org/micro/v4/server"
) )
var (
HealthServiceServerEndpoints = []v4.EndpointMetadata{
{
Name: "HealthService.Live",
Path: "/live",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Ready",
Path: "/ready",
Method: "GET",
Body: "",
Stream: false,
},
{
Name: "HealthService.Version",
Path: "/version",
Method: "GET",
Body: "",
Stream: false,
},
}
)
type healthServiceServer struct { type healthServiceServer struct {
HealthServiceServer HealthServiceServer
} }
@ -27,7 +54,7 @@ func (h *healthServiceServer) Version(ctx context.Context, req *codec.Frame, rsp
return h.HealthServiceServer.Version(ctx, req, rsp) return h.HealthServiceServer.Version(ctx, req, rsp)
} }
func RegisterHealthServiceServer(s server.Server, sh HealthServiceServer, opts ...server.HandlerOption) error { func RegisterHealthServiceServer(s server.Server, sh HealthServiceServer, opts ...options.Option) error {
type healthService interface { type healthService interface {
Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
@ -37,7 +64,7 @@ func RegisterHealthServiceServer(s server.Server, sh HealthServiceServer, opts .
healthService healthService
} }
h := &healthServiceServer{sh} h := &healthServiceServer{sh}
var nopts []server.HandlerOption var nopts []options.Option
nopts = append(nopts, v4.HandlerEndpoints(HealthServiceServerEndpoints)) nopts = append(nopts, v4.HandlerEndpoints(HealthServiceServerEndpoints))
return s.Handle(s.NewHandler(&HealthService{h}, append(nopts, opts...)...)) return s.Handle(&HealthService{h}, append(nopts, opts...)...)
} }

View File

@ -2,13 +2,36 @@ package meter // import "go.unistack.org/micro-server-http/v4/handler/meter"
import ( import (
"bytes" "bytes"
"compress/gzip"
"context" "context"
"io"
"strings"
"sync"
codecpb "go.unistack.org/micro-proto/v4/codec" codecpb "go.unistack.org/micro-proto/v4/codec"
"go.unistack.org/micro/v4/errors" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/meter" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v4/options"
) )
const (
contentEncodingHeader = "Content-Encoding"
acceptEncodingHeader = "Accept-Encoding"
)
var gzipPool = sync.Pool{
New: func() interface{} {
return gzip.NewWriter(nil)
},
}
var bufPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(nil)
},
}
// guard to fail early // guard to fail early
var _ MeterServiceServer = (*Handler)(nil) var _ MeterServiceServer = (*Handler)(nil)
@ -19,9 +42,10 @@ type Handler struct {
type Option func(*Options) type Option func(*Options)
type Options struct { type Options struct {
Meter meter.Meter Meter meter.Meter
Name string Name string
MeterOptions []meter.Option MeterOptions []options.Option
DisableCompress bool
} }
func Meter(m meter.Meter) Option { func Meter(m meter.Meter) Option {
@ -36,14 +60,20 @@ func Name(name string) Option {
} }
} }
func MeterOptions(opts ...meter.Option) Option { func DisableCompress(g bool) Option {
return func(o *Options) {
o.DisableCompress = g
}
}
func MeterOptions(opts ...options.Option) Option {
return func(o *Options) { return func(o *Options) {
o.MeterOptions = append(o.MeterOptions, opts...) o.MeterOptions = append(o.MeterOptions, opts...)
} }
} }
func NewOptions(opts ...Option) Options { func NewOptions(opts ...Option) Options {
options := Options{Meter: meter.DefaultMeter} options := Options{Meter: meter.DefaultMeter, DisableCompress: false}
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
@ -56,12 +86,48 @@ func NewHandler(opts ...Option) *Handler {
} }
func (h *Handler) Metrics(ctx context.Context, req *codecpb.Frame, rsp *codecpb.Frame) error { func (h *Handler) Metrics(ctx context.Context, req *codecpb.Frame, rsp *codecpb.Frame) error {
buf := bytes.NewBuffer(nil) log, ok := logger.FromContext(ctx)
if err := h.opts.Meter.Write(buf, h.opts.MeterOptions...); err != nil { if !ok {
return errors.InternalServerError(h.opts.Name, "%v", err) log = logger.DefaultLogger
}
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()
w := io.Writer(buf)
if md, ok := metadata.FromIncomingContext(ctx); gzipAccepted(md) && ok && !h.opts.DisableCompress {
omd, _ := metadata.FromOutgoingContext(ctx)
omd.Set(contentEncodingHeader, "gzip")
gz := gzipPool.Get().(*gzip.Writer)
defer gzipPool.Put(gz)
gz.Reset(w)
defer gz.Close()
w = gz
gz.Flush()
}
if err := h.opts.Meter.Write(w, h.opts.MeterOptions...); err != nil {
log.Error(ctx, "http/meter: write failed", err)
return nil
} }
rsp.Data = buf.Bytes() rsp.Data = buf.Bytes()
return nil return nil
} }
// gzipAccepted returns whether the client will accept gzip-encoded content.
func gzipAccepted(md metadata.Metadata) bool {
a, ok := md.Get(acceptEncodingHeader)
if !ok {
return false
}
if strings.Contains(a, "gzip") {
return true
}
return false
}

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT. // Code generated by protoc-gen-go-micro. DO NOT EDIT.
// versions: // versions:
// - protoc-gen-go-micro v4.0.0 // - protoc-gen-go-micro v4.0.2
// - protoc v3.21.12 // - protoc v4.23.4
// source: meter/meter.proto // source: meter/meter.proto
package meter package meter
@ -9,23 +9,11 @@ package meter
import ( import (
context "context" context "context"
codec "go.unistack.org/micro-proto/v4/codec" codec "go.unistack.org/micro-proto/v4/codec"
v4 "go.unistack.org/micro-server-http/v4"
) )
var ( var (
MeterServiceName = "MeterService" MeterServiceName = "MeterService"
) )
var (
MeterServiceServerEndpoints = []v4.EndpointMetadata{
{
Name: "MeterService.Metrics",
Path: "/metrics",
Method: "GET",
Body: "",
Stream: false,
},
}
)
type MeterServiceServer interface { type MeterServiceServer interface {
Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error

View File

@ -1,5 +1,5 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT. // Code generated by protoc-gen-go-micro. DO NOT EDIT.
// protoc-gen-go-micro version: v4.0.0 // protoc-gen-go-micro version: v4.0.2
// source: meter/meter.proto // source: meter/meter.proto
package meter package meter
@ -8,9 +8,22 @@ import (
context "context" context "context"
codec "go.unistack.org/micro-proto/v4/codec" codec "go.unistack.org/micro-proto/v4/codec"
v4 "go.unistack.org/micro-server-http/v4" v4 "go.unistack.org/micro-server-http/v4"
options "go.unistack.org/micro/v4/options"
server "go.unistack.org/micro/v4/server" server "go.unistack.org/micro/v4/server"
) )
var (
MeterServiceServerEndpoints = []v4.EndpointMetadata{
{
Name: "MeterService.Metrics",
Path: "/metrics",
Method: "GET",
Body: "",
Stream: false,
},
}
)
type meterServiceServer struct { type meterServiceServer struct {
MeterServiceServer MeterServiceServer
} }
@ -19,7 +32,7 @@ func (h *meterServiceServer) Metrics(ctx context.Context, req *codec.Frame, rsp
return h.MeterServiceServer.Metrics(ctx, req, rsp) return h.MeterServiceServer.Metrics(ctx, req, rsp)
} }
func RegisterMeterServiceServer(s server.Server, sh MeterServiceServer, opts ...server.HandlerOption) error { func RegisterMeterServiceServer(s server.Server, sh MeterServiceServer, opts ...options.Option) error {
type meterService interface { type meterService interface {
Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
} }
@ -27,7 +40,7 @@ func RegisterMeterServiceServer(s server.Server, sh MeterServiceServer, opts ...
meterService meterService
} }
h := &meterServiceServer{sh} h := &meterServiceServer{sh}
var nopts []server.HandlerOption var nopts []options.Option
nopts = append(nopts, v4.HandlerEndpoints(MeterServiceServerEndpoints)) nopts = append(nopts, v4.HandlerEndpoints(MeterServiceServerEndpoints))
return s.Handle(s.NewHandler(&MeterService{h}, append(nopts, opts...)...)) return s.Handle(&MeterService{h}, append(nopts, opts...)...)
} }

View File

@ -0,0 +1,49 @@
package meter
import (
"context"
"testing"
codecpb "go.unistack.org/micro-proto/v4/codec"
)
func TestHandler_Metrics(t *testing.T) {
type fields struct {
opts Options
}
type args struct {
ctx context.Context
req *codecpb.Frame
rsp *codecpb.Frame
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
{
"Test #1",
fields{
opts: NewOptions(),
},
args{
context.Background(),
&codecpb.Frame{Data: []byte("gzip")},
&codecpb.Frame{},
},
true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
h := &Handler{
opts: tt.fields.opts,
}
if err := h.Metrics(tt.args.ctx, tt.args.req, tt.args.rsp); (err != nil) != tt.wantErr {
t.Errorf("Metrics() error = %v, wantErr %v", err, tt.wantErr)
}
t.Logf("RSP: %v", tt.args.rsp.Data)
})
}
}

19
handler/spa/spa.go Normal file
View File

@ -0,0 +1,19 @@
package spa
import (
"io/fs"
"net/http"
"strings"
)
// Handler serve files from dir and redirect to index if file not exists
var Handler = func(prefix string, dir fs.FS) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
f := http.StripPrefix(prefix, http.FileServer(http.FS(dir)))
if _, err := fs.Stat(dir, strings.TrimPrefix(r.RequestURI, prefix)); err != nil {
r.RequestURI = prefix
r.URL.Path = prefix
}
f.ServeHTTP(w, r)
}
}

View File

@ -1,20 +0,0 @@
window.onload = function() {
//<editor-fold desc="Changeable Configuration Block">
// the following lines will be replaced by docker/configurator, when it runs in a docker-container
window.ui = SwaggerUIBundle({
url: "{{ .SWAGGER }}",
dom_id: '#swagger-ui',
deepLinking: true,
presets: [
SwaggerUIBundle.presets.apis,
SwaggerUIStandalonePreset
],
plugins: [
SwaggerUIBundle.plugins.DownloadUrl
],
layout: "StandaloneLayout"
});
//</editor-fold>
};

View File

@ -0,0 +1,61 @@
package swagger
import (
"io/fs"
"net/http"
yamlcodec "go.unistack.org/micro-codec-yaml/v4"
rutil "go.unistack.org/micro/v4/util/reflect"
)
// Handler append to generated swagger data from dst map[string]interface{}
var Handler = func(dst map[string]interface{}, fsys fs.FS) http.HandlerFunc {
c := yamlcodec.NewCodec()
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusNotFound)
return
}
path := r.URL.Path
if len(path) > 1 && path[0] == '/' {
path = path[1:]
}
buf, err := fs.ReadFile(fsys, path)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if dst == nil {
w.WriteHeader(http.StatusOK)
_, _ = w.Write(buf)
return
}
var src interface{}
if err = c.Unmarshal(buf, src); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if err = rutil.Merge(src, dst); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if buf, err = c.Marshal(src); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write(buf)
}
}

257
http.go
View File

@ -9,29 +9,27 @@ import (
"net" "net"
"net/http" "net/http"
"reflect" "reflect"
"sort"
"strings" "strings"
"sync" "sync"
"time" "time"
"go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/codec" "go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/options"
"go.unistack.org/micro/v4/register" "go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v4/server" "go.unistack.org/micro/v4/server"
rhttp "go.unistack.org/micro/v4/util/http" rhttp "go.unistack.org/micro/v4/util/http"
"golang.org/x/net/netutil" "golang.org/x/net/netutil"
) )
var _ server.Server = &httpServer{} var _ server.Server = (*Server)(nil)
type httpServer struct { type Server struct {
hd server.Handler hd interface{}
rsvc *register.Service rsvc *register.Service
handlers map[string]server.Handler handlers map[string]interface{}
exit chan chan error exit chan chan error
subscribers map[*httpSubscriber][]broker.Subscriber errorHandler func(context.Context, interface{}, http.ResponseWriter, *http.Request, error, int)
errorHandler func(context.Context, server.Handler, http.ResponseWriter, *http.Request, error, int)
pathHandlers *rhttp.Trie pathHandlers *rhttp.Trie
opts server.Options opts server.Options
registerRPC bool registerRPC bool
@ -40,7 +38,7 @@ type httpServer struct {
init bool init bool
} }
func (h *httpServer) newCodec(ct string) (codec.Codec, error) { func (h *Server) newCodec(ct string) (codec.Codec, error) {
if idx := strings.IndexRune(ct, ';'); idx >= 0 { if idx := strings.IndexRune(ct, ';'); idx >= 0 {
ct = ct[:idx] ct = ct[:idx]
} }
@ -53,14 +51,14 @@ func (h *httpServer) newCodec(ct string) (codec.Codec, error) {
return nil, codec.ErrUnknownContentType return nil, codec.ErrUnknownContentType
} }
func (h *httpServer) Options() server.Options { func (h *Server) Options() server.Options {
h.Lock() h.Lock()
opts := h.opts opts := h.opts
h.Unlock() h.Unlock()
return opts return opts
} }
func (h *httpServer) Init(opts ...server.Option) error { func (h *Server) Init(opts ...options.Option) error {
if len(opts) == 0 && h.init { if len(opts) == 0 && h.init {
return nil return nil
} }
@ -70,11 +68,11 @@ func (h *httpServer) Init(opts ...server.Option) error {
for _, o := range opts { for _, o := range opts {
o(&h.opts) o(&h.opts)
} }
if fn, ok := h.opts.Context.Value(errorHandlerKey{}).(func(ctx context.Context, s server.Handler, w http.ResponseWriter, r *http.Request, err error, status int)); ok && fn != nil { if fn, ok := h.opts.Context.Value(errorHandlerKey{}).(func(ctx context.Context, s interface{}, w http.ResponseWriter, r *http.Request, err error, status int)); ok && fn != nil {
h.errorHandler = fn h.errorHandler = fn
} }
if h.handlers == nil { if h.handlers == nil {
h.handlers = make(map[string]server.Handler) h.handlers = make(map[string]interface{})
} }
if h.pathHandlers == nil { if h.pathHandlers == nil {
h.pathHandlers = rhttp.NewTrie() h.pathHandlers = rhttp.NewTrie()
@ -101,10 +99,6 @@ func (h *httpServer) Init(opts ...server.Option) error {
h.RUnlock() h.RUnlock()
return err return err
} }
if err := h.opts.Broker.Init(); err != nil {
h.RUnlock()
return err
}
if err := h.opts.Tracer.Init(); err != nil { if err := h.opts.Tracer.Init(); err != nil {
h.RUnlock() h.RUnlock()
return err return err
@ -117,10 +111,6 @@ func (h *httpServer) Init(opts ...server.Option) error {
h.RUnlock() h.RUnlock()
return err return err
} }
if err := h.opts.Transport.Init(); err != nil {
h.RUnlock()
return err
}
h.RUnlock() h.RUnlock()
h.Lock() h.Lock()
@ -130,12 +120,24 @@ func (h *httpServer) Init(opts ...server.Option) error {
return nil return nil
} }
func (h *httpServer) Handle(handler server.Handler) error { func (h *Server) Handle(handler interface{}, opts ...options.Option) error {
options := server.NewHandleOptions(opts...)
var endpointMetadata []EndpointMetadata
if v, ok := options.Context.Value(handlerEndpointsKey{}).([]EndpointMetadata); ok {
endpointMetadata = v
}
// passed unknown handler // passed unknown handler
hdlr, ok := handler.(*httpHandler) hdlr, ok := handler.(*httpHandler)
if !ok { if !ok {
h.Lock() h.Lock()
h.hd = handler if h.handlers == nil {
h.handlers = make(map[string]interface{})
}
for _, v := range endpointMetadata {
h.handlers[v.Name] = h.newHTTPHandler(handler, opts...)
}
h.Unlock() h.Unlock()
return nil return nil
} }
@ -148,19 +150,11 @@ func (h *httpServer) Handle(handler server.Handler) error {
return nil return nil
} }
// passed micro compat handler
h.Lock()
if h.handlers == nil {
h.handlers = make(map[string]server.Handler)
}
h.handlers[handler.Name()] = handler
h.Unlock()
return nil return nil
} }
func (h *httpServer) NewHandler(handler interface{}, opts ...server.HandlerOption) server.Handler { func (h *Server) newHTTPHandler(handler interface{}, opts ...options.Option) *httpHandler {
options := server.NewHandlerOptions(opts...) options := server.NewHandleOptions(opts...)
eps := make([]*register.Endpoint, 0, len(options.Metadata)) eps := make([]*register.Endpoint, 0, len(options.Metadata))
for name, metadata := range options.Metadata { for name, metadata := range options.Metadata {
@ -204,16 +198,16 @@ func (h *httpServer) NewHandler(handler interface{}, opts ...server.HandlerOptio
} }
if method.Name == "" && h.opts.Logger.V(logger.ErrorLevel) { if method.Name == "" && h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Errorf(h.opts.Context, "nil method for %s", mname) h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil method for %s", mname))
continue continue
} }
mtype, err := prepareEndpoint(method) mtype, err := prepareEndpoint(method)
if err != nil && h.opts.Logger.V(logger.ErrorLevel) { if err != nil && h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Errorf(h.opts.Context, "%v", err) h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("%v", err))
continue continue
} else if mtype == nil { } else if mtype == nil {
h.opts.Logger.Errorf(h.opts.Context, "nil mtype for %s", mname) h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil mtype for %s", mname))
continue continue
} }
@ -224,13 +218,13 @@ func (h *httpServer) NewHandler(handler interface{}, opts ...server.HandlerOptio
hdlr.name = name hdlr.name = name
if err := hdlr.handlers.Insert([]string{md["Method"]}, md["Path"], pth); err != nil { if err := hdlr.handlers.Insert([]string{md["Method"]}, md["Path"], pth); err != nil {
h.opts.Logger.Errorf(h.opts.Context, "cant add handler for %s %s", md["Method"], md["Path"]) h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add handler for %s %s", md["Method"][0], md["Path"][0]))
} }
if h.registerRPC { if h.registerRPC {
h.opts.Logger.Infof(h.opts.Context, "register rpc handler for http.MethodPost %s /%s", hn, hn) h.opts.Logger.Info(h.opts.Context, fmt.Sprintf("register rpc handler for http.MethodPost %s /%s", hn, hn))
if err := hdlr.handlers.Insert([]string{http.MethodPost}, "/"+hn, pth); err != nil { if err := hdlr.handlers.Insert([]string{http.MethodPost}, "/"+hn, pth); err != nil {
h.opts.Logger.Errorf(h.opts.Context, "cant add rpc handler for http.MethodPost %s /%s", hn, hn) h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add rpc handler for http.MethodPost %s /%s", hn, hn))
} }
} }
} }
@ -254,16 +248,16 @@ func (h *httpServer) NewHandler(handler interface{}, opts ...server.HandlerOptio
} }
if method.Name == "" && h.opts.Logger.V(logger.ErrorLevel) { if method.Name == "" && h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Errorf(h.opts.Context, "nil method for %s", mname) h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil method for %s", mname))
continue continue
} }
mtype, err := prepareEndpoint(method) mtype, err := prepareEndpoint(method)
if err != nil && h.opts.Logger.V(logger.ErrorLevel) { if err != nil && h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Errorf(h.opts.Context, "%v", err) h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("%v", err))
continue continue
} else if mtype == nil { } else if mtype == nil {
h.opts.Logger.Errorf(h.opts.Context, "nil mtype for %s", mname) h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil mtype for %s", mname))
continue continue
} }
@ -274,13 +268,13 @@ func (h *httpServer) NewHandler(handler interface{}, opts ...server.HandlerOptio
hdlr.name = name hdlr.name = name
if err := hdlr.handlers.Insert([]string{md.Method}, md.Path, pth); err != nil { if err := hdlr.handlers.Insert([]string{md.Method}, md.Path, pth); err != nil {
h.opts.Logger.Errorf(h.opts.Context, "cant add handler for %s %s", md.Method, md.Path) h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add handler for %s %s", md.Method, md.Path))
} }
if h.registerRPC { if h.registerRPC {
h.opts.Logger.Infof(h.opts.Context, "register rpc handler for http.MethodPost %s /%s", hn, hn) h.opts.Logger.Info(h.opts.Context, fmt.Sprintf("register rpc handler for http.MethodPost %s /%s", hn, hn))
if err := hdlr.handlers.Insert([]string{http.MethodPost}, "/"+hn, pth); err != nil { if err := hdlr.handlers.Insert([]string{http.MethodPost}, "/"+hn, pth); err != nil {
h.opts.Logger.Errorf(h.opts.Context, "cant add rpc handler for http.MethodPost %s /%s", hn, hn) h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add rpc handler for http.MethodPost %s /%s", hn, hn))
} }
} }
} }
@ -288,40 +282,15 @@ func (h *httpServer) NewHandler(handler interface{}, opts ...server.HandlerOptio
return hdlr return hdlr
} }
func (h *httpServer) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber { func (h *Server) Register() error {
return newSubscriber(topic, handler, opts...)
}
func (h *httpServer) Subscribe(sb server.Subscriber) error {
sub, ok := sb.(*httpSubscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *httpSubscriber")
}
if len(sub.handlers) == 0 {
return fmt.Errorf("invalid subscriber: no handler functions")
}
if err := server.ValidateSubscriber(sb); err != nil {
return err
}
h.RLock()
_, ok = h.subscribers[sub]
h.RUnlock()
if ok {
return fmt.Errorf("subscriber %v already exists", h)
}
h.Lock()
h.subscribers[sub] = nil
h.Unlock()
return nil
}
func (h *httpServer) Register() error {
var eps []*register.Endpoint var eps []*register.Endpoint
h.RLock() h.RLock()
for _, hdlr := range h.handlers { for _, hdlr := range h.handlers {
eps = append(eps, hdlr.Endpoints()...) hd, ok := hdlr.(*httpHandler)
if !ok {
continue
}
eps = append(eps, hd.Endpoints()...)
} }
rsvc := h.rsvc rsvc := h.rsvc
config := h.opts config := h.opts
@ -339,31 +308,16 @@ func (h *httpServer) Register() error {
if err != nil { if err != nil {
return err return err
} }
service.Nodes[0].Metadata["protocol"] = "http" service.Nodes[0].Metadata.Set("protocol", "http")
service.Endpoints = eps service.Endpoints = eps
h.Lock()
subscriberList := make([]*httpSubscriber, 0, len(h.subscribers))
for e := range h.subscribers {
// Only advertise non internal subscribers
subscriberList = append(subscriberList, e)
}
sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic
})
for _, e := range subscriberList {
service.Endpoints = append(service.Endpoints, e.Endpoints()...)
}
h.Unlock()
h.RLock() h.RLock()
registered := h.registered registered := h.registered
h.RUnlock() h.RUnlock()
if !registered { if !registered {
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID) config.Logger.Info(config.Context, fmt.Sprintf("Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID))
} }
} }
@ -378,29 +332,6 @@ func (h *httpServer) Register() error {
} }
h.Lock() h.Lock()
for sb := range h.subscribers {
handler := h.createSubHandler(sb, config)
var opts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
subCtx := config.Context
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
opts = append(opts, broker.SubscribeContext(subCtx))
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
if err != nil {
h.Unlock()
return err
}
h.subscribers[sb] = []broker.Subscriber{sub}
}
h.registered = true h.registered = true
h.rsvc = service h.rsvc = service
h.Unlock() h.Unlock()
@ -408,7 +339,7 @@ func (h *httpServer) Register() error {
return nil return nil
} }
func (h *httpServer) Deregister() error { func (h *Server) Deregister() error {
h.RLock() h.RLock()
config := h.opts config := h.opts
h.RUnlock() h.RUnlock()
@ -419,7 +350,7 @@ func (h *httpServer) Deregister() error {
} }
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Deregistering node: %s", service.Nodes[0].ID) config.Logger.Info(config.Context, fmt.Sprintf("Deregistering node: %s", service.Nodes[0].ID))
} }
if err := server.DefaultDeregisterFunc(service, config); err != nil { if err := server.DefaultDeregisterFunc(service, config); err != nil {
@ -435,28 +366,11 @@ func (h *httpServer) Deregister() error {
} }
h.registered = false h.registered = false
subCtx := h.opts.Context
for sb, subs := range h.subscribers {
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
for _, sub := range subs {
config.Logger.Infof(config.Context, "Unsubscribing from topic: %s", sub.Topic())
if err := sub.Unsubscribe(subCtx); err != nil {
h.Unlock()
config.Logger.Errorf(config.Context, "failed to unsubscribe topic: %s, error: %v", sb.Topic(), err)
return err
}
}
h.subscribers[sb] = nil
}
h.Unlock() h.Unlock()
return nil return nil
} }
func (h *httpServer) Start() error { func (h *Server) Start() error {
h.RLock() h.RLock()
config := h.opts config := h.opts
h.RUnlock() h.RUnlock()
@ -486,7 +400,7 @@ func (h *httpServer) Start() error {
} }
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Listening on %s", ts.Addr().String()) config.Logger.Info(config.Context, fmt.Sprintf("Listening on %s", ts.Addr().String()))
} }
h.Lock() h.Lock()
@ -494,13 +408,12 @@ func (h *httpServer) Start() error {
h.Unlock() h.Unlock()
var handler http.Handler var handler http.Handler
var srvFunc func(net.Listener) error
// nolint: nestif // nolint: nestif
if h.opts.Context != nil { if h.opts.Context != nil {
if hs, ok := h.opts.Context.Value(serverKey{}).(*http.Server); ok && hs != nil { if hs, ok := h.opts.Context.Value(serverKey{}).(*http.Server); ok && hs != nil {
if hs.Handler == nil && h.hd != nil { if hs.Handler == nil && h.hd != nil {
if hdlr, ok := h.hd.Handler().(http.Handler); ok { if hdlr, ok := h.hd.(http.Handler); ok {
hs.Handler = hdlr hs.Handler = hdlr
handler = hs.Handler handler = hs.Handler
} }
@ -516,7 +429,7 @@ func (h *httpServer) Start() error {
case len(h.handlers) > 0 && h.hd != nil: case len(h.handlers) > 0 && h.hd != nil:
handler = h handler = h
case handler == nil && h.hd != nil: case handler == nil && h.hd != nil:
if hdlr, ok := h.hd.Handler().(http.Handler); ok { if hdlr, ok := h.hd.(http.Handler); ok {
handler = hdlr handler = hdlr
} }
} }
@ -525,13 +438,9 @@ func (h *httpServer) Start() error {
return fmt.Errorf("cant process with nil handler") return fmt.Errorf("cant process with nil handler")
} }
if err := config.Broker.Connect(h.opts.Context); err != nil {
return err
}
if err := config.RegisterCheck(h.opts.Context); err != nil { if err := config.RegisterCheck(h.opts.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, err) config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error: %s", config.Name, config.ID, err))
} }
} else { } else {
if err = h.Register(); err != nil { if err = h.Register(); err != nil {
@ -541,6 +450,7 @@ func (h *httpServer) Start() error {
fn := handler fn := handler
var hs *http.Server
if h.opts.Context != nil { if h.opts.Context != nil {
if mwf, ok := h.opts.Context.Value(middlewareKey{}).([]func(http.Handler) http.Handler); ok && len(mwf) > 0 { if mwf, ok := h.opts.Context.Value(middlewareKey{}).([]func(http.Handler) http.Handler); ok && len(mwf) > 0 {
// wrap the handler func // wrap the handler func
@ -548,25 +458,19 @@ func (h *httpServer) Start() error {
fn = mwf[i-1](fn) fn = mwf[i-1](fn)
} }
} }
if hs, ok := h.opts.Context.Value(serverKey{}).(*http.Server); ok && hs != nil { var ok bool
if hs, ok = h.opts.Context.Value(serverKey{}).(*http.Server); ok && hs != nil {
hs.Handler = fn hs.Handler = fn
srvFunc = hs.Serve } else {
hs = &http.Server{Handler: fn}
} }
} }
if srvFunc != nil { go func() {
go func() { if cerr := hs.Serve(ts); cerr != nil && !errors.Is(cerr, net.ErrClosed) {
if cerr := srvFunc(ts); cerr != nil && !errors.Is(cerr, net.ErrClosed) { h.opts.Logger.Error(h.opts.Context, cerr.Error())
h.opts.Logger.Error(h.opts.Context, cerr) }
} }()
}()
} else {
go func() {
if cerr := http.Serve(ts, fn); cerr != nil && !errors.Is(cerr, net.ErrClosed) {
h.opts.Logger.Error(h.opts.Context, cerr)
}
}()
}
go func() { go func() {
t := new(time.Ticker) t := new(time.Ticker)
@ -592,28 +496,28 @@ func (h *httpServer) Start() error {
// nolint: nestif // nolint: nestif
if rerr != nil && registered { if rerr != nil && registered {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr) config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr))
} }
// deregister self in case of error // deregister self in case of error
if err := h.Deregister(); err != nil { if err := h.Deregister(); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s deregister error: %s", config.Name, config.ID, err) config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s deregister error: %s", config.Name, config.ID, err))
} }
} }
} else if rerr != nil && !registered { } else if rerr != nil && !registered {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, rerr) config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error: %s", config.Name, config.ID, rerr))
} }
continue continue
} }
if err := h.Register(); err != nil { if err := h.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s register error: %s", config.Name, config.ID, err) config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register error: %s", config.Name, config.ID, err))
} }
} }
if err := h.Register(); err != nil { if err := h.Register(); err != nil {
config.Logger.Errorf(config.Context, "Server register error: %s", err) config.Logger.Error(config.Context, fmt.Sprintf("Server register error: %s", err))
} }
// wait for exit // wait for exit
case ch = <-h.exit: case ch = <-h.exit:
@ -623,43 +527,46 @@ func (h *httpServer) Start() error {
// deregister // deregister
if err := h.Deregister(); err != nil { if err := h.Deregister(); err != nil {
config.Logger.Errorf(config.Context, "Server deregister error: %s", err) config.Logger.Error(config.Context, fmt.Sprintf("Server deregister error: %s", err))
} }
if err := config.Broker.Disconnect(config.Context); err != nil { ctx, cancel := context.WithTimeout(context.Background(), h.opts.GracefulTimeout)
config.Logger.Errorf(config.Context, "Broker disconnect error: %s", err) defer cancel()
err := hs.Shutdown(ctx)
if err != nil {
err = hs.Close()
} }
ch <- ts.Close() ch <- err
}() }()
return nil return nil
} }
func (h *httpServer) Stop() error { func (h *Server) Stop() error {
ch := make(chan error) ch := make(chan error)
h.exit <- ch h.exit <- ch
return <-ch return <-ch
} }
func (h *httpServer) String() string { func (h *Server) String() string {
return "http" return "http"
} }
func (h *httpServer) Name() string { func (h *Server) Name() string {
return h.opts.Name return h.opts.Name
} }
func NewServer(opts ...server.Option) *httpServer { func NewServer(opts ...options.Option) *Server {
options := server.NewOptions(opts...) options := server.NewOptions(opts...)
eh := DefaultErrorHandler eh := DefaultErrorHandler
if v, ok := options.Context.Value(errorHandlerKey{}).(errorHandler); ok && v != nil { if v, ok := options.Context.Value(errorHandlerKey{}).(errorHandler); ok && v != nil {
eh = v eh = v
} }
return &httpServer{ return &Server{
opts: options, opts: options,
exit: make(chan chan error), exit: make(chan chan error),
subscribers: make(map[*httpSubscriber][]broker.Subscriber),
errorHandler: eh, errorHandler: eh,
pathHandlers: rhttp.NewTrie(), pathHandlers: rhttp.NewTrie(),
} }

View File

@ -5,7 +5,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"go.unistack.org/micro/v4/server" "go.unistack.org/micro/v4/options"
) )
// SetError pass error to caller // SetError pass error to caller
@ -79,24 +79,24 @@ func GetRspCode(ctx context.Context) int {
type middlewareKey struct{} type middlewareKey struct{}
// Middleware passes http middlewares // Middleware passes http middlewares
func Middleware(mw ...func(http.Handler) http.Handler) server.Option { func Middleware(mw ...func(http.Handler) http.Handler) options.Option {
return server.SetOption(middlewareKey{}, mw) return options.ContextOption(middlewareKey{}, mw)
} }
type serverKey struct{} type serverKey struct{}
// Server provide ability to pass *http.Server // HTTPServer provide ability to pass *http.Server
func Server(hs *http.Server) server.Option { func HTTPServer(hs *http.Server) options.Option {
return server.SetOption(serverKey{}, hs) return options.ContextOption(serverKey{}, hs)
} }
type errorHandler func(ctx context.Context, s server.Handler, w http.ResponseWriter, r *http.Request, err error, status int) type errorHandler func(ctx context.Context, s interface{}, w http.ResponseWriter, r *http.Request, err error, status int)
type errorHandlerKey struct{} type errorHandlerKey struct{}
// ErrorHandler specifies handler for errors // ErrorHandler specifies handler for errors
func ErrorHandler(fn errorHandler) server.Option { func ErrorHandler(fn errorHandler) options.Option {
return server.SetOption(errorHandlerKey{}, fn) return options.ContextOption(errorHandlerKey{}, fn)
} }
type ( type (
@ -107,12 +107,18 @@ type (
) )
// PathHandler specifies http handler for path regexp // PathHandler specifies http handler for path regexp
func PathHandler(method, path string, handler http.HandlerFunc) server.Option { func PathHandler(method, path string, handler http.HandlerFunc) options.Option {
return func(o *server.Options) { return func(src interface{}) error {
if o.Context == nil { vctx, err := options.Get(src, ".Context")
o.Context = context.Background() if err != nil {
return err
} }
v, ok := o.Context.Value(pathHandlerKey{}).(*pathHandlerVal) ctx, ok := vctx.(context.Context)
if !ok {
return fmt.Errorf("invalid option")
}
v, ok := ctx.Value(pathHandlerKey{}).(*pathHandlerVal)
if !ok { if !ok {
v = &pathHandlerVal{h: make(map[string]map[string]http.HandlerFunc)} v = &pathHandlerVal{h: make(map[string]map[string]http.HandlerFunc)}
} }
@ -121,16 +127,17 @@ func PathHandler(method, path string, handler http.HandlerFunc) server.Option {
m = make(map[string]http.HandlerFunc) m = make(map[string]http.HandlerFunc)
v.h[method] = m v.h[method] = m
} }
ctx = context.WithValue(ctx, pathHandlerKey{}, v)
m[path] = handler m[path] = handler
o.Context = context.WithValue(o.Context, pathHandlerKey{}, v) return options.Set(src, ctx, ".Context")
} }
} }
type registerRPCHandlerKey struct{} type registerRPCHandlerKey struct{}
// RegisterRPCHandler registers compatibility endpoints with /ServiceName.ServiceEndpoint method POST // RegisterRPCHandler registers compatibility endpoints with /ServiceName.ServiceEndpoint method POST
func RegisterRPCHandler(b bool) server.Option { func RegisterRPCHandler(b bool) options.Option {
return server.SetOption(registerRPCHandlerKey{}, b) return options.ContextOption(registerRPCHandlerKey{}, b)
} }
type handlerEndpointsKey struct{} type handlerEndpointsKey struct{}
@ -143,8 +150,8 @@ type EndpointMetadata struct {
Stream bool Stream bool
} }
func HandlerEndpoints(md []EndpointMetadata) server.HandlerOption { func HandlerEndpoints(md []EndpointMetadata) options.Option {
return server.SetHandlerOption(handlerEndpointsKey{}, md) return options.ContextOption(handlerEndpointsKey{}, md)
} }
type handlerOptions struct { type handlerOptions struct {

View File

@ -8,10 +8,7 @@ import (
"go.unistack.org/micro/v4/server" "go.unistack.org/micro/v4/server"
) )
var ( var _ server.Request = &rpcRequest{}
_ server.Request = &rpcRequest{}
_ server.Message = &rpcMessage{}
)
type rpcRequest struct { type rpcRequest struct {
rw io.ReadWriter rw io.ReadWriter
@ -25,14 +22,6 @@ type rpcRequest struct {
stream bool stream bool
} }
type rpcMessage struct {
payload interface{}
codec codec.Codec
header metadata.Metadata
topic string
contentType string
}
func (r *rpcRequest) ContentType() string { func (r *rpcRequest) ContentType() string {
return r.contentType return r.contentType
} }
@ -72,23 +61,3 @@ func (r *rpcRequest) Stream() bool {
func (r *rpcRequest) Body() interface{} { func (r *rpcRequest) Body() interface{} {
return r.payload return r.payload
} }
func (r *rpcMessage) ContentType() string {
return r.contentType
}
func (r *rpcMessage) Topic() string {
return r.topic
}
func (r *rpcMessage) Body() interface{} {
return r.payload
}
func (r *rpcMessage) Header() metadata.Metadata {
return r.header
}
func (r *rpcMessage) Codec() codec.Codec {
return r.codec
}

View File

@ -10,6 +10,8 @@ import (
"go.unistack.org/micro/v4/server" "go.unistack.org/micro/v4/server"
) )
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
type methodType struct { type methodType struct {
ArgType reflect.Type ArgType reflect.Type
ReplyType reflect.Type ReplyType reflect.Type

View File

@ -1,208 +0,0 @@
package http
import (
"bytes"
"context"
"fmt"
"reflect"
"strings"
"go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v4/server"
)
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
type handler struct {
reqType reflect.Type
ctxType reflect.Type
method reflect.Value
}
type httpSubscriber struct {
topic string
rcvr reflect.Value
typ reflect.Type
subscriber interface{}
handlers []*handler
endpoints []*register.Endpoint
opts server.SubscriberOptions
}
func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber {
options := server.NewSubscriberOptions(opts...)
var endpoints []*register.Endpoint
var handlers []*handler
if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func {
h := &handler{
method: reflect.ValueOf(sub),
}
switch typ.NumIn() {
case 1:
h.reqType = typ.In(0)
case 2:
h.ctxType = typ.In(0)
h.reqType = typ.In(1)
}
handlers = append(handlers, h)
ep := &register.Endpoint{
Name: "Func",
Request: register.ExtractSubValue(typ),
Metadata: metadata.New(2),
}
ep.Metadata.Set("topic", topic)
ep.Metadata.Set("subscriber", "true")
endpoints = append(endpoints, ep)
} else {
hdlr := reflect.ValueOf(sub)
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
h := &handler{
method: method.Func,
}
switch method.Type.NumIn() {
case 2:
h.reqType = method.Type.In(1)
case 3:
h.ctxType = method.Type.In(1)
h.reqType = method.Type.In(2)
}
handlers = append(handlers, h)
ep := &register.Endpoint{
Name: name + "." + method.Name,
Request: register.ExtractSubValue(method.Type),
Metadata: metadata.New(2),
}
ep.Metadata.Set("topic", topic)
ep.Metadata.Set("subscriber", "true")
endpoints = append(endpoints, ep)
}
}
return &httpSubscriber{
rcvr: reflect.ValueOf(sub),
typ: reflect.TypeOf(sub),
topic: topic,
subscriber: sub,
handlers: handlers,
endpoints: endpoints,
opts: options,
}
}
func (s *httpServer) createSubHandler(sb *httpSubscriber, opts server.Options) broker.Handler {
return func(p broker.Event) error {
msg := p.Message()
ct := msg.Header["Content-Type"]
cf, err := s.newCodec(ct)
if err != nil {
return err
}
hdr := metadata.Copy(msg.Header)
delete(hdr, "Content-Type")
ctx := metadata.NewIncomingContext(context.Background(), hdr)
results := make(chan error, len(sb.handlers))
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
var isVal bool
var req reflect.Value
if handler.reqType.Kind() == reflect.Ptr {
req = reflect.New(handler.reqType.Elem())
} else {
req = reflect.New(handler.reqType)
isVal = true
}
if isVal {
req = req.Elem()
}
buf := bytes.NewBuffer(msg.Body)
if err := cf.ReadHeader(buf, &codec.Message{}, codec.Event); err != nil {
return err
}
if err := cf.ReadBody(buf, req.Interface()); err != nil {
return err
}
fn := func(ctx context.Context, msg server.Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
}
if handler.ctxType != nil {
vals = append(vals, reflect.ValueOf(ctx))
}
vals = append(vals, reflect.ValueOf(msg.Body()))
returnValues := handler.method.Call(vals)
if err := returnValues[0].Interface(); err != nil {
return err.(error)
}
return nil
}
for i := len(opts.SubWrappers); i > 0; i-- {
fn = opts.SubWrappers[i-1](fn)
}
go func() {
results <- fn(ctx, &httpMessage{
topic: sb.topic,
contentType: ct,
payload: req.Interface(),
header: msg.Header,
codec: cf,
})
}()
}
var errors []string
for i := 0; i < len(sb.handlers); i++ {
if err := <-results; err != nil {
errors = append(errors, err.Error())
}
}
if len(errors) > 0 {
return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
}
return nil
}
}
func (s *httpSubscriber) Topic() string {
return s.topic
}
func (s *httpSubscriber) Subscriber() interface{} {
return s.subscriber
}
func (s *httpSubscriber) Endpoints() []*register.Endpoint {
return s.endpoints
}
func (s *httpSubscriber) Options() server.SubscriberOptions {
return s.opts
}

8
tools.go Normal file
View File

@ -0,0 +1,8 @@
//go:build tools
package http
import (
_ "go.unistack.org/micro-proto/v4"
_ "go.unistack.org/protoc-gen-go-micro/v4"
)

12
util.go
View File

@ -31,15 +31,13 @@ func FillRequest(ctx context.Context, req interface{}, opts ...FillRequestOption
} }
} }
cookies := strings.Split(md["Cookie"], ";") cookies := md["Cookie"]
cmd := make(map[string]string, len(cookies)) cmd := make(map[string]string, len(cookies))
for _, cookie := range cookies { kv := strings.Split(cookies, "=")
kv := strings.Split(cookie, "=") if len(kv) != 2 {
if len(kv) != 2 { return nil
continue
}
cmd[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1])
} }
cmd[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1])
for idx := 0; idx < len(options.cookies)/2; idx += 2 { for idx := 0; idx < len(options.cookies)/2; idx += 2 {
k := http.CanonicalHeaderKey(options.cookies[idx]) k := http.CanonicalHeaderKey(options.cookies[idx])
v, ok := cmd[k] v, ok := cmd[k]