Compare commits


No commits in common. "master" and "v4.0.0" have entirely different histories.

33 changed files with 573 additions and 2465 deletions

.gitignore vendored
View File

@ -1,24 +0,0 @@
# Binaries for programs and plugins
# Test binary, built with `go test -c`
# Output of the go coverage tool, specifically when used with LiteIDE
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
# General

View File

@ -3,23 +3,14 @@ module
go 1.19
require ( v4.0.0 v4.0.1 v4.0.17 v4.0.13 v0.22.0 v4.0.0 v4.0.1 v0.7.0
require ( v1.2.0 // indirect v1.5.4 // indirect v0.7.0 // indirect v0.6.9-0.20230804172637-c7be7c783f49 // indirect v0.16.0 // indirect v0.18.0 // indirect v0.19.0 // indirect v1.33.0 // indirect v2.4.0 // indirect v1.5.2 // indirect v0.6.9 // indirect v1.28.1 // indirect v3.0.1 // indirect v1.4.0 // indirect


File diff suppressed because it is too large Load Diff

View File

@ -19,10 +19,10 @@ import (
var (
DefaultErrorHandler = func(ctx context.Context, s interface{}, w http.ResponseWriter, r *http.Request, err error, status int) {
DefaultErrorHandler = func(ctx context.Context, s server.Handler, w http.ResponseWriter, r *http.Request, err error, status int) {
if _, cerr := w.Write([]byte(err.Error())); cerr != nil {
logger.DefaultLogger.Error(ctx, fmt.Sprintf("write failed: %v", cerr))
logger.DefaultLogger.Errorf(ctx, "write failed: %v", cerr)
DefaultContentType = "application/json"
@ -35,7 +35,7 @@ type patHandler struct {
type httpHandler struct {
opts server.HandleOptions
opts server.HandlerOptions
hd interface{}
handlers *rhttp.Trie
name string
@ -56,47 +56,17 @@ func (h *httpHandler) Endpoints() []*register.Endpoint {
return h.eps
func (h *httpHandler) Options() server.HandleOptions {
func (h *httpHandler) Options() server.HandlerOptions {
return h.opts
func (h *Server) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, error) {
if handler == nil {
return nil, fmt.Errorf("invalid handler specified: %v", handler)
func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// check for http.HandlerFunc handlers
if ph, _, err := h.pathHandlers.Search(r.Method, r.URL.Path); err == nil {
ph.(http.HandlerFunc)(w, r)
rtype := reflect.TypeOf(handler)
if rtype.NumIn() != 3 {
return nil, fmt.Errorf("invalid handler, NumIn != 3: %v", rtype.NumIn())
argType := rtype.In(1)
replyType := rtype.In(2)
// First arg need not be a pointer.
if !isExportedOrBuiltinType(argType) {
return nil, fmt.Errorf("invalid handler, argument type not exported: %v", argType)
if replyType.Kind() != reflect.Ptr {
return nil, fmt.Errorf("invalid handler, reply type not a pointer: %v", replyType)
// Reply type must be exported.
if !isExportedOrBuiltinType(replyType) {
return nil, fmt.Errorf("invalid handler, reply type not exported: %v", replyType)
if rtype.NumOut() != 1 {
return nil, fmt.Errorf("invalid handler, has wrong number of outs: %v", rtype.NumOut())
// The return type of the method must be error.
if returnType := rtype.Out(0); returnType != typeOfError {
return nil, fmt.Errorf("invalid handler, returns %v not error", returnType.String())
return func(w http.ResponseWriter, r *http.Request) {
ct := DefaultContentType
if htype := r.Header.Get(metadata.HeaderContentType); htype != "" {
ct = htype
@ -109,264 +79,19 @@ func (h *Server) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, error)
md = metadata.New(len(r.Header) + 8)
for k, v := range r.Header {
md[k] = v[0]
md[k] = strings.Join(v, ", ")
md["RemoteAddr"] = r.RemoteAddr
md["Method"] = r.Method
md["URL"] = r.URL.String()
md["Proto"] = r.Proto
md["Content-Length"] = fmt.Sprintf("%d", r.ContentLength)
md["Transfer-Encoding"] = r.TransferEncoding[0]
md["ContentLength"] = fmt.Sprintf("%d", r.ContentLength)
md["TransferEncoding"] = strings.Join(r.TransferEncoding, ",")
md["Host"] = r.Host
md["RequestURI"] = r.RequestURI
if r.TLS != nil {
md["TLS"] = "true"
md["TLS-ALPN"] = r.TLS.NegotiatedProtocol
md["TLS-ServerName"] = r.TLS.ServerName
ctx = metadata.NewIncomingContext(ctx, md)
path := r.URL.Path
if r.Body != nil {
defer r.Body.Close()
matches := make(map[string]interface{})
var match bool
var hldr *patHandler
var handler *httpHandler
for _, shdlr := range h.handlers {
hdlr := shdlr.(*httpHandler)
fh, mp, err := hdlr.handlers.Search(r.Method, path)
if err == nil {
match = true
for k, v := range mp {
matches[k] = v
hldr = fh.(*patHandler)
handler = hdlr
} else if err == rhttp.ErrMethodNotAllowed && !h.registerRPC {
_, _ = w.Write([]byte("not matching route found"))
if !match && h.registerRPC {
microMethod, mok := md.Get(metadata.HeaderEndpoint)
if mok {
serviceMethod := strings.Split(microMethod, ".")
if len(serviceMethod) == 2 {
if shdlr, ok := h.handlers[serviceMethod[0]]; ok {
hdlr := shdlr.(*httpHandler)
fh, mp, err := hdlr.handlers.Search(http.MethodPost, "/"+microMethod)
if err == nil {
match = true
for k, v := range mp {
matches[k] = v
hldr = fh.(*patHandler)
handler = hdlr
// get fields from url values
if len(r.URL.RawQuery) > 0 {
umd, cerr := rflutil.URLMap(r.URL.RawQuery)
if cerr != nil {
_, _ = w.Write([]byte(cerr.Error()))
for k, v := range umd {
matches[k] = v
cf, err := h.newCodec(ct)
if err != nil {
_, _ = w.Write([]byte(err.Error()))
var argv, replyv reflect.Value
// Decode the argument value.
argIsValue := false // if true, need to indirect before calling.
if hldr.mtype.ArgType.Kind() == reflect.Ptr {
argv = reflect.New(hldr.mtype.ArgType.Elem())
} else {
argv = reflect.New(hldr.mtype.ArgType)
argIsValue = true
if argIsValue {
argv = argv.Elem()
// reply value
replyv = reflect.New(hldr.mtype.ReplyType.Elem())
function := hldr.mtype.method.Func
var returnValues []reflect.Value
if r.Body != nil {
var buf []byte
buf, err = io.ReadAll(r.Body)
if err != nil && err != io.EOF {
h.errorHandler(ctx, handler, w, r, err, http.StatusInternalServerError)
if err = cf.Unmarshal(buf, argv.Interface()); err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest)
matches = rflutil.FlattenMap(matches)
if err = rflutil.Merge(argv.Interface(), matches, rflutil.SliceAppend(true), rflutil.Tags([]string{"protobuf", "json"})); err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest)
hr := &rpcRequest{
codec: cf,
service: handler.sopts.Name,
contentType: ct,
method: fmt.Sprintf("%s.%s",, hldr.mtype.method.Name),
endpoint: fmt.Sprintf("%s.%s",, hldr.mtype.method.Name),
payload: argv.Interface(),
header: md,
// define the handler func
fn := func(fctx context.Context, req server.Request, rsp interface{}) (err error) {
returnValues = function.Call([]reflect.Value{hldr.rcvr, hldr.mtype.prepareContext(fctx), argv, reflect.ValueOf(rsp)})
// The return value for the method is an error.
if rerr := returnValues[0].Interface(); rerr != nil {
err = rerr.(error)
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(0)
if nmd, ok := metadata.FromOutgoingContext(fctx); ok {
for k, v := range nmd {
md[k] = v
return err
// wrap the handler func
// for i := len(handler.sopts.Hooks); i > 0; i-- {
// fn = handler.sopts.Hooks[i-1](fn)
// }
if ct == "application/x-www-form-urlencoded" {
cf, err = h.newCodec(DefaultContentType)
if err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusInternalServerError)
ct = DefaultContentType
scode := int(200)
appErr := fn(ctx, hr, replyv.Interface())
w.Header().Set(metadata.HeaderContentType, ct)
if md, ok := metadata.FromOutgoingContext(ctx); ok {
for k, v := range md {
w.Header()[k] = []string{v}
if md := getRspHeader(ctx); md != nil {
for k, v := range md {
w.Header()[k] = v
if nct := w.Header().Get(metadata.HeaderContentType); nct != ct {
if cf, err = h.newCodec(nct); err != nil {
h.errorHandler(ctx, nil, w, r, err, http.StatusBadRequest)
var buf []byte
if appErr != nil {
switch verr := appErr.(type) {
case *errors.Error:
scode = int(verr.Code)
buf, err = cf.Marshal(verr)
case *Error:
buf, err = cf.Marshal(verr.err)
buf, err = cf.Marshal(appErr)
} else {
buf, err = cf.Marshal(replyv.Interface())
if err != nil && handler.sopts.Logger.V(logger.ErrorLevel) {
handler.sopts.Logger.Error(handler.sopts.Context, fmt.Sprintf("handler err: %v", err))
if nscode := GetRspCode(ctx); nscode != 0 {
scode = nscode
if _, cerr := w.Write(buf); cerr != nil {
handler.sopts.Logger.Error(ctx, fmt.Sprintf("write failed: %v", cerr))
}, nil
func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ct := DefaultContentType
if htype := r.Header.Get(metadata.HeaderContentType); htype != "" {
ct = htype
ctx := context.WithValue(r.Context(), rspCodeKey{}, &rspCodeVal{})
ctx = context.WithValue(ctx, rspHeaderKey{}, &rspHeaderVal{})
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.New(len(r.Header) + 8)
for k, v := range r.Header {
md[k] = v[0]
md["RemoteAddr"] = r.RemoteAddr
if r.TLS != nil {
md["Scheme"] = "https"
} else {
md["Scheme"] = "http"
md["Method"] = r.Method
md["URL"] = r.URL.String()
md["Proto"] = r.Proto
md["Content-Length"] = fmt.Sprintf("%d", r.ContentLength)
if len(r.TransferEncoding) > 0 {
md["Transfer-Encoding"] = r.TransferEncoding[0]
md["Host"] = r.Host
md["RequestURI"] = r.RequestURI
ctx = metadata.NewIncomingContext(ctx, md)
path := r.URL.Path
if !strings.HasPrefix(path, "/") {
@ -419,16 +144,11 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !match && h.hd != nil {
if hdlr, ok := h.hd.(http.Handler); ok {
if hdlr, ok := h.hd.Handler().(http.Handler); ok {
hdlr.ServeHTTP(w, r)
} else if !match {
// check for http.HandlerFunc handlers
if ph, _, err := h.pathHandlers.Search(r.Method, r.URL.Path); err == nil {
ph.(http.HandlerFunc)(w, r)
h.errorHandler(ctx, nil, w, r, fmt.Errorf("not matching route found"), http.StatusNotFound)
@ -445,10 +165,6 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Body != nil {
defer r.Body.Close()
cf, err := h.newCodec(ct)
if err != nil {
h.errorHandler(ctx, nil, w, r, err, http.StatusBadRequest)
@ -476,9 +192,7 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
function := hldr.mtype.method.Func
var returnValues []reflect.Value
if r.Body != nil {
var buf []byte
buf, err = io.ReadAll(r.Body)
buf, err := io.ReadAll(r.Body)
if err != nil && err != io.EOF {
h.errorHandler(ctx, handler, w, r, err, http.StatusInternalServerError)
@ -488,15 +202,12 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest)
if len(matches) > 0 {
matches = rflutil.FlattenMap(matches)
if err = rflutil.Merge(argv.Interface(), matches, rflutil.SliceAppend(true), rflutil.Tags([]string{"protobuf", "json"})); err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest)
hr := &rpcRequest{
codec: cf,
@ -523,17 +234,18 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if nmd, ok := metadata.FromOutgoingContext(fctx); ok {
for k, v := range nmd {
md[k] = v
md.Set(k, v)
metadata.SetOutgoingContext(ctx, md)
return err
// wrap the handler func
// for i := len(handler.sopts.HdlrWrappers); i > 0; i-- {
// fn = handler.sopts.HdlrWrappers[i-1](fn)
// }
for i := len(handler.sopts.HdlrWrappers); i > 0; i-- {
fn = handler.sopts.HdlrWrappers[i-1](fn)
if ct == "application/x-www-form-urlencoded" {
cf, err = h.newCodec(DefaultContentType)
@ -550,7 +262,7 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set(metadata.HeaderContentType, ct)
if md, ok := metadata.FromOutgoingContext(ctx); ok {
for k, v := range md {
w.Header()[k] = []string{v}
w.Header().Set(k, v)
if md := getRspHeader(ctx); md != nil {
@ -567,7 +279,6 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var buf []byte
if appErr != nil {
switch verr := appErr.(type) {
case *errors.Error:
@ -583,7 +294,7 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if err != nil && handler.sopts.Logger.V(logger.ErrorLevel) {
handler.sopts.Logger.Error(handler.sopts.Context, fmt.Sprintf("handler err: %v", err))
handler.sopts.Logger.Errorf(handler.sopts.Context, "handler err: %v", err)
@ -593,6 +304,6 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if _, cerr := w.Write(buf); cerr != nil {
handler.sopts.Logger.Error(ctx, fmt.Sprintf("write failed: %v", cerr))
handler.sopts.Logger.Errorf(ctx, "write failed: %v", cerr)

View File

@ -1,12 +0,0 @@
package handler
import (
// import required packages
_ ""
//go:generate sh -c "curl -L -o - | bsdtar -C swagger-ui --strip-components=2 -xv swagger-ui-4.18.3/dist && rm swagger-ui/*.map swagger-ui/*-es-*.js swagger-ui/swagger-ui.js swagger-ui/swagger-initializer.js"
//go:generate sh -c "protoc -I./ -I$(go list -f '{{ .Dir }}' -m --go-micro_out='components=micro|http|server',standalone=false,debug=true,paths=source_relative:./ ./meter/meter.proto"
//go:generate sh -c "protoc -I./ -I$(go list -f '{{ .Dir }}' -m --go-micro_out='components=micro|http|server',standalone=false,debug=true,paths=source_relative:./ ./health/health.proto"

View File

@ -0,0 +1,8 @@
package health
//go:generate sh -c "protoc -I./ -I$(go list -f '{{ .Dir }}' -m --go-micro_out='components=micro|http|server',standalone=false,debug=true,paths=source_relative:./ health.proto"
import (
// import required packages
_ ""

View File

@ -7,7 +7,7 @@ import (
var _ HealthServiceServer = (*Handler)(nil)
var _ HealthServiceServer = &Handler{}
type Handler struct {
opts Options

View File

@ -1,19 +1,45 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT.
// versions:
// - protoc-gen-go-micro v4.0.2
// - protoc v4.23.4
// source: health/health.proto
// - protoc-gen-go-micro v4.10.2
// - protoc v4.21.12
// source: health.proto
package health
import (
context "context"
codec ""
v4 ""
var (
HealthServiceName = "HealthService"
var (
HealthServiceServerEndpoints = []v4.EndpointMetadata{
Name: "HealthService.Live",
Path: "/live",
Method: "GET",
Body: "",
Stream: false,
Name: "HealthService.Ready",
Path: "/ready",
Method: "GET",
Body: "",
Stream: false,
Name: "HealthService.Version",
Path: "/version",
Method: "GET",
Body: "",
Stream: false,
type HealthServiceServer interface {
Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error

View File

@ -1,6 +1,6 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT.
// protoc-gen-go-micro version: v4.0.2
// source: health/health.proto
// protoc-gen-go-micro version: v4.10.2
// source: health.proto
package health
@ -8,36 +8,9 @@ import (
context "context"
codec ""
v4 ""
options ""
server ""
var (
HealthServiceServerEndpoints = []v4.EndpointMetadata{
Name: "HealthService.Live",
Path: "/live",
Method: "GET",
Body: "",
Stream: false,
Name: "HealthService.Ready",
Path: "/ready",
Method: "GET",
Body: "",
Stream: false,
Name: "HealthService.Version",
Path: "/version",
Method: "GET",
Body: "",
Stream: false,
type healthServiceServer struct {
@ -54,7 +27,7 @@ func (h *healthServiceServer) Version(ctx context.Context, req *codec.Frame, rsp
return h.HealthServiceServer.Version(ctx, req, rsp)
func RegisterHealthServiceServer(s server.Server, sh HealthServiceServer, opts ...options.Option) error {
func RegisterHealthServiceServer(s server.Server, sh HealthServiceServer, opts ...server.HandlerOption) error {
type healthService interface {
Live(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
Ready(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
@ -64,7 +37,7 @@ func RegisterHealthServiceServer(s server.Server, sh HealthServiceServer, opts .
h := &healthServiceServer{sh}
var nopts []options.Option
var nopts []server.HandlerOption
nopts = append(nopts, v4.HandlerEndpoints(HealthServiceServerEndpoints))
return s.Handle(&HealthService{h}, append(nopts, opts...)...)
return s.Handle(s.NewHandler(&HealthService{h}, append(nopts, opts...)...))

View File

@ -0,0 +1,8 @@
package meter
//go:generate sh -c "protoc -I./ -I$(go list -f '{{ .Dir }}' -m --go-micro_out='components=micro|http|server',standalone=false,debug=true,paths=source_relative:./ meter.proto"
import (
// import required packages
_ ""

View File

@ -2,38 +2,15 @@ package meter // import ""
import (
codecpb ""
const (
contentEncodingHeader = "Content-Encoding"
acceptEncodingHeader = "Accept-Encoding"
var gzipPool = sync.Pool{
New: func() interface{} {
return gzip.NewWriter(nil)
var bufPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(nil)
// guard to fail early
var _ MeterServiceServer = (*Handler)(nil)
var _ MeterServiceServer = &Handler{}
type Handler struct {
opts Options
@ -44,8 +21,7 @@ type Option func(*Options)
type Options struct {
Meter meter.Meter
Name string
MeterOptions []options.Option
DisableCompress bool
MeterOptions []meter.Option
func Meter(m meter.Meter) Option {
@ -60,20 +36,14 @@ func Name(name string) Option {
func DisableCompress(g bool) Option {
return func(o *Options) {
o.DisableCompress = g
func MeterOptions(opts ...options.Option) Option {
func MeterOptions(opts ...meter.Option) Option {
return func(o *Options) {
o.MeterOptions = append(o.MeterOptions, opts...)
func NewOptions(opts ...Option) Options {
options := Options{Meter: meter.DefaultMeter, DisableCompress: false}
options := Options{Meter: meter.DefaultMeter}
for _, o := range opts {
@ -86,48 +56,12 @@ func NewHandler(opts ...Option) *Handler {
func (h *Handler) Metrics(ctx context.Context, req *codecpb.Frame, rsp *codecpb.Frame) error {
log, ok := logger.FromContext(ctx)
if !ok {
log = logger.DefaultLogger
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
w := io.Writer(buf)
if md, ok := metadata.FromIncomingContext(ctx); gzipAccepted(md) && ok && !h.opts.DisableCompress {
omd, _ := metadata.FromOutgoingContext(ctx)
omd.Set(contentEncodingHeader, "gzip")
gz := gzipPool.Get().(*gzip.Writer)
defer gzipPool.Put(gz)
defer gz.Close()
w = gz
if err := h.opts.Meter.Write(w, h.opts.MeterOptions...); err != nil {
log.Error(ctx, "http/meter: write failed", err)
return nil
buf := bytes.NewBuffer(nil)
if err := h.opts.Meter.Write(buf, h.opts.MeterOptions...); err != nil {
return errors.InternalServerError(h.opts.Name, "%v", err)
rsp.Data = buf.Bytes()
return nil
// gzipAccepted returns whether the client will accept gzip-encoded content.
func gzipAccepted(md metadata.Metadata) bool {
a, ok := md.Get(acceptEncodingHeader)
if !ok {
return false
if strings.Contains(a, "gzip") {
return true
return false

View File

@ -1,19 +1,31 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT.
// versions:
// - protoc-gen-go-micro v4.0.2
// - protoc v4.23.4
// source: meter/meter.proto
// - protoc-gen-go-micro v4.10.2
// - protoc v4.21.12
// source: meter.proto
package meter
import (
context "context"
codec ""
v4 ""
var (
MeterServiceName = "MeterService"
var (
MeterServiceServerEndpoints = []v4.EndpointMetadata{
Name: "MeterService.Metrics",
Path: "/metrics",
Method: "GET",
Body: "",
Stream: false,
type MeterServiceServer interface {
Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error

View File

@ -1,6 +1,6 @@
// Code generated by protoc-gen-go-micro. DO NOT EDIT.
// protoc-gen-go-micro version: v4.0.2
// source: meter/meter.proto
// protoc-gen-go-micro version: v4.10.2
// source: meter.proto
package meter
@ -8,22 +8,9 @@ import (
context "context"
codec ""
v4 ""
options ""
server ""
var (
MeterServiceServerEndpoints = []v4.EndpointMetadata{
Name: "MeterService.Metrics",
Path: "/metrics",
Method: "GET",
Body: "",
Stream: false,
type meterServiceServer struct {
@ -32,7 +19,7 @@ func (h *meterServiceServer) Metrics(ctx context.Context, req *codec.Frame, rsp
return h.MeterServiceServer.Metrics(ctx, req, rsp)
func RegisterMeterServiceServer(s server.Server, sh MeterServiceServer, opts ...options.Option) error {
func RegisterMeterServiceServer(s server.Server, sh MeterServiceServer, opts ...server.HandlerOption) error {
type meterService interface {
Metrics(ctx context.Context, req *codec.Frame, rsp *codec.Frame) error
@ -40,7 +27,7 @@ func RegisterMeterServiceServer(s server.Server, sh MeterServiceServer, opts ...
h := &meterServiceServer{sh}
var nopts []options.Option
var nopts []server.HandlerOption
nopts = append(nopts, v4.HandlerEndpoints(MeterServiceServerEndpoints))
return s.Handle(&MeterService{h}, append(nopts, opts...)...)
return s.Handle(s.NewHandler(&MeterService{h}, append(nopts, opts...)...))

View File

@ -1,49 +0,0 @@
package meter
import (
codecpb ""
func TestHandler_Metrics(t *testing.T) {
type fields struct {
opts Options
type args struct {
ctx context.Context
req *codecpb.Frame
rsp *codecpb.Frame
tests := []struct {
name string
fields fields
args args
wantErr bool
"Test #1",
opts: NewOptions(),
&codecpb.Frame{Data: []byte("gzip")},
for _, tt := range tests {
t.Run(, func(t *testing.T) {
h := &Handler{
opts: tt.fields.opts,
if err := h.Metrics(tt.args.ctx, tt.args.req, tt.args.rsp); (err != nil) != tt.wantErr {
t.Errorf("Metrics() error = %v, wantErr %v", err, tt.wantErr)
t.Logf("RSP: %v", tt.args.rsp.Data)

View File

@ -1,19 +0,0 @@
package spa
import (
// Handler serve files from dir and redirect to index if file not exists
var Handler = func(prefix string, dir fs.FS) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
f := http.StripPrefix(prefix, http.FileServer(http.FS(dir)))
if _, err := fs.Stat(dir, strings.TrimPrefix(r.RequestURI, prefix)); err != nil {
r.RequestURI = prefix
r.URL.Path = prefix
f.ServeHTTP(w, r)

Binary file not shown.


Width:  |  Height:  |  Size: 665 B

Binary file not shown.


Width:  |  Height:  |  Size: 628 B

View File

@ -1,16 +0,0 @@
html {
box-sizing: border-box;
overflow: -moz-scrollbars-vertical;
overflow-y: scroll;
*:after {
box-sizing: inherit;
body {
margin: 0;
background: #fafafa;

View File

@ -1,19 +0,0 @@
<!-- HTML for static distribution bundle build -->
<!DOCTYPE html>
<html lang="en">
<meta charset="UTF-8">
<title>Swagger UI</title>
<link rel="stylesheet" type="text/css" href="./swagger-ui.css" />
<link rel="stylesheet" type="text/css" href="index.css" />
<link rel="icon" type="image/png" href="./favicon-32x32.png" sizes="32x32" />
<link rel="icon" type="image/png" href="./favicon-16x16.png" sizes="16x16" />
<div id="swagger-ui"></div>
<script src="./swagger-ui-bundle.js" charset="UTF-8"> </script>
<script src="./swagger-ui-standalone-preset.js" charset="UTF-8"> </script>
<script src="./swagger-initializer.js" charset="UTF-8"> </script>

View File

@ -1,79 +0,0 @@
<!doctype html>
<html lang="en-US">
<title>Swagger UI: OAuth2 Redirect</title>
'use strict';
function run () {
var oauth2 = window.opener.swaggerUIRedirectOauth2;
var sentState = oauth2.state;
var redirectUrl = oauth2.redirectUrl;
var isValid, qp, arr;
if (/code|token|error/.test(window.location.hash)) {
qp = window.location.hash.substring(1).replace('?', '&');
} else {
qp =;
arr = qp.split("&");
arr.forEach(function (v,i,_arr) { _arr[i] = '"' + v.replace('=', '":"') + '"';});
qp = qp ? JSON.parse('{' + arr.join() + '}',
function (key, value) {
return key === "" ? value : decodeURIComponent(value);
) : {};
isValid = qp.state === sentState;
if ((
oauth2.auth.schema.get("flow") === "accessCode" ||
oauth2.auth.schema.get("flow") === "authorizationCode" ||
oauth2.auth.schema.get("flow") === "authorization_code"
) && !oauth2.auth.code) {
if (!isValid) {
source: "auth",
level: "warning",
message: "Authorization may be unsafe, passed state was changed in server. The passed state wasn't returned from auth server."
if (qp.code) {
delete oauth2.state;
oauth2.auth.code = qp.code;
oauth2.callback({auth: oauth2.auth, redirectUrl: redirectUrl});
} else {
let oauthErrorMsg;
if (qp.error) {
oauthErrorMsg = "["+qp.error+"]: " +
(qp.error_description ? qp.error_description+ ". " : "no accessCode received from the server. ") +
(qp.error_uri ? "More info: "+qp.error_uri : "");
source: "auth",
level: "error",
message: oauthErrorMsg || "[Authorization failed]: no accessCode received from the server."
} else {
oauth2.callback({auth: oauth2.auth, token: qp, isValid: isValid, redirectUrl: redirectUrl});
if (document.readyState !== 'loading') {
} else {
document.addEventListener('DOMContentLoaded', function () {

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1,142 +0,0 @@
package swaggerui // import ""
import (
//go:embed *.js *.css *.html *.png
var assets embed.FS
var (
Handler = func(prefix string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet || path.Base(r.URL.Path) != "swagger-initializer.js" {
http.StripPrefix(prefix, http.FileServer(http.FS(assets))).ServeHTTP(w, r)
tpl := template.New("swagger-initializer.js").Funcs(TemplateFuncs)
ptpl, err := tpl.Parse(Template)
if err != nil {
_, _ = w.Write([]byte(err.Error()))
if err := ptpl.Execute(w, Config); err != nil {
_, _ = w.Write([]byte(err.Error()))
TemplateFuncs = template.FuncMap{
"isInt": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.Int, reflect.Int8, reflect.Int32, reflect.Int64, reflect.Uint, reflect.Uint8, reflect.Uint32, reflect.Uint64, reflect.Float32, reflect.Float64:
return true
return false
"isBool": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.Bool:
return true
return false
"isString": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.String:
return true
return false
"isSlice": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.Slice:
return true
return false
"isMap": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.Map:
return true
return false
Template = `
window.onload = function() {
//<editor-fold desc="Changeable Configuration Block">
window.ui = SwaggerUIBundle({
{{- range $k, $v := . }}
{{- if (eq (printf "%s" $v) "") -}}
{{- continue -}}
{{ end }}
{{ $k }}: {{ if isBool $v -}}
{{- $v -}},
{{- else if isInt $v -}}
{{- $v -}},
{{- else if isString $v -}}
"{{- $v -}}",
{{- else if and (isSlice $v) (or (eq (printf "%s" $k) "presets") (eq (printf "%s" $k) "plugins")) -}}
{{- range $v }}
{{ . }},
{{- end }}
{{- end -}}
{{ end }}
Config = map[string]interface{}{
"configUrl": "",
"dom_id": "#swagger-ui",
"domNode": "",
"spec": "",
"urls": []interface{}{
"url": "",
"name": "",
"url": "",
"deepLinking": true,
"displayOperationId": false,
"defaultModelsExpandDepth": 1,
"defaultModelExpandDepth": 1,
"displayRequestDuration": true,
"filter": true,
"operationsSorter": "alpha",
"showExtensions": true,
"tryItOutEnabled": true,
"presets": []string{
"plugins": []string{
"layout": "StandaloneLayout",

View File

@ -1,15 +0,0 @@
package swaggerui
import (
func TestTemplate(t *testing.T) {
h := http.NewServeMux()
h.HandleFunc("/", Handler(""))
if err := http.ListenAndServe(":8080", h); err != nil {

View File

@ -1,61 +0,0 @@
package swagger
import (
yamlcodec ""
rutil ""
// Handler append to generated swagger data from dst map[string]interface{}
var Handler = func(dst map[string]interface{}, fsys fs.FS) http.HandlerFunc {
c := yamlcodec.NewCodec()
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
path := r.URL.Path
if len(path) > 1 && path[0] == '/' {
path = path[1:]
buf, err := fs.ReadFile(fsys, path)
if err != nil {
_, _ = w.Write([]byte(err.Error()))
if dst == nil {
_, _ = w.Write(buf)
var src interface{}
if err = c.Unmarshal(buf, src); err != nil {
_, _ = w.Write([]byte(err.Error()))
if err = rutil.Merge(src, dst); err != nil {
_, _ = w.Write([]byte(err.Error()))
if buf, err = c.Marshal(src); err != nil {
_, _ = w.Write([]byte(err.Error()))
_, _ = w.Write(buf)

View File

@ -9,27 +9,29 @@ import (
rhttp ""
var _ server.Server = (*Server)(nil)
var _ server.Server = &httpServer{}
type Server struct {
hd interface{}
type httpServer struct {
hd server.Handler
rsvc *register.Service
handlers map[string]interface{}
handlers map[string]server.Handler
exit chan chan error
errorHandler func(context.Context, interface{}, http.ResponseWriter, *http.Request, error, int)
subscribers map[*httpSubscriber][]broker.Subscriber
errorHandler func(context.Context, server.Handler, http.ResponseWriter, *http.Request, error, int)
pathHandlers *rhttp.Trie
opts server.Options
registerRPC bool
@ -38,7 +40,7 @@ type Server struct {
init bool
func (h *Server) newCodec(ct string) (codec.Codec, error) {
func (h *httpServer) newCodec(ct string) (codec.Codec, error) {
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
ct = ct[:idx]
@ -51,14 +53,14 @@ func (h *Server) newCodec(ct string) (codec.Codec, error) {
return nil, codec.ErrUnknownContentType
func (h *Server) Options() server.Options {
func (h *httpServer) Options() server.Options {
opts := h.opts
return opts
func (h *Server) Init(opts ...options.Option) error {
func (h *httpServer) Init(opts ...server.Option) error {
if len(opts) == 0 && h.init {
return nil
@ -68,11 +70,11 @@ func (h *Server) Init(opts ...options.Option) error {
for _, o := range opts {
if fn, ok := h.opts.Context.Value(errorHandlerKey{}).(func(ctx context.Context, s interface{}, w http.ResponseWriter, r *http.Request, err error, status int)); ok && fn != nil {
if fn, ok := h.opts.Context.Value(errorHandlerKey{}).(func(ctx context.Context, s server.Handler, w http.ResponseWriter, r *http.Request, err error, status int)); ok && fn != nil {
h.errorHandler = fn
if h.handlers == nil {
h.handlers = make(map[string]interface{})
h.handlers = make(map[string]server.Handler)
if h.pathHandlers == nil {
h.pathHandlers = rhttp.NewTrie()
@ -99,6 +101,10 @@ func (h *Server) Init(opts ...options.Option) error {
return err
if err := h.opts.Broker.Init(); err != nil {
return err
if err := h.opts.Tracer.Init(); err != nil {
return err
@ -111,6 +117,10 @@ func (h *Server) Init(opts ...options.Option) error {
return err
if err := h.opts.Transport.Init(); err != nil {
return err
@ -120,24 +130,12 @@ func (h *Server) Init(opts ...options.Option) error {
return nil
func (h *Server) Handle(handler interface{}, opts ...options.Option) error {
options := server.NewHandleOptions(opts...)
var endpointMetadata []EndpointMetadata
if v, ok := options.Context.Value(handlerEndpointsKey{}).([]EndpointMetadata); ok {
endpointMetadata = v
func (h *httpServer) Handle(handler server.Handler) error {
// passed unknown handler
hdlr, ok := handler.(*httpHandler)
if !ok {
if h.handlers == nil {
h.handlers = make(map[string]interface{})
for _, v := range endpointMetadata {
h.handlers[v.Name] = h.newHTTPHandler(handler, opts...)
h.hd = handler
return nil
@ -150,11 +148,19 @@ func (h *Server) Handle(handler interface{}, opts ...options.Option) error {
return nil
// passed micro compat handler
if h.handlers == nil {
h.handlers = make(map[string]server.Handler)
h.handlers[handler.Name()] = handler
return nil
func (h *Server) newHTTPHandler(handler interface{}, opts ...options.Option) *httpHandler {
options := server.NewHandleOptions(opts...)
func (h *httpServer) NewHandler(handler interface{}, opts ...server.HandlerOption) server.Handler {
options := server.NewHandlerOptions(opts...)
eps := make([]*register.Endpoint, 0, len(options.Metadata))
for name, metadata := range options.Metadata {
@ -198,16 +204,16 @@ func (h *Server) newHTTPHandler(handler interface{}, opts ...options.Option) *ht
if method.Name == "" && h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil method for %s", mname))
h.opts.Logger.Errorf(h.opts.Context, "nil method for %s", mname)
mtype, err := prepareEndpoint(method)
if err != nil && h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("%v", err))
h.opts.Logger.Errorf(h.opts.Context, "%v", err)
} else if mtype == nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil mtype for %s", mname))
h.opts.Logger.Errorf(h.opts.Context, "nil mtype for %s", mname)
@ -218,13 +224,13 @@ func (h *Server) newHTTPHandler(handler interface{}, opts ...options.Option) *ht = name
if err := hdlr.handlers.Insert([]string{md["Method"]}, md["Path"], pth); err != nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add handler for %s %s", md["Method"][0], md["Path"][0]))
h.opts.Logger.Errorf(h.opts.Context, "cant add handler for %s %s", md["Method"], md["Path"])
if h.registerRPC {
h.opts.Logger.Info(h.opts.Context, fmt.Sprintf("register rpc handler for http.MethodPost %s /%s", hn, hn))
h.opts.Logger.Infof(h.opts.Context, "register rpc handler for http.MethodPost %s /%s", hn, hn)
if err := hdlr.handlers.Insert([]string{http.MethodPost}, "/"+hn, pth); err != nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add rpc handler for http.MethodPost %s /%s", hn, hn))
h.opts.Logger.Errorf(h.opts.Context, "cant add rpc handler for http.MethodPost %s /%s", hn, hn)
@ -248,16 +254,16 @@ func (h *Server) newHTTPHandler(handler interface{}, opts ...options.Option) *ht
if method.Name == "" && h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil method for %s", mname))
h.opts.Logger.Errorf(h.opts.Context, "nil method for %s", mname)
mtype, err := prepareEndpoint(method)
if err != nil && h.opts.Logger.V(logger.ErrorLevel) {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("%v", err))
h.opts.Logger.Errorf(h.opts.Context, "%v", err)
} else if mtype == nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("nil mtype for %s", mname))
h.opts.Logger.Errorf(h.opts.Context, "nil mtype for %s", mname)
@ -268,13 +274,13 @@ func (h *Server) newHTTPHandler(handler interface{}, opts ...options.Option) *ht = name
if err := hdlr.handlers.Insert([]string{md.Method}, md.Path, pth); err != nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add handler for %s %s", md.Method, md.Path))
h.opts.Logger.Errorf(h.opts.Context, "cant add handler for %s %s", md.Method, md.Path)
if h.registerRPC {
h.opts.Logger.Info(h.opts.Context, fmt.Sprintf("register rpc handler for http.MethodPost %s /%s", hn, hn))
h.opts.Logger.Infof(h.opts.Context, "register rpc handler for http.MethodPost %s /%s", hn, hn)
if err := hdlr.handlers.Insert([]string{http.MethodPost}, "/"+hn, pth); err != nil {
h.opts.Logger.Error(h.opts.Context, fmt.Sprintf("cant add rpc handler for http.MethodPost %s /%s", hn, hn))
h.opts.Logger.Errorf(h.opts.Context, "cant add rpc handler for http.MethodPost %s /%s", hn, hn)
@ -282,15 +288,40 @@ func (h *Server) newHTTPHandler(handler interface{}, opts ...options.Option) *ht
return hdlr
func (h *Server) Register() error {
func (h *httpServer) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber {
return newSubscriber(topic, handler, opts...)
func (h *httpServer) Subscribe(sb server.Subscriber) error {
sub, ok := sb.(*httpSubscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *httpSubscriber")
if len(sub.handlers) == 0 {
return fmt.Errorf("invalid subscriber: no handler functions")
if err := server.ValidateSubscriber(sb); err != nil {
return err
_, ok = h.subscribers[sub]
if ok {
return fmt.Errorf("subscriber %v already exists", h)
h.subscribers[sub] = nil
return nil
func (h *httpServer) Register() error {
var eps []*register.Endpoint
for _, hdlr := range h.handlers {
hd, ok := hdlr.(*httpHandler)
if !ok {
eps = append(eps, hd.Endpoints()...)
eps = append(eps, hdlr.Endpoints()...)
rsvc := h.rsvc
config := h.opts
@ -308,16 +339,31 @@ func (h *Server) Register() error {
if err != nil {
return err
service.Nodes[0].Metadata.Set("protocol", "http")
service.Nodes[0].Metadata["protocol"] = "http"
service.Endpoints = eps
subscriberList := make([]*httpSubscriber, 0, len(h.subscribers))
for e := range h.subscribers {
// Only advertise non internal subscribers
subscriberList = append(subscriberList, e)
sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic
for _, e := range subscriberList {
service.Endpoints = append(service.Endpoints, e.Endpoints()...)
registered := h.registered
if !registered {
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, fmt.Sprintf("Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID))
config.Logger.Infof(config.Context, "Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID)
@ -332,6 +378,29 @@ func (h *Server) Register() error {
for sb := range h.subscribers {
handler := h.createSubHandler(sb, config)
var opts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
subCtx := config.Context
if cx := sb.Options().Context; cx != nil {
subCtx = cx
opts = append(opts, broker.SubscribeContext(subCtx))
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
if err != nil {
return err
h.subscribers[sb] = []broker.Subscriber{sub}
h.registered = true
h.rsvc = service
@ -339,7 +408,7 @@ func (h *Server) Register() error {
return nil
func (h *Server) Deregister() error {
func (h *httpServer) Deregister() error {
config := h.opts
@ -350,7 +419,7 @@ func (h *Server) Deregister() error {
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, fmt.Sprintf("Deregistering node: %s", service.Nodes[0].ID))
config.Logger.Infof(config.Context, "Deregistering node: %s", service.Nodes[0].ID)
if err := server.DefaultDeregisterFunc(service, config); err != nil {
@ -366,11 +435,28 @@ func (h *Server) Deregister() error {
h.registered = false
subCtx := h.opts.Context
for sb, subs := range h.subscribers {
if cx := sb.Options().Context; cx != nil {
subCtx = cx
for _, sub := range subs {
config.Logger.Infof(config.Context, "Unsubscribing from topic: %s", sub.Topic())
if err := sub.Unsubscribe(subCtx); err != nil {
config.Logger.Errorf(config.Context, "failed to unsubscribe topic: %s, error: %v", sb.Topic(), err)
return err
h.subscribers[sb] = nil
return nil
func (h *Server) Start() error {
func (h *httpServer) Start() error {
config := h.opts
@ -400,7 +486,7 @@ func (h *Server) Start() error {
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, fmt.Sprintf("Listening on %s", ts.Addr().String()))
config.Logger.Infof(config.Context, "Listening on %s", ts.Addr().String())
@ -408,12 +494,13 @@ func (h *Server) Start() error {
var handler http.Handler
var srvFunc func(net.Listener) error
// nolint: nestif
if h.opts.Context != nil {
if hs, ok := h.opts.Context.Value(serverKey{}).(*http.Server); ok && hs != nil {
if hs.Handler == nil && h.hd != nil {
if hdlr, ok := h.hd.(http.Handler); ok {
if hdlr, ok := h.hd.Handler().(http.Handler); ok {
hs.Handler = hdlr
handler = hs.Handler
@ -429,7 +516,7 @@ func (h *Server) Start() error {
case len(h.handlers) > 0 && h.hd != nil:
handler = h
case handler == nil && h.hd != nil:
if hdlr, ok := h.hd.(http.Handler); ok {
if hdlr, ok := h.hd.Handler().(http.Handler); ok {
handler = hdlr
@ -438,9 +525,13 @@ func (h *Server) Start() error {
return fmt.Errorf("cant process with nil handler")
if err := config.Broker.Connect(h.opts.Context); err != nil {
return err
if err := config.RegisterCheck(h.opts.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error: %s", config.Name, config.ID, err))
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, err)
} else {
if err = h.Register(); err != nil {
@ -450,7 +541,6 @@ func (h *Server) Start() error {
fn := handler
var hs *http.Server
if h.opts.Context != nil {
if mwf, ok := h.opts.Context.Value(middlewareKey{}).([]func(http.Handler) http.Handler); ok && len(mwf) > 0 {
// wrap the handler func
@ -458,19 +548,25 @@ func (h *Server) Start() error {
fn = mwf[i-1](fn)
var ok bool
if hs, ok = h.opts.Context.Value(serverKey{}).(*http.Server); ok && hs != nil {
if hs, ok := h.opts.Context.Value(serverKey{}).(*http.Server); ok && hs != nil {
hs.Handler = fn
} else {
hs = &http.Server{Handler: fn}
srvFunc = hs.Serve
if srvFunc != nil {
go func() {
if cerr := hs.Serve(ts); cerr != nil && !errors.Is(cerr, net.ErrClosed) {
h.opts.Logger.Error(h.opts.Context, cerr.Error())
if cerr := srvFunc(ts); cerr != nil && !errors.Is(cerr, net.ErrClosed) {
h.opts.Logger.Error(h.opts.Context, cerr)
} else {
go func() {
if cerr := http.Serve(ts, fn); cerr != nil && !errors.Is(cerr, net.ErrClosed) {
h.opts.Logger.Error(h.opts.Context, cerr)
go func() {
t := new(time.Ticker)
@ -496,28 +592,28 @@ func (h *Server) Start() error {
// nolint: nestif
if rerr != nil && registered {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr))
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr)
// deregister self in case of error
if err := h.Deregister(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s deregister error: %s", config.Name, config.ID, err))
config.Logger.Errorf(config.Context, "Server %s-%s deregister error: %s", config.Name, config.ID, err)
} else if rerr != nil && !registered {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error: %s", config.Name, config.ID, rerr))
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, rerr)
if err := h.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register error: %s", config.Name, config.ID, err))
config.Logger.Errorf(config.Context, "Server %s-%s register error: %s", config.Name, config.ID, err)
if err := h.Register(); err != nil {
config.Logger.Error(config.Context, fmt.Sprintf("Server register error: %s", err))
config.Logger.Errorf(config.Context, "Server register error: %s", err)
// wait for exit
case ch = <-h.exit:
@ -527,46 +623,43 @@ func (h *Server) Start() error {
// deregister
if err := h.Deregister(); err != nil {
config.Logger.Error(config.Context, fmt.Sprintf("Server deregister error: %s", err))
config.Logger.Errorf(config.Context, "Server deregister error: %s", err)
ctx, cancel := context.WithTimeout(context.Background(), h.opts.GracefulTimeout)
defer cancel()
err := hs.Shutdown(ctx)
if err != nil {
err = hs.Close()
if err := config.Broker.Disconnect(config.Context); err != nil {
config.Logger.Errorf(config.Context, "Broker disconnect error: %s", err)
ch <- err
ch <- ts.Close()
return nil
func (h *Server) Stop() error {
func (h *httpServer) Stop() error {
ch := make(chan error)
h.exit <- ch
return <-ch
func (h *Server) String() string {
func (h *httpServer) String() string {
return "http"
func (h *Server) Name() string {
func (h *httpServer) Name() string {
return h.opts.Name
func NewServer(opts ...options.Option) *Server {
func NewServer(opts ...server.Option) *httpServer {
options := server.NewOptions(opts...)
eh := DefaultErrorHandler
if v, ok := options.Context.Value(errorHandlerKey{}).(errorHandler); ok && v != nil {
eh = v
return &Server{
return &httpServer{
opts: options,
exit: make(chan chan error),
subscribers: make(map[*httpSubscriber][]broker.Subscriber),
errorHandler: eh,
pathHandlers: rhttp.NewTrie(),

View File

@ -5,7 +5,7 @@ import (
// SetError pass error to caller
@ -79,24 +79,24 @@ func GetRspCode(ctx context.Context) int {
type middlewareKey struct{}
// Middleware passes http middlewares
func Middleware(mw ...func(http.Handler) http.Handler) options.Option {
return options.ContextOption(middlewareKey{}, mw)
func Middleware(mw ...func(http.Handler) http.Handler) server.Option {
return server.SetOption(middlewareKey{}, mw)
type serverKey struct{}
// HTTPServer provide ability to pass *http.Server
func HTTPServer(hs *http.Server) options.Option {
return options.ContextOption(serverKey{}, hs)
// Server provide ability to pass *http.Server
func Server(hs *http.Server) server.Option {
return server.SetOption(serverKey{}, hs)
type errorHandler func(ctx context.Context, s interface{}, w http.ResponseWriter, r *http.Request, err error, status int)
type errorHandler func(ctx context.Context, s server.Handler, w http.ResponseWriter, r *http.Request, err error, status int)
type errorHandlerKey struct{}
// ErrorHandler specifies handler for errors
func ErrorHandler(fn errorHandler) options.Option {
return options.ContextOption(errorHandlerKey{}, fn)
func ErrorHandler(fn errorHandler) server.Option {
return server.SetOption(errorHandlerKey{}, fn)
type (
@ -107,18 +107,12 @@ type (
// PathHandler specifies http handler for path regexp
func PathHandler(method, path string, handler http.HandlerFunc) options.Option {
return func(src interface{}) error {
vctx, err := options.Get(src, ".Context")
if err != nil {
return err
func PathHandler(method, path string, handler http.HandlerFunc) server.Option {
return func(o *server.Options) {
if o.Context == nil {
o.Context = context.Background()
ctx, ok := vctx.(context.Context)
if !ok {
return fmt.Errorf("invalid option")
v, ok := ctx.Value(pathHandlerKey{}).(*pathHandlerVal)
v, ok := o.Context.Value(pathHandlerKey{}).(*pathHandlerVal)
if !ok {
v = &pathHandlerVal{h: make(map[string]map[string]http.HandlerFunc)}
@ -127,17 +121,16 @@ func PathHandler(method, path string, handler http.HandlerFunc) options.Option {
m = make(map[string]http.HandlerFunc)
v.h[method] = m
ctx = context.WithValue(ctx, pathHandlerKey{}, v)
m[path] = handler
return options.Set(src, ctx, ".Context")
o.Context = context.WithValue(o.Context, pathHandlerKey{}, v)
type registerRPCHandlerKey struct{}
// RegisterRPCHandler registers compatibility endpoints with /ServiceName.ServiceEndpoint method POST
func RegisterRPCHandler(b bool) options.Option {
return options.ContextOption(registerRPCHandlerKey{}, b)
func RegisterRPCHandler(b bool) server.Option {
return server.SetOption(registerRPCHandlerKey{}, b)
type handlerEndpointsKey struct{}
@ -150,8 +143,8 @@ type EndpointMetadata struct {
Stream bool
func HandlerEndpoints(md []EndpointMetadata) options.Option {
return options.ContextOption(handlerEndpointsKey{}, md)
func HandlerEndpoints(md []EndpointMetadata) server.HandlerOption {
return server.SetHandlerOption(handlerEndpointsKey{}, md)
type handlerOptions struct {

View File

@ -8,7 +8,10 @@ import (
var _ server.Request = &rpcRequest{}
var (
_ server.Request = &rpcRequest{}
_ server.Message = &rpcMessage{}
type rpcRequest struct {
rw io.ReadWriter
@ -22,6 +25,14 @@ type rpcRequest struct {
stream bool
type rpcMessage struct {
payload interface{}
codec codec.Codec
header metadata.Metadata
topic string
contentType string
func (r *rpcRequest) ContentType() string {
return r.contentType
@ -61,3 +72,23 @@ func (r *rpcRequest) Stream() bool {
func (r *rpcRequest) Body() interface{} {
return r.payload
func (r *rpcMessage) ContentType() string {
return r.contentType
func (r *rpcMessage) Topic() string {
return r.topic
func (r *rpcMessage) Body() interface{} {
return r.payload
func (r *rpcMessage) Header() metadata.Metadata {
return r.header
func (r *rpcMessage) Codec() codec.Codec {
return r.codec

View File

@ -10,8 +10,6 @@ import (
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
type methodType struct {
ArgType reflect.Type
ReplyType reflect.Type

subscriber.go Normal file
View File

@ -0,0 +1,208 @@
package http
import (
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
type handler struct {
reqType reflect.Type
ctxType reflect.Type
method reflect.Value
type httpSubscriber struct {
topic string
rcvr reflect.Value
typ reflect.Type
subscriber interface{}
handlers []*handler
endpoints []*register.Endpoint
opts server.SubscriberOptions
func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber {
options := server.NewSubscriberOptions(opts...)
var endpoints []*register.Endpoint
var handlers []*handler
if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func {
h := &handler{
method: reflect.ValueOf(sub),
switch typ.NumIn() {
case 1:
h.reqType = typ.In(0)
case 2:
h.ctxType = typ.In(0)
h.reqType = typ.In(1)
handlers = append(handlers, h)
ep := &register.Endpoint{
Name: "Func",
Request: register.ExtractSubValue(typ),
Metadata: metadata.New(2),
ep.Metadata.Set("topic", topic)
ep.Metadata.Set("subscriber", "true")
endpoints = append(endpoints, ep)
} else {
hdlr := reflect.ValueOf(sub)
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
h := &handler{
method: method.Func,
switch method.Type.NumIn() {
case 2:
h.reqType = method.Type.In(1)
case 3:
h.ctxType = method.Type.In(1)
h.reqType = method.Type.In(2)
handlers = append(handlers, h)
ep := &register.Endpoint{
Name: name + "." + method.Name,
Request: register.ExtractSubValue(method.Type),
Metadata: metadata.New(2),
ep.Metadata.Set("topic", topic)
ep.Metadata.Set("subscriber", "true")
endpoints = append(endpoints, ep)
return &httpSubscriber{
rcvr: reflect.ValueOf(sub),
typ: reflect.TypeOf(sub),
topic: topic,
subscriber: sub,
handlers: handlers,
endpoints: endpoints,
opts: options,
func (s *httpServer) createSubHandler(sb *httpSubscriber, opts server.Options) broker.Handler {
return func(p broker.Event) error {
msg := p.Message()
ct := msg.Header["Content-Type"]
cf, err := s.newCodec(ct)
if err != nil {
return err
hdr := metadata.Copy(msg.Header)
delete(hdr, "Content-Type")
ctx := metadata.NewIncomingContext(context.Background(), hdr)
results := make(chan error, len(sb.handlers))
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
var isVal bool
var req reflect.Value
if handler.reqType.Kind() == reflect.Ptr {
req = reflect.New(handler.reqType.Elem())
} else {
req = reflect.New(handler.reqType)
isVal = true
if isVal {
req = req.Elem()
buf := bytes.NewBuffer(msg.Body)
if err := cf.ReadHeader(buf, &codec.Message{}, codec.Event); err != nil {
return err
if err := cf.ReadBody(buf, req.Interface()); err != nil {
return err
fn := func(ctx context.Context, msg server.Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
if handler.ctxType != nil {
vals = append(vals, reflect.ValueOf(ctx))
vals = append(vals, reflect.ValueOf(msg.Body()))
returnValues := handler.method.Call(vals)
if err := returnValues[0].Interface(); err != nil {
return err.(error)
return nil
for i := len(opts.SubWrappers); i > 0; i-- {
fn = opts.SubWrappers[i-1](fn)
go func() {
results <- fn(ctx, &httpMessage{
topic: sb.topic,
contentType: ct,
payload: req.Interface(),
header: msg.Header,
codec: cf,
var errors []string
for i := 0; i < len(sb.handlers); i++ {
if err := <-results; err != nil {
errors = append(errors, err.Error())
if len(errors) > 0 {
return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
return nil
func (s *httpSubscriber) Topic() string {
return s.topic
func (s *httpSubscriber) Subscriber() interface{} {
return s.subscriber
func (s *httpSubscriber) Endpoints() []*register.Endpoint {
return s.endpoints
func (s *httpSubscriber) Options() server.SubscriberOptions {
return s.opts

View File

@ -1,8 +0,0 @@
//go:build tools
package http
import (
_ ""
_ ""

View File

@ -31,13 +31,15 @@ func FillRequest(ctx context.Context, req interface{}, opts ...FillRequestOption
cookies := md["Cookie"]
cookies := strings.Split(md["Cookie"], ";")
cmd := make(map[string]string, len(cookies))
kv := strings.Split(cookies, "=")
for _, cookie := range cookies {
kv := strings.Split(cookie, "=")
if len(kv) != 2 {
return nil
cmd[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1])
for idx := 0; idx < len(options.cookies)/2; idx += 2 {
k := http.CanonicalHeaderKey(options.cookies[idx])
v, ok := cmd[k]